Make it possible to destroy a resource pool

And implement removing idle resources.
This commit is contained in:
Christopher Baines 2024-01-18 14:39:39 +00:00
parent 15b6dad5a5
commit 5af6233e5b

View file

@ -34,6 +34,7 @@
resource-pool-default-timeout
make-resource-pool
destroy-resource-pool
call-with-resource-from-pool
with-resource-from-pool
resource-pool-stats
@ -73,7 +74,7 @@
(define* (make-resource-pool initializer max-size
#:key (min-size max-size)
(idle-duration #f)
(idle-seconds #f)
(delay-logger (const #f))
(duration-logger (const #f))
destructor
@ -96,6 +97,32 @@
(backtrace))))
#:unwind? #t))
(define (destructor/safe args)
(let ((success?
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception running resource pool destructor (~A): ~A:\n ~A\n"
name
destructor
exn)
#f)
(lambda ()
(with-throw-handler #t
(lambda ()
(destructor args)
#t)
(lambda _
(backtrace))))
#:unwind? #t)))
(or success?
#t
(begin
(sleep 5)
(destructor/safe args)))))
(let ((channel (make-channel)))
(spawn-fiber
(lambda ()
@ -110,15 +137,25 @@
(lambda ()
(let loop ((resources '())
(available '())
(waiters '()))
(waiters '())
(resources-last-used '()))
(match (get-message channel)
(match (if idle-seconds
(perform-operation
(choice-operation
(get-operation channel)
(wrap-operation
;; TODO Do something smarter
(sleep-operation 10)
(const '(check-for-idle-resources)))))
(get-message channel))
(('checkout reply)
(if (null? available)
(if (= (length resources) max-size)
(loop resources
available
(cons reply waiters))
(cons reply waiters)
resources-last-used)
(let ((new-resource (initializer/safe)))
(if new-resource
(let ((checkout-success?
@ -133,10 +170,13 @@
(if checkout-success?
available
(cons new-resource available))
waiters))
waiters
(cons (get-internal-real-time)
resources-last-used)))
(loop resources
available
(cons reply waiters)))))
(cons reply waiters)
resources-last-used))))
(let ((checkout-success?
(perform-operation
(choice-operation
@ -148,10 +188,12 @@
(if checkout-success?
(loop resources
(cdr available)
waiters)
waiters
resources-last-used)
(loop resources
available
waiters)))))
waiters
resources-last-used)))))
(('return resource)
;; When a resource is returned, prompt all the waiters to request
;; again. This is to avoid the pool waiting on channels that may
@ -169,7 +211,15 @@
(loop resources
(cons resource available)
;; clear waiters, as they've been notified
'()))
'()
(begin
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
(get-internal-real-time))
resources-last-used)))
(('stats reply)
(let ((stats
`((resources . ,(length resources))
@ -186,7 +236,59 @@
(loop resources
available
waiters))
waiters
resources-last-used))
(('check-for-idle-resources)
(let* ((resources-last-used-seconds
(map
(lambda (internal-time)
(/ (- (get-internal-real-time) internal-time)
internal-time-units-per-second))
resources-last-used))
(resources-to-destroy
(filter-map
(lambda (resource last-used-seconds)
(if (and (member resource available)
(> last-used-seconds idle-seconds))
resource
#f))
resources
resources-last-used-seconds)))
(for-each
(lambda (resource)
(destructor/safe resource))
resources-to-destroy)
(loop (lset-difference eq? resources resources-to-destroy)
(lset-difference eq? available resources-to-destroy)
waiters
(filter-map
(lambda (resource last-used)
(if (memq resource resources-to-destroy)
#f
last-used))
resources
resources-last-used))))
(('destroy reply)
(if (= (length resources) (length available))
(begin
(for-each
(lambda (resource)
(destructor/safe resource))
resources)
(put-message reply 'destroy-success))
(begin
(spawn-fiber
(lambda ()
(perform-operation
(choice-operation
(put-operation reply 'resource-pool-destroy-failed)
(sleep-operation 10)))))
(loop resources
available
waiters
resources-last-used))))
(unknown
(simple-format
(current-error-port)
@ -195,11 +297,19 @@
unknown)
(loop resources
available
waiters)))))
waiters
resources-last-used)))))
#:unwind? #t))))
channel))
(define (destroy-resource-pool pool)
(let ((reply (make-channel)))
(put-message pool (list 'destroy reply))
(let ((msg (get-message reply)))
(unless (eq? msg 'destroy-success)
(error msg)))))
(define resource-pool-default-timeout
(make-parameter #f))
@ -258,9 +368,12 @@ available. Return the resource once PROC has returned."
#f)
response))
#f)))
(begin
(let loop ()
(put-message pool `(checkout ,reply))
(get-message reply))))))
(let ((response (get-message reply)))
(if (eq? response 'resource-pool-retry-checkout)
(loop)
response)))))))
(when (or (not resource)
(eq? resource 'resource-pool-retry-checkout))