Improve exception reporting in the resource pool
This commit is contained in:
parent
823cd95628
commit
dc98ef9dcc
1 changed files with 266 additions and 242 deletions
|
@ -200,6 +200,246 @@
|
|||
channel
|
||||
(list 'return-failed-checkout resource)))))))
|
||||
|
||||
(define (main-loop)
|
||||
(let loop ((resources '())
|
||||
(available '())
|
||||
(waiters '())
|
||||
(resources-last-used '()))
|
||||
|
||||
(match (get-message channel)
|
||||
(('add-resource resource)
|
||||
(if (= (length resources) max-size)
|
||||
(begin
|
||||
(if destructor
|
||||
(begin
|
||||
(spawn-fiber-to-destroy-resource resource)
|
||||
|
||||
(loop (cons resource resources)
|
||||
available
|
||||
waiters
|
||||
(cons (get-internal-real-time)
|
||||
resources-last-used)))
|
||||
(loop resources
|
||||
available
|
||||
waiters
|
||||
(cons (get-internal-real-time)
|
||||
resources-last-used))))
|
||||
|
||||
(if (null? waiters)
|
||||
(loop (cons resource resources)
|
||||
(cons resource available)
|
||||
waiters
|
||||
(cons (get-internal-real-time)
|
||||
resources-last-used))
|
||||
|
||||
(begin
|
||||
(if reply-timeout
|
||||
;; Don't sleep in this fiber, so spawn a new
|
||||
;; fiber to handle handing over the
|
||||
;; resource, and returning it if there's a
|
||||
;; timeout
|
||||
(spawn-fiber-for-checkout (last waiters)
|
||||
resource)
|
||||
(put-message (last waiters) resource))
|
||||
|
||||
(loop (cons resource resources)
|
||||
available
|
||||
(drop-right! waiters 1)
|
||||
(cons (get-internal-real-time)
|
||||
resources-last-used))))))
|
||||
|
||||
(('checkout reply)
|
||||
(if (null? available)
|
||||
(begin
|
||||
(unless (= (length resources) max-size)
|
||||
(spawn-fiber-to-return-new-resource))
|
||||
|
||||
(loop resources
|
||||
available
|
||||
(cons reply waiters)
|
||||
resources-last-used))
|
||||
|
||||
(let ((resource (car available)))
|
||||
(if reply-timeout
|
||||
;; Don't sleep in this fiber, so spawn a
|
||||
;; new fiber to handle handing over the
|
||||
;; resource, and returning it if there's a
|
||||
;; timeout
|
||||
(spawn-fiber-for-checkout reply resource)
|
||||
(put-message reply resource))
|
||||
|
||||
(loop resources
|
||||
(cdr available)
|
||||
waiters
|
||||
resources-last-used))))
|
||||
|
||||
(((and (or 'return
|
||||
'return-failed-checkout)
|
||||
return-type)
|
||||
resource)
|
||||
|
||||
(when (eq? 'return-failed-checkout
|
||||
return-type)
|
||||
(set! checkout-failure-count
|
||||
(+ 1 checkout-failure-count)))
|
||||
|
||||
(if (null? waiters)
|
||||
(loop resources
|
||||
(cons resource available)
|
||||
waiters
|
||||
(begin
|
||||
(list-set!
|
||||
resources-last-used
|
||||
(list-index (lambda (x)
|
||||
(eq? x resource))
|
||||
resources)
|
||||
(get-internal-real-time))
|
||||
resources-last-used))
|
||||
|
||||
(begin
|
||||
(if reply-timeout
|
||||
;; Don't sleep in this fiber, so spawn a new
|
||||
;; fiber to handle handing over the
|
||||
;; resource, and returning it if there's a
|
||||
;; timeout
|
||||
(spawn-fiber-for-checkout (last waiters)
|
||||
resource)
|
||||
(put-message (last waiters) resource))
|
||||
|
||||
(loop resources
|
||||
available
|
||||
(drop-right! waiters 1)
|
||||
(begin
|
||||
(list-set!
|
||||
resources-last-used
|
||||
(list-index (lambda (x)
|
||||
(eq? x resource))
|
||||
resources)
|
||||
(get-internal-real-time))
|
||||
resources-last-used)))))
|
||||
|
||||
(('remove resource)
|
||||
(let ((index
|
||||
(list-index (lambda (x)
|
||||
(eq? x resource))
|
||||
resources)))
|
||||
(define (remove-at-index! lst i)
|
||||
(let ((start
|
||||
end
|
||||
(split-at! lst i)))
|
||||
(append
|
||||
start
|
||||
(cdr end))))
|
||||
|
||||
(loop (if index
|
||||
(remove-at-index! resources index)
|
||||
(begin
|
||||
(simple-format
|
||||
(current-error-port)
|
||||
"resource pool error: unable to remove ~A\n"
|
||||
resource)
|
||||
resources))
|
||||
available ; resource shouldn't be in this list
|
||||
waiters
|
||||
(remove-at-index!
|
||||
resources-last-used
|
||||
index))))
|
||||
|
||||
(('stats reply)
|
||||
(let ((stats
|
||||
`((resources . ,(length resources))
|
||||
(available . ,(length available))
|
||||
(waiters . ,(length waiters))
|
||||
(checkout-failure-count . ,checkout-failure-count))))
|
||||
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
(perform-operation
|
||||
(choice-operation
|
||||
(wrap-operation
|
||||
(put-operation reply stats)
|
||||
(const #t))
|
||||
(wrap-operation (sleep-operation
|
||||
reply-timeout)
|
||||
(const #f)))))))
|
||||
|
||||
(loop resources
|
||||
available
|
||||
waiters
|
||||
resources-last-used))
|
||||
|
||||
(('check-for-idle-resources)
|
||||
(let* ((resources-last-used-seconds
|
||||
(map
|
||||
(lambda (internal-time)
|
||||
(/ (- (get-internal-real-time) internal-time)
|
||||
internal-time-units-per-second))
|
||||
resources-last-used))
|
||||
(resources-to-destroy
|
||||
(filter-map
|
||||
(lambda (resource last-used-seconds)
|
||||
(if (and (member resource available)
|
||||
(> last-used-seconds idle-seconds))
|
||||
resource
|
||||
#f))
|
||||
resources
|
||||
resources-last-used-seconds)))
|
||||
|
||||
(when destructor
|
||||
(for-each
|
||||
(lambda (resource)
|
||||
(spawn-fiber-to-destroy-resource resource))
|
||||
resources-to-destroy))
|
||||
|
||||
(loop (lset-difference eq? resources resources-to-destroy)
|
||||
(lset-difference eq? available resources-to-destroy)
|
||||
waiters
|
||||
(filter-map
|
||||
(lambda (resource last-used)
|
||||
(if (memq resource resources-to-destroy)
|
||||
#f
|
||||
last-used))
|
||||
resources
|
||||
resources-last-used))))
|
||||
|
||||
(('destroy reply)
|
||||
(if (null? resources)
|
||||
(put-message reply 'destroy-success)
|
||||
|
||||
(begin
|
||||
(for-each
|
||||
(lambda (resource)
|
||||
(if destructor
|
||||
(spawn-fiber-to-destroy-resource resource)
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
(put-message channel
|
||||
(list 'remove resource)))
|
||||
#:parallel? #t)))
|
||||
available)
|
||||
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
(sleep 0.1)
|
||||
(put-message channel
|
||||
(list 'destroy reply))))
|
||||
|
||||
(loop resources
|
||||
'()
|
||||
waiters
|
||||
resources-last-used))))
|
||||
|
||||
(unknown
|
||||
(simple-format
|
||||
(current-error-port)
|
||||
"unrecognised message to ~A resource pool channel: ~A\n"
|
||||
name
|
||||
unknown)
|
||||
(loop resources
|
||||
available
|
||||
waiters
|
||||
resources-last-used)))))
|
||||
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
(when idle-seconds
|
||||
|
@ -209,249 +449,33 @@
|
|||
(sleep idle-seconds)
|
||||
(put-message channel '(check-for-idle-resources))))))
|
||||
|
||||
(with-throw-handler #t
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
#f)
|
||||
(lambda ()
|
||||
(let loop ((resources '())
|
||||
(available '())
|
||||
(waiters '())
|
||||
(resources-last-used '()))
|
||||
|
||||
(match (get-message channel)
|
||||
(('add-resource resource)
|
||||
(if (= (length resources) max-size)
|
||||
(begin
|
||||
(if destructor
|
||||
(begin
|
||||
(spawn-fiber-to-destroy-resource resource)
|
||||
|
||||
(loop (cons resource resources)
|
||||
available
|
||||
waiters
|
||||
(cons (get-internal-real-time)
|
||||
resources-last-used)))
|
||||
(loop resources
|
||||
available
|
||||
waiters
|
||||
(cons (get-internal-real-time)
|
||||
resources-last-used))))
|
||||
|
||||
(if (null? waiters)
|
||||
(loop (cons resource resources)
|
||||
(cons resource available)
|
||||
waiters
|
||||
(cons (get-internal-real-time)
|
||||
resources-last-used))
|
||||
|
||||
(begin
|
||||
(if reply-timeout
|
||||
;; Don't sleep in this fiber, so spawn a new
|
||||
;; fiber to handle handing over the
|
||||
;; resource, and returning it if there's a
|
||||
;; timeout
|
||||
(spawn-fiber-for-checkout (last waiters)
|
||||
resource)
|
||||
(put-message (last waiters) resource))
|
||||
|
||||
(loop (cons resource resources)
|
||||
available
|
||||
(drop-right! waiters 1)
|
||||
(cons (get-internal-real-time)
|
||||
resources-last-used))))))
|
||||
|
||||
(('checkout reply)
|
||||
(if (null? available)
|
||||
(begin
|
||||
(unless (= (length resources) max-size)
|
||||
(spawn-fiber-to-return-new-resource))
|
||||
|
||||
(loop resources
|
||||
available
|
||||
(cons reply waiters)
|
||||
resources-last-used))
|
||||
|
||||
(let ((resource (car available)))
|
||||
(if reply-timeout
|
||||
;; Don't sleep in this fiber, so spawn a
|
||||
;; new fiber to handle handing over the
|
||||
;; resource, and returning it if there's a
|
||||
;; timeout
|
||||
(spawn-fiber-for-checkout reply resource)
|
||||
(put-message reply resource))
|
||||
|
||||
(loop resources
|
||||
(cdr available)
|
||||
waiters
|
||||
resources-last-used))))
|
||||
|
||||
(((and (or 'return
|
||||
'return-failed-checkout)
|
||||
return-type)
|
||||
resource)
|
||||
|
||||
(when (eq? 'return-failed-checkout
|
||||
return-type)
|
||||
(set! checkout-failure-count
|
||||
(+ 1 checkout-failure-count)))
|
||||
|
||||
(if (null? waiters)
|
||||
(loop resources
|
||||
(cons resource available)
|
||||
waiters
|
||||
(begin
|
||||
(list-set!
|
||||
resources-last-used
|
||||
(list-index (lambda (x)
|
||||
(eq? x resource))
|
||||
resources)
|
||||
(get-internal-real-time))
|
||||
resources-last-used))
|
||||
|
||||
(begin
|
||||
(if reply-timeout
|
||||
;; Don't sleep in this fiber, so spawn a new
|
||||
;; fiber to handle handing over the
|
||||
;; resource, and returning it if there's a
|
||||
;; timeout
|
||||
(spawn-fiber-for-checkout (last waiters)
|
||||
resource)
|
||||
(put-message (last waiters) resource))
|
||||
|
||||
(loop resources
|
||||
available
|
||||
(drop-right! waiters 1)
|
||||
(begin
|
||||
(list-set!
|
||||
resources-last-used
|
||||
(list-index (lambda (x)
|
||||
(eq? x resource))
|
||||
resources)
|
||||
(get-internal-real-time))
|
||||
resources-last-used)))))
|
||||
|
||||
(('remove resource)
|
||||
(let ((index
|
||||
(list-index (lambda (x)
|
||||
(eq? x resource))
|
||||
resources)))
|
||||
(define (remove-at-index! lst i)
|
||||
(let ((start
|
||||
end
|
||||
(split-at! lst i)))
|
||||
(append
|
||||
start
|
||||
(cdr end))))
|
||||
|
||||
(loop (if index
|
||||
(remove-at-index! resources index)
|
||||
(begin
|
||||
(simple-format
|
||||
(current-error-port)
|
||||
"resource pool error: unable to remove ~A\n"
|
||||
resource)
|
||||
resources))
|
||||
available ; resource shouldn't be in this list
|
||||
waiters
|
||||
(remove-at-index!
|
||||
resources-last-used
|
||||
index))))
|
||||
|
||||
(('stats reply)
|
||||
(let ((stats
|
||||
`((resources . ,(length resources))
|
||||
(available . ,(length available))
|
||||
(waiters . ,(length waiters))
|
||||
(checkout-failure-count . ,checkout-failure-count))))
|
||||
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
(perform-operation
|
||||
(choice-operation
|
||||
(wrap-operation
|
||||
(put-operation reply stats)
|
||||
(const #t))
|
||||
(wrap-operation (sleep-operation
|
||||
reply-timeout)
|
||||
(const #f)))))))
|
||||
|
||||
(loop resources
|
||||
available
|
||||
waiters
|
||||
resources-last-used))
|
||||
|
||||
(('check-for-idle-resources)
|
||||
(let* ((resources-last-used-seconds
|
||||
(map
|
||||
(lambda (internal-time)
|
||||
(/ (- (get-internal-real-time) internal-time)
|
||||
internal-time-units-per-second))
|
||||
resources-last-used))
|
||||
(resources-to-destroy
|
||||
(filter-map
|
||||
(lambda (resource last-used-seconds)
|
||||
(if (and (member resource available)
|
||||
(> last-used-seconds idle-seconds))
|
||||
resource
|
||||
#f))
|
||||
resources
|
||||
resources-last-used-seconds)))
|
||||
|
||||
(when destructor
|
||||
(for-each
|
||||
(lambda (resource)
|
||||
(spawn-fiber-to-destroy-resource resource))
|
||||
resources-to-destroy))
|
||||
|
||||
(loop (lset-difference eq? resources resources-to-destroy)
|
||||
(lset-difference eq? available resources-to-destroy)
|
||||
waiters
|
||||
(filter-map
|
||||
(lambda (resource last-used)
|
||||
(if (memq resource resources-to-destroy)
|
||||
#f
|
||||
last-used))
|
||||
resources
|
||||
resources-last-used))))
|
||||
|
||||
(('destroy reply)
|
||||
(if (null? resources)
|
||||
(put-message reply 'destroy-success)
|
||||
|
||||
(begin
|
||||
(for-each
|
||||
(lambda (resource)
|
||||
(if destructor
|
||||
(spawn-fiber-to-destroy-resource resource)
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
(put-message channel
|
||||
(list 'remove resource)))
|
||||
#:parallel? #t)))
|
||||
available)
|
||||
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
(sleep 0.1)
|
||||
(put-message channel
|
||||
(list 'destroy reply))))
|
||||
|
||||
(loop resources
|
||||
'()
|
||||
waiters
|
||||
resources-last-used))))
|
||||
|
||||
(unknown
|
||||
(simple-format
|
||||
(current-error-port)
|
||||
"unrecognised message to ~A resource pool channel: ~A\n"
|
||||
name
|
||||
unknown)
|
||||
(loop resources
|
||||
available
|
||||
waiters
|
||||
resources-last-used)))))
|
||||
(lambda (key . args)
|
||||
(simple-format (current-error-port)
|
||||
"exception in the ~A pool fiber\n" name))))
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(let* ((stack (make-stack #t))
|
||||
(error-string
|
||||
(call-with-output-string
|
||||
(lambda (port)
|
||||
(simple-format
|
||||
port
|
||||
"exception in the ~A pool fiber, " name)
|
||||
(print-exception
|
||||
port
|
||||
(stack-ref stack 3)
|
||||
'%exception
|
||||
(list exn))
|
||||
(display-backtrace stack port 3)))))
|
||||
(display error-string
|
||||
(current-error-port)))
|
||||
(raise-exception exn))
|
||||
(lambda ()
|
||||
(start-stack
|
||||
#t
|
||||
(main-loop)))))
|
||||
#:unwind? #t))
|
||||
(or scheduler
|
||||
(current-scheduler)))
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue