Use a condition for destroying resource pools

This avoids the situation where the resource pool is destroyed, so
there's no fiber to listen to the destroy request.
This commit is contained in:
Christopher Baines 2025-04-27 10:03:06 +01:00
parent 4f0eafef0a
commit 68cfbe0380

View file

@ -29,6 +29,7 @@
#:use-module (fibers channels) #:use-module (fibers channels)
#:use-module (fibers scheduler) #:use-module (fibers scheduler)
#:use-module (fibers operations) #:use-module (fibers operations)
#:use-module (fibers conditions)
#:use-module (knots) #:use-module (knots)
#:use-module (knots parallelism) #:use-module (knots parallelism)
#:export (resource-pool? #:export (resource-pool?
@ -71,11 +72,12 @@
(record-predicate &resource-pool-abort-add-resource)) (record-predicate &resource-pool-abort-add-resource))
(define-record-type <resource-pool> (define-record-type <resource-pool>
(make-resource-pool-record name channel configuration) (make-resource-pool-record name channel destroy-condition configuration)
resource-pool? resource-pool?
(name resource-pool-name) (name resource-pool-name)
(channel resource-pool-channel) (channel resource-pool-channel)
(configuration resource-pool-configuration)) (destroy-condition resource-pool-destroy-condition)
(configuration resource-pool-configuration))
(set-record-type-printer! (set-record-type-printer!
<resource-pool> <resource-pool>
@ -98,11 +100,14 @@
default-checkout-timeout default-checkout-timeout
default-max-waiters) default-max-waiters)
(define channel (make-channel)) (define channel (make-channel))
(define destroy-condition
(make-condition))
(define pool (define pool
(make-resource-pool-record (make-resource-pool-record
name name
channel channel
destroy-condition
`((max-size . ,max-size) `((max-size . ,max-size)
(min-size . ,min-size) (min-size . ,min-size)
(idle-seconds . ,idle-seconds) (idle-seconds . ,idle-seconds)
@ -208,16 +213,14 @@
channel channel
(list 'return-failed-checkout resource))))))) (list 'return-failed-checkout resource)))))))
(define (destroy-loop resources destroy-waiters) (define (destroy-loop resources)
(let loop ((resources resources) (let loop ((resources resources))
(destroy-waiters destroy-waiters))
(match (get-message channel) (match (get-message channel)
(('add-resource resource) (('add-resource resource)
(when destructor (when destructor
(spawn-fiber-to-destroy-resource resource)) (spawn-fiber-to-destroy-resource resource))
(loop resources (loop resources))
destroy-waiters))
(('checkout reply timeout-time max-waiters) (('checkout reply timeout-time max-waiters)
(spawn-fiber (spawn-fiber
(lambda () (lambda ()
@ -237,8 +240,7 @@
internal-time-units-per-second)) internal-time-units-per-second))
(const #f))) (const #f)))
op))))) op)))))
(loop resources (loop resources))
destroy-waiters))
(((and (or 'return (((and (or 'return
'return-failed-checkout 'return-failed-checkout
'remove) 'remove)
@ -270,17 +272,11 @@
resources)))) resources))))
(if (null? new-resources) (if (null? new-resources)
(begin (begin
(for-each (signal-condition! destroy-condition)
(lambda (destroy-waiter)
(spawn-fiber
(lambda ()
(put-message destroy-waiter 'destroy-success))))
destroy-waiters)
;; No loop ;; No loop
*unspecified*) *unspecified*)
(loop new-resources (loop new-resources)))))
destroy-waiters)))))
(('stats reply) (('stats reply)
(let ((stats (let ((stats
@ -299,24 +295,20 @@
(wrap-operation (sleep-operation 5) (wrap-operation (sleep-operation 5)
(const #f))))))) (const #f)))))))
(loop resources (loop resources))
destroy-waiters))
(('check-for-idle-resources) (('check-for-idle-resources)
(loop resources (loop resources))
destroy-waiters))
(('destroy reply) (('destroy reply)
(loop resources (loop resources))
(cons reply destroy-waiters)))
(unknown (unknown
(simple-format (simple-format
(current-error-port) (current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n" "unrecognised message to ~A resource pool channel: ~A\n"
name name
unknown) unknown)
(loop resources (loop resources)))))
destroy-waiters)))))
(define (main-loop) (define (main-loop)
(let loop ((resources '()) (let loop ((resources '())
@ -625,10 +617,11 @@
waiters waiters
resources-last-used)))) resources-last-used))))
(('destroy reply) (('destroy)
(if (and (null? resources) (if (and (null? resources)
(null? waiters)) (null? waiters))
(put-message reply 'destroy-success) (signal-condition!
destroy-condition)
(begin (begin
(for-each (for-each
@ -668,8 +661,7 @@
op)))))))) op))))))))
waiters)) waiters))
(destroy-loop resources (destroy-loop resources))))
(list reply)))))
(unknown (unknown
(simple-format (simple-format
@ -724,12 +716,16 @@
pool) pool)
(define (destroy-resource-pool pool) (define (destroy-resource-pool pool)
(let ((reply (make-channel))) (perform-operation
(put-message (resource-pool-channel pool) (choice-operation
(list 'destroy reply)) (wrap-operation
(let ((msg (get-message reply))) (put-operation (resource-pool-channel pool)
(unless (eq? msg 'destroy-success) (list 'destroy))
(error msg))))) (lambda _
(wait (resource-pool-destroy-condition pool))))
(wait-operation
(resource-pool-destroy-condition pool))))
#t)
(define &resource-pool-timeout (define &resource-pool-timeout
(make-exception-type '&recource-pool-timeout (make-exception-type '&recource-pool-timeout