Add worker thread utils

This commit is contained in:
Christopher Baines 2024-07-20 00:20:20 +01:00
parent 7df7fd3e52
commit 2a80304e0c

View file

@ -29,6 +29,8 @@
#:use-module (ice-9 ports internal) #:use-module (ice-9 ports internal)
#:use-module (ice-9 suspendable-ports) #:use-module (ice-9 suspendable-ports)
#:use-module (lzlib) #:use-module (lzlib)
#:use-module ((guix build syscalls)
#:select (set-thread-name))
#:use-module (fibers) #:use-module (fibers)
#:use-module (fibers channels) #:use-module (fibers channels)
#:use-module (fibers operations) #:use-module (fibers operations)
@ -48,6 +50,11 @@
with-resource-from-pool with-resource-from-pool
resource-pool-stats resource-pool-stats
make-worker-thread-channel
%worker-thread-default-timeout
call-with-worker-thread
worker-thread-timeout-error?
fibers-delay fibers-delay
fibers-force fibers-force
@ -464,6 +471,231 @@ available. Return the resource once PROC has returned."
(raise-exception (raise-exception
(make-resource-pool-timeout-error)))))) (make-resource-pool-timeout-error))))))
(define %worker-thread-args
(make-parameter #f))
(define* (make-worker-thread-channel initializer
#:key (parallelism 1)
(delay-logger (lambda _ #f))
(duration-logger (const #f))
destructor
lifetime
(log-exception? (const #t))
(expire-on-exception? #f)
(name "unnamed"))
"Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the worker thread procedure."
(define thread-proc-vector
(make-vector parallelism #f))
(define (initializer/safe)
(let ((args
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception running initializer in worker thread (~A): ~A:\n ~A\n"
name
initializer
exn)
#f)
(lambda ()
(with-throw-handler #t
initializer
(lambda args
(backtrace))))
#:unwind? #t)))
(if args
args
;; never give up, just keep retrying
(begin
(sleep 1)
(initializer/safe)))))
(define (destructor/safe args)
(let ((success?
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception running destructor in worker thread (~A): ~A:\n ~A\n"
name
destructor
exn)
#f)
(lambda ()
(with-throw-handler #t
(lambda ()
(apply destructor args)
#t)
(lambda _
(backtrace))))
#:unwind? #t)))
(or success?
#t
(begin
(sleep 1)
(destructor/safe args)))))
(define (process thread-index channel args)
(let loop ((current-lifetime lifetime))
(let ((exception?
(match (get-message channel)
(((? channel? reply) sent-time (? procedure? proc))
(let ((time-delay
(- (get-internal-real-time)
sent-time)))
(delay-logger (/ time-delay
internal-time-units-per-second))
(let* ((start-time (get-internal-real-time))
(response
(with-exception-handler
(lambda (exn)
(list 'worker-thread-error
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second)
exn))
(lambda ()
(vector-set! thread-proc-vector
thread-index
proc)
(with-throw-handler #t
(lambda ()
(call-with-values
(lambda ()
(start-stack
'worker-thread
(apply proc args)))
(lambda vals
(cons (/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second)
vals))))
(lambda args
(when (match args
(('%exception exn)
(log-exception? exn))
(_ #t))
(simple-format
(current-error-port)
"worker-thread: exception: ~A\n" args)
(backtrace)))))
#:unwind? #t)))
(put-message reply
response)
(vector-set! thread-proc-vector
thread-index
#f)
(match response
(('worker-thread-error duration _)
(when duration-logger
(duration-logger duration proc))
#t)
((duration . _)
(when duration-logger
(duration-logger duration proc))
#f))))))))
(unless (and expire-on-exception?
exception?)
(if (number? current-lifetime)
(unless (< current-lifetime 0)
(loop (if current-lifetime
(- current-lifetime 1)
#f)))
(loop #f))))))
(let ((channel (make-channel)))
(for-each
(lambda (thread-index)
(call-with-new-thread
(lambda ()
(catch 'system-error
(lambda ()
(set-thread-name
(string-append
name " w t "
(number->string thread-index))))
(const #t))
(let init ((args (initializer/safe)))
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"worker-thread-channel: exception: ~A\n" exn))
(lambda ()
(parameterize ((%worker-thread-args args))
(process thread-index channel args)))
#:unwind? #t)
(when destructor
(destructor/safe args))
(init (initializer/safe))))))
(iota parallelism))
(values channel
thread-proc-vector)))
(define &worker-thread-timeout
(make-exception-type '&worker-thread-timeout
&error
'()))
(define make-worker-thread-timeout-error
(record-constructor &worker-thread-timeout))
(define worker-thread-timeout-error?
(record-predicate &worker-thread-timeout))
(define %worker-thread-default-timeout
(make-parameter 30))
(define* (call-with-worker-thread channel proc #:key duration-logger
(timeout (%worker-thread-default-timeout)))
"Send PROC to the worker thread through CHANNEL. Return the result of PROC.
If already in the worker thread, call PROC immediately."
(let ((args (%worker-thread-args)))
(if args
(apply proc args)
(let* ((reply (make-channel))
(operation-success?
(perform-operation
(let ((put
(wrap-operation
(put-operation channel
(list reply
(get-internal-real-time)
proc))
(const #t))))
(if timeout
(choice-operation
put
(wrap-operation (sleep-operation timeout)
(const #f)))
put)))))
(unless operation-success?
(raise-exception
(make-worker-thread-timeout-error)))
(match (get-message reply)
(('worker-thread-error duration exn)
(when duration-logger
(duration-logger duration))
(raise-exception exn))
((duration . result)
(when duration-logger
(duration-logger duration))
(apply values result)))))))
(define-record-type <fibers-promise> (define-record-type <fibers-promise>
(make-fibers-promise thunk values-box evaluated-condition) (make-fibers-promise thunk values-box evaluated-condition)
fibers-promise? fibers-promise?