From 016f37f108ca19da3664516baa97e907aa972b90 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Fri, 16 May 2025 11:48:41 +0100 Subject: [PATCH] Rework thread pools Allow the thread pool to vary in size by basing it on a resource pool of fixed size thread pools, which are similar to the previous thread pool implementation. Fixed size thread pools don't require fibers, but thread pools now do. Some procedures work with either thread pool implementation. --- knots/thread-pool.scm | 705 +++++++++++++++++++----------------------- tests/thread-pool.scm | 74 +++-- 2 files changed, 360 insertions(+), 419 deletions(-) diff --git a/knots/thread-pool.scm b/knots/thread-pool.scm index 14a8125..029a34b 100644 --- a/knots/thread-pool.scm +++ b/knots/thread-pool.scm @@ -27,29 +27,38 @@ #:use-module (rnrs bytevectors) #:use-module (ice-9 q) #:use-module (ice-9 match) + #:use-module (ice-9 atomic) #:use-module (ice-9 threads) #:use-module (fibers) #:use-module (fibers timers) #:use-module (fibers channels) #:use-module (fibers operations) #:use-module (knots) + #:use-module (knots resource-pool) #:export (set-thread-name thread-name - thread-pool? - thread-pool-channel - thread-pool-arguments-parameter - thread-pool-proc-vector - - make-thread-pool - call-with-thread - - &thread-pool-timeout + &thread-pool-timeout-error + thread-pool-timeout-error-pool thread-pool-timeout-error? - %thread-pool-default-timeout + make-thread-pool + thread-pool? + thread-pool-resource-pool - create-work-queue)) + make-fixed-size-thread-pool + fixed-size-thread-pool? + fixed-size-thread-pool-channel + fixed-size-thread-pool-current-procedures + + ;; These procedures work for thread pools and fixed size + ;; thread pools + thread-pool-arguments-parameter + thread-pool-default-checkout-timeout + + destroy-thread-pool + + call-with-thread)) (define* (syscall->procedure return-type name argument-types #:key library) @@ -147,28 +156,64 @@ from there, or #f if that would be an empty string." (const ""))) (define-record-type - (thread-pool channel arguments-parameter proc-vector) + (thread-pool resource-pool arguments-parameter) thread-pool? - (channel thread-pool-channel) - (arguments-parameter thread-pool-arguments-parameter) - (proc-vector thread-pool-proc-vector) - (default-checkout-timeout - thread-pool-default-checkout-timeout)) + (resource-pool thread-pool-resource-pool) + (arguments-parameter thread-pool-arguments-parameter-accessor)) + +(define-record-type + (fixed-size-thread-pool channel arguments-parameter current-procedures + default-checkout-timeout) + fixed-size-thread-pool? + (channel fixed-size-thread-pool-channel) + (arguments-parameter fixed-size-thread-pool-arguments-parameter) + (current-procedures fixed-size-thread-pool-current-procedures) + (default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout)) + +;; Since both thread pool records have this field, use a procedure +;; than handles the appropriate accessor +(define (thread-pool-arguments-parameter pool) + (if (fixed-size-thread-pool? pool) + (fixed-size-thread-pool-arguments-parameter pool) + (thread-pool-arguments-parameter-accessor pool))) + +(define (thread-pool-default-checkout-timeout pool) + (if (fixed-size-thread-pool? pool) + (fixed-size-thread-pool-default-checkout-timeout pool) + (assq-ref (resource-pool-configuration + (thread-pool-resource-pool pool)) + 'default-checkout-timeout))) + +(define &thread-pool-timeout + (make-exception-type '&thread-pool-timeout + &error + '(pool))) + +(define make-thread-pool-timeout-error + (record-constructor &thread-pool-timeout)) + +(define thread-pool-timeout-error-pool + (exception-accessor + &thread-pool-timeout + (record-accessor &thread-pool-timeout 'pool))) + +(define thread-pool-timeout-error? + (record-predicate &thread-pool-timeout)) + +(define* (make-fixed-size-thread-pool size + #:key + thread-initializer + thread-destructor + delay-logger + duration-logger + thread-lifetime + (expire-on-exception? #f) + (name "unnamed") + (use-default-io-waiters? #t) + default-checkout-timeout) + (define channel + (make-channel)) -(define* (make-thread-pool size - #:key - thread-initializer - thread-destructor - (delay-logger (lambda _ #f)) - (duration-logger (const #f)) - thread-lifetime - (log-exception? (const #t)) - (expire-on-exception? #f) - (name "unnamed") - (use-default-io-waiters? #t) - default-checkout-timeout) - "Return a channel used to offload work to a dedicated thread. ARGS are the -arguments of the thread pool procedure." (define param (make-parameter #f)) @@ -224,382 +269,252 @@ arguments of the thread pool procedure." (sleep 1) (destructor/safe args))))) - (define (process thread-index channel args) - (let loop ((current-lifetime thread-lifetime)) - (let ((exception? - (match (get-message channel) - (((? channel? reply) sent-time (? procedure? proc)) - (let ((time-delay - (- (get-internal-real-time) - sent-time))) - (delay-logger (/ time-delay - internal-time-units-per-second) - proc) + (define (process channel args) + (let loop () + (match (get-message channel) + ('destroy #f) + ((reply sent-time proc) + (when delay-logger + (let ((time-delay + (- (get-internal-real-time) + sent-time))) + (delay-logger (/ time-delay + internal-time-units-per-second) + proc))) - (let* ((start-time (get-internal-real-time)) - (response - (with-exception-handler - (lambda (exn) - (list 'thread-pool-error - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - exn)) - (lambda () - (vector-set! thread-proc-vector - thread-index - proc) - (with-exception-handler - (lambda (exn) - (let ((stack - (match (fluid-ref %stacks) - ((stack-tag . prompt-tag) - (make-stack #t - 0 prompt-tag - 0 (and prompt-tag 1))) - (_ - (make-stack #t))))) - (raise-exception - (make-exception - exn - (make-knots-exception stack))))) - (lambda () - (call-with-values - (lambda () - (start-stack - #t - (apply proc args))) - (lambda vals - (cons (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - vals)))))) - #:unwind? #t))) - (put-message reply - response) + (let* ((start-time (get-internal-real-time)) + (response + (with-exception-handler + (lambda (exn) + (list 'thread-pool-error + (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + exn)) + (lambda () + (with-exception-handler + (lambda (exn) + (let ((stack + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))) + (_ + (make-stack #t))))) + (raise-exception + (make-exception + exn + (make-knots-exception stack))))) + (lambda () + (call-with-values + (lambda () + (start-stack + #t + (apply proc args))) + (lambda vals + (cons (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + vals)))))) + #:unwind? #t))) - (vector-set! thread-proc-vector - thread-index - #f) + (put-message reply + response) - (match response - (('thread-pool-error duration _) - (when duration-logger - (duration-logger duration proc)) - #t) - ((duration . _) - (when duration-logger - (duration-logger duration proc)) - #f)))))))) - (unless (and expire-on-exception? - exception?) - (if (number? current-lifetime) - (unless (< current-lifetime 0) - (loop (if current-lifetime - (- current-lifetime 1) - #f))) - (loop #f)))))) + (let ((exception? + (match response + (('thread-pool-error duration _) + (when duration-logger + (duration-logger duration proc)) + #t) + ((duration . _) + (when duration-logger + (duration-logger duration proc)) + #f)))) + (if (and exception? + expire-on-exception?) + #t + (loop)))))))) - (define (start-threads channel) - (for-each - (lambda (thread-index) - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (string-append - name " w t " - (number->string thread-index)))) - (const #t)) - - (let init ((args (if thread-initializer - (initializer/safe) - '()))) - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "knots: thread-pool: internal exception: ~A\n" exn)) - (lambda () - (parameterize ((param args)) - (process thread-index channel args))) - #:unwind? #t) - - (when thread-destructor - (destructor/safe args)) - - (init (initializer/safe)))))) - (iota size))) - - (let ((channel (make-channel))) - (if use-default-io-waiters? - (call-with-default-io-waiters + (define (start-thread index channel) + (call-with-new-thread + (lambda () + (catch 'system-error (lambda () - (start-threads channel))) - (start-threads channel)) + (set-thread-name + (string-append + name " w t " (number->string index)))) + (const #t)) - (thread-pool channel - param - thread-proc-vector))) + (let init ((args (if thread-initializer + (initializer/safe) + '()))) + (let ((continue? + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "knots: thread-pool: internal exception: ~A\n" exn)) + (lambda () + (parameterize ((param args)) + (process channel args))) + #:unwind? #t))) -(define &thread-pool-timeout - (make-exception-type '&thread-pool-timeout - &error - '())) + (when thread-destructor + (destructor/safe args)) -(define make-thread-pool-timeout-error - (record-constructor &thread-pool-timeout)) + (when continue? + (init (if thread-initializer + (initializer/safe) + '())))))))) -(define thread-pool-timeout-error? - (record-predicate &thread-pool-timeout)) + (for-each + (lambda (i) + (start-thread i channel)) + (iota size)) -(define* (call-with-thread record proc #:key duration-logger - (timeout (thread-pool-default-checkout-timeout - record)) - (channel (thread-pool-channel record))) + (fixed-size-thread-pool channel + param + thread-proc-vector + default-checkout-timeout)) + +(define* (make-thread-pool max-size + #:key + (min-size max-size) + scheduler + thread-initializer + thread-destructor + (delay-logger (lambda _ #f)) + (duration-logger (const #f)) + thread-lifetime + (expire-on-exception? #f) + (name "unnamed") + (use-default-io-waiters? #t) + default-checkout-timeout) + "Return a channel used to offload work to a dedicated thread. ARGS are the +arguments of the thread pool procedure." + (define param + (make-parameter #f)) + + (let ((resource-pool + (make-resource-pool + (lambda () + (make-fixed-size-thread-pool + 1 + #:thread-initializer thread-initializer + #:thread-destructor thread-destructor + #:thread-lifetime thread-lifetime + #:expire-on-exception? expire-on-exception? + #:name name + #:use-default-io-waiters? use-default-io-waiters?)) + max-size + #:destructor destroy-thread-pool + #:min-size min-size + #:delay-logger delay-logger + #:scheduler scheduler + #:duration-logger duration-logger + #:default-checkout-timeout default-checkout-timeout))) + + (thread-pool resource-pool + param))) + +(define* (call-with-thread thread-pool + proc + #:key + duration-logger + checkout-timeout + channel + destroy-thread-on-exception? + (max-waiters 'default)) "Send PROC to the thread pool through CHANNEL. Return the result of PROC. If already in the thread pool, call PROC immediately." - (let ((args ((thread-pool-arguments-parameter record)))) + (define (handle-proc fixed-size-thread-pool + reply-channel + start-time + timeout) + (let* ((request-channel + (or channel + (fixed-size-thread-pool-channel + fixed-size-thread-pool))) + (operation-success? + (perform-operation + (let ((put + (wrap-operation + (put-operation request-channel + (list reply-channel + start-time + proc)) + (const #t)))) + + (if timeout + (choice-operation + put + (wrap-operation (sleep-operation timeout) + (const #f))) + put))))) + + (unless operation-success? + (raise-exception + (make-thread-pool-timeout-error))) + + (let ((reply (get-message reply-channel))) + (match reply + (('thread-pool-error duration exn) + (when duration-logger + (duration-logger duration)) + (raise-exception exn)) + ((duration . result) + (when duration-logger + (duration-logger duration)) + (apply values result)))))) + + (let ((args ((thread-pool-arguments-parameter thread-pool)))) (if args (apply proc args) - (let* ((reply (make-channel)) - (operation-success? - (perform-operation - (let ((put - (wrap-operation - (put-operation channel - (list reply - (get-internal-real-time) - proc)) - (const #t)))) + (let ((start-time (get-internal-real-time)) + (reply-channel (make-channel))) + (if (fixed-size-thread-pool? thread-pool) + (handle-proc thread-pool + reply-channel + start-time + checkout-timeout) + (with-exception-handler + (lambda (exn) + (if (and (resource-pool-timeout-error? exn) + (eq? (resource-pool-timeout-error-pool exn) + (thread-pool-resource-pool thread-pool))) + (raise-exception + (make-thread-pool-timeout-error thread-pool)) + (raise-exception exn))) + (lambda () + (call-with-resource-from-pool (thread-pool-resource-pool + thread-pool) + (lambda (fixed-size-thread-pool) + (if checkout-timeout + (let ((remaining-time + (/ (- (get-internal-real-time) start-time) + internal-time-units-per-second))) + (if (< remaining-time checkout-timeout) + (handle-proc fixed-size-thread-pool + reply-channel + start-time + remaining-time) + (raise-exception + (make-thread-pool-timeout-error thread-pool)))) + (handle-proc fixed-size-thread-pool + reply-channel + start-time + #f))) + #:max-waiters max-waiters + #:timeout checkout-timeout + #:destroy-resource-on-exception? + destroy-thread-on-exception?)))))))) - (if timeout - (choice-operation - put - (wrap-operation (sleep-operation timeout) - (const #f))) - put))))) - - (unless operation-success? - (raise-exception - (make-thread-pool-timeout-error))) - - (match (get-message reply) - (('thread-pool-error duration exn) - (when duration-logger - (duration-logger duration)) - (raise-exception exn)) - ((duration . result) - (when duration-logger - (duration-logger duration)) - (apply values result))))))) - -(define* (create-work-queue thread-count-parameter proc - #:key thread-start-delay - (thread-stop-delay - (make-time time-duration 0 0)) - (name "unnamed") - priority= running-jobs-count - desired-thread-count))) - - (define (thread-idle-for-too-long? last-job-finished-at) - (time>=? - (time-difference (current-time time-monotonic) - last-job-finished-at) - thread-stop-delay)) - - (define (stop-thread) - (hash-remove! running-job-args - thread-index) - (unlock-mutex queue-mutex)) - - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (string-append name " q t " - (number->string thread-index)))) - (const #t)) - - (let loop ((last-job-finished-at (current-time time-monotonic))) - (lock-mutex queue-mutex) - - (if (too-many-threads?) - (stop-thread) - (let ((job-args - (if (q-empty? queue) - ;; #f from wait-condition-variable indicates a timeout - (if (wait-condition-variable - job-available - queue-mutex - (+ 9 (time-second (current-time)))) - ;; Another thread could have taken - ;; the job in the mean time - (if (q-empty? queue) - #f - (if priority threads-to-start 0) - (for-each - (lambda (thread-index) - (when (eq? (hash-ref running-job-args - thread-index - 'slot-free) - 'slot-free) - (let* ((now (current-time time-monotonic)) - (elapsed (time-difference now - previous-thread-started-at))) - (when (or (eq? #f thread-start-delay) - (time>=? elapsed thread-start-delay)) - (set! previous-thread-started-at now) - (hash-set! running-job-args - thread-index - #f) - (start-thread thread-index))))) - (iota desired-count))))))) - - (if (procedure? thread-count-parameter) - (call-with-new-thread - (lambda () - (catch 'system-error - (lambda () - (set-thread-name - (string-append name " q t"))) - (const #t)) - - (while #t - (sleep 15) - (with-mutex queue-mutex - (let ((idle-threads (hash-count (lambda (index val) - (eq? #f val)) - running-job-args))) - (when (= 0 idle-threads) - (start-new-threads-if-necessary (get-thread-count)))))))) - (start-new-threads-if-necessary (get-thread-count))) - - (values process-job count-jobs count-threads list-jobs))) +(define (destroy-thread-pool pool) + (if (fixed-size-thread-pool? pool) + (put-message + (fixed-size-thread-pool-channel pool) + 'destroy) + (destroy-resource-pool + (thread-pool-resource-pool pool)))) diff --git a/tests/thread-pool.scm b/tests/thread-pool.scm index 93a49ce..1c51cb3 100644 --- a/tests/thread-pool.scm +++ b/tests/thread-pool.scm @@ -6,10 +6,47 @@ (knots thread-pool)) (let ((thread-pool - (make-thread-pool 2))) + (make-fixed-size-thread-pool 2))) + + (assert-equal + (call-with-thread + thread-pool + (lambda () + 4)) + 4)) + +(let ((thread-pool + (make-fixed-size-thread-pool + 2 + #:thread-initializer (const '(2))))) + + (assert-equal + (call-with-thread + thread-pool + (lambda (num) + (* 2 num))) + 4)) + +(let ((thread-pool + (make-fixed-size-thread-pool 2))) + + (assert-equal + #t + (with-exception-handler + (lambda (exn) + (knots-exception? exn)) + (lambda () + (call-with-thread + thread-pool + (lambda () + (+ 1 'a)))) + #:unwind? #t))) + +(run-fibers-for-tests + (lambda () + (let ((thread-pool + (make-thread-pool 2))) - (run-fibers-for-tests - (lambda () (assert-equal (call-with-thread thread-pool @@ -17,13 +54,13 @@ 4)) 4)))) -(let ((thread-pool - (make-thread-pool - 2 - #:thread-initializer (const '(2))))) +(run-fibers-for-tests + (lambda () + (let ((thread-pool + (make-thread-pool + 2 + #:thread-initializer (const '(2))))) - (run-fibers-for-tests - (lambda () (assert-equal (call-with-thread thread-pool @@ -31,22 +68,11 @@ (* 2 num))) 4)))) -(let ((process-job - count-jobs - count-threads - list-jobs - (create-work-queue - 2 - (lambda (i) - (* i 2))))) +(run-fibers-for-tests + (lambda () + (let ((thread-pool + (make-thread-pool 2))) - (process-job 3)) - -(let ((thread-pool - (make-thread-pool 2))) - - (run-fibers-for-tests - (lambda () (assert-equal #t (with-exception-handler