diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 8f7bcc9..8bfef2b 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -85,7 +85,6 @@ lifetime scheduler (name "unnamed") - (reply-timeout 1) (add-resources-parallelism 1) default-checkout-timeout) (define channel (make-channel)) @@ -103,7 +102,6 @@ (lifetime . ,lifetime) (scheduler . ,scheduler) (name . ,name) - (reply-timeout . ,reply-timeout) (default-checkout-timeout . ,default-checkout-timeout)))) (define checkout-failure-count 0) @@ -183,7 +181,9 @@ (loop)))))))) - (define (spawn-fiber-for-checkout reply-channel resource) + (define (spawn-fiber-for-checkout reply-channel + reply-timeout + resource) (spawn-fiber (lambda () (let ((checkout-success? @@ -247,21 +247,27 @@ '() (cons (get-internal-real-time) resources-last-used)) - (begin - (if reply-timeout - ;; Don't sleep in this fiber, so spawn a new - ;; fiber to handle handing over the - ;; resource, and returning it if there's a - ;; timeout - (spawn-fiber-for-checkout (car (last alive-waiters)) - resource) - (put-message (car (last alive-waiters)) resource)) + (match (last alive-waiters) + ((waiter-channel . waiter-timeout) + (if waiter-timeout + (let ((reply-timeout + (/ (- waiter-timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn + ;; a new fiber to handle handing over + ;; the resource, and returning it if + ;; there's a timeout + (spawn-fiber-for-checkout waiter-channel + reply-timeout + resource)) + (put-message waiter-channel resource)) - (loop (cons resource resources) - available - (drop-right! alive-waiters 1) - (cons (get-internal-real-time) - resources-last-used)))))))) + (loop (cons resource resources) + available + (drop-right! alive-waiters 1) + (cons (get-internal-real-time) + resources-last-used))))))))) (('checkout reply timeout-time) (if (null? available) @@ -275,27 +281,38 @@ waiters) resources-last-used)) - ;; If this client is still waiting - (if (> timeout-time - (get-internal-real-time)) - (let ((resource (car available))) - (if reply-timeout - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's a - ;; timeout - (spawn-fiber-for-checkout reply - resource) - (put-message reply resource)) + (if timeout-time + (let ((current-internal-time + (get-internal-real-time))) + ;; If this client is still waiting + (if (> timeout-time + current-internal-time) + (let ((reply-timeout + (/ (- timeout-time + current-internal-time) + internal-time-units-per-second))) + + ;; Don't sleep in this fiber, so spawn a new + ;; fiber to handle handing over the resource, + ;; and returning it if there's a timeout + (spawn-fiber-for-checkout reply + reply-timeout + (car available)) + (loop resources + (cdr available) + waiters + resources-last-used)) + (loop resources + available + waiters + resources-last-used))) + (begin + (put-message reply (car available)) (loop resources (cdr available) waiters - resources-last-used)) - (loop resources - available - waiters - resources-last-used)))) + resources-last-used))))) (((and (or 'return 'return-failed-checkout) @@ -342,27 +359,33 @@ resources) (get-internal-real-time))) resources-last-used)) - (begin - (if reply-timeout - ;; Don't sleep in this fiber, so spawn a new - ;; fiber to handle handing over the - ;; resource, and returning it if there's a - ;; timeout - (spawn-fiber-for-checkout (car (last alive-waiters)) - resource) - (put-message (car (last alive-waiters)) resource)) + (match (last alive-waiters) + ((waiter-channel . waiter-timeout) + (if waiter-timeout + (let ((reply-timeout + (/ (- waiter-timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's a + ;; timeout + (spawn-fiber-for-checkout waiter-channel + reply-timeout + resource)) + (put-message waiter-channel resource)) - (loop resources - available - (drop-right! alive-waiters 1) - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used))))))) + (loop resources + available + (drop-right! alive-waiters 1) + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + (get-internal-real-time)) + resources-last-used)))))))) (('remove resource) (let ((index @@ -405,8 +428,7 @@ (wrap-operation (put-operation reply stats) (const #t)) - (wrap-operation (sleep-operation (or reply-timeout - 5)) + (wrap-operation (sleep-operation 5) (const #f))))))) (loop resources @@ -572,10 +594,6 @@ available. Return the resource once PROC has returned." 'default-checkout-timeout) timeout)) - (define resource-pool-reply-timeout - (assq-ref (resource-pool-configuration pool) - 'reply-timeout)) - (let ((resource (if timeout-or-default (let loop ((reply (make-channel)) @@ -622,26 +640,8 @@ available. Return the resource once PROC has returned." (put-message (resource-pool-channel pool) (list 'checkout reply - (if resource-pool-reply-timeout - (+ (get-internal-real-time) - (* resource-pool-reply-timeout - internal-time-units-per-second)) - #f))) - (if resource-pool-reply-timeout - (let ((resource-or-timeout - (perform-operation - (choice-operation - (get-operation reply) - (wrap-operation - (sleep-operation resource-pool-reply-timeout) - (const 'resource-pool-reply-timeout)))))) - (if (or (eq? resource-or-timeout - 'resource-pool-reply-timeout) - (eq? resource-or-timeout - 'resource-pool-retry-checkout)) - (loop (make-channel)) - resource-or-timeout)) - (get-message reply)))))) + #f)) + (get-message reply))))) (when (not resource) (when timeout-handler diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm index cf108de..1251429 100644 --- a/tests/resource-pool.scm +++ b/tests/resource-pool.scm @@ -129,7 +129,7 @@ (error "collision detected"))) (new-number)) 1 - #:reply-timeout #f))) + #:default-checkout-timeout 120))) (fibers-batch-for-each (lambda _ (with-resource-from-pool