diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 301abbd..6e9c353 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -101,16 +101,6 @@ start (cdr end)))) -(define (safe-deq q) - (if (null? (car q)) - #f - (let ((it (caar q)) - (next (cdar q))) - (if (null? next) - (set-cdr! q #f)) - (set-car! q next) - it))) - (define* (make-fixed-size-resource-pool resources #:key (delay-logger (const #f)) @@ -355,35 +345,45 @@ (set! checkout-failure-count (+ 1 checkout-failure-count))) - (let ((current-internal-time - (get-internal-real-time))) - (let waiter-loop ((waiter (safe-deq waiters))) - (match waiter - (#f - (loop resources - (cons resource available) - waiters)) - ((reply . timeout) - (if (and timeout - (< timeout current-internal-time)) - (waiter-loop (safe-deq waiters)) - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource)))) - (loop resources - available - waiters)))))) + (if (q-empty? waiters) + (loop resources + (cons resource available) + waiters) + + (let ((current-internal-time + (get-internal-real-time))) + (with-exception-handler + (lambda (exn) + (if (eq? (exception-kind exn) 'q-empty) + (loop resources + (cons resource available) + waiters) + (raise-exception exn))) + (lambda () + (let waiter-loop ((waiter (deq! waiters))) + (match waiter + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (deq! waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))))))) + #:unwind? #t) + (loop resources + available + waiters)))) (('list-resources reply) (spawn-fiber @@ -749,7 +749,6 @@ (let loop ((resources '()) (available '()) (waiters (make-q)) - (resources-checkout-count '()) (resources-last-used '())) (match (get-message channel) @@ -763,50 +762,59 @@ (loop (cons resource resources) available waiters - (cons 0 resources-checkout-count) (cons (get-internal-real-time) resources-last-used))) (loop resources available waiters - resources-checkout-count - resources-last-used))) + (cons (get-internal-real-time) + resources-last-used)))) - (let ((current-internal-time - (get-internal-real-time))) - (let waiter-loop ((waiter (safe-deq waiters))) - (match waiter - (#f - (loop (cons resource resources) - (cons resource available) - waiters - (cons 0 resources-checkout-count) - (cons current-internal-time - resources-last-used))) - ((reply . timeout) - (if (and timeout - (< timeout current-internal-time)) - (waiter-loop (safe-deq waiters)) - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource)))) - (loop (cons resource resources) - available - waiters - (cons 1 resources-checkout-count) - (cons current-internal-time - resources-last-used)))))))) + (if (q-empty? waiters) + (loop (cons resource resources) + (cons resource available) + waiters + (cons (get-internal-real-time) + resources-last-used)) + + (let ((current-internal-time + (get-internal-real-time))) + (with-exception-handler + (lambda (exn) + (if (eq? (exception-kind exn) 'q-empty) + (loop (cons resource resources) + (cons resource available) + waiters + (cons current-internal-time + resources-last-used)) + (raise-exception exn))) + (lambda () + (let waiter-loop ((waiter (deq! waiters))) + (match waiter + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (deq! waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))))))) + #:unwind? #t) + (loop (cons resource resources) + available + waiters + (cons current-internal-time + resources-last-used)))))) (('checkout reply timeout-time max-waiters) (if (null? available) @@ -841,12 +849,10 @@ (loop resources available waiters - resources-checkout-count resources-last-used)) (loop resources available (enq! waiters (cons reply timeout-time)) - resources-checkout-count resources-last-used)))) (if timeout-time @@ -869,21 +875,10 @@ (loop resources (cdr available) waiters - (let ((resource-index - (list-index (lambda (x) - (eq? x (car available))) - resources))) - (list-set! resources-checkout-count - resource-index - (+ 1 (list-ref - resources-checkout-count - resource-index))) - resources-checkout-count) resources-last-used)) (loop resources available waiters - resources-checkout-count resources-last-used))) (begin (put-message reply (cons 'success @@ -892,16 +887,6 @@ (loop resources (cdr available) waiters - (let ((resource-index - (list-index (lambda (x) - (eq? x (car available))) - resources))) - (list-set! resources-checkout-count - resource-index - (+ 1 (list-ref - resources-checkout-count - resource-index))) - resources-checkout-count) resources-last-used))))) (((and (or 'return @@ -914,95 +899,76 @@ (set! checkout-failure-count (+ 1 checkout-failure-count))) - (let ((current-internal-time - (get-internal-real-time)) - (resource-index - (list-index (lambda (x) - (eq? x resource)) - resources))) - (if (and lifetime - (>= (list-ref resources-checkout-count - resource-index) - lifetime)) - (begin - (spawn-fiber-to-destroy-resource resource) - (loop resources - available - waiters - resources-checkout-count - resources-last-used)) - (let waiter-loop ((waiter (safe-deq waiters))) - (match waiter - (#f - (loop resources - (cons resource available) - waiters - (if (eq? 'return-failed-checkout - return-type) - (begin - (list-set! resources-checkout-count - resource-index - (- (list-ref resources-checkout-count - resource-index) - 1)) - resources-checkout-count) - resources-checkout-count) - (begin - (when (eq? return-type 'return) - (list-set! - resources-last-used - resource-index - current-internal-time)) - resources-last-used))) - ((reply . timeout) - (if (and timeout - (< timeout current-internal-time)) - (waiter-loop (safe-deq waiters)) - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource)))) - (loop resources - available - waiters - (if (eq? 'return-failed-checkout - return-type) - (begin - (list-set! resources-checkout-count - resource-index - (- (list-ref resources-checkout-count - resource-index) - 1)) - resources-checkout-count) - resources-checkout-count) - (begin - (list-set! - resources-last-used - resource-index - current-internal-time) - resources-last-used)))))))) + (if (q-empty? waiters) + (loop resources + (cons resource available) + waiters + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + (get-internal-real-time)) + resources-last-used)) + + (let ((current-internal-time + (get-internal-real-time))) + (with-exception-handler + (lambda (exn) + (if (eq? (exception-kind exn) 'q-empty) + (loop resources + (cons resource available) + waiters + (begin + (when (eq? return-type 'return) + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + current-internal-time)) + resources-last-used)) + (raise-exception exn))) + (lambda () + (let waiter-loop ((waiter (deq! waiters))) + (match waiter + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (deq! waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))))))) + #:unwind? #t) + (loop resources + available + waiters + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + current-internal-time) + resources-last-used))))) (('remove resource) (let ((index (list-index (lambda (x) (eq? x resource)) resources))) - - (when (and (not (q-empty? waiters)) - (< (- (length resources) 1) - max-size)) - (spawn-fiber-to-return-new-resource)) - (loop (if index (remove-at-index! resources index) (begin @@ -1013,9 +979,6 @@ resources)) available ; resource shouldn't be in this list waiters - (remove-at-index! - resources-checkout-count - index) (remove-at-index! resources-last-used index)))) @@ -1026,7 +989,6 @@ (loop resources available waiters - resources-checkout-count resources-last-used)) (('list-resources reply) @@ -1037,7 +999,6 @@ (loop resources available waiters - resources-checkout-count resources-last-used)) (('stats reply timeout-time) @@ -1045,7 +1006,6 @@ `((resources . ,(length resources)) (available . ,(length available)) (waiters . ,(q-length waiters)) - (resources-checkout-count . ,resources-checkout-count) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber @@ -1065,7 +1025,6 @@ (loop resources available waiters - resources-checkout-count resources-last-used)) (('check-for-idle-resources) @@ -1106,7 +1065,6 @@ (loop resources (lset-difference eq? available resources-to-destroy) waiters - resources-checkout-count resources-last-used)))) (('destroy) @@ -1170,7 +1128,6 @@ (loop resources available waiters - resources-checkout-count resources-last-used))))) (spawn-fiber diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index cbbaf21..70d7292 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -269,8 +269,8 @@ from there, or #f if that would be an empty string." (sleep 1) (destructor/safe args))))) - (define (process thread-index channel args) - (let loop ((lifetime thread-lifetime)) + (define (process channel args) + (let loop () (match (get-message channel) ('destroy #f) ((reply sent-time proc) @@ -292,9 +292,6 @@ from there, or #f if that would be an empty string." internal-time-units-per-second) exn)) (lambda () - (vector-set! thread-proc-vector - thread-index - proc) (with-exception-handler (lambda (exn) (let ((stack @@ -322,10 +319,6 @@ from there, or #f if that would be an empty string." vals)))))) #:unwind? #t))) - (vector-set! thread-proc-vector - thread-index - #f) - (put-message reply response) @@ -342,11 +335,7 @@ from there, or #f if that would be an empty string." (if (and exception? expire-on-exception?) #t - (if lifetime - (if (<= 1 lifetime) - #t - (loop (- lifetime 1))) - (loop lifetime))))))))) + (loop)))))))) (define (start-thread index channel) (call-with-new-thread @@ -369,7 +358,7 @@ from there, or #f if that would be an empty string." "knots: thread-pool: internal exception: ~A\n" exn)) (lambda () (parameterize ((param args)) - (process index channel args))) + (process channel args))) #:unwind? #t))) (when thread-destructor @@ -406,8 +395,7 @@ from there, or #f if that would be an empty string." (expire-on-exception? #f) (name "unnamed") (use-default-io-waiters? #t) - default-checkout-timeout - default-max-waiters) + default-checkout-timeout) "Return a channel used to offload work to a dedicated thread. ARGS are the arguments of the thread pool procedure." (define param @@ -420,6 +408,7 @@ arguments of the thread pool procedure." 1 #:thread-initializer thread-initializer #:thread-destructor thread-destructor + #:thread-lifetime thread-lifetime #:expire-on-exception? expire-on-exception? #:name name #:use-default-io-waiters? use-default-io-waiters?)) @@ -427,11 +416,9 @@ arguments of the thread pool procedure." #:destructor destroy-thread-pool #:min-size min-size #:delay-logger delay-logger - #:lifetime thread-lifetime #:scheduler scheduler #:duration-logger duration-logger - #:default-checkout-timeout default-checkout-timeout - #:default-max-waiters default-max-waiters))) + #:default-checkout-timeout default-checkout-timeout))) (thread-pool resource-pool param))) diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm index 3999dde..461d04b 100644 --- a/tests/resource-pool.scm +++ b/tests/resource-pool.scm @@ -211,21 +211,4 @@ (destroy-resource-pool resource-pool)))) -(run-fibers-for-tests - (lambda () - (let ((resource-pool (make-resource-pool - (const 'foo) - 1 - #:lifetime 1 - #:destructor - (const #t)))) - (for-each - (lambda _ - (with-resource-from-pool resource-pool - res - res)) - (iota 20)) - - (destroy-resource-pool resource-pool)))) - (display "resource-pool test finished successfully\n") diff --git a/tests/thread-pool.scm b/tests/thread-pool.scm index dd0b852..1c51cb3 100644 --- a/tests/thread-pool.scm +++ b/tests/thread-pool.scm @@ -85,26 +85,4 @@ (+ 1 'a)))) #:unwind? #t))))) -(let ((thread-pool - (make-fixed-size-thread-pool 1 #:thread-lifetime 1))) - - (for-each - (lambda _ - (call-with-thread - thread-pool - (lambda () #f))) - (iota 10))) - -(run-fibers-for-tests - (lambda () - (let ((thread-pool - (make-thread-pool 1 #:thread-lifetime 1))) - - (for-each - (lambda _ - (call-with-thread - thread-pool - (lambda () #f))) - (iota 10))))) - (display "thread-pool test finished successfully\n")