Compare commits

..

5 commits

Author SHA1 Message Date
003c5aa6b0 WIP
All checks were successful
/ test (push) Successful in 12s
2025-06-24 21:03:34 +02:00
eadfa53b36 WIP
All checks were successful
/ test (push) Successful in 13s
2025-06-24 21:00:54 +02:00
81dd3370e6 WIP
Some checks failed
/ test (push) Failing after 12s
2025-06-24 20:59:57 +02:00
7f5f05ef2b WIP
Some checks failed
/ test (push) Failing after 12s
2025-06-24 20:58:25 +02:00
7c2c6f2de9 WIP 2025-06-24 20:58:25 +02:00
42 changed files with 839 additions and 3946 deletions

View file

@ -1,7 +1,7 @@
on:
push:
branches:
- trunk
- actions-test
jobs:
test:
runs-on: host
@ -10,17 +10,13 @@ jobs:
- run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git --branch=pages knots-pages
- run: |
cd knots-trunk
guix shell -D -f guix-dev.scm -- documenta api "knots.scm knots/"
guix shell -D -f guix-dev.scm -- documenta api knots
guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -c SHOW_TITLE=true -o ../knots-pages/index.html doc/index.texi
- run: |
cd knots-pages
git add .
if [[ -z "$(git status -s)" ]]; then
echo "Nothing to push"
else
git config user.email ""
git config user.name "Automatic website updater"
git commit -m "Automatic website update"
git push
fi
git config user.email ""
git config user.name "Automatic website updater"
git commit -m "Automatic website update"
git push

View file

@ -2,31 +2,25 @@ include guile.am
SOURCES = \
knots.scm \
knots/backtraces.scm \
knots/non-blocking.scm \
knots/parallelism.scm \
knots/promise.scm \
knots/queue.scm \
knots/resource-pool.scm \
knots/sort.scm \
knots/thread-pool.scm \
knots/timeout.scm \
knots/web-server.scm \
knots/web.scm
knots/thread-pool.scm
SCM_TESTS = \
tests/backtraces.scm \
tests/non-blocking.scm \
tests/non-blocking.scm \
tests/parallelism.scm \
tests/promise.scm \
tests/queue.scm \
tests/web.scm \
tests/resource-pool.scm \
tests/sort.scm \
tests/thread-pool.scm \
tests/timeout.scm \
tests/web-server.scm
tests/non-blocking.scm \
tests/queue.scm \
tests/web-server.scm \
tests/parallelism.scm \
tests/resource-pool.scm \
tests/thread-pool.scm
TESTS_GOBJECTS = $(SCM_TESTS:%.scm=%.go)
@ -36,4 +30,4 @@ EXTRA_DIST = \
pre-inst-env.in
check: $(GOBJECTS) $(TESTS_GOBJECTS)
find tests -maxdepth 1 -name "*.scm" | xargs -t -L1 ./test-env guile
find tests -name "*.scm" | xargs -t -L1 ./test-env guile

4
README Normal file
View file

@ -0,0 +1,4 @@
-*- mode: org -*-
This Guile library provides useful patterns and functionality to use
Guile Fibers.

View file

@ -1,15 +0,0 @@
-*- mode: org -*-
* Guile Knots
Guile Knots is a library providing higher-level patterns and building
blocks for programming with [[https://codeberg.org/guile/fibers][Guile Fibers]].
This includes:
- Parallel map/for-each with configurable concurrency limits
- Resource and thread pools
- Fiber-aware promises for lazy and eager parallel evaluation
- Timeouts for fibers and I/O ports
- A HTTP web server
- Non-blocking socket utilities

View file

@ -16,10 +16,10 @@
@top Overview
Guile Knots is a library providing tools and patterns for programming
with @url{https://codeberg.org/guile/fibers, Guile Fibers}. Guile
Knots provides higher level building blocks for writing programs using
Guile Fibers, including managing code that can't run in a thread used
by fibers. Also included is a web server implementation using Fibers,
with @url{https://github.com/wingo/fibers, Guile Fibers}. Guile Knots
provides higher level building blocks for writing programs using Guile
Fibers, including managing code that can't run in a thread used by
fibers. Also included is a web server implementation using Fibers,
which while being similar to the web server provided by Fibers, can
provide some benefits in specific circumstances.

248
knots.scm
View file

@ -1,61 +1,23 @@
;;; Guile Knots
;;; Copyright © 2026 Christopher Baines <mail@cbaines.net>
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (knots)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-43)
#:use-module (ice-9 match)
#:use-module (ice-9 threads)
#:use-module (ice-9 binary-ports)
#:use-module (ice-9 suspendable-ports)
#:use-module (rnrs bytevectors)
#:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers conditions)
#:use-module (ice-9 format)
#:use-module (knots backtraces)
#:re-export (&knots-exception
make-knots-exception
knots-exception?
knots-exception-stack
print-backtrace-and-exception/knots)
#:use-module (system repl debug)
#:export (call-with-default-io-waiters
wait-when-system-clock-behind
call-with-sigint
display/knots
simple-format/knots
format/knots
&knots-exception
make-knots-exception
knots-exception?
knots-exception-stack
call-with-temporary-thread
spawn-fiber/knots))
print-backtrace-and-exception/knots))
(define (call-with-default-io-waiters thunk)
"Run THUNK with Guile's default blocking I/O waiters active.
This is useful when restoring the default Guile I/O waiters from
within a context (like Fibers) where different I/O waiters are used,
for example when creating a new thread from a fiber."
(parameterize
((current-read-waiter (@@ (ice-9 suspendable-ports)
default-read-waiter))
@ -64,33 +26,15 @@ for example when creating a new thread from a fiber."
(thunk)))
(define (wait-when-system-clock-behind)
"Block until the system clock reads at least 2001-01-02.
Useful at startup in environments (virtual machines, embedded systems)
where the clock may start at or near the Unix epoch. Prints a warning
to the current error port every 20 seconds while waiting."
;; Jan 02 2001 02:00:00
(let ((start-of-the-year-2001 978400800))
(let ((start-of-the-year-2000 946684800))
(while (< (current-time)
start-of-the-year-2001)
start-of-the-year-2000)
(simple-format (current-error-port)
"warning: system clock potentially behind, waiting\n")
(sleep 20))))
;; Copied from (fibers web server)
(define (call-with-sigint thunk cvar)
"Run THUNK with a SIGINT handler that signals the Fibers condition
CVAR. Restores the previous handler when THUNK returns.
Typical usage is to pass a condition variable to this procedure and
wait on CVAR in a fiber to implement clean shutdown on Ctrl-C:
@example
(let ((quit-cvar (make-condition)))
(call-with-sigint
(lambda () (wait quit-cvar))
quit-cvar))
@end example"
(let ((handler #f))
(dynamic-wind
(lambda ()
@ -104,109 +48,83 @@ wait on CVAR in a fiber to implement clean shutdown on Ctrl-C:
;; restore original C handler.
(sigaction SIGINT #f))))))
(define (call-with-temporary-thread thunk)
"Run THUNK in a temporary thread and return its result to the
calling fiber."
(let ((channel (make-channel)))
(call-with-new-thread
(lambda ()
(call-with-default-io-waiters
(lambda ()
(with-exception-handler
(lambda (exn)
(put-message channel `(exception . ,exn)))
(lambda ()
(with-exception-handler
(lambda (exn)
(let ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t)))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))
(lambda ()
(call-with-values thunk
(lambda values
(put-message channel `(values ,@values)))))))
#:unwind? #t)))))
(define &knots-exception
(make-exception-type '&knots-exception
&exception
'(stack)))
(match (get-message channel)
(('values . results)
(apply values results))
(('exception . exn)
(raise-exception exn)))))
(define make-knots-exception
(record-constructor &knots-exception))
(define* (display/knots obj #:optional (port (current-output-port)))
"Write OBJ to PORT (default: current output port) as a UTF-8 byte
sequence via @code{put-bytevector}.
(define knots-exception?
(exception-predicate &knots-exception))
When used with ports without buffering, this should be safer than
display."
(put-bytevector
port
(string->utf8
(call-with-output-string
(lambda (port)
(display obj port))))))
(define knots-exception-stack
(exception-accessor
&knots-exception
(record-accessor &knots-exception 'stack)))
(define (simple-format/knots port s . args)
"Like @code{simple-format} but should be safer when used with a port
without buffering."
(let ((str (apply simple-format #f s args)))
(if (eq? #f port)
str
(display/knots
str
(if (eq? #t port)
(current-output-port)
port)))))
(define* (print-backtrace-and-exception/knots
exn
#:key (port (current-error-port)))
(let* ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t))))
(stack-len
(stack-length stack))
(error-string
(call-with-output-string
(lambda (port)
(let ((knots-stacks
(map knots-exception-stack
(filter knots-exception?
(simple-exceptions exn)))))
(define (format/knots port s . args)
"Like @code{format} but should be safer when used with a port
without buffering."
(let ((str (apply format #f s args)))
(if (eq? #f port)
str
(display/knots
str
(if (eq? #t port)
(current-output-port)
port)))))
(define* (spawn-fiber/knots thunk #:optional scheduler #:key parallel?)
"Spawn a fiber to run THUNK, with knots exception handling.
Accepts the same optional SCHEDULER and @code{#:parallel?} arguments
as @code{spawn-fiber}."
(spawn-fiber
(lambda ()
(with-exception-handler
(lambda (exn)
(display/knots "Uncaught exception in task:\n"
(current-error-port))
(print-backtrace-and-exception/knots exn))
(lambda ()
(with-exception-handler
(lambda (exn)
(let ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t)))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))
thunk))
#:unwind? #t))
scheduler
#:parallel? parallel?))
(let* ((stack-vec
(stack->vector stack))
(stack-vec-length
(vector-length stack-vec)))
(print-frames (list->vector
(drop
(vector->list stack-vec)
(if (< stack-vec-length 5)
0
4)))
port
#:count (stack-length stack)))
(for-each
(lambda (stack)
(let* ((stack-vec
(stack->vector stack))
(stack-vec-length
(vector-length stack-vec)))
(print-frames (list->vector
(drop
(vector->list stack-vec)
(if (< stack-vec-length 4)
0
3)))
port
#:count (stack-length stack))))
knots-stacks)
(print-exception
port
(if (null? knots-stacks)
(stack-ref stack
(if (< stack-len 4)
stack-len
4))
(let* ((stack (last knots-stacks))
(stack-len (stack-length stack)))
(stack-ref stack
(if (< stack-len 3)
stack-len
3))))
'%exception
(list exn)))))))
(display error-string port)))

View file

@ -1,350 +0,0 @@
;;; Guile Knots
;;; Copyright © 2026 Christopher Baines <mail@cbaines.net>
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (knots backtraces)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-43)
#:use-module (ice-9 match)
#:use-module (ice-9 format)
#:use-module (system repl debug)
#:use-module (system vm frame)
#:use-module ((knots) #:select (display/knots
simple-format/knots
format/knots))
#:export (&knots-exception
make-knots-exception
knots-exception?
knots-exception-stack
print-backtrace-and-exception/knots))
(define &knots-exception
(make-exception-type '&knots-exception
&exception
'(stack)))
(define make-knots-exception
(record-constructor &knots-exception))
(set-procedure-property! make-knots-exception 'documentation
"Construct a @code{&knots-exception} with the given stack.")
(define knots-exception?
(exception-predicate &knots-exception))
(set-procedure-property! knots-exception? 'documentation
"Return @code{#t} if OBJ is a @code{&knots-exception}.")
(define knots-exception-stack
(exception-accessor
&knots-exception
(record-accessor &knots-exception 'stack)))
(set-procedure-property! knots-exception-stack 'documentation
"Return the stack from a @code{&knots-exception}.")
(define (backtrace-debug-mode?)
(let ((val (getenv "KNOTS_BACKTRACE_DEBUG")))
(and val
(not (string=? val ""))
(not (string=? val "0")))))
(define (debug-print-stack port label full-vec included-vec)
(simple-format/knots port "[KNOTS DEBUG] ~A\n" label)
(if (vector-empty? full-vec)
(simple-format/knots port " (empty)\n")
(vector-fold-right
(lambda (i _ frame)
(let ((marker
(if (vector-index
(lambda (f) (eq? f frame))
included-vec)
">" " "))
(name
(symbol->string
(or (frame-procedure-name frame)
'_))))
(match (frame-source frame)
(#f
(format/knots port " ~a ~3d unknown ~a~%"
marker i name))
((_ file line . col)
(format/knots port " ~a ~3d ~a:~a:~a ~a~%"
marker i file (1+ line) col name)))))
#f
full-vec))
(force-output port))
(define (internal-file? file)
(or (string-prefix? "ice-9/" file)
(string-prefix? "system/" file)
(string-prefix? "srfi/" file)
(string=? file "knots.scm")
(string-prefix? "knots/" file)
(string=? file "fibers.scm")
(string-prefix? "fibers/" file)))
(define (frame-file frame)
(let ((src (frame-source frame)))
(and src (cadr src))))
(define (user-frame? frame)
(let ((file (frame-file frame)))
(and (string? file)
(not (internal-file? file)))))
(define (raise-machinery-frame? frame)
;; Return #t for frames that are part of the raise/unwind machinery
;; and should be skipped when looking for the raise site.
;; Specifically: C/unknown frames (no source file) and
;; ice-9/boot-9.scm frames. Other internal frames such as
;; ice-9/vlist.scm are part of the actual call path and should be
;; preserved.
(let ((file (frame-file frame)))
(or (not file)
(string=? file "ice-9/boot-9.scm"))))
(define (fibers-frame? frame)
;; Return #t if FRAME belongs to the fibers library.
(let ((file (frame-file frame)))
(and (string? file)
(or (string=? file "fibers.scm")
(string-prefix? "fibers/" file)))))
;; The number of frames in Guile's eval-machinery tail appended to every
;; top-level script stack:
;;
;; [n-6] ice-9/boot-9.scm _
;; [n-5] ice-9/boot-9.scm save-module-excursion
;; [n-4] ice-9/eval.scm _
;; [n-3] ice-9/boot-9.scm call-with-prompt
;; [n-2] C/unknown apply-smob/0
;; [n-1] ice-9/boot-9.scm with-exception-handler
(define script-eval-tail-length 6)
(define (classify-stack-situation stack-vector)
(cond
((vector-any fibers-frame? stack-vector)
'run-fibers)
((let ((len (vector-length stack-vector)))
(and (>= len script-eval-tail-length)
(equal? (frame-file (vector-ref stack-vector (- len 1)))
"ice-9/boot-9.scm")
(eq? (frame-procedure-name (vector-ref stack-vector (- len 3)))
'call-with-prompt)
(not (vector-any (lambda (frame)
(eq? (frame-procedure-name frame)
'%start-stack))
stack-vector))))
'script)
(else
'unknown)))
(define (filter-knots-stack-vector vector)
;; Extract user frames from a pre-captured knots stack. The bottom 3 frames
;; are always fixed overhead: make-stack (C), the handler body frame at the
;; make-stack call site (exactly 1 Scheme frame), and raise-exception
;; (boot-9). User frames start at index 3.
(let ((last-user (vector-index-right user-frame? vector)))
(if (or (not last-user) (< last-user 3))
#()
(vector-copy vector 3 (+ last-user 1)))))
(define (filter-stack-vector vector)
;; Return the slice of VECTOR containing the frames relevant for
;; display. Skips the fixed 2-frame overhead (make-stack + call
;; site) and any raise machinery to find after-raise, then bounds at
;; the eval-machinery tail (script) or the first fibers scheduler
;; frame (run-fibers/unknown).
(define (skip-handler-and-raise vector start)
;; Scan forward from START in VECTOR, first past any user frames
;; (the handler body), then past raise-machinery frames (C/unknown
;; and ice-9/boot-9.scm). Returns the index of the first
;; remaining frame — the raise site or context. Other internal
;; frames such as ice-9/vlist.scm are preserved because they are
;; part of the actual call path.
(let* ((len (vector-length vector))
(after-handler
(let loop ((i start))
(if (or (>= i len) (not (user-frame? (vector-ref vector i))))
i
(loop (+ i 1))))))
(let loop ((i after-handler))
(cond
((>= i len) i)
((raise-machinery-frame? (vector-ref vector i)) (loop (+ i 1)))
(else i)))))
(let* ((len (vector-length vector))
(situation (classify-stack-situation vector))
(after-raise (skip-handler-and-raise vector (min 2 len)))
(end (if (and (eq? situation 'script)
(> (- len script-eval-tail-length) after-raise))
(- len script-eval-tail-length)
(let loop ((i after-raise))
(cond ((>= i len) i)
((fibers-frame? (vector-ref vector i)) i)
(else (loop (+ i 1))))))))
(if (>= after-raise end)
#()
(vector-copy vector after-raise end))))
;; Based on print-frame from (system repl debug), but without the
;; frame indexes
(define* (print-frame/no-index frame
#:optional (port (current-output-port))
#:key (width (terminal-width))
(last-source #f) (innermost? #f))
(define (source-file src)
(match src
(#f "unknown file")
((_ #f . _) "current input")
((_ file . _) file)))
(let* ((source (frame-source frame))
(file (source-file source)))
(when (not (equal? file (source-file last-source)))
(format port "~&In ~a:~&" file))
(format port "~9@a ~v:@y~%"
(match source
(#f "")
((_ _ line . col) (simple-format #f "~A:~A" (1+ line) col)))
width
(frame-call-representation frame #:top-frame? innermost?))))
(define* (print-backtrace-and-exception/knots
exn
#:key (port (current-error-port)))
"Print the backtrace and exception information from EXN to PORT. This
procedure captures the stack, so should be run before the stack is
unwound, so using @code{with-exception-handler} without
@code{#:unwind? #t}, the exception may need to then be re-raised and
handled in an outer exception handler.
@example
(with-exception-handler
(lambda (exn)
;; Recover from the exception
#f)
(lambda ()
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(raise-exception exn))
(lambda ()
(do-things))))
#:unwind? #t)
@end example
It's important to use @code{print-backtrace-and-exception/knots} for
displaying backtraces involving functionality from Guile Knots, since
the stack involved is potentially split across several fibers. The
stacks involved are attached to the exception, and this procedure
extracts this information out and assembles a backtrace including all
the code involved.
"
(define (get-string out stack)
(let* ((stack-vector (stack->vector stack))
(knots-stack-vectors
(map (lambda (exn)
(stack->vector
(knots-exception-stack exn)))
(reverse
(filter knots-exception?
(simple-exceptions exn)))))
(filtered-stack-vector
(filter-stack-vector stack-vector))
(filtered-knots-stack-vectors
(map filter-knots-stack-vector knots-stack-vectors)))
(when (backtrace-debug-mode?)
(let ((debug-port (current-error-port))
(situation (classify-stack-situation stack-vector)))
(simple-format/knots
debug-port
"[KNOTS DEBUG] situation: ~A\n" situation)
(debug-print-stack debug-port "stack"
stack-vector filtered-stack-vector)
(let ((stack-count (length knots-stack-vectors)))
(for-each
(lambda (knots-vec user-vec index)
(debug-print-stack
debug-port
(format #f "knots stack ~a/~a" index stack-count)
knots-vec user-vec))
knots-stack-vectors
filtered-knots-stack-vectors
(iota stack-count 1)))
(display/knots "\n" debug-port)
(force-output debug-port)))
(for-each (lambda (vec)
(vector-fold-right
(lambda (i last-source frame)
(print-frame/no-index frame out
#:innermost? (= i 0)
#:last-source last-source)
(frame-source frame))
#f
vec))
(cons filtered-stack-vector
filtered-knots-stack-vectors))
(print-exception
out
#f
'%exception
(list (if (backtrace-debug-mode?)
exn
(apply make-exception
(remove knots-exception?
(simple-exceptions exn))))))))
(let* ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t))))
(string-port
(open-output-string))
(output
(with-exception-handler
(lambda (output-exn)
(display/knots (get-output-string string-port)
port)
(close-output-port string-port)
(display/knots "\n\n" port)
(let* ((stack (make-stack #t))
(backtrace
(call-with-output-string
(lambda (port)
(display-backtrace stack port)
(newline port)))))
(display/knots backtrace port))
(simple-format/knots
port
"\nexception in print-backtrace-and-exception/knots: ~A\n"
output-exn)
(raise-exception output-exn))
(lambda ()
(get-string string-port stack)
(let ((str (get-output-string string-port)))
(close-output-port string-port)
str)))))
(display/knots output port)))

View file

@ -32,16 +32,6 @@
(define* (non-blocking-open-socket-for-uri uri
#:key (verify-certificate? #t))
"Open a socket for URI and return it as a non-blocking port.
For HTTPS URIs the TLS handshake is completed while the socket is
still blocking (required because Guile's TLS wrapper does not support
non-blocking handshakes), then the underlying socket is made
non-blocking. For plain HTTP the socket is made non-blocking
immediately.
@code{#:verify-certificate?} controls TLS certificate verification
and defaults to @code{#t}."
(define tls-wrap
(@@ (web client) tls-wrap))

View file

@ -20,9 +20,6 @@
(define-module (knots parallelism)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-71)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-43)
#:use-module (ice-9 match)
#:use-module (ice-9 control)
#:use-module (ice-9 exceptions)
@ -30,7 +27,6 @@
#:use-module (fibers channels)
#:use-module (fibers operations)
#:use-module (knots)
#:use-module (knots resource-pool)
#:export (fibers-batch-map
fibers-map
@ -42,13 +38,7 @@
fibers-parallel
fibers-let
fiberize
make-parallelism-limiter
parallelism-limiter?
destroy-parallelism-limiter
call-with-parallelism-limiter
with-parallelism-limiter))
fiberize))
(define (defer-to-parallel-fiber thunk)
(let ((reply (make-channel)))
@ -58,7 +48,7 @@
(lambda (exn)
(put-message
reply
(cons 'exception exn)))
(list 'exception exn)))
(lambda ()
(with-exception-handler
(lambda (exn)
@ -79,7 +69,7 @@
(lambda ()
(start-stack #t (thunk)))
(lambda vals
(put-message reply (cons 'result vals)))))))
(put-message reply vals))))))
#:unwind? #t))
#:parallel? #t)
reply))
@ -89,16 +79,13 @@
reply-channels)))
(map
(match-lambda
(('exception . exn)
(('exception exn)
(raise-exception exn))
(('result . vals)
(apply values vals)))
(result
(apply values result)))
responses)))
(define (fibers-batch-map proc parallelism-limit . lists)
"Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT. If any of
the invocations of PROC raise an exception, this will be raised once
all of the calls to PROC have finished."
(define vecs (map (lambda (list-or-vec)
(if (vector? list-or-vec)
list-or-vec
@ -118,18 +105,9 @@ all of the calls to PROC have finished."
(channel-indexes '()))
(if (and (eq? #f next-to-process-index)
(null? channel-indexes))
(let ((processed-result-vec
(vector-map
(lambda (_ result-or-exn)
(match result-or-exn
(('exception . exn)
(raise-exception exn))
(('result . vals)
(car vals))))
result-vec)))
(if (vector? (first lists))
processed-result-vec
(vector->list processed-result-vec)))
(if (vector? (first lists))
result-vec
(vector->list result-vec))
(if (or (= (length channel-indexes)
(min parallelism-limit vecs-length))
@ -145,13 +123,18 @@ all of the calls to PROC have finished."
(get-operation
(vector-ref result-vec index))
(lambda (result)
(vector-set! result-vec
index
result)
(values next-to-process-index
(lset-difference =
channel-indexes
(list index))))))
(match result
(('exception exn)
(raise-exception exn))
(_
(vector-set! result-vec
index
(first result))
(values next-to-process-index
(lset-difference =
channel-indexes
(list index))))))))
channel-indexes)))))
(loop new-index
new-channel-indexes))
@ -174,14 +157,9 @@ all of the calls to PROC have finished."
channel-indexes)))))))
(define (fibers-map proc . lists)
"Map PROC over LISTS in parallel, running up to 20 fibers in
PARALLEL. If any of the invocations of PROC raise an exception, this
will be raised once all of the calls to PROC have finished."
(apply fibers-batch-map proc 20 lists))
(define (fibers-batch-for-each proc parallelism-limit . lists)
"Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in
parallel."
(apply fibers-batch-map
(lambda args
(apply proc args)
@ -192,13 +170,10 @@ parallel."
*unspecified*)
(define (fibers-for-each proc . lists)
"Call PROC on LISTS, running up to 20 fibers in parallel."
(apply fibers-batch-for-each proc 20 lists))
(define-syntax fibers-parallel
(lambda (x)
"Run each expression in parallel. If any expression raises an
exception, this will be raised after all exceptions have finished."
(syntax-case x ()
((_ e0 ...)
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
@ -209,16 +184,12 @@ parallel."
(apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
(define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...)
"Let, but run each binding in a fiber in parallel."
(call-with-values
(lambda () (fibers-parallel e ...))
(lambda (v ...)
b0 b1 ...)))
(define* (fibers-map-with-progress proc lists #:key report)
"Map PROC over LISTS, calling #:REPORT if specified after each
invocation of PROC finishes. REPORT is passed the results for each
element of LISTS, or #f if no result has been received yet."
(let loop ((channels-to-results
(apply map
(lambda args
@ -239,8 +210,8 @@ invocation of PROC finishes. REPORT is passed the results for each
(match-lambda
((#f . ('exception . exn))
(raise-exception exn))
((#f . ('result . vals))
(car vals)))
((#f . ('result . val))
val))
channels-to-results)
(loop
(perform-operation
@ -257,7 +228,12 @@ invocation of PROC finishes. REPORT is passed the results for each
(map (match-lambda
((c . r)
(if (eq? channel c)
(cons #f result)
(cons #f
(match result
(('exception . exn)
result)
(_
(cons 'result result))))
(cons c r))))
channels-to-results)))
#f))))
@ -267,16 +243,6 @@ invocation of PROC finishes. REPORT is passed the results for each
#:key (parallelism 1)
(input-channel (make-channel))
(process-channel input-channel))
"Convert PROC into a procedure backed by @code{#:parallelism}
(default: 1) background fibers. Returns a wrapper that sends its
arguments to one of the fibers and blocks until the result is
returned.
@code{#:input-channel} is the channel that callers write requests to;
defaults to a fresh channel. @code{#:process-channel} is the channel
the fibers read from; defaults to @code{#:input-channel}. Setting
them differently allows external parties to bypass the wrapper and
write directly to @code{process-channel}."
(for-each
(lambda _
(spawn-fiber
@ -288,7 +254,7 @@ write directly to @code{process-channel}."
reply-channel
(with-exception-handler
(lambda (exn)
(cons 'exception exn))
(list 'exception exn))
(lambda ()
(with-exception-handler
(lambda (exn)
@ -319,48 +285,5 @@ write directly to @code{process-channel}."
(put-message input-channel (cons reply-channel args))
(match (get-message reply-channel)
(('result . vals) (apply values vals))
(('exception . exn)
(('exception exn)
(raise-exception exn))))))
(define-record-type <parallelism-limiter>
(make-parallelism-limiter-record resource-pool)
parallelism-limiter?
(resource-pool parallelism-limiter-resource-pool))
(set-procedure-property!
(macro-transformer (module-ref (current-module) 'parallelism-limiter?))
'documentation
"Return @code{#t} if OBJ is a @code{<parallelism-limiter>}.")
(define* (make-parallelism-limiter limit #:key (name "unnamed"))
"Return a parallelism limiter that allows at most LIMIT concurrent
fibers to execute within @code{with-parallelism-limiter} at the same
time. Further fibers block until a slot becomes free.
@code{#:name} is a string used in log messages. Defaults to
@code{\"unnamed\"}."
(make-parallelism-limiter-record
(make-fixed-size-resource-pool
(iota limit)
#:name name)))
(define (destroy-parallelism-limiter parallelism-limiter)
"Destroy PARALLELISM-LIMITER, releasing its underlying resource pool."
(destroy-resource-pool
(parallelism-limiter-resource-pool
parallelism-limiter)))
(define* (call-with-parallelism-limiter parallelism-limiter thunk)
"Acquire a slot from PARALLELISM-LIMITER, call THUNK, release the
slot, and return the values from THUNK. Blocks if no slot is
currently available."
(call-with-resource-from-pool
(parallelism-limiter-resource-pool parallelism-limiter)
(lambda _
(thunk))))
(define-syntax-rule (with-parallelism-limiter parallelism-limiter exp ...)
"Evaluate EXP ... while holding a slot from PARALLELISM-LIMITER.
Syntactic sugar around @code{call-with-parallelism-limiter}."
(call-with-parallelism-limiter
parallelism-limiter
(lambda () exp ...)))

View file

@ -28,7 +28,6 @@
#:export (fibers-promise?
fibers-delay
fibers-delay/eager
fibers-force
fibers-promise-reset
fibers-promise-result-available?))
@ -39,27 +38,14 @@
(thunk fibers-promise-thunk)
(values-box fibers-promise-values-box)
(evaluated-condition fibers-promise-evaluated-condition))
(set-procedure-property!
(macro-transformer (module-ref (current-module) 'fibers-promise?))
'documentation
"Return @code{#t} if OBJ is a @code{<fibers-promise>}.")
(define (fibers-delay thunk)
"Return a new fiber-aware promise that will evaluate THUNK when
first forced. THUNK is not called until @code{fibers-force} is
called on the promise."
(make-fibers-promise
thunk
(make-atomic-box #f)
(make-condition)))
(define (fibers-force fp)
"Force the fiber-aware promise FP, returning its values.
The first call evaluates the promise's thunk. Concurrent callers
block on a condition variable until evaluation finishes, then receive
the same result. If the thunk raises an exception, the exception is
stored and re-raised for all callers."
(unless (fibers-promise? fp)
(raise-exception
(make-exception
@ -96,10 +82,7 @@ stored and re-raised for all callers."
(make-exception
exn
(make-knots-exception stack)))))
(lambda ()
(start-stack
#t
((fibers-promise-thunk fp))))))
(fibers-promise-thunk fp)))
#:unwind? #t))
(lambda vals
(atomic-box-set! (fibers-promise-values-box fp)
@ -119,33 +102,11 @@ stored and re-raised for all callers."
(raise-exception res)
(apply values res))))))
(define (fibers-delay/eager thunk)
"Return a new fiber-aware promise and immediately begin evaluating
THUNK in a new fiber. Exceptions during eager evaluation are silently
discarded; they will be re-raised when @code{fibers-force} is called."
(let ((promise (fibers-delay thunk)))
(spawn-fiber
(lambda ()
(with-exception-handler
(lambda _
;; Silently handle this exception
#f)
(lambda ()
(fibers-force promise))
#:unwind? #t)))
promise))
(define (fibers-promise-reset fp)
"Reset the fiber-aware promise FP so that the next call to
@code{fibers-force} re-evaluates its thunk."
(atomic-box-set! (fibers-promise-values-box fp)
#f))
(define (fibers-promise-result-available? fp)
"Return @code{#t} if the fiber-aware promise FP has been evaluated
(successfully or with an exception) and @code{#f} if evaluation has
not yet started or is still in progress."
(let ((val (atomic-box-ref (fibers-promise-values-box fp))))
(not (or (eq? val #f)
(eq? val 'started)))))

View file

@ -25,12 +25,6 @@
#:export (spawn-queueing-fiber))
(define (spawn-queueing-fiber dest-channel)
"Spawn a fiber that serialises items onto DEST-CHANNEL in FIFO order.
Returns a new input channel.
Multiple producers can put items on the returned channel concurrently.
The fiber buffers them locally and forwards them to DEST-CHANNEL one at
a time, preserving arrival order."
(define queue (make-q))
(let ((queue-channel (make-channel)))

File diff suppressed because it is too large Load diff

View file

@ -1,97 +0,0 @@
;;; Guile Knots
;;; Copyright © 2020, 2025 Christopher Baines <mail@cbaines.net>
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (knots sort)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-71)
#:use-module (ice-9 match)
#:use-module (fibers scheduler)
#:use-module (knots promise)
#:export (fibers-sort!))
(define (try-split-at! lst i)
(cond ((< i 0)
(error "negitive split size"))
((= i 0)
(values '() lst))
(else
(let lp ((l lst) (n (- i 1)))
(if (<= n 0)
(let ((tmp (cdr l)))
(unless (null? tmp)
(set-cdr! l '()))
(values lst tmp))
(if (or (null? l)
(null? (cdr l)))
(values lst '())
(lp (cdr l) (- n 1))))))))
(define (chunk! lst max-length)
(let loop ((chunks '())
(lst lst))
(let ((chunk
rest
(try-split-at! lst max-length)))
(if (null? rest)
(reverse! (cons chunk chunks))
(loop (cons chunk chunks)
rest)))))
(define* (fibers-sort! items less #:key parallelism)
"Sort ITEMS destructively using LESS as the comparison procedure,
using a parallel merge sort. Returns the sorted list.
Splits ITEMS into chunks, sorts each in an eager fiber-promise in
parallel, then merges pairs of sorted chunks in parallel until one
sorted list remains.
@code{#:parallelism} sets the number of initial chunks. Defaults to
the current fibers parallelism."
(define requested-chunk-count
(or parallelism
(+ 1 (length (scheduler-remote-peers (current-scheduler))))))
(define items-length (length items))
(if (= 0 items-length)
items
(let* ((chunk-length (ceiling (/ items-length
requested-chunk-count)))
(chunks (chunk! items chunk-length)))
(let loop ((sorted-chunk-promises
(map
(lambda (chunk)
(fibers-delay/eager
(lambda ()
(sort! chunk less))))
chunks)))
(if (null? (cdr sorted-chunk-promises))
(fibers-force
(first sorted-chunk-promises))
(loop
(map
(match-lambda
((items) items)
((a b)
(fibers-delay/eager
(lambda ()
(merge! (fibers-force a)
(fibers-force b)
less)))))
(chunk! sorted-chunk-promises 2))))))))

View file

@ -55,8 +55,6 @@
;; thread pools
thread-pool-arguments-parameter
thread-pool-default-checkout-timeout
thread-pool-delay-logger
thread-pool-duration-logger
destroy-thread-pool
@ -162,70 +160,30 @@ from there, or #f if that would be an empty string."
thread-pool?
(resource-pool thread-pool-resource-pool)
(arguments-parameter thread-pool-arguments-parameter-accessor))
(set-procedure-property!
(macro-transformer (module-ref (current-module) 'thread-pool?))
'documentation
"Return @code{#t} if OBJ is a @code{<thread-pool>}.")
(set-procedure-property!
(macro-transformer (module-ref (current-module) 'thread-pool-resource-pool))
'documentation
"Return the underlying resource pool of the thread pool.")
(define-record-type <fixed-size-thread-pool>
(fixed-size-thread-pool channel arguments-parameter current-procedures
default-checkout-timeout delay-logger
duration-logger threads)
default-checkout-timeout)
fixed-size-thread-pool?
(channel fixed-size-thread-pool-channel)
(arguments-parameter fixed-size-thread-pool-arguments-parameter)
(current-procedures fixed-size-thread-pool-current-procedures)
(default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout)
(delay-logger fixed-size-thread-pool-delay-logger)
(duration-logger fixed-size-thread-pool-duration-logger)
(threads fixed-size-thread-pool-threads))
(set-procedure-property!
(macro-transformer (module-ref (current-module) 'fixed-size-thread-pool?))
'documentation
"Return @code{#t} if OBJ is a @code{<fixed-size-thread-pool>}.")
(set-procedure-property!
(macro-transformer (module-ref (current-module) 'fixed-size-thread-pool-channel))
'documentation
"Return the channel of the fixed-size thread pool.")
(set-procedure-property!
(macro-transformer (module-ref (current-module) 'fixed-size-thread-pool-current-procedures))
'documentation
"Return the current procedures vector of the fixed-size thread pool.")
(default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout))
;; Since both thread pool records have this field, use a procedure
;; than handles the appropriate accessor
(define (thread-pool-arguments-parameter pool)
"Return the arguments parameter for POOL, dispatching on pool type."
(if (fixed-size-thread-pool? pool)
(fixed-size-thread-pool-arguments-parameter pool)
(thread-pool-arguments-parameter-accessor pool)))
(define (thread-pool-default-checkout-timeout pool)
"Return the default checkout timeout for POOL."
(if (fixed-size-thread-pool? pool)
(fixed-size-thread-pool-default-checkout-timeout pool)
(assq-ref (resource-pool-configuration
(thread-pool-resource-pool pool))
'default-checkout-timeout)))
(define (thread-pool-delay-logger pool)
"Return the delay logger for POOL, dispatching on pool type."
(if (fixed-size-thread-pool? pool)
(fixed-size-thread-pool-delay-logger pool)
(resource-pool-delay-logger
(thread-pool-resource-pool pool))))
(define (thread-pool-duration-logger pool)
"Return the duration logger for POOL, dispatching on pool type."
(if (fixed-size-thread-pool? pool)
(fixed-size-thread-pool-duration-logger pool)
(resource-pool-duration-logger
(thread-pool-resource-pool pool))))
(define &thread-pool-timeout-error
(make-exception-type '&thread-pool-timeout-error
&error
@ -238,13 +196,9 @@ from there, or #f if that would be an empty string."
(exception-accessor
&thread-pool-timeout-error
(record-accessor &thread-pool-timeout-error 'pool)))
(set-procedure-property! thread-pool-timeout-error-pool 'documentation
"Return the pool from a @code{&thread-pool-timeout-error} exception.")
(define thread-pool-timeout-error?
(exception-predicate &thread-pool-timeout-error))
(set-procedure-property! thread-pool-timeout-error? 'documentation
"Return @code{#t} if OBJ is a @code{&thread-pool-timeout-error} exception.")
(record-predicate &thread-pool-timeout-error))
(define* (make-fixed-size-thread-pool size
#:key
@ -257,52 +211,6 @@ from there, or #f if that would be an empty string."
(name "unnamed")
(use-default-io-waiters? #t)
default-checkout-timeout)
"Create a pool of SIZE threads started immediately. Use
@code{call-with-thread} to run a procedure in one of the threads.
Optional keyword arguments:
@table @code
@item #:thread-initializer
A thunk called once when each thread starts. Its return value is
passed as extra arguments to every procedure run in that thread.
Defaults to @code{#f} (no extra arguments).
@item #:thread-destructor
A procedure called with the value returned by @code{#:thread-initializer}
when a thread exits. Defaults to @code{#f}.
@item #:thread-lifetime
Maximum number of procedures a thread will run before restarting (and
re-running @code{#:thread-initializer}). Defaults to @code{#f} (no
limit).
@item #:expire-on-exception?
When @code{#t}, replace a thread after any unhandled exception.
Defaults to @code{#f}.
@item #:use-default-io-waiters?
When @code{#t} (the default), each thread uses blocking I/O waiters so
that port reads and writes block the thread rather than trying to
suspend a fiber.
@item #:name
String used in thread names and log messages. Defaults to
@code{\"unnamed\"}.
@item #:default-checkout-timeout
Seconds to wait for a free thread slot before raising
@code{&thread-pool-timeout-error}. Defaults to @code{#f} (wait
forever).
@item #:delay-logger
Called as @code{(delay-logger seconds)} with the time spent waiting
for a thread to become available.
@item #:duration-logger
Called as @code{(duration-logger seconds)} after each procedure
completes, whether it returned normally or raised an exception.
@end table"
(define channel
(make-channel))
@ -361,11 +269,19 @@ completes, whether it returned normally or raised an exception.
(sleep 1)
(destructor/safe args)))))
(define (process thread-index channel args)
(let loop ((lifetime thread-lifetime))
(define (process channel args)
(let loop ()
(match (get-message channel)
('destroy #f)
((reply proc)
((reply sent-time proc)
(when delay-logger
(let ((time-delay
(- (get-internal-real-time)
sent-time)))
(delay-logger (/ time-delay
internal-time-units-per-second)
proc)))
(let* ((start-time (get-internal-real-time))
(response
(with-exception-handler
@ -376,9 +292,6 @@ completes, whether it returned normally or raised an exception.
internal-time-units-per-second)
exn))
(lambda ()
(vector-set! thread-proc-vector
thread-index
proc)
(with-exception-handler
(lambda (exn)
(let ((stack
@ -406,10 +319,6 @@ completes, whether it returned normally or raised an exception.
vals))))))
#:unwind? #t)))
(vector-set! thread-proc-vector
thread-index
#f)
(put-message reply
response)
@ -417,20 +326,16 @@ completes, whether it returned normally or raised an exception.
(match response
(('thread-pool-error duration _)
(when duration-logger
(duration-logger duration))
(duration-logger duration proc))
#t)
((duration . _)
(when duration-logger
(duration-logger duration))
(duration-logger duration proc))
#f))))
(if (and exception?
expire-on-exception?)
#t
(if lifetime
(if (<= lifetime 1)
#t
(loop (- lifetime 1)))
(loop lifetime)))))))))
(loop))))))))
(define (start-thread index channel)
(call-with-new-thread
@ -453,7 +358,7 @@ completes, whether it returned normally or raised an exception.
"knots: thread-pool: internal exception: ~A\n" exn))
(lambda ()
(parameterize ((param args))
(process index channel args)))
(process channel args)))
#:unwind? #t)))
(when thread-destructor
@ -464,22 +369,19 @@ completes, whether it returned normally or raised an exception.
(initializer/safe)
'()))))))))
(define threads
(map (lambda (i)
(if use-default-io-waiters?
(call-with-default-io-waiters
(lambda ()
(start-thread i channel)))
(start-thread i channel)))
(iota size)))
(for-each
(lambda (i)
(if use-default-io-waiters?
(call-with-default-io-waiters
(lambda ()
(start-thread i channel)))
(start-thread i channel)))
(iota size))
(fixed-size-thread-pool channel
param
thread-proc-vector
default-checkout-timeout
delay-logger
duration-logger
threads))
default-checkout-timeout))
(define* (make-thread-pool max-size
#:key
@ -487,42 +389,15 @@ completes, whether it returned normally or raised an exception.
scheduler
thread-initializer
thread-destructor
delay-logger
duration-logger
(delay-logger (lambda _ #f))
(duration-logger (const #f))
thread-lifetime
(expire-on-exception? #f)
(name "unnamed")
(use-default-io-waiters? #t)
default-checkout-timeout
default-max-waiters)
"Create a dynamic thread pool with up to MAX-SIZE threads. Use
@code{call-with-thread} to run a procedure in one of the threads.
Unlike @code{make-fixed-size-thread-pool}, threads are created on
demand and may be reclaimed when idle (controlled by @code{#:min-size}
and the resource pool's idle management).
Accepts the same @code{#:thread-initializer}, @code{#:thread-destructor},
@code{#:thread-lifetime}, @code{#:expire-on-exception?},
@code{#:use-default-io-waiters?}, @code{#:name},
@code{#:default-checkout-timeout}, @code{#:delay-logger}, and
@code{#:duration-logger} arguments as @code{make-fixed-size-thread-pool},
plus:
@table @code
@item #:min-size
Minimum number of threads to keep alive. Defaults to MAX-SIZE (i.e.@:
the pool is pre-filled and never shrinks).
@item #:scheduler
Fibers scheduler for the pool's internal resource pool fiber. Defaults
to the current scheduler.
@item #:default-max-waiters
Maximum number of fibers that may queue waiting for a thread. Raises
@code{&thread-pool-timeout-error} when exceeded. Defaults to
@code{#f} (no limit).
@end table"
default-checkout-timeout)
"Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the thread pool procedure."
(define param
(make-parameter #f))
@ -533,6 +408,7 @@ Maximum number of fibers that may queue waiting for a thread. Raises
1
#:thread-initializer thread-initializer
#:thread-destructor thread-destructor
#:thread-lifetime thread-lifetime
#:expire-on-exception? expire-on-exception?
#:name name
#:use-default-io-waiters? use-default-io-waiters?))
@ -540,11 +416,9 @@ Maximum number of fibers that may queue waiting for a thread. Raises
#:destructor destroy-thread-pool
#:min-size min-size
#:delay-logger delay-logger
#:lifetime thread-lifetime
#:scheduler scheduler
#:duration-logger duration-logger
#:default-checkout-timeout default-checkout-timeout
#:default-max-waiters default-max-waiters)))
#:default-checkout-timeout default-checkout-timeout)))
(thread-pool resource-pool
param)))
@ -552,53 +426,17 @@ Maximum number of fibers that may queue waiting for a thread. Raises
(define* (call-with-thread thread-pool
proc
#:key
(delay-logger
(thread-pool-delay-logger thread-pool))
(duration-logger
(thread-pool-duration-logger thread-pool))
duration-logger
checkout-timeout
channel
destroy-thread-on-exception?
(max-waiters 'default))
"Run PROC in THREAD-POOL and return its values, blocking until
complete. If called from within a thread that already belongs to
THREAD-POOL, PROC is called directly in that thread.
Optional keyword arguments:
@table @code
@item #:checkout-timeout
Seconds to wait for a free thread before raising
@code{&thread-pool-timeout-error}. Defaults to the pool's
@code{#:default-checkout-timeout}.
@item #:max-waiters
Maximum number of fibers that may queue waiting for a thread (for
dynamic pools). Defaults to the pool's @code{#:default-max-waiters}.
@item #:destroy-thread-on-exception?
When @code{#t}, destroy the thread after PROC raises an exception.
Equivalent to per-call @code{#:expire-on-exception?}. Defaults to
@code{#f}.
@item #:delay-logger
Called as @code{(delay-logger seconds)} with the time spent waiting
for a thread to become available. Defaults to the pool's
@code{#:delay-logger} if not specified.
@item #:duration-logger
Called as @code{(duration-logger seconds)} after PROC completes
(whether or not it raised an exception). Defaults to the pool's
@code{#:duration-logger} if not specified.
@item #:channel
Override the channel used to communicate with the thread.
@end table"
"Send PROC to the thread pool through CHANNEL. Return the result of PROC.
If already in the thread pool, call PROC immediately."
(define (handle-proc fixed-size-thread-pool
reply-channel
start-time
timeout
delay-logger)
timeout)
(let* ((request-channel
(or channel
(fixed-size-thread-pool-channel
@ -609,6 +447,7 @@ Override the channel used to communicate with the thread.
(wrap-operation
(put-operation request-channel
(list reply-channel
start-time
proc))
(const #t))))
@ -623,11 +462,6 @@ Override the channel used to communicate with the thread.
(raise-exception
(make-thread-pool-timeout-error)))
(when delay-logger
(delay-logger
(/ (- (get-internal-real-time) start-time)
internal-time-units-per-second)))
(let ((reply (get-message reply-channel)))
(match reply
(('thread-pool-error duration exn)
@ -648,8 +482,7 @@ Override the channel used to communicate with the thread.
(handle-proc thread-pool
reply-channel
start-time
checkout-timeout
delay-logger)
checkout-timeout)
(with-exception-handler
(lambda (exn)
(if (and (resource-pool-timeout-error? exn)
@ -670,30 +503,22 @@ Override the channel used to communicate with the thread.
(handle-proc fixed-size-thread-pool
reply-channel
start-time
remaining-time
#f)
remaining-time)
(raise-exception
(make-thread-pool-timeout-error thread-pool))))
(handle-proc fixed-size-thread-pool
reply-channel
start-time
#f
#f)))
#:delay-logger delay-logger
#:duration-logger #f
#:max-waiters max-waiters
#:timeout checkout-timeout
#:destroy-resource-on-exception?
destroy-thread-on-exception?))))))))
(define (destroy-thread-pool pool)
"Destroy POOL, stopping all of its threads and calling the destructor
if specified. This procedure will block until the destruction is
complete."
(if (fixed-size-thread-pool? pool)
(let ((channel (fixed-size-thread-pool-channel pool))
(threads (fixed-size-thread-pool-threads pool)))
(for-each (lambda _ (put-message channel 'destroy)) threads)
(for-each join-thread threads))
(put-message
(fixed-size-thread-pool-channel pool)
'destroy)
(destroy-resource-pool
(thread-pool-resource-pool pool))))

View file

@ -45,16 +45,7 @@
with-port-timeouts))
(define* (with-fibers-timeout thunk #:key
timeout
(on-timeout
(const *unspecified*)))
"Run THUNK in a new fiber and return its values, waiting TIMEOUT
seconds for it to finish. If THUNK does not complete within TIMEOUT
seconds, the ON-TIMEOUT procedure is called and with-fibers-timeout
returns the result of ON-TIMEOUT instead.
If THUNK raises an exception it is re-raised in the calling fiber."
(define* (with-fibers-timeout thunk #:key timeout on-timeout)
(let ((channel (make-channel)))
(spawn-fiber
(lambda ()
@ -94,9 +85,7 @@ If THUNK raises an exception it is re-raised in the calling fiber."
(record-constructor &port-timeout-error))
(define port-timeout-error?
(exception-predicate &port-timeout-error))
(set-procedure-property! port-timeout-error? 'documentation
"Return @code{#t} if OBJ is a @code{&port-timeout-error}.")
(record-predicate &port-timeout-error))
(define &port-read-timeout-error
(make-exception-type '&port-read-timeout-error
@ -107,9 +96,7 @@ If THUNK raises an exception it is re-raised in the calling fiber."
(record-constructor &port-read-timeout-error))
(define port-read-timeout-error?
(exception-predicate &port-read-timeout-error))
(set-procedure-property! port-read-timeout-error? 'documentation
"Return @code{#t} if OBJ is a @code{&port-read-timeout-error}.")
(record-predicate &port-read-timeout-error))
(define &port-write-timeout-error
(make-exception-type '&port-write-timeout-error
@ -120,12 +107,10 @@ If THUNK raises an exception it is re-raised in the calling fiber."
(record-constructor &port-write-timeout-error))
(define port-write-timeout-error?
(exception-predicate &port-write-timeout-error))
(set-procedure-property! port-write-timeout-error? 'documentation
"Return @code{#t} if OBJ is a @code{&port-write-timeout-error}.")
(record-predicate &port-write-timeout-error))
(define (readable? port)
"Test if PORT is readable."
"Test if PORT is writable."
(= 1 (port-poll port "r" 0)))
(define (writable? port)
@ -166,21 +151,6 @@ If THUNK raises an exception it is re-raised in the calling fiber."
#:key timeout
(read-timeout timeout)
(write-timeout timeout))
"Run THUNK with per-operation I/O timeouts on all ports. If any
read or write blocks for longer than the given number of seconds, an
exception is raised.
@code{#:timeout} sets both read and write timeouts.
@code{#:read-timeout} and @code{#:write-timeout} specify the timeout
for reads and writes respectively. All three default to @code{#f} (no
timeout).
This procedure works both with fibers, and without fibers by using the
poll system call with a timeout.
On read timeout, raises @code{&port-read-timeout-error}. On write
timeout, raises @code{&port-write-timeout-error}. Both carry the
@code{thunk} and @code{port} fields from @code{&port-timeout-error}."
(define (no-fibers-wait thunk port mode timeout)
(define poll-timeout-ms 200)

View file

@ -63,14 +63,6 @@
(bind sock family addr port)
sock))
(define crlf-bv
(string->utf8 "\r\n"))
(define (chunked-output-port-overhead-bytes write-size)
(+ (string-length (number->string write-size 16))
(bytevector-length crlf-bv)
(bytevector-length crlf-bv)))
(define* (make-chunked-output-port/knots port #:key (keep-alive? #f)
(buffering 1200))
"Returns a new port which translates non-encoded data into a HTTP
@ -82,12 +74,10 @@ when done, as it will output the remaining data, and encode the final
zero chunk. When the port is closed it will also close PORT, unless
KEEP-ALIVE? is true."
(define (write! bv start count)
(let ((len-string
(number->string count 16)))
(put-string port len-string))
(put-bytevector port crlf-bv 0 2)
(put-string port (number->string count 16))
(put-string port "\r\n")
(put-bytevector port bv start count)
(put-bytevector port crlf-bv 0 2)
(put-string port "\r\n")
(force-output port)
count)
@ -140,30 +130,24 @@ closes PORT, unless KEEP-ALIVE? is true."
(record-constructor &request-body-ended-prematurely))
(define request-body-ended-prematurely-error?
(exception-predicate &request-body-ended-prematurely))
(set-procedure-property! request-body-ended-prematurely-error? 'documentation
"Return @code{#t} if OBJ is a @code{&request-body-ended-prematurely} exception.")
(record-predicate &request-body-ended-prematurely))
(define (request-body-port/knots request)
"Return an input port for reading the body of request REQUEST.
Handles chunked transfer encoding."
(define (request-body-port/knots r)
(cond
((member '(chunked) (request-transfer-encoding request))
(make-chunked-input-port (request-port request)
((member '(chunked) (request-transfer-encoding r))
(make-chunked-input-port (request-port r)
#:keep-alive? #t))
(else
(let ((content-length
(request-content-length request)))
(request-content-length r)))
(make-delimited-input-port
(request-port request)
(request-port r)
content-length
(lambda (bytes-read)
(raise-exception
(make-request-body-ended-prematurely-error bytes-read))))))))
(define (read-request-body/knots r)
"Read and return the full body of request R as a bytevector.
Handles chunked transfer encoding."
(cond
((member '(chunked) (request-transfer-encoding r))
(get-bytevector-all
@ -234,6 +218,8 @@ on the procedure being called at any particular time."
(adapt-response-version response
(request-version request))
body))
((not body)
(values response #vu8()))
((string? body)
(let* ((type (response-content-type response
'(text/plain)))
@ -247,15 +233,16 @@ on the procedure being called at any particular time."
`(,@type (charset . ,charset))))
(string->bytevector body charset))))
((not (or (bytevector? body)
(procedure? body)
(eq? #f body)))
(procedure? body)))
(raise-exception
(make-exception-with-irritants
(list (make-exception-with-message
"unexpected body type")
body))))
((and (response-must-not-include-body? response)
body)
body
;; FIXME make this stricter: even an empty body should be prohibited.
(not (zero? (bytevector-length body))))
(raise-exception
(make-exception-with-irritants
(list (make-exception-with-message
@ -265,24 +252,25 @@ on the procedure being called at any particular time."
;; check length; assert type; add other required fields?
(values (response-maybe-add-connection-header-value
request
(cond
((procedure? body)
(if (response-content-length response)
response
(extend-response response
'transfer-encoding
'((chunked)))))
((bytevector? body)
(let ((rlen (response-content-length response))
(blen (bytevector-length body)))
(cond
(rlen (if (= rlen blen)
response
(error "bad content-length" rlen blen)))
(else (extend-response response 'content-length blen)))))
(else response)))
(if (procedure? body)
(if (response-content-length response)
response
(extend-response response
'transfer-encoding
'((chunked))))
(let ((rlen (response-content-length response))
(blen (bytevector-length body)))
(cond
(rlen (if (= rlen blen)
response
(error "bad content-length" rlen blen)))
(else (extend-response response 'content-length blen))))))
(if (eq? (request-method request) 'HEAD)
#f
(raise-exception
(make-exception-with-irritants
(list (make-exception-with-message
"unexpected body type")
body)))
body)))))
(define (with-stack-and-prompt thunk)
@ -295,7 +283,7 @@ on the procedure being called at any particular time."
(not (memq 'close (response-connection response))))
(define (default-read-request-exception-handler exn)
(display/knots "While reading request:\n" (current-error-port))
(display "While reading request:\n" (current-error-port))
(print-exception
(current-error-port)
#f
@ -305,17 +293,15 @@ on the procedure being called at any particular time."
#f)
(define (default-write-response-exception-handler exn request)
"Default handler for exceptions raised while writing an HTTP response.
Logs the error for REQUEST to the current error port."
(if (and (exception-with-origin? exn)
(string=? (exception-origin exn)
"fport_write"))
(simple-format/knots
(simple-format
(current-error-port)
"~A ~A: error replying to client\n"
(request-method request)
(uri-path (request-uri request)))
(simple-format/knots
(simple-format
(current-error-port)
"knots web server: ~A ~A: exception replying to client: ~A\n"
(request-method request)
@ -325,22 +311,35 @@ Logs the error for REQUEST to the current error port."
;; Close the client port
#f)
(define* (handle-request handler client sockaddr
read-request-exception-handler
write-response-exception-handler
buffer-size
#:key post-request-hook)
(define meta
`((sockaddr . ,sockaddr)))
(define (exception-handler exn request)
(let* ((error-string
(call-with-output-string
(lambda (port)
(simple-format
port
"exception when processing: ~A ~A\n"
(request-method request)
(uri-path (request-uri request)))
(print-backtrace-and-exception/knots
exn
#:port port)))))
(display error-string
(current-error-port)))
(values (build-response #:code 500)
;; TODO Make this configurable
(string->utf8
"internal server error")))
(define (handle-request handler client
read-request-exception-handler
write-response-exception-handler)
(let ((request
(with-exception-handler
read-request-exception-handler
(lambda ()
(read-request client meta))
#:unwind? #t))
(read-request-time
(get-internal-real-time)))
(read-request client))
#:unwind? #t)))
(let ((response
body
(cond
@ -353,107 +352,77 @@ Logs the error for REQUEST to the current error port."
(connection . (close))))
#vu8()))
(else
(with-exception-handler
(lambda (exn)
(sanitize-response
request
(build-response #:code 500)
(string->utf8
"internal server error")))
(lambda ()
(with-exception-handler
(lambda (exn)
(let* ((error-string
(call-with-output-string
(lambda (port)
(simple-format
port
"exception when processing: ~A ~A\n"
(request-method request)
(uri-path (request-uri request)))
(print-backtrace-and-exception/knots
exn
#:port port)))))
(display/knots error-string
(current-error-port))))
(lambda ()
(start-stack
#t
(call-with-escape-continuation
(lambda (return)
(with-exception-handler
(lambda (exn)
(call-with-values
(lambda ()
(handler request))
(match-lambda*
((response body)
(sanitize-response request response body))
(other
(raise-exception
(make-exception-with-irritants
(list (make-exception-with-message
(simple-format
#f
"wrong number of values returned from handler, expecting 2, got ~A"
(length other)))
handler))))))))))
#:unwind? #t)))))
(exception-handler exn request))
(lambda (response body)
(call-with-values
(lambda ()
(sanitize-response request response body))
return))))
(lambda ()
(start-stack
#t
(call-with-values
(lambda ()
(handler request))
(match-lambda*
((response body)
(sanitize-response request response body))
(other
(raise-exception
(make-exception-with-irritants
(list (make-exception-with-message
(simple-format
#f
"wrong number of values returned from handler, expecting 2, got ~A"
(length other)))
handler)))))))))))))))
(with-exception-handler
(lambda (exn)
(write-response-exception-handler exn request))
(lambda ()
(write-response response client)
(let ((response-start-time
(get-internal-real-time))
(body-written?
(cond
((and (procedure? body)
(not
(eq? (request-method request)
'HEAD)))
(let* ((type (response-content-type response
'(text/plain)))
(declared-charset (assq-ref (cdr type) 'charset))
(charset (or declared-charset "ISO-8859-1"))
(body-port
(if (response-content-length response)
client
(make-chunked-output-port/knots
client
#:keep-alive? #t
#:buffering
(- buffer-size
(chunked-output-port-overhead-bytes
buffer-size))))))
(set-port-encoding! body-port charset)
(let ((body-written?
(with-exception-handler
(lambda (exn)
#f)
(lambda ()
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(raise-exception exn))
(lambda ()
(body body-port)))
#t)
#:unwind? #t)))
(unless (response-content-length response)
(close-port body-port))
body-written?)))
((bytevector? body)
(put-bytevector client body)
#t)
(else
;; No body to write
#t))))
(let ((body-written?
(if (procedure? body)
(let* ((type (response-content-type response
'(text/plain)))
(declared-charset (assq-ref (cdr type) 'charset))
(charset (or declared-charset "ISO-8859-1"))
(body-port
(if (response-content-length response)
client
(make-chunked-output-port/knots
client
#:keep-alive? #t))))
(set-port-encoding! body-port charset)
(let ((body-written?
(with-exception-handler
(lambda (exn)
#f)
(lambda ()
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(raise-exception exn))
(lambda ()
(body body-port)))
#t)
#:unwind? #t)))
(unless (response-content-length response)
(close-port body-port))
body-written?))
(begin
(put-bytevector client body)
#t))))
(if body-written?
(begin
(force-output client)
(when post-request-hook
(post-request-hook request
#:read-request-time read-request-time
#:response-start-time response-start-time
#:response-end-time (get-internal-real-time)))
(when (and (procedure? body)
(response-content-length response))
(set-port-encoding! client "ISO-8859-1"))
@ -461,12 +430,11 @@ Logs the error for REQUEST to the current error port."
#f)))
#:unwind? #t))))
(define* (client-loop client handler sockaddr
(define* (client-loop client handler
read-request-exception-handler
write-response-exception-handler
connection-idle-timeout
buffer-size
post-request-hook)
buffer-size)
;; Always disable Nagle's algorithm, as we handle buffering
;; ourselves; when we force-output, we really want the data to go
;; out.
@ -479,17 +447,13 @@ Logs the error for REQUEST to the current error port."
(unless (and (exception-with-origin? exn)
(string=? (exception-origin exn)
"fport_read"))
(display/knots "knots web-server, exception in client loop:\n"
(current-error-port))
(display/knots
(call-with-output-string
(lambda (port)
(print-exception
port
#f
'%exception
(list exn))))
(current-error-port)))
(display "knots web-server, exception in client loop:\n"
(current-error-port))
(print-exception
(current-error-port)
#f
'%exception
(list exn)))
#t)
(lambda ()
(or
@ -506,48 +470,18 @@ Logs the error for REQUEST to the current error port."
#:unwind? #t)
(close-port client))
(else
(let ((keep-alive? (handle-request handler client sockaddr
(let ((keep-alive? (handle-request handler client
read-request-exception-handler
write-response-exception-handler
buffer-size
#:post-request-hook
post-request-hook)))
write-response-exception-handler)))
(if keep-alive?
(loop)
(close-port client)))))))
(define (post-request-hook/safe post-request-hook)
(if post-request-hook
(lambda args
(with-exception-handler
(lambda (exn) #f)
(lambda ()
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(raise-exception exn))
(lambda ()
(apply post-request-hook args))))
#:unwind? #t))
#f))
(define-record-type <web-server>
(make-web-server socket port)
web-server?
(socket web-server-socket)
(port web-server-port))
(set-procedure-property!
(macro-transformer (module-ref (current-module) 'web-server?))
'documentation
"Return @code{#t} if OBJ is a @code{<web-server>}.")
(set-procedure-property!
(macro-transformer (module-ref (current-module) 'web-server-socket))
'documentation
"Return the socket of the web server.")
(set-procedure-property!
(macro-transformer (module-ref (current-module) 'web-server-port))
'documentation
"Return the port number of the web server.")
(define* (run-knots-web-server handler #:key
(host #f)
@ -562,8 +496,7 @@ Logs the error for REQUEST to the current error port."
(write-response-exception-handler
default-write-response-exception-handler)
(connection-idle-timeout #f)
(connection-buffer-size 1024)
post-request-hook)
(connection-buffer-size 1024))
"Run the knots web server.
HANDLER should be a procedure that takes one argument, the HTTP
@ -591,28 +524,17 @@ before sending back to the client."
(spawn-fiber
(lambda ()
(while #t
(with-exception-handler
(const #t)
(lambda ()
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn))
(lambda ()
(let loop ()
(match (accept socket (logior SOCK_NONBLOCK SOCK_CLOEXEC))
((client . sockaddr)
(spawn-fiber (lambda ()
(client-loop client handler sockaddr
read-request-exception-handler
write-response-exception-handler
connection-idle-timeout
connection-buffer-size
(post-request-hook/safe
post-request-hook)))
#:parallel? #t)
(loop)))))))
#:unwind? #t))))
(let loop ()
(match (accept socket (logior SOCK_NONBLOCK SOCK_CLOEXEC))
((client . sockaddr)
(spawn-fiber (lambda ()
(client-loop client handler
read-request-exception-handler
write-response-exception-handler
connection-idle-timeout
connection-buffer-size))
#:parallel? #t)
(loop))))))
(make-web-server socket
(vector-ref (getsockname socket)

View file

@ -1,204 +0,0 @@
;;; Guile Knots
;;; Copyright © 2026 Christopher Baines <mail@cbaines.net>
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (knots web)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-71)
#:use-module (ice-9 match)
#:use-module (ice-9 exceptions)
#:use-module (web uri)
#:use-module (web request)
#:use-module (web response)
#:use-module (knots)
#:use-module (knots non-blocking)
#:use-module (knots resource-pool)
#:export (make-connection-cache
call-with-connection-cache
call-with-cached-connection
http-fold-requests))
(define* (make-connection-cache uri
max-cached-connections
#:key (verify-certificate? #t))
"Create a resource pool of up to MAX-CACHED-CONNECTIONS
to URI."
(make-resource-pool
(lambda ()
;; Open the socket in a temporary thread so that the blocking
;; connection attempt does not stall the fiber scheduler.
(call-with-temporary-thread
(lambda ()
(non-blocking-open-socket-for-uri
uri
#:verify-certificate? verify-certificate?))))
max-cached-connections
#:destructor close-port))
(define* (call-with-connection-cache uri
max-cached-connections
proc
#:key (verify-certificate? #t))
"Create a connection cache for URI with up to MAX-CACHED-CONNECTIONS,
call @code{(proc cache)}, then destroy the cache and return
the values returned by PROC."
(let ((cache (make-connection-cache
uri
max-cached-connections
#:verify-certificate? verify-certificate?)))
(call-with-values
(lambda ()
(proc cache))
(lambda vals
(destroy-resource-pool cache)
(apply values vals)))))
(define* (call-with-cached-connection
cache proc
#:key (close-connection-on-exception? #t))
"Check out a connection port from CACHE and call @code{(proc port)},
returning the result. The port is returned to the cache when PROC
returns, or closed on exception if CLOSE-CONNECTION-ON-EXCEPTION? is
true (the default)."
(with-exception-handler
(lambda (exn)
(if (resource-pool-destroy-resource-exception? exn)
(call-with-cached-connection
cache
proc
#:close-connection-on-exception?
close-connection-on-exception?)
(raise-exception exn)))
(lambda ()
(with-exception-handler
(lambda (exn)
(let ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t)))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))
(lambda ()
(call-with-resource-from-pool cache
(lambda (port)
(when (port-closed? port)
(raise-exception
(make-resource-pool-destroy-resource-exception)))
(proc port))
#:destroy-resource-on-exception? close-connection-on-exception?))))
#:unwind? #t))
(define* (http-fold-requests connection-cache proc seed requests
#:key
(batch-size 1000))
"Fold PROC over HTTP request/response pairs using CONNECTION-CACHE
for connections. PROC is called as
@code{(proc request response body-port accumulator)} and its return
value becomes the new accumulator. Requests are sent in batches of
up to BATCH-SIZE before responses are read (HTTP pipelining).
When the server closes the connection mid-batch the remaining requests
are retried on a fresh connection from the cache."
(define &send-error
(make-exception-type '&send-error &exception '()))
(define make-send-error
(record-constructor &send-error))
(define send-error?
(exception-predicate &send-error))
(define (read-responses port batch result)
(let loop ((request (car batch))
(remaining-requests (cdr batch))
(result result))
(let ((response
(with-exception-handler
(lambda (exn)
(close-port port)
#f)
(lambda ()
(read-response port))
#:unwind? #t)))
(if (not response)
(values (cons request remaining-requests) result)
(let* ((body (response-body-port response))
(new-result (proc request response body result)))
(if (memq 'close (response-connection response))
(begin
(close-port port)
(values remaining-requests new-result))
(if (null? remaining-requests)
(values '() new-result)
(loop (car remaining-requests)
(cdr remaining-requests)
new-result))))))))
;; Send up to BATCH-SIZE requests then hand off to read-responses.
;; If writing fails the connection has dropped; raise &send-error so the
;; outer loop retries all remaining requests on a fresh connection.
(define (send-batch port batch)
(with-exception-handler
(lambda (exn)
(close-port port)
(raise-exception (make-send-error)))
(lambda ()
(for-each (lambda (req)
(write-request req port))
batch)
(force-output port))
#:unwind? #t))
(let loop ((remaining-requests requests)
(result seed))
(if (null? remaining-requests)
result
(let ((next-remaining-requests
next-result
(with-exception-handler
(lambda (exn)
(if (or (send-error? exn)
(resource-pool-destroy-resource-exception? exn))
(values remaining-requests result)
(raise-exception exn)))
(lambda ()
(call-with-resource-from-pool connection-cache
(lambda (port)
(if (port-closed? port)
(raise-exception
(make-resource-pool-destroy-resource-exception))
(let ((batch
pending
(split-at
remaining-requests
(min batch-size (length
remaining-requests)))))
(send-batch port batch)
(let ((remaining-requests
next-result
(read-responses port batch result)))
(values (append remaining-requests pending)
next-result)))))
#:destroy-resource-on-exception? #t))
#:unwind? #t)))
(loop next-remaining-requests next-result)))))

View file

@ -1,11 +1,10 @@
(define-module (tests)
#:use-module (ice-9 exceptions)
#:use-module (fibers)
#:use-module (knots)
#:export (run-fibers-for-tests
assert-no-heap-growth))
(define* (run-fibers-for-tests thunk #:key (drain? #t))
(define (run-fibers-for-tests thunk)
(let ((result
(run-fibers
(lambda ()
@ -13,18 +12,15 @@
(lambda (exn)
exn)
(lambda ()
(simple-format #t "running ~A\n" thunk)
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(backtrace)
(raise-exception exn))
(lambda ()
(start-stack #t (thunk))))
thunk)
#t)
#:unwind? #t))
#:hz 0
#:parallelism 1
#:drain? drain?)))
#:parallelism 1)))
(if (exception? result)
(raise-exception result)
result)))

View file

@ -1,318 +0,0 @@
(use-modules (srfi srfi-1)
(srfi srfi-13)
(ice-9 popen)
(ice-9 rdelim)
(ice-9 match))
(define (run-backtrace-script file)
(let* ((pipe (open-pipe (string-append "./test-env guile " file " 2>&1")
OPEN_READ))
(output (read-string pipe)))
(close-pipe pipe)
output))
(define (read-backtrace-entry-annotation script keyword)
;; Scan SCRIPT line by line and return the annotation for the expected
;; backtrace entry matching KEYWORD (e.g. "FIRST" or "LAST"), or #f if
;; none is found.
;;
;; Two forms are recognised:
;;
;; ; KEYWORD BACKTRACE ENTRY HERE
;; — placed inline on a code line. Returns ('here LINE COL) where LINE
;; is the 1-based line number and COL is the 0-based column of the
;; first non-space character on that line.
;;
;; ; KEYWORD BACKTRACE ENTRY: STRING
;; — STRING is a literal substring expected to appear in the output.
;; Returns ('string STRING).
(let ((here-marker (string-append keyword " BACKTRACE ENTRY HERE"))
(string-marker (string-append keyword " BACKTRACE ENTRY: ")))
(call-with-input-file script
(lambda (port)
(let loop ((line (read-line port)) (line-num 1))
(cond
((eof-object? line) #f)
((string-contains line here-marker)
(let ((col (string-index line (lambda (c) (not (char=? c #\space))))))
(list 'here line-num col)))
((string-contains line string-marker)
(let* ((idx (+ (string-contains line string-marker)
(string-length string-marker)))
(content (string-trim-right (substring line idx))))
(list 'string content)))
(else (loop (read-line port) (+ line-num 1)))))))))
(define (frame-line? line)
;; Return #t if LINE looks like a backtrace frame line: leading whitespace
;; followed by digits:digits (LINE:COL).
(and (> (string-length line) 0)
(let* ((stripped (string-trim line))
(colon (string-index stripped #\:)))
(and colon
(> colon 0)
(string-every char-set:digit stripped 0 colon)))))
(define (extract-frame-lines output)
;; Return all backtrace frame lines before "ERROR:" in OUTPUT.
(let* ((error-pos (string-contains output "\nERROR:"))
(before-error (if error-pos
(substring output 0 error-pos)
output)))
(filter frame-line? (string-split before-error #\newline))))
(define (innermost-frame-line output)
;; Return the last backtrace frame line before "ERROR:" in OUTPUT, or #f.
(let ((frame-lines (extract-frame-lines output)))
(if (null? frame-lines) #f (last frame-lines))))
(define (outermost-frame-line output)
;; Return the first backtrace frame line before "ERROR:" in OUTPUT, or #f.
(let ((frame-lines (extract-frame-lines output)))
(if (null? frame-lines) #f (car frame-lines))))
;;; Assertions
(define current-test-fail-count 0)
(define (expect! label ok? detail)
;; Print one expectation line; record a failure if not ok.
(if ok?
(format #t " PASS ~a~%" label)
(begin
(set! current-test-fail-count (+ current-test-fail-count 1))
(format #t " FAIL ~a~% ~a~%" label detail))))
(define (assert-output-contains output expected)
(expect! (format #f "output contains ~s" expected)
(string-contains output expected)
"not found in output"))
(define (assert-output-excludes output unexpected)
(expect! (format #f "output excludes ~s" unexpected)
(not (string-contains output unexpected))
"unexpectedly found in output"))
(define (assert-backtrace-entry output script keyword frame-line-proc)
(let ((annotation (read-backtrace-entry-annotation script keyword))
(frame (frame-line-proc output)))
(when annotation
(match annotation
(('here line col)
(let ((expected (string-append (number->string line) ":"
(number->string col))))
(expect! (format #f "~a backtrace entry ~a" keyword expected)
(and frame (string-contains frame expected))
(format #f "got ~s" (or frame "(none)")))))
(('string content)
(expect! (format #f "~a backtrace entry ~s" keyword content)
(string-contains output content)
"not found in output"))))))
(define (assert-first-backtrace-entry output script)
(assert-backtrace-entry output script "FIRST" outermost-frame-line))
(define (assert-last-backtrace-entry output script)
(assert-backtrace-entry output script "LAST" innermost-frame-line))
;;; Test runner
(define pass-count 0)
(define fail-count 0)
(define (run-test name thunk)
(set! current-test-fail-count 0)
(format #t "~%~a~%" name)
(catch #t
thunk
(lambda (key . args)
(set! current-test-fail-count (+ current-test-fail-count 1))
(format #t " ERROR unexpected exception: ~s~%" (cons key args))))
(if (zero? current-test-fail-count)
(set! pass-count (+ pass-count 1))
(set! fail-count (+ fail-count 1))))
;;; Tests
(run-test "plain-exception"
(lambda ()
(let* ((script "tests/backtraces/plain-exception.scm")
(output (run-backtrace-script script)))
(assert-first-backtrace-entry output script)
(assert-last-backtrace-entry output script)
(assert-output-contains output
"ERROR:\n 1. &error\n 2. &origin: #f\n 3. &message: \"plain error message\""))))
(run-test "triple-with-exception-handler"
(lambda ()
(let* ((script "tests/backtraces/triple-with-exception-handler.scm")
(output (run-backtrace-script script)))
(assert-first-backtrace-entry output script)
(assert-last-backtrace-entry output script)
(assert-output-contains output
"ERROR:\n 1. &error\n 2. &origin: #f\n 3. &message: \"plain error message\""))))
(run-test "wrapped-exception"
(lambda ()
(let* ((script "tests/backtraces/wrapped-exception.scm")
(output (run-backtrace-script script)))
(assert-first-backtrace-entry output script)
(assert-last-backtrace-entry output script)
(assert-output-contains output
"ERROR:\n 1. &error\n 2. &origin: #f\n 3. &message: \"wrapped error message\""))))
(run-test "temporary-thread"
(lambda ()
(let* ((script "tests/backtraces/temporary-thread.scm")
(output (run-backtrace-script script)))
(assert-first-backtrace-entry output script)
(assert-last-backtrace-entry output script)
(assert-output-contains output
"ERROR:\n 1. &error\n 2. &origin: #f\n 3. &message: \"error from temporary thread\""))))
(run-test "fibers-map"
(lambda ()
(let* ((script "tests/backtraces/fibers-map.scm")
(output (run-backtrace-script script)))
(assert-first-backtrace-entry output script)
(assert-last-backtrace-entry output script)
(assert-output-contains output
"ERROR:\n 1. &error\n 2. &origin: #f\n 3. &message: \"error from fibers-map\""))))
(run-test "call-with-resource-from-pool"
(lambda ()
(let* ((script "tests/backtraces/call-with-resource-from-pool.scm")
(output (run-backtrace-script script)))
(assert-first-backtrace-entry output script)
(assert-last-backtrace-entry output script)
(assert-output-contains output
"ERROR:\n 1. &error\n 2. &origin: #f\n 3. &message: \"error from call-with-resource-from-pool\""))))
;; Two knots stacks are printed (one per fiber boundary); ERROR: appears
;; once at the end after both frame blocks.
(run-test "call-with-cached-connection"
(lambda ()
(let* ((script "tests/backtraces/call-with-cached-connection.scm")
(output (run-backtrace-script script)))
(assert-first-backtrace-entry output script)
(assert-last-backtrace-entry output script)
(assert-output-contains output
"ERROR:\n 1. &error\n 2. &origin: #f\n 3. &message: \"error from call-with-cached-connection\""))))
(run-test "fibers-force"
(lambda ()
(let* ((script "tests/backtraces/fibers-force.scm")
(output (run-backtrace-script script)))
(assert-first-backtrace-entry output script)
(assert-last-backtrace-entry output script)
(assert-output-contains output
"ERROR:\n 1. &error\n 2. &origin: #f\n 3. &message: \"error from fibers-force\""))))
(run-test "call-with-thread"
(lambda ()
(let* ((script "tests/backtraces/call-with-thread.scm")
(output (run-backtrace-script script)))
(assert-first-backtrace-entry output script)
(assert-last-backtrace-entry output script)
(assert-output-contains output
"ERROR:\n 1. &error\n 2. &origin: #f\n 3. &message: \"error from call-with-thread\""))))
;; Nested fibers-map: user frames that survive fiber boundaries appear;
;; intermediate functions (one-deep, two-deep, three-deep) are lost at
;; their respective boundaries because fibers-map yields before the
;; exception propagates back. knots/parallelism.scm and srfi frames
;; appear as call-path context between the surviving user frames.
(run-test "nested-parallelism"
(lambda ()
(let* ((script "tests/backtraces/nested-parallelism.scm")
(output (run-backtrace-script script)))
(assert-first-backtrace-entry output script)
(assert-last-backtrace-entry output script)
(assert-output-contains output
"ERROR:\n 1. &error\n 2. &origin: #f\n 3. &message: \"deeply nested error ~S\"")
(assert-output-contains output "(run-work)")
(assert-output-contains output "(process-batch _)")
(assert-output-contains output "(deeply-nested _)")
(assert-output-excludes output "In fibers"))))
(run-test "guile-error-in-thread"
(lambda ()
(let* ((script "tests/backtraces/guile-error-in-thread.scm")
(output (run-backtrace-script script)))
(assert-first-backtrace-entry output script)
(assert-last-backtrace-entry output script)
(assert-output-contains output
"ERROR:\n 1. &assertion-failure\n 2. &origin: \"+\"\n 3. &message: \"Wrong type argument in position ~A: ~S\"\n 4. &irritants: (1 a)"))))
;; sort is a C function and appears as "In unknown file:" between the user frames.
(run-test "guile-error-deep-in-thread"
(lambda ()
(let* ((script "tests/backtraces/guile-error-deep-in-thread.scm")
(output (run-backtrace-script script)))
(assert-first-backtrace-entry output script)
(assert-last-backtrace-entry output script)
(assert-output-contains output
"ERROR:\n 1. &assertion-failure\n 2. &origin: \"+\"")
(assert-output-contains output "(do-sort)")
(assert-output-contains output "In unknown file:")
(assert-output-contains output "(sort (1 2 3)")
(assert-output-excludes output "In knots/")
(assert-output-excludes output "In srfi/"))))
;; The error fires inside ice-9/vlist.scm (vlist-fold passed a non-vlist),
;; so vlist-fold appears as the innermost frame and ice-9/vlist.scm frames
;; appear between the user frames.
(run-test "vhash-fold"
(lambda ()
(let* ((script "tests/backtraces/vhash-fold.scm")
(output (run-backtrace-script script)))
(assert-first-backtrace-entry output script)
(assert-last-backtrace-entry output script)
(assert-output-contains output
"ERROR:\n 1. &assertion-failure\n 2. &origin: #f")
(assert-output-contains output "(do-fold)")
(assert-output-contains output "In ice-9/vlist.scm:")
(assert-output-contains output "(vlist-fold"))))
;; do-fold calls vhash-fold in non-tail position so its frame is preserved.
;; ice-9/vlist.scm frames appear between the user frames, as in vhash-fold.
(run-test "vhash-fold-in-thread"
(lambda ()
(let* ((script "tests/backtraces/vhash-fold-in-thread.scm")
(output (run-backtrace-script script)))
(assert-first-backtrace-entry output script)
(assert-last-backtrace-entry output script)
(assert-output-contains output
"ERROR:\n 1. &assertion-failure\n 2. &origin: #f")
(assert-output-contains output "(do-fold)")
(assert-output-contains output "In ice-9/vlist.scm:")
(assert-output-contains output "(vlist-fold")
(assert-output-excludes output "In knots/"))))
(run-test "stack-situation-script"
(lambda ()
(let* ((script "tests/backtraces/stack-situation-script.scm")
(output (run-backtrace-script script)))
(assert-output-contains output "situation: script"))))
(run-test "stack-situation-fibers"
(lambda ()
(let* ((script "tests/backtraces/stack-situation-fibers.scm")
(output (run-backtrace-script script)))
(assert-output-contains output "situation: run-fibers"))))
(run-test "stack-situation-unknown"
(lambda ()
(let* ((script "tests/backtraces/stack-situation-unknown.scm")
(output (run-backtrace-script script)))
(assert-output-contains output "situation: unknown"))))
;;; Summary
(newline)
(if (zero? fail-count)
(format #t "All ~a scripts passed.~%" pass-count)
(format #t "~a of ~a scripts had failures.~%" fail-count (+ pass-count fail-count)))
(when (> fail-count 0)
(primitive-exit 1))

View file

@ -1,18 +0,0 @@
(use-modules (knots) (fibers) (knots resource-pool) (knots web))
(run-fibers
(lambda ()
(let ((cache (make-fixed-size-resource-pool
(list (open-input-string "fake")))))
;; FIRST BACKTRACE ENTRY: (with-exception-handler
(with-exception-handler
(lambda (e)
(print-backtrace-and-exception/knots e)
(primitive-exit 1))
(lambda ()
(call-with-cached-connection cache
(lambda (port)
(error "error from call-with-cached-connection")) ; LAST BACKTRACE ENTRY HERE
#:close-connection-on-exception? #f)))))
#:hz 0 #:parallelism 1)

View file

@ -1,16 +0,0 @@
(use-modules (knots) (fibers) (knots resource-pool))
(run-fibers
(lambda ()
(let ((pool (make-resource-pool (const 'resource) 1)))
;; FIRST BACKTRACE ENTRY: (with-exception-handler
(with-exception-handler
(lambda (e)
(print-backtrace-and-exception/knots e)
(primitive-exit 1))
(lambda ()
(call-with-resource-from-pool pool
(lambda (resource)
(error "error from call-with-resource-from-pool"))))))) ; LAST BACKTRACE ENTRY HERE
#:hz 0 #:parallelism 1)

View file

@ -1,14 +0,0 @@
(use-modules (knots) (knots thread-pool))
(define thread-pool (make-fixed-size-thread-pool 1))
;; FIRST BACKTRACE ENTRY: (with-exception-handler
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(primitive-exit 1))
(lambda ()
(call-with-thread
thread-pool
(lambda ()
(error "error from call-with-thread"))))) ; LAST BACKTRACE ENTRY HERE

View file

@ -1,15 +0,0 @@
(use-modules (knots) (fibers) (knots promise))
(run-fibers
(lambda ()
;; FIRST BACKTRACE ENTRY: (with-exception-handler
(with-exception-handler
(lambda (e)
(print-backtrace-and-exception/knots e)
(primitive-exit 1))
(lambda ()
(fibers-force
(fibers-delay
(lambda ()
(error "error from fibers-force"))))))) ; LAST BACKTRACE ENTRY HERE
#:hz 0 #:parallelism 1)

View file

@ -1,20 +0,0 @@
(use-modules (knots) (fibers) (knots parallelism))
(run-fibers
(lambda ()
(with-exception-handler
(lambda _
;; To avoid the test hanging if there's an exception
(primitive-exit 1))
(lambda ()
;; FIRST BACKTRACE ENTRY: (with-exception-handler
(with-exception-handler
(lambda (e)
(print-backtrace-and-exception/knots e)
(primitive-exit 1))
(lambda ()
(fibers-map
(lambda (x)
(error "error from fibers-map")) ; LAST BACKTRACE ENTRY HERE
'(1)))))))
#:hz 0 #:parallelism 1)

View file

@ -1,19 +0,0 @@
(use-modules (knots))
(define (do-sort)
(begin
(sort '(1 2 3)
(lambda _
(+ 1 'a))) ; LAST BACKTRACE ENTRY HERE
'unreachable))
;; FIRST BACKTRACE ENTRY: (with-exception-handler
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(primitive-exit 1))
(lambda ()
(call-with-temporary-thread
(lambda ()
(do-sort)
'done))))

View file

@ -1,11 +0,0 @@
(use-modules (knots))
;; FIRST BACKTRACE ENTRY: (with-exception-handler
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(primitive-exit 1))
(lambda ()
(call-with-temporary-thread
(lambda ()
(+ 1 'a))))) ; LAST BACKTRACE ENTRY HERE

View file

@ -1,40 +0,0 @@
(use-modules (knots) (fibers) (knots parallelism))
;; Deep call chain within the innermost fiber. Each function calls the next
;; via `begin', placing the call in non-tail position so Guile's TCO does not
;; collapse the frames; all four frames survive and appear in the backtrace.
(define (deeply-nested x)
(error "deeply nested error" x)) ; LAST BACKTRACE ENTRY HERE
(define (three-deep x)
(fibers-map deeply-nested (list x)))
(define (two-deep x)
(fibers-map three-deep (list x)))
(define (one-deep x)
(fibers-map two-deep (list x)))
;; process-batch runs inside one fiber and dispatches the deep call chain into
;; a nested fiber via a second fibers-map, creating two fiber boundaries.
(define (process-batch items)
(begin
(fibers-map one-deep (list items))
'unreachable))
(define (run-work)
(begin
(fibers-map process-batch '(1))
'unreachable))
(define result
(run-fibers
(lambda ()
;; FIRST BACKTRACE ENTRY: (with-exception-handler
(with-exception-handler
(lambda (e)
(print-backtrace-and-exception/knots e)
(primitive-exit 1))
run-work))
#:hz 0 #:parallelism 1))

View file

@ -1,10 +0,0 @@
(use-modules (knots))
;; FIRST BACKTRACE ENTRY: (with-exception-handler
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(primitive-exit 1))
(lambda ()
(error "plain error message"))) ; LAST BACKTRACE ENTRY HERE

View file

@ -1,20 +0,0 @@
(use-modules (knots)
(knots backtraces)
(fibers)
(system repl debug))
(run-fibers
(lambda ()
(with-exception-handler
(lambda (exn)
(let ((stack (make-stack #t)))
(print-backtrace-and-exception/knots exn)
(simple-format/knots #t
"situation: ~A\n"
((@@ (knots backtraces)
classify-stack-situation)
(stack->vector stack))))
(primitive-exit 0))
(lambda ()
(error "test"))))
#:hz 0 #:parallelism 1)

View file

@ -1,16 +0,0 @@
(use-modules (knots)
(knots backtraces)
(system repl debug))
(with-exception-handler
(lambda (exn)
(let ((stack (make-stack #t)))
(print-backtrace-and-exception/knots exn)
(simple-format/knots #t
"situation: ~A\n"
((@@ (knots backtraces)
classify-stack-situation)
(stack->vector stack))))
(primitive-exit 0))
(lambda ()
(error "test")))

View file

@ -1,19 +0,0 @@
(use-modules (knots)
(knots backtraces)
(fibers)
(system repl debug))
(start-stack
#t
(with-exception-handler
(lambda (exn)
(let* ((stack (make-stack #t))
(stack-classification
((@@ (knots backtraces)
classify-stack-situation)
(stack->vector stack))))
(print-backtrace-and-exception/knots exn)
(simple-format/knots #t "situation: ~A\n" stack-classification)
(primitive-exit 0)))
(lambda ()
(error "test"))))

View file

@ -1,11 +0,0 @@
(use-modules (knots))
;; FIRST BACKTRACE ENTRY: (with-exception-handler
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(primitive-exit 1))
(lambda ()
(call-with-temporary-thread
(lambda ()
(error "error from temporary thread"))))) ; LAST BACKTRACE ENTRY HERE

View file

@ -1,16 +0,0 @@
(use-modules (knots))
;; FIRST BACKTRACE ENTRY: (with-exception-handler
(with-exception-handler
(lambda _ #f)
(lambda ()
(with-exception-handler
(lambda _ #f)
(lambda ()
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(primitive-exit 1))
(lambda ()
(error "plain error message"))))))) ; LAST BACKTRACE ENTRY HERE

View file

@ -1,26 +0,0 @@
(use-modules (knots) (ice-9 vlist))
;; LAST BACKTRACE ENTRY: 257:2
(define (do-fold)
(begin
(vhash-fold
(lambda (key value result)
;; Shouldn't be reached
#f)
0
;; The aim here is to pass in #f for the vlist, and cause an
;; exception within the (ice-9 vlist) module
#f)
'done))
;; FIRST BACKTRACE ENTRY: (with-exception-handler
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(primitive-exit 1))
(lambda ()
(call-with-temporary-thread
(lambda ()
(do-fold)
'done))))

View file

@ -1,24 +0,0 @@
(use-modules (knots) (ice-9 vlist))
;; LAST BACKTRACE ENTRY: 257:2
(define (do-fold)
(begin
(vhash-fold
(lambda (key value result)
;; Shouldn't be reached
#f)
0
;; The aim here is to pass in #f for the vlist, and cause an
;; exception within the (ice-9 vlist) module
#f)
'done))
;; FIRST BACKTRACE ENTRY: (with-exception-handler
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(primitive-exit 1))
(lambda ()
(do-fold)
'done))

View file

@ -1,16 +0,0 @@
(use-modules (knots))
;; FIRST BACKTRACE ENTRY: (with-exception-handler
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(primitive-exit 1))
(lambda ()
(with-exception-handler
(lambda (exn)
(raise-exception
(make-exception
exn
(make-knots-exception (make-stack #t)))))
(lambda ()
(error "wrapped error message"))))) ; LAST BACKTRACE ENTRY HERE

View file

@ -61,24 +61,6 @@
identity
'(()))))
(run-fibers-for-tests
(lambda ()
(with-exception-handler
(lambda (exn)
(unless (and (exception-with-message? exn)
(string=? (exception-message exn)
"foo"))
(raise-exception exn)))
(lambda ()
(fibers-map-with-progress
(lambda _
(raise-exception
(make-exception-with-message "foo")))
'((1)))
(error 'should-not-reach-here))
#:unwind? #t)))
(run-fibers-for-tests
(lambda ()
(with-exception-handler
@ -129,16 +111,4 @@
(assert-equal a 1))))
(run-fibers-for-tests
(lambda ()
(let ((parallelism-limiter (make-parallelism-limiter 2)))
(fibers-for-each
(lambda _
(with-parallelism-limiter
parallelism-limiter
#f))
(iota 50))
(destroy-parallelism-limiter parallelism-limiter))))
(display "parallelism test finished successfully\n")

View file

@ -1,33 +1,9 @@
(use-modules (tests)
(fibers)
(fibers channels)
(unit-test)
(knots parallelism)
(knots resource-pool))
(run-fibers-for-tests
(lambda ()
(let ((parallelism-limiter (make-parallelism-limiter
1)))
(with-parallelism-limiter parallelism-limiter
#f)
(destroy-parallelism-limiter parallelism-limiter))))
(run-fibers-for-tests
(lambda ()
(let ((parallelism-limiter (make-parallelism-limiter
1))
(channel
(make-channel)))
(spawn-fiber
(lambda ()
(with-parallelism-limiter parallelism-limiter
(put-message channel #t)
(sleep 1))))
(get-message channel)
(destroy-parallelism-limiter parallelism-limiter))))
(define new-number
(let ((val 0))
(lambda ()
@ -43,21 +19,7 @@
(number?
(with-resource-from-pool resource-pool
res
res)))
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-fixed-size-resource-pool
(list 1))))
(assert-true
(number?
(with-resource-from-pool resource-pool
res
res)))
(destroy-resource-pool resource-pool))))
res))))))
(run-fibers-for-tests
(lambda ()
@ -69,9 +31,7 @@
(number?
(with-resource-from-pool resource-pool
res
res)))
(destroy-resource-pool resource-pool))))
res))))))
(let* ((error-constructor
(record-constructor &resource-pool-timeout))
@ -128,13 +88,10 @@
res))
(iota 20))
(let loop ((stats (resource-pool-stats resource-pool
#:timeout #f)))
(let loop ((stats (resource-pool-stats resource-pool)))
(unless (= 0 (assq-ref stats 'resources))
(sleep 0.1)
(loop (resource-pool-stats resource-pool #:timeout #f))))
(destroy-resource-pool resource-pool))))
(loop (resource-pool-stats resource-pool)))))))
(run-fibers-for-tests
(lambda ()
@ -158,9 +115,7 @@
(set! counter (+ 1 counter))
(error "collision detected")))))
20
(iota 50))
(destroy-resource-pool resource-pool))))
(iota 50)))))
(run-fibers-for-tests
(lambda ()
@ -174,7 +129,7 @@
(error "collision detected")))
(new-number))
1
#:default-checkout-timeout 5)))
#:default-checkout-timeout 120)))
(fibers-batch-for-each
(lambda _
(with-resource-from-pool
@ -185,9 +140,7 @@
(set! counter (+ 1 counter))
(error "collision detected")))))
20
(iota 50))
(destroy-resource-pool resource-pool))))
(iota 50)))))
(run-fibers-for-tests
(lambda ()
@ -211,14 +164,14 @@
(call-with-resource-from-pool
resource-pool
(lambda (res)
#f)))
(error 'should-not-be-reached))))
#:unwind? #t)))
(while (= 0
(assq-ref
(resource-pool-stats resource-pool #:timeout #f)
(resource-pool-stats resource-pool)
'waiters))
(sleep 0.1))
(sleep 0))
(with-exception-handler
(lambda (exn)
@ -231,101 +184,6 @@
resource-pool
(lambda (res)
(error 'should-not-be-reached))))
#:unwind? #t)))
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-resource-pool
(const 'foo)
1
#:lifetime 1
#:destructor
(const #t))))
(for-each
(lambda _
(with-resource-from-pool resource-pool
res
res))
(iota 20))
(destroy-resource-pool resource-pool))))
;; Test allocating resources to waiters and destroying resources
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-resource-pool
(lambda ()
(sleep 1)
'res)
2
#:idle-seconds 1
#:add-resources-parallelism 10
#:destructor
(const #t))))
(fibers-for-each
(lambda _
(with-resource-from-pool resource-pool
res
res))
(iota 20))
(sleep 2)
(fibers-for-each
(lambda _
(with-resource-from-pool resource-pool
res
res))
(iota 20))
(destroy-resource-pool resource-pool))))
;; Test delay-logger and duration-logger
(run-fibers-for-tests
(lambda ()
(let* ((logged-delay #f)
(logged-duration #f)
(resource-pool (make-fixed-size-resource-pool
(list 1)
#:delay-logger
(lambda (seconds)
(set! logged-delay seconds))
#:duration-logger
(lambda (seconds)
(set! logged-duration seconds)))))
(call-with-resource-from-pool resource-pool
(lambda (res)
(sleep 0.2)))
(assert-true (number? logged-delay))
(assert-true (number? logged-duration))
(assert-true (>= logged-duration 0.1))
(destroy-resource-pool resource-pool))))
;; Test per-call duration-logger overrides pool default
(run-fibers-for-tests
(lambda ()
(let* ((pool-logged #f)
(call-logged #f)
(resource-pool (make-fixed-size-resource-pool
(list 1)
#:duration-logger
(lambda (seconds)
(set! pool-logged seconds)))))
(call-with-resource-from-pool resource-pool
(lambda (res) #t)
#:duration-logger
(lambda (seconds)
(set! call-logged seconds)))
(assert-true (not pool-logged))
(assert-true (number? call-logged))
(destroy-resource-pool resource-pool))))
#:unwind? #t))))))
(display "resource-pool test finished successfully\n")

View file

@ -1,28 +0,0 @@
(use-modules (tests)
(fibers)
(unit-test)
(knots sort))
(run-fibers-for-tests
(lambda ()
(assert-equal
'()
(fibers-sort! '() <))
(assert-equal
'(1)
(fibers-sort! (list 1) <))
(assert-equal
'(1)
(fibers-sort! (list 1) < #:parallelism 10))
(assert-equal
'(1 2)
(fibers-sort! (list 2 1) <))
(assert-equal
(sort (reverse! (iota 100)) <)
(fibers-sort! (reverse! (iota 100)) < #:parallelism 10))))
(display "sort test finished successfully\n")

View file

@ -1,6 +1,4 @@
(use-modules (tests)
(ice-9 atomic)
(ice-9 threads)
(srfi srfi-71)
(fibers)
(unit-test)
@ -87,139 +85,4 @@
(+ 1 'a))))
#:unwind? #t)))))
(let ((thread-pool
(make-fixed-size-thread-pool
1
#:thread-lifetime 1
#:thread-initializer
(lambda ()
(list (make-atomic-box #t))))))
(for-each
(lambda _
(call-with-thread
thread-pool
(lambda (box)
(if (atomic-box-ref box)
(atomic-box-set! box #f)
(error (atomic-box-ref box))))))
(iota 10)))
(run-fibers-for-tests
(lambda ()
(let ((thread-pool
(make-thread-pool 1 #:thread-lifetime 1)))
(for-each
(lambda _
(call-with-thread
thread-pool
(lambda () #f)))
(iota 10)))))
(let ((thread-pool
(make-fixed-size-thread-pool
1
#:thread-lifetime 2
#:thread-initializer
(lambda ()
(list (make-atomic-box 2))))))
(define (ref-and-decrement box)
(let ((val (atomic-box-ref box)))
(atomic-box-set! box (- val 1))
val))
(unless (= 2 (call-with-thread
thread-pool
ref-and-decrement))
(error))
(unless (= 1 (call-with-thread
thread-pool
ref-and-decrement))
(error))
(unless (= 2 (call-with-thread
thread-pool
ref-and-decrement))
(error)))
;; Test that the destructor is called when a size 1 fixed-size thread
;; pool is destroyed, and that destroy-thread-pool blocks until it has
;; completed.
(let* ((destructor-called? #f)
(thread-pool
(make-fixed-size-thread-pool
1
#:thread-destructor
(lambda ()
(set! destructor-called? #t)))))
(destroy-thread-pool thread-pool)
(assert-equal #t destructor-called?))
;; Test that the destructor is called for every thread when a
;; multi-thread fixed-size thread pool is destroyed, and that
;; destroy-thread-pool blocks until all destructors have completed.
(let* ((destructor-count 0)
(mutex (make-mutex))
(pool-size 3)
(thread-pool
(make-fixed-size-thread-pool
pool-size
#:thread-destructor
(lambda ()
(with-mutex mutex
(set! destructor-count (+ destructor-count 1)))))))
(destroy-thread-pool thread-pool)
(assert-equal pool-size destructor-count))
;; Test delay-logger and duration-logger for fixed-size thread pool
(let* ((logged-delay #f)
(logged-duration #f)
(thread-pool
(make-fixed-size-thread-pool
1
#:delay-logger
(lambda (seconds)
(set! logged-delay seconds))
#:duration-logger
(lambda (seconds)
(set! logged-duration seconds)))))
(call-with-thread
thread-pool
(lambda ()
(usleep 100000)))
(assert-true (number? logged-delay))
(assert-true (number? logged-duration))
(assert-true (>= logged-duration 0.1))
(destroy-thread-pool thread-pool))
;; Test delay-logger and duration-logger for dynamic thread pool
(run-fibers-for-tests
(lambda ()
(let* ((logged-delay #f)
(logged-duration #f)
(thread-pool
(make-thread-pool
1
#:delay-logger
(lambda (seconds)
(set! logged-delay seconds))
#:duration-logger
(lambda (seconds)
(set! logged-duration seconds)))))
(call-with-thread
thread-pool
(lambda ()
(usleep 100000)))
(assert-true (number? logged-delay))
(assert-true (number? logged-duration))
(assert-true (>= logged-duration 0.1))
(destroy-thread-pool thread-pool))))
(display "thread-pool test finished successfully\n")

View file

@ -1,6 +1,5 @@
(use-modules (srfi srfi-71)
(rnrs bytevectors)
(ice-9 match)
(ice-9 binary-ports)
(ice-9 textual-ports)
(tests)
@ -234,68 +233,4 @@
(assert-equal (get-message exception-handled-sucecssfully-channel)
#t))))
(run-fibers-for-tests
(lambda ()
(let* ((web-server
(run-knots-web-server
(lambda (request)
(match (split-and-decode-uri-path
(uri-path (request-uri request)))
(("head-no-body")
(values '((content-type . (text/plain)))
#f))
(("head-empty-body")
(values '((content-type . (text/plain)))
""))
(("head-no-body-with-content-length")
(values '((content-type . (text/plain))
(content-length . 10))
#f))
(("head-with-body")
(values '((content-type . (text/plain)))
"foo"))
(("head-procedure-body")
(values '((content-type . (text/plain)))
(lambda _
(error "should not be run"))))
(("head-procedure-body-with-content-length")
(values '((content-type . (text/plain))
(content-length . 10))
(lambda _
(error "should not be run"))))))
#:port 0)) ;; Bind to any port
(port
(web-server-port web-server)))
(define* (head path)
(let ((uri
(build-uri 'http #:host "127.0.0.1" #:port port
#:path path)))
(http-head
uri
#:port (non-blocking-open-socket-for-uri uri))))
(let ((response
(head "/head-no-body")))
(assert-equal 200 (response-code response)))
(let ((response
(head "/head-empty-body")))
(assert-equal 200 (response-code response))
(assert-equal 0 (response-content-length response)))
(let ((response
(head "/head-no-body-with-content-length")))
(assert-equal 200 (response-code response))
(assert-equal 10 (response-content-length response)))
(let ((response
(head "/head-with-body")))
(assert-equal 200 (response-code response))
(assert-equal 3 (response-content-length response)))
(let ((response
(head "/head-procedure-body")))
(assert-equal 200 (response-code response)))
(let ((response
(head "/head-procedure-body-with-content-length")))
(assert-equal 200 (response-code response))
(assert-equal 10 (response-content-length response))))))
(display "web-server test finished successfully\n")

View file

@ -1,223 +0,0 @@
(use-modules (tests)
(fibers)
(srfi srfi-71)
(ice-9 rdelim)
(ice-9 exceptions)
(unit-test)
(web uri)
(web client)
(web request)
(web response)
(knots resource-pool)
(knots web-server)
(knots web))
;; Test that call-with-cached-connection passes the port to proc and
;; returns its result.
(run-fibers-for-tests
(lambda ()
(let* ((port (open-input-string ""))
(cache (make-fixed-size-resource-pool (list port))))
(assert-equal
'ok
(call-with-cached-connection cache (lambda (p) 'ok)))
(destroy-resource-pool cache))))
;; Test that call-with-cached-connection retries when the checked-out
;; port is already closed, using a fresh connection from the pool.
(run-fibers-for-tests
(lambda ()
(let* ((n 0)
(cache (make-resource-pool
(lambda ()
(set! n (+ n 1))
(if (= n 1)
(let ((p (open-input-string "")))
(close-port p)
p)
(open-input-string "")))
1
;; Without a destructor, the resource pool calls (#f port)
;; when destroying the closed-port resource, looping forever.
#:destructor (const #t))))
(assert-equal
'ok
(call-with-cached-connection cache (lambda (p) 'ok)))
(destroy-resource-pool cache))))
;; Test that call-with-connection-cache provides a working cache and
;; destroys it after the body returns.
(run-fibers-for-tests
(lambda ()
(let* ((web-server
(run-knots-web-server
(lambda (request)
(values '((content-type . (text/plain))) "ok"))
#:port 0))
(server-port (web-server-port web-server))
(uri (build-uri 'http #:host "127.0.0.1" #:port server-port)))
(assert-equal
200
(call-with-connection-cache
uri 1
(lambda (cache)
(call-with-cached-connection cache
(lambda (p)
(let ((response body
(http-get uri #:port p #:keep-alive? #t)))
(response-code response))))))))))
;; Test that http-fold-requests sends requests and folds over responses.
;; The proc must drain the body port between responses so that HTTP
;; pipelining works correctly.
(run-fibers-for-tests
(lambda ()
(let* ((web-server
(run-knots-web-server
(lambda (request)
(values '((content-type . (text/plain))) "ok"))
#:port 0))
(server-port (web-server-port web-server))
(uri (build-uri 'http #:host "127.0.0.1" #:port server-port))
(cache (make-connection-cache uri 1))
(requests (list (build-request uri)
(build-request uri))))
(let ((codes
(http-fold-requests
cache
(lambda (req resp body result)
(read-string body) ; drain body before next pipelined response
(cons (response-code resp) result))
'()
requests)))
(assert-equal '(200 200) codes))
(destroy-resource-pool cache))))
;; Test that http-fold-requests reconnects and retries remaining requests when
;; the server closes the connection mid-batch via Connection: close. Three
;; requests are sent in one batch; the server closes after the first response,
;; so the remaining two must be retried on a fresh connection.
(run-fibers-for-tests
(lambda ()
(let* ((n 0)
(web-server
(run-knots-web-server
(lambda (request)
(set! n (1+ n))
(if (= n 1)
(values '((content-type . (text/plain))
(connection . (close)))
"ok")
(values '((content-type . (text/plain))) "ok")))
#:port 0))
(server-port (web-server-port web-server))
(uri (build-uri 'http #:host "127.0.0.1" #:port server-port))
(cache (make-connection-cache uri 1))
(requests (list (build-request uri)
(build-request uri)
(build-request uri))))
(let ((codes
(http-fold-requests
cache
(lambda (req resp body result)
(read-string body)
(cons (response-code resp) result))
'()
requests)))
(assert-equal '(200 200 200) codes))
(destroy-resource-pool cache))))
;; Test that write errors in send-batch are handled gracefully. Each request
;; carries a large header so that the batch data exceeds the TCP send buffer,
;; causing write-request to fail while the server has already closed the
;; connection after the first response.
(run-fibers-for-tests
(lambda ()
(let* ((n 0)
(web-server
(run-knots-web-server
(lambda (request)
(set! n (1+ n))
(if (= n 1)
(values '((content-type . (text/plain))
(connection . (close)))
"ok")
(values '((content-type . (text/plain))) "ok")))
#:port 0))
(server-port (web-server-port web-server))
(uri (build-uri 'http #:host "127.0.0.1" #:port server-port))
(cache (make-connection-cache uri 1))
(n-requests 100)
;; 100 requests x ~100 KB of headers each = ~10 MB, well above
;; the typical TCP send buffer, so writes fail mid-batch.
(large-request
(build-request uri
#:headers
`((x-padding . ,(make-string 100000 #\a)))))
(requests (make-list n-requests large-request)))
(let ((codes
(http-fold-requests
cache
(lambda (req resp body result)
(read-string body)
(cons (response-code resp) result))
'()
requests)))
(assert-equal (make-list n-requests 200) codes))
(destroy-resource-pool cache))))
;; Test that http-fold-requests processes multiple batches. With batch-size 2
;; and 5 requests, three batches are needed; without the pending fix only the
;; first batch would be processed.
(run-fibers-for-tests
(lambda ()
(let* ((web-server
(run-knots-web-server
(lambda (request)
(values '((content-type . (text/plain))) "ok"))
#:port 0))
(server-port (web-server-port web-server))
(uri (build-uri 'http #:host "127.0.0.1" #:port server-port))
(cache (make-connection-cache uri 1))
(requests (make-list 5 (build-request uri))))
(let ((codes
(http-fold-requests
cache
(lambda (req resp body result)
(read-string body)
(cons (response-code resp) result))
'()
requests
#:batch-size 2)))
(assert-equal (make-list 5 200) codes))
(destroy-resource-pool cache))))
;; Test that an exception raised by proc propagates out of http-fold-requests.
(run-fibers-for-tests
(lambda ()
(let* ((web-server
(run-knots-web-server
(lambda (request)
(values '((content-type . (text/plain))) "ok"))
#:port 0))
(server-port (web-server-port web-server))
(uri (build-uri 'http #:host "127.0.0.1" #:port server-port))
(cache (make-connection-cache uri 1))
(requests (list (build-request uri))))
(assert-equal
'proc-exception
(exception-message
(with-exception-handler
(lambda (exn) exn)
(lambda ()
(http-fold-requests
cache
(lambda (req resp body result)
(raise-exception
(make-exception-with-message 'proc-exception)))
'()
requests))
#:unwind? #t)))
(destroy-resource-pool cache))))
(display "web test finished successfully\n")