Compare commits

..

7 commits

4 changed files with 252 additions and 157 deletions

View file

@ -101,6 +101,16 @@
start start
(cdr end)))) (cdr end))))
(define (safe-deq q)
(if (null? (car q))
#f
(let ((it (caar q))
(next (cdar q)))
(if (null? next)
(set-cdr! q #f))
(set-car! q next)
it)))
(define* (make-fixed-size-resource-pool resources (define* (make-fixed-size-resource-pool resources
#:key #:key
(delay-logger (const #f)) (delay-logger (const #f))
@ -345,45 +355,35 @@
(set! checkout-failure-count (set! checkout-failure-count
(+ 1 checkout-failure-count))) (+ 1 checkout-failure-count)))
(if (q-empty? waiters) (let ((current-internal-time
(loop resources (get-internal-real-time)))
(cons resource available) (let waiter-loop ((waiter (safe-deq waiters)))
waiters) (match waiter
(#f
(let ((current-internal-time (loop resources
(get-internal-real-time))) (cons resource available)
(with-exception-handler waiters))
(lambda (exn) ((reply . timeout)
(if (eq? (exception-kind exn) 'q-empty) (if (and timeout
(loop resources (< timeout current-internal-time))
(cons resource available) (waiter-loop (safe-deq waiters))
waiters) (if timeout
(raise-exception exn))) (let ((reply-timeout
(lambda () (/ (- timeout
(let waiter-loop ((waiter (deq! waiters))) current-internal-time)
(match waiter internal-time-units-per-second)))
((reply . timeout) ;; Don't sleep in this fiber, so spawn a
(if (and timeout ;; new fiber to handle handing over the
(< timeout current-internal-time)) ;; resource, and returning it if there's
(waiter-loop (deq! waiters)) ;; a timeout
(if timeout (spawn-fiber-for-checkout reply
(let ((reply-timeout reply-timeout
(/ (- timeout resource))
current-internal-time) (put-message reply (cons 'success
internal-time-units-per-second))) resource))))
;; Don't sleep in this fiber, so spawn a (loop resources
;; new fiber to handle handing over the available
;; resource, and returning it if there's waiters))))))
;; a timeout
(spawn-fiber-for-checkout reply
reply-timeout
resource))
(put-message reply (cons 'success
resource))))))))
#:unwind? #t)
(loop resources
available
waiters))))
(('list-resources reply) (('list-resources reply)
(spawn-fiber (spawn-fiber
@ -749,6 +749,7 @@
(let loop ((resources '()) (let loop ((resources '())
(available '()) (available '())
(waiters (make-q)) (waiters (make-q))
(resources-checkout-count '())
(resources-last-used '())) (resources-last-used '()))
(match (get-message channel) (match (get-message channel)
@ -762,59 +763,50 @@
(loop (cons resource resources) (loop (cons resource resources)
available available
waiters waiters
(cons 0 resources-checkout-count)
(cons (get-internal-real-time) (cons (get-internal-real-time)
resources-last-used))) resources-last-used)))
(loop resources (loop resources
available available
waiters waiters
(cons (get-internal-real-time) resources-checkout-count
resources-last-used)))) resources-last-used)))
(if (q-empty? waiters) (let ((current-internal-time
(loop (cons resource resources) (get-internal-real-time)))
(cons resource available) (let waiter-loop ((waiter (safe-deq waiters)))
waiters (match waiter
(cons (get-internal-real-time) (#f
resources-last-used)) (loop (cons resource resources)
(cons resource available)
(let ((current-internal-time waiters
(get-internal-real-time))) (cons 0 resources-checkout-count)
(with-exception-handler (cons current-internal-time
(lambda (exn) resources-last-used)))
(if (eq? (exception-kind exn) 'q-empty) ((reply . timeout)
(loop (cons resource resources) (if (and timeout
(cons resource available) (< timeout current-internal-time))
waiters (waiter-loop (safe-deq waiters))
(cons current-internal-time (if timeout
resources-last-used)) (let ((reply-timeout
(raise-exception exn))) (/ (- timeout
(lambda () current-internal-time)
(let waiter-loop ((waiter (deq! waiters))) internal-time-units-per-second)))
(match waiter ;; Don't sleep in this fiber, so spawn a
((reply . timeout) ;; new fiber to handle handing over the
(if (and timeout ;; resource, and returning it if there's
(< timeout current-internal-time)) ;; a timeout
(waiter-loop (deq! waiters)) (spawn-fiber-for-checkout reply
(if timeout reply-timeout
(let ((reply-timeout resource))
(/ (- timeout (put-message reply (cons 'success
current-internal-time) resource))))
internal-time-units-per-second))) (loop (cons resource resources)
;; Don't sleep in this fiber, so spawn a available
;; new fiber to handle handing over the waiters
;; resource, and returning it if there's (cons 1 resources-checkout-count)
;; a timeout (cons current-internal-time
(spawn-fiber-for-checkout reply resources-last-used))))))))
reply-timeout
resource))
(put-message reply (cons 'success
resource))))))))
#:unwind? #t)
(loop (cons resource resources)
available
waiters
(cons current-internal-time
resources-last-used))))))
(('checkout reply timeout-time max-waiters) (('checkout reply timeout-time max-waiters)
(if (null? available) (if (null? available)
@ -849,10 +841,12 @@
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count
resources-last-used)) resources-last-used))
(loop resources (loop resources
available available
(enq! waiters (cons reply timeout-time)) (enq! waiters (cons reply timeout-time))
resources-checkout-count
resources-last-used)))) resources-last-used))))
(if timeout-time (if timeout-time
@ -875,10 +869,21 @@
(loop resources (loop resources
(cdr available) (cdr available)
waiters waiters
(let ((resource-index
(list-index (lambda (x)
(eq? x (car available)))
resources)))
(list-set! resources-checkout-count
resource-index
(+ 1 (list-ref
resources-checkout-count
resource-index)))
resources-checkout-count)
resources-last-used)) resources-last-used))
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count
resources-last-used))) resources-last-used)))
(begin (begin
(put-message reply (cons 'success (put-message reply (cons 'success
@ -887,6 +892,16 @@
(loop resources (loop resources
(cdr available) (cdr available)
waiters waiters
(let ((resource-index
(list-index (lambda (x)
(eq? x (car available)))
resources)))
(list-set! resources-checkout-count
resource-index
(+ 1 (list-ref
resources-checkout-count
resource-index)))
resources-checkout-count)
resources-last-used))))) resources-last-used)))))
(((and (or 'return (((and (or 'return
@ -899,76 +914,95 @@
(set! checkout-failure-count (set! checkout-failure-count
(+ 1 checkout-failure-count))) (+ 1 checkout-failure-count)))
(if (q-empty? waiters) (let ((current-internal-time
(loop resources (get-internal-real-time))
(cons resource available) (resource-index
waiters (list-index (lambda (x)
(begin (eq? x resource))
(list-set! resources)))
resources-last-used (if (and lifetime
(list-index (lambda (x) (>= (list-ref resources-checkout-count
(eq? x resource)) resource-index)
resources) lifetime))
(get-internal-real-time)) (begin
resources-last-used)) (spawn-fiber-to-destroy-resource resource)
(loop resources
(let ((current-internal-time available
(get-internal-real-time))) waiters
(with-exception-handler resources-checkout-count
(lambda (exn) resources-last-used))
(if (eq? (exception-kind exn) 'q-empty) (let waiter-loop ((waiter (safe-deq waiters)))
(loop resources (match waiter
(cons resource available) (#f
waiters (loop resources
(begin (cons resource available)
(when (eq? return-type 'return) waiters
(list-set! (if (eq? 'return-failed-checkout
resources-last-used return-type)
(list-index (lambda (x) (begin
(eq? x resource)) (list-set! resources-checkout-count
resources) resource-index
current-internal-time)) (- (list-ref resources-checkout-count
resources-last-used)) resource-index)
(raise-exception exn))) 1))
(lambda () resources-checkout-count)
(let waiter-loop ((waiter (deq! waiters))) resources-checkout-count)
(match waiter (begin
((reply . timeout) (when (eq? return-type 'return)
(if (and timeout (list-set!
(< timeout current-internal-time)) resources-last-used
(waiter-loop (deq! waiters)) resource-index
(if timeout current-internal-time))
(let ((reply-timeout resources-last-used)))
(/ (- timeout ((reply . timeout)
current-internal-time) (if (and timeout
internal-time-units-per-second))) (< timeout current-internal-time))
;; Don't sleep in this fiber, so spawn a (waiter-loop (safe-deq waiters))
;; new fiber to handle handing over the (if timeout
;; resource, and returning it if there's (let ((reply-timeout
;; a timeout (/ (- timeout
(spawn-fiber-for-checkout reply current-internal-time)
reply-timeout internal-time-units-per-second)))
resource)) ;; Don't sleep in this fiber, so spawn a
(put-message reply (cons 'success ;; new fiber to handle handing over the
resource)))))))) ;; resource, and returning it if there's
#:unwind? #t) ;; a timeout
(loop resources (spawn-fiber-for-checkout reply
available reply-timeout
waiters resource))
(begin (put-message reply (cons 'success
(list-set! resource))))
resources-last-used (loop resources
(list-index (lambda (x) available
(eq? x resource)) waiters
resources) (if (eq? 'return-failed-checkout
current-internal-time) return-type)
resources-last-used))))) (begin
(list-set! resources-checkout-count
resource-index
(- (list-ref resources-checkout-count
resource-index)
1))
resources-checkout-count)
resources-checkout-count)
(begin
(list-set!
resources-last-used
resource-index
current-internal-time)
resources-last-used))))))))
(('remove resource) (('remove resource)
(let ((index (let ((index
(list-index (lambda (x) (list-index (lambda (x)
(eq? x resource)) (eq? x resource))
resources))) resources)))
(when (and (not (q-empty? waiters))
(< (- (length resources) 1)
max-size))
(spawn-fiber-to-return-new-resource))
(loop (if index (loop (if index
(remove-at-index! resources index) (remove-at-index! resources index)
(begin (begin
@ -979,6 +1013,9 @@
resources)) resources))
available ; resource shouldn't be in this list available ; resource shouldn't be in this list
waiters waiters
(remove-at-index!
resources-checkout-count
index)
(remove-at-index! (remove-at-index!
resources-last-used resources-last-used
index)))) index))))
@ -989,6 +1026,7 @@
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count
resources-last-used)) resources-last-used))
(('list-resources reply) (('list-resources reply)
@ -999,6 +1037,7 @@
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count
resources-last-used)) resources-last-used))
(('stats reply timeout-time) (('stats reply timeout-time)
@ -1006,6 +1045,7 @@
`((resources . ,(length resources)) `((resources . ,(length resources))
(available . ,(length available)) (available . ,(length available))
(waiters . ,(q-length waiters)) (waiters . ,(q-length waiters))
(resources-checkout-count . ,resources-checkout-count)
(checkout-failure-count . ,checkout-failure-count)))) (checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber (spawn-fiber
@ -1025,6 +1065,7 @@
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count
resources-last-used)) resources-last-used))
(('check-for-idle-resources) (('check-for-idle-resources)
@ -1065,6 +1106,7 @@
(loop resources (loop resources
(lset-difference eq? available resources-to-destroy) (lset-difference eq? available resources-to-destroy)
waiters waiters
resources-checkout-count
resources-last-used)))) resources-last-used))))
(('destroy) (('destroy)
@ -1128,6 +1170,7 @@
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count
resources-last-used))))) resources-last-used)))))
(spawn-fiber (spawn-fiber

View file

@ -269,8 +269,8 @@ from there, or #f if that would be an empty string."
(sleep 1) (sleep 1)
(destructor/safe args))))) (destructor/safe args)))))
(define (process channel args) (define (process thread-index channel args)
(let loop () (let loop ((lifetime thread-lifetime))
(match (get-message channel) (match (get-message channel)
('destroy #f) ('destroy #f)
((reply sent-time proc) ((reply sent-time proc)
@ -292,6 +292,9 @@ from there, or #f if that would be an empty string."
internal-time-units-per-second) internal-time-units-per-second)
exn)) exn))
(lambda () (lambda ()
(vector-set! thread-proc-vector
thread-index
proc)
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
(let ((stack (let ((stack
@ -319,6 +322,10 @@ from there, or #f if that would be an empty string."
vals)))))) vals))))))
#:unwind? #t))) #:unwind? #t)))
(vector-set! thread-proc-vector
thread-index
#f)
(put-message reply (put-message reply
response) response)
@ -335,7 +342,11 @@ from there, or #f if that would be an empty string."
(if (and exception? (if (and exception?
expire-on-exception?) expire-on-exception?)
#t #t
(loop)))))))) (if lifetime
(if (<= 1 lifetime)
#t
(loop (- lifetime 1)))
(loop lifetime)))))))))
(define (start-thread index channel) (define (start-thread index channel)
(call-with-new-thread (call-with-new-thread
@ -358,7 +369,7 @@ from there, or #f if that would be an empty string."
"knots: thread-pool: internal exception: ~A\n" exn)) "knots: thread-pool: internal exception: ~A\n" exn))
(lambda () (lambda ()
(parameterize ((param args)) (parameterize ((param args))
(process channel args))) (process index channel args)))
#:unwind? #t))) #:unwind? #t)))
(when thread-destructor (when thread-destructor
@ -395,7 +406,8 @@ from there, or #f if that would be an empty string."
(expire-on-exception? #f) (expire-on-exception? #f)
(name "unnamed") (name "unnamed")
(use-default-io-waiters? #t) (use-default-io-waiters? #t)
default-checkout-timeout) default-checkout-timeout
default-max-waiters)
"Return a channel used to offload work to a dedicated thread. ARGS are the "Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the thread pool procedure." arguments of the thread pool procedure."
(define param (define param
@ -408,7 +420,6 @@ arguments of the thread pool procedure."
1 1
#:thread-initializer thread-initializer #:thread-initializer thread-initializer
#:thread-destructor thread-destructor #:thread-destructor thread-destructor
#:thread-lifetime thread-lifetime
#:expire-on-exception? expire-on-exception? #:expire-on-exception? expire-on-exception?
#:name name #:name name
#:use-default-io-waiters? use-default-io-waiters?)) #:use-default-io-waiters? use-default-io-waiters?))
@ -416,9 +427,11 @@ arguments of the thread pool procedure."
#:destructor destroy-thread-pool #:destructor destroy-thread-pool
#:min-size min-size #:min-size min-size
#:delay-logger delay-logger #:delay-logger delay-logger
#:lifetime thread-lifetime
#:scheduler scheduler #:scheduler scheduler
#:duration-logger duration-logger #:duration-logger duration-logger
#:default-checkout-timeout default-checkout-timeout))) #:default-checkout-timeout default-checkout-timeout
#:default-max-waiters default-max-waiters)))
(thread-pool resource-pool (thread-pool resource-pool
param))) param)))

View file

@ -211,4 +211,21 @@
(destroy-resource-pool resource-pool)))) (destroy-resource-pool resource-pool))))
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-resource-pool
(const 'foo)
1
#:lifetime 1
#:destructor
(const #t))))
(for-each
(lambda _
(with-resource-from-pool resource-pool
res
res))
(iota 20))
(destroy-resource-pool resource-pool))))
(display "resource-pool test finished successfully\n") (display "resource-pool test finished successfully\n")

View file

@ -85,4 +85,26 @@
(+ 1 'a)))) (+ 1 'a))))
#:unwind? #t))))) #:unwind? #t)))))
(let ((thread-pool
(make-fixed-size-thread-pool 1 #:thread-lifetime 1)))
(for-each
(lambda _
(call-with-thread
thread-pool
(lambda () #f)))
(iota 10)))
(run-fibers-for-tests
(lambda ()
(let ((thread-pool
(make-thread-pool 1 #:thread-lifetime 1)))
(for-each
(lambda _
(call-with-thread
thread-pool
(lambda () #f)))
(iota 10)))))
(display "thread-pool test finished successfully\n") (display "thread-pool test finished successfully\n")