Compare commits
7 commits
52092e7a99
...
05ad83c703
| Author | SHA1 | Date | |
|---|---|---|---|
| 05ad83c703 | |||
| 1a476b5aa8 | |||
| e78e41b542 | |||
| 2446078657 | |||
| a13098494d | |||
| 40b64e269b | |||
| 86fb460d6a |
4 changed files with 252 additions and 157 deletions
|
|
@ -101,6 +101,16 @@
|
|||
start
|
||||
(cdr end))))
|
||||
|
||||
(define (safe-deq q)
|
||||
(if (null? (car q))
|
||||
#f
|
||||
(let ((it (caar q))
|
||||
(next (cdar q)))
|
||||
(if (null? next)
|
||||
(set-cdr! q #f))
|
||||
(set-car! q next)
|
||||
it)))
|
||||
|
||||
(define* (make-fixed-size-resource-pool resources
|
||||
#:key
|
||||
(delay-logger (const #f))
|
||||
|
|
@ -345,45 +355,35 @@
|
|||
(set! checkout-failure-count
|
||||
(+ 1 checkout-failure-count)))
|
||||
|
||||
(if (q-empty? waiters)
|
||||
(loop resources
|
||||
(cons resource available)
|
||||
waiters)
|
||||
|
||||
(let ((current-internal-time
|
||||
(get-internal-real-time)))
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(if (eq? (exception-kind exn) 'q-empty)
|
||||
(loop resources
|
||||
(cons resource available)
|
||||
waiters)
|
||||
(raise-exception exn)))
|
||||
(lambda ()
|
||||
(let waiter-loop ((waiter (deq! waiters)))
|
||||
(match waiter
|
||||
((reply . timeout)
|
||||
(if (and timeout
|
||||
(< timeout current-internal-time))
|
||||
(waiter-loop (deq! waiters))
|
||||
(if timeout
|
||||
(let ((reply-timeout
|
||||
(/ (- timeout
|
||||
current-internal-time)
|
||||
internal-time-units-per-second)))
|
||||
;; Don't sleep in this fiber, so spawn a
|
||||
;; new fiber to handle handing over the
|
||||
;; resource, and returning it if there's
|
||||
;; a timeout
|
||||
(spawn-fiber-for-checkout reply
|
||||
reply-timeout
|
||||
resource))
|
||||
(put-message reply (cons 'success
|
||||
resource))))))))
|
||||
#:unwind? #t)
|
||||
(loop resources
|
||||
available
|
||||
waiters))))
|
||||
(let ((current-internal-time
|
||||
(get-internal-real-time)))
|
||||
(let waiter-loop ((waiter (safe-deq waiters)))
|
||||
(match waiter
|
||||
(#f
|
||||
(loop resources
|
||||
(cons resource available)
|
||||
waiters))
|
||||
((reply . timeout)
|
||||
(if (and timeout
|
||||
(< timeout current-internal-time))
|
||||
(waiter-loop (safe-deq waiters))
|
||||
(if timeout
|
||||
(let ((reply-timeout
|
||||
(/ (- timeout
|
||||
current-internal-time)
|
||||
internal-time-units-per-second)))
|
||||
;; Don't sleep in this fiber, so spawn a
|
||||
;; new fiber to handle handing over the
|
||||
;; resource, and returning it if there's
|
||||
;; a timeout
|
||||
(spawn-fiber-for-checkout reply
|
||||
reply-timeout
|
||||
resource))
|
||||
(put-message reply (cons 'success
|
||||
resource))))
|
||||
(loop resources
|
||||
available
|
||||
waiters))))))
|
||||
|
||||
(('list-resources reply)
|
||||
(spawn-fiber
|
||||
|
|
@ -749,6 +749,7 @@
|
|||
(let loop ((resources '())
|
||||
(available '())
|
||||
(waiters (make-q))
|
||||
(resources-checkout-count '())
|
||||
(resources-last-used '()))
|
||||
|
||||
(match (get-message channel)
|
||||
|
|
@ -762,59 +763,50 @@
|
|||
(loop (cons resource resources)
|
||||
available
|
||||
waiters
|
||||
(cons 0 resources-checkout-count)
|
||||
(cons (get-internal-real-time)
|
||||
resources-last-used)))
|
||||
(loop resources
|
||||
available
|
||||
waiters
|
||||
(cons (get-internal-real-time)
|
||||
resources-last-used))))
|
||||
resources-checkout-count
|
||||
resources-last-used)))
|
||||
|
||||
(if (q-empty? waiters)
|
||||
(loop (cons resource resources)
|
||||
(cons resource available)
|
||||
waiters
|
||||
(cons (get-internal-real-time)
|
||||
resources-last-used))
|
||||
|
||||
(let ((current-internal-time
|
||||
(get-internal-real-time)))
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(if (eq? (exception-kind exn) 'q-empty)
|
||||
(loop (cons resource resources)
|
||||
(cons resource available)
|
||||
waiters
|
||||
(cons current-internal-time
|
||||
resources-last-used))
|
||||
(raise-exception exn)))
|
||||
(lambda ()
|
||||
(let waiter-loop ((waiter (deq! waiters)))
|
||||
(match waiter
|
||||
((reply . timeout)
|
||||
(if (and timeout
|
||||
(< timeout current-internal-time))
|
||||
(waiter-loop (deq! waiters))
|
||||
(if timeout
|
||||
(let ((reply-timeout
|
||||
(/ (- timeout
|
||||
current-internal-time)
|
||||
internal-time-units-per-second)))
|
||||
;; Don't sleep in this fiber, so spawn a
|
||||
;; new fiber to handle handing over the
|
||||
;; resource, and returning it if there's
|
||||
;; a timeout
|
||||
(spawn-fiber-for-checkout reply
|
||||
reply-timeout
|
||||
resource))
|
||||
(put-message reply (cons 'success
|
||||
resource))))))))
|
||||
#:unwind? #t)
|
||||
(loop (cons resource resources)
|
||||
available
|
||||
waiters
|
||||
(cons current-internal-time
|
||||
resources-last-used))))))
|
||||
(let ((current-internal-time
|
||||
(get-internal-real-time)))
|
||||
(let waiter-loop ((waiter (safe-deq waiters)))
|
||||
(match waiter
|
||||
(#f
|
||||
(loop (cons resource resources)
|
||||
(cons resource available)
|
||||
waiters
|
||||
(cons 0 resources-checkout-count)
|
||||
(cons current-internal-time
|
||||
resources-last-used)))
|
||||
((reply . timeout)
|
||||
(if (and timeout
|
||||
(< timeout current-internal-time))
|
||||
(waiter-loop (safe-deq waiters))
|
||||
(if timeout
|
||||
(let ((reply-timeout
|
||||
(/ (- timeout
|
||||
current-internal-time)
|
||||
internal-time-units-per-second)))
|
||||
;; Don't sleep in this fiber, so spawn a
|
||||
;; new fiber to handle handing over the
|
||||
;; resource, and returning it if there's
|
||||
;; a timeout
|
||||
(spawn-fiber-for-checkout reply
|
||||
reply-timeout
|
||||
resource))
|
||||
(put-message reply (cons 'success
|
||||
resource))))
|
||||
(loop (cons resource resources)
|
||||
available
|
||||
waiters
|
||||
(cons 1 resources-checkout-count)
|
||||
(cons current-internal-time
|
||||
resources-last-used))))))))
|
||||
|
||||
(('checkout reply timeout-time max-waiters)
|
||||
(if (null? available)
|
||||
|
|
@ -849,10 +841,12 @@
|
|||
(loop resources
|
||||
available
|
||||
waiters
|
||||
resources-checkout-count
|
||||
resources-last-used))
|
||||
(loop resources
|
||||
available
|
||||
(enq! waiters (cons reply timeout-time))
|
||||
resources-checkout-count
|
||||
resources-last-used))))
|
||||
|
||||
(if timeout-time
|
||||
|
|
@ -875,10 +869,21 @@
|
|||
(loop resources
|
||||
(cdr available)
|
||||
waiters
|
||||
(let ((resource-index
|
||||
(list-index (lambda (x)
|
||||
(eq? x (car available)))
|
||||
resources)))
|
||||
(list-set! resources-checkout-count
|
||||
resource-index
|
||||
(+ 1 (list-ref
|
||||
resources-checkout-count
|
||||
resource-index)))
|
||||
resources-checkout-count)
|
||||
resources-last-used))
|
||||
(loop resources
|
||||
available
|
||||
waiters
|
||||
resources-checkout-count
|
||||
resources-last-used)))
|
||||
(begin
|
||||
(put-message reply (cons 'success
|
||||
|
|
@ -887,6 +892,16 @@
|
|||
(loop resources
|
||||
(cdr available)
|
||||
waiters
|
||||
(let ((resource-index
|
||||
(list-index (lambda (x)
|
||||
(eq? x (car available)))
|
||||
resources)))
|
||||
(list-set! resources-checkout-count
|
||||
resource-index
|
||||
(+ 1 (list-ref
|
||||
resources-checkout-count
|
||||
resource-index)))
|
||||
resources-checkout-count)
|
||||
resources-last-used)))))
|
||||
|
||||
(((and (or 'return
|
||||
|
|
@ -899,76 +914,95 @@
|
|||
(set! checkout-failure-count
|
||||
(+ 1 checkout-failure-count)))
|
||||
|
||||
(if (q-empty? waiters)
|
||||
(loop resources
|
||||
(cons resource available)
|
||||
waiters
|
||||
(begin
|
||||
(list-set!
|
||||
resources-last-used
|
||||
(list-index (lambda (x)
|
||||
(eq? x resource))
|
||||
resources)
|
||||
(get-internal-real-time))
|
||||
resources-last-used))
|
||||
|
||||
(let ((current-internal-time
|
||||
(get-internal-real-time)))
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(if (eq? (exception-kind exn) 'q-empty)
|
||||
(loop resources
|
||||
(cons resource available)
|
||||
waiters
|
||||
(begin
|
||||
(when (eq? return-type 'return)
|
||||
(list-set!
|
||||
resources-last-used
|
||||
(list-index (lambda (x)
|
||||
(eq? x resource))
|
||||
resources)
|
||||
current-internal-time))
|
||||
resources-last-used))
|
||||
(raise-exception exn)))
|
||||
(lambda ()
|
||||
(let waiter-loop ((waiter (deq! waiters)))
|
||||
(match waiter
|
||||
((reply . timeout)
|
||||
(if (and timeout
|
||||
(< timeout current-internal-time))
|
||||
(waiter-loop (deq! waiters))
|
||||
(if timeout
|
||||
(let ((reply-timeout
|
||||
(/ (- timeout
|
||||
current-internal-time)
|
||||
internal-time-units-per-second)))
|
||||
;; Don't sleep in this fiber, so spawn a
|
||||
;; new fiber to handle handing over the
|
||||
;; resource, and returning it if there's
|
||||
;; a timeout
|
||||
(spawn-fiber-for-checkout reply
|
||||
reply-timeout
|
||||
resource))
|
||||
(put-message reply (cons 'success
|
||||
resource))))))))
|
||||
#:unwind? #t)
|
||||
(loop resources
|
||||
available
|
||||
waiters
|
||||
(begin
|
||||
(list-set!
|
||||
resources-last-used
|
||||
(list-index (lambda (x)
|
||||
(eq? x resource))
|
||||
resources)
|
||||
current-internal-time)
|
||||
resources-last-used)))))
|
||||
(let ((current-internal-time
|
||||
(get-internal-real-time))
|
||||
(resource-index
|
||||
(list-index (lambda (x)
|
||||
(eq? x resource))
|
||||
resources)))
|
||||
(if (and lifetime
|
||||
(>= (list-ref resources-checkout-count
|
||||
resource-index)
|
||||
lifetime))
|
||||
(begin
|
||||
(spawn-fiber-to-destroy-resource resource)
|
||||
(loop resources
|
||||
available
|
||||
waiters
|
||||
resources-checkout-count
|
||||
resources-last-used))
|
||||
(let waiter-loop ((waiter (safe-deq waiters)))
|
||||
(match waiter
|
||||
(#f
|
||||
(loop resources
|
||||
(cons resource available)
|
||||
waiters
|
||||
(if (eq? 'return-failed-checkout
|
||||
return-type)
|
||||
(begin
|
||||
(list-set! resources-checkout-count
|
||||
resource-index
|
||||
(- (list-ref resources-checkout-count
|
||||
resource-index)
|
||||
1))
|
||||
resources-checkout-count)
|
||||
resources-checkout-count)
|
||||
(begin
|
||||
(when (eq? return-type 'return)
|
||||
(list-set!
|
||||
resources-last-used
|
||||
resource-index
|
||||
current-internal-time))
|
||||
resources-last-used)))
|
||||
((reply . timeout)
|
||||
(if (and timeout
|
||||
(< timeout current-internal-time))
|
||||
(waiter-loop (safe-deq waiters))
|
||||
(if timeout
|
||||
(let ((reply-timeout
|
||||
(/ (- timeout
|
||||
current-internal-time)
|
||||
internal-time-units-per-second)))
|
||||
;; Don't sleep in this fiber, so spawn a
|
||||
;; new fiber to handle handing over the
|
||||
;; resource, and returning it if there's
|
||||
;; a timeout
|
||||
(spawn-fiber-for-checkout reply
|
||||
reply-timeout
|
||||
resource))
|
||||
(put-message reply (cons 'success
|
||||
resource))))
|
||||
(loop resources
|
||||
available
|
||||
waiters
|
||||
(if (eq? 'return-failed-checkout
|
||||
return-type)
|
||||
(begin
|
||||
(list-set! resources-checkout-count
|
||||
resource-index
|
||||
(- (list-ref resources-checkout-count
|
||||
resource-index)
|
||||
1))
|
||||
resources-checkout-count)
|
||||
resources-checkout-count)
|
||||
(begin
|
||||
(list-set!
|
||||
resources-last-used
|
||||
resource-index
|
||||
current-internal-time)
|
||||
resources-last-used))))))))
|
||||
|
||||
(('remove resource)
|
||||
(let ((index
|
||||
(list-index (lambda (x)
|
||||
(eq? x resource))
|
||||
resources)))
|
||||
|
||||
(when (and (not (q-empty? waiters))
|
||||
(< (- (length resources) 1)
|
||||
max-size))
|
||||
(spawn-fiber-to-return-new-resource))
|
||||
|
||||
(loop (if index
|
||||
(remove-at-index! resources index)
|
||||
(begin
|
||||
|
|
@ -979,6 +1013,9 @@
|
|||
resources))
|
||||
available ; resource shouldn't be in this list
|
||||
waiters
|
||||
(remove-at-index!
|
||||
resources-checkout-count
|
||||
index)
|
||||
(remove-at-index!
|
||||
resources-last-used
|
||||
index))))
|
||||
|
|
@ -989,6 +1026,7 @@
|
|||
(loop resources
|
||||
available
|
||||
waiters
|
||||
resources-checkout-count
|
||||
resources-last-used))
|
||||
|
||||
(('list-resources reply)
|
||||
|
|
@ -999,6 +1037,7 @@
|
|||
(loop resources
|
||||
available
|
||||
waiters
|
||||
resources-checkout-count
|
||||
resources-last-used))
|
||||
|
||||
(('stats reply timeout-time)
|
||||
|
|
@ -1006,6 +1045,7 @@
|
|||
`((resources . ,(length resources))
|
||||
(available . ,(length available))
|
||||
(waiters . ,(q-length waiters))
|
||||
(resources-checkout-count . ,resources-checkout-count)
|
||||
(checkout-failure-count . ,checkout-failure-count))))
|
||||
|
||||
(spawn-fiber
|
||||
|
|
@ -1025,6 +1065,7 @@
|
|||
(loop resources
|
||||
available
|
||||
waiters
|
||||
resources-checkout-count
|
||||
resources-last-used))
|
||||
|
||||
(('check-for-idle-resources)
|
||||
|
|
@ -1065,6 +1106,7 @@
|
|||
(loop resources
|
||||
(lset-difference eq? available resources-to-destroy)
|
||||
waiters
|
||||
resources-checkout-count
|
||||
resources-last-used))))
|
||||
|
||||
(('destroy)
|
||||
|
|
@ -1128,6 +1170,7 @@
|
|||
(loop resources
|
||||
available
|
||||
waiters
|
||||
resources-checkout-count
|
||||
resources-last-used)))))
|
||||
|
||||
(spawn-fiber
|
||||
|
|
|
|||
|
|
@ -269,8 +269,8 @@ from there, or #f if that would be an empty string."
|
|||
(sleep 1)
|
||||
(destructor/safe args)))))
|
||||
|
||||
(define (process channel args)
|
||||
(let loop ()
|
||||
(define (process thread-index channel args)
|
||||
(let loop ((lifetime thread-lifetime))
|
||||
(match (get-message channel)
|
||||
('destroy #f)
|
||||
((reply sent-time proc)
|
||||
|
|
@ -292,6 +292,9 @@ from there, or #f if that would be an empty string."
|
|||
internal-time-units-per-second)
|
||||
exn))
|
||||
(lambda ()
|
||||
(vector-set! thread-proc-vector
|
||||
thread-index
|
||||
proc)
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(let ((stack
|
||||
|
|
@ -319,6 +322,10 @@ from there, or #f if that would be an empty string."
|
|||
vals))))))
|
||||
#:unwind? #t)))
|
||||
|
||||
(vector-set! thread-proc-vector
|
||||
thread-index
|
||||
#f)
|
||||
|
||||
(put-message reply
|
||||
response)
|
||||
|
||||
|
|
@ -335,7 +342,11 @@ from there, or #f if that would be an empty string."
|
|||
(if (and exception?
|
||||
expire-on-exception?)
|
||||
#t
|
||||
(loop))))))))
|
||||
(if lifetime
|
||||
(if (<= 1 lifetime)
|
||||
#t
|
||||
(loop (- lifetime 1)))
|
||||
(loop lifetime)))))))))
|
||||
|
||||
(define (start-thread index channel)
|
||||
(call-with-new-thread
|
||||
|
|
@ -358,7 +369,7 @@ from there, or #f if that would be an empty string."
|
|||
"knots: thread-pool: internal exception: ~A\n" exn))
|
||||
(lambda ()
|
||||
(parameterize ((param args))
|
||||
(process channel args)))
|
||||
(process index channel args)))
|
||||
#:unwind? #t)))
|
||||
|
||||
(when thread-destructor
|
||||
|
|
@ -395,7 +406,8 @@ from there, or #f if that would be an empty string."
|
|||
(expire-on-exception? #f)
|
||||
(name "unnamed")
|
||||
(use-default-io-waiters? #t)
|
||||
default-checkout-timeout)
|
||||
default-checkout-timeout
|
||||
default-max-waiters)
|
||||
"Return a channel used to offload work to a dedicated thread. ARGS are the
|
||||
arguments of the thread pool procedure."
|
||||
(define param
|
||||
|
|
@ -408,7 +420,6 @@ arguments of the thread pool procedure."
|
|||
1
|
||||
#:thread-initializer thread-initializer
|
||||
#:thread-destructor thread-destructor
|
||||
#:thread-lifetime thread-lifetime
|
||||
#:expire-on-exception? expire-on-exception?
|
||||
#:name name
|
||||
#:use-default-io-waiters? use-default-io-waiters?))
|
||||
|
|
@ -416,9 +427,11 @@ arguments of the thread pool procedure."
|
|||
#:destructor destroy-thread-pool
|
||||
#:min-size min-size
|
||||
#:delay-logger delay-logger
|
||||
#:lifetime thread-lifetime
|
||||
#:scheduler scheduler
|
||||
#:duration-logger duration-logger
|
||||
#:default-checkout-timeout default-checkout-timeout)))
|
||||
#:default-checkout-timeout default-checkout-timeout
|
||||
#:default-max-waiters default-max-waiters)))
|
||||
|
||||
(thread-pool resource-pool
|
||||
param)))
|
||||
|
|
|
|||
|
|
@ -211,4 +211,21 @@
|
|||
|
||||
(destroy-resource-pool resource-pool))))
|
||||
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(let ((resource-pool (make-resource-pool
|
||||
(const 'foo)
|
||||
1
|
||||
#:lifetime 1
|
||||
#:destructor
|
||||
(const #t))))
|
||||
(for-each
|
||||
(lambda _
|
||||
(with-resource-from-pool resource-pool
|
||||
res
|
||||
res))
|
||||
(iota 20))
|
||||
|
||||
(destroy-resource-pool resource-pool))))
|
||||
|
||||
(display "resource-pool test finished successfully\n")
|
||||
|
|
|
|||
|
|
@ -85,4 +85,26 @@
|
|||
(+ 1 'a))))
|
||||
#:unwind? #t)))))
|
||||
|
||||
(let ((thread-pool
|
||||
(make-fixed-size-thread-pool 1 #:thread-lifetime 1)))
|
||||
|
||||
(for-each
|
||||
(lambda _
|
||||
(call-with-thread
|
||||
thread-pool
|
||||
(lambda () #f)))
|
||||
(iota 10)))
|
||||
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(let ((thread-pool
|
||||
(make-thread-pool 1 #:thread-lifetime 1)))
|
||||
|
||||
(for-each
|
||||
(lambda _
|
||||
(call-with-thread
|
||||
thread-pool
|
||||
(lambda () #f)))
|
||||
(iota 10)))))
|
||||
|
||||
(display "thread-pool test finished successfully\n")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue