Rework thread pools

Allow the thread pool to vary in size by basing it on a resource pool
of fixed size thread pools, which are similar to the previous thread
pool implementation.

Fixed size thread pools don't require fibers, but thread pools now
do. Some procedures work with either thread pool implementation.
This commit is contained in:
Christopher Baines 2025-05-16 11:48:41 +01:00
parent 7ba77010ae
commit 016f37f108
2 changed files with 360 additions and 419 deletions

View file

@ -27,29 +27,38 @@
#:use-module (rnrs bytevectors)
#:use-module (ice-9 q)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
#:use-module (ice-9 threads)
#:use-module (fibers)
#:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers operations)
#:use-module (knots)
#:use-module (knots resource-pool)
#:export (set-thread-name
thread-name
thread-pool?
thread-pool-channel
thread-pool-arguments-parameter
thread-pool-proc-vector
make-thread-pool
call-with-thread
&thread-pool-timeout
&thread-pool-timeout-error
thread-pool-timeout-error-pool
thread-pool-timeout-error?
%thread-pool-default-timeout
make-thread-pool
thread-pool?
thread-pool-resource-pool
create-work-queue))
make-fixed-size-thread-pool
fixed-size-thread-pool?
fixed-size-thread-pool-channel
fixed-size-thread-pool-current-procedures
;; These procedures work for thread pools and fixed size
;; thread pools
thread-pool-arguments-parameter
thread-pool-default-checkout-timeout
destroy-thread-pool
call-with-thread))
(define* (syscall->procedure return-type name argument-types
#:key library)
@ -147,28 +156,64 @@ from there, or #f if that would be an empty string."
(const "")))
(define-record-type <thread-pool>
(thread-pool channel arguments-parameter proc-vector)
(thread-pool resource-pool arguments-parameter)
thread-pool?
(channel thread-pool-channel)
(arguments-parameter thread-pool-arguments-parameter)
(proc-vector thread-pool-proc-vector)
(default-checkout-timeout
thread-pool-default-checkout-timeout))
(resource-pool thread-pool-resource-pool)
(arguments-parameter thread-pool-arguments-parameter-accessor))
(define* (make-thread-pool size
(define-record-type <fixed-size-thread-pool>
(fixed-size-thread-pool channel arguments-parameter current-procedures
default-checkout-timeout)
fixed-size-thread-pool?
(channel fixed-size-thread-pool-channel)
(arguments-parameter fixed-size-thread-pool-arguments-parameter)
(current-procedures fixed-size-thread-pool-current-procedures)
(default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout))
;; Since both thread pool records have this field, use a procedure
;; than handles the appropriate accessor
(define (thread-pool-arguments-parameter pool)
(if (fixed-size-thread-pool? pool)
(fixed-size-thread-pool-arguments-parameter pool)
(thread-pool-arguments-parameter-accessor pool)))
(define (thread-pool-default-checkout-timeout pool)
(if (fixed-size-thread-pool? pool)
(fixed-size-thread-pool-default-checkout-timeout pool)
(assq-ref (resource-pool-configuration
(thread-pool-resource-pool pool))
'default-checkout-timeout)))
(define &thread-pool-timeout
(make-exception-type '&thread-pool-timeout
&error
'(pool)))
(define make-thread-pool-timeout-error
(record-constructor &thread-pool-timeout))
(define thread-pool-timeout-error-pool
(exception-accessor
&thread-pool-timeout
(record-accessor &thread-pool-timeout 'pool)))
(define thread-pool-timeout-error?
(record-predicate &thread-pool-timeout))
(define* (make-fixed-size-thread-pool size
#:key
thread-initializer
thread-destructor
(delay-logger (lambda _ #f))
(duration-logger (const #f))
delay-logger
duration-logger
thread-lifetime
(log-exception? (const #t))
(expire-on-exception? #f)
(name "unnamed")
(use-default-io-waiters? #t)
default-checkout-timeout)
"Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the thread pool procedure."
(define channel
(make-channel))
(define param
(make-parameter #f))
@ -224,17 +269,18 @@ arguments of the thread pool procedure."
(sleep 1)
(destructor/safe args)))))
(define (process thread-index channel args)
(let loop ((current-lifetime thread-lifetime))
(let ((exception?
(define (process channel args)
(let loop ()
(match (get-message channel)
(((? channel? reply) sent-time (? procedure? proc))
('destroy #f)
((reply sent-time proc)
(when delay-logger
(let ((time-delay
(- (get-internal-real-time)
sent-time)))
(delay-logger (/ time-delay
internal-time-units-per-second)
proc)
proc)))
(let* ((start-time (get-internal-real-time))
(response
@ -246,9 +292,6 @@ arguments of the thread pool procedure."
internal-time-units-per-second)
exn))
(lambda ()
(vector-set! thread-proc-vector
thread-index
proc)
(with-exception-handler
(lambda (exn)
(let ((stack
@ -275,13 +318,11 @@ arguments of the thread pool procedure."
internal-time-units-per-second)
vals))))))
#:unwind? #t)))
(put-message reply
response)
(vector-set! thread-proc-vector
thread-index
#f)
(let ((exception?
(match response
(('thread-pool-error duration _)
(when duration-logger
@ -290,32 +331,26 @@ arguments of the thread pool procedure."
((duration . _)
(when duration-logger
(duration-logger duration proc))
#f))))))))
(unless (and expire-on-exception?
exception?)
(if (number? current-lifetime)
(unless (< current-lifetime 0)
(loop (if current-lifetime
(- current-lifetime 1)
#f)))
(loop #f))))))
#f))))
(if (and exception?
expire-on-exception?)
#t
(loop))))))))
(define (start-threads channel)
(for-each
(lambda (thread-index)
(define (start-thread index channel)
(call-with-new-thread
(lambda ()
(catch 'system-error
(lambda ()
(set-thread-name
(string-append
name " w t "
(number->string thread-index))))
name " w t " (number->string index))))
(const #t))
(let init ((args (if thread-initializer
(initializer/safe)
'())))
(let ((continue?
(with-exception-handler
(lambda (exn)
(simple-format
@ -323,54 +358,92 @@ arguments of the thread pool procedure."
"knots: thread-pool: internal exception: ~A\n" exn))
(lambda ()
(parameterize ((param args))
(process thread-index channel args)))
#:unwind? #t)
(process channel args)))
#:unwind? #t)))
(when thread-destructor
(destructor/safe args))
(init (initializer/safe))))))
(iota size)))
(when continue?
(init (if thread-initializer
(initializer/safe)
'()))))))))
(let ((channel (make-channel)))
(if use-default-io-waiters?
(call-with-default-io-waiters
(lambda ()
(start-threads channel)))
(start-threads channel))
(for-each
(lambda (i)
(start-thread i channel))
(iota size))
(thread-pool channel
(fixed-size-thread-pool channel
param
thread-proc-vector)))
thread-proc-vector
default-checkout-timeout))
(define &thread-pool-timeout
(make-exception-type '&thread-pool-timeout
&error
'()))
(define* (make-thread-pool max-size
#:key
(min-size max-size)
scheduler
thread-initializer
thread-destructor
(delay-logger (lambda _ #f))
(duration-logger (const #f))
thread-lifetime
(expire-on-exception? #f)
(name "unnamed")
(use-default-io-waiters? #t)
default-checkout-timeout)
"Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the thread pool procedure."
(define param
(make-parameter #f))
(define make-thread-pool-timeout-error
(record-constructor &thread-pool-timeout))
(let ((resource-pool
(make-resource-pool
(lambda ()
(make-fixed-size-thread-pool
1
#:thread-initializer thread-initializer
#:thread-destructor thread-destructor
#:thread-lifetime thread-lifetime
#:expire-on-exception? expire-on-exception?
#:name name
#:use-default-io-waiters? use-default-io-waiters?))
max-size
#:destructor destroy-thread-pool
#:min-size min-size
#:delay-logger delay-logger
#:scheduler scheduler
#:duration-logger duration-logger
#:default-checkout-timeout default-checkout-timeout)))
(define thread-pool-timeout-error?
(record-predicate &thread-pool-timeout))
(thread-pool resource-pool
param)))
(define* (call-with-thread record proc #:key duration-logger
(timeout (thread-pool-default-checkout-timeout
record))
(channel (thread-pool-channel record)))
(define* (call-with-thread thread-pool
proc
#:key
duration-logger
checkout-timeout
channel
destroy-thread-on-exception?
(max-waiters 'default))
"Send PROC to the thread pool through CHANNEL. Return the result of PROC.
If already in the thread pool, call PROC immediately."
(let ((args ((thread-pool-arguments-parameter record))))
(if args
(apply proc args)
(let* ((reply (make-channel))
(define (handle-proc fixed-size-thread-pool
reply-channel
start-time
timeout)
(let* ((request-channel
(or channel
(fixed-size-thread-pool-channel
fixed-size-thread-pool)))
(operation-success?
(perform-operation
(let ((put
(wrap-operation
(put-operation channel
(list reply
(get-internal-real-time)
(put-operation request-channel
(list reply-channel
start-time
proc))
(const #t))))
@ -385,7 +458,8 @@ If already in the thread pool, call PROC immediately."
(raise-exception
(make-thread-pool-timeout-error)))
(match (get-message reply)
(let ((reply (get-message reply-channel)))
(match reply
(('thread-pool-error duration exn)
(when duration-logger
(duration-logger duration))
@ -393,213 +467,54 @@ If already in the thread pool, call PROC immediately."
((duration . result)
(when duration-logger
(duration-logger duration))
(apply values result)))))))
(apply values result))))))
(define* (create-work-queue thread-count-parameter proc
#:key thread-start-delay
(thread-stop-delay
(make-time time-duration 0 0))
(name "unnamed")
priority<?)
(let ((queue (make-q))
(queue-mutex (make-mutex))
(job-available (make-condition-variable))
(running-job-args (make-hash-table)))
(define get-thread-count
(cond
((number? thread-count-parameter)
(const thread-count-parameter))
((eq? thread-count-parameter #f)
;; Run one thread per job
(lambda ()
(+ (q-length queue)
(hash-count (lambda (index val)
(list? val))
running-job-args))))
(else
thread-count-parameter)))
(define process-job
(if priority<?
(lambda* (args #:key priority)
(with-mutex queue-mutex
(enq! queue (cons priority args))
(set-car!
queue
(stable-sort! (car queue)
(lambda (a b)
(priority<?
(car a)
(car b)))))
(sync-q! queue)
(start-new-threads-if-necessary (get-thread-count))
(signal-condition-variable job-available)))
(lambda args
(with-mutex queue-mutex
(enq! queue args)
(start-new-threads-if-necessary (get-thread-count))
(signal-condition-variable job-available)))))
(define (count-threads)
(with-mutex queue-mutex
(hash-count (const #t) running-job-args)))
(define (count-jobs)
(with-mutex queue-mutex
(+ (q-length queue)
(hash-count (lambda (index val)
(list? val))
running-job-args))))
(define (list-jobs)
(with-mutex queue-mutex
(append (if priority<?
(map cdr (car queue))
(list-copy (car queue)))
(hash-fold (lambda (key val result)
(if val
(cons val result)
result))
'()
running-job-args))))
(define (thread-process-job job-args)
(with-exception-handler
(lambda _ #f)
(lambda ()
(let ((args ((thread-pool-arguments-parameter thread-pool))))
(if args
(apply proc args)
(let ((start-time (get-internal-real-time))
(reply-channel (make-channel)))
(if (fixed-size-thread-pool? thread-pool)
(handle-proc thread-pool
reply-channel
start-time
checkout-timeout)
(with-exception-handler
(lambda (exn)
(simple-format (current-error-port)
"~A work queue, job raised exception ~A\n"
name job-args)
(print-backtrace-and-exception/knots exn)
(raise-exception exn))
(if (and (resource-pool-timeout-error? exn)
(eq? (resource-pool-timeout-error-pool exn)
(thread-pool-resource-pool thread-pool)))
(raise-exception
(make-thread-pool-timeout-error thread-pool))
(raise-exception exn)))
(lambda ()
(apply proc job-args))))
#:unwind? #t))
(call-with-resource-from-pool (thread-pool-resource-pool
thread-pool)
(lambda (fixed-size-thread-pool)
(if checkout-timeout
(let ((remaining-time
(/ (- (get-internal-real-time) start-time)
internal-time-units-per-second)))
(if (< remaining-time checkout-timeout)
(handle-proc fixed-size-thread-pool
reply-channel
start-time
remaining-time)
(raise-exception
(make-thread-pool-timeout-error thread-pool))))
(handle-proc fixed-size-thread-pool
reply-channel
start-time
#f)))
#:max-waiters max-waiters
#:timeout checkout-timeout
#:destroy-resource-on-exception?
destroy-thread-on-exception?))))))))
(define (start-thread thread-index)
(define (too-many-threads?)
(let ((running-jobs-count
(hash-count (lambda (index val)
(list? val))
running-job-args))
(desired-thread-count (get-thread-count)))
(>= running-jobs-count
desired-thread-count)))
(define (thread-idle-for-too-long? last-job-finished-at)
(time>=?
(time-difference (current-time time-monotonic)
last-job-finished-at)
thread-stop-delay))
(define (stop-thread)
(hash-remove! running-job-args
thread-index)
(unlock-mutex queue-mutex))
(call-with-new-thread
(lambda ()
(catch 'system-error
(lambda ()
(set-thread-name
(string-append name " q t "
(number->string thread-index))))
(const #t))
(let loop ((last-job-finished-at (current-time time-monotonic)))
(lock-mutex queue-mutex)
(if (too-many-threads?)
(stop-thread)
(let ((job-args
(if (q-empty? queue)
;; #f from wait-condition-variable indicates a timeout
(if (wait-condition-variable
job-available
queue-mutex
(+ 9 (time-second (current-time))))
;; Another thread could have taken
;; the job in the mean time
(if (q-empty? queue)
#f
(if priority<?
(cdr (deq! queue))
(deq! queue)))
#f)
(if priority<?
(cdr (deq! queue))
(deq! queue)))))
(if job-args
(begin
(hash-set! running-job-args
thread-index
job-args)
(unlock-mutex queue-mutex)
(thread-process-job job-args)
(with-mutex queue-mutex
(hash-set! running-job-args
thread-index
#f))
(loop (current-time time-monotonic)))
(if (thread-idle-for-too-long? last-job-finished-at)
(stop-thread)
(begin
(unlock-mutex queue-mutex)
(loop last-job-finished-at))))))))))
(define start-new-threads-if-necessary
(let ((previous-thread-started-at (make-time time-monotonic 0 0)))
(lambda (desired-count)
(let* ((thread-count
(hash-count (const #t) running-job-args))
(threads-to-start
(- desired-count thread-count)))
(when (> threads-to-start 0)
(for-each
(lambda (thread-index)
(when (eq? (hash-ref running-job-args
thread-index
'slot-free)
'slot-free)
(let* ((now (current-time time-monotonic))
(elapsed (time-difference now
previous-thread-started-at)))
(when (or (eq? #f thread-start-delay)
(time>=? elapsed thread-start-delay))
(set! previous-thread-started-at now)
(hash-set! running-job-args
thread-index
#f)
(start-thread thread-index)))))
(iota desired-count)))))))
(if (procedure? thread-count-parameter)
(call-with-new-thread
(lambda ()
(catch 'system-error
(lambda ()
(set-thread-name
(string-append name " q t")))
(const #t))
(while #t
(sleep 15)
(with-mutex queue-mutex
(let ((idle-threads (hash-count (lambda (index val)
(eq? #f val))
running-job-args)))
(when (= 0 idle-threads)
(start-new-threads-if-necessary (get-thread-count))))))))
(start-new-threads-if-necessary (get-thread-count)))
(values process-job count-jobs count-threads list-jobs)))
(define (destroy-thread-pool pool)
(if (fixed-size-thread-pool? pool)
(put-message
(fixed-size-thread-pool-channel pool)
'destroy)
(destroy-resource-pool
(thread-pool-resource-pool pool))))

View file

@ -6,10 +6,47 @@
(knots thread-pool))
(let ((thread-pool
(make-thread-pool 2)))
(make-fixed-size-thread-pool 2)))
(assert-equal
(call-with-thread
thread-pool
(lambda ()
4))
4))
(let ((thread-pool
(make-fixed-size-thread-pool
2
#:thread-initializer (const '(2)))))
(assert-equal
(call-with-thread
thread-pool
(lambda (num)
(* 2 num)))
4))
(let ((thread-pool
(make-fixed-size-thread-pool 2)))
(assert-equal
#t
(with-exception-handler
(lambda (exn)
(knots-exception? exn))
(lambda ()
(call-with-thread
thread-pool
(lambda ()
(+ 1 'a))))
#:unwind? #t)))
(run-fibers-for-tests
(lambda ()
(let ((thread-pool
(make-thread-pool 2)))
(assert-equal
(call-with-thread
thread-pool
@ -17,13 +54,13 @@
4))
4))))
(run-fibers-for-tests
(lambda ()
(let ((thread-pool
(make-thread-pool
2
#:thread-initializer (const '(2)))))
(run-fibers-for-tests
(lambda ()
(assert-equal
(call-with-thread
thread-pool
@ -31,22 +68,11 @@
(* 2 num)))
4))))
(let ((process-job
count-jobs
count-threads
list-jobs
(create-work-queue
2
(lambda (i)
(* i 2)))))
(process-job 3))
(run-fibers-for-tests
(lambda ()
(let ((thread-pool
(make-thread-pool 2)))
(run-fibers-for-tests
(lambda ()
(assert-equal
#t
(with-exception-handler