Compare commits

...

10 commits

Author SHA1 Message Date
7ba77010ae Handle %stacks not being a pair
Not sure when this would happen, but guard against it.
2025-05-15 09:26:29 +01:00
8c63ed7b4e Support listing resource pool resources 2025-05-15 09:24:28 +01:00
838ee6f1e3 Enable destroying individual resources in a resource pool 2025-05-15 09:24:28 +01:00
1dca6d755e Allow specifying the resource-pool-channel 2025-04-27 10:52:36 +01:00
68cfbe0380 Use a condition for destroying resource pools
This avoids the situation where the resource pool is destroyed, so
there's no fiber to listen to the destroy request.
2025-04-27 10:03:06 +01:00
4f0eafef0a Resource pool max waiters and destroy changes
Add the ability to specify the max number of waiters for a resource
pool, this provides a more efficient way of avoiding waiters for a
resource pool continually rising.

This commit also improves the destroy behaviour.
2025-04-27 09:41:56 +01:00
8c0f04be4f Don't call put-message without unwinding the stack
When handling exceptions, as this is error prone.
2025-04-01 17:47:11 +03:00
e1858dfff5 Remove the web-server exception handler
This turned out not to be useful, since I wanted to handle exceptions
happening in the exception handler, so it didn't really help in the
end to allow customising it.
2025-03-14 14:51:42 +00:00
da69fd19f3 Unwind before calling put-message
As I think this might be more reliable in case the stack contains
something that would introduce a continuation barrier.
2025-03-11 11:55:42 +00:00
a73fd1ca50 Add a fibers-parallel test 2025-03-10 21:33:29 +00:00
8 changed files with 492 additions and 197 deletions

View file

@ -67,11 +67,14 @@
(define* (print-backtrace-and-exception/knots
exn
#:key (port (current-error-port)))
(let* ((stack (match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))))
(let* ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t))))
(error-string
(call-with-output-string
(lambda (port)

View file

@ -22,6 +22,7 @@
#:use-module (srfi srfi-71)
#:use-module (ice-9 match)
#:use-module (ice-9 control)
#:use-module (ice-9 exceptions)
#:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers operations)
@ -43,27 +44,33 @@
(let ((reply (make-channel)))
(spawn-fiber
(lambda ()
(call-with-escape-continuation
(lambda (return)
(with-exception-handler
(lambda (exn)
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(let ((stack (make-stack #t
0 prompt-tag
0 (and prompt-tag 1))))
(put-message reply
(list 'exception
(make-exception
exn
(make-knots-exception stack)))))))
(return))
(lambda ()
(call-with-values
(lambda ()
(start-stack #t (thunk)))
(lambda vals
(put-message reply vals))))))))
(with-exception-handler
(lambda (exn)
(put-message
reply
(list 'exception exn)))
(lambda ()
(with-exception-handler
(lambda (exn)
(let ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t)))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))
(lambda ()
(call-with-values
(lambda ()
(start-stack #t (thunk)))
(lambda vals
(put-message reply vals))))))
#:unwind? #t))
#:parallel? #t)
reply))
@ -245,25 +252,31 @@
(get-message process-channel))))
(put-message
reply-channel
(call-with-escape-continuation
(lambda (return)
(with-exception-handler
(lambda (exn)
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(let ((stack (make-stack #t
0 prompt-tag
0 (and prompt-tag 1))))
(return (list 'exception
(make-exception
exn
(make-knots-exception stack))))))))
(lambda ()
(call-with-values
(lambda ()
(start-stack #t (apply proc args)))
(lambda vals
(cons 'result vals)))))))))))
(with-exception-handler
(lambda (exn)
(list 'exception exn))
(lambda ()
(with-exception-handler
(lambda (exn)
(let ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t)))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))
(lambda ()
(call-with-values
(lambda ()
(start-stack #t (apply proc args)))
(lambda vals
(cons 'result vals))))))
#:unwind? #t)))))
#:parallel? #t))
(iota parallelism))

View file

@ -29,6 +29,7 @@
#:use-module (fibers channels)
#:use-module (fibers scheduler)
#:use-module (fibers operations)
#:use-module (fibers conditions)
#:use-module (knots)
#:use-module (knots parallelism)
#:export (resource-pool?
@ -43,6 +44,19 @@
resource-pool-timeout-error-pool
resource-pool-timeout-error?
&resource-pool-too-many-waiters
resource-pool-too-many-waiters-error-pool
resource-pool-too-many-waiters-error-waiters-count
resource-pool-too-many-waiters-error?
&resource-pool-destroyed
resource-pool-destroyed-error-pool
resource-pool-destroyed-error?
&resource-pool-destroy-resource
make-resource-pool-destroy-resource-exception
resource-pool-destroy-resource-exception?
resource-pool-default-timeout-handler
call-with-resource-from-pool
@ -62,11 +76,12 @@
(record-predicate &resource-pool-abort-add-resource))
(define-record-type <resource-pool>
(make-resource-pool-record name channel configuration)
(make-resource-pool-record name channel destroy-condition configuration)
resource-pool?
(name resource-pool-name)
(channel resource-pool-channel)
(configuration resource-pool-configuration))
(name resource-pool-name)
(channel resource-pool-channel)
(destroy-condition resource-pool-destroy-condition)
(configuration resource-pool-configuration))
(set-record-type-printer!
<resource-pool>
@ -86,13 +101,17 @@
scheduler
(name "unnamed")
(add-resources-parallelism 1)
default-checkout-timeout)
default-checkout-timeout
default-max-waiters)
(define channel (make-channel))
(define destroy-condition
(make-condition))
(define pool
(make-resource-pool-record
name
channel
destroy-condition
`((max-size . ,max-size)
(min-size . ,min-size)
(idle-seconds . ,idle-seconds)
@ -102,7 +121,8 @@
(lifetime . ,lifetime)
(scheduler . ,scheduler)
(name . ,name)
(default-checkout-timeout . ,default-checkout-timeout))))
(default-checkout-timeout . ,default-checkout-timeout)
(default-max-waiters . ,default-max-waiters))))
(define checkout-failure-count 0)
@ -186,7 +206,8 @@
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply-channel resource)
(put-operation reply-channel
(cons 'success resource))
(const #t))
(wrap-operation (sleep-operation
reply-timeout)
@ -196,6 +217,103 @@
channel
(list 'return-failed-checkout resource)))))))
(define (destroy-loop resources)
(let loop ((resources resources))
(match (get-message channel)
(('add-resource resource)
(when destructor
(spawn-fiber-to-destroy-resource resource))
(loop resources))
(('checkout reply timeout-time max-waiters)
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'resource-pool-destroyed
#f))))
(perform-operation
(if timeout-time
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op)))))
(loop resources))
(((and (or 'return
'return-failed-checkout
'remove)
return-type)
resource)
(when destructor
(spawn-fiber-to-destroy-resource resource))
(let ((index
(list-index (lambda (x)
(eq? x resource))
resources)))
(define (remove-at-index! lst i)
(let ((start
end
(split-at! lst i)))
(append
start
(cdr end))))
(let ((new-resources
(if index
(remove-at-index! resources index)
(begin
(simple-format
(current-error-port)
"resource pool error: unable to remove ~A\n"
resource)
resources))))
(if (null? new-resources)
(begin
(signal-condition! destroy-condition)
;; No loop
*unspecified*)
(loop new-resources)))))
(('stats reply)
(let ((stats
`((resources . ,(length resources))
(available . 0)
(waiters . 0)
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
(lambda ()
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply stats)
(const #t))
(wrap-operation (sleep-operation 5)
(const #f)))))))
(loop resources))
(('check-for-idle-resources)
(loop resources))
(('destroy reply)
(loop resources))
(unknown
(simple-format
(current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n"
name
unknown)
(loop resources)))))
(define (main-loop)
(let loop ((resources '())
(available '())
@ -257,7 +375,8 @@
(spawn-fiber-for-checkout waiter-channel
reply-timeout
resource))
(put-message waiter-channel resource))
(put-message waiter-channel (cons 'success
resource)))
(loop (cons resource resources)
available
@ -265,17 +384,45 @@
(cons (get-internal-real-time)
resources-last-used)))))))))
(('checkout reply timeout-time)
(('checkout reply timeout-time max-waiters)
(if (null? available)
(begin
(unless (= (length resources) max-size)
(spawn-fiber-to-return-new-resource))
(loop resources
available
(cons (cons reply timeout-time)
waiters)
resources-last-used))
(let ((waiters-count
(length waiters)))
(if (and max-waiters
(>= waiters-count
max-waiters))
(begin
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'too-many-waiters
waiters-count))))
(perform-operation
(if timeout-time
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op)))))
(loop resources
available
waiters
resources-last-used))
(loop resources
available
(cons (cons reply timeout-time)
waiters)
resources-last-used))))
(if timeout-time
(let ((current-internal-time
@ -303,7 +450,8 @@
waiters
resources-last-used)))
(begin
(put-message reply (car available))
(put-message reply (cons 'success
(car available)))
(loop resources
(cdr available)
@ -369,7 +517,8 @@
(spawn-fiber-for-checkout waiter-channel
reply-timeout
resource))
(put-message waiter-channel resource))
(put-message waiter-channel (cons 'success
resource)))
(loop resources
available
@ -410,6 +559,24 @@
resources-last-used
index))))
(('destroy resource)
(spawn-fiber-to-destroy-resource resource)
(loop resources
available
waiters
resources-last-used))
(('list-resources reply)
(spawn-fiber
(lambda ()
(put-message reply (list-copy resources))))
(loop resources
available
waiters
resources-last-used))
(('stats reply)
(let ((stats
`((resources . ,(length resources))
@ -472,9 +639,11 @@
waiters
resources-last-used))))
(('destroy reply)
(if (null? resources)
(put-message reply 'destroy-success)
(('destroy)
(if (and (null? resources)
(null? waiters))
(signal-condition!
destroy-condition)
(begin
(for-each
@ -488,16 +657,33 @@
#:parallel? #t)))
available)
(spawn-fiber
(lambda ()
(sleep 0.1)
(put-message channel
(list 'destroy reply))))
(let ((current-internal-time (get-internal-real-time)))
(for-each
(match-lambda
((reply . timeout)
(when (or (not timeout)
(> timeout current-internal-time))
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'resource-pool-destroyed
#f))))
(perform-operation
(if timeout
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op))))))))
waiters))
(loop resources
'()
waiters
resources-last-used))))
(destroy-loop resources))))
(unknown
(simple-format
@ -552,12 +738,16 @@
pool)
(define (destroy-resource-pool pool)
(let ((reply (make-channel)))
(put-message (resource-pool-channel pool)
(list 'destroy reply))
(let ((msg (get-message reply)))
(unless (eq? msg 'destroy-success)
(error msg)))))
(perform-operation
(choice-operation
(wrap-operation
(put-operation (resource-pool-channel pool)
(list 'destroy))
(lambda _
(wait (resource-pool-destroy-condition pool))))
(wait-operation
(resource-pool-destroy-condition pool))))
#t)
(define &resource-pool-timeout
(make-exception-type '&recource-pool-timeout
@ -575,12 +765,63 @@
(define resource-pool-timeout-error?
(record-predicate &resource-pool-timeout))
(define &resource-pool-too-many-waiters
(make-exception-type '&recource-pool-too-many-waiters
&error
'(pool waiters-count)))
(define resource-pool-too-many-waiters-error-pool
(exception-accessor
&resource-pool-too-many-waiters
(record-accessor &resource-pool-too-many-waiters 'pool)))
(define resource-pool-too-many-waiters-error-waiters-count
(exception-accessor
&resource-pool-too-many-waiters
(record-accessor &resource-pool-too-many-waiters 'waiters-count)))
(define make-resource-pool-too-many-waiters-error
(record-constructor &resource-pool-too-many-waiters))
(define resource-pool-too-many-waiters-error?
(record-predicate &resource-pool-too-many-waiters))
(define &resource-pool-destroyed
(make-exception-type '&recource-pool-destroyed
&error
'(pool)))
(define resource-pool-destroyed-error-pool
(exception-accessor
&resource-pool-destroyed
(record-accessor &resource-pool-destroyed 'pool)))
(define make-resource-pool-destroyed-error
(record-constructor &resource-pool-destroyed))
(define resource-pool-destroyed-error?
(record-predicate &resource-pool-destroyed))
(define &resource-pool-destroy-resource
(make-exception-type '&recource-pool-destroy-resource
&exception
'()))
(define make-resource-pool-destroy-resource-exception
(record-constructor &resource-pool-destroy-resource))
(define resource-pool-destroy-resource-exception?
(record-predicate &resource-pool-destroy-resource))
(define resource-pool-default-timeout-handler
(make-parameter #f))
(define* (call-with-resource-from-pool
pool proc #:key (timeout 'default)
(timeout-handler (resource-pool-default-timeout-handler)))
(timeout-handler (resource-pool-default-timeout-handler))
(max-waiters 'default)
(channel (resource-pool-channel pool))
(destroy-resource-on-exception? #f))
"Call PROC with a resource from POOL, blocking until a resource becomes
available. Return the resource once PROC has returned."
@ -590,7 +831,13 @@ available. Return the resource once PROC has returned."
'default-checkout-timeout)
timeout))
(let ((resource
(define max-waiters-or-default
(if (eq? max-waiters 'default)
(assq-ref (resource-pool-configuration pool)
'default-max-waiters)
max-waiters))
(let ((reply
(if timeout-or-default
(let loop ((reply (make-channel))
(start-time (get-internal-real-time)))
@ -598,12 +845,13 @@ available. Return the resource once PROC has returned."
(perform-operation
(choice-operation
(wrap-operation
(put-operation (resource-pool-channel pool)
(put-operation channel
(list 'checkout
reply
(+ start-time
(* timeout-or-default
internal-time-units-per-second))))
internal-time-units-per-second))
max-waiters-or-default))
(const #t))
(wrap-operation (sleep-operation timeout-or-default)
(const #f))))))
@ -629,37 +877,71 @@ available. Return the resource once PROC has returned."
0)
(loop (make-channel)
start-time)
#f)
'timeout)
response))
#f)))))
'timeout)))))
(let loop ((reply (make-channel)))
(put-message (resource-pool-channel pool)
(put-message channel
(list 'checkout
reply
#f))
#f
max-waiters-or-default))
(get-message reply)))))
(when (not resource)
(when timeout-handler
(timeout-handler pool proc timeout))
(match reply
('timeout
(when timeout-handler
(timeout-handler pool proc timeout))
(raise-exception
(make-resource-pool-timeout-error pool)))
(raise-exception
(make-resource-pool-timeout-error pool)))
(('too-many-waiters . count)
(call-with-values
(lambda ()
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(put-message (resource-pool-channel pool)
`(return ,resource))
(raise-exception exn))
(lambda ()
(proc resource))))
(lambda vals
(put-message (resource-pool-channel pool)
`(return ,resource))
(apply values vals)))))
(raise-exception
(make-resource-pool-too-many-waiters-error pool
count)))
(('resource-pool-destroyed . #f)
(raise-exception
(make-resource-pool-destroyed-error pool)))
(('success . resource)
(call-with-values
(lambda ()
(with-exception-handler
(lambda (exn)
;; Unwind the stack before calling put-message, as
;; this avoids inconsistent behaviour with
;; continuation barriers
(put-message
(resource-pool-channel pool)
(list (if (or destroy-resource-on-exception?
(resource-pool-destroy-resource-exception? exn))
'destroy
'return)
resource))
(unless (resource-pool-destroy-resource-exception? exn)
(raise-exception exn)))
(lambda ()
(with-exception-handler
(lambda (exn)
(let ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t)))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))
(lambda ()
(proc resource))))
#:unwind? #t))
(lambda vals
(put-message (resource-pool-channel pool)
`(return ,resource))
(apply values vals)))))))
(define-syntax-rule (with-resource-from-pool pool resource exp ...)
(call-with-resource-from-pool
@ -696,3 +978,8 @@ available. Return the resource once PROC has returned."
(raise-exception
(make-resource-pool-timeout-error pool))))))
(define (resource-pool-list-resources pool)
(let ((reply (make-channel)))
(put-message (resource-pool-channel pool)
(list 'list-resources reply))
(get-message reply)))

View file

@ -251,15 +251,18 @@ arguments of the thread pool procedure."
proc)
(with-exception-handler
(lambda (exn)
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(let ((stack (make-stack #t
0 prompt-tag
0 (and prompt-tag 1))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))))
(let ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t)))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))
(lambda ()
(call-with-values
(lambda ()

View file

@ -48,7 +48,6 @@
request-body-port/knots
read-request-body/knots
default-exception-handler
default-write-response-exception-handler
web-server?
@ -310,7 +309,7 @@ on the procedure being called at any particular time."
;; Close the client port
#f)
(define (default-exception-handler exn request)
(define (exception-handler exn request)
(let* ((error-string
(call-with-output-string
(lambda (port)
@ -332,8 +331,7 @@ on the procedure being called at any particular time."
(define (handle-request handler client
read-request-exception-handler
write-response-exception-handler
exception-handler)
write-response-exception-handler)
(let ((request
(with-exception-handler
read-request-exception-handler
@ -356,58 +354,14 @@ on the procedure being called at any particular time."
(lambda (return)
(with-exception-handler
(lambda (exn)
(with-exception-handler
(lambda (exn)
(call-with-values
(lambda ()
(default-exception-handler
(make-exception
exn
(make-exception-with-message
"exception in exception handler")
(make-exception-with-irritants
exception-handler))
request))
(match-lambda*
((response body)
(call-with-values
(lambda ()
(sanitize-response
request
response
body))
return)))))
(lambda ()
(call-with-values
(lambda ()
(exception-handler exn request))
(lambda (response body)
(call-with-values
(lambda ()
(exception-handler exn request))
(match-lambda*
((response body)
(call-with-values
(lambda ()
(sanitize-response request response body))
return))
(other
(call-with-values
(lambda ()
(default-exception-handler
(make-exception-with-irritants
(list (make-exception-with-message
(simple-format
#f
"wrong number of values returned from exception handler, expecting 2, got ~A"
(length other)))
exception-handler))
request))
(match-lambda*
((response body)
(call-with-values
(lambda ()
(sanitize-response
request
response
body))
return))))))))))
(sanitize-response request response body))
return))))
(lambda ()
(start-stack
#t
@ -475,7 +429,6 @@ on the procedure being called at any particular time."
#:unwind? #t))))
(define* (client-loop client handler
exception-handler
read-request-exception-handler
write-response-exception-handler
connection-idle-timeout
@ -517,8 +470,7 @@ on the procedure being called at any particular time."
(else
(let ((keep-alive? (handle-request handler client
read-request-exception-handler
write-response-exception-handler
exception-handler)))
write-response-exception-handler)))
(if keep-alive?
(loop)
(close-port client)))))))
@ -537,8 +489,6 @@ on the procedure being called at any particular time."
INADDR_LOOPBACK))
(port 8080)
(socket (make-default-socket family addr port))
(exception-handler
default-exception-handler)
(read-request-exception-handler
default-read-request-exception-handler)
(write-response-exception-handler
@ -577,7 +527,6 @@ before sending back to the client."
((client . sockaddr)
(spawn-fiber (lambda ()
(client-loop client handler
exception-handler
read-request-exception-handler
write-response-exception-handler
connection-idle-timeout

View file

@ -93,4 +93,22 @@
1))
#:unwind? #t)))
(run-fibers-for-tests
(lambda ()
(let ((a 0))
(call-with-values
(lambda ()
(fibers-parallel
(begin
(sleep 1)
1)
(begin
(set! a 1)
2)))
(lambda (a b)
(assert-equal a 1)
(assert-equal b 2)))
(assert-equal a 1))))
(display "parallelism test finished successfully\n")

View file

@ -142,4 +142,48 @@
20
(iota 50)))))
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-resource-pool
(lambda () #f)
1
#:default-max-waiters 1)))
(call-with-resource-from-pool
resource-pool
(lambda (res)
;; 1st waiter
(spawn-fiber
(lambda ()
(with-exception-handler
(lambda (exn)
(if (resource-pool-destroyed-error? exn)
#t
(raise-exception exn)))
(lambda ()
(call-with-resource-from-pool
resource-pool
(lambda (res)
(error 'should-not-be-reached))))
#:unwind? #t)))
(while (= 0
(assq-ref
(resource-pool-stats resource-pool)
'waiters))
(sleep 0))
(with-exception-handler
(lambda (exn)
(if (resource-pool-too-many-waiters-error? exn)
#t
(raise-exception exn)))
(lambda ()
;; 2nd waiter
(call-with-resource-from-pool
resource-pool
(lambda (res)
(error 'should-not-be-reached))))
#:unwind? #t))))))
(display "resource-pool test finished successfully\n")

View file

@ -52,28 +52,6 @@
uri
#:port (non-blocking-open-socket-for-uri uri)))))))
(run-fibers-for-tests
(lambda ()
(let* ((web-server
(run-knots-web-server
(lambda (request)
"Hello, World!")
#:port 0
#:exception-handler
(lambda (exn request)
"Error"))) ;; Bind to any port
(port
(web-server-port web-server))
(uri
(build-uri 'http #:host "127.0.0.1" #:port port)))
(assert-equal
500
(response-code
(http-get
uri
#:port (non-blocking-open-socket-for-uri uri)))))))
(run-fibers-for-tests
(lambda ()
(let* ((web-server