Compare commits

..

5 commits

Author SHA1 Message Date
003c5aa6b0 WIP
All checks were successful
/ test (push) Successful in 12s
2025-06-24 21:03:34 +02:00
eadfa53b36 WIP
All checks were successful
/ test (push) Successful in 13s
2025-06-24 21:00:54 +02:00
81dd3370e6 WIP
Some checks failed
/ test (push) Failing after 12s
2025-06-24 20:59:57 +02:00
7f5f05ef2b WIP
Some checks failed
/ test (push) Failing after 12s
2025-06-24 20:58:25 +02:00
7c2c6f2de9 WIP 2025-06-24 20:58:25 +02:00
9 changed files with 313 additions and 917 deletions

View file

@ -1,7 +1,7 @@
on: on:
push: push:
branches: branches:
- trunk - actions-test
jobs: jobs:
test: test:
runs-on: host runs-on: host
@ -10,17 +10,13 @@ jobs:
- run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git --branch=pages knots-pages - run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git --branch=pages knots-pages
- run: | - run: |
cd knots-trunk cd knots-trunk
guix shell -D -f guix-dev.scm -- documenta api "knots.scm knots/" guix shell -D -f guix-dev.scm -- documenta api knots
guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -c SHOW_TITLE=true -o ../knots-pages/index.html doc/index.texi guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -c SHOW_TITLE=true -o ../knots-pages/index.html doc/index.texi
- run: | - run: |
cd knots-pages cd knots-pages
git add . git add .
if [[ -z "$(git status -s)" ]]; then
echo "Nothing to push"
else
git config user.email "" git config user.email ""
git config user.name "Automatic website updater" git config user.name "Automatic website updater"
git commit -m "Automatic website update" git commit -m "Automatic website update"
git push git push
fi

View file

@ -20,9 +20,6 @@
(define-module (knots parallelism) (define-module (knots parallelism)
#:use-module (srfi srfi-1) #:use-module (srfi srfi-1)
#:use-module (srfi srfi-71) #:use-module (srfi srfi-71)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-43)
#:use-module (ice-9 match) #:use-module (ice-9 match)
#:use-module (ice-9 control) #:use-module (ice-9 control)
#:use-module (ice-9 exceptions) #:use-module (ice-9 exceptions)
@ -30,7 +27,6 @@
#:use-module (fibers channels) #:use-module (fibers channels)
#:use-module (fibers operations) #:use-module (fibers operations)
#:use-module (knots) #:use-module (knots)
#:use-module (knots resource-pool)
#:export (fibers-batch-map #:export (fibers-batch-map
fibers-map fibers-map
@ -42,13 +38,7 @@
fibers-parallel fibers-parallel
fibers-let fibers-let
fiberize fiberize))
make-parallelism-limiter
parallelism-limiter?
destroy-parallelism-limiter
call-with-parallelism-limiter
with-parallelism-limiter))
(define (defer-to-parallel-fiber thunk) (define (defer-to-parallel-fiber thunk)
(let ((reply (make-channel))) (let ((reply (make-channel)))
@ -58,7 +48,7 @@
(lambda (exn) (lambda (exn)
(put-message (put-message
reply reply
(cons 'exception exn))) (list 'exception exn)))
(lambda () (lambda ()
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
@ -79,7 +69,7 @@
(lambda () (lambda ()
(start-stack #t (thunk))) (start-stack #t (thunk)))
(lambda vals (lambda vals
(put-message reply (cons 'result vals))))))) (put-message reply vals))))))
#:unwind? #t)) #:unwind? #t))
#:parallel? #t) #:parallel? #t)
reply)) reply))
@ -89,16 +79,13 @@
reply-channels))) reply-channels)))
(map (map
(match-lambda (match-lambda
(('exception . exn) (('exception exn)
(raise-exception exn)) (raise-exception exn))
(('result . vals) (result
(apply values vals))) (apply values result)))
responses))) responses)))
(define (fibers-batch-map proc parallelism-limit . lists) (define (fibers-batch-map proc parallelism-limit . lists)
"Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT. If any of
the invocations of PROC raise an exception, this will be raised once
all of the calls to PROC have finished."
(define vecs (map (lambda (list-or-vec) (define vecs (map (lambda (list-or-vec)
(if (vector? list-or-vec) (if (vector? list-or-vec)
list-or-vec list-or-vec
@ -118,18 +105,9 @@ all of the calls to PROC have finished."
(channel-indexes '())) (channel-indexes '()))
(if (and (eq? #f next-to-process-index) (if (and (eq? #f next-to-process-index)
(null? channel-indexes)) (null? channel-indexes))
(let ((processed-result-vec
(vector-map
(lambda (_ result-or-exn)
(match result-or-exn
(('exception . exn)
(raise-exception exn))
(('result . vals)
(car vals))))
result-vec)))
(if (vector? (first lists)) (if (vector? (first lists))
processed-result-vec result-vec
(vector->list processed-result-vec))) (vector->list result-vec))
(if (or (= (length channel-indexes) (if (or (= (length channel-indexes)
(min parallelism-limit vecs-length)) (min parallelism-limit vecs-length))
@ -145,13 +123,18 @@ all of the calls to PROC have finished."
(get-operation (get-operation
(vector-ref result-vec index)) (vector-ref result-vec index))
(lambda (result) (lambda (result)
(match result
(('exception exn)
(raise-exception exn))
(_
(vector-set! result-vec (vector-set! result-vec
index index
result) (first result))
(values next-to-process-index (values next-to-process-index
(lset-difference = (lset-difference =
channel-indexes channel-indexes
(list index)))))) (list index))))))))
channel-indexes))))) channel-indexes)))))
(loop new-index (loop new-index
new-channel-indexes)) new-channel-indexes))
@ -174,14 +157,9 @@ all of the calls to PROC have finished."
channel-indexes))))))) channel-indexes)))))))
(define (fibers-map proc . lists) (define (fibers-map proc . lists)
"Map PROC over LISTS in parallel, running up to 20 fibers in
PARALLEL. If any of the invocations of PROC raise an exception, this
will be raised once all of the calls to PROC have finished."
(apply fibers-batch-map proc 20 lists)) (apply fibers-batch-map proc 20 lists))
(define (fibers-batch-for-each proc parallelism-limit . lists) (define (fibers-batch-for-each proc parallelism-limit . lists)
"Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in
parallel."
(apply fibers-batch-map (apply fibers-batch-map
(lambda args (lambda args
(apply proc args) (apply proc args)
@ -192,13 +170,10 @@ parallel."
*unspecified*) *unspecified*)
(define (fibers-for-each proc . lists) (define (fibers-for-each proc . lists)
"Call PROC on LISTS, running up to 20 fibers in parallel."
(apply fibers-batch-for-each proc 20 lists)) (apply fibers-batch-for-each proc 20 lists))
(define-syntax fibers-parallel (define-syntax fibers-parallel
(lambda (x) (lambda (x)
"Run each expression in parallel. If any expression raises an
exception, this will be raised after all exceptions have finished."
(syntax-case x () (syntax-case x ()
((_ e0 ...) ((_ e0 ...)
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
@ -209,16 +184,12 @@ parallel."
(apply values (fetch-result-of-defered-thunks tmp0 ...)))))))) (apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
(define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...) (define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...)
"Let, but run each binding in a fiber in parallel."
(call-with-values (call-with-values
(lambda () (fibers-parallel e ...)) (lambda () (fibers-parallel e ...))
(lambda (v ...) (lambda (v ...)
b0 b1 ...))) b0 b1 ...)))
(define* (fibers-map-with-progress proc lists #:key report) (define* (fibers-map-with-progress proc lists #:key report)
"Map PROC over LISTS, calling #:REPORT if specified after each
invocation of PROC finishes. REPORT is passed the results for each
element of LISTS, or #f if no result has been received yet."
(let loop ((channels-to-results (let loop ((channels-to-results
(apply map (apply map
(lambda args (lambda args
@ -239,8 +210,8 @@ invocation of PROC finishes. REPORT is passed the results for each
(match-lambda (match-lambda
((#f . ('exception . exn)) ((#f . ('exception . exn))
(raise-exception exn)) (raise-exception exn))
((#f . ('result . vals)) ((#f . ('result . val))
(car vals))) val))
channels-to-results) channels-to-results)
(loop (loop
(perform-operation (perform-operation
@ -257,7 +228,12 @@ invocation of PROC finishes. REPORT is passed the results for each
(map (match-lambda (map (match-lambda
((c . r) ((c . r)
(if (eq? channel c) (if (eq? channel c)
(cons #f result) (cons #f
(match result
(('exception . exn)
result)
(_
(cons 'result result))))
(cons c r)))) (cons c r))))
channels-to-results))) channels-to-results)))
#f)))) #f))))
@ -278,7 +254,7 @@ invocation of PROC finishes. REPORT is passed the results for each
reply-channel reply-channel
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
(cons 'exception exn)) (list 'exception exn))
(lambda () (lambda ()
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
@ -309,32 +285,5 @@ invocation of PROC finishes. REPORT is passed the results for each
(put-message input-channel (cons reply-channel args)) (put-message input-channel (cons reply-channel args))
(match (get-message reply-channel) (match (get-message reply-channel)
(('result . vals) (apply values vals)) (('result . vals) (apply values vals))
(('exception . exn) (('exception exn)
(raise-exception exn)))))) (raise-exception exn))))))
(define-record-type <parallelism-limiter>
(make-parallelism-limiter-record resource-pool)
parallelism-limiter?
(resource-pool parallelism-limiter-resource-pool))
(define* (make-parallelism-limiter limit #:key (name "unnamed"))
(make-parallelism-limiter-record
(make-fixed-size-resource-pool
(iota limit)
#:name name)))
(define (destroy-parallelism-limiter parallelism-limiter)
(destroy-resource-pool
(parallelism-limiter-resource-pool
parallelism-limiter)))
(define* (call-with-parallelism-limiter parallelism-limiter thunk)
(call-with-resource-from-pool
(parallelism-limiter-resource-pool parallelism-limiter)
(lambda _
(thunk))))
(define-syntax-rule (with-parallelism-limiter parallelism-limiter exp ...)
(call-with-parallelism-limiter
parallelism-limiter
(lambda () exp ...)))

View file

@ -22,7 +22,6 @@
#:use-module (srfi srfi-9) #:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu) #:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-71) #:use-module (srfi srfi-71)
#:use-module (ice-9 q)
#:use-module (ice-9 match) #:use-module (ice-9 match)
#:use-module (ice-9 exceptions) #:use-module (ice-9 exceptions)
#:use-module (fibers) #:use-module (fibers)
@ -33,10 +32,9 @@
#:use-module (fibers conditions) #:use-module (fibers conditions)
#:use-module (knots) #:use-module (knots)
#:use-module (knots parallelism) #:use-module (knots parallelism)
#:export (make-fixed-size-resource-pool #:export (resource-pool?
make-resource-pool
resource-pool? make-resource-pool
resource-pool-name resource-pool-name
resource-pool-channel resource-pool-channel
resource-pool-configuration resource-pool-configuration
@ -75,7 +73,7 @@
(record-constructor &resource-pool-abort-add-resource)) (record-constructor &resource-pool-abort-add-resource))
(define resource-pool-abort-add-resource-error? (define resource-pool-abort-add-resource-error?
(exception-predicate &resource-pool-abort-add-resource)) (record-predicate &resource-pool-abort-add-resource))
(define-record-type <resource-pool> (define-record-type <resource-pool>
(make-resource-pool-record name channel destroy-condition configuration) (make-resource-pool-record name channel destroy-condition configuration)
@ -93,429 +91,6 @@
(resource-pool-name resource-pool)) (resource-pool-name resource-pool))
port))) port)))
(define (remove-at-index! lst i)
(let ((start
end
(split-at! lst i)))
(append
start
(cdr end))))
(define* (make-fixed-size-resource-pool resources
#:key
(delay-logger (const #f))
(duration-logger (const #f))
destructor
scheduler
(name "unnamed")
default-checkout-timeout
default-max-waiters)
(define channel (make-channel))
(define destroy-condition
(make-condition))
(define pool
(make-resource-pool-record
name
channel
destroy-condition
`((delay-logger . ,delay-logger)
(duration-logger . ,duration-logger)
(destructor . ,destructor)
(scheduler . ,scheduler)
(name . ,name)
(default-checkout-timeout . ,default-checkout-timeout)
(default-max-waiters . ,default-max-waiters))))
(define checkout-failure-count 0)
(define (spawn-fiber-to-destroy-resource resource)
(spawn-fiber
(lambda ()
(let loop ()
(let ((success?
(with-exception-handler
(lambda _ #f)
(lambda ()
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception running resource pool destructor (~A): ~A\n"
name
destructor)
(print-backtrace-and-exception/knots exn)
(raise-exception exn))
(lambda ()
(start-stack #t (destructor resource))
#t)))
#:unwind? #t)))
(if success?
(put-message channel
(list 'remove resource))
(begin
(sleep 5)
(loop))))))))
(define (spawn-fiber-for-checkout reply-channel
reply-timeout
resource)
(spawn-fiber
(lambda ()
(let ((checkout-success?
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply-channel
(cons 'success resource))
(const #t))
(wrap-operation (sleep-operation
reply-timeout)
(const #f))))))
(unless checkout-success?
(put-message
channel
(list 'return-failed-checkout resource)))))))
(define (destroy-loop resources)
(let loop ((resources resources))
(match (get-message channel)
(('checkout reply timeout-time max-waiters)
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'resource-pool-destroyed
#f))))
(perform-operation
(if timeout-time
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op)))))
(loop resources))
(((and (or 'return
'return-failed-checkout
'remove)
return-type)
resource)
(when (and (not (eq? return-type 'remove))
destructor)
(spawn-fiber-to-destroy-resource resource))
(let ((index
(list-index (lambda (x)
(eq? x resource))
resources)))
(let ((new-resources
(if index
(remove-at-index! resources index)
(begin
(simple-format
(current-error-port)
"resource pool error: unable to remove ~A\n"
resource)
resources))))
(if (null? new-resources)
(begin
(signal-condition! destroy-condition)
;; No loop
*unspecified*)
(loop new-resources)))))
(('stats reply timeout-time)
(let ((stats
`((resources . ,(length resources))
(available . 0)
(waiters . 0)
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
(lambda ()
(let ((op
(put-operation reply stats)))
(perform-operation
(if timeout-time
(choice-operation
op
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second)))
op))))))
(loop resources))
(('destroy reply)
(loop resources))
(unknown
(simple-format
(current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n"
name
unknown)
(loop resources)))))
(define (main-loop)
(let loop ((resources resources)
(available resources)
(waiters (make-q)))
(match (get-message channel)
(('checkout reply timeout-time max-waiters)
(if (null? available)
(let ((waiters-count
(q-length waiters)))
(if (and max-waiters
(>= waiters-count
max-waiters))
(begin
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'too-many-waiters
waiters-count))))
(perform-operation
(if timeout-time
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op)))))
(loop resources
available
waiters))
(loop resources
available
(enq! waiters (cons reply timeout-time)))))
(if timeout-time
(let ((current-internal-time
(get-internal-real-time)))
;; If this client is still waiting
(if (> timeout-time
current-internal-time)
(let ((reply-timeout
(/ (- timeout-time
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a new
;; fiber to handle handing over the resource,
;; and returning it if there's a timeout
(spawn-fiber-for-checkout reply
reply-timeout
(car available))
(loop resources
(cdr available)
waiters))
(loop resources
available
waiters)))
(begin
(put-message reply (cons 'success
(car available)))
(loop resources
(cdr available)
waiters)))))
(((and (or 'return
'return-failed-checkout)
return-type)
resource)
(when (eq? 'return-failed-checkout
return-type)
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(if (q-empty? waiters)
(loop resources
(cons resource available)
waiters)
(let ((current-internal-time
(get-internal-real-time)))
(with-exception-handler
(lambda (exn)
(if (eq? (exception-kind exn) 'q-empty)
(loop resources
(cons resource available)
waiters)
(raise-exception exn)))
(lambda ()
(let waiter-loop ((waiter (deq! waiters)))
(match waiter
((reply . timeout)
(if (and timeout
(< timeout current-internal-time))
(waiter-loop (deq! waiters))
(if timeout
(let ((reply-timeout
(/ (- timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's
;; a timeout
(spawn-fiber-for-checkout reply
reply-timeout
resource))
(put-message reply (cons 'success
resource))))))))
#:unwind? #t)
(loop resources
available
waiters))))
(('list-resources reply)
(spawn-fiber
(lambda ()
(put-message reply (list-copy resources))))
(loop resources
available
waiters))
(('stats reply timeout-time)
(let ((stats
`((resources . ,(length resources))
(available . ,(length available))
(waiters . ,(q-length waiters))
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
(lambda ()
(let ((op
(put-operation reply stats)))
(perform-operation
(if timeout-time
(choice-operation
op
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second)))
op))))))
(loop resources
available
waiters))
(('destroy)
(if (and (null? resources)
(q-empty? waiters))
(signal-condition!
destroy-condition)
(let ((current-internal-time (get-internal-real-time)))
(for-each
(match-lambda
((reply . timeout)
(when (or (not timeout)
(> timeout current-internal-time))
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'resource-pool-destroyed
#f))))
(perform-operation
(if timeout
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op))))))))
(car waiters))
(if destructor
(begin
(for-each
(lambda (resource)
(spawn-fiber-to-destroy-resource resource))
available)
(destroy-loop resources))
(let dl ((resources resources)
(available available))
(if (null? available)
(if (null? resources)
(signal-condition!
destroy-condition)
(destroy-loop resources))
(let ((index
(list-index (lambda (x)
(eq? x (car available)))
resources)))
(dl (remove-at-index! resources index)
(cdr available)))))))))
(unknown
(simple-format
(current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n"
name
unknown)
(loop resources
available
waiters)))))
(spawn-fiber
(lambda ()
(with-exception-handler
(lambda (exn)
#f)
(lambda ()
(with-exception-handler
(lambda (exn)
(let* ((stack (make-stack #t))
(error-string
(call-with-output-string
(lambda (port)
(display-backtrace stack port 3)
(simple-format
port
"exception in the ~A pool fiber, " name)
(print-exception
port
(stack-ref stack 3)
'%exception
(list exn))))))
(display error-string
(current-error-port)))
(raise-exception exn))
(lambda ()
(start-stack
#t
(main-loop)))))
#:unwind? #t))
(or scheduler
(current-scheduler)))
pool)
(define* (make-resource-pool return-new-resource max-size (define* (make-resource-pool return-new-resource max-size
#:key (min-size 0) #:key (min-size 0)
(idle-seconds #f) (idle-seconds #f)
@ -551,33 +126,28 @@
(define checkout-failure-count 0) (define checkout-failure-count 0)
(define return-new-resource/parallelism-limiter (define spawn-fiber-to-return-new-resource
(make-parallelism-limiter (if add-resources-parallelism
(or add-resources-parallelism (let ((thunk
max-size) (fiberize
#:name
(string-append
name
" resource pool new resource parallelism limiter")))
(define (spawn-fiber-to-return-new-resource)
(spawn-fiber
(lambda () (lambda ()
(with-exception-handler
(lambda (exn)
;; This can happen if the resource pool is destroyed very
;; quickly
(unless (resource-pool-destroyed-error? exn)
(raise-exception exn)))
(lambda ()
(with-parallelism-limiter
return-new-resource/parallelism-limiter
(let ((max-size (let ((max-size
(assq-ref (resource-pool-configuration pool) (assq-ref (resource-pool-configuration pool)
'max-size)) 'max-size))
(size (assq-ref (resource-pool-stats pool #:timeout #f) (size (assq-ref (resource-pool-stats pool)
'resources))) 'resources)))
(unless (= size max-size) (unless (= size max-size)
(let ((new-resource
(return-new-resource)))
(put-message channel
(list 'add-resource new-resource))))))
#:parallelism add-resources-parallelism)))
(lambda ()
(spawn-fiber thunk)))
(lambda ()
(spawn-fiber
(lambda ()
(let ((new-resource
(with-exception-handler (with-exception-handler
(lambda _ #f) (lambda _ #f)
(lambda () (lambda ()
@ -591,12 +161,11 @@
(print-backtrace-and-exception/knots exn) (print-backtrace-and-exception/knots exn)
(raise-exception exn)) (raise-exception exn))
(lambda () (lambda ()
(let ((new-resource (start-stack #t (return-new-resource)))))
(start-stack #t (return-new-resource)))) #:unwind? #t)))
(when new-resource
(put-message channel (put-message channel
(list 'add-resource new-resource)))))) (list 'add-resource new-resource)))))))))
#:unwind? #t)))))
#:unwind? #t))))
(define (spawn-fiber-to-destroy-resource resource) (define (spawn-fiber-to-destroy-resource resource)
(spawn-fiber (spawn-fiber
@ -681,14 +250,21 @@
'remove) 'remove)
return-type) return-type)
resource) resource)
(when (and (not (eq? return-type 'remove)) (when destructor
destructor)
(spawn-fiber-to-destroy-resource resource)) (spawn-fiber-to-destroy-resource resource))
(let ((index (let ((index
(list-index (lambda (x) (list-index (lambda (x)
(eq? x resource)) (eq? x resource))
resources))) resources)))
(define (remove-at-index! lst i)
(let ((start
end
(split-at! lst i)))
(append
start
(cdr end))))
(let ((new-resources (let ((new-resources
(if index (if index
(remove-at-index! resources index) (remove-at-index! resources index)
@ -700,16 +276,13 @@
resources)))) resources))))
(if (null? new-resources) (if (null? new-resources)
(begin (begin
(and=> return-new-resource/parallelism-limiter
destroy-parallelism-limiter)
(signal-condition! destroy-condition) (signal-condition! destroy-condition)
;; No loop ;; No loop
*unspecified*) *unspecified*)
(loop new-resources))))) (loop new-resources)))))
(('stats reply timeout-time) (('stats reply)
(let ((stats (let ((stats
`((resources . ,(length resources)) `((resources . ,(length resources))
(available . 0) (available . 0)
@ -718,17 +291,13 @@
(spawn-fiber (spawn-fiber
(lambda () (lambda ()
(let ((op
(put-operation reply stats)))
(perform-operation (perform-operation
(if timeout-time
(choice-operation (choice-operation
op (wrap-operation
(sleep-operation (put-operation reply stats)
(/ (- timeout-time (const #t))
(get-internal-real-time)) (wrap-operation (sleep-operation 5)
internal-time-units-per-second))) (const #f)))))))
op))))))
(loop resources)) (loop resources))
@ -748,7 +317,7 @@
(define (main-loop) (define (main-loop)
(let loop ((resources '()) (let loop ((resources '())
(available '()) (available '())
(waiters (make-q)) (waiters '())
(resources-last-used '())) (resources-last-used '()))
(match (get-message channel) (match (get-message channel)
@ -770,51 +339,50 @@
(cons (get-internal-real-time) (cons (get-internal-real-time)
resources-last-used)))) resources-last-used))))
(if (q-empty? waiters) (if (null? waiters)
(loop (cons resource resources) (loop (cons resource resources)
(cons resource available) (cons resource available)
waiters waiters
(cons (get-internal-real-time) (cons (get-internal-real-time)
resources-last-used)) resources-last-used))
(let ((current-internal-time (let* ((current-internal-time (get-internal-real-time))
(get-internal-real-time))) (alive-waiters
(with-exception-handler dead-waiters
(lambda (exn) (partition!
(if (eq? (exception-kind exn) 'q-empty) (match-lambda
((reply . timeout)
(or (not timeout)
(> timeout current-internal-time))))
waiters)))
(if (null? alive-waiters)
(loop (cons resource resources) (loop (cons resource resources)
(cons resource available) (cons resource available)
waiters '()
(cons current-internal-time (cons (get-internal-real-time)
resources-last-used)) resources-last-used))
(raise-exception exn))) (match (last alive-waiters)
(lambda () ((waiter-channel . waiter-timeout)
(let waiter-loop ((waiter (deq! waiters))) (if waiter-timeout
(match waiter
((reply . timeout)
(if (and timeout
(< timeout current-internal-time))
(waiter-loop (deq! waiters))
(if timeout
(let ((reply-timeout (let ((reply-timeout
(/ (- timeout (/ (- waiter-timeout
current-internal-time) current-internal-time)
internal-time-units-per-second))) internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a ;; Don't sleep in this fiber, so spawn
;; new fiber to handle handing over the ;; a new fiber to handle handing over
;; resource, and returning it if there's ;; the resource, and returning it if
;; a timeout ;; there's a timeout
(spawn-fiber-for-checkout reply (spawn-fiber-for-checkout waiter-channel
reply-timeout reply-timeout
resource)) resource))
(put-message reply (cons 'success (put-message waiter-channel (cons 'success
resource)))))))) resource)))
#:unwind? #t)
(loop (cons resource resources) (loop (cons resource resources)
available available
waiters (drop-right! alive-waiters 1)
(cons current-internal-time (cons (get-internal-real-time)
resources-last-used)))))) resources-last-used)))))))))
(('checkout reply timeout-time max-waiters) (('checkout reply timeout-time max-waiters)
(if (null? available) (if (null? available)
@ -823,7 +391,7 @@
(spawn-fiber-to-return-new-resource)) (spawn-fiber-to-return-new-resource))
(let ((waiters-count (let ((waiters-count
(q-length waiters))) (length waiters)))
(if (and max-waiters (if (and max-waiters
(>= waiters-count (>= waiters-count
max-waiters)) max-waiters))
@ -852,7 +420,8 @@
resources-last-used)) resources-last-used))
(loop resources (loop resources
available available
(enq! waiters (cons reply timeout-time)) (cons (cons reply timeout-time)
waiters)
resources-last-used)))) resources-last-used))))
(if timeout-time (if timeout-time
@ -899,7 +468,7 @@
(set! checkout-failure-count (set! checkout-failure-count
(+ 1 checkout-failure-count))) (+ 1 checkout-failure-count)))
(if (q-empty? waiters) (if (null? waiters)
(loop resources (loop resources
(cons resource available) (cons resource available)
waiters waiters
@ -912,14 +481,19 @@
(get-internal-real-time)) (get-internal-real-time))
resources-last-used)) resources-last-used))
(let ((current-internal-time (let* ((current-internal-time (get-internal-real-time))
(get-internal-real-time))) (alive-waiters
(with-exception-handler dead-waiters
(lambda (exn) (partition!
(if (eq? (exception-kind exn) 'q-empty) (match-lambda
((reply . timeout)
(or (not timeout)
(> timeout current-internal-time))))
waiters)))
(if (null? alive-waiters)
(loop resources (loop resources
(cons resource available) (cons resource available)
waiters '()
(begin (begin
(when (eq? return-type 'return) (when (eq? return-type 'return)
(list-set! (list-set!
@ -927,48 +501,50 @@
(list-index (lambda (x) (list-index (lambda (x)
(eq? x resource)) (eq? x resource))
resources) resources)
current-internal-time)) (get-internal-real-time)))
resources-last-used)) resources-last-used))
(raise-exception exn))) (match (last alive-waiters)
(lambda () ((waiter-channel . waiter-timeout)
(let waiter-loop ((waiter (deq! waiters))) (if waiter-timeout
(match waiter
((reply . timeout)
(if (and timeout
(< timeout current-internal-time))
(waiter-loop (deq! waiters))
(if timeout
(let ((reply-timeout (let ((reply-timeout
(/ (- timeout (/ (- waiter-timeout
current-internal-time) current-internal-time)
internal-time-units-per-second))) internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a ;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the ;; new fiber to handle handing over the
;; resource, and returning it if there's ;; resource, and returning it if there's a
;; a timeout ;; timeout
(spawn-fiber-for-checkout reply (spawn-fiber-for-checkout waiter-channel
reply-timeout reply-timeout
resource)) resource))
(put-message reply (cons 'success (put-message waiter-channel (cons 'success
resource)))))))) resource)))
#:unwind? #t)
(loop resources (loop resources
available available
waiters (drop-right! alive-waiters 1)
(begin (begin
(list-set! (list-set!
resources-last-used resources-last-used
(list-index (lambda (x) (list-index (lambda (x)
(eq? x resource)) (eq? x resource))
resources) resources)
current-internal-time) (get-internal-real-time))
resources-last-used))))) resources-last-used))))))))
(('remove resource) (('remove resource)
(let ((index (let ((index
(list-index (lambda (x) (list-index (lambda (x)
(eq? x resource)) (eq? x resource))
resources))) resources)))
(define (remove-at-index! lst i)
(let ((start
end
(split-at! lst i)))
(append
start
(cdr end))))
(loop (if index (loop (if index
(remove-at-index! resources index) (remove-at-index! resources index)
(begin (begin
@ -1001,26 +577,22 @@
waiters waiters
resources-last-used)) resources-last-used))
(('stats reply timeout-time) (('stats reply)
(let ((stats (let ((stats
`((resources . ,(length resources)) `((resources . ,(length resources))
(available . ,(length available)) (available . ,(length available))
(waiters . ,(q-length waiters)) (waiters . ,(length waiters))
(checkout-failure-count . ,checkout-failure-count)))) (checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber (spawn-fiber
(lambda () (lambda ()
(let ((op
(put-operation reply stats)))
(perform-operation (perform-operation
(if timeout-time
(choice-operation (choice-operation
op (wrap-operation
(sleep-operation (put-operation reply stats)
(/ (- timeout-time (const #t))
(get-internal-real-time)) (wrap-operation (sleep-operation 5)
internal-time-units-per-second))) (const #f)))))))
op))))))
(loop resources (loop resources
available available
@ -1069,10 +641,22 @@
(('destroy) (('destroy)
(if (and (null? resources) (if (and (null? resources)
(q-empty? waiters)) (null? waiters))
(signal-condition! (signal-condition!
destroy-condition) destroy-condition)
(begin
(for-each
(lambda (resource)
(if destructor
(spawn-fiber-to-destroy-resource resource)
(spawn-fiber
(lambda ()
(put-message channel
(list 'remove resource)))
#:parallel? #t)))
available)
(let ((current-internal-time (get-internal-real-time))) (let ((current-internal-time (get-internal-real-time)))
(for-each (for-each
(match-lambda (match-lambda
@ -1097,28 +681,10 @@
internal-time-units-per-second)) internal-time-units-per-second))
(const #f))) (const #f)))
op)))))))) op))))))))
(car waiters)) waiters))
(destroy-loop resources))))
(if destructor
(begin
(for-each
(lambda (resource)
(spawn-fiber-to-destroy-resource resource))
available)
(destroy-loop resources))
(let dl ((resources resources)
(available available))
(if (null? available)
(if (null? resources)
(signal-condition!
destroy-condition)
(destroy-loop resources))
(let ((index
(list-index (lambda (x)
(eq? x (car available)))
resources)))
(dl (remove-at-index! resources index)
(cdr available)))))))))
(unknown (unknown
(simple-format (simple-format
(current-error-port) (current-error-port)
@ -1178,8 +744,7 @@
(put-operation (resource-pool-channel pool) (put-operation (resource-pool-channel pool)
(list 'destroy)) (list 'destroy))
(lambda _ (lambda _
(wait (wait (resource-pool-destroy-condition pool))))
(resource-pool-destroy-condition pool))))
(wait-operation (wait-operation
(resource-pool-destroy-condition pool)))) (resource-pool-destroy-condition pool))))
#t) #t)
@ -1198,7 +763,7 @@
(record-constructor &resource-pool-timeout)) (record-constructor &resource-pool-timeout))
(define resource-pool-timeout-error? (define resource-pool-timeout-error?
(exception-predicate &resource-pool-timeout)) (record-predicate &resource-pool-timeout))
(define &resource-pool-too-many-waiters (define &resource-pool-too-many-waiters
(make-exception-type '&recource-pool-too-many-waiters (make-exception-type '&recource-pool-too-many-waiters
@ -1219,7 +784,7 @@
(record-constructor &resource-pool-too-many-waiters)) (record-constructor &resource-pool-too-many-waiters))
(define resource-pool-too-many-waiters-error? (define resource-pool-too-many-waiters-error?
(exception-predicate &resource-pool-too-many-waiters)) (record-predicate &resource-pool-too-many-waiters))
(define &resource-pool-destroyed (define &resource-pool-destroyed
(make-exception-type '&recource-pool-destroyed (make-exception-type '&recource-pool-destroyed
@ -1235,7 +800,7 @@
(record-constructor &resource-pool-destroyed)) (record-constructor &resource-pool-destroyed))
(define resource-pool-destroyed-error? (define resource-pool-destroyed-error?
(exception-predicate &resource-pool-destroyed)) (record-predicate &resource-pool-destroyed))
(define &resource-pool-destroy-resource (define &resource-pool-destroy-resource
(make-exception-type '&recource-pool-destroy-resource (make-exception-type '&recource-pool-destroy-resource
@ -1246,7 +811,7 @@
(record-constructor &resource-pool-destroy-resource)) (record-constructor &resource-pool-destroy-resource))
(define resource-pool-destroy-resource-exception? (define resource-pool-destroy-resource-exception?
(exception-predicate &resource-pool-destroy-resource)) (record-predicate &resource-pool-destroy-resource))
(define resource-pool-default-timeout-handler (define resource-pool-default-timeout-handler
(make-parameter #f)) (make-parameter #f))
@ -1314,9 +879,8 @@ available. Return the resource once PROC has returned."
start-time) start-time)
'timeout) 'timeout)
response)) response))
'timeout)) 'timeout)))))
'timeout))) (let loop ((reply (make-channel)))
(let ((reply (make-channel)))
(put-message channel (put-message channel
(list 'checkout (list 'checkout
reply reply
@ -1354,7 +918,8 @@ available. Return the resource once PROC has returned."
'destroy 'destroy
'return) 'return)
resource)) resource))
(raise-exception exn)) (unless (resource-pool-destroy-resource-exception? exn)
(raise-exception exn)))
(lambda () (lambda ()
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
@ -1384,17 +949,13 @@ available. Return the resource once PROC has returned."
(lambda (resource) exp ...))) (lambda (resource) exp ...)))
(define* (resource-pool-stats pool #:key (timeout 5)) (define* (resource-pool-stats pool #:key (timeout 5))
(if timeout (let ((reply (make-channel))
(let* ((reply (make-channel)) (start-time (get-internal-real-time)))
(start-time (get-internal-real-time))
(timeout-time
(+ start-time
(* internal-time-units-per-second timeout))))
(perform-operation (perform-operation
(choice-operation (choice-operation
(wrap-operation (wrap-operation
(put-operation (resource-pool-channel pool) (put-operation (resource-pool-channel pool)
`(stats ,reply ,timeout-time)) `(stats ,reply))
(const #t)) (const #t))
(wrap-operation (sleep-operation timeout) (wrap-operation (sleep-operation timeout)
(lambda _ (lambda _
@ -1415,11 +976,7 @@ available. Return the resource once PROC has returned."
(raise-exception (raise-exception
(make-resource-pool-timeout-error pool)))))) (make-resource-pool-timeout-error pool))))))
(raise-exception (raise-exception
(make-resource-pool-timeout-error pool))))) (make-resource-pool-timeout-error pool))))))
(let ((reply (make-channel)))
(put-message (resource-pool-channel pool)
`(stats ,reply #f))
(get-message reply))))
(define (resource-pool-list-resources pool) (define (resource-pool-list-resources pool)
(let ((reply (make-channel))) (let ((reply (make-channel)))

View file

@ -198,7 +198,7 @@ from there, or #f if that would be an empty string."
(record-accessor &thread-pool-timeout-error 'pool))) (record-accessor &thread-pool-timeout-error 'pool)))
(define thread-pool-timeout-error? (define thread-pool-timeout-error?
(exception-predicate &thread-pool-timeout-error)) (record-predicate &thread-pool-timeout-error))
(define* (make-fixed-size-thread-pool size (define* (make-fixed-size-thread-pool size
#:key #:key

View file

@ -85,7 +85,7 @@
(record-constructor &port-timeout-error)) (record-constructor &port-timeout-error))
(define port-timeout-error? (define port-timeout-error?
(exception-predicate &port-timeout-error)) (record-predicate &port-timeout-error))
(define &port-read-timeout-error (define &port-read-timeout-error
(make-exception-type '&port-read-timeout-error (make-exception-type '&port-read-timeout-error
@ -96,7 +96,7 @@
(record-constructor &port-read-timeout-error)) (record-constructor &port-read-timeout-error))
(define port-read-timeout-error? (define port-read-timeout-error?
(exception-predicate &port-read-timeout-error)) (record-predicate &port-read-timeout-error))
(define &port-write-timeout-error (define &port-write-timeout-error
(make-exception-type '&port-write-timeout-error (make-exception-type '&port-write-timeout-error
@ -107,7 +107,7 @@
(record-constructor &port-write-timeout-error)) (record-constructor &port-write-timeout-error))
(define port-write-timeout-error? (define port-write-timeout-error?
(exception-predicate &port-write-timeout-error)) (record-predicate &port-write-timeout-error))
(define (readable? port) (define (readable? port)
"Test if PORT is writable." "Test if PORT is writable."

View file

@ -63,14 +63,6 @@
(bind sock family addr port) (bind sock family addr port)
sock)) sock))
(define crlf-bv
(string->utf8 "\r\n"))
(define (chunked-output-port-overhead-bytes write-size)
(+ (string-length (number->string write-size 16))
(bytevector-length crlf-bv)
(bytevector-length crlf-bv)))
(define* (make-chunked-output-port/knots port #:key (keep-alive? #f) (define* (make-chunked-output-port/knots port #:key (keep-alive? #f)
(buffering 1200)) (buffering 1200))
"Returns a new port which translates non-encoded data into a HTTP "Returns a new port which translates non-encoded data into a HTTP
@ -82,12 +74,10 @@ when done, as it will output the remaining data, and encode the final
zero chunk. When the port is closed it will also close PORT, unless zero chunk. When the port is closed it will also close PORT, unless
KEEP-ALIVE? is true." KEEP-ALIVE? is true."
(define (write! bv start count) (define (write! bv start count)
(let ((len-string (put-string port (number->string count 16))
(number->string count 16))) (put-string port "\r\n")
(put-string port len-string))
(put-bytevector port crlf-bv 0 2)
(put-bytevector port bv start count) (put-bytevector port bv start count)
(put-bytevector port crlf-bv 0 2) (put-string port "\r\n")
(force-output port) (force-output port)
count) count)
@ -140,7 +130,7 @@ closes PORT, unless KEEP-ALIVE? is true."
(record-constructor &request-body-ended-prematurely)) (record-constructor &request-body-ended-prematurely))
(define request-body-ended-prematurely-error? (define request-body-ended-prematurely-error?
(exception-predicate &request-body-ended-prematurely)) (record-predicate &request-body-ended-prematurely))
(define (request-body-port/knots r) (define (request-body-port/knots r)
(cond (cond
@ -341,19 +331,15 @@ on the procedure being called at any particular time."
(string->utf8 (string->utf8
"internal server error"))) "internal server error")))
(define* (handle-request handler client (define (handle-request handler client
read-request-exception-handler read-request-exception-handler
write-response-exception-handler write-response-exception-handler)
buffer-size
#:key post-request-hook)
(let ((request (let ((request
(with-exception-handler (with-exception-handler
read-request-exception-handler read-request-exception-handler
(lambda () (lambda ()
(read-request client)) (read-request client))
#:unwind? #t)) #:unwind? #t)))
(read-request-time
(get-internal-real-time)))
(let ((response (let ((response
body body
(cond (cond
@ -402,9 +388,7 @@ on the procedure being called at any particular time."
(lambda () (lambda ()
(write-response response client) (write-response response client)
(let ((response-start-time (let ((body-written?
(get-internal-real-time))
(body-written?
(if (procedure? body) (if (procedure? body)
(let* ((type (response-content-type response (let* ((type (response-content-type response
'(text/plain))) '(text/plain)))
@ -415,11 +399,7 @@ on the procedure being called at any particular time."
client client
(make-chunked-output-port/knots (make-chunked-output-port/knots
client client
#:keep-alive? #t #:keep-alive? #t))))
#:buffering
(- buffer-size
(chunked-output-port-overhead-bytes
buffer-size))))))
(set-port-encoding! body-port charset) (set-port-encoding! body-port charset)
(let ((body-written? (let ((body-written?
(with-exception-handler (with-exception-handler
@ -443,11 +423,6 @@ on the procedure being called at any particular time."
(if body-written? (if body-written?
(begin (begin
(force-output client) (force-output client)
(when post-request-hook
(post-request-hook request
#:read-request-time read-request-time
#:response-start-time response-start-time
#:response-end-time (get-internal-real-time)))
(when (and (procedure? body) (when (and (procedure? body)
(response-content-length response)) (response-content-length response))
(set-port-encoding! client "ISO-8859-1")) (set-port-encoding! client "ISO-8859-1"))
@ -459,8 +434,7 @@ on the procedure being called at any particular time."
read-request-exception-handler read-request-exception-handler
write-response-exception-handler write-response-exception-handler
connection-idle-timeout connection-idle-timeout
buffer-size buffer-size)
post-request-hook)
;; Always disable Nagle's algorithm, as we handle buffering ;; Always disable Nagle's algorithm, as we handle buffering
;; ourselves; when we force-output, we really want the data to go ;; ourselves; when we force-output, we really want the data to go
;; out. ;; out.
@ -498,29 +472,11 @@ on the procedure being called at any particular time."
(else (else
(let ((keep-alive? (handle-request handler client (let ((keep-alive? (handle-request handler client
read-request-exception-handler read-request-exception-handler
write-response-exception-handler write-response-exception-handler)))
buffer-size
#:post-request-hook
post-request-hook)))
(if keep-alive? (if keep-alive?
(loop) (loop)
(close-port client))))))) (close-port client)))))))
(define (post-request-hook/safe post-request-hook)
(if post-request-hook
(lambda args
(with-exception-handler
(lambda (exn) #f)
(lambda ()
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(raise-exception exn))
(lambda ()
(apply post-request-hook args))))
#:unwind? #t))
#f))
(define-record-type <web-server> (define-record-type <web-server>
(make-web-server socket port) (make-web-server socket port)
web-server? web-server?
@ -540,8 +496,7 @@ on the procedure being called at any particular time."
(write-response-exception-handler (write-response-exception-handler
default-write-response-exception-handler) default-write-response-exception-handler)
(connection-idle-timeout #f) (connection-idle-timeout #f)
(connection-buffer-size 1024) (connection-buffer-size 1024))
post-request-hook)
"Run the knots web server. "Run the knots web server.
HANDLER should be a procedure that takes one argument, the HTTP HANDLER should be a procedure that takes one argument, the HTTP
@ -577,9 +532,7 @@ before sending back to the client."
read-request-exception-handler read-request-exception-handler
write-response-exception-handler write-response-exception-handler
connection-idle-timeout connection-idle-timeout
connection-buffer-size connection-buffer-size))
(post-request-hook/safe
post-request-hook)))
#:parallel? #t) #:parallel? #t)
(loop)))))) (loop))))))

View file

@ -1,11 +1,10 @@
(define-module (tests) (define-module (tests)
#:use-module (ice-9 exceptions) #:use-module (ice-9 exceptions)
#:use-module (fibers) #:use-module (fibers)
#:use-module (knots)
#:export (run-fibers-for-tests #:export (run-fibers-for-tests
assert-no-heap-growth)) assert-no-heap-growth))
(define* (run-fibers-for-tests thunk #:key (drain? #t)) (define (run-fibers-for-tests thunk)
(let ((result (let ((result
(run-fibers (run-fibers
(lambda () (lambda ()
@ -13,18 +12,15 @@
(lambda (exn) (lambda (exn)
exn) exn)
(lambda () (lambda ()
(simple-format #t "running ~A\n" thunk)
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
(print-backtrace-and-exception/knots exn) (backtrace)
(raise-exception exn)) (raise-exception exn))
(lambda () thunk)
(start-stack #t (thunk))))
#t) #t)
#:unwind? #t)) #:unwind? #t))
#:hz 0 #:hz 0
#:parallelism 1 #:parallelism 1)))
#:drain? drain?)))
(if (exception? result) (if (exception? result)
(raise-exception result) (raise-exception result)
result))) result)))

View file

@ -61,24 +61,6 @@
identity identity
'(())))) '(()))))
(run-fibers-for-tests
(lambda ()
(with-exception-handler
(lambda (exn)
(unless (and (exception-with-message? exn)
(string=? (exception-message exn)
"foo"))
(raise-exception exn)))
(lambda ()
(fibers-map-with-progress
(lambda _
(raise-exception
(make-exception-with-message "foo")))
'((1)))
(error 'should-not-reach-here))
#:unwind? #t)))
(run-fibers-for-tests (run-fibers-for-tests
(lambda () (lambda ()
(with-exception-handler (with-exception-handler
@ -129,16 +111,4 @@
(assert-equal a 1)))) (assert-equal a 1))))
(run-fibers-for-tests
(lambda ()
(let ((parallelism-limiter (make-parallelism-limiter 2)))
(fibers-for-each
(lambda _
(with-parallelism-limiter
parallelism-limiter
#f))
(iota 50))
(destroy-parallelism-limiter parallelism-limiter))))
(display "parallelism test finished successfully\n") (display "parallelism test finished successfully\n")

View file

@ -19,21 +19,7 @@
(number? (number?
(with-resource-from-pool resource-pool (with-resource-from-pool resource-pool
res res
res))) res))))))
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-fixed-size-resource-pool
(list 1))))
(assert-true
(number?
(with-resource-from-pool resource-pool
res
res)))
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests (run-fibers-for-tests
(lambda () (lambda ()
@ -45,9 +31,7 @@
(number? (number?
(with-resource-from-pool resource-pool (with-resource-from-pool resource-pool
res res
res))) res))))))
(destroy-resource-pool resource-pool))))
(let* ((error-constructor (let* ((error-constructor
(record-constructor &resource-pool-timeout)) (record-constructor &resource-pool-timeout))
@ -104,13 +88,10 @@
res)) res))
(iota 20)) (iota 20))
(let loop ((stats (resource-pool-stats resource-pool (let loop ((stats (resource-pool-stats resource-pool)))
#:timeout #f)))
(unless (= 0 (assq-ref stats 'resources)) (unless (= 0 (assq-ref stats 'resources))
(sleep 0.1) (sleep 0.1)
(loop (resource-pool-stats resource-pool #:timeout #f)))) (loop (resource-pool-stats resource-pool)))))))
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests (run-fibers-for-tests
(lambda () (lambda ()
@ -134,9 +115,7 @@
(set! counter (+ 1 counter)) (set! counter (+ 1 counter))
(error "collision detected"))))) (error "collision detected")))))
20 20
(iota 50)) (iota 50)))))
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests (run-fibers-for-tests
(lambda () (lambda ()
@ -150,7 +129,7 @@
(error "collision detected"))) (error "collision detected")))
(new-number)) (new-number))
1 1
#:default-checkout-timeout 5))) #:default-checkout-timeout 120)))
(fibers-batch-for-each (fibers-batch-for-each
(lambda _ (lambda _
(with-resource-from-pool (with-resource-from-pool
@ -161,9 +140,7 @@
(set! counter (+ 1 counter)) (set! counter (+ 1 counter))
(error "collision detected"))))) (error "collision detected")))))
20 20
(iota 50)) (iota 50)))))
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests (run-fibers-for-tests
(lambda () (lambda ()
@ -187,14 +164,14 @@
(call-with-resource-from-pool (call-with-resource-from-pool
resource-pool resource-pool
(lambda (res) (lambda (res)
#f))) (error 'should-not-be-reached))))
#:unwind? #t))) #:unwind? #t)))
(while (= 0 (while (= 0
(assq-ref (assq-ref
(resource-pool-stats resource-pool #:timeout #f) (resource-pool-stats resource-pool)
'waiters)) 'waiters))
(sleep 0.1)) (sleep 0))
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
@ -207,8 +184,6 @@
resource-pool resource-pool
(lambda (res) (lambda (res)
(error 'should-not-be-reached)))) (error 'should-not-be-reached))))
#:unwind? #t))) #:unwind? #t))))))
(destroy-resource-pool resource-pool))))
(display "resource-pool test finished successfully\n") (display "resource-pool test finished successfully\n")