diff --git a/README b/README index e593a79..693b3a0 100644 --- a/README +++ b/README @@ -1,4 +1,15 @@ -*- mode: org -*- -This Guile library provides useful patterns and functionality to use -Guile Fibers. +* Guile Knots + +Guile Knots is a library providing higher-level patterns and building +blocks for programming with [[https://codeberg.org/guile/fibers][Guile Fibers]]. + +This includes: + +- Parallel map/for-each with configurable concurrency limits +- Resource and thread pools +- Fiber-aware promises for lazy and eager parallel evaluation +- Timeouts for fibers and I/O ports +- An HTTP web server +- Non-blocking socket utilities diff --git a/knots.scm b/knots.scm index 765db5a..304931e 100644 --- a/knots.scm +++ b/knots.scm @@ -29,6 +29,11 @@ spawn-fiber/knots)) (define (call-with-default-io-waiters thunk) + "Run THUNK with Guile's default blocking I/O waiters active. + +This is useful when restoring the default Guile I/O waiters from +within a context (like Fibers) where different I/O waiters are used, +for example when creating a new thread from a fiber." (parameterize ((current-read-waiter (@@ (ice-9 suspendable-ports) default-read-waiter)) @@ -37,15 +42,33 @@ (thunk))) (define (wait-when-system-clock-behind) - (let ((start-of-the-year-2000 946684800)) + "Block until the system clock reads at least 2001-01-02. + +Useful at startup in environments (virtual machines, embedded systems) +where the clock may start at or near the Unix epoch. Prints a warning +to the current error port every 20 seconds while waiting." + ;; Jan 02 2001 02:00:00 UTC + (let ((start-of-the-year-2001 978400800)) (while (< (current-time) - start-of-the-year-2000) + start-of-the-year-2001) (simple-format (current-error-port) "warning: system clock potentially behind, waiting\n") (sleep 20)))) ;; Copied from (fibers web server) (define (call-with-sigint thunk cvar) + "Run THUNK with a SIGINT handler that signals the Fibers condition +CVAR. Restores the previous handler when THUNK returns.
+ +Typical usage is to pass a condition variable to this procedure and +wait on CVAR in a fiber to implement clean shutdown on Ctrl-C: + +@example +(let ((quit-cvar (make-condition))) + (call-with-sigint + (lambda () (wait quit-cvar)) + quit-cvar)) +@end example" (let ((handler #f)) (dynamic-wind (lambda () @@ -96,6 +119,11 @@ (raise-exception exn))))) (define* (display/knots obj #:optional (port (current-output-port))) + "Write OBJ to PORT (default: current output port) as a UTF-8 byte +sequence via @code{put-bytevector}. + +When used with ports without buffering, this should be safer than +display." (put-bytevector port (string->utf8 @@ -104,6 +132,8 @@ (display obj port)))))) (define (simple-format/knots port s . args) + "Like @code{simple-format} but should be safer when used with a port +without buffering." (let ((str (apply simple-format #f s args))) (if (eq? #f port) str @@ -114,6 +144,8 @@ port))))) (define (format/knots port s . args) + "Like @code{format} but should be safer when used with a port +without buffering." (let ((str (apply format #f s args))) (if (eq? #f port) str @@ -232,6 +264,10 @@ (display/knots error-string port))) (define* (spawn-fiber/knots thunk #:optional scheduler #:key parallel?) + "Spawn a fiber to run THUNK, with knots exception handling. + +Accepts the same optional SCHEDULER and @code{#:parallel?} arguments +as @code{spawn-fiber}." (spawn-fiber (lambda () (with-exception-handler diff --git a/knots/non-blocking.scm b/knots/non-blocking.scm index 4473b63..cd029fe 100644 --- a/knots/non-blocking.scm +++ b/knots/non-blocking.scm @@ -32,6 +32,16 @@ (define* (non-blocking-open-socket-for-uri uri #:key (verify-certificate? #t)) + "Open a socket for URI and return it as a non-blocking port. + +For HTTPS URIs the TLS handshake is completed while the socket is +still blocking (required because Guile's TLS wrapper does not support +non-blocking handshakes), then the underlying socket is made +non-blocking. 
For plain HTTP the socket is made non-blocking +immediately. + +@code{#:verify-certificate?} controls TLS certificate verification +and defaults to @code{#t}." (define tls-wrap (@@ (web client) tls-wrap)) diff --git a/knots/parallelism.scm b/knots/parallelism.scm index 7631055..e78e6e2 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -267,6 +267,16 @@ invocation of PROC finishes. REPORT is passed the results for each #:key (parallelism 1) (input-channel (make-channel)) (process-channel input-channel)) + "Convert PROC into a procedure backed by @code{#:parallelism} +(default: 1) background fibers. Returns a wrapper that sends its +arguments to one of the fibers and blocks until the result is +returned. + +@code{#:input-channel} is the channel that callers write requests to; +defaults to a fresh channel. @code{#:process-channel} is the channel +the fibers read from; defaults to @code{#:input-channel}. Setting +them differently allows external parties to bypass the wrapper and +write directly to @code{process-channel}." (for-each (lambda _ (spawn-fiber @@ -318,6 +328,12 @@ invocation of PROC finishes. REPORT is passed the results for each (resource-pool parallelism-limiter-resource-pool)) (define* (make-parallelism-limiter limit #:key (name "unnamed")) + "Return a parallelism limiter that allows at most LIMIT concurrent +fibers to execute within @code{with-parallelism-limiter} at the same +time. Further fibers block until a slot becomes free. + +@code{#:name} is a string used in log messages. Defaults to +@code{\"unnamed\"}." (make-parallelism-limiter-record (make-fixed-size-resource-pool (iota limit) @@ -329,6 +345,9 @@ invocation of PROC finishes. REPORT is passed the results for each parallelism-limiter))) (define* (call-with-parallelism-limiter parallelism-limiter thunk) + "Acquire a slot from PARALLELISM-LIMITER, call THUNK, release the +slot, and return the values from THUNK. Blocks if no slot is +currently available." 
(call-with-resource-from-pool (parallelism-limiter-resource-pool parallelism-limiter) (lambda _ diff --git a/knots/promise.scm b/knots/promise.scm index c01d219..b85fe64 100644 --- a/knots/promise.scm +++ b/knots/promise.scm @@ -41,12 +41,21 @@ (evaluated-condition fibers-promise-evaluated-condition)) (define (fibers-delay thunk) + "Return a new fiber-aware promise that will evaluate THUNK when +first forced. THUNK is not called until @code{fibers-force} is +called on the promise." (make-fibers-promise thunk (make-atomic-box #f) (make-condition))) (define (fibers-force fp) + "Force the fiber-aware promise FP, returning its values. + +The first call evaluates the promise's thunk. Concurrent callers +block on a condition variable until evaluation finishes, then receive +the same result. If the thunk raises an exception, the exception is +stored and re-raised for all callers." (unless (fibers-promise? fp) (raise-exception (make-exception @@ -108,6 +117,9 @@ (define (fibers-delay/eager thunk) + "Return a new fiber-aware promise and immediately begin evaluating +THUNK in a new fiber. Exceptions raised there are not propagated from +that fiber; they are re-raised when @code{fibers-force} is called." (let ((promise (fibers-delay thunk))) (spawn-fiber (lambda () @@ -121,10 +133,15 @@ promise)) (define (fibers-promise-reset fp) + "Reset the fiber-aware promise FP so that the next call to +@code{fibers-force} re-evaluates its thunk." (atomic-box-set! (fibers-promise-values-box fp) #f)) (define (fibers-promise-result-available? fp) + "Return @code{#t} if the fiber-aware promise FP has been evaluated +(successfully or with an exception) and @code{#f} if evaluation has +not yet started or is still in progress." (let ((val (atomic-box-ref (fibers-promise-values-box fp)))) (not (or (eq? val #f) (eq?
val 'started))))) diff --git a/knots/queue.scm b/knots/queue.scm index ec9f703..2ca9b10 100644 --- a/knots/queue.scm +++ b/knots/queue.scm @@ -25,6 +25,12 @@ #:export (spawn-queueing-fiber)) (define (spawn-queueing-fiber dest-channel) + "Spawn a fiber that serialises items onto DEST-CHANNEL in FIFO order. +Returns a new input channel. + +Multiple producers can put items on the returned channel concurrently. +The fiber buffers them locally and forwards them to DEST-CHANNEL one at +a time, preserving arrival order." (define queue (make-q)) (let ((queue-channel (make-channel))) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index bdaad8f..f06a156 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -154,6 +154,33 @@ (name "unnamed") default-checkout-timeout default-max-waiters) + "Create a resource pool from RESOURCES-LIST-OR-VECTOR, a list or +vector of pre-existing resource values. + +Use @code{with-resource-from-pool} or +@code{call-with-resource-from-pool} to borrow a resource and return it +automatically when done. + +Optional keyword arguments: + +@table @code +@item #:name +An optional string used in log messages. +Defaults to @code{\"unnamed\"}. + +@item #:default-checkout-timeout +Default checkout timeout when requesting a resource from the pool, +unset by default. + +@item #:default-max-waiters +Maximum number of fibers that may queue waiting for a resource. When +this limit is exceeded, @code{&resource-pool-too-many-waiters} is +raised when a resource is requested. Defaults to @code{#f} (no limit). + +@item #:scheduler +The Fibers scheduler to use for the pool's internal fiber. Defaults +to the current scheduler. +@end table" (define channel (make-channel)) (define destroy-condition (make-condition)) @@ -513,6 +540,59 @@ (add-resources-parallelism 1) default-checkout-timeout default-max-waiters) + "Create a dynamic resource pool. RETURN-NEW-RESOURCE is a thunk +called to create each new resource value.
MAX-SIZE is the maximum +number of resources the pool will hold simultaneously. + +Resources are created on demand when a checkout is requested and the +pool is not yet at MAX-SIZE. Use @code{with-resource-from-pool} or +@code{call-with-resource-from-pool} to request a resource and return +it automatically when done. + +Optional keyword arguments: + +@table @code +@item #:min-size +Minimum number of resources to keep alive even when idle. Defaults to +@code{0}. + +@item #:idle-seconds +Seconds a resource may remain unused before being destroyed, provided +the pool is above @code{#:min-size}. Defaults to @code{#f} (never +expire idle resources). + +@item #:lifetime +Maximum number of checkouts a single resource will serve before being +destroyed and replaced by a fresh one. Defaults to @code{#f} (no +limit). + +@item #:destructor +A procedure called as @code{(destructor resource)} when a resource is +removed from the pool. Defaults to @code{#f}. + +@item #:add-resources-parallelism +Maximum number of concurrent calls to RETURN-NEW-RESOURCE when the +pool needs to grow. Allowing resources to be created in parallel can +result in more resources being created than can fit inside the pool; +if this happens, the surplus resources are destroyed. Defaults to +@code{1}. + +@item #:name +A string used in log messages. Defaults to @code{\"unnamed\"}. + +@item #:default-checkout-timeout +Default checkout timeout when requesting a resource from the pool, +unset by default. + +@item #:default-max-waiters +Maximum number of fibers that may queue waiting for a resource. When +this limit is exceeded, @code{&resource-pool-too-many-waiters} is +raised when a resource is requested. Defaults to @code{#f} (no limit). + +@item #:scheduler +The Fibers scheduler to use for the pool's internal fiber. Defaults +to the current scheduler.
+@end table" (define channel (make-channel)) (define destroy-condition (make-condition)) @@ -1172,6 +1252,10 @@ pool) (define (destroy-resource-pool pool) + "Destroy POOL, preventing any new checkouts. Blocks until all +checked-out resources have been returned, running the pool's +@code{#:destructor} on each. Any fibers waiting for a resource +receive @code{&resource-pool-destroyed}." (perform-operation (choice-operation (wrap-operation @@ -1388,6 +1472,23 @@ available. Return the resource once PROC has returned." (lambda (resource) exp ...))) (define* (resource-pool-stats pool #:key (timeout 5)) + "Return an alist of statistics for POOL with the following keys: + +@table @code +@item resources +Total number of resources currently held by the pool. +@item available +Number of resources not currently checked out. +@item waiters +Number of fibers currently queued waiting for a resource. +@item checkout-failure-count +Cumulative number of checkouts where an exception was raised inside +the proc. +@end table + +Blocks waiting for the pool fiber to respond. @code{#:timeout} is +the number of seconds to wait; defaults to @code{5}. Raises +@code{&resource-pool-timeout} if the pool does not respond in time." (define channel (resource-pool-channel pool)) diff --git a/knots/sort.scm b/knots/sort.scm index dcad052..94d49f8 100644 --- a/knots/sort.scm +++ b/knots/sort.scm @@ -54,6 +54,15 @@ rest))))) (define* (fibers-sort! items less #:key parallelism) + "Sort ITEMS destructively using LESS as the comparison procedure, +using a parallel merge sort. Returns the sorted list. + +Splits ITEMS into chunks, sorts each in an eager fiber-promise in +parallel, then merges pairs of sorted chunks in parallel until one +sorted list remains. + +@code{#:parallelism} sets the number of initial chunks. Defaults to +the current fibers parallelism." 
(define requested-chunk-count (or parallelism (+ 1 (length (scheduler-remote-peers (current-scheduler)))))) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index 22c1b5c..f8c44b2 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -163,12 +163,13 @@ from there, or #f if that would be an empty string." (define-record-type (fixed-size-thread-pool channel arguments-parameter current-procedures - default-checkout-timeout) + default-checkout-timeout threads) fixed-size-thread-pool? (channel fixed-size-thread-pool-channel) (arguments-parameter fixed-size-thread-pool-arguments-parameter) (current-procedures fixed-size-thread-pool-current-procedures) - (default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout)) + (default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout) + (threads fixed-size-thread-pool-threads)) ;; Since both thread pool records have this field, use a procedure ;; than handles the appropriate accessor @@ -211,6 +212,52 @@ from there, or #f if that would be an empty string." (name "unnamed") (use-default-io-waiters? #t) default-checkout-timeout) + "Create a pool of SIZE threads started immediately. Use +@code{call-with-thread} to run a procedure in one of the threads. + +Optional keyword arguments: + +@table @code +@item #:thread-initializer +A thunk called once when each thread starts. Its return value is +passed as extra arguments to every procedure run in that thread. +Defaults to @code{#f} (no extra arguments). + +@item #:thread-destructor +A procedure called with the value returned by @code{#:thread-initializer} +when a thread exits. Defaults to @code{#f}. + +@item #:thread-lifetime +Maximum number of procedures a thread will run before restarting (and +re-running @code{#:thread-initializer}). Defaults to @code{#f} (no +limit). + +@item #:expire-on-exception? +When @code{#t}, replace a thread after any unhandled exception. +Defaults to @code{#f}. + +@item #:use-default-io-waiters? 
+When @code{#t} (the default), each thread uses blocking I/O waiters so +that port reads and writes block the thread rather than trying to +suspend a fiber. + +@item #:name +String used in thread names and log messages. Defaults to +@code{\"unnamed\"}. + +@item #:default-checkout-timeout +Seconds to wait for a free thread slot before raising +@code{&thread-pool-timeout-error}. Defaults to @code{#f} (wait +forever). + +@item #:delay-logger +Called as @code{(delay-logger seconds proc)} with the time spent +waiting for a thread to become available. + +@item #:duration-logger +Called as @code{(duration-logger seconds proc)} after each procedure +completes. +@end table" (define channel (make-channel)) @@ -380,19 +427,20 @@ from there, or #f if that would be an empty string." (initializer/safe) '())))))))) - (for-each - (lambda (i) - (if use-default-io-waiters? - (call-with-default-io-waiters - (lambda () - (start-thread i channel))) - (start-thread i channel))) - (iota size)) + (define threads + (map (lambda (i) + (if use-default-io-waiters? + (call-with-default-io-waiters + (lambda () + (start-thread i channel))) + (start-thread i channel))) + (iota size))) (fixed-size-thread-pool channel param thread-proc-vector - default-checkout-timeout)) + default-checkout-timeout + threads)) (define* (make-thread-pool max-size #:key @@ -408,8 +456,34 @@ from there, or #f if that would be an empty string." (use-default-io-waiters? #t) default-checkout-timeout default-max-waiters) - "Return a channel used to offload work to a dedicated thread. ARGS are the -arguments of the thread pool procedure." + "Create a dynamic thread pool with up to MAX-SIZE threads. Use +@code{call-with-thread} to run a procedure in one of the threads. + +Unlike @code{make-fixed-size-thread-pool}, threads are created on +demand and may be reclaimed when idle (controlled by @code{#:min-size} +and the resource pool's idle management). 
+ +Accepts the same @code{#:thread-initializer}, @code{#:thread-destructor}, +@code{#:thread-lifetime}, @code{#:expire-on-exception?}, +@code{#:use-default-io-waiters?}, @code{#:name}, +@code{#:default-checkout-timeout}, @code{#:delay-logger}, and +@code{#:duration-logger} arguments as @code{make-fixed-size-thread-pool}, +plus: + +@table @code +@item #:min-size +Minimum number of threads to keep alive. Defaults to MAX-SIZE (i.e.@: +the pool is pre-filled and never shrinks). + +@item #:scheduler +Fibers scheduler for the pool's internal resource pool fiber. Defaults +to the current scheduler. + +@item #:default-max-waiters +Maximum number of fibers that may queue waiting for a thread. Raises +@code{&thread-pool-timeout-error} when exceeded. Defaults to +@code{#f} (no limit). +@end table" (define param (make-parameter #f)) @@ -444,8 +518,34 @@ arguments of the thread pool procedure." channel destroy-thread-on-exception? (max-waiters 'default)) - "Send PROC to the thread pool through CHANNEL. Return the result of PROC. -If already in the thread pool, call PROC immediately." + "Run PROC in THREAD-POOL and return its values, blocking until +complete. If called from within a thread that already belongs to +THREAD-POOL, PROC is called directly in that thread. + +Optional keyword arguments: + +@table @code +@item #:checkout-timeout +Seconds to wait for a free thread before raising +@code{&thread-pool-timeout-error}. Defaults to the pool's +@code{#:default-checkout-timeout}. + +@item #:max-waiters +Maximum number of fibers that may queue waiting for a thread (for +dynamic pools). Defaults to the pool's @code{#:default-max-waiters}. + +@item #:destroy-thread-on-exception? +When @code{#t}, destroy the thread after PROC raises an exception. +Equivalent to per-call @code{#:expire-on-exception?}. Defaults to +@code{#f}. + +@item #:duration-logger +Called as @code{(duration-logger seconds)} after PROC completes +(whether or not it raised an exception). 
+ +@item #:channel +Override the channel used to communicate with the thread. +@end table" (define (handle-proc fixed-size-thread-pool reply-channel start-time @@ -529,9 +629,13 @@ If already in the thread pool, call PROC immediately." destroy-thread-on-exception?)))))))) (define (destroy-thread-pool pool) + "Destroy POOL, stopping all of its threads and calling the destructor +if specified. This procedure will block until the destruction is +complete." (if (fixed-size-thread-pool? pool) - (put-message - (fixed-size-thread-pool-channel pool) - 'destroy) + (let ((channel (fixed-size-thread-pool-channel pool)) + (threads (fixed-size-thread-pool-threads pool))) + (for-each (lambda _ (put-message channel 'destroy)) threads) + (for-each join-thread threads)) (destroy-resource-pool (thread-pool-resource-pool pool)))) diff --git a/knots/timeout.scm b/knots/timeout.scm index a65a095..2df2ddd 100644 --- a/knots/timeout.scm +++ b/knots/timeout.scm @@ -45,7 +45,16 @@ with-port-timeouts)) -(define* (with-fibers-timeout thunk #:key timeout on-timeout) +(define* (with-fibers-timeout thunk #:key + timeout + (on-timeout + (const *unspecified*))) + "Run THUNK in a new fiber and return its values, waiting TIMEOUT +seconds for it to finish. If THUNK does not complete within TIMEOUT +seconds, the ON-TIMEOUT procedure is called and with-fibers-timeout +returns the result of ON-TIMEOUT instead. + +If THUNK raises an exception it is re-raised in the calling fiber." (let ((channel (make-channel))) (spawn-fiber (lambda () @@ -110,7 +119,7 @@ (exception-predicate &port-write-timeout-error)) (define (readable? port) - "Test if PORT is writable." + "Test if PORT is readable." (= 1 (port-poll port "r" 0))) (define (writable? port) @@ -151,6 +160,21 @@ #:key timeout (read-timeout timeout) (write-timeout timeout)) + "Run THUNK with per-operation I/O timeouts on all ports. If any +read or write blocks for longer than the given number of seconds, an +exception is raised. 
+ +@code{#:timeout} sets both read and write timeouts. +@code{#:read-timeout} and @code{#:write-timeout} specify the timeout +for reads and writes respectively. All three default to @code{#f} (no +timeout). + +This procedure works both with fibers, and without fibers by using the +poll system call with a timeout. + +On read timeout, raises @code{&port-read-timeout-error}. On write +timeout, raises @code{&port-write-timeout-error}. Both carry the +@code{thunk} and @code{port} fields from @code{&port-timeout-error}." (define (no-fibers-wait thunk port mode timeout) (define poll-timeout-ms 200) diff --git a/tests/thread-pool.scm b/tests/thread-pool.scm index e3a1cdd..dc22119 100644 --- a/tests/thread-pool.scm +++ b/tests/thread-pool.scm @@ -1,5 +1,6 @@ (use-modules (tests) (ice-9 atomic) + (ice-9 threads) (srfi srfi-71) (fibers) (unit-test) @@ -142,4 +143,33 @@ ref-and-decrement)) (error))) +;; Test that the destructor is called when a size 1 fixed-size thread +;; pool is destroyed, and that destroy-thread-pool blocks until it has +;; completed. +(let* ((destructor-called? #f) + (thread-pool + (make-fixed-size-thread-pool + 1 + #:thread-destructor + (lambda () + (set! destructor-called? #t))))) + (destroy-thread-pool thread-pool) + (assert-equal #t destructor-called?)) + +;; Test that the destructor is called for every thread when a +;; multi-thread fixed-size thread pool is destroyed, and that +;; destroy-thread-pool blocks until all destructors have completed. +(let* ((destructor-count 0) + (mutex (make-mutex)) + (pool-size 3) + (thread-pool + (make-fixed-size-thread-pool + pool-size + #:thread-destructor + (lambda () + (with-mutex mutex + (set! destructor-count (+ destructor-count 1))))))) + (destroy-thread-pool thread-pool) + (assert-equal pool-size destructor-count)) + (display "thread-pool test finished successfully\n")