diff --git a/knots/parallelism.scm b/knots/parallelism.scm index 7631055..e1e1d90 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -318,6 +318,12 @@ invocation of PROC finishes. REPORT is passed the results for each (resource-pool parallelism-limiter-resource-pool)) (define* (make-parallelism-limiter limit #:key (name "unnamed")) + "Return a parallelism limiter that allows at most LIMIT concurrent +fibers to execute within @code{with-parallelism-limiter} at the same +time. Further fibers block until a slot becomes free. + +@code{#:name} is a string used in log messages. Defaults to +@code{\"unnamed\"}." (make-parallelism-limiter-record (make-fixed-size-resource-pool (iota limit) @@ -329,6 +335,9 @@ invocation of PROC finishes. REPORT is passed the results for each parallelism-limiter))) (define* (call-with-parallelism-limiter parallelism-limiter thunk) + "Acquire a slot from PARALLELISM-LIMITER, call THUNK, release the +slot, and return the values from THUNK. Blocks if no slot is +currently available." (call-with-resource-from-pool (parallelism-limiter-resource-pool parallelism-limiter) (lambda _ diff --git a/knots/promise.scm b/knots/promise.scm index c01d219..b85fe64 100644 --- a/knots/promise.scm +++ b/knots/promise.scm @@ -41,12 +41,21 @@ (evaluated-condition fibers-promise-evaluated-condition)) (define (fibers-delay thunk) + "Return a new fiber-aware promise that will evaluate THUNK when +first forced. THUNK is not called until @code{fibers-force} is +called on the promise." (make-fibers-promise thunk (make-atomic-box #f) (make-condition))) (define (fibers-force fp) + "Force the fiber-aware promise FP, returning its values. + +The first call evaluates the promise's thunk. Concurrent callers +block on a condition variable until evaluation finishes, then receive +the same result. If the thunk raises an exception, the exception is +stored and re-raised for all callers." (unless (fibers-promise? fp) (raise-exception (make-exception @@ -108,6 +117,9 @@ (define (fibers-delay/eager thunk) + "Return a new fiber-aware promise and immediately begin evaluating +THUNK in a new fiber. Exceptions during eager evaluation are silently +discarded; they will be re-raised when @code{fibers-force} is called." (let ((promise (fibers-delay thunk))) (spawn-fiber (lambda () @@ -121,10 +133,15 @@ promise)) (define (fibers-promise-reset fp) + "Reset the fiber-aware promise FP so that the next call to +@code{fibers-force} re-evaluates its thunk." (atomic-box-set! (fibers-promise-values-box fp) #f)) (define (fibers-promise-result-available? fp) + "Return @code{#t} if the fiber-aware promise FP has been evaluated +(successfully or with an exception) and @code{#f} if evaluation has +not yet started or is still in progress." (let ((val (atomic-box-ref (fibers-promise-values-box fp)))) (not (or (eq? val #f) (eq? val 'started))))) diff --git a/knots/queue.scm b/knots/queue.scm index ec9f703..2ca9b10 100644 --- a/knots/queue.scm +++ b/knots/queue.scm @@ -25,6 +25,12 @@ #:export (spawn-queueing-fiber)) (define (spawn-queueing-fiber dest-channel) + "Spawn a fiber that serialises items onto DEST-CHANNEL in FIFO order. +Returns a new input channel. + +Multiple producers can put items on the returned channel concurrently. +The fiber buffers them locally and forwards them to DEST-CHANNEL one at +a time, preserving arrival order." (define queue (make-q)) (let ((queue-channel (make-channel))) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index 22c1b5c..3a68a12 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -211,6 +211,52 @@ from there, or #f if that would be an empty string." (name "unnamed") (use-default-io-waiters? #t) default-checkout-timeout) + "Create a pool of SIZE threads started immediately. Use +@code{call-with-thread} to run a procedure in one of the threads. + +Optional keyword arguments: + +@table @code +@item #:thread-initializer +A thunk called once when each thread starts. Its return value is +passed as extra arguments to every procedure run in that thread. +Defaults to @code{#f} (no extra arguments). + +@item #:thread-destructor +A procedure called with the value returned by @code{#:thread-initializer} +when a thread exits. Defaults to @code{#f}. + +@item #:thread-lifetime +Maximum number of procedures a thread will run before restarting (and +re-running @code{#:thread-initializer}). Defaults to @code{#f} (no +limit). + +@item #:expire-on-exception? +When @code{#t}, replace a thread after any unhandled exception. +Defaults to @code{#f}. + +@item #:use-default-io-waiters? +When @code{#t} (the default), each thread uses blocking I/O waiters so +that port reads and writes block the thread rather than trying to +suspend a fiber. + +@item #:name +String used in thread names and log messages. Defaults to +@code{\"unnamed\"}. + +@item #:default-checkout-timeout +Seconds to wait for a free thread slot before raising +@code{&thread-pool-timeout-error}. Defaults to @code{#f} (wait +forever). + +@item #:delay-logger +Called as @code{(delay-logger seconds proc)} with the time spent +waiting for a thread to become available. + +@item #:duration-logger +Called as @code{(duration-logger seconds proc)} after each procedure +completes. +@end table" (define channel (make-channel)) @@ -408,8 +454,34 @@ from there, or #f if that would be an empty string." (use-default-io-waiters? #t) default-checkout-timeout default-max-waiters) - "Return a channel used to offload work to a dedicated thread. ARGS are the -arguments of the thread pool procedure." + "Create a dynamic thread pool with up to MAX-SIZE threads. Use +@code{call-with-thread} to run a procedure in one of the threads. + +Unlike @code{make-fixed-size-thread-pool}, threads are created on +demand and may be reclaimed when idle (controlled by @code{#:min-size} +and the resource pool's idle management). + +Accepts the same @code{#:thread-initializer}, @code{#:thread-destructor}, +@code{#:thread-lifetime}, @code{#:expire-on-exception?}, +@code{#:use-default-io-waiters?}, @code{#:name}, +@code{#:default-checkout-timeout}, @code{#:delay-logger}, and +@code{#:duration-logger} arguments as @code{make-fixed-size-thread-pool}, +plus: + +@table @code +@item #:min-size +Minimum number of threads to keep alive. Defaults to MAX-SIZE (i.e.@: +the pool is pre-filled and never shrinks). + +@item #:scheduler +Fibers scheduler for the pool's internal resource pool fiber. Defaults +to the current scheduler. + +@item #:default-max-waiters +Maximum number of fibers that may queue waiting for a thread. Raises +@code{&thread-pool-timeout-error} when exceeded. Defaults to +@code{#f} (no limit). +@end table" (define param (make-parameter #f)) @@ -444,8 +516,34 @@ arguments of the thread pool procedure." channel destroy-thread-on-exception? (max-waiters 'default)) - "Send PROC to the thread pool through CHANNEL. Return the result of PROC. -If already in the thread pool, call PROC immediately." + "Run PROC in THREAD-POOL and return its values, blocking until +complete. If called from within a thread that already belongs to +THREAD-POOL, PROC is called directly in that thread. + +Optional keyword arguments: + +@table @code +@item #:checkout-timeout +Seconds to wait for a free thread before raising +@code{&thread-pool-timeout-error}. Defaults to the pool's +@code{#:default-checkout-timeout}. + +@item #:max-waiters +Maximum number of fibers that may queue waiting for a thread (for +dynamic pools). Defaults to the pool's @code{#:default-max-waiters}. + +@item #:destroy-thread-on-exception? +When @code{#t}, destroy the thread after PROC raises an exception. +Equivalent to per-call @code{#:expire-on-exception?}. Defaults to +@code{#f}. + +@item #:duration-logger +Called as @code{(duration-logger seconds)} after PROC completes +(whether or not it raised an exception). + +@item #:channel +Override the channel used to communicate with the thread. +@end table" (define (handle-proc fixed-size-thread-pool reply-channel start-time