guile-knots/knots/thread-pool.scm
Christopher Baines c2e1cd94d7
All checks were successful
/ test (push) Successful in 6s
Adjust the delay and duration loggers for thread pools
Based on the changes in resource pools.
2026-03-23 13:54:40 +00:00

699 lines
26 KiB
Scheme

;;; Guile Knots
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with Guile Knots. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (knots thread-pool)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-19)
#:use-module (srfi srfi-71)
#:use-module (system foreign)
#:use-module (system base target)
#:use-module (rnrs bytevectors)
#:use-module (ice-9 q)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
#:use-module (ice-9 threads)
#:use-module (fibers)
#:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers operations)
#:use-module (knots)
#:use-module (knots resource-pool)
#:export (set-thread-name
thread-name
&thread-pool-timeout-error
thread-pool-timeout-error-pool
thread-pool-timeout-error?
make-thread-pool
thread-pool?
thread-pool-resource-pool
make-fixed-size-thread-pool
fixed-size-thread-pool?
fixed-size-thread-pool-channel
fixed-size-thread-pool-current-procedures
;; These procedures work for thread pools and fixed size
;; thread pools
thread-pool-arguments-parameter
thread-pool-default-checkout-timeout
thread-pool-delay-logger
thread-pool-duration-logger
destroy-thread-pool
call-with-thread))
(define* (syscall->procedure return-type name argument-types
                             #:key library)
  "Wrap the C function NAME using the dynamic FFI.  The resulting procedure
returns two values: NAME's return value and errno.  When LIBRARY is given,
NAME is looked up in it, falling back to the global symbol namespace when
LIBRARY itself cannot be linked.

Errors while creating the binding are not raised immediately: instead the
returned procedure throws a 'system-error (ENOSYS) each time it is called."
  (catch #t
    (lambda ()
      ;; Try LIBRARY first and fall back to libc itself, because libraries
      ;; such as libutil.so were folded into libc.so with glibc >= 2.34.
      (let* ((handle (if library
                         (or (false-if-exception (dynamic-link library))
                             (dynamic-link))
                         (dynamic-link)))
             (ptr (dynamic-func name handle)))
        ;; #:return-errno? is available from Guile 2.0.12 onwards.
        (pointer->procedure return-type ptr argument-types
                            #:return-errno? #t)))
    (lambda _
      ;; Defer the failure until the binding is actually used.
      (lambda _
        (throw 'system-error name "~A" (list (strerror ENOSYS))
               (list ENOSYS))))))
;; FFI binding for prctl(2).  Calling it returns two values: the C return
;; value and errno.  If the "prctl" symbol cannot be bound, calling it
;; throws 'system-error with ENOSYS (see syscall->procedure).
(define %prctl
  ;; Arguments: option, then four unsigned-long option arguments.
  (syscall->procedure int "prctl"
                      (list int unsigned-long unsigned-long
                            unsigned-long unsigned-long)))
;; prctl(2) option codes used in this module.
(define PR_SET_NAME 15) ;<linux/prctl.h>
(define PR_GET_NAME 16)
(define PR_SET_CHILD_SUBREAPER 36)
(define (set-child-subreaper!)
  "Ask the kernel to make the current process a child subreaper using
prctl(PR_SET_CHILD_SUBREAPER).  Return the two values produced by %prctl
(the C return value and errno); a failure is not raised."
  (apply %prctl PR_SET_CHILD_SUBREAPER '(1 0 0 0)))
(define %max-thread-name-length
  ;; Maximum length in bytes of a thread name as seen by
  ;; PR_SET_NAME/PR_GET_NAME, including the terminating zero byte.  Used
  ;; as the size of the buffer that thread-name/linux hands to PR_GET_NAME.
  16)
(define (set-thread-name!/linux name)
  "Set the calling thread's name to NAME with prctl(PR_SET_NAME).  The
kernel keeps at most 15 bytes of NAME.  Throw 'set-process-name, carrying
errno, if prctl fails."
  (let ((name-ptr (string->pointer name)))
    (call-with-values
        (lambda ()
          (%prctl PR_SET_NAME
                  (pointer-address name-ptr) 0 0 0))
      (lambda (status errno)
        (unless (zero? status)
          (throw 'set-process-name "set-process-name"
                 "set-process-name: ~A"
                 (list (strerror errno))
                 (list errno)))))))
(define (bytes->string bytes)
  "Decode BYTES, a list of octets, as a zero-terminated string: every byte
before the first zero becomes one character.  Return #f when that prefix is
empty."
  (let ((prefix (take-while (lambda (byte) (not (zero? byte))) bytes)))
    (and (pair? prefix)
         (list->string (map integer->char prefix)))))
(define (thread-name/linux)
  "Return the calling thread's name as read with prctl(PR_GET_NAME), or #f
if the name is empty.  Throw 'process-name, carrying errno, on failure."
  (let* ((buffer (make-bytevector %max-thread-name-length))
         (status errno (%prctl PR_GET_NAME
                               (pointer-address (bytevector->pointer buffer))
                               0 0 0)))
    (if (not (zero? status))
        (throw 'process-name "process-name"
               "process-name: ~A"
               (list (strerror errno))
               (list errno))
        (bytes->string (bytevector->u8-list buffer)))))
;; Thread naming is only implemented for Linux (via prctl).  On other
;; hosts, setting a name is a no-op returning #f and reading the name
;; returns the empty string.
(define set-thread-name
  (cond
   ((string-contains %host-type "linux") set-thread-name!/linux)
   (else (const #f))))
(define thread-name
  (cond
   ((string-contains %host-type "linux") thread-name/linux)
   (else (const ""))))
;; A dynamically sized thread pool.  RESOURCE-POOL is a resource pool whose
;; resources are single-thread fixed-size pools (built by make-thread-pool);
;; ARGUMENTS-PARAMETER is exposed through thread-pool-arguments-parameter.
(define-record-type <thread-pool>
  (thread-pool resource-pool arguments-parameter)
  thread-pool?
  (resource-pool thread-pool-resource-pool)
  ;; Not exported directly; use thread-pool-arguments-parameter, which also
  ;; handles fixed-size pools.
  (arguments-parameter thread-pool-arguments-parameter-accessor))
;; The record predicate and accessors are macros here, so their docstrings
;; are attached to the macro transformers.
(set-procedure-property!
 (macro-transformer (module-ref (current-module) 'thread-pool?))
 'documentation
 "Return @code{#t} if OBJ is a @code{<thread-pool>}.")
(set-procedure-property!
 (macro-transformer (module-ref (current-module) 'thread-pool-resource-pool))
 'documentation
 "Return the underlying resource pool of the thread pool.")
;; A pool of a fixed number of threads, all started up front by
;; make-fixed-size-thread-pool.
;;   CHANNEL             - jobs, as (REPLY-CHANNEL PROC), and 'destroy
;;                         messages are sent to the threads over it.
;;   ARGUMENTS-PARAMETER - parameterized in each thread to the list returned
;;                         by the thread initializer ('() without one).
;;   CURRENT-PROCEDURES  - vector with one slot per thread holding the
;;                         procedure it is running, or #f when idle.
;;   THREADS             - the Guile thread objects, joined on destruction.
(define-record-type <fixed-size-thread-pool>
  (fixed-size-thread-pool channel arguments-parameter current-procedures
                          default-checkout-timeout delay-logger
                          duration-logger threads)
  fixed-size-thread-pool?
  (channel fixed-size-thread-pool-channel)
  (arguments-parameter fixed-size-thread-pool-arguments-parameter)
  (current-procedures fixed-size-thread-pool-current-procedures)
  (default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout)
  (delay-logger fixed-size-thread-pool-delay-logger)
  (duration-logger fixed-size-thread-pool-duration-logger)
  (threads fixed-size-thread-pool-threads))
;; Docstrings for the record macros, attached to their transformers.
(set-procedure-property!
 (macro-transformer (module-ref (current-module) 'fixed-size-thread-pool?))
 'documentation
 "Return @code{#t} if OBJ is a @code{<fixed-size-thread-pool>}.")
(set-procedure-property!
 (macro-transformer (module-ref (current-module) 'fixed-size-thread-pool-channel))
 'documentation
 "Return the channel of the fixed-size thread pool.")
(set-procedure-property!
 (macro-transformer (module-ref (current-module) 'fixed-size-thread-pool-current-procedures))
 'documentation
 "Return the current procedures vector of the fixed-size thread pool.")
;; Both <thread-pool> and <fixed-size-thread-pool> carry an arguments
;; parameter, so provide one procedure that picks the right accessor.
(define (thread-pool-arguments-parameter pool)
  "Return the arguments parameter of POOL, which may be either a thread
pool or a fixed-size thread pool."
  (cond
   ((fixed-size-thread-pool? pool)
    (fixed-size-thread-pool-arguments-parameter pool))
   (else
    (thread-pool-arguments-parameter-accessor pool))))
(define (thread-pool-default-checkout-timeout pool)
  "Return POOL's default checkout timeout.  For a dynamic thread pool it is
read from the configuration of the underlying resource pool."
  (if (not (fixed-size-thread-pool? pool))
      (let ((config (resource-pool-configuration
                     (thread-pool-resource-pool pool))))
        (assq-ref config 'default-checkout-timeout))
      (fixed-size-thread-pool-default-checkout-timeout pool)))
(define (thread-pool-delay-logger pool)
  "Return POOL's delay logger.  For a dynamic thread pool this is the
delay logger of the underlying resource pool."
  (cond
   ((fixed-size-thread-pool? pool)
    (fixed-size-thread-pool-delay-logger pool))
   (else
    (let ((resources (thread-pool-resource-pool pool)))
      (resource-pool-delay-logger resources)))))
(define (thread-pool-duration-logger pool)
  "Return POOL's duration logger.  For a dynamic thread pool this is the
duration logger of the underlying resource pool."
  (cond
   ((fixed-size-thread-pool? pool)
    (fixed-size-thread-pool-duration-logger pool))
   (else
    (let ((resources (thread-pool-resource-pool pool)))
      (resource-pool-duration-logger resources)))))
;; Exception raised by call-with-thread when no thread could be obtained in
;; time.  It is a subtype of &error with a single field, POOL.
(define &thread-pool-timeout-error
  (make-exception-type '&thread-pool-timeout-error
                       &error
                       '(pool)))
;; Takes exactly one argument: the pool that timed out.
(define make-thread-pool-timeout-error
  (record-constructor &thread-pool-timeout-error))
(define thread-pool-timeout-error-pool
  (exception-accessor
   &thread-pool-timeout-error
   (record-accessor &thread-pool-timeout-error 'pool)))
(set-procedure-property! thread-pool-timeout-error-pool 'documentation
                         "Return the pool from a @code{&thread-pool-timeout-error} exception.")
(define thread-pool-timeout-error?
  (exception-predicate &thread-pool-timeout-error))
(set-procedure-property! thread-pool-timeout-error? 'documentation
                         "Return @code{#t} if OBJ is a @code{&thread-pool-timeout-error} exception.")
(define* (make-fixed-size-thread-pool size
                                      #:key
                                      thread-initializer
                                      thread-destructor
                                      delay-logger
                                      duration-logger
                                      thread-lifetime
                                      (expire-on-exception? #f)
                                      (name "unnamed")
                                      (use-default-io-waiters? #t)
                                      default-checkout-timeout)
  "Create a pool of SIZE threads started immediately.  Use
@code{call-with-thread} to run a procedure in one of the threads.
Optional keyword arguments:
@table @code
@item #:thread-initializer
A thunk called when each thread starts, and again whenever a thread
restarts.  It must return a list, whose elements are passed as extra
arguments to every procedure run in that thread.  If it raises an
exception (or returns @code{#f}) the problem is reported and the call is
retried after one second.  Defaults to @code{#f} (no extra arguments).
@item #:thread-destructor
A procedure applied to the list returned by @code{#:thread-initializer}
whenever a thread stops or restarts.  Exceptions it raises are reported
and then ignored.  Defaults to @code{#f}.
@item #:thread-lifetime
Maximum number of procedures a thread will run before restarting (and
re-running @code{#:thread-initializer}).  Defaults to @code{#f} (no
limit).
@item #:expire-on-exception?
When @code{#t}, replace a thread after any unhandled exception.
Defaults to @code{#f}.
@item #:use-default-io-waiters?
When @code{#t} (the default), each thread uses blocking I/O waiters so
that port reads and writes block the thread rather than trying to
suspend a fiber.
@item #:name
String used in thread names and log messages.  Defaults to
@code{\"unnamed\"}.
@item #:default-checkout-timeout
Seconds to wait for a free thread slot before raising
@code{&thread-pool-timeout-error}.  Defaults to @code{#f} (wait
forever).
@item #:delay-logger
Used by @code{call-with-thread} (unless overridden per call) as
@code{(delay-logger seconds)} with the time spent waiting for a thread
to become available.
@item #:duration-logger
Used by @code{call-with-thread} (unless overridden per call) as
@code{(duration-logger seconds)} after each procedure completes, whether
it returned normally or raised an exception.
@end table"
  (define channel
    (make-channel))
  (define param
    (make-parameter #f))
  (define thread-proc-vector
    (make-vector size #f))

  (define (elapsed-seconds start-time)
    ;; Seconds since START-TIME, a value from get-internal-real-time.
    (/ (- (get-internal-real-time) start-time)
       internal-time-units-per-second))

  (define (initializer/safe)
    ;; Run THREAD-INITIALIZER, reporting any exception.  A failed call
    ;; yields #f, and #f means try again: the thread cannot run jobs
    ;; without its arguments.
    (let ((args
           (with-exception-handler
               (lambda _ #f)
             (lambda ()
               (with-exception-handler
                   (lambda (exn)
                     (simple-format
                      (current-error-port)
                      "exception running initializer in thread pool (~A): ~A\n"
                      name
                      thread-initializer)
                     (print-backtrace-and-exception/knots exn)
                     (raise-exception exn))
                 thread-initializer))
             #:unwind? #t)))
      (if args
          args
          ;; never give up, just keep retrying
          (begin
            (sleep 1)
            (initializer/safe)))))

  (define (destructor/safe args)
    ;; Apply THREAD-DESTRUCTOR to ARGS, reporting any exception.  Unlike
    ;; the initializer, a failing destructor is deliberately not retried.
    (with-exception-handler
        (lambda _ #f)
      (lambda ()
        (with-exception-handler
            (lambda (exn)
              (simple-format
               (current-error-port)
               "exception running destructor in thread pool (~A): ~A\n"
               name
               thread-destructor)
              (print-backtrace-and-exception/knots exn)
              (raise-exception exn))
          (lambda ()
            (apply thread-destructor args)
            #t)))
      #:unwind? #t)
    #t)

  (define (process thread-index channel args)
    ;; Serve jobs from CHANNEL.  A job is (REPLY PROC): PROC is applied to
    ;; ARGS and REPLY receives either (DURATION . VALUES) or
    ;; (thread-pool-error DURATION EXN).  Return #f after a 'destroy
    ;; message, or #t when the thread should restart: its lifetime is used
    ;; up, or PROC raised and EXPIRE-ON-EXCEPTION? is set.
    ;;
    ;; Durations are not logged here.  call-with-thread logs them with
    ;; its (possibly per-call) duration logger, which defaults to this
    ;; pool's, so logging here as well reported every job twice.
    (let loop ((lifetime thread-lifetime))
      (match (get-message channel)
        ('destroy #f)
        ((reply proc)
         (let* ((start-time (get-internal-real-time))
                (response
                 (with-exception-handler
                     (lambda (exn)
                       (list 'thread-pool-error
                             (elapsed-seconds start-time)
                             exn))
                   (lambda ()
                     (vector-set! thread-proc-vector
                                  thread-index
                                  proc)
                     (with-exception-handler
                         (lambda (exn)
                           ;; This handler runs before unwinding, so the
                           ;; stack captured here is where PROC raised;
                           ;; attach it to the exception.
                           (let ((stack
                                  (match (fluid-ref %stacks)
                                    ((stack-tag . prompt-tag)
                                     (make-stack #t
                                                 0 prompt-tag
                                                 0 (and prompt-tag 1)))
                                    (_
                                     (make-stack #t)))))
                             (raise-exception
                              (make-exception
                               exn
                               (make-knots-exception stack)))))
                       (lambda ()
                         (call-with-values
                             (lambda ()
                               (start-stack
                                #t
                                (apply proc args)))
                           (lambda vals
                             (cons (elapsed-seconds start-time)
                                   vals))))))
                   #:unwind? #t)))
           (vector-set! thread-proc-vector
                        thread-index
                        #f)
           (put-message reply
                        response)
           (let ((exception?
                  (match response
                    (('thread-pool-error _ _) #t)
                    (_ #f))))
             (cond
              ((and exception? expire-on-exception?) #t)
              ((not lifetime) (loop lifetime))
              ((<= lifetime 1) #t)
              (else (loop (- lifetime 1))))))))))

  (define (start-thread index channel)
    (call-with-new-thread
     (lambda ()
       ;; Naming the thread is best effort only.
       (catch 'system-error
         (lambda ()
           (set-thread-name
            (string-append
             name " w t " (number->string index))))
         (const #t))
       ;; Each round: initialize, serve jobs until PROCESS returns, then run
       ;; the destructor.  Start a new round unless PROCESS returned #f
       ;; ('destroy).  An internal error is reported and, since the
       ;; handler's result is not #f, also leads to a new round.
       (let init ((args (if thread-initializer
                            (initializer/safe)
                            '())))
         (let ((continue?
                (with-exception-handler
                    (lambda (exn)
                      (simple-format
                       (current-error-port)
                       "knots: thread-pool: internal exception: ~A\n" exn))
                  (lambda ()
                    (parameterize ((param args))
                      (process index channel args)))
                  #:unwind? #t)))
           (when thread-destructor
             (destructor/safe args))
           (when continue?
             (init (if thread-initializer
                       (initializer/safe)
                       '()))))))))

  (define threads
    (map (lambda (i)
           (if use-default-io-waiters?
               (call-with-default-io-waiters
                (lambda ()
                  (start-thread i channel)))
               (start-thread i channel)))
         (iota size)))

  (fixed-size-thread-pool channel
                          param
                          thread-proc-vector
                          default-checkout-timeout
                          delay-logger
                          duration-logger
                          threads))
(define* (make-thread-pool max-size
                           #:key
                           (min-size max-size)
                           scheduler
                           thread-initializer
                           thread-destructor
                           delay-logger
                           duration-logger
                           thread-lifetime
                           (expire-on-exception? #f)
                           (name "unnamed")
                           (use-default-io-waiters? #t)
                           default-checkout-timeout
                           default-max-waiters)
  "Create a dynamic thread pool with up to MAX-SIZE threads.  Use
@code{call-with-thread} to run a procedure in one of the threads.
Unlike @code{make-fixed-size-thread-pool}, threads are created on
demand and may be reclaimed when idle (controlled by @code{#:min-size}
and the resource pool's idle management).
Accepts the same @code{#:thread-initializer}, @code{#:thread-destructor},
@code{#:thread-lifetime}, @code{#:expire-on-exception?},
@code{#:use-default-io-waiters?}, @code{#:name},
@code{#:default-checkout-timeout}, @code{#:delay-logger}, and
@code{#:duration-logger} arguments as @code{make-fixed-size-thread-pool},
plus:
@table @code
@item #:min-size
Minimum number of threads to keep alive.  Defaults to MAX-SIZE (i.e.@:
the pool is pre-filled and never shrinks).
@item #:scheduler
Fibers scheduler for the pool's internal resource pool fiber.  Defaults
to the current scheduler.
@item #:default-max-waiters
Maximum number of fibers that may queue waiting for a thread.  Raises
@code{&thread-pool-timeout-error} when exceeded.  Defaults to
@code{#f} (no limit).
@end table"
  (define param
    (make-parameter #f))
  ;; Each resource is a fixed-size pool holding exactly one thread; thread
  ;; lifetime, logging and timeouts are handed to the resource pool.
  (define (make-single-thread-pool)
    (make-fixed-size-thread-pool
     1
     #:name name
     #:thread-initializer thread-initializer
     #:thread-destructor thread-destructor
     #:expire-on-exception? expire-on-exception?
     #:use-default-io-waiters? use-default-io-waiters?))
  ;; NOTE(review): PARAM is never parameterized by the inner pools'
  ;; threads (each fixed-size pool makes its own parameter), so
  ;; call-with-thread cannot tell that it is already running in one of
  ;; this pool's threads — confirm whether that is intended.
  (thread-pool
   (make-resource-pool make-single-thread-pool
                       max-size
                       #:min-size min-size
                       #:destructor destroy-thread-pool
                       #:lifetime thread-lifetime
                       #:scheduler scheduler
                       #:delay-logger delay-logger
                       #:duration-logger duration-logger
                       #:default-checkout-timeout default-checkout-timeout
                       #:default-max-waiters default-max-waiters)
   param))
(define* (call-with-thread thread-pool
                           proc
                           #:key
                           (delay-logger
                            (thread-pool-delay-logger thread-pool))
                           (duration-logger
                            (thread-pool-duration-logger thread-pool))
                           checkout-timeout
                           channel
                           destroy-thread-on-exception?
                           (max-waiters 'default))
  "Run PROC in THREAD-POOL and return its values, blocking until
complete.  If called from within a thread that already belongs to
THREAD-POOL, PROC is called directly in that thread.  If PROC raises an
exception, it is re-raised in the caller.
Optional keyword arguments:
@table @code
@item #:checkout-timeout
Seconds to wait for a free thread before raising
@code{&thread-pool-timeout-error}.  Defaults to the pool's
@code{#:default-checkout-timeout}.
@item #:max-waiters
Maximum number of fibers that may queue waiting for a thread (for
dynamic pools).  Defaults to the pool's @code{#:default-max-waiters}.
@item #:destroy-thread-on-exception?
When @code{#t}, destroy the thread after PROC raises an exception
(dynamic pools only).  Equivalent to per-call
@code{#:expire-on-exception?}.  Defaults to @code{#f}.
@item #:delay-logger
Called as @code{(delay-logger seconds)} with the time spent waiting
for a thread to become available.  Defaults to the pool's
@code{#:delay-logger} if not specified.
@item #:duration-logger
Called as @code{(duration-logger seconds)} after PROC completes
(whether or not it raised an exception).  Defaults to the pool's
@code{#:duration-logger} if not specified.
@item #:channel
Override the channel used to send PROC to the thread.
@end table"
  (define (elapsed-seconds start-time)
    ;; Seconds since START-TIME, a value from get-internal-real-time.
    (/ (- (get-internal-real-time) start-time)
       internal-time-units-per-second))

  (define (handle-proc fixed-size-thread-pool
                       reply-channel
                       start-time
                       timeout
                       delay-logger)
    ;; Hand PROC to a thread of FIXED-SIZE-THREAD-POOL, giving up after
    ;; TIMEOUT seconds when TIMEOUT is not #f, then wait for the reply and
    ;; return PROC's values (or re-raise its exception).
    (let* ((request-channel
            (or channel
                (fixed-size-thread-pool-channel
                 fixed-size-thread-pool)))
           (operation-success?
            (perform-operation
             (let ((put
                    (wrap-operation
                     (put-operation request-channel
                                    (list reply-channel
                                          proc))
                     (const #t))))
               (if timeout
                   (choice-operation
                    put
                    (wrap-operation (sleep-operation timeout)
                                    (const #f)))
                   put)))))
      (unless operation-success?
        ;; The exception type has a POOL field, so the constructor takes
        ;; exactly one argument: report the pool the caller passed in.
        (raise-exception
         (make-thread-pool-timeout-error thread-pool)))
      (when delay-logger
        (delay-logger (elapsed-seconds start-time)))
      (let ((reply (get-message reply-channel)))
        (match reply
          (('thread-pool-error duration exn)
           (when duration-logger
             (duration-logger duration))
           (raise-exception exn))
          ((duration . result)
           (when duration-logger
             (duration-logger duration))
           (apply values result))))))

  (let ((args ((thread-pool-arguments-parameter thread-pool))))
    (if args
        ;; Already running in one of THREAD-POOL's threads: call PROC here
        ;; rather than queueing behind ourselves.
        (apply proc args)
        (let ((start-time (get-internal-real-time))
              (reply-channel (make-channel)))
          (if (fixed-size-thread-pool? thread-pool)
              (handle-proc thread-pool
                           reply-channel
                           start-time
                           ;; Fall back to the pool's default, as documented.
                           (or checkout-timeout
                               (fixed-size-thread-pool-default-checkout-timeout
                                thread-pool))
                           delay-logger)
              (with-exception-handler
                  (lambda (exn)
                    ;; Present resource pool timeouts for our own pool as
                    ;; thread pool timeouts.
                    (if (and (resource-pool-timeout-error? exn)
                             (eq? (resource-pool-timeout-error-pool exn)
                                  (thread-pool-resource-pool thread-pool)))
                        (raise-exception
                         (make-thread-pool-timeout-error thread-pool))
                        (raise-exception exn)))
                (lambda ()
                  (call-with-resource-from-pool (thread-pool-resource-pool
                                                 thread-pool)
                    (lambda (fixed-size-thread-pool)
                      (if checkout-timeout
                          ;; Only the part of CHECKOUT-TIMEOUT not already
                          ;; spent waiting for the resource is left for
                          ;; handing PROC to the thread.
                          (let ((remaining-time
                                 (- checkout-timeout
                                    (elapsed-seconds start-time))))
                            (if (> remaining-time 0)
                                (handle-proc fixed-size-thread-pool
                                             reply-channel
                                             start-time
                                             remaining-time
                                             #f)
                                (raise-exception
                                 (make-thread-pool-timeout-error thread-pool))))
                          (handle-proc fixed-size-thread-pool
                                       reply-channel
                                       start-time
                                       #f
                                       #f)))
                    ;; The resource pool logs the checkout delay; the
                    ;; duration is logged by handle-proc instead.
                    #:delay-logger delay-logger
                    #:duration-logger #f
                    #:max-waiters max-waiters
                    #:timeout checkout-timeout
                    #:destroy-resource-on-exception?
                    destroy-thread-on-exception?))))))))
(define (destroy-thread-pool pool)
  "Destroy POOL and block until that is complete.  For a fixed-size pool,
send one 'destroy message per thread (each thread runs its destructor, if
any, before exiting) and join every thread.  For a dynamic pool, destroy
the underlying resource pool."
  (cond
   ((fixed-size-thread-pool? pool)
    (let ((threads (fixed-size-thread-pool-threads pool))
          (channel (fixed-size-thread-pool-channel pool)))
      (for-each (lambda (thread)
                  (put-message channel 'destroy))
                threads)
      (for-each join-thread threads)))
   (else
    (destroy-resource-pool (thread-pool-resource-pool pool)))))