605 lines
22 KiB
Scheme
605 lines
22 KiB
Scheme
;;; Guile Knots
|
|
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
|
|
;;;
|
|
;;; This file is part of Guile Knots.
|
|
;;;
|
|
;;; The Guile Knots is free software; you can redistribute it and/or
|
|
;;; modify it under the terms of the GNU General Public License as
|
|
;;; published by the Free Software Foundation; either version 3 of the
|
|
;;; License, or (at your option) any later version.
|
|
;;;
|
|
;;; The Guile Knots is distributed in the hope that it will be useful,
|
|
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
;;; General Public License for more details.
|
|
;;;
|
|
;;; You should have received a copy of the GNU General Public License
|
|
;;; along with Guile Knots.  If not, see
|
|
;;; <http://www.gnu.org/licenses/>.
|
|
|
|
(define-module (knots thread-pool)
|
|
#:use-module (srfi srfi-1)
|
|
#:use-module (srfi srfi-9)
|
|
#:use-module (srfi srfi-19)
|
|
#:use-module (srfi srfi-71)
|
|
#:use-module (system foreign)
|
|
#:use-module (system base target)
|
|
#:use-module (rnrs bytevectors)
|
|
#:use-module (ice-9 q)
|
|
#:use-module (ice-9 match)
|
|
#:use-module (ice-9 threads)
|
|
#:use-module (fibers)
|
|
#:use-module (fibers timers)
|
|
#:use-module (fibers channels)
|
|
#:use-module (fibers operations)
|
|
#:use-module (knots)
|
|
#:export (set-thread-name
|
|
thread-name
|
|
|
|
thread-pool?
|
|
thread-pool-channel
|
|
thread-pool-arguments-parameter
|
|
thread-pool-proc-vector
|
|
|
|
make-thread-pool
|
|
call-with-thread
|
|
|
|
&thread-pool-timeout
|
|
thread-pool-timeout-error?
|
|
|
|
%thread-pool-default-timeout
|
|
|
|
create-work-queue))
|
|
|
|
(define* (syscall->procedure return-type name argument-types
                             #:key library)
  "Wrap the C function NAME with the dynamic FFI and return the resulting
procedure.  The wrapper returns two values: the return value of NAME and
errno.  RETURN-TYPE and ARGUMENT-TYPES are foreign type descriptors.  When
LIBRARY is given, NAME is looked up there first, otherwise in the global
symbol name space.

Failure to create the binding is not reported immediately: in that case the
returned procedure throws a 'system-error' carrying ENOSYS when called."
  (define (symbol-handle)
    ;; Try LIBRARY first when provided and fall back to libc proper, since
    ;; libraries like libutil.so have been subsumed by libc.so with
    ;; glibc >= 2.34.
    (or (and library
             (false-if-exception (dynamic-link library)))
        (dynamic-link)))

  (define (binding-unavailable . _)
    ;; Stand-in used when the binding could not be created.
    (throw 'system-error name "~A"
           (list (strerror ENOSYS))
           (list ENOSYS)))

  (catch #t
    (lambda ()
      ;; The #:return-errno? facility was introduced in Guile 2.0.12.
      (pointer->procedure return-type
                          (dynamic-func name (symbol-handle))
                          argument-types
                          #:return-errno? #t))
    (lambda _
      binding-unavailable)))
|
|
|
|
(define %prctl
  ;; Binding for prctl(2), declared here as taking one int followed by four
  ;; unsigned longs.  Like every procedure built by 'syscall->procedure', it
  ;; returns two values: the result and errno.
  (syscall->procedure int "prctl"
                      (cons int (make-list 4 unsigned-long))))
|
|
|
|
;; Option numbers passed as the first argument of prctl, from
;; <linux/prctl.h>.
(define PR_SET_NAME 15)             ;used to set the calling thread's name
(define PR_GET_NAME 16)             ;used to read the calling thread's name
(define PR_SET_CHILD_SUBREAPER 36)  ;used by 'set-child-subreaper!'
|
|
|
|
(define (set-child-subreaper!)
  "Call prctl with PR_SET_CHILD_SUBREAPER and the value 1 for the current
process.  Return the two values produced by the prctl binding: its result and
errno."
  (let ((enable 1))
    (%prctl PR_SET_CHILD_SUBREAPER enable 0 0 0)))
|
|
|
|
(define %max-thread-name-length
  ;; Size in bytes of the buffer used to read a thread name back; this is
  ;; the maximum length of the name including the terminating zero.
  16)
|
|
|
|
(define (set-thread-name!/linux name)
  "Give the calling thread the name NAME, a string.  NAME is truncated to 15
bytes.  Throw to 'set-process-name' with the errno value when the underlying
prctl call does not return zero."
  (let ((name-ptr (string->pointer name)))
    (call-with-values
        (lambda ()
          (%prctl PR_SET_NAME
                  (pointer-address name-ptr) 0 0 0))
      (lambda (ret err)
        (unless (zero? ret)
          (throw 'set-process-name "set-process-name"
                 "set-process-name: ~A"
                 (list (strerror err))
                 (list err)))))))
|
|
|
|
(define (bytes->string bytes)
  "Decode BYTES, a list of integers, as a null-terminated string: every byte
before the first zero is converted with 'integer->char'.  Return #f when there
is no byte before the first zero, i.e. when the string would be empty."
  (let ((prefix (take-while (lambda (byte)
                              (not (zero? byte)))
                            bytes)))
    (and (pair? prefix)
         (list->string (map integer->char prefix)))))
|
|
|
|
(define (thread-name/linux)
  "Return the name of the calling thread as a string, or #f when that name is
empty.  Throw to 'process-name' with the errno value when the underlying prctl
call does not return zero."
  (let ((buf (make-bytevector %max-thread-name-length)))
    (call-with-values
        (lambda ()
          ;; prctl writes the name into BUF.
          (%prctl PR_GET_NAME
                  (pointer-address (bytevector->pointer buf))
                  0 0 0))
      (lambda (ret err)
        (unless (zero? ret)
          (throw 'process-name "process-name"
                 "process-name: ~A"
                 (list (strerror err))
                 (list err)))
        (bytes->string (bytevector->u8-list buf))))))
|
|
|
|
(define set-thread-name
  ;; Procedure that names the calling thread.  The prctl-based
  ;; implementation is selected when %host-type mentions "linux"; otherwise
  ;; this is a procedure that ignores its arguments and returns #f.
  (cond
   ((string-contains %host-type "linux")
    set-thread-name!/linux)
   (else
    (lambda _ #f))))
|
|
|
|
(define thread-name
  ;; Procedure returning the name of the calling thread.  The prctl-based
  ;; implementation is selected when %host-type mentions "linux"; otherwise
  ;; this is a procedure that always returns the empty string.
  (cond
   ((string-contains %host-type "linux")
    thread-name/linux)
   (else
    (lambda _ ""))))
|
|
|
|
;; A thread pool as returned by 'make-thread-pool':
;;
;;   channel                   channel on which jobs are sent to the workers
;;   arguments-parameter       parameter holding the thread arguments while
;;                             code runs inside a worker, #f elsewhere
;;   proc-vector               vector with one slot per worker, holding the
;;                             procedure that worker is running, or #f
;;   default-checkout-timeout  default #:timeout used by 'call-with-thread'
(define-record-type <thread-pool>
  (thread-pool channel arguments-parameter proc-vector
               default-checkout-timeout)
  thread-pool?
  (channel                  thread-pool-channel)
  (arguments-parameter      thread-pool-arguments-parameter)
  (proc-vector              thread-pool-proc-vector)
  (default-checkout-timeout thread-pool-default-checkout-timeout))

(define* (make-thread-pool size
                           #:key
                           thread-initializer
                           thread-destructor
                           (delay-logger (lambda _ #f))
                           (duration-logger (const #f))
                           thread-lifetime
                           (log-exception? (const #t))
                           (expire-on-exception? #f)
                           (name "unnamed")
                           (use-default-io-waiters? #t)
                           default-checkout-timeout)
  "Start SIZE worker threads and return a <thread-pool> record through which
procedures can be run on them with 'call-with-thread'.

THREAD-INITIALIZER, when given, is a thunk called in each worker; the list it
returns is the arguments every job procedure is applied to.  It is retried
once a second until it returns a true value.  Without it, jobs are applied to
no arguments.  THREAD-DESTRUCTOR, when given, is applied to those arguments
each time a worker stops processing jobs, before the worker is initialised
again.

DELAY-LOGGER is called with the time in seconds a job waited before a worker
picked it up, and the job procedure.  DURATION-LOGGER is called with the time
in seconds the job took, and the job procedure.  Either may be #f.

When THREAD-LIFETIME is a number, a worker is re-initialised after a bounded
number of jobs.  When EXPIRE-ON-EXCEPTION? is true, a worker is re-initialised
after a job raises an exception.  NAME is used in thread names and log
messages.  When USE-DEFAULT-IO-WAITERS? is true, the workers are started
within 'call-with-default-io-waiters'.  DEFAULT-CHECKOUT-TIMEOUT is stored in
the pool and is the default timeout of 'call-with-thread'.  LOG-EXCEPTION? is
accepted but not used by this procedure."
  (define param
    (make-parameter #f))

  (define thread-proc-vector
    (make-vector size #f))

  (define (initializer/safe)
    ;; Call THREAD-INITIALIZER, logging any exception it raises, and keep
    ;; retrying once a second until it returns a true value.
    (let ((args
           (with-exception-handler
               (lambda _ #f)
             (lambda ()
               (with-exception-handler
                   (lambda (exn)
                     (simple-format
                      (current-error-port)
                      "exception running initializer in thread pool (~A): ~A\n"
                      name
                      thread-initializer)
                     (print-backtrace-and-exception/knots exn)
                     (raise-exception exn))
                 thread-initializer))
             #:unwind? #t)))

      (if args
          args
          ;; never give up, just keep retrying
          (begin
            (sleep 1)
            (initializer/safe)))))

  (define (thread-arguments)
    ;; Arguments for a freshly (re)initialised worker.  Without a
    ;; THREAD-INITIALIZER there is nothing to call, so use no arguments
    ;; rather than retrying a call to #f forever.
    (if thread-initializer
        (initializer/safe)
        '()))

  (define (destructor/safe args)
    ;; Apply THREAD-DESTRUCTOR to ARGS once.  An exception is logged and then
    ;; ignored; the destructor is not retried.  Always return #t.
    (with-exception-handler
        (lambda _ #f)
      (lambda ()
        (with-exception-handler
            (lambda (exn)
              (simple-format
               (current-error-port)
               "exception running destructor in thread pool (~A): ~A\n"
               name
               thread-destructor)
              (print-backtrace-and-exception/knots exn)
              (raise-exception exn))
          (lambda ()
            (apply thread-destructor args)
            #t)))
      #:unwind? #t)
    #t)

  (define (run-job proc args start-time)
    ;; Apply PROC to ARGS and return the message to send back to the caller:
    ;; (DURATION VALUE ...) on success, or
    ;; ('thread-pool-error DURATION EXCEPTION) when PROC raises.
    (define (elapsed)
      (/ (- (get-internal-real-time) start-time)
         internal-time-units-per-second))

    (with-exception-handler
        (lambda (exn)
          (list 'thread-pool-error (elapsed) exn))
      (lambda ()
        (with-exception-handler
            (lambda (exn)
              ;; Capture the stack where the exception was raised and attach
              ;; it to the exception before it unwinds.
              (let ((stack
                     (match (fluid-ref %stacks)
                       ((stack-tag . prompt-tag)
                        (make-stack #t
                                    0 prompt-tag
                                    0 (and prompt-tag 1)))
                       (_
                        (make-stack #t)))))
                (raise-exception
                 (make-exception
                  exn
                  (make-knots-exception stack)))))
          (lambda ()
            (call-with-values
                (lambda ()
                  (start-stack
                   #t
                   (apply proc args)))
              (lambda vals
                (cons (elapsed) vals))))))
      #:unwind? #t))

  (define (process thread-index channel args)
    ;; Serve jobs from CHANNEL until this worker expires.  The remaining
    ;; lifetime is decremented after every job; the loop ends when a job
    ;; finishes while the lifetime is already negative, or when a job raised
    ;; and EXPIRE-ON-EXCEPTION? is true.
    (let loop ((current-lifetime thread-lifetime))
      (let ((exception?
             (match (get-message channel)
               (((? channel? reply) sent-time (? procedure? proc))
                (when delay-logger
                  (delay-logger (/ (- (get-internal-real-time)
                                      sent-time)
                                   internal-time-units-per-second)
                                proc))

                (let ((start-time (get-internal-real-time)))
                  ;; Record what this worker is running while it runs.
                  (vector-set! thread-proc-vector thread-index proc)

                  (let ((response (run-job proc args start-time)))
                    (put-message reply response)

                    (vector-set! thread-proc-vector thread-index #f)

                    (match response
                      (('thread-pool-error duration _)
                       (when duration-logger
                         (duration-logger duration proc))
                       #t)
                      ((duration . _)
                       (when duration-logger
                         (duration-logger duration proc))
                       #f))))))))
        (unless (and expire-on-exception?
                     exception?)
          (if (number? current-lifetime)
              (unless (< current-lifetime 0)
                (loop (- current-lifetime 1)))
              (loop #f))))))

  (define (start-threads channel)
    (for-each
     (lambda (thread-index)
       (call-with-new-thread
        (lambda ()
          ;; Naming the thread is best effort.
          (catch 'system-error
            (lambda ()
              (set-thread-name
               (string-append
                name " w t "
                (number->string thread-index))))
            (const #t))

          ;; Each iteration is one life of the worker: initialise, serve
          ;; jobs until expiry or an internal error, then clean up.
          (let init ((args (thread-arguments)))
            (with-exception-handler
                (lambda (exn)
                  (simple-format
                   (current-error-port)
                   "knots: thread-pool: internal exception: ~A\n" exn))
              (lambda ()
                (parameterize ((param args))
                  (process thread-index channel args)))
              #:unwind? #t)

            (when thread-destructor
              (destructor/safe args))

            (init (thread-arguments))))))
     (iota size)))

  (let ((channel (make-channel)))
    (if use-default-io-waiters?
        (call-with-default-io-waiters
         (lambda ()
           (start-threads channel)))
        (start-threads channel))

    (thread-pool channel
                 param
                 thread-proc-vector
                 default-checkout-timeout)))
|
|
|
|
;; Exception type raised by 'call-with-thread' when a job could not be handed
;; to a worker before the timeout.  It is a subtype of &error with no fields.
(define &thread-pool-timeout
  (make-exception-type '&thread-pool-timeout
                       &error
                       (list)))

;; Constructor taking no arguments, since the type has no fields.
(define make-thread-pool-timeout-error
  (record-constructor &thread-pool-timeout))

;; Predicate recognising instances of &thread-pool-timeout.
(define thread-pool-timeout-error?
  (record-predicate &thread-pool-timeout))
|
|
|
|
(define* (call-with-thread record proc #:key duration-logger
                           (timeout (thread-pool-default-checkout-timeout
                                     record))
                           (channel (thread-pool-channel record)))
  "Run PROC on a worker of the thread pool RECORD by sending it through
CHANNEL, wait for it to finish and return its values.  An exception raised by
PROC is raised again in the caller.  When the caller is already running inside
the pool, apply PROC directly to the thread arguments.

When TIMEOUT is true and no worker accepts PROC within TIMEOUT, raise a
&thread-pool-timeout exception.  DURATION-LOGGER, when given, is called with
the duration reported by the worker."
  (define (report-duration duration)
    (when duration-logger
      (duration-logger duration)))

  (define (submit reply)
    ;; Offer the job on CHANNEL.  Return #t once a worker has taken it, or #f
    ;; if TIMEOUT elapsed first.
    (let ((hand-over
           (wrap-operation
            (put-operation channel
                           (list reply
                                 (get-internal-real-time)
                                 proc))
            (const #t))))
      (perform-operation
       (if timeout
           (choice-operation
            hand-over
            (wrap-operation (sleep-operation timeout)
                            (const #f)))
           hand-over))))

  (let ((pool-args ((thread-pool-arguments-parameter record))))
    (cond
     (pool-args
      ;; The parameter is only set inside a worker thread.
      (apply proc pool-args))
     (else
      (let ((reply (make-channel)))
        (unless (submit reply)
          (raise-exception
           (make-thread-pool-timeout-error)))

        (match (get-message reply)
          (('thread-pool-error duration exn)
           (report-duration duration)
           (raise-exception exn))
          ((duration . result)
           (report-duration duration)
           (apply values result))))))))
|
|
|
|
(define* (create-work-queue thread-count-parameter proc
                            #:key thread-start-delay
                            (thread-stop-delay
                             (make-time time-duration 0 0))
                            (name "unnamed")
                            priority<?)
  "Create a queue of jobs processed by threads that each apply PROC to the
arguments of one job.  Return four values: a procedure to add a job, a
procedure returning the number of queued and running jobs, a procedure
returning the number of threads, and a procedure listing the queued and
running jobs.

THREAD-COUNT-PARAMETER is either a number of threads, #f to aim for one thread
per job, or a procedure returning the desired number of threads.  When
PRIORITY<? is given, the procedure adding a job takes the list of arguments
plus a #:priority keyword and the queue is kept sorted with PRIORITY<?;
otherwise it takes the job arguments directly.  THREAD-START-DELAY, when not
#f, is the minimum time between two thread starts, and THREAD-STOP-DELAY is
how long a thread may have been without a job before it stops.  NAME is used
in thread names and log messages."
  (define queue (make-q))
  (define queue-mutex (make-mutex))
  (define job-available (make-condition-variable))

  ;; Maps a thread index to the arguments of the job that thread is running,
  ;; or to #f while the thread has no job.
  (define running-job-args (make-hash-table))

  ;; Time at which the most recent thread was started.
  (define previous-thread-started-at
    (make-time time-monotonic 0 0))

  (define (running-job-count)
    (hash-count (lambda (index val)
                  (list? val))
                running-job-args))

  (define (dequeue-job!)
    ;; With priorities, queue entries are (PRIORITY . ARGS) pairs.
    (if priority<?
        (cdr (deq! queue))
        (deq! queue)))

  (define get-thread-count
    (cond
     ((number? thread-count-parameter)
      (const thread-count-parameter))
     ((not thread-count-parameter)
      ;; Run one thread per job
      (lambda ()
        (+ (q-length queue)
           (running-job-count))))
     (else
      thread-count-parameter)))

  (define (wake-workers)
    (start-new-threads-if-necessary (get-thread-count))
    (signal-condition-variable job-available))

  (define process-job
    (if priority<?
        (lambda* (args #:key priority)
          (with-mutex queue-mutex
            (enq! queue (cons priority args))
            ;; Re-sort the underlying list, then repair the queue's pointer
            ;; to its last pair.
            (set-car! queue
                      (stable-sort! (car queue)
                                    (lambda (a b)
                                      (priority<? (car a) (car b)))))
            (sync-q! queue)
            (wake-workers)))
        (lambda args
          (with-mutex queue-mutex
            (enq! queue args)
            (wake-workers)))))

  (define (count-threads)
    (with-mutex queue-mutex
      (hash-count (lambda _ #t) running-job-args)))

  (define (count-jobs)
    (with-mutex queue-mutex
      (+ (q-length queue)
         (running-job-count))))

  (define (list-jobs)
    (with-mutex queue-mutex
      (let* ((queued (if priority<?
                         (map cdr (car queue))
                         (list-copy (car queue))))
             (running (hash-fold (lambda (key val result)
                                   (if val
                                       (cons val result)
                                       result))
                                 '()
                                 running-job-args)))
        (append queued running))))

  (define (thread-process-job job-args)
    ;; Apply PROC to JOB-ARGS.  An exception is logged and then swallowed so
    ;; that the thread keeps running.
    (with-exception-handler
        (lambda _ #f)
      (lambda ()
        (with-exception-handler
            (lambda (exn)
              (simple-format (current-error-port)
                             "~A work queue, job raised exception ~A\n"
                             name job-args)
              (print-backtrace-and-exception/knots exn)
              (raise-exception exn))
          (lambda ()
            (apply proc job-args))))
      #:unwind? #t))

  (define (start-thread thread-index)
    (define (too-many-threads?)
      (>= (running-job-count)
          (get-thread-count)))

    (define (thread-idle-for-too-long? last-job-finished-at)
      (time>=?
       (time-difference (current-time time-monotonic)
                        last-job-finished-at)
       thread-stop-delay))

    (define (stop-thread)
      ;; Called with QUEUE-MUTEX locked; frees the slot and releases it.
      (hash-remove! running-job-args thread-index)
      (unlock-mutex queue-mutex))

    (define (next-job)
      ;; Called with QUEUE-MUTEX locked.  Return the arguments of the next
      ;; job, or #f when there is none.
      (cond
       ((not (q-empty? queue))
        (dequeue-job!))
       ;; #f from wait-condition-variable indicates a timeout
       ((wait-condition-variable
         job-available
         queue-mutex
         (+ 9 (time-second (current-time))))
        ;; Another thread could have taken the job in the mean time
        (and (not (q-empty? queue))
             (dequeue-job!)))
       (else #f)))

    (call-with-new-thread
     (lambda ()
       ;; Naming the thread is best effort.
       (catch 'system-error
         (lambda ()
           (set-thread-name
            (string-append name " q t "
                           (number->string thread-index))))
         (const #t))

       (let loop ((last-job-finished-at (current-time time-monotonic)))
         (lock-mutex queue-mutex)

         (if (too-many-threads?)
             (stop-thread)
             (let ((job-args (next-job)))
               (cond
                (job-args
                 (hash-set! running-job-args thread-index job-args)

                 ;; Run the job without holding the mutex.
                 (unlock-mutex queue-mutex)
                 (thread-process-job job-args)

                 (with-mutex queue-mutex
                   (hash-set! running-job-args thread-index #f))

                 (loop (current-time time-monotonic)))
                ((thread-idle-for-too-long? last-job-finished-at)
                 (stop-thread))
                (else
                 (unlock-mutex queue-mutex)

                 (loop last-job-finished-at)))))))))

  (define (slot-free? thread-index)
    (eq? 'slot-free
         (hash-ref running-job-args thread-index 'slot-free)))

  (define (start-delay-elapsed? now)
    (or (not thread-start-delay)
        (time>=? (time-difference now previous-thread-started-at)
                 thread-start-delay)))

  (define (start-new-threads-if-necessary desired-count)
    ;; When fewer than DESIRED-COUNT threads exist, start a thread in each
    ;; free slot below DESIRED-COUNT, subject to THREAD-START-DELAY.
    (let ((threads-to-start
           (- desired-count
              (hash-count (lambda _ #t) running-job-args))))
      (when (> threads-to-start 0)
        (for-each
         (lambda (thread-index)
           (when (slot-free? thread-index)
             (let ((now (current-time time-monotonic)))
               (when (start-delay-elapsed? now)
                 (set! previous-thread-started-at now)
                 ;; Reserve the slot before the thread starts running.
                 (hash-set! running-job-args thread-index #f)
                 (start-thread thread-index)))))
         (iota desired-count)))))

  (if (procedure? thread-count-parameter)
      ;; The desired count can change over time, so check it periodically
      ;; from a dedicated thread.
      (call-with-new-thread
       (lambda ()
         (catch 'system-error
           (lambda ()
             (set-thread-name
              (string-append name " q t")))
           (const #t))

         (while #t
           (sleep 15)
           (with-mutex queue-mutex
             (let ((idle-threads (hash-count (lambda (index val)
                                               (not val))
                                             running-job-args)))
               (when (zero? idle-threads)
                 (start-new-threads-if-necessary (get-thread-count))))))))
      (start-new-threads-if-necessary (get-thread-count)))

  (values process-job count-jobs count-threads list-jobs))
|