diff --git a/.forgejo/workflows/demo.yaml b/.forgejo/workflows/build-website.yaml similarity index 61% rename from .forgejo/workflows/demo.yaml rename to .forgejo/workflows/build-website.yaml index c3f4156..ae6c4da 100644 --- a/.forgejo/workflows/demo.yaml +++ b/.forgejo/workflows/build-website.yaml @@ -1,7 +1,7 @@ on: push: branches: - - actions-test + - trunk jobs: test: runs-on: host @@ -10,13 +10,17 @@ jobs: - run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git --branch=pages knots-pages - run: | cd knots-trunk - guix shell -D -f guix-dev.scm -- documenta api knots + guix shell -D -f guix-dev.scm -- documenta api "knots.scm knots/" guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -c SHOW_TITLE=true -o ../knots-pages/index.html doc/index.texi - run: | cd knots-pages git add . - git config user.email "" - git config user.name "Automatic website updater" - git commit -m "Automatic website update" - git push + if [[ -z "$(git status -s)" ]]; then + echo "Nothing to push" + else + git config user.email "" + git config user.name "Automatic website updater" + git commit -m "Automatic website update" + git push + fi diff --git a/knots/parallelism.scm b/knots/parallelism.scm index f8b2b8b..7631055 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -20,6 +20,9 @@ (define-module (knots parallelism) #:use-module (srfi srfi-1) #:use-module (srfi srfi-71) + #:use-module (srfi srfi-9) + #:use-module (srfi srfi-9 gnu) + #:use-module (srfi srfi-43) #:use-module (ice-9 match) #:use-module (ice-9 control) #:use-module (ice-9 exceptions) @@ -27,6 +30,7 @@ #:use-module (fibers channels) #:use-module (fibers operations) #:use-module (knots) + #:use-module (knots resource-pool) #:export (fibers-batch-map fibers-map @@ -38,7 +42,13 @@ fibers-parallel fibers-let - fiberize)) + fiberize + + make-parallelism-limiter + parallelism-limiter? + destroy-parallelism-limiter + call-with-parallelism-limiter + with-parallelism-limiter)) (define (defer-to-parallel-fiber thunk) (let ((reply (make-channel))) @@ -48,7 +58,7 @@ (lambda (exn) (put-message reply - (list 'exception exn))) + (cons 'exception exn))) (lambda () (with-exception-handler (lambda (exn) @@ -69,7 +79,7 @@ (lambda () (start-stack #t (thunk))) (lambda vals - (put-message reply vals)))))) + (put-message reply (cons 'result vals))))))) #:unwind? #t)) #:parallel? #t) reply)) @@ -79,13 +89,16 @@ reply-channels))) (map (match-lambda - (('exception exn) + (('exception . exn) (raise-exception exn)) - (result - (apply values result))) + (('result . vals) + (apply values vals))) responses))) (define (fibers-batch-map proc parallelism-limit . lists) + "Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT. If any of +the invocations of PROC raise an exception, this will be raised once +all of the calls to PROC have finished." (define vecs (map (lambda (list-or-vec) (if (vector? list-or-vec) list-or-vec @@ -105,9 +118,18 @@ (channel-indexes '())) (if (and (eq? #f next-to-process-index) (null? channel-indexes)) - (if (vector? (first lists)) - result-vec - (vector->list result-vec)) + (let ((processed-result-vec + (vector-map + (lambda (_ result-or-exn) + (match result-or-exn + (('exception . exn) + (raise-exception exn)) + (('result . vals) + (car vals)))) + result-vec))) + (if (vector? (first lists)) + processed-result-vec + (vector->list processed-result-vec))) (if (or (= (length channel-indexes) (min parallelism-limit vecs-length)) @@ -123,18 +145,13 @@ (get-operation (vector-ref result-vec index)) (lambda (result) - (match result - (('exception exn) - (raise-exception exn)) - (_ - (vector-set! result-vec - index - (first result)) - - (values next-to-process-index - (lset-difference = - channel-indexes - (list index)))))))) + (vector-set! result-vec + index + result) + (values next-to-process-index + (lset-difference = + channel-indexes + (list index)))))) channel-indexes))))) (loop new-index new-channel-indexes)) @@ -157,9 +174,14 @@ channel-indexes))))))) (define (fibers-map proc . lists) + "Map PROC over LISTS in parallel, running up to 20 fibers in + PARALLEL. If any of the invocations of PROC raise an exception, this +will be raised once all of the calls to PROC have finished." (apply fibers-batch-map proc 20 lists)) (define (fibers-batch-for-each proc parallelism-limit . lists) + "Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in +parallel." (apply fibers-batch-map (lambda args (apply proc args) @@ -170,10 +192,13 @@ *unspecified*) (define (fibers-for-each proc . lists) + "Call PROC on LISTS, running up to 20 fibers in parallel." (apply fibers-batch-for-each proc 20 lists)) (define-syntax fibers-parallel (lambda (x) + "Run each expression in parallel. If any expression raises an + exception, this will be raised after all exceptions have finished." (syntax-case x () ((_ e0 ...) (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) @@ -184,12 +209,16 @@ (apply values (fetch-result-of-defered-thunks tmp0 ...)))))))) (define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...) + "Let, but run each binding in a fiber in parallel." (call-with-values (lambda () (fibers-parallel e ...)) (lambda (v ...) b0 b1 ...))) (define* (fibers-map-with-progress proc lists #:key report) + "Map PROC over LISTS, calling #:REPORT if specified after each +invocation of PROC finishes. REPORT is passed the results for each + element of LISTS, or #f if no result has been received yet." (let loop ((channels-to-results (apply map (lambda args @@ -210,8 +239,8 @@ (match-lambda ((#f . ('exception . exn)) (raise-exception exn)) - ((#f . ('result . val)) - val)) + ((#f . ('result . vals)) + (car vals))) channels-to-results) (loop (perform-operation @@ -228,12 +257,7 @@ (map (match-lambda ((c . r) (if (eq? channel c) - (cons #f - (match result - (('exception . exn) - result) - (_ - (cons 'result result)))) + (cons #f result) (cons c r)))) channels-to-results))) #f)))) @@ -254,7 +278,7 @@ reply-channel (with-exception-handler (lambda (exn) - (list 'exception exn)) + (cons 'exception exn)) (lambda () (with-exception-handler (lambda (exn) @@ -285,5 +309,32 @@ (put-message input-channel (cons reply-channel args)) (match (get-message reply-channel) (('result . vals) (apply values vals)) - (('exception exn) + (('exception . exn) (raise-exception exn)))))) + +(define-record-type + (make-parallelism-limiter-record resource-pool) + parallelism-limiter? + (resource-pool parallelism-limiter-resource-pool)) + +(define* (make-parallelism-limiter limit #:key (name "unnamed")) + (make-parallelism-limiter-record + (make-fixed-size-resource-pool + (iota limit) + #:name name))) + +(define (destroy-parallelism-limiter parallelism-limiter) + (destroy-resource-pool + (parallelism-limiter-resource-pool + parallelism-limiter))) + +(define* (call-with-parallelism-limiter parallelism-limiter thunk) + (call-with-resource-from-pool + (parallelism-limiter-resource-pool parallelism-limiter) + (lambda _ + (thunk)))) + +(define-syntax-rule (with-parallelism-limiter parallelism-limiter exp ...) + (call-with-parallelism-limiter + parallelism-limiter + (lambda () exp ...))) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index da52051..6e9c353 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -22,6 +22,7 @@ #:use-module (srfi srfi-9) #:use-module (srfi srfi-9 gnu) #:use-module (srfi srfi-71) + #:use-module (ice-9 q) #:use-module (ice-9 match) #:use-module (ice-9 exceptions) #:use-module (fibers) @@ -32,9 +33,10 @@ #:use-module (fibers conditions) #:use-module (knots) #:use-module (knots parallelism) - #:export (resource-pool? - + #:export (make-fixed-size-resource-pool make-resource-pool + + resource-pool? resource-pool-name resource-pool-channel resource-pool-configuration @@ -73,7 +75,7 @@ (record-constructor &resource-pool-abort-add-resource)) (define resource-pool-abort-add-resource-error? - (record-predicate &resource-pool-abort-add-resource)) + (exception-predicate &resource-pool-abort-add-resource)) (define-record-type (make-resource-pool-record name channel destroy-condition configuration) @@ -91,6 +93,429 @@ (resource-pool-name resource-pool)) port))) +(define (remove-at-index! lst i) + (let ((start + end + (split-at! lst i))) + (append + start + (cdr end)))) + +(define* (make-fixed-size-resource-pool resources + #:key + (delay-logger (const #f)) + (duration-logger (const #f)) + destructor + scheduler + (name "unnamed") + default-checkout-timeout + default-max-waiters) + (define channel (make-channel)) + (define destroy-condition + (make-condition)) + + (define pool + (make-resource-pool-record + name + channel + destroy-condition + `((delay-logger . ,delay-logger) + (duration-logger . ,duration-logger) + (destructor . ,destructor) + (scheduler . ,scheduler) + (name . ,name) + (default-checkout-timeout . ,default-checkout-timeout) + (default-max-waiters . ,default-max-waiters)))) + + (define checkout-failure-count 0) + + (define (spawn-fiber-to-destroy-resource resource) + (spawn-fiber + (lambda () + (let loop () + (let ((success? + (with-exception-handler + (lambda _ #f) + (lambda () + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running resource pool destructor (~A): ~A\n" + name + destructor) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (start-stack #t (destructor resource)) + #t))) + #:unwind? #t))) + + (if success? + (put-message channel + (list 'remove resource)) + (begin + (sleep 5) + + (loop)))))))) + + (define (spawn-fiber-for-checkout reply-channel + reply-timeout + resource) + (spawn-fiber + (lambda () + (let ((checkout-success? + (perform-operation + (choice-operation + (wrap-operation + (put-operation reply-channel + (cons 'success resource)) + (const #t)) + (wrap-operation (sleep-operation + reply-timeout) + (const #f)))))) + (unless checkout-success? + (put-message + channel + (list 'return-failed-checkout resource))))))) + + (define (destroy-loop resources) + (let loop ((resources resources)) + (match (get-message channel) + (('checkout reply timeout-time max-waiters) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout-time + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op))))) + (loop resources)) + (((and (or 'return + 'return-failed-checkout + 'remove) + return-type) + resource) + (when (and (not (eq? return-type 'remove)) + destructor) + (spawn-fiber-to-destroy-resource resource)) + + (let ((index + (list-index (lambda (x) + (eq? x resource)) + resources))) + (let ((new-resources + (if index + (remove-at-index! resources index) + (begin + (simple-format + (current-error-port) + "resource pool error: unable to remove ~A\n" + resource) + resources)))) + (if (null? new-resources) + (begin + (signal-condition! destroy-condition) + + ;; No loop + *unspecified*) + (loop new-resources))))) + + (('stats reply timeout-time) + (let ((stats + `((resources . ,(length resources)) + (available . 0) + (waiters . 0) + (checkout-failure-count . ,checkout-failure-count)))) + + (spawn-fiber + (lambda () + (let ((op + (put-operation reply stats))) + (perform-operation + (if timeout-time + (choice-operation + op + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second))) + op)))))) + + (loop resources)) + + (('destroy reply) + (loop resources)) + (unknown + (simple-format + (current-error-port) + "unrecognised message to ~A resource pool channel: ~A\n" + name + unknown) + (loop resources))))) + + (define (main-loop) + (let loop ((resources resources) + (available resources) + (waiters (make-q))) + + (match (get-message channel) + (('checkout reply timeout-time max-waiters) + (if (null? available) + (let ((waiters-count + (q-length waiters))) + (if (and max-waiters + (>= waiters-count + max-waiters)) + (begin + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'too-many-waiters + waiters-count)))) + (perform-operation + (if timeout-time + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op))))) + (loop resources + available + waiters)) + (loop resources + available + (enq! waiters (cons reply timeout-time))))) + + (if timeout-time + (let ((current-internal-time + (get-internal-real-time))) + ;; If this client is still waiting + (if (> timeout-time + current-internal-time) + (let ((reply-timeout + (/ (- timeout-time + current-internal-time) + internal-time-units-per-second))) + + ;; Don't sleep in this fiber, so spawn a new + ;; fiber to handle handing over the resource, + ;; and returning it if there's a timeout + (spawn-fiber-for-checkout reply + reply-timeout + (car available)) + (loop resources + (cdr available) + waiters)) + (loop resources + available + waiters))) + (begin + (put-message reply (cons 'success + (car available))) + + (loop resources + (cdr available) + waiters))))) + + (((and (or 'return + 'return-failed-checkout) + return-type) + resource) + + (when (eq? 'return-failed-checkout + return-type) + (set! checkout-failure-count + (+ 1 checkout-failure-count))) + + (if (q-empty? waiters) + (loop resources + (cons resource available) + waiters) + + (let ((current-internal-time + (get-internal-real-time))) + (with-exception-handler + (lambda (exn) + (if (eq? (exception-kind exn) 'q-empty) + (loop resources + (cons resource available) + waiters) + (raise-exception exn))) + (lambda () + (let waiter-loop ((waiter (deq! waiters))) + (match waiter + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (deq! waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))))))) + #:unwind? #t) + (loop resources + available + waiters)))) + + (('list-resources reply) + (spawn-fiber + (lambda () + (put-message reply (list-copy resources)))) + + (loop resources + available + waiters)) + + (('stats reply timeout-time) + (let ((stats + `((resources . ,(length resources)) + (available . ,(length available)) + (waiters . ,(q-length waiters)) + (checkout-failure-count . ,checkout-failure-count)))) + + (spawn-fiber + (lambda () + (let ((op + (put-operation reply stats))) + (perform-operation + (if timeout-time + (choice-operation + op + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second))) + op)))))) + + (loop resources + available + waiters)) + + (('destroy) + (if (and (null? resources) + (q-empty? waiters)) + (signal-condition! + destroy-condition) + + (let ((current-internal-time (get-internal-real-time))) + (for-each + (match-lambda + ((reply . timeout) + (when (or (not timeout) + (> timeout current-internal-time)) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op)))))))) + (car waiters)) + + (if destructor + (begin + (for-each + (lambda (resource) + (spawn-fiber-to-destroy-resource resource)) + available) + (destroy-loop resources)) + (let dl ((resources resources) + (available available)) + (if (null? available) + (if (null? resources) + (signal-condition! + destroy-condition) + (destroy-loop resources)) + (let ((index + (list-index (lambda (x) + (eq? x (car available))) + resources))) + (dl (remove-at-index! resources index) + (cdr available))))))))) + + (unknown + (simple-format + (current-error-port) + "unrecognised message to ~A resource pool channel: ~A\n" + name + unknown) + (loop resources + available + waiters))))) + + (spawn-fiber + (lambda () + (with-exception-handler + (lambda (exn) + #f) + (lambda () + (with-exception-handler + (lambda (exn) + (let* ((stack (make-stack #t)) + (error-string + (call-with-output-string + (lambda (port) + (display-backtrace stack port 3) + (simple-format + port + "exception in the ~A pool fiber, " name) + (print-exception + port + (stack-ref stack 3) + '%exception + (list exn)))))) + (display error-string + (current-error-port))) + (raise-exception exn)) + (lambda () + (start-stack + #t + (main-loop))))) + #:unwind? #t)) + (or scheduler + (current-scheduler))) + + pool) + (define* (make-resource-pool return-new-resource max-size #:key (min-size 0) (idle-seconds #f) @@ -126,46 +551,52 @@ (define checkout-failure-count 0) - (define spawn-fiber-to-return-new-resource - (if add-resources-parallelism - (let ((thunk - (fiberize - (lambda () - (let ((max-size - (assq-ref (resource-pool-configuration pool) - 'max-size)) - (size (assq-ref (resource-pool-stats pool) - 'resources))) - (unless (= size max-size) - (let ((new-resource - (return-new-resource))) - (put-message channel - (list 'add-resource new-resource)))))) - #:parallelism add-resources-parallelism))) - (lambda () - (spawn-fiber thunk))) - (lambda () - (spawn-fiber - (lambda () - (let ((new-resource + (define return-new-resource/parallelism-limiter + (make-parallelism-limiter + (or add-resources-parallelism + max-size) + #:name + (string-append + name + " resource pool new resource parallelism limiter"))) + + (define (spawn-fiber-to-return-new-resource) + (spawn-fiber + (lambda () + (with-exception-handler + (lambda (exn) + ;; This can happen if the resource pool is destroyed very + ;; quickly + (unless (resource-pool-destroyed-error? exn) + (raise-exception exn))) + (lambda () + (with-parallelism-limiter + return-new-resource/parallelism-limiter + (let ((max-size + (assq-ref (resource-pool-configuration pool) + 'max-size)) + (size (assq-ref (resource-pool-stats pool #:timeout #f) + 'resources))) + (unless (= size max-size) + (with-exception-handler + (lambda _ #f) + (lambda () (with-exception-handler - (lambda _ #f) + (lambda (exn) + (simple-format + (current-error-port) + "exception adding resource to pool ~A: ~A\n\n" + name + return-new-resource) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) (lambda () - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception adding resource to pool ~A: ~A\n\n" - name - return-new-resource) - (print-backtrace-and-exception/knots exn) - (raise-exception exn)) - (lambda () - (start-stack #t (return-new-resource))))) - #:unwind? #t))) - (when new-resource - (put-message channel - (list 'add-resource new-resource))))))))) + (let ((new-resource + (start-stack #t (return-new-resource)))) + (put-message channel + (list 'add-resource new-resource)))))) + #:unwind? #t))))) + #:unwind? #t)))) (define (spawn-fiber-to-destroy-resource resource) (spawn-fiber @@ -250,21 +681,14 @@ 'remove) return-type) resource) - (when destructor + (when (and (not (eq? return-type 'remove)) + destructor) (spawn-fiber-to-destroy-resource resource)) (let ((index (list-index (lambda (x) (eq? x resource)) resources))) - (define (remove-at-index! lst i) - (let ((start - end - (split-at! lst i))) - (append - start - (cdr end)))) - (let ((new-resources (if index (remove-at-index! resources index) @@ -276,13 +700,16 @@ resources)))) (if (null? new-resources) (begin + (and=> return-new-resource/parallelism-limiter + destroy-parallelism-limiter) + (signal-condition! destroy-condition) ;; No loop *unspecified*) (loop new-resources))))) - (('stats reply) + (('stats reply timeout-time) (let ((stats `((resources . ,(length resources)) (available . 0) @@ -291,13 +718,17 @@ (spawn-fiber (lambda () - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply stats) - (const #t)) - (wrap-operation (sleep-operation 5) - (const #f))))))) + (let ((op + (put-operation reply stats))) + (perform-operation + (if timeout-time + (choice-operation + op + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second))) + op)))))) (loop resources)) @@ -317,7 +748,7 @@ (define (main-loop) (let loop ((resources '()) (available '()) - (waiters '()) + (waiters (make-q)) (resources-last-used '())) (match (get-message channel) @@ -339,50 +770,51 @@ (cons (get-internal-real-time) resources-last-used)))) - (if (null? waiters) + (if (q-empty? waiters) (loop (cons resource resources) (cons resource available) waiters (cons (get-internal-real-time) resources-last-used)) - (let* ((current-internal-time (get-internal-real-time)) - (alive-waiters - dead-waiters - (partition! - (match-lambda - ((reply . timeout) - (or (not timeout) - (> timeout current-internal-time)))) - waiters))) - (if (null? alive-waiters) - (loop (cons resource resources) - (cons resource available) - '() - (cons (get-internal-real-time) - resources-last-used)) - (match (last alive-waiters) - ((waiter-channel . waiter-timeout) - (if waiter-timeout - (let ((reply-timeout - (/ (- waiter-timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn - ;; a new fiber to handle handing over - ;; the resource, and returning it if - ;; there's a timeout - (spawn-fiber-for-checkout waiter-channel - reply-timeout - resource)) - (put-message waiter-channel (cons 'success - resource))) - - (loop (cons resource resources) - available - (drop-right! alive-waiters 1) - (cons (get-internal-real-time) - resources-last-used))))))))) + (let ((current-internal-time + (get-internal-real-time))) + (with-exception-handler + (lambda (exn) + (if (eq? (exception-kind exn) 'q-empty) + (loop (cons resource resources) + (cons resource available) + waiters + (cons current-internal-time + resources-last-used)) + (raise-exception exn))) + (lambda () + (let waiter-loop ((waiter (deq! waiters))) + (match waiter + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (deq! waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))))))) + #:unwind? #t) + (loop (cons resource resources) + available + waiters + (cons current-internal-time + resources-last-used)))))) (('checkout reply timeout-time max-waiters) (if (null? available) @@ -391,7 +823,7 @@ (spawn-fiber-to-return-new-resource)) (let ((waiters-count - (length waiters))) + (q-length waiters))) (if (and max-waiters (>= waiters-count max-waiters)) @@ -420,8 +852,7 @@ resources-last-used)) (loop resources available - (cons (cons reply timeout-time) - waiters) + (enq! waiters (cons reply timeout-time)) resources-last-used)))) (if timeout-time @@ -468,7 +899,7 @@ (set! checkout-failure-count (+ 1 checkout-failure-count))) - (if (null? waiters) + (if (q-empty? waiters) (loop resources (cons resource available) waiters @@ -481,70 +912,63 @@ (get-internal-real-time)) resources-last-used)) - (let* ((current-internal-time (get-internal-real-time)) - (alive-waiters - dead-waiters - (partition! - (match-lambda - ((reply . timeout) - (or (not timeout) - (> timeout current-internal-time)))) - waiters))) - (if (null? alive-waiters) - (loop resources - (cons resource available) - '() - (begin - (when (eq? return-type 'return) - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time))) - resources-last-used)) - (match (last alive-waiters) - ((waiter-channel . waiter-timeout) - (if waiter-timeout - (let ((reply-timeout - (/ (- waiter-timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's a - ;; timeout - (spawn-fiber-for-checkout waiter-channel - reply-timeout - resource)) - (put-message waiter-channel (cons 'success - resource))) - - (loop resources - available - (drop-right! alive-waiters 1) - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used)))))))) + (let ((current-internal-time + (get-internal-real-time))) + (with-exception-handler + (lambda (exn) + (if (eq? (exception-kind exn) 'q-empty) + (loop resources + (cons resource available) + waiters + (begin + (when (eq? return-type 'return) + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + current-internal-time)) + resources-last-used)) + (raise-exception exn))) + (lambda () + (let waiter-loop ((waiter (deq! waiters))) + (match waiter + ((reply . timeout) + (if (and timeout + (< timeout current-internal-time)) + (waiter-loop (deq! waiters)) + (if timeout + (let ((reply-timeout + (/ (- timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's + ;; a timeout + (spawn-fiber-for-checkout reply + reply-timeout + resource)) + (put-message reply (cons 'success + resource)))))))) + #:unwind? #t) + (loop resources + available + waiters + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + current-internal-time) + resources-last-used))))) (('remove resource) (let ((index (list-index (lambda (x) (eq? x resource)) resources))) - (define (remove-at-index! lst i) - (let ((start - end - (split-at! lst i))) - (append - start - (cdr end)))) - (loop (if index (remove-at-index! resources index) (begin @@ -577,22 +1001,26 @@ waiters resources-last-used)) - (('stats reply) + (('stats reply timeout-time) (let ((stats `((resources . ,(length resources)) (available . ,(length available)) - (waiters . ,(length waiters)) + (waiters . ,(q-length waiters)) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber (lambda () - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply stats) - (const #t)) - (wrap-operation (sleep-operation 5) - (const #f))))))) + (let ((op + (put-operation reply stats))) + (perform-operation + (if timeout-time + (choice-operation + op + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second))) + op)))))) (loop resources available @@ -641,50 +1069,56 @@ (('destroy) (if (and (null? resources) - (null? waiters)) + (q-empty? waiters)) (signal-condition! destroy-condition) - (begin + (let ((current-internal-time (get-internal-real-time))) (for-each - (lambda (resource) - (if destructor - (spawn-fiber-to-destroy-resource resource) - (spawn-fiber - (lambda () - (put-message channel - (list 'remove resource))) - #:parallel? #t))) - available) - - (let ((current-internal-time (get-internal-real-time))) - (for-each - (match-lambda - ((reply . timeout) - (when (or (not timeout) - (> timeout current-internal-time)) - (spawn-fiber - (lambda () - (let ((op - (put-operation - reply - (cons 'resource-pool-destroyed - #f)))) - (perform-operation - (if timeout - (choice-operation - op - (wrap-operation - (sleep-operation - (/ (- timeout - (get-internal-real-time)) - internal-time-units-per-second)) - (const #f))) - op)))))))) - waiters)) - - (destroy-loop resources)))) + (match-lambda + ((reply . timeout) + (when (or (not timeout) + (> timeout current-internal-time)) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op)))))))) + (car waiters)) + (if destructor + (begin + (for-each + (lambda (resource) + (spawn-fiber-to-destroy-resource resource)) + available) + (destroy-loop resources)) + (let dl ((resources resources) + (available available)) + (if (null? available) + (if (null? resources) + (signal-condition! + destroy-condition) + (destroy-loop resources)) + (let ((index + (list-index (lambda (x) + (eq? x (car available))) + resources))) + (dl (remove-at-index! resources index) + (cdr available))))))))) (unknown (simple-format (current-error-port) @@ -744,7 +1178,8 @@ (put-operation (resource-pool-channel pool) (list 'destroy)) (lambda _ - (wait (resource-pool-destroy-condition pool)))) + (wait + (resource-pool-destroy-condition pool)))) (wait-operation (resource-pool-destroy-condition pool)))) #t) @@ -763,7 +1198,7 @@ (record-constructor &resource-pool-timeout)) (define resource-pool-timeout-error? - (record-predicate &resource-pool-timeout)) + (exception-predicate &resource-pool-timeout)) (define &resource-pool-too-many-waiters (make-exception-type '&recource-pool-too-many-waiters @@ -784,7 +1219,7 @@ (record-constructor &resource-pool-too-many-waiters)) (define resource-pool-too-many-waiters-error? - (record-predicate &resource-pool-too-many-waiters)) + (exception-predicate &resource-pool-too-many-waiters)) (define &resource-pool-destroyed (make-exception-type '&recource-pool-destroyed @@ -800,7 +1235,7 @@ (record-constructor &resource-pool-destroyed)) (define resource-pool-destroyed-error? - (record-predicate &resource-pool-destroyed)) + (exception-predicate &resource-pool-destroyed)) (define &resource-pool-destroy-resource (make-exception-type '&recource-pool-destroy-resource @@ -811,7 +1246,7 @@ (record-constructor &resource-pool-destroy-resource)) (define resource-pool-destroy-resource-exception? - (record-predicate &resource-pool-destroy-resource)) + (exception-predicate &resource-pool-destroy-resource)) (define resource-pool-default-timeout-handler (make-parameter #f)) @@ -879,8 +1314,9 @@ available. Return the resource once PROC has returned." start-time) 'timeout) response)) - 'timeout))))) - (let loop ((reply (make-channel))) + 'timeout)) + 'timeout))) + (let ((reply (make-channel))) (put-message channel (list 'checkout reply @@ -918,8 +1354,7 @@ available. Return the resource once PROC has returned." 'destroy 'return) resource)) - (unless (resource-pool-destroy-resource-exception? exn) - (raise-exception exn))) + (raise-exception exn)) (lambda () (with-exception-handler (lambda (exn) @@ -949,34 +1384,42 @@ available. Return the resource once PROC has returned." (lambda (resource) exp ...))) (define* (resource-pool-stats pool #:key (timeout 5)) - (let ((reply (make-channel)) - (start-time (get-internal-real-time))) - (perform-operation - (choice-operation - (wrap-operation - (put-operation (resource-pool-channel pool) - `(stats ,reply)) - (const #t)) - (wrap-operation (sleep-operation timeout) - (lambda _ - (raise-exception - (make-resource-pool-timeout-error pool)))))) + (if timeout + (let* ((reply (make-channel)) + (start-time (get-internal-real-time)) + (timeout-time + (+ start-time + (* internal-time-units-per-second timeout)))) + (perform-operation + (choice-operation + (wrap-operation + (put-operation (resource-pool-channel pool) + `(stats ,reply ,timeout-time)) + (const #t)) + (wrap-operation (sleep-operation timeout) + (lambda _ + (raise-exception + (make-resource-pool-timeout-error pool)))))) - (let ((time-remaining - (- timeout - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second)))) - (if (> time-remaining 0) - (perform-operation - (choice-operation - (get-operation reply) - (wrap-operation (sleep-operation time-remaining) - (lambda _ - (raise-exception - (make-resource-pool-timeout-error pool)))))) - (raise-exception - (make-resource-pool-timeout-error pool)))))) + (let ((time-remaining + (- timeout + (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second)))) + (if (> time-remaining 0) + (perform-operation + (choice-operation + (get-operation reply) + (wrap-operation (sleep-operation time-remaining) + (lambda _ + (raise-exception + (make-resource-pool-timeout-error pool)))))) + (raise-exception + (make-resource-pool-timeout-error pool))))) + (let ((reply (make-channel))) + (put-message (resource-pool-channel pool) + `(stats ,reply #f)) + (get-message reply)))) (define (resource-pool-list-resources pool) (let ((reply (make-channel))) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index b176162..70d7292 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -198,7 +198,7 @@ from there, or #f if that would be an empty string." (record-accessor &thread-pool-timeout-error 'pool))) (define thread-pool-timeout-error? - (record-predicate &thread-pool-timeout-error)) + (exception-predicate &thread-pool-timeout-error)) (define* (make-fixed-size-thread-pool size #:key diff --git a/knots/timeout.scm b/knots/timeout.scm index 58306e0..a65a095 100644 --- a/knots/timeout.scm +++ b/knots/timeout.scm @@ -85,7 +85,7 @@ (record-constructor &port-timeout-error)) (define port-timeout-error? - (record-predicate &port-timeout-error)) + (exception-predicate &port-timeout-error)) (define &port-read-timeout-error (make-exception-type '&port-read-timeout-error @@ -96,7 +96,7 @@ (record-constructor &port-read-timeout-error)) (define port-read-timeout-error? - (record-predicate &port-read-timeout-error)) + (exception-predicate &port-read-timeout-error)) (define &port-write-timeout-error (make-exception-type '&port-write-timeout-error @@ -107,7 +107,7 @@ (record-constructor &port-write-timeout-error)) (define port-write-timeout-error? - (record-predicate &port-write-timeout-error)) + (exception-predicate &port-write-timeout-error)) (define (readable? port) "Test if PORT is writable." diff --git a/knots/web-server.scm b/knots/web-server.scm index 453db44..4d7240b 100644 --- a/knots/web-server.scm +++ b/knots/web-server.scm @@ -63,6 +63,14 @@ (bind sock family addr port) sock)) +(define crlf-bv + (string->utf8 "\r\n")) + +(define (chunked-output-port-overhead-bytes write-size) + (+ (string-length (number->string write-size 16)) + (bytevector-length crlf-bv) + (bytevector-length crlf-bv))) + (define* (make-chunked-output-port/knots port #:key (keep-alive? #f) (buffering 1200)) "Returns a new port which translates non-encoded data into a HTTP @@ -74,10 +82,12 @@ when done, as it will output the remaining data, and encode the final zero chunk. When the port is closed it will also close PORT, unless KEEP-ALIVE? is true." (define (write! bv start count) - (put-string port (number->string count 16)) - (put-string port "\r\n") + (let ((len-string + (number->string count 16))) + (put-string port len-string)) + (put-bytevector port crlf-bv 0 2) (put-bytevector port bv start count) - (put-string port "\r\n") + (put-bytevector port crlf-bv 0 2) (force-output port) count) @@ -130,7 +140,7 @@ closes PORT, unless KEEP-ALIVE? is true." (record-constructor &request-body-ended-prematurely)) (define request-body-ended-prematurely-error? - (record-predicate &request-body-ended-prematurely)) + (exception-predicate &request-body-ended-prematurely)) (define (request-body-port/knots r) (cond @@ -331,15 +341,19 @@ on the procedure being called at any particular time." (string->utf8 "internal server error"))) -(define (handle-request handler client - read-request-exception-handler - write-response-exception-handler) +(define* (handle-request handler client + read-request-exception-handler + write-response-exception-handler + buffer-size + #:key post-request-hook) (let ((request (with-exception-handler read-request-exception-handler (lambda () (read-request client)) - #:unwind? #t))) + #:unwind? #t)) + (read-request-time + (get-internal-real-time))) (let ((response body (cond @@ -388,7 +402,9 @@ on the procedure being called at any particular time." (lambda () (write-response response client) - (let ((body-written? + (let ((response-start-time + (get-internal-real-time)) + (body-written? (if (procedure? body) (let* ((type (response-content-type response '(text/plain))) @@ -399,7 +415,11 @@ on the procedure being called at any particular time." client (make-chunked-output-port/knots client - #:keep-alive? #t)))) + #:keep-alive? #t + #:buffering + (- buffer-size + (chunked-output-port-overhead-bytes + buffer-size)))))) (set-port-encoding! body-port charset) (let ((body-written? (with-exception-handler @@ -423,6 +443,11 @@ on the procedure being called at any particular time." (if body-written? (begin (force-output client) + (when post-request-hook + (post-request-hook request + #:read-request-time read-request-time + #:response-start-time response-start-time + #:response-end-time (get-internal-real-time))) (when (and (procedure? body) (response-content-length response)) (set-port-encoding! client "ISO-8859-1")) @@ -434,7 +459,8 @@ on the procedure being called at any particular time." read-request-exception-handler write-response-exception-handler connection-idle-timeout - buffer-size) + buffer-size + post-request-hook) ;; Always disable Nagle's algorithm, as we handle buffering ;; ourselves; when we force-output, we really want the data to go ;; out. @@ -472,11 +498,29 @@ on the procedure being called at any particular time." (else (let ((keep-alive? (handle-request handler client read-request-exception-handler - write-response-exception-handler))) + write-response-exception-handler + buffer-size + #:post-request-hook + post-request-hook))) (if keep-alive? (loop) (close-port client))))))) +(define (post-request-hook/safe post-request-hook) + (if post-request-hook + (lambda args + (with-exception-handler + (lambda (exn) #f) + (lambda () + (with-exception-handler + (lambda (exn) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (apply post-request-hook args)))) + #:unwind? #t)) + #f)) + (define-record-type (make-web-server socket port) web-server? @@ -496,7 +540,8 @@ on the procedure being called at any particular time." (write-response-exception-handler default-write-response-exception-handler) (connection-idle-timeout #f) - (connection-buffer-size 1024)) + (connection-buffer-size 1024) + post-request-hook) "Run the knots web server. HANDLER should be a procedure that takes one argument, the HTTP @@ -532,7 +577,9 @@ before sending back to the client." read-request-exception-handler write-response-exception-handler connection-idle-timeout - connection-buffer-size)) + connection-buffer-size + (post-request-hook/safe + post-request-hook))) #:parallel? #t) (loop)))))) diff --git a/tests.scm b/tests.scm index 2b24c6a..0cca3b4 100644 --- a/tests.scm +++ b/tests.scm @@ -1,10 +1,11 @@ (define-module (tests) #:use-module (ice-9 exceptions) #:use-module (fibers) + #:use-module (knots) #:export (run-fibers-for-tests assert-no-heap-growth)) -(define (run-fibers-for-tests thunk) +(define* (run-fibers-for-tests thunk #:key (drain? #t)) (let ((result (run-fibers (lambda () @@ -12,15 +13,18 @@ (lambda (exn) exn) (lambda () + (simple-format #t "running ~A\n" thunk) (with-exception-handler (lambda (exn) - (backtrace) + (print-backtrace-and-exception/knots exn) (raise-exception exn)) - thunk) + (lambda () + (start-stack #t (thunk)))) #t) #:unwind? #t)) #:hz 0 - #:parallelism 1))) + #:parallelism 1 + #:drain? drain?))) (if (exception? result) (raise-exception result) result))) diff --git a/tests/parallelism.scm b/tests/parallelism.scm index 9881a4d..91b2f3d 100644 --- a/tests/parallelism.scm +++ b/tests/parallelism.scm @@ -61,6 +61,24 @@ identity '(())))) +(run-fibers-for-tests + (lambda () + (with-exception-handler + (lambda (exn) + (unless (and (exception-with-message? exn) + (string=? (exception-message exn) + "foo")) + (raise-exception exn))) + (lambda () + (fibers-map-with-progress + (lambda _ + (raise-exception + (make-exception-with-message "foo"))) + '((1))) + + (error 'should-not-reach-here)) + #:unwind? #t))) + (run-fibers-for-tests (lambda () (with-exception-handler @@ -111,4 +129,16 @@ (assert-equal a 1)))) +(run-fibers-for-tests + (lambda () + (let ((parallelism-limiter (make-parallelism-limiter 2))) + (fibers-for-each + (lambda _ + (with-parallelism-limiter + parallelism-limiter + #f)) + (iota 50)) + + (destroy-parallelism-limiter parallelism-limiter)))) + (display "parallelism test finished successfully\n") diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm index 1bc09e5..461d04b 100644 --- a/tests/resource-pool.scm +++ b/tests/resource-pool.scm @@ -19,7 +19,21 @@ (number? (with-resource-from-pool resource-pool res - res)))))) + res))) + + (destroy-resource-pool resource-pool)))) + +(run-fibers-for-tests + (lambda () + (let ((resource-pool (make-fixed-size-resource-pool + (list 1)))) + (assert-true + (number? + (with-resource-from-pool resource-pool + res + res))) + + (destroy-resource-pool resource-pool)))) (run-fibers-for-tests (lambda () @@ -31,7 +45,9 @@ (number? (with-resource-from-pool resource-pool res - res)))))) + res))) + + (destroy-resource-pool resource-pool)))) (let* ((error-constructor (record-constructor &resource-pool-timeout)) @@ -88,10 +104,13 @@ res)) (iota 20)) - (let loop ((stats (resource-pool-stats resource-pool))) + (let loop ((stats (resource-pool-stats resource-pool + #:timeout #f))) (unless (= 0 (assq-ref stats 'resources)) (sleep 0.1) - (loop (resource-pool-stats resource-pool))))))) + (loop (resource-pool-stats resource-pool #:timeout #f)))) + + (destroy-resource-pool resource-pool)))) (run-fibers-for-tests (lambda () @@ -115,7 +134,9 @@ (set! counter (+ 1 counter)) (error "collision detected"))))) 20 - (iota 50))))) + (iota 50)) + + (destroy-resource-pool resource-pool)))) (run-fibers-for-tests (lambda () @@ -129,7 +150,7 @@ (error "collision detected"))) (new-number)) 1 - #:default-checkout-timeout 120))) + #:default-checkout-timeout 5))) (fibers-batch-for-each (lambda _ (with-resource-from-pool @@ -140,7 +161,9 @@ (set! counter (+ 1 counter)) (error "collision detected"))))) 20 - (iota 50))))) + (iota 50)) + + (destroy-resource-pool resource-pool)))) (run-fibers-for-tests (lambda () @@ -164,14 +187,14 @@ (call-with-resource-from-pool resource-pool (lambda (res) - (error 'should-not-be-reached)))) + #f))) #:unwind? #t))) (while (= 0 (assq-ref - (resource-pool-stats resource-pool) + (resource-pool-stats resource-pool #:timeout #f) 'waiters)) - (sleep 0)) + (sleep 0.1)) (with-exception-handler (lambda (exn) @@ -184,6 +207,8 @@ resource-pool (lambda (res) (error 'should-not-be-reached)))) - #:unwind? #t)))))) + #:unwind? #t))) + + (destroy-resource-pool resource-pool)))) (display "resource-pool test finished successfully\n")