diff --git a/Makefile.am b/Makefile.am
index 77fa98b..21851ae 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -9,7 +9,7 @@ SOURCES = \
knots/resource-pool.scm \
knots/timeout.scm \
knots/web-server.scm \
- knots/worker-threads.scm
+ knots/thread-pool.scm
SCM_TESTS = \
tests/non-blocking.scm \
@@ -20,7 +20,7 @@ SCM_TESTS = \
tests/web-server.scm \
tests/parallelism.scm \
tests/resource-pool.scm \
- tests/worker-threads.scm
+ tests/thread-pool.scm
TESTS_GOBJECTS = $(SCM_TESTS:%.scm=%.go)
diff --git a/knots/worker-threads.scm b/knots/thread-pool.scm
similarity index 86%
rename from knots/worker-threads.scm
rename to knots/thread-pool.scm
index cc1b571..24260cd 100644
--- a/knots/worker-threads.scm
+++ b/knots/thread-pool.scm
@@ -17,7 +17,7 @@
;;; along with the guix-data-service. If not, see
;;; .
-(define-module (knots worker-threads)
+(define-module (knots thread-pool)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-19)
@@ -35,18 +35,18 @@
#:export (set-thread-name
thread-name
- worker-thread-set?
- worker-thread-set-channel
- worker-thread-set-arguments-parameter
- worker-thread-set-thread-proc-vector
+ thread-pool?
+ thread-pool-channel
+ thread-pool-arguments-parameter
+ thread-pool-proc-vector
- make-worker-thread-set
- call-with-worker-thread
+ make-thread-pool
+ call-with-thread
- &worker-thread-timeout
- worker-thread-timeout-error?
+ &thread-pool-timeout
+ thread-pool-timeout-error?
- %worker-thread-default-timeout
+ %thread-pool-default-timeout
create-work-queue))
@@ -145,31 +145,30 @@ from there, or #f if that would be an empty string."
thread-name/linux
(const "")))
-(define-record-type
- (worker-thread-set channel
- arguments-parameter
- thread-proc-vector)
- worker-thread-set?
- (channel worker-thread-set-channel)
- (arguments-parameter worker-thread-set-arguments-parameter)
- (thread-proc-vector worker-thread-set-thread-proc-vector))
+(define-record-type
+ (thread-pool channel arguments-parameter proc-vector)
+ thread-pool?
+ (channel thread-pool-channel)
+ (arguments-parameter thread-pool-arguments-parameter)
+ (proc-vector thread-pool-proc-vector))
-(define* (make-worker-thread-set initializer
- #:key (parallelism 1)
- (delay-logger (lambda _ #f))
- (duration-logger (const #f))
- destructor
- lifetime
- (log-exception? (const #t))
- (expire-on-exception? #f)
- (name "unnamed"))
+(define* (make-thread-pool size
+ #:key
+ thread-initializer
+ thread-destructor
+ (delay-logger (lambda _ #f))
+ (duration-logger (const #f))
+ thread-lifetime
+ (log-exception? (const #t))
+ (expire-on-exception? #f)
+ (name "unnamed"))
"Return a channel used to offload work to a dedicated thread. ARGS are the
-arguments of the worker thread procedure."
+arguments of the thread pool procedure."
(define param
(make-parameter #f))
(define thread-proc-vector
- (make-vector parallelism #f))
+ (make-vector size #f))
(define (initializer/safe)
(let ((args
@@ -177,14 +176,14 @@ arguments of the worker thread procedure."
(lambda (exn)
(simple-format
(current-error-port)
- "exception running initializer in worker thread (~A): ~A:\n ~A\n"
+ "exception running initializer in thread pool (~A): ~A:\n ~A\n"
name
- initializer
+ thread-initializer
exn)
#f)
(lambda ()
(with-throw-handler #t
- initializer
+ thread-initializer
(lambda args
(backtrace))))
#:unwind? #t)))
@@ -202,15 +201,15 @@ arguments of the worker thread procedure."
(lambda (exn)
(simple-format
(current-error-port)
- "exception running destructor in worker thread (~A): ~A:\n ~A\n"
+ "exception running destructor in thread pool (~A): ~A:\n ~A\n"
name
- destructor
+ thread-destructor
exn)
#f)
(lambda ()
(with-throw-handler #t
(lambda ()
- (apply destructor args)
+ (apply thread-destructor args)
#t)
(lambda _
(backtrace))))
@@ -223,7 +222,7 @@ arguments of the worker thread procedure."
(destructor/safe args)))))
(define (process thread-index channel args)
- (let loop ((current-lifetime lifetime))
+ (let loop ((current-lifetime thread-lifetime))
(let ((exception?
(match (get-message channel)
(((? channel? reply) sent-time (? procedure? proc))
@@ -231,13 +230,14 @@ arguments of the worker thread procedure."
(- (get-internal-real-time)
sent-time)))
(delay-logger (/ time-delay
- internal-time-units-per-second))
+ internal-time-units-per-second)
+ proc)
(let* ((start-time (get-internal-real-time))
(response
(with-exception-handler
(lambda (exn)
- (list 'worker-thread-error
+ (list 'thread-pool-error
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second)
@@ -251,7 +251,7 @@ arguments of the worker thread procedure."
(call-with-values
(lambda ()
(start-stack
- 'worker-thread
+ 'thread-pool
(apply proc args)))
(lambda vals
(cons (/ (- (get-internal-real-time)
@@ -265,7 +265,7 @@ arguments of the worker thread procedure."
(_ #t))
(simple-format
(current-error-port)
- "worker-thread: exception: ~A\n" args)
+ "thread-pool: exception: ~A\n" args)
(backtrace)))))
#:unwind? #t)))
(put-message reply
@@ -276,7 +276,7 @@ arguments of the worker thread procedure."
#f)
(match response
- (('worker-thread-error duration _)
+ (('thread-pool-error duration _)
(when duration-logger
(duration-logger duration proc))
#t)
@@ -306,47 +306,49 @@ arguments of the worker thread procedure."
(number->string thread-index))))
(const #t))
- (let init ((args (initializer/safe)))
+ (let init ((args (if thread-initializer
+ (initializer/safe)
+ '())))
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
- "worker-thread-channel: exception: ~A\n" exn))
+ "knots: thread-pool: internal exception: ~A\n" exn))
(lambda ()
(parameterize ((param args))
(process thread-index channel args)))
#:unwind? #t)
- (when destructor
+ (when thread-destructor
(destructor/safe args))
(init (initializer/safe))))))
- (iota parallelism))
+ (iota size))
- (worker-thread-set channel
- param
- thread-proc-vector)))
+ (thread-pool channel
+ param
+ thread-proc-vector)))
-(define &worker-thread-timeout
- (make-exception-type '&worker-thread-timeout
+(define &thread-pool-timeout
+ (make-exception-type '&thread-pool-timeout
&error
'()))
-(define make-worker-thread-timeout-error
- (record-constructor &worker-thread-timeout))
+(define make-thread-pool-timeout-error
+ (record-constructor &thread-pool-timeout))
-(define worker-thread-timeout-error?
- (record-predicate &worker-thread-timeout))
+(define thread-pool-timeout-error?
+ (record-predicate &thread-pool-timeout))
-(define %worker-thread-default-timeout
+(define %thread-pool-default-timeout
(make-parameter 30))
-(define* (call-with-worker-thread record proc #:key duration-logger
- (timeout (%worker-thread-default-timeout))
- (channel (worker-thread-set-channel record)))
- "Send PROC to the worker thread through CHANNEL. Return the result of PROC.
-If already in the worker thread, call PROC immediately."
- (let ((args ((worker-thread-set-arguments-parameter record))))
+(define* (call-with-thread record proc #:key duration-logger
+ (timeout (%thread-pool-default-timeout))
+ (channel (thread-pool-channel record)))
+ "Send PROC to the thread pool through CHANNEL. Return the result of PROC.
+If already in the thread pool, call PROC immediately."
+ (let ((args ((thread-pool-arguments-parameter record))))
(if args
(apply proc args)
(let* ((reply (make-channel))
@@ -369,10 +371,10 @@ If already in the worker thread, call PROC immediately."
(unless operation-success?
(raise-exception
- (make-worker-thread-timeout-error)))
+ (make-thread-pool-timeout-error)))
(match (get-message reply)
- (('worker-thread-error duration exn)
+ (('thread-pool-error duration exn)
(when duration-logger
(duration-logger duration))
(raise-exception exn))
diff --git a/tests/thread-pool.scm b/tests/thread-pool.scm
new file mode 100644
index 0000000..71b4494
--- /dev/null
+++ b/tests/thread-pool.scm
@@ -0,0 +1,44 @@
+(use-modules (tests)
+ (srfi srfi-71)
+ (fibers)
+ (unit-test)
+ (knots thread-pool))
+
+(let ((thread-pool
+ (make-thread-pool 2)))
+
+ (run-fibers-for-tests
+ (lambda ()
+ (assert-equal
+ (call-with-thread
+ thread-pool
+ (lambda ()
+ 4))
+ 4))))
+
+(let ((thread-pool
+ (make-thread-pool
+ 2
+ #:thread-initializer (const '(2)))))
+
+ (run-fibers-for-tests
+ (lambda ()
+ (assert-equal
+ (call-with-thread
+ thread-pool
+ (lambda (num)
+ (* 2 num)))
+ 4))))
+
+(let ((process-job
+ count-jobs
+ count-threads
+ list-jobs
+ (create-work-queue
+ 2
+ (lambda (i)
+ (* i 2)))))
+
+ (process-job 3))
+
+(display "thread-pool test finished successfully\n")
diff --git a/tests/worker-threads.scm b/tests/worker-threads.scm
deleted file mode 100644
index cfdfbf8..0000000
--- a/tests/worker-threads.scm
+++ /dev/null
@@ -1,32 +0,0 @@
-(use-modules (tests)
- (srfi srfi-71)
- (fibers)
- (unit-test)
- (knots worker-threads))
-
-(let ((worker-thread-set
- (make-worker-thread-set
- (const '())
- #:parallelism 2)))
-
- (run-fibers-for-tests
- (lambda ()
- (assert-equal
- (call-with-worker-thread
- worker-thread-set
- (lambda ()
- 4))
- 4))))
-
-(let ((process-job
- count-jobs
- count-threads
- list-jobs
- (create-work-queue
- 2
- (lambda (i)
- (* i 2)))))
-
- (process-job 3))
-
-(display "worker-threads test finished successfully\n")