Simplify using the waiters queue in the resource pool
Use a custom dequeue procedure that returns #f rather than raising an exception on an empty queue.
This commit is contained in:
parent
52092e7a99
commit
86fb460d6a
1 changed files with 120 additions and 148 deletions
|
|
@ -101,6 +101,16 @@
|
||||||
start
|
start
|
||||||
(cdr end))))
|
(cdr end))))
|
||||||
|
|
||||||
|
(define (safe-deq q)
|
||||||
|
(if (null? (car q))
|
||||||
|
#f
|
||||||
|
(let ((it (caar q))
|
||||||
|
(next (cdar q)))
|
||||||
|
(if (null? next)
|
||||||
|
(set-cdr! q #f))
|
||||||
|
(set-car! q next)
|
||||||
|
it)))
|
||||||
|
|
||||||
(define* (make-fixed-size-resource-pool resources
|
(define* (make-fixed-size-resource-pool resources
|
||||||
#:key
|
#:key
|
||||||
(delay-logger (const #f))
|
(delay-logger (const #f))
|
||||||
|
|
@ -345,45 +355,35 @@
|
||||||
(set! checkout-failure-count
|
(set! checkout-failure-count
|
||||||
(+ 1 checkout-failure-count)))
|
(+ 1 checkout-failure-count)))
|
||||||
|
|
||||||
(if (q-empty? waiters)
|
(let ((current-internal-time
|
||||||
(loop resources
|
(get-internal-real-time)))
|
||||||
(cons resource available)
|
(let waiter-loop ((waiter (safe-deq waiters)))
|
||||||
waiters)
|
(match waiter
|
||||||
|
(#f
|
||||||
(let ((current-internal-time
|
(loop resources
|
||||||
(get-internal-real-time)))
|
(cons resource available)
|
||||||
(with-exception-handler
|
waiters))
|
||||||
(lambda (exn)
|
((reply . timeout)
|
||||||
(if (eq? (exception-kind exn) 'q-empty)
|
(if (and timeout
|
||||||
(loop resources
|
(< timeout current-internal-time))
|
||||||
(cons resource available)
|
(waiter-loop (safe-deq waiters))
|
||||||
waiters)
|
(if timeout
|
||||||
(raise-exception exn)))
|
(let ((reply-timeout
|
||||||
(lambda ()
|
(/ (- timeout
|
||||||
(let waiter-loop ((waiter (deq! waiters)))
|
current-internal-time)
|
||||||
(match waiter
|
internal-time-units-per-second)))
|
||||||
((reply . timeout)
|
;; Don't sleep in this fiber, so spawn a
|
||||||
(if (and timeout
|
;; new fiber to handle handing over the
|
||||||
(< timeout current-internal-time))
|
;; resource, and returning it if there's
|
||||||
(waiter-loop (deq! waiters))
|
;; a timeout
|
||||||
(if timeout
|
(spawn-fiber-for-checkout reply
|
||||||
(let ((reply-timeout
|
reply-timeout
|
||||||
(/ (- timeout
|
resource))
|
||||||
current-internal-time)
|
(put-message reply (cons 'success
|
||||||
internal-time-units-per-second)))
|
resource))))
|
||||||
;; Don't sleep in this fiber, so spawn a
|
(loop resources
|
||||||
;; new fiber to handle handing over the
|
available
|
||||||
;; resource, and returning it if there's
|
waiters))))))
|
||||||
;; a timeout
|
|
||||||
(spawn-fiber-for-checkout reply
|
|
||||||
reply-timeout
|
|
||||||
resource))
|
|
||||||
(put-message reply (cons 'success
|
|
||||||
resource))))))))
|
|
||||||
#:unwind? #t)
|
|
||||||
(loop resources
|
|
||||||
available
|
|
||||||
waiters))))
|
|
||||||
|
|
||||||
(('list-resources reply)
|
(('list-resources reply)
|
||||||
(spawn-fiber
|
(spawn-fiber
|
||||||
|
|
@ -770,51 +770,39 @@
|
||||||
(cons (get-internal-real-time)
|
(cons (get-internal-real-time)
|
||||||
resources-last-used))))
|
resources-last-used))))
|
||||||
|
|
||||||
(if (q-empty? waiters)
|
(let ((current-internal-time
|
||||||
(loop (cons resource resources)
|
(get-internal-real-time)))
|
||||||
(cons resource available)
|
(let waiter-loop ((waiter (safe-deq waiters)))
|
||||||
waiters
|
(match waiter
|
||||||
(cons (get-internal-real-time)
|
(#f
|
||||||
resources-last-used))
|
(loop (cons resource resources)
|
||||||
|
(cons resource available)
|
||||||
(let ((current-internal-time
|
waiters
|
||||||
(get-internal-real-time)))
|
(cons current-internal-time
|
||||||
(with-exception-handler
|
resources-last-used)))
|
||||||
(lambda (exn)
|
((reply . timeout)
|
||||||
(if (eq? (exception-kind exn) 'q-empty)
|
(if (and timeout
|
||||||
(loop (cons resource resources)
|
(< timeout current-internal-time))
|
||||||
(cons resource available)
|
(waiter-loop (safe-deq waiters))
|
||||||
waiters
|
(if timeout
|
||||||
(cons current-internal-time
|
(let ((reply-timeout
|
||||||
resources-last-used))
|
(/ (- timeout
|
||||||
(raise-exception exn)))
|
current-internal-time)
|
||||||
(lambda ()
|
internal-time-units-per-second)))
|
||||||
(let waiter-loop ((waiter (deq! waiters)))
|
;; Don't sleep in this fiber, so spawn a
|
||||||
(match waiter
|
;; new fiber to handle handing over the
|
||||||
((reply . timeout)
|
;; resource, and returning it if there's
|
||||||
(if (and timeout
|
;; a timeout
|
||||||
(< timeout current-internal-time))
|
(spawn-fiber-for-checkout reply
|
||||||
(waiter-loop (deq! waiters))
|
reply-timeout
|
||||||
(if timeout
|
resource))
|
||||||
(let ((reply-timeout
|
(put-message reply (cons 'success
|
||||||
(/ (- timeout
|
resource))))
|
||||||
current-internal-time)
|
(loop (cons resource resources)
|
||||||
internal-time-units-per-second)))
|
available
|
||||||
;; Don't sleep in this fiber, so spawn a
|
waiters
|
||||||
;; new fiber to handle handing over the
|
(cons current-internal-time
|
||||||
;; resource, and returning it if there's
|
resources-last-used))))))))
|
||||||
;; a timeout
|
|
||||||
(spawn-fiber-for-checkout reply
|
|
||||||
reply-timeout
|
|
||||||
resource))
|
|
||||||
(put-message reply (cons 'success
|
|
||||||
resource))))))))
|
|
||||||
#:unwind? #t)
|
|
||||||
(loop (cons resource resources)
|
|
||||||
available
|
|
||||||
waiters
|
|
||||||
(cons current-internal-time
|
|
||||||
resources-last-used))))))
|
|
||||||
|
|
||||||
(('checkout reply timeout-time max-waiters)
|
(('checkout reply timeout-time max-waiters)
|
||||||
(if (null? available)
|
(if (null? available)
|
||||||
|
|
@ -899,76 +887,60 @@
|
||||||
(set! checkout-failure-count
|
(set! checkout-failure-count
|
||||||
(+ 1 checkout-failure-count)))
|
(+ 1 checkout-failure-count)))
|
||||||
|
|
||||||
(if (q-empty? waiters)
|
(let ((current-internal-time
|
||||||
(loop resources
|
(get-internal-real-time))
|
||||||
(cons resource available)
|
(resource-index
|
||||||
waiters
|
(list-index (lambda (x)
|
||||||
(begin
|
(eq? x resource))
|
||||||
(list-set!
|
resources)))
|
||||||
resources-last-used
|
(let waiter-loop ((waiter (safe-deq waiters)))
|
||||||
(list-index (lambda (x)
|
(match waiter
|
||||||
(eq? x resource))
|
(#f
|
||||||
resources)
|
(loop resources
|
||||||
(get-internal-real-time))
|
(cons resource available)
|
||||||
resources-last-used))
|
waiters
|
||||||
|
(begin
|
||||||
(let ((current-internal-time
|
(when (eq? return-type 'return)
|
||||||
(get-internal-real-time)))
|
(list-set!
|
||||||
(with-exception-handler
|
resources-last-used
|
||||||
(lambda (exn)
|
resource-index
|
||||||
(if (eq? (exception-kind exn) 'q-empty)
|
current-internal-time))
|
||||||
(loop resources
|
resources-last-used)))
|
||||||
(cons resource available)
|
((reply . timeout)
|
||||||
waiters
|
(if (and timeout
|
||||||
(begin
|
(< timeout current-internal-time))
|
||||||
(when (eq? return-type 'return)
|
(waiter-loop (safe-deq waiters))
|
||||||
(list-set!
|
(if timeout
|
||||||
resources-last-used
|
(let ((reply-timeout
|
||||||
(list-index (lambda (x)
|
(/ (- timeout
|
||||||
(eq? x resource))
|
current-internal-time)
|
||||||
resources)
|
internal-time-units-per-second)))
|
||||||
current-internal-time))
|
;; Don't sleep in this fiber, so spawn a
|
||||||
resources-last-used))
|
;; new fiber to handle handing over the
|
||||||
(raise-exception exn)))
|
;; resource, and returning it if there's
|
||||||
(lambda ()
|
;; a timeout
|
||||||
(let waiter-loop ((waiter (deq! waiters)))
|
(spawn-fiber-for-checkout reply
|
||||||
(match waiter
|
reply-timeout
|
||||||
((reply . timeout)
|
resource))
|
||||||
(if (and timeout
|
(put-message reply (cons 'success
|
||||||
(< timeout current-internal-time))
|
resource))))
|
||||||
(waiter-loop (deq! waiters))
|
(loop resources
|
||||||
(if timeout
|
available
|
||||||
(let ((reply-timeout
|
waiters
|
||||||
(/ (- timeout
|
(begin
|
||||||
current-internal-time)
|
(list-set!
|
||||||
internal-time-units-per-second)))
|
resources-last-used
|
||||||
;; Don't sleep in this fiber, so spawn a
|
resource-index
|
||||||
;; new fiber to handle handing over the
|
current-internal-time)
|
||||||
;; resource, and returning it if there's
|
resources-last-used)))))))
|
||||||
;; a timeout
|
|
||||||
(spawn-fiber-for-checkout reply
|
|
||||||
reply-timeout
|
|
||||||
resource))
|
|
||||||
(put-message reply (cons 'success
|
|
||||||
resource))))))))
|
|
||||||
#:unwind? #t)
|
|
||||||
(loop resources
|
|
||||||
available
|
|
||||||
waiters
|
|
||||||
(begin
|
|
||||||
(list-set!
|
|
||||||
resources-last-used
|
|
||||||
(list-index (lambda (x)
|
|
||||||
(eq? x resource))
|
|
||||||
resources)
|
|
||||||
current-internal-time)
|
|
||||||
resources-last-used)))))
|
|
||||||
|
|
||||||
(('remove resource)
|
(('remove resource)
|
||||||
(let ((index
|
(let ((index
|
||||||
(list-index (lambda (x)
|
(list-index (lambda (x)
|
||||||
(eq? x resource))
|
(eq? x resource))
|
||||||
resources)))
|
resources)))
|
||||||
|
|
||||||
|
|
||||||
(loop (if index
|
(loop (if index
|
||||||
(remove-at-index! resources index)
|
(remove-at-index! resources index)
|
||||||
(begin
|
(begin
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue