From 7c2c6f2de9e4ebeab8de78077cbb2a0b7c585e6b Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 24 Jun 2025 11:59:22 +0200 Subject: [PATCH 01/41] WIP --- .forgejo/workflows/demo.yaml | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 .forgejo/workflows/demo.yaml diff --git a/.forgejo/workflows/demo.yaml b/.forgejo/workflows/demo.yaml new file mode 100644 index 0000000..935846f --- /dev/null +++ b/.forgejo/workflows/demo.yaml @@ -0,0 +1,13 @@ +on: + push: + branches: + - actions-test +jobs: + test: + runs-on: host + steps: + - run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git knots-trunk + - run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git --branch=pages knots-pages + - run: | + cd knots-trunk + guix shell -D -f guix-dev.scm -- documenta api knots From 7f5f05ef2b1d62ba0bc0b1a37986c3d4bb2a5f99 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 24 Jun 2025 20:56:56 +0200 Subject: [PATCH 02/41] WIP --- .forgejo/workflows/demo.yaml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.forgejo/workflows/demo.yaml b/.forgejo/workflows/demo.yaml index 935846f..87ff5f8 100644 --- a/.forgejo/workflows/demo.yaml +++ b/.forgejo/workflows/demo.yaml @@ -11,3 +11,12 @@ jobs: - run: | cd knots-trunk guix shell -D -f guix-dev.scm -- documenta api knots + guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -o pages/index.html doc/index.texi + + - run: | + cd knots-pages + git add . + git config user.email <> + git config user.name "Automatic website updater" + git commit -m "Automatic website update" + git push From 81dd3370e67c159175ecad6214398f57da0b25e8 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 24 Jun 2025 20:59:57 +0200 Subject: [PATCH 03/41] WIP --- .forgejo/workflows/demo.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.forgejo/workflows/demo.yaml b/.forgejo/workflows/demo.yaml index 87ff5f8..34fea34 100644 --- a/.forgejo/workflows/demo.yaml +++ b/.forgejo/workflows/demo.yaml @@ -16,7 +16,7 @@ jobs: - run: | cd knots-pages git add . - git config user.email <> + git config user.email "" git config user.name "Automatic website updater" git commit -m "Automatic website update" git push From eadfa53b36cbd3d78daacfacf0499efef73f6624 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 24 Jun 2025 21:00:54 +0200 Subject: [PATCH 04/41] WIP --- .forgejo/workflows/demo.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.forgejo/workflows/demo.yaml b/.forgejo/workflows/demo.yaml index 34fea34..8fb0304 100644 --- a/.forgejo/workflows/demo.yaml +++ b/.forgejo/workflows/demo.yaml @@ -11,7 +11,7 @@ jobs: - run: | cd knots-trunk guix shell -D -f guix-dev.scm -- documenta api knots - guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -o pages/index.html doc/index.texi + guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -o ../knots-pages/index.html doc/index.texi - run: | cd knots-pages From 003c5aa6b0078a9fa97ca54537e3798d911a77df Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 24 Jun 2025 21:03:34 +0200 Subject: [PATCH 05/41] WIP --- .forgejo/workflows/demo.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.forgejo/workflows/demo.yaml b/.forgejo/workflows/demo.yaml index 8fb0304..c3f4156 100644 --- a/.forgejo/workflows/demo.yaml +++ b/.forgejo/workflows/demo.yaml @@ -11,7 +11,7 @@ jobs: - run: | cd knots-trunk guix shell -D -f guix-dev.scm -- documenta api knots - guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -o ../knots-pages/index.html doc/index.texi + guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -c SHOW_TITLE=true -o ../knots-pages/index.html doc/index.texi - run: | cd knots-pages From 2e25c3b074ff218498631108559cdcb014e289c4 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 24 Jun 2025 21:07:29 +0200 Subject: [PATCH 06/41] Add workflow for building the website --- .forgejo/workflows/build-website.yaml | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 .forgejo/workflows/build-website.yaml diff --git a/.forgejo/workflows/build-website.yaml b/.forgejo/workflows/build-website.yaml new file mode 100644 index 0000000..ac6261f --- /dev/null +++ b/.forgejo/workflows/build-website.yaml @@ -0,0 +1,22 @@ +on: + push: + branches: + - trunk +jobs: + test: + runs-on: host + steps: + - run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git knots-trunk + - run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git --branch=pages knots-pages + - run: | + cd knots-trunk + guix shell -D -f guix-dev.scm -- documenta api knots + guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -c SHOW_TITLE=true -o ../knots-pages/index.html doc/index.texi + + - run: | + cd knots-pages + git add . + git config user.email "" + git config user.name "Automatic website updater" + git commit -m "Automatic website update" + git push From edf62414eebb661729b6b6f564c6ce7a1be27d07 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 24 Jun 2025 21:14:52 +0200 Subject: [PATCH 07/41] Avoid workflow erroring if there's nothing to change --- .forgejo/workflows/build-website.yaml | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/.forgejo/workflows/build-website.yaml b/.forgejo/workflows/build-website.yaml index ac6261f..d859b76 100644 --- a/.forgejo/workflows/build-website.yaml +++ b/.forgejo/workflows/build-website.yaml @@ -16,7 +16,11 @@ jobs: - run: | cd knots-pages git add . - git config user.email "" - git config user.name "Automatic website updater" - git commit -m "Automatic website update" - git push + if [[ -z "$(git status -s)" ]]; then + echo "Nothing to push" + else + git config user.email "" + git config user.name "Automatic website updater" + git commit -m "Automatic website update" + git push + fi From ab5411da423043f2b8a0e27c7507f8d9c34686a2 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Wed, 25 Jun 2025 18:46:46 +0200 Subject: [PATCH 08/41] Make resource pool changes and add parallelism limiter This was motivated by trying to allow for completely cleaning up resource pools, which involved removing their use of fiberize which currently has no destroy mechanism. As part of this, there's a new parallelism limiter mechanism using resource pools rather than fibers, and also a fixed size resource pool. The tests now drain? and destroy the resource pools to check cleaning up. --- knots/parallelism.scm | 38 ++- knots/resource-pool.scm | 719 ++++++++++++++++++++++++++++++++-------- tests.scm | 6 +- tests/parallelism.scm | 12 + tests/resource-pool.scm | 47 ++- 5 files changed, 669 insertions(+), 153 deletions(-) diff --git a/knots/parallelism.scm b/knots/parallelism.scm index f8b2b8b..9e80f5b 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -20,6 +20,8 @@ (define-module (knots parallelism) #:use-module (srfi srfi-1) #:use-module (srfi srfi-71) + #:use-module (srfi srfi-9) + #:use-module (srfi srfi-9 gnu) #:use-module (ice-9 match) #:use-module (ice-9 control) #:use-module (ice-9 exceptions) @@ -27,6 +29,7 @@ #:use-module (fibers channels) #:use-module (fibers operations) #:use-module (knots) + #:use-module (knots resource-pool) #:export (fibers-batch-map fibers-map @@ -38,7 +41,13 @@ fibers-parallel fibers-let - fiberize)) + fiberize + + make-parallelism-limiter + parallelism-limiter? + destroy-parallelism-limiter + call-with-parallelism-limiter + with-parallelism-limiter)) (define (defer-to-parallel-fiber thunk) (let ((reply (make-channel))) @@ -287,3 +296,30 @@ (('result . vals) (apply values vals)) (('exception exn) (raise-exception exn)))))) + +(define-record-type + (make-parallelism-limiter-record resource-pool) + parallelism-limiter? + (resource-pool parallelism-limiter-resource-pool)) + +(define* (make-parallelism-limiter limit #:key (name "unnamed")) + (make-parallelism-limiter-record + (make-fixed-size-resource-pool + (iota limit) + #:name name))) + +(define (destroy-parallelism-limiter parallelism-limiter) + (destroy-resource-pool + (parallelism-limiter-resource-pool + parallelism-limiter))) + +(define* (call-with-parallelism-limiter parallelism-limiter thunk) + (call-with-resource-from-pool + (parallelism-limiter-resource-pool parallelism-limiter) + (lambda _ + (thunk)))) + +(define-syntax-rule (with-parallelism-limiter parallelism-limiter exp ...) + (call-with-parallelism-limiter + parallelism-limiter + (lambda () exp ...))) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index da52051..71c378c 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -32,9 +32,10 @@ #:use-module (fibers conditions) #:use-module (knots) #:use-module (knots parallelism) - #:export (resource-pool? - + #:export (make-fixed-size-resource-pool make-resource-pool + + resource-pool? resource-pool-name resource-pool-channel resource-pool-configuration @@ -91,6 +92,429 @@ (resource-pool-name resource-pool)) port))) +(define (remove-at-index! lst i) + (let ((start + end + (split-at! lst i))) + (append + start + (cdr end)))) + +(define* (make-fixed-size-resource-pool resources + #:key + (delay-logger (const #f)) + (duration-logger (const #f)) + destructor + scheduler + (name "unnamed") + default-checkout-timeout + default-max-waiters) + (define channel (make-channel)) + (define destroy-condition + (make-condition)) + + (define pool + (make-resource-pool-record + name + channel + destroy-condition + `((delay-logger . ,delay-logger) + (duration-logger . ,duration-logger) + (destructor . ,destructor) + (scheduler . ,scheduler) + (name . ,name) + (default-checkout-timeout . ,default-checkout-timeout) + (default-max-waiters . ,default-max-waiters)))) + + (define checkout-failure-count 0) + + (define (spawn-fiber-to-destroy-resource resource) + (spawn-fiber + (lambda () + (let loop () + (let ((success? + (with-exception-handler + (lambda _ #f) + (lambda () + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running resource pool destructor (~A): ~A\n" + name + destructor) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (start-stack #t (destructor resource)) + #t))) + #:unwind? #t))) + + (if success? + (put-message channel + (list 'remove resource)) + (begin + (sleep 5) + + (loop)))))))) + + (define (spawn-fiber-for-checkout reply-channel + reply-timeout + resource) + (spawn-fiber + (lambda () + (let ((checkout-success? + (perform-operation + (choice-operation + (wrap-operation + (put-operation reply-channel + (cons 'success resource)) + (const #t)) + (wrap-operation (sleep-operation + reply-timeout) + (const #f)))))) + (unless checkout-success? + (put-message + channel + (list 'return-failed-checkout resource))))))) + + (define (destroy-loop resources) + (let loop ((resources resources)) + (match (get-message channel) + (('checkout reply timeout-time max-waiters) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout-time + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op))))) + (loop resources)) + (((and (or 'return + 'return-failed-checkout + 'remove) + return-type) + resource) + (when (and (not (eq? return-type 'remove)) + destructor) + (spawn-fiber-to-destroy-resource resource)) + + (let ((index + (list-index (lambda (x) + (eq? x resource)) + resources))) + (let ((new-resources + (if index + (remove-at-index! resources index) + (begin + (simple-format + (current-error-port) + "resource pool error: unable to remove ~A\n" + resource) + resources)))) + (if (null? new-resources) + (begin + (signal-condition! destroy-condition) + + ;; No loop + *unspecified*) + (loop new-resources))))) + + (('stats reply timeout-time) + (let ((stats + `((resources . ,(length resources)) + (available . 0) + (waiters . 0) + (checkout-failure-count . ,checkout-failure-count)))) + + (spawn-fiber + (lambda () + (let ((op + (put-operation reply stats))) + (perform-operation + (if timeout-time + (choice-operation + op + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second))) + op)))))) + + (loop resources)) + + (('destroy reply) + (loop resources)) + (unknown + (simple-format + (current-error-port) + "unrecognised message to ~A resource pool channel: ~A\n" + name + unknown) + (loop resources))))) + + (define (main-loop) + (let loop ((resources resources) + (available resources) + (waiters '())) + + (match (get-message channel) + (('checkout reply timeout-time max-waiters) + (if (null? available) + (let ((waiters-count + (length waiters))) + (if (and max-waiters + (>= waiters-count + max-waiters)) + (begin + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'too-many-waiters + waiters-count)))) + (perform-operation + (if timeout-time + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op))))) + (loop resources + available + waiters)) + (loop resources + available + (cons (cons reply timeout-time) + waiters)))) + + (if timeout-time + (let ((current-internal-time + (get-internal-real-time))) + ;; If this client is still waiting + (if (> timeout-time + current-internal-time) + (let ((reply-timeout + (/ (- timeout-time + current-internal-time) + internal-time-units-per-second))) + + ;; Don't sleep in this fiber, so spawn a new + ;; fiber to handle handing over the resource, + ;; and returning it if there's a timeout + (spawn-fiber-for-checkout reply + reply-timeout + (car available)) + (loop resources + (cdr available) + waiters)) + (loop resources + available + waiters))) + (begin + (put-message reply (cons 'success + (car available))) + + (loop resources + (cdr available) + waiters))))) + + (((and (or 'return + 'return-failed-checkout) + return-type) + resource) + + (when (eq? 'return-failed-checkout + return-type) + (set! checkout-failure-count + (+ 1 checkout-failure-count))) + + (if (null? waiters) + (loop resources + (cons resource available) + waiters) + + (let* ((current-internal-time (get-internal-real-time)) + (alive-waiters + dead-waiters + (partition! + (match-lambda + ((reply . timeout) + (or (not timeout) + (> timeout current-internal-time)))) + waiters))) + (if (null? alive-waiters) + (loop resources + (cons resource available) + '()) + (match (last alive-waiters) + ((waiter-channel . waiter-timeout) + (if waiter-timeout + (let ((reply-timeout + (/ (- waiter-timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's a + ;; timeout + (spawn-fiber-for-checkout waiter-channel + reply-timeout + resource)) + (put-message waiter-channel (cons 'success + resource))) + + (loop resources + available + (drop-right! alive-waiters 1)))))))) + + (('list-resources reply) + (spawn-fiber + (lambda () + (put-message reply (list-copy resources)))) + + (loop resources + available + waiters)) + + (('stats reply timeout-time) + (let ((stats + `((resources . ,(length resources)) + (available . ,(length available)) + (waiters . ,(length waiters)) + (checkout-failure-count . ,checkout-failure-count)))) + + (spawn-fiber + (lambda () + (let ((op + (put-operation reply stats))) + (perform-operation + (if timeout-time + (choice-operation + op + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second))) + op)))))) + + (loop resources + available + waiters)) + + (('destroy) + (if (and (null? resources) + (null? waiters)) + (signal-condition! + destroy-condition) + + (let ((current-internal-time (get-internal-real-time))) + (for-each + (match-lambda + ((reply . timeout) + (when (or (not timeout) + (> timeout current-internal-time)) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op)))))))) + waiters) + + (if destructor + (begin + (for-each + (lambda (resource) + (spawn-fiber-to-destroy-resource resource)) + available) + (destroy-loop resources)) + (let dl ((resources resources) + (available available)) + (if (null? available) + (if (null? resources) + (signal-condition! + destroy-condition) + (destroy-loop resources)) + (let ((index + (list-index (lambda (x) + (eq? x (car available))) + resources))) + (dl (remove-at-index! resources index) + (cdr available))))))))) + + (unknown + (simple-format + (current-error-port) + "unrecognised message to ~A resource pool channel: ~A\n" + name + unknown) + (loop resources + available + waiters))))) + + (spawn-fiber + (lambda () + (with-exception-handler + (lambda (exn) + #f) + (lambda () + (with-exception-handler + (lambda (exn) + (let* ((stack (make-stack #t)) + (error-string + (call-with-output-string + (lambda (port) + (display-backtrace stack port 3) + (simple-format + port + "exception in the ~A pool fiber, " name) + (print-exception + port + (stack-ref stack 3) + '%exception + (list exn)))))) + (display error-string + (current-error-port))) + (raise-exception exn)) + (lambda () + (start-stack + #t + (main-loop))))) + #:unwind? #t)) + (or scheduler + (current-scheduler))) + + pool) + (define* (make-resource-pool return-new-resource max-size #:key (min-size 0) (idle-seconds #f) @@ -126,46 +550,52 @@ (define checkout-failure-count 0) - (define spawn-fiber-to-return-new-resource - (if add-resources-parallelism - (let ((thunk - (fiberize - (lambda () - (let ((max-size - (assq-ref (resource-pool-configuration pool) - 'max-size)) - (size (assq-ref (resource-pool-stats pool) - 'resources))) - (unless (= size max-size) - (let ((new-resource - (return-new-resource))) - (put-message channel - (list 'add-resource new-resource)))))) - #:parallelism add-resources-parallelism))) - (lambda () - (spawn-fiber thunk))) - (lambda () - (spawn-fiber - (lambda () - (let ((new-resource + (define return-new-resource/parallelism-limiter + (make-parallelism-limiter + (or add-resources-parallelism + max-size) + #:name + (string-append + name + " resource pool new resource parallelism limiter"))) + + (define (spawn-fiber-to-return-new-resource) + (spawn-fiber + (lambda () + (with-exception-handler + (lambda (exn) + ;; This can happen if the resource pool is destroyed very + ;; quickly + (unless (resource-pool-destroyed-error? exn) + (raise-exception exn))) + (lambda () + (with-parallelism-limiter + return-new-resource/parallelism-limiter + (let ((max-size + (assq-ref (resource-pool-configuration pool) + 'max-size)) + (size (assq-ref (resource-pool-stats pool #:timeout #f) + 'resources))) + (unless (= size max-size) + (with-exception-handler + (lambda _ #f) + (lambda () (with-exception-handler - (lambda _ #f) + (lambda (exn) + (simple-format + (current-error-port) + "exception adding resource to pool ~A: ~A\n\n" + name + return-new-resource) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) (lambda () - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception adding resource to pool ~A: ~A\n\n" - name - return-new-resource) - (print-backtrace-and-exception/knots exn) - (raise-exception exn)) - (lambda () - (start-stack #t (return-new-resource))))) - #:unwind? #t))) - (when new-resource - (put-message channel - (list 'add-resource new-resource))))))))) + (let ((new-resource + (start-stack #t (return-new-resource)))) + (put-message channel + (list 'add-resource new-resource)))))) + #:unwind? #t))))) + #:unwind? #t)))) (define (spawn-fiber-to-destroy-resource resource) (spawn-fiber @@ -250,21 +680,14 @@ 'remove) return-type) resource) - (when destructor + (when (and (not (eq? return-type 'remove)) + destructor) (spawn-fiber-to-destroy-resource resource)) (let ((index (list-index (lambda (x) (eq? x resource)) resources))) - (define (remove-at-index! lst i) - (let ((start - end - (split-at! lst i))) - (append - start - (cdr end)))) - (let ((new-resources (if index (remove-at-index! resources index) @@ -276,13 +699,16 @@ resources)))) (if (null? new-resources) (begin + (and=> return-new-resource/parallelism-limiter + destroy-parallelism-limiter) + (signal-condition! destroy-condition) ;; No loop *unspecified*) (loop new-resources))))) - (('stats reply) + (('stats reply timeout-time) (let ((stats `((resources . ,(length resources)) (available . 0) @@ -291,13 +717,17 @@ (spawn-fiber (lambda () - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply stats) - (const #t)) - (wrap-operation (sleep-operation 5) - (const #f))))))) + (let ((op + (put-operation reply stats))) + (perform-operation + (if timeout-time + (choice-operation + op + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second))) + op)))))) (loop resources)) @@ -537,14 +967,6 @@ (list-index (lambda (x) (eq? x resource)) resources))) - (define (remove-at-index! lst i) - (let ((start - end - (split-at! lst i))) - (append - start - (cdr end)))) - (loop (if index (remove-at-index! resources index) (begin @@ -577,7 +999,7 @@ waiters resources-last-used)) - (('stats reply) + (('stats reply timeout-time) (let ((stats `((resources . ,(length resources)) (available . ,(length available)) @@ -586,13 +1008,17 @@ (spawn-fiber (lambda () - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply stats) - (const #t)) - (wrap-operation (sleep-operation 5) - (const #f))))))) + (let ((op + (put-operation reply stats))) + (perform-operation + (if timeout-time + (choice-operation + op + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second))) + op)))))) (loop resources available @@ -645,46 +1071,52 @@ (signal-condition! destroy-condition) - (begin + (let ((current-internal-time (get-internal-real-time))) (for-each - (lambda (resource) - (if destructor - (spawn-fiber-to-destroy-resource resource) - (spawn-fiber - (lambda () - (put-message channel - (list 'remove resource))) - #:parallel? #t))) - available) - - (let ((current-internal-time (get-internal-real-time))) - (for-each - (match-lambda - ((reply . timeout) - (when (or (not timeout) - (> timeout current-internal-time)) - (spawn-fiber - (lambda () - (let ((op - (put-operation - reply - (cons 'resource-pool-destroyed - #f)))) - (perform-operation - (if timeout - (choice-operation - op - (wrap-operation - (sleep-operation - (/ (- timeout - (get-internal-real-time)) - internal-time-units-per-second)) - (const #f))) - op)))))))) - waiters)) - - (destroy-loop resources)))) + (match-lambda + ((reply . timeout) + (when (or (not timeout) + (> timeout current-internal-time)) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op)))))))) + waiters) + (if destructor + (begin + (for-each + (lambda (resource) + (spawn-fiber-to-destroy-resource resource)) + available) + (destroy-loop resources)) + (let dl ((resources resources) + (available available)) + (if (null? available) + (if (null? resources) + (signal-condition! + destroy-condition) + (destroy-loop resources)) + (let ((index + (list-index (lambda (x) + (eq? x (car available))) + resources))) + (dl (remove-at-index! resources index) + (cdr available))))))))) (unknown (simple-format (current-error-port) @@ -744,7 +1176,8 @@ (put-operation (resource-pool-channel pool) (list 'destroy)) (lambda _ - (wait (resource-pool-destroy-condition pool)))) + (wait + (resource-pool-destroy-condition pool)))) (wait-operation (resource-pool-destroy-condition pool)))) #t) @@ -949,34 +1382,42 @@ available. Return the resource once PROC has returned." (lambda (resource) exp ...))) (define* (resource-pool-stats pool #:key (timeout 5)) - (let ((reply (make-channel)) - (start-time (get-internal-real-time))) - (perform-operation - (choice-operation - (wrap-operation - (put-operation (resource-pool-channel pool) - `(stats ,reply)) - (const #t)) - (wrap-operation (sleep-operation timeout) - (lambda _ - (raise-exception - (make-resource-pool-timeout-error pool)))))) + (if timeout + (let* ((reply (make-channel)) + (start-time (get-internal-real-time)) + (timeout-time + (+ start-time + (* internal-time-units-per-second timeout)))) + (perform-operation + (choice-operation + (wrap-operation + (put-operation (resource-pool-channel pool) + `(stats ,reply ,timeout-time)) + (const #t)) + (wrap-operation (sleep-operation timeout) + (lambda _ + (raise-exception + (make-resource-pool-timeout-error pool)))))) - (let ((time-remaining - (- timeout - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second)))) - (if (> time-remaining 0) - (perform-operation - (choice-operation - (get-operation reply) - (wrap-operation (sleep-operation time-remaining) - (lambda _ - (raise-exception - (make-resource-pool-timeout-error pool)))))) - (raise-exception - (make-resource-pool-timeout-error pool)))))) + (let ((time-remaining + (- timeout + (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second)))) + (if (> time-remaining 0) + (perform-operation + (choice-operation + (get-operation reply) + (wrap-operation (sleep-operation time-remaining) + (lambda _ + (raise-exception + (make-resource-pool-timeout-error pool)))))) + (raise-exception + (make-resource-pool-timeout-error pool))))) + (let ((reply (make-channel))) + (put-message (resource-pool-channel pool) + `(stats ,reply #f)) + (get-message reply)))) (define (resource-pool-list-resources pool) (let ((reply (make-channel))) diff --git a/tests.scm b/tests.scm index 2b24c6a..a58eff0 100644 --- a/tests.scm +++ b/tests.scm @@ -4,7 +4,7 @@ #:export (run-fibers-for-tests assert-no-heap-growth)) -(define (run-fibers-for-tests thunk) +(define* (run-fibers-for-tests thunk #:key (drain? #t)) (let ((result (run-fibers (lambda () @@ -12,6 +12,7 @@ (lambda (exn) exn) (lambda () + (simple-format #t "running ~A\n" thunk) (with-exception-handler (lambda (exn) (backtrace) @@ -20,7 +21,8 @@ #t) #:unwind? #t)) #:hz 0 - #:parallelism 1))) + #:parallelism 1 + #:drain? drain?))) (if (exception? result) (raise-exception result) result))) diff --git a/tests/parallelism.scm b/tests/parallelism.scm index 9881a4d..03ec376 100644 --- a/tests/parallelism.scm +++ b/tests/parallelism.scm @@ -111,4 +111,16 @@ (assert-equal a 1)))) +(run-fibers-for-tests + (lambda () + (let ((parallelism-limiter (make-parallelism-limiter 2))) + (fibers-for-each + (lambda _ + (with-parallelism-limiter + parallelism-limiter + #f)) + (iota 50)) + + (destroy-parallelism-limiter parallelism-limiter)))) + (display "parallelism test finished successfully\n") diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm index 1bc09e5..461d04b 100644 --- a/tests/resource-pool.scm +++ b/tests/resource-pool.scm @@ -19,7 +19,21 @@ (number? (with-resource-from-pool resource-pool res - res)))))) + res))) + + (destroy-resource-pool resource-pool)))) + +(run-fibers-for-tests + (lambda () + (let ((resource-pool (make-fixed-size-resource-pool + (list 1)))) + (assert-true + (number? + (with-resource-from-pool resource-pool + res + res))) + + (destroy-resource-pool resource-pool)))) (run-fibers-for-tests (lambda () @@ -31,7 +45,9 @@ (number? (with-resource-from-pool resource-pool res - res)))))) + res))) + + (destroy-resource-pool resource-pool)))) (let* ((error-constructor (record-constructor &resource-pool-timeout)) @@ -88,10 +104,13 @@ res)) (iota 20)) - (let loop ((stats (resource-pool-stats resource-pool))) + (let loop ((stats (resource-pool-stats resource-pool + #:timeout #f))) (unless (= 0 (assq-ref stats 'resources)) (sleep 0.1) - (loop (resource-pool-stats resource-pool))))))) + (loop (resource-pool-stats resource-pool #:timeout #f)))) + + (destroy-resource-pool resource-pool)))) (run-fibers-for-tests (lambda () @@ -115,7 +134,9 @@ (set! counter (+ 1 counter)) (error "collision detected"))))) 20 - (iota 50))))) + (iota 50)) + + (destroy-resource-pool resource-pool)))) (run-fibers-for-tests (lambda () @@ -129,7 +150,7 @@ (error "collision detected"))) (new-number)) 1 - #:default-checkout-timeout 120))) + #:default-checkout-timeout 5))) (fibers-batch-for-each (lambda _ (with-resource-from-pool @@ -140,7 +161,9 @@ (set! counter (+ 1 counter)) (error "collision detected"))))) 20 - (iota 50))))) + (iota 50)) + + (destroy-resource-pool resource-pool)))) (run-fibers-for-tests (lambda () @@ -164,14 +187,14 @@ (call-with-resource-from-pool resource-pool (lambda (res) - (error 'should-not-be-reached)))) + #f))) #:unwind? #t))) (while (= 0 (assq-ref - (resource-pool-stats resource-pool) + (resource-pool-stats resource-pool #:timeout #f) 'waiters)) - (sleep 0)) + (sleep 0.1)) (with-exception-handler (lambda (exn) @@ -184,6 +207,8 @@ resource-pool (lambda (res) (error 'should-not-be-reached)))) - #:unwind? #t)))))) + #:unwind? #t))) + + (destroy-resource-pool resource-pool)))) (display "resource-pool test finished successfully\n") From 09ca6cfb6bb24b94684f67daee5b1a8eae4aa3cb Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Thu, 26 Jun 2025 21:27:32 +0200 Subject: [PATCH 09/41] Fix resource-pool-destroy-resource-exception Raising the exception is more consistent, and avoids returning the resource. --- knots/resource-pool.scm | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 71c378c..e55bdac 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -1351,8 +1351,7 @@ available. Return the resource once PROC has returned." 'destroy 'return) resource)) - (unless (resource-pool-destroy-resource-exception? exn) - (raise-exception exn))) + (raise-exception exn)) (lambda () (with-exception-handler (lambda (exn) From 8f3e0a9a1d8572f4e96651ec0ded21717644c6bb Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Thu, 26 Jun 2025 22:53:15 +0200 Subject: [PATCH 10/41] Fix exception handling in fibers-map-with-progress --- knots/parallelism.scm | 8 ++++---- tests/parallelism.scm | 18 ++++++++++++++++++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/knots/parallelism.scm b/knots/parallelism.scm index 9e80f5b..a3b04c7 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -217,9 +217,9 @@ (if (null? active-channels) (map (match-lambda - ((#f . ('exception . exn)) + ((#f . ('exception exn)) (raise-exception exn)) - ((#f . ('result . val)) + ((#f . ('result val)) val)) channels-to-results) (loop @@ -239,10 +239,10 @@ (if (eq? channel c) (cons #f (match result - (('exception . exn) + (('exception exn) result) (_ - (cons 'result result)))) + (list 'result result)))) (cons c r)))) channels-to-results))) #f)))) diff --git a/tests/parallelism.scm b/tests/parallelism.scm index 03ec376..91b2f3d 100644 --- a/tests/parallelism.scm +++ b/tests/parallelism.scm @@ -61,6 +61,24 @@ identity '(())))) +(run-fibers-for-tests + (lambda () + (with-exception-handler + (lambda (exn) + (unless (and (exception-with-message? exn) + (string=? (exception-message exn) + "foo")) + (raise-exception exn))) + (lambda () + (fibers-map-with-progress + (lambda _ + (raise-exception + (make-exception-with-message "foo"))) + '((1))) + + (error 'should-not-reach-here)) + #:unwind? #t))) + (run-fibers-for-tests (lambda () (with-exception-handler From 163d775496f8413fe5c1edb5ed22289df9d4fd01 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Fri, 27 Jun 2025 00:16:18 +0200 Subject: [PATCH 11/41] Fix record-predicate that should be exception-predicate --- knots/resource-pool.scm | 10 +++++----- knots/thread-pool.scm | 2 +- knots/timeout.scm | 6 +++--- knots/web-server.scm | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index e55bdac..7da76b0 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -74,7 +74,7 @@ (record-constructor &resource-pool-abort-add-resource)) (define resource-pool-abort-add-resource-error? - (record-predicate &resource-pool-abort-add-resource)) + (exception-predicate &resource-pool-abort-add-resource)) (define-record-type (make-resource-pool-record name channel destroy-condition configuration) @@ -1196,7 +1196,7 @@ (record-constructor &resource-pool-timeout)) (define resource-pool-timeout-error? - (record-predicate &resource-pool-timeout)) + (exception-predicate &resource-pool-timeout)) (define &resource-pool-too-many-waiters (make-exception-type '&recource-pool-too-many-waiters @@ -1217,7 +1217,7 @@ (record-constructor &resource-pool-too-many-waiters)) (define resource-pool-too-many-waiters-error? - (record-predicate &resource-pool-too-many-waiters)) + (exception-predicate &resource-pool-too-many-waiters)) (define &resource-pool-destroyed (make-exception-type '&recource-pool-destroyed @@ -1233,7 +1233,7 @@ (record-constructor &resource-pool-destroyed)) (define resource-pool-destroyed-error? - (record-predicate &resource-pool-destroyed)) + (exception-predicate &resource-pool-destroyed)) (define &resource-pool-destroy-resource (make-exception-type '&recource-pool-destroy-resource @@ -1244,7 +1244,7 @@ (record-constructor &resource-pool-destroy-resource)) (define resource-pool-destroy-resource-exception? - (record-predicate &resource-pool-destroy-resource)) + (exception-predicate &resource-pool-destroy-resource)) (define resource-pool-default-timeout-handler (make-parameter #f)) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index b176162..70d7292 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -198,7 +198,7 @@ from there, or #f if that would be an empty string." (record-accessor &thread-pool-timeout-error 'pool))) (define thread-pool-timeout-error? - (record-predicate &thread-pool-timeout-error)) + (exception-predicate &thread-pool-timeout-error)) (define* (make-fixed-size-thread-pool size #:key diff --git a/knots/timeout.scm b/knots/timeout.scm index 58306e0..a65a095 100644 --- a/knots/timeout.scm +++ b/knots/timeout.scm @@ -85,7 +85,7 @@ (record-constructor &port-timeout-error)) (define port-timeout-error? - (record-predicate &port-timeout-error)) + (exception-predicate &port-timeout-error)) (define &port-read-timeout-error (make-exception-type '&port-read-timeout-error @@ -96,7 +96,7 @@ (record-constructor &port-read-timeout-error)) (define port-read-timeout-error? - (record-predicate &port-read-timeout-error)) + (exception-predicate &port-read-timeout-error)) (define &port-write-timeout-error (make-exception-type '&port-write-timeout-error @@ -107,7 +107,7 @@ (record-constructor &port-write-timeout-error)) (define port-write-timeout-error? - (record-predicate &port-write-timeout-error)) + (exception-predicate &port-write-timeout-error)) (define (readable? port) "Test if PORT is writable." diff --git a/knots/web-server.scm b/knots/web-server.scm index 453db44..a0a3641 100644 --- a/knots/web-server.scm +++ b/knots/web-server.scm @@ -130,7 +130,7 @@ closes PORT, unless KEEP-ALIVE? is true." (record-constructor &request-body-ended-prematurely)) (define request-body-ended-prematurely-error? - (record-predicate &request-body-ended-prematurely)) + (exception-predicate &request-body-ended-prematurely)) (define (request-body-port/knots r) (cond From d8f64399cd572fb1c6b19303ee710ce6529e77b4 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Fri, 27 Jun 2025 00:16:37 +0200 Subject: [PATCH 12/41] Tweak spacing --- knots/parallelism.scm | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/knots/parallelism.scm b/knots/parallelism.scm index a3b04c7..c98ca3f 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -217,9 +217,9 @@ (if (null? active-channels) (map (match-lambda - ((#f . ('exception exn)) + ((#f . ('exception exn)) (raise-exception exn)) - ((#f . ('result val)) + ((#f . ('result val)) val)) channels-to-results) (loop From 6f6d57b189a7073718407df263bbe3c1245f2e51 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Fri, 27 Jun 2025 00:16:41 +0200 Subject: [PATCH 13/41] Use the knots backtrace printer for tests --- tests.scm | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests.scm b/tests.scm index a58eff0..0cca3b4 100644 --- a/tests.scm +++ b/tests.scm @@ -1,6 +1,7 @@ (define-module (tests) #:use-module (ice-9 exceptions) #:use-module (fibers) + #:use-module (knots) #:export (run-fibers-for-tests assert-no-heap-growth)) @@ -15,9 +16,10 @@ (simple-format #t "running ~A\n" thunk) (with-exception-handler (lambda (exn) - (backtrace) + (print-backtrace-and-exception/knots exn) (raise-exception exn)) - thunk) + (lambda () + (start-stack #t (thunk)))) #t) #:unwind? #t)) #:hz 0 From 4140ef0bd67dfed419203713df497f94b0ac2e45 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Fri, 27 Jun 2025 22:43:25 +0200 Subject: [PATCH 14/41] More consistently handle results and exceptions In the parallelism module. --- knots/parallelism.scm | 62 +++++++++++++++++++++---------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/knots/parallelism.scm b/knots/parallelism.scm index c98ca3f..f15dbe8 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -22,6 +22,7 @@ #:use-module (srfi srfi-71) #:use-module (srfi srfi-9) #:use-module (srfi srfi-9 gnu) + #:use-module (srfi srfi-43) #:use-module (ice-9 match) #:use-module (ice-9 control) #:use-module (ice-9 exceptions) @@ -57,7 +58,7 @@ (lambda (exn) (put-message reply - (list 'exception exn))) + (cons 'exception exn))) (lambda () (with-exception-handler (lambda (exn) @@ -78,7 +79,7 @@ (lambda () (start-stack #t (thunk))) (lambda vals - (put-message reply vals)))))) + (put-message reply (cons 'result vals))))))) #:unwind? #t)) #:parallel? #t) reply)) @@ -88,10 +89,10 @@ reply-channels))) (map (match-lambda - (('exception exn) + (('exception . exn) (raise-exception exn)) - (result - (apply values result))) + (('result . vals) + (apply values vals))) responses))) (define (fibers-batch-map proc parallelism-limit . lists) @@ -114,9 +115,18 @@ (channel-indexes '())) (if (and (eq? #f next-to-process-index) (null? channel-indexes)) - (if (vector? (first lists)) - result-vec - (vector->list result-vec)) + (let ((processed-result-vec + (vector-map + (lambda (_ result-or-exn) + (match result-or-exn + (('exception . exn) + (raise-exception exn)) + (('result . vals) + (car vals)))) + result-vec))) + (if (vector? (first lists)) + processed-result-vec + (vector->list processed-result-vec))) (if (or (= (length channel-indexes) (min parallelism-limit vecs-length)) @@ -132,18 +142,13 @@ (get-operation (vector-ref result-vec index)) (lambda (result) - (match result - (('exception exn) - (raise-exception exn)) - (_ - (vector-set! result-vec - index - (first result)) - - (values next-to-process-index - (lset-difference = - channel-indexes - (list index)))))))) + (vector-set! result-vec + index + result) + (values next-to-process-index + (lset-difference = + channel-indexes + (list index)))))) channel-indexes))))) (loop new-index new-channel-indexes)) @@ -217,10 +222,10 @@ (if (null? active-channels) (map (match-lambda - ((#f . ('exception exn)) + ((#f . ('exception . exn)) (raise-exception exn)) - ((#f . ('result val)) - val)) + ((#f . ('result . vals)) + (car vals))) channels-to-results) (loop (perform-operation @@ -237,12 +242,7 @@ (map (match-lambda ((c . r) (if (eq? channel c) - (cons #f - (match result - (('exception exn) - result) - (_ - (list 'result result)))) + (cons #f result) (cons c r)))) channels-to-results))) #f)))) @@ -263,7 +263,7 @@ reply-channel (with-exception-handler (lambda (exn) - (list 'exception exn)) + (cons 'exception exn)) (lambda () (with-exception-handler (lambda (exn) @@ -294,7 +294,7 @@ (put-message input-channel (cons reply-channel args)) (match (get-message reply-channel) (('result . vals) (apply values vals)) - (('exception exn) + (('exception . exn) (raise-exception exn)))))) (define-record-type From 0fa6737a39f866bdbffc11fd16348c5411c11a7c Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Fri, 27 Jun 2025 23:28:47 +0200 Subject: [PATCH 15/41] Document some things --- knots/parallelism.scm | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/knots/parallelism.scm b/knots/parallelism.scm index f15dbe8..7631055 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -96,6 +96,9 @@ responses))) (define (fibers-batch-map proc parallelism-limit . lists) + "Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT. If any of +the invocations of PROC raise an exception, this will be raised once +all of the calls to PROC have finished." (define vecs (map (lambda (list-or-vec) (if (vector? list-or-vec) list-or-vec @@ -171,9 +174,14 @@ channel-indexes))))))) (define (fibers-map proc . lists) + "Map PROC over LISTS in parallel, running up to 20 fibers in + PARALLEL. If any of the invocations of PROC raise an exception, this +will be raised once all of the calls to PROC have finished." (apply fibers-batch-map proc 20 lists)) (define (fibers-batch-for-each proc parallelism-limit . lists) + "Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in +parallel." (apply fibers-batch-map (lambda args (apply proc args) @@ -184,10 +192,13 @@ *unspecified*) (define (fibers-for-each proc . lists) + "Call PROC on LISTS, running up to 20 fibers in parallel." (apply fibers-batch-for-each proc 20 lists)) (define-syntax fibers-parallel (lambda (x) + "Run each expression in parallel. If any expression raises an + exception, this will be raised after all exceptions have finished." (syntax-case x () ((_ e0 ...) (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) @@ -198,12 +209,16 @@ (apply values (fetch-result-of-defered-thunks tmp0 ...)))))))) (define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...) + "Let, but run each binding in a fiber in parallel." (call-with-values (lambda () (fibers-parallel e ...)) (lambda (v ...) b0 b1 ...))) (define* (fibers-map-with-progress proc lists #:key report) + "Map PROC over LISTS, calling #:REPORT if specified after each +invocation of PROC finishes. REPORT is passed the results for each + element of LISTS, or #f if no result has been received yet." (let loop ((channels-to-results (apply map (lambda args From deae518b528485638bef89b4230616024f5009d7 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Sun, 29 Jun 2025 08:35:28 +0200 Subject: [PATCH 16/41] Use the buffer size for chunked output ports --- knots/web-server.scm | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/knots/web-server.scm b/knots/web-server.scm index a0a3641..fec349e 100644 --- a/knots/web-server.scm +++ b/knots/web-server.scm @@ -333,7 +333,8 @@ on the procedure being called at any particular time." (define (handle-request handler client read-request-exception-handler - write-response-exception-handler) + write-response-exception-handler + buffer-size) (let ((request (with-exception-handler read-request-exception-handler @@ -399,7 +400,8 @@ on the procedure being called at any particular time." client (make-chunked-output-port/knots client - #:keep-alive? #t)))) + #:keep-alive? #t + #:buffering buffer-size)))) (set-port-encoding! body-port charset) (let ((body-written? (with-exception-handler @@ -472,7 +474,8 @@ on the procedure being called at any particular time." (else (let ((keep-alive? (handle-request handler client read-request-exception-handler - write-response-exception-handler))) + write-response-exception-handler + buffer-size))) (if keep-alive? (loop) (close-port client))))))) From 7709ffe1d33ac9da62189f725a64d59f729b4463 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 30 Jun 2025 15:41:04 +0200 Subject: [PATCH 17/41] Tweak the knots chunked output port To try and reduce the number of write syscalls. --- knots/web-server.scm | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/knots/web-server.scm b/knots/web-server.scm index fec349e..3e5f388 100644 --- a/knots/web-server.scm +++ b/knots/web-server.scm @@ -63,6 +63,14 @@ (bind sock family addr port) sock)) +(define crlf-bv + (string->utf8 "\r\n")) + +(define (chunked-output-port-overhead-bytes write-size) + (+ (string-length (number->string write-size 16)) + (bytevector-length crlf-bv) + (bytevector-length crlf-bv))) + (define* (make-chunked-output-port/knots port #:key (keep-alive? #f) (buffering 1200)) "Returns a new port which translates non-encoded data into a HTTP @@ -74,10 +82,12 @@ when done, as it will output the remaining data, and encode the final zero chunk. When the port is closed it will also close PORT, unless KEEP-ALIVE? is true." (define (write! bv start count) - (put-string port (number->string count 16)) - (put-string port "\r\n") + (let ((len-string + (number->string count 16))) + (put-string port len-string)) + (put-bytevector port crlf-bv 0 2) (put-bytevector port bv start count) - (put-string port "\r\n") + (put-bytevector port crlf-bv 0 2) (force-output port) count) @@ -401,7 +411,10 @@ on the procedure being called at any particular time." (make-chunked-output-port/knots client #:keep-alive? #t - #:buffering buffer-size)))) + #:buffering + (- buffer-size + (chunked-output-port-overhead-bytes + buffer-size)))))) (set-port-encoding! body-port charset) (let ((body-written? (with-exception-handler From ce1b710bcf4b1874bdd06a6f96cb3e268f4b5895 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 30 Jun 2025 22:57:08 +0100 Subject: [PATCH 18/41] Use a queue for the resource pool waiters As this will maybe improve performance. --- knots/resource-pool.scm | 277 ++++++++++++++++++++-------------------- 1 file changed, 141 insertions(+), 136 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 7da76b0..b27f329 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -22,6 +22,7 @@ #:use-module (srfi srfi-9) #:use-module (srfi srfi-9 gnu) #:use-module (srfi srfi-71) + #:use-module (ice-9 q) #:use-module (ice-9 match) #:use-module (ice-9 exceptions) #:use-module (fibers) @@ -267,13 +268,13 @@ (define (main-loop) (let loop ((resources resources) (available resources) - (waiters '())) + (waiters (make-q))) (match (get-message channel) (('checkout reply timeout-time max-waiters) (if (null? available) (let ((waiters-count - (length waiters))) + (q-length waiters))) (if (and max-waiters (>= waiters-count max-waiters)) @@ -301,8 +302,7 @@ waiters)) (loop resources available - (cons (cons reply timeout-time) - waiters)))) + (enq! waiters (cons reply timeout-time))))) (if timeout-time (let ((current-internal-time @@ -345,44 +345,46 @@ (set! checkout-failure-count (+ 1 checkout-failure-count))) - (if (null? waiters) + (if (q-empty? waiters) (loop resources (cons resource available) waiters) - (let* ((current-internal-time (get-internal-real-time)) - (alive-waiters - dead-waiters - (partition! - (match-lambda - ((reply . timeout) - (or (not timeout) - (> timeout current-internal-time)))) - waiters))) - (if (null? alive-waiters) - (loop resources - (cons resource available) - '()) - (match (last alive-waiters) - ((waiter-channel . waiter-timeout) - (if waiter-timeout - (let ((reply-timeout - (/ (- waiter-timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's a - ;; timeout - (spawn-fiber-for-checkout waiter-channel - reply-timeout - resource)) - (put-message waiter-channel (cons 'success - resource))) - - (loop resources - available - (drop-right! alive-waiters 1)))))))) + (let ((current-internal-time + (get-internal-real-time))) + (with-exception-handler + (lambda (exn) + (if (eq? (exception-kind exn) 'q-empty) + (loop resources + (cons resource available) + waiters) + (raise-exception exn))) + (lambda () + (let waiter-loop ((waiter (deq! waiters))) + (match waiter + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (deq! waiters)) + (begin + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource))) + (loop resources + available + waiters))))))) + #:unwind? #t)))) (('list-resources reply) (spawn-fiber @@ -397,7 +399,7 @@ (let ((stats `((resources . ,(length resources)) (available . ,(length available)) - (waiters . ,(length waiters)) + (waiters . ,(q-length waiters)) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber @@ -420,7 +422,7 @@ (('destroy) (if (and (null? resources) - (null? waiters)) + (q-empty? waiters)) (signal-condition! destroy-condition) @@ -448,7 +450,7 @@ internal-time-units-per-second)) (const #f))) op)))))))) - waiters) + (car waiters)) (if destructor (begin @@ -747,7 +749,7 @@ (define (main-loop) (let loop ((resources '()) (available '()) - (waiters '()) + (waiters (make-q)) (resources-last-used '())) (match (get-message channel) @@ -769,50 +771,52 @@ (cons (get-internal-real-time) resources-last-used)))) - (if (null? waiters) + (if (q-empty? waiters) (loop (cons resource resources) (cons resource available) waiters (cons (get-internal-real-time) resources-last-used)) - (let* ((current-internal-time (get-internal-real-time)) - (alive-waiters - dead-waiters - (partition! - (match-lambda - ((reply . timeout) - (or (not timeout) - (> timeout current-internal-time)))) - waiters))) - (if (null? alive-waiters) - (loop (cons resource resources) - (cons resource available) - '() - (cons (get-internal-real-time) - resources-last-used)) - (match (last alive-waiters) - ((waiter-channel . waiter-timeout) - (if waiter-timeout - (let ((reply-timeout - (/ (- waiter-timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn - ;; a new fiber to handle handing over - ;; the resource, and returning it if - ;; there's a timeout - (spawn-fiber-for-checkout waiter-channel - reply-timeout - resource)) - (put-message waiter-channel (cons 'success - resource))) - - (loop (cons resource resources) - available - (drop-right! alive-waiters 1) - (cons (get-internal-real-time) - resources-last-used))))))))) + (let ((current-internal-time + (get-internal-real-time))) + (with-exception-handler + (lambda (exn) + (if (eq? (exception-kind exn) 'q-empty) + (loop (cons resource resources) + (cons resource available) + waiters + (cons current-internal-time + resources-last-used)) + (raise-exception exn))) + (lambda () + (let waiter-loop ((waiter (deq! waiters))) + (match waiter + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (deq! waiters)) + (begin + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource))) + (loop (cons resource resources) + available + waiters + (cons current-internal-time + resources-last-used))))))) + #:unwind? #t)))))) (('checkout reply timeout-time max-waiters) (if (null? available) @@ -821,7 +825,7 @@ (spawn-fiber-to-return-new-resource)) (let ((waiters-count - (length waiters))) + (q-length waiters))) (if (and max-waiters (>= waiters-count max-waiters)) @@ -850,8 +854,7 @@ resources-last-used)) (loop resources available - (cons (cons reply timeout-time) - waiters) + (enq! waiters (cons reply timeout-time)) resources-last-used)))) (if timeout-time @@ -898,7 +901,7 @@ (set! checkout-failure-count (+ 1 checkout-failure-count))) - (if (null? waiters) + (if (q-empty? waiters) (loop resources (cons resource available) waiters @@ -911,56 +914,58 @@ (get-internal-real-time)) resources-last-used)) - (let* ((current-internal-time (get-internal-real-time)) - (alive-waiters - dead-waiters - (partition! - (match-lambda - ((reply . timeout) - (or (not timeout) - (> timeout current-internal-time)))) - waiters))) - (if (null? alive-waiters) - (loop resources - (cons resource available) - '() - (begin - (when (eq? return-type 'return) - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time))) - resources-last-used)) - (match (last alive-waiters) - ((waiter-channel . waiter-timeout) - (if waiter-timeout - (let ((reply-timeout - (/ (- waiter-timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's a - ;; timeout - (spawn-fiber-for-checkout waiter-channel - reply-timeout - resource)) - (put-message waiter-channel (cons 'success - resource))) - - (loop resources - available - (drop-right! alive-waiters 1) + (let ((current-internal-time + (get-internal-real-time))) + (with-exception-handler + (lambda (exn) + (if (eq? (exception-kind exn) 'q-empty) + (loop resources + (cons resource available) + waiters + (begin + (when (eq? return-type 'return) + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + current-internal-time)) + resources-last-used)) + (raise-exception exn))) + (lambda () + (let waiter-loop ((waiter (deq! waiters))) + (match waiter + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (deq! waiters)) (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used)))))))) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource))) + (loop resources + available + waiters + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + current-internal-time) + resources-last-used)))))))) + #:unwind? #t)))) (('remove resource) (let ((index @@ -1003,7 +1008,7 @@ (let ((stats `((resources . ,(length resources)) (available . ,(length available)) - (waiters . ,(length waiters)) + (waiters . ,(q-length waiters)) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber @@ -1067,7 +1072,7 @@ (('destroy) (if (and (null? resources) - (null? waiters)) + (q-empty? waiters)) (signal-condition! destroy-condition) @@ -1095,7 +1100,7 @@ internal-time-units-per-second)) (const #f))) op)))))))) - waiters) + (car waiters)) (if destructor (begin From ff93dc144284044255957de8870f0d848f8fa844 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 1 Jul 2025 12:45:12 +0100 Subject: [PATCH 19/41] Add a post-request-hook to the web server --- knots/web-server.scm | 51 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 10 deletions(-) diff --git a/knots/web-server.scm b/knots/web-server.scm index 3e5f388..4d7240b 100644 --- a/knots/web-server.scm +++ b/knots/web-server.scm @@ -341,16 +341,19 @@ on the procedure being called at any particular time." (string->utf8 "internal server error"))) -(define (handle-request handler client - read-request-exception-handler - write-response-exception-handler - buffer-size) +(define* (handle-request handler client + read-request-exception-handler + write-response-exception-handler + buffer-size + #:key post-request-hook) (let ((request (with-exception-handler read-request-exception-handler (lambda () (read-request client)) - #:unwind? #t))) + #:unwind? #t)) + (read-request-time + (get-internal-real-time))) (let ((response body (cond @@ -399,7 +402,9 @@ on the procedure being called at any particular time." (lambda () (write-response response client) - (let ((body-written? + (let ((response-start-time + (get-internal-real-time)) + (body-written? (if (procedure? body) (let* ((type (response-content-type response '(text/plain))) @@ -438,6 +443,11 @@ on the procedure being called at any particular time." (if body-written? (begin (force-output client) + (when post-request-hook + (post-request-hook request + #:read-request-time read-request-time + #:response-start-time response-start-time + #:response-end-time (get-internal-real-time))) (when (and (procedure? body) (response-content-length response)) (set-port-encoding! client "ISO-8859-1")) @@ -449,7 +459,8 @@ on the procedure being called at any particular time." read-request-exception-handler write-response-exception-handler connection-idle-timeout - buffer-size) + buffer-size + post-request-hook) ;; Always disable Nagle's algorithm, as we handle buffering ;; ourselves; when we force-output, we really want the data to go ;; out. @@ -488,11 +499,28 @@ on the procedure being called at any particular time." (let ((keep-alive? (handle-request handler client read-request-exception-handler write-response-exception-handler - buffer-size))) + buffer-size + #:post-request-hook + post-request-hook))) (if keep-alive? (loop) (close-port client))))))) +(define (post-request-hook/safe post-request-hook) + (if post-request-hook + (lambda args + (with-exception-handler + (lambda (exn) #f) + (lambda () + (with-exception-handler + (lambda (exn) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (apply post-request-hook args)))) + #:unwind? #t)) + #f)) + (define-record-type (make-web-server socket port) web-server? @@ -512,7 +540,8 @@ on the procedure being called at any particular time." (write-response-exception-handler default-write-response-exception-handler) (connection-idle-timeout #f) - (connection-buffer-size 1024)) + (connection-buffer-size 1024) + post-request-hook) "Run the knots web server. HANDLER should be a procedure that takes one argument, the HTTP @@ -548,7 +577,9 @@ before sending back to the client." read-request-exception-handler write-response-exception-handler connection-idle-timeout - connection-buffer-size)) + connection-buffer-size + (post-request-hook/safe + post-request-hook))) #:parallel? #t) (loop)))))) From ec2f2489a2394a9f426931cee2b7a4856349603e Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 1 Jul 2025 23:13:31 +0100 Subject: [PATCH 20/41] Fix resource pool bug And remove unnecessary named let. --- knots/resource-pool.scm | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index b27f329..6b28a27 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -1317,8 +1317,9 @@ available. Return the resource once PROC has returned." start-time) 'timeout) response)) - 'timeout))))) - (let loop ((reply (make-channel))) + 'timeout)) + 'timeout))) + (let ((reply (make-channel))) (put-message channel (list 'checkout reply From f4b48a149923fd5cb4ff77772f2fdcc0b694f676 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Sun, 6 Jul 2025 18:49:09 +0100 Subject: [PATCH 21/41] Avoid calling deq! if the queue is empty --- knots/resource-pool.scm | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 6b28a27..640978f 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -365,7 +365,11 @@ ((reply . timeout) (if (and timeout (< timeout current-internal-time)) - (waiter-loop (deq! waiters)) + (if (q-empty? waiters) + (loop resources + (cons resource available) + waiters) + (waiter-loop (deq! waiters))) (begin (if timeout (let ((reply-timeout @@ -795,7 +799,13 @@ ((reply . timeout) (if (and timeout (< timeout current-internal-time)) - (waiter-loop (deq! waiters)) + (if (q-empty? waiters) + (loop (cons resource resources) + (cons resource available) + waiters + (cons (get-internal-real-time) + resources-last-used)) + (waiter-loop (deq! waiters))) (begin (if timeout (let ((reply-timeout @@ -938,7 +948,19 @@ ((reply . timeout) (if (and timeout (< timeout current-internal-time)) - (waiter-loop (deq! waiters)) + (if (q-empty? waiters) + (loop resources + (cons resource available) + waiters + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + (get-internal-real-time)) + resources-last-used)) + (waiter-loop (deq! waiters))) (begin (if timeout (let ((reply-timeout From d18b5b8d5de5beff3b9f84cfb359b73a4dcf2070 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Wed, 9 Jul 2025 12:06:20 +0100 Subject: [PATCH 22/41] Don't loop inside exception handlers The resource pools seemed to become slower and slower over time, this might help? --- knots/resource-pool.scm | 159 +++++++++++++++++----------------------- 1 file changed, 67 insertions(+), 92 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 640978f..6e9c353 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -365,30 +365,25 @@ ((reply . timeout) (if (and timeout (< timeout current-internal-time)) - (if (q-empty? waiters) - (loop resources - (cons resource available) - waiters) - (waiter-loop (deq! waiters))) - (begin - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource))) - (loop resources - available - waiters))))))) - #:unwind? #t)))) + (waiter-loop (deq! waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))))))) + #:unwind? #t) + (loop resources + available + waiters)))) (('list-resources reply) (spawn-fiber @@ -799,34 +794,27 @@ ((reply . timeout) (if (and timeout (< timeout current-internal-time)) - (if (q-empty? waiters) - (loop (cons resource resources) - (cons resource available) - waiters - (cons (get-internal-real-time) - resources-last-used)) - (waiter-loop (deq! waiters))) - (begin - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource))) - (loop (cons resource resources) - available - waiters - (cons current-internal-time - resources-last-used))))))) - #:unwind? #t)))))) + (waiter-loop (deq! waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))))))) + #:unwind? #t) + (loop (cons resource resources) + available + waiters + (cons current-internal-time + resources-last-used)))))) (('checkout reply timeout-time max-waiters) (if (null? available) @@ -948,46 +936,33 @@ ((reply . timeout) (if (and timeout (< timeout current-internal-time)) - (if (q-empty? waiters) - (loop resources - (cons resource available) - waiters - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used)) - (waiter-loop (deq! waiters))) - (begin - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource))) - (loop resources - available - waiters - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - current-internal-time) - resources-last-used)))))))) - #:unwind? #t)))) + (waiter-loop (deq! waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))))))) + #:unwind? #t) + (loop resources + available + waiters + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + current-internal-time) + resources-last-used))))) (('remove resource) (let ((index From 4468a3ef6d27556db35b0c6d07f7c5110ae362cc Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Thu, 10 Jul 2025 16:11:45 +0100 Subject: [PATCH 23/41] Generate documentation for (knots) as well As enabled by Guile Documenta 0.4. --- .forgejo/workflows/build-website.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.forgejo/workflows/build-website.yaml b/.forgejo/workflows/build-website.yaml index d859b76..ab24066 100644 --- a/.forgejo/workflows/build-website.yaml +++ b/.forgejo/workflows/build-website.yaml @@ -10,7 +10,7 @@ jobs: - run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git --branch=pages knots-pages - run: | cd knots-trunk - guix shell -D -f guix-dev.scm -- documenta api knots + guix shell -D -f guix-dev.scm -- documenta api knots.scm knots guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -c SHOW_TITLE=true -o ../knots-pages/index.html doc/index.texi - run: | From 52092e7a99fb782a5e89a4b0ec42f93549b96437 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Thu, 10 Jul 2025 16:15:58 +0100 Subject: [PATCH 24/41] Fix call to documenta --- .forgejo/workflows/build-website.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.forgejo/workflows/build-website.yaml b/.forgejo/workflows/build-website.yaml index ab24066..ae6c4da 100644 --- a/.forgejo/workflows/build-website.yaml +++ b/.forgejo/workflows/build-website.yaml @@ -10,7 +10,7 @@ jobs: - run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git --branch=pages knots-pages - run: | cd knots-trunk - guix shell -D -f guix-dev.scm -- documenta api knots.scm knots + guix shell -D -f guix-dev.scm -- documenta api "knots.scm knots/" guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -c SHOW_TITLE=true -o ../knots-pages/index.html doc/index.texi - run: | From 86fb460d6a35a7170611d81b9d7280f793f3d34b Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 17 Nov 2025 10:46:46 +0000 Subject: [PATCH 25/41] Simplify using the waiters queue in the resource pool Use a custom dequeue procedure that returns #f rather than raising an exception on an empty queue. --- knots/resource-pool.scm | 268 ++++++++++++++++++---------------------- 1 file changed, 120 insertions(+), 148 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 6e9c353..c233e29 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -101,6 +101,16 @@ start (cdr end)))) +(define (safe-deq q) + (if (null? (car q)) + #f + (let ((it (caar q)) + (next (cdar q))) + (if (null? next) + (set-cdr! q #f)) + (set-car! q next) + it))) + (define* (make-fixed-size-resource-pool resources #:key (delay-logger (const #f)) @@ -345,45 +355,35 @@ (set! checkout-failure-count (+ 1 checkout-failure-count))) - (if (q-empty? waiters) - (loop resources - (cons resource available) - waiters) - - (let ((current-internal-time - (get-internal-real-time))) - (with-exception-handler - (lambda (exn) - (if (eq? (exception-kind exn) 'q-empty) - (loop resources - (cons resource available) - waiters) - (raise-exception exn))) - (lambda () - (let waiter-loop ((waiter (deq! waiters))) - (match waiter - ((reply . timeout) - (if (and timeout - (< timeout current-internal-time)) - (waiter-loop (deq! waiters)) - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource)))))))) - #:unwind? #t) - (loop resources - available - waiters)))) + (let ((current-internal-time + (get-internal-real-time))) + (let waiter-loop ((waiter (safe-deq waiters))) + (match waiter + (#f + (loop resources + (cons resource available) + waiters)) + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (safe-deq waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))) + (loop resources + available + waiters)))))) (('list-resources reply) (spawn-fiber @@ -770,51 +770,39 @@ (cons (get-internal-real-time) resources-last-used)))) - (if (q-empty? waiters) - (loop (cons resource resources) - (cons resource available) - waiters - (cons (get-internal-real-time) - resources-last-used)) - - (let ((current-internal-time - (get-internal-real-time))) - (with-exception-handler - (lambda (exn) - (if (eq? (exception-kind exn) 'q-empty) - (loop (cons resource resources) - (cons resource available) - waiters - (cons current-internal-time - resources-last-used)) - (raise-exception exn))) - (lambda () - (let waiter-loop ((waiter (deq! waiters))) - (match waiter - ((reply . timeout) - (if (and timeout - (< timeout current-internal-time)) - (waiter-loop (deq! waiters)) - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource)))))))) - #:unwind? #t) - (loop (cons resource resources) - available - waiters - (cons current-internal-time - resources-last-used)))))) + (let ((current-internal-time + (get-internal-real-time))) + (let waiter-loop ((waiter (safe-deq waiters))) + (match waiter + (#f + (loop (cons resource resources) + (cons resource available) + waiters + (cons current-internal-time + resources-last-used))) + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (safe-deq waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))) + (loop (cons resource resources) + available + waiters + (cons current-internal-time + resources-last-used)))))))) (('checkout reply timeout-time max-waiters) (if (null? available) @@ -899,76 +887,60 @@ (set! checkout-failure-count (+ 1 checkout-failure-count))) - (if (q-empty? waiters) - (loop resources - (cons resource available) - waiters - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used)) - - (let ((current-internal-time - (get-internal-real-time))) - (with-exception-handler - (lambda (exn) - (if (eq? (exception-kind exn) 'q-empty) - (loop resources - (cons resource available) - waiters - (begin - (when (eq? return-type 'return) - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - current-internal-time)) - resources-last-used)) - (raise-exception exn))) - (lambda () - (let waiter-loop ((waiter (deq! waiters))) - (match waiter - ((reply . timeout) - (if (and timeout - (< timeout current-internal-time)) - (waiter-loop (deq! waiters)) - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource)))))))) - #:unwind? #t) - (loop resources - available - waiters - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - current-internal-time) - resources-last-used))))) + (let ((current-internal-time + (get-internal-real-time)) + (resource-index + (list-index (lambda (x) + (eq? x resource)) + resources))) + (let waiter-loop ((waiter (safe-deq waiters))) + (match waiter + (#f + (loop resources + (cons resource available) + waiters + (begin + (when (eq? return-type 'return) + (list-set! + resources-last-used + resource-index + current-internal-time)) + resources-last-used))) + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (safe-deq waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))) + (loop resources + available + waiters + (begin + (list-set! + resources-last-used + resource-index + current-internal-time) + resources-last-used))))))) (('remove resource) (let ((index (list-index (lambda (x) (eq? x resource)) resources))) + + (loop (if index (remove-at-index! resources index) (begin From 40b64e269b34ebf4cfb95155b2aed99200ec1cfa Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 17 Nov 2025 10:51:33 +0000 Subject: [PATCH 26/41] Fix resources-last-used inconsistency --- knots/resource-pool.scm | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index c233e29..232df77 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -767,8 +767,7 @@ (loop resources available waiters - (cons (get-internal-real-time) - resources-last-used)))) + resources-last-used))) (let ((current-internal-time (get-internal-real-time))) From a13098494d59fb1e6f8cc980a12f408f58a60727 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 17 Nov 2025 11:18:23 +0000 Subject: [PATCH 27/41] Fix a bug where resources pools could empty with waiters --- knots/resource-pool.scm | 4 ++++ tests/resource-pool.scm | 17 +++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 232df77..d802260 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -939,6 +939,10 @@ (eq? x resource)) resources))) + (when (and (not (q-empty? waiters)) + (< (- (length resources) 1) + max-size)) + (spawn-fiber-to-return-new-resource)) (loop (if index (remove-at-index! resources index) diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm index 461d04b..3999dde 100644 --- a/tests/resource-pool.scm +++ b/tests/resource-pool.scm @@ -211,4 +211,21 @@ (destroy-resource-pool resource-pool)))) +(run-fibers-for-tests + (lambda () + (let ((resource-pool (make-resource-pool + (const 'foo) + 1 + #:lifetime 1 + #:destructor + (const #t)))) + (for-each + (lambda _ + (with-resource-from-pool resource-pool + res + res)) + (iota 20)) + + (destroy-resource-pool resource-pool)))) + (display "resource-pool test finished successfully\n") From 244607865774eed9d3d7927091ddc3e2c3d3acac Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 17 Nov 2025 11:19:01 +0000 Subject: [PATCH 28/41] Fix updating the thread-proc-vector in thread pools --- knots/thread-pool.scm | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index 70d7292..49572db 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -269,7 +269,7 @@ from there, or #f if that would be an empty string." (sleep 1) (destructor/safe args))))) - (define (process channel args) + (define (process thread-index channel args) (let loop () (match (get-message channel) ('destroy #f) @@ -292,6 +292,9 @@ from there, or #f if that would be an empty string." internal-time-units-per-second) exn)) (lambda () + (vector-set! thread-proc-vector + thread-index + proc) (with-exception-handler (lambda (exn) (let ((stack @@ -319,6 +322,10 @@ from there, or #f if that would be an empty string." vals)))))) #:unwind? #t))) + (vector-set! thread-proc-vector + thread-index + #f) + (put-message reply response) @@ -358,7 +365,7 @@ from there, or #f if that would be an empty string." "knots: thread-pool: internal exception: ~A\n" exn)) (lambda () (parameterize ((param args)) - (process channel args))) + (process index channel args))) #:unwind? #t))) (when thread-destructor From e78e41b5423d7f79c07cdad2f3a26297475f8901 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 17 Nov 2025 11:19:30 +0000 Subject: [PATCH 29/41] Pass through default-max-waiters in make-thread-pool --- knots/thread-pool.scm | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index 49572db..df4352a 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -402,7 +402,8 @@ from there, or #f if that would be an empty string." (expire-on-exception? #f) (name "unnamed") (use-default-io-waiters? #t) - default-checkout-timeout) + default-checkout-timeout + default-max-waiters) "Return a channel used to offload work to a dedicated thread. ARGS are the arguments of the thread pool procedure." (define param @@ -425,7 +426,8 @@ arguments of the thread pool procedure." #:delay-logger delay-logger #:scheduler scheduler #:duration-logger duration-logger - #:default-checkout-timeout default-checkout-timeout))) + #:default-checkout-timeout default-checkout-timeout + #:default-max-waiters default-max-waiters))) (thread-pool resource-pool param))) From 1a476b5aa8b1fc2cd14ffb488b41da8d4eb95cef Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 17 Nov 2025 11:20:10 +0000 Subject: [PATCH 30/41] Implement lifetime support in the resource pool --- knots/resource-pool.scm | 148 +++++++++++++++++++++++++++++----------- 1 file changed, 108 insertions(+), 40 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index d802260..301abbd 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -749,6 +749,7 @@ (let loop ((resources '()) (available '()) (waiters (make-q)) + (resources-checkout-count '()) (resources-last-used '())) (match (get-message channel) @@ -762,11 +763,13 @@ (loop (cons resource resources) available waiters + (cons 0 resources-checkout-count) (cons (get-internal-real-time) resources-last-used))) (loop resources available waiters + resources-checkout-count resources-last-used))) (let ((current-internal-time @@ -777,6 +780,7 @@ (loop (cons resource resources) (cons resource available) waiters + (cons 0 resources-checkout-count) (cons current-internal-time resources-last-used))) ((reply . timeout) @@ -800,6 +804,7 @@ (loop (cons resource resources) available waiters + (cons 1 resources-checkout-count) (cons current-internal-time resources-last-used)))))))) @@ -836,10 +841,12 @@ (loop resources available waiters + resources-checkout-count resources-last-used)) (loop resources available (enq! waiters (cons reply timeout-time)) + resources-checkout-count resources-last-used)))) (if timeout-time @@ -862,10 +869,21 @@ (loop resources (cdr available) waiters + (let ((resource-index + (list-index (lambda (x) + (eq? x (car available))) + resources))) + (list-set! resources-checkout-count + resource-index + (+ 1 (list-ref + resources-checkout-count + resource-index))) + resources-checkout-count) resources-last-used)) (loop resources available waiters + resources-checkout-count resources-last-used))) (begin (put-message reply (cons 'success @@ -874,6 +892,16 @@ (loop resources (cdr available) waiters + (let ((resource-index + (list-index (lambda (x) + (eq? x (car available))) + resources))) + (list-set! resources-checkout-count + resource-index + (+ 1 (list-ref + resources-checkout-count + resource-index))) + resources-checkout-count) resources-last-used))))) (((and (or 'return @@ -892,46 +920,77 @@ (list-index (lambda (x) (eq? x resource)) resources))) - (let waiter-loop ((waiter (safe-deq waiters))) - (match waiter - (#f - (loop resources - (cons resource available) - waiters - (begin - (when (eq? return-type 'return) - (list-set! - resources-last-used - resource-index - current-internal-time)) - resources-last-used))) - ((reply . timeout) - (if (and timeout - (< timeout current-internal-time)) - (waiter-loop (safe-deq waiters)) - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource)))) - (loop resources - available - waiters - (begin - (list-set! - resources-last-used - resource-index - current-internal-time) - resources-last-used))))))) + (if (and lifetime + (>= (list-ref resources-checkout-count + resource-index) + lifetime)) + (begin + (spawn-fiber-to-destroy-resource resource) + (loop resources + available + waiters + resources-checkout-count + resources-last-used)) + (let waiter-loop ((waiter (safe-deq waiters))) + (match waiter + (#f + (loop resources + (cons resource available) + waiters + (if (eq? 'return-failed-checkout + return-type) + (begin + (list-set! resources-checkout-count + resource-index + (- (list-ref resources-checkout-count + resource-index) + 1)) + resources-checkout-count) + resources-checkout-count) + (begin + (when (eq? return-type 'return) + (list-set! + resources-last-used + resource-index + current-internal-time)) + resources-last-used))) + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (safe-deq waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))) + (loop resources + available + waiters + (if (eq? 'return-failed-checkout + return-type) + (begin + (list-set! resources-checkout-count + resource-index + (- (list-ref resources-checkout-count + resource-index) + 1)) + resources-checkout-count) + resources-checkout-count) + (begin + (list-set! + resources-last-used + resource-index + current-internal-time) + resources-last-used)))))))) (('remove resource) (let ((index @@ -954,6 +1013,9 @@ resources)) available ; resource shouldn't be in this list waiters + (remove-at-index! + resources-checkout-count + index) (remove-at-index! resources-last-used index)))) @@ -964,6 +1026,7 @@ (loop resources available waiters + resources-checkout-count resources-last-used)) (('list-resources reply) @@ -974,6 +1037,7 @@ (loop resources available waiters + resources-checkout-count resources-last-used)) (('stats reply timeout-time) @@ -981,6 +1045,7 @@ `((resources . ,(length resources)) (available . ,(length available)) (waiters . ,(q-length waiters)) + (resources-checkout-count . ,resources-checkout-count) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber @@ -1000,6 +1065,7 @@ (loop resources available waiters + resources-checkout-count resources-last-used)) (('check-for-idle-resources) @@ -1040,6 +1106,7 @@ (loop resources (lset-difference eq? available resources-to-destroy) waiters + resources-checkout-count resources-last-used)))) (('destroy) @@ -1103,6 +1170,7 @@ (loop resources available waiters + resources-checkout-count resources-last-used))))) (spawn-fiber From 05ad83c7031de9b0d1873a0d5aec630746342a06 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 17 Nov 2025 11:37:26 +0000 Subject: [PATCH 31/41] Implement lifetime support for thread pools --- knots/thread-pool.scm | 10 +++++++--- tests/thread-pool.scm | 22 ++++++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index df4352a..cbbaf21 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -270,7 +270,7 @@ from there, or #f if that would be an empty string." (destructor/safe args))))) (define (process thread-index channel args) - (let loop () + (let loop ((lifetime thread-lifetime)) (match (get-message channel) ('destroy #f) ((reply sent-time proc) @@ -342,7 +342,11 @@ from there, or #f if that would be an empty string." (if (and exception? expire-on-exception?) #t - (loop)))))))) + (if lifetime + (if (<= 1 lifetime) + #t + (loop (- lifetime 1))) + (loop lifetime))))))))) (define (start-thread index channel) (call-with-new-thread @@ -416,7 +420,6 @@ arguments of the thread pool procedure." 1 #:thread-initializer thread-initializer #:thread-destructor thread-destructor - #:thread-lifetime thread-lifetime #:expire-on-exception? expire-on-exception? #:name name #:use-default-io-waiters? use-default-io-waiters?)) @@ -424,6 +427,7 @@ arguments of the thread pool procedure." #:destructor destroy-thread-pool #:min-size min-size #:delay-logger delay-logger + #:lifetime thread-lifetime #:scheduler scheduler #:duration-logger duration-logger #:default-checkout-timeout default-checkout-timeout diff --git a/tests/thread-pool.scm b/tests/thread-pool.scm index 1c51cb3..dd0b852 100644 --- a/tests/thread-pool.scm +++ b/tests/thread-pool.scm @@ -85,4 +85,26 @@ (+ 1 'a)))) #:unwind? #t))))) +(let ((thread-pool + (make-fixed-size-thread-pool 1 #:thread-lifetime 1))) + + (for-each + (lambda _ + (call-with-thread + thread-pool + (lambda () #f))) + (iota 10))) + +(run-fibers-for-tests + (lambda () + (let ((thread-pool + (make-thread-pool 1 #:thread-lifetime 1))) + + (for-each + (lambda _ + (call-with-thread + thread-pool + (lambda () #f))) + (iota 10))))) + (display "thread-pool test finished successfully\n") From 95200eccfd5668fa94a0dfad6eab93d7b7731c9d Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Wed, 19 Nov 2025 13:18:39 +0000 Subject: [PATCH 32/41] Fix fixed size thread pool lifetimes --- knots/thread-pool.scm | 2 +- tests/thread-pool.scm | 39 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index cbbaf21..22c1b5c 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -343,7 +343,7 @@ from there, or #f if that would be an empty string." expire-on-exception?) #t (if lifetime - (if (<= 1 lifetime) + (if (<= lifetime 1) #t (loop (- lifetime 1))) (loop lifetime))))))))) diff --git a/tests/thread-pool.scm b/tests/thread-pool.scm index dd0b852..e3a1cdd 100644 --- a/tests/thread-pool.scm +++ b/tests/thread-pool.scm @@ -1,4 +1,5 @@ (use-modules (tests) + (ice-9 atomic) (srfi srfi-71) (fibers) (unit-test) @@ -86,13 +87,21 @@ #:unwind? #t))))) (let ((thread-pool - (make-fixed-size-thread-pool 1 #:thread-lifetime 1))) + (make-fixed-size-thread-pool + 1 + #:thread-lifetime 1 + #:thread-initializer + (lambda () + (list (make-atomic-box #t)))))) (for-each (lambda _ (call-with-thread thread-pool - (lambda () #f))) + (lambda (box) + (if (atomic-box-ref box) + (atomic-box-set! box #f) + (error (atomic-box-ref box)))))) (iota 10))) (run-fibers-for-tests @@ -107,4 +116,30 @@ (lambda () #f))) (iota 10))))) +(let ((thread-pool + (make-fixed-size-thread-pool + 1 + #:thread-lifetime 2 + #:thread-initializer + (lambda () + (list (make-atomic-box 2)))))) + + (define (ref-and-decrement box) + (let ((val (atomic-box-ref box))) + (atomic-box-set! box (- val 1)) + val)) + + (unless (= 2 (call-with-thread + thread-pool + ref-and-decrement)) + (error)) + (unless (= 1 (call-with-thread + thread-pool + ref-and-decrement)) + (error)) + (unless (= 2 (call-with-thread + thread-pool + ref-and-decrement)) + (error))) + (display "thread-pool test finished successfully\n") From d07e309566cec93aa9f630bd4558ae464c37ad3a Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 24 Nov 2025 17:06:27 +0000 Subject: [PATCH 33/41] Fix a bug with resource pool idle checking The fiber would never finish. --- knots/resource-pool.scm | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 301abbd..2efbeab 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -1178,9 +1178,16 @@ (when idle-seconds (spawn-fiber (lambda () - (while #t - (sleep idle-seconds) - (put-message channel '(check-for-idle-resources)))))) + (let loop () + (put-message channel '(check-for-idle-resources)) + (when (choice-operation + (wrap-operation + (sleep-operation idle-seconds) + (const #t)) + (wrap-operation + (wait-operation destroy-condition) + (const #f))) + (loop)))))) (with-exception-handler (lambda (exn) From 3eba6fc8209f254b8068459a59d0056a629528d9 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 24 Nov 2025 21:54:00 +0000 Subject: [PATCH 34/41] Rework the resource pool implementations Don't rely on resource equality for keeping track of resources and make some other tweaks. --- knots/resource-pool.scm | 1017 +++++++++++++++++++-------------------- tests/resource-pool.scm | 24 + 2 files changed, 507 insertions(+), 534 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 2efbeab..3638500 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -21,6 +21,7 @@ #:use-module (srfi srfi-1) #:use-module (srfi srfi-9) #:use-module (srfi srfi-9 gnu) + #:use-module (srfi srfi-43) #:use-module (srfi srfi-71) #:use-module (ice-9 q) #:use-module (ice-9 match) @@ -81,7 +82,8 @@ (make-resource-pool-record name channel destroy-condition configuration) resource-pool? (name resource-pool-name) - (channel resource-pool-channel) + (channel resource-pool-channel + set-resource-pool-channel!) (destroy-condition resource-pool-destroy-condition) (configuration resource-pool-configuration)) @@ -93,14 +95,6 @@ (resource-pool-name resource-pool)) port))) -(define (remove-at-index! lst i) - (let ((start - end - (split-at! lst i))) - (append - start - (cdr end)))) - (define (safe-deq q) (if (null? (car q)) #f @@ -111,11 +105,51 @@ (set-car! q next) it))) -(define* (make-fixed-size-resource-pool resources +(define-record-type + (make-resource-details value checkout-count last-used) + resource-details? + (value resource-details-value) + (checkout-count resource-details-checkout-count + set-resource-details-checkout-count!) + (last-used resource-details-last-used + set-resource-details-last-used!)) + +(define-inlinable (increment-resource-checkout-count! resource) + (set-resource-details-checkout-count! + resource + (1+ (resource-details-checkout-count resource)))) + +(define-inlinable (decrement-resource-checkout-count! resource) + (set-resource-details-checkout-count! + resource + (1+ (resource-details-checkout-count resource)))) + +(define (spawn-fiber-for-checkout channel + reply-channel + reply-timeout + resource-id + resource) + (spawn-fiber + (lambda () + (let ((checkout-success? + (perform-operation + (choice-operation + (wrap-operation + (put-operation reply-channel + (list 'success resource-id resource)) + (const #t)) + (wrap-operation (sleep-operation + reply-timeout) + (const #f)))))) + (unless checkout-success? + (put-message + channel + (list 'return-failed-checkout resource-id))))))) + +(define* (make-fixed-size-resource-pool resources-list-or-vector #:key (delay-logger (const #f)) (duration-logger (const #f)) - destructor scheduler (name "unnamed") default-checkout-timeout @@ -131,7 +165,6 @@ destroy-condition `((delay-logger . ,delay-logger) (duration-logger . ,duration-logger) - (destructor . ,destructor) (scheduler . ,scheduler) (name . ,name) (default-checkout-timeout . ,default-checkout-timeout) @@ -139,58 +172,24 @@ (define checkout-failure-count 0) - (define (spawn-fiber-to-destroy-resource resource) - (spawn-fiber - (lambda () - (let loop () - (let ((success? - (with-exception-handler - (lambda _ #f) - (lambda () - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running resource pool destructor (~A): ~A\n" - name - destructor) - (print-backtrace-and-exception/knots exn) - (raise-exception exn)) - (lambda () - (start-stack #t (destructor resource)) - #t))) - #:unwind? #t))) + (define resources + (vector-map + (lambda (_ resource) + (make-resource-details + resource + 0 + #f)) + (if (vector? resources-list-or-vector) + resources-list-or-vector + (list->vector resources-list-or-vector)))) - (if success? - (put-message channel - (list 'remove resource)) - (begin - (sleep 5) + (define (destroy-loop) + (define (empty?) + (vector-every (lambda (r) + (eq? r #f)) + resources)) - (loop)))))))) - - (define (spawn-fiber-for-checkout reply-channel - reply-timeout - resource) - (spawn-fiber - (lambda () - (let ((checkout-success? - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply-channel - (cons 'success resource)) - (const #t)) - (wrap-operation (sleep-operation - reply-timeout) - (const #f)))))) - (unless checkout-success? - (put-message - channel - (list 'return-failed-checkout resource))))))) - - (define (destroy-loop resources) - (let loop ((resources resources)) + (let loop () (match (get-message channel) (('checkout reply timeout-time max-waiters) (spawn-fiber @@ -211,40 +210,27 @@ internal-time-units-per-second)) (const #f))) op))))) - (loop resources)) + (loop)) (((and (or 'return - 'return-failed-checkout - 'remove) + 'return-failed-checkout) return-type) - resource) - (when (and (not (eq? return-type 'remove)) - destructor) - (spawn-fiber-to-destroy-resource resource)) + resource-id) + (vector-set! resources + resource-id + #f) - (let ((index - (list-index (lambda (x) - (eq? x resource)) - resources))) - (let ((new-resources - (if index - (remove-at-index! resources index) - (begin - (simple-format - (current-error-port) - "resource pool error: unable to remove ~A\n" - resource) - resources)))) - (if (null? new-resources) - (begin - (signal-condition! destroy-condition) + (if (empty?) + (begin + (set-resource-pool-channel! pool #f) + (signal-condition! destroy-condition) - ;; No loop - *unspecified*) - (loop new-resources))))) + ;; No loop + *unspecified*) + (loop))) (('stats reply timeout-time) (let ((stats - `((resources . ,(length resources)) + `((resources . ,(vector-length resources)) (available . 0) (waiters . 0) (checkout-failure-count . ,checkout-failure-count)))) @@ -263,21 +249,20 @@ internal-time-units-per-second))) op)))))) - (loop resources)) + (loop)) - (('destroy reply) - (loop resources)) + (('destroy) + (loop)) (unknown (simple-format (current-error-port) "unrecognised message to ~A resource pool channel: ~A\n" name unknown) - (loop resources))))) + (loop))))) (define (main-loop) - (let loop ((resources resources) - (available resources) + (let loop ((available (iota (vector-length resources))) (waiters (make-q))) (match (get-message channel) @@ -307,11 +292,9 @@ internal-time-units-per-second)) (const #f))) op))))) - (loop resources - available + (loop available waiters)) - (loop resources - available + (loop available (enq! waiters (cons reply timeout-time))))) (if timeout-time @@ -323,32 +306,45 @@ (let ((reply-timeout (/ (- timeout-time current-internal-time) - internal-time-units-per-second))) + internal-time-units-per-second)) + (resource-id + new-available + (car+cdr available))) ;; Don't sleep in this fiber, so spawn a new ;; fiber to handle handing over the resource, ;; and returning it if there's a timeout - (spawn-fiber-for-checkout reply - reply-timeout - (car available)) - (loop resources - (cdr available) + (spawn-fiber-for-checkout + channel + reply + reply-timeout + resource-id + (resource-details-value + (vector-ref resources + resource-id))) + (loop new-available waiters)) - (loop resources - available + (loop available waiters))) - (begin - (put-message reply (cons 'success - (car available))) + (let* ((resource-id + next-available + (car+cdr available)) + (resource-details + (vector-ref resources + resource-id))) + (put-message reply + (list 'success + resource-id + (resource-details-value + resource-details))) - (loop resources - (cdr available) + (loop next-available waiters))))) (((and (or 'return 'return-failed-checkout) return-type) - resource) + resource-id) (when (eq? 'return-failed-checkout return-type) @@ -360,8 +356,7 @@ (let waiter-loop ((waiter (safe-deq waiters))) (match waiter (#f - (loop resources - (cons resource available) + (loop (cons resource-id available) waiters)) ((reply . timeout) (if (and timeout @@ -376,13 +371,21 @@ ;; new fiber to handle handing over the ;; resource, and returning it if there's ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource)))) - (loop resources - available + (spawn-fiber-for-checkout + channel + reply + reply-timeout + resource-id + (resource-details-value + (vector-ref resources + resource-id)))) + (put-message reply + (list 'success + resource-id + (resource-details-value + (vector-ref resources + resource-id)))))) + (loop available waiters)))))) (('list-resources reply) @@ -390,13 +393,12 @@ (lambda () (put-message reply (list-copy resources)))) - (loop resources - available + (loop available waiters)) (('stats reply timeout-time) (let ((stats - `((resources . ,(length resources)) + `((resources . ,(vector-length resources)) (available . ,(length available)) (waiters . ,(q-length waiters)) (checkout-failure-count . ,checkout-failure-count)))) @@ -415,62 +417,46 @@ internal-time-units-per-second))) op)))))) - (loop resources - available + (loop available waiters)) (('destroy) - (if (and (null? resources) - (q-empty? waiters)) - (signal-condition! - destroy-condition) + (let ((current-internal-time (get-internal-real-time))) + ;; Notify all waiters that the pool has been destroyed + (for-each + (match-lambda + ((reply . timeout) + (when (or (not timeout) + (> timeout current-internal-time)) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op)))))))) + (car waiters)) - (let ((current-internal-time (get-internal-real-time))) - (for-each - (match-lambda - ((reply . timeout) - (when (or (not timeout) - (> timeout current-internal-time)) - (spawn-fiber - (lambda () - (let ((op - (put-operation - reply - (cons 'resource-pool-destroyed - #f)))) - (perform-operation - (if timeout - (choice-operation - op - (wrap-operation - (sleep-operation - (/ (- timeout - (get-internal-real-time)) - internal-time-units-per-second)) - (const #f))) - op)))))))) - (car waiters)) + (if (= (vector-length resources) + (length available)) + (begin + (set-resource-pool-channel! pool #f) + (signal-condition! destroy-condition) - (if destructor - (begin - (for-each - (lambda (resource) - (spawn-fiber-to-destroy-resource resource)) - available) - (destroy-loop resources)) - (let dl ((resources resources) - (available available)) - (if (null? available) - (if (null? resources) - (signal-condition! - destroy-condition) - (destroy-loop resources)) - (let ((index - (list-index (lambda (x) - (eq? x (car available))) - resources))) - (dl (remove-at-index! resources index) - (cdr available))))))))) + ;; No loop + *unspecified*) + (destroy-loop)))) (unknown (simple-format @@ -478,8 +464,7 @@ "unrecognised message to ~A resource pool channel: ~A\n" name unknown) - (loop resources - available + (loop available waiters))))) (spawn-fiber @@ -551,6 +536,12 @@ (define checkout-failure-count 0) + (define resources + (make-hash-table)) + + (define-inlinable (count-resources resources) + (hash-count (const #t) resources)) + (define return-new-resource/parallelism-limiter (make-parallelism-limiter (or add-resources-parallelism @@ -575,9 +566,8 @@ (let ((max-size (assq-ref (resource-pool-configuration pool) 'max-size)) - (size (assq-ref (resource-pool-stats pool #:timeout #f) - 'resources))) - (unless (= size max-size) + (size (count-resources resources))) + (unless (>= size max-size) (with-exception-handler (lambda _ #f) (lambda () @@ -598,64 +588,51 @@ #:unwind? #t))))) #:unwind? #t)))) - (define (spawn-fiber-to-destroy-resource resource) + (define (spawn-fiber-to-destroy-resource resource-id resource-details) (spawn-fiber (lambda () (let loop () - (let ((success? - (with-exception-handler - (lambda _ #f) - (lambda () - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running resource pool destructor (~A): ~A\n" - name - destructor) - (print-backtrace-and-exception/knots exn) - (raise-exception exn)) - (lambda () - (start-stack #t (destructor resource)) - #t))) - #:unwind? #t))) + (let* ((resource + (resource-details-value resource-details)) + (success? + (with-exception-handler + (lambda _ #f) + (lambda () + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running resource pool destructor (~A): ~A\n" + name + destructor) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (start-stack #t (destructor resource)) + #t))) + #:unwind? #t))) (if success? (put-message channel - (list 'remove resource)) + (list 'remove resource-id)) (begin (sleep 5) (loop)))))))) - (define (spawn-fiber-for-checkout reply-channel - reply-timeout - resource) - (spawn-fiber - (lambda () - (let ((checkout-success? - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply-channel - (cons 'success resource)) - (const #t)) - (wrap-operation (sleep-operation - reply-timeout) - (const #f)))))) - (unless checkout-success? - (put-message - channel - (list 'return-failed-checkout resource))))))) - - (define (destroy-loop resources) - (let loop ((resources resources)) + (define (destroy-loop resources next-resource-id) + (let loop ((next-resource-id next-resource-id)) (match (get-message channel) (('add-resource resource) - (when destructor - (spawn-fiber-to-destroy-resource resource)) + (if destructor + (begin + (spawn-fiber-to-destroy-resource next-resource-id + resource) + (hash-set! resources next-resource-id resource) + + (loop (1+ next-resource-id))) + (loop next-resource-id))) - (loop resources)) (('checkout reply timeout-time max-waiters) (spawn-fiber (lambda () @@ -675,43 +652,31 @@ internal-time-units-per-second)) (const #f))) op))))) - (loop resources)) + (loop next-resource-id)) (((and (or 'return 'return-failed-checkout 'remove) return-type) - resource) + resource-id) (when (and (not (eq? return-type 'remove)) destructor) - (spawn-fiber-to-destroy-resource resource)) + (spawn-fiber-to-destroy-resource + resource-id + (hash-ref resources resource-id))) - (let ((index - (list-index (lambda (x) - (eq? x resource)) - resources))) - (let ((new-resources - (if index - (remove-at-index! resources index) - (begin - (simple-format - (current-error-port) - "resource pool error: unable to remove ~A\n" - resource) - resources)))) - (if (null? new-resources) - (begin - (and=> return-new-resource/parallelism-limiter - destroy-parallelism-limiter) + (hash-remove! resources resource-id) - (signal-condition! destroy-condition) - - ;; No loop - *unspecified*) - (loop new-resources))))) + (if (= 0 (count-resources resources)) + (begin + (set-resource-pool-channel! pool #f) + (signal-condition! destroy-condition) + ;; No loop + *unspecified*) + (loop next-resource-id))) (('stats reply timeout-time) (let ((stats - `((resources . ,(length resources)) + `((resources . ,(count-resources resources)) (available . 0) (waiters . 0) (checkout-failure-count . ,checkout-failure-count)))) @@ -730,59 +695,63 @@ internal-time-units-per-second))) op)))))) - (loop resources)) + (loop next-resource-id)) (('check-for-idle-resources) - (loop resources)) + (loop next-resource-id)) - (('destroy reply) - (loop resources)) + (('destroy) + (loop next-resource-id)) (unknown (simple-format (current-error-port) "unrecognised message to ~A resource pool channel: ~A\n" name unknown) - (loop resources))))) + (loop next-resource-id))))) (define (main-loop) - (let loop ((resources '()) + (let loop ((next-resource-id 0) (available '()) - (waiters (make-q)) - (resources-checkout-count '()) - (resources-last-used '())) + (waiters (make-q))) (match (get-message channel) (('add-resource resource) - (if (= (length resources) max-size) - (begin - (if destructor - (begin - (spawn-fiber-to-destroy-resource resource) + (if (= (count-resources resources) max-size) + (if destructor + (begin + (hash-set! resources + next-resource-id + (make-resource-details + resource + 0 + (get-internal-real-time))) + (spawn-fiber-to-destroy-resource next-resource-id + resource) - (loop (cons resource resources) - available - waiters - (cons 0 resources-checkout-count) - (cons (get-internal-real-time) - resources-last-used))) - (loop resources + (loop (1+ next-resource-id) available - waiters - resources-checkout-count - resources-last-used))) + waiters)) + (loop next-resource-id + available + waiters)) - (let ((current-internal-time - (get-internal-real-time))) + (let* ((current-internal-time + (get-internal-real-time)) + (resource-details + (make-resource-details + resource + 0 + current-internal-time))) + (hash-set! resources + next-resource-id + resource-details) (let waiter-loop ((waiter (safe-deq waiters))) (match waiter (#f - (loop (cons resource resources) - (cons resource available) - waiters - (cons 0 resources-checkout-count) - (cons current-internal-time - resources-last-used))) + (loop (1+ next-resource-id) + (cons resource-details available) + waiters)) ((reply . timeout) (if (and timeout (< timeout current-internal-time)) @@ -796,22 +765,24 @@ ;; new fiber to handle handing over the ;; resource, and returning it if there's ;; a timeout - (spawn-fiber-for-checkout reply + (spawn-fiber-for-checkout channel + reply reply-timeout + next-resource-id resource)) - (put-message reply (cons 'success + (put-message reply (list 'success + next-resource-id resource)))) - (loop (cons resource resources) + (set-resource-details-checkout-count! resource-details + 1) + (loop (1+ next-resource-id) available - waiters - (cons 1 resources-checkout-count) - (cons current-internal-time - resources-last-used)))))))) + waiters))))))) (('checkout reply timeout-time max-waiters) (if (null? available) (begin - (unless (= (length resources) max-size) + (unless (= (count-resources resources) max-size) (spawn-fiber-to-return-new-resource)) (let ((waiters-count @@ -838,16 +809,12 @@ internal-time-units-per-second)) (const #f))) op))))) - (loop resources + (loop next-resource-id available - waiters - resources-checkout-count - resources-last-used)) - (loop resources + waiters)) + (loop next-resource-id available - (enq! waiters (cons reply timeout-time)) - resources-checkout-count - resources-last-used)))) + (enq! waiters (cons reply timeout-time)))))) (if timeout-time (let ((current-internal-time @@ -855,59 +822,55 @@ ;; If this client is still waiting (if (> timeout-time current-internal-time) - (let ((reply-timeout - (/ (- timeout-time - current-internal-time) - internal-time-units-per-second))) + (let* ((reply-timeout + (/ (- timeout-time + current-internal-time) + internal-time-units-per-second)) + (resource-id + (car available)) + (resource-details + (hash-ref resources resource-id))) + + (increment-resource-checkout-count! + resource-details) ;; Don't sleep in this fiber, so spawn a new ;; fiber to handle handing over the resource, ;; and returning it if there's a timeout - (spawn-fiber-for-checkout reply + (spawn-fiber-for-checkout channel + reply reply-timeout - (car available)) - (loop resources + resource-id + (resource-details-value + resource-details)) + (loop next-resource-id (cdr available) - waiters - (let ((resource-index - (list-index (lambda (x) - (eq? x (car available))) - resources))) - (list-set! resources-checkout-count - resource-index - (+ 1 (list-ref - resources-checkout-count - resource-index))) - resources-checkout-count) - resources-last-used)) - (loop resources + waiters)) + (loop next-resource-id available - waiters - resources-checkout-count - resources-last-used))) - (begin - (put-message reply (cons 'success - (car available))) + waiters))) + (let* ((resource-id + next-available + (car+cdr available)) + (resource-details + (hash-ref resources + resource-id))) + (increment-resource-checkout-count! resource-details) - (loop resources - (cdr available) - waiters - (let ((resource-index - (list-index (lambda (x) - (eq? x (car available))) - resources))) - (list-set! resources-checkout-count - resource-index - (+ 1 (list-ref - resources-checkout-count - resource-index))) - resources-checkout-count) - resources-last-used))))) + (put-message reply + (list 'success + resource-id + (resource-details-value + resource-details))) + + (loop next-resource-id + next-available + waiters))))) (((and (or 'return 'return-failed-checkout) return-type) - resource) + resource-id) (when (eq? 'return-failed-checkout return-type) @@ -916,44 +879,30 @@ (let ((current-internal-time (get-internal-real-time)) - (resource-index - (list-index (lambda (x) - (eq? x resource)) - resources))) + (resource-details + (hash-ref resources resource-id))) (if (and lifetime - (>= (list-ref resources-checkout-count - resource-index) + (>= (resource-details-checkout-count resource-details) lifetime)) (begin - (spawn-fiber-to-destroy-resource resource) - (loop resources + (spawn-fiber-to-destroy-resource resource-id + resource-details) + (loop next-resource-id available - waiters - resources-checkout-count - resources-last-used)) + waiters)) (let waiter-loop ((waiter (safe-deq waiters))) (match waiter (#f - (loop resources - (cons resource available) - waiters - (if (eq? 'return-failed-checkout - return-type) - (begin - (list-set! resources-checkout-count - resource-index - (- (list-ref resources-checkout-count - resource-index) - 1)) - resources-checkout-count) - resources-checkout-count) - (begin - (when (eq? return-type 'return) - (list-set! - resources-last-used - resource-index - current-internal-time)) - resources-last-used))) + (if (eq? 'return-failed-checkout + return-type) + (decrement-resource-checkout-count! resource-details) + (set-resource-details-last-used! + resource-details + current-internal-time)) + + (loop next-resource-id + (cons resource-id available) + waiters)) ((reply . timeout) (if (and timeout (< timeout current-internal-time)) @@ -967,85 +916,74 @@ ;; new fiber to handle handing over the ;; resource, and returning it if there's ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource)))) - (loop resources + (spawn-fiber-for-checkout + channel + reply + reply-timeout + resource-id + (resource-details-value resource-details))) + (put-message reply + (list 'success + resource-id + (resource-details-value + resource-details))))) + + (set-resource-details-last-used! resource-details + current-internal-time) + (when (eq? 'return-failed-checkout + return-type) + (decrement-resource-checkout-count! resource-details)) + + (loop next-resource-id available - waiters - (if (eq? 'return-failed-checkout - return-type) - (begin - (list-set! resources-checkout-count - resource-index - (- (list-ref resources-checkout-count - resource-index) - 1)) - resources-checkout-count) - resources-checkout-count) - (begin - (list-set! - resources-last-used - resource-index - current-internal-time) - resources-last-used)))))))) + waiters))))))) - (('remove resource) - (let ((index - (list-index (lambda (x) - (eq? x resource)) - resources))) + (('remove resource-id) + (hash-remove! resources + resource-id) - (when (and (not (q-empty? waiters)) - (< (- (length resources) 1) - max-size)) - (spawn-fiber-to-return-new-resource)) + (when (and (not (q-empty? waiters)) + (< (- (count-resources resources) 1) + max-size)) + (spawn-fiber-to-return-new-resource)) - (loop (if index - (remove-at-index! resources index) - (begin - (simple-format - (current-error-port) - "resource pool error: unable to remove ~A\n" - resource) - resources)) - available ; resource shouldn't be in this list - waiters - (remove-at-index! - resources-checkout-count - index) - (remove-at-index! - resources-last-used - index)))) + (loop next-resource-id + available ; resource shouldn't be in this list + waiters)) - (('destroy resource) - (spawn-fiber-to-destroy-resource resource) + (('destroy resource-id) + (let ((resource-details + (hash-ref resources + resource-id))) + (spawn-fiber-to-destroy-resource resource-id + resource-details) - (loop resources - available - waiters - resources-checkout-count - resources-last-used)) + (loop next-resource-id + available + waiters))) (('list-resources reply) (spawn-fiber (lambda () (put-message reply (list-copy resources)))) - (loop resources + (loop next-resource-id available - waiters - resources-checkout-count - resources-last-used)) + waiters)) (('stats reply timeout-time) (let ((stats - `((resources . ,(length resources)) + `((resources . ,(count-resources resources)) (available . ,(length available)) (waiters . ,(q-length waiters)) - (resources-checkout-count . ,resources-checkout-count) + (resources-checkout-count + . ,(hash-fold + (lambda (_ resource-details result) + (cons (resource-details-checkout-count + resource-details) + result)) + '() + resources)) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber @@ -1062,116 +1000,109 @@ internal-time-units-per-second))) op)))))) - (loop resources + (loop next-resource-id available - waiters - resources-checkout-count - resources-last-used)) + waiters)) (('check-for-idle-resources) - (let* ((resources-last-used-seconds - (map - (lambda (internal-time) - (/ (- (get-internal-real-time) internal-time) - internal-time-units-per-second)) - resources-last-used)) - (candidate-resources-to-destroy + (let* ((internal-real-time + (get-internal-real-time)) + (candidate-resource-ids-to-destroy (filter-map - (lambda (resource last-used-seconds) - (if (and (member resource available) - (> last-used-seconds idle-seconds)) - resource - #f)) - resources - resources-last-used-seconds))) + (lambda (resource-id) + (let ((resource-details + (hash-ref resources resource-id))) + (if (> (/ (- internal-real-time + (resource-details-last-used + resource-details)) + internal-time-units-per-second) + idle-seconds) + resource-id + #f))) + available)) + (max-resources-to-destroy + (max 0 + (- (count-resources resources) + min-size))) + (resources-to-destroy + (take candidate-resource-ids-to-destroy + (min max-resources-to-destroy + (length candidate-resource-ids-to-destroy))))) + (when destructor + (for-each + (lambda (resource-id) + (spawn-fiber-to-destroy-resource + resource-id + (hash-ref resources resource-id))) + resources-to-destroy)) - (let* ((available-resources-to-destroy - (lset-intersection eq? - available - candidate-resources-to-destroy)) - (max-resources-to-destroy - (max 0 - (- (length resources) - min-size))) - (resources-to-destroy - (take available-resources-to-destroy - (min max-resources-to-destroy - (length available-resources-to-destroy))))) - (when destructor - (for-each - (lambda (resource) - (spawn-fiber-to-destroy-resource resource)) - resources-to-destroy)) - - (loop resources - (lset-difference eq? available resources-to-destroy) - waiters - resources-checkout-count - resources-last-used)))) + (loop next-resource-id + (lset-difference = available resources-to-destroy) + waiters))) (('destroy) - (if (and (null? resources) - (q-empty? waiters)) - (signal-condition! - destroy-condition) + (let ((current-internal-time (get-internal-real-time))) + (for-each + (match-lambda + ((reply . timeout) + (when (or (not timeout) + (> timeout current-internal-time)) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op)))))))) + (car waiters)) - (let ((current-internal-time (get-internal-real-time))) - (for-each - (match-lambda - ((reply . timeout) - (when (or (not timeout) - (> timeout current-internal-time)) - (spawn-fiber - (lambda () - (let ((op - (put-operation - reply - (cons 'resource-pool-destroyed - #f)))) - (perform-operation - (if timeout - (choice-operation - op - (wrap-operation - (sleep-operation - (/ (- timeout - (get-internal-real-time)) - internal-time-units-per-second)) - (const #f))) - op)))))))) - (car waiters)) + (when destructor + (for-each + (lambda (resource-id) + (spawn-fiber-to-destroy-resource + resource-id + (hash-ref resources + resource-id))) + available)) + + ;; Do this in parallel to avoid deadlocks between the + ;; limiter and returning new resources to this pool + (and=> return-new-resource/parallelism-limiter + (lambda (limiter) + (spawn-fiber + (lambda () + (destroy-parallelism-limiter limiter))))) + + (if (or (= 0 (count-resources resources)) + (not destructor)) + (begin + (set-resource-pool-channel! pool #f) + (signal-condition! destroy-condition) + + ;; No loop + *unspecified*) + (destroy-loop resources next-resource-id)))) - (if destructor - (begin - (for-each - (lambda (resource) - (spawn-fiber-to-destroy-resource resource)) - available) - (destroy-loop resources)) - (let dl ((resources resources) - (available available)) - (if (null? available) - (if (null? resources) - (signal-condition! - destroy-condition) - (destroy-loop resources)) - (let ((index - (list-index (lambda (x) - (eq? x (car available))) - resources))) - (dl (remove-at-index! resources index) - (cdr available))))))))) (unknown (simple-format (current-error-port) "unrecognised message to ~A resource pool channel: ~A\n" name unknown) - (loop resources + (loop next-resource-id available - waiters - resources-checkout-count - resources-last-used))))) + waiters))))) (spawn-fiber (lambda () @@ -1322,6 +1253,10 @@ available. Return the resource once PROC has returned." 'default-max-waiters) max-waiters)) + (unless channel + (raise-exception + (make-resource-pool-destroyed-error pool))) + (let ((reply (if timeout-or-default (let loop ((reply (make-channel)) @@ -1389,7 +1324,7 @@ available. Return the resource once PROC has returned." (('resource-pool-destroyed . #f) (raise-exception (make-resource-pool-destroyed-error pool))) - (('success . resource) + (('success resource-id resource-value) (call-with-values (lambda () (with-exception-handler @@ -1398,12 +1333,12 @@ available. Return the resource once PROC has returned." ;; this avoids inconsistent behaviour with ;; continuation barriers (put-message - (resource-pool-channel pool) + channel (list (if (or destroy-resource-on-exception? (resource-pool-destroy-resource-exception? exn)) 'destroy 'return) - resource)) + resource-id)) (raise-exception exn)) (lambda () (with-exception-handler @@ -1421,11 +1356,11 @@ available. Return the resource once PROC has returned." exn (make-knots-exception stack))))) (lambda () - (proc resource)))) + (proc resource-value)))) #:unwind? #t)) (lambda vals - (put-message (resource-pool-channel pool) - `(return ,resource)) + (put-message channel + `(return ,resource-id)) (apply values vals))))))) (define-syntax-rule (with-resource-from-pool pool resource exp ...) @@ -1434,6 +1369,13 @@ available. Return the resource once PROC has returned." (lambda (resource) exp ...))) (define* (resource-pool-stats pool #:key (timeout 5)) + (define channel + (resource-pool-channel pool)) + + (unless channel + (raise-exception + (make-resource-pool-destroyed-error pool))) + (if timeout (let* ((reply (make-channel)) (start-time (get-internal-real-time)) @@ -1443,7 +1385,7 @@ available. Return the resource once PROC has returned." (perform-operation (choice-operation (wrap-operation - (put-operation (resource-pool-channel pool) + (put-operation channel `(stats ,reply ,timeout-time)) (const #t)) (wrap-operation (sleep-operation timeout) @@ -1467,11 +1409,18 @@ available. Return the resource once PROC has returned." (raise-exception (make-resource-pool-timeout-error pool))))) (let ((reply (make-channel))) - (put-message (resource-pool-channel pool) + (put-message channel `(stats ,reply #f)) (get-message reply)))) (define (resource-pool-list-resources pool) + (define channel + (resource-pool-channel pool)) + + (unless channel + (raise-exception + (make-resource-pool-destroyed-error pool))) + (let ((reply (make-channel))) (put-message (resource-pool-channel pool) (list 'list-resources reply)) diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm index 3999dde..2e30cb9 100644 --- a/tests/resource-pool.scm +++ b/tests/resource-pool.scm @@ -1,9 +1,33 @@ (use-modules (tests) (fibers) + (fibers channels) (unit-test) (knots parallelism) (knots resource-pool)) +(run-fibers-for-tests + (lambda () + (let ((parallelism-limiter (make-parallelism-limiter + 1))) + (with-parallelism-limiter parallelism-limiter + #f) + + (destroy-parallelism-limiter parallelism-limiter)))) + +(run-fibers-for-tests + (lambda () + (let ((parallelism-limiter (make-parallelism-limiter + 1)) + (channel + (make-channel))) + (spawn-fiber + (lambda () + (with-parallelism-limiter parallelism-limiter + (put-message channel #t) + (sleep 1)))) + (get-message channel) + (destroy-parallelism-limiter parallelism-limiter)))) + (define new-number (let ((val 0)) (lambda () From 04d964a9f8ce007ed2fc53f47eca6435c83973b9 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 25 Nov 2025 09:37:29 +0000 Subject: [PATCH 35/41] Fix adding the resource id to the available list --- knots/resource-pool.scm | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 3638500..73f7083 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -750,7 +750,7 @@ (match waiter (#f (loop (1+ next-resource-id) - (cons resource-details available) + (cons next-resource-id available) waiters)) ((reply . timeout) (if (and timeout From 9cce89fc0122126b1ccd180d637f83a3d1f00cd7 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 25 Nov 2025 09:58:45 +0000 Subject: [PATCH 36/41] Change how spawn-fiber-to-destroy-resource is used And fix a couple of incorrect uses. --- knots/resource-pool.scm | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 73f7083..739ec4f 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -588,13 +588,11 @@ #:unwind? #t))))) #:unwind? #t)))) - (define (spawn-fiber-to-destroy-resource resource-id resource-details) + (define (spawn-fiber-to-destroy-resource resource-id resource-value) (spawn-fiber (lambda () (let loop () - (let* ((resource - (resource-details-value resource-details)) - (success? + (let* ((success? (with-exception-handler (lambda _ #f) (lambda () @@ -608,7 +606,7 @@ (print-backtrace-and-exception/knots exn) (raise-exception exn)) (lambda () - (start-stack #t (destructor resource)) + (start-stack #t (destructor resource-value)) #t))) #:unwind? #t))) @@ -662,7 +660,8 @@ destructor) (spawn-fiber-to-destroy-resource resource-id - (hash-ref resources resource-id))) + (resource-details-value + (hash-ref resources resource-id)))) (hash-remove! resources resource-id) @@ -886,7 +885,8 @@ lifetime)) (begin (spawn-fiber-to-destroy-resource resource-id - resource-details) + (resource-details-value + resource-details)) (loop next-resource-id available waiters)) @@ -956,7 +956,8 @@ (hash-ref resources resource-id))) (spawn-fiber-to-destroy-resource resource-id - resource-details) + (resource-details-value + resource-details)) (loop next-resource-id available @@ -1033,7 +1034,8 @@ (lambda (resource-id) (spawn-fiber-to-destroy-resource resource-id - (hash-ref resources resource-id))) + (resource-details-value + (hash-ref resources resource-id)))) resources-to-destroy)) (loop next-resource-id @@ -1072,8 +1074,9 @@ (lambda (resource-id) (spawn-fiber-to-destroy-resource resource-id - (hash-ref resources - resource-id))) + (resource-details-value + (hash-ref resources + resource-id)))) available)) ;; Do this in parallel to avoid deadlocks between the From 8100d36aa5bebb6c6fab114ae5595ace2be43b2e Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 25 Nov 2025 09:58:58 +0000 Subject: [PATCH 37/41] Avoid errors about returning no values from a exception handler --- knots/resource-pool.scm | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 739ec4f..f00e05b 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -558,8 +558,9 @@ (lambda (exn) ;; This can happen if the resource pool is destroyed very ;; quickly - (unless (resource-pool-destroyed-error? exn) - (raise-exception exn))) + (if (resource-pool-destroyed-error? exn) + #f + (raise-exception exn))) (lambda () (with-parallelism-limiter return-new-resource/parallelism-limiter From 9c123bbfa937413e759916f91cbc62e3966324c5 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 25 Nov 2025 14:26:39 +0000 Subject: [PATCH 38/41] Fix listing resource pool resources --- knots/resource-pool.scm | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index f00e05b..2d96ed7 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -391,7 +391,7 @@ (('list-resources reply) (spawn-fiber (lambda () - (put-message reply (list-copy resources)))) + (put-message reply (vector->list resources)))) (loop available waiters)) @@ -967,7 +967,9 @@ (('list-resources reply) (spawn-fiber (lambda () - (put-message reply (list-copy resources)))) + (put-message reply (hash-map->list + (lambda (_ value) value) + resources)))) (loop next-resource-id available From 05f7daf0e9b866dd3f8d997b49c6ae14b30d635c Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Wed, 26 Nov 2025 10:06:09 +0000 Subject: [PATCH 39/41] Add another resource pool test --- tests/resource-pool.scm | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm index 2e30cb9..b3a84d7 100644 --- a/tests/resource-pool.scm +++ b/tests/resource-pool.scm @@ -252,4 +252,34 @@ (destroy-resource-pool resource-pool)))) +;; Test allocating resources to waiters and destroying resources +(run-fibers-for-tests + (lambda () + (let ((resource-pool (make-resource-pool + (lambda () + (sleep 1) + 'res) + 2 + #:idle-seconds 1 + #:add-resources-parallelism 10 + #:destructor + (const #t)))) + (fibers-for-each + (lambda _ + (with-resource-from-pool resource-pool + res + res)) + (iota 20)) + + (sleep 2) + + (fibers-for-each + (lambda _ + (with-resource-from-pool resource-pool + res + res)) + (iota 20)) + + (destroy-resource-pool resource-pool)))) + (display "resource-pool test finished successfully\n") From a8e07b738b558d701c6de1f5ee6452ee4095198e Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Wed, 26 Nov 2025 10:06:20 +0000 Subject: [PATCH 40/41] Fix a resource pool bug with idle seconds Actually perform the choice operation. --- knots/resource-pool.scm | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 2d96ed7..f957c3d 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -1117,13 +1117,14 @@ (lambda () (let loop () (put-message channel '(check-for-idle-resources)) - (when (choice-operation - (wrap-operation - (sleep-operation idle-seconds) - (const #t)) - (wrap-operation - (wait-operation destroy-condition) - (const #f))) + (when (perform-operation + (choice-operation + (wrap-operation + (sleep-operation idle-seconds) + (const #t)) + (wrap-operation + (wait-operation destroy-condition) + (const #f)))) (loop)))))) (with-exception-handler From f64e435b5710c3602c223c5027e515f0f6aefc02 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Sat, 6 Dec 2025 10:02:58 +0000 Subject: [PATCH 41/41] Use start-stack in fibers-force Otherwise the backtraces are more confusing. --- knots/promise.scm | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/knots/promise.scm b/knots/promise.scm index 9df376b..6aa3f0b 100644 --- a/knots/promise.scm +++ b/knots/promise.scm @@ -82,7 +82,10 @@ (make-exception exn (make-knots-exception stack))))) - (fibers-promise-thunk fp))) + (lambda () + (start-stack + #t + ((fibers-promise-thunk fp)))))) #:unwind? #t)) (lambda vals (atomic-box-set! (fibers-promise-values-box fp)