Compare commits

..

5 commits

Author SHA1 Message Date
003c5aa6b0 WIP
All checks were successful
/ test (push) Successful in 12s
2025-06-24 21:03:34 +02:00
eadfa53b36 WIP
All checks were successful
/ test (push) Successful in 13s
2025-06-24 21:00:54 +02:00
81dd3370e6 WIP
Some checks failed
/ test (push) Failing after 12s
2025-06-24 20:59:57 +02:00
7f5f05ef2b WIP
Some checks failed
/ test (push) Failing after 12s
2025-06-24 20:58:25 +02:00
7c2c6f2de9 WIP 2025-06-24 20:58:25 +02:00
11 changed files with 525 additions and 1279 deletions

View file

@ -1,7 +1,7 @@
on:
push:
branches:
- trunk
- actions-test
jobs:
test:
runs-on: host
@ -10,17 +10,13 @@ jobs:
- run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git --branch=pages knots-pages
- run: |
cd knots-trunk
guix shell -D -f guix-dev.scm -- documenta api "knots.scm knots/"
guix shell -D -f guix-dev.scm -- documenta api knots
guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -c SHOW_TITLE=true -o ../knots-pages/index.html doc/index.texi
- run: |
cd knots-pages
git add .
if [[ -z "$(git status -s)" ]]; then
echo "Nothing to push"
else
git config user.email ""
git config user.name "Automatic website updater"
git commit -m "Automatic website update"
git push
fi
git config user.email ""
git config user.name "Automatic website updater"
git commit -m "Automatic website update"
git push

View file

@ -20,9 +20,6 @@
(define-module (knots parallelism)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-71)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-43)
#:use-module (ice-9 match)
#:use-module (ice-9 control)
#:use-module (ice-9 exceptions)
@ -30,7 +27,6 @@
#:use-module (fibers channels)
#:use-module (fibers operations)
#:use-module (knots)
#:use-module (knots resource-pool)
#:export (fibers-batch-map
fibers-map
@ -42,13 +38,7 @@
fibers-parallel
fibers-let
fiberize
make-parallelism-limiter
parallelism-limiter?
destroy-parallelism-limiter
call-with-parallelism-limiter
with-parallelism-limiter))
fiberize))
(define (defer-to-parallel-fiber thunk)
(let ((reply (make-channel)))
@ -58,7 +48,7 @@
(lambda (exn)
(put-message
reply
(cons 'exception exn)))
(list 'exception exn)))
(lambda ()
(with-exception-handler
(lambda (exn)
@ -79,7 +69,7 @@
(lambda ()
(start-stack #t (thunk)))
(lambda vals
(put-message reply (cons 'result vals)))))))
(put-message reply vals))))))
#:unwind? #t))
#:parallel? #t)
reply))
@ -89,16 +79,13 @@
reply-channels)))
(map
(match-lambda
(('exception . exn)
(('exception exn)
(raise-exception exn))
(('result . vals)
(apply values vals)))
(result
(apply values result)))
responses)))
(define (fibers-batch-map proc parallelism-limit . lists)
"Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT. If any of
the invocations of PROC raise an exception, this will be raised once
all of the calls to PROC have finished."
(define vecs (map (lambda (list-or-vec)
(if (vector? list-or-vec)
list-or-vec
@ -118,18 +105,9 @@ all of the calls to PROC have finished."
(channel-indexes '()))
(if (and (eq? #f next-to-process-index)
(null? channel-indexes))
(let ((processed-result-vec
(vector-map
(lambda (_ result-or-exn)
(match result-or-exn
(('exception . exn)
(raise-exception exn))
(('result . vals)
(car vals))))
result-vec)))
(if (vector? (first lists))
processed-result-vec
(vector->list processed-result-vec)))
(if (vector? (first lists))
result-vec
(vector->list result-vec))
(if (or (= (length channel-indexes)
(min parallelism-limit vecs-length))
@ -145,13 +123,18 @@ all of the calls to PROC have finished."
(get-operation
(vector-ref result-vec index))
(lambda (result)
(vector-set! result-vec
index
result)
(values next-to-process-index
(lset-difference =
channel-indexes
(list index))))))
(match result
(('exception exn)
(raise-exception exn))
(_
(vector-set! result-vec
index
(first result))
(values next-to-process-index
(lset-difference =
channel-indexes
(list index))))))))
channel-indexes)))))
(loop new-index
new-channel-indexes))
@ -174,14 +157,9 @@ all of the calls to PROC have finished."
channel-indexes)))))))
(define (fibers-map proc . lists)
"Map PROC over LISTS in parallel, running up to 20 fibers in
PARALLEL. If any of the invocations of PROC raise an exception, this
will be raised once all of the calls to PROC have finished."
(apply fibers-batch-map proc 20 lists))
(define (fibers-batch-for-each proc parallelism-limit . lists)
"Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in
parallel."
(apply fibers-batch-map
(lambda args
(apply proc args)
@ -192,13 +170,10 @@ parallel."
*unspecified*)
(define (fibers-for-each proc . lists)
"Call PROC on LISTS, running up to 20 fibers in parallel."
(apply fibers-batch-for-each proc 20 lists))
(define-syntax fibers-parallel
(lambda (x)
"Run each expression in parallel. If any expression raises an
exception, this will be raised after all exceptions have finished."
(syntax-case x ()
((_ e0 ...)
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
@ -209,16 +184,12 @@ parallel."
(apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
(define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...)
"Let, but run each binding in a fiber in parallel."
(call-with-values
(lambda () (fibers-parallel e ...))
(lambda (v ...)
b0 b1 ...)))
(define* (fibers-map-with-progress proc lists #:key report)
"Map PROC over LISTS, calling #:REPORT if specified after each
invocation of PROC finishes. REPORT is passed the results for each
element of LISTS, or #f if no result has been received yet."
(let loop ((channels-to-results
(apply map
(lambda args
@ -239,8 +210,8 @@ invocation of PROC finishes. REPORT is passed the results for each
(match-lambda
((#f . ('exception . exn))
(raise-exception exn))
((#f . ('result . vals))
(car vals)))
((#f . ('result . val))
val))
channels-to-results)
(loop
(perform-operation
@ -257,7 +228,12 @@ invocation of PROC finishes. REPORT is passed the results for each
(map (match-lambda
((c . r)
(if (eq? channel c)
(cons #f result)
(cons #f
(match result
(('exception . exn)
result)
(_
(cons 'result result))))
(cons c r))))
channels-to-results)))
#f))))
@ -278,7 +254,7 @@ invocation of PROC finishes. REPORT is passed the results for each
reply-channel
(with-exception-handler
(lambda (exn)
(cons 'exception exn))
(list 'exception exn))
(lambda ()
(with-exception-handler
(lambda (exn)
@ -309,32 +285,5 @@ invocation of PROC finishes. REPORT is passed the results for each
(put-message input-channel (cons reply-channel args))
(match (get-message reply-channel)
(('result . vals) (apply values vals))
(('exception . exn)
(('exception exn)
(raise-exception exn))))))
(define-record-type <parallelism-limiter>
(make-parallelism-limiter-record resource-pool)
parallelism-limiter?
(resource-pool parallelism-limiter-resource-pool))
(define* (make-parallelism-limiter limit #:key (name "unnamed"))
(make-parallelism-limiter-record
(make-fixed-size-resource-pool
(iota limit)
#:name name)))
(define (destroy-parallelism-limiter parallelism-limiter)
(destroy-resource-pool
(parallelism-limiter-resource-pool
parallelism-limiter)))
(define* (call-with-parallelism-limiter parallelism-limiter thunk)
(call-with-resource-from-pool
(parallelism-limiter-resource-pool parallelism-limiter)
(lambda _
(thunk))))
(define-syntax-rule (with-parallelism-limiter parallelism-limiter exp ...)
(call-with-parallelism-limiter
parallelism-limiter
(lambda () exp ...)))

View file

@ -82,10 +82,7 @@
(make-exception
exn
(make-knots-exception stack)))))
(lambda ()
(start-stack
#t
((fibers-promise-thunk fp))))))
(fibers-promise-thunk fp)))
#:unwind? #t))
(lambda vals
(atomic-box-set! (fibers-promise-values-box fp)

File diff suppressed because it is too large Load diff

View file

@ -198,7 +198,7 @@ from there, or #f if that would be an empty string."
(record-accessor &thread-pool-timeout-error 'pool)))
(define thread-pool-timeout-error?
(exception-predicate &thread-pool-timeout-error))
(record-predicate &thread-pool-timeout-error))
(define* (make-fixed-size-thread-pool size
#:key
@ -269,8 +269,8 @@ from there, or #f if that would be an empty string."
(sleep 1)
(destructor/safe args)))))
(define (process thread-index channel args)
(let loop ((lifetime thread-lifetime))
(define (process channel args)
(let loop ()
(match (get-message channel)
('destroy #f)
((reply sent-time proc)
@ -292,9 +292,6 @@ from there, or #f if that would be an empty string."
internal-time-units-per-second)
exn))
(lambda ()
(vector-set! thread-proc-vector
thread-index
proc)
(with-exception-handler
(lambda (exn)
(let ((stack
@ -322,10 +319,6 @@ from there, or #f if that would be an empty string."
vals))))))
#:unwind? #t)))
(vector-set! thread-proc-vector
thread-index
#f)
(put-message reply
response)
@ -342,11 +335,7 @@ from there, or #f if that would be an empty string."
(if (and exception?
expire-on-exception?)
#t
(if lifetime
(if (<= lifetime 1)
#t
(loop (- lifetime 1)))
(loop lifetime)))))))))
(loop))))))))
(define (start-thread index channel)
(call-with-new-thread
@ -369,7 +358,7 @@ from there, or #f if that would be an empty string."
"knots: thread-pool: internal exception: ~A\n" exn))
(lambda ()
(parameterize ((param args))
(process index channel args)))
(process channel args)))
#:unwind? #t)))
(when thread-destructor
@ -406,8 +395,7 @@ from there, or #f if that would be an empty string."
(expire-on-exception? #f)
(name "unnamed")
(use-default-io-waiters? #t)
default-checkout-timeout
default-max-waiters)
default-checkout-timeout)
"Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the thread pool procedure."
(define param
@ -420,6 +408,7 @@ arguments of the thread pool procedure."
1
#:thread-initializer thread-initializer
#:thread-destructor thread-destructor
#:thread-lifetime thread-lifetime
#:expire-on-exception? expire-on-exception?
#:name name
#:use-default-io-waiters? use-default-io-waiters?))
@ -427,11 +416,9 @@ arguments of the thread pool procedure."
#:destructor destroy-thread-pool
#:min-size min-size
#:delay-logger delay-logger
#:lifetime thread-lifetime
#:scheduler scheduler
#:duration-logger duration-logger
#:default-checkout-timeout default-checkout-timeout
#:default-max-waiters default-max-waiters)))
#:default-checkout-timeout default-checkout-timeout)))
(thread-pool resource-pool
param)))

View file

@ -85,7 +85,7 @@
(record-constructor &port-timeout-error))
(define port-timeout-error?
(exception-predicate &port-timeout-error))
(record-predicate &port-timeout-error))
(define &port-read-timeout-error
(make-exception-type '&port-read-timeout-error
@ -96,7 +96,7 @@
(record-constructor &port-read-timeout-error))
(define port-read-timeout-error?
(exception-predicate &port-read-timeout-error))
(record-predicate &port-read-timeout-error))
(define &port-write-timeout-error
(make-exception-type '&port-write-timeout-error
@ -107,7 +107,7 @@
(record-constructor &port-write-timeout-error))
(define port-write-timeout-error?
(exception-predicate &port-write-timeout-error))
(record-predicate &port-write-timeout-error))
(define (readable? port)
"Test if PORT is writable."

View file

@ -63,14 +63,6 @@
(bind sock family addr port)
sock))
(define crlf-bv
(string->utf8 "\r\n"))
(define (chunked-output-port-overhead-bytes write-size)
(+ (string-length (number->string write-size 16))
(bytevector-length crlf-bv)
(bytevector-length crlf-bv)))
(define* (make-chunked-output-port/knots port #:key (keep-alive? #f)
(buffering 1200))
"Returns a new port which translates non-encoded data into a HTTP
@ -82,12 +74,10 @@ when done, as it will output the remaining data, and encode the final
zero chunk. When the port is closed it will also close PORT, unless
KEEP-ALIVE? is true."
(define (write! bv start count)
(let ((len-string
(number->string count 16)))
(put-string port len-string))
(put-bytevector port crlf-bv 0 2)
(put-string port (number->string count 16))
(put-string port "\r\n")
(put-bytevector port bv start count)
(put-bytevector port crlf-bv 0 2)
(put-string port "\r\n")
(force-output port)
count)
@ -140,7 +130,7 @@ closes PORT, unless KEEP-ALIVE? is true."
(record-constructor &request-body-ended-prematurely))
(define request-body-ended-prematurely-error?
(exception-predicate &request-body-ended-prematurely))
(record-predicate &request-body-ended-prematurely))
(define (request-body-port/knots r)
(cond
@ -341,19 +331,15 @@ on the procedure being called at any particular time."
(string->utf8
"internal server error")))
(define* (handle-request handler client
read-request-exception-handler
write-response-exception-handler
buffer-size
#:key post-request-hook)
(define (handle-request handler client
read-request-exception-handler
write-response-exception-handler)
(let ((request
(with-exception-handler
read-request-exception-handler
(lambda ()
(read-request client))
#:unwind? #t))
(read-request-time
(get-internal-real-time)))
#:unwind? #t)))
(let ((response
body
(cond
@ -402,9 +388,7 @@ on the procedure being called at any particular time."
(lambda ()
(write-response response client)
(let ((response-start-time
(get-internal-real-time))
(body-written?
(let ((body-written?
(if (procedure? body)
(let* ((type (response-content-type response
'(text/plain)))
@ -415,11 +399,7 @@ on the procedure being called at any particular time."
client
(make-chunked-output-port/knots
client
#:keep-alive? #t
#:buffering
(- buffer-size
(chunked-output-port-overhead-bytes
buffer-size))))))
#:keep-alive? #t))))
(set-port-encoding! body-port charset)
(let ((body-written?
(with-exception-handler
@ -443,11 +423,6 @@ on the procedure being called at any particular time."
(if body-written?
(begin
(force-output client)
(when post-request-hook
(post-request-hook request
#:read-request-time read-request-time
#:response-start-time response-start-time
#:response-end-time (get-internal-real-time)))
(when (and (procedure? body)
(response-content-length response))
(set-port-encoding! client "ISO-8859-1"))
@ -459,8 +434,7 @@ on the procedure being called at any particular time."
read-request-exception-handler
write-response-exception-handler
connection-idle-timeout
buffer-size
post-request-hook)
buffer-size)
;; Always disable Nagle's algorithm, as we handle buffering
;; ourselves; when we force-output, we really want the data to go
;; out.
@ -498,29 +472,11 @@ on the procedure being called at any particular time."
(else
(let ((keep-alive? (handle-request handler client
read-request-exception-handler
write-response-exception-handler
buffer-size
#:post-request-hook
post-request-hook)))
write-response-exception-handler)))
(if keep-alive?
(loop)
(close-port client)))))))
(define (post-request-hook/safe post-request-hook)
(if post-request-hook
(lambda args
(with-exception-handler
(lambda (exn) #f)
(lambda ()
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(raise-exception exn))
(lambda ()
(apply post-request-hook args))))
#:unwind? #t))
#f))
(define-record-type <web-server>
(make-web-server socket port)
web-server?
@ -540,8 +496,7 @@ on the procedure being called at any particular time."
(write-response-exception-handler
default-write-response-exception-handler)
(connection-idle-timeout #f)
(connection-buffer-size 1024)
post-request-hook)
(connection-buffer-size 1024))
"Run the knots web server.
HANDLER should be a procedure that takes one argument, the HTTP
@ -577,9 +532,7 @@ before sending back to the client."
read-request-exception-handler
write-response-exception-handler
connection-idle-timeout
connection-buffer-size
(post-request-hook/safe
post-request-hook)))
connection-buffer-size))
#:parallel? #t)
(loop))))))

View file

@ -1,11 +1,10 @@
(define-module (tests)
#:use-module (ice-9 exceptions)
#:use-module (fibers)
#:use-module (knots)
#:export (run-fibers-for-tests
assert-no-heap-growth))
(define* (run-fibers-for-tests thunk #:key (drain? #t))
(define (run-fibers-for-tests thunk)
(let ((result
(run-fibers
(lambda ()
@ -13,18 +12,15 @@
(lambda (exn)
exn)
(lambda ()
(simple-format #t "running ~A\n" thunk)
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(backtrace)
(raise-exception exn))
(lambda ()
(start-stack #t (thunk))))
thunk)
#t)
#:unwind? #t))
#:hz 0
#:parallelism 1
#:drain? drain?)))
#:parallelism 1)))
(if (exception? result)
(raise-exception result)
result)))

View file

@ -61,24 +61,6 @@
identity
'(()))))
(run-fibers-for-tests
(lambda ()
(with-exception-handler
(lambda (exn)
(unless (and (exception-with-message? exn)
(string=? (exception-message exn)
"foo"))
(raise-exception exn)))
(lambda ()
(fibers-map-with-progress
(lambda _
(raise-exception
(make-exception-with-message "foo")))
'((1)))
(error 'should-not-reach-here))
#:unwind? #t)))
(run-fibers-for-tests
(lambda ()
(with-exception-handler
@ -129,16 +111,4 @@
(assert-equal a 1))))
(run-fibers-for-tests
(lambda ()
(let ((parallelism-limiter (make-parallelism-limiter 2)))
(fibers-for-each
(lambda _
(with-parallelism-limiter
parallelism-limiter
#f))
(iota 50))
(destroy-parallelism-limiter parallelism-limiter))))
(display "parallelism test finished successfully\n")

View file

@ -1,33 +1,9 @@
(use-modules (tests)
(fibers)
(fibers channels)
(unit-test)
(knots parallelism)
(knots resource-pool))
(run-fibers-for-tests
(lambda ()
(let ((parallelism-limiter (make-parallelism-limiter
1)))
(with-parallelism-limiter parallelism-limiter
#f)
(destroy-parallelism-limiter parallelism-limiter))))
(run-fibers-for-tests
(lambda ()
(let ((parallelism-limiter (make-parallelism-limiter
1))
(channel
(make-channel)))
(spawn-fiber
(lambda ()
(with-parallelism-limiter parallelism-limiter
(put-message channel #t)
(sleep 1))))
(get-message channel)
(destroy-parallelism-limiter parallelism-limiter))))
(define new-number
(let ((val 0))
(lambda ()
@ -43,21 +19,7 @@
(number?
(with-resource-from-pool resource-pool
res
res)))
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-fixed-size-resource-pool
(list 1))))
(assert-true
(number?
(with-resource-from-pool resource-pool
res
res)))
(destroy-resource-pool resource-pool))))
res))))))
(run-fibers-for-tests
(lambda ()
@ -69,9 +31,7 @@
(number?
(with-resource-from-pool resource-pool
res
res)))
(destroy-resource-pool resource-pool))))
res))))))
(let* ((error-constructor
(record-constructor &resource-pool-timeout))
@ -128,13 +88,10 @@
res))
(iota 20))
(let loop ((stats (resource-pool-stats resource-pool
#:timeout #f)))
(let loop ((stats (resource-pool-stats resource-pool)))
(unless (= 0 (assq-ref stats 'resources))
(sleep 0.1)
(loop (resource-pool-stats resource-pool #:timeout #f))))
(destroy-resource-pool resource-pool))))
(loop (resource-pool-stats resource-pool)))))))
(run-fibers-for-tests
(lambda ()
@ -158,9 +115,7 @@
(set! counter (+ 1 counter))
(error "collision detected")))))
20
(iota 50))
(destroy-resource-pool resource-pool))))
(iota 50)))))
(run-fibers-for-tests
(lambda ()
@ -174,7 +129,7 @@
(error "collision detected")))
(new-number))
1
#:default-checkout-timeout 5)))
#:default-checkout-timeout 120)))
(fibers-batch-for-each
(lambda _
(with-resource-from-pool
@ -185,9 +140,7 @@
(set! counter (+ 1 counter))
(error "collision detected")))))
20
(iota 50))
(destroy-resource-pool resource-pool))))
(iota 50)))))
(run-fibers-for-tests
(lambda ()
@ -211,14 +164,14 @@
(call-with-resource-from-pool
resource-pool
(lambda (res)
#f)))
(error 'should-not-be-reached))))
#:unwind? #t)))
(while (= 0
(assq-ref
(resource-pool-stats resource-pool #:timeout #f)
(resource-pool-stats resource-pool)
'waiters))
(sleep 0.1))
(sleep 0))
(with-exception-handler
(lambda (exn)
@ -231,55 +184,6 @@
resource-pool
(lambda (res)
(error 'should-not-be-reached))))
#:unwind? #t)))
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-resource-pool
(const 'foo)
1
#:lifetime 1
#:destructor
(const #t))))
(for-each
(lambda _
(with-resource-from-pool resource-pool
res
res))
(iota 20))
(destroy-resource-pool resource-pool))))
;; Test allocating resources to waiters and destroying resources
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-resource-pool
(lambda ()
(sleep 1)
'res)
2
#:idle-seconds 1
#:add-resources-parallelism 10
#:destructor
(const #t))))
(fibers-for-each
(lambda _
(with-resource-from-pool resource-pool
res
res))
(iota 20))
(sleep 2)
(fibers-for-each
(lambda _
(with-resource-from-pool resource-pool
res
res))
(iota 20))
(destroy-resource-pool resource-pool))))
#:unwind? #t))))))
(display "resource-pool test finished successfully\n")

View file

@ -1,5 +1,4 @@
(use-modules (tests)
(ice-9 atomic)
(srfi srfi-71)
(fibers)
(unit-test)
@ -86,60 +85,4 @@
(+ 1 'a))))
#:unwind? #t)))))
(let ((thread-pool
(make-fixed-size-thread-pool
1
#:thread-lifetime 1
#:thread-initializer
(lambda ()
(list (make-atomic-box #t))))))
(for-each
(lambda _
(call-with-thread
thread-pool
(lambda (box)
(if (atomic-box-ref box)
(atomic-box-set! box #f)
(error (atomic-box-ref box))))))
(iota 10)))
(run-fibers-for-tests
(lambda ()
(let ((thread-pool
(make-thread-pool 1 #:thread-lifetime 1)))
(for-each
(lambda _
(call-with-thread
thread-pool
(lambda () #f)))
(iota 10)))))
(let ((thread-pool
(make-fixed-size-thread-pool
1
#:thread-lifetime 2
#:thread-initializer
(lambda ()
(list (make-atomic-box 2))))))
(define (ref-and-decrement box)
(let ((val (atomic-box-ref box)))
(atomic-box-set! box (- val 1))
val))
(unless (= 2 (call-with-thread
thread-pool
ref-and-decrement))
(error))
(unless (= 1 (call-with-thread
thread-pool
ref-and-decrement))
(error))
(unless (= 2 (call-with-thread
thread-pool
ref-and-decrement))
(error)))
(display "thread-pool test finished successfully\n")