Compare commits
10 commits
47ff45d963
...
7ba77010ae
Author | SHA1 | Date | |
---|---|---|---|
7ba77010ae | |||
8c63ed7b4e | |||
838ee6f1e3 | |||
1dca6d755e | |||
68cfbe0380 | |||
4f0eafef0a | |||
8c0f04be4f | |||
e1858dfff5 | |||
da69fd19f3 | |||
a73fd1ca50 |
8 changed files with 492 additions and 197 deletions
13
knots.scm
13
knots.scm
|
@ -67,11 +67,14 @@
|
||||||
(define* (print-backtrace-and-exception/knots
|
(define* (print-backtrace-and-exception/knots
|
||||||
exn
|
exn
|
||||||
#:key (port (current-error-port)))
|
#:key (port (current-error-port)))
|
||||||
(let* ((stack (match (fluid-ref %stacks)
|
(let* ((stack
|
||||||
((stack-tag . prompt-tag)
|
(match (fluid-ref %stacks)
|
||||||
(make-stack #t
|
((stack-tag . prompt-tag)
|
||||||
0 prompt-tag
|
(make-stack #t
|
||||||
0 (and prompt-tag 1)))))
|
0 prompt-tag
|
||||||
|
0 (and prompt-tag 1)))
|
||||||
|
(_
|
||||||
|
(make-stack #t))))
|
||||||
(error-string
|
(error-string
|
||||||
(call-with-output-string
|
(call-with-output-string
|
||||||
(lambda (port)
|
(lambda (port)
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
#:use-module (srfi srfi-71)
|
#:use-module (srfi srfi-71)
|
||||||
#:use-module (ice-9 match)
|
#:use-module (ice-9 match)
|
||||||
#:use-module (ice-9 control)
|
#:use-module (ice-9 control)
|
||||||
|
#:use-module (ice-9 exceptions)
|
||||||
#:use-module (fibers)
|
#:use-module (fibers)
|
||||||
#:use-module (fibers channels)
|
#:use-module (fibers channels)
|
||||||
#:use-module (fibers operations)
|
#:use-module (fibers operations)
|
||||||
|
@ -43,27 +44,33 @@
|
||||||
(let ((reply (make-channel)))
|
(let ((reply (make-channel)))
|
||||||
(spawn-fiber
|
(spawn-fiber
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(call-with-escape-continuation
|
(with-exception-handler
|
||||||
(lambda (return)
|
(lambda (exn)
|
||||||
(with-exception-handler
|
(put-message
|
||||||
(lambda (exn)
|
reply
|
||||||
(match (fluid-ref %stacks)
|
(list 'exception exn)))
|
||||||
((stack-tag . prompt-tag)
|
(lambda ()
|
||||||
(let ((stack (make-stack #t
|
(with-exception-handler
|
||||||
0 prompt-tag
|
(lambda (exn)
|
||||||
0 (and prompt-tag 1))))
|
(let ((stack
|
||||||
(put-message reply
|
(match (fluid-ref %stacks)
|
||||||
(list 'exception
|
((stack-tag . prompt-tag)
|
||||||
(make-exception
|
(make-stack #t
|
||||||
exn
|
0 prompt-tag
|
||||||
(make-knots-exception stack)))))))
|
0 (and prompt-tag 1)))
|
||||||
(return))
|
(_
|
||||||
(lambda ()
|
(make-stack #t)))))
|
||||||
(call-with-values
|
(raise-exception
|
||||||
(lambda ()
|
(make-exception
|
||||||
(start-stack #t (thunk)))
|
exn
|
||||||
(lambda vals
|
(make-knots-exception stack)))))
|
||||||
(put-message reply vals))))))))
|
(lambda ()
|
||||||
|
(call-with-values
|
||||||
|
(lambda ()
|
||||||
|
(start-stack #t (thunk)))
|
||||||
|
(lambda vals
|
||||||
|
(put-message reply vals))))))
|
||||||
|
#:unwind? #t))
|
||||||
#:parallel? #t)
|
#:parallel? #t)
|
||||||
reply))
|
reply))
|
||||||
|
|
||||||
|
@ -245,25 +252,31 @@
|
||||||
(get-message process-channel))))
|
(get-message process-channel))))
|
||||||
(put-message
|
(put-message
|
||||||
reply-channel
|
reply-channel
|
||||||
(call-with-escape-continuation
|
(with-exception-handler
|
||||||
(lambda (return)
|
(lambda (exn)
|
||||||
(with-exception-handler
|
(list 'exception exn))
|
||||||
(lambda (exn)
|
(lambda ()
|
||||||
(match (fluid-ref %stacks)
|
(with-exception-handler
|
||||||
((stack-tag . prompt-tag)
|
(lambda (exn)
|
||||||
(let ((stack (make-stack #t
|
(let ((stack
|
||||||
0 prompt-tag
|
(match (fluid-ref %stacks)
|
||||||
0 (and prompt-tag 1))))
|
((stack-tag . prompt-tag)
|
||||||
(return (list 'exception
|
(make-stack #t
|
||||||
(make-exception
|
0 prompt-tag
|
||||||
exn
|
0 (and prompt-tag 1)))
|
||||||
(make-knots-exception stack))))))))
|
(_
|
||||||
(lambda ()
|
(make-stack #t)))))
|
||||||
(call-with-values
|
(raise-exception
|
||||||
(lambda ()
|
(make-exception
|
||||||
(start-stack #t (apply proc args)))
|
exn
|
||||||
(lambda vals
|
(make-knots-exception stack)))))
|
||||||
(cons 'result vals)))))))))))
|
(lambda ()
|
||||||
|
(call-with-values
|
||||||
|
(lambda ()
|
||||||
|
(start-stack #t (apply proc args)))
|
||||||
|
(lambda vals
|
||||||
|
(cons 'result vals))))))
|
||||||
|
#:unwind? #t)))))
|
||||||
#:parallel? #t))
|
#:parallel? #t))
|
||||||
(iota parallelism))
|
(iota parallelism))
|
||||||
|
|
||||||
|
|
|
@ -29,6 +29,7 @@
|
||||||
#:use-module (fibers channels)
|
#:use-module (fibers channels)
|
||||||
#:use-module (fibers scheduler)
|
#:use-module (fibers scheduler)
|
||||||
#:use-module (fibers operations)
|
#:use-module (fibers operations)
|
||||||
|
#:use-module (fibers conditions)
|
||||||
#:use-module (knots)
|
#:use-module (knots)
|
||||||
#:use-module (knots parallelism)
|
#:use-module (knots parallelism)
|
||||||
#:export (resource-pool?
|
#:export (resource-pool?
|
||||||
|
@ -43,6 +44,19 @@
|
||||||
resource-pool-timeout-error-pool
|
resource-pool-timeout-error-pool
|
||||||
resource-pool-timeout-error?
|
resource-pool-timeout-error?
|
||||||
|
|
||||||
|
&resource-pool-too-many-waiters
|
||||||
|
resource-pool-too-many-waiters-error-pool
|
||||||
|
resource-pool-too-many-waiters-error-waiters-count
|
||||||
|
resource-pool-too-many-waiters-error?
|
||||||
|
|
||||||
|
&resource-pool-destroyed
|
||||||
|
resource-pool-destroyed-error-pool
|
||||||
|
resource-pool-destroyed-error?
|
||||||
|
|
||||||
|
&resource-pool-destroy-resource
|
||||||
|
make-resource-pool-destroy-resource-exception
|
||||||
|
resource-pool-destroy-resource-exception?
|
||||||
|
|
||||||
resource-pool-default-timeout-handler
|
resource-pool-default-timeout-handler
|
||||||
|
|
||||||
call-with-resource-from-pool
|
call-with-resource-from-pool
|
||||||
|
@ -62,11 +76,12 @@
|
||||||
(record-predicate &resource-pool-abort-add-resource))
|
(record-predicate &resource-pool-abort-add-resource))
|
||||||
|
|
||||||
(define-record-type <resource-pool>
|
(define-record-type <resource-pool>
|
||||||
(make-resource-pool-record name channel configuration)
|
(make-resource-pool-record name channel destroy-condition configuration)
|
||||||
resource-pool?
|
resource-pool?
|
||||||
(name resource-pool-name)
|
(name resource-pool-name)
|
||||||
(channel resource-pool-channel)
|
(channel resource-pool-channel)
|
||||||
(configuration resource-pool-configuration))
|
(destroy-condition resource-pool-destroy-condition)
|
||||||
|
(configuration resource-pool-configuration))
|
||||||
|
|
||||||
(set-record-type-printer!
|
(set-record-type-printer!
|
||||||
<resource-pool>
|
<resource-pool>
|
||||||
|
@ -86,13 +101,17 @@
|
||||||
scheduler
|
scheduler
|
||||||
(name "unnamed")
|
(name "unnamed")
|
||||||
(add-resources-parallelism 1)
|
(add-resources-parallelism 1)
|
||||||
default-checkout-timeout)
|
default-checkout-timeout
|
||||||
|
default-max-waiters)
|
||||||
(define channel (make-channel))
|
(define channel (make-channel))
|
||||||
|
(define destroy-condition
|
||||||
|
(make-condition))
|
||||||
|
|
||||||
(define pool
|
(define pool
|
||||||
(make-resource-pool-record
|
(make-resource-pool-record
|
||||||
name
|
name
|
||||||
channel
|
channel
|
||||||
|
destroy-condition
|
||||||
`((max-size . ,max-size)
|
`((max-size . ,max-size)
|
||||||
(min-size . ,min-size)
|
(min-size . ,min-size)
|
||||||
(idle-seconds . ,idle-seconds)
|
(idle-seconds . ,idle-seconds)
|
||||||
|
@ -102,7 +121,8 @@
|
||||||
(lifetime . ,lifetime)
|
(lifetime . ,lifetime)
|
||||||
(scheduler . ,scheduler)
|
(scheduler . ,scheduler)
|
||||||
(name . ,name)
|
(name . ,name)
|
||||||
(default-checkout-timeout . ,default-checkout-timeout))))
|
(default-checkout-timeout . ,default-checkout-timeout)
|
||||||
|
(default-max-waiters . ,default-max-waiters))))
|
||||||
|
|
||||||
(define checkout-failure-count 0)
|
(define checkout-failure-count 0)
|
||||||
|
|
||||||
|
@ -186,7 +206,8 @@
|
||||||
(perform-operation
|
(perform-operation
|
||||||
(choice-operation
|
(choice-operation
|
||||||
(wrap-operation
|
(wrap-operation
|
||||||
(put-operation reply-channel resource)
|
(put-operation reply-channel
|
||||||
|
(cons 'success resource))
|
||||||
(const #t))
|
(const #t))
|
||||||
(wrap-operation (sleep-operation
|
(wrap-operation (sleep-operation
|
||||||
reply-timeout)
|
reply-timeout)
|
||||||
|
@ -196,6 +217,103 @@
|
||||||
channel
|
channel
|
||||||
(list 'return-failed-checkout resource)))))))
|
(list 'return-failed-checkout resource)))))))
|
||||||
|
|
||||||
|
(define (destroy-loop resources)
|
||||||
|
(let loop ((resources resources))
|
||||||
|
(match (get-message channel)
|
||||||
|
(('add-resource resource)
|
||||||
|
(when destructor
|
||||||
|
(spawn-fiber-to-destroy-resource resource))
|
||||||
|
|
||||||
|
(loop resources))
|
||||||
|
(('checkout reply timeout-time max-waiters)
|
||||||
|
(spawn-fiber
|
||||||
|
(lambda ()
|
||||||
|
(let ((op
|
||||||
|
(put-operation
|
||||||
|
reply
|
||||||
|
(cons 'resource-pool-destroyed
|
||||||
|
#f))))
|
||||||
|
(perform-operation
|
||||||
|
(if timeout-time
|
||||||
|
(choice-operation
|
||||||
|
op
|
||||||
|
(wrap-operation
|
||||||
|
(sleep-operation
|
||||||
|
(/ (- timeout-time
|
||||||
|
(get-internal-real-time))
|
||||||
|
internal-time-units-per-second))
|
||||||
|
(const #f)))
|
||||||
|
op)))))
|
||||||
|
(loop resources))
|
||||||
|
(((and (or 'return
|
||||||
|
'return-failed-checkout
|
||||||
|
'remove)
|
||||||
|
return-type)
|
||||||
|
resource)
|
||||||
|
(when destructor
|
||||||
|
(spawn-fiber-to-destroy-resource resource))
|
||||||
|
|
||||||
|
(let ((index
|
||||||
|
(list-index (lambda (x)
|
||||||
|
(eq? x resource))
|
||||||
|
resources)))
|
||||||
|
(define (remove-at-index! lst i)
|
||||||
|
(let ((start
|
||||||
|
end
|
||||||
|
(split-at! lst i)))
|
||||||
|
(append
|
||||||
|
start
|
||||||
|
(cdr end))))
|
||||||
|
|
||||||
|
(let ((new-resources
|
||||||
|
(if index
|
||||||
|
(remove-at-index! resources index)
|
||||||
|
(begin
|
||||||
|
(simple-format
|
||||||
|
(current-error-port)
|
||||||
|
"resource pool error: unable to remove ~A\n"
|
||||||
|
resource)
|
||||||
|
resources))))
|
||||||
|
(if (null? new-resources)
|
||||||
|
(begin
|
||||||
|
(signal-condition! destroy-condition)
|
||||||
|
|
||||||
|
;; No loop
|
||||||
|
*unspecified*)
|
||||||
|
(loop new-resources)))))
|
||||||
|
|
||||||
|
(('stats reply)
|
||||||
|
(let ((stats
|
||||||
|
`((resources . ,(length resources))
|
||||||
|
(available . 0)
|
||||||
|
(waiters . 0)
|
||||||
|
(checkout-failure-count . ,checkout-failure-count))))
|
||||||
|
|
||||||
|
(spawn-fiber
|
||||||
|
(lambda ()
|
||||||
|
(perform-operation
|
||||||
|
(choice-operation
|
||||||
|
(wrap-operation
|
||||||
|
(put-operation reply stats)
|
||||||
|
(const #t))
|
||||||
|
(wrap-operation (sleep-operation 5)
|
||||||
|
(const #f)))))))
|
||||||
|
|
||||||
|
(loop resources))
|
||||||
|
|
||||||
|
(('check-for-idle-resources)
|
||||||
|
(loop resources))
|
||||||
|
|
||||||
|
(('destroy reply)
|
||||||
|
(loop resources))
|
||||||
|
(unknown
|
||||||
|
(simple-format
|
||||||
|
(current-error-port)
|
||||||
|
"unrecognised message to ~A resource pool channel: ~A\n"
|
||||||
|
name
|
||||||
|
unknown)
|
||||||
|
(loop resources)))))
|
||||||
|
|
||||||
(define (main-loop)
|
(define (main-loop)
|
||||||
(let loop ((resources '())
|
(let loop ((resources '())
|
||||||
(available '())
|
(available '())
|
||||||
|
@ -257,7 +375,8 @@
|
||||||
(spawn-fiber-for-checkout waiter-channel
|
(spawn-fiber-for-checkout waiter-channel
|
||||||
reply-timeout
|
reply-timeout
|
||||||
resource))
|
resource))
|
||||||
(put-message waiter-channel resource))
|
(put-message waiter-channel (cons 'success
|
||||||
|
resource)))
|
||||||
|
|
||||||
(loop (cons resource resources)
|
(loop (cons resource resources)
|
||||||
available
|
available
|
||||||
|
@ -265,17 +384,45 @@
|
||||||
(cons (get-internal-real-time)
|
(cons (get-internal-real-time)
|
||||||
resources-last-used)))))))))
|
resources-last-used)))))))))
|
||||||
|
|
||||||
(('checkout reply timeout-time)
|
(('checkout reply timeout-time max-waiters)
|
||||||
(if (null? available)
|
(if (null? available)
|
||||||
(begin
|
(begin
|
||||||
(unless (= (length resources) max-size)
|
(unless (= (length resources) max-size)
|
||||||
(spawn-fiber-to-return-new-resource))
|
(spawn-fiber-to-return-new-resource))
|
||||||
|
|
||||||
(loop resources
|
(let ((waiters-count
|
||||||
available
|
(length waiters)))
|
||||||
(cons (cons reply timeout-time)
|
(if (and max-waiters
|
||||||
waiters)
|
(>= waiters-count
|
||||||
resources-last-used))
|
max-waiters))
|
||||||
|
(begin
|
||||||
|
(spawn-fiber
|
||||||
|
(lambda ()
|
||||||
|
(let ((op
|
||||||
|
(put-operation
|
||||||
|
reply
|
||||||
|
(cons 'too-many-waiters
|
||||||
|
waiters-count))))
|
||||||
|
(perform-operation
|
||||||
|
(if timeout-time
|
||||||
|
(choice-operation
|
||||||
|
op
|
||||||
|
(wrap-operation
|
||||||
|
(sleep-operation
|
||||||
|
(/ (- timeout-time
|
||||||
|
(get-internal-real-time))
|
||||||
|
internal-time-units-per-second))
|
||||||
|
(const #f)))
|
||||||
|
op)))))
|
||||||
|
(loop resources
|
||||||
|
available
|
||||||
|
waiters
|
||||||
|
resources-last-used))
|
||||||
|
(loop resources
|
||||||
|
available
|
||||||
|
(cons (cons reply timeout-time)
|
||||||
|
waiters)
|
||||||
|
resources-last-used))))
|
||||||
|
|
||||||
(if timeout-time
|
(if timeout-time
|
||||||
(let ((current-internal-time
|
(let ((current-internal-time
|
||||||
|
@ -303,7 +450,8 @@
|
||||||
waiters
|
waiters
|
||||||
resources-last-used)))
|
resources-last-used)))
|
||||||
(begin
|
(begin
|
||||||
(put-message reply (car available))
|
(put-message reply (cons 'success
|
||||||
|
(car available)))
|
||||||
|
|
||||||
(loop resources
|
(loop resources
|
||||||
(cdr available)
|
(cdr available)
|
||||||
|
@ -369,7 +517,8 @@
|
||||||
(spawn-fiber-for-checkout waiter-channel
|
(spawn-fiber-for-checkout waiter-channel
|
||||||
reply-timeout
|
reply-timeout
|
||||||
resource))
|
resource))
|
||||||
(put-message waiter-channel resource))
|
(put-message waiter-channel (cons 'success
|
||||||
|
resource)))
|
||||||
|
|
||||||
(loop resources
|
(loop resources
|
||||||
available
|
available
|
||||||
|
@ -410,6 +559,24 @@
|
||||||
resources-last-used
|
resources-last-used
|
||||||
index))))
|
index))))
|
||||||
|
|
||||||
|
(('destroy resource)
|
||||||
|
(spawn-fiber-to-destroy-resource resource)
|
||||||
|
|
||||||
|
(loop resources
|
||||||
|
available
|
||||||
|
waiters
|
||||||
|
resources-last-used))
|
||||||
|
|
||||||
|
(('list-resources reply)
|
||||||
|
(spawn-fiber
|
||||||
|
(lambda ()
|
||||||
|
(put-message reply (list-copy resources))))
|
||||||
|
|
||||||
|
(loop resources
|
||||||
|
available
|
||||||
|
waiters
|
||||||
|
resources-last-used))
|
||||||
|
|
||||||
(('stats reply)
|
(('stats reply)
|
||||||
(let ((stats
|
(let ((stats
|
||||||
`((resources . ,(length resources))
|
`((resources . ,(length resources))
|
||||||
|
@ -472,9 +639,11 @@
|
||||||
waiters
|
waiters
|
||||||
resources-last-used))))
|
resources-last-used))))
|
||||||
|
|
||||||
(('destroy reply)
|
(('destroy)
|
||||||
(if (null? resources)
|
(if (and (null? resources)
|
||||||
(put-message reply 'destroy-success)
|
(null? waiters))
|
||||||
|
(signal-condition!
|
||||||
|
destroy-condition)
|
||||||
|
|
||||||
(begin
|
(begin
|
||||||
(for-each
|
(for-each
|
||||||
|
@ -488,16 +657,33 @@
|
||||||
#:parallel? #t)))
|
#:parallel? #t)))
|
||||||
available)
|
available)
|
||||||
|
|
||||||
(spawn-fiber
|
(let ((current-internal-time (get-internal-real-time)))
|
||||||
(lambda ()
|
(for-each
|
||||||
(sleep 0.1)
|
(match-lambda
|
||||||
(put-message channel
|
((reply . timeout)
|
||||||
(list 'destroy reply))))
|
(when (or (not timeout)
|
||||||
|
(> timeout current-internal-time))
|
||||||
|
(spawn-fiber
|
||||||
|
(lambda ()
|
||||||
|
(let ((op
|
||||||
|
(put-operation
|
||||||
|
reply
|
||||||
|
(cons 'resource-pool-destroyed
|
||||||
|
#f))))
|
||||||
|
(perform-operation
|
||||||
|
(if timeout
|
||||||
|
(choice-operation
|
||||||
|
op
|
||||||
|
(wrap-operation
|
||||||
|
(sleep-operation
|
||||||
|
(/ (- timeout
|
||||||
|
(get-internal-real-time))
|
||||||
|
internal-time-units-per-second))
|
||||||
|
(const #f)))
|
||||||
|
op))))))))
|
||||||
|
waiters))
|
||||||
|
|
||||||
(loop resources
|
(destroy-loop resources))))
|
||||||
'()
|
|
||||||
waiters
|
|
||||||
resources-last-used))))
|
|
||||||
|
|
||||||
(unknown
|
(unknown
|
||||||
(simple-format
|
(simple-format
|
||||||
|
@ -552,12 +738,16 @@
|
||||||
pool)
|
pool)
|
||||||
|
|
||||||
(define (destroy-resource-pool pool)
|
(define (destroy-resource-pool pool)
|
||||||
(let ((reply (make-channel)))
|
(perform-operation
|
||||||
(put-message (resource-pool-channel pool)
|
(choice-operation
|
||||||
(list 'destroy reply))
|
(wrap-operation
|
||||||
(let ((msg (get-message reply)))
|
(put-operation (resource-pool-channel pool)
|
||||||
(unless (eq? msg 'destroy-success)
|
(list 'destroy))
|
||||||
(error msg)))))
|
(lambda _
|
||||||
|
(wait (resource-pool-destroy-condition pool))))
|
||||||
|
(wait-operation
|
||||||
|
(resource-pool-destroy-condition pool))))
|
||||||
|
#t)
|
||||||
|
|
||||||
(define &resource-pool-timeout
|
(define &resource-pool-timeout
|
||||||
(make-exception-type '&recource-pool-timeout
|
(make-exception-type '&recource-pool-timeout
|
||||||
|
@ -575,12 +765,63 @@
|
||||||
(define resource-pool-timeout-error?
|
(define resource-pool-timeout-error?
|
||||||
(record-predicate &resource-pool-timeout))
|
(record-predicate &resource-pool-timeout))
|
||||||
|
|
||||||
|
(define &resource-pool-too-many-waiters
|
||||||
|
(make-exception-type '&recource-pool-too-many-waiters
|
||||||
|
&error
|
||||||
|
'(pool waiters-count)))
|
||||||
|
|
||||||
|
(define resource-pool-too-many-waiters-error-pool
|
||||||
|
(exception-accessor
|
||||||
|
&resource-pool-too-many-waiters
|
||||||
|
(record-accessor &resource-pool-too-many-waiters 'pool)))
|
||||||
|
|
||||||
|
(define resource-pool-too-many-waiters-error-waiters-count
|
||||||
|
(exception-accessor
|
||||||
|
&resource-pool-too-many-waiters
|
||||||
|
(record-accessor &resource-pool-too-many-waiters 'waiters-count)))
|
||||||
|
|
||||||
|
(define make-resource-pool-too-many-waiters-error
|
||||||
|
(record-constructor &resource-pool-too-many-waiters))
|
||||||
|
|
||||||
|
(define resource-pool-too-many-waiters-error?
|
||||||
|
(record-predicate &resource-pool-too-many-waiters))
|
||||||
|
|
||||||
|
(define &resource-pool-destroyed
|
||||||
|
(make-exception-type '&recource-pool-destroyed
|
||||||
|
&error
|
||||||
|
'(pool)))
|
||||||
|
|
||||||
|
(define resource-pool-destroyed-error-pool
|
||||||
|
(exception-accessor
|
||||||
|
&resource-pool-destroyed
|
||||||
|
(record-accessor &resource-pool-destroyed 'pool)))
|
||||||
|
|
||||||
|
(define make-resource-pool-destroyed-error
|
||||||
|
(record-constructor &resource-pool-destroyed))
|
||||||
|
|
||||||
|
(define resource-pool-destroyed-error?
|
||||||
|
(record-predicate &resource-pool-destroyed))
|
||||||
|
|
||||||
|
(define &resource-pool-destroy-resource
|
||||||
|
(make-exception-type '&recource-pool-destroy-resource
|
||||||
|
&exception
|
||||||
|
'()))
|
||||||
|
|
||||||
|
(define make-resource-pool-destroy-resource-exception
|
||||||
|
(record-constructor &resource-pool-destroy-resource))
|
||||||
|
|
||||||
|
(define resource-pool-destroy-resource-exception?
|
||||||
|
(record-predicate &resource-pool-destroy-resource))
|
||||||
|
|
||||||
(define resource-pool-default-timeout-handler
|
(define resource-pool-default-timeout-handler
|
||||||
(make-parameter #f))
|
(make-parameter #f))
|
||||||
|
|
||||||
(define* (call-with-resource-from-pool
|
(define* (call-with-resource-from-pool
|
||||||
pool proc #:key (timeout 'default)
|
pool proc #:key (timeout 'default)
|
||||||
(timeout-handler (resource-pool-default-timeout-handler)))
|
(timeout-handler (resource-pool-default-timeout-handler))
|
||||||
|
(max-waiters 'default)
|
||||||
|
(channel (resource-pool-channel pool))
|
||||||
|
(destroy-resource-on-exception? #f))
|
||||||
"Call PROC with a resource from POOL, blocking until a resource becomes
|
"Call PROC with a resource from POOL, blocking until a resource becomes
|
||||||
available. Return the resource once PROC has returned."
|
available. Return the resource once PROC has returned."
|
||||||
|
|
||||||
|
@ -590,7 +831,13 @@ available. Return the resource once PROC has returned."
|
||||||
'default-checkout-timeout)
|
'default-checkout-timeout)
|
||||||
timeout))
|
timeout))
|
||||||
|
|
||||||
(let ((resource
|
(define max-waiters-or-default
|
||||||
|
(if (eq? max-waiters 'default)
|
||||||
|
(assq-ref (resource-pool-configuration pool)
|
||||||
|
'default-max-waiters)
|
||||||
|
max-waiters))
|
||||||
|
|
||||||
|
(let ((reply
|
||||||
(if timeout-or-default
|
(if timeout-or-default
|
||||||
(let loop ((reply (make-channel))
|
(let loop ((reply (make-channel))
|
||||||
(start-time (get-internal-real-time)))
|
(start-time (get-internal-real-time)))
|
||||||
|
@ -598,12 +845,13 @@ available. Return the resource once PROC has returned."
|
||||||
(perform-operation
|
(perform-operation
|
||||||
(choice-operation
|
(choice-operation
|
||||||
(wrap-operation
|
(wrap-operation
|
||||||
(put-operation (resource-pool-channel pool)
|
(put-operation channel
|
||||||
(list 'checkout
|
(list 'checkout
|
||||||
reply
|
reply
|
||||||
(+ start-time
|
(+ start-time
|
||||||
(* timeout-or-default
|
(* timeout-or-default
|
||||||
internal-time-units-per-second))))
|
internal-time-units-per-second))
|
||||||
|
max-waiters-or-default))
|
||||||
(const #t))
|
(const #t))
|
||||||
(wrap-operation (sleep-operation timeout-or-default)
|
(wrap-operation (sleep-operation timeout-or-default)
|
||||||
(const #f))))))
|
(const #f))))))
|
||||||
|
@ -629,37 +877,71 @@ available. Return the resource once PROC has returned."
|
||||||
0)
|
0)
|
||||||
(loop (make-channel)
|
(loop (make-channel)
|
||||||
start-time)
|
start-time)
|
||||||
#f)
|
'timeout)
|
||||||
response))
|
response))
|
||||||
#f)))))
|
'timeout)))))
|
||||||
(let loop ((reply (make-channel)))
|
(let loop ((reply (make-channel)))
|
||||||
(put-message (resource-pool-channel pool)
|
(put-message channel
|
||||||
(list 'checkout
|
(list 'checkout
|
||||||
reply
|
reply
|
||||||
#f))
|
#f
|
||||||
|
max-waiters-or-default))
|
||||||
(get-message reply)))))
|
(get-message reply)))))
|
||||||
|
|
||||||
(when (not resource)
|
(match reply
|
||||||
(when timeout-handler
|
('timeout
|
||||||
(timeout-handler pool proc timeout))
|
(when timeout-handler
|
||||||
|
(timeout-handler pool proc timeout))
|
||||||
|
|
||||||
(raise-exception
|
(raise-exception
|
||||||
(make-resource-pool-timeout-error pool)))
|
(make-resource-pool-timeout-error pool)))
|
||||||
|
(('too-many-waiters . count)
|
||||||
|
|
||||||
(call-with-values
|
(raise-exception
|
||||||
(lambda ()
|
(make-resource-pool-too-many-waiters-error pool
|
||||||
(with-exception-handler
|
count)))
|
||||||
(lambda (exn)
|
(('resource-pool-destroyed . #f)
|
||||||
(print-backtrace-and-exception/knots exn)
|
(raise-exception
|
||||||
(put-message (resource-pool-channel pool)
|
(make-resource-pool-destroyed-error pool)))
|
||||||
`(return ,resource))
|
(('success . resource)
|
||||||
(raise-exception exn))
|
(call-with-values
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(proc resource))))
|
(with-exception-handler
|
||||||
(lambda vals
|
(lambda (exn)
|
||||||
(put-message (resource-pool-channel pool)
|
;; Unwind the stack before calling put-message, as
|
||||||
`(return ,resource))
|
;; this avoids inconsistent behaviour with
|
||||||
(apply values vals)))))
|
;; continuation barriers
|
||||||
|
(put-message
|
||||||
|
(resource-pool-channel pool)
|
||||||
|
(list (if (or destroy-resource-on-exception?
|
||||||
|
(resource-pool-destroy-resource-exception? exn))
|
||||||
|
'destroy
|
||||||
|
'return)
|
||||||
|
resource))
|
||||||
|
(unless (resource-pool-destroy-resource-exception? exn)
|
||||||
|
(raise-exception exn)))
|
||||||
|
(lambda ()
|
||||||
|
(with-exception-handler
|
||||||
|
(lambda (exn)
|
||||||
|
(let ((stack
|
||||||
|
(match (fluid-ref %stacks)
|
||||||
|
((stack-tag . prompt-tag)
|
||||||
|
(make-stack #t
|
||||||
|
0 prompt-tag
|
||||||
|
0 (and prompt-tag 1)))
|
||||||
|
(_
|
||||||
|
(make-stack #t)))))
|
||||||
|
(raise-exception
|
||||||
|
(make-exception
|
||||||
|
exn
|
||||||
|
(make-knots-exception stack)))))
|
||||||
|
(lambda ()
|
||||||
|
(proc resource))))
|
||||||
|
#:unwind? #t))
|
||||||
|
(lambda vals
|
||||||
|
(put-message (resource-pool-channel pool)
|
||||||
|
`(return ,resource))
|
||||||
|
(apply values vals)))))))
|
||||||
|
|
||||||
(define-syntax-rule (with-resource-from-pool pool resource exp ...)
|
(define-syntax-rule (with-resource-from-pool pool resource exp ...)
|
||||||
(call-with-resource-from-pool
|
(call-with-resource-from-pool
|
||||||
|
@ -696,3 +978,8 @@ available. Return the resource once PROC has returned."
|
||||||
(raise-exception
|
(raise-exception
|
||||||
(make-resource-pool-timeout-error pool))))))
|
(make-resource-pool-timeout-error pool))))))
|
||||||
|
|
||||||
|
(define (resource-pool-list-resources pool)
|
||||||
|
(let ((reply (make-channel)))
|
||||||
|
(put-message (resource-pool-channel pool)
|
||||||
|
(list 'list-resources reply))
|
||||||
|
(get-message reply)))
|
||||||
|
|
|
@ -251,15 +251,18 @@ arguments of the thread pool procedure."
|
||||||
proc)
|
proc)
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
(lambda (exn)
|
(lambda (exn)
|
||||||
(match (fluid-ref %stacks)
|
(let ((stack
|
||||||
((stack-tag . prompt-tag)
|
(match (fluid-ref %stacks)
|
||||||
(let ((stack (make-stack #t
|
((stack-tag . prompt-tag)
|
||||||
0 prompt-tag
|
(make-stack #t
|
||||||
0 (and prompt-tag 1))))
|
0 prompt-tag
|
||||||
(raise-exception
|
0 (and prompt-tag 1)))
|
||||||
(make-exception
|
(_
|
||||||
exn
|
(make-stack #t)))))
|
||||||
(make-knots-exception stack)))))))
|
(raise-exception
|
||||||
|
(make-exception
|
||||||
|
exn
|
||||||
|
(make-knots-exception stack)))))
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(call-with-values
|
(call-with-values
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
|
|
@ -48,7 +48,6 @@
|
||||||
request-body-port/knots
|
request-body-port/knots
|
||||||
read-request-body/knots
|
read-request-body/knots
|
||||||
|
|
||||||
default-exception-handler
|
|
||||||
default-write-response-exception-handler
|
default-write-response-exception-handler
|
||||||
|
|
||||||
web-server?
|
web-server?
|
||||||
|
@ -310,7 +309,7 @@ on the procedure being called at any particular time."
|
||||||
;; Close the client port
|
;; Close the client port
|
||||||
#f)
|
#f)
|
||||||
|
|
||||||
(define (default-exception-handler exn request)
|
(define (exception-handler exn request)
|
||||||
(let* ((error-string
|
(let* ((error-string
|
||||||
(call-with-output-string
|
(call-with-output-string
|
||||||
(lambda (port)
|
(lambda (port)
|
||||||
|
@ -332,8 +331,7 @@ on the procedure being called at any particular time."
|
||||||
|
|
||||||
(define (handle-request handler client
|
(define (handle-request handler client
|
||||||
read-request-exception-handler
|
read-request-exception-handler
|
||||||
write-response-exception-handler
|
write-response-exception-handler)
|
||||||
exception-handler)
|
|
||||||
(let ((request
|
(let ((request
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
read-request-exception-handler
|
read-request-exception-handler
|
||||||
|
@ -356,58 +354,14 @@ on the procedure being called at any particular time."
|
||||||
(lambda (return)
|
(lambda (return)
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
(lambda (exn)
|
(lambda (exn)
|
||||||
(with-exception-handler
|
(call-with-values
|
||||||
(lambda (exn)
|
(lambda ()
|
||||||
(call-with-values
|
(exception-handler exn request))
|
||||||
(lambda ()
|
(lambda (response body)
|
||||||
(default-exception-handler
|
|
||||||
(make-exception
|
|
||||||
exn
|
|
||||||
(make-exception-with-message
|
|
||||||
"exception in exception handler")
|
|
||||||
(make-exception-with-irritants
|
|
||||||
exception-handler))
|
|
||||||
request))
|
|
||||||
(match-lambda*
|
|
||||||
((response body)
|
|
||||||
(call-with-values
|
|
||||||
(lambda ()
|
|
||||||
(sanitize-response
|
|
||||||
request
|
|
||||||
response
|
|
||||||
body))
|
|
||||||
return)))))
|
|
||||||
(lambda ()
|
|
||||||
(call-with-values
|
(call-with-values
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(exception-handler exn request))
|
(sanitize-response request response body))
|
||||||
(match-lambda*
|
return))))
|
||||||
((response body)
|
|
||||||
(call-with-values
|
|
||||||
(lambda ()
|
|
||||||
(sanitize-response request response body))
|
|
||||||
return))
|
|
||||||
(other
|
|
||||||
(call-with-values
|
|
||||||
(lambda ()
|
|
||||||
(default-exception-handler
|
|
||||||
(make-exception-with-irritants
|
|
||||||
(list (make-exception-with-message
|
|
||||||
(simple-format
|
|
||||||
#f
|
|
||||||
"wrong number of values returned from exception handler, expecting 2, got ~A"
|
|
||||||
(length other)))
|
|
||||||
exception-handler))
|
|
||||||
request))
|
|
||||||
(match-lambda*
|
|
||||||
((response body)
|
|
||||||
(call-with-values
|
|
||||||
(lambda ()
|
|
||||||
(sanitize-response
|
|
||||||
request
|
|
||||||
response
|
|
||||||
body))
|
|
||||||
return))))))))))
|
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(start-stack
|
(start-stack
|
||||||
#t
|
#t
|
||||||
|
@ -475,7 +429,6 @@ on the procedure being called at any particular time."
|
||||||
#:unwind? #t))))
|
#:unwind? #t))))
|
||||||
|
|
||||||
(define* (client-loop client handler
|
(define* (client-loop client handler
|
||||||
exception-handler
|
|
||||||
read-request-exception-handler
|
read-request-exception-handler
|
||||||
write-response-exception-handler
|
write-response-exception-handler
|
||||||
connection-idle-timeout
|
connection-idle-timeout
|
||||||
|
@ -517,8 +470,7 @@ on the procedure being called at any particular time."
|
||||||
(else
|
(else
|
||||||
(let ((keep-alive? (handle-request handler client
|
(let ((keep-alive? (handle-request handler client
|
||||||
read-request-exception-handler
|
read-request-exception-handler
|
||||||
write-response-exception-handler
|
write-response-exception-handler)))
|
||||||
exception-handler)))
|
|
||||||
(if keep-alive?
|
(if keep-alive?
|
||||||
(loop)
|
(loop)
|
||||||
(close-port client)))))))
|
(close-port client)))))))
|
||||||
|
@ -537,8 +489,6 @@ on the procedure being called at any particular time."
|
||||||
INADDR_LOOPBACK))
|
INADDR_LOOPBACK))
|
||||||
(port 8080)
|
(port 8080)
|
||||||
(socket (make-default-socket family addr port))
|
(socket (make-default-socket family addr port))
|
||||||
(exception-handler
|
|
||||||
default-exception-handler)
|
|
||||||
(read-request-exception-handler
|
(read-request-exception-handler
|
||||||
default-read-request-exception-handler)
|
default-read-request-exception-handler)
|
||||||
(write-response-exception-handler
|
(write-response-exception-handler
|
||||||
|
@ -577,7 +527,6 @@ before sending back to the client."
|
||||||
((client . sockaddr)
|
((client . sockaddr)
|
||||||
(spawn-fiber (lambda ()
|
(spawn-fiber (lambda ()
|
||||||
(client-loop client handler
|
(client-loop client handler
|
||||||
exception-handler
|
|
||||||
read-request-exception-handler
|
read-request-exception-handler
|
||||||
write-response-exception-handler
|
write-response-exception-handler
|
||||||
connection-idle-timeout
|
connection-idle-timeout
|
||||||
|
|
|
@ -93,4 +93,22 @@
|
||||||
1))
|
1))
|
||||||
#:unwind? #t)))
|
#:unwind? #t)))
|
||||||
|
|
||||||
|
(run-fibers-for-tests
|
||||||
|
(lambda ()
|
||||||
|
(let ((a 0))
|
||||||
|
(call-with-values
|
||||||
|
(lambda ()
|
||||||
|
(fibers-parallel
|
||||||
|
(begin
|
||||||
|
(sleep 1)
|
||||||
|
1)
|
||||||
|
(begin
|
||||||
|
(set! a 1)
|
||||||
|
2)))
|
||||||
|
(lambda (a b)
|
||||||
|
(assert-equal a 1)
|
||||||
|
(assert-equal b 2)))
|
||||||
|
|
||||||
|
(assert-equal a 1))))
|
||||||
|
|
||||||
(display "parallelism test finished successfully\n")
|
(display "parallelism test finished successfully\n")
|
||||||
|
|
|
@ -142,4 +142,48 @@
|
||||||
20
|
20
|
||||||
(iota 50)))))
|
(iota 50)))))
|
||||||
|
|
||||||
|
(run-fibers-for-tests
|
||||||
|
(lambda ()
|
||||||
|
(let ((resource-pool (make-resource-pool
|
||||||
|
(lambda () #f)
|
||||||
|
1
|
||||||
|
#:default-max-waiters 1)))
|
||||||
|
(call-with-resource-from-pool
|
||||||
|
resource-pool
|
||||||
|
(lambda (res)
|
||||||
|
|
||||||
|
;; 1st waiter
|
||||||
|
(spawn-fiber
|
||||||
|
(lambda ()
|
||||||
|
(with-exception-handler
|
||||||
|
(lambda (exn)
|
||||||
|
(if (resource-pool-destroyed-error? exn)
|
||||||
|
#t
|
||||||
|
(raise-exception exn)))
|
||||||
|
(lambda ()
|
||||||
|
(call-with-resource-from-pool
|
||||||
|
resource-pool
|
||||||
|
(lambda (res)
|
||||||
|
(error 'should-not-be-reached))))
|
||||||
|
#:unwind? #t)))
|
||||||
|
|
||||||
|
(while (= 0
|
||||||
|
(assq-ref
|
||||||
|
(resource-pool-stats resource-pool)
|
||||||
|
'waiters))
|
||||||
|
(sleep 0))
|
||||||
|
|
||||||
|
(with-exception-handler
|
||||||
|
(lambda (exn)
|
||||||
|
(if (resource-pool-too-many-waiters-error? exn)
|
||||||
|
#t
|
||||||
|
(raise-exception exn)))
|
||||||
|
(lambda ()
|
||||||
|
;; 2nd waiter
|
||||||
|
(call-with-resource-from-pool
|
||||||
|
resource-pool
|
||||||
|
(lambda (res)
|
||||||
|
(error 'should-not-be-reached))))
|
||||||
|
#:unwind? #t))))))
|
||||||
|
|
||||||
(display "resource-pool test finished successfully\n")
|
(display "resource-pool test finished successfully\n")
|
||||||
|
|
|
@ -52,28 +52,6 @@
|
||||||
uri
|
uri
|
||||||
#:port (non-blocking-open-socket-for-uri uri)))))))
|
#:port (non-blocking-open-socket-for-uri uri)))))))
|
||||||
|
|
||||||
(run-fibers-for-tests
|
|
||||||
(lambda ()
|
|
||||||
(let* ((web-server
|
|
||||||
(run-knots-web-server
|
|
||||||
(lambda (request)
|
|
||||||
"Hello, World!")
|
|
||||||
#:port 0
|
|
||||||
#:exception-handler
|
|
||||||
(lambda (exn request)
|
|
||||||
"Error"))) ;; Bind to any port
|
|
||||||
(port
|
|
||||||
(web-server-port web-server))
|
|
||||||
(uri
|
|
||||||
(build-uri 'http #:host "127.0.0.1" #:port port)))
|
|
||||||
|
|
||||||
(assert-equal
|
|
||||||
500
|
|
||||||
(response-code
|
|
||||||
(http-get
|
|
||||||
uri
|
|
||||||
#:port (non-blocking-open-socket-for-uri uri)))))))
|
|
||||||
|
|
||||||
(run-fibers-for-tests
|
(run-fibers-for-tests
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(let* ((web-server
|
(let* ((web-server
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue