Compare commits
19 commits
actions-te
...
trunk
Author | SHA1 | Date | |
---|---|---|---|
52092e7a99 | |||
4468a3ef6d | |||
d18b5b8d5d | |||
f4b48a1499 | |||
ec2f2489a2 | |||
ff93dc1442 | |||
ce1b710bcf | |||
7709ffe1d3 | |||
deae518b52 | |||
0fa6737a39 | |||
4140ef0bd6 | |||
6f6d57b189 | |||
d8f64399cd | |||
163d775496 | |||
8f3e0a9a1d | |||
09ca6cfb6b | |||
ab5411da42 | |||
edf62414ee | |||
2e25c3b074 |
9 changed files with 917 additions and 313 deletions
|
@ -1,7 +1,7 @@
|
||||||
on:
|
on:
|
||||||
push:
|
push:
|
||||||
branches:
|
branches:
|
||||||
- actions-test
|
- trunk
|
||||||
jobs:
|
jobs:
|
||||||
test:
|
test:
|
||||||
runs-on: host
|
runs-on: host
|
||||||
|
@ -10,13 +10,17 @@ jobs:
|
||||||
- run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git --branch=pages knots-pages
|
- run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git --branch=pages knots-pages
|
||||||
- run: |
|
- run: |
|
||||||
cd knots-trunk
|
cd knots-trunk
|
||||||
guix shell -D -f guix-dev.scm -- documenta api knots
|
guix shell -D -f guix-dev.scm -- documenta api "knots.scm knots/"
|
||||||
guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -c SHOW_TITLE=true -o ../knots-pages/index.html doc/index.texi
|
guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -c SHOW_TITLE=true -o ../knots-pages/index.html doc/index.texi
|
||||||
|
|
||||||
- run: |
|
- run: |
|
||||||
cd knots-pages
|
cd knots-pages
|
||||||
git add .
|
git add .
|
||||||
|
if [[ -z "$(git status -s)" ]]; then
|
||||||
|
echo "Nothing to push"
|
||||||
|
else
|
||||||
git config user.email ""
|
git config user.email ""
|
||||||
git config user.name "Automatic website updater"
|
git config user.name "Automatic website updater"
|
||||||
git commit -m "Automatic website update"
|
git commit -m "Automatic website update"
|
||||||
git push
|
git push
|
||||||
|
fi
|
|
@ -20,6 +20,9 @@
|
||||||
(define-module (knots parallelism)
|
(define-module (knots parallelism)
|
||||||
#:use-module (srfi srfi-1)
|
#:use-module (srfi srfi-1)
|
||||||
#:use-module (srfi srfi-71)
|
#:use-module (srfi srfi-71)
|
||||||
|
#:use-module (srfi srfi-9)
|
||||||
|
#:use-module (srfi srfi-9 gnu)
|
||||||
|
#:use-module (srfi srfi-43)
|
||||||
#:use-module (ice-9 match)
|
#:use-module (ice-9 match)
|
||||||
#:use-module (ice-9 control)
|
#:use-module (ice-9 control)
|
||||||
#:use-module (ice-9 exceptions)
|
#:use-module (ice-9 exceptions)
|
||||||
|
@ -27,6 +30,7 @@
|
||||||
#:use-module (fibers channels)
|
#:use-module (fibers channels)
|
||||||
#:use-module (fibers operations)
|
#:use-module (fibers operations)
|
||||||
#:use-module (knots)
|
#:use-module (knots)
|
||||||
|
#:use-module (knots resource-pool)
|
||||||
#:export (fibers-batch-map
|
#:export (fibers-batch-map
|
||||||
fibers-map
|
fibers-map
|
||||||
|
|
||||||
|
@ -38,7 +42,13 @@
|
||||||
fibers-parallel
|
fibers-parallel
|
||||||
fibers-let
|
fibers-let
|
||||||
|
|
||||||
fiberize))
|
fiberize
|
||||||
|
|
||||||
|
make-parallelism-limiter
|
||||||
|
parallelism-limiter?
|
||||||
|
destroy-parallelism-limiter
|
||||||
|
call-with-parallelism-limiter
|
||||||
|
with-parallelism-limiter))
|
||||||
|
|
||||||
(define (defer-to-parallel-fiber thunk)
|
(define (defer-to-parallel-fiber thunk)
|
||||||
(let ((reply (make-channel)))
|
(let ((reply (make-channel)))
|
||||||
|
@ -48,7 +58,7 @@
|
||||||
(lambda (exn)
|
(lambda (exn)
|
||||||
(put-message
|
(put-message
|
||||||
reply
|
reply
|
||||||
(list 'exception exn)))
|
(cons 'exception exn)))
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
(lambda (exn)
|
(lambda (exn)
|
||||||
|
@ -69,7 +79,7 @@
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(start-stack #t (thunk)))
|
(start-stack #t (thunk)))
|
||||||
(lambda vals
|
(lambda vals
|
||||||
(put-message reply vals))))))
|
(put-message reply (cons 'result vals)))))))
|
||||||
#:unwind? #t))
|
#:unwind? #t))
|
||||||
#:parallel? #t)
|
#:parallel? #t)
|
||||||
reply))
|
reply))
|
||||||
|
@ -79,13 +89,16 @@
|
||||||
reply-channels)))
|
reply-channels)))
|
||||||
(map
|
(map
|
||||||
(match-lambda
|
(match-lambda
|
||||||
(('exception exn)
|
(('exception . exn)
|
||||||
(raise-exception exn))
|
(raise-exception exn))
|
||||||
(result
|
(('result . vals)
|
||||||
(apply values result)))
|
(apply values vals)))
|
||||||
responses)))
|
responses)))
|
||||||
|
|
||||||
(define (fibers-batch-map proc parallelism-limit . lists)
|
(define (fibers-batch-map proc parallelism-limit . lists)
|
||||||
|
"Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT. If any of
|
||||||
|
the invocations of PROC raise an exception, this will be raised once
|
||||||
|
all of the calls to PROC have finished."
|
||||||
(define vecs (map (lambda (list-or-vec)
|
(define vecs (map (lambda (list-or-vec)
|
||||||
(if (vector? list-or-vec)
|
(if (vector? list-or-vec)
|
||||||
list-or-vec
|
list-or-vec
|
||||||
|
@ -105,9 +118,18 @@
|
||||||
(channel-indexes '()))
|
(channel-indexes '()))
|
||||||
(if (and (eq? #f next-to-process-index)
|
(if (and (eq? #f next-to-process-index)
|
||||||
(null? channel-indexes))
|
(null? channel-indexes))
|
||||||
|
(let ((processed-result-vec
|
||||||
|
(vector-map
|
||||||
|
(lambda (_ result-or-exn)
|
||||||
|
(match result-or-exn
|
||||||
|
(('exception . exn)
|
||||||
|
(raise-exception exn))
|
||||||
|
(('result . vals)
|
||||||
|
(car vals))))
|
||||||
|
result-vec)))
|
||||||
(if (vector? (first lists))
|
(if (vector? (first lists))
|
||||||
result-vec
|
processed-result-vec
|
||||||
(vector->list result-vec))
|
(vector->list processed-result-vec)))
|
||||||
|
|
||||||
(if (or (= (length channel-indexes)
|
(if (or (= (length channel-indexes)
|
||||||
(min parallelism-limit vecs-length))
|
(min parallelism-limit vecs-length))
|
||||||
|
@ -123,18 +145,13 @@
|
||||||
(get-operation
|
(get-operation
|
||||||
(vector-ref result-vec index))
|
(vector-ref result-vec index))
|
||||||
(lambda (result)
|
(lambda (result)
|
||||||
(match result
|
|
||||||
(('exception exn)
|
|
||||||
(raise-exception exn))
|
|
||||||
(_
|
|
||||||
(vector-set! result-vec
|
(vector-set! result-vec
|
||||||
index
|
index
|
||||||
(first result))
|
result)
|
||||||
|
|
||||||
(values next-to-process-index
|
(values next-to-process-index
|
||||||
(lset-difference =
|
(lset-difference =
|
||||||
channel-indexes
|
channel-indexes
|
||||||
(list index))))))))
|
(list index))))))
|
||||||
channel-indexes)))))
|
channel-indexes)))))
|
||||||
(loop new-index
|
(loop new-index
|
||||||
new-channel-indexes))
|
new-channel-indexes))
|
||||||
|
@ -157,9 +174,14 @@
|
||||||
channel-indexes)))))))
|
channel-indexes)))))))
|
||||||
|
|
||||||
(define (fibers-map proc . lists)
|
(define (fibers-map proc . lists)
|
||||||
|
"Map PROC over LISTS in parallel, running up to 20 fibers in
|
||||||
|
PARALLEL. If any of the invocations of PROC raise an exception, this
|
||||||
|
will be raised once all of the calls to PROC have finished."
|
||||||
(apply fibers-batch-map proc 20 lists))
|
(apply fibers-batch-map proc 20 lists))
|
||||||
|
|
||||||
(define (fibers-batch-for-each proc parallelism-limit . lists)
|
(define (fibers-batch-for-each proc parallelism-limit . lists)
|
||||||
|
"Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in
|
||||||
|
parallel."
|
||||||
(apply fibers-batch-map
|
(apply fibers-batch-map
|
||||||
(lambda args
|
(lambda args
|
||||||
(apply proc args)
|
(apply proc args)
|
||||||
|
@ -170,10 +192,13 @@
|
||||||
*unspecified*)
|
*unspecified*)
|
||||||
|
|
||||||
(define (fibers-for-each proc . lists)
|
(define (fibers-for-each proc . lists)
|
||||||
|
"Call PROC on LISTS, running up to 20 fibers in parallel."
|
||||||
(apply fibers-batch-for-each proc 20 lists))
|
(apply fibers-batch-for-each proc 20 lists))
|
||||||
|
|
||||||
(define-syntax fibers-parallel
|
(define-syntax fibers-parallel
|
||||||
(lambda (x)
|
(lambda (x)
|
||||||
|
"Run each expression in parallel. If any expression raises an
|
||||||
|
exception, this will be raised after all exceptions have finished."
|
||||||
(syntax-case x ()
|
(syntax-case x ()
|
||||||
((_ e0 ...)
|
((_ e0 ...)
|
||||||
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
|
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
|
||||||
|
@ -184,12 +209,16 @@
|
||||||
(apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
|
(apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
|
||||||
|
|
||||||
(define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...)
|
(define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...)
|
||||||
|
"Let, but run each binding in a fiber in parallel."
|
||||||
(call-with-values
|
(call-with-values
|
||||||
(lambda () (fibers-parallel e ...))
|
(lambda () (fibers-parallel e ...))
|
||||||
(lambda (v ...)
|
(lambda (v ...)
|
||||||
b0 b1 ...)))
|
b0 b1 ...)))
|
||||||
|
|
||||||
(define* (fibers-map-with-progress proc lists #:key report)
|
(define* (fibers-map-with-progress proc lists #:key report)
|
||||||
|
"Map PROC over LISTS, calling #:REPORT if specified after each
|
||||||
|
invocation of PROC finishes. REPORT is passed the results for each
|
||||||
|
element of LISTS, or #f if no result has been received yet."
|
||||||
(let loop ((channels-to-results
|
(let loop ((channels-to-results
|
||||||
(apply map
|
(apply map
|
||||||
(lambda args
|
(lambda args
|
||||||
|
@ -210,8 +239,8 @@
|
||||||
(match-lambda
|
(match-lambda
|
||||||
((#f . ('exception . exn))
|
((#f . ('exception . exn))
|
||||||
(raise-exception exn))
|
(raise-exception exn))
|
||||||
((#f . ('result . val))
|
((#f . ('result . vals))
|
||||||
val))
|
(car vals)))
|
||||||
channels-to-results)
|
channels-to-results)
|
||||||
(loop
|
(loop
|
||||||
(perform-operation
|
(perform-operation
|
||||||
|
@ -228,12 +257,7 @@
|
||||||
(map (match-lambda
|
(map (match-lambda
|
||||||
((c . r)
|
((c . r)
|
||||||
(if (eq? channel c)
|
(if (eq? channel c)
|
||||||
(cons #f
|
(cons #f result)
|
||||||
(match result
|
|
||||||
(('exception . exn)
|
|
||||||
result)
|
|
||||||
(_
|
|
||||||
(cons 'result result))))
|
|
||||||
(cons c r))))
|
(cons c r))))
|
||||||
channels-to-results)))
|
channels-to-results)))
|
||||||
#f))))
|
#f))))
|
||||||
|
@ -254,7 +278,7 @@
|
||||||
reply-channel
|
reply-channel
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
(lambda (exn)
|
(lambda (exn)
|
||||||
(list 'exception exn))
|
(cons 'exception exn))
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
(lambda (exn)
|
(lambda (exn)
|
||||||
|
@ -285,5 +309,32 @@
|
||||||
(put-message input-channel (cons reply-channel args))
|
(put-message input-channel (cons reply-channel args))
|
||||||
(match (get-message reply-channel)
|
(match (get-message reply-channel)
|
||||||
(('result . vals) (apply values vals))
|
(('result . vals) (apply values vals))
|
||||||
(('exception exn)
|
(('exception . exn)
|
||||||
(raise-exception exn))))))
|
(raise-exception exn))))))
|
||||||
|
|
||||||
|
(define-record-type <parallelism-limiter>
|
||||||
|
(make-parallelism-limiter-record resource-pool)
|
||||||
|
parallelism-limiter?
|
||||||
|
(resource-pool parallelism-limiter-resource-pool))
|
||||||
|
|
||||||
|
(define* (make-parallelism-limiter limit #:key (name "unnamed"))
|
||||||
|
(make-parallelism-limiter-record
|
||||||
|
(make-fixed-size-resource-pool
|
||||||
|
(iota limit)
|
||||||
|
#:name name)))
|
||||||
|
|
||||||
|
(define (destroy-parallelism-limiter parallelism-limiter)
|
||||||
|
(destroy-resource-pool
|
||||||
|
(parallelism-limiter-resource-pool
|
||||||
|
parallelism-limiter)))
|
||||||
|
|
||||||
|
(define* (call-with-parallelism-limiter parallelism-limiter thunk)
|
||||||
|
(call-with-resource-from-pool
|
||||||
|
(parallelism-limiter-resource-pool parallelism-limiter)
|
||||||
|
(lambda _
|
||||||
|
(thunk))))
|
||||||
|
|
||||||
|
(define-syntax-rule (with-parallelism-limiter parallelism-limiter exp ...)
|
||||||
|
(call-with-parallelism-limiter
|
||||||
|
parallelism-limiter
|
||||||
|
(lambda () exp ...)))
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
#:use-module (srfi srfi-9)
|
#:use-module (srfi srfi-9)
|
||||||
#:use-module (srfi srfi-9 gnu)
|
#:use-module (srfi srfi-9 gnu)
|
||||||
#:use-module (srfi srfi-71)
|
#:use-module (srfi srfi-71)
|
||||||
|
#:use-module (ice-9 q)
|
||||||
#:use-module (ice-9 match)
|
#:use-module (ice-9 match)
|
||||||
#:use-module (ice-9 exceptions)
|
#:use-module (ice-9 exceptions)
|
||||||
#:use-module (fibers)
|
#:use-module (fibers)
|
||||||
|
@ -32,9 +33,10 @@
|
||||||
#:use-module (fibers conditions)
|
#:use-module (fibers conditions)
|
||||||
#:use-module (knots)
|
#:use-module (knots)
|
||||||
#:use-module (knots parallelism)
|
#:use-module (knots parallelism)
|
||||||
#:export (resource-pool?
|
#:export (make-fixed-size-resource-pool
|
||||||
|
|
||||||
make-resource-pool
|
make-resource-pool
|
||||||
|
|
||||||
|
resource-pool?
|
||||||
resource-pool-name
|
resource-pool-name
|
||||||
resource-pool-channel
|
resource-pool-channel
|
||||||
resource-pool-configuration
|
resource-pool-configuration
|
||||||
|
@ -73,7 +75,7 @@
|
||||||
(record-constructor &resource-pool-abort-add-resource))
|
(record-constructor &resource-pool-abort-add-resource))
|
||||||
|
|
||||||
(define resource-pool-abort-add-resource-error?
|
(define resource-pool-abort-add-resource-error?
|
||||||
(record-predicate &resource-pool-abort-add-resource))
|
(exception-predicate &resource-pool-abort-add-resource))
|
||||||
|
|
||||||
(define-record-type <resource-pool>
|
(define-record-type <resource-pool>
|
||||||
(make-resource-pool-record name channel destroy-condition configuration)
|
(make-resource-pool-record name channel destroy-condition configuration)
|
||||||
|
@ -91,6 +93,429 @@
|
||||||
(resource-pool-name resource-pool))
|
(resource-pool-name resource-pool))
|
||||||
port)))
|
port)))
|
||||||
|
|
||||||
|
(define (remove-at-index! lst i)
|
||||||
|
(let ((start
|
||||||
|
end
|
||||||
|
(split-at! lst i)))
|
||||||
|
(append
|
||||||
|
start
|
||||||
|
(cdr end))))
|
||||||
|
|
||||||
|
(define* (make-fixed-size-resource-pool resources
|
||||||
|
#:key
|
||||||
|
(delay-logger (const #f))
|
||||||
|
(duration-logger (const #f))
|
||||||
|
destructor
|
||||||
|
scheduler
|
||||||
|
(name "unnamed")
|
||||||
|
default-checkout-timeout
|
||||||
|
default-max-waiters)
|
||||||
|
(define channel (make-channel))
|
||||||
|
(define destroy-condition
|
||||||
|
(make-condition))
|
||||||
|
|
||||||
|
(define pool
|
||||||
|
(make-resource-pool-record
|
||||||
|
name
|
||||||
|
channel
|
||||||
|
destroy-condition
|
||||||
|
`((delay-logger . ,delay-logger)
|
||||||
|
(duration-logger . ,duration-logger)
|
||||||
|
(destructor . ,destructor)
|
||||||
|
(scheduler . ,scheduler)
|
||||||
|
(name . ,name)
|
||||||
|
(default-checkout-timeout . ,default-checkout-timeout)
|
||||||
|
(default-max-waiters . ,default-max-waiters))))
|
||||||
|
|
||||||
|
(define checkout-failure-count 0)
|
||||||
|
|
||||||
|
(define (spawn-fiber-to-destroy-resource resource)
|
||||||
|
(spawn-fiber
|
||||||
|
(lambda ()
|
||||||
|
(let loop ()
|
||||||
|
(let ((success?
|
||||||
|
(with-exception-handler
|
||||||
|
(lambda _ #f)
|
||||||
|
(lambda ()
|
||||||
|
(with-exception-handler
|
||||||
|
(lambda (exn)
|
||||||
|
(simple-format
|
||||||
|
(current-error-port)
|
||||||
|
"exception running resource pool destructor (~A): ~A\n"
|
||||||
|
name
|
||||||
|
destructor)
|
||||||
|
(print-backtrace-and-exception/knots exn)
|
||||||
|
(raise-exception exn))
|
||||||
|
(lambda ()
|
||||||
|
(start-stack #t (destructor resource))
|
||||||
|
#t)))
|
||||||
|
#:unwind? #t)))
|
||||||
|
|
||||||
|
(if success?
|
||||||
|
(put-message channel
|
||||||
|
(list 'remove resource))
|
||||||
|
(begin
|
||||||
|
(sleep 5)
|
||||||
|
|
||||||
|
(loop))))))))
|
||||||
|
|
||||||
|
(define (spawn-fiber-for-checkout reply-channel
|
||||||
|
reply-timeout
|
||||||
|
resource)
|
||||||
|
(spawn-fiber
|
||||||
|
(lambda ()
|
||||||
|
(let ((checkout-success?
|
||||||
|
(perform-operation
|
||||||
|
(choice-operation
|
||||||
|
(wrap-operation
|
||||||
|
(put-operation reply-channel
|
||||||
|
(cons 'success resource))
|
||||||
|
(const #t))
|
||||||
|
(wrap-operation (sleep-operation
|
||||||
|
reply-timeout)
|
||||||
|
(const #f))))))
|
||||||
|
(unless checkout-success?
|
||||||
|
(put-message
|
||||||
|
channel
|
||||||
|
(list 'return-failed-checkout resource)))))))
|
||||||
|
|
||||||
|
(define (destroy-loop resources)
|
||||||
|
(let loop ((resources resources))
|
||||||
|
(match (get-message channel)
|
||||||
|
(('checkout reply timeout-time max-waiters)
|
||||||
|
(spawn-fiber
|
||||||
|
(lambda ()
|
||||||
|
(let ((op
|
||||||
|
(put-operation
|
||||||
|
reply
|
||||||
|
(cons 'resource-pool-destroyed
|
||||||
|
#f))))
|
||||||
|
(perform-operation
|
||||||
|
(if timeout-time
|
||||||
|
(choice-operation
|
||||||
|
op
|
||||||
|
(wrap-operation
|
||||||
|
(sleep-operation
|
||||||
|
(/ (- timeout-time
|
||||||
|
(get-internal-real-time))
|
||||||
|
internal-time-units-per-second))
|
||||||
|
(const #f)))
|
||||||
|
op)))))
|
||||||
|
(loop resources))
|
||||||
|
(((and (or 'return
|
||||||
|
'return-failed-checkout
|
||||||
|
'remove)
|
||||||
|
return-type)
|
||||||
|
resource)
|
||||||
|
(when (and (not (eq? return-type 'remove))
|
||||||
|
destructor)
|
||||||
|
(spawn-fiber-to-destroy-resource resource))
|
||||||
|
|
||||||
|
(let ((index
|
||||||
|
(list-index (lambda (x)
|
||||||
|
(eq? x resource))
|
||||||
|
resources)))
|
||||||
|
(let ((new-resources
|
||||||
|
(if index
|
||||||
|
(remove-at-index! resources index)
|
||||||
|
(begin
|
||||||
|
(simple-format
|
||||||
|
(current-error-port)
|
||||||
|
"resource pool error: unable to remove ~A\n"
|
||||||
|
resource)
|
||||||
|
resources))))
|
||||||
|
(if (null? new-resources)
|
||||||
|
(begin
|
||||||
|
(signal-condition! destroy-condition)
|
||||||
|
|
||||||
|
;; No loop
|
||||||
|
*unspecified*)
|
||||||
|
(loop new-resources)))))
|
||||||
|
|
||||||
|
(('stats reply timeout-time)
|
||||||
|
(let ((stats
|
||||||
|
`((resources . ,(length resources))
|
||||||
|
(available . 0)
|
||||||
|
(waiters . 0)
|
||||||
|
(checkout-failure-count . ,checkout-failure-count))))
|
||||||
|
|
||||||
|
(spawn-fiber
|
||||||
|
(lambda ()
|
||||||
|
(let ((op
|
||||||
|
(put-operation reply stats)))
|
||||||
|
(perform-operation
|
||||||
|
(if timeout-time
|
||||||
|
(choice-operation
|
||||||
|
op
|
||||||
|
(sleep-operation
|
||||||
|
(/ (- timeout-time
|
||||||
|
(get-internal-real-time))
|
||||||
|
internal-time-units-per-second)))
|
||||||
|
op))))))
|
||||||
|
|
||||||
|
(loop resources))
|
||||||
|
|
||||||
|
(('destroy reply)
|
||||||
|
(loop resources))
|
||||||
|
(unknown
|
||||||
|
(simple-format
|
||||||
|
(current-error-port)
|
||||||
|
"unrecognised message to ~A resource pool channel: ~A\n"
|
||||||
|
name
|
||||||
|
unknown)
|
||||||
|
(loop resources)))))
|
||||||
|
|
||||||
|
(define (main-loop)
|
||||||
|
(let loop ((resources resources)
|
||||||
|
(available resources)
|
||||||
|
(waiters (make-q)))
|
||||||
|
|
||||||
|
(match (get-message channel)
|
||||||
|
(('checkout reply timeout-time max-waiters)
|
||||||
|
(if (null? available)
|
||||||
|
(let ((waiters-count
|
||||||
|
(q-length waiters)))
|
||||||
|
(if (and max-waiters
|
||||||
|
(>= waiters-count
|
||||||
|
max-waiters))
|
||||||
|
(begin
|
||||||
|
(spawn-fiber
|
||||||
|
(lambda ()
|
||||||
|
(let ((op
|
||||||
|
(put-operation
|
||||||
|
reply
|
||||||
|
(cons 'too-many-waiters
|
||||||
|
waiters-count))))
|
||||||
|
(perform-operation
|
||||||
|
(if timeout-time
|
||||||
|
(choice-operation
|
||||||
|
op
|
||||||
|
(wrap-operation
|
||||||
|
(sleep-operation
|
||||||
|
(/ (- timeout-time
|
||||||
|
(get-internal-real-time))
|
||||||
|
internal-time-units-per-second))
|
||||||
|
(const #f)))
|
||||||
|
op)))))
|
||||||
|
(loop resources
|
||||||
|
available
|
||||||
|
waiters))
|
||||||
|
(loop resources
|
||||||
|
available
|
||||||
|
(enq! waiters (cons reply timeout-time)))))
|
||||||
|
|
||||||
|
(if timeout-time
|
||||||
|
(let ((current-internal-time
|
||||||
|
(get-internal-real-time)))
|
||||||
|
;; If this client is still waiting
|
||||||
|
(if (> timeout-time
|
||||||
|
current-internal-time)
|
||||||
|
(let ((reply-timeout
|
||||||
|
(/ (- timeout-time
|
||||||
|
current-internal-time)
|
||||||
|
internal-time-units-per-second)))
|
||||||
|
|
||||||
|
;; Don't sleep in this fiber, so spawn a new
|
||||||
|
;; fiber to handle handing over the resource,
|
||||||
|
;; and returning it if there's a timeout
|
||||||
|
(spawn-fiber-for-checkout reply
|
||||||
|
reply-timeout
|
||||||
|
(car available))
|
||||||
|
(loop resources
|
||||||
|
(cdr available)
|
||||||
|
waiters))
|
||||||
|
(loop resources
|
||||||
|
available
|
||||||
|
waiters)))
|
||||||
|
(begin
|
||||||
|
(put-message reply (cons 'success
|
||||||
|
(car available)))
|
||||||
|
|
||||||
|
(loop resources
|
||||||
|
(cdr available)
|
||||||
|
waiters)))))
|
||||||
|
|
||||||
|
(((and (or 'return
|
||||||
|
'return-failed-checkout)
|
||||||
|
return-type)
|
||||||
|
resource)
|
||||||
|
|
||||||
|
(when (eq? 'return-failed-checkout
|
||||||
|
return-type)
|
||||||
|
(set! checkout-failure-count
|
||||||
|
(+ 1 checkout-failure-count)))
|
||||||
|
|
||||||
|
(if (q-empty? waiters)
|
||||||
|
(loop resources
|
||||||
|
(cons resource available)
|
||||||
|
waiters)
|
||||||
|
|
||||||
|
(let ((current-internal-time
|
||||||
|
(get-internal-real-time)))
|
||||||
|
(with-exception-handler
|
||||||
|
(lambda (exn)
|
||||||
|
(if (eq? (exception-kind exn) 'q-empty)
|
||||||
|
(loop resources
|
||||||
|
(cons resource available)
|
||||||
|
waiters)
|
||||||
|
(raise-exception exn)))
|
||||||
|
(lambda ()
|
||||||
|
(let waiter-loop ((waiter (deq! waiters)))
|
||||||
|
(match waiter
|
||||||
|
((reply . timeout)
|
||||||
|
(if (and timeout
|
||||||
|
(< timeout current-internal-time))
|
||||||
|
(waiter-loop (deq! waiters))
|
||||||
|
(if timeout
|
||||||
|
(let ((reply-timeout
|
||||||
|
(/ (- timeout
|
||||||
|
current-internal-time)
|
||||||
|
internal-time-units-per-second)))
|
||||||
|
;; Don't sleep in this fiber, so spawn a
|
||||||
|
;; new fiber to handle handing over the
|
||||||
|
;; resource, and returning it if there's
|
||||||
|
;; a timeout
|
||||||
|
(spawn-fiber-for-checkout reply
|
||||||
|
reply-timeout
|
||||||
|
resource))
|
||||||
|
(put-message reply (cons 'success
|
||||||
|
resource))))))))
|
||||||
|
#:unwind? #t)
|
||||||
|
(loop resources
|
||||||
|
available
|
||||||
|
waiters))))
|
||||||
|
|
||||||
|
(('list-resources reply)
|
||||||
|
(spawn-fiber
|
||||||
|
(lambda ()
|
||||||
|
(put-message reply (list-copy resources))))
|
||||||
|
|
||||||
|
(loop resources
|
||||||
|
available
|
||||||
|
waiters))
|
||||||
|
|
||||||
|
(('stats reply timeout-time)
|
||||||
|
(let ((stats
|
||||||
|
`((resources . ,(length resources))
|
||||||
|
(available . ,(length available))
|
||||||
|
(waiters . ,(q-length waiters))
|
||||||
|
(checkout-failure-count . ,checkout-failure-count))))
|
||||||
|
|
||||||
|
(spawn-fiber
|
||||||
|
(lambda ()
|
||||||
|
(let ((op
|
||||||
|
(put-operation reply stats)))
|
||||||
|
(perform-operation
|
||||||
|
(if timeout-time
|
||||||
|
(choice-operation
|
||||||
|
op
|
||||||
|
(sleep-operation
|
||||||
|
(/ (- timeout-time
|
||||||
|
(get-internal-real-time))
|
||||||
|
internal-time-units-per-second)))
|
||||||
|
op))))))
|
||||||
|
|
||||||
|
(loop resources
|
||||||
|
available
|
||||||
|
waiters))
|
||||||
|
|
||||||
|
(('destroy)
|
||||||
|
(if (and (null? resources)
|
||||||
|
(q-empty? waiters))
|
||||||
|
(signal-condition!
|
||||||
|
destroy-condition)
|
||||||
|
|
||||||
|
(let ((current-internal-time (get-internal-real-time)))
|
||||||
|
(for-each
|
||||||
|
(match-lambda
|
||||||
|
((reply . timeout)
|
||||||
|
(when (or (not timeout)
|
||||||
|
(> timeout current-internal-time))
|
||||||
|
(spawn-fiber
|
||||||
|
(lambda ()
|
||||||
|
(let ((op
|
||||||
|
(put-operation
|
||||||
|
reply
|
||||||
|
(cons 'resource-pool-destroyed
|
||||||
|
#f))))
|
||||||
|
(perform-operation
|
||||||
|
(if timeout
|
||||||
|
(choice-operation
|
||||||
|
op
|
||||||
|
(wrap-operation
|
||||||
|
(sleep-operation
|
||||||
|
(/ (- timeout
|
||||||
|
(get-internal-real-time))
|
||||||
|
internal-time-units-per-second))
|
||||||
|
(const #f)))
|
||||||
|
op))))))))
|
||||||
|
(car waiters))
|
||||||
|
|
||||||
|
(if destructor
|
||||||
|
(begin
|
||||||
|
(for-each
|
||||||
|
(lambda (resource)
|
||||||
|
(spawn-fiber-to-destroy-resource resource))
|
||||||
|
available)
|
||||||
|
(destroy-loop resources))
|
||||||
|
(let dl ((resources resources)
|
||||||
|
(available available))
|
||||||
|
(if (null? available)
|
||||||
|
(if (null? resources)
|
||||||
|
(signal-condition!
|
||||||
|
destroy-condition)
|
||||||
|
(destroy-loop resources))
|
||||||
|
(let ((index
|
||||||
|
(list-index (lambda (x)
|
||||||
|
(eq? x (car available)))
|
||||||
|
resources)))
|
||||||
|
(dl (remove-at-index! resources index)
|
||||||
|
(cdr available)))))))))
|
||||||
|
|
||||||
|
(unknown
|
||||||
|
(simple-format
|
||||||
|
(current-error-port)
|
||||||
|
"unrecognised message to ~A resource pool channel: ~A\n"
|
||||||
|
name
|
||||||
|
unknown)
|
||||||
|
(loop resources
|
||||||
|
available
|
||||||
|
waiters)))))
|
||||||
|
|
||||||
|
(spawn-fiber
|
||||||
|
(lambda ()
|
||||||
|
(with-exception-handler
|
||||||
|
(lambda (exn)
|
||||||
|
#f)
|
||||||
|
(lambda ()
|
||||||
|
(with-exception-handler
|
||||||
|
(lambda (exn)
|
||||||
|
(let* ((stack (make-stack #t))
|
||||||
|
(error-string
|
||||||
|
(call-with-output-string
|
||||||
|
(lambda (port)
|
||||||
|
(display-backtrace stack port 3)
|
||||||
|
(simple-format
|
||||||
|
port
|
||||||
|
"exception in the ~A pool fiber, " name)
|
||||||
|
(print-exception
|
||||||
|
port
|
||||||
|
(stack-ref stack 3)
|
||||||
|
'%exception
|
||||||
|
(list exn))))))
|
||||||
|
(display error-string
|
||||||
|
(current-error-port)))
|
||||||
|
(raise-exception exn))
|
||||||
|
(lambda ()
|
||||||
|
(start-stack
|
||||||
|
#t
|
||||||
|
(main-loop)))))
|
||||||
|
#:unwind? #t))
|
||||||
|
(or scheduler
|
||||||
|
(current-scheduler)))
|
||||||
|
|
||||||
|
pool)
|
||||||
|
|
||||||
(define* (make-resource-pool return-new-resource max-size
|
(define* (make-resource-pool return-new-resource max-size
|
||||||
#:key (min-size 0)
|
#:key (min-size 0)
|
||||||
(idle-seconds #f)
|
(idle-seconds #f)
|
||||||
|
@ -126,28 +551,33 @@
|
||||||
|
|
||||||
(define checkout-failure-count 0)
|
(define checkout-failure-count 0)
|
||||||
|
|
||||||
(define spawn-fiber-to-return-new-resource
|
(define return-new-resource/parallelism-limiter
|
||||||
(if add-resources-parallelism
|
(make-parallelism-limiter
|
||||||
(let ((thunk
|
(or add-resources-parallelism
|
||||||
(fiberize
|
max-size)
|
||||||
|
#:name
|
||||||
|
(string-append
|
||||||
|
name
|
||||||
|
" resource pool new resource parallelism limiter")))
|
||||||
|
|
||||||
|
(define (spawn-fiber-to-return-new-resource)
|
||||||
|
(spawn-fiber
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
(with-exception-handler
|
||||||
|
(lambda (exn)
|
||||||
|
;; This can happen if the resource pool is destroyed very
|
||||||
|
;; quickly
|
||||||
|
(unless (resource-pool-destroyed-error? exn)
|
||||||
|
(raise-exception exn)))
|
||||||
|
(lambda ()
|
||||||
|
(with-parallelism-limiter
|
||||||
|
return-new-resource/parallelism-limiter
|
||||||
(let ((max-size
|
(let ((max-size
|
||||||
(assq-ref (resource-pool-configuration pool)
|
(assq-ref (resource-pool-configuration pool)
|
||||||
'max-size))
|
'max-size))
|
||||||
(size (assq-ref (resource-pool-stats pool)
|
(size (assq-ref (resource-pool-stats pool #:timeout #f)
|
||||||
'resources)))
|
'resources)))
|
||||||
(unless (= size max-size)
|
(unless (= size max-size)
|
||||||
(let ((new-resource
|
|
||||||
(return-new-resource)))
|
|
||||||
(put-message channel
|
|
||||||
(list 'add-resource new-resource))))))
|
|
||||||
#:parallelism add-resources-parallelism)))
|
|
||||||
(lambda ()
|
|
||||||
(spawn-fiber thunk)))
|
|
||||||
(lambda ()
|
|
||||||
(spawn-fiber
|
|
||||||
(lambda ()
|
|
||||||
(let ((new-resource
|
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
(lambda _ #f)
|
(lambda _ #f)
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
@ -161,11 +591,12 @@
|
||||||
(print-backtrace-and-exception/knots exn)
|
(print-backtrace-and-exception/knots exn)
|
||||||
(raise-exception exn))
|
(raise-exception exn))
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(start-stack #t (return-new-resource)))))
|
(let ((new-resource
|
||||||
#:unwind? #t)))
|
(start-stack #t (return-new-resource))))
|
||||||
(when new-resource
|
|
||||||
(put-message channel
|
(put-message channel
|
||||||
(list 'add-resource new-resource)))))))))
|
(list 'add-resource new-resource))))))
|
||||||
|
#:unwind? #t)))))
|
||||||
|
#:unwind? #t))))
|
||||||
|
|
||||||
(define (spawn-fiber-to-destroy-resource resource)
|
(define (spawn-fiber-to-destroy-resource resource)
|
||||||
(spawn-fiber
|
(spawn-fiber
|
||||||
|
@ -250,21 +681,14 @@
|
||||||
'remove)
|
'remove)
|
||||||
return-type)
|
return-type)
|
||||||
resource)
|
resource)
|
||||||
(when destructor
|
(when (and (not (eq? return-type 'remove))
|
||||||
|
destructor)
|
||||||
(spawn-fiber-to-destroy-resource resource))
|
(spawn-fiber-to-destroy-resource resource))
|
||||||
|
|
||||||
(let ((index
|
(let ((index
|
||||||
(list-index (lambda (x)
|
(list-index (lambda (x)
|
||||||
(eq? x resource))
|
(eq? x resource))
|
||||||
resources)))
|
resources)))
|
||||||
(define (remove-at-index! lst i)
|
|
||||||
(let ((start
|
|
||||||
end
|
|
||||||
(split-at! lst i)))
|
|
||||||
(append
|
|
||||||
start
|
|
||||||
(cdr end))))
|
|
||||||
|
|
||||||
(let ((new-resources
|
(let ((new-resources
|
||||||
(if index
|
(if index
|
||||||
(remove-at-index! resources index)
|
(remove-at-index! resources index)
|
||||||
|
@ -276,13 +700,16 @@
|
||||||
resources))))
|
resources))))
|
||||||
(if (null? new-resources)
|
(if (null? new-resources)
|
||||||
(begin
|
(begin
|
||||||
|
(and=> return-new-resource/parallelism-limiter
|
||||||
|
destroy-parallelism-limiter)
|
||||||
|
|
||||||
(signal-condition! destroy-condition)
|
(signal-condition! destroy-condition)
|
||||||
|
|
||||||
;; No loop
|
;; No loop
|
||||||
*unspecified*)
|
*unspecified*)
|
||||||
(loop new-resources)))))
|
(loop new-resources)))))
|
||||||
|
|
||||||
(('stats reply)
|
(('stats reply timeout-time)
|
||||||
(let ((stats
|
(let ((stats
|
||||||
`((resources . ,(length resources))
|
`((resources . ,(length resources))
|
||||||
(available . 0)
|
(available . 0)
|
||||||
|
@ -291,13 +718,17 @@
|
||||||
|
|
||||||
(spawn-fiber
|
(spawn-fiber
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
(let ((op
|
||||||
|
(put-operation reply stats)))
|
||||||
(perform-operation
|
(perform-operation
|
||||||
|
(if timeout-time
|
||||||
(choice-operation
|
(choice-operation
|
||||||
(wrap-operation
|
op
|
||||||
(put-operation reply stats)
|
(sleep-operation
|
||||||
(const #t))
|
(/ (- timeout-time
|
||||||
(wrap-operation (sleep-operation 5)
|
(get-internal-real-time))
|
||||||
(const #f)))))))
|
internal-time-units-per-second)))
|
||||||
|
op))))))
|
||||||
|
|
||||||
(loop resources))
|
(loop resources))
|
||||||
|
|
||||||
|
@ -317,7 +748,7 @@
|
||||||
(define (main-loop)
|
(define (main-loop)
|
||||||
(let loop ((resources '())
|
(let loop ((resources '())
|
||||||
(available '())
|
(available '())
|
||||||
(waiters '())
|
(waiters (make-q))
|
||||||
(resources-last-used '()))
|
(resources-last-used '()))
|
||||||
|
|
||||||
(match (get-message channel)
|
(match (get-message channel)
|
||||||
|
@ -339,50 +770,51 @@
|
||||||
(cons (get-internal-real-time)
|
(cons (get-internal-real-time)
|
||||||
resources-last-used))))
|
resources-last-used))))
|
||||||
|
|
||||||
(if (null? waiters)
|
(if (q-empty? waiters)
|
||||||
(loop (cons resource resources)
|
(loop (cons resource resources)
|
||||||
(cons resource available)
|
(cons resource available)
|
||||||
waiters
|
waiters
|
||||||
(cons (get-internal-real-time)
|
(cons (get-internal-real-time)
|
||||||
resources-last-used))
|
resources-last-used))
|
||||||
|
|
||||||
(let* ((current-internal-time (get-internal-real-time))
|
(let ((current-internal-time
|
||||||
(alive-waiters
|
(get-internal-real-time)))
|
||||||
dead-waiters
|
(with-exception-handler
|
||||||
(partition!
|
(lambda (exn)
|
||||||
(match-lambda
|
(if (eq? (exception-kind exn) 'q-empty)
|
||||||
((reply . timeout)
|
|
||||||
(or (not timeout)
|
|
||||||
(> timeout current-internal-time))))
|
|
||||||
waiters)))
|
|
||||||
(if (null? alive-waiters)
|
|
||||||
(loop (cons resource resources)
|
(loop (cons resource resources)
|
||||||
(cons resource available)
|
(cons resource available)
|
||||||
'()
|
waiters
|
||||||
(cons (get-internal-real-time)
|
(cons current-internal-time
|
||||||
resources-last-used))
|
resources-last-used))
|
||||||
(match (last alive-waiters)
|
(raise-exception exn)))
|
||||||
((waiter-channel . waiter-timeout)
|
(lambda ()
|
||||||
(if waiter-timeout
|
(let waiter-loop ((waiter (deq! waiters)))
|
||||||
|
(match waiter
|
||||||
|
((reply . timeout)
|
||||||
|
(if (and timeout
|
||||||
|
(< timeout current-internal-time))
|
||||||
|
(waiter-loop (deq! waiters))
|
||||||
|
(if timeout
|
||||||
(let ((reply-timeout
|
(let ((reply-timeout
|
||||||
(/ (- waiter-timeout
|
(/ (- timeout
|
||||||
current-internal-time)
|
current-internal-time)
|
||||||
internal-time-units-per-second)))
|
internal-time-units-per-second)))
|
||||||
;; Don't sleep in this fiber, so spawn
|
;; Don't sleep in this fiber, so spawn a
|
||||||
;; a new fiber to handle handing over
|
;; new fiber to handle handing over the
|
||||||
;; the resource, and returning it if
|
;; resource, and returning it if there's
|
||||||
;; there's a timeout
|
;; a timeout
|
||||||
(spawn-fiber-for-checkout waiter-channel
|
(spawn-fiber-for-checkout reply
|
||||||
reply-timeout
|
reply-timeout
|
||||||
resource))
|
resource))
|
||||||
(put-message waiter-channel (cons 'success
|
(put-message reply (cons 'success
|
||||||
resource)))
|
resource))))))))
|
||||||
|
#:unwind? #t)
|
||||||
(loop (cons resource resources)
|
(loop (cons resource resources)
|
||||||
available
|
available
|
||||||
(drop-right! alive-waiters 1)
|
waiters
|
||||||
(cons (get-internal-real-time)
|
(cons current-internal-time
|
||||||
resources-last-used)))))))))
|
resources-last-used))))))
|
||||||
|
|
||||||
(('checkout reply timeout-time max-waiters)
|
(('checkout reply timeout-time max-waiters)
|
||||||
(if (null? available)
|
(if (null? available)
|
||||||
|
@ -391,7 +823,7 @@
|
||||||
(spawn-fiber-to-return-new-resource))
|
(spawn-fiber-to-return-new-resource))
|
||||||
|
|
||||||
(let ((waiters-count
|
(let ((waiters-count
|
||||||
(length waiters)))
|
(q-length waiters)))
|
||||||
(if (and max-waiters
|
(if (and max-waiters
|
||||||
(>= waiters-count
|
(>= waiters-count
|
||||||
max-waiters))
|
max-waiters))
|
||||||
|
@ -420,8 +852,7 @@
|
||||||
resources-last-used))
|
resources-last-used))
|
||||||
(loop resources
|
(loop resources
|
||||||
available
|
available
|
||||||
(cons (cons reply timeout-time)
|
(enq! waiters (cons reply timeout-time))
|
||||||
waiters)
|
|
||||||
resources-last-used))))
|
resources-last-used))))
|
||||||
|
|
||||||
(if timeout-time
|
(if timeout-time
|
||||||
|
@ -468,7 +899,7 @@
|
||||||
(set! checkout-failure-count
|
(set! checkout-failure-count
|
||||||
(+ 1 checkout-failure-count)))
|
(+ 1 checkout-failure-count)))
|
||||||
|
|
||||||
(if (null? waiters)
|
(if (q-empty? waiters)
|
||||||
(loop resources
|
(loop resources
|
||||||
(cons resource available)
|
(cons resource available)
|
||||||
waiters
|
waiters
|
||||||
|
@ -481,19 +912,14 @@
|
||||||
(get-internal-real-time))
|
(get-internal-real-time))
|
||||||
resources-last-used))
|
resources-last-used))
|
||||||
|
|
||||||
(let* ((current-internal-time (get-internal-real-time))
|
(let ((current-internal-time
|
||||||
(alive-waiters
|
(get-internal-real-time)))
|
||||||
dead-waiters
|
(with-exception-handler
|
||||||
(partition!
|
(lambda (exn)
|
||||||
(match-lambda
|
(if (eq? (exception-kind exn) 'q-empty)
|
||||||
((reply . timeout)
|
|
||||||
(or (not timeout)
|
|
||||||
(> timeout current-internal-time))))
|
|
||||||
waiters)))
|
|
||||||
(if (null? alive-waiters)
|
|
||||||
(loop resources
|
(loop resources
|
||||||
(cons resource available)
|
(cons resource available)
|
||||||
'()
|
waiters
|
||||||
(begin
|
(begin
|
||||||
(when (eq? return-type 'return)
|
(when (eq? return-type 'return)
|
||||||
(list-set!
|
(list-set!
|
||||||
|
@ -501,50 +927,48 @@
|
||||||
(list-index (lambda (x)
|
(list-index (lambda (x)
|
||||||
(eq? x resource))
|
(eq? x resource))
|
||||||
resources)
|
resources)
|
||||||
(get-internal-real-time)))
|
current-internal-time))
|
||||||
resources-last-used))
|
resources-last-used))
|
||||||
(match (last alive-waiters)
|
(raise-exception exn)))
|
||||||
((waiter-channel . waiter-timeout)
|
(lambda ()
|
||||||
(if waiter-timeout
|
(let waiter-loop ((waiter (deq! waiters)))
|
||||||
|
(match waiter
|
||||||
|
((reply . timeout)
|
||||||
|
(if (and timeout
|
||||||
|
(< timeout current-internal-time))
|
||||||
|
(waiter-loop (deq! waiters))
|
||||||
|
(if timeout
|
||||||
(let ((reply-timeout
|
(let ((reply-timeout
|
||||||
(/ (- waiter-timeout
|
(/ (- timeout
|
||||||
current-internal-time)
|
current-internal-time)
|
||||||
internal-time-units-per-second)))
|
internal-time-units-per-second)))
|
||||||
;; Don't sleep in this fiber, so spawn a
|
;; Don't sleep in this fiber, so spawn a
|
||||||
;; new fiber to handle handing over the
|
;; new fiber to handle handing over the
|
||||||
;; resource, and returning it if there's a
|
;; resource, and returning it if there's
|
||||||
;; timeout
|
;; a timeout
|
||||||
(spawn-fiber-for-checkout waiter-channel
|
(spawn-fiber-for-checkout reply
|
||||||
reply-timeout
|
reply-timeout
|
||||||
resource))
|
resource))
|
||||||
(put-message waiter-channel (cons 'success
|
(put-message reply (cons 'success
|
||||||
resource)))
|
resource))))))))
|
||||||
|
#:unwind? #t)
|
||||||
(loop resources
|
(loop resources
|
||||||
available
|
available
|
||||||
(drop-right! alive-waiters 1)
|
waiters
|
||||||
(begin
|
(begin
|
||||||
(list-set!
|
(list-set!
|
||||||
resources-last-used
|
resources-last-used
|
||||||
(list-index (lambda (x)
|
(list-index (lambda (x)
|
||||||
(eq? x resource))
|
(eq? x resource))
|
||||||
resources)
|
resources)
|
||||||
(get-internal-real-time))
|
current-internal-time)
|
||||||
resources-last-used))))))))
|
resources-last-used)))))
|
||||||
|
|
||||||
(('remove resource)
|
(('remove resource)
|
||||||
(let ((index
|
(let ((index
|
||||||
(list-index (lambda (x)
|
(list-index (lambda (x)
|
||||||
(eq? x resource))
|
(eq? x resource))
|
||||||
resources)))
|
resources)))
|
||||||
(define (remove-at-index! lst i)
|
|
||||||
(let ((start
|
|
||||||
end
|
|
||||||
(split-at! lst i)))
|
|
||||||
(append
|
|
||||||
start
|
|
||||||
(cdr end))))
|
|
||||||
|
|
||||||
(loop (if index
|
(loop (if index
|
||||||
(remove-at-index! resources index)
|
(remove-at-index! resources index)
|
||||||
(begin
|
(begin
|
||||||
|
@ -577,22 +1001,26 @@
|
||||||
waiters
|
waiters
|
||||||
resources-last-used))
|
resources-last-used))
|
||||||
|
|
||||||
(('stats reply)
|
(('stats reply timeout-time)
|
||||||
(let ((stats
|
(let ((stats
|
||||||
`((resources . ,(length resources))
|
`((resources . ,(length resources))
|
||||||
(available . ,(length available))
|
(available . ,(length available))
|
||||||
(waiters . ,(length waiters))
|
(waiters . ,(q-length waiters))
|
||||||
(checkout-failure-count . ,checkout-failure-count))))
|
(checkout-failure-count . ,checkout-failure-count))))
|
||||||
|
|
||||||
(spawn-fiber
|
(spawn-fiber
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
(let ((op
|
||||||
|
(put-operation reply stats)))
|
||||||
(perform-operation
|
(perform-operation
|
||||||
|
(if timeout-time
|
||||||
(choice-operation
|
(choice-operation
|
||||||
(wrap-operation
|
op
|
||||||
(put-operation reply stats)
|
(sleep-operation
|
||||||
(const #t))
|
(/ (- timeout-time
|
||||||
(wrap-operation (sleep-operation 5)
|
(get-internal-real-time))
|
||||||
(const #f)))))))
|
internal-time-units-per-second)))
|
||||||
|
op))))))
|
||||||
|
|
||||||
(loop resources
|
(loop resources
|
||||||
available
|
available
|
||||||
|
@ -641,22 +1069,10 @@
|
||||||
|
|
||||||
(('destroy)
|
(('destroy)
|
||||||
(if (and (null? resources)
|
(if (and (null? resources)
|
||||||
(null? waiters))
|
(q-empty? waiters))
|
||||||
(signal-condition!
|
(signal-condition!
|
||||||
destroy-condition)
|
destroy-condition)
|
||||||
|
|
||||||
(begin
|
|
||||||
(for-each
|
|
||||||
(lambda (resource)
|
|
||||||
(if destructor
|
|
||||||
(spawn-fiber-to-destroy-resource resource)
|
|
||||||
(spawn-fiber
|
|
||||||
(lambda ()
|
|
||||||
(put-message channel
|
|
||||||
(list 'remove resource)))
|
|
||||||
#:parallel? #t)))
|
|
||||||
available)
|
|
||||||
|
|
||||||
(let ((current-internal-time (get-internal-real-time)))
|
(let ((current-internal-time (get-internal-real-time)))
|
||||||
(for-each
|
(for-each
|
||||||
(match-lambda
|
(match-lambda
|
||||||
|
@ -681,10 +1097,28 @@
|
||||||
internal-time-units-per-second))
|
internal-time-units-per-second))
|
||||||
(const #f)))
|
(const #f)))
|
||||||
op))))))))
|
op))))))))
|
||||||
waiters))
|
(car waiters))
|
||||||
|
|
||||||
(destroy-loop resources))))
|
|
||||||
|
|
||||||
|
(if destructor
|
||||||
|
(begin
|
||||||
|
(for-each
|
||||||
|
(lambda (resource)
|
||||||
|
(spawn-fiber-to-destroy-resource resource))
|
||||||
|
available)
|
||||||
|
(destroy-loop resources))
|
||||||
|
(let dl ((resources resources)
|
||||||
|
(available available))
|
||||||
|
(if (null? available)
|
||||||
|
(if (null? resources)
|
||||||
|
(signal-condition!
|
||||||
|
destroy-condition)
|
||||||
|
(destroy-loop resources))
|
||||||
|
(let ((index
|
||||||
|
(list-index (lambda (x)
|
||||||
|
(eq? x (car available)))
|
||||||
|
resources)))
|
||||||
|
(dl (remove-at-index! resources index)
|
||||||
|
(cdr available)))))))))
|
||||||
(unknown
|
(unknown
|
||||||
(simple-format
|
(simple-format
|
||||||
(current-error-port)
|
(current-error-port)
|
||||||
|
@ -744,7 +1178,8 @@
|
||||||
(put-operation (resource-pool-channel pool)
|
(put-operation (resource-pool-channel pool)
|
||||||
(list 'destroy))
|
(list 'destroy))
|
||||||
(lambda _
|
(lambda _
|
||||||
(wait (resource-pool-destroy-condition pool))))
|
(wait
|
||||||
|
(resource-pool-destroy-condition pool))))
|
||||||
(wait-operation
|
(wait-operation
|
||||||
(resource-pool-destroy-condition pool))))
|
(resource-pool-destroy-condition pool))))
|
||||||
#t)
|
#t)
|
||||||
|
@ -763,7 +1198,7 @@
|
||||||
(record-constructor &resource-pool-timeout))
|
(record-constructor &resource-pool-timeout))
|
||||||
|
|
||||||
(define resource-pool-timeout-error?
|
(define resource-pool-timeout-error?
|
||||||
(record-predicate &resource-pool-timeout))
|
(exception-predicate &resource-pool-timeout))
|
||||||
|
|
||||||
(define &resource-pool-too-many-waiters
|
(define &resource-pool-too-many-waiters
|
||||||
(make-exception-type '&recource-pool-too-many-waiters
|
(make-exception-type '&recource-pool-too-many-waiters
|
||||||
|
@ -784,7 +1219,7 @@
|
||||||
(record-constructor &resource-pool-too-many-waiters))
|
(record-constructor &resource-pool-too-many-waiters))
|
||||||
|
|
||||||
(define resource-pool-too-many-waiters-error?
|
(define resource-pool-too-many-waiters-error?
|
||||||
(record-predicate &resource-pool-too-many-waiters))
|
(exception-predicate &resource-pool-too-many-waiters))
|
||||||
|
|
||||||
(define &resource-pool-destroyed
|
(define &resource-pool-destroyed
|
||||||
(make-exception-type '&recource-pool-destroyed
|
(make-exception-type '&recource-pool-destroyed
|
||||||
|
@ -800,7 +1235,7 @@
|
||||||
(record-constructor &resource-pool-destroyed))
|
(record-constructor &resource-pool-destroyed))
|
||||||
|
|
||||||
(define resource-pool-destroyed-error?
|
(define resource-pool-destroyed-error?
|
||||||
(record-predicate &resource-pool-destroyed))
|
(exception-predicate &resource-pool-destroyed))
|
||||||
|
|
||||||
(define &resource-pool-destroy-resource
|
(define &resource-pool-destroy-resource
|
||||||
(make-exception-type '&recource-pool-destroy-resource
|
(make-exception-type '&recource-pool-destroy-resource
|
||||||
|
@ -811,7 +1246,7 @@
|
||||||
(record-constructor &resource-pool-destroy-resource))
|
(record-constructor &resource-pool-destroy-resource))
|
||||||
|
|
||||||
(define resource-pool-destroy-resource-exception?
|
(define resource-pool-destroy-resource-exception?
|
||||||
(record-predicate &resource-pool-destroy-resource))
|
(exception-predicate &resource-pool-destroy-resource))
|
||||||
|
|
||||||
(define resource-pool-default-timeout-handler
|
(define resource-pool-default-timeout-handler
|
||||||
(make-parameter #f))
|
(make-parameter #f))
|
||||||
|
@ -879,8 +1314,9 @@ available. Return the resource once PROC has returned."
|
||||||
start-time)
|
start-time)
|
||||||
'timeout)
|
'timeout)
|
||||||
response))
|
response))
|
||||||
'timeout)))))
|
'timeout))
|
||||||
(let loop ((reply (make-channel)))
|
'timeout)))
|
||||||
|
(let ((reply (make-channel)))
|
||||||
(put-message channel
|
(put-message channel
|
||||||
(list 'checkout
|
(list 'checkout
|
||||||
reply
|
reply
|
||||||
|
@ -918,8 +1354,7 @@ available. Return the resource once PROC has returned."
|
||||||
'destroy
|
'destroy
|
||||||
'return)
|
'return)
|
||||||
resource))
|
resource))
|
||||||
(unless (resource-pool-destroy-resource-exception? exn)
|
(raise-exception exn))
|
||||||
(raise-exception exn)))
|
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
(lambda (exn)
|
(lambda (exn)
|
||||||
|
@ -949,13 +1384,17 @@ available. Return the resource once PROC has returned."
|
||||||
(lambda (resource) exp ...)))
|
(lambda (resource) exp ...)))
|
||||||
|
|
||||||
(define* (resource-pool-stats pool #:key (timeout 5))
|
(define* (resource-pool-stats pool #:key (timeout 5))
|
||||||
(let ((reply (make-channel))
|
(if timeout
|
||||||
(start-time (get-internal-real-time)))
|
(let* ((reply (make-channel))
|
||||||
|
(start-time (get-internal-real-time))
|
||||||
|
(timeout-time
|
||||||
|
(+ start-time
|
||||||
|
(* internal-time-units-per-second timeout))))
|
||||||
(perform-operation
|
(perform-operation
|
||||||
(choice-operation
|
(choice-operation
|
||||||
(wrap-operation
|
(wrap-operation
|
||||||
(put-operation (resource-pool-channel pool)
|
(put-operation (resource-pool-channel pool)
|
||||||
`(stats ,reply))
|
`(stats ,reply ,timeout-time))
|
||||||
(const #t))
|
(const #t))
|
||||||
(wrap-operation (sleep-operation timeout)
|
(wrap-operation (sleep-operation timeout)
|
||||||
(lambda _
|
(lambda _
|
||||||
|
@ -976,7 +1415,11 @@ available. Return the resource once PROC has returned."
|
||||||
(raise-exception
|
(raise-exception
|
||||||
(make-resource-pool-timeout-error pool))))))
|
(make-resource-pool-timeout-error pool))))))
|
||||||
(raise-exception
|
(raise-exception
|
||||||
(make-resource-pool-timeout-error pool))))))
|
(make-resource-pool-timeout-error pool)))))
|
||||||
|
(let ((reply (make-channel)))
|
||||||
|
(put-message (resource-pool-channel pool)
|
||||||
|
`(stats ,reply #f))
|
||||||
|
(get-message reply))))
|
||||||
|
|
||||||
(define (resource-pool-list-resources pool)
|
(define (resource-pool-list-resources pool)
|
||||||
(let ((reply (make-channel)))
|
(let ((reply (make-channel)))
|
||||||
|
|
|
@ -198,7 +198,7 @@ from there, or #f if that would be an empty string."
|
||||||
(record-accessor &thread-pool-timeout-error 'pool)))
|
(record-accessor &thread-pool-timeout-error 'pool)))
|
||||||
|
|
||||||
(define thread-pool-timeout-error?
|
(define thread-pool-timeout-error?
|
||||||
(record-predicate &thread-pool-timeout-error))
|
(exception-predicate &thread-pool-timeout-error))
|
||||||
|
|
||||||
(define* (make-fixed-size-thread-pool size
|
(define* (make-fixed-size-thread-pool size
|
||||||
#:key
|
#:key
|
||||||
|
|
|
@ -85,7 +85,7 @@
|
||||||
(record-constructor &port-timeout-error))
|
(record-constructor &port-timeout-error))
|
||||||
|
|
||||||
(define port-timeout-error?
|
(define port-timeout-error?
|
||||||
(record-predicate &port-timeout-error))
|
(exception-predicate &port-timeout-error))
|
||||||
|
|
||||||
(define &port-read-timeout-error
|
(define &port-read-timeout-error
|
||||||
(make-exception-type '&port-read-timeout-error
|
(make-exception-type '&port-read-timeout-error
|
||||||
|
@ -96,7 +96,7 @@
|
||||||
(record-constructor &port-read-timeout-error))
|
(record-constructor &port-read-timeout-error))
|
||||||
|
|
||||||
(define port-read-timeout-error?
|
(define port-read-timeout-error?
|
||||||
(record-predicate &port-read-timeout-error))
|
(exception-predicate &port-read-timeout-error))
|
||||||
|
|
||||||
(define &port-write-timeout-error
|
(define &port-write-timeout-error
|
||||||
(make-exception-type '&port-write-timeout-error
|
(make-exception-type '&port-write-timeout-error
|
||||||
|
@ -107,7 +107,7 @@
|
||||||
(record-constructor &port-write-timeout-error))
|
(record-constructor &port-write-timeout-error))
|
||||||
|
|
||||||
(define port-write-timeout-error?
|
(define port-write-timeout-error?
|
||||||
(record-predicate &port-write-timeout-error))
|
(exception-predicate &port-write-timeout-error))
|
||||||
|
|
||||||
(define (readable? port)
|
(define (readable? port)
|
||||||
"Test if PORT is writable."
|
"Test if PORT is writable."
|
||||||
|
|
|
@ -63,6 +63,14 @@
|
||||||
(bind sock family addr port)
|
(bind sock family addr port)
|
||||||
sock))
|
sock))
|
||||||
|
|
||||||
|
(define crlf-bv
|
||||||
|
(string->utf8 "\r\n"))
|
||||||
|
|
||||||
|
(define (chunked-output-port-overhead-bytes write-size)
|
||||||
|
(+ (string-length (number->string write-size 16))
|
||||||
|
(bytevector-length crlf-bv)
|
||||||
|
(bytevector-length crlf-bv)))
|
||||||
|
|
||||||
(define* (make-chunked-output-port/knots port #:key (keep-alive? #f)
|
(define* (make-chunked-output-port/knots port #:key (keep-alive? #f)
|
||||||
(buffering 1200))
|
(buffering 1200))
|
||||||
"Returns a new port which translates non-encoded data into a HTTP
|
"Returns a new port which translates non-encoded data into a HTTP
|
||||||
|
@ -74,10 +82,12 @@ when done, as it will output the remaining data, and encode the final
|
||||||
zero chunk. When the port is closed it will also close PORT, unless
|
zero chunk. When the port is closed it will also close PORT, unless
|
||||||
KEEP-ALIVE? is true."
|
KEEP-ALIVE? is true."
|
||||||
(define (write! bv start count)
|
(define (write! bv start count)
|
||||||
(put-string port (number->string count 16))
|
(let ((len-string
|
||||||
(put-string port "\r\n")
|
(number->string count 16)))
|
||||||
|
(put-string port len-string))
|
||||||
|
(put-bytevector port crlf-bv 0 2)
|
||||||
(put-bytevector port bv start count)
|
(put-bytevector port bv start count)
|
||||||
(put-string port "\r\n")
|
(put-bytevector port crlf-bv 0 2)
|
||||||
(force-output port)
|
(force-output port)
|
||||||
count)
|
count)
|
||||||
|
|
||||||
|
@ -130,7 +140,7 @@ closes PORT, unless KEEP-ALIVE? is true."
|
||||||
(record-constructor &request-body-ended-prematurely))
|
(record-constructor &request-body-ended-prematurely))
|
||||||
|
|
||||||
(define request-body-ended-prematurely-error?
|
(define request-body-ended-prematurely-error?
|
||||||
(record-predicate &request-body-ended-prematurely))
|
(exception-predicate &request-body-ended-prematurely))
|
||||||
|
|
||||||
(define (request-body-port/knots r)
|
(define (request-body-port/knots r)
|
||||||
(cond
|
(cond
|
||||||
|
@ -331,15 +341,19 @@ on the procedure being called at any particular time."
|
||||||
(string->utf8
|
(string->utf8
|
||||||
"internal server error")))
|
"internal server error")))
|
||||||
|
|
||||||
(define (handle-request handler client
|
(define* (handle-request handler client
|
||||||
read-request-exception-handler
|
read-request-exception-handler
|
||||||
write-response-exception-handler)
|
write-response-exception-handler
|
||||||
|
buffer-size
|
||||||
|
#:key post-request-hook)
|
||||||
(let ((request
|
(let ((request
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
read-request-exception-handler
|
read-request-exception-handler
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(read-request client))
|
(read-request client))
|
||||||
#:unwind? #t)))
|
#:unwind? #t))
|
||||||
|
(read-request-time
|
||||||
|
(get-internal-real-time)))
|
||||||
(let ((response
|
(let ((response
|
||||||
body
|
body
|
||||||
(cond
|
(cond
|
||||||
|
@ -388,7 +402,9 @@ on the procedure being called at any particular time."
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(write-response response client)
|
(write-response response client)
|
||||||
|
|
||||||
(let ((body-written?
|
(let ((response-start-time
|
||||||
|
(get-internal-real-time))
|
||||||
|
(body-written?
|
||||||
(if (procedure? body)
|
(if (procedure? body)
|
||||||
(let* ((type (response-content-type response
|
(let* ((type (response-content-type response
|
||||||
'(text/plain)))
|
'(text/plain)))
|
||||||
|
@ -399,7 +415,11 @@ on the procedure being called at any particular time."
|
||||||
client
|
client
|
||||||
(make-chunked-output-port/knots
|
(make-chunked-output-port/knots
|
||||||
client
|
client
|
||||||
#:keep-alive? #t))))
|
#:keep-alive? #t
|
||||||
|
#:buffering
|
||||||
|
(- buffer-size
|
||||||
|
(chunked-output-port-overhead-bytes
|
||||||
|
buffer-size))))))
|
||||||
(set-port-encoding! body-port charset)
|
(set-port-encoding! body-port charset)
|
||||||
(let ((body-written?
|
(let ((body-written?
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
|
@ -423,6 +443,11 @@ on the procedure being called at any particular time."
|
||||||
(if body-written?
|
(if body-written?
|
||||||
(begin
|
(begin
|
||||||
(force-output client)
|
(force-output client)
|
||||||
|
(when post-request-hook
|
||||||
|
(post-request-hook request
|
||||||
|
#:read-request-time read-request-time
|
||||||
|
#:response-start-time response-start-time
|
||||||
|
#:response-end-time (get-internal-real-time)))
|
||||||
(when (and (procedure? body)
|
(when (and (procedure? body)
|
||||||
(response-content-length response))
|
(response-content-length response))
|
||||||
(set-port-encoding! client "ISO-8859-1"))
|
(set-port-encoding! client "ISO-8859-1"))
|
||||||
|
@ -434,7 +459,8 @@ on the procedure being called at any particular time."
|
||||||
read-request-exception-handler
|
read-request-exception-handler
|
||||||
write-response-exception-handler
|
write-response-exception-handler
|
||||||
connection-idle-timeout
|
connection-idle-timeout
|
||||||
buffer-size)
|
buffer-size
|
||||||
|
post-request-hook)
|
||||||
;; Always disable Nagle's algorithm, as we handle buffering
|
;; Always disable Nagle's algorithm, as we handle buffering
|
||||||
;; ourselves; when we force-output, we really want the data to go
|
;; ourselves; when we force-output, we really want the data to go
|
||||||
;; out.
|
;; out.
|
||||||
|
@ -472,11 +498,29 @@ on the procedure being called at any particular time."
|
||||||
(else
|
(else
|
||||||
(let ((keep-alive? (handle-request handler client
|
(let ((keep-alive? (handle-request handler client
|
||||||
read-request-exception-handler
|
read-request-exception-handler
|
||||||
write-response-exception-handler)))
|
write-response-exception-handler
|
||||||
|
buffer-size
|
||||||
|
#:post-request-hook
|
||||||
|
post-request-hook)))
|
||||||
(if keep-alive?
|
(if keep-alive?
|
||||||
(loop)
|
(loop)
|
||||||
(close-port client)))))))
|
(close-port client)))))))
|
||||||
|
|
||||||
|
(define (post-request-hook/safe post-request-hook)
|
||||||
|
(if post-request-hook
|
||||||
|
(lambda args
|
||||||
|
(with-exception-handler
|
||||||
|
(lambda (exn) #f)
|
||||||
|
(lambda ()
|
||||||
|
(with-exception-handler
|
||||||
|
(lambda (exn)
|
||||||
|
(print-backtrace-and-exception/knots exn)
|
||||||
|
(raise-exception exn))
|
||||||
|
(lambda ()
|
||||||
|
(apply post-request-hook args))))
|
||||||
|
#:unwind? #t))
|
||||||
|
#f))
|
||||||
|
|
||||||
(define-record-type <web-server>
|
(define-record-type <web-server>
|
||||||
(make-web-server socket port)
|
(make-web-server socket port)
|
||||||
web-server?
|
web-server?
|
||||||
|
@ -496,7 +540,8 @@ on the procedure being called at any particular time."
|
||||||
(write-response-exception-handler
|
(write-response-exception-handler
|
||||||
default-write-response-exception-handler)
|
default-write-response-exception-handler)
|
||||||
(connection-idle-timeout #f)
|
(connection-idle-timeout #f)
|
||||||
(connection-buffer-size 1024))
|
(connection-buffer-size 1024)
|
||||||
|
post-request-hook)
|
||||||
"Run the knots web server.
|
"Run the knots web server.
|
||||||
|
|
||||||
HANDLER should be a procedure that takes one argument, the HTTP
|
HANDLER should be a procedure that takes one argument, the HTTP
|
||||||
|
@ -532,7 +577,9 @@ before sending back to the client."
|
||||||
read-request-exception-handler
|
read-request-exception-handler
|
||||||
write-response-exception-handler
|
write-response-exception-handler
|
||||||
connection-idle-timeout
|
connection-idle-timeout
|
||||||
connection-buffer-size))
|
connection-buffer-size
|
||||||
|
(post-request-hook/safe
|
||||||
|
post-request-hook)))
|
||||||
#:parallel? #t)
|
#:parallel? #t)
|
||||||
(loop))))))
|
(loop))))))
|
||||||
|
|
||||||
|
|
12
tests.scm
12
tests.scm
|
@ -1,10 +1,11 @@
|
||||||
(define-module (tests)
|
(define-module (tests)
|
||||||
#:use-module (ice-9 exceptions)
|
#:use-module (ice-9 exceptions)
|
||||||
#:use-module (fibers)
|
#:use-module (fibers)
|
||||||
|
#:use-module (knots)
|
||||||
#:export (run-fibers-for-tests
|
#:export (run-fibers-for-tests
|
||||||
assert-no-heap-growth))
|
assert-no-heap-growth))
|
||||||
|
|
||||||
(define (run-fibers-for-tests thunk)
|
(define* (run-fibers-for-tests thunk #:key (drain? #t))
|
||||||
(let ((result
|
(let ((result
|
||||||
(run-fibers
|
(run-fibers
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
@ -12,15 +13,18 @@
|
||||||
(lambda (exn)
|
(lambda (exn)
|
||||||
exn)
|
exn)
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
(simple-format #t "running ~A\n" thunk)
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
(lambda (exn)
|
(lambda (exn)
|
||||||
(backtrace)
|
(print-backtrace-and-exception/knots exn)
|
||||||
(raise-exception exn))
|
(raise-exception exn))
|
||||||
thunk)
|
(lambda ()
|
||||||
|
(start-stack #t (thunk))))
|
||||||
#t)
|
#t)
|
||||||
#:unwind? #t))
|
#:unwind? #t))
|
||||||
#:hz 0
|
#:hz 0
|
||||||
#:parallelism 1)))
|
#:parallelism 1
|
||||||
|
#:drain? drain?)))
|
||||||
(if (exception? result)
|
(if (exception? result)
|
||||||
(raise-exception result)
|
(raise-exception result)
|
||||||
result)))
|
result)))
|
||||||
|
|
|
@ -61,6 +61,24 @@
|
||||||
identity
|
identity
|
||||||
'(()))))
|
'(()))))
|
||||||
|
|
||||||
|
(run-fibers-for-tests
|
||||||
|
(lambda ()
|
||||||
|
(with-exception-handler
|
||||||
|
(lambda (exn)
|
||||||
|
(unless (and (exception-with-message? exn)
|
||||||
|
(string=? (exception-message exn)
|
||||||
|
"foo"))
|
||||||
|
(raise-exception exn)))
|
||||||
|
(lambda ()
|
||||||
|
(fibers-map-with-progress
|
||||||
|
(lambda _
|
||||||
|
(raise-exception
|
||||||
|
(make-exception-with-message "foo")))
|
||||||
|
'((1)))
|
||||||
|
|
||||||
|
(error 'should-not-reach-here))
|
||||||
|
#:unwind? #t)))
|
||||||
|
|
||||||
(run-fibers-for-tests
|
(run-fibers-for-tests
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
|
@ -111,4 +129,16 @@
|
||||||
|
|
||||||
(assert-equal a 1))))
|
(assert-equal a 1))))
|
||||||
|
|
||||||
|
(run-fibers-for-tests
|
||||||
|
(lambda ()
|
||||||
|
(let ((parallelism-limiter (make-parallelism-limiter 2)))
|
||||||
|
(fibers-for-each
|
||||||
|
(lambda _
|
||||||
|
(with-parallelism-limiter
|
||||||
|
parallelism-limiter
|
||||||
|
#f))
|
||||||
|
(iota 50))
|
||||||
|
|
||||||
|
(destroy-parallelism-limiter parallelism-limiter))))
|
||||||
|
|
||||||
(display "parallelism test finished successfully\n")
|
(display "parallelism test finished successfully\n")
|
||||||
|
|
|
@ -19,7 +19,21 @@
|
||||||
(number?
|
(number?
|
||||||
(with-resource-from-pool resource-pool
|
(with-resource-from-pool resource-pool
|
||||||
res
|
res
|
||||||
res))))))
|
res)))
|
||||||
|
|
||||||
|
(destroy-resource-pool resource-pool))))
|
||||||
|
|
||||||
|
(run-fibers-for-tests
|
||||||
|
(lambda ()
|
||||||
|
(let ((resource-pool (make-fixed-size-resource-pool
|
||||||
|
(list 1))))
|
||||||
|
(assert-true
|
||||||
|
(number?
|
||||||
|
(with-resource-from-pool resource-pool
|
||||||
|
res
|
||||||
|
res)))
|
||||||
|
|
||||||
|
(destroy-resource-pool resource-pool))))
|
||||||
|
|
||||||
(run-fibers-for-tests
|
(run-fibers-for-tests
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
@ -31,7 +45,9 @@
|
||||||
(number?
|
(number?
|
||||||
(with-resource-from-pool resource-pool
|
(with-resource-from-pool resource-pool
|
||||||
res
|
res
|
||||||
res))))))
|
res)))
|
||||||
|
|
||||||
|
(destroy-resource-pool resource-pool))))
|
||||||
|
|
||||||
(let* ((error-constructor
|
(let* ((error-constructor
|
||||||
(record-constructor &resource-pool-timeout))
|
(record-constructor &resource-pool-timeout))
|
||||||
|
@ -88,10 +104,13 @@
|
||||||
res))
|
res))
|
||||||
(iota 20))
|
(iota 20))
|
||||||
|
|
||||||
(let loop ((stats (resource-pool-stats resource-pool)))
|
(let loop ((stats (resource-pool-stats resource-pool
|
||||||
|
#:timeout #f)))
|
||||||
(unless (= 0 (assq-ref stats 'resources))
|
(unless (= 0 (assq-ref stats 'resources))
|
||||||
(sleep 0.1)
|
(sleep 0.1)
|
||||||
(loop (resource-pool-stats resource-pool)))))))
|
(loop (resource-pool-stats resource-pool #:timeout #f))))
|
||||||
|
|
||||||
|
(destroy-resource-pool resource-pool))))
|
||||||
|
|
||||||
(run-fibers-for-tests
|
(run-fibers-for-tests
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
@ -115,7 +134,9 @@
|
||||||
(set! counter (+ 1 counter))
|
(set! counter (+ 1 counter))
|
||||||
(error "collision detected")))))
|
(error "collision detected")))))
|
||||||
20
|
20
|
||||||
(iota 50)))))
|
(iota 50))
|
||||||
|
|
||||||
|
(destroy-resource-pool resource-pool))))
|
||||||
|
|
||||||
(run-fibers-for-tests
|
(run-fibers-for-tests
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
@ -129,7 +150,7 @@
|
||||||
(error "collision detected")))
|
(error "collision detected")))
|
||||||
(new-number))
|
(new-number))
|
||||||
1
|
1
|
||||||
#:default-checkout-timeout 120)))
|
#:default-checkout-timeout 5)))
|
||||||
(fibers-batch-for-each
|
(fibers-batch-for-each
|
||||||
(lambda _
|
(lambda _
|
||||||
(with-resource-from-pool
|
(with-resource-from-pool
|
||||||
|
@ -140,7 +161,9 @@
|
||||||
(set! counter (+ 1 counter))
|
(set! counter (+ 1 counter))
|
||||||
(error "collision detected")))))
|
(error "collision detected")))))
|
||||||
20
|
20
|
||||||
(iota 50)))))
|
(iota 50))
|
||||||
|
|
||||||
|
(destroy-resource-pool resource-pool))))
|
||||||
|
|
||||||
(run-fibers-for-tests
|
(run-fibers-for-tests
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
@ -164,14 +187,14 @@
|
||||||
(call-with-resource-from-pool
|
(call-with-resource-from-pool
|
||||||
resource-pool
|
resource-pool
|
||||||
(lambda (res)
|
(lambda (res)
|
||||||
(error 'should-not-be-reached))))
|
#f)))
|
||||||
#:unwind? #t)))
|
#:unwind? #t)))
|
||||||
|
|
||||||
(while (= 0
|
(while (= 0
|
||||||
(assq-ref
|
(assq-ref
|
||||||
(resource-pool-stats resource-pool)
|
(resource-pool-stats resource-pool #:timeout #f)
|
||||||
'waiters))
|
'waiters))
|
||||||
(sleep 0))
|
(sleep 0.1))
|
||||||
|
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
(lambda (exn)
|
(lambda (exn)
|
||||||
|
@ -184,6 +207,8 @@
|
||||||
resource-pool
|
resource-pool
|
||||||
(lambda (res)
|
(lambda (res)
|
||||||
(error 'should-not-be-reached))))
|
(error 'should-not-be-reached))))
|
||||||
#:unwind? #t))))))
|
#:unwind? #t)))
|
||||||
|
|
||||||
|
(destroy-resource-pool resource-pool))))
|
||||||
|
|
||||||
(display "resource-pool test finished successfully\n")
|
(display "resource-pool test finished successfully\n")
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue