From 4e564b48146136474539fd5c598283154751caf8 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Sat, 28 Dec 2024 08:47:03 +0000 Subject: [PATCH] Allow nesting worker thread calls Incorporating changes from the nar-herder. --- knots/worker-threads.scm | 53 ++++++++++++++++++++++++++-------------- tests/worker-threads.scm | 6 ++--- 2 files changed, 38 insertions(+), 21 deletions(-) diff --git a/knots/worker-threads.scm b/knots/worker-threads.scm index f4116e9..cc1b571 100644 --- a/knots/worker-threads.scm +++ b/knots/worker-threads.scm @@ -19,6 +19,7 @@ (define-module (knots worker-threads) #:use-module (srfi srfi-1) + #:use-module (srfi srfi-9) #:use-module (srfi srfi-19) #:use-module (srfi srfi-71) #:use-module (system foreign) @@ -34,7 +35,12 @@ #:export (set-thread-name thread-name - make-worker-thread-channel + worker-thread-set? + worker-thread-set-channel + worker-thread-set-arguments-parameter + worker-thread-set-thread-proc-vector + + make-worker-thread-set call-with-worker-thread &worker-thread-timeout @@ -139,20 +145,29 @@ from there, or #f if that would be an empty string." thread-name/linux (const ""))) -(define %worker-thread-args - (make-parameter #f)) +(define-record-type + (worker-thread-set channel + arguments-parameter + thread-proc-vector) + worker-thread-set? + (channel worker-thread-set-channel) + (arguments-parameter worker-thread-set-arguments-parameter) + (thread-proc-vector worker-thread-set-thread-proc-vector)) -(define* (make-worker-thread-channel initializer - #:key (parallelism 1) - (delay-logger (lambda _ #f)) - (duration-logger (const #f)) - destructor - lifetime - (log-exception? (const #t)) - (expire-on-exception? #f) - (name "unnamed")) +(define* (make-worker-thread-set initializer + #:key (parallelism 1) + (delay-logger (lambda _ #f)) + (duration-logger (const #f)) + destructor + lifetime + (log-exception? (const #t)) + (expire-on-exception? #f) + (name "unnamed")) "Return a channel used to offload work to a dedicated thread. ARGS are the arguments of the worker thread procedure." + (define param + (make-parameter #f)) + (define thread-proc-vector (make-vector parallelism #f)) @@ -298,7 +313,7 @@ arguments of the worker thread procedure." (current-error-port) "worker-thread-channel: exception: ~A\n" exn)) (lambda () - (parameterize ((%worker-thread-args args)) + (parameterize ((param args)) (process thread-index channel args))) #:unwind? #t) @@ -308,8 +323,9 @@ arguments of the worker thread procedure." (init (initializer/safe)))))) (iota parallelism)) - (values channel - thread-proc-vector))) + (worker-thread-set channel + param + thread-proc-vector))) (define &worker-thread-timeout (make-exception-type '&worker-thread-timeout @@ -325,11 +341,12 @@ arguments of the worker thread procedure." (define %worker-thread-default-timeout (make-parameter 30)) -(define* (call-with-worker-thread channel proc #:key duration-logger - (timeout (%worker-thread-default-timeout))) +(define* (call-with-worker-thread record proc #:key duration-logger + (timeout (%worker-thread-default-timeout)) + (channel (worker-thread-set-channel record))) "Send PROC to the worker thread through CHANNEL. Return the result of PROC. If already in the worker thread, call PROC immediately." - (let ((args (%worker-thread-args))) + (let ((args ((worker-thread-set-arguments-parameter record)))) (if args (apply proc args) (let* ((reply (make-channel)) diff --git a/tests/worker-threads.scm b/tests/worker-threads.scm index 72a56dc..cfdfbf8 100644 --- a/tests/worker-threads.scm +++ b/tests/worker-threads.scm @@ -4,8 +4,8 @@ (unit-test) (knots worker-threads)) -(let ((worker-thread-channel - (make-worker-thread-channel +(let ((worker-thread-set + (make-worker-thread-set (const '()) #:parallelism 2))) @@ -13,7 +13,7 @@ (lambda () (assert-equal (call-with-worker-thread - worker-thread-channel + worker-thread-set (lambda () 4)) 4))))