From 7c2c6f2de9e4ebeab8de78077cbb2a0b7c585e6b Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 24 Jun 2025 11:59:22 +0200 Subject: [PATCH 01/24] WIP --- .forgejo/workflows/demo.yaml | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 .forgejo/workflows/demo.yaml diff --git a/.forgejo/workflows/demo.yaml b/.forgejo/workflows/demo.yaml new file mode 100644 index 0000000..935846f --- /dev/null +++ b/.forgejo/workflows/demo.yaml @@ -0,0 +1,13 @@ +on: + push: + branches: + - actions-test +jobs: + test: + runs-on: host + steps: + - run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git knots-trunk + - run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git --branch=pages knots-pages + - run: | + cd knots-trunk + guix shell -D -f guix-dev.scm -- documenta api knots From 7f5f05ef2b1d62ba0bc0b1a37986c3d4bb2a5f99 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 24 Jun 2025 20:56:56 +0200 Subject: [PATCH 02/24] WIP --- .forgejo/workflows/demo.yaml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.forgejo/workflows/demo.yaml b/.forgejo/workflows/demo.yaml index 935846f..87ff5f8 100644 --- a/.forgejo/workflows/demo.yaml +++ b/.forgejo/workflows/demo.yaml @@ -11,3 +11,12 @@ jobs: - run: | cd knots-trunk guix shell -D -f guix-dev.scm -- documenta api knots + guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -o pages/index.html doc/index.texi + + - run: | + cd knots-pages + git add . + git config user.email <> + git config user.name "Automatic website updater" + git commit -m "Automatic website update" + git push From 81dd3370e67c159175ecad6214398f57da0b25e8 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 24 Jun 2025 20:59:57 +0200 Subject: [PATCH 03/24] WIP --- .forgejo/workflows/demo.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.forgejo/workflows/demo.yaml b/.forgejo/workflows/demo.yaml index 87ff5f8..34fea34 100644 --- a/.forgejo/workflows/demo.yaml +++ b/.forgejo/workflows/demo.yaml @@ -16,7 +16,7 @@ jobs: - run: | cd knots-pages git add . - git config user.email <> + git config user.email "" git config user.name "Automatic website updater" git commit -m "Automatic website update" git push From eadfa53b36cbd3d78daacfacf0499efef73f6624 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 24 Jun 2025 21:00:54 +0200 Subject: [PATCH 04/24] WIP --- .forgejo/workflows/demo.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.forgejo/workflows/demo.yaml b/.forgejo/workflows/demo.yaml index 34fea34..8fb0304 100644 --- a/.forgejo/workflows/demo.yaml +++ b/.forgejo/workflows/demo.yaml @@ -11,7 +11,7 @@ jobs: - run: | cd knots-trunk guix shell -D -f guix-dev.scm -- documenta api knots - guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -o pages/index.html doc/index.texi + guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -o ../knots-pages/index.html doc/index.texi - run: | cd knots-pages From 003c5aa6b0078a9fa97ca54537e3798d911a77df Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 24 Jun 2025 21:03:34 +0200 Subject: [PATCH 05/24] WIP --- .forgejo/workflows/demo.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.forgejo/workflows/demo.yaml b/.forgejo/workflows/demo.yaml index 8fb0304..c3f4156 100644 --- a/.forgejo/workflows/demo.yaml +++ b/.forgejo/workflows/demo.yaml @@ -11,7 +11,7 @@ jobs: - run: | cd knots-trunk guix shell -D -f guix-dev.scm -- documenta api knots - guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -o ../knots-pages/index.html doc/index.texi + guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -c SHOW_TITLE=true -o ../knots-pages/index.html doc/index.texi - run: | cd knots-pages From 2e25c3b074ff218498631108559cdcb014e289c4 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 24 Jun 2025 21:07:29 +0200 Subject: [PATCH 06/24] Add workflow for building the website --- .forgejo/workflows/build-website.yaml | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 .forgejo/workflows/build-website.yaml diff --git a/.forgejo/workflows/build-website.yaml b/.forgejo/workflows/build-website.yaml new file mode 100644 index 0000000..ac6261f --- /dev/null +++ b/.forgejo/workflows/build-website.yaml @@ -0,0 +1,22 @@ +on: + push: + branches: + - trunk +jobs: + test: + runs-on: host + steps: + - run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git knots-trunk + - run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git --branch=pages knots-pages + - run: | + cd knots-trunk + guix shell -D -f guix-dev.scm -- documenta api knots + guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -c SHOW_TITLE=true -o ../knots-pages/index.html doc/index.texi + + - run: | + cd knots-pages + git add . + git config user.email "" + git config user.name "Automatic website updater" + git commit -m "Automatic website update" + git push From edf62414eebb661729b6b6f564c6ce7a1be27d07 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 24 Jun 2025 21:14:52 +0200 Subject: [PATCH 07/24] Avoid workflow erroring if there's nothing to change --- .forgejo/workflows/build-website.yaml | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/.forgejo/workflows/build-website.yaml b/.forgejo/workflows/build-website.yaml index ac6261f..d859b76 100644 --- a/.forgejo/workflows/build-website.yaml +++ b/.forgejo/workflows/build-website.yaml @@ -16,7 +16,11 @@ jobs: - run: | cd knots-pages git add . - git config user.email "" - git config user.name "Automatic website updater" - git commit -m "Automatic website update" - git push + if [[ -z "$(git status -s)" ]]; then + echo "Nothing to push" + else + git config user.email "" + git config user.name "Automatic website updater" + git commit -m "Automatic website update" + git push + fi From ab5411da423043f2b8a0e27c7507f8d9c34686a2 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Wed, 25 Jun 2025 18:46:46 +0200 Subject: [PATCH 08/24] Make resource pool changes and add parallelism limiter This was motivated by trying to allow for completely cleaning up resource pools, which involved removing their use of fiberize which currently has no destroy mechanism. As part of this, there's a new parallelism limiter mechanism using resource pools rather than fibers, and also a fixed size resource pool. The tests now drain? and destroy the resource pools to check cleaning up. --- knots/parallelism.scm | 38 ++- knots/resource-pool.scm | 719 ++++++++++++++++++++++++++++++++-------- tests.scm | 6 +- tests/parallelism.scm | 12 + tests/resource-pool.scm | 47 ++- 5 files changed, 669 insertions(+), 153 deletions(-) diff --git a/knots/parallelism.scm b/knots/parallelism.scm index f8b2b8b..9e80f5b 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -20,6 +20,8 @@ (define-module (knots parallelism) #:use-module (srfi srfi-1) #:use-module (srfi srfi-71) + #:use-module (srfi srfi-9) + #:use-module (srfi srfi-9 gnu) #:use-module (ice-9 match) #:use-module (ice-9 control) #:use-module (ice-9 exceptions) @@ -27,6 +29,7 @@ #:use-module (fibers channels) #:use-module (fibers operations) #:use-module (knots) + #:use-module (knots resource-pool) #:export (fibers-batch-map fibers-map @@ -38,7 +41,13 @@ fibers-parallel fibers-let - fiberize)) + fiberize + + make-parallelism-limiter + parallelism-limiter? + destroy-parallelism-limiter + call-with-parallelism-limiter + with-parallelism-limiter)) (define (defer-to-parallel-fiber thunk) (let ((reply (make-channel))) @@ -287,3 +296,30 @@ (('result . vals) (apply values vals)) (('exception exn) (raise-exception exn)))))) + +(define-record-type + (make-parallelism-limiter-record resource-pool) + parallelism-limiter? + (resource-pool parallelism-limiter-resource-pool)) + +(define* (make-parallelism-limiter limit #:key (name "unnamed")) + (make-parallelism-limiter-record + (make-fixed-size-resource-pool + (iota limit) + #:name name))) + +(define (destroy-parallelism-limiter parallelism-limiter) + (destroy-resource-pool + (parallelism-limiter-resource-pool + parallelism-limiter))) + +(define* (call-with-parallelism-limiter parallelism-limiter thunk) + (call-with-resource-from-pool + (parallelism-limiter-resource-pool parallelism-limiter) + (lambda _ + (thunk)))) + +(define-syntax-rule (with-parallelism-limiter parallelism-limiter exp ...) + (call-with-parallelism-limiter + parallelism-limiter + (lambda () exp ...))) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index da52051..71c378c 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -32,9 +32,10 @@ #:use-module (fibers conditions) #:use-module (knots) #:use-module (knots parallelism) - #:export (resource-pool? - + #:export (make-fixed-size-resource-pool make-resource-pool + + resource-pool? resource-pool-name resource-pool-channel resource-pool-configuration @@ -91,6 +92,429 @@ (resource-pool-name resource-pool)) port))) +(define (remove-at-index! lst i) + (let ((start + end + (split-at! lst i))) + (append + start + (cdr end)))) + +(define* (make-fixed-size-resource-pool resources + #:key + (delay-logger (const #f)) + (duration-logger (const #f)) + destructor + scheduler + (name "unnamed") + default-checkout-timeout + default-max-waiters) + (define channel (make-channel)) + (define destroy-condition + (make-condition)) + + (define pool + (make-resource-pool-record + name + channel + destroy-condition + `((delay-logger . ,delay-logger) + (duration-logger . ,duration-logger) + (destructor . ,destructor) + (scheduler . ,scheduler) + (name . ,name) + (default-checkout-timeout . ,default-checkout-timeout) + (default-max-waiters . ,default-max-waiters)))) + + (define checkout-failure-count 0) + + (define (spawn-fiber-to-destroy-resource resource) + (spawn-fiber + (lambda () + (let loop () + (let ((success? + (with-exception-handler + (lambda _ #f) + (lambda () + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running resource pool destructor (~A): ~A\n" + name + destructor) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (start-stack #t (destructor resource)) + #t))) + #:unwind? #t))) + + (if success? + (put-message channel + (list 'remove resource)) + (begin + (sleep 5) + + (loop)))))))) + + (define (spawn-fiber-for-checkout reply-channel + reply-timeout + resource) + (spawn-fiber + (lambda () + (let ((checkout-success? + (perform-operation + (choice-operation + (wrap-operation + (put-operation reply-channel + (cons 'success resource)) + (const #t)) + (wrap-operation (sleep-operation + reply-timeout) + (const #f)))))) + (unless checkout-success? + (put-message + channel + (list 'return-failed-checkout resource))))))) + + (define (destroy-loop resources) + (let loop ((resources resources)) + (match (get-message channel) + (('checkout reply timeout-time max-waiters) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout-time + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op))))) + (loop resources)) + (((and (or 'return + 'return-failed-checkout + 'remove) + return-type) + resource) + (when (and (not (eq? return-type 'remove)) + destructor) + (spawn-fiber-to-destroy-resource resource)) + + (let ((index + (list-index (lambda (x) + (eq? x resource)) + resources))) + (let ((new-resources + (if index + (remove-at-index! resources index) + (begin + (simple-format + (current-error-port) + "resource pool error: unable to remove ~A\n" + resource) + resources)))) + (if (null? new-resources) + (begin + (signal-condition! destroy-condition) + + ;; No loop + *unspecified*) + (loop new-resources))))) + + (('stats reply timeout-time) + (let ((stats + `((resources . ,(length resources)) + (available . 0) + (waiters . 0) + (checkout-failure-count . ,checkout-failure-count)))) + + (spawn-fiber + (lambda () + (let ((op + (put-operation reply stats))) + (perform-operation + (if timeout-time + (choice-operation + op + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second))) + op)))))) + + (loop resources)) + + (('destroy reply) + (loop resources)) + (unknown + (simple-format + (current-error-port) + "unrecognised message to ~A resource pool channel: ~A\n" + name + unknown) + (loop resources))))) + + (define (main-loop) + (let loop ((resources resources) + (available resources) + (waiters '())) + + (match (get-message channel) + (('checkout reply timeout-time max-waiters) + (if (null? available) + (let ((waiters-count + (length waiters))) + (if (and max-waiters + (>= waiters-count + max-waiters)) + (begin + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'too-many-waiters + waiters-count)))) + (perform-operation + (if timeout-time + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op))))) + (loop resources + available + waiters)) + (loop resources + available + (cons (cons reply timeout-time) + waiters)))) + + (if timeout-time + (let ((current-internal-time + (get-internal-real-time))) + ;; If this client is still waiting + (if (> timeout-time + current-internal-time) + (let ((reply-timeout + (/ (- timeout-time + current-internal-time) + internal-time-units-per-second))) + + ;; Don't sleep in this fiber, so spawn a new + ;; fiber to handle handing over the resource, + ;; and returning it if there's a timeout + (spawn-fiber-for-checkout reply + reply-timeout + (car available)) + (loop resources + (cdr available) + waiters)) + (loop resources + available + waiters))) + (begin + (put-message reply (cons 'success + (car available))) + + (loop resources + (cdr available) + waiters))))) + + (((and (or 'return + 'return-failed-checkout) + return-type) + resource) + + (when (eq? 'return-failed-checkout + return-type) + (set! checkout-failure-count + (+ 1 checkout-failure-count))) + + (if (null? waiters) + (loop resources + (cons resource available) + waiters) + + (let* ((current-internal-time (get-internal-real-time)) + (alive-waiters + dead-waiters + (partition! + (match-lambda + ((reply . timeout) + (or (not timeout) + (> timeout current-internal-time)))) + waiters))) + (if (null? alive-waiters) + (loop resources + (cons resource available) + '()) + (match (last alive-waiters) + ((waiter-channel . waiter-timeout) + (if waiter-timeout + (let ((reply-timeout + (/ (- waiter-timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's a + ;; timeout + (spawn-fiber-for-checkout waiter-channel + reply-timeout + resource)) + (put-message waiter-channel (cons 'success + resource))) + + (loop resources + available + (drop-right! alive-waiters 1)))))))) + + (('list-resources reply) + (spawn-fiber + (lambda () + (put-message reply (list-copy resources)))) + + (loop resources + available + waiters)) + + (('stats reply timeout-time) + (let ((stats + `((resources . ,(length resources)) + (available . ,(length available)) + (waiters . ,(length waiters)) + (checkout-failure-count . ,checkout-failure-count)))) + + (spawn-fiber + (lambda () + (let ((op + (put-operation reply stats))) + (perform-operation + (if timeout-time + (choice-operation + op + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second))) + op)))))) + + (loop resources + available + waiters)) + + (('destroy) + (if (and (null? resources) + (null? waiters)) + (signal-condition! + destroy-condition) + + (let ((current-internal-time (get-internal-real-time))) + (for-each + (match-lambda + ((reply . timeout) + (when (or (not timeout) + (> timeout current-internal-time)) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op)))))))) + waiters) + + (if destructor + (begin + (for-each + (lambda (resource) + (spawn-fiber-to-destroy-resource resource)) + available) + (destroy-loop resources)) + (let dl ((resources resources) + (available available)) + (if (null? available) + (if (null? resources) + (signal-condition! + destroy-condition) + (destroy-loop resources)) + (let ((index + (list-index (lambda (x) + (eq? x (car available))) + resources))) + (dl (remove-at-index! resources index) + (cdr available))))))))) + + (unknown + (simple-format + (current-error-port) + "unrecognised message to ~A resource pool channel: ~A\n" + name + unknown) + (loop resources + available + waiters))))) + + (spawn-fiber + (lambda () + (with-exception-handler + (lambda (exn) + #f) + (lambda () + (with-exception-handler + (lambda (exn) + (let* ((stack (make-stack #t)) + (error-string + (call-with-output-string + (lambda (port) + (display-backtrace stack port 3) + (simple-format + port + "exception in the ~A pool fiber, " name) + (print-exception + port + (stack-ref stack 3) + '%exception + (list exn)))))) + (display error-string + (current-error-port))) + (raise-exception exn)) + (lambda () + (start-stack + #t + (main-loop))))) + #:unwind? #t)) + (or scheduler + (current-scheduler))) + + pool) + (define* (make-resource-pool return-new-resource max-size #:key (min-size 0) (idle-seconds #f) @@ -126,46 +550,52 @@ (define checkout-failure-count 0) - (define spawn-fiber-to-return-new-resource - (if add-resources-parallelism - (let ((thunk - (fiberize - (lambda () - (let ((max-size - (assq-ref (resource-pool-configuration pool) - 'max-size)) - (size (assq-ref (resource-pool-stats pool) - 'resources))) - (unless (= size max-size) - (let ((new-resource - (return-new-resource))) - (put-message channel - (list 'add-resource new-resource)))))) - #:parallelism add-resources-parallelism))) - (lambda () - (spawn-fiber thunk))) - (lambda () - (spawn-fiber - (lambda () - (let ((new-resource + (define return-new-resource/parallelism-limiter + (make-parallelism-limiter + (or add-resources-parallelism + max-size) + #:name + (string-append + name + " resource pool new resource parallelism limiter"))) + + (define (spawn-fiber-to-return-new-resource) + (spawn-fiber + (lambda () + (with-exception-handler + (lambda (exn) + ;; This can happen if the resource pool is destroyed very + ;; quickly + (unless (resource-pool-destroyed-error? exn) + (raise-exception exn))) + (lambda () + (with-parallelism-limiter + return-new-resource/parallelism-limiter + (let ((max-size + (assq-ref (resource-pool-configuration pool) + 'max-size)) + (size (assq-ref (resource-pool-stats pool #:timeout #f) + 'resources))) + (unless (= size max-size) + (with-exception-handler + (lambda _ #f) + (lambda () (with-exception-handler - (lambda _ #f) + (lambda (exn) + (simple-format + (current-error-port) + "exception adding resource to pool ~A: ~A\n\n" + name + return-new-resource) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) (lambda () - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception adding resource to pool ~A: ~A\n\n" - name - return-new-resource) - (print-backtrace-and-exception/knots exn) - (raise-exception exn)) - (lambda () - (start-stack #t (return-new-resource))))) - #:unwind? #t))) - (when new-resource - (put-message channel - (list 'add-resource new-resource))))))))) + (let ((new-resource + (start-stack #t (return-new-resource)))) + (put-message channel + (list 'add-resource new-resource)))))) + #:unwind? #t))))) + #:unwind? #t)))) (define (spawn-fiber-to-destroy-resource resource) (spawn-fiber @@ -250,21 +680,14 @@ 'remove) return-type) resource) - (when destructor + (when (and (not (eq? return-type 'remove)) + destructor) (spawn-fiber-to-destroy-resource resource)) (let ((index (list-index (lambda (x) (eq? x resource)) resources))) - (define (remove-at-index! lst i) - (let ((start - end - (split-at! lst i))) - (append - start - (cdr end)))) - (let ((new-resources (if index (remove-at-index! resources index) @@ -276,13 +699,16 @@ resources)))) (if (null? new-resources) (begin + (and=> return-new-resource/parallelism-limiter + destroy-parallelism-limiter) + (signal-condition! destroy-condition) ;; No loop *unspecified*) (loop new-resources))))) - (('stats reply) + (('stats reply timeout-time) (let ((stats `((resources . ,(length resources)) (available . 0) @@ -291,13 +717,17 @@ (spawn-fiber (lambda () - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply stats) - (const #t)) - (wrap-operation (sleep-operation 5) - (const #f))))))) + (let ((op + (put-operation reply stats))) + (perform-operation + (if timeout-time + (choice-operation + op + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second))) + op)))))) (loop resources)) @@ -537,14 +967,6 @@ (list-index (lambda (x) (eq? x resource)) resources))) - (define (remove-at-index! lst i) - (let ((start - end - (split-at! lst i))) - (append - start - (cdr end)))) - (loop (if index (remove-at-index! resources index) (begin @@ -577,7 +999,7 @@ waiters resources-last-used)) - (('stats reply) + (('stats reply timeout-time) (let ((stats `((resources . ,(length resources)) (available . ,(length available)) @@ -586,13 +1008,17 @@ (spawn-fiber (lambda () - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply stats) - (const #t)) - (wrap-operation (sleep-operation 5) - (const #f))))))) + (let ((op + (put-operation reply stats))) + (perform-operation + (if timeout-time + (choice-operation + op + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second))) + op)))))) (loop resources available @@ -645,46 +1071,52 @@ (signal-condition! destroy-condition) - (begin + (let ((current-internal-time (get-internal-real-time))) (for-each - (lambda (resource) - (if destructor - (spawn-fiber-to-destroy-resource resource) - (spawn-fiber - (lambda () - (put-message channel - (list 'remove resource))) - #:parallel? #t))) - available) - - (let ((current-internal-time (get-internal-real-time))) - (for-each - (match-lambda - ((reply . timeout) - (when (or (not timeout) - (> timeout current-internal-time)) - (spawn-fiber - (lambda () - (let ((op - (put-operation - reply - (cons 'resource-pool-destroyed - #f)))) - (perform-operation - (if timeout - (choice-operation - op - (wrap-operation - (sleep-operation - (/ (- timeout - (get-internal-real-time)) - internal-time-units-per-second)) - (const #f))) - op)))))))) - waiters)) - - (destroy-loop resources)))) + (match-lambda + ((reply . timeout) + (when (or (not timeout) + (> timeout current-internal-time)) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op)))))))) + waiters) + (if destructor + (begin + (for-each + (lambda (resource) + (spawn-fiber-to-destroy-resource resource)) + available) + (destroy-loop resources)) + (let dl ((resources resources) + (available available)) + (if (null? available) + (if (null? resources) + (signal-condition! + destroy-condition) + (destroy-loop resources)) + (let ((index + (list-index (lambda (x) + (eq? x (car available))) + resources))) + (dl (remove-at-index! resources index) + (cdr available))))))))) (unknown (simple-format (current-error-port) @@ -744,7 +1176,8 @@ (put-operation (resource-pool-channel pool) (list 'destroy)) (lambda _ - (wait (resource-pool-destroy-condition pool)))) + (wait + (resource-pool-destroy-condition pool)))) (wait-operation (resource-pool-destroy-condition pool)))) #t) @@ -949,34 +1382,42 @@ available. Return the resource once PROC has returned." (lambda (resource) exp ...))) (define* (resource-pool-stats pool #:key (timeout 5)) - (let ((reply (make-channel)) - (start-time (get-internal-real-time))) - (perform-operation - (choice-operation - (wrap-operation - (put-operation (resource-pool-channel pool) - `(stats ,reply)) - (const #t)) - (wrap-operation (sleep-operation timeout) - (lambda _ - (raise-exception - (make-resource-pool-timeout-error pool)))))) + (if timeout + (let* ((reply (make-channel)) + (start-time (get-internal-real-time)) + (timeout-time + (+ start-time + (* internal-time-units-per-second timeout)))) + (perform-operation + (choice-operation + (wrap-operation + (put-operation (resource-pool-channel pool) + `(stats ,reply ,timeout-time)) + (const #t)) + (wrap-operation (sleep-operation timeout) + (lambda _ + (raise-exception + (make-resource-pool-timeout-error pool)))))) - (let ((time-remaining - (- timeout - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second)))) - (if (> time-remaining 0) - (perform-operation - (choice-operation - (get-operation reply) - (wrap-operation (sleep-operation time-remaining) - (lambda _ - (raise-exception - (make-resource-pool-timeout-error pool)))))) - (raise-exception - (make-resource-pool-timeout-error pool)))))) + (let ((time-remaining + (- timeout + (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second)))) + (if (> time-remaining 0) + (perform-operation + (choice-operation + (get-operation reply) + (wrap-operation (sleep-operation time-remaining) + (lambda _ + (raise-exception + (make-resource-pool-timeout-error pool)))))) + (raise-exception + (make-resource-pool-timeout-error pool))))) + (let ((reply (make-channel))) + (put-message (resource-pool-channel pool) + `(stats ,reply #f)) + (get-message reply)))) (define (resource-pool-list-resources pool) (let ((reply (make-channel))) diff --git a/tests.scm b/tests.scm index 2b24c6a..a58eff0 100644 --- a/tests.scm +++ b/tests.scm @@ -4,7 +4,7 @@ #:export (run-fibers-for-tests assert-no-heap-growth)) -(define (run-fibers-for-tests thunk) +(define* (run-fibers-for-tests thunk #:key (drain? #t)) (let ((result (run-fibers (lambda () @@ -12,6 +12,7 @@ (lambda (exn) exn) (lambda () + (simple-format #t "running ~A\n" thunk) (with-exception-handler (lambda (exn) (backtrace) @@ -20,7 +21,8 @@ #t) #:unwind? #t)) #:hz 0 - #:parallelism 1))) + #:parallelism 1 + #:drain? drain?))) (if (exception? result) (raise-exception result) result))) diff --git a/tests/parallelism.scm b/tests/parallelism.scm index 9881a4d..03ec376 100644 --- a/tests/parallelism.scm +++ b/tests/parallelism.scm @@ -111,4 +111,16 @@ (assert-equal a 1)))) +(run-fibers-for-tests + (lambda () + (let ((parallelism-limiter (make-parallelism-limiter 2))) + (fibers-for-each + (lambda _ + (with-parallelism-limiter + parallelism-limiter + #f)) + (iota 50)) + + (destroy-parallelism-limiter parallelism-limiter)))) + (display "parallelism test finished successfully\n") diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm index 1bc09e5..461d04b 100644 --- a/tests/resource-pool.scm +++ b/tests/resource-pool.scm @@ -19,7 +19,21 @@ (number? (with-resource-from-pool resource-pool res - res)))))) + res))) + + (destroy-resource-pool resource-pool)))) + +(run-fibers-for-tests + (lambda () + (let ((resource-pool (make-fixed-size-resource-pool + (list 1)))) + (assert-true + (number? + (with-resource-from-pool resource-pool + res + res))) + + (destroy-resource-pool resource-pool)))) (run-fibers-for-tests (lambda () @@ -31,7 +45,9 @@ (number? (with-resource-from-pool resource-pool res - res)))))) + res))) + + (destroy-resource-pool resource-pool)))) (let* ((error-constructor (record-constructor &resource-pool-timeout)) @@ -88,10 +104,13 @@ res)) (iota 20)) - (let loop ((stats (resource-pool-stats resource-pool))) + (let loop ((stats (resource-pool-stats resource-pool + #:timeout #f))) (unless (= 0 (assq-ref stats 'resources)) (sleep 0.1) - (loop (resource-pool-stats resource-pool))))))) + (loop (resource-pool-stats resource-pool #:timeout #f)))) + + (destroy-resource-pool resource-pool)))) (run-fibers-for-tests (lambda () @@ -115,7 +134,9 @@ (set! counter (+ 1 counter)) (error "collision detected"))))) 20 - (iota 50))))) + (iota 50)) + + (destroy-resource-pool resource-pool)))) (run-fibers-for-tests (lambda () @@ -129,7 +150,7 @@ (error "collision detected"))) (new-number)) 1 - #:default-checkout-timeout 120))) + #:default-checkout-timeout 5))) (fibers-batch-for-each (lambda _ (with-resource-from-pool @@ -140,7 +161,9 @@ (set! counter (+ 1 counter)) (error "collision detected"))))) 20 - (iota 50))))) + (iota 50)) + + (destroy-resource-pool resource-pool)))) (run-fibers-for-tests (lambda () @@ -164,14 +187,14 @@ (call-with-resource-from-pool resource-pool (lambda (res) - (error 'should-not-be-reached)))) + #f))) #:unwind? #t))) (while (= 0 (assq-ref - (resource-pool-stats resource-pool) + (resource-pool-stats resource-pool #:timeout #f) 'waiters)) - (sleep 0)) + (sleep 0.1)) (with-exception-handler (lambda (exn) @@ -184,6 +207,8 @@ resource-pool (lambda (res) (error 'should-not-be-reached)))) - #:unwind? #t)))))) + #:unwind? #t))) + + (destroy-resource-pool resource-pool)))) (display "resource-pool test finished successfully\n") From 09ca6cfb6bb24b94684f67daee5b1a8eae4aa3cb Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Thu, 26 Jun 2025 21:27:32 +0200 Subject: [PATCH 09/24] Fix resource-pool-destroy-resource-exception Raising the exception is more consistent, and avoids returning the resource. --- knots/resource-pool.scm | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 71c378c..e55bdac 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -1351,8 +1351,7 @@ available. Return the resource once PROC has returned." 'destroy 'return) resource)) - (unless (resource-pool-destroy-resource-exception? exn) - (raise-exception exn))) + (raise-exception exn)) (lambda () (with-exception-handler (lambda (exn) From 8f3e0a9a1d8572f4e96651ec0ded21717644c6bb Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Thu, 26 Jun 2025 22:53:15 +0200 Subject: [PATCH 10/24] Fix exception handling in fibers-map-with-progress --- knots/parallelism.scm | 8 ++++---- tests/parallelism.scm | 18 ++++++++++++++++++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/knots/parallelism.scm b/knots/parallelism.scm index 9e80f5b..a3b04c7 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -217,9 +217,9 @@ (if (null? active-channels) (map (match-lambda - ((#f . ('exception . exn)) + ((#f . ('exception exn)) (raise-exception exn)) - ((#f . ('result . val)) + ((#f . ('result val)) val)) channels-to-results) (loop @@ -239,10 +239,10 @@ (if (eq? channel c) (cons #f (match result - (('exception . exn) + (('exception exn) result) (_ - (cons 'result result)))) + (list 'result result)))) (cons c r)))) channels-to-results))) #f)))) diff --git a/tests/parallelism.scm b/tests/parallelism.scm index 03ec376..91b2f3d 100644 --- a/tests/parallelism.scm +++ b/tests/parallelism.scm @@ -61,6 +61,24 @@ identity '(())))) +(run-fibers-for-tests + (lambda () + (with-exception-handler + (lambda (exn) + (unless (and (exception-with-message? exn) + (string=? (exception-message exn) + "foo")) + (raise-exception exn))) + (lambda () + (fibers-map-with-progress + (lambda _ + (raise-exception + (make-exception-with-message "foo"))) + '((1))) + + (error 'should-not-reach-here)) + #:unwind? #t))) + (run-fibers-for-tests (lambda () (with-exception-handler From 163d775496f8413fe5c1edb5ed22289df9d4fd01 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Fri, 27 Jun 2025 00:16:18 +0200 Subject: [PATCH 11/24] Fix record-predicate that should be exception-predicate --- knots/resource-pool.scm | 10 +++++----- knots/thread-pool.scm | 2 +- knots/timeout.scm | 6 +++--- knots/web-server.scm | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index e55bdac..7da76b0 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -74,7 +74,7 @@ (record-constructor &resource-pool-abort-add-resource)) (define resource-pool-abort-add-resource-error? - (record-predicate &resource-pool-abort-add-resource)) + (exception-predicate &resource-pool-abort-add-resource)) (define-record-type (make-resource-pool-record name channel destroy-condition configuration) @@ -1196,7 +1196,7 @@ (record-constructor &resource-pool-timeout)) (define resource-pool-timeout-error? - (record-predicate &resource-pool-timeout)) + (exception-predicate &resource-pool-timeout)) (define &resource-pool-too-many-waiters (make-exception-type '&recource-pool-too-many-waiters @@ -1217,7 +1217,7 @@ (record-constructor &resource-pool-too-many-waiters)) (define resource-pool-too-many-waiters-error? - (record-predicate &resource-pool-too-many-waiters)) + (exception-predicate &resource-pool-too-many-waiters)) (define &resource-pool-destroyed (make-exception-type '&recource-pool-destroyed @@ -1233,7 +1233,7 @@ (record-constructor &resource-pool-destroyed)) (define resource-pool-destroyed-error? - (record-predicate &resource-pool-destroyed)) + (exception-predicate &resource-pool-destroyed)) (define &resource-pool-destroy-resource (make-exception-type '&recource-pool-destroy-resource @@ -1244,7 +1244,7 @@ (record-constructor &resource-pool-destroy-resource)) (define resource-pool-destroy-resource-exception? - (record-predicate &resource-pool-destroy-resource)) + (exception-predicate &resource-pool-destroy-resource)) (define resource-pool-default-timeout-handler (make-parameter #f)) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index b176162..70d7292 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -198,7 +198,7 @@ from there, or #f if that would be an empty string." (record-accessor &thread-pool-timeout-error 'pool))) (define thread-pool-timeout-error? - (record-predicate &thread-pool-timeout-error)) + (exception-predicate &thread-pool-timeout-error)) (define* (make-fixed-size-thread-pool size #:key diff --git a/knots/timeout.scm b/knots/timeout.scm index 58306e0..a65a095 100644 --- a/knots/timeout.scm +++ b/knots/timeout.scm @@ -85,7 +85,7 @@ (record-constructor &port-timeout-error)) (define port-timeout-error? - (record-predicate &port-timeout-error)) + (exception-predicate &port-timeout-error)) (define &port-read-timeout-error (make-exception-type '&port-read-timeout-error @@ -96,7 +96,7 @@ (record-constructor &port-read-timeout-error)) (define port-read-timeout-error? - (record-predicate &port-read-timeout-error)) + (exception-predicate &port-read-timeout-error)) (define &port-write-timeout-error (make-exception-type '&port-write-timeout-error @@ -107,7 +107,7 @@ (record-constructor &port-write-timeout-error)) (define port-write-timeout-error? - (record-predicate &port-write-timeout-error)) + (exception-predicate &port-write-timeout-error)) (define (readable? port) "Test if PORT is writable." diff --git a/knots/web-server.scm b/knots/web-server.scm index 453db44..a0a3641 100644 --- a/knots/web-server.scm +++ b/knots/web-server.scm @@ -130,7 +130,7 @@ closes PORT, unless KEEP-ALIVE? is true." (record-constructor &request-body-ended-prematurely)) (define request-body-ended-prematurely-error? - (record-predicate &request-body-ended-prematurely)) + (exception-predicate &request-body-ended-prematurely)) (define (request-body-port/knots r) (cond From d8f64399cd572fb1c6b19303ee710ce6529e77b4 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Fri, 27 Jun 2025 00:16:37 +0200 Subject: [PATCH 12/24] Tweak spacing --- knots/parallelism.scm | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/knots/parallelism.scm b/knots/parallelism.scm index a3b04c7..c98ca3f 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -217,9 +217,9 @@ (if (null? active-channels) (map (match-lambda - ((#f . ('exception exn)) + ((#f . ('exception exn)) (raise-exception exn)) - ((#f . ('result val)) + ((#f . ('result val)) val)) channels-to-results) (loop From 6f6d57b189a7073718407df263bbe3c1245f2e51 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Fri, 27 Jun 2025 00:16:41 +0200 Subject: [PATCH 13/24] Use the knots backtrace printer for tests --- tests.scm | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests.scm b/tests.scm index a58eff0..0cca3b4 100644 --- a/tests.scm +++ b/tests.scm @@ -1,6 +1,7 @@ (define-module (tests) #:use-module (ice-9 exceptions) #:use-module (fibers) + #:use-module (knots) #:export (run-fibers-for-tests assert-no-heap-growth)) @@ -15,9 +16,10 @@ (simple-format #t "running ~A\n" thunk) (with-exception-handler (lambda (exn) - (backtrace) + (print-backtrace-and-exception/knots exn) (raise-exception exn)) - thunk) + (lambda () + (start-stack #t (thunk)))) #t) #:unwind? #t)) #:hz 0 From 4140ef0bd67dfed419203713df497f94b0ac2e45 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Fri, 27 Jun 2025 22:43:25 +0200 Subject: [PATCH 14/24] More consistently handle results and exceptions In the parallelism module. --- knots/parallelism.scm | 62 +++++++++++++++++++++---------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/knots/parallelism.scm b/knots/parallelism.scm index c98ca3f..f15dbe8 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -22,6 +22,7 @@ #:use-module (srfi srfi-71) #:use-module (srfi srfi-9) #:use-module (srfi srfi-9 gnu) + #:use-module (srfi srfi-43) #:use-module (ice-9 match) #:use-module (ice-9 control) #:use-module (ice-9 exceptions) @@ -57,7 +58,7 @@ (lambda (exn) (put-message reply - (list 'exception exn))) + (cons 'exception exn))) (lambda () (with-exception-handler (lambda (exn) @@ -78,7 +79,7 @@ (lambda () (start-stack #t (thunk))) (lambda vals - (put-message reply vals)))))) + (put-message reply (cons 'result vals))))))) #:unwind? #t)) #:parallel? #t) reply)) @@ -88,10 +89,10 @@ reply-channels))) (map (match-lambda - (('exception exn) + (('exception . exn) (raise-exception exn)) - (result - (apply values result))) + (('result . vals) + (apply values vals))) responses))) (define (fibers-batch-map proc parallelism-limit . lists) @@ -114,9 +115,18 @@ (channel-indexes '())) (if (and (eq? #f next-to-process-index) (null? channel-indexes)) - (if (vector? (first lists)) - result-vec - (vector->list result-vec)) + (let ((processed-result-vec + (vector-map + (lambda (_ result-or-exn) + (match result-or-exn + (('exception . exn) + (raise-exception exn)) + (('result . vals) + (car vals)))) + result-vec))) + (if (vector? (first lists)) + processed-result-vec + (vector->list processed-result-vec))) (if (or (= (length channel-indexes) (min parallelism-limit vecs-length)) @@ -132,18 +142,13 @@ (get-operation (vector-ref result-vec index)) (lambda (result) - (match result - (('exception exn) - (raise-exception exn)) - (_ - (vector-set! result-vec - index - (first result)) - - (values next-to-process-index - (lset-difference = - channel-indexes - (list index)))))))) + (vector-set! result-vec + index + result) + (values next-to-process-index + (lset-difference = + channel-indexes + (list index)))))) channel-indexes))))) (loop new-index new-channel-indexes)) @@ -217,10 +222,10 @@ (if (null? active-channels) (map (match-lambda - ((#f . ('exception exn)) + ((#f . ('exception . exn)) (raise-exception exn)) - ((#f . ('result val)) - val)) + ((#f . ('result . vals)) + (car vals))) channels-to-results) (loop (perform-operation @@ -237,12 +242,7 @@ (map (match-lambda ((c . r) (if (eq? channel c) - (cons #f - (match result - (('exception exn) - result) - (_ - (list 'result result)))) + (cons #f result) (cons c r)))) channels-to-results))) #f)))) @@ -263,7 +263,7 @@ reply-channel (with-exception-handler (lambda (exn) - (list 'exception exn)) + (cons 'exception exn)) (lambda () (with-exception-handler (lambda (exn) @@ -294,7 +294,7 @@ (put-message input-channel (cons reply-channel args)) (match (get-message reply-channel) (('result . vals) (apply values vals)) - (('exception exn) + (('exception . exn) (raise-exception exn)))))) (define-record-type From 0fa6737a39f866bdbffc11fd16348c5411c11a7c Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Fri, 27 Jun 2025 23:28:47 +0200 Subject: [PATCH 15/24] Document some things --- knots/parallelism.scm | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/knots/parallelism.scm b/knots/parallelism.scm index f15dbe8..7631055 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -96,6 +96,9 @@ responses))) (define (fibers-batch-map proc parallelism-limit . lists) + "Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT. If any of +the invocations of PROC raise an exception, this will be raised once +all of the calls to PROC have finished." (define vecs (map (lambda (list-or-vec) (if (vector? list-or-vec) list-or-vec @@ -171,9 +174,14 @@ channel-indexes))))))) (define (fibers-map proc . lists) + "Map PROC over LISTS in parallel, running up to 20 fibers in + PARALLEL. If any of the invocations of PROC raise an exception, this +will be raised once all of the calls to PROC have finished." (apply fibers-batch-map proc 20 lists)) (define (fibers-batch-for-each proc parallelism-limit . lists) + "Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in +parallel." (apply fibers-batch-map (lambda args (apply proc args) @@ -184,10 +192,13 @@ *unspecified*) (define (fibers-for-each proc . lists) + "Call PROC on LISTS, running up to 20 fibers in parallel." (apply fibers-batch-for-each proc 20 lists)) (define-syntax fibers-parallel (lambda (x) + "Run each expression in parallel. If any expression raises an + exception, this will be raised after all exceptions have finished." (syntax-case x () ((_ e0 ...) (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) @@ -198,12 +209,16 @@ (apply values (fetch-result-of-defered-thunks tmp0 ...)))))))) (define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...) + "Let, but run each binding in a fiber in parallel." (call-with-values (lambda () (fibers-parallel e ...)) (lambda (v ...) b0 b1 ...))) (define* (fibers-map-with-progress proc lists #:key report) + "Map PROC over LISTS, calling #:REPORT if specified after each +invocation of PROC finishes. REPORT is passed the results for each + element of LISTS, or #f if no result has been received yet." (let loop ((channels-to-results (apply map (lambda args From deae518b528485638bef89b4230616024f5009d7 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Sun, 29 Jun 2025 08:35:28 +0200 Subject: [PATCH 16/24] Use the buffer size for chunked output ports --- knots/web-server.scm | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/knots/web-server.scm b/knots/web-server.scm index a0a3641..fec349e 100644 --- a/knots/web-server.scm +++ b/knots/web-server.scm @@ -333,7 +333,8 @@ on the procedure being called at any particular time." (define (handle-request handler client read-request-exception-handler - write-response-exception-handler) + write-response-exception-handler + buffer-size) (let ((request (with-exception-handler read-request-exception-handler @@ -399,7 +400,8 @@ on the procedure being called at any particular time." client (make-chunked-output-port/knots client - #:keep-alive? #t)))) + #:keep-alive? #t + #:buffering buffer-size)))) (set-port-encoding! body-port charset) (let ((body-written? (with-exception-handler @@ -472,7 +474,8 @@ on the procedure being called at any particular time." (else (let ((keep-alive? (handle-request handler client read-request-exception-handler - write-response-exception-handler))) + write-response-exception-handler + buffer-size))) (if keep-alive? (loop) (close-port client))))))) From 7709ffe1d33ac9da62189f725a64d59f729b4463 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 30 Jun 2025 15:41:04 +0200 Subject: [PATCH 17/24] Tweak the knots chunked output port To try and reduce the number of write syscalls. --- knots/web-server.scm | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/knots/web-server.scm b/knots/web-server.scm index fec349e..3e5f388 100644 --- a/knots/web-server.scm +++ b/knots/web-server.scm @@ -63,6 +63,14 @@ (bind sock family addr port) sock)) +(define crlf-bv + (string->utf8 "\r\n")) + +(define (chunked-output-port-overhead-bytes write-size) + (+ (string-length (number->string write-size 16)) + (bytevector-length crlf-bv) + (bytevector-length crlf-bv))) + (define* (make-chunked-output-port/knots port #:key (keep-alive? #f) (buffering 1200)) "Returns a new port which translates non-encoded data into a HTTP @@ -74,10 +82,12 @@ when done, as it will output the remaining data, and encode the final zero chunk. When the port is closed it will also close PORT, unless KEEP-ALIVE? is true." (define (write! bv start count) - (put-string port (number->string count 16)) - (put-string port "\r\n") + (let ((len-string + (number->string count 16))) + (put-string port len-string)) + (put-bytevector port crlf-bv 0 2) (put-bytevector port bv start count) - (put-string port "\r\n") + (put-bytevector port crlf-bv 0 2) (force-output port) count) @@ -401,7 +411,10 @@ on the procedure being called at any particular time." (make-chunked-output-port/knots client #:keep-alive? #t - #:buffering buffer-size)))) + #:buffering + (- buffer-size + (chunked-output-port-overhead-bytes + buffer-size)))))) (set-port-encoding! body-port charset) (let ((body-written? (with-exception-handler From ce1b710bcf4b1874bdd06a6f96cb3e268f4b5895 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 30 Jun 2025 22:57:08 +0100 Subject: [PATCH 18/24] Use a queue for the resource pool waiters As this will maybe improve performance. --- knots/resource-pool.scm | 277 ++++++++++++++++++++-------------------- 1 file changed, 141 insertions(+), 136 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 7da76b0..b27f329 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -22,6 +22,7 @@ #:use-module (srfi srfi-9) #:use-module (srfi srfi-9 gnu) #:use-module (srfi srfi-71) + #:use-module (ice-9 q) #:use-module (ice-9 match) #:use-module (ice-9 exceptions) #:use-module (fibers) @@ -267,13 +268,13 @@ (define (main-loop) (let loop ((resources resources) (available resources) - (waiters '())) + (waiters (make-q))) (match (get-message channel) (('checkout reply timeout-time max-waiters) (if (null? available) (let ((waiters-count - (length waiters))) + (q-length waiters))) (if (and max-waiters (>= waiters-count max-waiters)) @@ -301,8 +302,7 @@ waiters)) (loop resources available - (cons (cons reply timeout-time) - waiters)))) + (enq! waiters (cons reply timeout-time))))) (if timeout-time (let ((current-internal-time @@ -345,44 +345,46 @@ (set! checkout-failure-count (+ 1 checkout-failure-count))) - (if (null? waiters) + (if (q-empty? waiters) (loop resources (cons resource available) waiters) - (let* ((current-internal-time (get-internal-real-time)) - (alive-waiters - dead-waiters - (partition! - (match-lambda - ((reply . timeout) - (or (not timeout) - (> timeout current-internal-time)))) - waiters))) - (if (null? alive-waiters) - (loop resources - (cons resource available) - '()) - (match (last alive-waiters) - ((waiter-channel . waiter-timeout) - (if waiter-timeout - (let ((reply-timeout - (/ (- waiter-timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's a - ;; timeout - (spawn-fiber-for-checkout waiter-channel - reply-timeout - resource)) - (put-message waiter-channel (cons 'success - resource))) - - (loop resources - available - (drop-right! alive-waiters 1)))))))) + (let ((current-internal-time + (get-internal-real-time))) + (with-exception-handler + (lambda (exn) + (if (eq? (exception-kind exn) 'q-empty) + (loop resources + (cons resource available) + waiters) + (raise-exception exn))) + (lambda () + (let waiter-loop ((waiter (deq! waiters))) + (match waiter + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (deq! waiters)) + (begin + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource))) + (loop resources + available + waiters))))))) + #:unwind? #t)))) (('list-resources reply) (spawn-fiber @@ -397,7 +399,7 @@ (let ((stats `((resources . ,(length resources)) (available . ,(length available)) - (waiters . ,(length waiters)) + (waiters . ,(q-length waiters)) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber @@ -420,7 +422,7 @@ (('destroy) (if (and (null? resources) - (null? waiters)) + (q-empty? waiters)) (signal-condition! destroy-condition) @@ -448,7 +450,7 @@ internal-time-units-per-second)) (const #f))) op)))))))) - waiters) + (car waiters)) (if destructor (begin @@ -747,7 +749,7 @@ (define (main-loop) (let loop ((resources '()) (available '()) - (waiters '()) + (waiters (make-q)) (resources-last-used '())) (match (get-message channel) @@ -769,50 +771,52 @@ (cons (get-internal-real-time) resources-last-used)))) - (if (null? waiters) + (if (q-empty? waiters) (loop (cons resource resources) (cons resource available) waiters (cons (get-internal-real-time) resources-last-used)) - (let* ((current-internal-time (get-internal-real-time)) - (alive-waiters - dead-waiters - (partition! - (match-lambda - ((reply . timeout) - (or (not timeout) - (> timeout current-internal-time)))) - waiters))) - (if (null? alive-waiters) - (loop (cons resource resources) - (cons resource available) - '() - (cons (get-internal-real-time) - resources-last-used)) - (match (last alive-waiters) - ((waiter-channel . waiter-timeout) - (if waiter-timeout - (let ((reply-timeout - (/ (- waiter-timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn - ;; a new fiber to handle handing over - ;; the resource, and returning it if - ;; there's a timeout - (spawn-fiber-for-checkout waiter-channel - reply-timeout - resource)) - (put-message waiter-channel (cons 'success - resource))) - - (loop (cons resource resources) - available - (drop-right! alive-waiters 1) - (cons (get-internal-real-time) - resources-last-used))))))))) + (let ((current-internal-time + (get-internal-real-time))) + (with-exception-handler + (lambda (exn) + (if (eq? (exception-kind exn) 'q-empty) + (loop (cons resource resources) + (cons resource available) + waiters + (cons current-internal-time + resources-last-used)) + (raise-exception exn))) + (lambda () + (let waiter-loop ((waiter (deq! waiters))) + (match waiter + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (deq! waiters)) + (begin + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource))) + (loop (cons resource resources) + available + waiters + (cons current-internal-time + resources-last-used))))))) + #:unwind? #t)))))) (('checkout reply timeout-time max-waiters) (if (null? available) @@ -821,7 +825,7 @@ (spawn-fiber-to-return-new-resource)) (let ((waiters-count - (length waiters))) + (q-length waiters))) (if (and max-waiters (>= waiters-count max-waiters)) @@ -850,8 +854,7 @@ resources-last-used)) (loop resources available - (cons (cons reply timeout-time) - waiters) + (enq! waiters (cons reply timeout-time)) resources-last-used)))) (if timeout-time @@ -898,7 +901,7 @@ (set! checkout-failure-count (+ 1 checkout-failure-count))) - (if (null? waiters) + (if (q-empty? waiters) (loop resources (cons resource available) waiters @@ -911,56 +914,58 @@ (get-internal-real-time)) resources-last-used)) - (let* ((current-internal-time (get-internal-real-time)) - (alive-waiters - dead-waiters - (partition! - (match-lambda - ((reply . timeout) - (or (not timeout) - (> timeout current-internal-time)))) - waiters))) - (if (null? alive-waiters) - (loop resources - (cons resource available) - '() - (begin - (when (eq? return-type 'return) - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time))) - resources-last-used)) - (match (last alive-waiters) - ((waiter-channel . waiter-timeout) - (if waiter-timeout - (let ((reply-timeout - (/ (- waiter-timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's a - ;; timeout - (spawn-fiber-for-checkout waiter-channel - reply-timeout - resource)) - (put-message waiter-channel (cons 'success - resource))) - - (loop resources - available - (drop-right! alive-waiters 1) + (let ((current-internal-time + (get-internal-real-time))) + (with-exception-handler + (lambda (exn) + (if (eq? (exception-kind exn) 'q-empty) + (loop resources + (cons resource available) + waiters + (begin + (when (eq? return-type 'return) + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + current-internal-time)) + resources-last-used)) + (raise-exception exn))) + (lambda () + (let waiter-loop ((waiter (deq! waiters))) + (match waiter + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (deq! waiters)) (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used)))))))) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource))) + (loop resources + available + waiters + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + current-internal-time) + resources-last-used)))))))) + #:unwind? #t)))) (('remove resource) (let ((index @@ -1003,7 +1008,7 @@ (let ((stats `((resources . ,(length resources)) (available . ,(length available)) - (waiters . ,(length waiters)) + (waiters . ,(q-length waiters)) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber @@ -1067,7 +1072,7 @@ (('destroy) (if (and (null? resources) - (null? waiters)) + (q-empty? waiters)) (signal-condition! destroy-condition) @@ -1095,7 +1100,7 @@ internal-time-units-per-second)) (const #f))) op)))))))) - waiters) + (car waiters)) (if destructor (begin From ff93dc144284044255957de8870f0d848f8fa844 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 1 Jul 2025 12:45:12 +0100 Subject: [PATCH 19/24] Add a post-request-hook to the web server --- knots/web-server.scm | 51 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 10 deletions(-) diff --git a/knots/web-server.scm b/knots/web-server.scm index 3e5f388..4d7240b 100644 --- a/knots/web-server.scm +++ b/knots/web-server.scm @@ -341,16 +341,19 @@ on the procedure being called at any particular time." (string->utf8 "internal server error"))) -(define (handle-request handler client - read-request-exception-handler - write-response-exception-handler - buffer-size) +(define* (handle-request handler client + read-request-exception-handler + write-response-exception-handler + buffer-size + #:key post-request-hook) (let ((request (with-exception-handler read-request-exception-handler (lambda () (read-request client)) - #:unwind? #t))) + #:unwind? #t)) + (read-request-time + (get-internal-real-time))) (let ((response body (cond @@ -399,7 +402,9 @@ on the procedure being called at any particular time." (lambda () (write-response response client) - (let ((body-written? + (let ((response-start-time + (get-internal-real-time)) + (body-written? (if (procedure? body) (let* ((type (response-content-type response '(text/plain))) @@ -438,6 +443,11 @@ on the procedure being called at any particular time." (if body-written? (begin (force-output client) + (when post-request-hook + (post-request-hook request + #:read-request-time read-request-time + #:response-start-time response-start-time + #:response-end-time (get-internal-real-time))) (when (and (procedure? body) (response-content-length response)) (set-port-encoding! client "ISO-8859-1")) @@ -449,7 +459,8 @@ on the procedure being called at any particular time." read-request-exception-handler write-response-exception-handler connection-idle-timeout - buffer-size) + buffer-size + post-request-hook) ;; Always disable Nagle's algorithm, as we handle buffering ;; ourselves; when we force-output, we really want the data to go ;; out. @@ -488,11 +499,28 @@ on the procedure being called at any particular time." (let ((keep-alive? (handle-request handler client read-request-exception-handler write-response-exception-handler - buffer-size))) + buffer-size + #:post-request-hook + post-request-hook))) (if keep-alive? (loop) (close-port client))))))) +(define (post-request-hook/safe post-request-hook) + (if post-request-hook + (lambda args + (with-exception-handler + (lambda (exn) #f) + (lambda () + (with-exception-handler + (lambda (exn) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (apply post-request-hook args)))) + #:unwind? #t)) + #f)) + (define-record-type (make-web-server socket port) web-server? @@ -512,7 +540,8 @@ on the procedure being called at any particular time." (write-response-exception-handler default-write-response-exception-handler) (connection-idle-timeout #f) - (connection-buffer-size 1024)) + (connection-buffer-size 1024) + post-request-hook) "Run the knots web server. HANDLER should be a procedure that takes one argument, the HTTP @@ -548,7 +577,9 @@ before sending back to the client." read-request-exception-handler write-response-exception-handler connection-idle-timeout - connection-buffer-size)) + connection-buffer-size + (post-request-hook/safe + post-request-hook))) #:parallel? #t) (loop)))))) From ec2f2489a2394a9f426931cee2b7a4856349603e Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Tue, 1 Jul 2025 23:13:31 +0100 Subject: [PATCH 20/24] Fix resource pool bug And remove unnecessary named let. --- knots/resource-pool.scm | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index b27f329..6b28a27 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -1317,8 +1317,9 @@ available. Return the resource once PROC has returned." start-time) 'timeout) response)) - 'timeout))))) - (let loop ((reply (make-channel))) + 'timeout)) + 'timeout))) + (let ((reply (make-channel))) (put-message channel (list 'checkout reply From f4b48a149923fd5cb4ff77772f2fdcc0b694f676 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Sun, 6 Jul 2025 18:49:09 +0100 Subject: [PATCH 21/24] Avoid calling deq! if the queue is empty --- knots/resource-pool.scm | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 6b28a27..640978f 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -365,7 +365,11 @@ ((reply . timeout) (if (and timeout (< timeout current-internal-time)) - (waiter-loop (deq! waiters)) + (if (q-empty? waiters) + (loop resources + (cons resource available) + waiters) + (waiter-loop (deq! waiters))) (begin (if timeout (let ((reply-timeout @@ -795,7 +799,13 @@ ((reply . timeout) (if (and timeout (< timeout current-internal-time)) - (waiter-loop (deq! waiters)) + (if (q-empty? waiters) + (loop (cons resource resources) + (cons resource available) + waiters + (cons (get-internal-real-time) + resources-last-used)) + (waiter-loop (deq! waiters))) (begin (if timeout (let ((reply-timeout @@ -938,7 +948,19 @@ ((reply . timeout) (if (and timeout (< timeout current-internal-time)) - (waiter-loop (deq! waiters)) + (if (q-empty? waiters) + (loop resources + (cons resource available) + waiters + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + (get-internal-real-time)) + resources-last-used)) + (waiter-loop (deq! waiters))) (begin (if timeout (let ((reply-timeout From d18b5b8d5de5beff3b9f84cfb359b73a4dcf2070 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Wed, 9 Jul 2025 12:06:20 +0100 Subject: [PATCH 22/24] Don't loop inside exception handlers The resource pools seemed to become slower and slower over time, this might help? --- knots/resource-pool.scm | 159 +++++++++++++++++----------------------- 1 file changed, 67 insertions(+), 92 deletions(-) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 640978f..6e9c353 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -365,30 +365,25 @@ ((reply . timeout) (if (and timeout (< timeout current-internal-time)) - (if (q-empty? waiters) - (loop resources - (cons resource available) - waiters) - (waiter-loop (deq! waiters))) - (begin - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource))) - (loop resources - available - waiters))))))) - #:unwind? #t)))) + (waiter-loop (deq! waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))))))) + #:unwind? #t) + (loop resources + available + waiters)))) (('list-resources reply) (spawn-fiber @@ -799,34 +794,27 @@ ((reply . timeout) (if (and timeout (< timeout current-internal-time)) - (if (q-empty? waiters) - (loop (cons resource resources) - (cons resource available) - waiters - (cons (get-internal-real-time) - resources-last-used)) - (waiter-loop (deq! waiters))) - (begin - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource))) - (loop (cons resource resources) - available - waiters - (cons current-internal-time - resources-last-used))))))) - #:unwind? #t)))))) + (waiter-loop (deq! waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))))))) + #:unwind? #t) + (loop (cons resource resources) + available + waiters + (cons current-internal-time + resources-last-used)))))) (('checkout reply timeout-time max-waiters) (if (null? available) @@ -948,46 +936,33 @@ ((reply . timeout) (if (and timeout (< timeout current-internal-time)) - (if (q-empty? waiters) - (loop resources - (cons resource available) - waiters - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used)) - (waiter-loop (deq! waiters))) - (begin - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout reply - reply-timeout - resource)) - (put-message reply (cons 'success - resource))) - (loop resources - available - waiters - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - current-internal-time) - resources-last-used)))))))) - #:unwind? #t)))) + (waiter-loop (deq! waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))))))) + #:unwind? #t) + (loop resources + available + waiters + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + current-internal-time) + resources-last-used))))) (('remove resource) (let ((index From 4468a3ef6d27556db35b0c6d07f7c5110ae362cc Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Thu, 10 Jul 2025 16:11:45 +0100 Subject: [PATCH 23/24] Generate documentation for (knots) as well As enabled by Guile Documenta 0.4. --- .forgejo/workflows/build-website.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.forgejo/workflows/build-website.yaml b/.forgejo/workflows/build-website.yaml index d859b76..ab24066 100644 --- a/.forgejo/workflows/build-website.yaml +++ b/.forgejo/workflows/build-website.yaml @@ -10,7 +10,7 @@ jobs: - run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git --branch=pages knots-pages - run: | cd knots-trunk - guix shell -D -f guix-dev.scm -- documenta api knots + guix shell -D -f guix-dev.scm -- documenta api knots.scm knots guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -c SHOW_TITLE=true -o ../knots-pages/index.html doc/index.texi - run: | From 52092e7a99fb782a5e89a4b0ec42f93549b96437 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Thu, 10 Jul 2025 16:15:58 +0100 Subject: [PATCH 24/24] Fix call to documenta --- .forgejo/workflows/build-website.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.forgejo/workflows/build-website.yaml b/.forgejo/workflows/build-website.yaml index ab24066..ae6c4da 100644 --- a/.forgejo/workflows/build-website.yaml +++ b/.forgejo/workflows/build-website.yaml @@ -10,7 +10,7 @@ jobs: - run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git --branch=pages knots-pages - run: | cd knots-trunk - guix shell -D -f guix-dev.scm -- documenta api knots.scm knots + guix shell -D -f guix-dev.scm -- documenta api "knots.scm knots/" guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -c SHOW_TITLE=true -o ../knots-pages/index.html doc/index.texi - run: |