diff --git a/knots.scm b/knots.scm index 8e31c6b..42f2af7 100644 --- a/knots.scm +++ b/knots.scm @@ -67,11 +67,14 @@ (define* (print-backtrace-and-exception/knots exn #:key (port (current-error-port))) - (let* ((stack (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1))))) + (let* ((stack + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))) + (_ + (make-stack #t)))) (error-string (call-with-output-string (lambda (port) diff --git a/knots/parallelism.scm b/knots/parallelism.scm index 77272dd..f8b2b8b 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -22,6 +22,7 @@ #:use-module (srfi srfi-71) #:use-module (ice-9 match) #:use-module (ice-9 control) + #:use-module (ice-9 exceptions) #:use-module (fibers) #:use-module (fibers channels) #:use-module (fibers operations) @@ -43,27 +44,33 @@ (let ((reply (make-channel))) (spawn-fiber (lambda () - (call-with-escape-continuation - (lambda (return) - (with-exception-handler - (lambda (exn) - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (let ((stack (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1)))) - (put-message reply - (list 'exception - (make-exception - exn - (make-knots-exception stack))))))) - (return)) - (lambda () - (call-with-values - (lambda () - (start-stack #t (thunk))) - (lambda vals - (put-message reply vals)))))))) + (with-exception-handler + (lambda (exn) + (put-message + reply + (list 'exception exn))) + (lambda () + (with-exception-handler + (lambda (exn) + (let ((stack + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))) + (_ + (make-stack #t))))) + (raise-exception + (make-exception + exn + (make-knots-exception stack))))) + (lambda () + (call-with-values + (lambda () + (start-stack #t (thunk))) + (lambda vals + (put-message reply vals)))))) + #:unwind? #t)) #:parallel? #t) reply)) @@ -245,25 +252,31 @@ (get-message process-channel)))) (put-message reply-channel - (call-with-escape-continuation - (lambda (return) - (with-exception-handler - (lambda (exn) - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (let ((stack (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1)))) - (return (list 'exception - (make-exception - exn - (make-knots-exception stack)))))))) - (lambda () - (call-with-values - (lambda () - (start-stack #t (apply proc args))) - (lambda vals - (cons 'result vals))))))))))) + (with-exception-handler + (lambda (exn) + (list 'exception exn)) + (lambda () + (with-exception-handler + (lambda (exn) + (let ((stack + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))) + (_ + (make-stack #t))))) + (raise-exception + (make-exception + exn + (make-knots-exception stack))))) + (lambda () + (call-with-values + (lambda () + (start-stack #t (apply proc args))) + (lambda vals + (cons 'result vals)))))) + #:unwind? #t))))) #:parallel? #t)) (iota parallelism)) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index f4522be..da52051 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -29,6 +29,7 @@ #:use-module (fibers channels) #:use-module (fibers scheduler) #:use-module (fibers operations) + #:use-module (fibers conditions) #:use-module (knots) #:use-module (knots parallelism) #:export (resource-pool? @@ -43,6 +44,19 @@ resource-pool-timeout-error-pool resource-pool-timeout-error? + &resource-pool-too-many-waiters + resource-pool-too-many-waiters-error-pool + resource-pool-too-many-waiters-error-waiters-count + resource-pool-too-many-waiters-error? + + &resource-pool-destroyed + resource-pool-destroyed-error-pool + resource-pool-destroyed-error? + + &resource-pool-destroy-resource + make-resource-pool-destroy-resource-exception + resource-pool-destroy-resource-exception? + resource-pool-default-timeout-handler call-with-resource-from-pool @@ -62,11 +76,12 @@ (record-predicate &resource-pool-abort-add-resource)) (define-record-type - (make-resource-pool-record name channel configuration) + (make-resource-pool-record name channel destroy-condition configuration) resource-pool? - (name resource-pool-name) - (channel resource-pool-channel) - (configuration resource-pool-configuration)) + (name resource-pool-name) + (channel resource-pool-channel) + (destroy-condition resource-pool-destroy-condition) + (configuration resource-pool-configuration)) (set-record-type-printer! @@ -86,13 +101,17 @@ scheduler (name "unnamed") (add-resources-parallelism 1) - default-checkout-timeout) + default-checkout-timeout + default-max-waiters) (define channel (make-channel)) + (define destroy-condition + (make-condition)) (define pool (make-resource-pool-record name channel + destroy-condition `((max-size . ,max-size) (min-size . ,min-size) (idle-seconds . ,idle-seconds) @@ -102,7 +121,8 @@ (lifetime . ,lifetime) (scheduler . ,scheduler) (name . ,name) - (default-checkout-timeout . ,default-checkout-timeout)))) + (default-checkout-timeout . ,default-checkout-timeout) + (default-max-waiters . ,default-max-waiters)))) (define checkout-failure-count 0) @@ -186,7 +206,8 @@ (perform-operation (choice-operation (wrap-operation - (put-operation reply-channel resource) + (put-operation reply-channel + (cons 'success resource)) (const #t)) (wrap-operation (sleep-operation reply-timeout) @@ -196,6 +217,103 @@ channel (list 'return-failed-checkout resource))))))) + (define (destroy-loop resources) + (let loop ((resources resources)) + (match (get-message channel) + (('add-resource resource) + (when destructor + (spawn-fiber-to-destroy-resource resource)) + + (loop resources)) + (('checkout reply timeout-time max-waiters) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout-time + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op))))) + (loop resources)) + (((and (or 'return + 'return-failed-checkout + 'remove) + return-type) + resource) + (when destructor + (spawn-fiber-to-destroy-resource resource)) + + (let ((index + (list-index (lambda (x) + (eq? x resource)) + resources))) + (define (remove-at-index! lst i) + (let ((start + end + (split-at! lst i))) + (append + start + (cdr end)))) + + (let ((new-resources + (if index + (remove-at-index! resources index) + (begin + (simple-format + (current-error-port) + "resource pool error: unable to remove ~A\n" + resource) + resources)))) + (if (null? new-resources) + (begin + (signal-condition! destroy-condition) + + ;; No loop + *unspecified*) + (loop new-resources))))) + + (('stats reply) + (let ((stats + `((resources . ,(length resources)) + (available . 0) + (waiters . 0) + (checkout-failure-count . ,checkout-failure-count)))) + + (spawn-fiber + (lambda () + (perform-operation + (choice-operation + (wrap-operation + (put-operation reply stats) + (const #t)) + (wrap-operation (sleep-operation 5) + (const #f))))))) + + (loop resources)) + + (('check-for-idle-resources) + (loop resources)) + + (('destroy reply) + (loop resources)) + (unknown + (simple-format + (current-error-port) + "unrecognised message to ~A resource pool channel: ~A\n" + name + unknown) + (loop resources))))) + (define (main-loop) (let loop ((resources '()) (available '()) @@ -257,7 +375,8 @@ (spawn-fiber-for-checkout waiter-channel reply-timeout resource)) - (put-message waiter-channel resource)) + (put-message waiter-channel (cons 'success + resource))) (loop (cons resource resources) available @@ -265,17 +384,45 @@ (cons (get-internal-real-time) resources-last-used))))))))) - (('checkout reply timeout-time) + (('checkout reply timeout-time max-waiters) (if (null? available) (begin (unless (= (length resources) max-size) (spawn-fiber-to-return-new-resource)) - (loop resources - available - (cons (cons reply timeout-time) - waiters) - resources-last-used)) + (let ((waiters-count + (length waiters))) + (if (and max-waiters + (>= waiters-count + max-waiters)) + (begin + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'too-many-waiters + waiters-count)))) + (perform-operation + (if timeout-time + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op))))) + (loop resources + available + waiters + resources-last-used)) + (loop resources + available + (cons (cons reply timeout-time) + waiters) + resources-last-used)))) (if timeout-time (let ((current-internal-time @@ -303,7 +450,8 @@ waiters resources-last-used))) (begin - (put-message reply (car available)) + (put-message reply (cons 'success + (car available))) (loop resources (cdr available) @@ -369,7 +517,8 @@ (spawn-fiber-for-checkout waiter-channel reply-timeout resource)) - (put-message waiter-channel resource)) + (put-message waiter-channel (cons 'success + resource))) (loop resources available @@ -410,6 +559,24 @@ resources-last-used index)))) + (('destroy resource) + (spawn-fiber-to-destroy-resource resource) + + (loop resources + available + waiters + resources-last-used)) + + (('list-resources reply) + (spawn-fiber + (lambda () + (put-message reply (list-copy resources)))) + + (loop resources + available + waiters + resources-last-used)) + (('stats reply) (let ((stats `((resources . ,(length resources)) @@ -472,9 +639,11 @@ waiters resources-last-used)))) - (('destroy reply) - (if (null? resources) - (put-message reply 'destroy-success) + (('destroy) + (if (and (null? resources) + (null? waiters)) + (signal-condition! + destroy-condition) (begin (for-each @@ -488,16 +657,33 @@ #:parallel? #t))) available) - (spawn-fiber - (lambda () - (sleep 0.1) - (put-message channel - (list 'destroy reply)))) + (let ((current-internal-time (get-internal-real-time))) + (for-each + (match-lambda + ((reply . timeout) + (when (or (not timeout) + (> timeout current-internal-time)) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op)))))))) + waiters)) - (loop resources - '() - waiters - resources-last-used)))) + (destroy-loop resources)))) (unknown (simple-format @@ -552,12 +738,16 @@ pool) (define (destroy-resource-pool pool) - (let ((reply (make-channel))) - (put-message (resource-pool-channel pool) - (list 'destroy reply)) - (let ((msg (get-message reply))) - (unless (eq? msg 'destroy-success) - (error msg))))) + (perform-operation + (choice-operation + (wrap-operation + (put-operation (resource-pool-channel pool) + (list 'destroy)) + (lambda _ + (wait (resource-pool-destroy-condition pool)))) + (wait-operation + (resource-pool-destroy-condition pool)))) + #t) (define &resource-pool-timeout (make-exception-type '&recource-pool-timeout @@ -575,12 +765,63 @@ (define resource-pool-timeout-error? (record-predicate &resource-pool-timeout)) +(define &resource-pool-too-many-waiters + (make-exception-type '&recource-pool-too-many-waiters + &error + '(pool waiters-count))) + +(define resource-pool-too-many-waiters-error-pool + (exception-accessor + &resource-pool-too-many-waiters + (record-accessor &resource-pool-too-many-waiters 'pool))) + +(define resource-pool-too-many-waiters-error-waiters-count + (exception-accessor + &resource-pool-too-many-waiters + (record-accessor &resource-pool-too-many-waiters 'waiters-count))) + +(define make-resource-pool-too-many-waiters-error + (record-constructor &resource-pool-too-many-waiters)) + +(define resource-pool-too-many-waiters-error? + (record-predicate &resource-pool-too-many-waiters)) + +(define &resource-pool-destroyed + (make-exception-type '&recource-pool-destroyed + &error + '(pool))) + +(define resource-pool-destroyed-error-pool + (exception-accessor + &resource-pool-destroyed + (record-accessor &resource-pool-destroyed 'pool))) + +(define make-resource-pool-destroyed-error + (record-constructor &resource-pool-destroyed)) + +(define resource-pool-destroyed-error? + (record-predicate &resource-pool-destroyed)) + +(define &resource-pool-destroy-resource + (make-exception-type '&recource-pool-destroy-resource + &exception + '())) + +(define make-resource-pool-destroy-resource-exception + (record-constructor &resource-pool-destroy-resource)) + +(define resource-pool-destroy-resource-exception? + (record-predicate &resource-pool-destroy-resource)) + (define resource-pool-default-timeout-handler (make-parameter #f)) (define* (call-with-resource-from-pool pool proc #:key (timeout 'default) - (timeout-handler (resource-pool-default-timeout-handler))) + (timeout-handler (resource-pool-default-timeout-handler)) + (max-waiters 'default) + (channel (resource-pool-channel pool)) + (destroy-resource-on-exception? #f)) "Call PROC with a resource from POOL, blocking until a resource becomes available. Return the resource once PROC has returned." @@ -590,7 +831,13 @@ available. Return the resource once PROC has returned." 'default-checkout-timeout) timeout)) - (let ((resource + (define max-waiters-or-default + (if (eq? max-waiters 'default) + (assq-ref (resource-pool-configuration pool) + 'default-max-waiters) + max-waiters)) + + (let ((reply (if timeout-or-default (let loop ((reply (make-channel)) (start-time (get-internal-real-time))) @@ -598,12 +845,13 @@ available. Return the resource once PROC has returned." (perform-operation (choice-operation (wrap-operation - (put-operation (resource-pool-channel pool) + (put-operation channel (list 'checkout reply (+ start-time (* timeout-or-default - internal-time-units-per-second)))) + internal-time-units-per-second)) + max-waiters-or-default)) (const #t)) (wrap-operation (sleep-operation timeout-or-default) (const #f)))))) @@ -629,37 +877,71 @@ available. Return the resource once PROC has returned." 0) (loop (make-channel) start-time) - #f) + 'timeout) response)) - #f))))) + 'timeout))))) (let loop ((reply (make-channel))) - (put-message (resource-pool-channel pool) + (put-message channel (list 'checkout reply - #f)) + #f + max-waiters-or-default)) (get-message reply))))) - (when (not resource) - (when timeout-handler - (timeout-handler pool proc timeout)) + (match reply + ('timeout + (when timeout-handler + (timeout-handler pool proc timeout)) - (raise-exception - (make-resource-pool-timeout-error pool))) + (raise-exception + (make-resource-pool-timeout-error pool))) + (('too-many-waiters . count) - (call-with-values - (lambda () - (with-exception-handler - (lambda (exn) - (print-backtrace-and-exception/knots exn) - (put-message (resource-pool-channel pool) - `(return ,resource)) - (raise-exception exn)) - (lambda () - (proc resource)))) - (lambda vals - (put-message (resource-pool-channel pool) - `(return ,resource)) - (apply values vals))))) + (raise-exception + (make-resource-pool-too-many-waiters-error pool + count))) + (('resource-pool-destroyed . #f) + (raise-exception + (make-resource-pool-destroyed-error pool))) + (('success . resource) + (call-with-values + (lambda () + (with-exception-handler + (lambda (exn) + ;; Unwind the stack before calling put-message, as + ;; this avoids inconsistent behaviour with + ;; continuation barriers + (put-message + (resource-pool-channel pool) + (list (if (or destroy-resource-on-exception? + (resource-pool-destroy-resource-exception? exn)) + 'destroy + 'return) + resource)) + (unless (resource-pool-destroy-resource-exception? exn) + (raise-exception exn))) + (lambda () + (with-exception-handler + (lambda (exn) + (let ((stack + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))) + (_ + (make-stack #t))))) + (raise-exception + (make-exception + exn + (make-knots-exception stack))))) + (lambda () + (proc resource)))) + #:unwind? #t)) + (lambda vals + (put-message (resource-pool-channel pool) + `(return ,resource)) + (apply values vals))))))) (define-syntax-rule (with-resource-from-pool pool resource exp ...) (call-with-resource-from-pool @@ -696,3 +978,8 @@ available. Return the resource once PROC has returned." (raise-exception (make-resource-pool-timeout-error pool)))))) +(define (resource-pool-list-resources pool) + (let ((reply (make-channel))) + (put-message (resource-pool-channel pool) + (list 'list-resources reply)) + (get-message reply))) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index f2f174e..14a8125 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -251,15 +251,18 @@ arguments of the thread pool procedure." proc) (with-exception-handler (lambda (exn) - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (let ((stack (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1)))) - (raise-exception - (make-exception - exn - (make-knots-exception stack))))))) + (let ((stack + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))) + (_ + (make-stack #t))))) + (raise-exception + (make-exception + exn + (make-knots-exception stack))))) (lambda () (call-with-values (lambda () diff --git a/knots/web-server.scm b/knots/web-server.scm index 9b98018..14d32f6 100644 --- a/knots/web-server.scm +++ b/knots/web-server.scm @@ -48,7 +48,6 @@ request-body-port/knots read-request-body/knots - default-exception-handler default-write-response-exception-handler web-server? @@ -310,7 +309,7 @@ on the procedure being called at any particular time." ;; Close the client port #f) -(define (default-exception-handler exn request) +(define (exception-handler exn request) (let* ((error-string (call-with-output-string (lambda (port) @@ -332,8 +331,7 @@ on the procedure being called at any particular time." (define (handle-request handler client read-request-exception-handler - write-response-exception-handler - exception-handler) + write-response-exception-handler) (let ((request (with-exception-handler read-request-exception-handler @@ -356,58 +354,14 @@ on the procedure being called at any particular time." (lambda (return) (with-exception-handler (lambda (exn) - (with-exception-handler - (lambda (exn) - (call-with-values - (lambda () - (default-exception-handler - (make-exception - exn - (make-exception-with-message - "exception in exception handler") - (make-exception-with-irritants - exception-handler)) - request)) - (match-lambda* - ((response body) - (call-with-values - (lambda () - (sanitize-response - request - response - body)) - return))))) - (lambda () + (call-with-values + (lambda () + (exception-handler exn request)) + (lambda (response body) (call-with-values (lambda () - (exception-handler exn request)) - (match-lambda* - ((response body) - (call-with-values - (lambda () - (sanitize-response request response body)) - return)) - (other - (call-with-values - (lambda () - (default-exception-handler - (make-exception-with-irritants - (list (make-exception-with-message - (simple-format - #f - "wrong number of values returned from exception handler, expecting 2, got ~A" - (length other))) - exception-handler)) - request)) - (match-lambda* - ((response body) - (call-with-values - (lambda () - (sanitize-response - request - response - body)) - return)))))))))) + (sanitize-response request response body)) + return)))) (lambda () (start-stack #t @@ -475,7 +429,6 @@ on the procedure being called at any particular time." #:unwind? #t)))) (define* (client-loop client handler - exception-handler read-request-exception-handler write-response-exception-handler connection-idle-timeout @@ -517,8 +470,7 @@ on the procedure being called at any particular time." (else (let ((keep-alive? (handle-request handler client read-request-exception-handler - write-response-exception-handler - exception-handler))) + write-response-exception-handler))) (if keep-alive? (loop) (close-port client))))))) @@ -537,8 +489,6 @@ on the procedure being called at any particular time." INADDR_LOOPBACK)) (port 8080) (socket (make-default-socket family addr port)) - (exception-handler - default-exception-handler) (read-request-exception-handler default-read-request-exception-handler) (write-response-exception-handler @@ -577,7 +527,6 @@ before sending back to the client." ((client . sockaddr) (spawn-fiber (lambda () (client-loop client handler - exception-handler read-request-exception-handler write-response-exception-handler connection-idle-timeout diff --git a/tests/parallelism.scm b/tests/parallelism.scm index f005d71..9881a4d 100644 --- a/tests/parallelism.scm +++ b/tests/parallelism.scm @@ -93,4 +93,22 @@ 1)) #:unwind? #t))) +(run-fibers-for-tests + (lambda () + (let ((a 0)) + (call-with-values + (lambda () + (fibers-parallel + (begin + (sleep 1) + 1) + (begin + (set! a 1) + 2))) + (lambda (a b) + (assert-equal a 1) + (assert-equal b 2))) + + (assert-equal a 1)))) + (display "parallelism test finished successfully\n") diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm index 1251429..1bc09e5 100644 --- a/tests/resource-pool.scm +++ b/tests/resource-pool.scm @@ -142,4 +142,48 @@ 20 (iota 50))))) +(run-fibers-for-tests + (lambda () + (let ((resource-pool (make-resource-pool + (lambda () #f) + 1 + #:default-max-waiters 1))) + (call-with-resource-from-pool + resource-pool + (lambda (res) + + ;; 1st waiter + (spawn-fiber + (lambda () + (with-exception-handler + (lambda (exn) + (if (resource-pool-destroyed-error? exn) + #t + (raise-exception exn))) + (lambda () + (call-with-resource-from-pool + resource-pool + (lambda (res) + (error 'should-not-be-reached)))) + #:unwind? #t))) + + (while (= 0 + (assq-ref + (resource-pool-stats resource-pool) + 'waiters)) + (sleep 0)) + + (with-exception-handler + (lambda (exn) + (if (resource-pool-too-many-waiters-error? exn) + #t + (raise-exception exn))) + (lambda () + ;; 2nd waiter + (call-with-resource-from-pool + resource-pool + (lambda (res) + (error 'should-not-be-reached)))) + #:unwind? #t)))))) + (display "resource-pool test finished successfully\n") diff --git a/tests/web-server.scm b/tests/web-server.scm index c21851e..9ac58bb 100644 --- a/tests/web-server.scm +++ b/tests/web-server.scm @@ -52,28 +52,6 @@ uri #:port (non-blocking-open-socket-for-uri uri))))))) -(run-fibers-for-tests - (lambda () - (let* ((web-server - (run-knots-web-server - (lambda (request) - "Hello, World!") - #:port 0 - #:exception-handler - (lambda (exn request) - "Error"))) ;; Bind to any port - (port - (web-server-port web-server)) - (uri - (build-uri 'http #:host "127.0.0.1" #:port port))) - - (assert-equal - 500 - (response-code - (http-get - uri - #:port (non-blocking-open-socket-for-uri uri))))))) - (run-fibers-for-tests (lambda () (let* ((web-server