diff --git a/knots.scm b/knots.scm index 42f2af7..8e31c6b 100644 --- a/knots.scm +++ b/knots.scm @@ -67,14 +67,11 @@ (define* (print-backtrace-and-exception/knots exn #:key (port (current-error-port))) - (let* ((stack - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1))) - (_ - (make-stack #t)))) + (let* ((stack (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))))) (error-string (call-with-output-string (lambda (port) diff --git a/knots/parallelism.scm b/knots/parallelism.scm index f8b2b8b..77272dd 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -22,7 +22,6 @@ #:use-module (srfi srfi-71) #:use-module (ice-9 match) #:use-module (ice-9 control) - #:use-module (ice-9 exceptions) #:use-module (fibers) #:use-module (fibers channels) #:use-module (fibers operations) @@ -44,33 +43,27 @@ (let ((reply (make-channel))) (spawn-fiber (lambda () - (with-exception-handler - (lambda (exn) - (put-message - reply - (list 'exception exn))) - (lambda () - (with-exception-handler - (lambda (exn) - (let ((stack - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1))) - (_ - (make-stack #t))))) - (raise-exception - (make-exception - exn - (make-knots-exception stack))))) - (lambda () - (call-with-values - (lambda () - (start-stack #t (thunk))) - (lambda vals - (put-message reply vals)))))) - #:unwind? #t)) + (call-with-escape-continuation + (lambda (return) + (with-exception-handler + (lambda (exn) + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (let ((stack (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1)))) + (put-message reply + (list 'exception + (make-exception + exn + (make-knots-exception stack))))))) + (return)) + (lambda () + (call-with-values + (lambda () + (start-stack #t (thunk))) + (lambda vals + (put-message reply vals)))))))) #:parallel? #t) reply)) @@ -252,31 +245,25 @@ (get-message process-channel)))) (put-message reply-channel - (with-exception-handler - (lambda (exn) - (list 'exception exn)) - (lambda () - (with-exception-handler - (lambda (exn) - (let ((stack - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1))) - (_ - (make-stack #t))))) - (raise-exception - (make-exception - exn - (make-knots-exception stack))))) - (lambda () - (call-with-values - (lambda () - (start-stack #t (apply proc args))) - (lambda vals - (cons 'result vals)))))) - #:unwind? #t))))) + (call-with-escape-continuation + (lambda (return) + (with-exception-handler + (lambda (exn) + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (let ((stack (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1)))) + (return (list 'exception + (make-exception + exn + (make-knots-exception stack)))))))) + (lambda () + (call-with-values + (lambda () + (start-stack #t (apply proc args))) + (lambda vals + (cons 'result vals))))))))))) #:parallel? #t)) (iota parallelism)) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index da52051..f4522be 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -29,7 +29,6 @@ #:use-module (fibers channels) #:use-module (fibers scheduler) #:use-module (fibers operations) - #:use-module (fibers conditions) #:use-module (knots) #:use-module (knots parallelism) #:export (resource-pool? @@ -44,19 +43,6 @@ resource-pool-timeout-error-pool resource-pool-timeout-error? - &resource-pool-too-many-waiters - resource-pool-too-many-waiters-error-pool - resource-pool-too-many-waiters-error-waiters-count - resource-pool-too-many-waiters-error? - - &resource-pool-destroyed - resource-pool-destroyed-error-pool - resource-pool-destroyed-error? - - &resource-pool-destroy-resource - make-resource-pool-destroy-resource-exception - resource-pool-destroy-resource-exception? - resource-pool-default-timeout-handler call-with-resource-from-pool @@ -76,12 +62,11 @@ (record-predicate &resource-pool-abort-add-resource)) (define-record-type - (make-resource-pool-record name channel destroy-condition configuration) + (make-resource-pool-record name channel configuration) resource-pool? - (name resource-pool-name) - (channel resource-pool-channel) - (destroy-condition resource-pool-destroy-condition) - (configuration resource-pool-configuration)) + (name resource-pool-name) + (channel resource-pool-channel) + (configuration resource-pool-configuration)) (set-record-type-printer! @@ -101,17 +86,13 @@ scheduler (name "unnamed") (add-resources-parallelism 1) - default-checkout-timeout - default-max-waiters) + default-checkout-timeout) (define channel (make-channel)) - (define destroy-condition - (make-condition)) (define pool (make-resource-pool-record name channel - destroy-condition `((max-size . ,max-size) (min-size . ,min-size) (idle-seconds . ,idle-seconds) @@ -121,8 +102,7 @@ (lifetime . ,lifetime) (scheduler . ,scheduler) (name . ,name) - (default-checkout-timeout . ,default-checkout-timeout) - (default-max-waiters . ,default-max-waiters)))) + (default-checkout-timeout . ,default-checkout-timeout)))) (define checkout-failure-count 0) @@ -206,8 +186,7 @@ (perform-operation (choice-operation (wrap-operation - (put-operation reply-channel - (cons 'success resource)) + (put-operation reply-channel resource) (const #t)) (wrap-operation (sleep-operation reply-timeout) @@ -217,103 +196,6 @@ channel (list 'return-failed-checkout resource))))))) - (define (destroy-loop resources) - (let loop ((resources resources)) - (match (get-message channel) - (('add-resource resource) - (when destructor - (spawn-fiber-to-destroy-resource resource)) - - (loop resources)) - (('checkout reply timeout-time max-waiters) - (spawn-fiber - (lambda () - (let ((op - (put-operation - reply - (cons 'resource-pool-destroyed - #f)))) - (perform-operation - (if timeout-time - (choice-operation - op - (wrap-operation - (sleep-operation - (/ (- timeout-time - (get-internal-real-time)) - internal-time-units-per-second)) - (const #f))) - op))))) - (loop resources)) - (((and (or 'return - 'return-failed-checkout - 'remove) - return-type) - resource) - (when destructor - (spawn-fiber-to-destroy-resource resource)) - - (let ((index - (list-index (lambda (x) - (eq? x resource)) - resources))) - (define (remove-at-index! lst i) - (let ((start - end - (split-at! lst i))) - (append - start - (cdr end)))) - - (let ((new-resources - (if index - (remove-at-index! resources index) - (begin - (simple-format - (current-error-port) - "resource pool error: unable to remove ~A\n" - resource) - resources)))) - (if (null? new-resources) - (begin - (signal-condition! destroy-condition) - - ;; No loop - *unspecified*) - (loop new-resources))))) - - (('stats reply) - (let ((stats - `((resources . ,(length resources)) - (available . 0) - (waiters . 0) - (checkout-failure-count . ,checkout-failure-count)))) - - (spawn-fiber - (lambda () - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply stats) - (const #t)) - (wrap-operation (sleep-operation 5) - (const #f))))))) - - (loop resources)) - - (('check-for-idle-resources) - (loop resources)) - - (('destroy reply) - (loop resources)) - (unknown - (simple-format - (current-error-port) - "unrecognised message to ~A resource pool channel: ~A\n" - name - unknown) - (loop resources))))) - (define (main-loop) (let loop ((resources '()) (available '()) @@ -375,8 +257,7 @@ (spawn-fiber-for-checkout waiter-channel reply-timeout resource)) - (put-message waiter-channel (cons 'success - resource))) + (put-message waiter-channel resource)) (loop (cons resource resources) available @@ -384,45 +265,17 @@ (cons (get-internal-real-time) resources-last-used))))))))) - (('checkout reply timeout-time max-waiters) + (('checkout reply timeout-time) (if (null? available) (begin (unless (= (length resources) max-size) (spawn-fiber-to-return-new-resource)) - (let ((waiters-count - (length waiters))) - (if (and max-waiters - (>= waiters-count - max-waiters)) - (begin - (spawn-fiber - (lambda () - (let ((op - (put-operation - reply - (cons 'too-many-waiters - waiters-count)))) - (perform-operation - (if timeout-time - (choice-operation - op - (wrap-operation - (sleep-operation - (/ (- timeout-time - (get-internal-real-time)) - internal-time-units-per-second)) - (const #f))) - op))))) - (loop resources - available - waiters - resources-last-used)) - (loop resources - available - (cons (cons reply timeout-time) - waiters) - resources-last-used)))) + (loop resources + available + (cons (cons reply timeout-time) + waiters) + resources-last-used)) (if timeout-time (let ((current-internal-time @@ -450,8 +303,7 @@ waiters resources-last-used))) (begin - (put-message reply (cons 'success - (car available))) + (put-message reply (car available)) (loop resources (cdr available) @@ -517,8 +369,7 @@ (spawn-fiber-for-checkout waiter-channel reply-timeout resource)) - (put-message waiter-channel (cons 'success - resource))) + (put-message waiter-channel resource)) (loop resources available @@ -559,24 +410,6 @@ resources-last-used index)))) - (('destroy resource) - (spawn-fiber-to-destroy-resource resource) - - (loop resources - available - waiters - resources-last-used)) - - (('list-resources reply) - (spawn-fiber - (lambda () - (put-message reply (list-copy resources)))) - - (loop resources - available - waiters - resources-last-used)) - (('stats reply) (let ((stats `((resources . ,(length resources)) @@ -639,11 +472,9 @@ waiters resources-last-used)))) - (('destroy) - (if (and (null? resources) - (null? waiters)) - (signal-condition! - destroy-condition) + (('destroy reply) + (if (null? resources) + (put-message reply 'destroy-success) (begin (for-each @@ -657,33 +488,16 @@ #:parallel? #t))) available) - (let ((current-internal-time (get-internal-real-time))) - (for-each - (match-lambda - ((reply . timeout) - (when (or (not timeout) - (> timeout current-internal-time)) - (spawn-fiber - (lambda () - (let ((op - (put-operation - reply - (cons 'resource-pool-destroyed - #f)))) - (perform-operation - (if timeout - (choice-operation - op - (wrap-operation - (sleep-operation - (/ (- timeout - (get-internal-real-time)) - internal-time-units-per-second)) - (const #f))) - op)))))))) - waiters)) + (spawn-fiber + (lambda () + (sleep 0.1) + (put-message channel + (list 'destroy reply)))) - (destroy-loop resources)))) + (loop resources + '() + waiters + resources-last-used)))) (unknown (simple-format @@ -738,16 +552,12 @@ pool) (define (destroy-resource-pool pool) - (perform-operation - (choice-operation - (wrap-operation - (put-operation (resource-pool-channel pool) - (list 'destroy)) - (lambda _ - (wait (resource-pool-destroy-condition pool)))) - (wait-operation - (resource-pool-destroy-condition pool)))) - #t) + (let ((reply (make-channel))) + (put-message (resource-pool-channel pool) + (list 'destroy reply)) + (let ((msg (get-message reply))) + (unless (eq? msg 'destroy-success) + (error msg))))) (define &resource-pool-timeout (make-exception-type '&recource-pool-timeout @@ -765,63 +575,12 @@ (define resource-pool-timeout-error? (record-predicate &resource-pool-timeout)) -(define &resource-pool-too-many-waiters - (make-exception-type '&recource-pool-too-many-waiters - &error - '(pool waiters-count))) - -(define resource-pool-too-many-waiters-error-pool - (exception-accessor - &resource-pool-too-many-waiters - (record-accessor &resource-pool-too-many-waiters 'pool))) - -(define resource-pool-too-many-waiters-error-waiters-count - (exception-accessor - &resource-pool-too-many-waiters - (record-accessor &resource-pool-too-many-waiters 'waiters-count))) - -(define make-resource-pool-too-many-waiters-error - (record-constructor &resource-pool-too-many-waiters)) - -(define resource-pool-too-many-waiters-error? - (record-predicate &resource-pool-too-many-waiters)) - -(define &resource-pool-destroyed - (make-exception-type '&recource-pool-destroyed - &error - '(pool))) - -(define resource-pool-destroyed-error-pool - (exception-accessor - &resource-pool-destroyed - (record-accessor &resource-pool-destroyed 'pool))) - -(define make-resource-pool-destroyed-error - (record-constructor &resource-pool-destroyed)) - -(define resource-pool-destroyed-error? - (record-predicate &resource-pool-destroyed)) - -(define &resource-pool-destroy-resource - (make-exception-type '&recource-pool-destroy-resource - &exception - '())) - -(define make-resource-pool-destroy-resource-exception - (record-constructor &resource-pool-destroy-resource)) - -(define resource-pool-destroy-resource-exception? - (record-predicate &resource-pool-destroy-resource)) - (define resource-pool-default-timeout-handler (make-parameter #f)) (define* (call-with-resource-from-pool pool proc #:key (timeout 'default) - (timeout-handler (resource-pool-default-timeout-handler)) - (max-waiters 'default) - (channel (resource-pool-channel pool)) - (destroy-resource-on-exception? #f)) + (timeout-handler (resource-pool-default-timeout-handler))) "Call PROC with a resource from POOL, blocking until a resource becomes available. Return the resource once PROC has returned." @@ -831,13 +590,7 @@ available. Return the resource once PROC has returned." 'default-checkout-timeout) timeout)) - (define max-waiters-or-default - (if (eq? max-waiters 'default) - (assq-ref (resource-pool-configuration pool) - 'default-max-waiters) - max-waiters)) - - (let ((reply + (let ((resource (if timeout-or-default (let loop ((reply (make-channel)) (start-time (get-internal-real-time))) @@ -845,13 +598,12 @@ available. Return the resource once PROC has returned." (perform-operation (choice-operation (wrap-operation - (put-operation channel + (put-operation (resource-pool-channel pool) (list 'checkout reply (+ start-time (* timeout-or-default - internal-time-units-per-second)) - max-waiters-or-default)) + internal-time-units-per-second)))) (const #t)) (wrap-operation (sleep-operation timeout-or-default) (const #f)))))) @@ -877,71 +629,37 @@ available. Return the resource once PROC has returned." 0) (loop (make-channel) start-time) - 'timeout) + #f) response)) - 'timeout))))) + #f))))) (let loop ((reply (make-channel))) - (put-message channel + (put-message (resource-pool-channel pool) (list 'checkout reply - #f - max-waiters-or-default)) + #f)) (get-message reply))))) - (match reply - ('timeout - (when timeout-handler - (timeout-handler pool proc timeout)) + (when (not resource) + (when timeout-handler + (timeout-handler pool proc timeout)) - (raise-exception - (make-resource-pool-timeout-error pool))) - (('too-many-waiters . count) + (raise-exception + (make-resource-pool-timeout-error pool))) - (raise-exception - (make-resource-pool-too-many-waiters-error pool - count))) - (('resource-pool-destroyed . #f) - (raise-exception - (make-resource-pool-destroyed-error pool))) - (('success . resource) - (call-with-values - (lambda () - (with-exception-handler - (lambda (exn) - ;; Unwind the stack before calling put-message, as - ;; this avoids inconsistent behaviour with - ;; continuation barriers - (put-message - (resource-pool-channel pool) - (list (if (or destroy-resource-on-exception? - (resource-pool-destroy-resource-exception? exn)) - 'destroy - 'return) - resource)) - (unless (resource-pool-destroy-resource-exception? exn) - (raise-exception exn))) - (lambda () - (with-exception-handler - (lambda (exn) - (let ((stack - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1))) - (_ - (make-stack #t))))) - (raise-exception - (make-exception - exn - (make-knots-exception stack))))) - (lambda () - (proc resource)))) - #:unwind? #t)) - (lambda vals - (put-message (resource-pool-channel pool) - `(return ,resource)) - (apply values vals))))))) + (call-with-values + (lambda () + (with-exception-handler + (lambda (exn) + (print-backtrace-and-exception/knots exn) + (put-message (resource-pool-channel pool) + `(return ,resource)) + (raise-exception exn)) + (lambda () + (proc resource)))) + (lambda vals + (put-message (resource-pool-channel pool) + `(return ,resource)) + (apply values vals))))) (define-syntax-rule (with-resource-from-pool pool resource exp ...) (call-with-resource-from-pool @@ -978,8 +696,3 @@ available. Return the resource once PROC has returned." (raise-exception (make-resource-pool-timeout-error pool)))))) -(define (resource-pool-list-resources pool) - (let ((reply (make-channel))) - (put-message (resource-pool-channel pool) - (list 'list-resources reply)) - (get-message reply))) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index 14a8125..f2f174e 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -251,18 +251,15 @@ arguments of the thread pool procedure." proc) (with-exception-handler (lambda (exn) - (let ((stack - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1))) - (_ - (make-stack #t))))) - (raise-exception - (make-exception - exn - (make-knots-exception stack))))) + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (let ((stack (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1)))) + (raise-exception + (make-exception + exn + (make-knots-exception stack))))))) (lambda () (call-with-values (lambda () diff --git a/knots/web-server.scm b/knots/web-server.scm index 14d32f6..9b98018 100644 --- a/knots/web-server.scm +++ b/knots/web-server.scm @@ -48,6 +48,7 @@ request-body-port/knots read-request-body/knots + default-exception-handler default-write-response-exception-handler web-server? @@ -309,7 +310,7 @@ on the procedure being called at any particular time." ;; Close the client port #f) -(define (exception-handler exn request) +(define (default-exception-handler exn request) (let* ((error-string (call-with-output-string (lambda (port) @@ -331,7 +332,8 @@ on the procedure being called at any particular time." (define (handle-request handler client read-request-exception-handler - write-response-exception-handler) + write-response-exception-handler + exception-handler) (let ((request (with-exception-handler read-request-exception-handler @@ -354,14 +356,58 @@ on the procedure being called at any particular time." (lambda (return) (with-exception-handler (lambda (exn) - (call-with-values - (lambda () - (exception-handler exn request)) - (lambda (response body) + (with-exception-handler + (lambda (exn) + (call-with-values + (lambda () + (default-exception-handler + (make-exception + exn + (make-exception-with-message + "exception in exception handler") + (make-exception-with-irritants + exception-handler)) + request)) + (match-lambda* + ((response body) + (call-with-values + (lambda () + (sanitize-response + request + response + body)) + return))))) + (lambda () (call-with-values (lambda () - (sanitize-response request response body)) - return)))) + (exception-handler exn request)) + (match-lambda* + ((response body) + (call-with-values + (lambda () + (sanitize-response request response body)) + return)) + (other + (call-with-values + (lambda () + (default-exception-handler + (make-exception-with-irritants + (list (make-exception-with-message + (simple-format + #f + "wrong number of values returned from exception handler, expecting 2, got ~A" + (length other))) + exception-handler)) + request)) + (match-lambda* + ((response body) + (call-with-values + (lambda () + (sanitize-response + request + response + body)) + return)))))))))) (lambda () (start-stack #t @@ -429,6 +475,7 @@ on the procedure being called at any particular time." #:unwind? #t)))) (define* (client-loop client handler + exception-handler read-request-exception-handler write-response-exception-handler connection-idle-timeout @@ -470,7 +517,8 @@ on the procedure being called at any particular time." (else (let ((keep-alive? (handle-request handler client read-request-exception-handler - write-response-exception-handler))) + write-response-exception-handler + exception-handler))) (if keep-alive? (loop) (close-port client))))))) @@ -489,6 +537,8 @@ on the procedure being called at any particular time." INADDR_LOOPBACK)) (port 8080) (socket (make-default-socket family addr port)) + (exception-handler + default-exception-handler) (read-request-exception-handler default-read-request-exception-handler) (write-response-exception-handler @@ -527,6 +577,7 @@ before sending back to the client." ((client . sockaddr) (spawn-fiber (lambda () (client-loop client handler + exception-handler read-request-exception-handler write-response-exception-handler connection-idle-timeout diff --git a/tests/parallelism.scm b/tests/parallelism.scm index 9881a4d..f005d71 100644 --- a/tests/parallelism.scm +++ b/tests/parallelism.scm @@ -93,22 +93,4 @@ 1)) #:unwind? #t))) -(run-fibers-for-tests - (lambda () - (let ((a 0)) - (call-with-values - (lambda () - (fibers-parallel - (begin - (sleep 1) - 1) - (begin - (set! a 1) - 2))) - (lambda (a b) - (assert-equal a 1) - (assert-equal b 2))) - - (assert-equal a 1)))) - (display "parallelism test finished successfully\n") diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm index 1bc09e5..1251429 100644 --- a/tests/resource-pool.scm +++ b/tests/resource-pool.scm @@ -142,48 +142,4 @@ 20 (iota 50))))) -(run-fibers-for-tests - (lambda () - (let ((resource-pool (make-resource-pool - (lambda () #f) - 1 - #:default-max-waiters 1))) - (call-with-resource-from-pool - resource-pool - (lambda (res) - - ;; 1st waiter - (spawn-fiber - (lambda () - (with-exception-handler - (lambda (exn) - (if (resource-pool-destroyed-error? exn) - #t - (raise-exception exn))) - (lambda () - (call-with-resource-from-pool - resource-pool - (lambda (res) - (error 'should-not-be-reached)))) - #:unwind? #t))) - - (while (= 0 - (assq-ref - (resource-pool-stats resource-pool) - 'waiters)) - (sleep 0)) - - (with-exception-handler - (lambda (exn) - (if (resource-pool-too-many-waiters-error? exn) - #t - (raise-exception exn))) - (lambda () - ;; 2nd waiter - (call-with-resource-from-pool - resource-pool - (lambda (res) - (error 'should-not-be-reached)))) - #:unwind? #t)))))) - (display "resource-pool test finished successfully\n") diff --git a/tests/web-server.scm b/tests/web-server.scm index 9ac58bb..c21851e 100644 --- a/tests/web-server.scm +++ b/tests/web-server.scm @@ -52,6 +52,28 @@ uri #:port (non-blocking-open-socket-for-uri uri))))))) +(run-fibers-for-tests + (lambda () + (let* ((web-server + (run-knots-web-server + (lambda (request) + "Hello, World!") + #:port 0 + #:exception-handler + (lambda (exn request) + "Error"))) ;; Bind to any port + (port + (web-server-port web-server)) + (uri + (build-uri 'http #:host "127.0.0.1" #:port port))) + + (assert-equal + 500 + (response-code + (http-get + uri + #:port (non-blocking-open-socket-for-uri uri))))))) + (run-fibers-for-tests (lambda () (let* ((web-server