Add more documentation
This commit is contained in:
parent
5b84273cbf
commit
d3d4964210
4 changed files with 134 additions and 4 deletions
|
|
@ -318,6 +318,12 @@ invocation of PROC finishes. REPORT is passed the results for each
|
||||||
(resource-pool parallelism-limiter-resource-pool))
|
(resource-pool parallelism-limiter-resource-pool))
|
||||||
|
|
||||||
(define* (make-parallelism-limiter limit #:key (name "unnamed"))
|
(define* (make-parallelism-limiter limit #:key (name "unnamed"))
|
||||||
|
"Return a parallelism limiter that allows at most LIMIT concurrent
|
||||||
|
fibers to execute within @code{with-parallelism-limiter} at the same
|
||||||
|
time. Further fibers block until a slot becomes free.
|
||||||
|
|
||||||
|
@code{#:name} is a string used in log messages. Defaults to
|
||||||
|
@code{\"unnamed\"}."
|
||||||
(make-parallelism-limiter-record
|
(make-parallelism-limiter-record
|
||||||
(make-fixed-size-resource-pool
|
(make-fixed-size-resource-pool
|
||||||
(iota limit)
|
(iota limit)
|
||||||
|
|
@ -329,6 +335,9 @@ invocation of PROC finishes. REPORT is passed the results for each
|
||||||
parallelism-limiter)))
|
parallelism-limiter)))
|
||||||
|
|
||||||
(define* (call-with-parallelism-limiter parallelism-limiter thunk)
|
(define* (call-with-parallelism-limiter parallelism-limiter thunk)
|
||||||
|
"Acquire a slot from PARALLELISM-LIMITER, call THUNK, release the
|
||||||
|
slot, and return the values from THUNK. Blocks if no slot is
|
||||||
|
currently available."
|
||||||
(call-with-resource-from-pool
|
(call-with-resource-from-pool
|
||||||
(parallelism-limiter-resource-pool parallelism-limiter)
|
(parallelism-limiter-resource-pool parallelism-limiter)
|
||||||
(lambda _
|
(lambda _
|
||||||
|
|
|
||||||
|
|
@ -41,12 +41,21 @@
|
||||||
(evaluated-condition fibers-promise-evaluated-condition))
|
(evaluated-condition fibers-promise-evaluated-condition))
|
||||||
|
|
||||||
(define (fibers-delay thunk)
|
(define (fibers-delay thunk)
|
||||||
|
"Return a new fiber-aware promise that will evaluate THUNK when
|
||||||
|
first forced. THUNK is not called until @code{fibers-force} is
|
||||||
|
called on the promise."
|
||||||
(make-fibers-promise
|
(make-fibers-promise
|
||||||
thunk
|
thunk
|
||||||
(make-atomic-box #f)
|
(make-atomic-box #f)
|
||||||
(make-condition)))
|
(make-condition)))
|
||||||
|
|
||||||
(define (fibers-force fp)
|
(define (fibers-force fp)
|
||||||
|
"Force the fiber-aware promise FP, returning its values.
|
||||||
|
|
||||||
|
The first call evaluates the promise's thunk. Concurrent callers
|
||||||
|
block on a condition variable until evaluation finishes, then receive
|
||||||
|
the same result. If the thunk raises an exception, the exception is
|
||||||
|
stored and re-raised for all callers."
|
||||||
(unless (fibers-promise? fp)
|
(unless (fibers-promise? fp)
|
||||||
(raise-exception
|
(raise-exception
|
||||||
(make-exception
|
(make-exception
|
||||||
|
|
@ -108,6 +117,9 @@
|
||||||
|
|
||||||
|
|
||||||
(define (fibers-delay/eager thunk)
|
(define (fibers-delay/eager thunk)
|
||||||
|
"Return a new fiber-aware promise and immediately begin evaluating
|
||||||
|
THUNK in a new fiber. Exceptions during eager evaluation are silently
|
||||||
|
discarded; they will be re-raised when @code{fibers-force} is called."
|
||||||
(let ((promise (fibers-delay thunk)))
|
(let ((promise (fibers-delay thunk)))
|
||||||
(spawn-fiber
|
(spawn-fiber
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
|
@ -121,10 +133,15 @@
|
||||||
promise))
|
promise))
|
||||||
|
|
||||||
(define (fibers-promise-reset fp)
|
(define (fibers-promise-reset fp)
|
||||||
|
"Reset the fiber-aware promise FP so that the next call to
|
||||||
|
@code{fibers-force} re-evaluates its thunk."
|
||||||
(atomic-box-set! (fibers-promise-values-box fp)
|
(atomic-box-set! (fibers-promise-values-box fp)
|
||||||
#f))
|
#f))
|
||||||
|
|
||||||
(define (fibers-promise-result-available? fp)
|
(define (fibers-promise-result-available? fp)
|
||||||
|
"Return @code{#t} if the fiber-aware promise FP has been evaluated
|
||||||
|
(successfully or with an exception) and @code{#f} if evaluation has
|
||||||
|
not yet started or is still in progress."
|
||||||
(let ((val (atomic-box-ref (fibers-promise-values-box fp))))
|
(let ((val (atomic-box-ref (fibers-promise-values-box fp))))
|
||||||
(not (or (eq? val #f)
|
(not (or (eq? val #f)
|
||||||
(eq? val 'started)))))
|
(eq? val 'started)))))
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,12 @@
|
||||||
#:export (spawn-queueing-fiber))
|
#:export (spawn-queueing-fiber))
|
||||||
|
|
||||||
(define (spawn-queueing-fiber dest-channel)
|
(define (spawn-queueing-fiber dest-channel)
|
||||||
|
"Spawn a fiber that serialises items onto DEST-CHANNEL in FIFO order.
|
||||||
|
Returns a new input channel.
|
||||||
|
|
||||||
|
Multiple producers can put items on the returned channel concurrently.
|
||||||
|
The fiber buffers them locally and forwards them to DEST-CHANNEL one at
|
||||||
|
a time, preserving arrival order."
|
||||||
(define queue (make-q))
|
(define queue (make-q))
|
||||||
|
|
||||||
(let ((queue-channel (make-channel)))
|
(let ((queue-channel (make-channel)))
|
||||||
|
|
|
||||||
|
|
@ -211,6 +211,52 @@ from there, or #f if that would be an empty string."
|
||||||
(name "unnamed")
|
(name "unnamed")
|
||||||
(use-default-io-waiters? #t)
|
(use-default-io-waiters? #t)
|
||||||
default-checkout-timeout)
|
default-checkout-timeout)
|
||||||
|
"Create a pool of SIZE threads started immediately. Use
|
||||||
|
@code{call-with-thread} to run a procedure in one of the threads.
|
||||||
|
|
||||||
|
Optional keyword arguments:
|
||||||
|
|
||||||
|
@table @code
|
||||||
|
@item #:thread-initializer
|
||||||
|
A thunk called once when each thread starts. Its return value is
|
||||||
|
passed as extra arguments to every procedure run in that thread.
|
||||||
|
Defaults to @code{#f} (no extra arguments).
|
||||||
|
|
||||||
|
@item #:thread-destructor
|
||||||
|
A procedure called with the value returned by @code{#:thread-initializer}
|
||||||
|
when a thread exits. Defaults to @code{#f}.
|
||||||
|
|
||||||
|
@item #:thread-lifetime
|
||||||
|
Maximum number of procedures a thread will run before restarting (and
|
||||||
|
re-running @code{#:thread-initializer}). Defaults to @code{#f} (no
|
||||||
|
limit).
|
||||||
|
|
||||||
|
@item #:expire-on-exception?
|
||||||
|
When @code{#t}, replace a thread after any unhandled exception.
|
||||||
|
Defaults to @code{#f}.
|
||||||
|
|
||||||
|
@item #:use-default-io-waiters?
|
||||||
|
When @code{#t} (the default), each thread uses blocking I/O waiters so
|
||||||
|
that port reads and writes block the thread rather than trying to
|
||||||
|
suspend a fiber.
|
||||||
|
|
||||||
|
@item #:name
|
||||||
|
String used in thread names and log messages. Defaults to
|
||||||
|
@code{\"unnamed\"}.
|
||||||
|
|
||||||
|
@item #:default-checkout-timeout
|
||||||
|
Seconds to wait for a free thread slot before raising
|
||||||
|
@code{&thread-pool-timeout-error}. Defaults to @code{#f} (wait
|
||||||
|
forever).
|
||||||
|
|
||||||
|
@item #:delay-logger
|
||||||
|
Called as @code{(delay-logger seconds proc)} with the time spent
|
||||||
|
waiting for a thread to become available.
|
||||||
|
|
||||||
|
@item #:duration-logger
|
||||||
|
Called as @code{(duration-logger seconds proc)} after each procedure
|
||||||
|
completes.
|
||||||
|
@end table"
|
||||||
(define channel
|
(define channel
|
||||||
(make-channel))
|
(make-channel))
|
||||||
|
|
||||||
|
|
@ -408,8 +454,34 @@ from there, or #f if that would be an empty string."
|
||||||
(use-default-io-waiters? #t)
|
(use-default-io-waiters? #t)
|
||||||
default-checkout-timeout
|
default-checkout-timeout
|
||||||
default-max-waiters)
|
default-max-waiters)
|
||||||
"Return a channel used to offload work to a dedicated thread. ARGS are the
|
"Create a dynamic thread pool with up to MAX-SIZE threads. Use
|
||||||
arguments of the thread pool procedure."
|
@code{call-with-thread} to run a procedure in one of the threads.
|
||||||
|
|
||||||
|
Unlike @code{make-fixed-size-thread-pool}, threads are created on
|
||||||
|
demand and may be reclaimed when idle (controlled by @code{#:min-size}
|
||||||
|
and the resource pool's idle management).
|
||||||
|
|
||||||
|
Accepts the same @code{#:thread-initializer}, @code{#:thread-destructor},
|
||||||
|
@code{#:thread-lifetime}, @code{#:expire-on-exception?},
|
||||||
|
@code{#:use-default-io-waiters?}, @code{#:name},
|
||||||
|
@code{#:default-checkout-timeout}, @code{#:delay-logger}, and
|
||||||
|
@code{#:duration-logger} arguments as @code{make-fixed-size-thread-pool},
|
||||||
|
plus:
|
||||||
|
|
||||||
|
@table @code
|
||||||
|
@item #:min-size
|
||||||
|
Minimum number of threads to keep alive. Defaults to MAX-SIZE (i.e.@:
|
||||||
|
the pool is pre-filled and never shrinks).
|
||||||
|
|
||||||
|
@item #:scheduler
|
||||||
|
Fibers scheduler for the pool's internal resource pool fiber. Defaults
|
||||||
|
to the current scheduler.
|
||||||
|
|
||||||
|
@item #:default-max-waiters
|
||||||
|
Maximum number of fibers that may queue waiting for a thread. Raises
|
||||||
|
@code{&thread-pool-timeout-error} when exceeded. Defaults to
|
||||||
|
@code{#f} (no limit).
|
||||||
|
@end table"
|
||||||
(define param
|
(define param
|
||||||
(make-parameter #f))
|
(make-parameter #f))
|
||||||
|
|
||||||
|
|
@ -444,8 +516,34 @@ arguments of the thread pool procedure."
|
||||||
channel
|
channel
|
||||||
destroy-thread-on-exception?
|
destroy-thread-on-exception?
|
||||||
(max-waiters 'default))
|
(max-waiters 'default))
|
||||||
"Send PROC to the thread pool through CHANNEL. Return the result of PROC.
|
"Run PROC in THREAD-POOL and return its values, blocking until
|
||||||
If already in the thread pool, call PROC immediately."
|
complete. If called from within a thread that already belongs to
|
||||||
|
THREAD-POOL, PROC is called directly in that thread.
|
||||||
|
|
||||||
|
Optional keyword arguments:
|
||||||
|
|
||||||
|
@table @code
|
||||||
|
@item #:checkout-timeout
|
||||||
|
Seconds to wait for a free thread before raising
|
||||||
|
@code{&thread-pool-timeout-error}. Defaults to the pool's
|
||||||
|
@code{#:default-checkout-timeout}.
|
||||||
|
|
||||||
|
@item #:max-waiters
|
||||||
|
Maximum number of fibers that may queue waiting for a thread (for
|
||||||
|
dynamic pools). Defaults to the pool's @code{#:default-max-waiters}.
|
||||||
|
|
||||||
|
@item #:destroy-thread-on-exception?
|
||||||
|
When @code{#t}, destroy the thread after PROC raises an exception.
|
||||||
|
Equivalent to per-call @code{#:expire-on-exception?}. Defaults to
|
||||||
|
@code{#f}.
|
||||||
|
|
||||||
|
@item #:duration-logger
|
||||||
|
Called as @code{(duration-logger seconds)} after PROC completes
|
||||||
|
(whether or not it raised an exception).
|
||||||
|
|
||||||
|
@item #:channel
|
||||||
|
Override the channel used to communicate with the thread.
|
||||||
|
@end table"
|
||||||
(define (handle-proc fixed-size-thread-pool
|
(define (handle-proc fixed-size-thread-pool
|
||||||
reply-channel
|
reply-channel
|
||||||
start-time
|
start-time
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue