Resource pool max waiters and destroy changes

Add the ability to specify the max number of waiters for a resource
pool, this provides a more efficient way of avoiding waiters for a
resource pool continually rising.

This commit also improves the destroy behaviour.
This commit is contained in:
Christopher Baines 2025-04-27 09:41:56 +01:00
parent 8c0f04be4f
commit 4f0eafef0a
2 changed files with 333 additions and 62 deletions

View file

@ -43,6 +43,15 @@
resource-pool-timeout-error-pool resource-pool-timeout-error-pool
resource-pool-timeout-error? resource-pool-timeout-error?
&resource-pool-too-many-waiters
resource-pool-too-many-waiters-error-pool
resource-pool-too-many-waiters-error-waiters-count
resource-pool-too-many-waiters-error?
&resource-pool-destroyed
resource-pool-destroyed-error-pool
resource-pool-destroyed-error?
resource-pool-default-timeout-handler resource-pool-default-timeout-handler
call-with-resource-from-pool call-with-resource-from-pool
@ -86,7 +95,8 @@
scheduler scheduler
(name "unnamed") (name "unnamed")
(add-resources-parallelism 1) (add-resources-parallelism 1)
default-checkout-timeout) default-checkout-timeout
default-max-waiters)
(define channel (make-channel)) (define channel (make-channel))
(define pool (define pool
@ -102,7 +112,8 @@
(lifetime . ,lifetime) (lifetime . ,lifetime)
(scheduler . ,scheduler) (scheduler . ,scheduler)
(name . ,name) (name . ,name)
(default-checkout-timeout . ,default-checkout-timeout)))) (default-checkout-timeout . ,default-checkout-timeout)
(default-max-waiters . ,default-max-waiters))))
(define checkout-failure-count 0) (define checkout-failure-count 0)
@ -186,7 +197,8 @@
(perform-operation (perform-operation
(choice-operation (choice-operation
(wrap-operation (wrap-operation
(put-operation reply-channel resource) (put-operation reply-channel
(cons 'success resource))
(const #t)) (const #t))
(wrap-operation (sleep-operation (wrap-operation (sleep-operation
reply-timeout) reply-timeout)
@ -196,6 +208,116 @@
channel channel
(list 'return-failed-checkout resource))))))) (list 'return-failed-checkout resource)))))))
(define (destroy-loop resources destroy-waiters)
(let loop ((resources resources)
(destroy-waiters destroy-waiters))
(match (get-message channel)
(('add-resource resource)
(when destructor
(spawn-fiber-to-destroy-resource resource))
(loop resources
destroy-waiters))
(('checkout reply timeout-time max-waiters)
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'resource-pool-destroyed
#f))))
(perform-operation
(if timeout-time
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op)))))
(loop resources
destroy-waiters))
(((and (or 'return
'return-failed-checkout
'remove)
return-type)
resource)
(when destructor
(spawn-fiber-to-destroy-resource resource))
(let ((index
(list-index (lambda (x)
(eq? x resource))
resources)))
(define (remove-at-index! lst i)
(let ((start
end
(split-at! lst i)))
(append
start
(cdr end))))
(let ((new-resources
(if index
(remove-at-index! resources index)
(begin
(simple-format
(current-error-port)
"resource pool error: unable to remove ~A\n"
resource)
resources))))
(if (null? new-resources)
(begin
(for-each
(lambda (destroy-waiter)
(spawn-fiber
(lambda ()
(put-message destroy-waiter 'destroy-success))))
destroy-waiters)
;; No loop
*unspecified*)
(loop new-resources
destroy-waiters)))))
(('stats reply)
(let ((stats
`((resources . ,(length resources))
(available . 0)
(waiters . 0)
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
(lambda ()
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply stats)
(const #t))
(wrap-operation (sleep-operation 5)
(const #f)))))))
(loop resources
destroy-waiters))
(('check-for-idle-resources)
(loop resources
destroy-waiters))
(('destroy reply)
(loop resources
(cons reply destroy-waiters)))
(unknown
(simple-format
(current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n"
name
unknown)
(loop resources
destroy-waiters)))))
(define (main-loop) (define (main-loop)
(let loop ((resources '()) (let loop ((resources '())
(available '()) (available '())
@ -257,7 +379,8 @@
(spawn-fiber-for-checkout waiter-channel (spawn-fiber-for-checkout waiter-channel
reply-timeout reply-timeout
resource)) resource))
(put-message waiter-channel resource)) (put-message waiter-channel (cons 'success
resource)))
(loop (cons resource resources) (loop (cons resource resources)
available available
@ -265,17 +388,45 @@
(cons (get-internal-real-time) (cons (get-internal-real-time)
resources-last-used))))))))) resources-last-used)))))))))
(('checkout reply timeout-time) (('checkout reply timeout-time max-waiters)
(if (null? available) (if (null? available)
(begin (begin
(unless (= (length resources) max-size) (unless (= (length resources) max-size)
(spawn-fiber-to-return-new-resource)) (spawn-fiber-to-return-new-resource))
(loop resources (let ((waiters-count
available (length waiters)))
(cons (cons reply timeout-time) (if (and max-waiters
waiters) (>= waiters-count
resources-last-used)) max-waiters))
(begin
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'too-many-waiters
waiters-count))))
(perform-operation
(if timeout-time
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op)))))
(loop resources
available
waiters
resources-last-used))
(loop resources
available
(cons (cons reply timeout-time)
waiters)
resources-last-used))))
(if timeout-time (if timeout-time
(let ((current-internal-time (let ((current-internal-time
@ -303,7 +454,8 @@
waiters waiters
resources-last-used))) resources-last-used)))
(begin (begin
(put-message reply (car available)) (put-message reply (cons 'success
(car available)))
(loop resources (loop resources
(cdr available) (cdr available)
@ -369,7 +521,8 @@
(spawn-fiber-for-checkout waiter-channel (spawn-fiber-for-checkout waiter-channel
reply-timeout reply-timeout
resource)) resource))
(put-message waiter-channel resource)) (put-message waiter-channel (cons 'success
resource)))
(loop resources (loop resources
available available
@ -473,7 +626,8 @@
resources-last-used)))) resources-last-used))))
(('destroy reply) (('destroy reply)
(if (null? resources) (if (and (null? resources)
(null? waiters))
(put-message reply 'destroy-success) (put-message reply 'destroy-success)
(begin (begin
@ -488,16 +642,34 @@
#:parallel? #t))) #:parallel? #t)))
available) available)
(spawn-fiber (let ((current-internal-time (get-internal-real-time)))
(lambda () (for-each
(sleep 0.1) (match-lambda
(put-message channel ((reply . timeout)
(list 'destroy reply)))) (when (or (not timeout)
(> timeout current-internal-time))
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'resource-pool-destroyed
#f))))
(perform-operation
(if timeout
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op))))))))
waiters))
(loop resources (destroy-loop resources
'() (list reply)))))
waiters
resources-last-used))))
(unknown (unknown
(simple-format (simple-format
@ -575,12 +747,50 @@
(define resource-pool-timeout-error? (define resource-pool-timeout-error?
(record-predicate &resource-pool-timeout)) (record-predicate &resource-pool-timeout))
(define &resource-pool-too-many-waiters
(make-exception-type '&recource-pool-too-many-waiters
&error
'(pool waiters-count)))
(define resource-pool-too-many-waiters-error-pool
(exception-accessor
&resource-pool-too-many-waiters
(record-accessor &resource-pool-too-many-waiters 'pool)))
(define resource-pool-too-many-waiters-error-waiters-count
(exception-accessor
&resource-pool-too-many-waiters
(record-accessor &resource-pool-too-many-waiters 'waiters-count)))
(define make-resource-pool-too-many-waiters-error
(record-constructor &resource-pool-too-many-waiters))
(define resource-pool-too-many-waiters-error?
(record-predicate &resource-pool-too-many-waiters))
(define &resource-pool-destroyed
(make-exception-type '&recource-pool-destroyed
&error
'(pool)))
(define resource-pool-destroyed-error-pool
(exception-accessor
&resource-pool-destroyed
(record-accessor &resource-pool-destroyed 'pool)))
(define make-resource-pool-destroyed-error
(record-constructor &resource-pool-destroyed))
(define resource-pool-destroyed-error?
(record-predicate &resource-pool-destroyed))
(define resource-pool-default-timeout-handler (define resource-pool-default-timeout-handler
(make-parameter #f)) (make-parameter #f))
(define* (call-with-resource-from-pool (define* (call-with-resource-from-pool
pool proc #:key (timeout 'default) pool proc #:key (timeout 'default)
(timeout-handler (resource-pool-default-timeout-handler))) (timeout-handler (resource-pool-default-timeout-handler))
(max-waiters 'default))
"Call PROC with a resource from POOL, blocking until a resource becomes "Call PROC with a resource from POOL, blocking until a resource becomes
available. Return the resource once PROC has returned." available. Return the resource once PROC has returned."
@ -590,7 +800,13 @@ available. Return the resource once PROC has returned."
'default-checkout-timeout) 'default-checkout-timeout)
timeout)) timeout))
(let ((resource (define max-waiters-or-default
(if (eq? max-waiters 'default)
(assq-ref (resource-pool-configuration pool)
'default-max-waiters)
max-waiters))
(let ((reply
(if timeout-or-default (if timeout-or-default
(let loop ((reply (make-channel)) (let loop ((reply (make-channel))
(start-time (get-internal-real-time))) (start-time (get-internal-real-time)))
@ -603,7 +819,8 @@ available. Return the resource once PROC has returned."
reply reply
(+ start-time (+ start-time
(* timeout-or-default (* timeout-or-default
internal-time-units-per-second)))) internal-time-units-per-second))
max-waiters-or-default))
(const #t)) (const #t))
(wrap-operation (sleep-operation timeout-or-default) (wrap-operation (sleep-operation timeout-or-default)
(const #f)))))) (const #f))))))
@ -629,52 +846,62 @@ available. Return the resource once PROC has returned."
0) 0)
(loop (make-channel) (loop (make-channel)
start-time) start-time)
#f) 'timeout)
response)) response))
#f))))) 'timeout)))))
(let loop ((reply (make-channel))) (let loop ((reply (make-channel)))
(put-message (resource-pool-channel pool) (put-message (resource-pool-channel pool)
(list 'checkout (list 'checkout
reply reply
#f)) #f
max-waiters-or-default))
(get-message reply))))) (get-message reply)))))
(when (not resource) (match reply
(when timeout-handler ('timeout
(timeout-handler pool proc timeout)) (when timeout-handler
(timeout-handler pool proc timeout))
(raise-exception (raise-exception
(make-resource-pool-timeout-error pool))) (make-resource-pool-timeout-error pool)))
(('too-many-waiters . count)
(call-with-values (raise-exception
(lambda () (make-resource-pool-too-many-waiters-error pool
(with-exception-handler count)))
(lambda (exn) (('resource-pool-destroyed . #f)
;; Unwind the stack before calling put-message, as (raise-exception
;; this avoids inconsistent behaviour with (make-resource-pool-destroyed-error pool)))
;; continuation barriers (('success . resource)
(put-message (resource-pool-channel pool) (call-with-values
`(return ,resource)) (lambda ()
(raise-exception exn)) (with-exception-handler
(lambda () (lambda (exn)
(with-exception-handler ;; Unwind the stack before calling put-message, as
(lambda (exn) ;; this avoids inconsistent behaviour with
(match (fluid-ref %stacks) ;; continuation barriers
((stack-tag . prompt-tag) (put-message (resource-pool-channel pool)
(let ((stack (make-stack #t `(return ,resource))
0 prompt-tag (raise-exception exn))
0 (and prompt-tag 1)))) (lambda ()
(raise-exception (with-exception-handler
(make-exception (lambda (exn)
exn (match (fluid-ref %stacks)
(make-knots-exception stack))))))) ((stack-tag . prompt-tag)
(lambda () (let ((stack (make-stack #t
(proc resource)))) 0 prompt-tag
#:unwind? #t)) 0 (and prompt-tag 1))))
(lambda vals (raise-exception
(put-message (resource-pool-channel pool) (make-exception
`(return ,resource)) exn
(apply values vals))))) (make-knots-exception stack)))))))
(lambda ()
(proc resource))))
#:unwind? #t))
(lambda vals
(put-message (resource-pool-channel pool)
`(return ,resource))
(apply values vals)))))))
(define-syntax-rule (with-resource-from-pool pool resource exp ...) (define-syntax-rule (with-resource-from-pool pool resource exp ...)
(call-with-resource-from-pool (call-with-resource-from-pool

View file

@ -142,4 +142,48 @@
20 20
(iota 50))))) (iota 50)))))
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-resource-pool
(lambda () #f)
1
#:default-max-waiters 1)))
(call-with-resource-from-pool
resource-pool
(lambda (res)
;; 1st waiter
(spawn-fiber
(lambda ()
(with-exception-handler
(lambda (exn)
(if (resource-pool-destroyed-error? exn)
#t
(raise-exception exn)))
(lambda ()
(call-with-resource-from-pool
resource-pool
(lambda (res)
(error 'should-not-be-reached))))
#:unwind? #t)))
(while (= 0
(assq-ref
(resource-pool-stats resource-pool)
'waiters))
(sleep 0))
(with-exception-handler
(lambda (exn)
(if (resource-pool-too-many-waiters-error? exn)
#t
(raise-exception exn)))
(lambda ()
;; 2nd waiter
(call-with-resource-from-pool
resource-pool
(lambda (res)
(error 'should-not-be-reached))))
#:unwind? #t))))))
(display "resource-pool test finished successfully\n") (display "resource-pool test finished successfully\n")