Compare commits
8 commits
35f4c16ab0
...
db9b549e59
| Author | SHA1 | Date | |
|---|---|---|---|
| db9b549e59 | |||
| 09cb805ee2 | |||
| d3d4964210 | |||
| 5b84273cbf | |||
| 30aa837cf4 | |||
| 79d5603416 | |||
| 8e29587ec1 | |||
| 8b489490e1 |
11 changed files with 391 additions and 24 deletions
15
README
15
README
|
|
@ -1,4 +1,15 @@
|
|||
-*- mode: org -*-
|
||||
|
||||
This Guile library provides useful patterns and functionality to use
|
||||
Guile Fibers.
|
||||
* Guile Knots
|
||||
|
||||
Guile Knots is a library providing higher-level patterns and building
|
||||
blocks for programming with [[https://codeberg.org/guile/fibers][Guile Fibers]].
|
||||
|
||||
This includes:
|
||||
|
||||
- Parallel map/for-each with configurable concurrency limits
|
||||
- Resource and thread pools
|
||||
- Fiber-aware promises for lazy and eager parallel evaluation
|
||||
- Timeouts for fibers and I/O ports
|
||||
- A HTTP web server
|
||||
- Non-blocking socket utilities
|
||||
|
|
|
|||
40
knots.scm
40
knots.scm
|
|
@ -29,6 +29,11 @@
|
|||
spawn-fiber/knots))
|
||||
|
||||
(define (call-with-default-io-waiters thunk)
|
||||
"Run THUNK with Guile's default blocking I/O waiters active.
|
||||
|
||||
This is useful when restoring the default Guile I/O waiters from
|
||||
within a context (like Fibers) where different I/O waiters are used,
|
||||
for example when creating a new thread from a fiber."
|
||||
(parameterize
|
||||
((current-read-waiter (@@ (ice-9 suspendable-ports)
|
||||
default-read-waiter))
|
||||
|
|
@ -37,15 +42,33 @@
|
|||
(thunk)))
|
||||
|
||||
(define (wait-when-system-clock-behind)
|
||||
(let ((start-of-the-year-2000 946684800))
|
||||
"Block until the system clock reads at least 2001-01-02.
|
||||
|
||||
Useful at startup in environments (virtual machines, embedded systems)
|
||||
where the clock may start at or near the Unix epoch. Prints a warning
|
||||
to the current error port every 20 seconds while waiting."
|
||||
;; Jan 02 2001 02:00:00
|
||||
(let ((start-of-the-year-2001 978400800))
|
||||
(while (< (current-time)
|
||||
start-of-the-year-2000)
|
||||
start-of-the-year-2001)
|
||||
(simple-format (current-error-port)
|
||||
"warning: system clock potentially behind, waiting\n")
|
||||
(sleep 20))))
|
||||
|
||||
;; Copied from (fibers web server)
|
||||
(define (call-with-sigint thunk cvar)
|
||||
"Run THUNK with a SIGINT handler that signals the Fibers condition
|
||||
CVAR. Restores the previous handler when THUNK returns.
|
||||
|
||||
Typical usage is to pass a condition variable to this procedure and
|
||||
wait on CVAR in a fiber to implement clean shutdown on Ctrl-C:
|
||||
|
||||
@example
|
||||
(let ((quit-cvar (make-condition)))
|
||||
(call-with-sigint
|
||||
(lambda () (wait quit-cvar))
|
||||
quit-cvar))
|
||||
@end example"
|
||||
(let ((handler #f))
|
||||
(dynamic-wind
|
||||
(lambda ()
|
||||
|
|
@ -96,6 +119,11 @@
|
|||
(raise-exception exn)))))
|
||||
|
||||
(define* (display/knots obj #:optional (port (current-output-port)))
|
||||
"Write OBJ to PORT (default: current output port) as a UTF-8 byte
|
||||
sequence via @code{put-bytevector}.
|
||||
|
||||
When used with ports without buffering, this should be safer than
|
||||
display."
|
||||
(put-bytevector
|
||||
port
|
||||
(string->utf8
|
||||
|
|
@ -104,6 +132,8 @@
|
|||
(display obj port))))))
|
||||
|
||||
(define (simple-format/knots port s . args)
|
||||
"Like @code{simple-format} but should be safer when used with a port
|
||||
without buffering."
|
||||
(let ((str (apply simple-format #f s args)))
|
||||
(if (eq? #f port)
|
||||
str
|
||||
|
|
@ -114,6 +144,8 @@
|
|||
port)))))
|
||||
|
||||
(define (format/knots port s . args)
|
||||
"Like @code{format} but should be safer when used with a port
|
||||
without buffering."
|
||||
(let ((str (apply format #f s args)))
|
||||
(if (eq? #f port)
|
||||
str
|
||||
|
|
@ -232,6 +264,10 @@
|
|||
(display/knots error-string port)))
|
||||
|
||||
(define* (spawn-fiber/knots thunk #:optional scheduler #:key parallel?)
|
||||
"Spawn a fiber to run THUNK, with knots exception handling.
|
||||
|
||||
Accepts the same optional SCHEDULER and @code{#:parallel?} arguments
|
||||
as @code{spawn-fiber}."
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
(with-exception-handler
|
||||
|
|
|
|||
|
|
@ -32,6 +32,16 @@
|
|||
|
||||
(define* (non-blocking-open-socket-for-uri uri
|
||||
#:key (verify-certificate? #t))
|
||||
"Open a socket for URI and return it as a non-blocking port.
|
||||
|
||||
For HTTPS URIs the TLS handshake is completed while the socket is
|
||||
still blocking (required because Guile's TLS wrapper does not support
|
||||
non-blocking handshakes), then the underlying socket is made
|
||||
non-blocking. For plain HTTP the socket is made non-blocking
|
||||
immediately.
|
||||
|
||||
@code{#:verify-certificate?} controls TLS certificate verification
|
||||
and defaults to @code{#t}."
|
||||
(define tls-wrap
|
||||
(@@ (web client) tls-wrap))
|
||||
|
||||
|
|
|
|||
|
|
@ -267,6 +267,16 @@ invocation of PROC finishes. REPORT is passed the results for each
|
|||
#:key (parallelism 1)
|
||||
(input-channel (make-channel))
|
||||
(process-channel input-channel))
|
||||
"Convert PROC into a procedure backed by @code{#:parallelism}
|
||||
(default: 1) background fibers. Returns a wrapper that sends its
|
||||
arguments to one of the fibers and blocks until the result is
|
||||
returned.
|
||||
|
||||
@code{#:input-channel} is the channel that callers write requests to;
|
||||
defaults to a fresh channel. @code{#:process-channel} is the channel
|
||||
the fibers read from; defaults to @code{#:input-channel}. Setting
|
||||
them differently allows external parties to bypass the wrapper and
|
||||
write directly to @code{process-channel}."
|
||||
(for-each
|
||||
(lambda _
|
||||
(spawn-fiber
|
||||
|
|
@ -318,6 +328,12 @@ invocation of PROC finishes. REPORT is passed the results for each
|
|||
(resource-pool parallelism-limiter-resource-pool))
|
||||
|
||||
(define* (make-parallelism-limiter limit #:key (name "unnamed"))
|
||||
"Return a parallelism limiter that allows at most LIMIT concurrent
|
||||
fibers to execute within @code{with-parallelism-limiter} at the same
|
||||
time. Further fibers block until a slot becomes free.
|
||||
|
||||
@code{#:name} is a string used in log messages. Defaults to
|
||||
@code{\"unnamed\"}."
|
||||
(make-parallelism-limiter-record
|
||||
(make-fixed-size-resource-pool
|
||||
(iota limit)
|
||||
|
|
@ -329,6 +345,9 @@ invocation of PROC finishes. REPORT is passed the results for each
|
|||
parallelism-limiter)))
|
||||
|
||||
(define* (call-with-parallelism-limiter parallelism-limiter thunk)
|
||||
"Acquire a slot from PARALLELISM-LIMITER, call THUNK, release the
|
||||
slot, and return the values from THUNK. Blocks if no slot is
|
||||
currently available."
|
||||
(call-with-resource-from-pool
|
||||
(parallelism-limiter-resource-pool parallelism-limiter)
|
||||
(lambda _
|
||||
|
|
|
|||
|
|
@ -41,12 +41,21 @@
|
|||
(evaluated-condition fibers-promise-evaluated-condition))
|
||||
|
||||
(define (fibers-delay thunk)
|
||||
"Return a new fiber-aware promise that will evaluate THUNK when
|
||||
first forced. THUNK is not called until @code{fibers-force} is
|
||||
called on the promise."
|
||||
(make-fibers-promise
|
||||
thunk
|
||||
(make-atomic-box #f)
|
||||
(make-condition)))
|
||||
|
||||
(define (fibers-force fp)
|
||||
"Force the fiber-aware promise FP, returning its values.
|
||||
|
||||
The first call evaluates the promise's thunk. Concurrent callers
|
||||
block on a condition variable until evaluation finishes, then receive
|
||||
the same result. If the thunk raises an exception, the exception is
|
||||
stored and re-raised for all callers."
|
||||
(unless (fibers-promise? fp)
|
||||
(raise-exception
|
||||
(make-exception
|
||||
|
|
@ -108,6 +117,9 @@
|
|||
|
||||
|
||||
(define (fibers-delay/eager thunk)
|
||||
"Return a new fiber-aware promise and immediately begin evaluating
|
||||
THUNK in a new fiber. Exceptions during eager evaluation are silently
|
||||
discarded; they will be re-raised when @code{fibers-force} is called."
|
||||
(let ((promise (fibers-delay thunk)))
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
|
|
@ -121,10 +133,15 @@
|
|||
promise))
|
||||
|
||||
(define (fibers-promise-reset fp)
|
||||
"Reset the fiber-aware promise FP so that the next call to
|
||||
@code{fibers-force} re-evaluates its thunk."
|
||||
(atomic-box-set! (fibers-promise-values-box fp)
|
||||
#f))
|
||||
|
||||
(define (fibers-promise-result-available? fp)
|
||||
"Return @code{#t} if the fiber-aware promise FP has been evaluated
|
||||
(successfully or with an exception) and @code{#f} if evaluation has
|
||||
not yet started or is still in progress."
|
||||
(let ((val (atomic-box-ref (fibers-promise-values-box fp))))
|
||||
(not (or (eq? val #f)
|
||||
(eq? val 'started)))))
|
||||
|
|
|
|||
|
|
@ -25,6 +25,12 @@
|
|||
#:export (spawn-queueing-fiber))
|
||||
|
||||
(define (spawn-queueing-fiber dest-channel)
|
||||
"Spawn a fiber that serialises items onto DEST-CHANNEL in FIFO order.
|
||||
Returns a new input channel.
|
||||
|
||||
Multiple producers can put items on the returned channel concurrently.
|
||||
The fiber buffers them locally and forwards them to DEST-CHANNEL one at
|
||||
a time, preserving arrival order."
|
||||
(define queue (make-q))
|
||||
|
||||
(let ((queue-channel (make-channel)))
|
||||
|
|
|
|||
|
|
@ -154,6 +154,33 @@
|
|||
(name "unnamed")
|
||||
default-checkout-timeout
|
||||
default-max-waiters)
|
||||
"Create a resource pool from RESOURCES-LIST-OR-VECTOR, a list or
|
||||
vector of pre-existing resource values.
|
||||
|
||||
Use @code{with-resource-from-pool} or
|
||||
@code{call-with-resource-from-pool} to borrow a resource and return it
|
||||
automatically when done.
|
||||
|
||||
Optional keyword arguments:
|
||||
|
||||
@table @code
|
||||
@item #:name
|
||||
A optional string used in log messages.
|
||||
Defaults to @code{\"unnamed\"}.
|
||||
|
||||
@item #:default-checkout-timeout
|
||||
Default checkout timeout when requesting a resource from the pool,
|
||||
unset by default.
|
||||
|
||||
@item #:default-max-waiters
|
||||
Maximum number of fibers that may queue waiting for a resource. When
|
||||
this limit is exceeded, @code{&resource-pool-too-many-waiters} is
|
||||
raised when a resource is requested. Defaults to @code{#f} (no limit).
|
||||
|
||||
@item #:scheduler
|
||||
The Fibers scheduler to use for the pool's internal fiber. Defaults
|
||||
to the current scheduler.
|
||||
@end table"
|
||||
(define channel (make-channel))
|
||||
(define destroy-condition
|
||||
(make-condition))
|
||||
|
|
@ -513,6 +540,59 @@
|
|||
(add-resources-parallelism 1)
|
||||
default-checkout-timeout
|
||||
default-max-waiters)
|
||||
"Create a dynamic resource pool. RETURN-NEW-RESOURCE is a thunk
|
||||
called to create each new resource value. MAX-SIZE is the maximum
|
||||
number of resources the pool will hold simultaneously.
|
||||
|
||||
Resources are created on demand when a checkout is requested and the
|
||||
pool is not yet at MAX-SIZE. Use @code{with-resource-from-pool} or
|
||||
@code{call-with-resource-from-pool} to request a resource and return
|
||||
it automatically when done.
|
||||
|
||||
Optional keyword arguments:
|
||||
|
||||
@table @code
|
||||
@item #:min-size
|
||||
Minimum number of resources to keep alive even when idle. Defaults to
|
||||
@code{0}.
|
||||
|
||||
@item #:idle-seconds
|
||||
Seconds a resource may remain unused before being destroyed, provided
|
||||
the pool is above @code{#:min-size}. Defaults to @code{#f} (never
|
||||
expire idle resources).
|
||||
|
||||
@item #:lifetime
|
||||
Maximum number of checkouts a single resource will serve before being
|
||||
destroyed and replaced by a fresh one. Defaults to @code{#f} (no
|
||||
limit).
|
||||
|
||||
@item #:destructor
|
||||
A procedure called as @code{(destructor resource)} when a resource is
|
||||
removed from the pool. Defaults to @code{#f}.
|
||||
|
||||
@item #:add-resources-parallelism
|
||||
Maximum number of concurrent calls to RETURN-NEW-RESOURCE when the
|
||||
pool needs to grow. Allowing resources to be created in parallel can
|
||||
result in more resources being created than can fit inside the pool,
|
||||
if this happens, the surplus resources are destroyed. Defaults to
|
||||
@code{1}.
|
||||
|
||||
@item #:name
|
||||
A string used in log messages. Defaults to @code{\"unnamed\"}.
|
||||
|
||||
@item #:default-checkout-timeout
|
||||
Default checkout timeout when requesting a resource from the pool,
|
||||
unset by default.
|
||||
|
||||
@item #:default-max-waiters
|
||||
Maximum number of fibers that may queue waiting for a resource. When
|
||||
this limit is exceeded, @code{&resource-pool-too-many-waiters} is
|
||||
raised when a resource is requested. Defaults to @code{#f} (no limit).
|
||||
|
||||
@item #:scheduler
|
||||
The Fibers scheduler to use for the pool's internal fiber. Defaults
|
||||
to the current scheduler.
|
||||
@end table"
|
||||
(define channel (make-channel))
|
||||
(define destroy-condition
|
||||
(make-condition))
|
||||
|
|
@ -1172,6 +1252,10 @@
|
|||
pool)
|
||||
|
||||
(define (destroy-resource-pool pool)
|
||||
"Destroy POOL, preventing any new checkouts. Blocks until all
|
||||
checked-out resources have been returned, running the pool's
|
||||
@code{#:destructor} on each. Any fibers waiting for a resource
|
||||
receive @code{&resource-pool-destroyed}."
|
||||
(perform-operation
|
||||
(choice-operation
|
||||
(wrap-operation
|
||||
|
|
@ -1388,6 +1472,23 @@ available. Return the resource once PROC has returned."
|
|||
(lambda (resource) exp ...)))
|
||||
|
||||
(define* (resource-pool-stats pool #:key (timeout 5))
|
||||
"Return an alist of statistics for POOL with the following keys:
|
||||
|
||||
@table @code
|
||||
@item resources
|
||||
Total number of resources currently held by the pool.
|
||||
@item available
|
||||
Number of resources not currently checked out.
|
||||
@item waiters
|
||||
Number of fibers currently queued waiting for a resource.
|
||||
@item checkout-failure-count
|
||||
Cumulative number of checkouts where an exception was raised inside
|
||||
the proc.
|
||||
@end table
|
||||
|
||||
Blocks waiting for the pool fiber to respond. @code{#:timeout} is
|
||||
the number of seconds to wait; defaults to @code{5}. Raises
|
||||
@code{&resource-pool-timeout} if the pool does not respond in time."
|
||||
(define channel
|
||||
(resource-pool-channel pool))
|
||||
|
||||
|
|
|
|||
|
|
@ -54,6 +54,15 @@
|
|||
rest)))))
|
||||
|
||||
(define* (fibers-sort! items less #:key parallelism)
|
||||
"Sort ITEMS destructively using LESS as the comparison procedure,
|
||||
using a parallel merge sort. Returns the sorted list.
|
||||
|
||||
Splits ITEMS into chunks, sorts each in an eager fiber-promise in
|
||||
parallel, then merges pairs of sorted chunks in parallel until one
|
||||
sorted list remains.
|
||||
|
||||
@code{#:parallelism} sets the number of initial chunks. Defaults to
|
||||
the current fibers parallelism."
|
||||
(define requested-chunk-count
|
||||
(or parallelism
|
||||
(+ 1 (length (scheduler-remote-peers (current-scheduler))))))
|
||||
|
|
|
|||
|
|
@ -163,12 +163,13 @@ from there, or #f if that would be an empty string."
|
|||
|
||||
(define-record-type <fixed-size-thread-pool>
|
||||
(fixed-size-thread-pool channel arguments-parameter current-procedures
|
||||
default-checkout-timeout)
|
||||
default-checkout-timeout threads)
|
||||
fixed-size-thread-pool?
|
||||
(channel fixed-size-thread-pool-channel)
|
||||
(arguments-parameter fixed-size-thread-pool-arguments-parameter)
|
||||
(current-procedures fixed-size-thread-pool-current-procedures)
|
||||
(default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout))
|
||||
(default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout)
|
||||
(threads fixed-size-thread-pool-threads))
|
||||
|
||||
;; Since both thread pool records have this field, use a procedure
|
||||
;; than handles the appropriate accessor
|
||||
|
|
@ -211,6 +212,52 @@ from there, or #f if that would be an empty string."
|
|||
(name "unnamed")
|
||||
(use-default-io-waiters? #t)
|
||||
default-checkout-timeout)
|
||||
"Create a pool of SIZE threads started immediately. Use
|
||||
@code{call-with-thread} to run a procedure in one of the threads.
|
||||
|
||||
Optional keyword arguments:
|
||||
|
||||
@table @code
|
||||
@item #:thread-initializer
|
||||
A thunk called once when each thread starts. Its return value is
|
||||
passed as extra arguments to every procedure run in that thread.
|
||||
Defaults to @code{#f} (no extra arguments).
|
||||
|
||||
@item #:thread-destructor
|
||||
A procedure called with the value returned by @code{#:thread-initializer}
|
||||
when a thread exits. Defaults to @code{#f}.
|
||||
|
||||
@item #:thread-lifetime
|
||||
Maximum number of procedures a thread will run before restarting (and
|
||||
re-running @code{#:thread-initializer}). Defaults to @code{#f} (no
|
||||
limit).
|
||||
|
||||
@item #:expire-on-exception?
|
||||
When @code{#t}, replace a thread after any unhandled exception.
|
||||
Defaults to @code{#f}.
|
||||
|
||||
@item #:use-default-io-waiters?
|
||||
When @code{#t} (the default), each thread uses blocking I/O waiters so
|
||||
that port reads and writes block the thread rather than trying to
|
||||
suspend a fiber.
|
||||
|
||||
@item #:name
|
||||
String used in thread names and log messages. Defaults to
|
||||
@code{\"unnamed\"}.
|
||||
|
||||
@item #:default-checkout-timeout
|
||||
Seconds to wait for a free thread slot before raising
|
||||
@code{&thread-pool-timeout-error}. Defaults to @code{#f} (wait
|
||||
forever).
|
||||
|
||||
@item #:delay-logger
|
||||
Called as @code{(delay-logger seconds proc)} with the time spent
|
||||
waiting for a thread to become available.
|
||||
|
||||
@item #:duration-logger
|
||||
Called as @code{(duration-logger seconds proc)} after each procedure
|
||||
completes.
|
||||
@end table"
|
||||
(define channel
|
||||
(make-channel))
|
||||
|
||||
|
|
@ -380,19 +427,20 @@ from there, or #f if that would be an empty string."
|
|||
(initializer/safe)
|
||||
'()))))))))
|
||||
|
||||
(for-each
|
||||
(lambda (i)
|
||||
(define threads
|
||||
(map (lambda (i)
|
||||
(if use-default-io-waiters?
|
||||
(call-with-default-io-waiters
|
||||
(lambda ()
|
||||
(start-thread i channel)))
|
||||
(start-thread i channel)))
|
||||
(iota size))
|
||||
(iota size)))
|
||||
|
||||
(fixed-size-thread-pool channel
|
||||
param
|
||||
thread-proc-vector
|
||||
default-checkout-timeout))
|
||||
default-checkout-timeout
|
||||
threads))
|
||||
|
||||
(define* (make-thread-pool max-size
|
||||
#:key
|
||||
|
|
@ -408,8 +456,34 @@ from there, or #f if that would be an empty string."
|
|||
(use-default-io-waiters? #t)
|
||||
default-checkout-timeout
|
||||
default-max-waiters)
|
||||
"Return a channel used to offload work to a dedicated thread. ARGS are the
|
||||
arguments of the thread pool procedure."
|
||||
"Create a dynamic thread pool with up to MAX-SIZE threads. Use
|
||||
@code{call-with-thread} to run a procedure in one of the threads.
|
||||
|
||||
Unlike @code{make-fixed-size-thread-pool}, threads are created on
|
||||
demand and may be reclaimed when idle (controlled by @code{#:min-size}
|
||||
and the resource pool's idle management).
|
||||
|
||||
Accepts the same @code{#:thread-initializer}, @code{#:thread-destructor},
|
||||
@code{#:thread-lifetime}, @code{#:expire-on-exception?},
|
||||
@code{#:use-default-io-waiters?}, @code{#:name},
|
||||
@code{#:default-checkout-timeout}, @code{#:delay-logger}, and
|
||||
@code{#:duration-logger} arguments as @code{make-fixed-size-thread-pool},
|
||||
plus:
|
||||
|
||||
@table @code
|
||||
@item #:min-size
|
||||
Minimum number of threads to keep alive. Defaults to MAX-SIZE (i.e.@:
|
||||
the pool is pre-filled and never shrinks).
|
||||
|
||||
@item #:scheduler
|
||||
Fibers scheduler for the pool's internal resource pool fiber. Defaults
|
||||
to the current scheduler.
|
||||
|
||||
@item #:default-max-waiters
|
||||
Maximum number of fibers that may queue waiting for a thread. Raises
|
||||
@code{&thread-pool-timeout-error} when exceeded. Defaults to
|
||||
@code{#f} (no limit).
|
||||
@end table"
|
||||
(define param
|
||||
(make-parameter #f))
|
||||
|
||||
|
|
@ -444,8 +518,34 @@ arguments of the thread pool procedure."
|
|||
channel
|
||||
destroy-thread-on-exception?
|
||||
(max-waiters 'default))
|
||||
"Send PROC to the thread pool through CHANNEL. Return the result of PROC.
|
||||
If already in the thread pool, call PROC immediately."
|
||||
"Run PROC in THREAD-POOL and return its values, blocking until
|
||||
complete. If called from within a thread that already belongs to
|
||||
THREAD-POOL, PROC is called directly in that thread.
|
||||
|
||||
Optional keyword arguments:
|
||||
|
||||
@table @code
|
||||
@item #:checkout-timeout
|
||||
Seconds to wait for a free thread before raising
|
||||
@code{&thread-pool-timeout-error}. Defaults to the pool's
|
||||
@code{#:default-checkout-timeout}.
|
||||
|
||||
@item #:max-waiters
|
||||
Maximum number of fibers that may queue waiting for a thread (for
|
||||
dynamic pools). Defaults to the pool's @code{#:default-max-waiters}.
|
||||
|
||||
@item #:destroy-thread-on-exception?
|
||||
When @code{#t}, destroy the thread after PROC raises an exception.
|
||||
Equivalent to per-call @code{#:expire-on-exception?}. Defaults to
|
||||
@code{#f}.
|
||||
|
||||
@item #:duration-logger
|
||||
Called as @code{(duration-logger seconds)} after PROC completes
|
||||
(whether or not it raised an exception).
|
||||
|
||||
@item #:channel
|
||||
Override the channel used to communicate with the thread.
|
||||
@end table"
|
||||
(define (handle-proc fixed-size-thread-pool
|
||||
reply-channel
|
||||
start-time
|
||||
|
|
@ -529,9 +629,13 @@ If already in the thread pool, call PROC immediately."
|
|||
destroy-thread-on-exception?))))))))
|
||||
|
||||
(define (destroy-thread-pool pool)
|
||||
"Destroy POOL, stopping all of its threads and calling the destructor
|
||||
if specified. This procedure will block until the destruction is
|
||||
complete."
|
||||
(if (fixed-size-thread-pool? pool)
|
||||
(put-message
|
||||
(fixed-size-thread-pool-channel pool)
|
||||
'destroy)
|
||||
(let ((channel (fixed-size-thread-pool-channel pool))
|
||||
(threads (fixed-size-thread-pool-threads pool)))
|
||||
(for-each (lambda _ (put-message channel 'destroy)) threads)
|
||||
(for-each join-thread threads))
|
||||
(destroy-resource-pool
|
||||
(thread-pool-resource-pool pool))))
|
||||
|
|
|
|||
|
|
@ -45,7 +45,16 @@
|
|||
|
||||
with-port-timeouts))
|
||||
|
||||
(define* (with-fibers-timeout thunk #:key timeout on-timeout)
|
||||
(define* (with-fibers-timeout thunk #:key
|
||||
timeout
|
||||
(on-timeout
|
||||
(const *unspecified*)))
|
||||
"Run THUNK in a new fiber and return its values, waiting TIMEOUT
|
||||
seconds for it to finish. If THUNK does not complete within TIMEOUT
|
||||
seconds, the ON-TIMEOUT procedure is called and with-fibers-timeout
|
||||
returns the result of ON-TIMEOUT instead.
|
||||
|
||||
If THUNK raises an exception it is re-raised in the calling fiber."
|
||||
(let ((channel (make-channel)))
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
|
|
@ -110,7 +119,7 @@
|
|||
(exception-predicate &port-write-timeout-error))
|
||||
|
||||
(define (readable? port)
|
||||
"Test if PORT is writable."
|
||||
"Test if PORT is readable."
|
||||
(= 1 (port-poll port "r" 0)))
|
||||
|
||||
(define (writable? port)
|
||||
|
|
@ -151,6 +160,21 @@
|
|||
#:key timeout
|
||||
(read-timeout timeout)
|
||||
(write-timeout timeout))
|
||||
"Run THUNK with per-operation I/O timeouts on all ports. If any
|
||||
read or write blocks for longer than the given number of seconds, an
|
||||
exception is raised.
|
||||
|
||||
@code{#:timeout} sets both read and write timeouts.
|
||||
@code{#:read-timeout} and @code{#:write-timeout} specify the timeout
|
||||
for reads and writes respectively. All three default to @code{#f} (no
|
||||
timeout).
|
||||
|
||||
This procedure works both with fibers, and without fibers by using the
|
||||
poll system call with a timeout.
|
||||
|
||||
On read timeout, raises @code{&port-read-timeout-error}. On write
|
||||
timeout, raises @code{&port-write-timeout-error}. Both carry the
|
||||
@code{thunk} and @code{port} fields from @code{&port-timeout-error}."
|
||||
(define (no-fibers-wait thunk port mode timeout)
|
||||
(define poll-timeout-ms 200)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
(use-modules (tests)
|
||||
(ice-9 atomic)
|
||||
(ice-9 threads)
|
||||
(srfi srfi-71)
|
||||
(fibers)
|
||||
(unit-test)
|
||||
|
|
@ -142,4 +143,33 @@
|
|||
ref-and-decrement))
|
||||
(error)))
|
||||
|
||||
;; Test that the destructor is called when a size 1 fixed-size thread
|
||||
;; pool is destroyed, and that destroy-thread-pool blocks until it has
|
||||
;; completed.
|
||||
(let* ((destructor-called? #f)
|
||||
(thread-pool
|
||||
(make-fixed-size-thread-pool
|
||||
1
|
||||
#:thread-destructor
|
||||
(lambda ()
|
||||
(set! destructor-called? #t)))))
|
||||
(destroy-thread-pool thread-pool)
|
||||
(assert-equal #t destructor-called?))
|
||||
|
||||
;; Test that the destructor is called for every thread when a
|
||||
;; multi-thread fixed-size thread pool is destroyed, and that
|
||||
;; destroy-thread-pool blocks until all destructors have completed.
|
||||
(let* ((destructor-count 0)
|
||||
(mutex (make-mutex))
|
||||
(pool-size 3)
|
||||
(thread-pool
|
||||
(make-fixed-size-thread-pool
|
||||
pool-size
|
||||
#:thread-destructor
|
||||
(lambda ()
|
||||
(with-mutex mutex
|
||||
(set! destructor-count (+ destructor-count 1)))))))
|
||||
(destroy-thread-pool thread-pool)
|
||||
(assert-equal pool-size destructor-count))
|
||||
|
||||
(display "thread-pool test finished successfully\n")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue