diff --git a/knots.scm b/knots.scm index 05b2a1a..42f2af7 100644 --- a/knots.scm +++ b/knots.scm @@ -75,8 +75,6 @@ 0 (and prompt-tag 1))) (_ (make-stack #t)))) - (stack-len - (stack-length stack)) (error-string (call-with-output-string (lambda (port) @@ -85,46 +83,30 @@ (filter knots-exception? (simple-exceptions exn))))) - (let* ((stack-vec - (stack->vector stack)) - (stack-vec-length - (vector-length stack-vec))) + (let ((stack-vec + (stack->vector stack))) (print-frames (list->vector (drop (vector->list stack-vec) - (if (< stack-vec-length 5) - 0 - 4))) + 6)) port #:count (stack-length stack))) (for-each (lambda (stack) - (let* ((stack-vec - (stack->vector stack)) - (stack-vec-length - (vector-length stack-vec))) + (let ((stack-vec + (stack->vector stack))) (print-frames (list->vector (drop (vector->list stack-vec) - (if (< stack-vec-length 4) - 0 - 3))) + 3)) port #:count (stack-length stack)))) knots-stacks) (print-exception port (if (null? knots-stacks) - (stack-ref stack - (if (< stack-len 4) - stack-len - 4)) - (let* ((stack (last knots-stacks)) - (stack-len (stack-length stack))) - (stack-ref stack - (if (< stack-len 3) - stack-len - 3)))) + (stack-ref stack 1) + (stack-ref (last knots-stacks) 3)) '%exception (list exn))))))) (display error-string port))) diff --git a/knots/promise.scm b/knots/promise.scm index 9df376b..235640b 100644 --- a/knots/promise.scm +++ b/knots/promise.scm @@ -19,15 +19,10 @@ (define-module (knots promise) #:use-module (srfi srfi-9) - #:use-module (ice-9 match) #:use-module (ice-9 atomic) - #:use-module (ice-9 exceptions) #:use-module (fibers) #:use-module (fibers conditions) - #:use-module (knots) - #:export (fibers-promise? - - fibers-delay + #:export (fibers-delay fibers-force fibers-promise-reset fibers-promise-result-available?)) @@ -46,61 +41,38 @@ (make-condition))) (define (fibers-force fp) - (unless (fibers-promise? fp) - (raise-exception - (make-exception - (make-exception-with-message "fibers-force: not a fibers promise") - (make-exception-with-irritants fp)))) - (let ((res (atomic-box-compare-and-swap! (fibers-promise-values-box fp) #f 'started))) - (cond - ((eq? #f res) - (call-with-values - (lambda () - (with-exception-handler - (lambda (exn) - (atomic-box-set! (fibers-promise-values-box fp) - exn) - (signal-condition! - (fibers-promise-evaluated-condition fp)) - (raise-exception exn)) - (lambda () - (with-exception-handler - (lambda (exn) - (let ((stack - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1))) - (_ - (make-stack #t))))) - (raise-exception - (make-exception - exn - (make-knots-exception stack))))) - (fibers-promise-thunk fp))) - #:unwind? #t)) - (lambda vals - (atomic-box-set! (fibers-promise-values-box fp) - vals) - (signal-condition! - (fibers-promise-evaluated-condition fp)) - (apply values vals)))) - ((eq? res 'started) - (begin - (wait (fibers-promise-evaluated-condition fp)) - (let ((result (atomic-box-ref (fibers-promise-values-box fp)))) - (if (exception? result) - (raise-exception result) - (apply values result))))) - (else - (if (exception? res) - (raise-exception res) - (apply values res)))))) + (if (eq? #f res) + (call-with-values + (lambda () + (with-exception-handler + (lambda (exn) + (atomic-box-set! (fibers-promise-values-box fp) + exn) + (signal-condition! + (fibers-promise-evaluated-condition fp)) + (raise-exception exn)) + (fibers-promise-thunk fp) + #:unwind? #t)) + (lambda vals + (atomic-box-set! (fibers-promise-values-box fp) + vals) + (signal-condition! + (fibers-promise-evaluated-condition fp)) + (apply values vals))) + (if (eq? res 'started) + (begin + (wait (fibers-promise-evaluated-condition fp)) + (let ((result (atomic-box-ref (fibers-promise-values-box fp)))) + (if (exception? result) + (raise-exception result) + (apply values result)))) + (if (exception? res) + (raise-exception res) + (apply values res)))))) (define (fibers-promise-reset fp) (atomic-box-set! (fibers-promise-values-box fp) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index 7dad3d8..14a8125 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -27,38 +27,29 @@ #:use-module (rnrs bytevectors) #:use-module (ice-9 q) #:use-module (ice-9 match) - #:use-module (ice-9 atomic) #:use-module (ice-9 threads) #:use-module (fibers) #:use-module (fibers timers) #:use-module (fibers channels) #:use-module (fibers operations) #:use-module (knots) - #:use-module (knots resource-pool) #:export (set-thread-name thread-name - &thread-pool-timeout-error - thread-pool-timeout-error-pool - thread-pool-timeout-error? + thread-pool? + thread-pool-channel + thread-pool-arguments-parameter + thread-pool-proc-vector make-thread-pool - thread-pool? - thread-pool-resource-pool + call-with-thread - make-fixed-size-thread-pool - fixed-size-thread-pool? - fixed-size-thread-pool-channel - fixed-size-thread-pool-current-procedures + &thread-pool-timeout + thread-pool-timeout-error? - ;; These procedures work for thread pools and fixed size - ;; thread pools - thread-pool-arguments-parameter - thread-pool-default-checkout-timeout + %thread-pool-default-timeout - destroy-thread-pool - - call-with-thread)) + create-work-queue)) (define* (syscall->procedure return-type name argument-types #:key library) @@ -156,64 +147,28 @@ from there, or #f if that would be an empty string." (const ""))) (define-record-type - (thread-pool resource-pool arguments-parameter) + (thread-pool channel arguments-parameter proc-vector) thread-pool? - (resource-pool thread-pool-resource-pool) - (arguments-parameter thread-pool-arguments-parameter-accessor)) - -(define-record-type - (fixed-size-thread-pool channel arguments-parameter current-procedures - default-checkout-timeout) - fixed-size-thread-pool? - (channel fixed-size-thread-pool-channel) - (arguments-parameter fixed-size-thread-pool-arguments-parameter) - (current-procedures fixed-size-thread-pool-current-procedures) - (default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout)) - -;; Since both thread pool records have this field, use a procedure -;; than handles the appropriate accessor -(define (thread-pool-arguments-parameter pool) - (if (fixed-size-thread-pool? pool) - (fixed-size-thread-pool-arguments-parameter pool) - (thread-pool-arguments-parameter-accessor pool))) - -(define (thread-pool-default-checkout-timeout pool) - (if (fixed-size-thread-pool? pool) - (fixed-size-thread-pool-default-checkout-timeout pool) - (assq-ref (resource-pool-configuration - (thread-pool-resource-pool pool)) - 'default-checkout-timeout))) - -(define &thread-pool-timeout - (make-exception-type '&thread-pool-timeout - &error - '(pool))) - -(define make-thread-pool-timeout-error - (record-constructor &thread-pool-timeout)) - -(define thread-pool-timeout-error-pool - (exception-accessor - &thread-pool-timeout - (record-accessor &thread-pool-timeout 'pool))) - -(define thread-pool-timeout-error? - (record-predicate &thread-pool-timeout)) - -(define* (make-fixed-size-thread-pool size - #:key - thread-initializer - thread-destructor - delay-logger - duration-logger - thread-lifetime - (expire-on-exception? #f) - (name "unnamed") - (use-default-io-waiters? #t) - default-checkout-timeout) - (define channel - (make-channel)) + (channel thread-pool-channel) + (arguments-parameter thread-pool-arguments-parameter) + (proc-vector thread-pool-proc-vector) + (default-checkout-timeout + thread-pool-default-checkout-timeout)) +(define* (make-thread-pool size + #:key + thread-initializer + thread-destructor + (delay-logger (lambda _ #f)) + (duration-logger (const #f)) + thread-lifetime + (log-exception? (const #t)) + (expire-on-exception? #f) + (name "unnamed") + (use-default-io-waiters? #t) + default-checkout-timeout) + "Return a channel used to offload work to a dedicated thread. ARGS are the +arguments of the thread pool procedure." (define param (make-parameter #f)) @@ -269,256 +224,382 @@ from there, or #f if that would be an empty string." (sleep 1) (destructor/safe args))))) - (define (process channel args) - (let loop () - (match (get-message channel) - ('destroy #f) - ((reply sent-time proc) - (when delay-logger - (let ((time-delay - (- (get-internal-real-time) - sent-time))) - (delay-logger (/ time-delay - internal-time-units-per-second) - proc))) + (define (process thread-index channel args) + (let loop ((current-lifetime thread-lifetime)) + (let ((exception? + (match (get-message channel) + (((? channel? reply) sent-time (? procedure? proc)) + (let ((time-delay + (- (get-internal-real-time) + sent-time))) + (delay-logger (/ time-delay + internal-time-units-per-second) + proc) - (let* ((start-time (get-internal-real-time)) - (response - (with-exception-handler - (lambda (exn) - (list 'thread-pool-error - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - exn)) - (lambda () - (with-exception-handler - (lambda (exn) - (let ((stack - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1))) - (_ - (make-stack #t))))) - (raise-exception - (make-exception - exn - (make-knots-exception stack))))) - (lambda () - (call-with-values - (lambda () - (start-stack - #t - (apply proc args))) - (lambda vals - (cons (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - vals)))))) - #:unwind? #t))) + (let* ((start-time (get-internal-real-time)) + (response + (with-exception-handler + (lambda (exn) + (list 'thread-pool-error + (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + exn)) + (lambda () + (vector-set! thread-proc-vector + thread-index + proc) + (with-exception-handler + (lambda (exn) + (let ((stack + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))) + (_ + (make-stack #t))))) + (raise-exception + (make-exception + exn + (make-knots-exception stack))))) + (lambda () + (call-with-values + (lambda () + (start-stack + #t + (apply proc args))) + (lambda vals + (cons (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + vals)))))) + #:unwind? #t))) + (put-message reply + response) - (put-message reply - response) + (vector-set! thread-proc-vector + thread-index + #f) - (let ((exception? - (match response - (('thread-pool-error duration _) - (when duration-logger - (duration-logger duration proc)) - #t) - ((duration . _) - (when duration-logger - (duration-logger duration proc)) - #f)))) - (if (and exception? - expire-on-exception?) - #t - (loop)))))))) + (match response + (('thread-pool-error duration _) + (when duration-logger + (duration-logger duration proc)) + #t) + ((duration . _) + (when duration-logger + (duration-logger duration proc)) + #f)))))))) + (unless (and expire-on-exception? + exception?) + (if (number? current-lifetime) + (unless (< current-lifetime 0) + (loop (if current-lifetime + (- current-lifetime 1) + #f))) + (loop #f)))))) - (define (start-thread index channel) - (call-with-new-thread - (lambda () - (catch 'system-error + (define (start-threads channel) + (for-each + (lambda (thread-index) + (call-with-new-thread + (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (string-append + name " w t " + (number->string thread-index)))) + (const #t)) + + (let init ((args (if thread-initializer + (initializer/safe) + '()))) + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "knots: thread-pool: internal exception: ~A\n" exn)) + (lambda () + (parameterize ((param args)) + (process thread-index channel args))) + #:unwind? #t) + + (when thread-destructor + (destructor/safe args)) + + (init (initializer/safe)))))) + (iota size))) + + (let ((channel (make-channel))) + (if use-default-io-waiters? + (call-with-default-io-waiters (lambda () - (set-thread-name - (string-append - name " w t " (number->string index)))) - (const #t)) + (start-threads channel))) + (start-threads channel)) - (let init ((args (if thread-initializer - (initializer/safe) - '()))) - (let ((continue? - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "knots: thread-pool: internal exception: ~A\n" exn)) - (lambda () - (parameterize ((param args)) - (process channel args))) - #:unwind? #t))) + (thread-pool channel + param + thread-proc-vector))) - (when thread-destructor - (destructor/safe args)) +(define &thread-pool-timeout + (make-exception-type '&thread-pool-timeout + &error + '())) - (when continue? - (init (if thread-initializer - (initializer/safe) - '())))))))) +(define make-thread-pool-timeout-error + (record-constructor &thread-pool-timeout)) - (for-each - (lambda (i) - (if use-default-io-waiters? - (call-with-default-io-waiters - (lambda () - (start-thread i channel))) - (start-thread i channel))) - (iota size)) +(define thread-pool-timeout-error? + (record-predicate &thread-pool-timeout)) - (fixed-size-thread-pool channel - param - thread-proc-vector - default-checkout-timeout)) - -(define* (make-thread-pool max-size - #:key - (min-size max-size) - scheduler - thread-initializer - thread-destructor - (delay-logger (lambda _ #f)) - (duration-logger (const #f)) - thread-lifetime - (expire-on-exception? #f) - (name "unnamed") - (use-default-io-waiters? #t) - default-checkout-timeout) - "Return a channel used to offload work to a dedicated thread. ARGS are the -arguments of the thread pool procedure." - (define param - (make-parameter #f)) - - (let ((resource-pool - (make-resource-pool - (lambda () - (make-fixed-size-thread-pool - 1 - #:thread-initializer thread-initializer - #:thread-destructor thread-destructor - #:thread-lifetime thread-lifetime - #:expire-on-exception? expire-on-exception? - #:name name - #:use-default-io-waiters? use-default-io-waiters?)) - max-size - #:destructor destroy-thread-pool - #:min-size min-size - #:delay-logger delay-logger - #:scheduler scheduler - #:duration-logger duration-logger - #:default-checkout-timeout default-checkout-timeout))) - - (thread-pool resource-pool - param))) - -(define* (call-with-thread thread-pool - proc - #:key - duration-logger - checkout-timeout - channel - destroy-thread-on-exception? - (max-waiters 'default)) +(define* (call-with-thread record proc #:key duration-logger + (timeout (thread-pool-default-checkout-timeout + record)) + (channel (thread-pool-channel record))) "Send PROC to the thread pool through CHANNEL. Return the result of PROC. If already in the thread pool, call PROC immediately." - (define (handle-proc fixed-size-thread-pool - reply-channel - start-time - timeout) - (let* ((request-channel - (or channel - (fixed-size-thread-pool-channel - fixed-size-thread-pool))) - (operation-success? - (perform-operation - (let ((put - (wrap-operation - (put-operation request-channel - (list reply-channel - start-time - proc)) - (const #t)))) - - (if timeout - (choice-operation - put - (wrap-operation (sleep-operation timeout) - (const #f))) - put))))) - - (unless operation-success? - (raise-exception - (make-thread-pool-timeout-error))) - - (let ((reply (get-message reply-channel))) - (match reply - (('thread-pool-error duration exn) - (when duration-logger - (duration-logger duration)) - (raise-exception exn)) - ((duration . result) - (when duration-logger - (duration-logger duration)) - (apply values result)))))) - - (let ((args ((thread-pool-arguments-parameter thread-pool)))) + (let ((args ((thread-pool-arguments-parameter record)))) (if args (apply proc args) - (let ((start-time (get-internal-real-time)) - (reply-channel (make-channel))) - (if (fixed-size-thread-pool? thread-pool) - (handle-proc thread-pool - reply-channel - start-time - checkout-timeout) - (with-exception-handler - (lambda (exn) - (if (and (resource-pool-timeout-error? exn) - (eq? (resource-pool-timeout-error-pool exn) - (thread-pool-resource-pool thread-pool))) - (raise-exception - (make-thread-pool-timeout-error thread-pool)) - (raise-exception exn))) - (lambda () - (call-with-resource-from-pool (thread-pool-resource-pool - thread-pool) - (lambda (fixed-size-thread-pool) - (if checkout-timeout - (let ((remaining-time - (/ (- (get-internal-real-time) start-time) - internal-time-units-per-second))) - (if (< remaining-time checkout-timeout) - (handle-proc fixed-size-thread-pool - reply-channel - start-time - remaining-time) - (raise-exception - (make-thread-pool-timeout-error thread-pool)))) - (handle-proc fixed-size-thread-pool - reply-channel - start-time - #f))) - #:max-waiters max-waiters - #:timeout checkout-timeout - #:destroy-resource-on-exception? - destroy-thread-on-exception?)))))))) + (let* ((reply (make-channel)) + (operation-success? + (perform-operation + (let ((put + (wrap-operation + (put-operation channel + (list reply + (get-internal-real-time) + proc)) + (const #t)))) -(define (destroy-thread-pool pool) - (if (fixed-size-thread-pool? pool) - (put-message - (fixed-size-thread-pool-channel pool) - 'destroy) - (destroy-resource-pool - (thread-pool-resource-pool pool)))) + (if timeout + (choice-operation + put + (wrap-operation (sleep-operation timeout) + (const #f))) + put))))) + + (unless operation-success? + (raise-exception + (make-thread-pool-timeout-error))) + + (match (get-message reply) + (('thread-pool-error duration exn) + (when duration-logger + (duration-logger duration)) + (raise-exception exn)) + ((duration . result) + (when duration-logger + (duration-logger duration)) + (apply values result))))))) + +(define* (create-work-queue thread-count-parameter proc + #:key thread-start-delay + (thread-stop-delay + (make-time time-duration 0 0)) + (name "unnamed") + priority= running-jobs-count + desired-thread-count))) + + (define (thread-idle-for-too-long? last-job-finished-at) + (time>=? + (time-difference (current-time time-monotonic) + last-job-finished-at) + thread-stop-delay)) + + (define (stop-thread) + (hash-remove! running-job-args + thread-index) + (unlock-mutex queue-mutex)) + + (call-with-new-thread + (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (string-append name " q t " + (number->string thread-index)))) + (const #t)) + + (let loop ((last-job-finished-at (current-time time-monotonic))) + (lock-mutex queue-mutex) + + (if (too-many-threads?) + (stop-thread) + (let ((job-args + (if (q-empty? queue) + ;; #f from wait-condition-variable indicates a timeout + (if (wait-condition-variable + job-available + queue-mutex + (+ 9 (time-second (current-time)))) + ;; Another thread could have taken + ;; the job in the mean time + (if (q-empty? queue) + #f + (if priority threads-to-start 0) + (for-each + (lambda (thread-index) + (when (eq? (hash-ref running-job-args + thread-index + 'slot-free) + 'slot-free) + (let* ((now (current-time time-monotonic)) + (elapsed (time-difference now + previous-thread-started-at))) + (when (or (eq? #f thread-start-delay) + (time>=? elapsed thread-start-delay)) + (set! previous-thread-started-at now) + (hash-set! running-job-args + thread-index + #f) + (start-thread thread-index))))) + (iota desired-count))))))) + + (if (procedure? thread-count-parameter) + (call-with-new-thread + (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (string-append name " q t"))) + (const #t)) + + (while #t + (sleep 15) + (with-mutex queue-mutex + (let ((idle-threads (hash-count (lambda (index val) + (eq? #f val)) + running-job-args))) + (when (= 0 idle-threads) + (start-new-threads-if-necessary (get-thread-count)))))))) + (start-new-threads-if-necessary (get-thread-count))) + + (values process-job count-jobs count-threads list-jobs))) diff --git a/knots/web-server.scm b/knots/web-server.scm index 453db44..14d32f6 100644 --- a/knots/web-server.scm +++ b/knots/web-server.scm @@ -45,8 +45,6 @@ &request-body-ended-prematurely request-body-ended-prematurely-error? - sanitize-response - request-body-port/knots read-request-body/knots diff --git a/tests/thread-pool.scm b/tests/thread-pool.scm index 1c51cb3..93a49ce 100644 --- a/tests/thread-pool.scm +++ b/tests/thread-pool.scm @@ -6,47 +6,10 @@ (knots thread-pool)) (let ((thread-pool - (make-fixed-size-thread-pool 2))) - - (assert-equal - (call-with-thread - thread-pool - (lambda () - 4)) - 4)) - -(let ((thread-pool - (make-fixed-size-thread-pool - 2 - #:thread-initializer (const '(2))))) - - (assert-equal - (call-with-thread - thread-pool - (lambda (num) - (* 2 num))) - 4)) - -(let ((thread-pool - (make-fixed-size-thread-pool 2))) - - (assert-equal - #t - (with-exception-handler - (lambda (exn) - (knots-exception? exn)) - (lambda () - (call-with-thread - thread-pool - (lambda () - (+ 1 'a)))) - #:unwind? #t))) - -(run-fibers-for-tests - (lambda () - (let ((thread-pool - (make-thread-pool 2))) + (make-thread-pool 2))) + (run-fibers-for-tests + (lambda () (assert-equal (call-with-thread thread-pool @@ -54,13 +17,13 @@ 4)) 4)))) -(run-fibers-for-tests - (lambda () - (let ((thread-pool - (make-thread-pool - 2 - #:thread-initializer (const '(2))))) +(let ((thread-pool + (make-thread-pool + 2 + #:thread-initializer (const '(2))))) + (run-fibers-for-tests + (lambda () (assert-equal (call-with-thread thread-pool @@ -68,11 +31,22 @@ (* 2 num))) 4)))) -(run-fibers-for-tests - (lambda () - (let ((thread-pool - (make-thread-pool 2))) +(let ((process-job + count-jobs + count-threads + list-jobs + (create-work-queue + 2 + (lambda (i) + (* i 2))))) + (process-job 3)) + +(let ((thread-pool + (make-thread-pool 2))) + + (run-fibers-for-tests + (lambda () (assert-equal #t (with-exception-handler