Compare commits
No commits in common. "db9b549e59726d287613da92d3d1afd3cdffb391" and "35f4c16ab0b3846cd10f5209b39a6a3f5bf8a3f1" have entirely different histories.
db9b549e59
...
35f4c16ab0
11 changed files with 24 additions and 391 deletions
15
README
15
README
|
|
@ -1,15 +1,4 @@
|
||||||
-*- mode: org -*-
|
-*- mode: org -*-
|
||||||
|
|
||||||
* Guile Knots
|
This Guile library provides useful patterns and functionality to use
|
||||||
|
Guile Fibers.
|
||||||
Guile Knots is a library providing higher-level patterns and building
|
|
||||||
blocks for programming with [[https://codeberg.org/guile/fibers][Guile Fibers]].
|
|
||||||
|
|
||||||
This includes:
|
|
||||||
|
|
||||||
- Parallel map/for-each with configurable concurrency limits
|
|
||||||
- Resource and thread pools
|
|
||||||
- Fiber-aware promises for lazy and eager parallel evaluation
|
|
||||||
- Timeouts for fibers and I/O ports
|
|
||||||
- A HTTP web server
|
|
||||||
- Non-blocking socket utilities
|
|
||||||
|
|
|
||||||
40
knots.scm
40
knots.scm
|
|
@ -29,11 +29,6 @@
|
||||||
spawn-fiber/knots))
|
spawn-fiber/knots))
|
||||||
|
|
||||||
(define (call-with-default-io-waiters thunk)
|
(define (call-with-default-io-waiters thunk)
|
||||||
"Run THUNK with Guile's default blocking I/O waiters active.
|
|
||||||
|
|
||||||
This is useful when restoring the default Guile I/O waiters from
|
|
||||||
within a context (like Fibers) where different I/O waiters are used,
|
|
||||||
for example when creating a new thread from a fiber."
|
|
||||||
(parameterize
|
(parameterize
|
||||||
((current-read-waiter (@@ (ice-9 suspendable-ports)
|
((current-read-waiter (@@ (ice-9 suspendable-ports)
|
||||||
default-read-waiter))
|
default-read-waiter))
|
||||||
|
|
@ -42,33 +37,15 @@ for example when creating a new thread from a fiber."
|
||||||
(thunk)))
|
(thunk)))
|
||||||
|
|
||||||
(define (wait-when-system-clock-behind)
|
(define (wait-when-system-clock-behind)
|
||||||
"Block until the system clock reads at least 2001-01-02.
|
(let ((start-of-the-year-2000 946684800))
|
||||||
|
|
||||||
Useful at startup in environments (virtual machines, embedded systems)
|
|
||||||
where the clock may start at or near the Unix epoch. Prints a warning
|
|
||||||
to the current error port every 20 seconds while waiting."
|
|
||||||
;; Jan 02 2001 02:00:00
|
|
||||||
(let ((start-of-the-year-2001 978400800))
|
|
||||||
(while (< (current-time)
|
(while (< (current-time)
|
||||||
start-of-the-year-2001)
|
start-of-the-year-2000)
|
||||||
(simple-format (current-error-port)
|
(simple-format (current-error-port)
|
||||||
"warning: system clock potentially behind, waiting\n")
|
"warning: system clock potentially behind, waiting\n")
|
||||||
(sleep 20))))
|
(sleep 20))))
|
||||||
|
|
||||||
;; Copied from (fibers web server)
|
;; Copied from (fibers web server)
|
||||||
(define (call-with-sigint thunk cvar)
|
(define (call-with-sigint thunk cvar)
|
||||||
"Run THUNK with a SIGINT handler that signals the Fibers condition
|
|
||||||
CVAR. Restores the previous handler when THUNK returns.
|
|
||||||
|
|
||||||
Typical usage is to pass a condition variable to this procedure and
|
|
||||||
wait on CVAR in a fiber to implement clean shutdown on Ctrl-C:
|
|
||||||
|
|
||||||
@example
|
|
||||||
(let ((quit-cvar (make-condition)))
|
|
||||||
(call-with-sigint
|
|
||||||
(lambda () (wait quit-cvar))
|
|
||||||
quit-cvar))
|
|
||||||
@end example"
|
|
||||||
(let ((handler #f))
|
(let ((handler #f))
|
||||||
(dynamic-wind
|
(dynamic-wind
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
|
@ -119,11 +96,6 @@ wait on CVAR in a fiber to implement clean shutdown on Ctrl-C:
|
||||||
(raise-exception exn)))))
|
(raise-exception exn)))))
|
||||||
|
|
||||||
(define* (display/knots obj #:optional (port (current-output-port)))
|
(define* (display/knots obj #:optional (port (current-output-port)))
|
||||||
"Write OBJ to PORT (default: current output port) as a UTF-8 byte
|
|
||||||
sequence via @code{put-bytevector}.
|
|
||||||
|
|
||||||
When used with ports without buffering, this should be safer than
|
|
||||||
display."
|
|
||||||
(put-bytevector
|
(put-bytevector
|
||||||
port
|
port
|
||||||
(string->utf8
|
(string->utf8
|
||||||
|
|
@ -132,8 +104,6 @@ display."
|
||||||
(display obj port))))))
|
(display obj port))))))
|
||||||
|
|
||||||
(define (simple-format/knots port s . args)
|
(define (simple-format/knots port s . args)
|
||||||
"Like @code{simple-format} but should be safer when used with a port
|
|
||||||
without buffering."
|
|
||||||
(let ((str (apply simple-format #f s args)))
|
(let ((str (apply simple-format #f s args)))
|
||||||
(if (eq? #f port)
|
(if (eq? #f port)
|
||||||
str
|
str
|
||||||
|
|
@ -144,8 +114,6 @@ without buffering."
|
||||||
port)))))
|
port)))))
|
||||||
|
|
||||||
(define (format/knots port s . args)
|
(define (format/knots port s . args)
|
||||||
"Like @code{format} but should be safer when used with a port
|
|
||||||
without buffering."
|
|
||||||
(let ((str (apply format #f s args)))
|
(let ((str (apply format #f s args)))
|
||||||
(if (eq? #f port)
|
(if (eq? #f port)
|
||||||
str
|
str
|
||||||
|
|
@ -264,10 +232,6 @@ without buffering."
|
||||||
(display/knots error-string port)))
|
(display/knots error-string port)))
|
||||||
|
|
||||||
(define* (spawn-fiber/knots thunk #:optional scheduler #:key parallel?)
|
(define* (spawn-fiber/knots thunk #:optional scheduler #:key parallel?)
|
||||||
"Spawn a fiber to run THUNK, with knots exception handling.
|
|
||||||
|
|
||||||
Accepts the same optional SCHEDULER and @code{#:parallel?} arguments
|
|
||||||
as @code{spawn-fiber}."
|
|
||||||
(spawn-fiber
|
(spawn-fiber
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
|
|
|
||||||
|
|
@ -32,16 +32,6 @@
|
||||||
|
|
||||||
(define* (non-blocking-open-socket-for-uri uri
|
(define* (non-blocking-open-socket-for-uri uri
|
||||||
#:key (verify-certificate? #t))
|
#:key (verify-certificate? #t))
|
||||||
"Open a socket for URI and return it as a non-blocking port.
|
|
||||||
|
|
||||||
For HTTPS URIs the TLS handshake is completed while the socket is
|
|
||||||
still blocking (required because Guile's TLS wrapper does not support
|
|
||||||
non-blocking handshakes), then the underlying socket is made
|
|
||||||
non-blocking. For plain HTTP the socket is made non-blocking
|
|
||||||
immediately.
|
|
||||||
|
|
||||||
@code{#:verify-certificate?} controls TLS certificate verification
|
|
||||||
and defaults to @code{#t}."
|
|
||||||
(define tls-wrap
|
(define tls-wrap
|
||||||
(@@ (web client) tls-wrap))
|
(@@ (web client) tls-wrap))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -267,16 +267,6 @@ invocation of PROC finishes. REPORT is passed the results for each
|
||||||
#:key (parallelism 1)
|
#:key (parallelism 1)
|
||||||
(input-channel (make-channel))
|
(input-channel (make-channel))
|
||||||
(process-channel input-channel))
|
(process-channel input-channel))
|
||||||
"Convert PROC into a procedure backed by @code{#:parallelism}
|
|
||||||
(default: 1) background fibers. Returns a wrapper that sends its
|
|
||||||
arguments to one of the fibers and blocks until the result is
|
|
||||||
returned.
|
|
||||||
|
|
||||||
@code{#:input-channel} is the channel that callers write requests to;
|
|
||||||
defaults to a fresh channel. @code{#:process-channel} is the channel
|
|
||||||
the fibers read from; defaults to @code{#:input-channel}. Setting
|
|
||||||
them differently allows external parties to bypass the wrapper and
|
|
||||||
write directly to @code{process-channel}."
|
|
||||||
(for-each
|
(for-each
|
||||||
(lambda _
|
(lambda _
|
||||||
(spawn-fiber
|
(spawn-fiber
|
||||||
|
|
@ -328,12 +318,6 @@ write directly to @code{process-channel}."
|
||||||
(resource-pool parallelism-limiter-resource-pool))
|
(resource-pool parallelism-limiter-resource-pool))
|
||||||
|
|
||||||
(define* (make-parallelism-limiter limit #:key (name "unnamed"))
|
(define* (make-parallelism-limiter limit #:key (name "unnamed"))
|
||||||
"Return a parallelism limiter that allows at most LIMIT concurrent
|
|
||||||
fibers to execute within @code{with-parallelism-limiter} at the same
|
|
||||||
time. Further fibers block until a slot becomes free.
|
|
||||||
|
|
||||||
@code{#:name} is a string used in log messages. Defaults to
|
|
||||||
@code{\"unnamed\"}."
|
|
||||||
(make-parallelism-limiter-record
|
(make-parallelism-limiter-record
|
||||||
(make-fixed-size-resource-pool
|
(make-fixed-size-resource-pool
|
||||||
(iota limit)
|
(iota limit)
|
||||||
|
|
@ -345,9 +329,6 @@ time. Further fibers block until a slot becomes free.
|
||||||
parallelism-limiter)))
|
parallelism-limiter)))
|
||||||
|
|
||||||
(define* (call-with-parallelism-limiter parallelism-limiter thunk)
|
(define* (call-with-parallelism-limiter parallelism-limiter thunk)
|
||||||
"Acquire a slot from PARALLELISM-LIMITER, call THUNK, release the
|
|
||||||
slot, and return the values from THUNK. Blocks if no slot is
|
|
||||||
currently available."
|
|
||||||
(call-with-resource-from-pool
|
(call-with-resource-from-pool
|
||||||
(parallelism-limiter-resource-pool parallelism-limiter)
|
(parallelism-limiter-resource-pool parallelism-limiter)
|
||||||
(lambda _
|
(lambda _
|
||||||
|
|
|
||||||
|
|
@ -41,21 +41,12 @@
|
||||||
(evaluated-condition fibers-promise-evaluated-condition))
|
(evaluated-condition fibers-promise-evaluated-condition))
|
||||||
|
|
||||||
(define (fibers-delay thunk)
|
(define (fibers-delay thunk)
|
||||||
"Return a new fiber-aware promise that will evaluate THUNK when
|
|
||||||
first forced. THUNK is not called until @code{fibers-force} is
|
|
||||||
called on the promise."
|
|
||||||
(make-fibers-promise
|
(make-fibers-promise
|
||||||
thunk
|
thunk
|
||||||
(make-atomic-box #f)
|
(make-atomic-box #f)
|
||||||
(make-condition)))
|
(make-condition)))
|
||||||
|
|
||||||
(define (fibers-force fp)
|
(define (fibers-force fp)
|
||||||
"Force the fiber-aware promise FP, returning its values.
|
|
||||||
|
|
||||||
The first call evaluates the promise's thunk. Concurrent callers
|
|
||||||
block on a condition variable until evaluation finishes, then receive
|
|
||||||
the same result. If the thunk raises an exception, the exception is
|
|
||||||
stored and re-raised for all callers."
|
|
||||||
(unless (fibers-promise? fp)
|
(unless (fibers-promise? fp)
|
||||||
(raise-exception
|
(raise-exception
|
||||||
(make-exception
|
(make-exception
|
||||||
|
|
@ -117,9 +108,6 @@ stored and re-raised for all callers."
|
||||||
|
|
||||||
|
|
||||||
(define (fibers-delay/eager thunk)
|
(define (fibers-delay/eager thunk)
|
||||||
"Return a new fiber-aware promise and immediately begin evaluating
|
|
||||||
THUNK in a new fiber. Exceptions during eager evaluation are silently
|
|
||||||
discarded; they will be re-raised when @code{fibers-force} is called."
|
|
||||||
(let ((promise (fibers-delay thunk)))
|
(let ((promise (fibers-delay thunk)))
|
||||||
(spawn-fiber
|
(spawn-fiber
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
|
@ -133,15 +121,10 @@ discarded; they will be re-raised when @code{fibers-force} is called."
|
||||||
promise))
|
promise))
|
||||||
|
|
||||||
(define (fibers-promise-reset fp)
|
(define (fibers-promise-reset fp)
|
||||||
"Reset the fiber-aware promise FP so that the next call to
|
|
||||||
@code{fibers-force} re-evaluates its thunk."
|
|
||||||
(atomic-box-set! (fibers-promise-values-box fp)
|
(atomic-box-set! (fibers-promise-values-box fp)
|
||||||
#f))
|
#f))
|
||||||
|
|
||||||
(define (fibers-promise-result-available? fp)
|
(define (fibers-promise-result-available? fp)
|
||||||
"Return @code{#t} if the fiber-aware promise FP has been evaluated
|
|
||||||
(successfully or with an exception) and @code{#f} if evaluation has
|
|
||||||
not yet started or is still in progress."
|
|
||||||
(let ((val (atomic-box-ref (fibers-promise-values-box fp))))
|
(let ((val (atomic-box-ref (fibers-promise-values-box fp))))
|
||||||
(not (or (eq? val #f)
|
(not (or (eq? val #f)
|
||||||
(eq? val 'started)))))
|
(eq? val 'started)))))
|
||||||
|
|
|
||||||
|
|
@ -25,12 +25,6 @@
|
||||||
#:export (spawn-queueing-fiber))
|
#:export (spawn-queueing-fiber))
|
||||||
|
|
||||||
(define (spawn-queueing-fiber dest-channel)
|
(define (spawn-queueing-fiber dest-channel)
|
||||||
"Spawn a fiber that serialises items onto DEST-CHANNEL in FIFO order.
|
|
||||||
Returns a new input channel.
|
|
||||||
|
|
||||||
Multiple producers can put items on the returned channel concurrently.
|
|
||||||
The fiber buffers them locally and forwards them to DEST-CHANNEL one at
|
|
||||||
a time, preserving arrival order."
|
|
||||||
(define queue (make-q))
|
(define queue (make-q))
|
||||||
|
|
||||||
(let ((queue-channel (make-channel)))
|
(let ((queue-channel (make-channel)))
|
||||||
|
|
|
||||||
|
|
@ -154,33 +154,6 @@
|
||||||
(name "unnamed")
|
(name "unnamed")
|
||||||
default-checkout-timeout
|
default-checkout-timeout
|
||||||
default-max-waiters)
|
default-max-waiters)
|
||||||
"Create a resource pool from RESOURCES-LIST-OR-VECTOR, a list or
|
|
||||||
vector of pre-existing resource values.
|
|
||||||
|
|
||||||
Use @code{with-resource-from-pool} or
|
|
||||||
@code{call-with-resource-from-pool} to borrow a resource and return it
|
|
||||||
automatically when done.
|
|
||||||
|
|
||||||
Optional keyword arguments:
|
|
||||||
|
|
||||||
@table @code
|
|
||||||
@item #:name
|
|
||||||
A optional string used in log messages.
|
|
||||||
Defaults to @code{\"unnamed\"}.
|
|
||||||
|
|
||||||
@item #:default-checkout-timeout
|
|
||||||
Default checkout timeout when requesting a resource from the pool,
|
|
||||||
unset by default.
|
|
||||||
|
|
||||||
@item #:default-max-waiters
|
|
||||||
Maximum number of fibers that may queue waiting for a resource. When
|
|
||||||
this limit is exceeded, @code{&resource-pool-too-many-waiters} is
|
|
||||||
raised when a resource is requested. Defaults to @code{#f} (no limit).
|
|
||||||
|
|
||||||
@item #:scheduler
|
|
||||||
The Fibers scheduler to use for the pool's internal fiber. Defaults
|
|
||||||
to the current scheduler.
|
|
||||||
@end table"
|
|
||||||
(define channel (make-channel))
|
(define channel (make-channel))
|
||||||
(define destroy-condition
|
(define destroy-condition
|
||||||
(make-condition))
|
(make-condition))
|
||||||
|
|
@ -540,59 +513,6 @@ to the current scheduler.
|
||||||
(add-resources-parallelism 1)
|
(add-resources-parallelism 1)
|
||||||
default-checkout-timeout
|
default-checkout-timeout
|
||||||
default-max-waiters)
|
default-max-waiters)
|
||||||
"Create a dynamic resource pool. RETURN-NEW-RESOURCE is a thunk
|
|
||||||
called to create each new resource value. MAX-SIZE is the maximum
|
|
||||||
number of resources the pool will hold simultaneously.
|
|
||||||
|
|
||||||
Resources are created on demand when a checkout is requested and the
|
|
||||||
pool is not yet at MAX-SIZE. Use @code{with-resource-from-pool} or
|
|
||||||
@code{call-with-resource-from-pool} to request a resource and return
|
|
||||||
it automatically when done.
|
|
||||||
|
|
||||||
Optional keyword arguments:
|
|
||||||
|
|
||||||
@table @code
|
|
||||||
@item #:min-size
|
|
||||||
Minimum number of resources to keep alive even when idle. Defaults to
|
|
||||||
@code{0}.
|
|
||||||
|
|
||||||
@item #:idle-seconds
|
|
||||||
Seconds a resource may remain unused before being destroyed, provided
|
|
||||||
the pool is above @code{#:min-size}. Defaults to @code{#f} (never
|
|
||||||
expire idle resources).
|
|
||||||
|
|
||||||
@item #:lifetime
|
|
||||||
Maximum number of checkouts a single resource will serve before being
|
|
||||||
destroyed and replaced by a fresh one. Defaults to @code{#f} (no
|
|
||||||
limit).
|
|
||||||
|
|
||||||
@item #:destructor
|
|
||||||
A procedure called as @code{(destructor resource)} when a resource is
|
|
||||||
removed from the pool. Defaults to @code{#f}.
|
|
||||||
|
|
||||||
@item #:add-resources-parallelism
|
|
||||||
Maximum number of concurrent calls to RETURN-NEW-RESOURCE when the
|
|
||||||
pool needs to grow. Allowing resources to be created in parallel can
|
|
||||||
result in more resources being created than can fit inside the pool,
|
|
||||||
if this happens, the surplus resources are destroyed. Defaults to
|
|
||||||
@code{1}.
|
|
||||||
|
|
||||||
@item #:name
|
|
||||||
A string used in log messages. Defaults to @code{\"unnamed\"}.
|
|
||||||
|
|
||||||
@item #:default-checkout-timeout
|
|
||||||
Default checkout timeout when requesting a resource from the pool,
|
|
||||||
unset by default.
|
|
||||||
|
|
||||||
@item #:default-max-waiters
|
|
||||||
Maximum number of fibers that may queue waiting for a resource. When
|
|
||||||
this limit is exceeded, @code{&resource-pool-too-many-waiters} is
|
|
||||||
raised when a resource is requested. Defaults to @code{#f} (no limit).
|
|
||||||
|
|
||||||
@item #:scheduler
|
|
||||||
The Fibers scheduler to use for the pool's internal fiber. Defaults
|
|
||||||
to the current scheduler.
|
|
||||||
@end table"
|
|
||||||
(define channel (make-channel))
|
(define channel (make-channel))
|
||||||
(define destroy-condition
|
(define destroy-condition
|
||||||
(make-condition))
|
(make-condition))
|
||||||
|
|
@ -1252,10 +1172,6 @@ to the current scheduler.
|
||||||
pool)
|
pool)
|
||||||
|
|
||||||
(define (destroy-resource-pool pool)
|
(define (destroy-resource-pool pool)
|
||||||
"Destroy POOL, preventing any new checkouts. Blocks until all
|
|
||||||
checked-out resources have been returned, running the pool's
|
|
||||||
@code{#:destructor} on each. Any fibers waiting for a resource
|
|
||||||
receive @code{&resource-pool-destroyed}."
|
|
||||||
(perform-operation
|
(perform-operation
|
||||||
(choice-operation
|
(choice-operation
|
||||||
(wrap-operation
|
(wrap-operation
|
||||||
|
|
@ -1472,23 +1388,6 @@ available. Return the resource once PROC has returned."
|
||||||
(lambda (resource) exp ...)))
|
(lambda (resource) exp ...)))
|
||||||
|
|
||||||
(define* (resource-pool-stats pool #:key (timeout 5))
|
(define* (resource-pool-stats pool #:key (timeout 5))
|
||||||
"Return an alist of statistics for POOL with the following keys:
|
|
||||||
|
|
||||||
@table @code
|
|
||||||
@item resources
|
|
||||||
Total number of resources currently held by the pool.
|
|
||||||
@item available
|
|
||||||
Number of resources not currently checked out.
|
|
||||||
@item waiters
|
|
||||||
Number of fibers currently queued waiting for a resource.
|
|
||||||
@item checkout-failure-count
|
|
||||||
Cumulative number of checkouts where an exception was raised inside
|
|
||||||
the proc.
|
|
||||||
@end table
|
|
||||||
|
|
||||||
Blocks waiting for the pool fiber to respond. @code{#:timeout} is
|
|
||||||
the number of seconds to wait; defaults to @code{5}. Raises
|
|
||||||
@code{&resource-pool-timeout} if the pool does not respond in time."
|
|
||||||
(define channel
|
(define channel
|
||||||
(resource-pool-channel pool))
|
(resource-pool-channel pool))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -54,15 +54,6 @@
|
||||||
rest)))))
|
rest)))))
|
||||||
|
|
||||||
(define* (fibers-sort! items less #:key parallelism)
|
(define* (fibers-sort! items less #:key parallelism)
|
||||||
"Sort ITEMS destructively using LESS as the comparison procedure,
|
|
||||||
using a parallel merge sort. Returns the sorted list.
|
|
||||||
|
|
||||||
Splits ITEMS into chunks, sorts each in an eager fiber-promise in
|
|
||||||
parallel, then merges pairs of sorted chunks in parallel until one
|
|
||||||
sorted list remains.
|
|
||||||
|
|
||||||
@code{#:parallelism} sets the number of initial chunks. Defaults to
|
|
||||||
the current fibers parallelism."
|
|
||||||
(define requested-chunk-count
|
(define requested-chunk-count
|
||||||
(or parallelism
|
(or parallelism
|
||||||
(+ 1 (length (scheduler-remote-peers (current-scheduler))))))
|
(+ 1 (length (scheduler-remote-peers (current-scheduler))))))
|
||||||
|
|
|
||||||
|
|
@ -163,13 +163,12 @@ from there, or #f if that would be an empty string."
|
||||||
|
|
||||||
(define-record-type <fixed-size-thread-pool>
|
(define-record-type <fixed-size-thread-pool>
|
||||||
(fixed-size-thread-pool channel arguments-parameter current-procedures
|
(fixed-size-thread-pool channel arguments-parameter current-procedures
|
||||||
default-checkout-timeout threads)
|
default-checkout-timeout)
|
||||||
fixed-size-thread-pool?
|
fixed-size-thread-pool?
|
||||||
(channel fixed-size-thread-pool-channel)
|
(channel fixed-size-thread-pool-channel)
|
||||||
(arguments-parameter fixed-size-thread-pool-arguments-parameter)
|
(arguments-parameter fixed-size-thread-pool-arguments-parameter)
|
||||||
(current-procedures fixed-size-thread-pool-current-procedures)
|
(current-procedures fixed-size-thread-pool-current-procedures)
|
||||||
(default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout)
|
(default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout))
|
||||||
(threads fixed-size-thread-pool-threads))
|
|
||||||
|
|
||||||
;; Since both thread pool records have this field, use a procedure
|
;; Since both thread pool records have this field, use a procedure
|
||||||
;; than handles the appropriate accessor
|
;; than handles the appropriate accessor
|
||||||
|
|
@ -212,52 +211,6 @@ from there, or #f if that would be an empty string."
|
||||||
(name "unnamed")
|
(name "unnamed")
|
||||||
(use-default-io-waiters? #t)
|
(use-default-io-waiters? #t)
|
||||||
default-checkout-timeout)
|
default-checkout-timeout)
|
||||||
"Create a pool of SIZE threads started immediately. Use
|
|
||||||
@code{call-with-thread} to run a procedure in one of the threads.
|
|
||||||
|
|
||||||
Optional keyword arguments:
|
|
||||||
|
|
||||||
@table @code
|
|
||||||
@item #:thread-initializer
|
|
||||||
A thunk called once when each thread starts. Its return value is
|
|
||||||
passed as extra arguments to every procedure run in that thread.
|
|
||||||
Defaults to @code{#f} (no extra arguments).
|
|
||||||
|
|
||||||
@item #:thread-destructor
|
|
||||||
A procedure called with the value returned by @code{#:thread-initializer}
|
|
||||||
when a thread exits. Defaults to @code{#f}.
|
|
||||||
|
|
||||||
@item #:thread-lifetime
|
|
||||||
Maximum number of procedures a thread will run before restarting (and
|
|
||||||
re-running @code{#:thread-initializer}). Defaults to @code{#f} (no
|
|
||||||
limit).
|
|
||||||
|
|
||||||
@item #:expire-on-exception?
|
|
||||||
When @code{#t}, replace a thread after any unhandled exception.
|
|
||||||
Defaults to @code{#f}.
|
|
||||||
|
|
||||||
@item #:use-default-io-waiters?
|
|
||||||
When @code{#t} (the default), each thread uses blocking I/O waiters so
|
|
||||||
that port reads and writes block the thread rather than trying to
|
|
||||||
suspend a fiber.
|
|
||||||
|
|
||||||
@item #:name
|
|
||||||
String used in thread names and log messages. Defaults to
|
|
||||||
@code{\"unnamed\"}.
|
|
||||||
|
|
||||||
@item #:default-checkout-timeout
|
|
||||||
Seconds to wait for a free thread slot before raising
|
|
||||||
@code{&thread-pool-timeout-error}. Defaults to @code{#f} (wait
|
|
||||||
forever).
|
|
||||||
|
|
||||||
@item #:delay-logger
|
|
||||||
Called as @code{(delay-logger seconds proc)} with the time spent
|
|
||||||
waiting for a thread to become available.
|
|
||||||
|
|
||||||
@item #:duration-logger
|
|
||||||
Called as @code{(duration-logger seconds proc)} after each procedure
|
|
||||||
completes.
|
|
||||||
@end table"
|
|
||||||
(define channel
|
(define channel
|
||||||
(make-channel))
|
(make-channel))
|
||||||
|
|
||||||
|
|
@ -427,20 +380,19 @@ completes.
|
||||||
(initializer/safe)
|
(initializer/safe)
|
||||||
'()))))))))
|
'()))))))))
|
||||||
|
|
||||||
(define threads
|
(for-each
|
||||||
(map (lambda (i)
|
(lambda (i)
|
||||||
(if use-default-io-waiters?
|
(if use-default-io-waiters?
|
||||||
(call-with-default-io-waiters
|
(call-with-default-io-waiters
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(start-thread i channel)))
|
(start-thread i channel)))
|
||||||
(start-thread i channel)))
|
(start-thread i channel)))
|
||||||
(iota size)))
|
(iota size))
|
||||||
|
|
||||||
(fixed-size-thread-pool channel
|
(fixed-size-thread-pool channel
|
||||||
param
|
param
|
||||||
thread-proc-vector
|
thread-proc-vector
|
||||||
default-checkout-timeout
|
default-checkout-timeout))
|
||||||
threads))
|
|
||||||
|
|
||||||
(define* (make-thread-pool max-size
|
(define* (make-thread-pool max-size
|
||||||
#:key
|
#:key
|
||||||
|
|
@ -456,34 +408,8 @@ completes.
|
||||||
(use-default-io-waiters? #t)
|
(use-default-io-waiters? #t)
|
||||||
default-checkout-timeout
|
default-checkout-timeout
|
||||||
default-max-waiters)
|
default-max-waiters)
|
||||||
"Create a dynamic thread pool with up to MAX-SIZE threads. Use
|
"Return a channel used to offload work to a dedicated thread. ARGS are the
|
||||||
@code{call-with-thread} to run a procedure in one of the threads.
|
arguments of the thread pool procedure."
|
||||||
|
|
||||||
Unlike @code{make-fixed-size-thread-pool}, threads are created on
|
|
||||||
demand and may be reclaimed when idle (controlled by @code{#:min-size}
|
|
||||||
and the resource pool's idle management).
|
|
||||||
|
|
||||||
Accepts the same @code{#:thread-initializer}, @code{#:thread-destructor},
|
|
||||||
@code{#:thread-lifetime}, @code{#:expire-on-exception?},
|
|
||||||
@code{#:use-default-io-waiters?}, @code{#:name},
|
|
||||||
@code{#:default-checkout-timeout}, @code{#:delay-logger}, and
|
|
||||||
@code{#:duration-logger} arguments as @code{make-fixed-size-thread-pool},
|
|
||||||
plus:
|
|
||||||
|
|
||||||
@table @code
|
|
||||||
@item #:min-size
|
|
||||||
Minimum number of threads to keep alive. Defaults to MAX-SIZE (i.e.@:
|
|
||||||
the pool is pre-filled and never shrinks).
|
|
||||||
|
|
||||||
@item #:scheduler
|
|
||||||
Fibers scheduler for the pool's internal resource pool fiber. Defaults
|
|
||||||
to the current scheduler.
|
|
||||||
|
|
||||||
@item #:default-max-waiters
|
|
||||||
Maximum number of fibers that may queue waiting for a thread. Raises
|
|
||||||
@code{&thread-pool-timeout-error} when exceeded. Defaults to
|
|
||||||
@code{#f} (no limit).
|
|
||||||
@end table"
|
|
||||||
(define param
|
(define param
|
||||||
(make-parameter #f))
|
(make-parameter #f))
|
||||||
|
|
||||||
|
|
@ -518,34 +444,8 @@ Maximum number of fibers that may queue waiting for a thread. Raises
|
||||||
channel
|
channel
|
||||||
destroy-thread-on-exception?
|
destroy-thread-on-exception?
|
||||||
(max-waiters 'default))
|
(max-waiters 'default))
|
||||||
"Run PROC in THREAD-POOL and return its values, blocking until
|
"Send PROC to the thread pool through CHANNEL. Return the result of PROC.
|
||||||
complete. If called from within a thread that already belongs to
|
If already in the thread pool, call PROC immediately."
|
||||||
THREAD-POOL, PROC is called directly in that thread.
|
|
||||||
|
|
||||||
Optional keyword arguments:
|
|
||||||
|
|
||||||
@table @code
|
|
||||||
@item #:checkout-timeout
|
|
||||||
Seconds to wait for a free thread before raising
|
|
||||||
@code{&thread-pool-timeout-error}. Defaults to the pool's
|
|
||||||
@code{#:default-checkout-timeout}.
|
|
||||||
|
|
||||||
@item #:max-waiters
|
|
||||||
Maximum number of fibers that may queue waiting for a thread (for
|
|
||||||
dynamic pools). Defaults to the pool's @code{#:default-max-waiters}.
|
|
||||||
|
|
||||||
@item #:destroy-thread-on-exception?
|
|
||||||
When @code{#t}, destroy the thread after PROC raises an exception.
|
|
||||||
Equivalent to per-call @code{#:expire-on-exception?}. Defaults to
|
|
||||||
@code{#f}.
|
|
||||||
|
|
||||||
@item #:duration-logger
|
|
||||||
Called as @code{(duration-logger seconds)} after PROC completes
|
|
||||||
(whether or not it raised an exception).
|
|
||||||
|
|
||||||
@item #:channel
|
|
||||||
Override the channel used to communicate with the thread.
|
|
||||||
@end table"
|
|
||||||
(define (handle-proc fixed-size-thread-pool
|
(define (handle-proc fixed-size-thread-pool
|
||||||
reply-channel
|
reply-channel
|
||||||
start-time
|
start-time
|
||||||
|
|
@ -629,13 +529,9 @@ Override the channel used to communicate with the thread.
|
||||||
destroy-thread-on-exception?))))))))
|
destroy-thread-on-exception?))))))))
|
||||||
|
|
||||||
(define (destroy-thread-pool pool)
|
(define (destroy-thread-pool pool)
|
||||||
"Destroy POOL, stopping all of its threads and calling the destructor
|
|
||||||
if specified. This procedure will block until the destruction is
|
|
||||||
complete."
|
|
||||||
(if (fixed-size-thread-pool? pool)
|
(if (fixed-size-thread-pool? pool)
|
||||||
(let ((channel (fixed-size-thread-pool-channel pool))
|
(put-message
|
||||||
(threads (fixed-size-thread-pool-threads pool)))
|
(fixed-size-thread-pool-channel pool)
|
||||||
(for-each (lambda _ (put-message channel 'destroy)) threads)
|
'destroy)
|
||||||
(for-each join-thread threads))
|
|
||||||
(destroy-resource-pool
|
(destroy-resource-pool
|
||||||
(thread-pool-resource-pool pool))))
|
(thread-pool-resource-pool pool))))
|
||||||
|
|
|
||||||
|
|
@ -45,16 +45,7 @@
|
||||||
|
|
||||||
with-port-timeouts))
|
with-port-timeouts))
|
||||||
|
|
||||||
(define* (with-fibers-timeout thunk #:key
|
(define* (with-fibers-timeout thunk #:key timeout on-timeout)
|
||||||
timeout
|
|
||||||
(on-timeout
|
|
||||||
(const *unspecified*)))
|
|
||||||
"Run THUNK in a new fiber and return its values, waiting TIMEOUT
|
|
||||||
seconds for it to finish. If THUNK does not complete within TIMEOUT
|
|
||||||
seconds, the ON-TIMEOUT procedure is called and with-fibers-timeout
|
|
||||||
returns the result of ON-TIMEOUT instead.
|
|
||||||
|
|
||||||
If THUNK raises an exception it is re-raised in the calling fiber."
|
|
||||||
(let ((channel (make-channel)))
|
(let ((channel (make-channel)))
|
||||||
(spawn-fiber
|
(spawn-fiber
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
|
@ -119,7 +110,7 @@ If THUNK raises an exception it is re-raised in the calling fiber."
|
||||||
(exception-predicate &port-write-timeout-error))
|
(exception-predicate &port-write-timeout-error))
|
||||||
|
|
||||||
(define (readable? port)
|
(define (readable? port)
|
||||||
"Test if PORT is readable."
|
"Test if PORT is writable."
|
||||||
(= 1 (port-poll port "r" 0)))
|
(= 1 (port-poll port "r" 0)))
|
||||||
|
|
||||||
(define (writable? port)
|
(define (writable? port)
|
||||||
|
|
@ -160,21 +151,6 @@ If THUNK raises an exception it is re-raised in the calling fiber."
|
||||||
#:key timeout
|
#:key timeout
|
||||||
(read-timeout timeout)
|
(read-timeout timeout)
|
||||||
(write-timeout timeout))
|
(write-timeout timeout))
|
||||||
"Run THUNK with per-operation I/O timeouts on all ports. If any
|
|
||||||
read or write blocks for longer than the given number of seconds, an
|
|
||||||
exception is raised.
|
|
||||||
|
|
||||||
@code{#:timeout} sets both read and write timeouts.
|
|
||||||
@code{#:read-timeout} and @code{#:write-timeout} specify the timeout
|
|
||||||
for reads and writes respectively. All three default to @code{#f} (no
|
|
||||||
timeout).
|
|
||||||
|
|
||||||
This procedure works both with fibers, and without fibers by using the
|
|
||||||
poll system call with a timeout.
|
|
||||||
|
|
||||||
On read timeout, raises @code{&port-read-timeout-error}. On write
|
|
||||||
timeout, raises @code{&port-write-timeout-error}. Both carry the
|
|
||||||
@code{thunk} and @code{port} fields from @code{&port-timeout-error}."
|
|
||||||
(define (no-fibers-wait thunk port mode timeout)
|
(define (no-fibers-wait thunk port mode timeout)
|
||||||
(define poll-timeout-ms 200)
|
(define poll-timeout-ms 200)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,5 @@
|
||||||
(use-modules (tests)
|
(use-modules (tests)
|
||||||
(ice-9 atomic)
|
(ice-9 atomic)
|
||||||
(ice-9 threads)
|
|
||||||
(srfi srfi-71)
|
(srfi srfi-71)
|
||||||
(fibers)
|
(fibers)
|
||||||
(unit-test)
|
(unit-test)
|
||||||
|
|
@ -143,33 +142,4 @@
|
||||||
ref-and-decrement))
|
ref-and-decrement))
|
||||||
(error)))
|
(error)))
|
||||||
|
|
||||||
;; Test that the destructor is called when a size 1 fixed-size thread
|
|
||||||
;; pool is destroyed, and that destroy-thread-pool blocks until it has
|
|
||||||
;; completed.
|
|
||||||
(let* ((destructor-called? #f)
|
|
||||||
(thread-pool
|
|
||||||
(make-fixed-size-thread-pool
|
|
||||||
1
|
|
||||||
#:thread-destructor
|
|
||||||
(lambda ()
|
|
||||||
(set! destructor-called? #t)))))
|
|
||||||
(destroy-thread-pool thread-pool)
|
|
||||||
(assert-equal #t destructor-called?))
|
|
||||||
|
|
||||||
;; Test that the destructor is called for every thread when a
|
|
||||||
;; multi-thread fixed-size thread pool is destroyed, and that
|
|
||||||
;; destroy-thread-pool blocks until all destructors have completed.
|
|
||||||
(let* ((destructor-count 0)
|
|
||||||
(mutex (make-mutex))
|
|
||||||
(pool-size 3)
|
|
||||||
(thread-pool
|
|
||||||
(make-fixed-size-thread-pool
|
|
||||||
pool-size
|
|
||||||
#:thread-destructor
|
|
||||||
(lambda ()
|
|
||||||
(with-mutex mutex
|
|
||||||
(set! destructor-count (+ destructor-count 1)))))))
|
|
||||||
(destroy-thread-pool thread-pool)
|
|
||||||
(assert-equal pool-size destructor-count))
|
|
||||||
|
|
||||||
(display "thread-pool test finished successfully\n")
|
(display "thread-pool test finished successfully\n")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue