Improve resource pool performance when there are lots of waiters

This commit is contained in:
Christopher Baines 2025-02-04 12:52:12 +00:00
parent aadbee0d0e
commit a11cd24e57
2 changed files with 221 additions and 138 deletions

View file

@ -29,6 +29,7 @@
#:use-module (fibers channels) #:use-module (fibers channels)
#:use-module (fibers scheduler) #:use-module (fibers scheduler)
#:use-module (fibers operations) #:use-module (fibers operations)
#:use-module (knots)
#:use-module (knots parallelism) #:use-module (knots parallelism)
#:export (resource-pool? #:export (resource-pool?
@ -38,9 +39,6 @@
resource-pool-configuration resource-pool-configuration
destroy-resource-pool destroy-resource-pool
resource-pool-default-timeout
resource-pool-retry-checkout-timeout
&resource-pool-timeout &resource-pool-timeout
resource-pool-timeout-error-pool resource-pool-timeout-error-pool
resource-pool-timeout-error? resource-pool-timeout-error?
@ -87,8 +85,9 @@
lifetime lifetime
scheduler scheduler
(name "unnamed") (name "unnamed")
(reply-timeout 0.5) (reply-timeout 1)
(add-resources-parallelism 1)) (add-resources-parallelism 1)
default-checkout-timeout)
(define channel (make-channel)) (define channel (make-channel))
(define pool (define pool
@ -104,7 +103,8 @@
(lifetime . ,lifetime) (lifetime . ,lifetime)
(scheduler . ,scheduler) (scheduler . ,scheduler)
(name . ,name) (name . ,name)
(reply-timeout . ,reply-timeout)))) (reply-timeout . ,reply-timeout)
(default-checkout-timeout . ,default-checkout-timeout))))
(define checkout-failure-count 0) (define checkout-failure-count 0)
@ -232,23 +232,38 @@
(cons (get-internal-real-time) (cons (get-internal-real-time)
resources-last-used)) resources-last-used))
(let* ((current-internal-time (get-internal-real-time))
(alive-waiters
dead-waiters
(partition!
(match-lambda
((reply . timeout)
(or (not timeout)
(> timeout current-internal-time))))
waiters)))
(if (null? alive-waiters)
(loop (cons resource resources)
(cons resource available)
'()
(cons (get-internal-real-time)
resources-last-used))
(begin (begin
(if reply-timeout (if reply-timeout
;; Don't sleep in this fiber, so spawn a new ;; Don't sleep in this fiber, so spawn a new
;; fiber to handle handing over the ;; fiber to handle handing over the
;; resource, and returning it if there's a ;; resource, and returning it if there's a
;; timeout ;; timeout
(spawn-fiber-for-checkout (last waiters) (spawn-fiber-for-checkout (car (last alive-waiters))
resource) resource)
(put-message (last waiters) resource)) (put-message (car (last alive-waiters)) resource))
(loop (cons resource resources) (loop (cons resource resources)
available available
(drop-right! waiters 1) (drop-right! alive-waiters 1)
(cons (get-internal-real-time) (cons (get-internal-real-time)
resources-last-used)))))) resources-last-used))))))))
(('checkout reply) (('checkout reply timeout-time)
(if (null? available) (if (null? available)
(begin (begin
(unless (= (length resources) max-size) (unless (= (length resources) max-size)
@ -256,21 +271,30 @@
(loop resources (loop resources
available available
(cons reply waiters) (cons (cons reply timeout-time)
waiters)
resources-last-used)) resources-last-used))
;; If this client is still waiting
(if (> timeout-time
(get-internal-real-time))
(let ((resource (car available))) (let ((resource (car available)))
(if reply-timeout (if reply-timeout
;; Don't sleep in this fiber, so spawn a ;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the ;; new fiber to handle handing over the
;; resource, and returning it if there's a ;; resource, and returning it if there's a
;; timeout ;; timeout
(spawn-fiber-for-checkout reply resource) (spawn-fiber-for-checkout reply
resource)
(put-message reply resource)) (put-message reply resource))
(loop resources (loop resources
(cdr available) (cdr available)
waiters waiters
resources-last-used))
(loop resources
available
waiters
resources-last-used)))) resources-last-used))))
(((and (or 'return (((and (or 'return
@ -296,19 +320,41 @@
(get-internal-real-time)) (get-internal-real-time))
resources-last-used)) resources-last-used))
(let* ((current-internal-time (get-internal-real-time))
(alive-waiters
dead-waiters
(partition!
(match-lambda
((reply . timeout)
(or (not timeout)
(> timeout current-internal-time))))
waiters)))
(if (null? alive-waiters)
(loop resources
(cons resource available)
'()
(begin
(when (eq? return-type 'return)
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
(get-internal-real-time)))
resources-last-used))
(begin (begin
(if reply-timeout (if reply-timeout
;; Don't sleep in this fiber, so spawn a new ;; Don't sleep in this fiber, so spawn a new
;; fiber to handle handing over the ;; fiber to handle handing over the
;; resource, and returning it if there's a ;; resource, and returning it if there's a
;; timeout ;; timeout
(spawn-fiber-for-checkout (last waiters) (spawn-fiber-for-checkout (car (last alive-waiters))
resource) resource)
(put-message (last waiters) resource)) (put-message (car (last alive-waiters)) resource))
(loop resources (loop resources
available available
(drop-right! waiters 1) (drop-right! alive-waiters 1)
(begin (begin
(list-set! (list-set!
resources-last-used resources-last-used
@ -316,7 +362,7 @@
(eq? x resource)) (eq? x resource))
resources) resources)
(get-internal-real-time)) (get-internal-real-time))
resources-last-used))))) resources-last-used)))))))
(('remove resource) (('remove resource)
(let ((index (let ((index
@ -359,8 +405,8 @@
(wrap-operation (wrap-operation
(put-operation reply stats) (put-operation reply stats)
(const #t)) (const #t))
(wrap-operation (sleep-operation (wrap-operation (sleep-operation (or reply-timeout
reply-timeout) 5))
(const #f))))))) (const #f)))))))
(loop resources (loop resources
@ -495,12 +541,6 @@
(unless (eq? msg 'destroy-success) (unless (eq? msg 'destroy-success)
(error msg))))) (error msg)))))
(define resource-pool-default-timeout
(make-parameter #f))
(define resource-pool-retry-checkout-timeout
(make-parameter 5))
(define &resource-pool-timeout (define &resource-pool-timeout
(make-exception-type '&recource-pool-timeout (make-exception-type '&recource-pool-timeout
&error &error
@ -526,31 +566,36 @@
"Call PROC with a resource from POOL, blocking until a resource becomes "Call PROC with a resource from POOL, blocking until a resource becomes
available. Return the resource once PROC has returned." available. Return the resource once PROC has returned."
(define retry-timeout
(resource-pool-retry-checkout-timeout))
(define timeout-or-default (define timeout-or-default
(if (eq? timeout 'default) (if (eq? timeout 'default)
(resource-pool-default-timeout) (assq-ref (resource-pool-configuration pool)
'default-checkout-timeout)
timeout)) timeout))
(define resource-pool-reply-timeout
(assq-ref (resource-pool-configuration pool)
'reply-timeout))
(let ((resource (let ((resource
(let ((reply (make-channel))) (if timeout-or-default
(let loop ((start-time (get-internal-real-time))) (let loop ((reply (make-channel))
(start-time (get-internal-real-time)))
(let ((request-success? (let ((request-success?
(perform-operation (perform-operation
(choice-operation (choice-operation
(wrap-operation (wrap-operation
(put-operation (resource-pool-channel pool) (put-operation (resource-pool-channel pool)
`(checkout ,reply)) (list 'checkout
reply
(+ start-time
(* timeout-or-default
internal-time-units-per-second))))
(const #t)) (const #t))
(wrap-operation (sleep-operation (or timeout-or-default (wrap-operation (sleep-operation timeout-or-default)
retry-timeout))
(const #f)))))) (const #f))))))
(if request-success? (if request-success?
(let ((time-remaining (let ((time-remaining
(- (or timeout-or-default (- timeout-or-default
retry-timeout)
(/ (- (get-internal-real-time) (/ (- (get-internal-real-time)
start-time) start-time)
internal-time-units-per-second)))) internal-time-units-per-second))))
@ -563,50 +608,62 @@ available. Return the resource once PROC has returned."
(const #f)))))) (const #f))))))
(if (or (not response) (if (or (not response)
(eq? response 'resource-pool-retry-checkout)) (eq? response 'resource-pool-retry-checkout))
(if (> (- (or timeout-or-default (if (> (- timeout-or-default
retry-timeout)
(/ (- (get-internal-real-time) (/ (- (get-internal-real-time)
start-time) start-time)
internal-time-units-per-second)) internal-time-units-per-second))
0) 0)
(loop start-time) (loop (make-channel)
(if (eq? timeout-or-default #f) start-time)
(loop (get-internal-real-time)) #f)
#f))
response)) response))
(if (eq? timeout-or-default #f) #f)))))
(loop (get-internal-real-time)) (let loop ((reply (make-channel)))
(put-message (resource-pool-channel pool)
(list 'checkout
reply
(if resource-pool-reply-timeout
(+ (get-internal-real-time)
(* resource-pool-reply-timeout
internal-time-units-per-second))
#f))) #f)))
(if (eq? timeout-or-default #f) (if resource-pool-reply-timeout
(loop (get-internal-real-time)) (let ((resource-or-timeout
#f))))))) (perform-operation
(choice-operation
(get-operation reply)
(wrap-operation
(sleep-operation resource-pool-reply-timeout)
(const 'resource-pool-reply-timeout))))))
(if (or (eq? resource-or-timeout
'resource-pool-reply-timeout)
(eq? resource-or-timeout
'resource-pool-retry-checkout))
(loop (make-channel))
resource-or-timeout))
(get-message reply))))))
(when (or (not resource) (when (not resource)
(eq? resource 'resource-pool-retry-checkout))
(when timeout-handler (when timeout-handler
(timeout-handler pool proc timeout)) (timeout-handler pool proc timeout))
(raise-exception (raise-exception
(make-resource-pool-timeout-error pool))) (make-resource-pool-timeout-error pool)))
(with-exception-handler
(lambda (exception)
(put-message (resource-pool-channel pool)
`(return ,resource))
(raise-exception exception))
(lambda ()
(call-with-values (call-with-values
(lambda () (lambda ()
(with-throw-handler #t (with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(put-message (resource-pool-channel pool)
`(return ,resource))
(raise-exception exn))
(lambda () (lambda ()
(proc resource)) (proc resource))))
(lambda _
(backtrace))))
(lambda vals (lambda vals
(put-message (resource-pool-channel pool) (put-message (resource-pool-channel pool)
`(return ,resource)) `(return ,resource))
(apply values vals)))) (apply values vals)))))
#:unwind? #t)))
(define-syntax-rule (with-resource-from-pool pool resource exp ...) (define-syntax-rule (with-resource-from-pool pool resource exp ...)
(call-with-resource-from-pool (call-with-resource-from-pool

View file

@ -105,7 +105,7 @@
(error "collision detected"))) (error "collision detected")))
(new-number)) (new-number))
1))) 1)))
(fibers-for-each (fibers-batch-for-each
(lambda _ (lambda _
(with-resource-from-pool (with-resource-from-pool
resource-pool res resource-pool res
@ -114,6 +114,32 @@
(if (= start-val counter) (if (= start-val counter)
(set! counter (+ 1 counter)) (set! counter (+ 1 counter))
(error "collision detected"))))) (error "collision detected")))))
20
(iota 50)))))
(run-fibers-for-tests
(lambda ()
(let* ((counter 0)
(resource-pool (make-resource-pool
(lambda ()
(let ((start-val counter))
(sleep 0.05)
(if (= start-val counter)
(set! counter (+ 1 counter))
(error "collision detected")))
(new-number))
1
#:reply-timeout #f)))
(fibers-batch-for-each
(lambda _
(with-resource-from-pool
resource-pool res
(let ((start-val counter))
(sleep 0.05)
(if (= start-val counter)
(set! counter (+ 1 counter))
(error "collision detected")))))
20
(iota 50))))) (iota 50)))))
(display "resource-pool test finished successfully\n") (display "resource-pool test finished successfully\n")