Remove the resource pool reply timeout
By being smarter about how long to wait for replies.
This commit is contained in:
parent
a11cd24e57
commit
6337103525
2 changed files with 81 additions and 81 deletions
|
@ -85,7 +85,6 @@
|
||||||
lifetime
|
lifetime
|
||||||
scheduler
|
scheduler
|
||||||
(name "unnamed")
|
(name "unnamed")
|
||||||
(reply-timeout 1)
|
|
||||||
(add-resources-parallelism 1)
|
(add-resources-parallelism 1)
|
||||||
default-checkout-timeout)
|
default-checkout-timeout)
|
||||||
(define channel (make-channel))
|
(define channel (make-channel))
|
||||||
|
@ -103,7 +102,6 @@
|
||||||
(lifetime . ,lifetime)
|
(lifetime . ,lifetime)
|
||||||
(scheduler . ,scheduler)
|
(scheduler . ,scheduler)
|
||||||
(name . ,name)
|
(name . ,name)
|
||||||
(reply-timeout . ,reply-timeout)
|
|
||||||
(default-checkout-timeout . ,default-checkout-timeout))))
|
(default-checkout-timeout . ,default-checkout-timeout))))
|
||||||
|
|
||||||
(define checkout-failure-count 0)
|
(define checkout-failure-count 0)
|
||||||
|
@ -183,7 +181,9 @@
|
||||||
|
|
||||||
(loop))))))))
|
(loop))))))))
|
||||||
|
|
||||||
(define (spawn-fiber-for-checkout reply-channel resource)
|
(define (spawn-fiber-for-checkout reply-channel
|
||||||
|
reply-timeout
|
||||||
|
resource)
|
||||||
(spawn-fiber
|
(spawn-fiber
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(let ((checkout-success?
|
(let ((checkout-success?
|
||||||
|
@ -247,21 +247,27 @@
|
||||||
'()
|
'()
|
||||||
(cons (get-internal-real-time)
|
(cons (get-internal-real-time)
|
||||||
resources-last-used))
|
resources-last-used))
|
||||||
(begin
|
(match (last alive-waiters)
|
||||||
(if reply-timeout
|
((waiter-channel . waiter-timeout)
|
||||||
;; Don't sleep in this fiber, so spawn a new
|
(if waiter-timeout
|
||||||
;; fiber to handle handing over the
|
(let ((reply-timeout
|
||||||
;; resource, and returning it if there's a
|
(/ (- waiter-timeout
|
||||||
;; timeout
|
current-internal-time)
|
||||||
(spawn-fiber-for-checkout (car (last alive-waiters))
|
internal-time-units-per-second)))
|
||||||
resource)
|
;; Don't sleep in this fiber, so spawn
|
||||||
(put-message (car (last alive-waiters)) resource))
|
;; a new fiber to handle handing over
|
||||||
|
;; the resource, and returning it if
|
||||||
|
;; there's a timeout
|
||||||
|
(spawn-fiber-for-checkout waiter-channel
|
||||||
|
reply-timeout
|
||||||
|
resource))
|
||||||
|
(put-message waiter-channel resource))
|
||||||
|
|
||||||
(loop (cons resource resources)
|
(loop (cons resource resources)
|
||||||
available
|
available
|
||||||
(drop-right! alive-waiters 1)
|
(drop-right! alive-waiters 1)
|
||||||
(cons (get-internal-real-time)
|
(cons (get-internal-real-time)
|
||||||
resources-last-used))))))))
|
resources-last-used)))))))))
|
||||||
|
|
||||||
(('checkout reply timeout-time)
|
(('checkout reply timeout-time)
|
||||||
(if (null? available)
|
(if (null? available)
|
||||||
|
@ -275,27 +281,38 @@
|
||||||
waiters)
|
waiters)
|
||||||
resources-last-used))
|
resources-last-used))
|
||||||
|
|
||||||
;; If this client is still waiting
|
(if timeout-time
|
||||||
(if (> timeout-time
|
(let ((current-internal-time
|
||||||
(get-internal-real-time))
|
(get-internal-real-time)))
|
||||||
(let ((resource (car available)))
|
;; If this client is still waiting
|
||||||
(if reply-timeout
|
(if (> timeout-time
|
||||||
;; Don't sleep in this fiber, so spawn a
|
current-internal-time)
|
||||||
;; new fiber to handle handing over the
|
(let ((reply-timeout
|
||||||
;; resource, and returning it if there's a
|
(/ (- timeout-time
|
||||||
;; timeout
|
current-internal-time)
|
||||||
(spawn-fiber-for-checkout reply
|
internal-time-units-per-second)))
|
||||||
resource)
|
|
||||||
(put-message reply resource))
|
;; Don't sleep in this fiber, so spawn a new
|
||||||
|
;; fiber to handle handing over the resource,
|
||||||
|
;; and returning it if there's a timeout
|
||||||
|
(spawn-fiber-for-checkout reply
|
||||||
|
reply-timeout
|
||||||
|
(car available))
|
||||||
|
(loop resources
|
||||||
|
(cdr available)
|
||||||
|
waiters
|
||||||
|
resources-last-used))
|
||||||
|
(loop resources
|
||||||
|
available
|
||||||
|
waiters
|
||||||
|
resources-last-used)))
|
||||||
|
(begin
|
||||||
|
(put-message reply (car available))
|
||||||
|
|
||||||
(loop resources
|
(loop resources
|
||||||
(cdr available)
|
(cdr available)
|
||||||
waiters
|
waiters
|
||||||
resources-last-used))
|
resources-last-used)))))
|
||||||
(loop resources
|
|
||||||
available
|
|
||||||
waiters
|
|
||||||
resources-last-used))))
|
|
||||||
|
|
||||||
(((and (or 'return
|
(((and (or 'return
|
||||||
'return-failed-checkout)
|
'return-failed-checkout)
|
||||||
|
@ -342,27 +359,33 @@
|
||||||
resources)
|
resources)
|
||||||
(get-internal-real-time)))
|
(get-internal-real-time)))
|
||||||
resources-last-used))
|
resources-last-used))
|
||||||
(begin
|
(match (last alive-waiters)
|
||||||
(if reply-timeout
|
((waiter-channel . waiter-timeout)
|
||||||
;; Don't sleep in this fiber, so spawn a new
|
(if waiter-timeout
|
||||||
;; fiber to handle handing over the
|
(let ((reply-timeout
|
||||||
;; resource, and returning it if there's a
|
(/ (- waiter-timeout
|
||||||
;; timeout
|
current-internal-time)
|
||||||
(spawn-fiber-for-checkout (car (last alive-waiters))
|
internal-time-units-per-second)))
|
||||||
resource)
|
;; Don't sleep in this fiber, so spawn a
|
||||||
(put-message (car (last alive-waiters)) resource))
|
;; new fiber to handle handing over the
|
||||||
|
;; resource, and returning it if there's a
|
||||||
|
;; timeout
|
||||||
|
(spawn-fiber-for-checkout waiter-channel
|
||||||
|
reply-timeout
|
||||||
|
resource))
|
||||||
|
(put-message waiter-channel resource))
|
||||||
|
|
||||||
(loop resources
|
(loop resources
|
||||||
available
|
available
|
||||||
(drop-right! alive-waiters 1)
|
(drop-right! alive-waiters 1)
|
||||||
(begin
|
(begin
|
||||||
(list-set!
|
(list-set!
|
||||||
resources-last-used
|
resources-last-used
|
||||||
(list-index (lambda (x)
|
(list-index (lambda (x)
|
||||||
(eq? x resource))
|
(eq? x resource))
|
||||||
resources)
|
resources)
|
||||||
(get-internal-real-time))
|
(get-internal-real-time))
|
||||||
resources-last-used)))))))
|
resources-last-used))))))))
|
||||||
|
|
||||||
(('remove resource)
|
(('remove resource)
|
||||||
(let ((index
|
(let ((index
|
||||||
|
@ -405,8 +428,7 @@
|
||||||
(wrap-operation
|
(wrap-operation
|
||||||
(put-operation reply stats)
|
(put-operation reply stats)
|
||||||
(const #t))
|
(const #t))
|
||||||
(wrap-operation (sleep-operation (or reply-timeout
|
(wrap-operation (sleep-operation 5)
|
||||||
5))
|
|
||||||
(const #f)))))))
|
(const #f)))))))
|
||||||
|
|
||||||
(loop resources
|
(loop resources
|
||||||
|
@ -572,10 +594,6 @@ available. Return the resource once PROC has returned."
|
||||||
'default-checkout-timeout)
|
'default-checkout-timeout)
|
||||||
timeout))
|
timeout))
|
||||||
|
|
||||||
(define resource-pool-reply-timeout
|
|
||||||
(assq-ref (resource-pool-configuration pool)
|
|
||||||
'reply-timeout))
|
|
||||||
|
|
||||||
(let ((resource
|
(let ((resource
|
||||||
(if timeout-or-default
|
(if timeout-or-default
|
||||||
(let loop ((reply (make-channel))
|
(let loop ((reply (make-channel))
|
||||||
|
@ -622,26 +640,8 @@ available. Return the resource once PROC has returned."
|
||||||
(put-message (resource-pool-channel pool)
|
(put-message (resource-pool-channel pool)
|
||||||
(list 'checkout
|
(list 'checkout
|
||||||
reply
|
reply
|
||||||
(if resource-pool-reply-timeout
|
#f))
|
||||||
(+ (get-internal-real-time)
|
(get-message reply)))))
|
||||||
(* resource-pool-reply-timeout
|
|
||||||
internal-time-units-per-second))
|
|
||||||
#f)))
|
|
||||||
(if resource-pool-reply-timeout
|
|
||||||
(let ((resource-or-timeout
|
|
||||||
(perform-operation
|
|
||||||
(choice-operation
|
|
||||||
(get-operation reply)
|
|
||||||
(wrap-operation
|
|
||||||
(sleep-operation resource-pool-reply-timeout)
|
|
||||||
(const 'resource-pool-reply-timeout))))))
|
|
||||||
(if (or (eq? resource-or-timeout
|
|
||||||
'resource-pool-reply-timeout)
|
|
||||||
(eq? resource-or-timeout
|
|
||||||
'resource-pool-retry-checkout))
|
|
||||||
(loop (make-channel))
|
|
||||||
resource-or-timeout))
|
|
||||||
(get-message reply))))))
|
|
||||||
|
|
||||||
(when (not resource)
|
(when (not resource)
|
||||||
(when timeout-handler
|
(when timeout-handler
|
||||||
|
|
|
@ -129,7 +129,7 @@
|
||||||
(error "collision detected")))
|
(error "collision detected")))
|
||||||
(new-number))
|
(new-number))
|
||||||
1
|
1
|
||||||
#:reply-timeout #f)))
|
#:default-checkout-timeout 120)))
|
||||||
(fibers-batch-for-each
|
(fibers-batch-for-each
|
||||||
(lambda _
|
(lambda _
|
||||||
(with-resource-from-pool
|
(with-resource-from-pool
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue