From 86fb460d6a35a7170611d81b9d7280f793f3d34b Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 17 Nov 2025 10:46:46 +0000 Subject: [PATCH 1/7] Simplify using the waiters queue in the resource pool Use a custom dequeue procedure that returns #f rather than raising an exception on an empty queue. --- knots/resource-pool.scm | 268 ++++++++++++++++++---------------------- 1 file changed, 120 insertions(+), 148 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 6e9c353..c233e29 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -101,6 +101,16 @@ start (cdr end)))) +(define (safe-deq q) + (if (null? (car q)) + #f + (let ((it (caar q)) + (next (cdar q))) + (if (null? next) + (set-cdr! q #f)) + (set-car! q next) + it))) + (define* (make-fixed-size-resource-pool resources #:key (delay-logger (const #f)) @@ -345,45 +355,35 @@ (set! checkout-failure-count (+ 1 checkout-failure-count))) - (if (q-empty? waiters) - (loop resources - (cons resource available) - waiters) - - (let ((current-internal-time - (get-internal-real-time))) - (with-exception-handler - (lambda (exn) - (if (eq? (exception-kind exn) 'q-empty) - (loop resources - (cons resource available) - waiters) - (raise-exception exn))) - (lambda () - (let waiter-loop ((waiter (deq! waiters))) - (match waiter - ((reply . timeout) - (if (and timeout - (< timeout current-internal-time)) - (waiter-loop (deq! waiters)) - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource)))))))) - #:unwind? #t) - (loop resources - available - waiters)))) + (let ((current-internal-time + (get-internal-real-time))) + (let waiter-loop ((waiter (safe-deq waiters))) + (match waiter + (#f + (loop resources + (cons resource available) + waiters)) + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (safe-deq waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))) + (loop resources + available + waiters)))))) (('list-resources reply) (spawn-fiber @@ -770,51 +770,39 @@ (cons (get-internal-real-time) resources-last-used)))) - (if (q-empty? waiters) - (loop (cons resource resources) - (cons resource available) - waiters - (cons (get-internal-real-time) - resources-last-used)) - - (let ((current-internal-time - (get-internal-real-time))) - (with-exception-handler - (lambda (exn) - (if (eq? (exception-kind exn) 'q-empty) - (loop (cons resource resources) - (cons resource available) - waiters - (cons current-internal-time - resources-last-used)) - (raise-exception exn))) - (lambda () - (let waiter-loop ((waiter (deq! waiters))) - (match waiter - ((reply . timeout) - (if (and timeout - (< timeout current-internal-time)) - (waiter-loop (deq! waiters)) - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource)))))))) - #:unwind? #t) - (loop (cons resource resources) - available - waiters - (cons current-internal-time - resources-last-used)))))) + (let ((current-internal-time + (get-internal-real-time))) + (let waiter-loop ((waiter (safe-deq waiters))) + (match waiter + (#f + (loop (cons resource resources) + (cons resource available) + waiters + (cons current-internal-time + resources-last-used))) + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (safe-deq waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))) + (loop (cons resource resources) + available + waiters + (cons current-internal-time + resources-last-used)))))))) (('checkout reply timeout-time max-waiters) (if (null? available) @@ -899,76 +887,60 @@ (set! checkout-failure-count (+ 1 checkout-failure-count))) - (if (q-empty? waiters) - (loop resources - (cons resource available) - waiters - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used)) - - (let ((current-internal-time - (get-internal-real-time))) - (with-exception-handler - (lambda (exn) - (if (eq? (exception-kind exn) 'q-empty) - (loop resources - (cons resource available) - waiters - (begin - (when (eq? return-type 'return) - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - current-internal-time)) - resources-last-used)) - (raise-exception exn))) - (lambda () - (let waiter-loop ((waiter (deq! waiters))) - (match waiter - ((reply . timeout) - (if (and timeout - (< timeout current-internal-time)) - (waiter-loop (deq! waiters)) - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource)))))))) - #:unwind? #t) - (loop resources - available - waiters - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - current-internal-time) - resources-last-used))))) + (let ((current-internal-time + (get-internal-real-time)) + (resource-index + (list-index (lambda (x) + (eq? x resource)) + resources))) + (let waiter-loop ((waiter (safe-deq waiters))) + (match waiter + (#f + (loop resources + (cons resource available) + waiters + (begin + (when (eq? return-type 'return) + (list-set! + resources-last-used + resource-index + current-internal-time)) + resources-last-used))) + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (safe-deq waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))) + (loop resources + available + waiters + (begin + (list-set! + resources-last-used + resource-index + current-internal-time) + resources-last-used))))))) (('remove resource) (let ((index (list-index (lambda (x) (eq? x resource)) resources))) + + (loop (if index (remove-at-index! resources index) (begin From 40b64e269b34ebf4cfb95155b2aed99200ec1cfa Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 17 Nov 2025 10:51:33 +0000 Subject: [PATCH 2/7] Fix resources-last-used inconsistency --- knots/resource-pool.scm | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index c233e29..232df77 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -767,8 +767,7 @@ (loop resources available waiters - (cons (get-internal-real-time) - resources-last-used)))) + resources-last-used))) (let ((current-internal-time (get-internal-real-time))) From a13098494d59fb1e6f8cc980a12f408f58a60727 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 17 Nov 2025 11:18:23 +0000 Subject: [PATCH 3/7] Fix a bug where resources pools could empty with waiters --- knots/resource-pool.scm | 4 ++++ tests/resource-pool.scm | 17 +++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 232df77..d802260 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -939,6 +939,10 @@ (eq? x resource)) resources))) + (when (and (not (q-empty? waiters)) + (< (- (length resources) 1) + max-size)) + (spawn-fiber-to-return-new-resource)) (loop (if index (remove-at-index! resources index) diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm index 461d04b..3999dde 100644 --- a/tests/resource-pool.scm +++ b/tests/resource-pool.scm @@ -211,4 +211,21 @@ (destroy-resource-pool resource-pool)))) +(run-fibers-for-tests + (lambda () + (let ((resource-pool (make-resource-pool + (const 'foo) + 1 + #:lifetime 1 + #:destructor + (const #t)))) + (for-each + (lambda _ + (with-resource-from-pool resource-pool + res + res)) + (iota 20)) + + (destroy-resource-pool resource-pool)))) + (display "resource-pool test finished successfully\n") From 244607865774eed9d3d7927091ddc3e2c3d3acac Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 17 Nov 2025 11:19:01 +0000 Subject: [PATCH 4/7] Fix updating the thread-proc-vector in thread pools --- knots/thread-pool.scm | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index 70d7292..49572db 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -269,7 +269,7 @@ from there, or #f if that would be an empty string." (sleep 1) (destructor/safe args))))) - (define (process channel args) + (define (process thread-index channel args) (let loop () (match (get-message channel) ('destroy #f) @@ -292,6 +292,9 @@ from there, or #f if that would be an empty string." internal-time-units-per-second) exn)) (lambda () + (vector-set! thread-proc-vector + thread-index + proc) (with-exception-handler (lambda (exn) (let ((stack @@ -319,6 +322,10 @@ from there, or #f if that would be an empty string." vals)))))) #:unwind? #t))) + (vector-set! thread-proc-vector + thread-index + #f) + (put-message reply response) @@ -358,7 +365,7 @@ from there, or #f if that would be an empty string." "knots: thread-pool: internal exception: ~A\n" exn)) (lambda () (parameterize ((param args)) - (process channel args))) + (process index channel args))) #:unwind? #t))) (when thread-destructor From e78e41b5423d7f79c07cdad2f3a26297475f8901 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 17 Nov 2025 11:19:30 +0000 Subject: [PATCH 5/7] Pass through default-max-waiters in make-thread-pool --- knots/thread-pool.scm | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index 49572db..df4352a 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -402,7 +402,8 @@ from there, or #f if that would be an empty string." (expire-on-exception? #f) (name "unnamed") (use-default-io-waiters? #t) - default-checkout-timeout) + default-checkout-timeout + default-max-waiters) "Return a channel used to offload work to a dedicated thread. ARGS are the arguments of the thread pool procedure." (define param @@ -425,7 +426,8 @@ arguments of the thread pool procedure." #:delay-logger delay-logger #:scheduler scheduler #:duration-logger duration-logger - #:default-checkout-timeout default-checkout-timeout))) + #:default-checkout-timeout default-checkout-timeout + #:default-max-waiters default-max-waiters))) (thread-pool resource-pool param))) From 1a476b5aa8b1fc2cd14ffb488b41da8d4eb95cef Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 17 Nov 2025 11:20:10 +0000 Subject: [PATCH 6/7] Implement lifetime support in the resource pool --- knots/resource-pool.scm | 148 +++++++++++++++++++++++++++++----------- 1 file changed, 108 insertions(+), 40 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index d802260..301abbd 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -749,6 +749,7 @@ (let loop ((resources '()) (available '()) (waiters (make-q)) + (resources-checkout-count '()) (resources-last-used '())) (match (get-message channel) @@ -762,11 +763,13 @@ (loop (cons resource resources) available waiters + (cons 0 resources-checkout-count) (cons (get-internal-real-time) resources-last-used))) (loop resources available waiters + resources-checkout-count resources-last-used))) (let ((current-internal-time @@ -777,6 +780,7 @@ (loop (cons resource resources) (cons resource available) waiters + (cons 0 resources-checkout-count) (cons current-internal-time resources-last-used))) ((reply . timeout) @@ -800,6 +804,7 @@ (loop (cons resource resources) available waiters + (cons 1 resources-checkout-count) (cons current-internal-time resources-last-used)))))))) @@ -836,10 +841,12 @@ (loop resources available waiters + resources-checkout-count resources-last-used)) (loop resources available (enq! waiters (cons reply timeout-time)) + resources-checkout-count resources-last-used)))) (if timeout-time @@ -862,10 +869,21 @@ (loop resources (cdr available) waiters + (let ((resource-index + (list-index (lambda (x) + (eq? x (car available))) + resources))) + (list-set! resources-checkout-count + resource-index + (+ 1 (list-ref + resources-checkout-count + resource-index))) + resources-checkout-count) resources-last-used)) (loop resources available waiters + resources-checkout-count resources-last-used))) (begin (put-message reply (cons 'success @@ -874,6 +892,16 @@ (loop resources (cdr available) waiters + (let ((resource-index + (list-index (lambda (x) + (eq? x (car available))) + resources))) + (list-set! resources-checkout-count + resource-index + (+ 1 (list-ref + resources-checkout-count + resource-index))) + resources-checkout-count) resources-last-used))))) (((and (or 'return @@ -892,46 +920,77 @@ (list-index (lambda (x) (eq? x resource)) resources))) - (let waiter-loop ((waiter (safe-deq waiters))) - (match waiter - (#f - (loop resources - (cons resource available) - waiters - (begin - (when (eq? return-type 'return) - (list-set! - resources-last-used - resource-index - current-internal-time)) - resources-last-used))) - ((reply . timeout) - (if (and timeout - (< timeout current-internal-time)) - (waiter-loop (safe-deq waiters)) - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource)))) - (loop resources - available - waiters - (begin - (list-set! - resources-last-used - resource-index - current-internal-time) - resources-last-used))))))) + (if (and lifetime + (>= (list-ref resources-checkout-count + resource-index) + lifetime)) + (begin + (spawn-fiber-to-destroy-resource resource) + (loop resources + available + waiters + resources-checkout-count + resources-last-used)) + (let waiter-loop ((waiter (safe-deq waiters))) + (match waiter + (#f + (loop resources + (cons resource available) + waiters + (if (eq? 'return-failed-checkout + return-type) + (begin + (list-set! resources-checkout-count + resource-index + (- (list-ref resources-checkout-count + resource-index) + 1)) + resources-checkout-count) + resources-checkout-count) + (begin + (when (eq? return-type 'return) + (list-set! + resources-last-used + resource-index + current-internal-time)) + resources-last-used))) + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (safe-deq waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))) + (loop resources + available + waiters + (if (eq? 'return-failed-checkout + return-type) + (begin + (list-set! resources-checkout-count + resource-index + (- (list-ref resources-checkout-count + resource-index) + 1)) + resources-checkout-count) + resources-checkout-count) + (begin + (list-set! + resources-last-used + resource-index + current-internal-time) + resources-last-used)))))))) (('remove resource) (let ((index @@ -954,6 +1013,9 @@ resources)) available ; resource shouldn't be in this list waiters + (remove-at-index! + resources-checkout-count + index) (remove-at-index! resources-last-used index)))) @@ -964,6 +1026,7 @@ (loop resources available waiters + resources-checkout-count resources-last-used)) (('list-resources reply) @@ -974,6 +1037,7 @@ (loop resources available waiters + resources-checkout-count resources-last-used)) (('stats reply timeout-time) @@ -981,6 +1045,7 @@ `((resources . ,(length resources)) (available . ,(length available)) (waiters . ,(q-length waiters)) + (resources-checkout-count . ,resources-checkout-count) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber @@ -1000,6 +1065,7 @@ (loop resources available waiters + resources-checkout-count resources-last-used)) (('check-for-idle-resources) @@ -1040,6 +1106,7 @@ (loop resources (lset-difference eq? available resources-to-destroy) waiters + resources-checkout-count resources-last-used)))) (('destroy) @@ -1103,6 +1170,7 @@ (loop resources available waiters + resources-checkout-count resources-last-used))))) (spawn-fiber From 05ad83c7031de9b0d1873a0d5aec630746342a06 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 17 Nov 2025 11:37:26 +0000 Subject: [PATCH 7/7] Implement lifetime support for thread pools --- knots/thread-pool.scm | 10 +++++++--- tests/thread-pool.scm | 22 ++++++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index df4352a..cbbaf21 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -270,7 +270,7 @@ from there, or #f if that would be an empty string." (destructor/safe args))))) (define (process thread-index channel args) - (let loop () + (let loop ((lifetime thread-lifetime)) (match (get-message channel) ('destroy #f) ((reply sent-time proc) @@ -342,7 +342,11 @@ from there, or #f if that would be an empty string." (if (and exception? expire-on-exception?) #t - (loop)))))))) + (if lifetime + (if (<= 1 lifetime) + #t + (loop (- lifetime 1))) + (loop lifetime))))))))) (define (start-thread index channel) (call-with-new-thread @@ -416,7 +420,6 @@ arguments of the thread pool procedure." 1 #:thread-initializer thread-initializer #:thread-destructor thread-destructor - #:thread-lifetime thread-lifetime #:expire-on-exception? expire-on-exception? #:name name #:use-default-io-waiters? use-default-io-waiters?)) @@ -424,6 +427,7 @@ arguments of the thread pool procedure." #:destructor destroy-thread-pool #:min-size min-size #:delay-logger delay-logger + #:lifetime thread-lifetime #:scheduler scheduler #:duration-logger duration-logger #:default-checkout-timeout default-checkout-timeout diff --git a/tests/thread-pool.scm b/tests/thread-pool.scm index 1c51cb3..dd0b852 100644 --- a/tests/thread-pool.scm +++ b/tests/thread-pool.scm @@ -85,4 +85,26 @@ (+ 1 'a)))) #:unwind? #t))))) +(let ((thread-pool + (make-fixed-size-thread-pool 1 #:thread-lifetime 1))) + + (for-each + (lambda _ + (call-with-thread + thread-pool + (lambda () #f))) + (iota 10))) + +(run-fibers-for-tests + (lambda () + (let ((thread-pool + (make-thread-pool 1 #:thread-lifetime 1))) + + (for-each + (lambda _ + (call-with-thread + thread-pool + (lambda () #f))) + (iota 10))))) + (display "thread-pool test finished successfully\n")