Compare commits

..

No commits in common. "78d22d1accce74191bbfd68dce9e6ac11bc04fcc" and "7ba77010ae98e675340a7ea22b400f0dcc20ef65" have entirely different histories.

5 changed files with 454 additions and 447 deletions

View file

@ -75,8 +75,6 @@
0 (and prompt-tag 1))) 0 (and prompt-tag 1)))
(_ (_
(make-stack #t)))) (make-stack #t))))
(stack-len
(stack-length stack))
(error-string (error-string
(call-with-output-string (call-with-output-string
(lambda (port) (lambda (port)
@ -85,46 +83,30 @@
(filter knots-exception? (filter knots-exception?
(simple-exceptions exn))))) (simple-exceptions exn)))))
(let* ((stack-vec (let ((stack-vec
(stack->vector stack)) (stack->vector stack)))
(stack-vec-length
(vector-length stack-vec)))
(print-frames (list->vector (print-frames (list->vector
(drop (drop
(vector->list stack-vec) (vector->list stack-vec)
(if (< stack-vec-length 5) 6))
0
4)))
port port
#:count (stack-length stack))) #:count (stack-length stack)))
(for-each (for-each
(lambda (stack) (lambda (stack)
(let* ((stack-vec (let ((stack-vec
(stack->vector stack)) (stack->vector stack)))
(stack-vec-length
(vector-length stack-vec)))
(print-frames (list->vector (print-frames (list->vector
(drop (drop
(vector->list stack-vec) (vector->list stack-vec)
(if (< stack-vec-length 4) 3))
0
3)))
port port
#:count (stack-length stack)))) #:count (stack-length stack))))
knots-stacks) knots-stacks)
(print-exception (print-exception
port port
(if (null? knots-stacks) (if (null? knots-stacks)
(stack-ref stack (stack-ref stack 1)
(if (< stack-len 4) (stack-ref (last knots-stacks) 3))
stack-len
4))
(let* ((stack (last knots-stacks))
(stack-len (stack-length stack)))
(stack-ref stack
(if (< stack-len 3)
stack-len
3))))
'%exception '%exception
(list exn))))))) (list exn)))))))
(display error-string port))) (display error-string port)))

View file

@ -19,15 +19,10 @@
(define-module (knots promise) (define-module (knots promise)
#:use-module (srfi srfi-9) #:use-module (srfi srfi-9)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic) #:use-module (ice-9 atomic)
#:use-module (ice-9 exceptions)
#:use-module (fibers) #:use-module (fibers)
#:use-module (fibers conditions) #:use-module (fibers conditions)
#:use-module (knots) #:export (fibers-delay
#:export (fibers-promise?
fibers-delay
fibers-force fibers-force
fibers-promise-reset fibers-promise-reset
fibers-promise-result-available?)) fibers-promise-result-available?))
@ -46,61 +41,38 @@
(make-condition))) (make-condition)))
(define (fibers-force fp) (define (fibers-force fp)
(unless (fibers-promise? fp)
(raise-exception
(make-exception
(make-exception-with-message "fibers-force: not a fibers promise")
(make-exception-with-irritants fp))))
(let ((res (atomic-box-compare-and-swap! (let ((res (atomic-box-compare-and-swap!
(fibers-promise-values-box fp) (fibers-promise-values-box fp)
#f #f
'started))) 'started)))
(cond (if (eq? #f res)
((eq? #f res) (call-with-values
(call-with-values (lambda ()
(lambda () (with-exception-handler
(with-exception-handler (lambda (exn)
(lambda (exn) (atomic-box-set! (fibers-promise-values-box fp)
(atomic-box-set! (fibers-promise-values-box fp) exn)
exn) (signal-condition!
(signal-condition! (fibers-promise-evaluated-condition fp))
(fibers-promise-evaluated-condition fp)) (raise-exception exn))
(raise-exception exn)) (fibers-promise-thunk fp)
(lambda () #:unwind? #t))
(with-exception-handler (lambda vals
(lambda (exn) (atomic-box-set! (fibers-promise-values-box fp)
(let ((stack vals)
(match (fluid-ref %stacks) (signal-condition!
((stack-tag . prompt-tag) (fibers-promise-evaluated-condition fp))
(make-stack #t (apply values vals)))
0 prompt-tag (if (eq? res 'started)
0 (and prompt-tag 1))) (begin
(_ (wait (fibers-promise-evaluated-condition fp))
(make-stack #t))))) (let ((result (atomic-box-ref (fibers-promise-values-box fp))))
(raise-exception (if (exception? result)
(make-exception (raise-exception result)
exn (apply values result))))
(make-knots-exception stack))))) (if (exception? res)
(fibers-promise-thunk fp))) (raise-exception res)
#:unwind? #t)) (apply values res))))))
(lambda vals
(atomic-box-set! (fibers-promise-values-box fp)
vals)
(signal-condition!
(fibers-promise-evaluated-condition fp))
(apply values vals))))
((eq? res 'started)
(begin
(wait (fibers-promise-evaluated-condition fp))
(let ((result (atomic-box-ref (fibers-promise-values-box fp))))
(if (exception? result)
(raise-exception result)
(apply values result)))))
(else
(if (exception? res)
(raise-exception res)
(apply values res))))))
(define (fibers-promise-reset fp) (define (fibers-promise-reset fp)
(atomic-box-set! (fibers-promise-values-box fp) (atomic-box-set! (fibers-promise-values-box fp)

View file

@ -27,38 +27,29 @@
#:use-module (rnrs bytevectors) #:use-module (rnrs bytevectors)
#:use-module (ice-9 q) #:use-module (ice-9 q)
#:use-module (ice-9 match) #:use-module (ice-9 match)
#:use-module (ice-9 atomic)
#:use-module (ice-9 threads) #:use-module (ice-9 threads)
#:use-module (fibers) #:use-module (fibers)
#:use-module (fibers timers) #:use-module (fibers timers)
#:use-module (fibers channels) #:use-module (fibers channels)
#:use-module (fibers operations) #:use-module (fibers operations)
#:use-module (knots) #:use-module (knots)
#:use-module (knots resource-pool)
#:export (set-thread-name #:export (set-thread-name
thread-name thread-name
&thread-pool-timeout-error thread-pool?
thread-pool-timeout-error-pool thread-pool-channel
thread-pool-timeout-error? thread-pool-arguments-parameter
thread-pool-proc-vector
make-thread-pool make-thread-pool
thread-pool? call-with-thread
thread-pool-resource-pool
make-fixed-size-thread-pool &thread-pool-timeout
fixed-size-thread-pool? thread-pool-timeout-error?
fixed-size-thread-pool-channel
fixed-size-thread-pool-current-procedures
;; These procedures work for thread pools and fixed size %thread-pool-default-timeout
;; thread pools
thread-pool-arguments-parameter
thread-pool-default-checkout-timeout
destroy-thread-pool create-work-queue))
call-with-thread))
(define* (syscall->procedure return-type name argument-types (define* (syscall->procedure return-type name argument-types
#:key library) #:key library)
@ -156,64 +147,28 @@ from there, or #f if that would be an empty string."
(const ""))) (const "")))
(define-record-type <thread-pool> (define-record-type <thread-pool>
(thread-pool resource-pool arguments-parameter) (thread-pool channel arguments-parameter proc-vector)
thread-pool? thread-pool?
(resource-pool thread-pool-resource-pool) (channel thread-pool-channel)
(arguments-parameter thread-pool-arguments-parameter-accessor)) (arguments-parameter thread-pool-arguments-parameter)
(proc-vector thread-pool-proc-vector)
(define-record-type <fixed-size-thread-pool> (default-checkout-timeout
(fixed-size-thread-pool channel arguments-parameter current-procedures thread-pool-default-checkout-timeout))
default-checkout-timeout)
fixed-size-thread-pool?
(channel fixed-size-thread-pool-channel)
(arguments-parameter fixed-size-thread-pool-arguments-parameter)
(current-procedures fixed-size-thread-pool-current-procedures)
(default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout))
;; Since both thread pool records have this field, use a procedure
;; than handles the appropriate accessor
(define (thread-pool-arguments-parameter pool)
(if (fixed-size-thread-pool? pool)
(fixed-size-thread-pool-arguments-parameter pool)
(thread-pool-arguments-parameter-accessor pool)))
(define (thread-pool-default-checkout-timeout pool)
(if (fixed-size-thread-pool? pool)
(fixed-size-thread-pool-default-checkout-timeout pool)
(assq-ref (resource-pool-configuration
(thread-pool-resource-pool pool))
'default-checkout-timeout)))
(define &thread-pool-timeout
(make-exception-type '&thread-pool-timeout
&error
'(pool)))
(define make-thread-pool-timeout-error
(record-constructor &thread-pool-timeout))
(define thread-pool-timeout-error-pool
(exception-accessor
&thread-pool-timeout
(record-accessor &thread-pool-timeout 'pool)))
(define thread-pool-timeout-error?
(record-predicate &thread-pool-timeout))
(define* (make-fixed-size-thread-pool size
#:key
thread-initializer
thread-destructor
delay-logger
duration-logger
thread-lifetime
(expire-on-exception? #f)
(name "unnamed")
(use-default-io-waiters? #t)
default-checkout-timeout)
(define channel
(make-channel))
(define* (make-thread-pool size
#:key
thread-initializer
thread-destructor
(delay-logger (lambda _ #f))
(duration-logger (const #f))
thread-lifetime
(log-exception? (const #t))
(expire-on-exception? #f)
(name "unnamed")
(use-default-io-waiters? #t)
default-checkout-timeout)
"Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the thread pool procedure."
(define param (define param
(make-parameter #f)) (make-parameter #f))
@ -269,256 +224,382 @@ from there, or #f if that would be an empty string."
(sleep 1) (sleep 1)
(destructor/safe args))))) (destructor/safe args)))))
(define (process channel args) (define (process thread-index channel args)
(let loop () (let loop ((current-lifetime thread-lifetime))
(match (get-message channel) (let ((exception?
('destroy #f) (match (get-message channel)
((reply sent-time proc) (((? channel? reply) sent-time (? procedure? proc))
(when delay-logger (let ((time-delay
(let ((time-delay (- (get-internal-real-time)
(- (get-internal-real-time) sent-time)))
sent-time))) (delay-logger (/ time-delay
(delay-logger (/ time-delay internal-time-units-per-second)
internal-time-units-per-second) proc)
proc)))
(let* ((start-time (get-internal-real-time)) (let* ((start-time (get-internal-real-time))
(response (response
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
(list 'thread-pool-error (list 'thread-pool-error
(/ (- (get-internal-real-time) (/ (- (get-internal-real-time)
start-time) start-time)
internal-time-units-per-second) internal-time-units-per-second)
exn)) exn))
(lambda () (lambda ()
(with-exception-handler (vector-set! thread-proc-vector
(lambda (exn) thread-index
(let ((stack proc)
(match (fluid-ref %stacks) (with-exception-handler
((stack-tag . prompt-tag) (lambda (exn)
(make-stack #t (let ((stack
0 prompt-tag (match (fluid-ref %stacks)
0 (and prompt-tag 1))) ((stack-tag . prompt-tag)
(_ (make-stack #t
(make-stack #t))))) 0 prompt-tag
(raise-exception 0 (and prompt-tag 1)))
(make-exception (_
exn (make-stack #t)))))
(make-knots-exception stack))))) (raise-exception
(lambda () (make-exception
(call-with-values exn
(lambda () (make-knots-exception stack)))))
(start-stack (lambda ()
#t (call-with-values
(apply proc args))) (lambda ()
(lambda vals (start-stack
(cons (/ (- (get-internal-real-time) #t
start-time) (apply proc args)))
internal-time-units-per-second) (lambda vals
vals)))))) (cons (/ (- (get-internal-real-time)
#:unwind? #t))) start-time)
internal-time-units-per-second)
vals))))))
#:unwind? #t)))
(put-message reply
response)
(put-message reply (vector-set! thread-proc-vector
response) thread-index
#f)
(let ((exception? (match response
(match response (('thread-pool-error duration _)
(('thread-pool-error duration _) (when duration-logger
(when duration-logger (duration-logger duration proc))
(duration-logger duration proc)) #t)
#t) ((duration . _)
((duration . _) (when duration-logger
(when duration-logger (duration-logger duration proc))
(duration-logger duration proc)) #f))))))))
#f)))) (unless (and expire-on-exception?
(if (and exception? exception?)
expire-on-exception?) (if (number? current-lifetime)
#t (unless (< current-lifetime 0)
(loop)))))))) (loop (if current-lifetime
(- current-lifetime 1)
#f)))
(loop #f))))))
(define (start-thread index channel) (define (start-threads channel)
(call-with-new-thread (for-each
(lambda () (lambda (thread-index)
(catch 'system-error (call-with-new-thread
(lambda ()
(catch 'system-error
(lambda ()
(set-thread-name
(string-append
name " w t "
(number->string thread-index))))
(const #t))
(let init ((args (if thread-initializer
(initializer/safe)
'())))
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"knots: thread-pool: internal exception: ~A\n" exn))
(lambda ()
(parameterize ((param args))
(process thread-index channel args)))
#:unwind? #t)
(when thread-destructor
(destructor/safe args))
(init (initializer/safe))))))
(iota size)))
(let ((channel (make-channel)))
(if use-default-io-waiters?
(call-with-default-io-waiters
(lambda () (lambda ()
(set-thread-name (start-threads channel)))
(string-append (start-threads channel))
name " w t " (number->string index))))
(const #t))
(let init ((args (if thread-initializer (thread-pool channel
(initializer/safe) param
'()))) thread-proc-vector)))
(let ((continue?
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"knots: thread-pool: internal exception: ~A\n" exn))
(lambda ()
(parameterize ((param args))
(process channel args)))
#:unwind? #t)))
(when thread-destructor (define &thread-pool-timeout
(destructor/safe args)) (make-exception-type '&thread-pool-timeout
&error
'()))
(when continue? (define make-thread-pool-timeout-error
(init (if thread-initializer (record-constructor &thread-pool-timeout))
(initializer/safe)
'()))))))))
(for-each (define thread-pool-timeout-error?
(lambda (i) (record-predicate &thread-pool-timeout))
(if use-default-io-waiters?
(call-with-default-io-waiters
(lambda ()
(start-thread i channel)))
(start-thread i channel)))
(iota size))
(fixed-size-thread-pool channel (define* (call-with-thread record proc #:key duration-logger
param (timeout (thread-pool-default-checkout-timeout
thread-proc-vector record))
default-checkout-timeout)) (channel (thread-pool-channel record)))
(define* (make-thread-pool max-size
#:key
(min-size max-size)
scheduler
thread-initializer
thread-destructor
(delay-logger (lambda _ #f))
(duration-logger (const #f))
thread-lifetime
(expire-on-exception? #f)
(name "unnamed")
(use-default-io-waiters? #t)
default-checkout-timeout)
"Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the thread pool procedure."
(define param
(make-parameter #f))
(let ((resource-pool
(make-resource-pool
(lambda ()
(make-fixed-size-thread-pool
1
#:thread-initializer thread-initializer
#:thread-destructor thread-destructor
#:thread-lifetime thread-lifetime
#:expire-on-exception? expire-on-exception?
#:name name
#:use-default-io-waiters? use-default-io-waiters?))
max-size
#:destructor destroy-thread-pool
#:min-size min-size
#:delay-logger delay-logger
#:scheduler scheduler
#:duration-logger duration-logger
#:default-checkout-timeout default-checkout-timeout)))
(thread-pool resource-pool
param)))
(define* (call-with-thread thread-pool
proc
#:key
duration-logger
checkout-timeout
channel
destroy-thread-on-exception?
(max-waiters 'default))
"Send PROC to the thread pool through CHANNEL. Return the result of PROC. "Send PROC to the thread pool through CHANNEL. Return the result of PROC.
If already in the thread pool, call PROC immediately." If already in the thread pool, call PROC immediately."
(define (handle-proc fixed-size-thread-pool (let ((args ((thread-pool-arguments-parameter record))))
reply-channel
start-time
timeout)
(let* ((request-channel
(or channel
(fixed-size-thread-pool-channel
fixed-size-thread-pool)))
(operation-success?
(perform-operation
(let ((put
(wrap-operation
(put-operation request-channel
(list reply-channel
start-time
proc))
(const #t))))
(if timeout
(choice-operation
put
(wrap-operation (sleep-operation timeout)
(const #f)))
put)))))
(unless operation-success?
(raise-exception
(make-thread-pool-timeout-error)))
(let ((reply (get-message reply-channel)))
(match reply
(('thread-pool-error duration exn)
(when duration-logger
(duration-logger duration))
(raise-exception exn))
((duration . result)
(when duration-logger
(duration-logger duration))
(apply values result))))))
(let ((args ((thread-pool-arguments-parameter thread-pool))))
(if args (if args
(apply proc args) (apply proc args)
(let ((start-time (get-internal-real-time)) (let* ((reply (make-channel))
(reply-channel (make-channel))) (operation-success?
(if (fixed-size-thread-pool? thread-pool) (perform-operation
(handle-proc thread-pool (let ((put
reply-channel (wrap-operation
start-time (put-operation channel
checkout-timeout) (list reply
(with-exception-handler (get-internal-real-time)
(lambda (exn) proc))
(if (and (resource-pool-timeout-error? exn) (const #t))))
(eq? (resource-pool-timeout-error-pool exn)
(thread-pool-resource-pool thread-pool)))
(raise-exception
(make-thread-pool-timeout-error thread-pool))
(raise-exception exn)))
(lambda ()
(call-with-resource-from-pool (thread-pool-resource-pool
thread-pool)
(lambda (fixed-size-thread-pool)
(if checkout-timeout
(let ((remaining-time
(/ (- (get-internal-real-time) start-time)
internal-time-units-per-second)))
(if (< remaining-time checkout-timeout)
(handle-proc fixed-size-thread-pool
reply-channel
start-time
remaining-time)
(raise-exception
(make-thread-pool-timeout-error thread-pool))))
(handle-proc fixed-size-thread-pool
reply-channel
start-time
#f)))
#:max-waiters max-waiters
#:timeout checkout-timeout
#:destroy-resource-on-exception?
destroy-thread-on-exception?))))))))
(define (destroy-thread-pool pool) (if timeout
(if (fixed-size-thread-pool? pool) (choice-operation
(put-message put
(fixed-size-thread-pool-channel pool) (wrap-operation (sleep-operation timeout)
'destroy) (const #f)))
(destroy-resource-pool put)))))
(thread-pool-resource-pool pool))))
(unless operation-success?
(raise-exception
(make-thread-pool-timeout-error)))
(match (get-message reply)
(('thread-pool-error duration exn)
(when duration-logger
(duration-logger duration))
(raise-exception exn))
((duration . result)
(when duration-logger
(duration-logger duration))
(apply values result)))))))
(define* (create-work-queue thread-count-parameter proc
#:key thread-start-delay
(thread-stop-delay
(make-time time-duration 0 0))
(name "unnamed")
priority<?)
(let ((queue (make-q))
(queue-mutex (make-mutex))
(job-available (make-condition-variable))
(running-job-args (make-hash-table)))
(define get-thread-count
(cond
((number? thread-count-parameter)
(const thread-count-parameter))
((eq? thread-count-parameter #f)
;; Run one thread per job
(lambda ()
(+ (q-length queue)
(hash-count (lambda (index val)
(list? val))
running-job-args))))
(else
thread-count-parameter)))
(define process-job
(if priority<?
(lambda* (args #:key priority)
(with-mutex queue-mutex
(enq! queue (cons priority args))
(set-car!
queue
(stable-sort! (car queue)
(lambda (a b)
(priority<?
(car a)
(car b)))))
(sync-q! queue)
(start-new-threads-if-necessary (get-thread-count))
(signal-condition-variable job-available)))
(lambda args
(with-mutex queue-mutex
(enq! queue args)
(start-new-threads-if-necessary (get-thread-count))
(signal-condition-variable job-available)))))
(define (count-threads)
(with-mutex queue-mutex
(hash-count (const #t) running-job-args)))
(define (count-jobs)
(with-mutex queue-mutex
(+ (q-length queue)
(hash-count (lambda (index val)
(list? val))
running-job-args))))
(define (list-jobs)
(with-mutex queue-mutex
(append (if priority<?
(map cdr (car queue))
(list-copy (car queue)))
(hash-fold (lambda (key val result)
(if val
(cons val result)
result))
'()
running-job-args))))
(define (thread-process-job job-args)
(with-exception-handler
(lambda _ #f)
(lambda ()
(with-exception-handler
(lambda (exn)
(simple-format (current-error-port)
"~A work queue, job raised exception ~A\n"
name job-args)
(print-backtrace-and-exception/knots exn)
(raise-exception exn))
(lambda ()
(apply proc job-args))))
#:unwind? #t))
(define (start-thread thread-index)
(define (too-many-threads?)
(let ((running-jobs-count
(hash-count (lambda (index val)
(list? val))
running-job-args))
(desired-thread-count (get-thread-count)))
(>= running-jobs-count
desired-thread-count)))
(define (thread-idle-for-too-long? last-job-finished-at)
(time>=?
(time-difference (current-time time-monotonic)
last-job-finished-at)
thread-stop-delay))
(define (stop-thread)
(hash-remove! running-job-args
thread-index)
(unlock-mutex queue-mutex))
(call-with-new-thread
(lambda ()
(catch 'system-error
(lambda ()
(set-thread-name
(string-append name " q t "
(number->string thread-index))))
(const #t))
(let loop ((last-job-finished-at (current-time time-monotonic)))
(lock-mutex queue-mutex)
(if (too-many-threads?)
(stop-thread)
(let ((job-args
(if (q-empty? queue)
;; #f from wait-condition-variable indicates a timeout
(if (wait-condition-variable
job-available
queue-mutex
(+ 9 (time-second (current-time))))
;; Another thread could have taken
;; the job in the mean time
(if (q-empty? queue)
#f
(if priority<?
(cdr (deq! queue))
(deq! queue)))
#f)
(if priority<?
(cdr (deq! queue))
(deq! queue)))))
(if job-args
(begin
(hash-set! running-job-args
thread-index
job-args)
(unlock-mutex queue-mutex)
(thread-process-job job-args)
(with-mutex queue-mutex
(hash-set! running-job-args
thread-index
#f))
(loop (current-time time-monotonic)))
(if (thread-idle-for-too-long? last-job-finished-at)
(stop-thread)
(begin
(unlock-mutex queue-mutex)
(loop last-job-finished-at))))))))))
(define start-new-threads-if-necessary
(let ((previous-thread-started-at (make-time time-monotonic 0 0)))
(lambda (desired-count)
(let* ((thread-count
(hash-count (const #t) running-job-args))
(threads-to-start
(- desired-count thread-count)))
(when (> threads-to-start 0)
(for-each
(lambda (thread-index)
(when (eq? (hash-ref running-job-args
thread-index
'slot-free)
'slot-free)
(let* ((now (current-time time-monotonic))
(elapsed (time-difference now
previous-thread-started-at)))
(when (or (eq? #f thread-start-delay)
(time>=? elapsed thread-start-delay))
(set! previous-thread-started-at now)
(hash-set! running-job-args
thread-index
#f)
(start-thread thread-index)))))
(iota desired-count)))))))
(if (procedure? thread-count-parameter)
(call-with-new-thread
(lambda ()
(catch 'system-error
(lambda ()
(set-thread-name
(string-append name " q t")))
(const #t))
(while #t
(sleep 15)
(with-mutex queue-mutex
(let ((idle-threads (hash-count (lambda (index val)
(eq? #f val))
running-job-args)))
(when (= 0 idle-threads)
(start-new-threads-if-necessary (get-thread-count))))))))
(start-new-threads-if-necessary (get-thread-count)))
(values process-job count-jobs count-threads list-jobs)))

View file

@ -45,8 +45,6 @@
&request-body-ended-prematurely &request-body-ended-prematurely
request-body-ended-prematurely-error? request-body-ended-prematurely-error?
sanitize-response
request-body-port/knots request-body-port/knots
read-request-body/knots read-request-body/knots

View file

@ -6,47 +6,10 @@
(knots thread-pool)) (knots thread-pool))
(let ((thread-pool (let ((thread-pool
(make-fixed-size-thread-pool 2))) (make-thread-pool 2)))
(assert-equal
(call-with-thread
thread-pool
(lambda ()
4))
4))
(let ((thread-pool
(make-fixed-size-thread-pool
2
#:thread-initializer (const '(2)))))
(assert-equal
(call-with-thread
thread-pool
(lambda (num)
(* 2 num)))
4))
(let ((thread-pool
(make-fixed-size-thread-pool 2)))
(assert-equal
#t
(with-exception-handler
(lambda (exn)
(knots-exception? exn))
(lambda ()
(call-with-thread
thread-pool
(lambda ()
(+ 1 'a))))
#:unwind? #t)))
(run-fibers-for-tests
(lambda ()
(let ((thread-pool
(make-thread-pool 2)))
(run-fibers-for-tests
(lambda ()
(assert-equal (assert-equal
(call-with-thread (call-with-thread
thread-pool thread-pool
@ -54,13 +17,13 @@
4)) 4))
4)))) 4))))
(run-fibers-for-tests (let ((thread-pool
(lambda () (make-thread-pool
(let ((thread-pool 2
(make-thread-pool #:thread-initializer (const '(2)))))
2
#:thread-initializer (const '(2)))))
(run-fibers-for-tests
(lambda ()
(assert-equal (assert-equal
(call-with-thread (call-with-thread
thread-pool thread-pool
@ -68,11 +31,22 @@
(* 2 num))) (* 2 num)))
4)))) 4))))
(run-fibers-for-tests (let ((process-job
(lambda () count-jobs
(let ((thread-pool count-threads
(make-thread-pool 2))) list-jobs
(create-work-queue
2
(lambda (i)
(* i 2)))))
(process-job 3))
(let ((thread-pool
(make-thread-pool 2)))
(run-fibers-for-tests
(lambda ()
(assert-equal (assert-equal
#t #t
(with-exception-handler (with-exception-handler