Allow nesting worker thread calls

Incorporating changes from the nar-herder.
This commit is contained in:
Christopher Baines 2024-12-28 08:47:03 +00:00
parent 4e791aff68
commit 4e564b4814
2 changed files with 38 additions and 21 deletions

View file

@ -19,6 +19,7 @@
(define-module (knots worker-threads)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-19)
#:use-module (srfi srfi-71)
#:use-module (system foreign)
@ -34,7 +35,12 @@
#:export (set-thread-name
thread-name
make-worker-thread-channel
worker-thread-set?
worker-thread-set-channel
worker-thread-set-arguments-parameter
worker-thread-set-thread-proc-vector
make-worker-thread-set
call-with-worker-thread
&worker-thread-timeout
@ -139,20 +145,29 @@ from there, or #f if that would be an empty string."
thread-name/linux
(const "")))
(define %worker-thread-args
(make-parameter #f))
(define-record-type <worker-thread-set>
(worker-thread-set channel
arguments-parameter
thread-proc-vector)
worker-thread-set?
(channel worker-thread-set-channel)
(arguments-parameter worker-thread-set-arguments-parameter)
(thread-proc-vector worker-thread-set-thread-proc-vector))
(define* (make-worker-thread-channel initializer
#:key (parallelism 1)
(delay-logger (lambda _ #f))
(duration-logger (const #f))
destructor
lifetime
(log-exception? (const #t))
(expire-on-exception? #f)
(name "unnamed"))
(define* (make-worker-thread-set initializer
#:key (parallelism 1)
(delay-logger (lambda _ #f))
(duration-logger (const #f))
destructor
lifetime
(log-exception? (const #t))
(expire-on-exception? #f)
(name "unnamed"))
"Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the worker thread procedure."
(define param
(make-parameter #f))
(define thread-proc-vector
(make-vector parallelism #f))
@ -298,7 +313,7 @@ arguments of the worker thread procedure."
(current-error-port)
"worker-thread-channel: exception: ~A\n" exn))
(lambda ()
(parameterize ((%worker-thread-args args))
(parameterize ((param args))
(process thread-index channel args)))
#:unwind? #t)
@ -308,8 +323,9 @@ arguments of the worker thread procedure."
(init (initializer/safe))))))
(iota parallelism))
(values channel
thread-proc-vector)))
(worker-thread-set channel
param
thread-proc-vector)))
(define &worker-thread-timeout
(make-exception-type '&worker-thread-timeout
@ -325,11 +341,12 @@ arguments of the worker thread procedure."
(define %worker-thread-default-timeout
(make-parameter 30))
(define* (call-with-worker-thread channel proc #:key duration-logger
(timeout (%worker-thread-default-timeout)))
(define* (call-with-worker-thread record proc #:key duration-logger
(timeout (%worker-thread-default-timeout))
(channel (worker-thread-set-channel record)))
"Send PROC to the worker thread through CHANNEL. Return the result of PROC.
If already in the worker thread, call PROC immediately."
(let ((args (%worker-thread-args)))
(let ((args ((worker-thread-set-arguments-parameter record))))
(if args
(apply proc args)
(let* ((reply (make-channel))

View file

@ -4,8 +4,8 @@
(unit-test)
(knots worker-threads))
(let ((worker-thread-channel
(make-worker-thread-channel
(let ((worker-thread-set
(make-worker-thread-set
(const '())
#:parallelism 2)))
@ -13,7 +13,7 @@
(lambda ()
(assert-equal
(call-with-worker-thread
worker-thread-channel
worker-thread-set
(lambda ()
4))
4))))