Compare commits

..

5 commits

Author SHA1 Message Date
78d22d1acc Tweak display of stack traces
Tweak the frames to display and try to harden the code to crash less.
2025-06-09 12:19:56 +01:00
123c920122 Export sanitize-response
As this is useful.
2025-06-09 12:19:32 +01:00
8e582a2d73 Improve promise exception reporting
And guard against calling fibers-force not on a fibers promise record.
2025-05-26 14:50:45 +01:00
cbafdb8668 Respect use-default-io-waiters? for the fixed size thread pools 2025-05-25 15:34:07 +01:00
016f37f108 Rework thread pools
Allow the thread pool to vary in size by basing it on a resource pool
of fixed size thread pools, which are similar to the previous thread
pool implementation.

Fixed size thread pools don't require fibers, but thread pools now
do. Some procedures work with either thread pool implementation.
2025-05-19 09:06:08 +01:00
5 changed files with 449 additions and 456 deletions

View file

@ -75,6 +75,8 @@
0 (and prompt-tag 1))) 0 (and prompt-tag 1)))
(_ (_
(make-stack #t)))) (make-stack #t))))
(stack-len
(stack-length stack))
(error-string (error-string
(call-with-output-string (call-with-output-string
(lambda (port) (lambda (port)
@ -83,30 +85,46 @@
(filter knots-exception? (filter knots-exception?
(simple-exceptions exn))))) (simple-exceptions exn)))))
(let ((stack-vec (let* ((stack-vec
(stack->vector stack))) (stack->vector stack))
(stack-vec-length
(vector-length stack-vec)))
(print-frames (list->vector (print-frames (list->vector
(drop (drop
(vector->list stack-vec) (vector->list stack-vec)
6)) (if (< stack-vec-length 5)
0
4)))
port port
#:count (stack-length stack))) #:count (stack-length stack)))
(for-each (for-each
(lambda (stack) (lambda (stack)
(let ((stack-vec (let* ((stack-vec
(stack->vector stack))) (stack->vector stack))
(stack-vec-length
(vector-length stack-vec)))
(print-frames (list->vector (print-frames (list->vector
(drop (drop
(vector->list stack-vec) (vector->list stack-vec)
3)) (if (< stack-vec-length 4)
0
3)))
port port
#:count (stack-length stack)))) #:count (stack-length stack))))
knots-stacks) knots-stacks)
(print-exception (print-exception
port port
(if (null? knots-stacks) (if (null? knots-stacks)
(stack-ref stack 1) (stack-ref stack
(stack-ref (last knots-stacks) 3)) (if (< stack-len 4)
stack-len
4))
(let* ((stack (last knots-stacks))
(stack-len (stack-length stack)))
(stack-ref stack
(if (< stack-len 3)
stack-len
3))))
'%exception '%exception
(list exn))))))) (list exn)))))))
(display error-string port))) (display error-string port)))

View file

@ -19,10 +19,15 @@
(define-module (knots promise) (define-module (knots promise)
#:use-module (srfi srfi-9) #:use-module (srfi srfi-9)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic) #:use-module (ice-9 atomic)
#:use-module (ice-9 exceptions)
#:use-module (fibers) #:use-module (fibers)
#:use-module (fibers conditions) #:use-module (fibers conditions)
#:export (fibers-delay #:use-module (knots)
#:export (fibers-promise?
fibers-delay
fibers-force fibers-force
fibers-promise-reset fibers-promise-reset
fibers-promise-result-available?)) fibers-promise-result-available?))
@ -41,11 +46,18 @@
(make-condition))) (make-condition)))
(define (fibers-force fp) (define (fibers-force fp)
(unless (fibers-promise? fp)
(raise-exception
(make-exception
(make-exception-with-message "fibers-force: not a fibers promise")
(make-exception-with-irritants fp))))
(let ((res (atomic-box-compare-and-swap! (let ((res (atomic-box-compare-and-swap!
(fibers-promise-values-box fp) (fibers-promise-values-box fp)
#f #f
'started))) 'started)))
(if (eq? #f res) (cond
((eq? #f res)
(call-with-values (call-with-values
(lambda () (lambda ()
(with-exception-handler (with-exception-handler
@ -55,21 +67,37 @@
(signal-condition! (signal-condition!
(fibers-promise-evaluated-condition fp)) (fibers-promise-evaluated-condition fp))
(raise-exception exn)) (raise-exception exn))
(fibers-promise-thunk fp) (lambda ()
(with-exception-handler
(lambda (exn)
(let ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t)))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))
(fibers-promise-thunk fp)))
#:unwind? #t)) #:unwind? #t))
(lambda vals (lambda vals
(atomic-box-set! (fibers-promise-values-box fp) (atomic-box-set! (fibers-promise-values-box fp)
vals) vals)
(signal-condition! (signal-condition!
(fibers-promise-evaluated-condition fp)) (fibers-promise-evaluated-condition fp))
(apply values vals))) (apply values vals))))
(if (eq? res 'started) ((eq? res 'started)
(begin (begin
(wait (fibers-promise-evaluated-condition fp)) (wait (fibers-promise-evaluated-condition fp))
(let ((result (atomic-box-ref (fibers-promise-values-box fp)))) (let ((result (atomic-box-ref (fibers-promise-values-box fp))))
(if (exception? result) (if (exception? result)
(raise-exception result) (raise-exception result)
(apply values result)))) (apply values result)))))
(else
(if (exception? res) (if (exception? res)
(raise-exception res) (raise-exception res)
(apply values res)))))) (apply values res))))))

View file

@ -27,29 +27,38 @@
#:use-module (rnrs bytevectors) #:use-module (rnrs bytevectors)
#:use-module (ice-9 q) #:use-module (ice-9 q)
#:use-module (ice-9 match) #:use-module (ice-9 match)
#:use-module (ice-9 atomic)
#:use-module (ice-9 threads) #:use-module (ice-9 threads)
#:use-module (fibers) #:use-module (fibers)
#:use-module (fibers timers) #:use-module (fibers timers)
#:use-module (fibers channels) #:use-module (fibers channels)
#:use-module (fibers operations) #:use-module (fibers operations)
#:use-module (knots) #:use-module (knots)
#:use-module (knots resource-pool)
#:export (set-thread-name #:export (set-thread-name
thread-name thread-name
thread-pool? &thread-pool-timeout-error
thread-pool-channel thread-pool-timeout-error-pool
thread-pool-arguments-parameter
thread-pool-proc-vector
make-thread-pool
call-with-thread
&thread-pool-timeout
thread-pool-timeout-error? thread-pool-timeout-error?
%thread-pool-default-timeout make-thread-pool
thread-pool?
thread-pool-resource-pool
create-work-queue)) make-fixed-size-thread-pool
fixed-size-thread-pool?
fixed-size-thread-pool-channel
fixed-size-thread-pool-current-procedures
;; These procedures work for thread pools and fixed size
;; thread pools
thread-pool-arguments-parameter
thread-pool-default-checkout-timeout
destroy-thread-pool
call-with-thread))
(define* (syscall->procedure return-type name argument-types (define* (syscall->procedure return-type name argument-types
#:key library) #:key library)
@ -147,28 +156,64 @@ from there, or #f if that would be an empty string."
(const ""))) (const "")))
(define-record-type <thread-pool> (define-record-type <thread-pool>
(thread-pool channel arguments-parameter proc-vector) (thread-pool resource-pool arguments-parameter)
thread-pool? thread-pool?
(channel thread-pool-channel) (resource-pool thread-pool-resource-pool)
(arguments-parameter thread-pool-arguments-parameter) (arguments-parameter thread-pool-arguments-parameter-accessor))
(proc-vector thread-pool-proc-vector)
(default-checkout-timeout
thread-pool-default-checkout-timeout))
(define* (make-thread-pool size (define-record-type <fixed-size-thread-pool>
(fixed-size-thread-pool channel arguments-parameter current-procedures
default-checkout-timeout)
fixed-size-thread-pool?
(channel fixed-size-thread-pool-channel)
(arguments-parameter fixed-size-thread-pool-arguments-parameter)
(current-procedures fixed-size-thread-pool-current-procedures)
(default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout))
;; Since both thread pool records have this field, use a procedure
;; than handles the appropriate accessor
(define (thread-pool-arguments-parameter pool)
(if (fixed-size-thread-pool? pool)
(fixed-size-thread-pool-arguments-parameter pool)
(thread-pool-arguments-parameter-accessor pool)))
(define (thread-pool-default-checkout-timeout pool)
(if (fixed-size-thread-pool? pool)
(fixed-size-thread-pool-default-checkout-timeout pool)
(assq-ref (resource-pool-configuration
(thread-pool-resource-pool pool))
'default-checkout-timeout)))
(define &thread-pool-timeout
(make-exception-type '&thread-pool-timeout
&error
'(pool)))
(define make-thread-pool-timeout-error
(record-constructor &thread-pool-timeout))
(define thread-pool-timeout-error-pool
(exception-accessor
&thread-pool-timeout
(record-accessor &thread-pool-timeout 'pool)))
(define thread-pool-timeout-error?
(record-predicate &thread-pool-timeout))
(define* (make-fixed-size-thread-pool size
#:key #:key
thread-initializer thread-initializer
thread-destructor thread-destructor
(delay-logger (lambda _ #f)) delay-logger
(duration-logger (const #f)) duration-logger
thread-lifetime thread-lifetime
(log-exception? (const #t))
(expire-on-exception? #f) (expire-on-exception? #f)
(name "unnamed") (name "unnamed")
(use-default-io-waiters? #t) (use-default-io-waiters? #t)
default-checkout-timeout) default-checkout-timeout)
"Return a channel used to offload work to a dedicated thread. ARGS are the (define channel
arguments of the thread pool procedure." (make-channel))
(define param (define param
(make-parameter #f)) (make-parameter #f))
@ -224,17 +269,18 @@ arguments of the thread pool procedure."
(sleep 1) (sleep 1)
(destructor/safe args))))) (destructor/safe args)))))
(define (process thread-index channel args) (define (process channel args)
(let loop ((current-lifetime thread-lifetime)) (let loop ()
(let ((exception?
(match (get-message channel) (match (get-message channel)
(((? channel? reply) sent-time (? procedure? proc)) ('destroy #f)
((reply sent-time proc)
(when delay-logger
(let ((time-delay (let ((time-delay
(- (get-internal-real-time) (- (get-internal-real-time)
sent-time))) sent-time)))
(delay-logger (/ time-delay (delay-logger (/ time-delay
internal-time-units-per-second) internal-time-units-per-second)
proc) proc)))
(let* ((start-time (get-internal-real-time)) (let* ((start-time (get-internal-real-time))
(response (response
@ -246,9 +292,6 @@ arguments of the thread pool procedure."
internal-time-units-per-second) internal-time-units-per-second)
exn)) exn))
(lambda () (lambda ()
(vector-set! thread-proc-vector
thread-index
proc)
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
(let ((stack (let ((stack
@ -275,13 +318,11 @@ arguments of the thread pool procedure."
internal-time-units-per-second) internal-time-units-per-second)
vals)))))) vals))))))
#:unwind? #t))) #:unwind? #t)))
(put-message reply (put-message reply
response) response)
(vector-set! thread-proc-vector (let ((exception?
thread-index
#f)
(match response (match response
(('thread-pool-error duration _) (('thread-pool-error duration _)
(when duration-logger (when duration-logger
@ -290,32 +331,26 @@ arguments of the thread pool procedure."
((duration . _) ((duration . _)
(when duration-logger (when duration-logger
(duration-logger duration proc)) (duration-logger duration proc))
#f)))))))) #f))))
(unless (and expire-on-exception? (if (and exception?
exception?) expire-on-exception?)
(if (number? current-lifetime) #t
(unless (< current-lifetime 0) (loop))))))))
(loop (if current-lifetime
(- current-lifetime 1)
#f)))
(loop #f))))))
(define (start-threads channel) (define (start-thread index channel)
(for-each
(lambda (thread-index)
(call-with-new-thread (call-with-new-thread
(lambda () (lambda ()
(catch 'system-error (catch 'system-error
(lambda () (lambda ()
(set-thread-name (set-thread-name
(string-append (string-append
name " w t " name " w t " (number->string index))))
(number->string thread-index))))
(const #t)) (const #t))
(let init ((args (if thread-initializer (let init ((args (if thread-initializer
(initializer/safe) (initializer/safe)
'()))) '())))
(let ((continue?
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
(simple-format (simple-format
@ -323,54 +358,96 @@ arguments of the thread pool procedure."
"knots: thread-pool: internal exception: ~A\n" exn)) "knots: thread-pool: internal exception: ~A\n" exn))
(lambda () (lambda ()
(parameterize ((param args)) (parameterize ((param args))
(process thread-index channel args))) (process channel args)))
#:unwind? #t) #:unwind? #t)))
(when thread-destructor (when thread-destructor
(destructor/safe args)) (destructor/safe args))
(init (initializer/safe)))))) (when continue?
(iota size))) (init (if thread-initializer
(initializer/safe)
'()))))))))
(let ((channel (make-channel))) (for-each
(lambda (i)
(if use-default-io-waiters? (if use-default-io-waiters?
(call-with-default-io-waiters (call-with-default-io-waiters
(lambda () (lambda ()
(start-threads channel))) (start-thread i channel)))
(start-threads channel)) (start-thread i channel)))
(iota size))
(thread-pool channel (fixed-size-thread-pool channel
param param
thread-proc-vector))) thread-proc-vector
default-checkout-timeout))
(define &thread-pool-timeout (define* (make-thread-pool max-size
(make-exception-type '&thread-pool-timeout #:key
&error (min-size max-size)
'())) scheduler
thread-initializer
thread-destructor
(delay-logger (lambda _ #f))
(duration-logger (const #f))
thread-lifetime
(expire-on-exception? #f)
(name "unnamed")
(use-default-io-waiters? #t)
default-checkout-timeout)
"Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the thread pool procedure."
(define param
(make-parameter #f))
(define make-thread-pool-timeout-error (let ((resource-pool
(record-constructor &thread-pool-timeout)) (make-resource-pool
(lambda ()
(make-fixed-size-thread-pool
1
#:thread-initializer thread-initializer
#:thread-destructor thread-destructor
#:thread-lifetime thread-lifetime
#:expire-on-exception? expire-on-exception?
#:name name
#:use-default-io-waiters? use-default-io-waiters?))
max-size
#:destructor destroy-thread-pool
#:min-size min-size
#:delay-logger delay-logger
#:scheduler scheduler
#:duration-logger duration-logger
#:default-checkout-timeout default-checkout-timeout)))
(define thread-pool-timeout-error? (thread-pool resource-pool
(record-predicate &thread-pool-timeout)) param)))
(define* (call-with-thread record proc #:key duration-logger (define* (call-with-thread thread-pool
(timeout (thread-pool-default-checkout-timeout proc
record)) #:key
(channel (thread-pool-channel record))) duration-logger
checkout-timeout
channel
destroy-thread-on-exception?
(max-waiters 'default))
"Send PROC to the thread pool through CHANNEL. Return the result of PROC. "Send PROC to the thread pool through CHANNEL. Return the result of PROC.
If already in the thread pool, call PROC immediately." If already in the thread pool, call PROC immediately."
(let ((args ((thread-pool-arguments-parameter record)))) (define (handle-proc fixed-size-thread-pool
(if args reply-channel
(apply proc args) start-time
(let* ((reply (make-channel)) timeout)
(let* ((request-channel
(or channel
(fixed-size-thread-pool-channel
fixed-size-thread-pool)))
(operation-success? (operation-success?
(perform-operation (perform-operation
(let ((put (let ((put
(wrap-operation (wrap-operation
(put-operation channel (put-operation request-channel
(list reply (list reply-channel
(get-internal-real-time) start-time
proc)) proc))
(const #t)))) (const #t))))
@ -385,7 +462,8 @@ If already in the thread pool, call PROC immediately."
(raise-exception (raise-exception
(make-thread-pool-timeout-error))) (make-thread-pool-timeout-error)))
(match (get-message reply) (let ((reply (get-message reply-channel)))
(match reply
(('thread-pool-error duration exn) (('thread-pool-error duration exn)
(when duration-logger (when duration-logger
(duration-logger duration)) (duration-logger duration))
@ -393,213 +471,54 @@ If already in the thread pool, call PROC immediately."
((duration . result) ((duration . result)
(when duration-logger (when duration-logger
(duration-logger duration)) (duration-logger duration))
(apply values result))))))) (apply values result))))))
(define* (create-work-queue thread-count-parameter proc (let ((args ((thread-pool-arguments-parameter thread-pool))))
#:key thread-start-delay (if args
(thread-stop-delay (apply proc args)
(make-time time-duration 0 0)) (let ((start-time (get-internal-real-time))
(name "unnamed") (reply-channel (make-channel)))
priority<?) (if (fixed-size-thread-pool? thread-pool)
(let ((queue (make-q)) (handle-proc thread-pool
(queue-mutex (make-mutex)) reply-channel
(job-available (make-condition-variable)) start-time
(running-job-args (make-hash-table))) checkout-timeout)
(define get-thread-count
(cond
((number? thread-count-parameter)
(const thread-count-parameter))
((eq? thread-count-parameter #f)
;; Run one thread per job
(lambda ()
(+ (q-length queue)
(hash-count (lambda (index val)
(list? val))
running-job-args))))
(else
thread-count-parameter)))
(define process-job
(if priority<?
(lambda* (args #:key priority)
(with-mutex queue-mutex
(enq! queue (cons priority args))
(set-car!
queue
(stable-sort! (car queue)
(lambda (a b)
(priority<?
(car a)
(car b)))))
(sync-q! queue)
(start-new-threads-if-necessary (get-thread-count))
(signal-condition-variable job-available)))
(lambda args
(with-mutex queue-mutex
(enq! queue args)
(start-new-threads-if-necessary (get-thread-count))
(signal-condition-variable job-available)))))
(define (count-threads)
(with-mutex queue-mutex
(hash-count (const #t) running-job-args)))
(define (count-jobs)
(with-mutex queue-mutex
(+ (q-length queue)
(hash-count (lambda (index val)
(list? val))
running-job-args))))
(define (list-jobs)
(with-mutex queue-mutex
(append (if priority<?
(map cdr (car queue))
(list-copy (car queue)))
(hash-fold (lambda (key val result)
(if val
(cons val result)
result))
'()
running-job-args))))
(define (thread-process-job job-args)
(with-exception-handler
(lambda _ #f)
(lambda ()
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
(simple-format (current-error-port) (if (and (resource-pool-timeout-error? exn)
"~A work queue, job raised exception ~A\n" (eq? (resource-pool-timeout-error-pool exn)
name job-args) (thread-pool-resource-pool thread-pool)))
(print-backtrace-and-exception/knots exn) (raise-exception
(raise-exception exn)) (make-thread-pool-timeout-error thread-pool))
(raise-exception exn)))
(lambda () (lambda ()
(apply proc job-args)))) (call-with-resource-from-pool (thread-pool-resource-pool
#:unwind? #t)) thread-pool)
(lambda (fixed-size-thread-pool)
(if checkout-timeout
(let ((remaining-time
(/ (- (get-internal-real-time) start-time)
internal-time-units-per-second)))
(if (< remaining-time checkout-timeout)
(handle-proc fixed-size-thread-pool
reply-channel
start-time
remaining-time)
(raise-exception
(make-thread-pool-timeout-error thread-pool))))
(handle-proc fixed-size-thread-pool
reply-channel
start-time
#f)))
#:max-waiters max-waiters
#:timeout checkout-timeout
#:destroy-resource-on-exception?
destroy-thread-on-exception?))))))))
(define (start-thread thread-index) (define (destroy-thread-pool pool)
(define (too-many-threads?) (if (fixed-size-thread-pool? pool)
(let ((running-jobs-count (put-message
(hash-count (lambda (index val) (fixed-size-thread-pool-channel pool)
(list? val)) 'destroy)
running-job-args)) (destroy-resource-pool
(desired-thread-count (get-thread-count))) (thread-pool-resource-pool pool))))
(>= running-jobs-count
desired-thread-count)))
(define (thread-idle-for-too-long? last-job-finished-at)
(time>=?
(time-difference (current-time time-monotonic)
last-job-finished-at)
thread-stop-delay))
(define (stop-thread)
(hash-remove! running-job-args
thread-index)
(unlock-mutex queue-mutex))
(call-with-new-thread
(lambda ()
(catch 'system-error
(lambda ()
(set-thread-name
(string-append name " q t "
(number->string thread-index))))
(const #t))
(let loop ((last-job-finished-at (current-time time-monotonic)))
(lock-mutex queue-mutex)
(if (too-many-threads?)
(stop-thread)
(let ((job-args
(if (q-empty? queue)
;; #f from wait-condition-variable indicates a timeout
(if (wait-condition-variable
job-available
queue-mutex
(+ 9 (time-second (current-time))))
;; Another thread could have taken
;; the job in the mean time
(if (q-empty? queue)
#f
(if priority<?
(cdr (deq! queue))
(deq! queue)))
#f)
(if priority<?
(cdr (deq! queue))
(deq! queue)))))
(if job-args
(begin
(hash-set! running-job-args
thread-index
job-args)
(unlock-mutex queue-mutex)
(thread-process-job job-args)
(with-mutex queue-mutex
(hash-set! running-job-args
thread-index
#f))
(loop (current-time time-monotonic)))
(if (thread-idle-for-too-long? last-job-finished-at)
(stop-thread)
(begin
(unlock-mutex queue-mutex)
(loop last-job-finished-at))))))))))
(define start-new-threads-if-necessary
(let ((previous-thread-started-at (make-time time-monotonic 0 0)))
(lambda (desired-count)
(let* ((thread-count
(hash-count (const #t) running-job-args))
(threads-to-start
(- desired-count thread-count)))
(when (> threads-to-start 0)
(for-each
(lambda (thread-index)
(when (eq? (hash-ref running-job-args
thread-index
'slot-free)
'slot-free)
(let* ((now (current-time time-monotonic))
(elapsed (time-difference now
previous-thread-started-at)))
(when (or (eq? #f thread-start-delay)
(time>=? elapsed thread-start-delay))
(set! previous-thread-started-at now)
(hash-set! running-job-args
thread-index
#f)
(start-thread thread-index)))))
(iota desired-count)))))))
(if (procedure? thread-count-parameter)
(call-with-new-thread
(lambda ()
(catch 'system-error
(lambda ()
(set-thread-name
(string-append name " q t")))
(const #t))
(while #t
(sleep 15)
(with-mutex queue-mutex
(let ((idle-threads (hash-count (lambda (index val)
(eq? #f val))
running-job-args)))
(when (= 0 idle-threads)
(start-new-threads-if-necessary (get-thread-count))))))))
(start-new-threads-if-necessary (get-thread-count)))
(values process-job count-jobs count-threads list-jobs)))

View file

@ -45,6 +45,8 @@
&request-body-ended-prematurely &request-body-ended-prematurely
request-body-ended-prematurely-error? request-body-ended-prematurely-error?
sanitize-response
request-body-port/knots request-body-port/knots
read-request-body/knots read-request-body/knots

View file

@ -6,10 +6,47 @@
(knots thread-pool)) (knots thread-pool))
(let ((thread-pool (let ((thread-pool
(make-fixed-size-thread-pool 2)))
(assert-equal
(call-with-thread
thread-pool
(lambda ()
4))
4))
(let ((thread-pool
(make-fixed-size-thread-pool
2
#:thread-initializer (const '(2)))))
(assert-equal
(call-with-thread
thread-pool
(lambda (num)
(* 2 num)))
4))
(let ((thread-pool
(make-fixed-size-thread-pool 2)))
(assert-equal
#t
(with-exception-handler
(lambda (exn)
(knots-exception? exn))
(lambda ()
(call-with-thread
thread-pool
(lambda ()
(+ 1 'a))))
#:unwind? #t)))
(run-fibers-for-tests
(lambda ()
(let ((thread-pool
(make-thread-pool 2))) (make-thread-pool 2)))
(run-fibers-for-tests
(lambda ()
(assert-equal (assert-equal
(call-with-thread (call-with-thread
thread-pool thread-pool
@ -17,13 +54,13 @@
4)) 4))
4)))) 4))))
(let ((thread-pool (run-fibers-for-tests
(lambda ()
(let ((thread-pool
(make-thread-pool (make-thread-pool
2 2
#:thread-initializer (const '(2))))) #:thread-initializer (const '(2)))))
(run-fibers-for-tests
(lambda ()
(assert-equal (assert-equal
(call-with-thread (call-with-thread
thread-pool thread-pool
@ -31,22 +68,11 @@
(* 2 num))) (* 2 num)))
4)))) 4))))
(let ((process-job (run-fibers-for-tests
count-jobs (lambda ()
count-threads (let ((thread-pool
list-jobs
(create-work-queue
2
(lambda (i)
(* i 2)))))
(process-job 3))
(let ((thread-pool
(make-thread-pool 2))) (make-thread-pool 2)))
(run-fibers-for-tests
(lambda ()
(assert-equal (assert-equal
#t #t
(with-exception-handler (with-exception-handler