From dcb56ee2c5ac3e283cb46841766e7282f3c2c52e Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Wed, 8 Jan 2025 15:57:30 +0000 Subject: [PATCH] Tweak the resource pool Mostly to no longer sleep in the main fiber. Now the main fiber just spawns other fibers when it would previously block on put-operation and these other fibers communicate back to the main resource pool fiber when necessary. This should mean that the resource pool is more responsive. --- knots/resource-pool.scm | 585 ++++++++++++++++++++++------------------ tests/resource-pool.scm | 13 + 2 files changed, 340 insertions(+), 258 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 9b2dce4..355cfa9 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -28,10 +28,13 @@ #:use-module (fibers channels) #:use-module (fibers scheduler) #:use-module (fibers operations) + #:use-module (knots parallelism) #:export (resource-pool? make-resource-pool resource-pool-name + resource-pool-channel + resource-pool-configuration destroy-resource-pool resource-pool-default-timeout @@ -48,11 +51,23 @@ resource-pool-stats)) +(define &resource-pool-abort-add-resource + (make-exception-type '&recource-pool-abort-add-resource + &error + '())) + +(define make-resource-pool-abort-add-resource-error + (record-constructor &resource-pool-abort-add-resource)) + +(define resource-pool-abort-add-resource-error? + (record-predicate &resource-pool-abort-add-resource)) + (define-record-type - (make-resource-pool-record name channel) + (make-resource-pool-record name channel configuration) resource-pool? - (name resource-pool-name) - (channel resource-pool-channel)) + (name resource-pool-name) + (channel resource-pool-channel) + (configuration resource-pool-configuration)) (set-record-type-printer! @@ -62,7 +77,7 @@ (resource-pool-name resource-pool)) port))) -(define* (make-resource-pool initializer max-size +(define* (make-resource-pool return-new-resource max-size #:key (min-size max-size) (idle-seconds #f) (delay-logger (const #f)) @@ -71,280 +86,334 @@ lifetime scheduler (name "unnamed") - (put-message-timeout 0.5)) - (define (initializer/safe) - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running ~A resource pool initializer: ~A:\n ~A\n" - name - initializer - exn) - #f) + (reply-timeout 0.5) + add-resources-parallelism) + (define channel (make-channel)) + + (define pool + (make-resource-pool-record + name + channel + `((max-size . ,max-size) + (min-size . ,min-size) + (idle-seconds . ,idle-seconds) + (delay-logger . ,delay-logger) + (duration-logger . ,duration-logger) + (destructor . ,destructor) + (lifetime . ,lifetime) + (scheduler . ,scheduler) + (name . ,name) + (reply-timeout . ,reply-timeout)))) + + (define checkout-failure-count 0) + + (define spawn-fiber-to-return-new-resource + (let ((thunk + (if add-resources-parallelism + (fiberize + (lambda () + (let ((max-size + (assq-ref (resource-pool-configuration pool) + 'max-size)) + (size (assq-ref (resource-pool-stats pool) + 'resources))) + (if (= size max-size) + (raise-exception + (make-resource-pool-abort-add-resource-error)) + (return-new-resource)))) + #:parallelism add-resources-parallelism + #:show-backtrace? + (lambda (key . args) + (not + (and (eq? key '%exception) + (resource-pool-abort-add-resource-error? + (car args)))))) + return-new-resource))) (lambda () - (with-throw-handler #t - initializer - (lambda args - (backtrace)))) - #:unwind? #t)) + (spawn-fiber + (lambda () + (let ((new-resource + (with-exception-handler + (lambda (exn) + (unless (resource-pool-abort-add-resource-error? exn) + (simple-format + (current-error-port) + "exception adding resource to pool ~A: ~A:\n ~A\n" + name + return-new-resource + exn)) + #f) + (lambda () + (with-throw-handler #t + thunk + (lambda (key . args) + (unless (and (eq? key '%exception) + (resource-pool-abort-add-resource-error? + (car args))) + (backtrace))))) + #:unwind? #t))) + (when new-resource + (put-message channel + (list 'add-resource new-resource))))))))) - (define (destructor/safe args) - (let ((success? - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running resource pool destructor (~A): ~A:\n ~A\n" - name - destructor - exn) - #f) - (lambda () - (with-throw-handler #t - (lambda () - (destructor args) - #t) - (lambda _ - (backtrace)))) - #:unwind? #t))) - - (or success? - #t - (begin - (sleep 5) - (destructor/safe args))))) - - (let ((channel (make-channel)) - (checkout-failure-count 0)) + (define (spawn-fiber-to-destroy-resource resource) (spawn-fiber (lambda () - (when idle-seconds - (spawn-fiber - (lambda () - (while #t - (sleep idle-seconds) - (put-message channel '(check-for-idle-resources)))))) + (let loop () + (let ((success? + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running resource pool destructor (~A): ~A:\n ~A\n" + name + destructor + exn) + #f) + (lambda () + (with-throw-handler #t + (lambda () + (destructor resource) + #t) + (lambda _ + (backtrace)))) + #:unwind? #t))) - (while #t - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception in the ~A pool fiber: ~A\n" - name - exn)) - (lambda () - (let loop ((resources '()) - (available '()) - (waiters '()) - (resources-last-used '())) + (unless success? + (sleep 5) - (match (get-message channel) - (('checkout reply) - (if (null? available) - (if (= (length resources) max-size) - (loop resources - available - (cons reply waiters) - resources-last-used) - (let ((new-resource (initializer/safe))) - (if new-resource - (let ((checkout-success? - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply new-resource) - (const #t)) - (wrap-operation (sleep-operation - put-message-timeout) - (const #f)))))) - (unless checkout-success? - (set! checkout-failure-count - (+ 1 checkout-failure-count))) + (loop))))))) - (loop (cons new-resource resources) - (if checkout-success? - available - (cons new-resource available)) - waiters - (cons (get-internal-real-time) - resources-last-used))) - (loop resources - available - (cons reply waiters) - resources-last-used)))) - (let ((checkout-success? - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply (car available)) - (const #t)) - (wrap-operation (sleep-operation - put-message-timeout) - (const #f)))))) - (unless checkout-success? - (set! checkout-failure-count - (+ 1 checkout-failure-count))) + (define (spawn-fiber-for-checkout reply-channel resource) + (spawn-fiber + (lambda () + (let ((checkout-success? + (perform-operation + (choice-operation + (wrap-operation + (put-operation reply-channel resource) + (const #t)) + (wrap-operation (sleep-operation + reply-timeout) + (const #f)))))) + (unless checkout-success? + (put-message + channel + (list 'return-failed-checkout resource))))))) + + (spawn-fiber + (lambda () + (when idle-seconds + (spawn-fiber + (lambda () + (while #t + (sleep idle-seconds) + (put-message channel '(check-for-idle-resources)))))) + + (with-throw-handler #t + (lambda () + (let loop ((resources '()) + (available '()) + (waiters '()) + (resources-last-used '())) + + (match (get-message channel) + (('add-resource resource) + (if (= (length resources) max-size) + (begin + (spawn-fiber-to-destroy-resource resource) + + (loop resources + available + waiters + resources-last-used)) - (if checkout-success? - (loop resources - (cdr available) - waiters - resources-last-used) - (loop resources - available - waiters - resources-last-used))))) - (('return resource) (if (null? waiters) - (loop resources + (loop (cons resource resources) (cons resource available) waiters - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used)) - (let ((checkout-success? - (perform-operation - (choice-operation - (wrap-operation - (put-operation (last waiters) - resource) - (const #t)) - (wrap-operation (sleep-operation - put-message-timeout) - (const #f)))))) - (unless checkout-success? - (set! checkout-failure-count - (+ 1 checkout-failure-count))) + (cons (get-internal-real-time) + resources-last-used)) - (if checkout-success? - (loop resources - available - (drop-right! waiters 1) - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used)) - (begin - (for-each - (lambda (waiter) - (spawn-fiber - (lambda () - (perform-operation - (choice-operation - (put-operation waiter 'resource-pool-retry-checkout) - (sleep-operation 10)))))) - waiters) + (begin + (if reply-timeout + ;; Don't sleep in this fiber, so spawn a new + ;; fiber to handle handing over the + ;; resource, and returning it if there's a + ;; timeout + (spawn-fiber-for-checkout (last waiters) + resource) + (put-message (last waiters) resource)) - (loop resources - (cons resource available) - '() - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used))))))) - (('stats reply) - (let ((stats - `((resources . ,(length resources)) - (available . ,(length available)) - (waiters . ,(length waiters)) - (checkout-failure-count . ,checkout-failure-count)))) + (loop (cons resource resources) + available + (drop-right! waiters 1) + (cons (get-internal-real-time) + resources-last-used)))))) + (('checkout reply) + (if (null? available) + (begin + (unless (= (length resources) max-size) + (spawn-fiber-to-return-new-resource)) + + (loop resources + available + (cons reply waiters) + resources-last-used)) + + (let ((resource (car available))) + (if reply-timeout + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's a + ;; timeout + (spawn-fiber-for-checkout reply resource) + (put-message reply resource)) + + (loop resources + (cdr available) + waiters + resources-last-used)))) + + (((and (or 'return + 'return-failed-checkout) + return-type) + resource) + + (when (eq? 'return-failed-checkout + return-type) + (set! checkout-failure-count + (+ 1 checkout-failure-count))) + + (if (null? waiters) + (loop resources + (cons resource available) + waiters + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + (get-internal-real-time)) + resources-last-used)) + + (begin + (if reply-timeout + ;; Don't sleep in this fiber, so spawn a new + ;; fiber to handle handing over the + ;; resource, and returning it if there's a + ;; timeout + (spawn-fiber-for-checkout (last waiters) + resource) + (put-message (last waiters) resource)) + + (loop resources + available + (drop-right! waiters 1) + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + (get-internal-real-time)) + resources-last-used))))) + + (('stats reply) + (let ((stats + `((resources . ,(length resources)) + (available . ,(length available)) + (waiters . ,(length waiters)) + (checkout-failure-count . ,checkout-failure-count)))) + + (spawn-fiber + (lambda () + (perform-operation + (choice-operation + (wrap-operation + (put-operation reply stats) + (const #t)) + (wrap-operation (sleep-operation + reply-timeout) + (const #f))))))) + + (loop resources + available + waiters + resources-last-used)) + + (('check-for-idle-resources) + (let* ((resources-last-used-seconds + (map + (lambda (internal-time) + (/ (- (get-internal-real-time) internal-time) + internal-time-units-per-second)) + resources-last-used)) + (resources-to-destroy + (filter-map + (lambda (resource last-used-seconds) + (if (and (member resource available) + (> last-used-seconds idle-seconds)) + resource + #f)) + resources + resources-last-used-seconds))) + + (for-each + (lambda (resource) + (spawn-fiber-to-destroy-resource resource)) + resources-to-destroy) + + (loop (lset-difference eq? resources resources-to-destroy) + (lset-difference eq? available resources-to-destroy) + waiters + (filter-map + (lambda (resource last-used) + (if (memq resource resources-to-destroy) + #f + last-used)) + resources + resources-last-used)))) + + (('destroy reply) + (if (= (length resources) (length available)) + (begin + (for-each + (lambda (resource) + (spawn-fiber-to-destroy-resource resource)) + resources) + (put-message reply 'destroy-success)) + (begin (spawn-fiber (lambda () (perform-operation (choice-operation - (wrap-operation - (put-operation reply stats) - (const #t)) - (wrap-operation (sleep-operation - put-message-timeout) - (const #f))))))) - - (loop resources - available - waiters - resources-last-used)) - (('check-for-idle-resources) - (let* ((resources-last-used-seconds - (map - (lambda (internal-time) - (/ (- (get-internal-real-time) internal-time) - internal-time-units-per-second)) - resources-last-used)) - (resources-to-destroy - (filter-map - (lambda (resource last-used-seconds) - (if (and (member resource available) - (> last-used-seconds idle-seconds)) - resource - #f)) - resources - resources-last-used-seconds))) - - (for-each - (lambda (resource) - (destructor/safe resource)) - resources-to-destroy) - - (loop (lset-difference eq? resources resources-to-destroy) - (lset-difference eq? available resources-to-destroy) + (put-operation reply 'resource-pool-destroy-failed) + (sleep-operation 10))))) + (loop resources + available waiters - (filter-map - (lambda (resource last-used) - (if (memq resource resources-to-destroy) - #f - last-used)) - resources - resources-last-used)))) - (('destroy reply) - (if (= (length resources) (length available)) - (begin - (for-each - (lambda (resource) - (destructor/safe resource)) - resources) - (put-message reply 'destroy-success)) - (begin - (spawn-fiber - (lambda () - (perform-operation - (choice-operation - (put-operation reply 'resource-pool-destroy-failed) - (sleep-operation 10))))) - (loop resources - available - waiters - resources-last-used)))) - (unknown - (simple-format - (current-error-port) - "unrecognised message to ~A resource pool channel: ~A\n" - name - unknown) - (loop resources - available - waiters - resources-last-used))))) - #:unwind? #t))) - (or scheduler - (current-scheduler))) + resources-last-used)))) - (make-resource-pool-record name channel))) + (unknown + (simple-format + (current-error-port) + "unrecognised message to ~A resource pool channel: ~A\n" + name + unknown) + (loop resources + available + waiters + resources-last-used))))) + (lambda (key . args) + (simple-format (current-error-port) + "exception in the ~A pool fiber\n" name)))) + (or scheduler + (current-scheduler))) + + pool) (define (destroy-resource-pool pool) (let ((reply (make-channel))) diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm index b6db98f..33f5a0c 100644 --- a/tests/resource-pool.scm +++ b/tests/resource-pool.scm @@ -15,6 +15,19 @@ res) 2)))) +(run-fibers-for-tests + (lambda () + (let ((resource-pool (make-resource-pool + (lambda () + 2) + 1 + #:add-resources-parallelism 1))) + (assert-equal + (with-resource-from-pool resource-pool + res + res) + 2)))) + (let* ((error-constructor (record-constructor &resource-pool-timeout)) (err