;;; Guile Knots
;;; Copyright © 2020 Christopher Baines
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (knots worker-threads)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-19)
  #:use-module (srfi srfi-71)
  #:use-module (system foreign)
  #:use-module (system base target)
  #:use-module (rnrs bytevectors)
  #:use-module (ice-9 q)
  #:use-module (ice-9 match)
  #:use-module (ice-9 threads)
  #:use-module (fibers)
  #:use-module (fibers timers)
  #:use-module (fibers channels)
  #:use-module (fibers operations)
  #:export (set-thread-name
            thread-name

            make-worker-thread-channel
            call-with-worker-thread
            &worker-thread-timeout
            worker-thread-timeout-error?
            %worker-thread-default-timeout

            create-work-queue))

(define* (syscall->procedure return-type name argument-types
                             #:key library)
  "Return a procedure that wraps the C function NAME using the dynamic FFI,
and that returns two values: NAME's return value, and errno.  When LIBRARY is
specified, look up NAME in that library rather than in the global symbol name
space.

If an error occurs while creating the binding, defer the error report until
the returned procedure is called."
  (catch #t
    (lambda ()
      ;; Note: When #:library is set, try it first and fall back to libc
      ;; proper.  This is because libraries like libutil.so have been subsumed
      ;; by libc.so with glibc >= 2.34.
      (let ((ptr (dynamic-func name
                               (if library
                                   (or (false-if-exception
                                        (dynamic-link library))
                                       (dynamic-link))
                                   (dynamic-link)))))
        ;; The #:return-errno? facility was introduced in Guile 2.0.12.
        (pointer->procedure return-type ptr argument-types
                            #:return-errno? #t)))
    (lambda args
      ;; Binding failed: return a stub that reports ENOSYS when called.
      (lambda _
        (throw 'system-error name "~A" (list (strerror ENOSYS))
               (list ENOSYS))))))

(define %prctl
  ;; Should it win the API contest against 'ioctl'? You tell us!
  (syscall->procedure int "prctl"
                      (list int unsigned-long unsigned-long
                            unsigned-long unsigned-long)))

;; prctl option numbers; see <linux/prctl.h>.
(define PR_SET_NAME 15)
(define PR_GET_NAME 16)
(define PR_SET_CHILD_SUBREAPER 36)

(define (set-child-subreaper!)
  "Set the CHILD_SUBREAPER capability for the current process."
  ;; Not exported, and not called anywhere else in this module.
  (%prctl PR_SET_CHILD_SUBREAPER 1 0 0 0))

(define %max-thread-name-length
  ;; Maximum length in bytes of the process name, including the terminating
  ;; zero.
  16)

(define (set-thread-name!/linux name)
  "Set the name of the calling thread to NAME.  NAME is truncated to 15
bytes.  Throw 'set-process-name if prctl reports an error."
  (let ((ptr (string->pointer name)))
    (let ((ret err (%prctl PR_SET_NAME
                           (pointer-address ptr) 0 0 0)))
      (unless (zero? ret)
        (throw 'set-process-name "set-process-name"
               "set-process-name: ~A"
               (list (strerror err))
               (list err))))))

(define (bytes->string bytes)
  "Read BYTES, a list of bytes, and return the null-terminated string decoded
from there, or #f if that would be an empty string."
  ;; Each byte becomes the character with that code point, so names holding
  ;; multi-byte (e.g. UTF-8) sequences are not decoded back faithfully.
  (match (take-while (negate zero?) bytes)
    (()
     #f)
    (non-zero
     (list->string (map integer->char non-zero)))))

(define (thread-name/linux)
  "Return the name of the calling thread as a string, or #f if it is empty.
Throw 'process-name if prctl reports an error."
  (let ((buf (make-bytevector %max-thread-name-length)))
    (let ((ret err (%prctl PR_GET_NAME
                           (pointer-address (bytevector->pointer buf))
                           0 0 0)))
      (if (zero? ret)
          (bytes->string (bytevector->u8-list buf))
          (throw 'process-name "process-name"
                 "process-name: ~A"
                 (list (strerror err))
                 (list err))))))

;; On non-Linux hosts, setting a thread name is a no-op returning #f and the
;; thread name is always "".
(define set-thread-name
  (if (string-contains %host-type "linux")
      set-thread-name!/linux
      (const #f)))

(define thread-name
  (if (string-contains %host-type "linux")
      thread-name/linux
      (const "")))

;; Within a thread created by make-worker-thread-channel, the argument list
;; returned by that channel's initializer; #f in any other thread.
(define %worker-thread-args
  (make-parameter #f))

(define* (make-worker-thread-channel initializer
                                     #:key (parallelism 1)
                                     (delay-logger (lambda _ #f))
                                     (duration-logger (const #f))
                                     destructor
                                     lifetime
                                     (log-exception? (const #t))
                                     (expire-on-exception? #f)
                                     (name "unnamed"))
  "Return two values: a channel used to offload work to PARALLELISM
dedicated threads, and a vector with one slot per thread holding the
procedure that thread is currently running (#f when idle).

INITIALIZER is a thunk run in each worker thread; it returns the list of
arguments that every procedure received on the channel is applied to.  If it
raises an exception or returns #f, it is retried after one second.

DELAY-LOGGER is called with the seconds a request waited before being picked
up; DURATION-LOGGER with the seconds a procedure ran and that procedure.
LOG-EXCEPTION? decides, for an exception object raised by a procedure,
whether to log it with a backtrace (other throws are always logged).

When a thread stops serving requests (see LIFETIME and EXPIRE-ON-EXCEPTION?,
or an unexpected error), DESTRUCTOR, if given, is applied to the initializer
arguments, and the thread re-runs INITIALIZER and continues.  NAME is used in
log messages and thread names."
  (define thread-proc-vector
    (make-vector parallelism #f))

  (define (initializer/safe)
    ;; Run INITIALIZER, logging any exception; never give up, retrying every
    ;; second until it returns a true value.
    (let ((args
           (with-exception-handler
               (lambda (exn)
                 (simple-format
                  (current-error-port)
                  "exception running initializer in worker thread (~A): ~A:\n ~A\n"
                  name
                  initializer
                  exn)
                 #f)
             (lambda ()
               (with-throw-handler #t
                 initializer
                 (lambda args
                   (backtrace))))
             #:unwind? #t)))

      (if args
          args
          ;; never give up, just keep retrying
          (begin
            (sleep 1)
            (initializer/safe)))))

  (define (destructor/safe args)
    ;; Apply DESTRUCTOR to ARGS once, logging (not propagating) any
    ;; exception.  A failing destructor is not retried; the caller goes on to
    ;; re-initialize regardless.  Always returns #t.
    (with-exception-handler
        (lambda (exn)
          (simple-format
           (current-error-port)
           "exception running destructor in worker thread (~A): ~A:\n ~A\n"
           name
           destructor
           exn)
          #f)
      (lambda ()
        (with-throw-handler #t
          (lambda ()
            (apply destructor args)
            #t)
          (lambda _
            (backtrace))))
      #:unwind? #t)
    #t)

  (define (process thread-index channel args)
    ;; Serve requests of the form (REPLY-CHANNEL SENT-TIME PROC) from
    ;; CHANNEL, replying with (DURATION . VALUES) on success or
    ;; (worker-thread-error DURATION EXN) on failure.  Returns when the
    ;; lifetime is used up, or after a failed request when
    ;; EXPIRE-ON-EXCEPTION? is set.
    (let loop ((current-lifetime lifetime))
      (let ((exception?
             (match (get-message channel)
               (((? channel? reply) sent-time (? procedure? proc))
                (let ((time-delay
                       (- (get-internal-real-time)
                          sent-time)))
                  (delay-logger (/ time-delay
                                   internal-time-units-per-second))

                  (let* ((start-time (get-internal-real-time))
                         (response
                          (with-exception-handler
                              (lambda (exn)
                                (list 'worker-thread-error
                                      (/ (- (get-internal-real-time)
                                            start-time)
                                         internal-time-units-per-second)
                                      exn))
                            (lambda ()
                              (vector-set! thread-proc-vector
                                           thread-index
                                           proc)
                              (with-throw-handler #t
                                (lambda ()
                                  (call-with-values
                                      (lambda ()
                                        (start-stack
                                         'worker-thread
                                         (apply proc args)))
                                    (lambda vals
                                      (cons (/ (- (get-internal-real-time)
                                                  start-time)
                                               internal-time-units-per-second)
                                            vals))))
                                ;; Log (and print a backtrace) while still in
                                ;; the dynamic context of the throw.
                                (lambda throw-args
                                  (when (match throw-args
                                          (('%exception exn)
                                           (log-exception? exn))
                                          (_ #t))
                                    (simple-format
                                     (current-error-port)
                                     "worker-thread: exception: ~A\n"
                                     throw-args)
                                    (backtrace)))))
                            #:unwind? #t)))

                    (put-message reply
                                 response)

                    (vector-set! thread-proc-vector
                                 thread-index
                                 #f)

                    ;; The match value is true when the request failed.
                    (match response
                      (('worker-thread-error duration _)
                       (when duration-logger
                         (duration-logger duration proc))
                       #t)
                      ((duration . _)
                       (when duration-logger
                         (duration-logger duration proc))
                       #f))))))))

        (unless (and expire-on-exception?
                     exception?)
          ;; NOTE(review): with a numeric LIFETIME N >= 0 the counter runs
          ;; down to -1, so N + 2 requests are served before returning;
          ;; confirm whether exactly N was intended.
          (if (number? current-lifetime)
              (unless (< current-lifetime 0)
                (loop (- current-lifetime 1)))
              (loop #f))))))

  (let ((channel (make-channel)))
    (for-each
     (lambda (thread-index)
       (call-with-new-thread
        (lambda ()
          ;; Naming the thread is best effort.  Catch every key: the Linux
          ;; implementation throws 'set-process-name on failure, while an
          ;; unavailable prctl binding throws 'system-error.
          (catch #t
            (lambda ()
              (set-thread-name
               (string-append name " w t "
                              (number->string thread-index))))
            (const #t))

          ;; Serve requests forever, re-initializing whenever PROCESS returns
          ;; or raises an exception.
          (let init ((args (initializer/safe)))
            (with-exception-handler
                (lambda (exn)
                  (simple-format
                   (current-error-port)
                   "worker-thread-channel: exception: ~A\n" exn))
              (lambda ()
                (parameterize ((%worker-thread-args args))
                  (process thread-index channel args)))
              #:unwind? #t)

            (when destructor
              (destructor/safe args))

            (init (initializer/safe))))))
     (iota parallelism))

    (values channel
            thread-proc-vector)))

;; Raised by call-with-worker-thread when the request could not be handed to
;; a worker thread within the timeout.
(define &worker-thread-timeout
  (make-exception-type '&worker-thread-timeout
                       &error
                       '()))

(define make-worker-thread-timeout-error
  (record-constructor &worker-thread-timeout))

(define worker-thread-timeout-error?
  (record-predicate &worker-thread-timeout))

;; Default #:timeout for call-with-worker-thread, in seconds (passed to
;; sleep-operation); #f disables the timeout.
(define %worker-thread-default-timeout
  (make-parameter 30))

(define* (call-with-worker-thread channel proc #:key duration-logger
                                  (timeout (%worker-thread-default-timeout)))
  "Send PROC to the worker thread through CHANNEL.  Return the result of PROC.
If already in the worker thread, call PROC immediately.

TIMEOUT (seconds, or #f for none) bounds only the wait for a worker to accept
the request; if it expires, raise a &worker-thread-timeout exception.  An
exception raised by PROC in the worker is re-raised here.  DURATION-LOGGER,
when given, is called with the seconds PROC ran for."
  (let ((args (%worker-thread-args)))
    (if args
        ;; NOTE: %worker-thread-args is set in the threads of every worker
        ;; channel, so this runs PROC inline in any worker thread, even one
        ;; serving a different channel, applying it to that thread's args.
        (apply proc args)
        (let* ((reply (make-channel))
               (operation-success?
                (perform-operation
                 (let ((put
                        (wrap-operation
                         (put-operation channel
                                        (list reply
                                              (get-internal-real-time)
                                              proc))
                         (const #t))))

                   (if timeout
                       (choice-operation
                        put
                        (wrap-operation (sleep-operation timeout)
                                        (const #f)))
                       put)))))

          (unless operation-success?
            (raise-exception
             (make-worker-thread-timeout-error)))

          (match (get-message reply)
            (('worker-thread-error duration exn)
             (when duration-logger
               (duration-logger duration))
             (raise-exception exn))
            ((duration . result)
             (when duration-logger
               (duration-logger duration))
             (apply values result)))))))

;; NOTE(review): create-work-queue below is kept byte-for-byte as found.
;; Parts of it appear to be missing (the forms around "priority" are
;; unbalanced), so it must be restored from the upstream source.
(define* (create-work-queue thread-count-parameter proc #:key thread-start-delay (thread-stop-delay (make-time time-duration 0 0)) (name "unnamed") priority= running-jobs-count desired-thread-count))) (define (thread-idle-for-too-long? last-job-finished-at) (time>=? (time-difference (current-time time-monotonic) last-job-finished-at) thread-stop-delay)) (define (stop-thread) (hash-remove!
running-job-args thread-index) (unlock-mutex queue-mutex)) (call-with-new-thread (lambda () (catch 'system-error (lambda () (set-thread-name (string-append name " q t " (number->string thread-index)))) (const #t)) (let loop ((last-job-finished-at (current-time time-monotonic))) (lock-mutex queue-mutex) (if (too-many-threads?) (stop-thread) (let ((job-args (if (q-empty? queue) ;; #f from wait-condition-variable indicates a timeout (if (wait-condition-variable job-available queue-mutex (+ 9 (time-second (current-time)))) ;; Another thread could have taken ;; the job in the mean time (if (q-empty? queue) #f (if priority threads-to-start 0) (for-each (lambda (thread-index) (when (eq? (hash-ref running-job-args thread-index 'slot-free) 'slot-free) (let* ((now (current-time time-monotonic)) (elapsed (time-difference now previous-thread-started-at))) (when (or (eq? #f thread-start-delay) (time>=? elapsed thread-start-delay)) (set! previous-thread-started-at now) (hash-set! running-job-args thread-index #f) (start-thread thread-index))))) (iota desired-count))))))) (if (procedure? thread-count-parameter) (call-with-new-thread (lambda () (catch 'system-error (lambda () (set-thread-name (string-append name " q t"))) (const #t)) (while #t (sleep 15) (with-mutex queue-mutex (let ((idle-threads (hash-count (lambda (index val) (eq? #f val)) running-job-args))) (when (= 0 idle-threads) (start-new-threads-if-necessary (get-thread-count)))))))) (start-new-threads-if-necessary (get-thread-count))) (values process-job count-jobs count-threads list-jobs)))