diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index 88c102c..8dcf46b 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -60,6 +60,9 @@ make-resource-pool-destroy-resource-exception resource-pool-destroy-resource-exception? + resource-pool-delay-logger + resource-pool-duration-logger + resource-pool-default-timeout-handler call-with-resource-from-pool diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index 3844d8f..825a24a 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -55,6 +55,8 @@ ;; thread pools thread-pool-arguments-parameter thread-pool-default-checkout-timeout + thread-pool-delay-logger + thread-pool-duration-logger destroy-thread-pool @@ -171,12 +173,15 @@ from there, or #f if that would be an empty string." (define-record-type (fixed-size-thread-pool channel arguments-parameter current-procedures - default-checkout-timeout threads) + default-checkout-timeout delay-logger + duration-logger threads) fixed-size-thread-pool? (channel fixed-size-thread-pool-channel) (arguments-parameter fixed-size-thread-pool-arguments-parameter) (current-procedures fixed-size-thread-pool-current-procedures) (default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout) + (delay-logger fixed-size-thread-pool-delay-logger) + (duration-logger fixed-size-thread-pool-duration-logger) (threads fixed-size-thread-pool-threads)) (set-procedure-property! (macro-transformer (module-ref (current-module) 'fixed-size-thread-pool?)) @@ -207,6 +212,20 @@ from there, or #f if that would be an empty string." (thread-pool-resource-pool pool)) 'default-checkout-timeout))) +(define (thread-pool-delay-logger pool) + "Return the delay logger for POOL, dispatching on pool type." + (if (fixed-size-thread-pool? pool) + (fixed-size-thread-pool-delay-logger pool) + (resource-pool-delay-logger + (thread-pool-resource-pool pool)))) + +(define (thread-pool-duration-logger pool) + "Return the duration logger for POOL, dispatching on pool type." + (if (fixed-size-thread-pool? pool) + (fixed-size-thread-pool-duration-logger pool) + (resource-pool-duration-logger + (thread-pool-resource-pool pool)))) + (define &thread-pool-timeout-error (make-exception-type '&thread-pool-timeout-error &error @@ -346,14 +365,7 @@ completes, whether it returned normally or raised an exception. (let loop ((lifetime thread-lifetime)) (match (get-message channel) ('destroy #f) - ((reply sent-time proc) - (when delay-logger - (let ((time-delay - (- (get-internal-real-time) - sent-time))) - (delay-logger (/ time-delay - internal-time-units-per-second)))) - + ((reply proc) (let* ((start-time (get-internal-real-time)) (response (with-exception-handler @@ -465,6 +477,8 @@ completes, whether it returned normally or raised an exception. param thread-proc-vector default-checkout-timeout + delay-logger + duration-logger threads)) (define* (make-thread-pool max-size @@ -538,7 +552,10 @@ Maximum number of fibers that may queue waiting for a thread. Raises (define* (call-with-thread thread-pool proc #:key - duration-logger + (delay-logger + (thread-pool-delay-logger thread-pool)) + (duration-logger + (thread-pool-duration-logger thread-pool)) checkout-timeout channel destroy-thread-on-exception? @@ -564,9 +581,15 @@ When @code{#t}, destroy the thread after PROC raises an exception. Equivalent to per-call @code{#:expire-on-exception?}. Defaults to @code{#f}. +@item #:delay-logger +Called as @code{(delay-logger seconds)} with the time spent waiting +for a thread to become available. Defaults to the pool's +@code{#:delay-logger} if not specified. + @item #:duration-logger Called as @code{(duration-logger seconds)} after PROC completes -(whether or not it raised an exception). +(whether or not it raised an exception). Defaults to the pool's +@code{#:duration-logger} if not specified. @item #:channel Override the channel used to communicate with the thread. @@ -574,7 +597,8 @@ Override the channel used to communicate with the thread. (define (handle-proc fixed-size-thread-pool reply-channel start-time - timeout) + timeout + delay-logger) (let* ((request-channel (or channel (fixed-size-thread-pool-channel @@ -585,7 +609,6 @@ Override the channel used to communicate with the thread. (wrap-operation (put-operation request-channel (list reply-channel - start-time proc)) (const #t)))) @@ -600,6 +623,11 @@ Override the channel used to communicate with the thread. (raise-exception (make-thread-pool-timeout-error))) + (when delay-logger + (delay-logger + (/ (- (get-internal-real-time) start-time) + internal-time-units-per-second))) + (let ((reply (get-message reply-channel))) (match reply (('thread-pool-error duration exn) @@ -620,7 +648,8 @@ Override the channel used to communicate with the thread. (handle-proc thread-pool reply-channel start-time - checkout-timeout) + checkout-timeout + delay-logger) (with-exception-handler (lambda (exn) (if (and (resource-pool-timeout-error? exn) @@ -641,13 +670,17 @@ Override the channel used to communicate with the thread. (handle-proc fixed-size-thread-pool reply-channel start-time - remaining-time) + remaining-time + #f) (raise-exception (make-thread-pool-timeout-error thread-pool)))) (handle-proc fixed-size-thread-pool reply-channel start-time + #f #f))) + #:delay-logger delay-logger + #:duration-logger #f #:max-waiters max-waiters #:timeout checkout-timeout #:destroy-resource-on-exception?