Compare commits

..

No commits in common. "05ad83c7031de9b0d1873a0d5aec630746342a06" and "52092e7a99fb782a5e89a4b0ec42f93549b96437" have entirely different histories.

4 changed files with 157 additions and 252 deletions

View file

@ -101,16 +101,6 @@
start start
(cdr end)))) (cdr end))))
(define (safe-deq q)
(if (null? (car q))
#f
(let ((it (caar q))
(next (cdar q)))
(if (null? next)
(set-cdr! q #f))
(set-car! q next)
it)))
(define* (make-fixed-size-resource-pool resources (define* (make-fixed-size-resource-pool resources
#:key #:key
(delay-logger (const #f)) (delay-logger (const #f))
@ -355,35 +345,45 @@
(set! checkout-failure-count (set! checkout-failure-count
(+ 1 checkout-failure-count))) (+ 1 checkout-failure-count)))
(let ((current-internal-time (if (q-empty? waiters)
(get-internal-real-time))) (loop resources
(let waiter-loop ((waiter (safe-deq waiters))) (cons resource available)
(match waiter waiters)
(#f
(loop resources (let ((current-internal-time
(cons resource available) (get-internal-real-time)))
waiters)) (with-exception-handler
((reply . timeout) (lambda (exn)
(if (and timeout (if (eq? (exception-kind exn) 'q-empty)
(< timeout current-internal-time)) (loop resources
(waiter-loop (safe-deq waiters)) (cons resource available)
(if timeout waiters)
(let ((reply-timeout (raise-exception exn)))
(/ (- timeout (lambda ()
current-internal-time) (let waiter-loop ((waiter (deq! waiters)))
internal-time-units-per-second))) (match waiter
;; Don't sleep in this fiber, so spawn a ((reply . timeout)
;; new fiber to handle handing over the (if (and timeout
;; resource, and returning it if there's (< timeout current-internal-time))
;; a timeout (waiter-loop (deq! waiters))
(spawn-fiber-for-checkout reply (if timeout
reply-timeout (let ((reply-timeout
resource)) (/ (- timeout
(put-message reply (cons 'success current-internal-time)
resource)))) internal-time-units-per-second)))
(loop resources ;; Don't sleep in this fiber, so spawn a
available ;; new fiber to handle handing over the
waiters)))))) ;; resource, and returning it if there's
;; a timeout
(spawn-fiber-for-checkout reply
reply-timeout
resource))
(put-message reply (cons 'success
resource))))))))
#:unwind? #t)
(loop resources
available
waiters))))
(('list-resources reply) (('list-resources reply)
(spawn-fiber (spawn-fiber
@ -749,7 +749,6 @@
(let loop ((resources '()) (let loop ((resources '())
(available '()) (available '())
(waiters (make-q)) (waiters (make-q))
(resources-checkout-count '())
(resources-last-used '())) (resources-last-used '()))
(match (get-message channel) (match (get-message channel)
@ -763,50 +762,59 @@
(loop (cons resource resources) (loop (cons resource resources)
available available
waiters waiters
(cons 0 resources-checkout-count)
(cons (get-internal-real-time) (cons (get-internal-real-time)
resources-last-used))) resources-last-used)))
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count (cons (get-internal-real-time)
resources-last-used))) resources-last-used))))
(let ((current-internal-time (if (q-empty? waiters)
(get-internal-real-time))) (loop (cons resource resources)
(let waiter-loop ((waiter (safe-deq waiters))) (cons resource available)
(match waiter waiters
(#f (cons (get-internal-real-time)
(loop (cons resource resources) resources-last-used))
(cons resource available)
waiters (let ((current-internal-time
(cons 0 resources-checkout-count) (get-internal-real-time)))
(cons current-internal-time (with-exception-handler
resources-last-used))) (lambda (exn)
((reply . timeout) (if (eq? (exception-kind exn) 'q-empty)
(if (and timeout (loop (cons resource resources)
(< timeout current-internal-time)) (cons resource available)
(waiter-loop (safe-deq waiters)) waiters
(if timeout (cons current-internal-time
(let ((reply-timeout resources-last-used))
(/ (- timeout (raise-exception exn)))
current-internal-time) (lambda ()
internal-time-units-per-second))) (let waiter-loop ((waiter (deq! waiters)))
;; Don't sleep in this fiber, so spawn a (match waiter
;; new fiber to handle handing over the ((reply . timeout)
;; resource, and returning it if there's (if (and timeout
;; a timeout (< timeout current-internal-time))
(spawn-fiber-for-checkout reply (waiter-loop (deq! waiters))
reply-timeout (if timeout
resource)) (let ((reply-timeout
(put-message reply (cons 'success (/ (- timeout
resource)))) current-internal-time)
(loop (cons resource resources) internal-time-units-per-second)))
available ;; Don't sleep in this fiber, so spawn a
waiters ;; new fiber to handle handing over the
(cons 1 resources-checkout-count) ;; resource, and returning it if there's
(cons current-internal-time ;; a timeout
resources-last-used)))))))) (spawn-fiber-for-checkout reply
reply-timeout
resource))
(put-message reply (cons 'success
resource))))))))
#:unwind? #t)
(loop (cons resource resources)
available
waiters
(cons current-internal-time
resources-last-used))))))
(('checkout reply timeout-time max-waiters) (('checkout reply timeout-time max-waiters)
(if (null? available) (if (null? available)
@ -841,12 +849,10 @@
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count
resources-last-used)) resources-last-used))
(loop resources (loop resources
available available
(enq! waiters (cons reply timeout-time)) (enq! waiters (cons reply timeout-time))
resources-checkout-count
resources-last-used)))) resources-last-used))))
(if timeout-time (if timeout-time
@ -869,21 +875,10 @@
(loop resources (loop resources
(cdr available) (cdr available)
waiters waiters
(let ((resource-index
(list-index (lambda (x)
(eq? x (car available)))
resources)))
(list-set! resources-checkout-count
resource-index
(+ 1 (list-ref
resources-checkout-count
resource-index)))
resources-checkout-count)
resources-last-used)) resources-last-used))
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count
resources-last-used))) resources-last-used)))
(begin (begin
(put-message reply (cons 'success (put-message reply (cons 'success
@ -892,16 +887,6 @@
(loop resources (loop resources
(cdr available) (cdr available)
waiters waiters
(let ((resource-index
(list-index (lambda (x)
(eq? x (car available)))
resources)))
(list-set! resources-checkout-count
resource-index
(+ 1 (list-ref
resources-checkout-count
resource-index)))
resources-checkout-count)
resources-last-used))))) resources-last-used)))))
(((and (or 'return (((and (or 'return
@ -914,95 +899,76 @@
(set! checkout-failure-count (set! checkout-failure-count
(+ 1 checkout-failure-count))) (+ 1 checkout-failure-count)))
(let ((current-internal-time (if (q-empty? waiters)
(get-internal-real-time)) (loop resources
(resource-index (cons resource available)
(list-index (lambda (x) waiters
(eq? x resource)) (begin
resources))) (list-set!
(if (and lifetime resources-last-used
(>= (list-ref resources-checkout-count (list-index (lambda (x)
resource-index) (eq? x resource))
lifetime)) resources)
(begin (get-internal-real-time))
(spawn-fiber-to-destroy-resource resource) resources-last-used))
(loop resources
available (let ((current-internal-time
waiters (get-internal-real-time)))
resources-checkout-count (with-exception-handler
resources-last-used)) (lambda (exn)
(let waiter-loop ((waiter (safe-deq waiters))) (if (eq? (exception-kind exn) 'q-empty)
(match waiter (loop resources
(#f (cons resource available)
(loop resources waiters
(cons resource available) (begin
waiters (when (eq? return-type 'return)
(if (eq? 'return-failed-checkout (list-set!
return-type) resources-last-used
(begin (list-index (lambda (x)
(list-set! resources-checkout-count (eq? x resource))
resource-index resources)
(- (list-ref resources-checkout-count current-internal-time))
resource-index) resources-last-used))
1)) (raise-exception exn)))
resources-checkout-count) (lambda ()
resources-checkout-count) (let waiter-loop ((waiter (deq! waiters)))
(begin (match waiter
(when (eq? return-type 'return) ((reply . timeout)
(list-set! (if (and timeout
resources-last-used (< timeout current-internal-time))
resource-index (waiter-loop (deq! waiters))
current-internal-time)) (if timeout
resources-last-used))) (let ((reply-timeout
((reply . timeout) (/ (- timeout
(if (and timeout current-internal-time)
(< timeout current-internal-time)) internal-time-units-per-second)))
(waiter-loop (safe-deq waiters)) ;; Don't sleep in this fiber, so spawn a
(if timeout ;; new fiber to handle handing over the
(let ((reply-timeout ;; resource, and returning it if there's
(/ (- timeout ;; a timeout
current-internal-time) (spawn-fiber-for-checkout reply
internal-time-units-per-second))) reply-timeout
;; Don't sleep in this fiber, so spawn a resource))
;; new fiber to handle handing over the (put-message reply (cons 'success
;; resource, and returning it if there's resource))))))))
;; a timeout #:unwind? #t)
(spawn-fiber-for-checkout reply (loop resources
reply-timeout available
resource)) waiters
(put-message reply (cons 'success (begin
resource)))) (list-set!
(loop resources resources-last-used
available (list-index (lambda (x)
waiters (eq? x resource))
(if (eq? 'return-failed-checkout resources)
return-type) current-internal-time)
(begin resources-last-used)))))
(list-set! resources-checkout-count
resource-index
(- (list-ref resources-checkout-count
resource-index)
1))
resources-checkout-count)
resources-checkout-count)
(begin
(list-set!
resources-last-used
resource-index
current-internal-time)
resources-last-used))))))))
(('remove resource) (('remove resource)
(let ((index (let ((index
(list-index (lambda (x) (list-index (lambda (x)
(eq? x resource)) (eq? x resource))
resources))) resources)))
(when (and (not (q-empty? waiters))
(< (- (length resources) 1)
max-size))
(spawn-fiber-to-return-new-resource))
(loop (if index (loop (if index
(remove-at-index! resources index) (remove-at-index! resources index)
(begin (begin
@ -1013,9 +979,6 @@
resources)) resources))
available ; resource shouldn't be in this list available ; resource shouldn't be in this list
waiters waiters
(remove-at-index!
resources-checkout-count
index)
(remove-at-index! (remove-at-index!
resources-last-used resources-last-used
index)))) index))))
@ -1026,7 +989,6 @@
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count
resources-last-used)) resources-last-used))
(('list-resources reply) (('list-resources reply)
@ -1037,7 +999,6 @@
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count
resources-last-used)) resources-last-used))
(('stats reply timeout-time) (('stats reply timeout-time)
@ -1045,7 +1006,6 @@
`((resources . ,(length resources)) `((resources . ,(length resources))
(available . ,(length available)) (available . ,(length available))
(waiters . ,(q-length waiters)) (waiters . ,(q-length waiters))
(resources-checkout-count . ,resources-checkout-count)
(checkout-failure-count . ,checkout-failure-count)))) (checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber (spawn-fiber
@ -1065,7 +1025,6 @@
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count
resources-last-used)) resources-last-used))
(('check-for-idle-resources) (('check-for-idle-resources)
@ -1106,7 +1065,6 @@
(loop resources (loop resources
(lset-difference eq? available resources-to-destroy) (lset-difference eq? available resources-to-destroy)
waiters waiters
resources-checkout-count
resources-last-used)))) resources-last-used))))
(('destroy) (('destroy)
@ -1170,7 +1128,6 @@
(loop resources (loop resources
available available
waiters waiters
resources-checkout-count
resources-last-used))))) resources-last-used)))))
(spawn-fiber (spawn-fiber

View file

@ -269,8 +269,8 @@ from there, or #f if that would be an empty string."
(sleep 1) (sleep 1)
(destructor/safe args))))) (destructor/safe args)))))
(define (process thread-index channel args) (define (process channel args)
(let loop ((lifetime thread-lifetime)) (let loop ()
(match (get-message channel) (match (get-message channel)
('destroy #f) ('destroy #f)
((reply sent-time proc) ((reply sent-time proc)
@ -292,9 +292,6 @@ from there, or #f if that would be an empty string."
internal-time-units-per-second) internal-time-units-per-second)
exn)) exn))
(lambda () (lambda ()
(vector-set! thread-proc-vector
thread-index
proc)
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
(let ((stack (let ((stack
@ -322,10 +319,6 @@ from there, or #f if that would be an empty string."
vals)))))) vals))))))
#:unwind? #t))) #:unwind? #t)))
(vector-set! thread-proc-vector
thread-index
#f)
(put-message reply (put-message reply
response) response)
@ -342,11 +335,7 @@ from there, or #f if that would be an empty string."
(if (and exception? (if (and exception?
expire-on-exception?) expire-on-exception?)
#t #t
(if lifetime (loop))))))))
(if (<= 1 lifetime)
#t
(loop (- lifetime 1)))
(loop lifetime)))))))))
(define (start-thread index channel) (define (start-thread index channel)
(call-with-new-thread (call-with-new-thread
@ -369,7 +358,7 @@ from there, or #f if that would be an empty string."
"knots: thread-pool: internal exception: ~A\n" exn)) "knots: thread-pool: internal exception: ~A\n" exn))
(lambda () (lambda ()
(parameterize ((param args)) (parameterize ((param args))
(process index channel args))) (process channel args)))
#:unwind? #t))) #:unwind? #t)))
(when thread-destructor (when thread-destructor
@ -406,8 +395,7 @@ from there, or #f if that would be an empty string."
(expire-on-exception? #f) (expire-on-exception? #f)
(name "unnamed") (name "unnamed")
(use-default-io-waiters? #t) (use-default-io-waiters? #t)
default-checkout-timeout default-checkout-timeout)
default-max-waiters)
"Return a channel used to offload work to a dedicated thread. ARGS are the "Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the thread pool procedure." arguments of the thread pool procedure."
(define param (define param
@ -420,6 +408,7 @@ arguments of the thread pool procedure."
1 1
#:thread-initializer thread-initializer #:thread-initializer thread-initializer
#:thread-destructor thread-destructor #:thread-destructor thread-destructor
#:thread-lifetime thread-lifetime
#:expire-on-exception? expire-on-exception? #:expire-on-exception? expire-on-exception?
#:name name #:name name
#:use-default-io-waiters? use-default-io-waiters?)) #:use-default-io-waiters? use-default-io-waiters?))
@ -427,11 +416,9 @@ arguments of the thread pool procedure."
#:destructor destroy-thread-pool #:destructor destroy-thread-pool
#:min-size min-size #:min-size min-size
#:delay-logger delay-logger #:delay-logger delay-logger
#:lifetime thread-lifetime
#:scheduler scheduler #:scheduler scheduler
#:duration-logger duration-logger #:duration-logger duration-logger
#:default-checkout-timeout default-checkout-timeout #:default-checkout-timeout default-checkout-timeout)))
#:default-max-waiters default-max-waiters)))
(thread-pool resource-pool (thread-pool resource-pool
param))) param)))

View file

@ -211,21 +211,4 @@
(destroy-resource-pool resource-pool)))) (destroy-resource-pool resource-pool))))
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-resource-pool
(const 'foo)
1
#:lifetime 1
#:destructor
(const #t))))
(for-each
(lambda _
(with-resource-from-pool resource-pool
res
res))
(iota 20))
(destroy-resource-pool resource-pool))))
(display "resource-pool test finished successfully\n") (display "resource-pool test finished successfully\n")

View file

@ -85,26 +85,4 @@
(+ 1 'a)))) (+ 1 'a))))
#:unwind? #t))))) #:unwind? #t)))))
(let ((thread-pool
(make-fixed-size-thread-pool 1 #:thread-lifetime 1)))
(for-each
(lambda _
(call-with-thread
thread-pool
(lambda () #f)))
(iota 10)))
(run-fibers-for-tests
(lambda ()
(let ((thread-pool
(make-thread-pool 1 #:thread-lifetime 1)))
(for-each
(lambda _
(call-with-thread
thread-pool
(lambda () #f)))
(iota 10)))))
(display "thread-pool test finished successfully\n") (display "thread-pool test finished successfully\n")