From a73fd1ca5091e2eb089cf63583ce54b6415561f5 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 10 Mar 2025 21:33:29 +0000 Subject: [PATCH 01/10] Add a fibers-parallel test --- tests/parallelism.scm | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/parallelism.scm b/tests/parallelism.scm index f005d71..9881a4d 100644 --- a/tests/parallelism.scm +++ b/tests/parallelism.scm @@ -93,4 +93,22 @@ 1)) #:unwind? #t))) +(run-fibers-for-tests + (lambda () + (let ((a 0)) + (call-with-values + (lambda () + (fibers-parallel + (begin + (sleep 1) + 1) + (begin + (set! a 1) + 2))) + (lambda (a b) + (assert-equal a 1) + (assert-equal b 2))) + + (assert-equal a 1)))) + (display "parallelism test finished successfully\n") From da69fd19f3072a405e394881a11c345704f31806 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 11 Mar 2025 11:55:42 +0000 Subject: [PATCH 02/10] Unwind before calling put-message As I think this might be more reliable in case the stack contains something that would introduce a continuation barrier. --- knots/parallelism.scm | 87 +++++++++++++++++++++++-------------------- 1 file changed, 47 insertions(+), 40 deletions(-) diff --git a/knots/parallelism.scm b/knots/parallelism.scm index 77272dd..ab398f8 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -22,6 +22,7 @@ #:use-module (srfi srfi-71) #:use-module (ice-9 match) #:use-module (ice-9 control) + #:use-module (ice-9 exceptions) #:use-module (fibers) #:use-module (fibers channels) #:use-module (fibers operations) @@ -43,27 +44,30 @@ (let ((reply (make-channel))) (spawn-fiber (lambda () - (call-with-escape-continuation - (lambda (return) - (with-exception-handler - (lambda (exn) - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (let ((stack (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1)))) - (put-message reply - (list 'exception - (make-exception - exn - (make-knots-exception stack))))))) - (return)) - (lambda () - (call-with-values - (lambda () - (start-stack #t (thunk))) - (lambda vals - (put-message reply vals)))))))) + (with-exception-handler + (lambda (exn) + (put-message + reply + (list 'exception exn))) + (lambda () + (with-exception-handler + (lambda (exn) + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (let ((stack (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1)))) + (raise-exception + (make-exception + exn + (make-knots-exception stack))))))) + (lambda () + (call-with-values + (lambda () + (start-stack #t (thunk))) + (lambda vals + (put-message reply vals)))))) + #:unwind? #t)) #:parallel? #t) reply)) @@ -245,25 +249,28 @@ (get-message process-channel)))) (put-message reply-channel - (call-with-escape-continuation - (lambda (return) - (with-exception-handler - (lambda (exn) - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (let ((stack (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1)))) - (return (list 'exception - (make-exception - exn - (make-knots-exception stack)))))))) - (lambda () - (call-with-values - (lambda () - (start-stack #t (apply proc args))) - (lambda vals - (cons 'result vals))))))))))) + (with-exception-handler + (lambda (exn) + (list 'exception exn)) + (lambda () + (with-exception-handler + (lambda (exn) + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (let ((stack (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1)))) + (raise-exception + (make-exception + exn + (make-knots-exception stack))))))) + (lambda () + (call-with-values + (lambda () + (start-stack #t (apply proc args))) + (lambda vals + (cons 'result vals)))))) + #:unwind? #t))))) #:parallel? #t)) (iota parallelism)) From e1858dfff5e360b36b167cd94c806759f9b4e7e8 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Fri, 14 Mar 2025 14:27:29 +0000 Subject: [PATCH 03/10] Remove the web-server exception handler This turned out not to be useful, since I wanted to handle exceptions happening in the exception handler, so it didn't really help in the end to allow customising it. --- knots/web-server.scm | 69 ++++++-------------------------------------- tests/web-server.scm | 22 -------------- 2 files changed, 9 insertions(+), 82 deletions(-) diff --git a/knots/web-server.scm b/knots/web-server.scm index 9b98018..14d32f6 100644 --- a/knots/web-server.scm +++ b/knots/web-server.scm @@ -48,7 +48,6 @@ request-body-port/knots read-request-body/knots - default-exception-handler default-write-response-exception-handler web-server? @@ -310,7 +309,7 @@ on the procedure being called at any particular time." ;; Close the client port #f) -(define (default-exception-handler exn request) +(define (exception-handler exn request) (let* ((error-string (call-with-output-string (lambda (port) @@ -332,8 +331,7 @@ on the procedure being called at any particular time." (define (handle-request handler client read-request-exception-handler - write-response-exception-handler - exception-handler) + write-response-exception-handler) (let ((request (with-exception-handler read-request-exception-handler @@ -356,58 +354,14 @@ on the procedure being called at any particular time." (lambda (return) (with-exception-handler (lambda (exn) - (with-exception-handler - (lambda (exn) - (call-with-values - (lambda () - (default-exception-handler - (make-exception - exn - (make-exception-with-message - "exception in exception handler") - (make-exception-with-irritants - exception-handler)) - request)) - (match-lambda* - ((response body) - (call-with-values - (lambda () - (sanitize-response - request - response - body)) - return))))) - (lambda () + (call-with-values + (lambda () + (exception-handler exn request)) + (lambda (response body) (call-with-values (lambda () - (exception-handler exn request)) - (match-lambda* - ((response body) - (call-with-values - (lambda () - (sanitize-response request response body)) - return)) - (other - (call-with-values - (lambda () - (default-exception-handler - (make-exception-with-irritants - (list (make-exception-with-message - (simple-format - #f - "wrong number of values returned from exception handler, expecting 2, got ~A" - (length other))) - exception-handler)) - request)) - (match-lambda* - ((response body) - (call-with-values - (lambda () - (sanitize-response - request - response - body)) - return)))))))))) + (sanitize-response request response body)) + return)))) (lambda () (start-stack #t @@ -475,7 +429,6 @@ on the procedure being called at any particular time." #:unwind? #t)))) (define* (client-loop client handler - exception-handler read-request-exception-handler write-response-exception-handler connection-idle-timeout @@ -517,8 +470,7 @@ on the procedure being called at any particular time." (else (let ((keep-alive? (handle-request handler client read-request-exception-handler - write-response-exception-handler - exception-handler))) + write-response-exception-handler))) (if keep-alive? (loop) (close-port client))))))) @@ -537,8 +489,6 @@ on the procedure being called at any particular time." INADDR_LOOPBACK)) (port 8080) (socket (make-default-socket family addr port)) - (exception-handler - default-exception-handler) (read-request-exception-handler default-read-request-exception-handler) (write-response-exception-handler @@ -577,7 +527,6 @@ before sending back to the client." ((client . sockaddr) (spawn-fiber (lambda () (client-loop client handler - exception-handler read-request-exception-handler write-response-exception-handler connection-idle-timeout diff --git a/tests/web-server.scm b/tests/web-server.scm index c21851e..9ac58bb 100644 --- a/tests/web-server.scm +++ b/tests/web-server.scm @@ -52,28 +52,6 @@ uri #:port (non-blocking-open-socket-for-uri uri))))))) -(run-fibers-for-tests - (lambda () - (let* ((web-server - (run-knots-web-server - (lambda (request) - "Hello, World!") - #:port 0 - #:exception-handler - (lambda (exn request) - "Error"))) ;; Bind to any port - (port - (web-server-port web-server)) - (uri - (build-uri 'http #:host "127.0.0.1" #:port port))) - - (assert-equal - 500 - (response-code - (http-get - uri - #:port (non-blocking-open-socket-for-uri uri))))))) - (run-fibers-for-tests (lambda () (let* ((web-server From 8c0f04be4f5fcdf9d1ec9b6a99df5f3b571de256 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 1 Apr 2025 17:46:39 +0300 Subject: [PATCH 04/10] Don't call put-message without unwinding the stack When handling exceptions, as this is error prone. --- knots/resource-pool.scm | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index f4522be..1b50482 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -650,12 +650,27 @@ available. Return the resource once PROC has returned." (lambda () (with-exception-handler (lambda (exn) - (print-backtrace-and-exception/knots exn) + ;; Unwind the stack before calling put-message, as + ;; this avoids inconsistent behaviour with + ;; continuation barriers (put-message (resource-pool-channel pool) `(return ,resource)) (raise-exception exn)) (lambda () - (proc resource)))) + (with-exception-handler + (lambda (exn) + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (let ((stack (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1)))) + (raise-exception + (make-exception + exn + (make-knots-exception stack))))))) + (lambda () + (proc resource)))) + #:unwind? #t)) (lambda vals (put-message (resource-pool-channel pool) `(return ,resource)) From 4f0eafef0a2f6b613b24f260da8fdcda48574a11 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Sun, 27 Apr 2025 09:41:56 +0100 Subject: [PATCH 05/10] Resource pool max waiters and destroy changes Add the ability to specify the max number of waiters for a resource pool, this provides a more efficient way of avoiding waiters for a resource pool continually rising. This commit also improves the destroy behaviour. --- knots/resource-pool.scm | 351 +++++++++++++++++++++++++++++++++------- tests/resource-pool.scm | 44 +++++ 2 files changed, 333 insertions(+), 62 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 1b50482..c9aab02 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -43,6 +43,15 @@ resource-pool-timeout-error-pool resource-pool-timeout-error? + &resource-pool-too-many-waiters + resource-pool-too-many-waiters-error-pool + resource-pool-too-many-waiters-error-waiters-count + resource-pool-too-many-waiters-error? + + &resource-pool-destroyed + resource-pool-destroyed-error-pool + resource-pool-destroyed-error? + resource-pool-default-timeout-handler call-with-resource-from-pool @@ -86,7 +95,8 @@ scheduler (name "unnamed") (add-resources-parallelism 1) - default-checkout-timeout) + default-checkout-timeout + default-max-waiters) (define channel (make-channel)) (define pool @@ -102,7 +112,8 @@ (lifetime . ,lifetime) (scheduler . ,scheduler) (name . ,name) - (default-checkout-timeout . ,default-checkout-timeout)))) + (default-checkout-timeout . ,default-checkout-timeout) + (default-max-waiters . ,default-max-waiters)))) (define checkout-failure-count 0) @@ -186,7 +197,8 @@ (perform-operation (choice-operation (wrap-operation - (put-operation reply-channel resource) + (put-operation reply-channel + (cons 'success resource)) (const #t)) (wrap-operation (sleep-operation reply-timeout) @@ -196,6 +208,116 @@ channel (list 'return-failed-checkout resource))))))) + (define (destroy-loop resources destroy-waiters) + (let loop ((resources resources) + (destroy-waiters destroy-waiters)) + (match (get-message channel) + (('add-resource resource) + (when destructor + (spawn-fiber-to-destroy-resource resource)) + + (loop resources + destroy-waiters)) + (('checkout reply timeout-time max-waiters) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout-time + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op))))) + (loop resources + destroy-waiters)) + (((and (or 'return + 'return-failed-checkout + 'remove) + return-type) + resource) + (when destructor + (spawn-fiber-to-destroy-resource resource)) + + (let ((index + (list-index (lambda (x) + (eq? x resource)) + resources))) + (define (remove-at-index! lst i) + (let ((start + end + (split-at! lst i))) + (append + start + (cdr end)))) + + (let ((new-resources + (if index + (remove-at-index! resources index) + (begin + (simple-format + (current-error-port) + "resource pool error: unable to remove ~A\n" + resource) + resources)))) + (if (null? new-resources) + (begin + (for-each + (lambda (destroy-waiter) + (spawn-fiber + (lambda () + (put-message destroy-waiter 'destroy-success)))) + destroy-waiters) + + ;; No loop + *unspecified*) + (loop new-resources + destroy-waiters))))) + + (('stats reply) + (let ((stats + `((resources . ,(length resources)) + (available . 0) + (waiters . 0) + (checkout-failure-count . ,checkout-failure-count)))) + + (spawn-fiber + (lambda () + (perform-operation + (choice-operation + (wrap-operation + (put-operation reply stats) + (const #t)) + (wrap-operation (sleep-operation 5) + (const #f))))))) + + (loop resources + destroy-waiters)) + + (('check-for-idle-resources) + (loop resources + destroy-waiters)) + + (('destroy reply) + (loop resources + (cons reply destroy-waiters))) + (unknown + (simple-format + (current-error-port) + "unrecognised message to ~A resource pool channel: ~A\n" + name + unknown) + (loop resources + destroy-waiters))))) + (define (main-loop) (let loop ((resources '()) (available '()) @@ -257,7 +379,8 @@ (spawn-fiber-for-checkout waiter-channel reply-timeout resource)) - (put-message waiter-channel resource)) + (put-message waiter-channel (cons 'success + resource))) (loop (cons resource resources) available @@ -265,17 +388,45 @@ (cons (get-internal-real-time) resources-last-used))))))))) - (('checkout reply timeout-time) + (('checkout reply timeout-time max-waiters) (if (null? available) (begin (unless (= (length resources) max-size) (spawn-fiber-to-return-new-resource)) - (loop resources - available - (cons (cons reply timeout-time) - waiters) - resources-last-used)) + (let ((waiters-count + (length waiters))) + (if (and max-waiters + (>= waiters-count + max-waiters)) + (begin + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'too-many-waiters + waiters-count)))) + (perform-operation + (if timeout-time + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op))))) + (loop resources + available + waiters + resources-last-used)) + (loop resources + available + (cons (cons reply timeout-time) + waiters) + resources-last-used)))) (if timeout-time (let ((current-internal-time @@ -303,7 +454,8 @@ waiters resources-last-used))) (begin - (put-message reply (car available)) + (put-message reply (cons 'success + (car available))) (loop resources (cdr available) @@ -369,7 +521,8 @@ (spawn-fiber-for-checkout waiter-channel reply-timeout resource)) - (put-message waiter-channel resource)) + (put-message waiter-channel (cons 'success + resource))) (loop resources available @@ -473,7 +626,8 @@ resources-last-used)))) (('destroy reply) - (if (null? resources) + (if (and (null? resources) + (null? waiters)) (put-message reply 'destroy-success) (begin @@ -488,16 +642,34 @@ #:parallel? #t))) available) - (spawn-fiber - (lambda () - (sleep 0.1) - (put-message channel - (list 'destroy reply)))) + (let ((current-internal-time (get-internal-real-time))) + (for-each + (match-lambda + ((reply . timeout) + (when (or (not timeout) + (> timeout current-internal-time)) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op)))))))) + waiters)) - (loop resources - '() - waiters - resources-last-used)))) + (destroy-loop resources + (list reply))))) (unknown (simple-format @@ -575,12 +747,50 @@ (define resource-pool-timeout-error? (record-predicate &resource-pool-timeout)) +(define &resource-pool-too-many-waiters + (make-exception-type '&recource-pool-too-many-waiters + &error + '(pool waiters-count))) + +(define resource-pool-too-many-waiters-error-pool + (exception-accessor + &resource-pool-too-many-waiters + (record-accessor &resource-pool-too-many-waiters 'pool))) + +(define resource-pool-too-many-waiters-error-waiters-count + (exception-accessor + &resource-pool-too-many-waiters + (record-accessor &resource-pool-too-many-waiters 'waiters-count))) + +(define make-resource-pool-too-many-waiters-error + (record-constructor &resource-pool-too-many-waiters)) + +(define resource-pool-too-many-waiters-error? + (record-predicate &resource-pool-too-many-waiters)) + +(define &resource-pool-destroyed + (make-exception-type '&recource-pool-destroyed + &error + '(pool))) + +(define resource-pool-destroyed-error-pool + (exception-accessor + &resource-pool-destroyed + (record-accessor &resource-pool-destroyed 'pool))) + +(define make-resource-pool-destroyed-error + (record-constructor &resource-pool-destroyed)) + +(define resource-pool-destroyed-error? + (record-predicate &resource-pool-destroyed)) + (define resource-pool-default-timeout-handler (make-parameter #f)) (define* (call-with-resource-from-pool pool proc #:key (timeout 'default) - (timeout-handler (resource-pool-default-timeout-handler))) + (timeout-handler (resource-pool-default-timeout-handler)) + (max-waiters 'default)) "Call PROC with a resource from POOL, blocking until a resource becomes available. Return the resource once PROC has returned." @@ -590,7 +800,13 @@ available. Return the resource once PROC has returned." 'default-checkout-timeout) timeout)) - (let ((resource + (define max-waiters-or-default + (if (eq? max-waiters 'default) + (assq-ref (resource-pool-configuration pool) + 'default-max-waiters) + max-waiters)) + + (let ((reply (if timeout-or-default (let loop ((reply (make-channel)) (start-time (get-internal-real-time))) @@ -603,7 +819,8 @@ available. Return the resource once PROC has returned." reply (+ start-time (* timeout-or-default - internal-time-units-per-second)))) + internal-time-units-per-second)) + max-waiters-or-default)) (const #t)) (wrap-operation (sleep-operation timeout-or-default) (const #f)))))) @@ -629,52 +846,62 @@ available. Return the resource once PROC has returned." 0) (loop (make-channel) start-time) - #f) + 'timeout) response)) - #f))))) + 'timeout))))) (let loop ((reply (make-channel))) (put-message (resource-pool-channel pool) (list 'checkout reply - #f)) + #f + max-waiters-or-default)) (get-message reply))))) - (when (not resource) - (when timeout-handler - (timeout-handler pool proc timeout)) + (match reply + ('timeout + (when timeout-handler + (timeout-handler pool proc timeout)) - (raise-exception - (make-resource-pool-timeout-error pool))) + (raise-exception + (make-resource-pool-timeout-error pool))) + (('too-many-waiters . count) - (call-with-values - (lambda () - (with-exception-handler - (lambda (exn) - ;; Unwind the stack before calling put-message, as - ;; this avoids inconsistent behaviour with - ;; continuation barriers - (put-message (resource-pool-channel pool) - `(return ,resource)) - (raise-exception exn)) - (lambda () - (with-exception-handler - (lambda (exn) - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (let ((stack (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1)))) - (raise-exception - (make-exception - exn - (make-knots-exception stack))))))) - (lambda () - (proc resource)))) - #:unwind? #t)) - (lambda vals - (put-message (resource-pool-channel pool) - `(return ,resource)) - (apply values vals))))) + (raise-exception + (make-resource-pool-too-many-waiters-error pool + count))) + (('resource-pool-destroyed . #f) + (raise-exception + (make-resource-pool-destroyed-error pool))) + (('success . resource) + (call-with-values + (lambda () + (with-exception-handler + (lambda (exn) + ;; Unwind the stack before calling put-message, as + ;; this avoids inconsistent behaviour with + ;; continuation barriers + (put-message (resource-pool-channel pool) + `(return ,resource)) + (raise-exception exn)) + (lambda () + (with-exception-handler + (lambda (exn) + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (let ((stack (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1)))) + (raise-exception + (make-exception + exn + (make-knots-exception stack))))))) + (lambda () + (proc resource)))) + #:unwind? #t)) + (lambda vals + (put-message (resource-pool-channel pool) + `(return ,resource)) + (apply values vals))))))) (define-syntax-rule (with-resource-from-pool pool resource exp ...) (call-with-resource-from-pool diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm index 1251429..1bc09e5 100644 --- a/tests/resource-pool.scm +++ b/tests/resource-pool.scm @@ -142,4 +142,48 @@ 20 (iota 50))))) +(run-fibers-for-tests + (lambda () + (let ((resource-pool (make-resource-pool + (lambda () #f) + 1 + #:default-max-waiters 1))) + (call-with-resource-from-pool + resource-pool + (lambda (res) + + ;; 1st waiter + (spawn-fiber + (lambda () + (with-exception-handler + (lambda (exn) + (if (resource-pool-destroyed-error? exn) + #t + (raise-exception exn))) + (lambda () + (call-with-resource-from-pool + resource-pool + (lambda (res) + (error 'should-not-be-reached)))) + #:unwind? #t))) + + (while (= 0 + (assq-ref + (resource-pool-stats resource-pool) + 'waiters)) + (sleep 0)) + + (with-exception-handler + (lambda (exn) + (if (resource-pool-too-many-waiters-error? exn) + #t + (raise-exception exn))) + (lambda () + ;; 2nd waiter + (call-with-resource-from-pool + resource-pool + (lambda (res) + (error 'should-not-be-reached)))) + #:unwind? #t)))))) + (display "resource-pool test finished successfully\n") From 68cfbe0380085735086b8952e7e700fda0028ad0 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Sun, 27 Apr 2025 10:03:06 +0100 Subject: [PATCH 06/10] Use a condition for destroying resource pools This avoids the situation where the resource pool is destroyed, so there's no fiber to listen to the destroy request. --- knots/resource-pool.scm | 70 +++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 37 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index c9aab02..5c96eef 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -29,6 +29,7 @@ #:use-module (fibers channels) #:use-module (fibers scheduler) #:use-module (fibers operations) + #:use-module (fibers conditions) #:use-module (knots) #:use-module (knots parallelism) #:export (resource-pool? @@ -71,11 +72,12 @@ (record-predicate &resource-pool-abort-add-resource)) (define-record-type - (make-resource-pool-record name channel configuration) + (make-resource-pool-record name channel destroy-condition configuration) resource-pool? - (name resource-pool-name) - (channel resource-pool-channel) - (configuration resource-pool-configuration)) + (name resource-pool-name) + (channel resource-pool-channel) + (destroy-condition resource-pool-destroy-condition) + (configuration resource-pool-configuration)) (set-record-type-printer! @@ -98,11 +100,14 @@ default-checkout-timeout default-max-waiters) (define channel (make-channel)) + (define destroy-condition + (make-condition)) (define pool (make-resource-pool-record name channel + destroy-condition `((max-size . ,max-size) (min-size . ,min-size) (idle-seconds . ,idle-seconds) @@ -208,16 +213,14 @@ channel (list 'return-failed-checkout resource))))))) - (define (destroy-loop resources destroy-waiters) - (let loop ((resources resources) - (destroy-waiters destroy-waiters)) + (define (destroy-loop resources) + (let loop ((resources resources)) (match (get-message channel) (('add-resource resource) (when destructor (spawn-fiber-to-destroy-resource resource)) - (loop resources - destroy-waiters)) + (loop resources)) (('checkout reply timeout-time max-waiters) (spawn-fiber (lambda () @@ -237,8 +240,7 @@ internal-time-units-per-second)) (const #f))) op))))) - (loop resources - destroy-waiters)) + (loop resources)) (((and (or 'return 'return-failed-checkout 'remove) @@ -270,17 +272,11 @@ resources)))) (if (null? new-resources) (begin - (for-each - (lambda (destroy-waiter) - (spawn-fiber - (lambda () - (put-message destroy-waiter 'destroy-success)))) - destroy-waiters) + (signal-condition! destroy-condition) ;; No loop *unspecified*) - (loop new-resources - destroy-waiters))))) + (loop new-resources))))) (('stats reply) (let ((stats @@ -299,24 +295,20 @@ (wrap-operation (sleep-operation 5) (const #f))))))) - (loop resources - destroy-waiters)) + (loop resources)) (('check-for-idle-resources) - (loop resources - destroy-waiters)) + (loop resources)) (('destroy reply) - (loop resources - (cons reply destroy-waiters))) + (loop resources)) (unknown (simple-format (current-error-port) "unrecognised message to ~A resource pool channel: ~A\n" name unknown) - (loop resources - destroy-waiters))))) + (loop resources))))) (define (main-loop) (let loop ((resources '()) @@ -625,10 +617,11 @@ waiters resources-last-used)))) - (('destroy reply) + (('destroy) (if (and (null? resources) (null? waiters)) - (put-message reply 'destroy-success) + (signal-condition! + destroy-condition) (begin (for-each @@ -668,8 +661,7 @@ op)))))))) waiters)) - (destroy-loop resources - (list reply))))) + (destroy-loop resources)))) (unknown (simple-format @@ -724,12 +716,16 @@ pool) (define (destroy-resource-pool pool) - (let ((reply (make-channel))) - (put-message (resource-pool-channel pool) - (list 'destroy reply)) - (let ((msg (get-message reply))) - (unless (eq? msg 'destroy-success) - (error msg))))) + (perform-operation + (choice-operation + (wrap-operation + (put-operation (resource-pool-channel pool) + (list 'destroy)) + (lambda _ + (wait (resource-pool-destroy-condition pool)))) + (wait-operation + (resource-pool-destroy-condition pool)))) + #t) (define &resource-pool-timeout (make-exception-type '&recource-pool-timeout From 1dca6d755e910b67636b791242f0948356cf8c4d Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Sun, 27 Apr 2025 10:52:36 +0100 Subject: [PATCH 07/10] Allow specifying the resource-pool-channel --- knots/resource-pool.scm | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 5c96eef..ea87a79 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -786,7 +786,8 @@ (define* (call-with-resource-from-pool pool proc #:key (timeout 'default) (timeout-handler (resource-pool-default-timeout-handler)) - (max-waiters 'default)) + (max-waiters 'default) + (channel (resource-pool-channel pool))) "Call PROC with a resource from POOL, blocking until a resource becomes available. Return the resource once PROC has returned." @@ -810,7 +811,7 @@ available. Return the resource once PROC has returned." (perform-operation (choice-operation (wrap-operation - (put-operation (resource-pool-channel pool) + (put-operation channel (list 'checkout reply (+ start-time @@ -846,7 +847,7 @@ available. Return the resource once PROC has returned." response)) 'timeout))))) (let loop ((reply (make-channel))) - (put-message (resource-pool-channel pool) + (put-message channel (list 'checkout reply #f From 838ee6f1e368b28724c41f35d06ccf62a700c424 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 28 Apr 2025 09:20:33 +0100 Subject: [PATCH 08/10] Enable destroying individual resources in a resource pool --- knots/resource-pool.scm | 38 ++++++++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index ea87a79..bd69f95 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -53,6 +53,10 @@ resource-pool-destroyed-error-pool resource-pool-destroyed-error? + &resource-pool-destroy-resource + make-resource-pool-destroy-resource-exception + resource-pool-destroy-resource-exception? + resource-pool-default-timeout-handler call-with-resource-from-pool @@ -555,6 +559,14 @@ resources-last-used index)))) + (('destroy resource) + (spawn-fiber-to-destroy-resource resource) + + (loop resources + available + waiters + resources-last-used)) + (('stats reply) (let ((stats `((resources . ,(length resources)) @@ -780,6 +792,17 @@ (define resource-pool-destroyed-error? (record-predicate &resource-pool-destroyed)) +(define &resource-pool-destroy-resource + (make-exception-type '&recource-pool-destroy-resource + &exception + '())) + +(define make-resource-pool-destroy-resource-exception + (record-constructor &resource-pool-destroy-resource)) + +(define resource-pool-destroy-resource-exception? + (record-predicate &resource-pool-destroy-resource)) + (define resource-pool-default-timeout-handler (make-parameter #f)) @@ -787,7 +810,8 @@ pool proc #:key (timeout 'default) (timeout-handler (resource-pool-default-timeout-handler)) (max-waiters 'default) - (channel (resource-pool-channel pool))) + (channel (resource-pool-channel pool)) + (destroy-resource-on-exception? #f)) "Call PROC with a resource from POOL, blocking until a resource becomes available. Return the resource once PROC has returned." @@ -877,9 +901,15 @@ available. Return the resource once PROC has returned." ;; Unwind the stack before calling put-message, as ;; this avoids inconsistent behaviour with ;; continuation barriers - (put-message (resource-pool-channel pool) - `(return ,resource)) - (raise-exception exn)) + (put-message + (resource-pool-channel pool) + (list (if (or destroy-resource-on-exception? + (resource-pool-destroy-resource-exception? exn)) + 'destroy + 'return) + resource)) + (unless (resource-pool-destroy-resource-exception? exn) + (raise-exception exn))) (lambda () (with-exception-handler (lambda (exn) From 8c63ed7b4e9bae10e956672ac0d021739823c395 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 28 Apr 2025 10:08:00 +0100 Subject: [PATCH 09/10] Support listing resource pool resources --- knots/resource-pool.scm | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index bd69f95..76c11e6 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -567,6 +567,16 @@ waiters resources-last-used)) + (('list-resources reply) + (spawn-fiber + (lambda () + (put-message reply (list-copy resources)))) + + (loop resources + available + waiters + resources-last-used)) + (('stats reply) (let ((stats `((resources . ,(length resources)) @@ -965,3 +975,8 @@ available. Return the resource once PROC has returned." (raise-exception (make-resource-pool-timeout-error pool)))))) +(define (resource-pool-list-resources pool) + (let ((reply (make-channel))) + (put-message (resource-pool-channel pool) + (list 'list-resources reply)) + (get-message reply))) From 7ba77010ae98e675340a7ea22b400f0dcc20ef65 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Thu, 15 May 2025 09:25:30 +0100 Subject: [PATCH 10/10] Handle %stacks not being a pair Not sure when this would happen, but guard against it. --- knots.scm | 13 ++++++++----- knots/parallelism.scm | 42 +++++++++++++++++++++++------------------ knots/resource-pool.scm | 15 +++++++++------ knots/thread-pool.scm | 21 ++++++++++++--------- 4 files changed, 53 insertions(+), 38 deletions(-) diff --git a/knots.scm b/knots.scm index 8e31c6b..42f2af7 100644 --- a/knots.scm +++ b/knots.scm @@ -67,11 +67,14 @@ (define* (print-backtrace-and-exception/knots exn #:key (port (current-error-port))) - (let* ((stack (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1))))) + (let* ((stack + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))) + (_ + (make-stack #t)))) (error-string (call-with-output-string (lambda (port) diff --git a/knots/parallelism.scm b/knots/parallelism.scm index ab398f8..f8b2b8b 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -52,15 +52,18 @@ (lambda () (with-exception-handler (lambda (exn) - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (let ((stack (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1)))) - (raise-exception - (make-exception - exn - (make-knots-exception stack))))))) + (let ((stack + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))) + (_ + (make-stack #t))))) + (raise-exception + (make-exception + exn + (make-knots-exception stack))))) (lambda () (call-with-values (lambda () @@ -255,15 +258,18 @@ (lambda () (with-exception-handler (lambda (exn) - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (let ((stack (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1)))) - (raise-exception - (make-exception - exn - (make-knots-exception stack))))))) + (let ((stack + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))) + (_ + (make-stack #t))))) + (raise-exception + (make-exception + exn + (make-knots-exception stack))))) (lambda () (call-with-values (lambda () diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 76c11e6..da52051 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -923,15 +923,18 @@ available. Return the resource once PROC has returned." (lambda () (with-exception-handler (lambda (exn) - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (let ((stack (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1)))) + (let ((stack + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))) + (_ + (make-stack #t))))) (raise-exception (make-exception exn - (make-knots-exception stack))))))) + (make-knots-exception stack))))) (lambda () (proc resource)))) #:unwind? #t)) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index f2f174e..14a8125 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -251,15 +251,18 @@ arguments of the thread pool procedure." proc) (with-exception-handler (lambda (exn) - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (let ((stack (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1)))) - (raise-exception - (make-exception - exn - (make-knots-exception stack))))))) + (let ((stack + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))) + (_ + (make-stack #t))))) + (raise-exception + (make-exception + exn + (make-knots-exception stack))))) (lambda () (call-with-values (lambda ()