Initial commit

This commit is contained in:
Christopher Baines 2024-11-19 18:43:43 +00:00
commit 2f39c58d6c
27 changed files with 2969 additions and 0 deletions

63
knots/non-blocking.scm Normal file
View file

@ -0,0 +1,63 @@
;;; Guile Knots
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (knots non-blocking)
#:use-module (web uri)
#:use-module (web client)
#:export (non-blocking-port
nonblocking-open-socket-for-uri))
(define (non-blocking-port port)
"Make PORT non-blocking and return it."
(let ((flags (fcntl port F_GETFL)))
(when (zero? (logand O_NONBLOCK flags))
(fcntl port F_SETFL (logior O_NONBLOCK flags)))
port))
(define* (nonblocking-open-socket-for-uri uri
#:key (verify-certificate? #t))
(define tls-wrap
(@@ (web client) tls-wrap))
(define https?
(eq? 'https (uri-scheme uri)))
(define plain-uri
(if https?
(build-uri
'http
#:userinfo (uri-userinfo uri)
#:host (uri-host uri)
#:port (or (uri-port uri) 443)
#:path (uri-path uri)
#:query (uri-query uri)
#:fragment (uri-fragment uri))
uri))
(let ((s (open-socket-for-uri plain-uri)))
(if https?
(let ((port
(tls-wrap s (uri-host uri)
#:verify-certificate? verify-certificate?)))
;; Guile/guile-gnutls don't handle the handshake happening on a non
;; blocking socket, so change the behavior here.
(non-blocking-port s)
port)
(non-blocking-port s))))

197
knots/parallelism.scm Normal file
View file

@ -0,0 +1,197 @@
;;; Guile Knots
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (knots parallelism)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-71)
#:use-module (ice-9 match)
#:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers operations)
#:export (fibers-batch-map
fibers-map
fibers-map-with-progress
fibers-batch-for-each
fibers-for-each
fibers-parallel
fibers-let))
;; Like split-at, but don't care about the order of the resulting lists, and
;; don't error if the list is shorter than i elements
(define (split-at* lst i)
(let lp ((l lst) (n i) (acc '()))
(if (or (<= n 0) (null? l))
(values (reverse! acc) l)
(lp (cdr l) (- n 1) (cons (car l) acc)))))
;; As this can be called with lists with tens of thousands of items in them,
;; batch the
(define (get-batch batch-size lists)
(let ((split-lists
(map (lambda (lst)
(let ((batch rest (split-at* lst batch-size)))
(cons batch rest)))
lists)))
(values (map car split-lists)
(map cdr split-lists))))
(define (defer-to-parallel-fiber thunk)
(let ((reply (make-channel)))
(spawn-fiber
(lambda ()
(with-exception-handler
(lambda (exn)
(put-message reply (cons 'exception exn)))
(lambda ()
(call-with-values
(lambda ()
(with-throw-handler #t
thunk
(lambda _
(backtrace))))
(lambda vals
(put-message reply vals))))
#:unwind? #t))
#:parallel? #t)
reply))
(define (fetch-result-of-defered-thunks . reply-channels)
(let ((responses (map get-message
reply-channels)))
(map
(match-lambda
(('exception . exn)
(raise-exception exn))
(result
(apply values result)))
responses)))
(define (fibers-batch-map proc batch-size . lists)
(let loop ((lists lists)
(result '()))
(let ((batch
rest
(get-batch batch-size lists)))
(if (any null? batch)
result
(let ((response-channels
(apply map
(lambda args
(defer-to-parallel-fiber
(lambda ()
(apply proc args))))
batch)))
(loop rest
(append! result
(apply fetch-result-of-defered-thunks
response-channels))))))))
(define (fibers-map proc . lists)
(apply fibers-batch-map proc 20 lists))
(define (fibers-batch-for-each proc batch-size . lists)
(let loop ((lists lists))
(let ((batch
rest
(get-batch batch-size lists)))
(if (any null? batch)
*unspecified*
(let ((response-channels
(apply map
(lambda args
(defer-to-parallel-fiber
(lambda ()
(apply proc args))))
batch)))
(apply fetch-result-of-defered-thunks
response-channels)
(loop rest))))))
(define (fibers-for-each proc . lists)
(apply fibers-batch-for-each proc 20 lists))
(define-syntax fibers-parallel
(lambda (x)
(syntax-case x ()
((_ e0 ...)
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
#'(let ((tmp0 (defer-to-parallel-fiber
(lambda ()
e0)))
...)
(apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
(define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...)
(call-with-values
(lambda () (fibers-parallel e ...))
(lambda (v ...)
b0 b1 ...)))
(define* (fibers-map-with-progress proc lists #:key report)
(let loop ((channels-to-results
(apply map
(lambda args
(cons (defer-to-parallel-fiber
(lambda ()
(apply proc args)))
#f))
lists)))
(let ((active-channels
(filter-map car channels-to-results)))
(when report
(report (apply map
list
(map cdr channels-to-results)
lists)))
(if (null? active-channels)
(map
(match-lambda
((#f . ('exception . exn))
(raise-exception exn))
((#f . ('result . val))
val))
channels-to-results)
(loop
(perform-operation
(apply
choice-operation
(filter-map
(lambda (p)
(match p
((channel . _)
(if channel
(wrap-operation
(get-operation channel)
(lambda (result)
(map (match-lambda
((c . r)
(if (eq? channel c)
(cons #f
(match result
(('exception . exn)
result)
(_
(cons 'result result))))
(cons c r))))
channels-to-results)))
#f))))
channels-to-results))))))))

78
knots/promise.scm Normal file
View file

@ -0,0 +1,78 @@
;;; Guile Knots
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (knots promise)
#:use-module (srfi srfi-9)
#:use-module (ice-9 atomic)
#:use-module (fibers)
#:use-module (fibers conditions)
#:export (fibers-delay
fibers-force
fibers-promise-reset))
(define-record-type <fibers-promise>
(make-fibers-promise thunk values-box evaluated-condition)
fibers-promise?
(thunk fibers-promise-thunk)
(values-box fibers-promise-values-box)
(evaluated-condition fibers-promise-evaluated-condition))
(define (fibers-delay thunk)
(make-fibers-promise
thunk
(make-atomic-box #f)
(make-condition)))
(define (fibers-force fp)
(let ((res (atomic-box-compare-and-swap!
(fibers-promise-values-box fp)
#f
'started)))
(if (eq? #f res)
(call-with-values
(lambda ()
(with-exception-handler
(lambda (exn)
(atomic-box-set! (fibers-promise-values-box fp)
exn)
(signal-condition!
(fibers-promise-evaluated-condition fp))
(raise-exception exn))
(fibers-promise-thunk fp)
#:unwind? #t))
(lambda vals
(atomic-box-set! (fibers-promise-values-box fp)
vals)
(signal-condition!
(fibers-promise-evaluated-condition fp))
(apply values vals)))
(if (eq? res 'started)
(begin
(wait (fibers-promise-evaluated-condition fp))
(let ((result (atomic-box-ref (fibers-promise-values-box fp))))
(if (exception? result)
(raise-exception result)
(apply values result))))
(if (exception? res)
(raise-exception res)
(apply values res))))))
(define (fibers-promise-reset fp)
(atomic-box-set! (fibers-promise-values-box fp)
#f))

47
knots/queue.scm Normal file
View file

@ -0,0 +1,47 @@
;;; Guile Knots
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (knots queue)
#:use-module (ice-9 q)
#:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers operations)
#:export (spawn-queueing-fiber))
(define (spawn-queueing-fiber dest-channel)
(define queue (make-q))
(let ((queue-channel (make-channel)))
(spawn-fiber
(lambda ()
(while #t
(if (q-empty? queue)
(enq! queue
(perform-operation
(get-operation queue-channel)))
(let ((front (q-front queue)))
(perform-operation
(choice-operation
(wrap-operation (get-operation queue-channel)
(lambda (val)
(enq! queue val)))
(wrap-operation (put-operation dest-channel front)
(lambda _
(q-pop! queue))))))))))
queue-channel))

485
knots/resource-pool.scm Normal file
View file

@ -0,0 +1,485 @@
;;; Guile Knots
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (knots resource-pool)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-9)
#:use-module (ice-9 match)
#:use-module (ice-9 exceptions)
#:use-module (fibers)
#:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers scheduler)
#:use-module (fibers operations)
#:export (resource-pool?
make-resource-pool
destroy-resource-pool
resource-pool-default-timeout
resource-pool-retry-checkout-timeout
&resource-pool-timeout
resource-pool-timeout-error?
resource-pool-default-timeout-handler
call-with-resource-from-pool
with-resource-from-pool
resource-pool-stats))
(define-record-type <resource-pool>
(make-resource-pool-record name channel)
resource-pool?
(name resource-pool-name)
(channel resource-pool-channel))
(define* (make-resource-pool initializer max-size
#:key (min-size max-size)
(idle-seconds #f)
(delay-logger (const #f))
(duration-logger (const #f))
destructor
lifetime
scheduler
(name "unnamed")
;; Add options for customizing timeouts
)
(define (initializer/safe)
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception running ~A resource pool initializer: ~A:\n ~A\n"
name
initializer
exn)
#f)
(lambda ()
(with-throw-handler #t
initializer
(lambda args
(backtrace))))
#:unwind? #t))
(define (destructor/safe args)
(let ((success?
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception running resource pool destructor (~A): ~A:\n ~A\n"
name
destructor
exn)
#f)
(lambda ()
(with-throw-handler #t
(lambda ()
(destructor args)
#t)
(lambda _
(backtrace))))
#:unwind? #t)))
(or success?
#t
(begin
(sleep 5)
(destructor/safe args)))))
(let ((channel (make-channel))
(checkout-failure-count 0))
(spawn-fiber
(lambda ()
(when idle-seconds
(spawn-fiber
(lambda ()
(while #t
(sleep idle-seconds)
(put-message channel '(check-for-idle-resources))))))
(while #t
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception in the ~A pool fiber: ~A\n"
name
exn))
(lambda ()
(let loop ((resources '())
(available '())
(waiters '())
(resources-last-used '()))
(match (get-message channel)
(('checkout reply)
(if (null? available)
(if (= (length resources) max-size)
(loop resources
available
(cons reply waiters)
resources-last-used)
(let ((new-resource (initializer/safe)))
(if new-resource
(let ((checkout-success?
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply new-resource)
(const #t))
(wrap-operation (sleep-operation 1)
(const #f))))))
(unless checkout-success?
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(loop (cons new-resource resources)
(if checkout-success?
available
(cons new-resource available))
waiters
(cons (get-internal-real-time)
resources-last-used)))
(loop resources
available
(cons reply waiters)
resources-last-used))))
(let ((checkout-success?
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply (car available))
(const #t))
(wrap-operation (sleep-operation 1)
(const #f))))))
(unless checkout-success?
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(if checkout-success?
(loop resources
(cdr available)
waiters
resources-last-used)
(loop resources
available
waiters
resources-last-used)))))
(('return resource)
(if (null? waiters)
(loop resources
(cons resource available)
waiters
(begin
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
(get-internal-real-time))
resources-last-used))
(let ((checkout-success?
(perform-operation
(choice-operation
(wrap-operation
(put-operation (last waiters)
resource)
(const #t))
(wrap-operation (sleep-operation 1)
(const #f))))))
(unless checkout-success?
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(if checkout-success?
(loop resources
available
(drop-right! waiters 1)
(begin
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
(get-internal-real-time))
resources-last-used))
(begin
(for-each
(lambda (waiter)
(spawn-fiber
(lambda ()
(perform-operation
(choice-operation
(put-operation waiter 'resource-pool-retry-checkout)
(sleep-operation 10))))))
waiters)
(loop resources
(cons resource available)
'()
(begin
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
(get-internal-real-time))
resources-last-used)))))))
(('stats reply)
(let ((stats
`((resources . ,(length resources))
(available . ,(length available))
(waiters . ,(length waiters))
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
(lambda ()
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply stats)
(const #t))
(wrap-operation (sleep-operation 1)
(const #f)))))))
(loop resources
available
waiters
resources-last-used))
(('check-for-idle-resources)
(let* ((resources-last-used-seconds
(map
(lambda (internal-time)
(/ (- (get-internal-real-time) internal-time)
internal-time-units-per-second))
resources-last-used))
(resources-to-destroy
(filter-map
(lambda (resource last-used-seconds)
(if (and (member resource available)
(> last-used-seconds idle-seconds))
resource
#f))
resources
resources-last-used-seconds)))
(for-each
(lambda (resource)
(destructor/safe resource))
resources-to-destroy)
(loop (lset-difference eq? resources resources-to-destroy)
(lset-difference eq? available resources-to-destroy)
waiters
(filter-map
(lambda (resource last-used)
(if (memq resource resources-to-destroy)
#f
last-used))
resources
resources-last-used))))
(('destroy reply)
(if (= (length resources) (length available))
(begin
(for-each
(lambda (resource)
(destructor/safe resource))
resources)
(put-message reply 'destroy-success))
(begin
(spawn-fiber
(lambda ()
(perform-operation
(choice-operation
(put-operation reply 'resource-pool-destroy-failed)
(sleep-operation 10)))))
(loop resources
available
waiters
resources-last-used))))
(unknown
(simple-format
(current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n"
name
unknown)
(loop resources
available
waiters
resources-last-used)))))
#:unwind? #t)))
(or scheduler
(current-scheduler)))
(make-resource-pool-record name channel)))
(define (destroy-resource-pool pool)
(let ((reply (make-channel)))
(put-message (resource-pool-channel pool)
(list 'destroy reply))
(let ((msg (get-message reply)))
(unless (eq? msg 'destroy-success)
(error msg)))))
(define resource-pool-default-timeout
(make-parameter #f))
(define resource-pool-retry-checkout-timeout
(make-parameter 5))
(define &resource-pool-timeout
(make-exception-type '&recource-pool-timeout
&error
'(name)))
(define make-resource-pool-timeout-error
(record-constructor &resource-pool-timeout))
(define resource-pool-timeout-error?
(record-predicate &resource-pool-timeout))
(define resource-pool-default-timeout-handler
(make-parameter #f))
(define* (call-with-resource-from-pool
pool proc #:key (timeout 'default)
(timeout-handler (resource-pool-default-timeout-handler)))
"Call PROC with a resource from POOL, blocking until a resource becomes
available. Return the resource once PROC has returned."
(define retry-timeout
(resource-pool-retry-checkout-timeout))
(define timeout-or-default
(if (eq? timeout 'default)
(resource-pool-default-timeout)
timeout))
(let ((resource
(let ((reply (make-channel)))
(let loop ((start-time (get-internal-real-time)))
(let ((request-success?
(perform-operation
(choice-operation
(wrap-operation
(put-operation (resource-pool-channel pool)
`(checkout ,reply))
(const #t))
(wrap-operation (sleep-operation (or timeout-or-default
retry-timeout))
(const #f))))))
(if request-success?
(let ((time-remaining
(- (or timeout-or-default
retry-timeout)
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second))))
(if (> time-remaining 0)
(let ((response
(perform-operation
(choice-operation
(get-operation reply)
(wrap-operation (sleep-operation time-remaining)
(const #f))))))
(if (or (not response)
(eq? response 'resource-pool-retry-checkout))
(if (> (- (or timeout-or-default
retry-timeout)
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second))
0)
(loop start-time)
(if (eq? timeout-or-default #f)
(loop (get-internal-real-time))
#f))
response))
(if (eq? timeout-or-default #f)
(loop (get-internal-real-time))
#f)))
(if (eq? timeout-or-default #f)
(loop (get-internal-real-time))
#f)))))))
(when (or (not resource)
(eq? resource 'resource-pool-retry-checkout))
(when timeout-handler
(timeout-handler pool proc timeout))
(raise-exception
(make-resource-pool-timeout-error (resource-pool-name pool))))
(with-exception-handler
(lambda (exception)
(put-message (resource-pool-channel pool)
`(return ,resource))
(raise-exception exception))
(lambda ()
(call-with-values
(lambda ()
(with-throw-handler #t
(lambda ()
(proc resource))
(lambda _
(backtrace))))
(lambda vals
(put-message (resource-pool-channel pool)
`(return ,resource))
(apply values vals))))
#:unwind? #t)))
(define-syntax-rule (with-resource-from-pool pool resource exp ...)
(call-with-resource-from-pool
pool
(lambda (resource) exp ...)))
(define* (resource-pool-stats pool #:key (timeout 5))
(let ((reply (make-channel))
(start-time (get-internal-real-time)))
(perform-operation
(choice-operation
(wrap-operation
(put-operation (resource-pool-channel pool)
`(stats ,reply))
(const #t))
(wrap-operation (sleep-operation timeout)
(lambda _
(raise-exception
(make-resource-pool-timeout-error))))))
(let ((time-remaining
(- timeout
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second))))
(if (> time-remaining 0)
(perform-operation
(choice-operation
(get-operation reply)
(wrap-operation (sleep-operation time-remaining)
(lambda _
(raise-exception
(make-resource-pool-timeout-error))))))
(raise-exception
(make-resource-pool-timeout-error))))))

200
knots/timeout.scm Normal file
View file

@ -0,0 +1,200 @@
;;; Guile Knots
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (knots timeout)
#:use-module (srfi srfi-71)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
#:use-module (ice-9 exceptions)
#:use-module (ice-9 ports internal)
#:use-module (ice-9 suspendable-ports)
#:use-module (fibers)
#:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers scheduler)
#:use-module (fibers operations)
#:export (with-fibers-timeout
&port-timeout
port-timeout-error?
&port-read-timeout
port-read-timeout-error
&port-write-timeout
port-write-timeout-error?
with-port-timeouts))
(define* (with-fibers-timeout thunk #:key timeout on-timeout)
(let ((channel (make-channel)))
(spawn-fiber
(lambda ()
(with-exception-handler
(lambda (exn)
(perform-operation
(choice-operation
(put-operation channel (cons 'exception exn))
(sleep-operation timeout))))
(lambda ()
(call-with-values thunk
(lambda vals
(perform-operation
(choice-operation
(put-operation channel vals)
(sleep-operation timeout))))))
#:unwind? #t)))
(match (perform-operation
(choice-operation
(get-operation channel)
(wrap-operation (sleep-operation timeout)
(const 'timeout))))
('timeout
(on-timeout))
(('exception . exn)
(raise-exception exn))
(vals
(apply values vals)))))
(define &port-timeout
(make-exception-type '&port-timeout
&external-error
'(thunk port)))
(define make-port-timeout-error
(record-constructor &port-timeout))
(define port-timeout-error?
(record-predicate &port-timeout))
(define &port-read-timeout
(make-exception-type '&port-read-timeout
&port-timeout
'()))
(define make-port-read-timeout-error
(record-constructor &port-read-timeout))
(define port-read-timeout-error?
(record-predicate &port-read-timeout))
(define &port-write-timeout
(make-exception-type '&port-write-timeout
&port-timeout
'()))
(define make-port-write-timeout-error
(record-constructor &port-write-timeout))
(define port-write-timeout-error?
(record-predicate &port-write-timeout))
(define (readable? port)
"Test if PORT is writable."
(= 1 (port-poll port "r" 0)))
(define (writable? port)
"Test if PORT is writable."
(= 1 (port-poll port "w" 0)))
(define (make-wait-operation ready? schedule-when-ready port
port-ready-fd this-procedure)
(make-base-operation #f
(lambda _
(and (ready? port) values))
(lambda (flag sched resume)
(define (commit)
(match (atomic-box-compare-and-swap! flag 'W 'S)
('W (resume values))
('C (commit))
('S #f)))
(schedule-when-ready
sched (port-ready-fd port) commit))))
(define (wait-until-port-readable-operation port)
"Make an operation that will succeed when PORT is readable."
(unless (input-port? port)
(error "refusing to wait forever for input on non-input port"))
(make-wait-operation readable? schedule-task-when-fd-readable port
port-read-wait-fd
wait-until-port-readable-operation))
(define (wait-until-port-writable-operation port)
"Make an operation that will succeed when PORT is writable."
(unless (output-port? port)
(error "refusing to wait forever for output on non-output port"))
(make-wait-operation writable? schedule-task-when-fd-writable port
port-write-wait-fd
wait-until-port-writable-operation))
(define* (with-port-timeouts thunk
#:key timeout
(read-timeout timeout)
(write-timeout timeout))
(define (no-fibers-wait thunk port mode timeout)
(define poll-timeout-ms 200)
;; When the GC runs, it restarts the poll syscall, but the timeout
;; remains unchanged! When the timeout is longer than the time
;; between the syscall restarting, I think this renders the
;; timeout useless. Therefore, this code uses a short timeout, and
;; repeatedly calls poll while watching the clock to see if it has
;; timed out overall.
(let ((timeout-internal
(+ (get-internal-real-time)
(* internal-time-units-per-second timeout))))
(let loop ((poll-value
(port-poll port mode poll-timeout-ms)))
(if (= poll-value 0)
(if (> (get-internal-real-time)
timeout-internal)
(raise-exception
(if (string=? mode "r")
(make-port-read-timeout-error thunk port)
(make-port-write-timeout-error thunk port)))
(loop (port-poll port mode poll-timeout-ms)))
poll-value))))
(parameterize
((current-read-waiter
(lambda (port)
(if (current-scheduler)
(perform-operation
(choice-operation
(wait-until-port-readable-operation port)
(wrap-operation
(sleep-operation read-timeout)
(lambda ()
(raise-exception
(make-port-read-timeout-error thunk port))))))
(no-fibers-wait thunk port "r" read-timeout))))
(current-write-waiter
(lambda (port)
(if (current-scheduler)
(perform-operation
(choice-operation
(wait-until-port-writable-operation port)
(wrap-operation
(sleep-operation write-timeout)
(lambda ()
(raise-exception
(make-port-write-timeout-error thunk port))))))
(no-fibers-wait thunk port "w" write-timeout)))))
(thunk)))

263
knots/web-server.scm Normal file
View file

@ -0,0 +1,263 @@
;;; Guile Knots
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
;;; Copyright (C) 2010-2013,2015,2017 Free Software Foundation, Inc.
;; This library is free software; you can redistribute it and/or
;; modify it under the terms of the GNU Lesser General Public
;; License as published by the Free Software Foundation; either
;; version 3 of the License, or (at your option) any later version.
;;
;; This library is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;; Lesser General Public License for more details.
;;
;; You should have received a copy of the GNU Lesser General Public License
;; along with this program. If not, see <http://www.gnu.org/licenses/>.
(define-module (knots web-server)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-71)
#:use-module (fibers)
#:use-module (fibers conditions)
#:use-module (rnrs bytevectors)
#:use-module (ice-9 binary-ports)
#:use-module (ice-9 textual-ports)
#:use-module (ice-9 iconv)
#:use-module (ice-9 match)
#:use-module ((srfi srfi-9 gnu) #:select (set-field))
#:use-module (system repl error-handling)
#:use-module (web http)
#:use-module (web request)
#:use-module (web response)
#:use-module (knots non-blocking)
#:export (run-knots-web-server
web-server?
web-server-socket
web-server-port))
(define (make-default-socket family addr port)
(let ((sock (socket PF_INET SOCK_STREAM 0)))
(setsockopt sock SOL_SOCKET SO_REUSEADDR 1)
(fcntl sock F_SETFD FD_CLOEXEC)
(bind sock family addr port)
sock))
(define (extend-response r k v . additional)
(define (extend-alist alist k v)
(let ((pair (assq k alist)))
(acons k v (if pair (delq pair alist) alist))))
(let ((r (set-field r (response-headers)
(extend-alist (response-headers r) k v))))
(if (null? additional)
r
(apply extend-response r additional))))
;; -> response body
(define (sanitize-response request response body)
"\"Sanitize\" the given response and body, making them appropriate for
the given request.
As a convenience to web handler authors, RESPONSE may be given as
an alist of headers, in which case it is used to construct a default
response. Ensures that the response version corresponds to the request
version. If BODY is a string, encodes the string to a bytevector,
in an encoding appropriate for RESPONSE. Adds a
content-length and content-type header, as necessary.
If BODY is a procedure, it is called with a port as an argument,
and the output collected as a bytevector. In the future we might try to
instead use a compressing, chunk-encoded port, and call this procedure
later, in the write-client procedure. Authors are advised not to rely
on the procedure being called at any particular time."
(cond
((list? response)
(sanitize-response request
(build-response #:version (request-version request)
#:headers response)
body))
((not (equal? (request-version request) (response-version response)))
(sanitize-response request
(adapt-response-version response
(request-version request))
body))
((not body)
(values response #vu8()))
((string? body)
(let* ((type (response-content-type response
'(text/plain)))
(declared-charset (assq-ref (cdr type) 'charset))
(charset (or declared-charset "utf-8")))
(sanitize-response
request
(if declared-charset
response
(extend-response response 'content-type
`(,@type (charset . ,charset))))
(string->bytevector body charset))))
((not (or (bytevector? body)
(procedure? body)))
(error "unexpected body type"))
((and (response-must-not-include-body? response)
body
;; FIXME make this stricter: even an empty body should be prohibited.
(not (zero? (bytevector-length body))))
(error "response with this status code must not include body" response))
(else
;; check length; assert type; add other required fields?
(values (if (procedure? body)
(if (response-content-length response)
response
(extend-response response
'transfer-encoding
'((chunked))))
(let ((rlen (response-content-length response))
(blen (bytevector-length body)))
(cond
(rlen (if (= rlen blen)
response
(error "bad content-length" rlen blen)))
(else (extend-response response 'content-length blen)))))
(if (eq? (request-method request) 'HEAD)
;; Responses to HEAD requests must not include bodies.
;; We could raise an error here, but it seems more
;; appropriate to just do something sensible.
#f
body)))))
(define (with-stack-and-prompt thunk)
(call-with-prompt (default-prompt-tag)
(lambda () (start-stack #t (thunk)))
(lambda (k proc)
(with-stack-and-prompt (lambda () (proc k))))))
(define (keep-alive? response)
(let ((v (response-version response)))
(and (or (< (response-code response) 400)
(= (response-code response) 404))
(case (car v)
((1)
(case (cdr v)
((1) (not (memq 'close (response-connection response))))
((0) (memq 'keep-alive (response-connection response)))))
(else #f)))))
(define (handle-request handler client)
(let ((request
(catch #t
(lambda ()
(read-request client))
(lambda (key . args)
(display "While reading request:\n" (current-error-port))
(print-exception (current-error-port) #f key args)
#f))))
(let ((response
body
(cond
((not request)
;; Bad request.
(values (build-response #:version '(1 . 0) #:code 400
#:headers '((content-length . 0)))
#vu8()))
(else
(call-with-error-handling
(lambda ()
(call-with-values (lambda ()
(with-stack-and-prompt
(lambda ()
(handler request))))
(lambda (response body)
(sanitize-response request response body))))
#:on-error 'backtrace
#:post-error (lambda _
(values (build-response #:code 500) #f)))))))
(write-response response client)
(when body
(if (procedure? body)
(if (response-content-length response)
(body client)
(let ((chunked-port
(make-chunked-output-port client
#:keep-alive? #t)))
(body chunked-port)
(close-port chunked-port)))
(put-bytevector client body)))
(force-output client)
(keep-alive? response))))
(define (client-loop client handler)
;; Always disable Nagle's algorithm, as we handle buffering
;; ourselves; when we force-output, we really want the data to go
;; out.
(setvbuf client 'block 1024)
(setsockopt client IPPROTO_TCP TCP_NODELAY 1)
(with-throw-handler #t
(lambda ()
(let loop ()
(cond
((catch #t
(lambda () (eof-object? (lookahead-u8 client)))
(lambda _ #t))
(close-port client))
(else
(let ((keep-alive? (handle-request handler client)))
(if keep-alive?
(loop)
(close-port client)))))))
(lambda (k . args)
(close-port client))))
(define-record-type <web-server>
(make-web-server socket port)
web-server?
(socket web-server-socket)
(port web-server-port))
(define* (run-knots-web-server handler #:key
(host #f)
(family AF_INET)
(addr (if host
(inet-pton family host)
INADDR_LOOPBACK))
(port 8080)
(socket (make-default-socket family addr port)))
"Run the fibers web server.
HANDLER should be a procedure that takes one argument, the HTTP
request and returns two values, the response and response body.
For example, here is a simple \"Hello, World!\" server:
@example
(define (handler request)
(let ((body (read-request-body request)))
(values '((content-type . (text/plain)))
\"Hello, World!\")))
(run-server handler)
@end example
The response and body will be run through sanitize-response
before sending back to the client."
(non-blocking-port socket)
;; We use a large backlog by default. If the server is suddenly hit
;; with a number of connections on a small backlog, clients won't
;; receive confirmation for their SYN, leading them to retry --
;; probably successfully, but with a large latency.
(listen socket 1024)
(sigaction SIGPIPE SIG_IGN)
(spawn-fiber
(lambda ()
(let loop ()
(match (accept socket (logior SOCK_NONBLOCK SOCK_CLOEXEC))
((client . sockaddr)
(spawn-fiber (lambda ()
(client-loop client handler))
#:parallel? #t)
(loop))))))
(make-web-server socket
(vector-ref (getsockname socket)
2))) ; Not sure what this structure is

577
knots/worker-threads.scm Normal file
View file

@ -0,0 +1,577 @@
;;; Guile Knots
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (knots worker-threads)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-19)
#:use-module (srfi srfi-71)
#:use-module (system foreign)
#:use-module (system base target)
#:use-module (rnrs bytevectors)
#:use-module (ice-9 q)
#:use-module (ice-9 match)
#:use-module (ice-9 threads)
#:use-module (fibers)
#:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers operations)
#:export (set-thread-name
thread-name
make-worker-thread-channel
call-with-worker-thread
&worker-thread-timeout
worker-thread-timeout-error?
%worker-thread-default-timeout
create-work-queue))
(define* (syscall->procedure return-type name argument-types
#:key library)
"Return a procedure that wraps the C function NAME using the dynamic FFI,
and that returns two values: NAME's return value, and errno. When LIBRARY is
specified, look up NAME in that library rather than in the global symbol name
space.
If an error occurs while creating the binding, defer the error report until
the returned procedure is called."
(catch #t
(lambda ()
;; Note: When #:library is set, try it first and fall back to libc
;; proper. This is because libraries like libutil.so have been subsumed
;; by libc.so with glibc >= 2.34.
(let ((ptr (dynamic-func name
(if library
(or (false-if-exception
(dynamic-link library))
(dynamic-link))
(dynamic-link)))))
;; The #:return-errno? facility was introduced in Guile 2.0.12.
(pointer->procedure return-type ptr argument-types
#:return-errno? #t)))
(lambda args
(lambda _
(throw 'system-error name "~A" (list (strerror ENOSYS))
(list ENOSYS))))))
(define %prctl
;; Should it win the API contest against 'ioctl'? You tell us!
(syscall->procedure int "prctl"
(list int unsigned-long unsigned-long
unsigned-long unsigned-long)))
(define PR_SET_NAME 15) ;<linux/prctl.h>
(define PR_GET_NAME 16)
(define PR_SET_CHILD_SUBREAPER 36)
(define (set-child-subreaper!)
"Set the CHILD_SUBREAPER capability for the current process."
(%prctl PR_SET_CHILD_SUBREAPER 1 0 0 0))
(define %max-thread-name-length
;; Maximum length in bytes of the process name, including the terminating
;; zero.
16)
(define (set-thread-name!/linux name)
"Set the name of the calling thread to NAME. NAME is truncated to 15
bytes."
(let ((ptr (string->pointer name)))
(let ((ret
err
(%prctl PR_SET_NAME
(pointer-address ptr) 0 0 0)))
(unless (zero? ret)
(throw 'set-process-name "set-process-name"
"set-process-name: ~A"
(list (strerror err))
(list err))))))
(define (bytes->string bytes)
"Read BYTES, a list of bytes, and return the null-terminated string decoded
from there, or #f if that would be an empty string."
(match (take-while (negate zero?) bytes)
(()
#f)
(non-zero
(list->string (map integer->char non-zero)))))
(define (thread-name/linux)
"Return the name of the calling thread as a string."
(let ((buf (make-bytevector %max-thread-name-length)))
(let ((ret
err
(%prctl PR_GET_NAME
(pointer-address (bytevector->pointer buf))
0 0 0)))
(if (zero? ret)
(bytes->string (bytevector->u8-list buf))
(throw 'process-name "process-name"
"process-name: ~A"
(list (strerror err))
(list err))))))
(define set-thread-name
(if (string-contains %host-type "linux")
set-thread-name!/linux
(const #f)))
(define thread-name
(if (string-contains %host-type "linux")
thread-name/linux
(const "")))
(define %worker-thread-args
(make-parameter #f))
(define* (make-worker-thread-channel initializer
#:key (parallelism 1)
(delay-logger (lambda _ #f))
(duration-logger (const #f))
destructor
lifetime
(log-exception? (const #t))
(expire-on-exception? #f)
(name "unnamed"))
"Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the worker thread procedure."
(define thread-proc-vector
(make-vector parallelism #f))
(define (initializer/safe)
(let ((args
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception running initializer in worker thread (~A): ~A:\n ~A\n"
name
initializer
exn)
#f)
(lambda ()
(with-throw-handler #t
initializer
(lambda args
(backtrace))))
#:unwind? #t)))
(if args
args
;; never give up, just keep retrying
(begin
(sleep 1)
(initializer/safe)))))
(define (destructor/safe args)
(let ((success?
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception running destructor in worker thread (~A): ~A:\n ~A\n"
name
destructor
exn)
#f)
(lambda ()
(with-throw-handler #t
(lambda ()
(apply destructor args)
#t)
(lambda _
(backtrace))))
#:unwind? #t)))
(or success?
#t
(begin
(sleep 1)
(destructor/safe args)))))
(define (process thread-index channel args)
(let loop ((current-lifetime lifetime))
(let ((exception?
(match (get-message channel)
(((? channel? reply) sent-time (? procedure? proc))
(let ((time-delay
(- (get-internal-real-time)
sent-time)))
(delay-logger (/ time-delay
internal-time-units-per-second))
(let* ((start-time (get-internal-real-time))
(response
(with-exception-handler
(lambda (exn)
(list 'worker-thread-error
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second)
exn))
(lambda ()
(vector-set! thread-proc-vector
thread-index
proc)
(with-throw-handler #t
(lambda ()
(call-with-values
(lambda ()
(start-stack
'worker-thread
(apply proc args)))
(lambda vals
(cons (/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second)
vals))))
(lambda args
(when (match args
(('%exception exn)
(log-exception? exn))
(_ #t))
(simple-format
(current-error-port)
"worker-thread: exception: ~A\n" args)
(backtrace)))))
#:unwind? #t)))
(put-message reply
response)
(vector-set! thread-proc-vector
thread-index
#f)
(match response
(('worker-thread-error duration _)
(when duration-logger
(duration-logger duration proc))
#t)
((duration . _)
(when duration-logger
(duration-logger duration proc))
#f))))))))
(unless (and expire-on-exception?
exception?)
(if (number? current-lifetime)
(unless (< current-lifetime 0)
(loop (if current-lifetime
(- current-lifetime 1)
#f)))
(loop #f))))))
(let ((channel (make-channel)))
(for-each
(lambda (thread-index)
(call-with-new-thread
(lambda ()
(catch 'system-error
(lambda ()
(set-thread-name
(string-append
name " w t "
(number->string thread-index))))
(const #t))
(let init ((args (initializer/safe)))
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"worker-thread-channel: exception: ~A\n" exn))
(lambda ()
(parameterize ((%worker-thread-args args))
(process thread-index channel args)))
#:unwind? #t)
(when destructor
(destructor/safe args))
(init (initializer/safe))))))
(iota parallelism))
(values channel
thread-proc-vector)))
(define &worker-thread-timeout
(make-exception-type '&worker-thread-timeout
&error
'()))
(define make-worker-thread-timeout-error
(record-constructor &worker-thread-timeout))
(define worker-thread-timeout-error?
(record-predicate &worker-thread-timeout))
(define %worker-thread-default-timeout
(make-parameter 30))
(define* (call-with-worker-thread channel proc #:key duration-logger
(timeout (%worker-thread-default-timeout)))
"Send PROC to the worker thread through CHANNEL. Return the result of PROC.
If already in the worker thread, call PROC immediately."
(let ((args (%worker-thread-args)))
(if args
(apply proc args)
(let* ((reply (make-channel))
(operation-success?
(perform-operation
(let ((put
(wrap-operation
(put-operation channel
(list reply
(get-internal-real-time)
proc))
(const #t))))
(if timeout
(choice-operation
put
(wrap-operation (sleep-operation timeout)
(const #f)))
put)))))
(unless operation-success?
(raise-exception
(make-worker-thread-timeout-error)))
(match (get-message reply)
(('worker-thread-error duration exn)
(when duration-logger
(duration-logger duration))
(raise-exception exn))
((duration . result)
(when duration-logger
(duration-logger duration))
(apply values result)))))))
(define* (create-work-queue thread-count-parameter proc
#:key thread-start-delay
(thread-stop-delay
(make-time time-duration 0 0))
(name "unnamed")
priority<?)
(let ((queue (make-q))
(queue-mutex (make-mutex))
(job-available (make-condition-variable))
(running-job-args (make-hash-table)))
(define get-thread-count
(cond
((number? thread-count-parameter)
(const thread-count-parameter))
((eq? thread-count-parameter #f)
;; Run one thread per job
(lambda ()
(+ (q-length queue)
(hash-count (lambda (index val)
(list? val))
running-job-args))))
(else
thread-count-parameter)))
(define process-job
(if priority<?
(lambda* (args #:key priority)
(with-mutex queue-mutex
(enq! queue (cons priority args))
(set-car!
queue
(stable-sort! (car queue)
(lambda (a b)
(priority<?
(car a)
(car b)))))
(sync-q! queue)
(start-new-threads-if-necessary (get-thread-count))
(signal-condition-variable job-available)))
(lambda args
(with-mutex queue-mutex
(enq! queue args)
(start-new-threads-if-necessary (get-thread-count))
(signal-condition-variable job-available)))))
(define (count-threads)
(with-mutex queue-mutex
(hash-count (const #t) running-job-args)))
(define (count-jobs)
(with-mutex queue-mutex
(+ (q-length queue)
(hash-count (lambda (index val)
(list? val))
running-job-args))))
(define (list-jobs)
(with-mutex queue-mutex
(append (if priority<?
(map cdr (car queue))
(list-copy (car queue)))
(hash-fold (lambda (key val result)
(if val
(cons val result)
result))
'()
running-job-args))))
(define (thread-process-job job-args)
(with-exception-handler
(lambda (exn)
(simple-format (current-error-port)
"~A work queue, job raised exception ~A: ~A\n"
name job-args exn))
(lambda ()
(with-throw-handler #t
(lambda ()
(apply proc job-args))
(lambda (key . args)
(simple-format
(current-error-port)
"~A work queue, exception when handling job: ~A ~A\n"
name key args)
(backtrace))))
#:unwind? #t))
(define (start-thread thread-index)
(define (too-many-threads?)
(let ((running-jobs-count
(hash-count (lambda (index val)
(list? val))
running-job-args))
(desired-thread-count (get-thread-count)))
(>= running-jobs-count
desired-thread-count)))
(define (thread-idle-for-too-long? last-job-finished-at)
(time>=?
(time-difference (current-time time-monotonic)
last-job-finished-at)
thread-stop-delay))
(define (stop-thread)
(hash-remove! running-job-args
thread-index)
(unlock-mutex queue-mutex))
(call-with-new-thread
(lambda ()
(catch 'system-error
(lambda ()
(set-thread-name
(string-append name " q t "
(number->string thread-index))))
(const #t))
(let loop ((last-job-finished-at (current-time time-monotonic)))
(lock-mutex queue-mutex)
(if (too-many-threads?)
(stop-thread)
(let ((job-args
(if (q-empty? queue)
;; #f from wait-condition-variable indicates a timeout
(if (wait-condition-variable
job-available
queue-mutex
(+ 9 (time-second (current-time))))
;; Another thread could have taken
;; the job in the mean time
(if (q-empty? queue)
#f
(if priority<?
(cdr (deq! queue))
(deq! queue)))
#f)
(if priority<?
(cdr (deq! queue))
(deq! queue)))))
(if job-args
(begin
(hash-set! running-job-args
thread-index
job-args)
(unlock-mutex queue-mutex)
(thread-process-job job-args)
(with-mutex queue-mutex
(hash-set! running-job-args
thread-index
#f))
(loop (current-time time-monotonic)))
(if (thread-idle-for-too-long? last-job-finished-at)
(stop-thread)
(begin
(unlock-mutex queue-mutex)
(loop last-job-finished-at))))))))))
(define start-new-threads-if-necessary
(let ((previous-thread-started-at (make-time time-monotonic 0 0)))
(lambda (desired-count)
(let* ((thread-count
(hash-count (const #t) running-job-args))
(threads-to-start
(- desired-count thread-count)))
(when (> threads-to-start 0)
(for-each
(lambda (thread-index)
(when (eq? (hash-ref running-job-args
thread-index
'slot-free)
'slot-free)
(let* ((now (current-time time-monotonic))
(elapsed (time-difference now
previous-thread-started-at)))
(when (or (eq? #f thread-start-delay)
(time>=? elapsed thread-start-delay))
(set! previous-thread-started-at now)
(hash-set! running-job-args
thread-index
#f)
(start-thread thread-index)))))
(iota desired-count)))))))
(if (procedure? thread-count-parameter)
(call-with-new-thread
(lambda ()
(catch 'system-error
(lambda ()
(set-thread-name
(string-append name " q t")))
(const #t))
(while #t
(sleep 15)
(with-mutex queue-mutex
(let ((idle-threads (hash-count (lambda (index val)
(eq? #f val))
running-job-args)))
(when (= 0 idle-threads)
(start-new-threads-if-necessary (get-thread-count))))))))
(start-new-threads-if-necessary (get-thread-count)))
(values process-job count-jobs count-threads list-jobs)))