Compare commits

...
Sign in to create a new pull request.

19 commits

Author SHA1 Message Date
52092e7a99 Fix call to documenta
All checks were successful
/ test (push) Successful in 8s
2025-07-10 16:15:58 +01:00
4468a3ef6d Generate documentation for (knots) as well
All checks were successful
/ test (push) Successful in 24s
As enabled by Guile Documenta 0.4.
2025-07-10 16:11:45 +01:00
d18b5b8d5d Don't loop inside exception handlers
All checks were successful
/ test (push) Successful in 9s
The resource pools seemed to become slower and slower over time, this
might help?
2025-07-09 12:06:20 +01:00
f4b48a1499 Avoid calling deq! if the queue is empty
All checks were successful
/ test (push) Successful in 9s
2025-07-06 18:49:09 +01:00
ec2f2489a2 Fix resource pool bug
All checks were successful
/ test (push) Successful in 9s
And remove unnecessary named let.
2025-07-01 23:13:31 +01:00
ff93dc1442 Add a post-request-hook to the web server
All checks were successful
/ test (push) Successful in 11s
2025-07-01 12:45:12 +01:00
ce1b710bcf Use a queue for the resource pool waiters
All checks were successful
/ test (push) Successful in 8s
As this will maybe improve performance.
2025-06-30 22:57:08 +01:00
7709ffe1d3 Tweak the knots chunked output port
All checks were successful
/ test (push) Successful in 9s
To try and reduce the number of write syscalls.
2025-06-30 15:41:04 +02:00
deae518b52 Use the buffer size for chunked output ports
All checks were successful
/ test (push) Successful in 37s
2025-06-29 08:35:28 +02:00
0fa6737a39 Document some things
All checks were successful
/ test (push) Successful in 9s
2025-06-27 23:28:47 +02:00
4140ef0bd6 More consistently handle results and exceptions
In the parallelism module.
2025-06-27 22:43:25 +02:00
6f6d57b189 Use the knots backtrace printer for tests
All checks were successful
/ test (push) Successful in 11s
2025-06-27 00:16:41 +02:00
d8f64399cd Tweak spacing 2025-06-27 00:16:37 +02:00
163d775496 Fix record-predicate that should be exception-predicate 2025-06-27 00:16:18 +02:00
8f3e0a9a1d Fix exception handling in fibers-map-with-progress
All checks were successful
/ test (push) Successful in 9s
2025-06-26 22:53:15 +02:00
09ca6cfb6b Fix resource-pool-destroy-resource-exception
Raising the exception is more consistent, and avoids returning the
resource.
2025-06-26 21:27:32 +02:00
ab5411da42 Make resource pool changes and add parallelism limiter
All checks were successful
/ test (push) Successful in 9s
This was motivated by trying to allow for completely cleaning up
resource pools, which involved removing their use of fiberize which
currently has no destroy mechanism.

As part of this, there's a new parallelism limiter mechanism using
resource pools rather than fibers, and also a fixed size resource
pool.

The tests now drain? and destroy the resource pools to check cleaning
up.
2025-06-26 10:43:46 +02:00
edf62414ee Avoid workflow erroring if there's nothing to change
All checks were successful
/ test (push) Successful in 12s
2025-06-24 21:14:52 +02:00
2e25c3b074 Add workflow for building the website
Some checks failed
/ test (push) Failing after 14s
2025-06-24 21:07:29 +02:00
9 changed files with 933 additions and 307 deletions

View file

@ -0,0 +1,26 @@
on:
push:
branches:
- trunk
jobs:
test:
runs-on: host
steps:
- run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git knots-trunk
- run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git --branch=pages knots-pages
- run: |
cd knots-trunk
guix shell -D -f guix-dev.scm -- documenta api "knots.scm knots/"
guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -c SHOW_TITLE=true -o ../knots-pages/index.html doc/index.texi
- run: |
cd knots-pages
git add .
if [[ -z "$(git status -s)" ]]; then
echo "Nothing to push"
else
git config user.email ""
git config user.name "Automatic website updater"
git commit -m "Automatic website update"
git push
fi

View file

@ -20,6 +20,9 @@
(define-module (knots parallelism)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-71)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-43)
#:use-module (ice-9 match)
#:use-module (ice-9 control)
#:use-module (ice-9 exceptions)
@ -27,6 +30,7 @@
#:use-module (fibers channels)
#:use-module (fibers operations)
#:use-module (knots)
#:use-module (knots resource-pool)
#:export (fibers-batch-map
fibers-map
@ -38,7 +42,13 @@
fibers-parallel
fibers-let
fiberize))
fiberize
make-parallelism-limiter
parallelism-limiter?
destroy-parallelism-limiter
call-with-parallelism-limiter
with-parallelism-limiter))
(define (defer-to-parallel-fiber thunk)
(let ((reply (make-channel)))
@ -48,7 +58,7 @@
(lambda (exn)
(put-message
reply
(list 'exception exn)))
(cons 'exception exn)))
(lambda ()
(with-exception-handler
(lambda (exn)
@ -69,7 +79,7 @@
(lambda ()
(start-stack #t (thunk)))
(lambda vals
(put-message reply vals))))))
(put-message reply (cons 'result vals)))))))
#:unwind? #t))
#:parallel? #t)
reply))
@ -79,13 +89,16 @@
reply-channels)))
(map
(match-lambda
(('exception exn)
(('exception . exn)
(raise-exception exn))
(result
(apply values result)))
(('result . vals)
(apply values vals)))
responses)))
(define (fibers-batch-map proc parallelism-limit . lists)
"Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT. If any of
the invocations of PROC raise an exception, this will be raised once
all of the calls to PROC have finished."
(define vecs (map (lambda (list-or-vec)
(if (vector? list-or-vec)
list-or-vec
@ -105,9 +118,18 @@
(channel-indexes '()))
(if (and (eq? #f next-to-process-index)
(null? channel-indexes))
(let ((processed-result-vec
(vector-map
(lambda (_ result-or-exn)
(match result-or-exn
(('exception . exn)
(raise-exception exn))
(('result . vals)
(car vals))))
result-vec)))
(if (vector? (first lists))
result-vec
(vector->list result-vec))
processed-result-vec
(vector->list processed-result-vec)))
(if (or (= (length channel-indexes)
(min parallelism-limit vecs-length))
@ -123,18 +145,13 @@
(get-operation
(vector-ref result-vec index))
(lambda (result)
(match result
(('exception exn)
(raise-exception exn))
(_
(vector-set! result-vec
index
(first result))
result)
(values next-to-process-index
(lset-difference =
channel-indexes
(list index))))))))
(list index))))))
channel-indexes)))))
(loop new-index
new-channel-indexes))
@ -157,9 +174,14 @@
channel-indexes)))))))
(define (fibers-map proc . lists)
"Map PROC over LISTS in parallel, running up to 20 fibers in
PARALLEL. If any of the invocations of PROC raise an exception, this
will be raised once all of the calls to PROC have finished."
(apply fibers-batch-map proc 20 lists))
(define (fibers-batch-for-each proc parallelism-limit . lists)
"Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in
parallel."
(apply fibers-batch-map
(lambda args
(apply proc args)
@ -170,10 +192,13 @@
*unspecified*)
(define (fibers-for-each proc . lists)
"Call PROC on LISTS, running up to 20 fibers in parallel."
(apply fibers-batch-for-each proc 20 lists))
(define-syntax fibers-parallel
(lambda (x)
"Run each expression in parallel. If any expression raises an
exception, this will be raised after all exceptions have finished."
(syntax-case x ()
((_ e0 ...)
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
@ -184,12 +209,16 @@
(apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
(define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...)
"Let, but run each binding in a fiber in parallel."
(call-with-values
(lambda () (fibers-parallel e ...))
(lambda (v ...)
b0 b1 ...)))
(define* (fibers-map-with-progress proc lists #:key report)
"Map PROC over LISTS, calling #:REPORT if specified after each
invocation of PROC finishes. REPORT is passed the results for each
element of LISTS, or #f if no result has been received yet."
(let loop ((channels-to-results
(apply map
(lambda args
@ -210,8 +239,8 @@
(match-lambda
((#f . ('exception . exn))
(raise-exception exn))
((#f . ('result . val))
val))
((#f . ('result . vals))
(car vals)))
channels-to-results)
(loop
(perform-operation
@ -228,12 +257,7 @@
(map (match-lambda
((c . r)
(if (eq? channel c)
(cons #f
(match result
(('exception . exn)
result)
(_
(cons 'result result))))
(cons #f result)
(cons c r))))
channels-to-results)))
#f))))
@ -254,7 +278,7 @@
reply-channel
(with-exception-handler
(lambda (exn)
(list 'exception exn))
(cons 'exception exn))
(lambda ()
(with-exception-handler
(lambda (exn)
@ -285,5 +309,32 @@
(put-message input-channel (cons reply-channel args))
(match (get-message reply-channel)
(('result . vals) (apply values vals))
(('exception exn)
(('exception . exn)
(raise-exception exn))))))
(define-record-type <parallelism-limiter>
(make-parallelism-limiter-record resource-pool)
parallelism-limiter?
(resource-pool parallelism-limiter-resource-pool))
(define* (make-parallelism-limiter limit #:key (name "unnamed"))
(make-parallelism-limiter-record
(make-fixed-size-resource-pool
(iota limit)
#:name name)))
(define (destroy-parallelism-limiter parallelism-limiter)
(destroy-resource-pool
(parallelism-limiter-resource-pool
parallelism-limiter)))
(define* (call-with-parallelism-limiter parallelism-limiter thunk)
(call-with-resource-from-pool
(parallelism-limiter-resource-pool parallelism-limiter)
(lambda _
(thunk))))
(define-syntax-rule (with-parallelism-limiter parallelism-limiter exp ...)
(call-with-parallelism-limiter
parallelism-limiter
(lambda () exp ...)))

View file

@ -22,6 +22,7 @@
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-71)
#:use-module (ice-9 q)
#:use-module (ice-9 match)
#:use-module (ice-9 exceptions)
#:use-module (fibers)
@ -32,9 +33,10 @@
#:use-module (fibers conditions)
#:use-module (knots)
#:use-module (knots parallelism)
#:export (resource-pool?
#:export (make-fixed-size-resource-pool
make-resource-pool
resource-pool?
resource-pool-name
resource-pool-channel
resource-pool-configuration
@ -73,7 +75,7 @@
(record-constructor &resource-pool-abort-add-resource))
(define resource-pool-abort-add-resource-error?
(record-predicate &resource-pool-abort-add-resource))
(exception-predicate &resource-pool-abort-add-resource))
(define-record-type <resource-pool>
(make-resource-pool-record name channel destroy-condition configuration)
@ -91,6 +93,429 @@
(resource-pool-name resource-pool))
port)))
(define (remove-at-index! lst i)
(let ((start
end
(split-at! lst i)))
(append
start
(cdr end))))
(define* (make-fixed-size-resource-pool resources
#:key
(delay-logger (const #f))
(duration-logger (const #f))
destructor
scheduler
(name "unnamed")
default-checkout-timeout
default-max-waiters)
(define channel (make-channel))
(define destroy-condition
(make-condition))
(define pool
(make-resource-pool-record
name
channel
destroy-condition
`((delay-logger . ,delay-logger)
(duration-logger . ,duration-logger)
(destructor . ,destructor)
(scheduler . ,scheduler)
(name . ,name)
(default-checkout-timeout . ,default-checkout-timeout)
(default-max-waiters . ,default-max-waiters))))
(define checkout-failure-count 0)
(define (spawn-fiber-to-destroy-resource resource)
(spawn-fiber
(lambda ()
(let loop ()
(let ((success?
(with-exception-handler
(lambda _ #f)
(lambda ()
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception running resource pool destructor (~A): ~A\n"
name
destructor)
(print-backtrace-and-exception/knots exn)
(raise-exception exn))
(lambda ()
(start-stack #t (destructor resource))
#t)))
#:unwind? #t)))
(if success?
(put-message channel
(list 'remove resource))
(begin
(sleep 5)
(loop))))))))
(define (spawn-fiber-for-checkout reply-channel
reply-timeout
resource)
(spawn-fiber
(lambda ()
(let ((checkout-success?
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply-channel
(cons 'success resource))
(const #t))
(wrap-operation (sleep-operation
reply-timeout)
(const #f))))))
(unless checkout-success?
(put-message
channel
(list 'return-failed-checkout resource)))))))
(define (destroy-loop resources)
(let loop ((resources resources))
(match (get-message channel)
(('checkout reply timeout-time max-waiters)
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'resource-pool-destroyed
#f))))
(perform-operation
(if timeout-time
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op)))))
(loop resources))
(((and (or 'return
'return-failed-checkout
'remove)
return-type)
resource)
(when (and (not (eq? return-type 'remove))
destructor)
(spawn-fiber-to-destroy-resource resource))
(let ((index
(list-index (lambda (x)
(eq? x resource))
resources)))
(let ((new-resources
(if index
(remove-at-index! resources index)
(begin
(simple-format
(current-error-port)
"resource pool error: unable to remove ~A\n"
resource)
resources))))
(if (null? new-resources)
(begin
(signal-condition! destroy-condition)
;; No loop
*unspecified*)
(loop new-resources)))))
(('stats reply timeout-time)
(let ((stats
`((resources . ,(length resources))
(available . 0)
(waiters . 0)
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
(lambda ()
(let ((op
(put-operation reply stats)))
(perform-operation
(if timeout-time
(choice-operation
op
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second)))
op))))))
(loop resources))
(('destroy reply)
(loop resources))
(unknown
(simple-format
(current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n"
name
unknown)
(loop resources)))))
(define (main-loop)
(let loop ((resources resources)
(available resources)
(waiters (make-q)))
(match (get-message channel)
(('checkout reply timeout-time max-waiters)
(if (null? available)
(let ((waiters-count
(q-length waiters)))
(if (and max-waiters
(>= waiters-count
max-waiters))
(begin
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'too-many-waiters
waiters-count))))
(perform-operation
(if timeout-time
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op)))))
(loop resources
available
waiters))
(loop resources
available
(enq! waiters (cons reply timeout-time)))))
(if timeout-time
(let ((current-internal-time
(get-internal-real-time)))
;; If this client is still waiting
(if (> timeout-time
current-internal-time)
(let ((reply-timeout
(/ (- timeout-time
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a new
;; fiber to handle handing over the resource,
;; and returning it if there's a timeout
(spawn-fiber-for-checkout reply
reply-timeout
(car available))
(loop resources
(cdr available)
waiters))
(loop resources
available
waiters)))
(begin
(put-message reply (cons 'success
(car available)))
(loop resources
(cdr available)
waiters)))))
(((and (or 'return
'return-failed-checkout)
return-type)
resource)
(when (eq? 'return-failed-checkout
return-type)
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(if (q-empty? waiters)
(loop resources
(cons resource available)
waiters)
(let ((current-internal-time
(get-internal-real-time)))
(with-exception-handler
(lambda (exn)
(if (eq? (exception-kind exn) 'q-empty)
(loop resources
(cons resource available)
waiters)
(raise-exception exn)))
(lambda ()
(let waiter-loop ((waiter (deq! waiters)))
(match waiter
((reply . timeout)
(if (and timeout
(< timeout current-internal-time))
(waiter-loop (deq! waiters))
(if timeout
(let ((reply-timeout
(/ (- timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's
;; a timeout
(spawn-fiber-for-checkout reply
reply-timeout
resource))
(put-message reply (cons 'success
resource))))))))
#:unwind? #t)
(loop resources
available
waiters))))
(('list-resources reply)
(spawn-fiber
(lambda ()
(put-message reply (list-copy resources))))
(loop resources
available
waiters))
(('stats reply timeout-time)
(let ((stats
`((resources . ,(length resources))
(available . ,(length available))
(waiters . ,(q-length waiters))
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
(lambda ()
(let ((op
(put-operation reply stats)))
(perform-operation
(if timeout-time
(choice-operation
op
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second)))
op))))))
(loop resources
available
waiters))
(('destroy)
(if (and (null? resources)
(q-empty? waiters))
(signal-condition!
destroy-condition)
(let ((current-internal-time (get-internal-real-time)))
(for-each
(match-lambda
((reply . timeout)
(when (or (not timeout)
(> timeout current-internal-time))
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'resource-pool-destroyed
#f))))
(perform-operation
(if timeout
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op))))))))
(car waiters))
(if destructor
(begin
(for-each
(lambda (resource)
(spawn-fiber-to-destroy-resource resource))
available)
(destroy-loop resources))
(let dl ((resources resources)
(available available))
(if (null? available)
(if (null? resources)
(signal-condition!
destroy-condition)
(destroy-loop resources))
(let ((index
(list-index (lambda (x)
(eq? x (car available)))
resources)))
(dl (remove-at-index! resources index)
(cdr available)))))))))
(unknown
(simple-format
(current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n"
name
unknown)
(loop resources
available
waiters)))))
(spawn-fiber
(lambda ()
(with-exception-handler
(lambda (exn)
#f)
(lambda ()
(with-exception-handler
(lambda (exn)
(let* ((stack (make-stack #t))
(error-string
(call-with-output-string
(lambda (port)
(display-backtrace stack port 3)
(simple-format
port
"exception in the ~A pool fiber, " name)
(print-exception
port
(stack-ref stack 3)
'%exception
(list exn))))))
(display error-string
(current-error-port)))
(raise-exception exn))
(lambda ()
(start-stack
#t
(main-loop)))))
#:unwind? #t))
(or scheduler
(current-scheduler)))
pool)
(define* (make-resource-pool return-new-resource max-size
#:key (min-size 0)
(idle-seconds #f)
@ -126,28 +551,33 @@
(define checkout-failure-count 0)
(define spawn-fiber-to-return-new-resource
(if add-resources-parallelism
(let ((thunk
(fiberize
(define return-new-resource/parallelism-limiter
(make-parallelism-limiter
(or add-resources-parallelism
max-size)
#:name
(string-append
name
" resource pool new resource parallelism limiter")))
(define (spawn-fiber-to-return-new-resource)
(spawn-fiber
(lambda ()
(with-exception-handler
(lambda (exn)
;; This can happen if the resource pool is destroyed very
;; quickly
(unless (resource-pool-destroyed-error? exn)
(raise-exception exn)))
(lambda ()
(with-parallelism-limiter
return-new-resource/parallelism-limiter
(let ((max-size
(assq-ref (resource-pool-configuration pool)
'max-size))
(size (assq-ref (resource-pool-stats pool)
(size (assq-ref (resource-pool-stats pool #:timeout #f)
'resources)))
(unless (= size max-size)
(let ((new-resource
(return-new-resource)))
(put-message channel
(list 'add-resource new-resource))))))
#:parallelism add-resources-parallelism)))
(lambda ()
(spawn-fiber thunk)))
(lambda ()
(spawn-fiber
(lambda ()
(let ((new-resource
(with-exception-handler
(lambda _ #f)
(lambda ()
@ -161,11 +591,12 @@
(print-backtrace-and-exception/knots exn)
(raise-exception exn))
(lambda ()
(start-stack #t (return-new-resource)))))
#:unwind? #t)))
(when new-resource
(let ((new-resource
(start-stack #t (return-new-resource))))
(put-message channel
(list 'add-resource new-resource)))))))))
(list 'add-resource new-resource))))))
#:unwind? #t)))))
#:unwind? #t))))
(define (spawn-fiber-to-destroy-resource resource)
(spawn-fiber
@ -250,21 +681,14 @@
'remove)
return-type)
resource)
(when destructor
(when (and (not (eq? return-type 'remove))
destructor)
(spawn-fiber-to-destroy-resource resource))
(let ((index
(list-index (lambda (x)
(eq? x resource))
resources)))
(define (remove-at-index! lst i)
(let ((start
end
(split-at! lst i)))
(append
start
(cdr end))))
(let ((new-resources
(if index
(remove-at-index! resources index)
@ -276,13 +700,16 @@
resources))))
(if (null? new-resources)
(begin
(and=> return-new-resource/parallelism-limiter
destroy-parallelism-limiter)
(signal-condition! destroy-condition)
;; No loop
*unspecified*)
(loop new-resources)))))
(('stats reply)
(('stats reply timeout-time)
(let ((stats
`((resources . ,(length resources))
(available . 0)
@ -291,13 +718,17 @@
(spawn-fiber
(lambda ()
(let ((op
(put-operation reply stats)))
(perform-operation
(if timeout-time
(choice-operation
(wrap-operation
(put-operation reply stats)
(const #t))
(wrap-operation (sleep-operation 5)
(const #f)))))))
op
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second)))
op))))))
(loop resources))
@ -317,7 +748,7 @@
(define (main-loop)
(let loop ((resources '())
(available '())
(waiters '())
(waiters (make-q))
(resources-last-used '()))
(match (get-message channel)
@ -339,50 +770,51 @@
(cons (get-internal-real-time)
resources-last-used))))
(if (null? waiters)
(if (q-empty? waiters)
(loop (cons resource resources)
(cons resource available)
waiters
(cons (get-internal-real-time)
resources-last-used))
(let* ((current-internal-time (get-internal-real-time))
(alive-waiters
dead-waiters
(partition!
(match-lambda
((reply . timeout)
(or (not timeout)
(> timeout current-internal-time))))
waiters)))
(if (null? alive-waiters)
(let ((current-internal-time
(get-internal-real-time)))
(with-exception-handler
(lambda (exn)
(if (eq? (exception-kind exn) 'q-empty)
(loop (cons resource resources)
(cons resource available)
'()
(cons (get-internal-real-time)
waiters
(cons current-internal-time
resources-last-used))
(match (last alive-waiters)
((waiter-channel . waiter-timeout)
(if waiter-timeout
(raise-exception exn)))
(lambda ()
(let waiter-loop ((waiter (deq! waiters)))
(match waiter
((reply . timeout)
(if (and timeout
(< timeout current-internal-time))
(waiter-loop (deq! waiters))
(if timeout
(let ((reply-timeout
(/ (- waiter-timeout
(/ (- timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn
;; a new fiber to handle handing over
;; the resource, and returning it if
;; there's a timeout
(spawn-fiber-for-checkout waiter-channel
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's
;; a timeout
(spawn-fiber-for-checkout reply
reply-timeout
resource))
(put-message waiter-channel (cons 'success
resource)))
(put-message reply (cons 'success
resource))))))))
#:unwind? #t)
(loop (cons resource resources)
available
(drop-right! alive-waiters 1)
(cons (get-internal-real-time)
resources-last-used)))))))))
waiters
(cons current-internal-time
resources-last-used))))))
(('checkout reply timeout-time max-waiters)
(if (null? available)
@ -391,7 +823,7 @@
(spawn-fiber-to-return-new-resource))
(let ((waiters-count
(length waiters)))
(q-length waiters)))
(if (and max-waiters
(>= waiters-count
max-waiters))
@ -420,8 +852,7 @@
resources-last-used))
(loop resources
available
(cons (cons reply timeout-time)
waiters)
(enq! waiters (cons reply timeout-time))
resources-last-used))))
(if timeout-time
@ -468,7 +899,7 @@
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(if (null? waiters)
(if (q-empty? waiters)
(loop resources
(cons resource available)
waiters
@ -481,19 +912,14 @@
(get-internal-real-time))
resources-last-used))
(let* ((current-internal-time (get-internal-real-time))
(alive-waiters
dead-waiters
(partition!
(match-lambda
((reply . timeout)
(or (not timeout)
(> timeout current-internal-time))))
waiters)))
(if (null? alive-waiters)
(let ((current-internal-time
(get-internal-real-time)))
(with-exception-handler
(lambda (exn)
(if (eq? (exception-kind exn) 'q-empty)
(loop resources
(cons resource available)
'()
waiters
(begin
(when (eq? return-type 'return)
(list-set!
@ -501,50 +927,48 @@
(list-index (lambda (x)
(eq? x resource))
resources)
(get-internal-real-time)))
current-internal-time))
resources-last-used))
(match (last alive-waiters)
((waiter-channel . waiter-timeout)
(if waiter-timeout
(raise-exception exn)))
(lambda ()
(let waiter-loop ((waiter (deq! waiters)))
(match waiter
((reply . timeout)
(if (and timeout
(< timeout current-internal-time))
(waiter-loop (deq! waiters))
(if timeout
(let ((reply-timeout
(/ (- waiter-timeout
(/ (- timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's a
;; timeout
(spawn-fiber-for-checkout waiter-channel
;; resource, and returning it if there's
;; a timeout
(spawn-fiber-for-checkout reply
reply-timeout
resource))
(put-message waiter-channel (cons 'success
resource)))
(put-message reply (cons 'success
resource))))))))
#:unwind? #t)
(loop resources
available
(drop-right! alive-waiters 1)
waiters
(begin
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
(get-internal-real-time))
resources-last-used))))))))
current-internal-time)
resources-last-used)))))
(('remove resource)
(let ((index
(list-index (lambda (x)
(eq? x resource))
resources)))
(define (remove-at-index! lst i)
(let ((start
end
(split-at! lst i)))
(append
start
(cdr end))))
(loop (if index
(remove-at-index! resources index)
(begin
@ -577,22 +1001,26 @@
waiters
resources-last-used))
(('stats reply)
(('stats reply timeout-time)
(let ((stats
`((resources . ,(length resources))
(available . ,(length available))
(waiters . ,(length waiters))
(waiters . ,(q-length waiters))
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
(lambda ()
(let ((op
(put-operation reply stats)))
(perform-operation
(if timeout-time
(choice-operation
(wrap-operation
(put-operation reply stats)
(const #t))
(wrap-operation (sleep-operation 5)
(const #f)))))))
op
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second)))
op))))))
(loop resources
available
@ -641,22 +1069,10 @@
(('destroy)
(if (and (null? resources)
(null? waiters))
(q-empty? waiters))
(signal-condition!
destroy-condition)
(begin
(for-each
(lambda (resource)
(if destructor
(spawn-fiber-to-destroy-resource resource)
(spawn-fiber
(lambda ()
(put-message channel
(list 'remove resource)))
#:parallel? #t)))
available)
(let ((current-internal-time (get-internal-real-time)))
(for-each
(match-lambda
@ -681,10 +1097,28 @@
internal-time-units-per-second))
(const #f)))
op))))))))
waiters))
(destroy-loop resources))))
(car waiters))
(if destructor
(begin
(for-each
(lambda (resource)
(spawn-fiber-to-destroy-resource resource))
available)
(destroy-loop resources))
(let dl ((resources resources)
(available available))
(if (null? available)
(if (null? resources)
(signal-condition!
destroy-condition)
(destroy-loop resources))
(let ((index
(list-index (lambda (x)
(eq? x (car available)))
resources)))
(dl (remove-at-index! resources index)
(cdr available)))))))))
(unknown
(simple-format
(current-error-port)
@ -744,7 +1178,8 @@
(put-operation (resource-pool-channel pool)
(list 'destroy))
(lambda _
(wait (resource-pool-destroy-condition pool))))
(wait
(resource-pool-destroy-condition pool))))
(wait-operation
(resource-pool-destroy-condition pool))))
#t)
@ -763,7 +1198,7 @@
(record-constructor &resource-pool-timeout))
(define resource-pool-timeout-error?
(record-predicate &resource-pool-timeout))
(exception-predicate &resource-pool-timeout))
(define &resource-pool-too-many-waiters
(make-exception-type '&recource-pool-too-many-waiters
@ -784,7 +1219,7 @@
(record-constructor &resource-pool-too-many-waiters))
(define resource-pool-too-many-waiters-error?
(record-predicate &resource-pool-too-many-waiters))
(exception-predicate &resource-pool-too-many-waiters))
(define &resource-pool-destroyed
(make-exception-type '&recource-pool-destroyed
@ -800,7 +1235,7 @@
(record-constructor &resource-pool-destroyed))
(define resource-pool-destroyed-error?
(record-predicate &resource-pool-destroyed))
(exception-predicate &resource-pool-destroyed))
(define &resource-pool-destroy-resource
(make-exception-type '&recource-pool-destroy-resource
@ -811,7 +1246,7 @@
(record-constructor &resource-pool-destroy-resource))
(define resource-pool-destroy-resource-exception?
(record-predicate &resource-pool-destroy-resource))
(exception-predicate &resource-pool-destroy-resource))
(define resource-pool-default-timeout-handler
(make-parameter #f))
@ -879,8 +1314,9 @@ available. Return the resource once PROC has returned."
start-time)
'timeout)
response))
'timeout)))))
(let loop ((reply (make-channel)))
'timeout))
'timeout)))
(let ((reply (make-channel)))
(put-message channel
(list 'checkout
reply
@ -918,8 +1354,7 @@ available. Return the resource once PROC has returned."
'destroy
'return)
resource))
(unless (resource-pool-destroy-resource-exception? exn)
(raise-exception exn)))
(raise-exception exn))
(lambda ()
(with-exception-handler
(lambda (exn)
@ -949,13 +1384,17 @@ available. Return the resource once PROC has returned."
(lambda (resource) exp ...)))
(define* (resource-pool-stats pool #:key (timeout 5))
(let ((reply (make-channel))
(start-time (get-internal-real-time)))
(if timeout
(let* ((reply (make-channel))
(start-time (get-internal-real-time))
(timeout-time
(+ start-time
(* internal-time-units-per-second timeout))))
(perform-operation
(choice-operation
(wrap-operation
(put-operation (resource-pool-channel pool)
`(stats ,reply))
`(stats ,reply ,timeout-time))
(const #t))
(wrap-operation (sleep-operation timeout)
(lambda _
@ -976,7 +1415,11 @@ available. Return the resource once PROC has returned."
(raise-exception
(make-resource-pool-timeout-error pool))))))
(raise-exception
(make-resource-pool-timeout-error pool))))))
(make-resource-pool-timeout-error pool)))))
(let ((reply (make-channel)))
(put-message (resource-pool-channel pool)
`(stats ,reply #f))
(get-message reply))))
(define (resource-pool-list-resources pool)
(let ((reply (make-channel)))

View file

@ -198,7 +198,7 @@ from there, or #f if that would be an empty string."
(record-accessor &thread-pool-timeout-error 'pool)))
(define thread-pool-timeout-error?
(record-predicate &thread-pool-timeout-error))
(exception-predicate &thread-pool-timeout-error))
(define* (make-fixed-size-thread-pool size
#:key

View file

@ -85,7 +85,7 @@
(record-constructor &port-timeout-error))
(define port-timeout-error?
(record-predicate &port-timeout-error))
(exception-predicate &port-timeout-error))
(define &port-read-timeout-error
(make-exception-type '&port-read-timeout-error
@ -96,7 +96,7 @@
(record-constructor &port-read-timeout-error))
(define port-read-timeout-error?
(record-predicate &port-read-timeout-error))
(exception-predicate &port-read-timeout-error))
(define &port-write-timeout-error
(make-exception-type '&port-write-timeout-error
@ -107,7 +107,7 @@
(record-constructor &port-write-timeout-error))
(define port-write-timeout-error?
(record-predicate &port-write-timeout-error))
(exception-predicate &port-write-timeout-error))
(define (readable? port)
"Test if PORT is writable."

View file

@ -63,6 +63,14 @@
(bind sock family addr port)
sock))
(define crlf-bv
(string->utf8 "\r\n"))
(define (chunked-output-port-overhead-bytes write-size)
(+ (string-length (number->string write-size 16))
(bytevector-length crlf-bv)
(bytevector-length crlf-bv)))
(define* (make-chunked-output-port/knots port #:key (keep-alive? #f)
(buffering 1200))
"Returns a new port which translates non-encoded data into a HTTP
@ -74,10 +82,12 @@ when done, as it will output the remaining data, and encode the final
zero chunk. When the port is closed it will also close PORT, unless
KEEP-ALIVE? is true."
(define (write! bv start count)
(put-string port (number->string count 16))
(put-string port "\r\n")
(let ((len-string
(number->string count 16)))
(put-string port len-string))
(put-bytevector port crlf-bv 0 2)
(put-bytevector port bv start count)
(put-string port "\r\n")
(put-bytevector port crlf-bv 0 2)
(force-output port)
count)
@ -130,7 +140,7 @@ closes PORT, unless KEEP-ALIVE? is true."
(record-constructor &request-body-ended-prematurely))
(define request-body-ended-prematurely-error?
(record-predicate &request-body-ended-prematurely))
(exception-predicate &request-body-ended-prematurely))
(define (request-body-port/knots r)
(cond
@ -331,15 +341,19 @@ on the procedure being called at any particular time."
(string->utf8
"internal server error")))
(define (handle-request handler client
(define* (handle-request handler client
read-request-exception-handler
write-response-exception-handler)
write-response-exception-handler
buffer-size
#:key post-request-hook)
(let ((request
(with-exception-handler
read-request-exception-handler
(lambda ()
(read-request client))
#:unwind? #t)))
#:unwind? #t))
(read-request-time
(get-internal-real-time)))
(let ((response
body
(cond
@ -388,7 +402,9 @@ on the procedure being called at any particular time."
(lambda ()
(write-response response client)
(let ((body-written?
(let ((response-start-time
(get-internal-real-time))
(body-written?
(if (procedure? body)
(let* ((type (response-content-type response
'(text/plain)))
@ -399,7 +415,11 @@ on the procedure being called at any particular time."
client
(make-chunked-output-port/knots
client
#:keep-alive? #t))))
#:keep-alive? #t
#:buffering
(- buffer-size
(chunked-output-port-overhead-bytes
buffer-size))))))
(set-port-encoding! body-port charset)
(let ((body-written?
(with-exception-handler
@ -423,6 +443,11 @@ on the procedure being called at any particular time."
(if body-written?
(begin
(force-output client)
(when post-request-hook
(post-request-hook request
#:read-request-time read-request-time
#:response-start-time response-start-time
#:response-end-time (get-internal-real-time)))
(when (and (procedure? body)
(response-content-length response))
(set-port-encoding! client "ISO-8859-1"))
@ -434,7 +459,8 @@ on the procedure being called at any particular time."
read-request-exception-handler
write-response-exception-handler
connection-idle-timeout
buffer-size)
buffer-size
post-request-hook)
;; Always disable Nagle's algorithm, as we handle buffering
;; ourselves; when we force-output, we really want the data to go
;; out.
@ -472,11 +498,29 @@ on the procedure being called at any particular time."
(else
(let ((keep-alive? (handle-request handler client
read-request-exception-handler
write-response-exception-handler)))
write-response-exception-handler
buffer-size
#:post-request-hook
post-request-hook)))
(if keep-alive?
(loop)
(close-port client)))))))
(define (post-request-hook/safe post-request-hook)
(if post-request-hook
(lambda args
(with-exception-handler
(lambda (exn) #f)
(lambda ()
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(raise-exception exn))
(lambda ()
(apply post-request-hook args))))
#:unwind? #t))
#f))
(define-record-type <web-server>
(make-web-server socket port)
web-server?
@ -496,7 +540,8 @@ on the procedure being called at any particular time."
(write-response-exception-handler
default-write-response-exception-handler)
(connection-idle-timeout #f)
(connection-buffer-size 1024))
(connection-buffer-size 1024)
post-request-hook)
"Run the knots web server.
HANDLER should be a procedure that takes one argument, the HTTP
@ -532,7 +577,9 @@ before sending back to the client."
read-request-exception-handler
write-response-exception-handler
connection-idle-timeout
connection-buffer-size))
connection-buffer-size
(post-request-hook/safe
post-request-hook)))
#:parallel? #t)
(loop))))))

View file

@ -1,10 +1,11 @@
(define-module (tests)
#:use-module (ice-9 exceptions)
#:use-module (fibers)
#:use-module (knots)
#:export (run-fibers-for-tests
assert-no-heap-growth))
(define (run-fibers-for-tests thunk)
(define* (run-fibers-for-tests thunk #:key (drain? #t))
(let ((result
(run-fibers
(lambda ()
@ -12,15 +13,18 @@
(lambda (exn)
exn)
(lambda ()
(simple-format #t "running ~A\n" thunk)
(with-exception-handler
(lambda (exn)
(backtrace)
(print-backtrace-and-exception/knots exn)
(raise-exception exn))
thunk)
(lambda ()
(start-stack #t (thunk))))
#t)
#:unwind? #t))
#:hz 0
#:parallelism 1)))
#:parallelism 1
#:drain? drain?)))
(if (exception? result)
(raise-exception result)
result)))

View file

@ -61,6 +61,24 @@
identity
'(()))))
(run-fibers-for-tests
(lambda ()
(with-exception-handler
(lambda (exn)
(unless (and (exception-with-message? exn)
(string=? (exception-message exn)
"foo"))
(raise-exception exn)))
(lambda ()
(fibers-map-with-progress
(lambda _
(raise-exception
(make-exception-with-message "foo")))
'((1)))
(error 'should-not-reach-here))
#:unwind? #t)))
(run-fibers-for-tests
(lambda ()
(with-exception-handler
@ -111,4 +129,16 @@
(assert-equal a 1))))
(run-fibers-for-tests
(lambda ()
(let ((parallelism-limiter (make-parallelism-limiter 2)))
(fibers-for-each
(lambda _
(with-parallelism-limiter
parallelism-limiter
#f))
(iota 50))
(destroy-parallelism-limiter parallelism-limiter))))
(display "parallelism test finished successfully\n")

View file

@ -19,7 +19,21 @@
(number?
(with-resource-from-pool resource-pool
res
res))))))
res)))
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-fixed-size-resource-pool
(list 1))))
(assert-true
(number?
(with-resource-from-pool resource-pool
res
res)))
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests
(lambda ()
@ -31,7 +45,9 @@
(number?
(with-resource-from-pool resource-pool
res
res))))))
res)))
(destroy-resource-pool resource-pool))))
(let* ((error-constructor
(record-constructor &resource-pool-timeout))
@ -88,10 +104,13 @@
res))
(iota 20))
(let loop ((stats (resource-pool-stats resource-pool)))
(let loop ((stats (resource-pool-stats resource-pool
#:timeout #f)))
(unless (= 0 (assq-ref stats 'resources))
(sleep 0.1)
(loop (resource-pool-stats resource-pool)))))))
(loop (resource-pool-stats resource-pool #:timeout #f))))
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests
(lambda ()
@ -115,7 +134,9 @@
(set! counter (+ 1 counter))
(error "collision detected")))))
20
(iota 50)))))
(iota 50))
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests
(lambda ()
@ -129,7 +150,7 @@
(error "collision detected")))
(new-number))
1
#:default-checkout-timeout 120)))
#:default-checkout-timeout 5)))
(fibers-batch-for-each
(lambda _
(with-resource-from-pool
@ -140,7 +161,9 @@
(set! counter (+ 1 counter))
(error "collision detected")))))
20
(iota 50)))))
(iota 50))
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests
(lambda ()
@ -164,14 +187,14 @@
(call-with-resource-from-pool
resource-pool
(lambda (res)
(error 'should-not-be-reached))))
#f)))
#:unwind? #t)))
(while (= 0
(assq-ref
(resource-pool-stats resource-pool)
(resource-pool-stats resource-pool #:timeout #f)
'waiters))
(sleep 0))
(sleep 0.1))
(with-exception-handler
(lambda (exn)
@ -184,6 +207,8 @@
resource-pool
(lambda (res)
(error 'should-not-be-reached))))
#:unwind? #t))))))
#:unwind? #t)))
(destroy-resource-pool resource-pool))))
(display "resource-pool test finished successfully\n")