Compare commits

..

No commits in common. "7ba77010ae98e675340a7ea22b400f0dcc20ef65" and "47ff45d96331cf885e1e3d3ad8748295d182f882" have entirely different histories.

8 changed files with 197 additions and 492 deletions

View file

@ -67,14 +67,11 @@
(define* (print-backtrace-and-exception/knots
exn
#:key (port (current-error-port)))
(let* ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t))))
(let* ((stack (match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))))
(error-string
(call-with-output-string
(lambda (port)

View file

@ -22,7 +22,6 @@
#:use-module (srfi srfi-71)
#:use-module (ice-9 match)
#:use-module (ice-9 control)
#:use-module (ice-9 exceptions)
#:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers operations)
@ -44,33 +43,27 @@
(let ((reply (make-channel)))
(spawn-fiber
(lambda ()
(with-exception-handler
(lambda (exn)
(put-message
reply
(list 'exception exn)))
(lambda ()
(with-exception-handler
(lambda (exn)
(let ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t)))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))
(lambda ()
(call-with-values
(lambda ()
(start-stack #t (thunk)))
(lambda vals
(put-message reply vals))))))
#:unwind? #t))
(call-with-escape-continuation
(lambda (return)
(with-exception-handler
(lambda (exn)
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(let ((stack (make-stack #t
0 prompt-tag
0 (and prompt-tag 1))))
(put-message reply
(list 'exception
(make-exception
exn
(make-knots-exception stack)))))))
(return))
(lambda ()
(call-with-values
(lambda ()
(start-stack #t (thunk)))
(lambda vals
(put-message reply vals))))))))
#:parallel? #t)
reply))
@ -252,31 +245,25 @@
(get-message process-channel))))
(put-message
reply-channel
(with-exception-handler
(lambda (exn)
(list 'exception exn))
(lambda ()
(with-exception-handler
(lambda (exn)
(let ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t)))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))
(lambda ()
(call-with-values
(lambda ()
(start-stack #t (apply proc args)))
(lambda vals
(cons 'result vals))))))
#:unwind? #t)))))
(call-with-escape-continuation
(lambda (return)
(with-exception-handler
(lambda (exn)
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(let ((stack (make-stack #t
0 prompt-tag
0 (and prompt-tag 1))))
(return (list 'exception
(make-exception
exn
(make-knots-exception stack))))))))
(lambda ()
(call-with-values
(lambda ()
(start-stack #t (apply proc args)))
(lambda vals
(cons 'result vals)))))))))))
#:parallel? #t))
(iota parallelism))

View file

@ -29,7 +29,6 @@
#:use-module (fibers channels)
#:use-module (fibers scheduler)
#:use-module (fibers operations)
#:use-module (fibers conditions)
#:use-module (knots)
#:use-module (knots parallelism)
#:export (resource-pool?
@ -44,19 +43,6 @@
resource-pool-timeout-error-pool
resource-pool-timeout-error?
&resource-pool-too-many-waiters
resource-pool-too-many-waiters-error-pool
resource-pool-too-many-waiters-error-waiters-count
resource-pool-too-many-waiters-error?
&resource-pool-destroyed
resource-pool-destroyed-error-pool
resource-pool-destroyed-error?
&resource-pool-destroy-resource
make-resource-pool-destroy-resource-exception
resource-pool-destroy-resource-exception?
resource-pool-default-timeout-handler
call-with-resource-from-pool
@ -76,12 +62,11 @@
(record-predicate &resource-pool-abort-add-resource))
(define-record-type <resource-pool>
(make-resource-pool-record name channel destroy-condition configuration)
(make-resource-pool-record name channel configuration)
resource-pool?
(name resource-pool-name)
(channel resource-pool-channel)
(destroy-condition resource-pool-destroy-condition)
(configuration resource-pool-configuration))
(name resource-pool-name)
(channel resource-pool-channel)
(configuration resource-pool-configuration))
(set-record-type-printer!
<resource-pool>
@ -101,17 +86,13 @@
scheduler
(name "unnamed")
(add-resources-parallelism 1)
default-checkout-timeout
default-max-waiters)
default-checkout-timeout)
(define channel (make-channel))
(define destroy-condition
(make-condition))
(define pool
(make-resource-pool-record
name
channel
destroy-condition
`((max-size . ,max-size)
(min-size . ,min-size)
(idle-seconds . ,idle-seconds)
@ -121,8 +102,7 @@
(lifetime . ,lifetime)
(scheduler . ,scheduler)
(name . ,name)
(default-checkout-timeout . ,default-checkout-timeout)
(default-max-waiters . ,default-max-waiters))))
(default-checkout-timeout . ,default-checkout-timeout))))
(define checkout-failure-count 0)
@ -206,8 +186,7 @@
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply-channel
(cons 'success resource))
(put-operation reply-channel resource)
(const #t))
(wrap-operation (sleep-operation
reply-timeout)
@ -217,103 +196,6 @@
channel
(list 'return-failed-checkout resource)))))))
(define (destroy-loop resources)
(let loop ((resources resources))
(match (get-message channel)
(('add-resource resource)
(when destructor
(spawn-fiber-to-destroy-resource resource))
(loop resources))
(('checkout reply timeout-time max-waiters)
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'resource-pool-destroyed
#f))))
(perform-operation
(if timeout-time
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op)))))
(loop resources))
(((and (or 'return
'return-failed-checkout
'remove)
return-type)
resource)
(when destructor
(spawn-fiber-to-destroy-resource resource))
(let ((index
(list-index (lambda (x)
(eq? x resource))
resources)))
(define (remove-at-index! lst i)
(let ((start
end
(split-at! lst i)))
(append
start
(cdr end))))
(let ((new-resources
(if index
(remove-at-index! resources index)
(begin
(simple-format
(current-error-port)
"resource pool error: unable to remove ~A\n"
resource)
resources))))
(if (null? new-resources)
(begin
(signal-condition! destroy-condition)
;; No loop
*unspecified*)
(loop new-resources)))))
(('stats reply)
(let ((stats
`((resources . ,(length resources))
(available . 0)
(waiters . 0)
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
(lambda ()
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply stats)
(const #t))
(wrap-operation (sleep-operation 5)
(const #f)))))))
(loop resources))
(('check-for-idle-resources)
(loop resources))
(('destroy reply)
(loop resources))
(unknown
(simple-format
(current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n"
name
unknown)
(loop resources)))))
(define (main-loop)
(let loop ((resources '())
(available '())
@ -375,8 +257,7 @@
(spawn-fiber-for-checkout waiter-channel
reply-timeout
resource))
(put-message waiter-channel (cons 'success
resource)))
(put-message waiter-channel resource))
(loop (cons resource resources)
available
@ -384,45 +265,17 @@
(cons (get-internal-real-time)
resources-last-used)))))))))
(('checkout reply timeout-time max-waiters)
(('checkout reply timeout-time)
(if (null? available)
(begin
(unless (= (length resources) max-size)
(spawn-fiber-to-return-new-resource))
(let ((waiters-count
(length waiters)))
(if (and max-waiters
(>= waiters-count
max-waiters))
(begin
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'too-many-waiters
waiters-count))))
(perform-operation
(if timeout-time
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op)))))
(loop resources
available
waiters
resources-last-used))
(loop resources
available
(cons (cons reply timeout-time)
waiters)
resources-last-used))))
(loop resources
available
(cons (cons reply timeout-time)
waiters)
resources-last-used))
(if timeout-time
(let ((current-internal-time
@ -450,8 +303,7 @@
waiters
resources-last-used)))
(begin
(put-message reply (cons 'success
(car available)))
(put-message reply (car available))
(loop resources
(cdr available)
@ -517,8 +369,7 @@
(spawn-fiber-for-checkout waiter-channel
reply-timeout
resource))
(put-message waiter-channel (cons 'success
resource)))
(put-message waiter-channel resource))
(loop resources
available
@ -559,24 +410,6 @@
resources-last-used
index))))
(('destroy resource)
(spawn-fiber-to-destroy-resource resource)
(loop resources
available
waiters
resources-last-used))
(('list-resources reply)
(spawn-fiber
(lambda ()
(put-message reply (list-copy resources))))
(loop resources
available
waiters
resources-last-used))
(('stats reply)
(let ((stats
`((resources . ,(length resources))
@ -639,11 +472,9 @@
waiters
resources-last-used))))
(('destroy)
(if (and (null? resources)
(null? waiters))
(signal-condition!
destroy-condition)
(('destroy reply)
(if (null? resources)
(put-message reply 'destroy-success)
(begin
(for-each
@ -657,33 +488,16 @@
#:parallel? #t)))
available)
(let ((current-internal-time (get-internal-real-time)))
(for-each
(match-lambda
((reply . timeout)
(when (or (not timeout)
(> timeout current-internal-time))
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'resource-pool-destroyed
#f))))
(perform-operation
(if timeout
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op))))))))
waiters))
(spawn-fiber
(lambda ()
(sleep 0.1)
(put-message channel
(list 'destroy reply))))
(destroy-loop resources))))
(loop resources
'()
waiters
resources-last-used))))
(unknown
(simple-format
@ -738,16 +552,12 @@
pool)
(define (destroy-resource-pool pool)
(perform-operation
(choice-operation
(wrap-operation
(put-operation (resource-pool-channel pool)
(list 'destroy))
(lambda _
(wait (resource-pool-destroy-condition pool))))
(wait-operation
(resource-pool-destroy-condition pool))))
#t)
(let ((reply (make-channel)))
(put-message (resource-pool-channel pool)
(list 'destroy reply))
(let ((msg (get-message reply)))
(unless (eq? msg 'destroy-success)
(error msg)))))
(define &resource-pool-timeout
(make-exception-type '&recource-pool-timeout
@ -765,63 +575,12 @@
(define resource-pool-timeout-error?
(record-predicate &resource-pool-timeout))
(define &resource-pool-too-many-waiters
(make-exception-type '&recource-pool-too-many-waiters
&error
'(pool waiters-count)))
(define resource-pool-too-many-waiters-error-pool
(exception-accessor
&resource-pool-too-many-waiters
(record-accessor &resource-pool-too-many-waiters 'pool)))
(define resource-pool-too-many-waiters-error-waiters-count
(exception-accessor
&resource-pool-too-many-waiters
(record-accessor &resource-pool-too-many-waiters 'waiters-count)))
(define make-resource-pool-too-many-waiters-error
(record-constructor &resource-pool-too-many-waiters))
(define resource-pool-too-many-waiters-error?
(record-predicate &resource-pool-too-many-waiters))
(define &resource-pool-destroyed
(make-exception-type '&recource-pool-destroyed
&error
'(pool)))
(define resource-pool-destroyed-error-pool
(exception-accessor
&resource-pool-destroyed
(record-accessor &resource-pool-destroyed 'pool)))
(define make-resource-pool-destroyed-error
(record-constructor &resource-pool-destroyed))
(define resource-pool-destroyed-error?
(record-predicate &resource-pool-destroyed))
(define &resource-pool-destroy-resource
(make-exception-type '&recource-pool-destroy-resource
&exception
'()))
(define make-resource-pool-destroy-resource-exception
(record-constructor &resource-pool-destroy-resource))
(define resource-pool-destroy-resource-exception?
(record-predicate &resource-pool-destroy-resource))
(define resource-pool-default-timeout-handler
(make-parameter #f))
(define* (call-with-resource-from-pool
pool proc #:key (timeout 'default)
(timeout-handler (resource-pool-default-timeout-handler))
(max-waiters 'default)
(channel (resource-pool-channel pool))
(destroy-resource-on-exception? #f))
(timeout-handler (resource-pool-default-timeout-handler)))
"Call PROC with a resource from POOL, blocking until a resource becomes
available. Return the resource once PROC has returned."
@ -831,13 +590,7 @@ available. Return the resource once PROC has returned."
'default-checkout-timeout)
timeout))
(define max-waiters-or-default
(if (eq? max-waiters 'default)
(assq-ref (resource-pool-configuration pool)
'default-max-waiters)
max-waiters))
(let ((reply
(let ((resource
(if timeout-or-default
(let loop ((reply (make-channel))
(start-time (get-internal-real-time)))
@ -845,13 +598,12 @@ available. Return the resource once PROC has returned."
(perform-operation
(choice-operation
(wrap-operation
(put-operation channel
(put-operation (resource-pool-channel pool)
(list 'checkout
reply
(+ start-time
(* timeout-or-default
internal-time-units-per-second))
max-waiters-or-default))
internal-time-units-per-second))))
(const #t))
(wrap-operation (sleep-operation timeout-or-default)
(const #f))))))
@ -877,71 +629,37 @@ available. Return the resource once PROC has returned."
0)
(loop (make-channel)
start-time)
'timeout)
#f)
response))
'timeout)))))
#f)))))
(let loop ((reply (make-channel)))
(put-message channel
(put-message (resource-pool-channel pool)
(list 'checkout
reply
#f
max-waiters-or-default))
#f))
(get-message reply)))))
(match reply
('timeout
(when timeout-handler
(timeout-handler pool proc timeout))
(when (not resource)
(when timeout-handler
(timeout-handler pool proc timeout))
(raise-exception
(make-resource-pool-timeout-error pool)))
(('too-many-waiters . count)
(raise-exception
(make-resource-pool-timeout-error pool)))
(raise-exception
(make-resource-pool-too-many-waiters-error pool
count)))
(('resource-pool-destroyed . #f)
(raise-exception
(make-resource-pool-destroyed-error pool)))
(('success . resource)
(call-with-values
(lambda ()
(with-exception-handler
(lambda (exn)
;; Unwind the stack before calling put-message, as
;; this avoids inconsistent behaviour with
;; continuation barriers
(put-message
(resource-pool-channel pool)
(list (if (or destroy-resource-on-exception?
(resource-pool-destroy-resource-exception? exn))
'destroy
'return)
resource))
(unless (resource-pool-destroy-resource-exception? exn)
(raise-exception exn)))
(lambda ()
(with-exception-handler
(lambda (exn)
(let ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t)))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))
(lambda ()
(proc resource))))
#:unwind? #t))
(lambda vals
(put-message (resource-pool-channel pool)
`(return ,resource))
(apply values vals)))))))
(call-with-values
(lambda ()
(with-exception-handler
(lambda (exn)
(print-backtrace-and-exception/knots exn)
(put-message (resource-pool-channel pool)
`(return ,resource))
(raise-exception exn))
(lambda ()
(proc resource))))
(lambda vals
(put-message (resource-pool-channel pool)
`(return ,resource))
(apply values vals)))))
(define-syntax-rule (with-resource-from-pool pool resource exp ...)
(call-with-resource-from-pool
@ -978,8 +696,3 @@ available. Return the resource once PROC has returned."
(raise-exception
(make-resource-pool-timeout-error pool))))))
(define (resource-pool-list-resources pool)
(let ((reply (make-channel)))
(put-message (resource-pool-channel pool)
(list 'list-resources reply))
(get-message reply)))

View file

@ -251,18 +251,15 @@ arguments of the thread pool procedure."
proc)
(with-exception-handler
(lambda (exn)
(let ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t)))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(let ((stack (make-stack #t
0 prompt-tag
0 (and prompt-tag 1))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))))
(lambda ()
(call-with-values
(lambda ()

View file

@ -48,6 +48,7 @@
request-body-port/knots
read-request-body/knots
default-exception-handler
default-write-response-exception-handler
web-server?
@ -309,7 +310,7 @@ on the procedure being called at any particular time."
;; Close the client port
#f)
(define (exception-handler exn request)
(define (default-exception-handler exn request)
(let* ((error-string
(call-with-output-string
(lambda (port)
@ -331,7 +332,8 @@ on the procedure being called at any particular time."
(define (handle-request handler client
read-request-exception-handler
write-response-exception-handler)
write-response-exception-handler
exception-handler)
(let ((request
(with-exception-handler
read-request-exception-handler
@ -354,14 +356,58 @@ on the procedure being called at any particular time."
(lambda (return)
(with-exception-handler
(lambda (exn)
(call-with-values
(lambda ()
(exception-handler exn request))
(lambda (response body)
(with-exception-handler
(lambda (exn)
(call-with-values
(lambda ()
(default-exception-handler
(make-exception
exn
(make-exception-with-message
"exception in exception handler")
(make-exception-with-irritants
exception-handler))
request))
(match-lambda*
((response body)
(call-with-values
(lambda ()
(sanitize-response
request
response
body))
return)))))
(lambda ()
(call-with-values
(lambda ()
(sanitize-response request response body))
return))))
(exception-handler exn request))
(match-lambda*
((response body)
(call-with-values
(lambda ()
(sanitize-response request response body))
return))
(other
(call-with-values
(lambda ()
(default-exception-handler
(make-exception-with-irritants
(list (make-exception-with-message
(simple-format
#f
"wrong number of values returned from exception handler, expecting 2, got ~A"
(length other)))
exception-handler))
request))
(match-lambda*
((response body)
(call-with-values
(lambda ()
(sanitize-response
request
response
body))
return))))))))))
(lambda ()
(start-stack
#t
@ -429,6 +475,7 @@ on the procedure being called at any particular time."
#:unwind? #t))))
(define* (client-loop client handler
exception-handler
read-request-exception-handler
write-response-exception-handler
connection-idle-timeout
@ -470,7 +517,8 @@ on the procedure being called at any particular time."
(else
(let ((keep-alive? (handle-request handler client
read-request-exception-handler
write-response-exception-handler)))
write-response-exception-handler
exception-handler)))
(if keep-alive?
(loop)
(close-port client)))))))
@ -489,6 +537,8 @@ on the procedure being called at any particular time."
INADDR_LOOPBACK))
(port 8080)
(socket (make-default-socket family addr port))
(exception-handler
default-exception-handler)
(read-request-exception-handler
default-read-request-exception-handler)
(write-response-exception-handler
@ -527,6 +577,7 @@ before sending back to the client."
((client . sockaddr)
(spawn-fiber (lambda ()
(client-loop client handler
exception-handler
read-request-exception-handler
write-response-exception-handler
connection-idle-timeout

View file

@ -93,22 +93,4 @@
1))
#:unwind? #t)))
(run-fibers-for-tests
(lambda ()
(let ((a 0))
(call-with-values
(lambda ()
(fibers-parallel
(begin
(sleep 1)
1)
(begin
(set! a 1)
2)))
(lambda (a b)
(assert-equal a 1)
(assert-equal b 2)))
(assert-equal a 1))))
(display "parallelism test finished successfully\n")

View file

@ -142,48 +142,4 @@
20
(iota 50)))))
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-resource-pool
(lambda () #f)
1
#:default-max-waiters 1)))
(call-with-resource-from-pool
resource-pool
(lambda (res)
;; 1st waiter
(spawn-fiber
(lambda ()
(with-exception-handler
(lambda (exn)
(if (resource-pool-destroyed-error? exn)
#t
(raise-exception exn)))
(lambda ()
(call-with-resource-from-pool
resource-pool
(lambda (res)
(error 'should-not-be-reached))))
#:unwind? #t)))
(while (= 0
(assq-ref
(resource-pool-stats resource-pool)
'waiters))
(sleep 0))
(with-exception-handler
(lambda (exn)
(if (resource-pool-too-many-waiters-error? exn)
#t
(raise-exception exn)))
(lambda ()
;; 2nd waiter
(call-with-resource-from-pool
resource-pool
(lambda (res)
(error 'should-not-be-reached))))
#:unwind? #t))))))
(display "resource-pool test finished successfully\n")

View file

@ -52,6 +52,28 @@
uri
#:port (non-blocking-open-socket-for-uri uri)))))))
(run-fibers-for-tests
(lambda ()
(let* ((web-server
(run-knots-web-server
(lambda (request)
"Hello, World!")
#:port 0
#:exception-handler
(lambda (exn request)
"Error"))) ;; Bind to any port
(port
(web-server-port web-server))
(uri
(build-uri 'http #:host "127.0.0.1" #:port port)))
(assert-equal
500
(response-code
(http-get
uri
#:port (non-blocking-open-socket-for-uri uri)))))))
(run-fibers-for-tests
(lambda ()
(let* ((web-server