Compare commits
5 commits
7ba77010ae
...
78d22d1acc
Author | SHA1 | Date | |
---|---|---|---|
78d22d1acc | |||
123c920122 | |||
8e582a2d73 | |||
cbafdb8668 | |||
016f37f108 |
5 changed files with 449 additions and 456 deletions
34
knots.scm
34
knots.scm
|
@ -75,6 +75,8 @@
|
||||||
0 (and prompt-tag 1)))
|
0 (and prompt-tag 1)))
|
||||||
(_
|
(_
|
||||||
(make-stack #t))))
|
(make-stack #t))))
|
||||||
|
(stack-len
|
||||||
|
(stack-length stack))
|
||||||
(error-string
|
(error-string
|
||||||
(call-with-output-string
|
(call-with-output-string
|
||||||
(lambda (port)
|
(lambda (port)
|
||||||
|
@ -83,30 +85,46 @@
|
||||||
(filter knots-exception?
|
(filter knots-exception?
|
||||||
(simple-exceptions exn)))))
|
(simple-exceptions exn)))))
|
||||||
|
|
||||||
(let ((stack-vec
|
(let* ((stack-vec
|
||||||
(stack->vector stack)))
|
(stack->vector stack))
|
||||||
|
(stack-vec-length
|
||||||
|
(vector-length stack-vec)))
|
||||||
(print-frames (list->vector
|
(print-frames (list->vector
|
||||||
(drop
|
(drop
|
||||||
(vector->list stack-vec)
|
(vector->list stack-vec)
|
||||||
6))
|
(if (< stack-vec-length 5)
|
||||||
|
0
|
||||||
|
4)))
|
||||||
port
|
port
|
||||||
#:count (stack-length stack)))
|
#:count (stack-length stack)))
|
||||||
(for-each
|
(for-each
|
||||||
(lambda (stack)
|
(lambda (stack)
|
||||||
(let ((stack-vec
|
(let* ((stack-vec
|
||||||
(stack->vector stack)))
|
(stack->vector stack))
|
||||||
|
(stack-vec-length
|
||||||
|
(vector-length stack-vec)))
|
||||||
(print-frames (list->vector
|
(print-frames (list->vector
|
||||||
(drop
|
(drop
|
||||||
(vector->list stack-vec)
|
(vector->list stack-vec)
|
||||||
3))
|
(if (< stack-vec-length 4)
|
||||||
|
0
|
||||||
|
3)))
|
||||||
port
|
port
|
||||||
#:count (stack-length stack))))
|
#:count (stack-length stack))))
|
||||||
knots-stacks)
|
knots-stacks)
|
||||||
(print-exception
|
(print-exception
|
||||||
port
|
port
|
||||||
(if (null? knots-stacks)
|
(if (null? knots-stacks)
|
||||||
(stack-ref stack 1)
|
(stack-ref stack
|
||||||
(stack-ref (last knots-stacks) 3))
|
(if (< stack-len 4)
|
||||||
|
stack-len
|
||||||
|
4))
|
||||||
|
(let* ((stack (last knots-stacks))
|
||||||
|
(stack-len (stack-length stack)))
|
||||||
|
(stack-ref stack
|
||||||
|
(if (< stack-len 3)
|
||||||
|
stack-len
|
||||||
|
3))))
|
||||||
'%exception
|
'%exception
|
||||||
(list exn)))))))
|
(list exn)))))))
|
||||||
(display error-string port)))
|
(display error-string port)))
|
||||||
|
|
|
@ -19,10 +19,15 @@
|
||||||
|
|
||||||
(define-module (knots promise)
|
(define-module (knots promise)
|
||||||
#:use-module (srfi srfi-9)
|
#:use-module (srfi srfi-9)
|
||||||
|
#:use-module (ice-9 match)
|
||||||
#:use-module (ice-9 atomic)
|
#:use-module (ice-9 atomic)
|
||||||
|
#:use-module (ice-9 exceptions)
|
||||||
#:use-module (fibers)
|
#:use-module (fibers)
|
||||||
#:use-module (fibers conditions)
|
#:use-module (fibers conditions)
|
||||||
#:export (fibers-delay
|
#:use-module (knots)
|
||||||
|
#:export (fibers-promise?
|
||||||
|
|
||||||
|
fibers-delay
|
||||||
fibers-force
|
fibers-force
|
||||||
fibers-promise-reset
|
fibers-promise-reset
|
||||||
fibers-promise-result-available?))
|
fibers-promise-result-available?))
|
||||||
|
@ -41,38 +46,61 @@
|
||||||
(make-condition)))
|
(make-condition)))
|
||||||
|
|
||||||
(define (fibers-force fp)
|
(define (fibers-force fp)
|
||||||
|
(unless (fibers-promise? fp)
|
||||||
|
(raise-exception
|
||||||
|
(make-exception
|
||||||
|
(make-exception-with-message "fibers-force: not a fibers promise")
|
||||||
|
(make-exception-with-irritants fp))))
|
||||||
|
|
||||||
(let ((res (atomic-box-compare-and-swap!
|
(let ((res (atomic-box-compare-and-swap!
|
||||||
(fibers-promise-values-box fp)
|
(fibers-promise-values-box fp)
|
||||||
#f
|
#f
|
||||||
'started)))
|
'started)))
|
||||||
(if (eq? #f res)
|
(cond
|
||||||
(call-with-values
|
((eq? #f res)
|
||||||
(lambda ()
|
(call-with-values
|
||||||
(with-exception-handler
|
(lambda ()
|
||||||
(lambda (exn)
|
(with-exception-handler
|
||||||
(atomic-box-set! (fibers-promise-values-box fp)
|
(lambda (exn)
|
||||||
exn)
|
(atomic-box-set! (fibers-promise-values-box fp)
|
||||||
(signal-condition!
|
exn)
|
||||||
(fibers-promise-evaluated-condition fp))
|
(signal-condition!
|
||||||
(raise-exception exn))
|
(fibers-promise-evaluated-condition fp))
|
||||||
(fibers-promise-thunk fp)
|
(raise-exception exn))
|
||||||
#:unwind? #t))
|
(lambda ()
|
||||||
(lambda vals
|
(with-exception-handler
|
||||||
(atomic-box-set! (fibers-promise-values-box fp)
|
(lambda (exn)
|
||||||
vals)
|
(let ((stack
|
||||||
(signal-condition!
|
(match (fluid-ref %stacks)
|
||||||
(fibers-promise-evaluated-condition fp))
|
((stack-tag . prompt-tag)
|
||||||
(apply values vals)))
|
(make-stack #t
|
||||||
(if (eq? res 'started)
|
0 prompt-tag
|
||||||
(begin
|
0 (and prompt-tag 1)))
|
||||||
(wait (fibers-promise-evaluated-condition fp))
|
(_
|
||||||
(let ((result (atomic-box-ref (fibers-promise-values-box fp))))
|
(make-stack #t)))))
|
||||||
(if (exception? result)
|
(raise-exception
|
||||||
(raise-exception result)
|
(make-exception
|
||||||
(apply values result))))
|
exn
|
||||||
(if (exception? res)
|
(make-knots-exception stack)))))
|
||||||
(raise-exception res)
|
(fibers-promise-thunk fp)))
|
||||||
(apply values res))))))
|
#:unwind? #t))
|
||||||
|
(lambda vals
|
||||||
|
(atomic-box-set! (fibers-promise-values-box fp)
|
||||||
|
vals)
|
||||||
|
(signal-condition!
|
||||||
|
(fibers-promise-evaluated-condition fp))
|
||||||
|
(apply values vals))))
|
||||||
|
((eq? res 'started)
|
||||||
|
(begin
|
||||||
|
(wait (fibers-promise-evaluated-condition fp))
|
||||||
|
(let ((result (atomic-box-ref (fibers-promise-values-box fp))))
|
||||||
|
(if (exception? result)
|
||||||
|
(raise-exception result)
|
||||||
|
(apply values result)))))
|
||||||
|
(else
|
||||||
|
(if (exception? res)
|
||||||
|
(raise-exception res)
|
||||||
|
(apply values res))))))
|
||||||
|
|
||||||
(define (fibers-promise-reset fp)
|
(define (fibers-promise-reset fp)
|
||||||
(atomic-box-set! (fibers-promise-values-box fp)
|
(atomic-box-set! (fibers-promise-values-box fp)
|
||||||
|
|
|
@ -27,29 +27,38 @@
|
||||||
#:use-module (rnrs bytevectors)
|
#:use-module (rnrs bytevectors)
|
||||||
#:use-module (ice-9 q)
|
#:use-module (ice-9 q)
|
||||||
#:use-module (ice-9 match)
|
#:use-module (ice-9 match)
|
||||||
|
#:use-module (ice-9 atomic)
|
||||||
#:use-module (ice-9 threads)
|
#:use-module (ice-9 threads)
|
||||||
#:use-module (fibers)
|
#:use-module (fibers)
|
||||||
#:use-module (fibers timers)
|
#:use-module (fibers timers)
|
||||||
#:use-module (fibers channels)
|
#:use-module (fibers channels)
|
||||||
#:use-module (fibers operations)
|
#:use-module (fibers operations)
|
||||||
#:use-module (knots)
|
#:use-module (knots)
|
||||||
|
#:use-module (knots resource-pool)
|
||||||
#:export (set-thread-name
|
#:export (set-thread-name
|
||||||
thread-name
|
thread-name
|
||||||
|
|
||||||
thread-pool?
|
&thread-pool-timeout-error
|
||||||
thread-pool-channel
|
thread-pool-timeout-error-pool
|
||||||
thread-pool-arguments-parameter
|
|
||||||
thread-pool-proc-vector
|
|
||||||
|
|
||||||
make-thread-pool
|
|
||||||
call-with-thread
|
|
||||||
|
|
||||||
&thread-pool-timeout
|
|
||||||
thread-pool-timeout-error?
|
thread-pool-timeout-error?
|
||||||
|
|
||||||
%thread-pool-default-timeout
|
make-thread-pool
|
||||||
|
thread-pool?
|
||||||
|
thread-pool-resource-pool
|
||||||
|
|
||||||
create-work-queue))
|
make-fixed-size-thread-pool
|
||||||
|
fixed-size-thread-pool?
|
||||||
|
fixed-size-thread-pool-channel
|
||||||
|
fixed-size-thread-pool-current-procedures
|
||||||
|
|
||||||
|
;; These procedures work for thread pools and fixed size
|
||||||
|
;; thread pools
|
||||||
|
thread-pool-arguments-parameter
|
||||||
|
thread-pool-default-checkout-timeout
|
||||||
|
|
||||||
|
destroy-thread-pool
|
||||||
|
|
||||||
|
call-with-thread))
|
||||||
|
|
||||||
(define* (syscall->procedure return-type name argument-types
|
(define* (syscall->procedure return-type name argument-types
|
||||||
#:key library)
|
#:key library)
|
||||||
|
@ -147,28 +156,64 @@ from there, or #f if that would be an empty string."
|
||||||
(const "")))
|
(const "")))
|
||||||
|
|
||||||
(define-record-type <thread-pool>
|
(define-record-type <thread-pool>
|
||||||
(thread-pool channel arguments-parameter proc-vector)
|
(thread-pool resource-pool arguments-parameter)
|
||||||
thread-pool?
|
thread-pool?
|
||||||
(channel thread-pool-channel)
|
(resource-pool thread-pool-resource-pool)
|
||||||
(arguments-parameter thread-pool-arguments-parameter)
|
(arguments-parameter thread-pool-arguments-parameter-accessor))
|
||||||
(proc-vector thread-pool-proc-vector)
|
|
||||||
(default-checkout-timeout
|
(define-record-type <fixed-size-thread-pool>
|
||||||
thread-pool-default-checkout-timeout))
|
(fixed-size-thread-pool channel arguments-parameter current-procedures
|
||||||
|
default-checkout-timeout)
|
||||||
|
fixed-size-thread-pool?
|
||||||
|
(channel fixed-size-thread-pool-channel)
|
||||||
|
(arguments-parameter fixed-size-thread-pool-arguments-parameter)
|
||||||
|
(current-procedures fixed-size-thread-pool-current-procedures)
|
||||||
|
(default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout))
|
||||||
|
|
||||||
|
;; Since both thread pool records have this field, use a procedure
|
||||||
|
;; than handles the appropriate accessor
|
||||||
|
(define (thread-pool-arguments-parameter pool)
|
||||||
|
(if (fixed-size-thread-pool? pool)
|
||||||
|
(fixed-size-thread-pool-arguments-parameter pool)
|
||||||
|
(thread-pool-arguments-parameter-accessor pool)))
|
||||||
|
|
||||||
|
(define (thread-pool-default-checkout-timeout pool)
|
||||||
|
(if (fixed-size-thread-pool? pool)
|
||||||
|
(fixed-size-thread-pool-default-checkout-timeout pool)
|
||||||
|
(assq-ref (resource-pool-configuration
|
||||||
|
(thread-pool-resource-pool pool))
|
||||||
|
'default-checkout-timeout)))
|
||||||
|
|
||||||
|
(define &thread-pool-timeout
|
||||||
|
(make-exception-type '&thread-pool-timeout
|
||||||
|
&error
|
||||||
|
'(pool)))
|
||||||
|
|
||||||
|
(define make-thread-pool-timeout-error
|
||||||
|
(record-constructor &thread-pool-timeout))
|
||||||
|
|
||||||
|
(define thread-pool-timeout-error-pool
|
||||||
|
(exception-accessor
|
||||||
|
&thread-pool-timeout
|
||||||
|
(record-accessor &thread-pool-timeout 'pool)))
|
||||||
|
|
||||||
|
(define thread-pool-timeout-error?
|
||||||
|
(record-predicate &thread-pool-timeout))
|
||||||
|
|
||||||
|
(define* (make-fixed-size-thread-pool size
|
||||||
|
#:key
|
||||||
|
thread-initializer
|
||||||
|
thread-destructor
|
||||||
|
delay-logger
|
||||||
|
duration-logger
|
||||||
|
thread-lifetime
|
||||||
|
(expire-on-exception? #f)
|
||||||
|
(name "unnamed")
|
||||||
|
(use-default-io-waiters? #t)
|
||||||
|
default-checkout-timeout)
|
||||||
|
(define channel
|
||||||
|
(make-channel))
|
||||||
|
|
||||||
(define* (make-thread-pool size
|
|
||||||
#:key
|
|
||||||
thread-initializer
|
|
||||||
thread-destructor
|
|
||||||
(delay-logger (lambda _ #f))
|
|
||||||
(duration-logger (const #f))
|
|
||||||
thread-lifetime
|
|
||||||
(log-exception? (const #t))
|
|
||||||
(expire-on-exception? #f)
|
|
||||||
(name "unnamed")
|
|
||||||
(use-default-io-waiters? #t)
|
|
||||||
default-checkout-timeout)
|
|
||||||
"Return a channel used to offload work to a dedicated thread. ARGS are the
|
|
||||||
arguments of the thread pool procedure."
|
|
||||||
(define param
|
(define param
|
||||||
(make-parameter #f))
|
(make-parameter #f))
|
||||||
|
|
||||||
|
@ -224,382 +269,256 @@ arguments of the thread pool procedure."
|
||||||
(sleep 1)
|
(sleep 1)
|
||||||
(destructor/safe args)))))
|
(destructor/safe args)))))
|
||||||
|
|
||||||
(define (process thread-index channel args)
|
(define (process channel args)
|
||||||
(let loop ((current-lifetime thread-lifetime))
|
(let loop ()
|
||||||
(let ((exception?
|
(match (get-message channel)
|
||||||
(match (get-message channel)
|
('destroy #f)
|
||||||
(((? channel? reply) sent-time (? procedure? proc))
|
((reply sent-time proc)
|
||||||
(let ((time-delay
|
(when delay-logger
|
||||||
(- (get-internal-real-time)
|
(let ((time-delay
|
||||||
sent-time)))
|
(- (get-internal-real-time)
|
||||||
(delay-logger (/ time-delay
|
sent-time)))
|
||||||
internal-time-units-per-second)
|
(delay-logger (/ time-delay
|
||||||
proc)
|
internal-time-units-per-second)
|
||||||
|
proc)))
|
||||||
|
|
||||||
(let* ((start-time (get-internal-real-time))
|
(let* ((start-time (get-internal-real-time))
|
||||||
(response
|
(response
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
(lambda (exn)
|
(lambda (exn)
|
||||||
(list 'thread-pool-error
|
(list 'thread-pool-error
|
||||||
(/ (- (get-internal-real-time)
|
(/ (- (get-internal-real-time)
|
||||||
start-time)
|
start-time)
|
||||||
internal-time-units-per-second)
|
internal-time-units-per-second)
|
||||||
exn))
|
exn))
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(vector-set! thread-proc-vector
|
(with-exception-handler
|
||||||
thread-index
|
(lambda (exn)
|
||||||
proc)
|
(let ((stack
|
||||||
(with-exception-handler
|
(match (fluid-ref %stacks)
|
||||||
(lambda (exn)
|
((stack-tag . prompt-tag)
|
||||||
(let ((stack
|
(make-stack #t
|
||||||
(match (fluid-ref %stacks)
|
0 prompt-tag
|
||||||
((stack-tag . prompt-tag)
|
0 (and prompt-tag 1)))
|
||||||
(make-stack #t
|
(_
|
||||||
0 prompt-tag
|
(make-stack #t)))))
|
||||||
0 (and prompt-tag 1)))
|
(raise-exception
|
||||||
(_
|
(make-exception
|
||||||
(make-stack #t)))))
|
exn
|
||||||
(raise-exception
|
(make-knots-exception stack)))))
|
||||||
(make-exception
|
(lambda ()
|
||||||
exn
|
(call-with-values
|
||||||
(make-knots-exception stack)))))
|
(lambda ()
|
||||||
(lambda ()
|
(start-stack
|
||||||
(call-with-values
|
#t
|
||||||
(lambda ()
|
(apply proc args)))
|
||||||
(start-stack
|
(lambda vals
|
||||||
#t
|
(cons (/ (- (get-internal-real-time)
|
||||||
(apply proc args)))
|
start-time)
|
||||||
(lambda vals
|
internal-time-units-per-second)
|
||||||
(cons (/ (- (get-internal-real-time)
|
vals))))))
|
||||||
start-time)
|
#:unwind? #t)))
|
||||||
internal-time-units-per-second)
|
|
||||||
vals))))))
|
|
||||||
#:unwind? #t)))
|
|
||||||
(put-message reply
|
|
||||||
response)
|
|
||||||
|
|
||||||
(vector-set! thread-proc-vector
|
(put-message reply
|
||||||
thread-index
|
response)
|
||||||
#f)
|
|
||||||
|
|
||||||
(match response
|
(let ((exception?
|
||||||
(('thread-pool-error duration _)
|
(match response
|
||||||
(when duration-logger
|
(('thread-pool-error duration _)
|
||||||
(duration-logger duration proc))
|
(when duration-logger
|
||||||
#t)
|
(duration-logger duration proc))
|
||||||
((duration . _)
|
#t)
|
||||||
(when duration-logger
|
((duration . _)
|
||||||
(duration-logger duration proc))
|
(when duration-logger
|
||||||
#f))))))))
|
(duration-logger duration proc))
|
||||||
(unless (and expire-on-exception?
|
#f))))
|
||||||
exception?)
|
(if (and exception?
|
||||||
(if (number? current-lifetime)
|
expire-on-exception?)
|
||||||
(unless (< current-lifetime 0)
|
#t
|
||||||
(loop (if current-lifetime
|
(loop))))))))
|
||||||
(- current-lifetime 1)
|
|
||||||
#f)))
|
|
||||||
(loop #f))))))
|
|
||||||
|
|
||||||
(define (start-threads channel)
|
(define (start-thread index channel)
|
||||||
(for-each
|
(call-with-new-thread
|
||||||
(lambda (thread-index)
|
(lambda ()
|
||||||
(call-with-new-thread
|
(catch 'system-error
|
||||||
(lambda ()
|
|
||||||
(catch 'system-error
|
|
||||||
(lambda ()
|
|
||||||
(set-thread-name
|
|
||||||
(string-append
|
|
||||||
name " w t "
|
|
||||||
(number->string thread-index))))
|
|
||||||
(const #t))
|
|
||||||
|
|
||||||
(let init ((args (if thread-initializer
|
|
||||||
(initializer/safe)
|
|
||||||
'())))
|
|
||||||
(with-exception-handler
|
|
||||||
(lambda (exn)
|
|
||||||
(simple-format
|
|
||||||
(current-error-port)
|
|
||||||
"knots: thread-pool: internal exception: ~A\n" exn))
|
|
||||||
(lambda ()
|
|
||||||
(parameterize ((param args))
|
|
||||||
(process thread-index channel args)))
|
|
||||||
#:unwind? #t)
|
|
||||||
|
|
||||||
(when thread-destructor
|
|
||||||
(destructor/safe args))
|
|
||||||
|
|
||||||
(init (initializer/safe))))))
|
|
||||||
(iota size)))
|
|
||||||
|
|
||||||
(let ((channel (make-channel)))
|
|
||||||
(if use-default-io-waiters?
|
|
||||||
(call-with-default-io-waiters
|
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(start-threads channel)))
|
(set-thread-name
|
||||||
(start-threads channel))
|
(string-append
|
||||||
|
name " w t " (number->string index))))
|
||||||
|
(const #t))
|
||||||
|
|
||||||
(thread-pool channel
|
(let init ((args (if thread-initializer
|
||||||
param
|
(initializer/safe)
|
||||||
thread-proc-vector)))
|
'())))
|
||||||
|
(let ((continue?
|
||||||
|
(with-exception-handler
|
||||||
|
(lambda (exn)
|
||||||
|
(simple-format
|
||||||
|
(current-error-port)
|
||||||
|
"knots: thread-pool: internal exception: ~A\n" exn))
|
||||||
|
(lambda ()
|
||||||
|
(parameterize ((param args))
|
||||||
|
(process channel args)))
|
||||||
|
#:unwind? #t)))
|
||||||
|
|
||||||
(define &thread-pool-timeout
|
(when thread-destructor
|
||||||
(make-exception-type '&thread-pool-timeout
|
(destructor/safe args))
|
||||||
&error
|
|
||||||
'()))
|
|
||||||
|
|
||||||
(define make-thread-pool-timeout-error
|
(when continue?
|
||||||
(record-constructor &thread-pool-timeout))
|
(init (if thread-initializer
|
||||||
|
(initializer/safe)
|
||||||
|
'()))))))))
|
||||||
|
|
||||||
(define thread-pool-timeout-error?
|
(for-each
|
||||||
(record-predicate &thread-pool-timeout))
|
(lambda (i)
|
||||||
|
(if use-default-io-waiters?
|
||||||
|
(call-with-default-io-waiters
|
||||||
|
(lambda ()
|
||||||
|
(start-thread i channel)))
|
||||||
|
(start-thread i channel)))
|
||||||
|
(iota size))
|
||||||
|
|
||||||
(define* (call-with-thread record proc #:key duration-logger
|
(fixed-size-thread-pool channel
|
||||||
(timeout (thread-pool-default-checkout-timeout
|
param
|
||||||
record))
|
thread-proc-vector
|
||||||
(channel (thread-pool-channel record)))
|
default-checkout-timeout))
|
||||||
|
|
||||||
|
(define* (make-thread-pool max-size
|
||||||
|
#:key
|
||||||
|
(min-size max-size)
|
||||||
|
scheduler
|
||||||
|
thread-initializer
|
||||||
|
thread-destructor
|
||||||
|
(delay-logger (lambda _ #f))
|
||||||
|
(duration-logger (const #f))
|
||||||
|
thread-lifetime
|
||||||
|
(expire-on-exception? #f)
|
||||||
|
(name "unnamed")
|
||||||
|
(use-default-io-waiters? #t)
|
||||||
|
default-checkout-timeout)
|
||||||
|
"Return a channel used to offload work to a dedicated thread. ARGS are the
|
||||||
|
arguments of the thread pool procedure."
|
||||||
|
(define param
|
||||||
|
(make-parameter #f))
|
||||||
|
|
||||||
|
(let ((resource-pool
|
||||||
|
(make-resource-pool
|
||||||
|
(lambda ()
|
||||||
|
(make-fixed-size-thread-pool
|
||||||
|
1
|
||||||
|
#:thread-initializer thread-initializer
|
||||||
|
#:thread-destructor thread-destructor
|
||||||
|
#:thread-lifetime thread-lifetime
|
||||||
|
#:expire-on-exception? expire-on-exception?
|
||||||
|
#:name name
|
||||||
|
#:use-default-io-waiters? use-default-io-waiters?))
|
||||||
|
max-size
|
||||||
|
#:destructor destroy-thread-pool
|
||||||
|
#:min-size min-size
|
||||||
|
#:delay-logger delay-logger
|
||||||
|
#:scheduler scheduler
|
||||||
|
#:duration-logger duration-logger
|
||||||
|
#:default-checkout-timeout default-checkout-timeout)))
|
||||||
|
|
||||||
|
(thread-pool resource-pool
|
||||||
|
param)))
|
||||||
|
|
||||||
|
(define* (call-with-thread thread-pool
|
||||||
|
proc
|
||||||
|
#:key
|
||||||
|
duration-logger
|
||||||
|
checkout-timeout
|
||||||
|
channel
|
||||||
|
destroy-thread-on-exception?
|
||||||
|
(max-waiters 'default))
|
||||||
"Send PROC to the thread pool through CHANNEL. Return the result of PROC.
|
"Send PROC to the thread pool through CHANNEL. Return the result of PROC.
|
||||||
If already in the thread pool, call PROC immediately."
|
If already in the thread pool, call PROC immediately."
|
||||||
(let ((args ((thread-pool-arguments-parameter record))))
|
(define (handle-proc fixed-size-thread-pool
|
||||||
|
reply-channel
|
||||||
|
start-time
|
||||||
|
timeout)
|
||||||
|
(let* ((request-channel
|
||||||
|
(or channel
|
||||||
|
(fixed-size-thread-pool-channel
|
||||||
|
fixed-size-thread-pool)))
|
||||||
|
(operation-success?
|
||||||
|
(perform-operation
|
||||||
|
(let ((put
|
||||||
|
(wrap-operation
|
||||||
|
(put-operation request-channel
|
||||||
|
(list reply-channel
|
||||||
|
start-time
|
||||||
|
proc))
|
||||||
|
(const #t))))
|
||||||
|
|
||||||
|
(if timeout
|
||||||
|
(choice-operation
|
||||||
|
put
|
||||||
|
(wrap-operation (sleep-operation timeout)
|
||||||
|
(const #f)))
|
||||||
|
put)))))
|
||||||
|
|
||||||
|
(unless operation-success?
|
||||||
|
(raise-exception
|
||||||
|
(make-thread-pool-timeout-error)))
|
||||||
|
|
||||||
|
(let ((reply (get-message reply-channel)))
|
||||||
|
(match reply
|
||||||
|
(('thread-pool-error duration exn)
|
||||||
|
(when duration-logger
|
||||||
|
(duration-logger duration))
|
||||||
|
(raise-exception exn))
|
||||||
|
((duration . result)
|
||||||
|
(when duration-logger
|
||||||
|
(duration-logger duration))
|
||||||
|
(apply values result))))))
|
||||||
|
|
||||||
|
(let ((args ((thread-pool-arguments-parameter thread-pool))))
|
||||||
(if args
|
(if args
|
||||||
(apply proc args)
|
(apply proc args)
|
||||||
(let* ((reply (make-channel))
|
(let ((start-time (get-internal-real-time))
|
||||||
(operation-success?
|
(reply-channel (make-channel)))
|
||||||
(perform-operation
|
(if (fixed-size-thread-pool? thread-pool)
|
||||||
(let ((put
|
(handle-proc thread-pool
|
||||||
(wrap-operation
|
reply-channel
|
||||||
(put-operation channel
|
start-time
|
||||||
(list reply
|
checkout-timeout)
|
||||||
(get-internal-real-time)
|
(with-exception-handler
|
||||||
proc))
|
(lambda (exn)
|
||||||
(const #t))))
|
(if (and (resource-pool-timeout-error? exn)
|
||||||
|
(eq? (resource-pool-timeout-error-pool exn)
|
||||||
|
(thread-pool-resource-pool thread-pool)))
|
||||||
|
(raise-exception
|
||||||
|
(make-thread-pool-timeout-error thread-pool))
|
||||||
|
(raise-exception exn)))
|
||||||
|
(lambda ()
|
||||||
|
(call-with-resource-from-pool (thread-pool-resource-pool
|
||||||
|
thread-pool)
|
||||||
|
(lambda (fixed-size-thread-pool)
|
||||||
|
(if checkout-timeout
|
||||||
|
(let ((remaining-time
|
||||||
|
(/ (- (get-internal-real-time) start-time)
|
||||||
|
internal-time-units-per-second)))
|
||||||
|
(if (< remaining-time checkout-timeout)
|
||||||
|
(handle-proc fixed-size-thread-pool
|
||||||
|
reply-channel
|
||||||
|
start-time
|
||||||
|
remaining-time)
|
||||||
|
(raise-exception
|
||||||
|
(make-thread-pool-timeout-error thread-pool))))
|
||||||
|
(handle-proc fixed-size-thread-pool
|
||||||
|
reply-channel
|
||||||
|
start-time
|
||||||
|
#f)))
|
||||||
|
#:max-waiters max-waiters
|
||||||
|
#:timeout checkout-timeout
|
||||||
|
#:destroy-resource-on-exception?
|
||||||
|
destroy-thread-on-exception?))))))))
|
||||||
|
|
||||||
(if timeout
|
(define (destroy-thread-pool pool)
|
||||||
(choice-operation
|
(if (fixed-size-thread-pool? pool)
|
||||||
put
|
(put-message
|
||||||
(wrap-operation (sleep-operation timeout)
|
(fixed-size-thread-pool-channel pool)
|
||||||
(const #f)))
|
'destroy)
|
||||||
put)))))
|
(destroy-resource-pool
|
||||||
|
(thread-pool-resource-pool pool))))
|
||||||
(unless operation-success?
|
|
||||||
(raise-exception
|
|
||||||
(make-thread-pool-timeout-error)))
|
|
||||||
|
|
||||||
(match (get-message reply)
|
|
||||||
(('thread-pool-error duration exn)
|
|
||||||
(when duration-logger
|
|
||||||
(duration-logger duration))
|
|
||||||
(raise-exception exn))
|
|
||||||
((duration . result)
|
|
||||||
(when duration-logger
|
|
||||||
(duration-logger duration))
|
|
||||||
(apply values result)))))))
|
|
||||||
|
|
||||||
(define* (create-work-queue thread-count-parameter proc
|
|
||||||
#:key thread-start-delay
|
|
||||||
(thread-stop-delay
|
|
||||||
(make-time time-duration 0 0))
|
|
||||||
(name "unnamed")
|
|
||||||
priority<?)
|
|
||||||
(let ((queue (make-q))
|
|
||||||
(queue-mutex (make-mutex))
|
|
||||||
(job-available (make-condition-variable))
|
|
||||||
(running-job-args (make-hash-table)))
|
|
||||||
|
|
||||||
(define get-thread-count
|
|
||||||
(cond
|
|
||||||
((number? thread-count-parameter)
|
|
||||||
(const thread-count-parameter))
|
|
||||||
((eq? thread-count-parameter #f)
|
|
||||||
;; Run one thread per job
|
|
||||||
(lambda ()
|
|
||||||
(+ (q-length queue)
|
|
||||||
(hash-count (lambda (index val)
|
|
||||||
(list? val))
|
|
||||||
running-job-args))))
|
|
||||||
(else
|
|
||||||
thread-count-parameter)))
|
|
||||||
|
|
||||||
(define process-job
|
|
||||||
(if priority<?
|
|
||||||
(lambda* (args #:key priority)
|
|
||||||
(with-mutex queue-mutex
|
|
||||||
(enq! queue (cons priority args))
|
|
||||||
(set-car!
|
|
||||||
queue
|
|
||||||
(stable-sort! (car queue)
|
|
||||||
(lambda (a b)
|
|
||||||
(priority<?
|
|
||||||
(car a)
|
|
||||||
(car b)))))
|
|
||||||
(sync-q! queue)
|
|
||||||
(start-new-threads-if-necessary (get-thread-count))
|
|
||||||
(signal-condition-variable job-available)))
|
|
||||||
(lambda args
|
|
||||||
(with-mutex queue-mutex
|
|
||||||
(enq! queue args)
|
|
||||||
(start-new-threads-if-necessary (get-thread-count))
|
|
||||||
(signal-condition-variable job-available)))))
|
|
||||||
|
|
||||||
(define (count-threads)
|
|
||||||
(with-mutex queue-mutex
|
|
||||||
(hash-count (const #t) running-job-args)))
|
|
||||||
|
|
||||||
(define (count-jobs)
|
|
||||||
(with-mutex queue-mutex
|
|
||||||
(+ (q-length queue)
|
|
||||||
(hash-count (lambda (index val)
|
|
||||||
(list? val))
|
|
||||||
running-job-args))))
|
|
||||||
|
|
||||||
(define (list-jobs)
|
|
||||||
(with-mutex queue-mutex
|
|
||||||
(append (if priority<?
|
|
||||||
(map cdr (car queue))
|
|
||||||
(list-copy (car queue)))
|
|
||||||
(hash-fold (lambda (key val result)
|
|
||||||
(if val
|
|
||||||
(cons val result)
|
|
||||||
result))
|
|
||||||
'()
|
|
||||||
running-job-args))))
|
|
||||||
|
|
||||||
(define (thread-process-job job-args)
|
|
||||||
(with-exception-handler
|
|
||||||
(lambda _ #f)
|
|
||||||
(lambda ()
|
|
||||||
(with-exception-handler
|
|
||||||
(lambda (exn)
|
|
||||||
(simple-format (current-error-port)
|
|
||||||
"~A work queue, job raised exception ~A\n"
|
|
||||||
name job-args)
|
|
||||||
(print-backtrace-and-exception/knots exn)
|
|
||||||
(raise-exception exn))
|
|
||||||
(lambda ()
|
|
||||||
(apply proc job-args))))
|
|
||||||
#:unwind? #t))
|
|
||||||
|
|
||||||
(define (start-thread thread-index)
|
|
||||||
(define (too-many-threads?)
|
|
||||||
(let ((running-jobs-count
|
|
||||||
(hash-count (lambda (index val)
|
|
||||||
(list? val))
|
|
||||||
running-job-args))
|
|
||||||
(desired-thread-count (get-thread-count)))
|
|
||||||
|
|
||||||
(>= running-jobs-count
|
|
||||||
desired-thread-count)))
|
|
||||||
|
|
||||||
(define (thread-idle-for-too-long? last-job-finished-at)
|
|
||||||
(time>=?
|
|
||||||
(time-difference (current-time time-monotonic)
|
|
||||||
last-job-finished-at)
|
|
||||||
thread-stop-delay))
|
|
||||||
|
|
||||||
(define (stop-thread)
|
|
||||||
(hash-remove! running-job-args
|
|
||||||
thread-index)
|
|
||||||
(unlock-mutex queue-mutex))
|
|
||||||
|
|
||||||
(call-with-new-thread
|
|
||||||
(lambda ()
|
|
||||||
(catch 'system-error
|
|
||||||
(lambda ()
|
|
||||||
(set-thread-name
|
|
||||||
(string-append name " q t "
|
|
||||||
(number->string thread-index))))
|
|
||||||
(const #t))
|
|
||||||
|
|
||||||
(let loop ((last-job-finished-at (current-time time-monotonic)))
|
|
||||||
(lock-mutex queue-mutex)
|
|
||||||
|
|
||||||
(if (too-many-threads?)
|
|
||||||
(stop-thread)
|
|
||||||
(let ((job-args
|
|
||||||
(if (q-empty? queue)
|
|
||||||
;; #f from wait-condition-variable indicates a timeout
|
|
||||||
(if (wait-condition-variable
|
|
||||||
job-available
|
|
||||||
queue-mutex
|
|
||||||
(+ 9 (time-second (current-time))))
|
|
||||||
;; Another thread could have taken
|
|
||||||
;; the job in the mean time
|
|
||||||
(if (q-empty? queue)
|
|
||||||
#f
|
|
||||||
(if priority<?
|
|
||||||
(cdr (deq! queue))
|
|
||||||
(deq! queue)))
|
|
||||||
#f)
|
|
||||||
(if priority<?
|
|
||||||
(cdr (deq! queue))
|
|
||||||
(deq! queue)))))
|
|
||||||
|
|
||||||
(if job-args
|
|
||||||
(begin
|
|
||||||
(hash-set! running-job-args
|
|
||||||
thread-index
|
|
||||||
job-args)
|
|
||||||
|
|
||||||
(unlock-mutex queue-mutex)
|
|
||||||
(thread-process-job job-args)
|
|
||||||
|
|
||||||
(with-mutex queue-mutex
|
|
||||||
(hash-set! running-job-args
|
|
||||||
thread-index
|
|
||||||
#f))
|
|
||||||
|
|
||||||
(loop (current-time time-monotonic)))
|
|
||||||
(if (thread-idle-for-too-long? last-job-finished-at)
|
|
||||||
(stop-thread)
|
|
||||||
(begin
|
|
||||||
(unlock-mutex queue-mutex)
|
|
||||||
|
|
||||||
(loop last-job-finished-at))))))))))
|
|
||||||
|
|
||||||
|
|
||||||
(define start-new-threads-if-necessary
|
|
||||||
(let ((previous-thread-started-at (make-time time-monotonic 0 0)))
|
|
||||||
(lambda (desired-count)
|
|
||||||
(let* ((thread-count
|
|
||||||
(hash-count (const #t) running-job-args))
|
|
||||||
(threads-to-start
|
|
||||||
(- desired-count thread-count)))
|
|
||||||
(when (> threads-to-start 0)
|
|
||||||
(for-each
|
|
||||||
(lambda (thread-index)
|
|
||||||
(when (eq? (hash-ref running-job-args
|
|
||||||
thread-index
|
|
||||||
'slot-free)
|
|
||||||
'slot-free)
|
|
||||||
(let* ((now (current-time time-monotonic))
|
|
||||||
(elapsed (time-difference now
|
|
||||||
previous-thread-started-at)))
|
|
||||||
(when (or (eq? #f thread-start-delay)
|
|
||||||
(time>=? elapsed thread-start-delay))
|
|
||||||
(set! previous-thread-started-at now)
|
|
||||||
(hash-set! running-job-args
|
|
||||||
thread-index
|
|
||||||
#f)
|
|
||||||
(start-thread thread-index)))))
|
|
||||||
(iota desired-count)))))))
|
|
||||||
|
|
||||||
(if (procedure? thread-count-parameter)
|
|
||||||
(call-with-new-thread
|
|
||||||
(lambda ()
|
|
||||||
(catch 'system-error
|
|
||||||
(lambda ()
|
|
||||||
(set-thread-name
|
|
||||||
(string-append name " q t")))
|
|
||||||
(const #t))
|
|
||||||
|
|
||||||
(while #t
|
|
||||||
(sleep 15)
|
|
||||||
(with-mutex queue-mutex
|
|
||||||
(let ((idle-threads (hash-count (lambda (index val)
|
|
||||||
(eq? #f val))
|
|
||||||
running-job-args)))
|
|
||||||
(when (= 0 idle-threads)
|
|
||||||
(start-new-threads-if-necessary (get-thread-count))))))))
|
|
||||||
(start-new-threads-if-necessary (get-thread-count)))
|
|
||||||
|
|
||||||
(values process-job count-jobs count-threads list-jobs)))
|
|
||||||
|
|
|
@ -45,6 +45,8 @@
|
||||||
&request-body-ended-prematurely
|
&request-body-ended-prematurely
|
||||||
request-body-ended-prematurely-error?
|
request-body-ended-prematurely-error?
|
||||||
|
|
||||||
|
sanitize-response
|
||||||
|
|
||||||
request-body-port/knots
|
request-body-port/knots
|
||||||
read-request-body/knots
|
read-request-body/knots
|
||||||
|
|
||||||
|
|
|
@ -6,10 +6,47 @@
|
||||||
(knots thread-pool))
|
(knots thread-pool))
|
||||||
|
|
||||||
(let ((thread-pool
|
(let ((thread-pool
|
||||||
(make-thread-pool 2)))
|
(make-fixed-size-thread-pool 2)))
|
||||||
|
|
||||||
|
(assert-equal
|
||||||
|
(call-with-thread
|
||||||
|
thread-pool
|
||||||
|
(lambda ()
|
||||||
|
4))
|
||||||
|
4))
|
||||||
|
|
||||||
|
(let ((thread-pool
|
||||||
|
(make-fixed-size-thread-pool
|
||||||
|
2
|
||||||
|
#:thread-initializer (const '(2)))))
|
||||||
|
|
||||||
|
(assert-equal
|
||||||
|
(call-with-thread
|
||||||
|
thread-pool
|
||||||
|
(lambda (num)
|
||||||
|
(* 2 num)))
|
||||||
|
4))
|
||||||
|
|
||||||
|
(let ((thread-pool
|
||||||
|
(make-fixed-size-thread-pool 2)))
|
||||||
|
|
||||||
|
(assert-equal
|
||||||
|
#t
|
||||||
|
(with-exception-handler
|
||||||
|
(lambda (exn)
|
||||||
|
(knots-exception? exn))
|
||||||
|
(lambda ()
|
||||||
|
(call-with-thread
|
||||||
|
thread-pool
|
||||||
|
(lambda ()
|
||||||
|
(+ 1 'a))))
|
||||||
|
#:unwind? #t)))
|
||||||
|
|
||||||
|
(run-fibers-for-tests
|
||||||
|
(lambda ()
|
||||||
|
(let ((thread-pool
|
||||||
|
(make-thread-pool 2)))
|
||||||
|
|
||||||
(run-fibers-for-tests
|
|
||||||
(lambda ()
|
|
||||||
(assert-equal
|
(assert-equal
|
||||||
(call-with-thread
|
(call-with-thread
|
||||||
thread-pool
|
thread-pool
|
||||||
|
@ -17,13 +54,13 @@
|
||||||
4))
|
4))
|
||||||
4))))
|
4))))
|
||||||
|
|
||||||
(let ((thread-pool
|
(run-fibers-for-tests
|
||||||
(make-thread-pool
|
(lambda ()
|
||||||
2
|
(let ((thread-pool
|
||||||
#:thread-initializer (const '(2)))))
|
(make-thread-pool
|
||||||
|
2
|
||||||
|
#:thread-initializer (const '(2)))))
|
||||||
|
|
||||||
(run-fibers-for-tests
|
|
||||||
(lambda ()
|
|
||||||
(assert-equal
|
(assert-equal
|
||||||
(call-with-thread
|
(call-with-thread
|
||||||
thread-pool
|
thread-pool
|
||||||
|
@ -31,22 +68,11 @@
|
||||||
(* 2 num)))
|
(* 2 num)))
|
||||||
4))))
|
4))))
|
||||||
|
|
||||||
(let ((process-job
|
(run-fibers-for-tests
|
||||||
count-jobs
|
(lambda ()
|
||||||
count-threads
|
(let ((thread-pool
|
||||||
list-jobs
|
(make-thread-pool 2)))
|
||||||
(create-work-queue
|
|
||||||
2
|
|
||||||
(lambda (i)
|
|
||||||
(* i 2)))))
|
|
||||||
|
|
||||||
(process-job 3))
|
|
||||||
|
|
||||||
(let ((thread-pool
|
|
||||||
(make-thread-pool 2)))
|
|
||||||
|
|
||||||
(run-fibers-for-tests
|
|
||||||
(lambda ()
|
|
||||||
(assert-equal
|
(assert-equal
|
||||||
#t
|
#t
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue