diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 3638500..301abbd 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -21,7 +21,6 @@ #:use-module (srfi srfi-1) #:use-module (srfi srfi-9) #:use-module (srfi srfi-9 gnu) - #:use-module (srfi srfi-43) #:use-module (srfi srfi-71) #:use-module (ice-9 q) #:use-module (ice-9 match) @@ -82,8 +81,7 @@ (make-resource-pool-record name channel destroy-condition configuration) resource-pool? (name resource-pool-name) - (channel resource-pool-channel - set-resource-pool-channel!) + (channel resource-pool-channel) (destroy-condition resource-pool-destroy-condition) (configuration resource-pool-configuration)) @@ -95,6 +93,14 @@ (resource-pool-name resource-pool)) port))) +(define (remove-at-index! lst i) + (let ((start + end + (split-at! lst i))) + (append + start + (cdr end)))) + (define (safe-deq q) (if (null? (car q)) #f @@ -105,51 +111,11 @@ (set-car! q next) it))) -(define-record-type - (make-resource-details value checkout-count last-used) - resource-details? - (value resource-details-value) - (checkout-count resource-details-checkout-count - set-resource-details-checkout-count!) - (last-used resource-details-last-used - set-resource-details-last-used!)) - -(define-inlinable (increment-resource-checkout-count! resource) - (set-resource-details-checkout-count! - resource - (1+ (resource-details-checkout-count resource)))) - -(define-inlinable (decrement-resource-checkout-count! resource) - (set-resource-details-checkout-count! - resource - (1+ (resource-details-checkout-count resource)))) - -(define (spawn-fiber-for-checkout channel - reply-channel - reply-timeout - resource-id - resource) - (spawn-fiber - (lambda () - (let ((checkout-success? - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply-channel - (list 'success resource-id resource)) - (const #t)) - (wrap-operation (sleep-operation - reply-timeout) - (const #f)))))) - (unless checkout-success? - (put-message - channel - (list 'return-failed-checkout resource-id))))))) - -(define* (make-fixed-size-resource-pool resources-list-or-vector +(define* (make-fixed-size-resource-pool resources #:key (delay-logger (const #f)) (duration-logger (const #f)) + destructor scheduler (name "unnamed") default-checkout-timeout @@ -165,6 +131,7 @@ destroy-condition `((delay-logger . ,delay-logger) (duration-logger . ,duration-logger) + (destructor . ,destructor) (scheduler . ,scheduler) (name . ,name) (default-checkout-timeout . ,default-checkout-timeout) @@ -172,24 +139,58 @@ (define checkout-failure-count 0) - (define resources - (vector-map - (lambda (_ resource) - (make-resource-details - resource - 0 - #f)) - (if (vector? resources-list-or-vector) - resources-list-or-vector - (list->vector resources-list-or-vector)))) + (define (spawn-fiber-to-destroy-resource resource) + (spawn-fiber + (lambda () + (let loop () + (let ((success? + (with-exception-handler + (lambda _ #f) + (lambda () + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running resource pool destructor (~A): ~A\n" + name + destructor) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (start-stack #t (destructor resource)) + #t))) + #:unwind? #t))) - (define (destroy-loop) - (define (empty?) - (vector-every (lambda (r) - (eq? r #f)) - resources)) + (if success? + (put-message channel + (list 'remove resource)) + (begin + (sleep 5) - (let loop () + (loop)))))))) + + (define (spawn-fiber-for-checkout reply-channel + reply-timeout + resource) + (spawn-fiber + (lambda () + (let ((checkout-success? + (perform-operation + (choice-operation + (wrap-operation + (put-operation reply-channel + (cons 'success resource)) + (const #t)) + (wrap-operation (sleep-operation + reply-timeout) + (const #f)))))) + (unless checkout-success? + (put-message + channel + (list 'return-failed-checkout resource))))))) + + (define (destroy-loop resources) + (let loop ((resources resources)) (match (get-message channel) (('checkout reply timeout-time max-waiters) (spawn-fiber @@ -210,27 +211,40 @@ internal-time-units-per-second)) (const #f))) op))))) - (loop)) + (loop resources)) (((and (or 'return - 'return-failed-checkout) + 'return-failed-checkout + 'remove) return-type) - resource-id) - (vector-set! resources - resource-id - #f) + resource) + (when (and (not (eq? return-type 'remove)) + destructor) + (spawn-fiber-to-destroy-resource resource)) - (if (empty?) - (begin - (set-resource-pool-channel! pool #f) - (signal-condition! destroy-condition) + (let ((index + (list-index (lambda (x) + (eq? x resource)) + resources))) + (let ((new-resources + (if index + (remove-at-index! resources index) + (begin + (simple-format + (current-error-port) + "resource pool error: unable to remove ~A\n" + resource) + resources)))) + (if (null? new-resources) + (begin + (signal-condition! destroy-condition) - ;; No loop - *unspecified*) - (loop))) + ;; No loop + *unspecified*) + (loop new-resources))))) (('stats reply timeout-time) (let ((stats - `((resources . ,(vector-length resources)) + `((resources . ,(length resources)) (available . 0) (waiters . 0) (checkout-failure-count . ,checkout-failure-count)))) @@ -249,20 +263,21 @@ internal-time-units-per-second))) op)))))) - (loop)) + (loop resources)) - (('destroy) - (loop)) + (('destroy reply) + (loop resources)) (unknown (simple-format (current-error-port) "unrecognised message to ~A resource pool channel: ~A\n" name unknown) - (loop))))) + (loop resources))))) (define (main-loop) - (let loop ((available (iota (vector-length resources))) + (let loop ((resources resources) + (available resources) (waiters (make-q))) (match (get-message channel) @@ -292,9 +307,11 @@ internal-time-units-per-second)) (const #f))) op))))) - (loop available + (loop resources + available waiters)) - (loop available + (loop resources + available (enq! waiters (cons reply timeout-time))))) (if timeout-time @@ -306,45 +323,32 @@ (let ((reply-timeout (/ (- timeout-time current-internal-time) - internal-time-units-per-second)) - (resource-id - new-available - (car+cdr available))) + internal-time-units-per-second))) ;; Don't sleep in this fiber, so spawn a new ;; fiber to handle handing over the resource, ;; and returning it if there's a timeout - (spawn-fiber-for-checkout - channel - reply - reply-timeout - resource-id - (resource-details-value - (vector-ref resources - resource-id))) - (loop new-available + (spawn-fiber-for-checkout reply + reply-timeout + (car available)) + (loop resources + (cdr available) waiters)) - (loop available + (loop resources + available waiters))) - (let* ((resource-id - next-available - (car+cdr available)) - (resource-details - (vector-ref resources - resource-id))) - (put-message reply - (list 'success - resource-id - (resource-details-value - resource-details))) + (begin + (put-message reply (cons 'success + (car available))) - (loop next-available + (loop resources + (cdr available) waiters))))) (((and (or 'return 'return-failed-checkout) return-type) - resource-id) + resource) (when (eq? 'return-failed-checkout return-type) @@ -356,7 +360,8 @@ (let waiter-loop ((waiter (safe-deq waiters))) (match waiter (#f - (loop (cons resource-id available) + (loop resources + (cons resource available) waiters)) ((reply . timeout) (if (and timeout @@ -371,21 +376,13 @@ ;; new fiber to handle handing over the ;; resource, and returning it if there's ;; a timeout - (spawn-fiber-for-checkout - channel - reply - reply-timeout - resource-id - (resource-details-value - (vector-ref resources - resource-id)))) - (put-message reply - (list 'success - resource-id - (resource-details-value - (vector-ref resources - resource-id)))))) - (loop available + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))) + (loop resources + available waiters)))))) (('list-resources reply) @@ -393,12 +390,13 @@ (lambda () (put-message reply (list-copy resources)))) - (loop available + (loop resources + available waiters)) (('stats reply timeout-time) (let ((stats - `((resources . ,(vector-length resources)) + `((resources . ,(length resources)) (available . ,(length available)) (waiters . ,(q-length waiters)) (checkout-failure-count . ,checkout-failure-count)))) @@ -417,46 +415,62 @@ internal-time-units-per-second))) op)))))) - (loop available + (loop resources + available waiters)) (('destroy) - (let ((current-internal-time (get-internal-real-time))) - ;; Notify all waiters that the pool has been destroyed - (for-each - (match-lambda - ((reply . timeout) - (when (or (not timeout) - (> timeout current-internal-time)) - (spawn-fiber - (lambda () - (let ((op - (put-operation - reply - (cons 'resource-pool-destroyed - #f)))) - (perform-operation - (if timeout - (choice-operation - op - (wrap-operation - (sleep-operation - (/ (- timeout - (get-internal-real-time)) - internal-time-units-per-second)) - (const #f))) - op)))))))) - (car waiters)) + (if (and (null? resources) + (q-empty? waiters)) + (signal-condition! + destroy-condition) - (if (= (vector-length resources) - (length available)) - (begin - (set-resource-pool-channel! pool #f) - (signal-condition! destroy-condition) + (let ((current-internal-time (get-internal-real-time))) + (for-each + (match-lambda + ((reply . timeout) + (when (or (not timeout) + (> timeout current-internal-time)) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op)))))))) + (car waiters)) - ;; No loop - *unspecified*) - (destroy-loop)))) + (if destructor + (begin + (for-each + (lambda (resource) + (spawn-fiber-to-destroy-resource resource)) + available) + (destroy-loop resources)) + (let dl ((resources resources) + (available available)) + (if (null? available) + (if (null? resources) + (signal-condition! + destroy-condition) + (destroy-loop resources)) + (let ((index + (list-index (lambda (x) + (eq? x (car available))) + resources))) + (dl (remove-at-index! resources index) + (cdr available))))))))) (unknown (simple-format @@ -464,7 +478,8 @@ "unrecognised message to ~A resource pool channel: ~A\n" name unknown) - (loop available + (loop resources + available waiters))))) (spawn-fiber @@ -536,12 +551,6 @@ (define checkout-failure-count 0) - (define resources - (make-hash-table)) - - (define-inlinable (count-resources resources) - (hash-count (const #t) resources)) - (define return-new-resource/parallelism-limiter (make-parallelism-limiter (or add-resources-parallelism @@ -566,8 +575,9 @@ (let ((max-size (assq-ref (resource-pool-configuration pool) 'max-size)) - (size (count-resources resources))) - (unless (>= size max-size) + (size (assq-ref (resource-pool-stats pool #:timeout #f) + 'resources))) + (unless (= size max-size) (with-exception-handler (lambda _ #f) (lambda () @@ -588,51 +598,64 @@ #:unwind? #t))))) #:unwind? #t)))) - (define (spawn-fiber-to-destroy-resource resource-id resource-details) + (define (spawn-fiber-to-destroy-resource resource) (spawn-fiber (lambda () (let loop () - (let* ((resource - (resource-details-value resource-details)) - (success? - (with-exception-handler - (lambda _ #f) - (lambda () - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running resource pool destructor (~A): ~A\n" - name - destructor) - (print-backtrace-and-exception/knots exn) - (raise-exception exn)) - (lambda () - (start-stack #t (destructor resource)) - #t))) - #:unwind? #t))) + (let ((success? + (with-exception-handler + (lambda _ #f) + (lambda () + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running resource pool destructor (~A): ~A\n" + name + destructor) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (start-stack #t (destructor resource)) + #t))) + #:unwind? #t))) (if success? (put-message channel - (list 'remove resource-id)) + (list 'remove resource)) (begin (sleep 5) (loop)))))))) - (define (destroy-loop resources next-resource-id) - (let loop ((next-resource-id next-resource-id)) + (define (spawn-fiber-for-checkout reply-channel + reply-timeout + resource) + (spawn-fiber + (lambda () + (let ((checkout-success? + (perform-operation + (choice-operation + (wrap-operation + (put-operation reply-channel + (cons 'success resource)) + (const #t)) + (wrap-operation (sleep-operation + reply-timeout) + (const #f)))))) + (unless checkout-success? + (put-message + channel + (list 'return-failed-checkout resource))))))) + + (define (destroy-loop resources) + (let loop ((resources resources)) (match (get-message channel) (('add-resource resource) - (if destructor - (begin - (spawn-fiber-to-destroy-resource next-resource-id - resource) - (hash-set! resources next-resource-id resource) - - (loop (1+ next-resource-id))) - (loop next-resource-id))) + (when destructor + (spawn-fiber-to-destroy-resource resource)) + (loop resources)) (('checkout reply timeout-time max-waiters) (spawn-fiber (lambda () @@ -652,31 +675,43 @@ internal-time-units-per-second)) (const #f))) op))))) - (loop next-resource-id)) + (loop resources)) (((and (or 'return 'return-failed-checkout 'remove) return-type) - resource-id) + resource) (when (and (not (eq? return-type 'remove)) destructor) - (spawn-fiber-to-destroy-resource - resource-id - (hash-ref resources resource-id))) + (spawn-fiber-to-destroy-resource resource)) - (hash-remove! resources resource-id) + (let ((index + (list-index (lambda (x) + (eq? x resource)) + resources))) + (let ((new-resources + (if index + (remove-at-index! resources index) + (begin + (simple-format + (current-error-port) + "resource pool error: unable to remove ~A\n" + resource) + resources)))) + (if (null? new-resources) + (begin + (and=> return-new-resource/parallelism-limiter + destroy-parallelism-limiter) - (if (= 0 (count-resources resources)) - (begin - (set-resource-pool-channel! pool #f) - (signal-condition! destroy-condition) + (signal-condition! destroy-condition) + + ;; No loop + *unspecified*) + (loop new-resources))))) - ;; No loop - *unspecified*) - (loop next-resource-id))) (('stats reply timeout-time) (let ((stats - `((resources . ,(count-resources resources)) + `((resources . ,(length resources)) (available . 0) (waiters . 0) (checkout-failure-count . ,checkout-failure-count)))) @@ -695,63 +730,59 @@ internal-time-units-per-second))) op)))))) - (loop next-resource-id)) + (loop resources)) (('check-for-idle-resources) - (loop next-resource-id)) + (loop resources)) - (('destroy) - (loop next-resource-id)) + (('destroy reply) + (loop resources)) (unknown (simple-format (current-error-port) "unrecognised message to ~A resource pool channel: ~A\n" name unknown) - (loop next-resource-id))))) + (loop resources))))) (define (main-loop) - (let loop ((next-resource-id 0) + (let loop ((resources '()) (available '()) - (waiters (make-q))) + (waiters (make-q)) + (resources-checkout-count '()) + (resources-last-used '())) (match (get-message channel) (('add-resource resource) - (if (= (count-resources resources) max-size) - (if destructor - (begin - (hash-set! resources - next-resource-id - (make-resource-details - resource - 0 - (get-internal-real-time))) - (spawn-fiber-to-destroy-resource next-resource-id - resource) + (if (= (length resources) max-size) + (begin + (if destructor + (begin + (spawn-fiber-to-destroy-resource resource) - (loop (1+ next-resource-id) + (loop (cons resource resources) + available + waiters + (cons 0 resources-checkout-count) + (cons (get-internal-real-time) + resources-last-used))) + (loop resources available - waiters)) - (loop next-resource-id - available - waiters)) + waiters + resources-checkout-count + resources-last-used))) - (let* ((current-internal-time - (get-internal-real-time)) - (resource-details - (make-resource-details - resource - 0 - current-internal-time))) - (hash-set! resources - next-resource-id - resource-details) + (let ((current-internal-time + (get-internal-real-time))) (let waiter-loop ((waiter (safe-deq waiters))) (match waiter (#f - (loop (1+ next-resource-id) - (cons resource-details available) - waiters)) + (loop (cons resource resources) + (cons resource available) + waiters + (cons 0 resources-checkout-count) + (cons current-internal-time + resources-last-used))) ((reply . timeout) (if (and timeout (< timeout current-internal-time)) @@ -765,24 +796,22 @@ ;; new fiber to handle handing over the ;; resource, and returning it if there's ;; a timeout - (spawn-fiber-for-checkout channel - reply + (spawn-fiber-for-checkout reply reply-timeout - next-resource-id resource)) - (put-message reply (list 'success - next-resource-id + (put-message reply (cons 'success resource)))) - (set-resource-details-checkout-count! resource-details - 1) - (loop (1+ next-resource-id) + (loop (cons resource resources) available - waiters))))))) + waiters + (cons 1 resources-checkout-count) + (cons current-internal-time + resources-last-used)))))))) (('checkout reply timeout-time max-waiters) (if (null? available) (begin - (unless (= (count-resources resources) max-size) + (unless (= (length resources) max-size) (spawn-fiber-to-return-new-resource)) (let ((waiters-count @@ -809,12 +838,16 @@ internal-time-units-per-second)) (const #f))) op))))) - (loop next-resource-id + (loop resources available - waiters)) - (loop next-resource-id + waiters + resources-checkout-count + resources-last-used)) + (loop resources available - (enq! waiters (cons reply timeout-time)))))) + (enq! waiters (cons reply timeout-time)) + resources-checkout-count + resources-last-used)))) (if timeout-time (let ((current-internal-time @@ -822,55 +855,59 @@ ;; If this client is still waiting (if (> timeout-time current-internal-time) - (let* ((reply-timeout - (/ (- timeout-time - current-internal-time) - internal-time-units-per-second)) - (resource-id - (car available)) - (resource-details - (hash-ref resources resource-id))) - - (increment-resource-checkout-count! - resource-details) + (let ((reply-timeout + (/ (- timeout-time + current-internal-time) + internal-time-units-per-second))) ;; Don't sleep in this fiber, so spawn a new ;; fiber to handle handing over the resource, ;; and returning it if there's a timeout - (spawn-fiber-for-checkout channel - reply + (spawn-fiber-for-checkout reply reply-timeout - resource-id - (resource-details-value - resource-details)) - (loop next-resource-id + (car available)) + (loop resources (cdr available) - waiters)) - (loop next-resource-id + waiters + (let ((resource-index + (list-index (lambda (x) + (eq? x (car available))) + resources))) + (list-set! resources-checkout-count + resource-index + (+ 1 (list-ref + resources-checkout-count + resource-index))) + resources-checkout-count) + resources-last-used)) + (loop resources available - waiters))) - (let* ((resource-id - next-available - (car+cdr available)) - (resource-details - (hash-ref resources - resource-id))) - (increment-resource-checkout-count! resource-details) + waiters + resources-checkout-count + resources-last-used))) + (begin + (put-message reply (cons 'success + (car available))) - (put-message reply - (list 'success - resource-id - (resource-details-value - resource-details))) - - (loop next-resource-id - next-available - waiters))))) + (loop resources + (cdr available) + waiters + (let ((resource-index + (list-index (lambda (x) + (eq? x (car available))) + resources))) + (list-set! resources-checkout-count + resource-index + (+ 1 (list-ref + resources-checkout-count + resource-index))) + resources-checkout-count) + resources-last-used))))) (((and (or 'return 'return-failed-checkout) return-type) - resource-id) + resource) (when (eq? 'return-failed-checkout return-type) @@ -879,30 +916,44 @@ (let ((current-internal-time (get-internal-real-time)) - (resource-details - (hash-ref resources resource-id))) + (resource-index + (list-index (lambda (x) + (eq? x resource)) + resources))) (if (and lifetime - (>= (resource-details-checkout-count resource-details) + (>= (list-ref resources-checkout-count + resource-index) lifetime)) (begin - (spawn-fiber-to-destroy-resource resource-id - resource-details) - (loop next-resource-id + (spawn-fiber-to-destroy-resource resource) + (loop resources available - waiters)) + waiters + resources-checkout-count + resources-last-used)) (let waiter-loop ((waiter (safe-deq waiters))) (match waiter (#f - (if (eq? 'return-failed-checkout - return-type) - (decrement-resource-checkout-count! resource-details) - (set-resource-details-last-used! - resource-details - current-internal-time)) - - (loop next-resource-id - (cons resource-id available) - waiters)) + (loop resources + (cons resource available) + waiters + (if (eq? 'return-failed-checkout + return-type) + (begin + (list-set! resources-checkout-count + resource-index + (- (list-ref resources-checkout-count + resource-index) + 1)) + resources-checkout-count) + resources-checkout-count) + (begin + (when (eq? return-type 'return) + (list-set! + resources-last-used + resource-index + current-internal-time)) + resources-last-used))) ((reply . timeout) (if (and timeout (< timeout current-internal-time)) @@ -916,74 +967,85 @@ ;; new fiber to handle handing over the ;; resource, and returning it if there's ;; a timeout - (spawn-fiber-for-checkout - channel - reply - reply-timeout - resource-id - (resource-details-value resource-details))) - (put-message reply - (list 'success - resource-id - (resource-details-value - resource-details))))) - - (set-resource-details-last-used! resource-details - current-internal-time) - (when (eq? 'return-failed-checkout - return-type) - (decrement-resource-checkout-count! resource-details)) - - (loop next-resource-id + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))) + (loop resources available - waiters))))))) + waiters + (if (eq? 'return-failed-checkout + return-type) + (begin + (list-set! resources-checkout-count + resource-index + (- (list-ref resources-checkout-count + resource-index) + 1)) + resources-checkout-count) + resources-checkout-count) + (begin + (list-set! + resources-last-used + resource-index + current-internal-time) + resources-last-used)))))))) - (('remove resource-id) - (hash-remove! resources - resource-id) + (('remove resource) + (let ((index + (list-index (lambda (x) + (eq? x resource)) + resources))) - (when (and (not (q-empty? waiters)) - (< (- (count-resources resources) 1) - max-size)) - (spawn-fiber-to-return-new-resource)) + (when (and (not (q-empty? waiters)) + (< (- (length resources) 1) + max-size)) + (spawn-fiber-to-return-new-resource)) - (loop next-resource-id - available ; resource shouldn't be in this list - waiters)) + (loop (if index + (remove-at-index! resources index) + (begin + (simple-format + (current-error-port) + "resource pool error: unable to remove ~A\n" + resource) + resources)) + available ; resource shouldn't be in this list + waiters + (remove-at-index! + resources-checkout-count + index) + (remove-at-index! + resources-last-used + index)))) - (('destroy resource-id) - (let ((resource-details - (hash-ref resources - resource-id))) - (spawn-fiber-to-destroy-resource resource-id - resource-details) + (('destroy resource) + (spawn-fiber-to-destroy-resource resource) - (loop next-resource-id - available - waiters))) + (loop resources + available + waiters + resources-checkout-count + resources-last-used)) (('list-resources reply) (spawn-fiber (lambda () (put-message reply (list-copy resources)))) - (loop next-resource-id + (loop resources available - waiters)) + waiters + resources-checkout-count + resources-last-used)) (('stats reply timeout-time) (let ((stats - `((resources . ,(count-resources resources)) + `((resources . ,(length resources)) (available . ,(length available)) (waiters . ,(q-length waiters)) - (resources-checkout-count - . ,(hash-fold - (lambda (_ resource-details result) - (cons (resource-details-checkout-count - resource-details) - result)) - '() - resources)) + (resources-checkout-count . ,resources-checkout-count) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber @@ -1000,125 +1062,125 @@ internal-time-units-per-second))) op)))))) - (loop next-resource-id + (loop resources available - waiters)) + waiters + resources-checkout-count + resources-last-used)) (('check-for-idle-resources) - (let* ((internal-real-time - (get-internal-real-time)) - (candidate-resource-ids-to-destroy + (let* ((resources-last-used-seconds + (map + (lambda (internal-time) + (/ (- (get-internal-real-time) internal-time) + internal-time-units-per-second)) + resources-last-used)) + (candidate-resources-to-destroy (filter-map - (lambda (resource-id) - (let ((resource-details - (hash-ref resources resource-id))) - (if (> (/ (- internal-real-time - (resource-details-last-used - resource-details)) - internal-time-units-per-second) - idle-seconds) - resource-id - #f))) - available)) - (max-resources-to-destroy - (max 0 - (- (count-resources resources) - min-size))) - (resources-to-destroy - (take candidate-resource-ids-to-destroy - (min max-resources-to-destroy - (length candidate-resource-ids-to-destroy))))) - (when destructor - (for-each - (lambda (resource-id) - (spawn-fiber-to-destroy-resource - resource-id - (hash-ref resources resource-id))) - resources-to-destroy)) + (lambda (resource last-used-seconds) + (if (and (member resource available) + (> last-used-seconds idle-seconds)) + resource + #f)) + resources + resources-last-used-seconds))) - (loop next-resource-id - (lset-difference = available resources-to-destroy) - waiters))) + (let* ((available-resources-to-destroy + (lset-intersection eq? + available + candidate-resources-to-destroy)) + (max-resources-to-destroy + (max 0 + (- (length resources) + min-size))) + (resources-to-destroy + (take available-resources-to-destroy + (min max-resources-to-destroy + (length available-resources-to-destroy))))) + (when destructor + (for-each + (lambda (resource) + (spawn-fiber-to-destroy-resource resource)) + resources-to-destroy)) + + (loop resources + (lset-difference eq? available resources-to-destroy) + waiters + resources-checkout-count + resources-last-used)))) (('destroy) - (let ((current-internal-time (get-internal-real-time))) - (for-each - (match-lambda - ((reply . timeout) - (when (or (not timeout) - (> timeout current-internal-time)) - (spawn-fiber - (lambda () - (let ((op - (put-operation - reply - (cons 'resource-pool-destroyed - #f)))) - (perform-operation - (if timeout - (choice-operation - op - (wrap-operation - (sleep-operation - (/ (- timeout - (get-internal-real-time)) - internal-time-units-per-second)) - (const #f))) - op)))))))) - (car waiters)) + (if (and (null? resources) + (q-empty? waiters)) + (signal-condition! + destroy-condition) - (when destructor - (for-each - (lambda (resource-id) - (spawn-fiber-to-destroy-resource - resource-id - (hash-ref resources - resource-id))) - available)) - - ;; Do this in parallel to avoid deadlocks between the - ;; limiter and returning new resources to this pool - (and=> return-new-resource/parallelism-limiter - (lambda (limiter) - (spawn-fiber - (lambda () - (destroy-parallelism-limiter limiter))))) - - (if (or (= 0 (count-resources resources)) - (not destructor)) - (begin - (set-resource-pool-channel! pool #f) - (signal-condition! destroy-condition) - - ;; No loop - *unspecified*) - (destroy-loop resources next-resource-id)))) + (let ((current-internal-time (get-internal-real-time))) + (for-each + (match-lambda + ((reply . timeout) + (when (or (not timeout) + (> timeout current-internal-time)) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op)))))))) + (car waiters)) + (if destructor + (begin + (for-each + (lambda (resource) + (spawn-fiber-to-destroy-resource resource)) + available) + (destroy-loop resources)) + (let dl ((resources resources) + (available available)) + (if (null? available) + (if (null? resources) + (signal-condition! + destroy-condition) + (destroy-loop resources)) + (let ((index + (list-index (lambda (x) + (eq? x (car available))) + resources))) + (dl (remove-at-index! resources index) + (cdr available))))))))) (unknown (simple-format (current-error-port) "unrecognised message to ~A resource pool channel: ~A\n" name unknown) - (loop next-resource-id + (loop resources available - waiters))))) + waiters + resources-checkout-count + resources-last-used))))) (spawn-fiber (lambda () (when idle-seconds (spawn-fiber (lambda () - (let loop () - (put-message channel '(check-for-idle-resources)) - (when (choice-operation - (wrap-operation - (sleep-operation idle-seconds) - (const #t)) - (wrap-operation - (wait-operation destroy-condition) - (const #f))) - (loop)))))) + (while #t + (sleep idle-seconds) + (put-message channel '(check-for-idle-resources)))))) (with-exception-handler (lambda (exn) @@ -1253,10 +1315,6 @@ available. Return the resource once PROC has returned." 'default-max-waiters) max-waiters)) - (unless channel - (raise-exception - (make-resource-pool-destroyed-error pool))) - (let ((reply (if timeout-or-default (let loop ((reply (make-channel)) @@ -1324,7 +1382,7 @@ available. Return the resource once PROC has returned." (('resource-pool-destroyed . #f) (raise-exception (make-resource-pool-destroyed-error pool))) - (('success resource-id resource-value) + (('success . resource) (call-with-values (lambda () (with-exception-handler @@ -1333,12 +1391,12 @@ available. Return the resource once PROC has returned." ;; this avoids inconsistent behaviour with ;; continuation barriers (put-message - channel + (resource-pool-channel pool) (list (if (or destroy-resource-on-exception? (resource-pool-destroy-resource-exception? exn)) 'destroy 'return) - resource-id)) + resource)) (raise-exception exn)) (lambda () (with-exception-handler @@ -1356,11 +1414,11 @@ available. Return the resource once PROC has returned." exn (make-knots-exception stack))))) (lambda () - (proc resource-value)))) + (proc resource)))) #:unwind? #t)) (lambda vals - (put-message channel - `(return ,resource-id)) + (put-message (resource-pool-channel pool) + `(return ,resource)) (apply values vals))))))) (define-syntax-rule (with-resource-from-pool pool resource exp ...) @@ -1369,13 +1427,6 @@ available. Return the resource once PROC has returned." (lambda (resource) exp ...))) (define* (resource-pool-stats pool #:key (timeout 5)) - (define channel - (resource-pool-channel pool)) - - (unless channel - (raise-exception - (make-resource-pool-destroyed-error pool))) - (if timeout (let* ((reply (make-channel)) (start-time (get-internal-real-time)) @@ -1385,7 +1436,7 @@ available. Return the resource once PROC has returned." (perform-operation (choice-operation (wrap-operation - (put-operation channel + (put-operation (resource-pool-channel pool) `(stats ,reply ,timeout-time)) (const #t)) (wrap-operation (sleep-operation timeout) @@ -1409,18 +1460,11 @@ available. Return the resource once PROC has returned." (raise-exception (make-resource-pool-timeout-error pool))))) (let ((reply (make-channel))) - (put-message channel + (put-message (resource-pool-channel pool) `(stats ,reply #f)) (get-message reply)))) (define (resource-pool-list-resources pool) - (define channel - (resource-pool-channel pool)) - - (unless channel - (raise-exception - (make-resource-pool-destroyed-error pool))) - (let ((reply (make-channel))) (put-message (resource-pool-channel pool) (list 'list-resources reply)) diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm index 2e30cb9..3999dde 100644 --- a/tests/resource-pool.scm +++ b/tests/resource-pool.scm @@ -1,33 +1,9 @@ (use-modules (tests) (fibers) - (fibers channels) (unit-test) (knots parallelism) (knots resource-pool)) -(run-fibers-for-tests - (lambda () - (let ((parallelism-limiter (make-parallelism-limiter - 1))) - (with-parallelism-limiter parallelism-limiter - #f) - - (destroy-parallelism-limiter parallelism-limiter)))) - -(run-fibers-for-tests - (lambda () - (let ((parallelism-limiter (make-parallelism-limiter - 1)) - (channel - (make-channel))) - (spawn-fiber - (lambda () - (with-parallelism-limiter parallelism-limiter - (put-message channel #t) - (sleep 1)))) - (get-message channel) - (destroy-parallelism-limiter parallelism-limiter)))) - (define new-number (let ((val 0)) (lambda ()