Compare commits

..

No commits in common. "78d22d1accce74191bbfd68dce9e6ac11bc04fcc" and "7ba77010ae98e675340a7ea22b400f0dcc20ef65" have entirely different histories.

5 changed files with 454 additions and 447 deletions

View file

@ -75,8 +75,6 @@
0 (and prompt-tag 1)))
(_
(make-stack #t))))
(stack-len
(stack-length stack))
(error-string
(call-with-output-string
(lambda (port)
@ -85,46 +83,30 @@
(filter knots-exception?
(simple-exceptions exn)))))
(let* ((stack-vec
(stack->vector stack))
(stack-vec-length
(vector-length stack-vec)))
(let ((stack-vec
(stack->vector stack)))
(print-frames (list->vector
(drop
(vector->list stack-vec)
(if (< stack-vec-length 5)
0
4)))
6))
port
#:count (stack-length stack)))
(for-each
(lambda (stack)
(let* ((stack-vec
(stack->vector stack))
(stack-vec-length
(vector-length stack-vec)))
(let ((stack-vec
(stack->vector stack)))
(print-frames (list->vector
(drop
(vector->list stack-vec)
(if (< stack-vec-length 4)
0
3)))
3))
port
#:count (stack-length stack))))
knots-stacks)
(print-exception
port
(if (null? knots-stacks)
(stack-ref stack
(if (< stack-len 4)
stack-len
4))
(let* ((stack (last knots-stacks))
(stack-len (stack-length stack)))
(stack-ref stack
(if (< stack-len 3)
stack-len
3))))
(stack-ref stack 1)
(stack-ref (last knots-stacks) 3))
'%exception
(list exn)))))))
(display error-string port)))

View file

@ -19,15 +19,10 @@
(define-module (knots promise)
#:use-module (srfi srfi-9)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
#:use-module (ice-9 exceptions)
#:use-module (fibers)
#:use-module (fibers conditions)
#:use-module (knots)
#:export (fibers-promise?
fibers-delay
#:export (fibers-delay
fibers-force
fibers-promise-reset
fibers-promise-result-available?))
@ -46,61 +41,38 @@
(make-condition)))
(define (fibers-force fp)
(unless (fibers-promise? fp)
(raise-exception
(make-exception
(make-exception-with-message "fibers-force: not a fibers promise")
(make-exception-with-irritants fp))))
(let ((res (atomic-box-compare-and-swap!
(fibers-promise-values-box fp)
#f
'started)))
(cond
((eq? #f res)
(call-with-values
(lambda ()
(with-exception-handler
(lambda (exn)
(atomic-box-set! (fibers-promise-values-box fp)
exn)
(signal-condition!
(fibers-promise-evaluated-condition fp))
(raise-exception exn))
(lambda ()
(with-exception-handler
(lambda (exn)
(let ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t)))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))
(fibers-promise-thunk fp)))
#:unwind? #t))
(lambda vals
(atomic-box-set! (fibers-promise-values-box fp)
vals)
(signal-condition!
(fibers-promise-evaluated-condition fp))
(apply values vals))))
((eq? res 'started)
(begin
(wait (fibers-promise-evaluated-condition fp))
(let ((result (atomic-box-ref (fibers-promise-values-box fp))))
(if (exception? result)
(raise-exception result)
(apply values result)))))
(else
(if (exception? res)
(raise-exception res)
(apply values res))))))
(if (eq? #f res)
(call-with-values
(lambda ()
(with-exception-handler
(lambda (exn)
(atomic-box-set! (fibers-promise-values-box fp)
exn)
(signal-condition!
(fibers-promise-evaluated-condition fp))
(raise-exception exn))
(fibers-promise-thunk fp)
#:unwind? #t))
(lambda vals
(atomic-box-set! (fibers-promise-values-box fp)
vals)
(signal-condition!
(fibers-promise-evaluated-condition fp))
(apply values vals)))
(if (eq? res 'started)
(begin
(wait (fibers-promise-evaluated-condition fp))
(let ((result (atomic-box-ref (fibers-promise-values-box fp))))
(if (exception? result)
(raise-exception result)
(apply values result))))
(if (exception? res)
(raise-exception res)
(apply values res))))))
(define (fibers-promise-reset fp)
(atomic-box-set! (fibers-promise-values-box fp)

View file

@ -27,38 +27,29 @@
#:use-module (rnrs bytevectors)
#:use-module (ice-9 q)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
#:use-module (ice-9 threads)
#:use-module (fibers)
#:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers operations)
#:use-module (knots)
#:use-module (knots resource-pool)
#:export (set-thread-name
thread-name
&thread-pool-timeout-error
thread-pool-timeout-error-pool
thread-pool-timeout-error?
thread-pool?
thread-pool-channel
thread-pool-arguments-parameter
thread-pool-proc-vector
make-thread-pool
thread-pool?
thread-pool-resource-pool
call-with-thread
make-fixed-size-thread-pool
fixed-size-thread-pool?
fixed-size-thread-pool-channel
fixed-size-thread-pool-current-procedures
&thread-pool-timeout
thread-pool-timeout-error?
;; These procedures work for thread pools and fixed size
;; thread pools
thread-pool-arguments-parameter
thread-pool-default-checkout-timeout
%thread-pool-default-timeout
destroy-thread-pool
call-with-thread))
create-work-queue))
(define* (syscall->procedure return-type name argument-types
#:key library)
@ -156,64 +147,28 @@ from there, or #f if that would be an empty string."
(const "")))
(define-record-type <thread-pool>
(thread-pool resource-pool arguments-parameter)
(thread-pool channel arguments-parameter proc-vector)
thread-pool?
(resource-pool thread-pool-resource-pool)
(arguments-parameter thread-pool-arguments-parameter-accessor))
(define-record-type <fixed-size-thread-pool>
(fixed-size-thread-pool channel arguments-parameter current-procedures
default-checkout-timeout)
fixed-size-thread-pool?
(channel fixed-size-thread-pool-channel)
(arguments-parameter fixed-size-thread-pool-arguments-parameter)
(current-procedures fixed-size-thread-pool-current-procedures)
(default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout))
;; Since both thread pool records have this field, use a procedure
;; than handles the appropriate accessor
(define (thread-pool-arguments-parameter pool)
(if (fixed-size-thread-pool? pool)
(fixed-size-thread-pool-arguments-parameter pool)
(thread-pool-arguments-parameter-accessor pool)))
(define (thread-pool-default-checkout-timeout pool)
(if (fixed-size-thread-pool? pool)
(fixed-size-thread-pool-default-checkout-timeout pool)
(assq-ref (resource-pool-configuration
(thread-pool-resource-pool pool))
'default-checkout-timeout)))
(define &thread-pool-timeout
(make-exception-type '&thread-pool-timeout
&error
'(pool)))
(define make-thread-pool-timeout-error
(record-constructor &thread-pool-timeout))
(define thread-pool-timeout-error-pool
(exception-accessor
&thread-pool-timeout
(record-accessor &thread-pool-timeout 'pool)))
(define thread-pool-timeout-error?
(record-predicate &thread-pool-timeout))
(define* (make-fixed-size-thread-pool size
#:key
thread-initializer
thread-destructor
delay-logger
duration-logger
thread-lifetime
(expire-on-exception? #f)
(name "unnamed")
(use-default-io-waiters? #t)
default-checkout-timeout)
(define channel
(make-channel))
(channel thread-pool-channel)
(arguments-parameter thread-pool-arguments-parameter)
(proc-vector thread-pool-proc-vector)
(default-checkout-timeout
thread-pool-default-checkout-timeout))
(define* (make-thread-pool size
#:key
thread-initializer
thread-destructor
(delay-logger (lambda _ #f))
(duration-logger (const #f))
thread-lifetime
(log-exception? (const #t))
(expire-on-exception? #f)
(name "unnamed")
(use-default-io-waiters? #t)
default-checkout-timeout)
"Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the thread pool procedure."
(define param
(make-parameter #f))
@ -269,256 +224,382 @@ from there, or #f if that would be an empty string."
(sleep 1)
(destructor/safe args)))))
(define (process channel args)
(let loop ()
(match (get-message channel)
('destroy #f)
((reply sent-time proc)
(when delay-logger
(let ((time-delay
(- (get-internal-real-time)
sent-time)))
(delay-logger (/ time-delay
internal-time-units-per-second)
proc)))
(define (process thread-index channel args)
(let loop ((current-lifetime thread-lifetime))
(let ((exception?
(match (get-message channel)
(((? channel? reply) sent-time (? procedure? proc))
(let ((time-delay
(- (get-internal-real-time)
sent-time)))
(delay-logger (/ time-delay
internal-time-units-per-second)
proc)
(let* ((start-time (get-internal-real-time))
(response
(with-exception-handler
(lambda (exn)
(list 'thread-pool-error
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second)
exn))
(lambda ()
(with-exception-handler
(lambda (exn)
(let ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t)))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))
(lambda ()
(call-with-values
(lambda ()
(start-stack
#t
(apply proc args)))
(lambda vals
(cons (/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second)
vals))))))
#:unwind? #t)))
(let* ((start-time (get-internal-real-time))
(response
(with-exception-handler
(lambda (exn)
(list 'thread-pool-error
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second)
exn))
(lambda ()
(vector-set! thread-proc-vector
thread-index
proc)
(with-exception-handler
(lambda (exn)
(let ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t)))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))
(lambda ()
(call-with-values
(lambda ()
(start-stack
#t
(apply proc args)))
(lambda vals
(cons (/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second)
vals))))))
#:unwind? #t)))
(put-message reply
response)
(put-message reply
response)
(vector-set! thread-proc-vector
thread-index
#f)
(let ((exception?
(match response
(('thread-pool-error duration _)
(when duration-logger
(duration-logger duration proc))
#t)
((duration . _)
(when duration-logger
(duration-logger duration proc))
#f))))
(if (and exception?
expire-on-exception?)
#t
(loop))))))))
(match response
(('thread-pool-error duration _)
(when duration-logger
(duration-logger duration proc))
#t)
((duration . _)
(when duration-logger
(duration-logger duration proc))
#f))))))))
(unless (and expire-on-exception?
exception?)
(if (number? current-lifetime)
(unless (< current-lifetime 0)
(loop (if current-lifetime
(- current-lifetime 1)
#f)))
(loop #f))))))
(define (start-thread index channel)
(call-with-new-thread
(lambda ()
(catch 'system-error
(define (start-threads channel)
(for-each
(lambda (thread-index)
(call-with-new-thread
(lambda ()
(catch 'system-error
(lambda ()
(set-thread-name
(string-append
name " w t "
(number->string thread-index))))
(const #t))
(let init ((args (if thread-initializer
(initializer/safe)
'())))
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"knots: thread-pool: internal exception: ~A\n" exn))
(lambda ()
(parameterize ((param args))
(process thread-index channel args)))
#:unwind? #t)
(when thread-destructor
(destructor/safe args))
(init (initializer/safe))))))
(iota size)))
(let ((channel (make-channel)))
(if use-default-io-waiters?
(call-with-default-io-waiters
(lambda ()
(set-thread-name
(string-append
name " w t " (number->string index))))
(const #t))
(start-threads channel)))
(start-threads channel))
(let init ((args (if thread-initializer
(initializer/safe)
'())))
(let ((continue?
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"knots: thread-pool: internal exception: ~A\n" exn))
(lambda ()
(parameterize ((param args))
(process channel args)))
#:unwind? #t)))
(thread-pool channel
param
thread-proc-vector)))
(when thread-destructor
(destructor/safe args))
(define &thread-pool-timeout
(make-exception-type '&thread-pool-timeout
&error
'()))
(when continue?
(init (if thread-initializer
(initializer/safe)
'()))))))))
(define make-thread-pool-timeout-error
(record-constructor &thread-pool-timeout))
(for-each
(lambda (i)
(if use-default-io-waiters?
(call-with-default-io-waiters
(lambda ()
(start-thread i channel)))
(start-thread i channel)))
(iota size))
(define thread-pool-timeout-error?
(record-predicate &thread-pool-timeout))
(fixed-size-thread-pool channel
param
thread-proc-vector
default-checkout-timeout))
(define* (make-thread-pool max-size
#:key
(min-size max-size)
scheduler
thread-initializer
thread-destructor
(delay-logger (lambda _ #f))
(duration-logger (const #f))
thread-lifetime
(expire-on-exception? #f)
(name "unnamed")
(use-default-io-waiters? #t)
default-checkout-timeout)
"Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the thread pool procedure."
(define param
(make-parameter #f))
(let ((resource-pool
(make-resource-pool
(lambda ()
(make-fixed-size-thread-pool
1
#:thread-initializer thread-initializer
#:thread-destructor thread-destructor
#:thread-lifetime thread-lifetime
#:expire-on-exception? expire-on-exception?
#:name name
#:use-default-io-waiters? use-default-io-waiters?))
max-size
#:destructor destroy-thread-pool
#:min-size min-size
#:delay-logger delay-logger
#:scheduler scheduler
#:duration-logger duration-logger
#:default-checkout-timeout default-checkout-timeout)))
(thread-pool resource-pool
param)))
(define* (call-with-thread thread-pool
proc
#:key
duration-logger
checkout-timeout
channel
destroy-thread-on-exception?
(max-waiters 'default))
(define* (call-with-thread record proc #:key duration-logger
(timeout (thread-pool-default-checkout-timeout
record))
(channel (thread-pool-channel record)))
"Send PROC to the thread pool through CHANNEL. Return the result of PROC.
If already in the thread pool, call PROC immediately."
(define (handle-proc fixed-size-thread-pool
reply-channel
start-time
timeout)
(let* ((request-channel
(or channel
(fixed-size-thread-pool-channel
fixed-size-thread-pool)))
(operation-success?
(perform-operation
(let ((put
(wrap-operation
(put-operation request-channel
(list reply-channel
start-time
proc))
(const #t))))
(if timeout
(choice-operation
put
(wrap-operation (sleep-operation timeout)
(const #f)))
put)))))
(unless operation-success?
(raise-exception
(make-thread-pool-timeout-error)))
(let ((reply (get-message reply-channel)))
(match reply
(('thread-pool-error duration exn)
(when duration-logger
(duration-logger duration))
(raise-exception exn))
((duration . result)
(when duration-logger
(duration-logger duration))
(apply values result))))))
(let ((args ((thread-pool-arguments-parameter thread-pool))))
(let ((args ((thread-pool-arguments-parameter record))))
(if args
(apply proc args)
(let ((start-time (get-internal-real-time))
(reply-channel (make-channel)))
(if (fixed-size-thread-pool? thread-pool)
(handle-proc thread-pool
reply-channel
start-time
checkout-timeout)
(with-exception-handler
(lambda (exn)
(if (and (resource-pool-timeout-error? exn)
(eq? (resource-pool-timeout-error-pool exn)
(thread-pool-resource-pool thread-pool)))
(raise-exception
(make-thread-pool-timeout-error thread-pool))
(raise-exception exn)))
(lambda ()
(call-with-resource-from-pool (thread-pool-resource-pool
thread-pool)
(lambda (fixed-size-thread-pool)
(if checkout-timeout
(let ((remaining-time
(/ (- (get-internal-real-time) start-time)
internal-time-units-per-second)))
(if (< remaining-time checkout-timeout)
(handle-proc fixed-size-thread-pool
reply-channel
start-time
remaining-time)
(raise-exception
(make-thread-pool-timeout-error thread-pool))))
(handle-proc fixed-size-thread-pool
reply-channel
start-time
#f)))
#:max-waiters max-waiters
#:timeout checkout-timeout
#:destroy-resource-on-exception?
destroy-thread-on-exception?))))))))
(let* ((reply (make-channel))
(operation-success?
(perform-operation
(let ((put
(wrap-operation
(put-operation channel
(list reply
(get-internal-real-time)
proc))
(const #t))))
(define (destroy-thread-pool pool)
(if (fixed-size-thread-pool? pool)
(put-message
(fixed-size-thread-pool-channel pool)
'destroy)
(destroy-resource-pool
(thread-pool-resource-pool pool))))
(if timeout
(choice-operation
put
(wrap-operation (sleep-operation timeout)
(const #f)))
put)))))
(unless operation-success?
(raise-exception
(make-thread-pool-timeout-error)))
(match (get-message reply)
(('thread-pool-error duration exn)
(when duration-logger
(duration-logger duration))
(raise-exception exn))
((duration . result)
(when duration-logger
(duration-logger duration))
(apply values result)))))))
(define* (create-work-queue thread-count-parameter proc
#:key thread-start-delay
(thread-stop-delay
(make-time time-duration 0 0))
(name "unnamed")
priority<?)
(let ((queue (make-q))
(queue-mutex (make-mutex))
(job-available (make-condition-variable))
(running-job-args (make-hash-table)))
(define get-thread-count
(cond
((number? thread-count-parameter)
(const thread-count-parameter))
((eq? thread-count-parameter #f)
;; Run one thread per job
(lambda ()
(+ (q-length queue)
(hash-count (lambda (index val)
(list? val))
running-job-args))))
(else
thread-count-parameter)))
(define process-job
(if priority<?
(lambda* (args #:key priority)
(with-mutex queue-mutex
(enq! queue (cons priority args))
(set-car!
queue
(stable-sort! (car queue)
(lambda (a b)
(priority<?
(car a)
(car b)))))
(sync-q! queue)
(start-new-threads-if-necessary (get-thread-count))
(signal-condition-variable job-available)))
(lambda args
(with-mutex queue-mutex
(enq! queue args)
(start-new-threads-if-necessary (get-thread-count))
(signal-condition-variable job-available)))))
(define (count-threads)
(with-mutex queue-mutex
(hash-count (const #t) running-job-args)))
(define (count-jobs)
(with-mutex queue-mutex
(+ (q-length queue)
(hash-count (lambda (index val)
(list? val))
running-job-args))))
(define (list-jobs)
(with-mutex queue-mutex
(append (if priority<?
(map cdr (car queue))
(list-copy (car queue)))
(hash-fold (lambda (key val result)
(if val
(cons val result)
result))
'()
running-job-args))))
(define (thread-process-job job-args)
(with-exception-handler
(lambda _ #f)
(lambda ()
(with-exception-handler
(lambda (exn)
(simple-format (current-error-port)
"~A work queue, job raised exception ~A\n"
name job-args)
(print-backtrace-and-exception/knots exn)
(raise-exception exn))
(lambda ()
(apply proc job-args))))
#:unwind? #t))
(define (start-thread thread-index)
(define (too-many-threads?)
(let ((running-jobs-count
(hash-count (lambda (index val)
(list? val))
running-job-args))
(desired-thread-count (get-thread-count)))
(>= running-jobs-count
desired-thread-count)))
(define (thread-idle-for-too-long? last-job-finished-at)
(time>=?
(time-difference (current-time time-monotonic)
last-job-finished-at)
thread-stop-delay))
(define (stop-thread)
(hash-remove! running-job-args
thread-index)
(unlock-mutex queue-mutex))
(call-with-new-thread
(lambda ()
(catch 'system-error
(lambda ()
(set-thread-name
(string-append name " q t "
(number->string thread-index))))
(const #t))
(let loop ((last-job-finished-at (current-time time-monotonic)))
(lock-mutex queue-mutex)
(if (too-many-threads?)
(stop-thread)
(let ((job-args
(if (q-empty? queue)
;; #f from wait-condition-variable indicates a timeout
(if (wait-condition-variable
job-available
queue-mutex
(+ 9 (time-second (current-time))))
;; Another thread could have taken
;; the job in the mean time
(if (q-empty? queue)
#f
(if priority<?
(cdr (deq! queue))
(deq! queue)))
#f)
(if priority<?
(cdr (deq! queue))
(deq! queue)))))
(if job-args
(begin
(hash-set! running-job-args
thread-index
job-args)
(unlock-mutex queue-mutex)
(thread-process-job job-args)
(with-mutex queue-mutex
(hash-set! running-job-args
thread-index
#f))
(loop (current-time time-monotonic)))
(if (thread-idle-for-too-long? last-job-finished-at)
(stop-thread)
(begin
(unlock-mutex queue-mutex)
(loop last-job-finished-at))))))))))
(define start-new-threads-if-necessary
(let ((previous-thread-started-at (make-time time-monotonic 0 0)))
(lambda (desired-count)
(let* ((thread-count
(hash-count (const #t) running-job-args))
(threads-to-start
(- desired-count thread-count)))
(when (> threads-to-start 0)
(for-each
(lambda (thread-index)
(when (eq? (hash-ref running-job-args
thread-index
'slot-free)
'slot-free)
(let* ((now (current-time time-monotonic))
(elapsed (time-difference now
previous-thread-started-at)))
(when (or (eq? #f thread-start-delay)
(time>=? elapsed thread-start-delay))
(set! previous-thread-started-at now)
(hash-set! running-job-args
thread-index
#f)
(start-thread thread-index)))))
(iota desired-count)))))))
(if (procedure? thread-count-parameter)
(call-with-new-thread
(lambda ()
(catch 'system-error
(lambda ()
(set-thread-name
(string-append name " q t")))
(const #t))
(while #t
(sleep 15)
(with-mutex queue-mutex
(let ((idle-threads (hash-count (lambda (index val)
(eq? #f val))
running-job-args)))
(when (= 0 idle-threads)
(start-new-threads-if-necessary (get-thread-count))))))))
(start-new-threads-if-necessary (get-thread-count)))
(values process-job count-jobs count-threads list-jobs)))

View file

@ -45,8 +45,6 @@
&request-body-ended-prematurely
request-body-ended-prematurely-error?
sanitize-response
request-body-port/knots
read-request-body/knots

View file

@ -6,47 +6,10 @@
(knots thread-pool))
(let ((thread-pool
(make-fixed-size-thread-pool 2)))
(assert-equal
(call-with-thread
thread-pool
(lambda ()
4))
4))
(let ((thread-pool
(make-fixed-size-thread-pool
2
#:thread-initializer (const '(2)))))
(assert-equal
(call-with-thread
thread-pool
(lambda (num)
(* 2 num)))
4))
(let ((thread-pool
(make-fixed-size-thread-pool 2)))
(assert-equal
#t
(with-exception-handler
(lambda (exn)
(knots-exception? exn))
(lambda ()
(call-with-thread
thread-pool
(lambda ()
(+ 1 'a))))
#:unwind? #t)))
(run-fibers-for-tests
(lambda ()
(let ((thread-pool
(make-thread-pool 2)))
(make-thread-pool 2)))
(run-fibers-for-tests
(lambda ()
(assert-equal
(call-with-thread
thread-pool
@ -54,13 +17,13 @@
4))
4))))
(run-fibers-for-tests
(lambda ()
(let ((thread-pool
(make-thread-pool
2
#:thread-initializer (const '(2)))))
(let ((thread-pool
(make-thread-pool
2
#:thread-initializer (const '(2)))))
(run-fibers-for-tests
(lambda ()
(assert-equal
(call-with-thread
thread-pool
@ -68,11 +31,22 @@
(* 2 num)))
4))))
(run-fibers-for-tests
(lambda ()
(let ((thread-pool
(make-thread-pool 2)))
(let ((process-job
count-jobs
count-threads
list-jobs
(create-work-queue
2
(lambda (i)
(* i 2)))))
(process-job 3))
(let ((thread-pool
(make-thread-pool 2)))
(run-fibers-for-tests
(lambda ()
(assert-equal
#t
(with-exception-handler