From 4f0eafef0a2f6b613b24f260da8fdcda48574a11 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Sun, 27 Apr 2025 09:41:56 +0100 Subject: [PATCH] Resource pool max waiters and destroy changes Add the ability to specify the max number of waiters for a resource pool, this provides a more efficient way of avoiding waiters for a resource pool continually rising. This commit also improves the destroy behaviour. --- knots/resource-pool.scm | 351 +++++++++++++++++++++++++++++++++------- tests/resource-pool.scm | 44 +++++ 2 files changed, 333 insertions(+), 62 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 1b50482..c9aab02 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -43,6 +43,15 @@ resource-pool-timeout-error-pool resource-pool-timeout-error? + &resource-pool-too-many-waiters + resource-pool-too-many-waiters-error-pool + resource-pool-too-many-waiters-error-waiters-count + resource-pool-too-many-waiters-error? + + &resource-pool-destroyed + resource-pool-destroyed-error-pool + resource-pool-destroyed-error? + resource-pool-default-timeout-handler call-with-resource-from-pool @@ -86,7 +95,8 @@ scheduler (name "unnamed") (add-resources-parallelism 1) - default-checkout-timeout) + default-checkout-timeout + default-max-waiters) (define channel (make-channel)) (define pool @@ -102,7 +112,8 @@ (lifetime . ,lifetime) (scheduler . ,scheduler) (name . ,name) - (default-checkout-timeout . ,default-checkout-timeout)))) + (default-checkout-timeout . ,default-checkout-timeout) + (default-max-waiters . ,default-max-waiters)))) (define checkout-failure-count 0) @@ -186,7 +197,8 @@ (perform-operation (choice-operation (wrap-operation - (put-operation reply-channel resource) + (put-operation reply-channel + (cons 'success resource)) (const #t)) (wrap-operation (sleep-operation reply-timeout) @@ -196,6 +208,116 @@ channel (list 'return-failed-checkout resource))))))) + (define (destroy-loop resources destroy-waiters) + (let loop ((resources resources) + (destroy-waiters destroy-waiters)) + (match (get-message channel) + (('add-resource resource) + (when destructor + (spawn-fiber-to-destroy-resource resource)) + + (loop resources + destroy-waiters)) + (('checkout reply timeout-time max-waiters) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout-time + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op))))) + (loop resources + destroy-waiters)) + (((and (or 'return + 'return-failed-checkout + 'remove) + return-type) + resource) + (when destructor + (spawn-fiber-to-destroy-resource resource)) + + (let ((index + (list-index (lambda (x) + (eq? x resource)) + resources))) + (define (remove-at-index! lst i) + (let ((start + end + (split-at! lst i))) + (append + start + (cdr end)))) + + (let ((new-resources + (if index + (remove-at-index! resources index) + (begin + (simple-format + (current-error-port) + "resource pool error: unable to remove ~A\n" + resource) + resources)))) + (if (null? new-resources) + (begin + (for-each + (lambda (destroy-waiter) + (spawn-fiber + (lambda () + (put-message destroy-waiter 'destroy-success)))) + destroy-waiters) + + ;; No loop + *unspecified*) + (loop new-resources + destroy-waiters))))) + + (('stats reply) + (let ((stats + `((resources . ,(length resources)) + (available . 0) + (waiters . 0) + (checkout-failure-count . ,checkout-failure-count)))) + + (spawn-fiber + (lambda () + (perform-operation + (choice-operation + (wrap-operation + (put-operation reply stats) + (const #t)) + (wrap-operation (sleep-operation 5) + (const #f))))))) + + (loop resources + destroy-waiters)) + + (('check-for-idle-resources) + (loop resources + destroy-waiters)) + + (('destroy reply) + (loop resources + (cons reply destroy-waiters))) + (unknown + (simple-format + (current-error-port) + "unrecognised message to ~A resource pool channel: ~A\n" + name + unknown) + (loop resources + destroy-waiters))))) + (define (main-loop) (let loop ((resources '()) (available '()) @@ -257,7 +379,8 @@ (spawn-fiber-for-checkout waiter-channel reply-timeout resource)) - (put-message waiter-channel resource)) + (put-message waiter-channel (cons 'success + resource))) (loop (cons resource resources) available @@ -265,17 +388,45 @@ (cons (get-internal-real-time) resources-last-used))))))))) - (('checkout reply timeout-time) + (('checkout reply timeout-time max-waiters) (if (null? available) (begin (unless (= (length resources) max-size) (spawn-fiber-to-return-new-resource)) - (loop resources - available - (cons (cons reply timeout-time) - waiters) - resources-last-used)) + (let ((waiters-count + (length waiters))) + (if (and max-waiters + (>= waiters-count + max-waiters)) + (begin + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'too-many-waiters + waiters-count)))) + (perform-operation + (if timeout-time + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op))))) + (loop resources + available + waiters + resources-last-used)) + (loop resources + available + (cons (cons reply timeout-time) + waiters) + resources-last-used)))) (if timeout-time (let ((current-internal-time @@ -303,7 +454,8 @@ waiters resources-last-used))) (begin - (put-message reply (car available)) + (put-message reply (cons 'success + (car available))) (loop resources (cdr available) @@ -369,7 +521,8 @@ (spawn-fiber-for-checkout waiter-channel reply-timeout resource)) - (put-message waiter-channel resource)) + (put-message waiter-channel (cons 'success + resource))) (loop resources available @@ -473,7 +626,8 @@ resources-last-used)))) (('destroy reply) - (if (null? resources) + (if (and (null? resources) + (null? waiters)) (put-message reply 'destroy-success) (begin @@ -488,16 +642,34 @@ #:parallel? #t))) available) - (spawn-fiber - (lambda () - (sleep 0.1) - (put-message channel - (list 'destroy reply)))) + (let ((current-internal-time (get-internal-real-time))) + (for-each + (match-lambda + ((reply . timeout) + (when (or (not timeout) + (> timeout current-internal-time)) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op)))))))) + waiters)) - (loop resources - '() - waiters - resources-last-used)))) + (destroy-loop resources + (list reply))))) (unknown (simple-format @@ -575,12 +747,50 @@ (define resource-pool-timeout-error? (record-predicate &resource-pool-timeout)) +(define &resource-pool-too-many-waiters + (make-exception-type '&recource-pool-too-many-waiters + &error + '(pool waiters-count))) + +(define resource-pool-too-many-waiters-error-pool + (exception-accessor + &resource-pool-too-many-waiters + (record-accessor &resource-pool-too-many-waiters 'pool))) + +(define resource-pool-too-many-waiters-error-waiters-count + (exception-accessor + &resource-pool-too-many-waiters + (record-accessor &resource-pool-too-many-waiters 'waiters-count))) + +(define make-resource-pool-too-many-waiters-error + (record-constructor &resource-pool-too-many-waiters)) + +(define resource-pool-too-many-waiters-error? + (record-predicate &resource-pool-too-many-waiters)) + +(define &resource-pool-destroyed + (make-exception-type '&recource-pool-destroyed + &error + '(pool))) + +(define resource-pool-destroyed-error-pool + (exception-accessor + &resource-pool-destroyed + (record-accessor &resource-pool-destroyed 'pool))) + +(define make-resource-pool-destroyed-error + (record-constructor &resource-pool-destroyed)) + +(define resource-pool-destroyed-error? + (record-predicate &resource-pool-destroyed)) + (define resource-pool-default-timeout-handler (make-parameter #f)) (define* (call-with-resource-from-pool pool proc #:key (timeout 'default) - (timeout-handler (resource-pool-default-timeout-handler))) + (timeout-handler (resource-pool-default-timeout-handler)) + (max-waiters 'default)) "Call PROC with a resource from POOL, blocking until a resource becomes available. Return the resource once PROC has returned." @@ -590,7 +800,13 @@ available. Return the resource once PROC has returned." 'default-checkout-timeout) timeout)) - (let ((resource + (define max-waiters-or-default + (if (eq? max-waiters 'default) + (assq-ref (resource-pool-configuration pool) + 'default-max-waiters) + max-waiters)) + + (let ((reply (if timeout-or-default (let loop ((reply (make-channel)) (start-time (get-internal-real-time))) @@ -603,7 +819,8 @@ available. Return the resource once PROC has returned." reply (+ start-time (* timeout-or-default - internal-time-units-per-second)))) + internal-time-units-per-second)) + max-waiters-or-default)) (const #t)) (wrap-operation (sleep-operation timeout-or-default) (const #f)))))) @@ -629,52 +846,62 @@ available. Return the resource once PROC has returned." 0) (loop (make-channel) start-time) - #f) + 'timeout) response)) - #f))))) + 'timeout))))) (let loop ((reply (make-channel))) (put-message (resource-pool-channel pool) (list 'checkout reply - #f)) + #f + max-waiters-or-default)) (get-message reply))))) - (when (not resource) - (when timeout-handler - (timeout-handler pool proc timeout)) + (match reply + ('timeout + (when timeout-handler + (timeout-handler pool proc timeout)) - (raise-exception - (make-resource-pool-timeout-error pool))) + (raise-exception + (make-resource-pool-timeout-error pool))) + (('too-many-waiters . count) - (call-with-values - (lambda () - (with-exception-handler - (lambda (exn) - ;; Unwind the stack before calling put-message, as - ;; this avoids inconsistent behaviour with - ;; continuation barriers - (put-message (resource-pool-channel pool) - `(return ,resource)) - (raise-exception exn)) - (lambda () - (with-exception-handler - (lambda (exn) - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (let ((stack (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1)))) - (raise-exception - (make-exception - exn - (make-knots-exception stack))))))) - (lambda () - (proc resource)))) - #:unwind? #t)) - (lambda vals - (put-message (resource-pool-channel pool) - `(return ,resource)) - (apply values vals))))) + (raise-exception + (make-resource-pool-too-many-waiters-error pool + count))) + (('resource-pool-destroyed . #f) + (raise-exception + (make-resource-pool-destroyed-error pool))) + (('success . resource) + (call-with-values + (lambda () + (with-exception-handler + (lambda (exn) + ;; Unwind the stack before calling put-message, as + ;; this avoids inconsistent behaviour with + ;; continuation barriers + (put-message (resource-pool-channel pool) + `(return ,resource)) + (raise-exception exn)) + (lambda () + (with-exception-handler + (lambda (exn) + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (let ((stack (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1)))) + (raise-exception + (make-exception + exn + (make-knots-exception stack))))))) + (lambda () + (proc resource)))) + #:unwind? #t)) + (lambda vals + (put-message (resource-pool-channel pool) + `(return ,resource)) + (apply values vals))))))) (define-syntax-rule (with-resource-from-pool pool resource exp ...) (call-with-resource-from-pool diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm index 1251429..1bc09e5 100644 --- a/tests/resource-pool.scm +++ b/tests/resource-pool.scm @@ -142,4 +142,48 @@ 20 (iota 50))))) +(run-fibers-for-tests + (lambda () + (let ((resource-pool (make-resource-pool + (lambda () #f) + 1 + #:default-max-waiters 1))) + (call-with-resource-from-pool + resource-pool + (lambda (res) + + ;; 1st waiter + (spawn-fiber + (lambda () + (with-exception-handler + (lambda (exn) + (if (resource-pool-destroyed-error? exn) + #t + (raise-exception exn))) + (lambda () + (call-with-resource-from-pool + resource-pool + (lambda (res) + (error 'should-not-be-reached)))) + #:unwind? #t))) + + (while (= 0 + (assq-ref + (resource-pool-stats resource-pool) + 'waiters)) + (sleep 0)) + + (with-exception-handler + (lambda (exn) + (if (resource-pool-too-many-waiters-error? exn) + #t + (raise-exception exn))) + (lambda () + ;; 2nd waiter + (call-with-resource-from-pool + resource-pool + (lambda (res) + (error 'should-not-be-reached)))) + #:unwind? #t)))))) + (display "resource-pool test finished successfully\n")