From ce1b710bcf4b1874bdd06a6f96cb3e268f4b5895 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 30 Jun 2025 22:57:08 +0100 Subject: [PATCH] Use a queue for the resource pool waiters As this will maybe improve performance. --- knots/resource-pool.scm | 277 ++++++++++++++++++++-------------------- 1 file changed, 141 insertions(+), 136 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 7da76b0..b27f329 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -22,6 +22,7 @@ #:use-module (srfi srfi-9) #:use-module (srfi srfi-9 gnu) #:use-module (srfi srfi-71) + #:use-module (ice-9 q) #:use-module (ice-9 match) #:use-module (ice-9 exceptions) #:use-module (fibers) @@ -267,13 +268,13 @@ (define (main-loop) (let loop ((resources resources) (available resources) - (waiters '())) + (waiters (make-q))) (match (get-message channel) (('checkout reply timeout-time max-waiters) (if (null? available) (let ((waiters-count - (length waiters))) + (q-length waiters))) (if (and max-waiters (>= waiters-count max-waiters)) @@ -301,8 +302,7 @@ waiters)) (loop resources available - (cons (cons reply timeout-time) - waiters)))) + (enq! waiters (cons reply timeout-time))))) (if timeout-time (let ((current-internal-time @@ -345,44 +345,46 @@ (set! checkout-failure-count (+ 1 checkout-failure-count))) - (if (null? waiters) + (if (q-empty? waiters) (loop resources (cons resource available) waiters) - (let* ((current-internal-time (get-internal-real-time)) - (alive-waiters - dead-waiters - (partition! - (match-lambda - ((reply . timeout) - (or (not timeout) - (> timeout current-internal-time)))) - waiters))) - (if (null? alive-waiters) - (loop resources - (cons resource available) - '()) - (match (last alive-waiters) - ((waiter-channel . waiter-timeout) - (if waiter-timeout - (let ((reply-timeout - (/ (- waiter-timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's a - ;; timeout - (spawn-fiber-for-checkout waiter-channel - reply-timeout - resource)) - (put-message waiter-channel (cons 'success - resource))) - - (loop resources - available - (drop-right! alive-waiters 1)))))))) + (let ((current-internal-time + (get-internal-real-time))) + (with-exception-handler + (lambda (exn) + (if (eq? (exception-kind exn) 'q-empty) + (loop resources + (cons resource available) + waiters) + (raise-exception exn))) + (lambda () + (let waiter-loop ((waiter (deq! waiters))) + (match waiter + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (deq! waiters)) + (begin + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource))) + (loop resources + available + waiters))))))) + #:unwind? #t)))) (('list-resources reply) (spawn-fiber @@ -397,7 +399,7 @@ (let ((stats `((resources . ,(length resources)) (available . ,(length available)) - (waiters . ,(length waiters)) + (waiters . ,(q-length waiters)) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber @@ -420,7 +422,7 @@ (('destroy) (if (and (null? resources) - (null? waiters)) + (q-empty? waiters)) (signal-condition! destroy-condition) @@ -448,7 +450,7 @@ internal-time-units-per-second)) (const #f))) op)))))))) - waiters) + (car waiters)) (if destructor (begin @@ -747,7 +749,7 @@ (define (main-loop) (let loop ((resources '()) (available '()) - (waiters '()) + (waiters (make-q)) (resources-last-used '())) (match (get-message channel) @@ -769,50 +771,52 @@ (cons (get-internal-real-time) resources-last-used)))) - (if (null? waiters) + (if (q-empty? waiters) (loop (cons resource resources) (cons resource available) waiters (cons (get-internal-real-time) resources-last-used)) - (let* ((current-internal-time (get-internal-real-time)) - (alive-waiters - dead-waiters - (partition! - (match-lambda - ((reply . timeout) - (or (not timeout) - (> timeout current-internal-time)))) - waiters))) - (if (null? alive-waiters) - (loop (cons resource resources) - (cons resource available) - '() - (cons (get-internal-real-time) - resources-last-used)) - (match (last alive-waiters) - ((waiter-channel . waiter-timeout) - (if waiter-timeout - (let ((reply-timeout - (/ (- waiter-timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn - ;; a new fiber to handle handing over - ;; the resource, and returning it if - ;; there's a timeout - (spawn-fiber-for-checkout waiter-channel - reply-timeout - resource)) - (put-message waiter-channel (cons 'success - resource))) - - (loop (cons resource resources) - available - (drop-right! alive-waiters 1) - (cons (get-internal-real-time) - resources-last-used))))))))) + (let ((current-internal-time + (get-internal-real-time))) + (with-exception-handler + (lambda (exn) + (if (eq? (exception-kind exn) 'q-empty) + (loop (cons resource resources) + (cons resource available) + waiters + (cons current-internal-time + resources-last-used)) + (raise-exception exn))) + (lambda () + (let waiter-loop ((waiter (deq! waiters))) + (match waiter + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (deq! waiters)) + (begin + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource))) + (loop (cons resource resources) + available + waiters + (cons current-internal-time + resources-last-used))))))) + #:unwind? #t)))))) (('checkout reply timeout-time max-waiters) (if (null? available) @@ -821,7 +825,7 @@ (spawn-fiber-to-return-new-resource)) (let ((waiters-count - (length waiters))) + (q-length waiters))) (if (and max-waiters (>= waiters-count max-waiters)) @@ -850,8 +854,7 @@ resources-last-used)) (loop resources available - (cons (cons reply timeout-time) - waiters) + (enq! waiters (cons reply timeout-time)) resources-last-used)))) (if timeout-time @@ -898,7 +901,7 @@ (set! checkout-failure-count (+ 1 checkout-failure-count))) - (if (null? waiters) + (if (q-empty? waiters) (loop resources (cons resource available) waiters @@ -911,56 +914,58 @@ (get-internal-real-time)) resources-last-used)) - (let* ((current-internal-time (get-internal-real-time)) - (alive-waiters - dead-waiters - (partition! - (match-lambda - ((reply . timeout) - (or (not timeout) - (> timeout current-internal-time)))) - waiters))) - (if (null? alive-waiters) - (loop resources - (cons resource available) - '() - (begin - (when (eq? return-type 'return) - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time))) - resources-last-used)) - (match (last alive-waiters) - ((waiter-channel . waiter-timeout) - (if waiter-timeout - (let ((reply-timeout - (/ (- waiter-timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's a - ;; timeout - (spawn-fiber-for-checkout waiter-channel - reply-timeout - resource)) - (put-message waiter-channel (cons 'success - resource))) - - (loop resources - available - (drop-right! alive-waiters 1) + (let ((current-internal-time + (get-internal-real-time))) + (with-exception-handler + (lambda (exn) + (if (eq? (exception-kind exn) 'q-empty) + (loop resources + (cons resource available) + waiters + (begin + (when (eq? return-type 'return) + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + current-internal-time)) + resources-last-used)) + (raise-exception exn))) + (lambda () + (let waiter-loop ((waiter (deq! waiters))) + (match waiter + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (deq! waiters)) (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used)))))))) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource))) + (loop resources + available + waiters + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + current-internal-time) + resources-last-used)))))))) + #:unwind? #t)))) (('remove resource) (let ((index @@ -1003,7 +1008,7 @@ (let ((stats `((resources . ,(length resources)) (available . ,(length available)) - (waiters . ,(length waiters)) + (waiters . ,(q-length waiters)) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber @@ -1067,7 +1072,7 @@ (('destroy) (if (and (null? resources) - (null? waiters)) + (q-empty? waiters)) (signal-condition! destroy-condition) @@ -1095,7 +1100,7 @@ internal-time-units-per-second)) (const #f))) op)))))))) - waiters) + (car waiters)) (if destructor (begin