diff --git a/.forgejo/workflows/build-website.yaml b/.forgejo/workflows/demo.yaml similarity index 61% rename from .forgejo/workflows/build-website.yaml rename to .forgejo/workflows/demo.yaml index ae6c4da..c3f4156 100644 --- a/.forgejo/workflows/build-website.yaml +++ b/.forgejo/workflows/demo.yaml @@ -1,7 +1,7 @@ on: push: branches: - - trunk + - actions-test jobs: test: runs-on: host @@ -10,17 +10,13 @@ jobs: - run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git --branch=pages knots-pages - run: | cd knots-trunk - guix shell -D -f guix-dev.scm -- documenta api "knots.scm knots/" + guix shell -D -f guix-dev.scm -- documenta api knots guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -c SHOW_TITLE=true -o ../knots-pages/index.html doc/index.texi - run: | cd knots-pages git add . - if [[ -z "$(git status -s)" ]]; then - echo "Nothing to push" - else - git config user.email "" - git config user.name "Automatic website updater" - git commit -m "Automatic website update" - git push - fi + git config user.email "" + git config user.name "Automatic website updater" + git commit -m "Automatic website update" + git push diff --git a/knots/parallelism.scm b/knots/parallelism.scm index 7631055..f8b2b8b 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -20,9 +20,6 @@ (define-module (knots parallelism) #:use-module (srfi srfi-1) #:use-module (srfi srfi-71) - #:use-module (srfi srfi-9) - #:use-module (srfi srfi-9 gnu) - #:use-module (srfi srfi-43) #:use-module (ice-9 match) #:use-module (ice-9 control) #:use-module (ice-9 exceptions) @@ -30,7 +27,6 @@ #:use-module (fibers channels) #:use-module (fibers operations) #:use-module (knots) - #:use-module (knots resource-pool) #:export (fibers-batch-map fibers-map @@ -42,13 +38,7 @@ fibers-parallel fibers-let - fiberize - - make-parallelism-limiter - parallelism-limiter? - destroy-parallelism-limiter - call-with-parallelism-limiter - with-parallelism-limiter)) + fiberize)) (define (defer-to-parallel-fiber thunk) (let ((reply (make-channel))) @@ -58,7 +48,7 @@ (lambda (exn) (put-message reply - (cons 'exception exn))) + (list 'exception exn))) (lambda () (with-exception-handler (lambda (exn) @@ -79,7 +69,7 @@ (lambda () (start-stack #t (thunk))) (lambda vals - (put-message reply (cons 'result vals))))))) + (put-message reply vals)))))) #:unwind? #t)) #:parallel? #t) reply)) @@ -89,16 +79,13 @@ reply-channels))) (map (match-lambda - (('exception . exn) + (('exception exn) (raise-exception exn)) - (('result . vals) - (apply values vals))) + (result + (apply values result))) responses))) (define (fibers-batch-map proc parallelism-limit . lists) - "Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT. If any of -the invocations of PROC raise an exception, this will be raised once -all of the calls to PROC have finished." (define vecs (map (lambda (list-or-vec) (if (vector? list-or-vec) list-or-vec @@ -118,18 +105,9 @@ all of the calls to PROC have finished." (channel-indexes '())) (if (and (eq? #f next-to-process-index) (null? channel-indexes)) - (let ((processed-result-vec - (vector-map - (lambda (_ result-or-exn) - (match result-or-exn - (('exception . exn) - (raise-exception exn)) - (('result . vals) - (car vals)))) - result-vec))) - (if (vector? (first lists)) - processed-result-vec - (vector->list processed-result-vec))) + (if (vector? (first lists)) + result-vec + (vector->list result-vec)) (if (or (= (length channel-indexes) (min parallelism-limit vecs-length)) @@ -145,13 +123,18 @@ all of the calls to PROC have finished." (get-operation (vector-ref result-vec index)) (lambda (result) - (vector-set! result-vec - index - result) - (values next-to-process-index - (lset-difference = - channel-indexes - (list index)))))) + (match result + (('exception exn) + (raise-exception exn)) + (_ + (vector-set! result-vec + index + (first result)) + + (values next-to-process-index + (lset-difference = + channel-indexes + (list index)))))))) channel-indexes))))) (loop new-index new-channel-indexes)) @@ -174,14 +157,9 @@ all of the calls to PROC have finished." channel-indexes))))))) (define (fibers-map proc . lists) - "Map PROC over LISTS in parallel, running up to 20 fibers in - PARALLEL. If any of the invocations of PROC raise an exception, this -will be raised once all of the calls to PROC have finished." (apply fibers-batch-map proc 20 lists)) (define (fibers-batch-for-each proc parallelism-limit . lists) - "Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in -parallel." (apply fibers-batch-map (lambda args (apply proc args) @@ -192,13 +170,10 @@ parallel." *unspecified*) (define (fibers-for-each proc . lists) - "Call PROC on LISTS, running up to 20 fibers in parallel." (apply fibers-batch-for-each proc 20 lists)) (define-syntax fibers-parallel (lambda (x) - "Run each expression in parallel. If any expression raises an - exception, this will be raised after all exceptions have finished." (syntax-case x () ((_ e0 ...) (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) @@ -209,16 +184,12 @@ parallel." (apply values (fetch-result-of-defered-thunks tmp0 ...)))))))) (define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...) - "Let, but run each binding in a fiber in parallel." (call-with-values (lambda () (fibers-parallel e ...)) (lambda (v ...) b0 b1 ...))) (define* (fibers-map-with-progress proc lists #:key report) - "Map PROC over LISTS, calling #:REPORT if specified after each -invocation of PROC finishes. REPORT is passed the results for each - element of LISTS, or #f if no result has been received yet." (let loop ((channels-to-results (apply map (lambda args @@ -239,8 +210,8 @@ invocation of PROC finishes. REPORT is passed the results for each (match-lambda ((#f . ('exception . exn)) (raise-exception exn)) - ((#f . ('result . vals)) - (car vals))) + ((#f . ('result . val)) + val)) channels-to-results) (loop (perform-operation @@ -257,7 +228,12 @@ invocation of PROC finishes. REPORT is passed the results for each (map (match-lambda ((c . r) (if (eq? channel c) - (cons #f result) + (cons #f + (match result + (('exception . exn) + result) + (_ + (cons 'result result)))) (cons c r)))) channels-to-results))) #f)))) @@ -278,7 +254,7 @@ invocation of PROC finishes. REPORT is passed the results for each reply-channel (with-exception-handler (lambda (exn) - (cons 'exception exn)) + (list 'exception exn)) (lambda () (with-exception-handler (lambda (exn) @@ -309,32 +285,5 @@ invocation of PROC finishes. REPORT is passed the results for each (put-message input-channel (cons reply-channel args)) (match (get-message reply-channel) (('result . vals) (apply values vals)) - (('exception . exn) + (('exception exn) (raise-exception exn)))))) - -(define-record-type - (make-parallelism-limiter-record resource-pool) - parallelism-limiter? - (resource-pool parallelism-limiter-resource-pool)) - -(define* (make-parallelism-limiter limit #:key (name "unnamed")) - (make-parallelism-limiter-record - (make-fixed-size-resource-pool - (iota limit) - #:name name))) - -(define (destroy-parallelism-limiter parallelism-limiter) - (destroy-resource-pool - (parallelism-limiter-resource-pool - parallelism-limiter))) - -(define* (call-with-parallelism-limiter parallelism-limiter thunk) - (call-with-resource-from-pool - (parallelism-limiter-resource-pool parallelism-limiter) - (lambda _ - (thunk)))) - -(define-syntax-rule (with-parallelism-limiter parallelism-limiter exp ...) - (call-with-parallelism-limiter - parallelism-limiter - (lambda () exp ...))) diff --git a/knots/promise.scm b/knots/promise.scm index 6aa3f0b..9df376b 100644 --- a/knots/promise.scm +++ b/knots/promise.scm @@ -82,10 +82,7 @@ (make-exception exn (make-knots-exception stack))))) - (lambda () - (start-stack - #t - ((fibers-promise-thunk fp)))))) + (fibers-promise-thunk fp))) #:unwind? #t)) (lambda vals (atomic-box-set! (fibers-promise-values-box fp) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index f957c3d..da52051 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -21,9 +21,7 @@ #:use-module (srfi srfi-1) #:use-module (srfi srfi-9) #:use-module (srfi srfi-9 gnu) - #:use-module (srfi srfi-43) #:use-module (srfi srfi-71) - #:use-module (ice-9 q) #:use-module (ice-9 match) #:use-module (ice-9 exceptions) #:use-module (fibers) @@ -34,10 +32,9 @@ #:use-module (fibers conditions) #:use-module (knots) #:use-module (knots parallelism) - #:export (make-fixed-size-resource-pool - make-resource-pool + #:export (resource-pool? - resource-pool? + make-resource-pool resource-pool-name resource-pool-channel resource-pool-configuration @@ -76,14 +73,13 @@ (record-constructor &resource-pool-abort-add-resource)) (define resource-pool-abort-add-resource-error? - (exception-predicate &resource-pool-abort-add-resource)) + (record-predicate &resource-pool-abort-add-resource)) (define-record-type (make-resource-pool-record name channel destroy-condition configuration) resource-pool? (name resource-pool-name) - (channel resource-pool-channel - set-resource-pool-channel!) + (channel resource-pool-channel) (destroy-condition resource-pool-destroy-condition) (configuration resource-pool-configuration)) @@ -95,412 +91,6 @@ (resource-pool-name resource-pool)) port))) -(define (safe-deq q) - (if (null? (car q)) - #f - (let ((it (caar q)) - (next (cdar q))) - (if (null? next) - (set-cdr! q #f)) - (set-car! q next) - it))) - -(define-record-type - (make-resource-details value checkout-count last-used) - resource-details? - (value resource-details-value) - (checkout-count resource-details-checkout-count - set-resource-details-checkout-count!) - (last-used resource-details-last-used - set-resource-details-last-used!)) - -(define-inlinable (increment-resource-checkout-count! resource) - (set-resource-details-checkout-count! - resource - (1+ (resource-details-checkout-count resource)))) - -(define-inlinable (decrement-resource-checkout-count! resource) - (set-resource-details-checkout-count! - resource - (1+ (resource-details-checkout-count resource)))) - -(define (spawn-fiber-for-checkout channel - reply-channel - reply-timeout - resource-id - resource) - (spawn-fiber - (lambda () - (let ((checkout-success? - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply-channel - (list 'success resource-id resource)) - (const #t)) - (wrap-operation (sleep-operation - reply-timeout) - (const #f)))))) - (unless checkout-success? - (put-message - channel - (list 'return-failed-checkout resource-id))))))) - -(define* (make-fixed-size-resource-pool resources-list-or-vector - #:key - (delay-logger (const #f)) - (duration-logger (const #f)) - scheduler - (name "unnamed") - default-checkout-timeout - default-max-waiters) - (define channel (make-channel)) - (define destroy-condition - (make-condition)) - - (define pool - (make-resource-pool-record - name - channel - destroy-condition - `((delay-logger . ,delay-logger) - (duration-logger . ,duration-logger) - (scheduler . ,scheduler) - (name . ,name) - (default-checkout-timeout . ,default-checkout-timeout) - (default-max-waiters . ,default-max-waiters)))) - - (define checkout-failure-count 0) - - (define resources - (vector-map - (lambda (_ resource) - (make-resource-details - resource - 0 - #f)) - (if (vector? resources-list-or-vector) - resources-list-or-vector - (list->vector resources-list-or-vector)))) - - (define (destroy-loop) - (define (empty?) - (vector-every (lambda (r) - (eq? r #f)) - resources)) - - (let loop () - (match (get-message channel) - (('checkout reply timeout-time max-waiters) - (spawn-fiber - (lambda () - (let ((op - (put-operation - reply - (cons 'resource-pool-destroyed - #f)))) - (perform-operation - (if timeout-time - (choice-operation - op - (wrap-operation - (sleep-operation - (/ (- timeout-time - (get-internal-real-time)) - internal-time-units-per-second)) - (const #f))) - op))))) - (loop)) - (((and (or 'return - 'return-failed-checkout) - return-type) - resource-id) - (vector-set! resources - resource-id - #f) - - (if (empty?) - (begin - (set-resource-pool-channel! pool #f) - (signal-condition! destroy-condition) - - ;; No loop - *unspecified*) - (loop))) - - (('stats reply timeout-time) - (let ((stats - `((resources . ,(vector-length resources)) - (available . 0) - (waiters . 0) - (checkout-failure-count . ,checkout-failure-count)))) - - (spawn-fiber - (lambda () - (let ((op - (put-operation reply stats))) - (perform-operation - (if timeout-time - (choice-operation - op - (sleep-operation - (/ (- timeout-time - (get-internal-real-time)) - internal-time-units-per-second))) - op)))))) - - (loop)) - - (('destroy) - (loop)) - (unknown - (simple-format - (current-error-port) - "unrecognised message to ~A resource pool channel: ~A\n" - name - unknown) - (loop))))) - - (define (main-loop) - (let loop ((available (iota (vector-length resources))) - (waiters (make-q))) - - (match (get-message channel) - (('checkout reply timeout-time max-waiters) - (if (null? available) - (let ((waiters-count - (q-length waiters))) - (if (and max-waiters - (>= waiters-count - max-waiters)) - (begin - (spawn-fiber - (lambda () - (let ((op - (put-operation - reply - (cons 'too-many-waiters - waiters-count)))) - (perform-operation - (if timeout-time - (choice-operation - op - (wrap-operation - (sleep-operation - (/ (- timeout-time - (get-internal-real-time)) - internal-time-units-per-second)) - (const #f))) - op))))) - (loop available - waiters)) - (loop available - (enq! waiters (cons reply timeout-time))))) - - (if timeout-time - (let ((current-internal-time - (get-internal-real-time))) - ;; If this client is still waiting - (if (> timeout-time - current-internal-time) - (let ((reply-timeout - (/ (- timeout-time - current-internal-time) - internal-time-units-per-second)) - (resource-id - new-available - (car+cdr available))) - - ;; Don't sleep in this fiber, so spawn a new - ;; fiber to handle handing over the resource, - ;; and returning it if there's a timeout - (spawn-fiber-for-checkout - channel - reply - reply-timeout - resource-id - (resource-details-value - (vector-ref resources - resource-id))) - (loop new-available - waiters)) - (loop available - waiters))) - (let* ((resource-id - next-available - (car+cdr available)) - (resource-details - (vector-ref resources - resource-id))) - (put-message reply - (list 'success - resource-id - (resource-details-value - resource-details))) - - (loop next-available - waiters))))) - - (((and (or 'return - 'return-failed-checkout) - return-type) - resource-id) - - (when (eq? 'return-failed-checkout - return-type) - (set! checkout-failure-count - (+ 1 checkout-failure-count))) - - (let ((current-internal-time - (get-internal-real-time))) - (let waiter-loop ((waiter (safe-deq waiters))) - (match waiter - (#f - (loop (cons resource-id available) - waiters)) - ((reply . timeout) - (if (and timeout - (< timeout current-internal-time)) - (waiter-loop (safe-deq waiters)) - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout - channel - reply - reply-timeout - resource-id - (resource-details-value - (vector-ref resources - resource-id)))) - (put-message reply - (list 'success - resource-id - (resource-details-value - (vector-ref resources - resource-id)))))) - (loop available - waiters)))))) - - (('list-resources reply) - (spawn-fiber - (lambda () - (put-message reply (vector->list resources)))) - - (loop available - waiters)) - - (('stats reply timeout-time) - (let ((stats - `((resources . ,(vector-length resources)) - (available . ,(length available)) - (waiters . ,(q-length waiters)) - (checkout-failure-count . ,checkout-failure-count)))) - - (spawn-fiber - (lambda () - (let ((op - (put-operation reply stats))) - (perform-operation - (if timeout-time - (choice-operation - op - (sleep-operation - (/ (- timeout-time - (get-internal-real-time)) - internal-time-units-per-second))) - op)))))) - - (loop available - waiters)) - - (('destroy) - (let ((current-internal-time (get-internal-real-time))) - ;; Notify all waiters that the pool has been destroyed - (for-each - (match-lambda - ((reply . timeout) - (when (or (not timeout) - (> timeout current-internal-time)) - (spawn-fiber - (lambda () - (let ((op - (put-operation - reply - (cons 'resource-pool-destroyed - #f)))) - (perform-operation - (if timeout - (choice-operation - op - (wrap-operation - (sleep-operation - (/ (- timeout - (get-internal-real-time)) - internal-time-units-per-second)) - (const #f))) - op)))))))) - (car waiters)) - - (if (= (vector-length resources) - (length available)) - (begin - (set-resource-pool-channel! pool #f) - (signal-condition! destroy-condition) - - ;; No loop - *unspecified*) - (destroy-loop)))) - - (unknown - (simple-format - (current-error-port) - "unrecognised message to ~A resource pool channel: ~A\n" - name - unknown) - (loop available - waiters))))) - - (spawn-fiber - (lambda () - (with-exception-handler - (lambda (exn) - #f) - (lambda () - (with-exception-handler - (lambda (exn) - (let* ((stack (make-stack #t)) - (error-string - (call-with-output-string - (lambda (port) - (display-backtrace stack port 3) - (simple-format - port - "exception in the ~A pool fiber, " name) - (print-exception - port - (stack-ref stack 3) - '%exception - (list exn)))))) - (display error-string - (current-error-port))) - (raise-exception exn)) - (lambda () - (start-stack - #t - (main-loop))))) - #:unwind? #t)) - (or scheduler - (current-scheduler))) - - pool) - (define* (make-resource-pool return-new-resource max-size #:key (min-size 0) (idle-seconds #f) @@ -536,39 +126,52 @@ (define checkout-failure-count 0) - (define resources - (make-hash-table)) + (define spawn-fiber-to-return-new-resource + (if add-resources-parallelism + (let ((thunk + (fiberize + (lambda () + (let ((max-size + (assq-ref (resource-pool-configuration pool) + 'max-size)) + (size (assq-ref (resource-pool-stats pool) + 'resources))) + (unless (= size max-size) + (let ((new-resource + (return-new-resource))) + (put-message channel + (list 'add-resource new-resource)))))) + #:parallelism add-resources-parallelism))) + (lambda () + (spawn-fiber thunk))) + (lambda () + (spawn-fiber + (lambda () + (let ((new-resource + (with-exception-handler + (lambda _ #f) + (lambda () + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception adding resource to pool ~A: ~A\n\n" + name + return-new-resource) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (start-stack #t (return-new-resource))))) + #:unwind? #t))) + (when new-resource + (put-message channel + (list 'add-resource new-resource))))))))) - (define-inlinable (count-resources resources) - (hash-count (const #t) resources)) - - (define return-new-resource/parallelism-limiter - (make-parallelism-limiter - (or add-resources-parallelism - max-size) - #:name - (string-append - name - " resource pool new resource parallelism limiter"))) - - (define (spawn-fiber-to-return-new-resource) + (define (spawn-fiber-to-destroy-resource resource) (spawn-fiber (lambda () - (with-exception-handler - (lambda (exn) - ;; This can happen if the resource pool is destroyed very - ;; quickly - (if (resource-pool-destroyed-error? exn) - #f - (raise-exception exn))) - (lambda () - (with-parallelism-limiter - return-new-resource/parallelism-limiter - (let ((max-size - (assq-ref (resource-pool-configuration pool) - 'max-size)) - (size (count-resources resources))) - (unless (>= size max-size) + (let loop () + (let ((success? (with-exception-handler (lambda _ #f) (lambda () @@ -576,62 +179,52 @@ (lambda (exn) (simple-format (current-error-port) - "exception adding resource to pool ~A: ~A\n\n" + "exception running resource pool destructor (~A): ~A\n" name - return-new-resource) + destructor) (print-backtrace-and-exception/knots exn) (raise-exception exn)) (lambda () - (let ((new-resource - (start-stack #t (return-new-resource)))) - (put-message channel - (list 'add-resource new-resource)))))) - #:unwind? #t))))) - #:unwind? #t)))) - - (define (spawn-fiber-to-destroy-resource resource-id resource-value) - (spawn-fiber - (lambda () - (let loop () - (let* ((success? - (with-exception-handler - (lambda _ #f) - (lambda () - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception running resource pool destructor (~A): ~A\n" - name - destructor) - (print-backtrace-and-exception/knots exn) - (raise-exception exn)) - (lambda () - (start-stack #t (destructor resource-value)) - #t))) - #:unwind? #t))) + (start-stack #t (destructor resource)) + #t))) + #:unwind? #t))) (if success? (put-message channel - (list 'remove resource-id)) + (list 'remove resource)) (begin (sleep 5) (loop)))))))) - (define (destroy-loop resources next-resource-id) - (let loop ((next-resource-id next-resource-id)) + (define (spawn-fiber-for-checkout reply-channel + reply-timeout + resource) + (spawn-fiber + (lambda () + (let ((checkout-success? + (perform-operation + (choice-operation + (wrap-operation + (put-operation reply-channel + (cons 'success resource)) + (const #t)) + (wrap-operation (sleep-operation + reply-timeout) + (const #f)))))) + (unless checkout-success? + (put-message + channel + (list 'return-failed-checkout resource))))))) + + (define (destroy-loop resources) + (let loop ((resources resources)) (match (get-message channel) (('add-resource resource) - (if destructor - (begin - (spawn-fiber-to-destroy-resource next-resource-id - resource) - (hash-set! resources next-resource-id resource) - - (loop (1+ next-resource-id))) - (loop next-resource-id))) + (when destructor + (spawn-fiber-to-destroy-resource resource)) + (loop resources)) (('checkout reply timeout-time max-waiters) (spawn-fiber (lambda () @@ -651,142 +244,154 @@ internal-time-units-per-second)) (const #f))) op))))) - (loop next-resource-id)) + (loop resources)) (((and (or 'return 'return-failed-checkout 'remove) return-type) - resource-id) - (when (and (not (eq? return-type 'remove)) - destructor) - (spawn-fiber-to-destroy-resource - resource-id - (resource-details-value - (hash-ref resources resource-id)))) + resource) + (when destructor + (spawn-fiber-to-destroy-resource resource)) - (hash-remove! resources resource-id) + (let ((index + (list-index (lambda (x) + (eq? x resource)) + resources))) + (define (remove-at-index! lst i) + (let ((start + end + (split-at! lst i))) + (append + start + (cdr end)))) - (if (= 0 (count-resources resources)) - (begin - (set-resource-pool-channel! pool #f) - (signal-condition! destroy-condition) + (let ((new-resources + (if index + (remove-at-index! resources index) + (begin + (simple-format + (current-error-port) + "resource pool error: unable to remove ~A\n" + resource) + resources)))) + (if (null? new-resources) + (begin + (signal-condition! destroy-condition) - ;; No loop - *unspecified*) - (loop next-resource-id))) - (('stats reply timeout-time) + ;; No loop + *unspecified*) + (loop new-resources))))) + + (('stats reply) (let ((stats - `((resources . ,(count-resources resources)) + `((resources . ,(length resources)) (available . 0) (waiters . 0) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber (lambda () - (let ((op - (put-operation reply stats))) - (perform-operation - (if timeout-time - (choice-operation - op - (sleep-operation - (/ (- timeout-time - (get-internal-real-time)) - internal-time-units-per-second))) - op)))))) + (perform-operation + (choice-operation + (wrap-operation + (put-operation reply stats) + (const #t)) + (wrap-operation (sleep-operation 5) + (const #f))))))) - (loop next-resource-id)) + (loop resources)) (('check-for-idle-resources) - (loop next-resource-id)) + (loop resources)) - (('destroy) - (loop next-resource-id)) + (('destroy reply) + (loop resources)) (unknown (simple-format (current-error-port) "unrecognised message to ~A resource pool channel: ~A\n" name unknown) - (loop next-resource-id))))) + (loop resources))))) (define (main-loop) - (let loop ((next-resource-id 0) + (let loop ((resources '()) (available '()) - (waiters (make-q))) + (waiters '()) + (resources-last-used '())) (match (get-message channel) (('add-resource resource) - (if (= (count-resources resources) max-size) - (if destructor - (begin - (hash-set! resources - next-resource-id - (make-resource-details - resource - 0 - (get-internal-real-time))) - (spawn-fiber-to-destroy-resource next-resource-id - resource) + (if (= (length resources) max-size) + (begin + (if destructor + (begin + (spawn-fiber-to-destroy-resource resource) - (loop (1+ next-resource-id) + (loop (cons resource resources) + available + waiters + (cons (get-internal-real-time) + resources-last-used))) + (loop resources available - waiters)) - (loop next-resource-id - available - waiters)) + waiters + (cons (get-internal-real-time) + resources-last-used)))) - (let* ((current-internal-time - (get-internal-real-time)) - (resource-details - (make-resource-details - resource - 0 - current-internal-time))) - (hash-set! resources - next-resource-id - resource-details) - (let waiter-loop ((waiter (safe-deq waiters))) - (match waiter - (#f - (loop (1+ next-resource-id) - (cons next-resource-id available) - waiters)) - ((reply . timeout) - (if (and timeout - (< timeout current-internal-time)) - (waiter-loop (safe-deq waiters)) - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout channel - reply - reply-timeout - next-resource-id - resource)) - (put-message reply (list 'success - next-resource-id - resource)))) - (set-resource-details-checkout-count! resource-details - 1) - (loop (1+ next-resource-id) - available - waiters))))))) + (if (null? waiters) + (loop (cons resource resources) + (cons resource available) + waiters + (cons (get-internal-real-time) + resources-last-used)) + + (let* ((current-internal-time (get-internal-real-time)) + (alive-waiters + dead-waiters + (partition! + (match-lambda + ((reply . timeout) + (or (not timeout) + (> timeout current-internal-time)))) + waiters))) + (if (null? alive-waiters) + (loop (cons resource resources) + (cons resource available) + '() + (cons (get-internal-real-time) + resources-last-used)) + (match (last alive-waiters) + ((waiter-channel . waiter-timeout) + (if waiter-timeout + (let ((reply-timeout + (/ (- waiter-timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn + ;; a new fiber to handle handing over + ;; the resource, and returning it if + ;; there's a timeout + (spawn-fiber-for-checkout waiter-channel + reply-timeout + resource)) + (put-message waiter-channel (cons 'success + resource))) + + (loop (cons resource resources) + available + (drop-right! alive-waiters 1) + (cons (get-internal-real-time) + resources-last-used))))))))) (('checkout reply timeout-time max-waiters) (if (null? available) (begin - (unless (= (count-resources resources) max-size) + (unless (= (length resources) max-size) (spawn-fiber-to-return-new-resource)) (let ((waiters-count - (q-length waiters))) + (length waiters))) (if (and max-waiters (>= waiters-count max-waiters)) @@ -809,12 +414,15 @@ internal-time-units-per-second)) (const #f))) op))))) - (loop next-resource-id + (loop resources available - waiters)) - (loop next-resource-id + waiters + resources-last-used)) + (loop resources available - (enq! waiters (cons reply timeout-time)))))) + (cons (cons reply timeout-time) + waiters) + resources-last-used)))) (if timeout-time (let ((current-internal-time @@ -822,283 +430,260 @@ ;; If this client is still waiting (if (> timeout-time current-internal-time) - (let* ((reply-timeout - (/ (- timeout-time - current-internal-time) - internal-time-units-per-second)) - (resource-id - (car available)) - (resource-details - (hash-ref resources resource-id))) - - (increment-resource-checkout-count! - resource-details) + (let ((reply-timeout + (/ (- timeout-time + current-internal-time) + internal-time-units-per-second))) ;; Don't sleep in this fiber, so spawn a new ;; fiber to handle handing over the resource, ;; and returning it if there's a timeout - (spawn-fiber-for-checkout channel - reply + (spawn-fiber-for-checkout reply reply-timeout - resource-id - (resource-details-value - resource-details)) - (loop next-resource-id + (car available)) + (loop resources (cdr available) - waiters)) - (loop next-resource-id + waiters + resources-last-used)) + (loop resources available - waiters))) - (let* ((resource-id - next-available - (car+cdr available)) - (resource-details - (hash-ref resources - resource-id))) - (increment-resource-checkout-count! resource-details) + waiters + resources-last-used))) + (begin + (put-message reply (cons 'success + (car available))) - (put-message reply - (list 'success - resource-id - (resource-details-value - resource-details))) - - (loop next-resource-id - next-available - waiters))))) + (loop resources + (cdr available) + waiters + resources-last-used))))) (((and (or 'return 'return-failed-checkout) return-type) - resource-id) + resource) (when (eq? 'return-failed-checkout return-type) (set! checkout-failure-count (+ 1 checkout-failure-count))) - (let ((current-internal-time - (get-internal-real-time)) - (resource-details - (hash-ref resources resource-id))) - (if (and lifetime - (>= (resource-details-checkout-count resource-details) - lifetime)) - (begin - (spawn-fiber-to-destroy-resource resource-id - (resource-details-value - resource-details)) - (loop next-resource-id - available - waiters)) - (let waiter-loop ((waiter (safe-deq waiters))) - (match waiter - (#f - (if (eq? 'return-failed-checkout - return-type) - (decrement-resource-checkout-count! resource-details) - (set-resource-details-last-used! - resource-details - current-internal-time)) + (if (null? waiters) + (loop resources + (cons resource available) + waiters + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + (get-internal-real-time)) + resources-last-used)) - (loop next-resource-id - (cons resource-id available) - waiters)) - ((reply . timeout) - (if (and timeout - (< timeout current-internal-time)) - (waiter-loop (safe-deq waiters)) - (if timeout - (let ((reply-timeout - (/ (- timeout - current-internal-time) - internal-time-units-per-second))) - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's - ;; a timeout - (spawn-fiber-for-checkout - channel - reply - reply-timeout - resource-id - (resource-details-value resource-details))) - (put-message reply - (list 'success - resource-id - (resource-details-value - resource-details))))) + (let* ((current-internal-time (get-internal-real-time)) + (alive-waiters + dead-waiters + (partition! + (match-lambda + ((reply . timeout) + (or (not timeout) + (> timeout current-internal-time)))) + waiters))) + (if (null? alive-waiters) + (loop resources + (cons resource available) + '() + (begin + (when (eq? return-type 'return) + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + (get-internal-real-time))) + resources-last-used)) + (match (last alive-waiters) + ((waiter-channel . waiter-timeout) + (if waiter-timeout + (let ((reply-timeout + (/ (- waiter-timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's a + ;; timeout + (spawn-fiber-for-checkout waiter-channel + reply-timeout + resource)) + (put-message waiter-channel (cons 'success + resource))) - (set-resource-details-last-used! resource-details - current-internal-time) - (when (eq? 'return-failed-checkout - return-type) - (decrement-resource-checkout-count! resource-details)) + (loop resources + available + (drop-right! alive-waiters 1) + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + (get-internal-real-time)) + resources-last-used)))))))) - (loop next-resource-id - available - waiters))))))) + (('remove resource) + (let ((index + (list-index (lambda (x) + (eq? x resource)) + resources))) + (define (remove-at-index! lst i) + (let ((start + end + (split-at! lst i))) + (append + start + (cdr end)))) - (('remove resource-id) - (hash-remove! resources - resource-id) + (loop (if index + (remove-at-index! resources index) + (begin + (simple-format + (current-error-port) + "resource pool error: unable to remove ~A\n" + resource) + resources)) + available ; resource shouldn't be in this list + waiters + (remove-at-index! + resources-last-used + index)))) - (when (and (not (q-empty? waiters)) - (< (- (count-resources resources) 1) - max-size)) - (spawn-fiber-to-return-new-resource)) + (('destroy resource) + (spawn-fiber-to-destroy-resource resource) - (loop next-resource-id - available ; resource shouldn't be in this list - waiters)) - - (('destroy resource-id) - (let ((resource-details - (hash-ref resources - resource-id))) - (spawn-fiber-to-destroy-resource resource-id - (resource-details-value - resource-details)) - - (loop next-resource-id - available - waiters))) + (loop resources + available + waiters + resources-last-used)) (('list-resources reply) (spawn-fiber (lambda () - (put-message reply (hash-map->list - (lambda (_ value) value) - resources)))) + (put-message reply (list-copy resources)))) - (loop next-resource-id + (loop resources available - waiters)) + waiters + resources-last-used)) - (('stats reply timeout-time) + (('stats reply) (let ((stats - `((resources . ,(count-resources resources)) + `((resources . ,(length resources)) (available . ,(length available)) - (waiters . ,(q-length waiters)) - (resources-checkout-count - . ,(hash-fold - (lambda (_ resource-details result) - (cons (resource-details-checkout-count - resource-details) - result)) - '() - resources)) + (waiters . ,(length waiters)) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber (lambda () - (let ((op - (put-operation reply stats))) - (perform-operation - (if timeout-time - (choice-operation - op - (sleep-operation - (/ (- timeout-time - (get-internal-real-time)) - internal-time-units-per-second))) - op)))))) + (perform-operation + (choice-operation + (wrap-operation + (put-operation reply stats) + (const #t)) + (wrap-operation (sleep-operation 5) + (const #f))))))) - (loop next-resource-id + (loop resources available - waiters)) + waiters + resources-last-used)) (('check-for-idle-resources) - (let* ((internal-real-time - (get-internal-real-time)) - (candidate-resource-ids-to-destroy + (let* ((resources-last-used-seconds + (map + (lambda (internal-time) + (/ (- (get-internal-real-time) internal-time) + internal-time-units-per-second)) + resources-last-used)) + (candidate-resources-to-destroy (filter-map - (lambda (resource-id) - (let ((resource-details - (hash-ref resources resource-id))) - (if (> (/ (- internal-real-time - (resource-details-last-used - resource-details)) - internal-time-units-per-second) - idle-seconds) - resource-id - #f))) - available)) - (max-resources-to-destroy - (max 0 - (- (count-resources resources) - min-size))) - (resources-to-destroy - (take candidate-resource-ids-to-destroy - (min max-resources-to-destroy - (length candidate-resource-ids-to-destroy))))) - (when destructor - (for-each - (lambda (resource-id) - (spawn-fiber-to-destroy-resource - resource-id - (resource-details-value - (hash-ref resources resource-id)))) - resources-to-destroy)) + (lambda (resource last-used-seconds) + (if (and (member resource available) + (> last-used-seconds idle-seconds)) + resource + #f)) + resources + resources-last-used-seconds))) - (loop next-resource-id - (lset-difference = available resources-to-destroy) - waiters))) + (let* ((available-resources-to-destroy + (lset-intersection eq? + available + candidate-resources-to-destroy)) + (max-resources-to-destroy + (max 0 + (- (length resources) + min-size))) + (resources-to-destroy + (take available-resources-to-destroy + (min max-resources-to-destroy + (length available-resources-to-destroy))))) + (when destructor + (for-each + (lambda (resource) + (spawn-fiber-to-destroy-resource resource)) + resources-to-destroy)) + + (loop resources + (lset-difference eq? available resources-to-destroy) + waiters + resources-last-used)))) (('destroy) - (let ((current-internal-time (get-internal-real-time))) - (for-each - (match-lambda - ((reply . timeout) - (when (or (not timeout) - (> timeout current-internal-time)) - (spawn-fiber - (lambda () - (let ((op - (put-operation - reply - (cons 'resource-pool-destroyed - #f)))) - (perform-operation - (if timeout - (choice-operation - op - (wrap-operation - (sleep-operation - (/ (- timeout - (get-internal-real-time)) - internal-time-units-per-second)) - (const #f))) - op)))))))) - (car waiters)) + (if (and (null? resources) + (null? waiters)) + (signal-condition! + destroy-condition) - (when destructor - (for-each - (lambda (resource-id) - (spawn-fiber-to-destroy-resource - resource-id - (resource-details-value - (hash-ref resources - resource-id)))) - available)) + (begin + (for-each + (lambda (resource) + (if destructor + (spawn-fiber-to-destroy-resource resource) + (spawn-fiber + (lambda () + (put-message channel + (list 'remove resource))) + #:parallel? #t))) + available) - ;; Do this in parallel to avoid deadlocks between the - ;; limiter and returning new resources to this pool - (and=> return-new-resource/parallelism-limiter - (lambda (limiter) - (spawn-fiber - (lambda () - (destroy-parallelism-limiter limiter))))) + (let ((current-internal-time (get-internal-real-time))) + (for-each + (match-lambda + ((reply . timeout) + (when (or (not timeout) + (> timeout current-internal-time)) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op)))))))) + waiters)) - (if (or (= 0 (count-resources resources)) - (not destructor)) - (begin - (set-resource-pool-channel! pool #f) - (signal-condition! destroy-condition) - - ;; No loop - *unspecified*) - (destroy-loop resources next-resource-id)))) + (destroy-loop resources)))) (unknown (simple-format @@ -1106,26 +691,19 @@ "unrecognised message to ~A resource pool channel: ~A\n" name unknown) - (loop next-resource-id + (loop resources available - waiters))))) + waiters + resources-last-used))))) (spawn-fiber (lambda () (when idle-seconds (spawn-fiber (lambda () - (let loop () - (put-message channel '(check-for-idle-resources)) - (when (perform-operation - (choice-operation - (wrap-operation - (sleep-operation idle-seconds) - (const #t)) - (wrap-operation - (wait-operation destroy-condition) - (const #f)))) - (loop)))))) + (while #t + (sleep idle-seconds) + (put-message channel '(check-for-idle-resources)))))) (with-exception-handler (lambda (exn) @@ -1166,8 +744,7 @@ (put-operation (resource-pool-channel pool) (list 'destroy)) (lambda _ - (wait - (resource-pool-destroy-condition pool)))) + (wait (resource-pool-destroy-condition pool)))) (wait-operation (resource-pool-destroy-condition pool)))) #t) @@ -1186,7 +763,7 @@ (record-constructor &resource-pool-timeout)) (define resource-pool-timeout-error? - (exception-predicate &resource-pool-timeout)) + (record-predicate &resource-pool-timeout)) (define &resource-pool-too-many-waiters (make-exception-type '&recource-pool-too-many-waiters @@ -1207,7 +784,7 @@ (record-constructor &resource-pool-too-many-waiters)) (define resource-pool-too-many-waiters-error? - (exception-predicate &resource-pool-too-many-waiters)) + (record-predicate &resource-pool-too-many-waiters)) (define &resource-pool-destroyed (make-exception-type '&recource-pool-destroyed @@ -1223,7 +800,7 @@ (record-constructor &resource-pool-destroyed)) (define resource-pool-destroyed-error? - (exception-predicate &resource-pool-destroyed)) + (record-predicate &resource-pool-destroyed)) (define &resource-pool-destroy-resource (make-exception-type '&recource-pool-destroy-resource @@ -1234,7 +811,7 @@ (record-constructor &resource-pool-destroy-resource)) (define resource-pool-destroy-resource-exception? - (exception-predicate &resource-pool-destroy-resource)) + (record-predicate &resource-pool-destroy-resource)) (define resource-pool-default-timeout-handler (make-parameter #f)) @@ -1260,10 +837,6 @@ available. Return the resource once PROC has returned." 'default-max-waiters) max-waiters)) - (unless channel - (raise-exception - (make-resource-pool-destroyed-error pool))) - (let ((reply (if timeout-or-default (let loop ((reply (make-channel)) @@ -1306,9 +879,8 @@ available. Return the resource once PROC has returned." start-time) 'timeout) response)) - 'timeout)) - 'timeout))) - (let ((reply (make-channel))) + 'timeout))))) + (let loop ((reply (make-channel))) (put-message channel (list 'checkout reply @@ -1331,7 +903,7 @@ available. Return the resource once PROC has returned." (('resource-pool-destroyed . #f) (raise-exception (make-resource-pool-destroyed-error pool))) - (('success resource-id resource-value) + (('success . resource) (call-with-values (lambda () (with-exception-handler @@ -1340,13 +912,14 @@ available. Return the resource once PROC has returned." ;; this avoids inconsistent behaviour with ;; continuation barriers (put-message - channel + (resource-pool-channel pool) (list (if (or destroy-resource-on-exception? (resource-pool-destroy-resource-exception? exn)) 'destroy 'return) - resource-id)) - (raise-exception exn)) + resource)) + (unless (resource-pool-destroy-resource-exception? exn) + (raise-exception exn))) (lambda () (with-exception-handler (lambda (exn) @@ -1363,11 +936,11 @@ available. Return the resource once PROC has returned." exn (make-knots-exception stack))))) (lambda () - (proc resource-value)))) + (proc resource)))) #:unwind? #t)) (lambda vals - (put-message channel - `(return ,resource-id)) + (put-message (resource-pool-channel pool) + `(return ,resource)) (apply values vals))))))) (define-syntax-rule (with-resource-from-pool pool resource exp ...) @@ -1376,58 +949,36 @@ available. Return the resource once PROC has returned." (lambda (resource) exp ...))) (define* (resource-pool-stats pool #:key (timeout 5)) - (define channel - (resource-pool-channel pool)) + (let ((reply (make-channel)) + (start-time (get-internal-real-time))) + (perform-operation + (choice-operation + (wrap-operation + (put-operation (resource-pool-channel pool) + `(stats ,reply)) + (const #t)) + (wrap-operation (sleep-operation timeout) + (lambda _ + (raise-exception + (make-resource-pool-timeout-error pool)))))) - (unless channel - (raise-exception - (make-resource-pool-destroyed-error pool))) - - (if timeout - (let* ((reply (make-channel)) - (start-time (get-internal-real-time)) - (timeout-time - (+ start-time - (* internal-time-units-per-second timeout)))) - (perform-operation - (choice-operation - (wrap-operation - (put-operation channel - `(stats ,reply ,timeout-time)) - (const #t)) - (wrap-operation (sleep-operation timeout) - (lambda _ - (raise-exception - (make-resource-pool-timeout-error pool)))))) - - (let ((time-remaining - (- timeout - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second)))) - (if (> time-remaining 0) - (perform-operation - (choice-operation - (get-operation reply) - (wrap-operation (sleep-operation time-remaining) - (lambda _ - (raise-exception - (make-resource-pool-timeout-error pool)))))) - (raise-exception - (make-resource-pool-timeout-error pool))))) - (let ((reply (make-channel))) - (put-message channel - `(stats ,reply #f)) - (get-message reply)))) + (let ((time-remaining + (- timeout + (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second)))) + (if (> time-remaining 0) + (perform-operation + (choice-operation + (get-operation reply) + (wrap-operation (sleep-operation time-remaining) + (lambda _ + (raise-exception + (make-resource-pool-timeout-error pool)))))) + (raise-exception + (make-resource-pool-timeout-error pool)))))) (define (resource-pool-list-resources pool) - (define channel - (resource-pool-channel pool)) - - (unless channel - (raise-exception - (make-resource-pool-destroyed-error pool))) - (let ((reply (make-channel))) (put-message (resource-pool-channel pool) (list 'list-resources reply)) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index 22c1b5c..b176162 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -198,7 +198,7 @@ from there, or #f if that would be an empty string." (record-accessor &thread-pool-timeout-error 'pool))) (define thread-pool-timeout-error? - (exception-predicate &thread-pool-timeout-error)) + (record-predicate &thread-pool-timeout-error)) (define* (make-fixed-size-thread-pool size #:key @@ -269,8 +269,8 @@ from there, or #f if that would be an empty string." (sleep 1) (destructor/safe args))))) - (define (process thread-index channel args) - (let loop ((lifetime thread-lifetime)) + (define (process channel args) + (let loop () (match (get-message channel) ('destroy #f) ((reply sent-time proc) @@ -292,9 +292,6 @@ from there, or #f if that would be an empty string." internal-time-units-per-second) exn)) (lambda () - (vector-set! thread-proc-vector - thread-index - proc) (with-exception-handler (lambda (exn) (let ((stack @@ -322,10 +319,6 @@ from there, or #f if that would be an empty string." vals)))))) #:unwind? #t))) - (vector-set! thread-proc-vector - thread-index - #f) - (put-message reply response) @@ -342,11 +335,7 @@ from there, or #f if that would be an empty string." (if (and exception? expire-on-exception?) #t - (if lifetime - (if (<= lifetime 1) - #t - (loop (- lifetime 1))) - (loop lifetime))))))))) + (loop)))))))) (define (start-thread index channel) (call-with-new-thread @@ -369,7 +358,7 @@ from there, or #f if that would be an empty string." "knots: thread-pool: internal exception: ~A\n" exn)) (lambda () (parameterize ((param args)) - (process index channel args))) + (process channel args))) #:unwind? #t))) (when thread-destructor @@ -406,8 +395,7 @@ from there, or #f if that would be an empty string." (expire-on-exception? #f) (name "unnamed") (use-default-io-waiters? #t) - default-checkout-timeout - default-max-waiters) + default-checkout-timeout) "Return a channel used to offload work to a dedicated thread. ARGS are the arguments of the thread pool procedure." (define param @@ -420,6 +408,7 @@ arguments of the thread pool procedure." 1 #:thread-initializer thread-initializer #:thread-destructor thread-destructor + #:thread-lifetime thread-lifetime #:expire-on-exception? expire-on-exception? #:name name #:use-default-io-waiters? use-default-io-waiters?)) @@ -427,11 +416,9 @@ arguments of the thread pool procedure." #:destructor destroy-thread-pool #:min-size min-size #:delay-logger delay-logger - #:lifetime thread-lifetime #:scheduler scheduler #:duration-logger duration-logger - #:default-checkout-timeout default-checkout-timeout - #:default-max-waiters default-max-waiters))) + #:default-checkout-timeout default-checkout-timeout))) (thread-pool resource-pool param))) diff --git a/knots/timeout.scm b/knots/timeout.scm index a65a095..58306e0 100644 --- a/knots/timeout.scm +++ b/knots/timeout.scm @@ -85,7 +85,7 @@ (record-constructor &port-timeout-error)) (define port-timeout-error? - (exception-predicate &port-timeout-error)) + (record-predicate &port-timeout-error)) (define &port-read-timeout-error (make-exception-type '&port-read-timeout-error @@ -96,7 +96,7 @@ (record-constructor &port-read-timeout-error)) (define port-read-timeout-error? - (exception-predicate &port-read-timeout-error)) + (record-predicate &port-read-timeout-error)) (define &port-write-timeout-error (make-exception-type '&port-write-timeout-error @@ -107,7 +107,7 @@ (record-constructor &port-write-timeout-error)) (define port-write-timeout-error? - (exception-predicate &port-write-timeout-error)) + (record-predicate &port-write-timeout-error)) (define (readable? port) "Test if PORT is writable." diff --git a/knots/web-server.scm b/knots/web-server.scm index 4d7240b..453db44 100644 --- a/knots/web-server.scm +++ b/knots/web-server.scm @@ -63,14 +63,6 @@ (bind sock family addr port) sock)) -(define crlf-bv - (string->utf8 "\r\n")) - -(define (chunked-output-port-overhead-bytes write-size) - (+ (string-length (number->string write-size 16)) - (bytevector-length crlf-bv) - (bytevector-length crlf-bv))) - (define* (make-chunked-output-port/knots port #:key (keep-alive? #f) (buffering 1200)) "Returns a new port which translates non-encoded data into a HTTP @@ -82,12 +74,10 @@ when done, as it will output the remaining data, and encode the final zero chunk. When the port is closed it will also close PORT, unless KEEP-ALIVE? is true." (define (write! bv start count) - (let ((len-string - (number->string count 16))) - (put-string port len-string)) - (put-bytevector port crlf-bv 0 2) + (put-string port (number->string count 16)) + (put-string port "\r\n") (put-bytevector port bv start count) - (put-bytevector port crlf-bv 0 2) + (put-string port "\r\n") (force-output port) count) @@ -140,7 +130,7 @@ closes PORT, unless KEEP-ALIVE? is true." (record-constructor &request-body-ended-prematurely)) (define request-body-ended-prematurely-error? - (exception-predicate &request-body-ended-prematurely)) + (record-predicate &request-body-ended-prematurely)) (define (request-body-port/knots r) (cond @@ -341,19 +331,15 @@ on the procedure being called at any particular time." (string->utf8 "internal server error"))) -(define* (handle-request handler client - read-request-exception-handler - write-response-exception-handler - buffer-size - #:key post-request-hook) +(define (handle-request handler client + read-request-exception-handler + write-response-exception-handler) (let ((request (with-exception-handler read-request-exception-handler (lambda () (read-request client)) - #:unwind? #t)) - (read-request-time - (get-internal-real-time))) + #:unwind? #t))) (let ((response body (cond @@ -402,9 +388,7 @@ on the procedure being called at any particular time." (lambda () (write-response response client) - (let ((response-start-time - (get-internal-real-time)) - (body-written? + (let ((body-written? (if (procedure? body) (let* ((type (response-content-type response '(text/plain))) @@ -415,11 +399,7 @@ on the procedure being called at any particular time." client (make-chunked-output-port/knots client - #:keep-alive? #t - #:buffering - (- buffer-size - (chunked-output-port-overhead-bytes - buffer-size)))))) + #:keep-alive? #t)))) (set-port-encoding! body-port charset) (let ((body-written? (with-exception-handler @@ -443,11 +423,6 @@ on the procedure being called at any particular time." (if body-written? (begin (force-output client) - (when post-request-hook - (post-request-hook request - #:read-request-time read-request-time - #:response-start-time response-start-time - #:response-end-time (get-internal-real-time))) (when (and (procedure? body) (response-content-length response)) (set-port-encoding! client "ISO-8859-1")) @@ -459,8 +434,7 @@ on the procedure being called at any particular time." read-request-exception-handler write-response-exception-handler connection-idle-timeout - buffer-size - post-request-hook) + buffer-size) ;; Always disable Nagle's algorithm, as we handle buffering ;; ourselves; when we force-output, we really want the data to go ;; out. @@ -498,29 +472,11 @@ on the procedure being called at any particular time." (else (let ((keep-alive? (handle-request handler client read-request-exception-handler - write-response-exception-handler - buffer-size - #:post-request-hook - post-request-hook))) + write-response-exception-handler))) (if keep-alive? (loop) (close-port client))))))) -(define (post-request-hook/safe post-request-hook) - (if post-request-hook - (lambda args - (with-exception-handler - (lambda (exn) #f) - (lambda () - (with-exception-handler - (lambda (exn) - (print-backtrace-and-exception/knots exn) - (raise-exception exn)) - (lambda () - (apply post-request-hook args)))) - #:unwind? #t)) - #f)) - (define-record-type (make-web-server socket port) web-server? @@ -540,8 +496,7 @@ on the procedure being called at any particular time." (write-response-exception-handler default-write-response-exception-handler) (connection-idle-timeout #f) - (connection-buffer-size 1024) - post-request-hook) + (connection-buffer-size 1024)) "Run the knots web server. HANDLER should be a procedure that takes one argument, the HTTP @@ -577,9 +532,7 @@ before sending back to the client." read-request-exception-handler write-response-exception-handler connection-idle-timeout - connection-buffer-size - (post-request-hook/safe - post-request-hook))) + connection-buffer-size)) #:parallel? #t) (loop)))))) diff --git a/tests.scm b/tests.scm index 0cca3b4..2b24c6a 100644 --- a/tests.scm +++ b/tests.scm @@ -1,11 +1,10 @@ (define-module (tests) #:use-module (ice-9 exceptions) #:use-module (fibers) - #:use-module (knots) #:export (run-fibers-for-tests assert-no-heap-growth)) -(define* (run-fibers-for-tests thunk #:key (drain? #t)) +(define (run-fibers-for-tests thunk) (let ((result (run-fibers (lambda () @@ -13,18 +12,15 @@ (lambda (exn) exn) (lambda () - (simple-format #t "running ~A\n" thunk) (with-exception-handler (lambda (exn) - (print-backtrace-and-exception/knots exn) + (backtrace) (raise-exception exn)) - (lambda () - (start-stack #t (thunk)))) + thunk) #t) #:unwind? #t)) #:hz 0 - #:parallelism 1 - #:drain? drain?))) + #:parallelism 1))) (if (exception? result) (raise-exception result) result))) diff --git a/tests/parallelism.scm b/tests/parallelism.scm index 91b2f3d..9881a4d 100644 --- a/tests/parallelism.scm +++ b/tests/parallelism.scm @@ -61,24 +61,6 @@ identity '(())))) -(run-fibers-for-tests - (lambda () - (with-exception-handler - (lambda (exn) - (unless (and (exception-with-message? exn) - (string=? (exception-message exn) - "foo")) - (raise-exception exn))) - (lambda () - (fibers-map-with-progress - (lambda _ - (raise-exception - (make-exception-with-message "foo"))) - '((1))) - - (error 'should-not-reach-here)) - #:unwind? #t))) - (run-fibers-for-tests (lambda () (with-exception-handler @@ -129,16 +111,4 @@ (assert-equal a 1)))) -(run-fibers-for-tests - (lambda () - (let ((parallelism-limiter (make-parallelism-limiter 2))) - (fibers-for-each - (lambda _ - (with-parallelism-limiter - parallelism-limiter - #f)) - (iota 50)) - - (destroy-parallelism-limiter parallelism-limiter)))) - (display "parallelism test finished successfully\n") diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm index b3a84d7..1bc09e5 100644 --- a/tests/resource-pool.scm +++ b/tests/resource-pool.scm @@ -1,33 +1,9 @@ (use-modules (tests) (fibers) - (fibers channels) (unit-test) (knots parallelism) (knots resource-pool)) -(run-fibers-for-tests - (lambda () - (let ((parallelism-limiter (make-parallelism-limiter - 1))) - (with-parallelism-limiter parallelism-limiter - #f) - - (destroy-parallelism-limiter parallelism-limiter)))) - -(run-fibers-for-tests - (lambda () - (let ((parallelism-limiter (make-parallelism-limiter - 1)) - (channel - (make-channel))) - (spawn-fiber - (lambda () - (with-parallelism-limiter parallelism-limiter - (put-message channel #t) - (sleep 1)))) - (get-message channel) - (destroy-parallelism-limiter parallelism-limiter)))) - (define new-number (let ((val 0)) (lambda () @@ -43,21 +19,7 @@ (number? (with-resource-from-pool resource-pool res - res))) - - (destroy-resource-pool resource-pool)))) - -(run-fibers-for-tests - (lambda () - (let ((resource-pool (make-fixed-size-resource-pool - (list 1)))) - (assert-true - (number? - (with-resource-from-pool resource-pool - res - res))) - - (destroy-resource-pool resource-pool)))) + res)))))) (run-fibers-for-tests (lambda () @@ -69,9 +31,7 @@ (number? (with-resource-from-pool resource-pool res - res))) - - (destroy-resource-pool resource-pool)))) + res)))))) (let* ((error-constructor (record-constructor &resource-pool-timeout)) @@ -128,13 +88,10 @@ res)) (iota 20)) - (let loop ((stats (resource-pool-stats resource-pool - #:timeout #f))) + (let loop ((stats (resource-pool-stats resource-pool))) (unless (= 0 (assq-ref stats 'resources)) (sleep 0.1) - (loop (resource-pool-stats resource-pool #:timeout #f)))) - - (destroy-resource-pool resource-pool)))) + (loop (resource-pool-stats resource-pool))))))) (run-fibers-for-tests (lambda () @@ -158,9 +115,7 @@ (set! counter (+ 1 counter)) (error "collision detected"))))) 20 - (iota 50)) - - (destroy-resource-pool resource-pool)))) + (iota 50))))) (run-fibers-for-tests (lambda () @@ -174,7 +129,7 @@ (error "collision detected"))) (new-number)) 1 - #:default-checkout-timeout 5))) + #:default-checkout-timeout 120))) (fibers-batch-for-each (lambda _ (with-resource-from-pool @@ -185,9 +140,7 @@ (set! counter (+ 1 counter)) (error "collision detected"))))) 20 - (iota 50)) - - (destroy-resource-pool resource-pool)))) + (iota 50))))) (run-fibers-for-tests (lambda () @@ -211,14 +164,14 @@ (call-with-resource-from-pool resource-pool (lambda (res) - #f))) + (error 'should-not-be-reached)))) #:unwind? #t))) (while (= 0 (assq-ref - (resource-pool-stats resource-pool #:timeout #f) + (resource-pool-stats resource-pool) 'waiters)) - (sleep 0.1)) + (sleep 0)) (with-exception-handler (lambda (exn) @@ -231,55 +184,6 @@ resource-pool (lambda (res) (error 'should-not-be-reached)))) - #:unwind? #t))) - - (destroy-resource-pool resource-pool)))) - -(run-fibers-for-tests - (lambda () - (let ((resource-pool (make-resource-pool - (const 'foo) - 1 - #:lifetime 1 - #:destructor - (const #t)))) - (for-each - (lambda _ - (with-resource-from-pool resource-pool - res - res)) - (iota 20)) - - (destroy-resource-pool resource-pool)))) - -;; Test allocating resources to waiters and destroying resources -(run-fibers-for-tests - (lambda () - (let ((resource-pool (make-resource-pool - (lambda () - (sleep 1) - 'res) - 2 - #:idle-seconds 1 - #:add-resources-parallelism 10 - #:destructor - (const #t)))) - (fibers-for-each - (lambda _ - (with-resource-from-pool resource-pool - res - res)) - (iota 20)) - - (sleep 2) - - (fibers-for-each - (lambda _ - (with-resource-from-pool resource-pool - res - res)) - (iota 20)) - - (destroy-resource-pool resource-pool)))) + #:unwind? #t)))))) (display "resource-pool test finished successfully\n") diff --git a/tests/thread-pool.scm b/tests/thread-pool.scm index e3a1cdd..1c51cb3 100644 --- a/tests/thread-pool.scm +++ b/tests/thread-pool.scm @@ -1,5 +1,4 @@ (use-modules (tests) - (ice-9 atomic) (srfi srfi-71) (fibers) (unit-test) @@ -86,60 +85,4 @@ (+ 1 'a)))) #:unwind? #t))))) -(let ((thread-pool - (make-fixed-size-thread-pool - 1 - #:thread-lifetime 1 - #:thread-initializer - (lambda () - (list (make-atomic-box #t)))))) - - (for-each - (lambda _ - (call-with-thread - thread-pool - (lambda (box) - (if (atomic-box-ref box) - (atomic-box-set! box #f) - (error (atomic-box-ref box)))))) - (iota 10))) - -(run-fibers-for-tests - (lambda () - (let ((thread-pool - (make-thread-pool 1 #:thread-lifetime 1))) - - (for-each - (lambda _ - (call-with-thread - thread-pool - (lambda () #f))) - (iota 10))))) - -(let ((thread-pool - (make-fixed-size-thread-pool - 1 - #:thread-lifetime 2 - #:thread-initializer - (lambda () - (list (make-atomic-box 2)))))) - - (define (ref-and-decrement box) - (let ((val (atomic-box-ref box))) - (atomic-box-set! box (- val 1)) - val)) - - (unless (= 2 (call-with-thread - thread-pool - ref-and-decrement)) - (error)) - (unless (= 1 (call-with-thread - thread-pool - ref-and-decrement)) - (error)) - (unless (= 2 (call-with-thread - thread-pool - ref-and-decrement)) - (error))) - (display "thread-pool test finished successfully\n")