From d572f591a3c136bfc7b23160e16381c92588f8d9 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Mon, 13 Jan 2025 12:22:27 +0000 Subject: [PATCH] Rename worker threads to thread pool I think this needs more work, maybe the thread pool should be more similar to the resource pool, but I think the name change is still helpful. Maybe there's a need for a variable size thread pool and that can better integrate with the work queue. --- Makefile.am | 4 +- knots/{worker-threads.scm => thread-pool.scm} | 128 +++++++++--------- tests/thread-pool.scm | 44 ++++++ tests/worker-threads.scm | 32 ----- 4 files changed, 111 insertions(+), 97 deletions(-) rename knots/{worker-threads.scm => thread-pool.scm} (86%) create mode 100644 tests/thread-pool.scm delete mode 100644 tests/worker-threads.scm diff --git a/Makefile.am b/Makefile.am index 77fa98b..21851ae 100644 --- a/Makefile.am +++ b/Makefile.am @@ -9,7 +9,7 @@ SOURCES = \ knots/resource-pool.scm \ knots/timeout.scm \ knots/web-server.scm \ - knots/worker-threads.scm + knots/thread-pool.scm SCM_TESTS = \ tests/non-blocking.scm \ @@ -20,7 +20,7 @@ SCM_TESTS = \ tests/web-server.scm \ tests/parallelism.scm \ tests/resource-pool.scm \ - tests/worker-threads.scm + tests/thread-pool.scm TESTS_GOBJECTS = $(SCM_TESTS:%.scm=%.go) diff --git a/knots/worker-threads.scm b/knots/thread-pool.scm similarity index 86% rename from knots/worker-threads.scm rename to knots/thread-pool.scm index cc1b571..24260cd 100644 --- a/knots/worker-threads.scm +++ b/knots/thread-pool.scm @@ -17,7 +17,7 @@ ;;; along with the guix-data-service. If not, see ;;; . -(define-module (knots worker-threads) +(define-module (knots thread-pool) #:use-module (srfi srfi-1) #:use-module (srfi srfi-9) #:use-module (srfi srfi-19) @@ -35,18 +35,18 @@ #:export (set-thread-name thread-name - worker-thread-set? - worker-thread-set-channel - worker-thread-set-arguments-parameter - worker-thread-set-thread-proc-vector + thread-pool? + thread-pool-channel + thread-pool-arguments-parameter + thread-pool-proc-vector - make-worker-thread-set - call-with-worker-thread + make-thread-pool + call-with-thread - &worker-thread-timeout - worker-thread-timeout-error? + &thread-pool-timeout + thread-pool-timeout-error? - %worker-thread-default-timeout + %thread-pool-default-timeout create-work-queue)) @@ -145,31 +145,30 @@ from there, or #f if that would be an empty string." thread-name/linux (const ""))) -(define-record-type - (worker-thread-set channel - arguments-parameter - thread-proc-vector) - worker-thread-set? - (channel worker-thread-set-channel) - (arguments-parameter worker-thread-set-arguments-parameter) - (thread-proc-vector worker-thread-set-thread-proc-vector)) +(define-record-type + (thread-pool channel arguments-parameter proc-vector) + thread-pool? + (channel thread-pool-channel) + (arguments-parameter thread-pool-arguments-parameter) + (proc-vector thread-pool-proc-vector)) -(define* (make-worker-thread-set initializer - #:key (parallelism 1) - (delay-logger (lambda _ #f)) - (duration-logger (const #f)) - destructor - lifetime - (log-exception? (const #t)) - (expire-on-exception? #f) - (name "unnamed")) +(define* (make-thread-pool size + #:key + thread-initializer + thread-destructor + (delay-logger (lambda _ #f)) + (duration-logger (const #f)) + thread-lifetime + (log-exception? (const #t)) + (expire-on-exception? #f) + (name "unnamed")) "Return a channel used to offload work to a dedicated thread. ARGS are the -arguments of the worker thread procedure." +arguments of the thread pool procedure." (define param (make-parameter #f)) (define thread-proc-vector - (make-vector parallelism #f)) + (make-vector size #f)) (define (initializer/safe) (let ((args @@ -177,14 +176,14 @@ arguments of the worker thread procedure." (lambda (exn) (simple-format (current-error-port) - "exception running initializer in worker thread (~A): ~A:\n ~A\n" + "exception running initializer in thread pool (~A): ~A:\n ~A\n" name - initializer + thread-initializer exn) #f) (lambda () (with-throw-handler #t - initializer + thread-initializer (lambda args (backtrace)))) #:unwind? #t))) @@ -202,15 +201,15 @@ arguments of the worker thread procedure." (lambda (exn) (simple-format (current-error-port) - "exception running destructor in worker thread (~A): ~A:\n ~A\n" + "exception running destructor in thread pool (~A): ~A:\n ~A\n" name - destructor + thread-destructor exn) #f) (lambda () (with-throw-handler #t (lambda () - (apply destructor args) + (apply thread-destructor args) #t) (lambda _ (backtrace)))) @@ -223,7 +222,7 @@ arguments of the worker thread procedure." (destructor/safe args))))) (define (process thread-index channel args) - (let loop ((current-lifetime lifetime)) + (let loop ((current-lifetime thread-lifetime)) (let ((exception? (match (get-message channel) (((? channel? reply) sent-time (? procedure? proc)) @@ -231,13 +230,14 @@ arguments of the worker thread procedure." (- (get-internal-real-time) sent-time))) (delay-logger (/ time-delay - internal-time-units-per-second)) + internal-time-units-per-second) + proc) (let* ((start-time (get-internal-real-time)) (response (with-exception-handler (lambda (exn) - (list 'worker-thread-error + (list 'thread-pool-error (/ (- (get-internal-real-time) start-time) internal-time-units-per-second) @@ -251,7 +251,7 @@ arguments of the worker thread procedure." (call-with-values (lambda () (start-stack - 'worker-thread + 'thread-pool (apply proc args))) (lambda vals (cons (/ (- (get-internal-real-time) @@ -265,7 +265,7 @@ arguments of the worker thread procedure." (_ #t)) (simple-format (current-error-port) - "worker-thread: exception: ~A\n" args) + "thread-pool: exception: ~A\n" args) (backtrace))))) #:unwind? #t))) (put-message reply @@ -276,7 +276,7 @@ arguments of the worker thread procedure." #f) (match response - (('worker-thread-error duration _) + (('thread-pool-error duration _) (when duration-logger (duration-logger duration proc)) #t) @@ -306,47 +306,49 @@ arguments of the worker thread procedure." (number->string thread-index)))) (const #t)) - (let init ((args (initializer/safe))) + (let init ((args (if thread-initializer + (initializer/safe) + '()))) (with-exception-handler (lambda (exn) (simple-format (current-error-port) - "worker-thread-channel: exception: ~A\n" exn)) + "knots: thread-pool: internal exception: ~A\n" exn)) (lambda () (parameterize ((param args)) (process thread-index channel args))) #:unwind? #t) - (when destructor + (when thread-destructor (destructor/safe args)) (init (initializer/safe)))))) - (iota parallelism)) + (iota size)) - (worker-thread-set channel - param - thread-proc-vector))) + (thread-pool channel + param + thread-proc-vector))) -(define &worker-thread-timeout - (make-exception-type '&worker-thread-timeout +(define &thread-pool-timeout + (make-exception-type '&thread-pool-timeout &error '())) -(define make-worker-thread-timeout-error - (record-constructor &worker-thread-timeout)) +(define make-thread-pool-timeout-error + (record-constructor &thread-pool-timeout)) -(define worker-thread-timeout-error? - (record-predicate &worker-thread-timeout)) +(define thread-pool-timeout-error? + (record-predicate &thread-pool-timeout)) -(define %worker-thread-default-timeout +(define %thread-pool-default-timeout (make-parameter 30)) -(define* (call-with-worker-thread record proc #:key duration-logger - (timeout (%worker-thread-default-timeout)) - (channel (worker-thread-set-channel record))) - "Send PROC to the worker thread through CHANNEL. Return the result of PROC. -If already in the worker thread, call PROC immediately." - (let ((args ((worker-thread-set-arguments-parameter record)))) +(define* (call-with-thread record proc #:key duration-logger + (timeout (%thread-pool-default-timeout)) + (channel (thread-pool-channel record))) + "Send PROC to the thread pool through CHANNEL. Return the result of PROC. +If already in the thread pool, call PROC immediately." + (let ((args ((thread-pool-arguments-parameter record)))) (if args (apply proc args) (let* ((reply (make-channel)) @@ -369,10 +371,10 @@ If already in the worker thread, call PROC immediately." (unless operation-success? (raise-exception - (make-worker-thread-timeout-error))) + (make-thread-pool-timeout-error))) (match (get-message reply) - (('worker-thread-error duration exn) + (('thread-pool-error duration exn) (when duration-logger (duration-logger duration)) (raise-exception exn)) diff --git a/tests/thread-pool.scm b/tests/thread-pool.scm new file mode 100644 index 0000000..71b4494 --- /dev/null +++ b/tests/thread-pool.scm @@ -0,0 +1,44 @@ +(use-modules (tests) + (srfi srfi-71) + (fibers) + (unit-test) + (knots thread-pool)) + +(let ((thread-pool + (make-thread-pool 2))) + + (run-fibers-for-tests + (lambda () + (assert-equal + (call-with-thread + thread-pool + (lambda () + 4)) + 4)))) + +(let ((thread-pool + (make-thread-pool + 2 + #:thread-initializer (const '(2))))) + + (run-fibers-for-tests + (lambda () + (assert-equal + (call-with-thread + thread-pool + (lambda (num) + (* 2 num))) + 4)))) + +(let ((process-job + count-jobs + count-threads + list-jobs + (create-work-queue + 2 + (lambda (i) + (* i 2))))) + + (process-job 3)) + +(display "thread-pool test finished successfully\n") diff --git a/tests/worker-threads.scm b/tests/worker-threads.scm deleted file mode 100644 index cfdfbf8..0000000 --- a/tests/worker-threads.scm +++ /dev/null @@ -1,32 +0,0 @@ -(use-modules (tests) - (srfi srfi-71) - (fibers) - (unit-test) - (knots worker-threads)) - -(let ((worker-thread-set - (make-worker-thread-set - (const '()) - #:parallelism 2))) - - (run-fibers-for-tests - (lambda () - (assert-equal - (call-with-worker-thread - worker-thread-set - (lambda () - 4)) - 4)))) - -(let ((process-job - count-jobs - count-threads - list-jobs - (create-work-queue - 2 - (lambda (i) - (* i 2))))) - - (process-job 3)) - -(display "worker-threads test finished successfully\n")