diff --git a/README b/README index 693b3a0..e593a79 100644 --- a/README +++ b/README @@ -1,15 +1,4 @@ -*- mode: org -*- -* Guile Knots - -Guile Knots is a library providing higher-level patterns and building -blocks for programming with [[https://codeberg.org/guile/fibers][Guile Fibers]]. - -This includes: - -- Parallel map/for-each with configurable concurrency limits -- Resource and thread pools -- Fiber-aware promises for lazy and eager parallel evaluation -- Timeouts for fibers and I/O ports -- A HTTP web server -- Non-blocking socket utilities +This Guile library provides useful patterns and functionality to use +Guile Fibers. diff --git a/knots.scm b/knots.scm index 304931e..765db5a 100644 --- a/knots.scm +++ b/knots.scm @@ -29,11 +29,6 @@ spawn-fiber/knots)) (define (call-with-default-io-waiters thunk) - "Run THUNK with Guile's default blocking I/O waiters active. - -This is useful when restoring the default Guile I/O waiters from -within a context (like Fibers) where different I/O waiters are used, -for example when creating a new thread from a fiber." (parameterize ((current-read-waiter (@@ (ice-9 suspendable-ports) default-read-waiter)) @@ -42,33 +37,15 @@ for example when creating a new thread from a fiber." (thunk))) (define (wait-when-system-clock-behind) - "Block until the system clock reads at least 2001-01-02. - -Useful at startup in environments (virtual machines, embedded systems) -where the clock may start at or near the Unix epoch. Prints a warning -to the current error port every 20 seconds while waiting." - ;; Jan 02 2001 02:00:00 - (let ((start-of-the-year-2001 978400800)) + (let ((start-of-the-year-2000 946684800)) (while (< (current-time) - start-of-the-year-2001) + start-of-the-year-2000) (simple-format (current-error-port) "warning: system clock potentially behind, waiting\n") (sleep 20)))) ;; Copied from (fibers web server) (define (call-with-sigint thunk cvar) - "Run THUNK with a SIGINT handler that signals the Fibers condition -CVAR. Restores the previous handler when THUNK returns. - -Typical usage is to pass a condition variable to this procedure and -wait on CVAR in a fiber to implement clean shutdown on Ctrl-C: - -@example -(let ((quit-cvar (make-condition))) - (call-with-sigint - (lambda () (wait quit-cvar)) - quit-cvar)) -@end example" (let ((handler #f)) (dynamic-wind (lambda () @@ -119,11 +96,6 @@ wait on CVAR in a fiber to implement clean shutdown on Ctrl-C: (raise-exception exn))))) (define* (display/knots obj #:optional (port (current-output-port))) - "Write OBJ to PORT (default: current output port) as a UTF-8 byte -sequence via @code{put-bytevector}. - -When used with ports without buffering, this should be safer than -display." (put-bytevector port (string->utf8 @@ -132,8 +104,6 @@ display." (display obj port)))))) (define (simple-format/knots port s . args) - "Like @code{simple-format} but should be safer when used with a port -without buffering." (let ((str (apply simple-format #f s args))) (if (eq? #f port) str @@ -144,8 +114,6 @@ without buffering." port))))) (define (format/knots port s . args) - "Like @code{format} but should be safer when used with a port -without buffering." (let ((str (apply format #f s args))) (if (eq? #f port) str @@ -264,10 +232,6 @@ without buffering." (display/knots error-string port))) (define* (spawn-fiber/knots thunk #:optional scheduler #:key parallel?) - "Spawn a fiber to run THUNK, with knots exception handling. - -Accepts the same optional SCHEDULER and @code{#:parallel?} arguments -as @code{spawn-fiber}." (spawn-fiber (lambda () (with-exception-handler diff --git a/knots/non-blocking.scm b/knots/non-blocking.scm index cd029fe..4473b63 100644 --- a/knots/non-blocking.scm +++ b/knots/non-blocking.scm @@ -32,16 +32,6 @@ (define* (non-blocking-open-socket-for-uri uri #:key (verify-certificate? #t)) - "Open a socket for URI and return it as a non-blocking port. - -For HTTPS URIs the TLS handshake is completed while the socket is -still blocking (required because Guile's TLS wrapper does not support -non-blocking handshakes), then the underlying socket is made -non-blocking. For plain HTTP the socket is made non-blocking -immediately. - -@code{#:verify-certificate?} controls TLS certificate verification -and defaults to @code{#t}." (define tls-wrap (@@ (web client) tls-wrap)) diff --git a/knots/parallelism.scm b/knots/parallelism.scm index e78e6e2..7631055 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -267,16 +267,6 @@ invocation of PROC finishes. REPORT is passed the results for each #:key (parallelism 1) (input-channel (make-channel)) (process-channel input-channel)) - "Convert PROC into a procedure backed by @code{#:parallelism} -(default: 1) background fibers. Returns a wrapper that sends its -arguments to one of the fibers and blocks until the result is -returned. - -@code{#:input-channel} is the channel that callers write requests to; -defaults to a fresh channel. @code{#:process-channel} is the channel -the fibers read from; defaults to @code{#:input-channel}. Setting -them differently allows external parties to bypass the wrapper and -write directly to @code{process-channel}." (for-each (lambda _ (spawn-fiber @@ -328,12 +318,6 @@ write directly to @code{process-channel}." (resource-pool parallelism-limiter-resource-pool)) (define* (make-parallelism-limiter limit #:key (name "unnamed")) - "Return a parallelism limiter that allows at most LIMIT concurrent -fibers to execute within @code{with-parallelism-limiter} at the same -time. Further fibers block until a slot becomes free. - -@code{#:name} is a string used in log messages. Defaults to -@code{\"unnamed\"}." (make-parallelism-limiter-record (make-fixed-size-resource-pool (iota limit) @@ -345,9 +329,6 @@ time. Further fibers block until a slot becomes free. parallelism-limiter))) (define* (call-with-parallelism-limiter parallelism-limiter thunk) - "Acquire a slot from PARALLELISM-LIMITER, call THUNK, release the -slot, and return the values from THUNK. Blocks if no slot is -currently available." (call-with-resource-from-pool (parallelism-limiter-resource-pool parallelism-limiter) (lambda _ diff --git a/knots/promise.scm b/knots/promise.scm index b85fe64..c01d219 100644 --- a/knots/promise.scm +++ b/knots/promise.scm @@ -41,21 +41,12 @@ (evaluated-condition fibers-promise-evaluated-condition)) (define (fibers-delay thunk) - "Return a new fiber-aware promise that will evaluate THUNK when -first forced. THUNK is not called until @code{fibers-force} is -called on the promise." (make-fibers-promise thunk (make-atomic-box #f) (make-condition))) (define (fibers-force fp) - "Force the fiber-aware promise FP, returning its values. - -The first call evaluates the promise's thunk. Concurrent callers -block on a condition variable until evaluation finishes, then receive -the same result. If the thunk raises an exception, the exception is -stored and re-raised for all callers." (unless (fibers-promise? fp) (raise-exception (make-exception @@ -117,9 +108,6 @@ stored and re-raised for all callers." (define (fibers-delay/eager thunk) - "Return a new fiber-aware promise and immediately begin evaluating -THUNK in a new fiber. Exceptions during eager evaluation are silently -discarded; they will be re-raised when @code{fibers-force} is called." (let ((promise (fibers-delay thunk))) (spawn-fiber (lambda () @@ -133,15 +121,10 @@ discarded; they will be re-raised when @code{fibers-force} is called." promise)) (define (fibers-promise-reset fp) - "Reset the fiber-aware promise FP so that the next call to -@code{fibers-force} re-evaluates its thunk." (atomic-box-set! (fibers-promise-values-box fp) #f)) (define (fibers-promise-result-available? fp) - "Return @code{#t} if the fiber-aware promise FP has been evaluated -(successfully or with an exception) and @code{#f} if evaluation has -not yet started or is still in progress." (let ((val (atomic-box-ref (fibers-promise-values-box fp)))) (not (or (eq? val #f) (eq? val 'started))))) diff --git a/knots/queue.scm b/knots/queue.scm index 2ca9b10..ec9f703 100644 --- a/knots/queue.scm +++ b/knots/queue.scm @@ -25,12 +25,6 @@ #:export (spawn-queueing-fiber)) (define (spawn-queueing-fiber dest-channel) - "Spawn a fiber that serialises items onto DEST-CHANNEL in FIFO order. -Returns a new input channel. - -Multiple producers can put items on the returned channel concurrently. -The fiber buffers them locally and forwards them to DEST-CHANNEL one at -a time, preserving arrival order." (define queue (make-q)) (let ((queue-channel (make-channel))) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index f06a156..bdaad8f 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -154,33 +154,6 @@ (name "unnamed") default-checkout-timeout default-max-waiters) - "Create a resource pool from RESOURCES-LIST-OR-VECTOR, a list or -vector of pre-existing resource values. - -Use @code{with-resource-from-pool} or -@code{call-with-resource-from-pool} to borrow a resource and return it -automatically when done. - -Optional keyword arguments: - -@table @code -@item #:name -A optional string used in log messages. -Defaults to @code{\"unnamed\"}. - -@item #:default-checkout-timeout -Default checkout timeout when requesting a resource from the pool, -unset by default. - -@item #:default-max-waiters -Maximum number of fibers that may queue waiting for a resource. When -this limit is exceeded, @code{&resource-pool-too-many-waiters} is -raised when a resource is requested. Defaults to @code{#f} (no limit). - -@item #:scheduler -The Fibers scheduler to use for the pool's internal fiber. Defaults -to the current scheduler. -@end table" (define channel (make-channel)) (define destroy-condition (make-condition)) @@ -540,59 +513,6 @@ to the current scheduler. (add-resources-parallelism 1) default-checkout-timeout default-max-waiters) - "Create a dynamic resource pool. RETURN-NEW-RESOURCE is a thunk -called to create each new resource value. MAX-SIZE is the maximum -number of resources the pool will hold simultaneously. - -Resources are created on demand when a checkout is requested and the -pool is not yet at MAX-SIZE. Use @code{with-resource-from-pool} or -@code{call-with-resource-from-pool} to request a resource and return -it automatically when done. - -Optional keyword arguments: - -@table @code -@item #:min-size -Minimum number of resources to keep alive even when idle. Defaults to -@code{0}. - -@item #:idle-seconds -Seconds a resource may remain unused before being destroyed, provided -the pool is above @code{#:min-size}. Defaults to @code{#f} (never -expire idle resources). - -@item #:lifetime -Maximum number of checkouts a single resource will serve before being -destroyed and replaced by a fresh one. Defaults to @code{#f} (no -limit). - -@item #:destructor -A procedure called as @code{(destructor resource)} when a resource is -removed from the pool. Defaults to @code{#f}. - -@item #:add-resources-parallelism -Maximum number of concurrent calls to RETURN-NEW-RESOURCE when the -pool needs to grow. Allowing resources to be created in parallel can -result in more resources being created than can fit inside the pool, -if this happens, the surplus resources are destroyed. Defaults to -@code{1}. - -@item #:name -A string used in log messages. Defaults to @code{\"unnamed\"}. - -@item #:default-checkout-timeout -Default checkout timeout when requesting a resource from the pool, -unset by default. - -@item #:default-max-waiters -Maximum number of fibers that may queue waiting for a resource. When -this limit is exceeded, @code{&resource-pool-too-many-waiters} is -raised when a resource is requested. Defaults to @code{#f} (no limit). - -@item #:scheduler -The Fibers scheduler to use for the pool's internal fiber. Defaults -to the current scheduler. -@end table" (define channel (make-channel)) (define destroy-condition (make-condition)) @@ -1252,10 +1172,6 @@ to the current scheduler. pool) (define (destroy-resource-pool pool) - "Destroy POOL, preventing any new checkouts. Blocks until all -checked-out resources have been returned, running the pool's -@code{#:destructor} on each. Any fibers waiting for a resource -receive @code{&resource-pool-destroyed}." (perform-operation (choice-operation (wrap-operation @@ -1472,23 +1388,6 @@ available. Return the resource once PROC has returned." (lambda (resource) exp ...))) (define* (resource-pool-stats pool #:key (timeout 5)) - "Return an alist of statistics for POOL with the following keys: - -@table @code -@item resources -Total number of resources currently held by the pool. -@item available -Number of resources not currently checked out. -@item waiters -Number of fibers currently queued waiting for a resource. -@item checkout-failure-count -Cumulative number of checkouts where an exception was raised inside -the proc. -@end table - -Blocks waiting for the pool fiber to respond. @code{#:timeout} is -the number of seconds to wait; defaults to @code{5}. Raises -@code{&resource-pool-timeout} if the pool does not respond in time." (define channel (resource-pool-channel pool)) diff --git a/knots/sort.scm b/knots/sort.scm index 94d49f8..dcad052 100644 --- a/knots/sort.scm +++ b/knots/sort.scm @@ -54,15 +54,6 @@ rest))))) (define* (fibers-sort! items less #:key parallelism) - "Sort ITEMS destructively using LESS as the comparison procedure, -using a parallel merge sort. Returns the sorted list. - -Splits ITEMS into chunks, sorts each in an eager fiber-promise in -parallel, then merges pairs of sorted chunks in parallel until one -sorted list remains. - -@code{#:parallelism} sets the number of initial chunks. Defaults to -the current fibers parallelism." (define requested-chunk-count (or parallelism (+ 1 (length (scheduler-remote-peers (current-scheduler)))))) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index f8c44b2..22c1b5c 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -163,13 +163,12 @@ from there, or #f if that would be an empty string." (define-record-type (fixed-size-thread-pool channel arguments-parameter current-procedures - default-checkout-timeout threads) + default-checkout-timeout) fixed-size-thread-pool? (channel fixed-size-thread-pool-channel) (arguments-parameter fixed-size-thread-pool-arguments-parameter) (current-procedures fixed-size-thread-pool-current-procedures) - (default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout) - (threads fixed-size-thread-pool-threads)) + (default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout)) ;; Since both thread pool records have this field, use a procedure ;; than handles the appropriate accessor @@ -212,52 +211,6 @@ from there, or #f if that would be an empty string." (name "unnamed") (use-default-io-waiters? #t) default-checkout-timeout) - "Create a pool of SIZE threads started immediately. Use -@code{call-with-thread} to run a procedure in one of the threads. - -Optional keyword arguments: - -@table @code -@item #:thread-initializer -A thunk called once when each thread starts. Its return value is -passed as extra arguments to every procedure run in that thread. -Defaults to @code{#f} (no extra arguments). - -@item #:thread-destructor -A procedure called with the value returned by @code{#:thread-initializer} -when a thread exits. Defaults to @code{#f}. - -@item #:thread-lifetime -Maximum number of procedures a thread will run before restarting (and -re-running @code{#:thread-initializer}). Defaults to @code{#f} (no -limit). - -@item #:expire-on-exception? -When @code{#t}, replace a thread after any unhandled exception. -Defaults to @code{#f}. - -@item #:use-default-io-waiters? -When @code{#t} (the default), each thread uses blocking I/O waiters so -that port reads and writes block the thread rather than trying to -suspend a fiber. - -@item #:name -String used in thread names and log messages. Defaults to -@code{\"unnamed\"}. - -@item #:default-checkout-timeout -Seconds to wait for a free thread slot before raising -@code{&thread-pool-timeout-error}. Defaults to @code{#f} (wait -forever). - -@item #:delay-logger -Called as @code{(delay-logger seconds proc)} with the time spent -waiting for a thread to become available. - -@item #:duration-logger -Called as @code{(duration-logger seconds proc)} after each procedure -completes. -@end table" (define channel (make-channel)) @@ -427,20 +380,19 @@ completes. (initializer/safe) '())))))))) - (define threads - (map (lambda (i) - (if use-default-io-waiters? - (call-with-default-io-waiters - (lambda () - (start-thread i channel))) - (start-thread i channel))) - (iota size))) + (for-each + (lambda (i) + (if use-default-io-waiters? + (call-with-default-io-waiters + (lambda () + (start-thread i channel))) + (start-thread i channel))) + (iota size)) (fixed-size-thread-pool channel param thread-proc-vector - default-checkout-timeout - threads)) + default-checkout-timeout)) (define* (make-thread-pool max-size #:key @@ -456,34 +408,8 @@ completes. (use-default-io-waiters? #t) default-checkout-timeout default-max-waiters) - "Create a dynamic thread pool with up to MAX-SIZE threads. Use -@code{call-with-thread} to run a procedure in one of the threads. - -Unlike @code{make-fixed-size-thread-pool}, threads are created on -demand and may be reclaimed when idle (controlled by @code{#:min-size} -and the resource pool's idle management). - -Accepts the same @code{#:thread-initializer}, @code{#:thread-destructor}, -@code{#:thread-lifetime}, @code{#:expire-on-exception?}, -@code{#:use-default-io-waiters?}, @code{#:name}, -@code{#:default-checkout-timeout}, @code{#:delay-logger}, and -@code{#:duration-logger} arguments as @code{make-fixed-size-thread-pool}, -plus: - -@table @code -@item #:min-size -Minimum number of threads to keep alive. Defaults to MAX-SIZE (i.e.@: -the pool is pre-filled and never shrinks). - -@item #:scheduler -Fibers scheduler for the pool's internal resource pool fiber. Defaults -to the current scheduler. - -@item #:default-max-waiters -Maximum number of fibers that may queue waiting for a thread. Raises -@code{&thread-pool-timeout-error} when exceeded. Defaults to -@code{#f} (no limit). -@end table" + "Return a channel used to offload work to a dedicated thread. ARGS are the +arguments of the thread pool procedure." (define param (make-parameter #f)) @@ -518,34 +444,8 @@ Maximum number of fibers that may queue waiting for a thread. Raises channel destroy-thread-on-exception? (max-waiters 'default)) - "Run PROC in THREAD-POOL and return its values, blocking until -complete. If called from within a thread that already belongs to -THREAD-POOL, PROC is called directly in that thread. - -Optional keyword arguments: - -@table @code -@item #:checkout-timeout -Seconds to wait for a free thread before raising -@code{&thread-pool-timeout-error}. Defaults to the pool's -@code{#:default-checkout-timeout}. - -@item #:max-waiters -Maximum number of fibers that may queue waiting for a thread (for -dynamic pools). Defaults to the pool's @code{#:default-max-waiters}. - -@item #:destroy-thread-on-exception? -When @code{#t}, destroy the thread after PROC raises an exception. -Equivalent to per-call @code{#:expire-on-exception?}. Defaults to -@code{#f}. - -@item #:duration-logger -Called as @code{(duration-logger seconds)} after PROC completes -(whether or not it raised an exception). - -@item #:channel -Override the channel used to communicate with the thread. -@end table" + "Send PROC to the thread pool through CHANNEL. Return the result of PROC. +If already in the thread pool, call PROC immediately." (define (handle-proc fixed-size-thread-pool reply-channel start-time @@ -629,13 +529,9 @@ Override the channel used to communicate with the thread. destroy-thread-on-exception?)))))))) (define (destroy-thread-pool pool) - "Destroy POOL, stopping all of its threads and calling the destructor -if specified. This procedure will block until the destruction is -complete." (if (fixed-size-thread-pool? pool) - (let ((channel (fixed-size-thread-pool-channel pool)) - (threads (fixed-size-thread-pool-threads pool))) - (for-each (lambda _ (put-message channel 'destroy)) threads) - (for-each join-thread threads)) + (put-message + (fixed-size-thread-pool-channel pool) + 'destroy) (destroy-resource-pool (thread-pool-resource-pool pool)))) diff --git a/knots/timeout.scm b/knots/timeout.scm index 2df2ddd..a65a095 100644 --- a/knots/timeout.scm +++ b/knots/timeout.scm @@ -45,16 +45,7 @@ with-port-timeouts)) -(define* (with-fibers-timeout thunk #:key - timeout - (on-timeout - (const *unspecified*))) - "Run THUNK in a new fiber and return its values, waiting TIMEOUT -seconds for it to finish. If THUNK does not complete within TIMEOUT -seconds, the ON-TIMEOUT procedure is called and with-fibers-timeout -returns the result of ON-TIMEOUT instead. - -If THUNK raises an exception it is re-raised in the calling fiber." +(define* (with-fibers-timeout thunk #:key timeout on-timeout) (let ((channel (make-channel))) (spawn-fiber (lambda () @@ -119,7 +110,7 @@ If THUNK raises an exception it is re-raised in the calling fiber." (exception-predicate &port-write-timeout-error)) (define (readable? port) - "Test if PORT is readable." + "Test if PORT is writable." (= 1 (port-poll port "r" 0))) (define (writable? port) @@ -160,21 +151,6 @@ If THUNK raises an exception it is re-raised in the calling fiber." #:key timeout (read-timeout timeout) (write-timeout timeout)) - "Run THUNK with per-operation I/O timeouts on all ports. If any -read or write blocks for longer than the given number of seconds, an -exception is raised. - -@code{#:timeout} sets both read and write timeouts. -@code{#:read-timeout} and @code{#:write-timeout} specify the timeout -for reads and writes respectively. All three default to @code{#f} (no -timeout). - -This procedure works both with fibers, and without fibers by using the -poll system call with a timeout. - -On read timeout, raises @code{&port-read-timeout-error}. On write -timeout, raises @code{&port-write-timeout-error}. Both carry the -@code{thunk} and @code{port} fields from @code{&port-timeout-error}." (define (no-fibers-wait thunk port mode timeout) (define poll-timeout-ms 200) diff --git a/tests/thread-pool.scm b/tests/thread-pool.scm index dc22119..e3a1cdd 100644 --- a/tests/thread-pool.scm +++ b/tests/thread-pool.scm @@ -1,6 +1,5 @@ (use-modules (tests) (ice-9 atomic) - (ice-9 threads) (srfi srfi-71) (fibers) (unit-test) @@ -143,33 +142,4 @@ ref-and-decrement)) (error))) -;; Test that the destructor is called when a size 1 fixed-size thread -;; pool is destroyed, and that destroy-thread-pool blocks until it has -;; completed. -(let* ((destructor-called? #f) - (thread-pool - (make-fixed-size-thread-pool - 1 - #:thread-destructor - (lambda () - (set! destructor-called? #t))))) - (destroy-thread-pool thread-pool) - (assert-equal #t destructor-called?)) - -;; Test that the destructor is called for every thread when a -;; multi-thread fixed-size thread pool is destroyed, and that -;; destroy-thread-pool blocks until all destructors have completed. -(let* ((destructor-count 0) - (mutex (make-mutex)) - (pool-size 3) - (thread-pool - (make-fixed-size-thread-pool - pool-size - #:thread-destructor - (lambda () - (with-mutex mutex - (set! destructor-count (+ destructor-count 1))))))) - (destroy-thread-pool thread-pool) - (assert-equal pool-size destructor-count)) - (display "thread-pool test finished successfully\n")