Compare commits

..

7 commits

4 changed files with 252 additions and 157 deletions

View file

@ -101,6 +101,16 @@
start start
(cdr end)))) (cdr end))))
(define (safe-deq q)
(if (null? (car q))
#f
(let ((it (caar q))
(next (cdar q)))
(if (null? next)
(set-cdr! q #f))
(set-car! q next)
it)))
(define* (make-fixed-size-resource-pool resources (define* (make-fixed-size-resource-pool resources
#:key #:key
(delay-logger (const #f)) (delay-logger (const #f))
@ -345,27 +355,18 @@
(set! checkout-failure-count (set! checkout-failure-count
(+ 1 checkout-failure-count))) (+ 1 checkout-failure-count)))
(if (q-empty? waiters)
(loop resources
(cons resource available)
waiters)
(let ((current-internal-time (let ((current-internal-time
(get-internal-real-time))) (get-internal-real-time)))
(with-exception-handler (let waiter-loop ((waiter (safe-deq waiters)))
(lambda (exn) (match waiter
(if (eq? (exception-kind exn) 'q-empty) (#f
(loop resources (loop resources
(cons resource available) (cons resource available)
waiters) waiters))
(raise-exception exn)))
(lambda ()
(let waiter-loop ((waiter (deq! waiters)))
(match waiter
((reply . timeout) ((reply . timeout)
(if (and timeout (if (and timeout
(< timeout current-internal-time)) (< timeout current-internal-time))
(waiter-loop (deq! waiters)) (waiter-loop (safe-deq waiters))
(if timeout (if timeout
(let ((reply-timeout (let ((reply-timeout
(/ (- timeout (/ (- timeout
@ -379,11 +380,10 @@
reply-timeout reply-timeout
resource)) resource))
(put-message reply (cons 'success (put-message reply (cons 'success
resource)))))))) resource))))
#:unwind? #t)
(loop resources (loop resources
available available
waiters)))) waiters))))))
(('list-resources reply) (('list-resources reply)
(spawn-fiber (spawn-fiber
@ -749,6 +749,7 @@
(let loop ((resources '()) (let loop ((resources '())
(available '()) (available '())
(waiters (make-q)) (waiters (make-q))
(resources-checkout-count '())
(resources-last-used '())) (resources-last-used '()))
(match (get-message channel) (match (get-message channel)
@ -762,39 +763,30 @@
(loop (cons resource resources) (loop (cons resource resources)
available available
waiters waiters
(cons 0 resources-checkout-count)
(cons (get-internal-real-time) (cons (get-internal-real-time)
resources-last-used))) resources-last-used)))
(loop resources (loop resources
available available
waiters waiters
(cons (get-internal-real-time) resources-checkout-count
resources-last-used)))) resources-last-used)))
(if (q-empty? waiters)
(loop (cons resource resources)
(cons resource available)
waiters
(cons (get-internal-real-time)
resources-last-used))
(let ((current-internal-time (let ((current-internal-time
(get-internal-real-time))) (get-internal-real-time)))
(with-exception-handler (let waiter-loop ((waiter (safe-deq waiters)))
(lambda (exn) (match waiter
(if (eq? (exception-kind exn) 'q-empty) (#f
(loop (cons resource resources) (loop (cons resource resources)
(cons resource available) (cons resource available)
waiters waiters
(cons 0 resources-checkout-count)
(cons current-internal-time (cons current-internal-time
resources-last-used)) resources-last-used)))
(raise-exception exn)))
(lambda ()
(let waiter-loop ((waiter (deq! waiters)))
(match waiter
((reply . timeout) ((reply . timeout)
(if (and timeout (if (and timeout
(< timeout current-internal-time)) (< timeout current-internal-time))
(waiter-loop (deq! waiters)) (waiter-loop (safe-deq waiters))
(if timeout (if timeout
(let ((reply-timeout (let ((reply-timeout
(/ (- timeout (/ (- timeout
@ -808,13 +800,13 @@
reply-timeout reply-timeout
resource)) resource))
(put-message reply (cons 'success (put-message reply (cons 'success
resource)))))))) resource))))
#:unwind? #t)
(loop (cons resource resources) (loop (cons resource resources)
available available
waiters waiters
(cons 1 resources-checkout-count)
(cons current-internal-time (cons current-internal-time
resources-last-used)))))) resources-last-used))))))))
(('checkout reply timeout-time max-waiters) (('checkout reply timeout-time max-waiters)
(if (null? available) (if (null? available)
@ -849,10 +841,12 @@
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count
resources-last-used)) resources-last-used))
(loop resources (loop resources
available available
(enq! waiters (cons reply timeout-time)) (enq! waiters (cons reply timeout-time))
resources-checkout-count
resources-last-used)))) resources-last-used))))
(if timeout-time (if timeout-time
@ -875,10 +869,21 @@
(loop resources (loop resources
(cdr available) (cdr available)
waiters waiters
(let ((resource-index
(list-index (lambda (x)
(eq? x (car available)))
resources)))
(list-set! resources-checkout-count
resource-index
(+ 1 (list-ref
resources-checkout-count
resource-index)))
resources-checkout-count)
resources-last-used)) resources-last-used))
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count
resources-last-used))) resources-last-used)))
(begin (begin
(put-message reply (cons 'success (put-message reply (cons 'success
@ -887,6 +892,16 @@
(loop resources (loop resources
(cdr available) (cdr available)
waiters waiters
(let ((resource-index
(list-index (lambda (x)
(eq? x (car available)))
resources)))
(list-set! resources-checkout-count
resource-index
(+ 1 (list-ref
resources-checkout-count
resource-index)))
resources-checkout-count)
resources-last-used))))) resources-last-used)))))
(((and (or 'return (((and (or 'return
@ -899,44 +914,50 @@
(set! checkout-failure-count (set! checkout-failure-count
(+ 1 checkout-failure-count))) (+ 1 checkout-failure-count)))
(if (q-empty? waiters) (let ((current-internal-time
(loop resources (get-internal-real-time))
(cons resource available) (resource-index
waiters
(begin
(list-set!
resources-last-used
(list-index (lambda (x) (list-index (lambda (x)
(eq? x resource)) (eq? x resource))
resources) resources)))
(get-internal-real-time)) (if (and lifetime
(>= (list-ref resources-checkout-count
resource-index)
lifetime))
(begin
(spawn-fiber-to-destroy-resource resource)
(loop resources
available
waiters
resources-checkout-count
resources-last-used)) resources-last-used))
(let waiter-loop ((waiter (safe-deq waiters)))
(let ((current-internal-time (match waiter
(get-internal-real-time))) (#f
(with-exception-handler
(lambda (exn)
(if (eq? (exception-kind exn) 'q-empty)
(loop resources (loop resources
(cons resource available) (cons resource available)
waiters waiters
(if (eq? 'return-failed-checkout
return-type)
(begin
(list-set! resources-checkout-count
resource-index
(- (list-ref resources-checkout-count
resource-index)
1))
resources-checkout-count)
resources-checkout-count)
(begin (begin
(when (eq? return-type 'return) (when (eq? return-type 'return)
(list-set! (list-set!
resources-last-used resources-last-used
(list-index (lambda (x) resource-index
(eq? x resource))
resources)
current-internal-time)) current-internal-time))
resources-last-used)) resources-last-used)))
(raise-exception exn)))
(lambda ()
(let waiter-loop ((waiter (deq! waiters)))
(match waiter
((reply . timeout) ((reply . timeout)
(if (and timeout (if (and timeout
(< timeout current-internal-time)) (< timeout current-internal-time))
(waiter-loop (deq! waiters)) (waiter-loop (safe-deq waiters))
(if timeout (if timeout
(let ((reply-timeout (let ((reply-timeout
(/ (- timeout (/ (- timeout
@ -950,25 +971,38 @@
reply-timeout reply-timeout
resource)) resource))
(put-message reply (cons 'success (put-message reply (cons 'success
resource)))))))) resource))))
#:unwind? #t)
(loop resources (loop resources
available available
waiters waiters
(if (eq? 'return-failed-checkout
return-type)
(begin
(list-set! resources-checkout-count
resource-index
(- (list-ref resources-checkout-count
resource-index)
1))
resources-checkout-count)
resources-checkout-count)
(begin (begin
(list-set! (list-set!
resources-last-used resources-last-used
(list-index (lambda (x) resource-index
(eq? x resource))
resources)
current-internal-time) current-internal-time)
resources-last-used))))) resources-last-used))))))))
(('remove resource) (('remove resource)
(let ((index (let ((index
(list-index (lambda (x) (list-index (lambda (x)
(eq? x resource)) (eq? x resource))
resources))) resources)))
(when (and (not (q-empty? waiters))
(< (- (length resources) 1)
max-size))
(spawn-fiber-to-return-new-resource))
(loop (if index (loop (if index
(remove-at-index! resources index) (remove-at-index! resources index)
(begin (begin
@ -979,6 +1013,9 @@
resources)) resources))
available ; resource shouldn't be in this list available ; resource shouldn't be in this list
waiters waiters
(remove-at-index!
resources-checkout-count
index)
(remove-at-index! (remove-at-index!
resources-last-used resources-last-used
index)))) index))))
@ -989,6 +1026,7 @@
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count
resources-last-used)) resources-last-used))
(('list-resources reply) (('list-resources reply)
@ -999,6 +1037,7 @@
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count
resources-last-used)) resources-last-used))
(('stats reply timeout-time) (('stats reply timeout-time)
@ -1006,6 +1045,7 @@
`((resources . ,(length resources)) `((resources . ,(length resources))
(available . ,(length available)) (available . ,(length available))
(waiters . ,(q-length waiters)) (waiters . ,(q-length waiters))
(resources-checkout-count . ,resources-checkout-count)
(checkout-failure-count . ,checkout-failure-count)))) (checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber (spawn-fiber
@ -1025,6 +1065,7 @@
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count
resources-last-used)) resources-last-used))
(('check-for-idle-resources) (('check-for-idle-resources)
@ -1065,6 +1106,7 @@
(loop resources (loop resources
(lset-difference eq? available resources-to-destroy) (lset-difference eq? available resources-to-destroy)
waiters waiters
resources-checkout-count
resources-last-used)))) resources-last-used))))
(('destroy) (('destroy)
@ -1128,6 +1170,7 @@
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count
resources-last-used))))) resources-last-used)))))
(spawn-fiber (spawn-fiber

View file

@ -269,8 +269,8 @@ from there, or #f if that would be an empty string."
(sleep 1) (sleep 1)
(destructor/safe args))))) (destructor/safe args)))))
(define (process channel args) (define (process thread-index channel args)
(let loop () (let loop ((lifetime thread-lifetime))
(match (get-message channel) (match (get-message channel)
('destroy #f) ('destroy #f)
((reply sent-time proc) ((reply sent-time proc)
@ -292,6 +292,9 @@ from there, or #f if that would be an empty string."
internal-time-units-per-second) internal-time-units-per-second)
exn)) exn))
(lambda () (lambda ()
(vector-set! thread-proc-vector
thread-index
proc)
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
(let ((stack (let ((stack
@ -319,6 +322,10 @@ from there, or #f if that would be an empty string."
vals)))))) vals))))))
#:unwind? #t))) #:unwind? #t)))
(vector-set! thread-proc-vector
thread-index
#f)
(put-message reply (put-message reply
response) response)
@ -335,7 +342,11 @@ from there, or #f if that would be an empty string."
(if (and exception? (if (and exception?
expire-on-exception?) expire-on-exception?)
#t #t
(loop)))))))) (if lifetime
(if (<= 1 lifetime)
#t
(loop (- lifetime 1)))
(loop lifetime)))))))))
(define (start-thread index channel) (define (start-thread index channel)
(call-with-new-thread (call-with-new-thread
@ -358,7 +369,7 @@ from there, or #f if that would be an empty string."
"knots: thread-pool: internal exception: ~A\n" exn)) "knots: thread-pool: internal exception: ~A\n" exn))
(lambda () (lambda ()
(parameterize ((param args)) (parameterize ((param args))
(process channel args))) (process index channel args)))
#:unwind? #t))) #:unwind? #t)))
(when thread-destructor (when thread-destructor
@ -395,7 +406,8 @@ from there, or #f if that would be an empty string."
(expire-on-exception? #f) (expire-on-exception? #f)
(name "unnamed") (name "unnamed")
(use-default-io-waiters? #t) (use-default-io-waiters? #t)
default-checkout-timeout) default-checkout-timeout
default-max-waiters)
"Return a channel used to offload work to a dedicated thread. ARGS are the "Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the thread pool procedure." arguments of the thread pool procedure."
(define param (define param
@ -408,7 +420,6 @@ arguments of the thread pool procedure."
1 1
#:thread-initializer thread-initializer #:thread-initializer thread-initializer
#:thread-destructor thread-destructor #:thread-destructor thread-destructor
#:thread-lifetime thread-lifetime
#:expire-on-exception? expire-on-exception? #:expire-on-exception? expire-on-exception?
#:name name #:name name
#:use-default-io-waiters? use-default-io-waiters?)) #:use-default-io-waiters? use-default-io-waiters?))
@ -416,9 +427,11 @@ arguments of the thread pool procedure."
#:destructor destroy-thread-pool #:destructor destroy-thread-pool
#:min-size min-size #:min-size min-size
#:delay-logger delay-logger #:delay-logger delay-logger
#:lifetime thread-lifetime
#:scheduler scheduler #:scheduler scheduler
#:duration-logger duration-logger #:duration-logger duration-logger
#:default-checkout-timeout default-checkout-timeout))) #:default-checkout-timeout default-checkout-timeout
#:default-max-waiters default-max-waiters)))
(thread-pool resource-pool (thread-pool resource-pool
param))) param)))

View file

@ -211,4 +211,21 @@
(destroy-resource-pool resource-pool)))) (destroy-resource-pool resource-pool))))
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-resource-pool
(const 'foo)
1
#:lifetime 1
#:destructor
(const #t))))
(for-each
(lambda _
(with-resource-from-pool resource-pool
res
res))
(iota 20))
(destroy-resource-pool resource-pool))))
(display "resource-pool test finished successfully\n") (display "resource-pool test finished successfully\n")

View file

@ -85,4 +85,26 @@
(+ 1 'a)))) (+ 1 'a))))
#:unwind? #t))))) #:unwind? #t)))))
(let ((thread-pool
(make-fixed-size-thread-pool 1 #:thread-lifetime 1)))
(for-each
(lambda _
(call-with-thread
thread-pool
(lambda () #f)))
(iota 10)))
(run-fibers-for-tests
(lambda ()
(let ((thread-pool
(make-thread-pool 1 #:thread-lifetime 1)))
(for-each
(lambda _
(call-with-thread
thread-pool
(lambda () #f)))
(iota 10)))))
(display "thread-pool test finished successfully\n") (display "thread-pool test finished successfully\n")