Tweak the resource pool

Mostly to no longer sleep in the main fiber. Now the main fiber just
spawns other fibers when it would previously block on put-operation
and these other fibers communicate back to the main resource pool
fiber when necessary.

This should mean that the resource pool is more responsive.
This commit is contained in:
Christopher Baines 2025-01-08 15:57:30 +00:00
parent 59c183b13f
commit dcb56ee2c5
2 changed files with 340 additions and 258 deletions

View file

@ -28,10 +28,13 @@
#:use-module (fibers channels) #:use-module (fibers channels)
#:use-module (fibers scheduler) #:use-module (fibers scheduler)
#:use-module (fibers operations) #:use-module (fibers operations)
#:use-module (knots parallelism)
#:export (resource-pool? #:export (resource-pool?
make-resource-pool make-resource-pool
resource-pool-name resource-pool-name
resource-pool-channel
resource-pool-configuration
destroy-resource-pool destroy-resource-pool
resource-pool-default-timeout resource-pool-default-timeout
@ -48,11 +51,23 @@
resource-pool-stats)) resource-pool-stats))
(define &resource-pool-abort-add-resource
(make-exception-type '&recource-pool-abort-add-resource
&error
'()))
(define make-resource-pool-abort-add-resource-error
(record-constructor &resource-pool-abort-add-resource))
(define resource-pool-abort-add-resource-error?
(record-predicate &resource-pool-abort-add-resource))
(define-record-type <resource-pool> (define-record-type <resource-pool>
(make-resource-pool-record name channel) (make-resource-pool-record name channel configuration)
resource-pool? resource-pool?
(name resource-pool-name) (name resource-pool-name)
(channel resource-pool-channel)) (channel resource-pool-channel)
(configuration resource-pool-configuration))
(set-record-type-printer! (set-record-type-printer!
<resource-pool> <resource-pool>
@ -62,7 +77,7 @@
(resource-pool-name resource-pool)) (resource-pool-name resource-pool))
port))) port)))
(define* (make-resource-pool initializer max-size (define* (make-resource-pool return-new-resource max-size
#:key (min-size max-size) #:key (min-size max-size)
(idle-seconds #f) (idle-seconds #f)
(delay-logger (const #f)) (delay-logger (const #f))
@ -71,25 +86,80 @@
lifetime lifetime
scheduler scheduler
(name "unnamed") (name "unnamed")
(put-message-timeout 0.5)) (reply-timeout 0.5)
(define (initializer/safe) add-resources-parallelism)
(define channel (make-channel))
(define pool
(make-resource-pool-record
name
channel
`((max-size . ,max-size)
(min-size . ,min-size)
(idle-seconds . ,idle-seconds)
(delay-logger . ,delay-logger)
(duration-logger . ,duration-logger)
(destructor . ,destructor)
(lifetime . ,lifetime)
(scheduler . ,scheduler)
(name . ,name)
(reply-timeout . ,reply-timeout))))
(define checkout-failure-count 0)
(define spawn-fiber-to-return-new-resource
(let ((thunk
(if add-resources-parallelism
(fiberize
(lambda ()
(let ((max-size
(assq-ref (resource-pool-configuration pool)
'max-size))
(size (assq-ref (resource-pool-stats pool)
'resources)))
(if (= size max-size)
(raise-exception
(make-resource-pool-abort-add-resource-error))
(return-new-resource))))
#:parallelism add-resources-parallelism
#:show-backtrace?
(lambda (key . args)
(not
(and (eq? key '%exception)
(resource-pool-abort-add-resource-error?
(car args))))))
return-new-resource)))
(lambda ()
(spawn-fiber
(lambda ()
(let ((new-resource
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
(unless (resource-pool-abort-add-resource-error? exn)
(simple-format (simple-format
(current-error-port) (current-error-port)
"exception running ~A resource pool initializer: ~A:\n ~A\n" "exception adding resource to pool ~A: ~A:\n ~A\n"
name name
initializer return-new-resource
exn) exn))
#f) #f)
(lambda () (lambda ()
(with-throw-handler #t (with-throw-handler #t
initializer thunk
(lambda args (lambda (key . args)
(backtrace)))) (unless (and (eq? key '%exception)
#:unwind? #t)) (resource-pool-abort-add-resource-error?
(car args)))
(backtrace)))))
#:unwind? #t)))
(when new-resource
(put-message channel
(list 'add-resource new-resource)))))))))
(define (destructor/safe args) (define (spawn-fiber-to-destroy-resource resource)
(spawn-fiber
(lambda ()
(let loop ()
(let ((success? (let ((success?
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
@ -103,20 +173,34 @@
(lambda () (lambda ()
(with-throw-handler #t (with-throw-handler #t
(lambda () (lambda ()
(destructor args) (destructor resource)
#t) #t)
(lambda _ (lambda _
(backtrace)))) (backtrace))))
#:unwind? #t))) #:unwind? #t)))
(or success? (unless success?
#t
(begin
(sleep 5) (sleep 5)
(destructor/safe args)))))
(let ((channel (make-channel)) (loop)))))))
(checkout-failure-count 0))
(define (spawn-fiber-for-checkout reply-channel resource)
(spawn-fiber
(lambda ()
(let ((checkout-success?
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply-channel resource)
(const #t))
(wrap-operation (sleep-operation
reply-timeout)
(const #f))))))
(unless checkout-success?
(put-message
channel
(list 'return-failed-checkout resource)))))))
(spawn-fiber (spawn-fiber
(lambda () (lambda ()
(when idle-seconds (when idle-seconds
@ -126,14 +210,7 @@
(sleep idle-seconds) (sleep idle-seconds)
(put-message channel '(check-for-idle-resources)))))) (put-message channel '(check-for-idle-resources))))))
(while #t (with-throw-handler #t
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception in the ~A pool fiber: ~A\n"
name
exn))
(lambda () (lambda ()
(let loop ((resources '()) (let loop ((resources '())
(available '()) (available '())
@ -141,62 +218,74 @@
(resources-last-used '())) (resources-last-used '()))
(match (get-message channel) (match (get-message channel)
(('checkout reply) (('add-resource resource)
(if (null? available)
(if (= (length resources) max-size) (if (= (length resources) max-size)
(begin
(spawn-fiber-to-destroy-resource resource)
(loop resources (loop resources
available available
(cons reply waiters) waiters
resources-last-used) resources-last-used))
(let ((new-resource (initializer/safe)))
(if new-resource
(let ((checkout-success?
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply new-resource)
(const #t))
(wrap-operation (sleep-operation
put-message-timeout)
(const #f))))))
(unless checkout-success?
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(loop (cons new-resource resources) (if (null? waiters)
(if checkout-success? (loop (cons resource resources)
available (cons resource available)
(cons new-resource available))
waiters waiters
(cons (get-internal-real-time) (cons (get-internal-real-time)
resources-last-used))) resources-last-used))
(begin
(if reply-timeout
;; Don't sleep in this fiber, so spawn a new
;; fiber to handle handing over the
;; resource, and returning it if there's a
;; timeout
(spawn-fiber-for-checkout (last waiters)
resource)
(put-message (last waiters) resource))
(loop (cons resource resources)
available
(drop-right! waiters 1)
(cons (get-internal-real-time)
resources-last-used))))))
(('checkout reply)
(if (null? available)
(begin
(unless (= (length resources) max-size)
(spawn-fiber-to-return-new-resource))
(loop resources (loop resources
available available
(cons reply waiters) (cons reply waiters)
resources-last-used)))) resources-last-used))
(let ((checkout-success?
(perform-operation (let ((resource (car available)))
(choice-operation (if reply-timeout
(wrap-operation ;; Don't sleep in this fiber, so spawn a
(put-operation reply (car available)) ;; new fiber to handle handing over the
(const #t)) ;; resource, and returning it if there's a
(wrap-operation (sleep-operation ;; timeout
put-message-timeout) (spawn-fiber-for-checkout reply resource)
(const #f)))))) (put-message reply resource))
(unless checkout-success?
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(if checkout-success?
(loop resources (loop resources
(cdr available) (cdr available)
waiters waiters
resources-last-used) resources-last-used))))
(loop resources
available (((and (or 'return
waiters 'return-failed-checkout)
resources-last-used))))) return-type)
(('return resource) resource)
(when (eq? 'return-failed-checkout
return-type)
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(if (null? waiters) (if (null? waiters)
(loop resources (loop resources
(cons resource available) (cons resource available)
@ -209,21 +298,17 @@
resources) resources)
(get-internal-real-time)) (get-internal-real-time))
resources-last-used)) resources-last-used))
(let ((checkout-success?
(perform-operation
(choice-operation
(wrap-operation
(put-operation (last waiters)
resource)
(const #t))
(wrap-operation (sleep-operation
put-message-timeout)
(const #f))))))
(unless checkout-success?
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(if checkout-success? (begin
(if reply-timeout
;; Don't sleep in this fiber, so spawn a new
;; fiber to handle handing over the
;; resource, and returning it if there's a
;; timeout
(spawn-fiber-for-checkout (last waiters)
resource)
(put-message (last waiters) resource))
(loop resources (loop resources
available available
(drop-right! waiters 1) (drop-right! waiters 1)
@ -234,29 +319,8 @@
(eq? x resource)) (eq? x resource))
resources) resources)
(get-internal-real-time)) (get-internal-real-time))
resources-last-used)) resources-last-used)))))
(begin
(for-each
(lambda (waiter)
(spawn-fiber
(lambda ()
(perform-operation
(choice-operation
(put-operation waiter 'resource-pool-retry-checkout)
(sleep-operation 10))))))
waiters)
(loop resources
(cons resource available)
'()
(begin
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
(get-internal-real-time))
resources-last-used)))))))
(('stats reply) (('stats reply)
(let ((stats (let ((stats
`((resources . ,(length resources)) `((resources . ,(length resources))
@ -272,13 +336,14 @@
(put-operation reply stats) (put-operation reply stats)
(const #t)) (const #t))
(wrap-operation (sleep-operation (wrap-operation (sleep-operation
put-message-timeout) reply-timeout)
(const #f))))))) (const #f)))))))
(loop resources (loop resources
available available
waiters waiters
resources-last-used)) resources-last-used))
(('check-for-idle-resources) (('check-for-idle-resources)
(let* ((resources-last-used-seconds (let* ((resources-last-used-seconds
(map (map
@ -298,7 +363,7 @@
(for-each (for-each
(lambda (resource) (lambda (resource)
(destructor/safe resource)) (spawn-fiber-to-destroy-resource resource))
resources-to-destroy) resources-to-destroy)
(loop (lset-difference eq? resources resources-to-destroy) (loop (lset-difference eq? resources resources-to-destroy)
@ -311,12 +376,13 @@
last-used)) last-used))
resources resources
resources-last-used)))) resources-last-used))))
(('destroy reply) (('destroy reply)
(if (= (length resources) (length available)) (if (= (length resources) (length available))
(begin (begin
(for-each (for-each
(lambda (resource) (lambda (resource)
(destructor/safe resource)) (spawn-fiber-to-destroy-resource resource))
resources) resources)
(put-message reply 'destroy-success)) (put-message reply 'destroy-success))
(begin (begin
@ -330,6 +396,7 @@
available available
waiters waiters
resources-last-used)))) resources-last-used))))
(unknown (unknown
(simple-format (simple-format
(current-error-port) (current-error-port)
@ -340,11 +407,13 @@
available available
waiters waiters
resources-last-used))))) resources-last-used)))))
#:unwind? #t))) (lambda (key . args)
(simple-format (current-error-port)
"exception in the ~A pool fiber\n" name))))
(or scheduler (or scheduler
(current-scheduler))) (current-scheduler)))
(make-resource-pool-record name channel))) pool)
(define (destroy-resource-pool pool) (define (destroy-resource-pool pool)
(let ((reply (make-channel))) (let ((reply (make-channel)))

View file

@ -15,6 +15,19 @@
res) res)
2)))) 2))))
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-resource-pool
(lambda ()
2)
1
#:add-resources-parallelism 1)))
(assert-equal
(with-resource-from-pool resource-pool
res
res)
2))))
(let* ((error-constructor (let* ((error-constructor
(record-constructor &resource-pool-timeout)) (record-constructor &resource-pool-timeout))
(err (err