Compare commits

...

7 commits

4 changed files with 252 additions and 157 deletions

View file

@ -101,6 +101,16 @@
start
(cdr end))))
(define (safe-deq q)
(if (null? (car q))
#f
(let ((it (caar q))
(next (cdar q)))
(if (null? next)
(set-cdr! q #f))
(set-car! q next)
it)))
(define* (make-fixed-size-resource-pool resources
#:key
(delay-logger (const #f))
@ -345,45 +355,35 @@
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(if (q-empty? waiters)
(loop resources
(cons resource available)
waiters)
(let ((current-internal-time
(get-internal-real-time)))
(with-exception-handler
(lambda (exn)
(if (eq? (exception-kind exn) 'q-empty)
(loop resources
(cons resource available)
waiters)
(raise-exception exn)))
(lambda ()
(let waiter-loop ((waiter (deq! waiters)))
(match waiter
((reply . timeout)
(if (and timeout
(< timeout current-internal-time))
(waiter-loop (deq! waiters))
(if timeout
(let ((reply-timeout
(/ (- timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's
;; a timeout
(spawn-fiber-for-checkout reply
reply-timeout
resource))
(put-message reply (cons 'success
resource))))))))
#:unwind? #t)
(loop resources
available
waiters))))
(let ((current-internal-time
(get-internal-real-time)))
(let waiter-loop ((waiter (safe-deq waiters)))
(match waiter
(#f
(loop resources
(cons resource available)
waiters))
((reply . timeout)
(if (and timeout
(< timeout current-internal-time))
(waiter-loop (safe-deq waiters))
(if timeout
(let ((reply-timeout
(/ (- timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's
;; a timeout
(spawn-fiber-for-checkout reply
reply-timeout
resource))
(put-message reply (cons 'success
resource))))
(loop resources
available
waiters))))))
(('list-resources reply)
(spawn-fiber
@ -749,6 +749,7 @@
(let loop ((resources '())
(available '())
(waiters (make-q))
(resources-checkout-count '())
(resources-last-used '()))
(match (get-message channel)
@ -762,59 +763,50 @@
(loop (cons resource resources)
available
waiters
(cons 0 resources-checkout-count)
(cons (get-internal-real-time)
resources-last-used)))
(loop resources
available
waiters
(cons (get-internal-real-time)
resources-last-used))))
resources-checkout-count
resources-last-used)))
(if (q-empty? waiters)
(loop (cons resource resources)
(cons resource available)
waiters
(cons (get-internal-real-time)
resources-last-used))
(let ((current-internal-time
(get-internal-real-time)))
(with-exception-handler
(lambda (exn)
(if (eq? (exception-kind exn) 'q-empty)
(loop (cons resource resources)
(cons resource available)
waiters
(cons current-internal-time
resources-last-used))
(raise-exception exn)))
(lambda ()
(let waiter-loop ((waiter (deq! waiters)))
(match waiter
((reply . timeout)
(if (and timeout
(< timeout current-internal-time))
(waiter-loop (deq! waiters))
(if timeout
(let ((reply-timeout
(/ (- timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's
;; a timeout
(spawn-fiber-for-checkout reply
reply-timeout
resource))
(put-message reply (cons 'success
resource))))))))
#:unwind? #t)
(loop (cons resource resources)
available
waiters
(cons current-internal-time
resources-last-used))))))
(let ((current-internal-time
(get-internal-real-time)))
(let waiter-loop ((waiter (safe-deq waiters)))
(match waiter
(#f
(loop (cons resource resources)
(cons resource available)
waiters
(cons 0 resources-checkout-count)
(cons current-internal-time
resources-last-used)))
((reply . timeout)
(if (and timeout
(< timeout current-internal-time))
(waiter-loop (safe-deq waiters))
(if timeout
(let ((reply-timeout
(/ (- timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's
;; a timeout
(spawn-fiber-for-checkout reply
reply-timeout
resource))
(put-message reply (cons 'success
resource))))
(loop (cons resource resources)
available
waiters
(cons 1 resources-checkout-count)
(cons current-internal-time
resources-last-used))))))))
(('checkout reply timeout-time max-waiters)
(if (null? available)
@ -849,10 +841,12 @@
(loop resources
available
waiters
resources-checkout-count
resources-last-used))
(loop resources
available
(enq! waiters (cons reply timeout-time))
resources-checkout-count
resources-last-used))))
(if timeout-time
@ -875,10 +869,21 @@
(loop resources
(cdr available)
waiters
(let ((resource-index
(list-index (lambda (x)
(eq? x (car available)))
resources)))
(list-set! resources-checkout-count
resource-index
(+ 1 (list-ref
resources-checkout-count
resource-index)))
resources-checkout-count)
resources-last-used))
(loop resources
available
waiters
resources-checkout-count
resources-last-used)))
(begin
(put-message reply (cons 'success
@ -887,6 +892,16 @@
(loop resources
(cdr available)
waiters
(let ((resource-index
(list-index (lambda (x)
(eq? x (car available)))
resources)))
(list-set! resources-checkout-count
resource-index
(+ 1 (list-ref
resources-checkout-count
resource-index)))
resources-checkout-count)
resources-last-used)))))
(((and (or 'return
@ -899,76 +914,95 @@
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(if (q-empty? waiters)
(loop resources
(cons resource available)
waiters
(begin
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
(get-internal-real-time))
resources-last-used))
(let ((current-internal-time
(get-internal-real-time)))
(with-exception-handler
(lambda (exn)
(if (eq? (exception-kind exn) 'q-empty)
(loop resources
(cons resource available)
waiters
(begin
(when (eq? return-type 'return)
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
current-internal-time))
resources-last-used))
(raise-exception exn)))
(lambda ()
(let waiter-loop ((waiter (deq! waiters)))
(match waiter
((reply . timeout)
(if (and timeout
(< timeout current-internal-time))
(waiter-loop (deq! waiters))
(if timeout
(let ((reply-timeout
(/ (- timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's
;; a timeout
(spawn-fiber-for-checkout reply
reply-timeout
resource))
(put-message reply (cons 'success
resource))))))))
#:unwind? #t)
(loop resources
available
waiters
(begin
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
current-internal-time)
resources-last-used)))))
(let ((current-internal-time
(get-internal-real-time))
(resource-index
(list-index (lambda (x)
(eq? x resource))
resources)))
(if (and lifetime
(>= (list-ref resources-checkout-count
resource-index)
lifetime))
(begin
(spawn-fiber-to-destroy-resource resource)
(loop resources
available
waiters
resources-checkout-count
resources-last-used))
(let waiter-loop ((waiter (safe-deq waiters)))
(match waiter
(#f
(loop resources
(cons resource available)
waiters
(if (eq? 'return-failed-checkout
return-type)
(begin
(list-set! resources-checkout-count
resource-index
(- (list-ref resources-checkout-count
resource-index)
1))
resources-checkout-count)
resources-checkout-count)
(begin
(when (eq? return-type 'return)
(list-set!
resources-last-used
resource-index
current-internal-time))
resources-last-used)))
((reply . timeout)
(if (and timeout
(< timeout current-internal-time))
(waiter-loop (safe-deq waiters))
(if timeout
(let ((reply-timeout
(/ (- timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's
;; a timeout
(spawn-fiber-for-checkout reply
reply-timeout
resource))
(put-message reply (cons 'success
resource))))
(loop resources
available
waiters
(if (eq? 'return-failed-checkout
return-type)
(begin
(list-set! resources-checkout-count
resource-index
(- (list-ref resources-checkout-count
resource-index)
1))
resources-checkout-count)
resources-checkout-count)
(begin
(list-set!
resources-last-used
resource-index
current-internal-time)
resources-last-used))))))))
(('remove resource)
(let ((index
(list-index (lambda (x)
(eq? x resource))
resources)))
(when (and (not (q-empty? waiters))
(< (- (length resources) 1)
max-size))
(spawn-fiber-to-return-new-resource))
(loop (if index
(remove-at-index! resources index)
(begin
@ -979,6 +1013,9 @@
resources))
available ; resource shouldn't be in this list
waiters
(remove-at-index!
resources-checkout-count
index)
(remove-at-index!
resources-last-used
index))))
@ -989,6 +1026,7 @@
(loop resources
available
waiters
resources-checkout-count
resources-last-used))
(('list-resources reply)
@ -999,6 +1037,7 @@
(loop resources
available
waiters
resources-checkout-count
resources-last-used))
(('stats reply timeout-time)
@ -1006,6 +1045,7 @@
`((resources . ,(length resources))
(available . ,(length available))
(waiters . ,(q-length waiters))
(resources-checkout-count . ,resources-checkout-count)
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
@ -1025,6 +1065,7 @@
(loop resources
available
waiters
resources-checkout-count
resources-last-used))
(('check-for-idle-resources)
@ -1065,6 +1106,7 @@
(loop resources
(lset-difference eq? available resources-to-destroy)
waiters
resources-checkout-count
resources-last-used))))
(('destroy)
@ -1128,6 +1170,7 @@
(loop resources
available
waiters
resources-checkout-count
resources-last-used)))))
(spawn-fiber

View file

@ -269,8 +269,8 @@ from there, or #f if that would be an empty string."
(sleep 1)
(destructor/safe args)))))
(define (process channel args)
(let loop ()
(define (process thread-index channel args)
(let loop ((lifetime thread-lifetime))
(match (get-message channel)
('destroy #f)
((reply sent-time proc)
@ -292,6 +292,9 @@ from there, or #f if that would be an empty string."
internal-time-units-per-second)
exn))
(lambda ()
(vector-set! thread-proc-vector
thread-index
proc)
(with-exception-handler
(lambda (exn)
(let ((stack
@ -319,6 +322,10 @@ from there, or #f if that would be an empty string."
vals))))))
#:unwind? #t)))
(vector-set! thread-proc-vector
thread-index
#f)
(put-message reply
response)
@ -335,7 +342,11 @@ from there, or #f if that would be an empty string."
(if (and exception?
expire-on-exception?)
#t
(loop))))))))
(if lifetime
(if (<= 1 lifetime)
#t
(loop (- lifetime 1)))
(loop lifetime)))))))))
(define (start-thread index channel)
(call-with-new-thread
@ -358,7 +369,7 @@ from there, or #f if that would be an empty string."
"knots: thread-pool: internal exception: ~A\n" exn))
(lambda ()
(parameterize ((param args))
(process channel args)))
(process index channel args)))
#:unwind? #t)))
(when thread-destructor
@ -395,7 +406,8 @@ from there, or #f if that would be an empty string."
(expire-on-exception? #f)
(name "unnamed")
(use-default-io-waiters? #t)
default-checkout-timeout)
default-checkout-timeout
default-max-waiters)
"Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the thread pool procedure."
(define param
@ -408,7 +420,6 @@ arguments of the thread pool procedure."
1
#:thread-initializer thread-initializer
#:thread-destructor thread-destructor
#:thread-lifetime thread-lifetime
#:expire-on-exception? expire-on-exception?
#:name name
#:use-default-io-waiters? use-default-io-waiters?))
@ -416,9 +427,11 @@ arguments of the thread pool procedure."
#:destructor destroy-thread-pool
#:min-size min-size
#:delay-logger delay-logger
#:lifetime thread-lifetime
#:scheduler scheduler
#:duration-logger duration-logger
#:default-checkout-timeout default-checkout-timeout)))
#:default-checkout-timeout default-checkout-timeout
#:default-max-waiters default-max-waiters)))
(thread-pool resource-pool
param)))

View file

@ -211,4 +211,21 @@
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-resource-pool
(const 'foo)
1
#:lifetime 1
#:destructor
(const #t))))
(for-each
(lambda _
(with-resource-from-pool resource-pool
res
res))
(iota 20))
(destroy-resource-pool resource-pool))))
(display "resource-pool test finished successfully\n")

View file

@ -85,4 +85,26 @@
(+ 1 'a))))
#:unwind? #t)))))
(let ((thread-pool
(make-fixed-size-thread-pool 1 #:thread-lifetime 1)))
(for-each
(lambda _
(call-with-thread
thread-pool
(lambda () #f)))
(iota 10)))
(run-fibers-for-tests
(lambda ()
(let ((thread-pool
(make-thread-pool 1 #:thread-lifetime 1)))
(for-each
(lambda _
(call-with-thread
thread-pool
(lambda () #f)))
(iota 10)))))
(display "thread-pool test finished successfully\n")