Use a queue for the resource pool waiters
All checks were successful
/ test (push) Successful in 8s

As this will maybe improve performance.
This commit is contained in:
Christopher Baines 2025-06-30 22:57:08 +01:00
parent 7709ffe1d3
commit ce1b710bcf

View file

@ -22,6 +22,7 @@
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-71)
#:use-module (ice-9 q)
#:use-module (ice-9 match)
#:use-module (ice-9 exceptions)
#:use-module (fibers)
@ -267,13 +268,13 @@
(define (main-loop)
(let loop ((resources resources)
(available resources)
(waiters '()))
(waiters (make-q)))
(match (get-message channel)
(('checkout reply timeout-time max-waiters)
(if (null? available)
(let ((waiters-count
(length waiters)))
(q-length waiters)))
(if (and max-waiters
(>= waiters-count
max-waiters))
@ -301,8 +302,7 @@
waiters))
(loop resources
available
(cons (cons reply timeout-time)
waiters))))
(enq! waiters (cons reply timeout-time)))))
(if timeout-time
(let ((current-internal-time
@ -345,44 +345,46 @@
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(if (null? waiters)
(if (q-empty? waiters)
(loop resources
(cons resource available)
waiters)
(let* ((current-internal-time (get-internal-real-time))
(alive-waiters
dead-waiters
(partition!
(match-lambda
((reply . timeout)
(or (not timeout)
(> timeout current-internal-time))))
waiters)))
(if (null? alive-waiters)
(loop resources
(cons resource available)
'())
(match (last alive-waiters)
((waiter-channel . waiter-timeout)
(if waiter-timeout
(let ((reply-timeout
(/ (- waiter-timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's a
;; timeout
(spawn-fiber-for-checkout waiter-channel
reply-timeout
resource))
(put-message waiter-channel (cons 'success
resource)))
(loop resources
available
(drop-right! alive-waiters 1))))))))
(let ((current-internal-time
(get-internal-real-time)))
(with-exception-handler
(lambda (exn)
(if (eq? (exception-kind exn) 'q-empty)
(loop resources
(cons resource available)
waiters)
(raise-exception exn)))
(lambda ()
(let waiter-loop ((waiter (deq! waiters)))
(match waiter
((reply . timeout)
(if (and timeout
(< timeout current-internal-time))
(waiter-loop (deq! waiters))
(begin
(if timeout
(let ((reply-timeout
(/ (- timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's
;; a timeout
(spawn-fiber-for-checkout reply
reply-timeout
resource))
(put-message reply (cons 'success
resource)))
(loop resources
available
waiters)))))))
#:unwind? #t))))
(('list-resources reply)
(spawn-fiber
@ -397,7 +399,7 @@
(let ((stats
`((resources . ,(length resources))
(available . ,(length available))
(waiters . ,(length waiters))
(waiters . ,(q-length waiters))
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
@ -420,7 +422,7 @@
(('destroy)
(if (and (null? resources)
(null? waiters))
(q-empty? waiters))
(signal-condition!
destroy-condition)
@ -448,7 +450,7 @@
internal-time-units-per-second))
(const #f)))
op))))))))
waiters)
(car waiters))
(if destructor
(begin
@ -747,7 +749,7 @@
(define (main-loop)
(let loop ((resources '())
(available '())
(waiters '())
(waiters (make-q))
(resources-last-used '()))
(match (get-message channel)
@ -769,50 +771,52 @@
(cons (get-internal-real-time)
resources-last-used))))
(if (null? waiters)
(if (q-empty? waiters)
(loop (cons resource resources)
(cons resource available)
waiters
(cons (get-internal-real-time)
resources-last-used))
(let* ((current-internal-time (get-internal-real-time))
(alive-waiters
dead-waiters
(partition!
(match-lambda
((reply . timeout)
(or (not timeout)
(> timeout current-internal-time))))
waiters)))
(if (null? alive-waiters)
(loop (cons resource resources)
(cons resource available)
'()
(cons (get-internal-real-time)
resources-last-used))
(match (last alive-waiters)
((waiter-channel . waiter-timeout)
(if waiter-timeout
(let ((reply-timeout
(/ (- waiter-timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn
;; a new fiber to handle handing over
;; the resource, and returning it if
;; there's a timeout
(spawn-fiber-for-checkout waiter-channel
reply-timeout
resource))
(put-message waiter-channel (cons 'success
resource)))
(loop (cons resource resources)
available
(drop-right! alive-waiters 1)
(cons (get-internal-real-time)
resources-last-used)))))))))
(let ((current-internal-time
(get-internal-real-time)))
(with-exception-handler
(lambda (exn)
(if (eq? (exception-kind exn) 'q-empty)
(loop (cons resource resources)
(cons resource available)
waiters
(cons current-internal-time
resources-last-used))
(raise-exception exn)))
(lambda ()
(let waiter-loop ((waiter (deq! waiters)))
(match waiter
((reply . timeout)
(if (and timeout
(< timeout current-internal-time))
(waiter-loop (deq! waiters))
(begin
(if timeout
(let ((reply-timeout
(/ (- timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's
;; a timeout
(spawn-fiber-for-checkout reply
reply-timeout
resource))
(put-message reply (cons 'success
resource)))
(loop (cons resource resources)
available
waiters
(cons current-internal-time
resources-last-used)))))))
#:unwind? #t))))))
(('checkout reply timeout-time max-waiters)
(if (null? available)
@ -821,7 +825,7 @@
(spawn-fiber-to-return-new-resource))
(let ((waiters-count
(length waiters)))
(q-length waiters)))
(if (and max-waiters
(>= waiters-count
max-waiters))
@ -850,8 +854,7 @@
resources-last-used))
(loop resources
available
(cons (cons reply timeout-time)
waiters)
(enq! waiters (cons reply timeout-time))
resources-last-used))))
(if timeout-time
@ -898,7 +901,7 @@
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(if (null? waiters)
(if (q-empty? waiters)
(loop resources
(cons resource available)
waiters
@ -911,56 +914,58 @@
(get-internal-real-time))
resources-last-used))
(let* ((current-internal-time (get-internal-real-time))
(alive-waiters
dead-waiters
(partition!
(match-lambda
((reply . timeout)
(or (not timeout)
(> timeout current-internal-time))))
waiters)))
(if (null? alive-waiters)
(loop resources
(cons resource available)
'()
(begin
(when (eq? return-type 'return)
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
(get-internal-real-time)))
resources-last-used))
(match (last alive-waiters)
((waiter-channel . waiter-timeout)
(if waiter-timeout
(let ((reply-timeout
(/ (- waiter-timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's a
;; timeout
(spawn-fiber-for-checkout waiter-channel
reply-timeout
resource))
(put-message waiter-channel (cons 'success
resource)))
(loop resources
available
(drop-right! alive-waiters 1)
(let ((current-internal-time
(get-internal-real-time)))
(with-exception-handler
(lambda (exn)
(if (eq? (exception-kind exn) 'q-empty)
(loop resources
(cons resource available)
waiters
(begin
(when (eq? return-type 'return)
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
current-internal-time))
resources-last-used))
(raise-exception exn)))
(lambda ()
(let waiter-loop ((waiter (deq! waiters)))
(match waiter
((reply . timeout)
(if (and timeout
(< timeout current-internal-time))
(waiter-loop (deq! waiters))
(begin
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
(get-internal-real-time))
resources-last-used))))))))
(if timeout
(let ((reply-timeout
(/ (- timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's
;; a timeout
(spawn-fiber-for-checkout reply
reply-timeout
resource))
(put-message reply (cons 'success
resource)))
(loop resources
available
waiters
(begin
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
current-internal-time)
resources-last-used))))))))
#:unwind? #t))))
(('remove resource)
(let ((index
@ -1003,7 +1008,7 @@
(let ((stats
`((resources . ,(length resources))
(available . ,(length available))
(waiters . ,(length waiters))
(waiters . ,(q-length waiters))
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
@ -1067,7 +1072,7 @@
(('destroy)
(if (and (null? resources)
(null? waiters))
(q-empty? waiters))
(signal-condition!
destroy-condition)
@ -1095,7 +1100,7 @@
internal-time-units-per-second))
(const #f)))
op))))))))
waiters)
(car waiters))
(if destructor
(begin