diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index 14a8125..029a34b 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -27,29 +27,38 @@ #:use-module (rnrs bytevectors) #:use-module (ice-9 q) #:use-module (ice-9 match) + #:use-module (ice-9 atomic) #:use-module (ice-9 threads) #:use-module (fibers) #:use-module (fibers timers) #:use-module (fibers channels) #:use-module (fibers operations) #:use-module (knots) + #:use-module (knots resource-pool) #:export (set-thread-name thread-name - thread-pool? - thread-pool-channel - thread-pool-arguments-parameter - thread-pool-proc-vector - - make-thread-pool - call-with-thread - - &thread-pool-timeout + &thread-pool-timeout-error + thread-pool-timeout-error-pool thread-pool-timeout-error? - %thread-pool-default-timeout + make-thread-pool + thread-pool? + thread-pool-resource-pool - create-work-queue)) + make-fixed-size-thread-pool + fixed-size-thread-pool? + fixed-size-thread-pool-channel + fixed-size-thread-pool-current-procedures + + ;; These procedures work for thread pools and fixed size + ;; thread pools + thread-pool-arguments-parameter + thread-pool-default-checkout-timeout + + destroy-thread-pool + + call-with-thread)) (define* (syscall->procedure return-type name argument-types #:key library) @@ -147,28 +156,64 @@ from there, or #f if that would be an empty string." (const ""))) (define-record-type - (thread-pool channel arguments-parameter proc-vector) + (thread-pool resource-pool arguments-parameter) thread-pool? - (channel thread-pool-channel) - (arguments-parameter thread-pool-arguments-parameter) - (proc-vector thread-pool-proc-vector) - (default-checkout-timeout - thread-pool-default-checkout-timeout)) + (resource-pool thread-pool-resource-pool) + (arguments-parameter thread-pool-arguments-parameter-accessor)) + +(define-record-type + (fixed-size-thread-pool channel arguments-parameter current-procedures + default-checkout-timeout) + fixed-size-thread-pool? + (channel fixed-size-thread-pool-channel) + (arguments-parameter fixed-size-thread-pool-arguments-parameter) + (current-procedures fixed-size-thread-pool-current-procedures) + (default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout)) + +;; Since both thread pool records have this field, use a procedure +;; than handles the appropriate accessor +(define (thread-pool-arguments-parameter pool) + (if (fixed-size-thread-pool? pool) + (fixed-size-thread-pool-arguments-parameter pool) + (thread-pool-arguments-parameter-accessor pool))) + +(define (thread-pool-default-checkout-timeout pool) + (if (fixed-size-thread-pool? pool) + (fixed-size-thread-pool-default-checkout-timeout pool) + (assq-ref (resource-pool-configuration + (thread-pool-resource-pool pool)) + 'default-checkout-timeout))) + +(define &thread-pool-timeout + (make-exception-type '&thread-pool-timeout + &error + '(pool))) + +(define make-thread-pool-timeout-error + (record-constructor &thread-pool-timeout)) + +(define thread-pool-timeout-error-pool + (exception-accessor + &thread-pool-timeout + (record-accessor &thread-pool-timeout 'pool))) + +(define thread-pool-timeout-error? + (record-predicate &thread-pool-timeout)) + +(define* (make-fixed-size-thread-pool size + #:key + thread-initializer + thread-destructor + delay-logger + duration-logger + thread-lifetime + (expire-on-exception? #f) + (name "unnamed") + (use-default-io-waiters? #t) + default-checkout-timeout) + (define channel + (make-channel)) -(define* (make-thread-pool size - #:key - thread-initializer - thread-destructor - (delay-logger (lambda _ #f)) - (duration-logger (const #f)) - thread-lifetime - (log-exception? (const #t)) - (expire-on-exception? #f) - (name "unnamed") - (use-default-io-waiters? #t) - default-checkout-timeout) - "Return a channel used to offload work to a dedicated thread. ARGS are the -arguments of the thread pool procedure." (define param (make-parameter #f)) @@ -224,382 +269,252 @@ arguments of the thread pool procedure." (sleep 1) (destructor/safe args))))) - (define (process thread-index channel args) - (let loop ((current-lifetime thread-lifetime)) - (let ((exception? - (match (get-message channel) - (((? channel? reply) sent-time (? procedure? proc)) - (let ((time-delay - (- (get-internal-real-time) - sent-time))) - (delay-logger (/ time-delay - internal-time-units-per-second) - proc) + (define (process channel args) + (let loop () + (match (get-message channel) + ('destroy #f) + ((reply sent-time proc) + (when delay-logger + (let ((time-delay + (- (get-internal-real-time) + sent-time))) + (delay-logger (/ time-delay + internal-time-units-per-second) + proc))) - (let* ((start-time (get-internal-real-time)) - (response - (with-exception-handler - (lambda (exn) - (list 'thread-pool-error - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - exn)) - (lambda () - (vector-set! thread-proc-vector - thread-index - proc) - (with-exception-handler - (lambda (exn) - (let ((stack - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1))) - (_ - (make-stack #t))))) - (raise-exception - (make-exception - exn - (make-knots-exception stack))))) - (lambda () - (call-with-values - (lambda () - (start-stack - #t - (apply proc args))) - (lambda vals - (cons (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - vals)))))) - #:unwind? #t))) - (put-message reply - response) + (let* ((start-time (get-internal-real-time)) + (response + (with-exception-handler + (lambda (exn) + (list 'thread-pool-error + (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + exn)) + (lambda () + (with-exception-handler + (lambda (exn) + (let ((stack + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))) + (_ + (make-stack #t))))) + (raise-exception + (make-exception + exn + (make-knots-exception stack))))) + (lambda () + (call-with-values + (lambda () + (start-stack + #t + (apply proc args))) + (lambda vals + (cons (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + vals)))))) + #:unwind? #t))) - (vector-set! thread-proc-vector - thread-index - #f) + (put-message reply + response) - (match response - (('thread-pool-error duration _) - (when duration-logger - (duration-logger duration proc)) - #t) - ((duration . _) - (when duration-logger - (duration-logger duration proc)) - #f)))))))) - (unless (and expire-on-exception? - exception?) - (if (number? current-lifetime) - (unless (< current-lifetime 0) - (loop (if current-lifetime - (- current-lifetime 1) - #f))) - (loop #f)))))) + (let ((exception? + (match response + (('thread-pool-error duration _) + (when duration-logger + (duration-logger duration proc)) + #t) + ((duration . _) + (when duration-logger + (duration-logger duration proc)) + #f)))) + (if (and exception? + expire-on-exception?) + #t + (loop)))))))) - (define (start-threads channel) - (for-each - (lambda (thread-index) - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (string-append - name " w t " - (number->string thread-index)))) - (const #t)) - - (let init ((args (if thread-initializer - (initializer/safe) - '()))) - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "knots: thread-pool: internal exception: ~A\n" exn)) - (lambda () - (parameterize ((param args)) - (process thread-index channel args))) - #:unwind? #t) - - (when thread-destructor - (destructor/safe args)) - - (init (initializer/safe)))))) - (iota size))) - - (let ((channel (make-channel))) - (if use-default-io-waiters? - (call-with-default-io-waiters + (define (start-thread index channel) + (call-with-new-thread + (lambda () + (catch 'system-error (lambda () - (start-threads channel))) - (start-threads channel)) + (set-thread-name + (string-append + name " w t " (number->string index)))) + (const #t)) - (thread-pool channel - param - thread-proc-vector))) + (let init ((args (if thread-initializer + (initializer/safe) + '()))) + (let ((continue? + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "knots: thread-pool: internal exception: ~A\n" exn)) + (lambda () + (parameterize ((param args)) + (process channel args))) + #:unwind? #t))) -(define &thread-pool-timeout - (make-exception-type '&thread-pool-timeout - &error - '())) + (when thread-destructor + (destructor/safe args)) -(define make-thread-pool-timeout-error - (record-constructor &thread-pool-timeout)) + (when continue? + (init (if thread-initializer + (initializer/safe) + '())))))))) -(define thread-pool-timeout-error? - (record-predicate &thread-pool-timeout)) + (for-each + (lambda (i) + (start-thread i channel)) + (iota size)) -(define* (call-with-thread record proc #:key duration-logger - (timeout (thread-pool-default-checkout-timeout - record)) - (channel (thread-pool-channel record))) + (fixed-size-thread-pool channel + param + thread-proc-vector + default-checkout-timeout)) + +(define* (make-thread-pool max-size + #:key + (min-size max-size) + scheduler + thread-initializer + thread-destructor + (delay-logger (lambda _ #f)) + (duration-logger (const #f)) + thread-lifetime + (expire-on-exception? #f) + (name "unnamed") + (use-default-io-waiters? #t) + default-checkout-timeout) + "Return a channel used to offload work to a dedicated thread. ARGS are the +arguments of the thread pool procedure." + (define param + (make-parameter #f)) + + (let ((resource-pool + (make-resource-pool + (lambda () + (make-fixed-size-thread-pool + 1 + #:thread-initializer thread-initializer + #:thread-destructor thread-destructor + #:thread-lifetime thread-lifetime + #:expire-on-exception? expire-on-exception? + #:name name + #:use-default-io-waiters? use-default-io-waiters?)) + max-size + #:destructor destroy-thread-pool + #:min-size min-size + #:delay-logger delay-logger + #:scheduler scheduler + #:duration-logger duration-logger + #:default-checkout-timeout default-checkout-timeout))) + + (thread-pool resource-pool + param))) + +(define* (call-with-thread thread-pool + proc + #:key + duration-logger + checkout-timeout + channel + destroy-thread-on-exception? + (max-waiters 'default)) "Send PROC to the thread pool through CHANNEL. Return the result of PROC. If already in the thread pool, call PROC immediately." - (let ((args ((thread-pool-arguments-parameter record)))) + (define (handle-proc fixed-size-thread-pool + reply-channel + start-time + timeout) + (let* ((request-channel + (or channel + (fixed-size-thread-pool-channel + fixed-size-thread-pool))) + (operation-success? + (perform-operation + (let ((put + (wrap-operation + (put-operation request-channel + (list reply-channel + start-time + proc)) + (const #t)))) + + (if timeout + (choice-operation + put + (wrap-operation (sleep-operation timeout) + (const #f))) + put))))) + + (unless operation-success? + (raise-exception + (make-thread-pool-timeout-error))) + + (let ((reply (get-message reply-channel))) + (match reply + (('thread-pool-error duration exn) + (when duration-logger + (duration-logger duration)) + (raise-exception exn)) + ((duration . result) + (when duration-logger + (duration-logger duration)) + (apply values result)))))) + + (let ((args ((thread-pool-arguments-parameter thread-pool)))) (if args (apply proc args) - (let* ((reply (make-channel)) - (operation-success? - (perform-operation - (let ((put - (wrap-operation - (put-operation channel - (list reply - (get-internal-real-time) - proc)) - (const #t)))) + (let ((start-time (get-internal-real-time)) + (reply-channel (make-channel))) + (if (fixed-size-thread-pool? thread-pool) + (handle-proc thread-pool + reply-channel + start-time + checkout-timeout) + (with-exception-handler + (lambda (exn) + (if (and (resource-pool-timeout-error? exn) + (eq? (resource-pool-timeout-error-pool exn) + (thread-pool-resource-pool thread-pool))) + (raise-exception + (make-thread-pool-timeout-error thread-pool)) + (raise-exception exn))) + (lambda () + (call-with-resource-from-pool (thread-pool-resource-pool + thread-pool) + (lambda (fixed-size-thread-pool) + (if checkout-timeout + (let ((remaining-time + (/ (- (get-internal-real-time) start-time) + internal-time-units-per-second))) + (if (< remaining-time checkout-timeout) + (handle-proc fixed-size-thread-pool + reply-channel + start-time + remaining-time) + (raise-exception + (make-thread-pool-timeout-error thread-pool)))) + (handle-proc fixed-size-thread-pool + reply-channel + start-time + #f))) + #:max-waiters max-waiters + #:timeout checkout-timeout + #:destroy-resource-on-exception? + destroy-thread-on-exception?)))))))) - (if timeout - (choice-operation - put - (wrap-operation (sleep-operation timeout) - (const #f))) - put))))) - - (unless operation-success? - (raise-exception - (make-thread-pool-timeout-error))) - - (match (get-message reply) - (('thread-pool-error duration exn) - (when duration-logger - (duration-logger duration)) - (raise-exception exn)) - ((duration . result) - (when duration-logger - (duration-logger duration)) - (apply values result))))))) - -(define* (create-work-queue thread-count-parameter proc - #:key thread-start-delay - (thread-stop-delay - (make-time time-duration 0 0)) - (name "unnamed") - priority= running-jobs-count - desired-thread-count))) - - (define (thread-idle-for-too-long? last-job-finished-at) - (time>=? - (time-difference (current-time time-monotonic) - last-job-finished-at) - thread-stop-delay)) - - (define (stop-thread) - (hash-remove! running-job-args - thread-index) - (unlock-mutex queue-mutex)) - - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (string-append name " q t " - (number->string thread-index)))) - (const #t)) - - (let loop ((last-job-finished-at (current-time time-monotonic))) - (lock-mutex queue-mutex) - - (if (too-many-threads?) - (stop-thread) - (let ((job-args - (if (q-empty? queue) - ;; #f from wait-condition-variable indicates a timeout - (if (wait-condition-variable - job-available - queue-mutex - (+ 9 (time-second (current-time)))) - ;; Another thread could have taken - ;; the job in the mean time - (if (q-empty? queue) - #f - (if priority threads-to-start 0) - (for-each - (lambda (thread-index) - (when (eq? (hash-ref running-job-args - thread-index - 'slot-free) - 'slot-free) - (let* ((now (current-time time-monotonic)) - (elapsed (time-difference now - previous-thread-started-at))) - (when (or (eq? #f thread-start-delay) - (time>=? elapsed thread-start-delay)) - (set! previous-thread-started-at now) - (hash-set! running-job-args - thread-index - #f) - (start-thread thread-index))))) - (iota desired-count))))))) - - (if (procedure? thread-count-parameter) - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (string-append name " q t"))) - (const #t)) - - (while #t - (sleep 15) - (with-mutex queue-mutex - (let ((idle-threads (hash-count (lambda (index val) - (eq? #f val)) - running-job-args))) - (when (= 0 idle-threads) - (start-new-threads-if-necessary (get-thread-count)))))))) - (start-new-threads-if-necessary (get-thread-count))) - - (values process-job count-jobs count-threads list-jobs))) +(define (destroy-thread-pool pool) + (if (fixed-size-thread-pool? pool) + (put-message + (fixed-size-thread-pool-channel pool) + 'destroy) + (destroy-resource-pool + (thread-pool-resource-pool pool)))) diff --git a/tests/thread-pool.scm b/tests/thread-pool.scm index 93a49ce..1c51cb3 100644 --- a/tests/thread-pool.scm +++ b/tests/thread-pool.scm @@ -6,10 +6,47 @@ (knots thread-pool)) (let ((thread-pool - (make-thread-pool 2))) + (make-fixed-size-thread-pool 2))) + + (assert-equal + (call-with-thread + thread-pool + (lambda () + 4)) + 4)) + +(let ((thread-pool + (make-fixed-size-thread-pool + 2 + #:thread-initializer (const '(2))))) + + (assert-equal + (call-with-thread + thread-pool + (lambda (num) + (* 2 num))) + 4)) + +(let ((thread-pool + (make-fixed-size-thread-pool 2))) + + (assert-equal + #t + (with-exception-handler + (lambda (exn) + (knots-exception? exn)) + (lambda () + (call-with-thread + thread-pool + (lambda () + (+ 1 'a)))) + #:unwind? #t))) + +(run-fibers-for-tests + (lambda () + (let ((thread-pool + (make-thread-pool 2))) - (run-fibers-for-tests - (lambda () (assert-equal (call-with-thread thread-pool @@ -17,13 +54,13 @@ 4)) 4)))) -(let ((thread-pool - (make-thread-pool - 2 - #:thread-initializer (const '(2))))) +(run-fibers-for-tests + (lambda () + (let ((thread-pool + (make-thread-pool + 2 + #:thread-initializer (const '(2))))) - (run-fibers-for-tests - (lambda () (assert-equal (call-with-thread thread-pool @@ -31,22 +68,11 @@ (* 2 num))) 4)))) -(let ((process-job - count-jobs - count-threads - list-jobs - (create-work-queue - 2 - (lambda (i) - (* i 2))))) +(run-fibers-for-tests + (lambda () + (let ((thread-pool + (make-thread-pool 2))) - (process-job 3)) - -(let ((thread-pool - (make-thread-pool 2))) - - (run-fibers-for-tests - (lambda () (assert-equal #t (with-exception-handler