Compare commits
8 commits
35f4c16ab0
...
db9b549e59
| Author | SHA1 | Date | |
|---|---|---|---|
| db9b549e59 | |||
| 09cb805ee2 | |||
| d3d4964210 | |||
| 5b84273cbf | |||
| 30aa837cf4 | |||
| 79d5603416 | |||
| 8e29587ec1 | |||
| 8b489490e1 |
11 changed files with 391 additions and 24 deletions
15
README
15
README
|
|
@ -1,4 +1,15 @@
|
||||||
-*- mode: org -*-
|
-*- mode: org -*-
|
||||||
|
|
||||||
This Guile library provides useful patterns and functionality to use
|
* Guile Knots
|
||||||
Guile Fibers.
|
|
||||||
|
Guile Knots is a library providing higher-level patterns and building
|
||||||
|
blocks for programming with [[https://codeberg.org/guile/fibers][Guile Fibers]].
|
||||||
|
|
||||||
|
This includes:
|
||||||
|
|
||||||
|
- Parallel map/for-each with configurable concurrency limits
|
||||||
|
- Resource and thread pools
|
||||||
|
- Fiber-aware promises for lazy and eager parallel evaluation
|
||||||
|
- Timeouts for fibers and I/O ports
|
||||||
|
- A HTTP web server
|
||||||
|
- Non-blocking socket utilities
|
||||||
|
|
|
||||||
40
knots.scm
40
knots.scm
|
|
@ -29,6 +29,11 @@
|
||||||
spawn-fiber/knots))
|
spawn-fiber/knots))
|
||||||
|
|
||||||
(define (call-with-default-io-waiters thunk)
|
(define (call-with-default-io-waiters thunk)
|
||||||
|
"Run THUNK with Guile's default blocking I/O waiters active.
|
||||||
|
|
||||||
|
This is useful when restoring the default Guile I/O waiters from
|
||||||
|
within a context (like Fibers) where different I/O waiters are used,
|
||||||
|
for example when creating a new thread from a fiber."
|
||||||
(parameterize
|
(parameterize
|
||||||
((current-read-waiter (@@ (ice-9 suspendable-ports)
|
((current-read-waiter (@@ (ice-9 suspendable-ports)
|
||||||
default-read-waiter))
|
default-read-waiter))
|
||||||
|
|
@ -37,15 +42,33 @@
|
||||||
(thunk)))
|
(thunk)))
|
||||||
|
|
||||||
(define (wait-when-system-clock-behind)
|
(define (wait-when-system-clock-behind)
|
||||||
(let ((start-of-the-year-2000 946684800))
|
"Block until the system clock reads at least 2001-01-02.
|
||||||
|
|
||||||
|
Useful at startup in environments (virtual machines, embedded systems)
|
||||||
|
where the clock may start at or near the Unix epoch. Prints a warning
|
||||||
|
to the current error port every 20 seconds while waiting."
|
||||||
|
;; Jan 02 2001 02:00:00
|
||||||
|
(let ((start-of-the-year-2001 978400800))
|
||||||
(while (< (current-time)
|
(while (< (current-time)
|
||||||
start-of-the-year-2000)
|
start-of-the-year-2001)
|
||||||
(simple-format (current-error-port)
|
(simple-format (current-error-port)
|
||||||
"warning: system clock potentially behind, waiting\n")
|
"warning: system clock potentially behind, waiting\n")
|
||||||
(sleep 20))))
|
(sleep 20))))
|
||||||
|
|
||||||
;; Copied from (fibers web server)
|
;; Copied from (fibers web server)
|
||||||
(define (call-with-sigint thunk cvar)
|
(define (call-with-sigint thunk cvar)
|
||||||
|
"Run THUNK with a SIGINT handler that signals the Fibers condition
|
||||||
|
CVAR. Restores the previous handler when THUNK returns.
|
||||||
|
|
||||||
|
Typical usage is to pass a condition variable to this procedure and
|
||||||
|
wait on CVAR in a fiber to implement clean shutdown on Ctrl-C:
|
||||||
|
|
||||||
|
@example
|
||||||
|
(let ((quit-cvar (make-condition)))
|
||||||
|
(call-with-sigint
|
||||||
|
(lambda () (wait quit-cvar))
|
||||||
|
quit-cvar))
|
||||||
|
@end example"
|
||||||
(let ((handler #f))
|
(let ((handler #f))
|
||||||
(dynamic-wind
|
(dynamic-wind
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
|
@ -96,6 +119,11 @@
|
||||||
(raise-exception exn)))))
|
(raise-exception exn)))))
|
||||||
|
|
||||||
(define* (display/knots obj #:optional (port (current-output-port)))
|
(define* (display/knots obj #:optional (port (current-output-port)))
|
||||||
|
"Write OBJ to PORT (default: current output port) as a UTF-8 byte
|
||||||
|
sequence via @code{put-bytevector}.
|
||||||
|
|
||||||
|
When used with ports without buffering, this should be safer than
|
||||||
|
display."
|
||||||
(put-bytevector
|
(put-bytevector
|
||||||
port
|
port
|
||||||
(string->utf8
|
(string->utf8
|
||||||
|
|
@ -104,6 +132,8 @@
|
||||||
(display obj port))))))
|
(display obj port))))))
|
||||||
|
|
||||||
(define (simple-format/knots port s . args)
|
(define (simple-format/knots port s . args)
|
||||||
|
"Like @code{simple-format} but should be safer when used with a port
|
||||||
|
without buffering."
|
||||||
(let ((str (apply simple-format #f s args)))
|
(let ((str (apply simple-format #f s args)))
|
||||||
(if (eq? #f port)
|
(if (eq? #f port)
|
||||||
str
|
str
|
||||||
|
|
@ -114,6 +144,8 @@
|
||||||
port)))))
|
port)))))
|
||||||
|
|
||||||
(define (format/knots port s . args)
|
(define (format/knots port s . args)
|
||||||
|
"Like @code{format} but should be safer when used with a port
|
||||||
|
without buffering."
|
||||||
(let ((str (apply format #f s args)))
|
(let ((str (apply format #f s args)))
|
||||||
(if (eq? #f port)
|
(if (eq? #f port)
|
||||||
str
|
str
|
||||||
|
|
@ -232,6 +264,10 @@
|
||||||
(display/knots error-string port)))
|
(display/knots error-string port)))
|
||||||
|
|
||||||
(define* (spawn-fiber/knots thunk #:optional scheduler #:key parallel?)
|
(define* (spawn-fiber/knots thunk #:optional scheduler #:key parallel?)
|
||||||
|
"Spawn a fiber to run THUNK, with knots exception handling.
|
||||||
|
|
||||||
|
Accepts the same optional SCHEDULER and @code{#:parallel?} arguments
|
||||||
|
as @code{spawn-fiber}."
|
||||||
(spawn-fiber
|
(spawn-fiber
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
|
|
|
||||||
|
|
@ -32,6 +32,16 @@
|
||||||
|
|
||||||
(define* (non-blocking-open-socket-for-uri uri
|
(define* (non-blocking-open-socket-for-uri uri
|
||||||
#:key (verify-certificate? #t))
|
#:key (verify-certificate? #t))
|
||||||
|
"Open a socket for URI and return it as a non-blocking port.
|
||||||
|
|
||||||
|
For HTTPS URIs the TLS handshake is completed while the socket is
|
||||||
|
still blocking (required because Guile's TLS wrapper does not support
|
||||||
|
non-blocking handshakes), then the underlying socket is made
|
||||||
|
non-blocking. For plain HTTP the socket is made non-blocking
|
||||||
|
immediately.
|
||||||
|
|
||||||
|
@code{#:verify-certificate?} controls TLS certificate verification
|
||||||
|
and defaults to @code{#t}."
|
||||||
(define tls-wrap
|
(define tls-wrap
|
||||||
(@@ (web client) tls-wrap))
|
(@@ (web client) tls-wrap))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -267,6 +267,16 @@ invocation of PROC finishes. REPORT is passed the results for each
|
||||||
#:key (parallelism 1)
|
#:key (parallelism 1)
|
||||||
(input-channel (make-channel))
|
(input-channel (make-channel))
|
||||||
(process-channel input-channel))
|
(process-channel input-channel))
|
||||||
|
"Convert PROC into a procedure backed by @code{#:parallelism}
|
||||||
|
(default: 1) background fibers. Returns a wrapper that sends its
|
||||||
|
arguments to one of the fibers and blocks until the result is
|
||||||
|
returned.
|
||||||
|
|
||||||
|
@code{#:input-channel} is the channel that callers write requests to;
|
||||||
|
defaults to a fresh channel. @code{#:process-channel} is the channel
|
||||||
|
the fibers read from; defaults to @code{#:input-channel}. Setting
|
||||||
|
them differently allows external parties to bypass the wrapper and
|
||||||
|
write directly to @code{process-channel}."
|
||||||
(for-each
|
(for-each
|
||||||
(lambda _
|
(lambda _
|
||||||
(spawn-fiber
|
(spawn-fiber
|
||||||
|
|
@ -318,6 +328,12 @@ invocation of PROC finishes. REPORT is passed the results for each
|
||||||
(resource-pool parallelism-limiter-resource-pool))
|
(resource-pool parallelism-limiter-resource-pool))
|
||||||
|
|
||||||
(define* (make-parallelism-limiter limit #:key (name "unnamed"))
|
(define* (make-parallelism-limiter limit #:key (name "unnamed"))
|
||||||
|
"Return a parallelism limiter that allows at most LIMIT concurrent
|
||||||
|
fibers to execute within @code{with-parallelism-limiter} at the same
|
||||||
|
time. Further fibers block until a slot becomes free.
|
||||||
|
|
||||||
|
@code{#:name} is a string used in log messages. Defaults to
|
||||||
|
@code{\"unnamed\"}."
|
||||||
(make-parallelism-limiter-record
|
(make-parallelism-limiter-record
|
||||||
(make-fixed-size-resource-pool
|
(make-fixed-size-resource-pool
|
||||||
(iota limit)
|
(iota limit)
|
||||||
|
|
@ -329,6 +345,9 @@ invocation of PROC finishes. REPORT is passed the results for each
|
||||||
parallelism-limiter)))
|
parallelism-limiter)))
|
||||||
|
|
||||||
(define* (call-with-parallelism-limiter parallelism-limiter thunk)
|
(define* (call-with-parallelism-limiter parallelism-limiter thunk)
|
||||||
|
"Acquire a slot from PARALLELISM-LIMITER, call THUNK, release the
|
||||||
|
slot, and return the values from THUNK. Blocks if no slot is
|
||||||
|
currently available."
|
||||||
(call-with-resource-from-pool
|
(call-with-resource-from-pool
|
||||||
(parallelism-limiter-resource-pool parallelism-limiter)
|
(parallelism-limiter-resource-pool parallelism-limiter)
|
||||||
(lambda _
|
(lambda _
|
||||||
|
|
|
||||||
|
|
@ -41,12 +41,21 @@
|
||||||
(evaluated-condition fibers-promise-evaluated-condition))
|
(evaluated-condition fibers-promise-evaluated-condition))
|
||||||
|
|
||||||
(define (fibers-delay thunk)
|
(define (fibers-delay thunk)
|
||||||
|
"Return a new fiber-aware promise that will evaluate THUNK when
|
||||||
|
first forced. THUNK is not called until @code{fibers-force} is
|
||||||
|
called on the promise."
|
||||||
(make-fibers-promise
|
(make-fibers-promise
|
||||||
thunk
|
thunk
|
||||||
(make-atomic-box #f)
|
(make-atomic-box #f)
|
||||||
(make-condition)))
|
(make-condition)))
|
||||||
|
|
||||||
(define (fibers-force fp)
|
(define (fibers-force fp)
|
||||||
|
"Force the fiber-aware promise FP, returning its values.
|
||||||
|
|
||||||
|
The first call evaluates the promise's thunk. Concurrent callers
|
||||||
|
block on a condition variable until evaluation finishes, then receive
|
||||||
|
the same result. If the thunk raises an exception, the exception is
|
||||||
|
stored and re-raised for all callers."
|
||||||
(unless (fibers-promise? fp)
|
(unless (fibers-promise? fp)
|
||||||
(raise-exception
|
(raise-exception
|
||||||
(make-exception
|
(make-exception
|
||||||
|
|
@ -108,6 +117,9 @@
|
||||||
|
|
||||||
|
|
||||||
(define (fibers-delay/eager thunk)
|
(define (fibers-delay/eager thunk)
|
||||||
|
"Return a new fiber-aware promise and immediately begin evaluating
|
||||||
|
THUNK in a new fiber. Exceptions during eager evaluation are silently
|
||||||
|
discarded; they will be re-raised when @code{fibers-force} is called."
|
||||||
(let ((promise (fibers-delay thunk)))
|
(let ((promise (fibers-delay thunk)))
|
||||||
(spawn-fiber
|
(spawn-fiber
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
|
@ -121,10 +133,15 @@
|
||||||
promise))
|
promise))
|
||||||
|
|
||||||
(define (fibers-promise-reset fp)
|
(define (fibers-promise-reset fp)
|
||||||
|
"Reset the fiber-aware promise FP so that the next call to
|
||||||
|
@code{fibers-force} re-evaluates its thunk."
|
||||||
(atomic-box-set! (fibers-promise-values-box fp)
|
(atomic-box-set! (fibers-promise-values-box fp)
|
||||||
#f))
|
#f))
|
||||||
|
|
||||||
(define (fibers-promise-result-available? fp)
|
(define (fibers-promise-result-available? fp)
|
||||||
|
"Return @code{#t} if the fiber-aware promise FP has been evaluated
|
||||||
|
(successfully or with an exception) and @code{#f} if evaluation has
|
||||||
|
not yet started or is still in progress."
|
||||||
(let ((val (atomic-box-ref (fibers-promise-values-box fp))))
|
(let ((val (atomic-box-ref (fibers-promise-values-box fp))))
|
||||||
(not (or (eq? val #f)
|
(not (or (eq? val #f)
|
||||||
(eq? val 'started)))))
|
(eq? val 'started)))))
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,12 @@
|
||||||
#:export (spawn-queueing-fiber))
|
#:export (spawn-queueing-fiber))
|
||||||
|
|
||||||
(define (spawn-queueing-fiber dest-channel)
|
(define (spawn-queueing-fiber dest-channel)
|
||||||
|
"Spawn a fiber that serialises items onto DEST-CHANNEL in FIFO order.
|
||||||
|
Returns a new input channel.
|
||||||
|
|
||||||
|
Multiple producers can put items on the returned channel concurrently.
|
||||||
|
The fiber buffers them locally and forwards them to DEST-CHANNEL one at
|
||||||
|
a time, preserving arrival order."
|
||||||
(define queue (make-q))
|
(define queue (make-q))
|
||||||
|
|
||||||
(let ((queue-channel (make-channel)))
|
(let ((queue-channel (make-channel)))
|
||||||
|
|
|
||||||
|
|
@ -154,6 +154,33 @@
|
||||||
(name "unnamed")
|
(name "unnamed")
|
||||||
default-checkout-timeout
|
default-checkout-timeout
|
||||||
default-max-waiters)
|
default-max-waiters)
|
||||||
|
"Create a resource pool from RESOURCES-LIST-OR-VECTOR, a list or
|
||||||
|
vector of pre-existing resource values.
|
||||||
|
|
||||||
|
Use @code{with-resource-from-pool} or
|
||||||
|
@code{call-with-resource-from-pool} to borrow a resource and return it
|
||||||
|
automatically when done.
|
||||||
|
|
||||||
|
Optional keyword arguments:
|
||||||
|
|
||||||
|
@table @code
|
||||||
|
@item #:name
|
||||||
|
A optional string used in log messages.
|
||||||
|
Defaults to @code{\"unnamed\"}.
|
||||||
|
|
||||||
|
@item #:default-checkout-timeout
|
||||||
|
Default checkout timeout when requesting a resource from the pool,
|
||||||
|
unset by default.
|
||||||
|
|
||||||
|
@item #:default-max-waiters
|
||||||
|
Maximum number of fibers that may queue waiting for a resource. When
|
||||||
|
this limit is exceeded, @code{&resource-pool-too-many-waiters} is
|
||||||
|
raised when a resource is requested. Defaults to @code{#f} (no limit).
|
||||||
|
|
||||||
|
@item #:scheduler
|
||||||
|
The Fibers scheduler to use for the pool's internal fiber. Defaults
|
||||||
|
to the current scheduler.
|
||||||
|
@end table"
|
||||||
(define channel (make-channel))
|
(define channel (make-channel))
|
||||||
(define destroy-condition
|
(define destroy-condition
|
||||||
(make-condition))
|
(make-condition))
|
||||||
|
|
@ -513,6 +540,59 @@
|
||||||
(add-resources-parallelism 1)
|
(add-resources-parallelism 1)
|
||||||
default-checkout-timeout
|
default-checkout-timeout
|
||||||
default-max-waiters)
|
default-max-waiters)
|
||||||
|
"Create a dynamic resource pool. RETURN-NEW-RESOURCE is a thunk
|
||||||
|
called to create each new resource value. MAX-SIZE is the maximum
|
||||||
|
number of resources the pool will hold simultaneously.
|
||||||
|
|
||||||
|
Resources are created on demand when a checkout is requested and the
|
||||||
|
pool is not yet at MAX-SIZE. Use @code{with-resource-from-pool} or
|
||||||
|
@code{call-with-resource-from-pool} to request a resource and return
|
||||||
|
it automatically when done.
|
||||||
|
|
||||||
|
Optional keyword arguments:
|
||||||
|
|
||||||
|
@table @code
|
||||||
|
@item #:min-size
|
||||||
|
Minimum number of resources to keep alive even when idle. Defaults to
|
||||||
|
@code{0}.
|
||||||
|
|
||||||
|
@item #:idle-seconds
|
||||||
|
Seconds a resource may remain unused before being destroyed, provided
|
||||||
|
the pool is above @code{#:min-size}. Defaults to @code{#f} (never
|
||||||
|
expire idle resources).
|
||||||
|
|
||||||
|
@item #:lifetime
|
||||||
|
Maximum number of checkouts a single resource will serve before being
|
||||||
|
destroyed and replaced by a fresh one. Defaults to @code{#f} (no
|
||||||
|
limit).
|
||||||
|
|
||||||
|
@item #:destructor
|
||||||
|
A procedure called as @code{(destructor resource)} when a resource is
|
||||||
|
removed from the pool. Defaults to @code{#f}.
|
||||||
|
|
||||||
|
@item #:add-resources-parallelism
|
||||||
|
Maximum number of concurrent calls to RETURN-NEW-RESOURCE when the
|
||||||
|
pool needs to grow. Allowing resources to be created in parallel can
|
||||||
|
result in more resources being created than can fit inside the pool,
|
||||||
|
if this happens, the surplus resources are destroyed. Defaults to
|
||||||
|
@code{1}.
|
||||||
|
|
||||||
|
@item #:name
|
||||||
|
A string used in log messages. Defaults to @code{\"unnamed\"}.
|
||||||
|
|
||||||
|
@item #:default-checkout-timeout
|
||||||
|
Default checkout timeout when requesting a resource from the pool,
|
||||||
|
unset by default.
|
||||||
|
|
||||||
|
@item #:default-max-waiters
|
||||||
|
Maximum number of fibers that may queue waiting for a resource. When
|
||||||
|
this limit is exceeded, @code{&resource-pool-too-many-waiters} is
|
||||||
|
raised when a resource is requested. Defaults to @code{#f} (no limit).
|
||||||
|
|
||||||
|
@item #:scheduler
|
||||||
|
The Fibers scheduler to use for the pool's internal fiber. Defaults
|
||||||
|
to the current scheduler.
|
||||||
|
@end table"
|
||||||
(define channel (make-channel))
|
(define channel (make-channel))
|
||||||
(define destroy-condition
|
(define destroy-condition
|
||||||
(make-condition))
|
(make-condition))
|
||||||
|
|
@ -1172,6 +1252,10 @@
|
||||||
pool)
|
pool)
|
||||||
|
|
||||||
(define (destroy-resource-pool pool)
|
(define (destroy-resource-pool pool)
|
||||||
|
"Destroy POOL, preventing any new checkouts. Blocks until all
|
||||||
|
checked-out resources have been returned, running the pool's
|
||||||
|
@code{#:destructor} on each. Any fibers waiting for a resource
|
||||||
|
receive @code{&resource-pool-destroyed}."
|
||||||
(perform-operation
|
(perform-operation
|
||||||
(choice-operation
|
(choice-operation
|
||||||
(wrap-operation
|
(wrap-operation
|
||||||
|
|
@ -1388,6 +1472,23 @@ available. Return the resource once PROC has returned."
|
||||||
(lambda (resource) exp ...)))
|
(lambda (resource) exp ...)))
|
||||||
|
|
||||||
(define* (resource-pool-stats pool #:key (timeout 5))
|
(define* (resource-pool-stats pool #:key (timeout 5))
|
||||||
|
"Return an alist of statistics for POOL with the following keys:
|
||||||
|
|
||||||
|
@table @code
|
||||||
|
@item resources
|
||||||
|
Total number of resources currently held by the pool.
|
||||||
|
@item available
|
||||||
|
Number of resources not currently checked out.
|
||||||
|
@item waiters
|
||||||
|
Number of fibers currently queued waiting for a resource.
|
||||||
|
@item checkout-failure-count
|
||||||
|
Cumulative number of checkouts where an exception was raised inside
|
||||||
|
the proc.
|
||||||
|
@end table
|
||||||
|
|
||||||
|
Blocks waiting for the pool fiber to respond. @code{#:timeout} is
|
||||||
|
the number of seconds to wait; defaults to @code{5}. Raises
|
||||||
|
@code{&resource-pool-timeout} if the pool does not respond in time."
|
||||||
(define channel
|
(define channel
|
||||||
(resource-pool-channel pool))
|
(resource-pool-channel pool))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -54,6 +54,15 @@
|
||||||
rest)))))
|
rest)))))
|
||||||
|
|
||||||
(define* (fibers-sort! items less #:key parallelism)
|
(define* (fibers-sort! items less #:key parallelism)
|
||||||
|
"Sort ITEMS destructively using LESS as the comparison procedure,
|
||||||
|
using a parallel merge sort. Returns the sorted list.
|
||||||
|
|
||||||
|
Splits ITEMS into chunks, sorts each in an eager fiber-promise in
|
||||||
|
parallel, then merges pairs of sorted chunks in parallel until one
|
||||||
|
sorted list remains.
|
||||||
|
|
||||||
|
@code{#:parallelism} sets the number of initial chunks. Defaults to
|
||||||
|
the current fibers parallelism."
|
||||||
(define requested-chunk-count
|
(define requested-chunk-count
|
||||||
(or parallelism
|
(or parallelism
|
||||||
(+ 1 (length (scheduler-remote-peers (current-scheduler))))))
|
(+ 1 (length (scheduler-remote-peers (current-scheduler))))))
|
||||||
|
|
|
||||||
|
|
@ -163,12 +163,13 @@ from there, or #f if that would be an empty string."
|
||||||
|
|
||||||
(define-record-type <fixed-size-thread-pool>
|
(define-record-type <fixed-size-thread-pool>
|
||||||
(fixed-size-thread-pool channel arguments-parameter current-procedures
|
(fixed-size-thread-pool channel arguments-parameter current-procedures
|
||||||
default-checkout-timeout)
|
default-checkout-timeout threads)
|
||||||
fixed-size-thread-pool?
|
fixed-size-thread-pool?
|
||||||
(channel fixed-size-thread-pool-channel)
|
(channel fixed-size-thread-pool-channel)
|
||||||
(arguments-parameter fixed-size-thread-pool-arguments-parameter)
|
(arguments-parameter fixed-size-thread-pool-arguments-parameter)
|
||||||
(current-procedures fixed-size-thread-pool-current-procedures)
|
(current-procedures fixed-size-thread-pool-current-procedures)
|
||||||
(default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout))
|
(default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout)
|
||||||
|
(threads fixed-size-thread-pool-threads))
|
||||||
|
|
||||||
;; Since both thread pool records have this field, use a procedure
|
;; Since both thread pool records have this field, use a procedure
|
||||||
;; than handles the appropriate accessor
|
;; than handles the appropriate accessor
|
||||||
|
|
@ -211,6 +212,52 @@ from there, or #f if that would be an empty string."
|
||||||
(name "unnamed")
|
(name "unnamed")
|
||||||
(use-default-io-waiters? #t)
|
(use-default-io-waiters? #t)
|
||||||
default-checkout-timeout)
|
default-checkout-timeout)
|
||||||
|
"Create a pool of SIZE threads started immediately. Use
|
||||||
|
@code{call-with-thread} to run a procedure in one of the threads.
|
||||||
|
|
||||||
|
Optional keyword arguments:
|
||||||
|
|
||||||
|
@table @code
|
||||||
|
@item #:thread-initializer
|
||||||
|
A thunk called once when each thread starts. Its return value is
|
||||||
|
passed as extra arguments to every procedure run in that thread.
|
||||||
|
Defaults to @code{#f} (no extra arguments).
|
||||||
|
|
||||||
|
@item #:thread-destructor
|
||||||
|
A procedure called with the value returned by @code{#:thread-initializer}
|
||||||
|
when a thread exits. Defaults to @code{#f}.
|
||||||
|
|
||||||
|
@item #:thread-lifetime
|
||||||
|
Maximum number of procedures a thread will run before restarting (and
|
||||||
|
re-running @code{#:thread-initializer}). Defaults to @code{#f} (no
|
||||||
|
limit).
|
||||||
|
|
||||||
|
@item #:expire-on-exception?
|
||||||
|
When @code{#t}, replace a thread after any unhandled exception.
|
||||||
|
Defaults to @code{#f}.
|
||||||
|
|
||||||
|
@item #:use-default-io-waiters?
|
||||||
|
When @code{#t} (the default), each thread uses blocking I/O waiters so
|
||||||
|
that port reads and writes block the thread rather than trying to
|
||||||
|
suspend a fiber.
|
||||||
|
|
||||||
|
@item #:name
|
||||||
|
String used in thread names and log messages. Defaults to
|
||||||
|
@code{\"unnamed\"}.
|
||||||
|
|
||||||
|
@item #:default-checkout-timeout
|
||||||
|
Seconds to wait for a free thread slot before raising
|
||||||
|
@code{&thread-pool-timeout-error}. Defaults to @code{#f} (wait
|
||||||
|
forever).
|
||||||
|
|
||||||
|
@item #:delay-logger
|
||||||
|
Called as @code{(delay-logger seconds proc)} with the time spent
|
||||||
|
waiting for a thread to become available.
|
||||||
|
|
||||||
|
@item #:duration-logger
|
||||||
|
Called as @code{(duration-logger seconds proc)} after each procedure
|
||||||
|
completes.
|
||||||
|
@end table"
|
||||||
(define channel
|
(define channel
|
||||||
(make-channel))
|
(make-channel))
|
||||||
|
|
||||||
|
|
@ -380,19 +427,20 @@ from there, or #f if that would be an empty string."
|
||||||
(initializer/safe)
|
(initializer/safe)
|
||||||
'()))))))))
|
'()))))))))
|
||||||
|
|
||||||
(for-each
|
(define threads
|
||||||
(lambda (i)
|
(map (lambda (i)
|
||||||
(if use-default-io-waiters?
|
(if use-default-io-waiters?
|
||||||
(call-with-default-io-waiters
|
(call-with-default-io-waiters
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(start-thread i channel)))
|
(start-thread i channel)))
|
||||||
(start-thread i channel)))
|
(start-thread i channel)))
|
||||||
(iota size))
|
(iota size)))
|
||||||
|
|
||||||
(fixed-size-thread-pool channel
|
(fixed-size-thread-pool channel
|
||||||
param
|
param
|
||||||
thread-proc-vector
|
thread-proc-vector
|
||||||
default-checkout-timeout))
|
default-checkout-timeout
|
||||||
|
threads))
|
||||||
|
|
||||||
(define* (make-thread-pool max-size
|
(define* (make-thread-pool max-size
|
||||||
#:key
|
#:key
|
||||||
|
|
@ -408,8 +456,34 @@ from there, or #f if that would be an empty string."
|
||||||
(use-default-io-waiters? #t)
|
(use-default-io-waiters? #t)
|
||||||
default-checkout-timeout
|
default-checkout-timeout
|
||||||
default-max-waiters)
|
default-max-waiters)
|
||||||
"Return a channel used to offload work to a dedicated thread. ARGS are the
|
"Create a dynamic thread pool with up to MAX-SIZE threads. Use
|
||||||
arguments of the thread pool procedure."
|
@code{call-with-thread} to run a procedure in one of the threads.
|
||||||
|
|
||||||
|
Unlike @code{make-fixed-size-thread-pool}, threads are created on
|
||||||
|
demand and may be reclaimed when idle (controlled by @code{#:min-size}
|
||||||
|
and the resource pool's idle management).
|
||||||
|
|
||||||
|
Accepts the same @code{#:thread-initializer}, @code{#:thread-destructor},
|
||||||
|
@code{#:thread-lifetime}, @code{#:expire-on-exception?},
|
||||||
|
@code{#:use-default-io-waiters?}, @code{#:name},
|
||||||
|
@code{#:default-checkout-timeout}, @code{#:delay-logger}, and
|
||||||
|
@code{#:duration-logger} arguments as @code{make-fixed-size-thread-pool},
|
||||||
|
plus:
|
||||||
|
|
||||||
|
@table @code
|
||||||
|
@item #:min-size
|
||||||
|
Minimum number of threads to keep alive. Defaults to MAX-SIZE (i.e.@:
|
||||||
|
the pool is pre-filled and never shrinks).
|
||||||
|
|
||||||
|
@item #:scheduler
|
||||||
|
Fibers scheduler for the pool's internal resource pool fiber. Defaults
|
||||||
|
to the current scheduler.
|
||||||
|
|
||||||
|
@item #:default-max-waiters
|
||||||
|
Maximum number of fibers that may queue waiting for a thread. Raises
|
||||||
|
@code{&thread-pool-timeout-error} when exceeded. Defaults to
|
||||||
|
@code{#f} (no limit).
|
||||||
|
@end table"
|
||||||
(define param
|
(define param
|
||||||
(make-parameter #f))
|
(make-parameter #f))
|
||||||
|
|
||||||
|
|
@ -444,8 +518,34 @@ arguments of the thread pool procedure."
|
||||||
channel
|
channel
|
||||||
destroy-thread-on-exception?
|
destroy-thread-on-exception?
|
||||||
(max-waiters 'default))
|
(max-waiters 'default))
|
||||||
"Send PROC to the thread pool through CHANNEL. Return the result of PROC.
|
"Run PROC in THREAD-POOL and return its values, blocking until
|
||||||
If already in the thread pool, call PROC immediately."
|
complete. If called from within a thread that already belongs to
|
||||||
|
THREAD-POOL, PROC is called directly in that thread.
|
||||||
|
|
||||||
|
Optional keyword arguments:
|
||||||
|
|
||||||
|
@table @code
|
||||||
|
@item #:checkout-timeout
|
||||||
|
Seconds to wait for a free thread before raising
|
||||||
|
@code{&thread-pool-timeout-error}. Defaults to the pool's
|
||||||
|
@code{#:default-checkout-timeout}.
|
||||||
|
|
||||||
|
@item #:max-waiters
|
||||||
|
Maximum number of fibers that may queue waiting for a thread (for
|
||||||
|
dynamic pools). Defaults to the pool's @code{#:default-max-waiters}.
|
||||||
|
|
||||||
|
@item #:destroy-thread-on-exception?
|
||||||
|
When @code{#t}, destroy the thread after PROC raises an exception.
|
||||||
|
Equivalent to per-call @code{#:expire-on-exception?}. Defaults to
|
||||||
|
@code{#f}.
|
||||||
|
|
||||||
|
@item #:duration-logger
|
||||||
|
Called as @code{(duration-logger seconds)} after PROC completes
|
||||||
|
(whether or not it raised an exception).
|
||||||
|
|
||||||
|
@item #:channel
|
||||||
|
Override the channel used to communicate with the thread.
|
||||||
|
@end table"
|
||||||
(define (handle-proc fixed-size-thread-pool
|
(define (handle-proc fixed-size-thread-pool
|
||||||
reply-channel
|
reply-channel
|
||||||
start-time
|
start-time
|
||||||
|
|
@ -529,9 +629,13 @@ If already in the thread pool, call PROC immediately."
|
||||||
destroy-thread-on-exception?))))))))
|
destroy-thread-on-exception?))))))))
|
||||||
|
|
||||||
(define (destroy-thread-pool pool)
|
(define (destroy-thread-pool pool)
|
||||||
|
"Destroy POOL, stopping all of its threads and calling the destructor
|
||||||
|
if specified. This procedure will block until the destruction is
|
||||||
|
complete."
|
||||||
(if (fixed-size-thread-pool? pool)
|
(if (fixed-size-thread-pool? pool)
|
||||||
(put-message
|
(let ((channel (fixed-size-thread-pool-channel pool))
|
||||||
(fixed-size-thread-pool-channel pool)
|
(threads (fixed-size-thread-pool-threads pool)))
|
||||||
'destroy)
|
(for-each (lambda _ (put-message channel 'destroy)) threads)
|
||||||
|
(for-each join-thread threads))
|
||||||
(destroy-resource-pool
|
(destroy-resource-pool
|
||||||
(thread-pool-resource-pool pool))))
|
(thread-pool-resource-pool pool))))
|
||||||
|
|
|
||||||
|
|
@ -45,7 +45,16 @@
|
||||||
|
|
||||||
with-port-timeouts))
|
with-port-timeouts))
|
||||||
|
|
||||||
(define* (with-fibers-timeout thunk #:key timeout on-timeout)
|
(define* (with-fibers-timeout thunk #:key
|
||||||
|
timeout
|
||||||
|
(on-timeout
|
||||||
|
(const *unspecified*)))
|
||||||
|
"Run THUNK in a new fiber and return its values, waiting TIMEOUT
|
||||||
|
seconds for it to finish. If THUNK does not complete within TIMEOUT
|
||||||
|
seconds, the ON-TIMEOUT procedure is called and with-fibers-timeout
|
||||||
|
returns the result of ON-TIMEOUT instead.
|
||||||
|
|
||||||
|
If THUNK raises an exception it is re-raised in the calling fiber."
|
||||||
(let ((channel (make-channel)))
|
(let ((channel (make-channel)))
|
||||||
(spawn-fiber
|
(spawn-fiber
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
|
@ -110,7 +119,7 @@
|
||||||
(exception-predicate &port-write-timeout-error))
|
(exception-predicate &port-write-timeout-error))
|
||||||
|
|
||||||
(define (readable? port)
|
(define (readable? port)
|
||||||
"Test if PORT is writable."
|
"Test if PORT is readable."
|
||||||
(= 1 (port-poll port "r" 0)))
|
(= 1 (port-poll port "r" 0)))
|
||||||
|
|
||||||
(define (writable? port)
|
(define (writable? port)
|
||||||
|
|
@ -151,6 +160,21 @@
|
||||||
#:key timeout
|
#:key timeout
|
||||||
(read-timeout timeout)
|
(read-timeout timeout)
|
||||||
(write-timeout timeout))
|
(write-timeout timeout))
|
||||||
|
"Run THUNK with per-operation I/O timeouts on all ports. If any
|
||||||
|
read or write blocks for longer than the given number of seconds, an
|
||||||
|
exception is raised.
|
||||||
|
|
||||||
|
@code{#:timeout} sets both read and write timeouts.
|
||||||
|
@code{#:read-timeout} and @code{#:write-timeout} specify the timeout
|
||||||
|
for reads and writes respectively. All three default to @code{#f} (no
|
||||||
|
timeout).
|
||||||
|
|
||||||
|
This procedure works both with fibers, and without fibers by using the
|
||||||
|
poll system call with a timeout.
|
||||||
|
|
||||||
|
On read timeout, raises @code{&port-read-timeout-error}. On write
|
||||||
|
timeout, raises @code{&port-write-timeout-error}. Both carry the
|
||||||
|
@code{thunk} and @code{port} fields from @code{&port-timeout-error}."
|
||||||
(define (no-fibers-wait thunk port mode timeout)
|
(define (no-fibers-wait thunk port mode timeout)
|
||||||
(define poll-timeout-ms 200)
|
(define poll-timeout-ms 200)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
(use-modules (tests)
|
(use-modules (tests)
|
||||||
(ice-9 atomic)
|
(ice-9 atomic)
|
||||||
|
(ice-9 threads)
|
||||||
(srfi srfi-71)
|
(srfi srfi-71)
|
||||||
(fibers)
|
(fibers)
|
||||||
(unit-test)
|
(unit-test)
|
||||||
|
|
@ -142,4 +143,33 @@
|
||||||
ref-and-decrement))
|
ref-and-decrement))
|
||||||
(error)))
|
(error)))
|
||||||
|
|
||||||
|
;; Test that the destructor is called when a size 1 fixed-size thread
|
||||||
|
;; pool is destroyed, and that destroy-thread-pool blocks until it has
|
||||||
|
;; completed.
|
||||||
|
(let* ((destructor-called? #f)
|
||||||
|
(thread-pool
|
||||||
|
(make-fixed-size-thread-pool
|
||||||
|
1
|
||||||
|
#:thread-destructor
|
||||||
|
(lambda ()
|
||||||
|
(set! destructor-called? #t)))))
|
||||||
|
(destroy-thread-pool thread-pool)
|
||||||
|
(assert-equal #t destructor-called?))
|
||||||
|
|
||||||
|
;; Test that the destructor is called for every thread when a
|
||||||
|
;; multi-thread fixed-size thread pool is destroyed, and that
|
||||||
|
;; destroy-thread-pool blocks until all destructors have completed.
|
||||||
|
(let* ((destructor-count 0)
|
||||||
|
(mutex (make-mutex))
|
||||||
|
(pool-size 3)
|
||||||
|
(thread-pool
|
||||||
|
(make-fixed-size-thread-pool
|
||||||
|
pool-size
|
||||||
|
#:thread-destructor
|
||||||
|
(lambda ()
|
||||||
|
(with-mutex mutex
|
||||||
|
(set! destructor-count (+ destructor-count 1)))))))
|
||||||
|
(destroy-thread-pool thread-pool)
|
||||||
|
(assert-equal pool-size destructor-count))
|
||||||
|
|
||||||
(display "thread-pool test finished successfully\n")
|
(display "thread-pool test finished successfully\n")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue