Compare commits

...

8 commits

Author SHA1 Message Date
db9b549e59 Fix the destroy behaviour for fixed size thread pools
All checks were successful
/ test (push) Successful in 38s
destroy-thread-pool should block until the thread pool has been
destroyed.
2026-03-18 09:42:17 +00:00
09cb805ee2 Add even more documentation 2026-03-18 08:58:41 +00:00
d3d4964210 Add more documentation 2026-03-18 08:44:26 +00:00
5b84273cbf Add some documentation for the timeout procedures
And tweak how with-fibers-timeout works.
2026-03-17 21:58:22 +00:00
30aa837cf4 Add some resource pool documentation 2026-03-17 21:47:47 +00:00
79d5603416 Add more detail to the README. 2026-03-17 21:35:45 +00:00
8e29587ec1 Change the behind system clock threshold
As the Honeycomb LX2 machines start with 2001 as the time.
2026-03-17 21:13:30 +00:00
8b489490e1 Fix readable? docstring. 2026-03-17 21:13:03 +00:00
11 changed files with 391 additions and 24 deletions

15
README
View file

@ -1,4 +1,15 @@
-*- mode: org -*-
This Guile library provides useful patterns and functionality to use
Guile Fibers.
* Guile Knots
Guile Knots is a library providing higher-level patterns and building
blocks for programming with [[https://codeberg.org/guile/fibers][Guile Fibers]].
This includes:
- Parallel map/for-each with configurable concurrency limits
- Resource and thread pools
- Fiber-aware promises for lazy and eager parallel evaluation
- Timeouts for fibers and I/O ports
- A HTTP web server
- Non-blocking socket utilities

View file

@ -29,6 +29,11 @@
spawn-fiber/knots))
(define (call-with-default-io-waiters thunk)
"Run THUNK with Guile's default blocking I/O waiters active.
This is useful when restoring the default Guile I/O waiters from
within a context (like Fibers) where different I/O waiters are used,
for example when creating a new thread from a fiber."
(parameterize
((current-read-waiter (@@ (ice-9 suspendable-ports)
default-read-waiter))
@ -37,15 +42,33 @@
(thunk)))
(define (wait-when-system-clock-behind)
(let ((start-of-the-year-2000 946684800))
"Block until the system clock reads at least 2001-01-02.
Useful at startup in environments (virtual machines, embedded systems)
where the clock may start at or near the Unix epoch. Prints a warning
to the current error port every 20 seconds while waiting."
;; Jan 02 2001 02:00:00
(let ((start-of-the-year-2001 978400800))
(while (< (current-time)
start-of-the-year-2000)
start-of-the-year-2001)
(simple-format (current-error-port)
"warning: system clock potentially behind, waiting\n")
(sleep 20))))
;; Copied from (fibers web server)
(define (call-with-sigint thunk cvar)
"Run THUNK with a SIGINT handler that signals the Fibers condition
CVAR. Restores the previous handler when THUNK returns.
Typical usage is to pass a condition variable to this procedure and
wait on CVAR in a fiber to implement clean shutdown on Ctrl-C:
@example
(let ((quit-cvar (make-condition)))
(call-with-sigint
(lambda () (wait quit-cvar))
quit-cvar))
@end example"
(let ((handler #f))
(dynamic-wind
(lambda ()
@ -96,6 +119,11 @@
(raise-exception exn)))))
(define* (display/knots obj #:optional (port (current-output-port)))
"Write OBJ to PORT (default: current output port) as a UTF-8 byte
sequence via @code{put-bytevector}.
When used with ports without buffering, this should be safer than
display."
(put-bytevector
port
(string->utf8
@ -104,6 +132,8 @@
(display obj port))))))
(define (simple-format/knots port s . args)
"Like @code{simple-format} but should be safer when used with a port
without buffering."
(let ((str (apply simple-format #f s args)))
(if (eq? #f port)
str
@ -114,6 +144,8 @@
port)))))
(define (format/knots port s . args)
"Like @code{format} but should be safer when used with a port
without buffering."
(let ((str (apply format #f s args)))
(if (eq? #f port)
str
@ -232,6 +264,10 @@
(display/knots error-string port)))
(define* (spawn-fiber/knots thunk #:optional scheduler #:key parallel?)
"Spawn a fiber to run THUNK, with knots exception handling.
Accepts the same optional SCHEDULER and @code{#:parallel?} arguments
as @code{spawn-fiber}."
(spawn-fiber
(lambda ()
(with-exception-handler

View file

@ -32,6 +32,16 @@
(define* (non-blocking-open-socket-for-uri uri
#:key (verify-certificate? #t))
"Open a socket for URI and return it as a non-blocking port.
For HTTPS URIs the TLS handshake is completed while the socket is
still blocking (required because Guile's TLS wrapper does not support
non-blocking handshakes), then the underlying socket is made
non-blocking. For plain HTTP the socket is made non-blocking
immediately.
@code{#:verify-certificate?} controls TLS certificate verification
and defaults to @code{#t}."
(define tls-wrap
(@@ (web client) tls-wrap))

View file

@ -267,6 +267,16 @@ invocation of PROC finishes. REPORT is passed the results for each
#:key (parallelism 1)
(input-channel (make-channel))
(process-channel input-channel))
"Convert PROC into a procedure backed by @code{#:parallelism}
(default: 1) background fibers. Returns a wrapper that sends its
arguments to one of the fibers and blocks until the result is
returned.
@code{#:input-channel} is the channel that callers write requests to;
defaults to a fresh channel. @code{#:process-channel} is the channel
the fibers read from; defaults to @code{#:input-channel}. Setting
them differently allows external parties to bypass the wrapper and
write directly to @code{process-channel}."
(for-each
(lambda _
(spawn-fiber
@ -318,6 +328,12 @@ invocation of PROC finishes. REPORT is passed the results for each
(resource-pool parallelism-limiter-resource-pool))
(define* (make-parallelism-limiter limit #:key (name "unnamed"))
"Return a parallelism limiter that allows at most LIMIT concurrent
fibers to execute within @code{with-parallelism-limiter} at the same
time. Further fibers block until a slot becomes free.
@code{#:name} is a string used in log messages. Defaults to
@code{\"unnamed\"}."
(make-parallelism-limiter-record
(make-fixed-size-resource-pool
(iota limit)
@ -329,6 +345,9 @@ invocation of PROC finishes. REPORT is passed the results for each
parallelism-limiter)))
(define* (call-with-parallelism-limiter parallelism-limiter thunk)
"Acquire a slot from PARALLELISM-LIMITER, call THUNK, release the
slot, and return the values from THUNK. Blocks if no slot is
currently available."
(call-with-resource-from-pool
(parallelism-limiter-resource-pool parallelism-limiter)
(lambda _

View file

@ -41,12 +41,21 @@
(evaluated-condition fibers-promise-evaluated-condition))
(define (fibers-delay thunk)
"Return a new fiber-aware promise that will evaluate THUNK when
first forced. THUNK is not called until @code{fibers-force} is
called on the promise."
(make-fibers-promise
thunk
(make-atomic-box #f)
(make-condition)))
(define (fibers-force fp)
"Force the fiber-aware promise FP, returning its values.
The first call evaluates the promise's thunk. Concurrent callers
block on a condition variable until evaluation finishes, then receive
the same result. If the thunk raises an exception, the exception is
stored and re-raised for all callers."
(unless (fibers-promise? fp)
(raise-exception
(make-exception
@ -108,6 +117,9 @@
(define (fibers-delay/eager thunk)
"Return a new fiber-aware promise and immediately begin evaluating
THUNK in a new fiber. Exceptions during eager evaluation are silently
discarded; they will be re-raised when @code{fibers-force} is called."
(let ((promise (fibers-delay thunk)))
(spawn-fiber
(lambda ()
@ -121,10 +133,15 @@
promise))
(define (fibers-promise-reset fp)
"Reset the fiber-aware promise FP so that the next call to
@code{fibers-force} re-evaluates its thunk."
(atomic-box-set! (fibers-promise-values-box fp)
#f))
(define (fibers-promise-result-available? fp)
"Return @code{#t} if the fiber-aware promise FP has been evaluated
(successfully or with an exception) and @code{#f} if evaluation has
not yet started or is still in progress."
(let ((val (atomic-box-ref (fibers-promise-values-box fp))))
(not (or (eq? val #f)
(eq? val 'started)))))

View file

@ -25,6 +25,12 @@
#:export (spawn-queueing-fiber))
(define (spawn-queueing-fiber dest-channel)
"Spawn a fiber that serialises items onto DEST-CHANNEL in FIFO order.
Returns a new input channel.
Multiple producers can put items on the returned channel concurrently.
The fiber buffers them locally and forwards them to DEST-CHANNEL one at
a time, preserving arrival order."
(define queue (make-q))
(let ((queue-channel (make-channel)))

View file

@ -154,6 +154,33 @@
(name "unnamed")
default-checkout-timeout
default-max-waiters)
"Create a resource pool from RESOURCES-LIST-OR-VECTOR, a list or
vector of pre-existing resource values.
Use @code{with-resource-from-pool} or
@code{call-with-resource-from-pool} to borrow a resource and return it
automatically when done.
Optional keyword arguments:
@table @code
@item #:name
A optional string used in log messages.
Defaults to @code{\"unnamed\"}.
@item #:default-checkout-timeout
Default checkout timeout when requesting a resource from the pool,
unset by default.
@item #:default-max-waiters
Maximum number of fibers that may queue waiting for a resource. When
this limit is exceeded, @code{&resource-pool-too-many-waiters} is
raised when a resource is requested. Defaults to @code{#f} (no limit).
@item #:scheduler
The Fibers scheduler to use for the pool's internal fiber. Defaults
to the current scheduler.
@end table"
(define channel (make-channel))
(define destroy-condition
(make-condition))
@ -513,6 +540,59 @@
(add-resources-parallelism 1)
default-checkout-timeout
default-max-waiters)
"Create a dynamic resource pool. RETURN-NEW-RESOURCE is a thunk
called to create each new resource value. MAX-SIZE is the maximum
number of resources the pool will hold simultaneously.
Resources are created on demand when a checkout is requested and the
pool is not yet at MAX-SIZE. Use @code{with-resource-from-pool} or
@code{call-with-resource-from-pool} to request a resource and return
it automatically when done.
Optional keyword arguments:
@table @code
@item #:min-size
Minimum number of resources to keep alive even when idle. Defaults to
@code{0}.
@item #:idle-seconds
Seconds a resource may remain unused before being destroyed, provided
the pool is above @code{#:min-size}. Defaults to @code{#f} (never
expire idle resources).
@item #:lifetime
Maximum number of checkouts a single resource will serve before being
destroyed and replaced by a fresh one. Defaults to @code{#f} (no
limit).
@item #:destructor
A procedure called as @code{(destructor resource)} when a resource is
removed from the pool. Defaults to @code{#f}.
@item #:add-resources-parallelism
Maximum number of concurrent calls to RETURN-NEW-RESOURCE when the
pool needs to grow. Allowing resources to be created in parallel can
result in more resources being created than can fit inside the pool,
if this happens, the surplus resources are destroyed. Defaults to
@code{1}.
@item #:name
A string used in log messages. Defaults to @code{\"unnamed\"}.
@item #:default-checkout-timeout
Default checkout timeout when requesting a resource from the pool,
unset by default.
@item #:default-max-waiters
Maximum number of fibers that may queue waiting for a resource. When
this limit is exceeded, @code{&resource-pool-too-many-waiters} is
raised when a resource is requested. Defaults to @code{#f} (no limit).
@item #:scheduler
The Fibers scheduler to use for the pool's internal fiber. Defaults
to the current scheduler.
@end table"
(define channel (make-channel))
(define destroy-condition
(make-condition))
@ -1172,6 +1252,10 @@
pool)
(define (destroy-resource-pool pool)
"Destroy POOL, preventing any new checkouts. Blocks until all
checked-out resources have been returned, running the pool's
@code{#:destructor} on each. Any fibers waiting for a resource
receive @code{&resource-pool-destroyed}."
(perform-operation
(choice-operation
(wrap-operation
@ -1388,6 +1472,23 @@ available. Return the resource once PROC has returned."
(lambda (resource) exp ...)))
(define* (resource-pool-stats pool #:key (timeout 5))
"Return an alist of statistics for POOL with the following keys:
@table @code
@item resources
Total number of resources currently held by the pool.
@item available
Number of resources not currently checked out.
@item waiters
Number of fibers currently queued waiting for a resource.
@item checkout-failure-count
Cumulative number of checkouts where an exception was raised inside
the proc.
@end table
Blocks waiting for the pool fiber to respond. @code{#:timeout} is
the number of seconds to wait; defaults to @code{5}. Raises
@code{&resource-pool-timeout} if the pool does not respond in time."
(define channel
(resource-pool-channel pool))

View file

@ -54,6 +54,15 @@
rest)))))
(define* (fibers-sort! items less #:key parallelism)
"Sort ITEMS destructively using LESS as the comparison procedure,
using a parallel merge sort. Returns the sorted list.
Splits ITEMS into chunks, sorts each in an eager fiber-promise in
parallel, then merges pairs of sorted chunks in parallel until one
sorted list remains.
@code{#:parallelism} sets the number of initial chunks. Defaults to
the current fibers parallelism."
(define requested-chunk-count
(or parallelism
(+ 1 (length (scheduler-remote-peers (current-scheduler))))))

View file

@ -163,12 +163,13 @@ from there, or #f if that would be an empty string."
(define-record-type <fixed-size-thread-pool>
(fixed-size-thread-pool channel arguments-parameter current-procedures
default-checkout-timeout)
default-checkout-timeout threads)
fixed-size-thread-pool?
(channel fixed-size-thread-pool-channel)
(arguments-parameter fixed-size-thread-pool-arguments-parameter)
(current-procedures fixed-size-thread-pool-current-procedures)
(default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout))
(default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout)
(threads fixed-size-thread-pool-threads))
;; Since both thread pool records have this field, use a procedure
;; than handles the appropriate accessor
@ -211,6 +212,52 @@ from there, or #f if that would be an empty string."
(name "unnamed")
(use-default-io-waiters? #t)
default-checkout-timeout)
"Create a pool of SIZE threads started immediately. Use
@code{call-with-thread} to run a procedure in one of the threads.
Optional keyword arguments:
@table @code
@item #:thread-initializer
A thunk called once when each thread starts. Its return value is
passed as extra arguments to every procedure run in that thread.
Defaults to @code{#f} (no extra arguments).
@item #:thread-destructor
A procedure called with the value returned by @code{#:thread-initializer}
when a thread exits. Defaults to @code{#f}.
@item #:thread-lifetime
Maximum number of procedures a thread will run before restarting (and
re-running @code{#:thread-initializer}). Defaults to @code{#f} (no
limit).
@item #:expire-on-exception?
When @code{#t}, replace a thread after any unhandled exception.
Defaults to @code{#f}.
@item #:use-default-io-waiters?
When @code{#t} (the default), each thread uses blocking I/O waiters so
that port reads and writes block the thread rather than trying to
suspend a fiber.
@item #:name
String used in thread names and log messages. Defaults to
@code{\"unnamed\"}.
@item #:default-checkout-timeout
Seconds to wait for a free thread slot before raising
@code{&thread-pool-timeout-error}. Defaults to @code{#f} (wait
forever).
@item #:delay-logger
Called as @code{(delay-logger seconds proc)} with the time spent
waiting for a thread to become available.
@item #:duration-logger
Called as @code{(duration-logger seconds proc)} after each procedure
completes.
@end table"
(define channel
(make-channel))
@ -380,19 +427,20 @@ from there, or #f if that would be an empty string."
(initializer/safe)
'()))))))))
(for-each
(lambda (i)
(define threads
(map (lambda (i)
(if use-default-io-waiters?
(call-with-default-io-waiters
(lambda ()
(start-thread i channel)))
(start-thread i channel)))
(iota size))
(iota size)))
(fixed-size-thread-pool channel
param
thread-proc-vector
default-checkout-timeout))
default-checkout-timeout
threads))
(define* (make-thread-pool max-size
#:key
@ -408,8 +456,34 @@ from there, or #f if that would be an empty string."
(use-default-io-waiters? #t)
default-checkout-timeout
default-max-waiters)
"Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the thread pool procedure."
"Create a dynamic thread pool with up to MAX-SIZE threads. Use
@code{call-with-thread} to run a procedure in one of the threads.
Unlike @code{make-fixed-size-thread-pool}, threads are created on
demand and may be reclaimed when idle (controlled by @code{#:min-size}
and the resource pool's idle management).
Accepts the same @code{#:thread-initializer}, @code{#:thread-destructor},
@code{#:thread-lifetime}, @code{#:expire-on-exception?},
@code{#:use-default-io-waiters?}, @code{#:name},
@code{#:default-checkout-timeout}, @code{#:delay-logger}, and
@code{#:duration-logger} arguments as @code{make-fixed-size-thread-pool},
plus:
@table @code
@item #:min-size
Minimum number of threads to keep alive. Defaults to MAX-SIZE (i.e.@:
the pool is pre-filled and never shrinks).
@item #:scheduler
Fibers scheduler for the pool's internal resource pool fiber. Defaults
to the current scheduler.
@item #:default-max-waiters
Maximum number of fibers that may queue waiting for a thread. Raises
@code{&thread-pool-timeout-error} when exceeded. Defaults to
@code{#f} (no limit).
@end table"
(define param
(make-parameter #f))
@ -444,8 +518,34 @@ arguments of the thread pool procedure."
channel
destroy-thread-on-exception?
(max-waiters 'default))
"Send PROC to the thread pool through CHANNEL. Return the result of PROC.
If already in the thread pool, call PROC immediately."
"Run PROC in THREAD-POOL and return its values, blocking until
complete. If called from within a thread that already belongs to
THREAD-POOL, PROC is called directly in that thread.
Optional keyword arguments:
@table @code
@item #:checkout-timeout
Seconds to wait for a free thread before raising
@code{&thread-pool-timeout-error}. Defaults to the pool's
@code{#:default-checkout-timeout}.
@item #:max-waiters
Maximum number of fibers that may queue waiting for a thread (for
dynamic pools). Defaults to the pool's @code{#:default-max-waiters}.
@item #:destroy-thread-on-exception?
When @code{#t}, destroy the thread after PROC raises an exception.
Equivalent to per-call @code{#:expire-on-exception?}. Defaults to
@code{#f}.
@item #:duration-logger
Called as @code{(duration-logger seconds)} after PROC completes
(whether or not it raised an exception).
@item #:channel
Override the channel used to communicate with the thread.
@end table"
(define (handle-proc fixed-size-thread-pool
reply-channel
start-time
@ -529,9 +629,13 @@ If already in the thread pool, call PROC immediately."
destroy-thread-on-exception?))))))))
(define (destroy-thread-pool pool)
"Destroy POOL, stopping all of its threads and calling the destructor
if specified. This procedure will block until the destruction is
complete."
(if (fixed-size-thread-pool? pool)
(put-message
(fixed-size-thread-pool-channel pool)
'destroy)
(let ((channel (fixed-size-thread-pool-channel pool))
(threads (fixed-size-thread-pool-threads pool)))
(for-each (lambda _ (put-message channel 'destroy)) threads)
(for-each join-thread threads))
(destroy-resource-pool
(thread-pool-resource-pool pool))))

View file

@ -45,7 +45,16 @@
with-port-timeouts))
(define* (with-fibers-timeout thunk #:key timeout on-timeout)
(define* (with-fibers-timeout thunk #:key
timeout
(on-timeout
(const *unspecified*)))
"Run THUNK in a new fiber and return its values, waiting TIMEOUT
seconds for it to finish. If THUNK does not complete within TIMEOUT
seconds, the ON-TIMEOUT procedure is called and with-fibers-timeout
returns the result of ON-TIMEOUT instead.
If THUNK raises an exception it is re-raised in the calling fiber."
(let ((channel (make-channel)))
(spawn-fiber
(lambda ()
@ -110,7 +119,7 @@
(exception-predicate &port-write-timeout-error))
(define (readable? port)
"Test if PORT is writable."
"Test if PORT is readable."
(= 1 (port-poll port "r" 0)))
(define (writable? port)
@ -151,6 +160,21 @@
#:key timeout
(read-timeout timeout)
(write-timeout timeout))
"Run THUNK with per-operation I/O timeouts on all ports. If any
read or write blocks for longer than the given number of seconds, an
exception is raised.
@code{#:timeout} sets both read and write timeouts.
@code{#:read-timeout} and @code{#:write-timeout} specify the timeout
for reads and writes respectively. All three default to @code{#f} (no
timeout).
This procedure works both with fibers, and without fibers by using the
poll system call with a timeout.
On read timeout, raises @code{&port-read-timeout-error}. On write
timeout, raises @code{&port-write-timeout-error}. Both carry the
@code{thunk} and @code{port} fields from @code{&port-timeout-error}."
(define (no-fibers-wait thunk port mode timeout)
(define poll-timeout-ms 200)

View file

@ -1,5 +1,6 @@
(use-modules (tests)
(ice-9 atomic)
(ice-9 threads)
(srfi srfi-71)
(fibers)
(unit-test)
@ -142,4 +143,33 @@
ref-and-decrement))
(error)))
;; Test that the destructor is called when a size 1 fixed-size thread
;; pool is destroyed, and that destroy-thread-pool blocks until it has
;; completed.
(let* ((destructor-called? #f)
(thread-pool
(make-fixed-size-thread-pool
1
#:thread-destructor
(lambda ()
(set! destructor-called? #t)))))
(destroy-thread-pool thread-pool)
(assert-equal #t destructor-called?))
;; Test that the destructor is called for every thread when a
;; multi-thread fixed-size thread pool is destroyed, and that
;; destroy-thread-pool blocks until all destructors have completed.
(let* ((destructor-count 0)
(mutex (make-mutex))
(pool-size 3)
(thread-pool
(make-fixed-size-thread-pool
pool-size
#:thread-destructor
(lambda ()
(with-mutex mutex
(set! destructor-count (+ destructor-count 1)))))))
(destroy-thread-pool thread-pool)
(assert-equal pool-size destructor-count))
(display "thread-pool test finished successfully\n")