Rename worker threads to thread pool
I think this needs more work, maybe the thread pool should be more similar to the resource pool, but I think the name change is still helpful. Maybe there's a need for a variable size thread pool and that can better integrate with the work queue.
This commit is contained in:
parent
dcb56ee2c5
commit
d572f591a3
4 changed files with 111 additions and 97 deletions
|
@ -1,594 +0,0 @@
|
|||
;;; Guile Knots
|
||||
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
|
||||
;;;
|
||||
;;; This file is part of Guile Knots.
|
||||
;;;
|
||||
;;; The Guile Knots is free software; you can redistribute it and/or
|
||||
;;; modify it under the terms of the GNU General Public License as
|
||||
;;; published by the Free Software Foundation; either version 3 of the
|
||||
;;; License, or (at your option) any later version.
|
||||
;;;
|
||||
;;; The Guile Knots is distributed in the hope that it will be useful,
|
||||
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
;;; General Public License for more details.
|
||||
;;;
|
||||
;;; You should have received a copy of the GNU General Public License
|
||||
;;; along with the guix-data-service. If not, see
|
||||
;;; <http://www.gnu.org/licenses/>.
|
||||
|
||||
(define-module (knots worker-threads)
|
||||
#:use-module (srfi srfi-1)
|
||||
#:use-module (srfi srfi-9)
|
||||
#:use-module (srfi srfi-19)
|
||||
#:use-module (srfi srfi-71)
|
||||
#:use-module (system foreign)
|
||||
#:use-module (system base target)
|
||||
#:use-module (rnrs bytevectors)
|
||||
#:use-module (ice-9 q)
|
||||
#:use-module (ice-9 match)
|
||||
#:use-module (ice-9 threads)
|
||||
#:use-module (fibers)
|
||||
#:use-module (fibers timers)
|
||||
#:use-module (fibers channels)
|
||||
#:use-module (fibers operations)
|
||||
#:export (set-thread-name
|
||||
thread-name
|
||||
|
||||
worker-thread-set?
|
||||
worker-thread-set-channel
|
||||
worker-thread-set-arguments-parameter
|
||||
worker-thread-set-thread-proc-vector
|
||||
|
||||
make-worker-thread-set
|
||||
call-with-worker-thread
|
||||
|
||||
&worker-thread-timeout
|
||||
worker-thread-timeout-error?
|
||||
|
||||
%worker-thread-default-timeout
|
||||
|
||||
create-work-queue))
|
||||
|
||||
(define* (syscall->procedure return-type name argument-types
|
||||
#:key library)
|
||||
"Return a procedure that wraps the C function NAME using the dynamic FFI,
|
||||
and that returns two values: NAME's return value, and errno. When LIBRARY is
|
||||
specified, look up NAME in that library rather than in the global symbol name
|
||||
space.
|
||||
|
||||
If an error occurs while creating the binding, defer the error report until
|
||||
the returned procedure is called."
|
||||
(catch #t
|
||||
(lambda ()
|
||||
;; Note: When #:library is set, try it first and fall back to libc
|
||||
;; proper. This is because libraries like libutil.so have been subsumed
|
||||
;; by libc.so with glibc >= 2.34.
|
||||
(let ((ptr (dynamic-func name
|
||||
(if library
|
||||
(or (false-if-exception
|
||||
(dynamic-link library))
|
||||
(dynamic-link))
|
||||
(dynamic-link)))))
|
||||
;; The #:return-errno? facility was introduced in Guile 2.0.12.
|
||||
(pointer->procedure return-type ptr argument-types
|
||||
#:return-errno? #t)))
|
||||
(lambda args
|
||||
(lambda _
|
||||
(throw 'system-error name "~A" (list (strerror ENOSYS))
|
||||
(list ENOSYS))))))
|
||||
|
||||
(define %prctl
|
||||
;; Should it win the API contest against 'ioctl'? You tell us!
|
||||
(syscall->procedure int "prctl"
|
||||
(list int unsigned-long unsigned-long
|
||||
unsigned-long unsigned-long)))
|
||||
|
||||
(define PR_SET_NAME 15) ;<linux/prctl.h>
|
||||
(define PR_GET_NAME 16)
|
||||
(define PR_SET_CHILD_SUBREAPER 36)
|
||||
|
||||
(define (set-child-subreaper!)
|
||||
"Set the CHILD_SUBREAPER capability for the current process."
|
||||
(%prctl PR_SET_CHILD_SUBREAPER 1 0 0 0))
|
||||
|
||||
(define %max-thread-name-length
|
||||
;; Maximum length in bytes of the process name, including the terminating
|
||||
;; zero.
|
||||
16)
|
||||
|
||||
(define (set-thread-name!/linux name)
|
||||
"Set the name of the calling thread to NAME. NAME is truncated to 15
|
||||
bytes."
|
||||
(let ((ptr (string->pointer name)))
|
||||
(let ((ret
|
||||
err
|
||||
(%prctl PR_SET_NAME
|
||||
(pointer-address ptr) 0 0 0)))
|
||||
(unless (zero? ret)
|
||||
(throw 'set-process-name "set-process-name"
|
||||
"set-process-name: ~A"
|
||||
(list (strerror err))
|
||||
(list err))))))
|
||||
|
||||
(define (bytes->string bytes)
|
||||
"Read BYTES, a list of bytes, and return the null-terminated string decoded
|
||||
from there, or #f if that would be an empty string."
|
||||
(match (take-while (negate zero?) bytes)
|
||||
(()
|
||||
#f)
|
||||
(non-zero
|
||||
(list->string (map integer->char non-zero)))))
|
||||
|
||||
(define (thread-name/linux)
|
||||
"Return the name of the calling thread as a string."
|
||||
(let ((buf (make-bytevector %max-thread-name-length)))
|
||||
(let ((ret
|
||||
err
|
||||
(%prctl PR_GET_NAME
|
||||
(pointer-address (bytevector->pointer buf))
|
||||
0 0 0)))
|
||||
(if (zero? ret)
|
||||
(bytes->string (bytevector->u8-list buf))
|
||||
(throw 'process-name "process-name"
|
||||
"process-name: ~A"
|
||||
(list (strerror err))
|
||||
(list err))))))
|
||||
|
||||
(define set-thread-name
|
||||
(if (string-contains %host-type "linux")
|
||||
set-thread-name!/linux
|
||||
(const #f)))
|
||||
|
||||
(define thread-name
|
||||
(if (string-contains %host-type "linux")
|
||||
thread-name/linux
|
||||
(const "")))
|
||||
|
||||
(define-record-type <worker-thread-set>
|
||||
(worker-thread-set channel
|
||||
arguments-parameter
|
||||
thread-proc-vector)
|
||||
worker-thread-set?
|
||||
(channel worker-thread-set-channel)
|
||||
(arguments-parameter worker-thread-set-arguments-parameter)
|
||||
(thread-proc-vector worker-thread-set-thread-proc-vector))
|
||||
|
||||
(define* (make-worker-thread-set initializer
|
||||
#:key (parallelism 1)
|
||||
(delay-logger (lambda _ #f))
|
||||
(duration-logger (const #f))
|
||||
destructor
|
||||
lifetime
|
||||
(log-exception? (const #t))
|
||||
(expire-on-exception? #f)
|
||||
(name "unnamed"))
|
||||
"Return a channel used to offload work to a dedicated thread. ARGS are the
|
||||
arguments of the worker thread procedure."
|
||||
(define param
|
||||
(make-parameter #f))
|
||||
|
||||
(define thread-proc-vector
|
||||
(make-vector parallelism #f))
|
||||
|
||||
(define (initializer/safe)
|
||||
(let ((args
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(simple-format
|
||||
(current-error-port)
|
||||
"exception running initializer in worker thread (~A): ~A:\n ~A\n"
|
||||
name
|
||||
initializer
|
||||
exn)
|
||||
#f)
|
||||
(lambda ()
|
||||
(with-throw-handler #t
|
||||
initializer
|
||||
(lambda args
|
||||
(backtrace))))
|
||||
#:unwind? #t)))
|
||||
|
||||
(if args
|
||||
args
|
||||
;; never give up, just keep retrying
|
||||
(begin
|
||||
(sleep 1)
|
||||
(initializer/safe)))))
|
||||
|
||||
(define (destructor/safe args)
|
||||
(let ((success?
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(simple-format
|
||||
(current-error-port)
|
||||
"exception running destructor in worker thread (~A): ~A:\n ~A\n"
|
||||
name
|
||||
destructor
|
||||
exn)
|
||||
#f)
|
||||
(lambda ()
|
||||
(with-throw-handler #t
|
||||
(lambda ()
|
||||
(apply destructor args)
|
||||
#t)
|
||||
(lambda _
|
||||
(backtrace))))
|
||||
#:unwind? #t)))
|
||||
|
||||
(or success?
|
||||
#t
|
||||
(begin
|
||||
(sleep 1)
|
||||
(destructor/safe args)))))
|
||||
|
||||
(define (process thread-index channel args)
|
||||
(let loop ((current-lifetime lifetime))
|
||||
(let ((exception?
|
||||
(match (get-message channel)
|
||||
(((? channel? reply) sent-time (? procedure? proc))
|
||||
(let ((time-delay
|
||||
(- (get-internal-real-time)
|
||||
sent-time)))
|
||||
(delay-logger (/ time-delay
|
||||
internal-time-units-per-second))
|
||||
|
||||
(let* ((start-time (get-internal-real-time))
|
||||
(response
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(list 'worker-thread-error
|
||||
(/ (- (get-internal-real-time)
|
||||
start-time)
|
||||
internal-time-units-per-second)
|
||||
exn))
|
||||
(lambda ()
|
||||
(vector-set! thread-proc-vector
|
||||
thread-index
|
||||
proc)
|
||||
(with-throw-handler #t
|
||||
(lambda ()
|
||||
(call-with-values
|
||||
(lambda ()
|
||||
(start-stack
|
||||
'worker-thread
|
||||
(apply proc args)))
|
||||
(lambda vals
|
||||
(cons (/ (- (get-internal-real-time)
|
||||
start-time)
|
||||
internal-time-units-per-second)
|
||||
vals))))
|
||||
(lambda args
|
||||
(when (match args
|
||||
(('%exception exn)
|
||||
(log-exception? exn))
|
||||
(_ #t))
|
||||
(simple-format
|
||||
(current-error-port)
|
||||
"worker-thread: exception: ~A\n" args)
|
||||
(backtrace)))))
|
||||
#:unwind? #t)))
|
||||
(put-message reply
|
||||
response)
|
||||
|
||||
(vector-set! thread-proc-vector
|
||||
thread-index
|
||||
#f)
|
||||
|
||||
(match response
|
||||
(('worker-thread-error duration _)
|
||||
(when duration-logger
|
||||
(duration-logger duration proc))
|
||||
#t)
|
||||
((duration . _)
|
||||
(when duration-logger
|
||||
(duration-logger duration proc))
|
||||
#f))))))))
|
||||
(unless (and expire-on-exception?
|
||||
exception?)
|
||||
(if (number? current-lifetime)
|
||||
(unless (< current-lifetime 0)
|
||||
(loop (if current-lifetime
|
||||
(- current-lifetime 1)
|
||||
#f)))
|
||||
(loop #f))))))
|
||||
|
||||
(let ((channel (make-channel)))
|
||||
(for-each
|
||||
(lambda (thread-index)
|
||||
(call-with-new-thread
|
||||
(lambda ()
|
||||
(catch 'system-error
|
||||
(lambda ()
|
||||
(set-thread-name
|
||||
(string-append
|
||||
name " w t "
|
||||
(number->string thread-index))))
|
||||
(const #t))
|
||||
|
||||
(let init ((args (initializer/safe)))
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(simple-format
|
||||
(current-error-port)
|
||||
"worker-thread-channel: exception: ~A\n" exn))
|
||||
(lambda ()
|
||||
(parameterize ((param args))
|
||||
(process thread-index channel args)))
|
||||
#:unwind? #t)
|
||||
|
||||
(when destructor
|
||||
(destructor/safe args))
|
||||
|
||||
(init (initializer/safe))))))
|
||||
(iota parallelism))
|
||||
|
||||
(worker-thread-set channel
|
||||
param
|
||||
thread-proc-vector)))
|
||||
|
||||
(define &worker-thread-timeout
|
||||
(make-exception-type '&worker-thread-timeout
|
||||
&error
|
||||
'()))
|
||||
|
||||
(define make-worker-thread-timeout-error
|
||||
(record-constructor &worker-thread-timeout))
|
||||
|
||||
(define worker-thread-timeout-error?
|
||||
(record-predicate &worker-thread-timeout))
|
||||
|
||||
(define %worker-thread-default-timeout
|
||||
(make-parameter 30))
|
||||
|
||||
(define* (call-with-worker-thread record proc #:key duration-logger
|
||||
(timeout (%worker-thread-default-timeout))
|
||||
(channel (worker-thread-set-channel record)))
|
||||
"Send PROC to the worker thread through CHANNEL. Return the result of PROC.
|
||||
If already in the worker thread, call PROC immediately."
|
||||
(let ((args ((worker-thread-set-arguments-parameter record))))
|
||||
(if args
|
||||
(apply proc args)
|
||||
(let* ((reply (make-channel))
|
||||
(operation-success?
|
||||
(perform-operation
|
||||
(let ((put
|
||||
(wrap-operation
|
||||
(put-operation channel
|
||||
(list reply
|
||||
(get-internal-real-time)
|
||||
proc))
|
||||
(const #t))))
|
||||
|
||||
(if timeout
|
||||
(choice-operation
|
||||
put
|
||||
(wrap-operation (sleep-operation timeout)
|
||||
(const #f)))
|
||||
put)))))
|
||||
|
||||
(unless operation-success?
|
||||
(raise-exception
|
||||
(make-worker-thread-timeout-error)))
|
||||
|
||||
(match (get-message reply)
|
||||
(('worker-thread-error duration exn)
|
||||
(when duration-logger
|
||||
(duration-logger duration))
|
||||
(raise-exception exn))
|
||||
((duration . result)
|
||||
(when duration-logger
|
||||
(duration-logger duration))
|
||||
(apply values result)))))))
|
||||
|
||||
(define* (create-work-queue thread-count-parameter proc
|
||||
#:key thread-start-delay
|
||||
(thread-stop-delay
|
||||
(make-time time-duration 0 0))
|
||||
(name "unnamed")
|
||||
priority<?)
|
||||
(let ((queue (make-q))
|
||||
(queue-mutex (make-mutex))
|
||||
(job-available (make-condition-variable))
|
||||
(running-job-args (make-hash-table)))
|
||||
|
||||
(define get-thread-count
|
||||
(cond
|
||||
((number? thread-count-parameter)
|
||||
(const thread-count-parameter))
|
||||
((eq? thread-count-parameter #f)
|
||||
;; Run one thread per job
|
||||
(lambda ()
|
||||
(+ (q-length queue)
|
||||
(hash-count (lambda (index val)
|
||||
(list? val))
|
||||
running-job-args))))
|
||||
(else
|
||||
thread-count-parameter)))
|
||||
|
||||
(define process-job
|
||||
(if priority<?
|
||||
(lambda* (args #:key priority)
|
||||
(with-mutex queue-mutex
|
||||
(enq! queue (cons priority args))
|
||||
(set-car!
|
||||
queue
|
||||
(stable-sort! (car queue)
|
||||
(lambda (a b)
|
||||
(priority<?
|
||||
(car a)
|
||||
(car b)))))
|
||||
(sync-q! queue)
|
||||
(start-new-threads-if-necessary (get-thread-count))
|
||||
(signal-condition-variable job-available)))
|
||||
(lambda args
|
||||
(with-mutex queue-mutex
|
||||
(enq! queue args)
|
||||
(start-new-threads-if-necessary (get-thread-count))
|
||||
(signal-condition-variable job-available)))))
|
||||
|
||||
(define (count-threads)
|
||||
(with-mutex queue-mutex
|
||||
(hash-count (const #t) running-job-args)))
|
||||
|
||||
(define (count-jobs)
|
||||
(with-mutex queue-mutex
|
||||
(+ (q-length queue)
|
||||
(hash-count (lambda (index val)
|
||||
(list? val))
|
||||
running-job-args))))
|
||||
|
||||
(define (list-jobs)
|
||||
(with-mutex queue-mutex
|
||||
(append (if priority<?
|
||||
(map cdr (car queue))
|
||||
(list-copy (car queue)))
|
||||
(hash-fold (lambda (key val result)
|
||||
(if val
|
||||
(cons val result)
|
||||
result))
|
||||
'()
|
||||
running-job-args))))
|
||||
|
||||
(define (thread-process-job job-args)
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(simple-format (current-error-port)
|
||||
"~A work queue, job raised exception ~A: ~A\n"
|
||||
name job-args exn))
|
||||
(lambda ()
|
||||
(with-throw-handler #t
|
||||
(lambda ()
|
||||
(apply proc job-args))
|
||||
(lambda (key . args)
|
||||
(simple-format
|
||||
(current-error-port)
|
||||
"~A work queue, exception when handling job: ~A ~A\n"
|
||||
name key args)
|
||||
(backtrace))))
|
||||
#:unwind? #t))
|
||||
|
||||
(define (start-thread thread-index)
|
||||
(define (too-many-threads?)
|
||||
(let ((running-jobs-count
|
||||
(hash-count (lambda (index val)
|
||||
(list? val))
|
||||
running-job-args))
|
||||
(desired-thread-count (get-thread-count)))
|
||||
|
||||
(>= running-jobs-count
|
||||
desired-thread-count)))
|
||||
|
||||
(define (thread-idle-for-too-long? last-job-finished-at)
|
||||
(time>=?
|
||||
(time-difference (current-time time-monotonic)
|
||||
last-job-finished-at)
|
||||
thread-stop-delay))
|
||||
|
||||
(define (stop-thread)
|
||||
(hash-remove! running-job-args
|
||||
thread-index)
|
||||
(unlock-mutex queue-mutex))
|
||||
|
||||
(call-with-new-thread
|
||||
(lambda ()
|
||||
(catch 'system-error
|
||||
(lambda ()
|
||||
(set-thread-name
|
||||
(string-append name " q t "
|
||||
(number->string thread-index))))
|
||||
(const #t))
|
||||
|
||||
(let loop ((last-job-finished-at (current-time time-monotonic)))
|
||||
(lock-mutex queue-mutex)
|
||||
|
||||
(if (too-many-threads?)
|
||||
(stop-thread)
|
||||
(let ((job-args
|
||||
(if (q-empty? queue)
|
||||
;; #f from wait-condition-variable indicates a timeout
|
||||
(if (wait-condition-variable
|
||||
job-available
|
||||
queue-mutex
|
||||
(+ 9 (time-second (current-time))))
|
||||
;; Another thread could have taken
|
||||
;; the job in the mean time
|
||||
(if (q-empty? queue)
|
||||
#f
|
||||
(if priority<?
|
||||
(cdr (deq! queue))
|
||||
(deq! queue)))
|
||||
#f)
|
||||
(if priority<?
|
||||
(cdr (deq! queue))
|
||||
(deq! queue)))))
|
||||
|
||||
(if job-args
|
||||
(begin
|
||||
(hash-set! running-job-args
|
||||
thread-index
|
||||
job-args)
|
||||
|
||||
(unlock-mutex queue-mutex)
|
||||
(thread-process-job job-args)
|
||||
|
||||
(with-mutex queue-mutex
|
||||
(hash-set! running-job-args
|
||||
thread-index
|
||||
#f))
|
||||
|
||||
(loop (current-time time-monotonic)))
|
||||
(if (thread-idle-for-too-long? last-job-finished-at)
|
||||
(stop-thread)
|
||||
(begin
|
||||
(unlock-mutex queue-mutex)
|
||||
|
||||
(loop last-job-finished-at))))))))))
|
||||
|
||||
|
||||
(define start-new-threads-if-necessary
|
||||
(let ((previous-thread-started-at (make-time time-monotonic 0 0)))
|
||||
(lambda (desired-count)
|
||||
(let* ((thread-count
|
||||
(hash-count (const #t) running-job-args))
|
||||
(threads-to-start
|
||||
(- desired-count thread-count)))
|
||||
(when (> threads-to-start 0)
|
||||
(for-each
|
||||
(lambda (thread-index)
|
||||
(when (eq? (hash-ref running-job-args
|
||||
thread-index
|
||||
'slot-free)
|
||||
'slot-free)
|
||||
(let* ((now (current-time time-monotonic))
|
||||
(elapsed (time-difference now
|
||||
previous-thread-started-at)))
|
||||
(when (or (eq? #f thread-start-delay)
|
||||
(time>=? elapsed thread-start-delay))
|
||||
(set! previous-thread-started-at now)
|
||||
(hash-set! running-job-args
|
||||
thread-index
|
||||
#f)
|
||||
(start-thread thread-index)))))
|
||||
(iota desired-count)))))))
|
||||
|
||||
(if (procedure? thread-count-parameter)
|
||||
(call-with-new-thread
|
||||
(lambda ()
|
||||
(catch 'system-error
|
||||
(lambda ()
|
||||
(set-thread-name
|
||||
(string-append name " q t")))
|
||||
(const #t))
|
||||
|
||||
(while #t
|
||||
(sleep 15)
|
||||
(with-mutex queue-mutex
|
||||
(let ((idle-threads (hash-count (lambda (index val)
|
||||
(eq? #f val))
|
||||
running-job-args)))
|
||||
(when (= 0 idle-threads)
|
||||
(start-new-threads-if-necessary (get-thread-count))))))))
|
||||
(start-new-threads-if-necessary (get-thread-count)))
|
||||
|
||||
(values process-job count-jobs count-threads list-jobs)))
|
Loading…
Add table
Add a link
Reference in a new issue