All checks were successful
/ test (push) Successful in 1m4s
Previously failures could lead to no resources in the pool, and waiters which will never get a resource. Retrying here fixes that issue, although maybe another approach is needed that keeps track of new resources being created, as that'll allow keeping track of this when destroying resource pools.
1446 lines
53 KiB
Scheme
1446 lines
53 KiB
Scheme
;;; Guile Knots
|
|
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
|
|
;;;
|
|
;;; This file is part of Guile Knots.
|
|
;;;
|
|
;;; The Guile Knots is free software; you can redistribute it and/or
|
|
;;; modify it under the terms of the GNU General Public License as
|
|
;;; published by the Free Software Foundation; either version 3 of the
|
|
;;; License, or (at your option) any later version.
|
|
;;;
|
|
;;; The Guile Knots is distributed in the hope that it will be useful,
|
|
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
;;; General Public License for more details.
|
|
;;;
|
|
;;; You should have received a copy of the GNU General Public License
|
|
;;; along with the guix-data-service. If not, see
|
|
;;; <http://www.gnu.org/licenses/>.
|
|
|
|
(define-module (knots resource-pool)
|
|
#:use-module (srfi srfi-1)
|
|
#:use-module (srfi srfi-9)
|
|
#:use-module (srfi srfi-9 gnu)
|
|
#:use-module (srfi srfi-43)
|
|
#:use-module (srfi srfi-71)
|
|
#:use-module (ice-9 q)
|
|
#:use-module (ice-9 match)
|
|
#:use-module (ice-9 exceptions)
|
|
#:use-module (fibers)
|
|
#:use-module (fibers timers)
|
|
#:use-module (fibers channels)
|
|
#:use-module (fibers scheduler)
|
|
#:use-module (fibers operations)
|
|
#:use-module (fibers conditions)
|
|
#:use-module (knots)
|
|
#:use-module (knots parallelism)
|
|
#:export (make-fixed-size-resource-pool
|
|
make-resource-pool
|
|
|
|
resource-pool?
|
|
resource-pool-name
|
|
resource-pool-channel
|
|
resource-pool-configuration
|
|
destroy-resource-pool
|
|
|
|
&resource-pool-timeout
|
|
resource-pool-timeout-error-pool
|
|
resource-pool-timeout-error?
|
|
|
|
&resource-pool-too-many-waiters
|
|
resource-pool-too-many-waiters-error-pool
|
|
resource-pool-too-many-waiters-error-waiters-count
|
|
resource-pool-too-many-waiters-error?
|
|
|
|
&resource-pool-destroyed
|
|
resource-pool-destroyed-error-pool
|
|
resource-pool-destroyed-error?
|
|
|
|
&resource-pool-destroy-resource
|
|
make-resource-pool-destroy-resource-exception
|
|
resource-pool-destroy-resource-exception?
|
|
|
|
resource-pool-default-timeout-handler
|
|
|
|
call-with-resource-from-pool
|
|
with-resource-from-pool
|
|
|
|
resource-pool-stats))
|
|
|
|
(define &resource-pool-abort-add-resource
|
|
(make-exception-type '&recource-pool-abort-add-resource
|
|
&error
|
|
'()))
|
|
|
|
(define make-resource-pool-abort-add-resource-error
|
|
(record-constructor &resource-pool-abort-add-resource))
|
|
|
|
(define resource-pool-abort-add-resource-error?
|
|
(exception-predicate &resource-pool-abort-add-resource))
|
|
|
|
(define-record-type <resource-pool>
|
|
(make-resource-pool-record name channel destroy-condition configuration)
|
|
resource-pool?
|
|
(name resource-pool-name)
|
|
(channel resource-pool-channel
|
|
set-resource-pool-channel!)
|
|
(destroy-condition resource-pool-destroy-condition)
|
|
(configuration resource-pool-configuration))
|
|
|
|
(set-record-type-printer!
|
|
<resource-pool>
|
|
(lambda (resource-pool port)
|
|
(display/knots
|
|
(simple-format #f "#<resource-pool name: \"~A\">"
|
|
(resource-pool-name resource-pool))
|
|
port)))
|
|
|
|
(define (safe-deq q)
|
|
(if (null? (car q))
|
|
#f
|
|
(let ((it (caar q))
|
|
(next (cdar q)))
|
|
(if (null? next)
|
|
(set-cdr! q #f))
|
|
(set-car! q next)
|
|
it)))
|
|
|
|
(define-record-type <resource-details>
|
|
(make-resource-details value checkout-count last-used)
|
|
resource-details?
|
|
(value resource-details-value)
|
|
(checkout-count resource-details-checkout-count
|
|
set-resource-details-checkout-count!)
|
|
(last-used resource-details-last-used
|
|
set-resource-details-last-used!))
|
|
|
|
(define-inlinable (increment-resource-checkout-count! resource)
|
|
(set-resource-details-checkout-count!
|
|
resource
|
|
(1+ (resource-details-checkout-count resource))))
|
|
|
|
(define-inlinable (decrement-resource-checkout-count! resource)
|
|
(set-resource-details-checkout-count!
|
|
resource
|
|
(1+ (resource-details-checkout-count resource))))
|
|
|
|
(define (spawn-fiber-for-checkout channel
|
|
reply-channel
|
|
reply-timeout
|
|
resource-id
|
|
resource)
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(let ((checkout-success?
|
|
(perform-operation
|
|
(choice-operation
|
|
(wrap-operation
|
|
(put-operation reply-channel
|
|
(list 'success resource-id resource))
|
|
(const #t))
|
|
(wrap-operation (sleep-operation
|
|
reply-timeout)
|
|
(const #f))))))
|
|
(unless checkout-success?
|
|
(put-message
|
|
channel
|
|
(list 'return-failed-checkout resource-id)))))))
|
|
|
|
(define* (make-fixed-size-resource-pool resources-list-or-vector
|
|
#:key
|
|
(delay-logger (const #f))
|
|
(duration-logger (const #f))
|
|
scheduler
|
|
(name "unnamed")
|
|
default-checkout-timeout
|
|
default-max-waiters)
|
|
(define channel (make-channel))
|
|
(define destroy-condition
|
|
(make-condition))
|
|
|
|
(define pool
|
|
(make-resource-pool-record
|
|
name
|
|
channel
|
|
destroy-condition
|
|
`((delay-logger . ,delay-logger)
|
|
(duration-logger . ,duration-logger)
|
|
(scheduler . ,scheduler)
|
|
(name . ,name)
|
|
(default-checkout-timeout . ,default-checkout-timeout)
|
|
(default-max-waiters . ,default-max-waiters))))
|
|
|
|
(define checkout-failure-count 0)
|
|
|
|
(define resources
|
|
(vector-map
|
|
(lambda (_ resource)
|
|
(make-resource-details
|
|
resource
|
|
0
|
|
#f))
|
|
(if (vector? resources-list-or-vector)
|
|
resources-list-or-vector
|
|
(list->vector resources-list-or-vector))))
|
|
|
|
(define (destroy-loop)
|
|
(define (empty?)
|
|
(vector-every (lambda (r)
|
|
(eq? r #f))
|
|
resources))
|
|
|
|
(let loop ()
|
|
(match (get-message channel)
|
|
(('checkout reply timeout-time max-waiters)
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(let ((op
|
|
(put-operation
|
|
reply
|
|
(cons 'resource-pool-destroyed
|
|
#f))))
|
|
(perform-operation
|
|
(if timeout-time
|
|
(choice-operation
|
|
op
|
|
(wrap-operation
|
|
(sleep-operation
|
|
(/ (- timeout-time
|
|
(get-internal-real-time))
|
|
internal-time-units-per-second))
|
|
(const #f)))
|
|
op)))))
|
|
(loop))
|
|
(((and (or 'return
|
|
'return-failed-checkout)
|
|
return-type)
|
|
resource-id)
|
|
(vector-set! resources
|
|
resource-id
|
|
#f)
|
|
|
|
(if (empty?)
|
|
(begin
|
|
(set-resource-pool-channel! pool #f)
|
|
(signal-condition! destroy-condition)
|
|
|
|
;; No loop
|
|
*unspecified*)
|
|
(loop)))
|
|
|
|
(('stats reply timeout-time)
|
|
(let ((stats
|
|
`((resources . ,(vector-length resources))
|
|
(available . 0)
|
|
(waiters . 0)
|
|
(checkout-failure-count . ,checkout-failure-count))))
|
|
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(let ((op
|
|
(put-operation reply stats)))
|
|
(perform-operation
|
|
(if timeout-time
|
|
(choice-operation
|
|
op
|
|
(sleep-operation
|
|
(/ (- timeout-time
|
|
(get-internal-real-time))
|
|
internal-time-units-per-second)))
|
|
op))))))
|
|
|
|
(loop))
|
|
|
|
(('destroy)
|
|
(loop))
|
|
(unknown
|
|
(simple-format
|
|
(current-error-port)
|
|
"unrecognised message to ~A resource pool channel: ~A\n"
|
|
name
|
|
unknown)
|
|
(loop)))))
|
|
|
|
(define (main-loop)
|
|
(let loop ((available (iota (vector-length resources)))
|
|
(waiters (make-q)))
|
|
|
|
(match (get-message channel)
|
|
(('checkout reply timeout-time max-waiters)
|
|
(if (null? available)
|
|
(let ((waiters-count
|
|
(q-length waiters)))
|
|
(if (and max-waiters
|
|
(>= waiters-count
|
|
max-waiters))
|
|
(begin
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(let ((op
|
|
(put-operation
|
|
reply
|
|
(cons 'too-many-waiters
|
|
waiters-count))))
|
|
(perform-operation
|
|
(if timeout-time
|
|
(choice-operation
|
|
op
|
|
(wrap-operation
|
|
(sleep-operation
|
|
(/ (- timeout-time
|
|
(get-internal-real-time))
|
|
internal-time-units-per-second))
|
|
(const #f)))
|
|
op)))))
|
|
(loop available
|
|
waiters))
|
|
(loop available
|
|
(enq! waiters (cons reply timeout-time)))))
|
|
|
|
(if timeout-time
|
|
(let ((current-internal-time
|
|
(get-internal-real-time)))
|
|
;; If this client is still waiting
|
|
(if (> timeout-time
|
|
current-internal-time)
|
|
(let ((reply-timeout
|
|
(/ (- timeout-time
|
|
current-internal-time)
|
|
internal-time-units-per-second))
|
|
(resource-id
|
|
new-available
|
|
(car+cdr available)))
|
|
|
|
;; Don't sleep in this fiber, so spawn a new
|
|
;; fiber to handle handing over the resource,
|
|
;; and returning it if there's a timeout
|
|
(spawn-fiber-for-checkout
|
|
channel
|
|
reply
|
|
reply-timeout
|
|
resource-id
|
|
(resource-details-value
|
|
(vector-ref resources
|
|
resource-id)))
|
|
(loop new-available
|
|
waiters))
|
|
(loop available
|
|
waiters)))
|
|
(let* ((resource-id
|
|
next-available
|
|
(car+cdr available))
|
|
(resource-details
|
|
(vector-ref resources
|
|
resource-id)))
|
|
(put-message reply
|
|
(list 'success
|
|
resource-id
|
|
(resource-details-value
|
|
resource-details)))
|
|
|
|
(loop next-available
|
|
waiters)))))
|
|
|
|
(((and (or 'return
|
|
'return-failed-checkout)
|
|
return-type)
|
|
resource-id)
|
|
|
|
(when (eq? 'return-failed-checkout
|
|
return-type)
|
|
(set! checkout-failure-count
|
|
(+ 1 checkout-failure-count)))
|
|
|
|
(let ((current-internal-time
|
|
(get-internal-real-time)))
|
|
(let waiter-loop ((waiter (safe-deq waiters)))
|
|
(match waiter
|
|
(#f
|
|
(loop (cons resource-id available)
|
|
waiters))
|
|
((reply . timeout)
|
|
(if (and timeout
|
|
(< timeout current-internal-time))
|
|
(waiter-loop (safe-deq waiters))
|
|
(if timeout
|
|
(let ((reply-timeout
|
|
(/ (- timeout
|
|
current-internal-time)
|
|
internal-time-units-per-second)))
|
|
;; Don't sleep in this fiber, so spawn a
|
|
;; new fiber to handle handing over the
|
|
;; resource, and returning it if there's
|
|
;; a timeout
|
|
(spawn-fiber-for-checkout
|
|
channel
|
|
reply
|
|
reply-timeout
|
|
resource-id
|
|
(resource-details-value
|
|
(vector-ref resources
|
|
resource-id))))
|
|
(put-message reply
|
|
(list 'success
|
|
resource-id
|
|
(resource-details-value
|
|
(vector-ref resources
|
|
resource-id))))))
|
|
(loop available
|
|
waiters))))))
|
|
|
|
(('list-resources reply)
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(put-message reply (vector->list resources))))
|
|
|
|
(loop available
|
|
waiters))
|
|
|
|
(('stats reply timeout-time)
|
|
(let ((stats
|
|
`((resources . ,(vector-length resources))
|
|
(available . ,(length available))
|
|
(waiters . ,(q-length waiters))
|
|
(checkout-failure-count . ,checkout-failure-count))))
|
|
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(let ((op
|
|
(put-operation reply stats)))
|
|
(perform-operation
|
|
(if timeout-time
|
|
(choice-operation
|
|
op
|
|
(sleep-operation
|
|
(/ (- timeout-time
|
|
(get-internal-real-time))
|
|
internal-time-units-per-second)))
|
|
op))))))
|
|
|
|
(loop available
|
|
waiters))
|
|
|
|
(('destroy)
|
|
(let ((current-internal-time (get-internal-real-time)))
|
|
;; Notify all waiters that the pool has been destroyed
|
|
(for-each
|
|
(match-lambda
|
|
((reply . timeout)
|
|
(when (or (not timeout)
|
|
(> timeout current-internal-time))
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(let ((op
|
|
(put-operation
|
|
reply
|
|
(cons 'resource-pool-destroyed
|
|
#f))))
|
|
(perform-operation
|
|
(if timeout
|
|
(choice-operation
|
|
op
|
|
(wrap-operation
|
|
(sleep-operation
|
|
(/ (- timeout
|
|
(get-internal-real-time))
|
|
internal-time-units-per-second))
|
|
(const #f)))
|
|
op))))))))
|
|
(car waiters))
|
|
|
|
(if (= (vector-length resources)
|
|
(length available))
|
|
(begin
|
|
(set-resource-pool-channel! pool #f)
|
|
(signal-condition! destroy-condition)
|
|
|
|
;; No loop
|
|
*unspecified*)
|
|
(destroy-loop))))
|
|
|
|
(unknown
|
|
(simple-format
|
|
(current-error-port)
|
|
"unrecognised message to ~A resource pool channel: ~A\n"
|
|
name
|
|
unknown)
|
|
(loop available
|
|
waiters)))))
|
|
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(with-exception-handler
|
|
(lambda (exn)
|
|
#f)
|
|
(lambda ()
|
|
(with-exception-handler
|
|
(lambda (exn)
|
|
(let* ((stack (make-stack #t))
|
|
(error-string
|
|
(call-with-output-string
|
|
(lambda (port)
|
|
(display-backtrace stack port 3)
|
|
(simple-format
|
|
port
|
|
"exception in the ~A pool fiber, " name)
|
|
(print-exception
|
|
port
|
|
(stack-ref stack 3)
|
|
'%exception
|
|
(list exn))))))
|
|
(display/knots error-string
|
|
(current-error-port)))
|
|
(raise-exception exn))
|
|
(lambda ()
|
|
(start-stack
|
|
#t
|
|
(main-loop)))))
|
|
#:unwind? #t))
|
|
(or scheduler
|
|
(current-scheduler)))
|
|
|
|
pool)
|
|
|
|
(define* (make-resource-pool return-new-resource max-size
|
|
#:key (min-size 0)
|
|
(idle-seconds #f)
|
|
(delay-logger (const #f))
|
|
(duration-logger (const #f))
|
|
destructor
|
|
lifetime
|
|
scheduler
|
|
(name "unnamed")
|
|
(add-resources-parallelism 1)
|
|
default-checkout-timeout
|
|
default-max-waiters)
|
|
(define channel (make-channel))
|
|
(define destroy-condition
|
|
(make-condition))
|
|
|
|
(define pool
|
|
(make-resource-pool-record
|
|
name
|
|
channel
|
|
destroy-condition
|
|
`((max-size . ,max-size)
|
|
(min-size . ,min-size)
|
|
(idle-seconds . ,idle-seconds)
|
|
(delay-logger . ,delay-logger)
|
|
(duration-logger . ,duration-logger)
|
|
(destructor . ,destructor)
|
|
(lifetime . ,lifetime)
|
|
(scheduler . ,scheduler)
|
|
(name . ,name)
|
|
(default-checkout-timeout . ,default-checkout-timeout)
|
|
(default-max-waiters . ,default-max-waiters))))
|
|
|
|
(define checkout-failure-count 0)
|
|
|
|
(define resources
|
|
(make-hash-table))
|
|
|
|
(define-inlinable (count-resources resources)
|
|
(hash-count (const #t) resources))
|
|
|
|
(define return-new-resource/parallelism-limiter
|
|
(make-parallelism-limiter
|
|
(or add-resources-parallelism
|
|
max-size)
|
|
#:name
|
|
(string-append
|
|
name
|
|
" resource pool new resource parallelism limiter")))
|
|
|
|
(define (spawn-fiber-to-return-new-resource)
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(with-exception-handler
|
|
(lambda (exn)
|
|
;; This can happen if the resource pool is destroyed very
|
|
;; quickly
|
|
(if (resource-pool-destroyed-error? exn)
|
|
#f
|
|
(raise-exception exn)))
|
|
(lambda ()
|
|
(let loop ()
|
|
(with-parallelism-limiter
|
|
return-new-resource/parallelism-limiter
|
|
(let ((max-size
|
|
(assq-ref (resource-pool-configuration pool)
|
|
'max-size))
|
|
(size (count-resources resources)))
|
|
(unless (>= size max-size)
|
|
(let ((success?
|
|
(with-exception-handler
|
|
(lambda _ #f)
|
|
(lambda ()
|
|
(with-exception-handler
|
|
(lambda (exn)
|
|
(simple-format
|
|
(current-error-port)
|
|
"exception adding resource to pool ~A: ~A\n\n"
|
|
name
|
|
return-new-resource)
|
|
(print-backtrace-and-exception/knots exn)
|
|
(raise-exception exn))
|
|
(lambda ()
|
|
(let ((new-resource
|
|
(start-stack #t (return-new-resource))))
|
|
(put-message channel
|
|
(list 'add-resource new-resource)))
|
|
#t)))
|
|
#:unwind? #t)))
|
|
(unless success?
|
|
;; TODO Maybe this should be configurable?
|
|
(sleep 1)
|
|
|
|
;; Important to retry here and eventually create
|
|
;; a new resource, as there might be waiters
|
|
;; stuck waiting for a resource, especially if
|
|
;; the pool is empty.
|
|
(loop))))))))
|
|
#:unwind? #t))))
|
|
|
|
(define (spawn-fiber-to-destroy-resource resource-id resource-value)
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(let loop ()
|
|
(let* ((success?
|
|
(with-exception-handler
|
|
(lambda _ #f)
|
|
(lambda ()
|
|
(with-exception-handler
|
|
(lambda (exn)
|
|
(simple-format
|
|
(current-error-port)
|
|
"exception running resource pool destructor (~A): ~A\n"
|
|
name
|
|
destructor)
|
|
(print-backtrace-and-exception/knots exn)
|
|
(raise-exception exn))
|
|
(lambda ()
|
|
(start-stack #t (destructor resource-value))
|
|
#t)))
|
|
#:unwind? #t)))
|
|
|
|
(if success?
|
|
(put-message channel
|
|
(list 'remove resource-id))
|
|
(begin
|
|
(sleep 5)
|
|
|
|
(loop))))))))
|
|
|
|
(define (destroy-loop resources next-resource-id)
|
|
(let loop ((next-resource-id next-resource-id))
|
|
(match (get-message channel)
|
|
(('add-resource resource)
|
|
(if destructor
|
|
(begin
|
|
(spawn-fiber-to-destroy-resource next-resource-id
|
|
resource)
|
|
(hash-set! resources next-resource-id resource)
|
|
|
|
(loop (1+ next-resource-id)))
|
|
(loop next-resource-id)))
|
|
|
|
(('checkout reply timeout-time max-waiters)
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(let ((op
|
|
(put-operation
|
|
reply
|
|
(cons 'resource-pool-destroyed
|
|
#f))))
|
|
(perform-operation
|
|
(if timeout-time
|
|
(choice-operation
|
|
op
|
|
(wrap-operation
|
|
(sleep-operation
|
|
(/ (- timeout-time
|
|
(get-internal-real-time))
|
|
internal-time-units-per-second))
|
|
(const #f)))
|
|
op)))))
|
|
(loop next-resource-id))
|
|
(((and (or 'return
|
|
'return-failed-checkout
|
|
'remove)
|
|
return-type)
|
|
resource-id)
|
|
(when (and (not (eq? return-type 'remove))
|
|
destructor)
|
|
(spawn-fiber-to-destroy-resource
|
|
resource-id
|
|
(resource-details-value
|
|
(hash-ref resources resource-id))))
|
|
|
|
(hash-remove! resources resource-id)
|
|
|
|
(if (= 0 (count-resources resources))
|
|
(begin
|
|
(set-resource-pool-channel! pool #f)
|
|
(signal-condition! destroy-condition)
|
|
|
|
;; No loop
|
|
*unspecified*)
|
|
(loop next-resource-id)))
|
|
(('stats reply timeout-time)
|
|
(let ((stats
|
|
`((resources . ,(count-resources resources))
|
|
(available . 0)
|
|
(waiters . 0)
|
|
(checkout-failure-count . ,checkout-failure-count))))
|
|
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(let ((op
|
|
(put-operation reply stats)))
|
|
(perform-operation
|
|
(if timeout-time
|
|
(choice-operation
|
|
op
|
|
(sleep-operation
|
|
(/ (- timeout-time
|
|
(get-internal-real-time))
|
|
internal-time-units-per-second)))
|
|
op))))))
|
|
|
|
(loop next-resource-id))
|
|
|
|
(('check-for-idle-resources)
|
|
(loop next-resource-id))
|
|
|
|
(('destroy)
|
|
(loop next-resource-id))
|
|
(unknown
|
|
(simple-format
|
|
(current-error-port)
|
|
"unrecognised message to ~A resource pool channel: ~A\n"
|
|
name
|
|
unknown)
|
|
(loop next-resource-id)))))
|
|
|
|
(define (main-loop)
|
|
(let loop ((next-resource-id 0)
|
|
(available '())
|
|
(waiters (make-q)))
|
|
|
|
(match (get-message channel)
|
|
(('add-resource resource)
|
|
(if (= (count-resources resources) max-size)
|
|
(if destructor
|
|
(begin
|
|
(hash-set! resources
|
|
next-resource-id
|
|
(make-resource-details
|
|
resource
|
|
0
|
|
(get-internal-real-time)))
|
|
(spawn-fiber-to-destroy-resource next-resource-id
|
|
resource)
|
|
|
|
(loop (1+ next-resource-id)
|
|
available
|
|
waiters))
|
|
(loop next-resource-id
|
|
available
|
|
waiters))
|
|
|
|
(let* ((current-internal-time
|
|
(get-internal-real-time))
|
|
(resource-details
|
|
(make-resource-details
|
|
resource
|
|
0
|
|
current-internal-time)))
|
|
(hash-set! resources
|
|
next-resource-id
|
|
resource-details)
|
|
(let waiter-loop ((waiter (safe-deq waiters)))
|
|
(match waiter
|
|
(#f
|
|
(loop (1+ next-resource-id)
|
|
(cons next-resource-id available)
|
|
waiters))
|
|
((reply . timeout)
|
|
(if (and timeout
|
|
(< timeout current-internal-time))
|
|
(waiter-loop (safe-deq waiters))
|
|
(if timeout
|
|
(let ((reply-timeout
|
|
(/ (- timeout
|
|
current-internal-time)
|
|
internal-time-units-per-second)))
|
|
;; Don't sleep in this fiber, so spawn a
|
|
;; new fiber to handle handing over the
|
|
;; resource, and returning it if there's
|
|
;; a timeout
|
|
(spawn-fiber-for-checkout channel
|
|
reply
|
|
reply-timeout
|
|
next-resource-id
|
|
resource))
|
|
(put-message reply (list 'success
|
|
next-resource-id
|
|
resource))))
|
|
(set-resource-details-checkout-count! resource-details
|
|
1)
|
|
(loop (1+ next-resource-id)
|
|
available
|
|
waiters)))))))
|
|
|
|
(('checkout reply timeout-time max-waiters)
|
|
(if (null? available)
|
|
(begin
|
|
(unless (= (count-resources resources) max-size)
|
|
(spawn-fiber-to-return-new-resource))
|
|
|
|
(let ((waiters-count
|
|
(q-length waiters)))
|
|
(if (and max-waiters
|
|
(>= waiters-count
|
|
max-waiters))
|
|
(begin
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(let ((op
|
|
(put-operation
|
|
reply
|
|
(cons 'too-many-waiters
|
|
waiters-count))))
|
|
(perform-operation
|
|
(if timeout-time
|
|
(choice-operation
|
|
op
|
|
(wrap-operation
|
|
(sleep-operation
|
|
(/ (- timeout-time
|
|
(get-internal-real-time))
|
|
internal-time-units-per-second))
|
|
(const #f)))
|
|
op)))))
|
|
(loop next-resource-id
|
|
available
|
|
waiters))
|
|
(loop next-resource-id
|
|
available
|
|
(enq! waiters (cons reply timeout-time))))))
|
|
|
|
(if timeout-time
|
|
(let ((current-internal-time
|
|
(get-internal-real-time)))
|
|
;; If this client is still waiting
|
|
(if (> timeout-time
|
|
current-internal-time)
|
|
(let* ((reply-timeout
|
|
(/ (- timeout-time
|
|
current-internal-time)
|
|
internal-time-units-per-second))
|
|
(resource-id
|
|
(car available))
|
|
(resource-details
|
|
(hash-ref resources resource-id)))
|
|
|
|
(increment-resource-checkout-count!
|
|
resource-details)
|
|
|
|
;; Don't sleep in this fiber, so spawn a new
|
|
;; fiber to handle handing over the resource,
|
|
;; and returning it if there's a timeout
|
|
(spawn-fiber-for-checkout channel
|
|
reply
|
|
reply-timeout
|
|
resource-id
|
|
(resource-details-value
|
|
resource-details))
|
|
(loop next-resource-id
|
|
(cdr available)
|
|
waiters))
|
|
(loop next-resource-id
|
|
available
|
|
waiters)))
|
|
(let* ((resource-id
|
|
next-available
|
|
(car+cdr available))
|
|
(resource-details
|
|
(hash-ref resources
|
|
resource-id)))
|
|
(increment-resource-checkout-count! resource-details)
|
|
|
|
(put-message reply
|
|
(list 'success
|
|
resource-id
|
|
(resource-details-value
|
|
resource-details)))
|
|
|
|
(loop next-resource-id
|
|
next-available
|
|
waiters)))))
|
|
|
|
(((and (or 'return
|
|
'return-failed-checkout)
|
|
return-type)
|
|
resource-id)
|
|
|
|
(when (eq? 'return-failed-checkout
|
|
return-type)
|
|
(set! checkout-failure-count
|
|
(+ 1 checkout-failure-count)))
|
|
|
|
(let ((current-internal-time
|
|
(get-internal-real-time))
|
|
(resource-details
|
|
(hash-ref resources resource-id)))
|
|
(if (and lifetime
|
|
(>= (resource-details-checkout-count resource-details)
|
|
lifetime))
|
|
(begin
|
|
(spawn-fiber-to-destroy-resource resource-id
|
|
(resource-details-value
|
|
resource-details))
|
|
(loop next-resource-id
|
|
available
|
|
waiters))
|
|
(let waiter-loop ((waiter (safe-deq waiters)))
|
|
(match waiter
|
|
(#f
|
|
(if (eq? 'return-failed-checkout
|
|
return-type)
|
|
(decrement-resource-checkout-count! resource-details)
|
|
(set-resource-details-last-used!
|
|
resource-details
|
|
current-internal-time))
|
|
|
|
(loop next-resource-id
|
|
(cons resource-id available)
|
|
waiters))
|
|
((reply . timeout)
|
|
(if (and timeout
|
|
(< timeout current-internal-time))
|
|
(waiter-loop (safe-deq waiters))
|
|
(if timeout
|
|
(let ((reply-timeout
|
|
(/ (- timeout
|
|
current-internal-time)
|
|
internal-time-units-per-second)))
|
|
;; Don't sleep in this fiber, so spawn a
|
|
;; new fiber to handle handing over the
|
|
;; resource, and returning it if there's
|
|
;; a timeout
|
|
(spawn-fiber-for-checkout
|
|
channel
|
|
reply
|
|
reply-timeout
|
|
resource-id
|
|
(resource-details-value resource-details)))
|
|
(put-message reply
|
|
(list 'success
|
|
resource-id
|
|
(resource-details-value
|
|
resource-details)))))
|
|
|
|
(set-resource-details-last-used! resource-details
|
|
current-internal-time)
|
|
(when (eq? 'return-failed-checkout
|
|
return-type)
|
|
(decrement-resource-checkout-count! resource-details))
|
|
|
|
(loop next-resource-id
|
|
available
|
|
waiters)))))))
|
|
|
|
(('remove resource-id)
|
|
(hash-remove! resources
|
|
resource-id)
|
|
|
|
(when (and (not (q-empty? waiters))
|
|
(< (- (count-resources resources) 1)
|
|
max-size))
|
|
(spawn-fiber-to-return-new-resource))
|
|
|
|
(loop next-resource-id
|
|
available ; resource shouldn't be in this list
|
|
waiters))
|
|
|
|
(('destroy resource-id)
|
|
(let ((resource-details
|
|
(hash-ref resources
|
|
resource-id)))
|
|
(spawn-fiber-to-destroy-resource resource-id
|
|
(resource-details-value
|
|
resource-details))
|
|
|
|
(loop next-resource-id
|
|
available
|
|
waiters)))
|
|
|
|
(('list-resources reply)
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(put-message reply (hash-map->list
|
|
(lambda (_ value) value)
|
|
resources))))
|
|
|
|
(loop next-resource-id
|
|
available
|
|
waiters))
|
|
|
|
(('stats reply timeout-time)
|
|
(let ((stats
|
|
`((resources . ,(count-resources resources))
|
|
(available . ,(length available))
|
|
(waiters . ,(q-length waiters))
|
|
(resources-checkout-count
|
|
. ,(hash-fold
|
|
(lambda (_ resource-details result)
|
|
(cons (resource-details-checkout-count
|
|
resource-details)
|
|
result))
|
|
'()
|
|
resources))
|
|
(checkout-failure-count . ,checkout-failure-count))))
|
|
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(let ((op
|
|
(put-operation reply stats)))
|
|
(perform-operation
|
|
(if timeout-time
|
|
(choice-operation
|
|
op
|
|
(sleep-operation
|
|
(/ (- timeout-time
|
|
(get-internal-real-time))
|
|
internal-time-units-per-second)))
|
|
op))))))
|
|
|
|
(loop next-resource-id
|
|
available
|
|
waiters))
|
|
|
|
(('check-for-idle-resources)
|
|
(let* ((internal-real-time
|
|
(get-internal-real-time))
|
|
(candidate-resource-ids-to-destroy
|
|
(filter-map
|
|
(lambda (resource-id)
|
|
(let ((resource-details
|
|
(hash-ref resources resource-id)))
|
|
(if (> (/ (- internal-real-time
|
|
(resource-details-last-used
|
|
resource-details))
|
|
internal-time-units-per-second)
|
|
idle-seconds)
|
|
resource-id
|
|
#f)))
|
|
available))
|
|
(max-resources-to-destroy
|
|
(max 0
|
|
(- (count-resources resources)
|
|
min-size)))
|
|
(resources-to-destroy
|
|
(take candidate-resource-ids-to-destroy
|
|
(min max-resources-to-destroy
|
|
(length candidate-resource-ids-to-destroy)))))
|
|
(when destructor
|
|
(for-each
|
|
(lambda (resource-id)
|
|
(spawn-fiber-to-destroy-resource
|
|
resource-id
|
|
(resource-details-value
|
|
(hash-ref resources resource-id))))
|
|
resources-to-destroy))
|
|
|
|
(loop next-resource-id
|
|
(lset-difference = available resources-to-destroy)
|
|
waiters)))
|
|
|
|
(('destroy)
|
|
(let ((current-internal-time (get-internal-real-time)))
|
|
(for-each
|
|
(match-lambda
|
|
((reply . timeout)
|
|
(when (or (not timeout)
|
|
(> timeout current-internal-time))
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(let ((op
|
|
(put-operation
|
|
reply
|
|
(cons 'resource-pool-destroyed
|
|
#f))))
|
|
(perform-operation
|
|
(if timeout
|
|
(choice-operation
|
|
op
|
|
(wrap-operation
|
|
(sleep-operation
|
|
(/ (- timeout
|
|
(get-internal-real-time))
|
|
internal-time-units-per-second))
|
|
(const #f)))
|
|
op))))))))
|
|
(car waiters))
|
|
|
|
(when destructor
|
|
(for-each
|
|
(lambda (resource-id)
|
|
(spawn-fiber-to-destroy-resource
|
|
resource-id
|
|
(resource-details-value
|
|
(hash-ref resources
|
|
resource-id))))
|
|
available))
|
|
|
|
;; Do this in parallel to avoid deadlocks between the
|
|
;; limiter and returning new resources to this pool
|
|
(and=> return-new-resource/parallelism-limiter
|
|
(lambda (limiter)
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(destroy-parallelism-limiter limiter)))))
|
|
|
|
(if (or (= 0 (count-resources resources))
|
|
(not destructor))
|
|
(begin
|
|
(set-resource-pool-channel! pool #f)
|
|
(signal-condition! destroy-condition)
|
|
|
|
;; No loop
|
|
*unspecified*)
|
|
(destroy-loop resources next-resource-id))))
|
|
|
|
(unknown
|
|
(simple-format
|
|
(current-error-port)
|
|
"unrecognised message to ~A resource pool channel: ~A\n"
|
|
name
|
|
unknown)
|
|
(loop next-resource-id
|
|
available
|
|
waiters)))))
|
|
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(when idle-seconds
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(let loop ()
|
|
(put-message channel '(check-for-idle-resources))
|
|
(when (perform-operation
|
|
(choice-operation
|
|
(wrap-operation
|
|
(sleep-operation idle-seconds)
|
|
(const #t))
|
|
(wrap-operation
|
|
(wait-operation destroy-condition)
|
|
(const #f))))
|
|
(loop))))))
|
|
|
|
(with-exception-handler
|
|
(lambda (exn)
|
|
#f)
|
|
(lambda ()
|
|
(with-exception-handler
|
|
(lambda (exn)
|
|
(let* ((stack (make-stack #t))
|
|
(error-string
|
|
(call-with-output-string
|
|
(lambda (port)
|
|
(display-backtrace stack port 3)
|
|
(simple-format
|
|
port
|
|
"exception in the ~A pool fiber, " name)
|
|
(print-exception
|
|
port
|
|
(stack-ref stack 3)
|
|
'%exception
|
|
(list exn))))))
|
|
(display/knots error-string
|
|
(current-error-port)))
|
|
(raise-exception exn))
|
|
(lambda ()
|
|
(start-stack
|
|
#t
|
|
(main-loop)))))
|
|
#:unwind? #t))
|
|
(or scheduler
|
|
(current-scheduler)))
|
|
|
|
pool)
|
|
|
|
(define (destroy-resource-pool pool)
|
|
(perform-operation
|
|
(choice-operation
|
|
(wrap-operation
|
|
(put-operation (resource-pool-channel pool)
|
|
(list 'destroy))
|
|
(lambda _
|
|
(wait
|
|
(resource-pool-destroy-condition pool))))
|
|
(wait-operation
|
|
(resource-pool-destroy-condition pool))))
|
|
#t)
|
|
|
|
(define &resource-pool-timeout
|
|
(make-exception-type '&recource-pool-timeout
|
|
&error
|
|
'(pool)))
|
|
|
|
(define resource-pool-timeout-error-pool
|
|
(exception-accessor
|
|
&resource-pool-timeout
|
|
(record-accessor &resource-pool-timeout 'pool)))
|
|
|
|
(define make-resource-pool-timeout-error
|
|
(record-constructor &resource-pool-timeout))
|
|
|
|
(define resource-pool-timeout-error?
|
|
(exception-predicate &resource-pool-timeout))
|
|
|
|
(define &resource-pool-too-many-waiters
|
|
(make-exception-type '&recource-pool-too-many-waiters
|
|
&error
|
|
'(pool waiters-count)))
|
|
|
|
(define resource-pool-too-many-waiters-error-pool
|
|
(exception-accessor
|
|
&resource-pool-too-many-waiters
|
|
(record-accessor &resource-pool-too-many-waiters 'pool)))
|
|
|
|
(define resource-pool-too-many-waiters-error-waiters-count
|
|
(exception-accessor
|
|
&resource-pool-too-many-waiters
|
|
(record-accessor &resource-pool-too-many-waiters 'waiters-count)))
|
|
|
|
(define make-resource-pool-too-many-waiters-error
|
|
(record-constructor &resource-pool-too-many-waiters))
|
|
|
|
(define resource-pool-too-many-waiters-error?
|
|
(exception-predicate &resource-pool-too-many-waiters))
|
|
|
|
(define &resource-pool-destroyed
|
|
(make-exception-type '&recource-pool-destroyed
|
|
&error
|
|
'(pool)))
|
|
|
|
(define resource-pool-destroyed-error-pool
|
|
(exception-accessor
|
|
&resource-pool-destroyed
|
|
(record-accessor &resource-pool-destroyed 'pool)))
|
|
|
|
(define make-resource-pool-destroyed-error
|
|
(record-constructor &resource-pool-destroyed))
|
|
|
|
(define resource-pool-destroyed-error?
|
|
(exception-predicate &resource-pool-destroyed))
|
|
|
|
(define &resource-pool-destroy-resource
|
|
(make-exception-type '&recource-pool-destroy-resource
|
|
&exception
|
|
'()))
|
|
|
|
(define make-resource-pool-destroy-resource-exception
|
|
(record-constructor &resource-pool-destroy-resource))
|
|
|
|
(define resource-pool-destroy-resource-exception?
|
|
(exception-predicate &resource-pool-destroy-resource))
|
|
|
|
(define resource-pool-default-timeout-handler
|
|
(make-parameter #f))
|
|
|
|
(define* (call-with-resource-from-pool
|
|
pool proc #:key (timeout 'default)
|
|
(timeout-handler (resource-pool-default-timeout-handler))
|
|
(max-waiters 'default)
|
|
(channel (resource-pool-channel pool))
|
|
(destroy-resource-on-exception? #f))
|
|
"Call PROC with a resource from POOL, blocking until a resource becomes
|
|
available. Return the resource once PROC has returned."
|
|
|
|
(define timeout-or-default
|
|
(if (eq? timeout 'default)
|
|
(assq-ref (resource-pool-configuration pool)
|
|
'default-checkout-timeout)
|
|
timeout))
|
|
|
|
(define max-waiters-or-default
|
|
(if (eq? max-waiters 'default)
|
|
(assq-ref (resource-pool-configuration pool)
|
|
'default-max-waiters)
|
|
max-waiters))
|
|
|
|
(unless channel
|
|
(raise-exception
|
|
(make-resource-pool-destroyed-error pool)))
|
|
|
|
(let ((reply
|
|
(if timeout-or-default
|
|
(let loop ((reply (make-channel))
|
|
(start-time (get-internal-real-time)))
|
|
(let ((request-success?
|
|
(perform-operation
|
|
(choice-operation
|
|
(wrap-operation
|
|
(put-operation channel
|
|
(list 'checkout
|
|
reply
|
|
(+ start-time
|
|
(* timeout-or-default
|
|
internal-time-units-per-second))
|
|
max-waiters-or-default))
|
|
(const #t))
|
|
(wrap-operation (sleep-operation timeout-or-default)
|
|
(const #f))))))
|
|
(if request-success?
|
|
(let ((time-remaining
|
|
(- timeout-or-default
|
|
(/ (- (get-internal-real-time)
|
|
start-time)
|
|
internal-time-units-per-second))))
|
|
(if (> time-remaining 0)
|
|
(let ((response
|
|
(perform-operation
|
|
(choice-operation
|
|
(get-operation reply)
|
|
(wrap-operation (sleep-operation time-remaining)
|
|
(const #f))))))
|
|
(if (or (not response)
|
|
(eq? response 'resource-pool-retry-checkout))
|
|
(if (> (- timeout-or-default
|
|
(/ (- (get-internal-real-time)
|
|
start-time)
|
|
internal-time-units-per-second))
|
|
0)
|
|
(loop (make-channel)
|
|
start-time)
|
|
'timeout)
|
|
response))
|
|
'timeout))
|
|
'timeout)))
|
|
(let ((reply (make-channel)))
|
|
(put-message channel
|
|
(list 'checkout
|
|
reply
|
|
#f
|
|
max-waiters-or-default))
|
|
(get-message reply)))))
|
|
|
|
(match reply
|
|
('timeout
|
|
(when timeout-handler
|
|
(timeout-handler pool proc timeout))
|
|
|
|
(raise-exception
|
|
(make-resource-pool-timeout-error pool)))
|
|
(('too-many-waiters . count)
|
|
|
|
(raise-exception
|
|
(make-resource-pool-too-many-waiters-error pool
|
|
count)))
|
|
(('resource-pool-destroyed . #f)
|
|
(raise-exception
|
|
(make-resource-pool-destroyed-error pool)))
|
|
(('success resource-id resource-value)
|
|
(call-with-values
|
|
(lambda ()
|
|
(with-exception-handler
|
|
(lambda (exn)
|
|
;; Unwind the stack before calling put-message, as
|
|
;; this avoids inconsistent behaviour with
|
|
;; continuation barriers
|
|
(put-message
|
|
channel
|
|
(list (if (or destroy-resource-on-exception?
|
|
(resource-pool-destroy-resource-exception? exn))
|
|
'destroy
|
|
'return)
|
|
resource-id))
|
|
(raise-exception exn))
|
|
(lambda ()
|
|
(with-exception-handler
|
|
(lambda (exn)
|
|
(let ((stack
|
|
(match (fluid-ref %stacks)
|
|
((stack-tag . prompt-tag)
|
|
(make-stack #t
|
|
0 prompt-tag
|
|
0 (and prompt-tag 1)))
|
|
(_
|
|
(make-stack #t)))))
|
|
(raise-exception
|
|
(make-exception
|
|
exn
|
|
(make-knots-exception stack)))))
|
|
(lambda ()
|
|
(proc resource-value))))
|
|
#:unwind? #t))
|
|
(lambda vals
|
|
(put-message channel
|
|
`(return ,resource-id))
|
|
(apply values vals)))))))
|
|
|
|
(define-syntax-rule (with-resource-from-pool pool resource exp ...)
|
|
(call-with-resource-from-pool
|
|
pool
|
|
(lambda (resource) exp ...)))
|
|
|
|
(define* (resource-pool-stats pool #:key (timeout 5))
|
|
(define channel
|
|
(resource-pool-channel pool))
|
|
|
|
(unless channel
|
|
(raise-exception
|
|
(make-resource-pool-destroyed-error pool)))
|
|
|
|
(if timeout
|
|
(let* ((reply (make-channel))
|
|
(start-time (get-internal-real-time))
|
|
(timeout-time
|
|
(+ start-time
|
|
(* internal-time-units-per-second timeout))))
|
|
(perform-operation
|
|
(choice-operation
|
|
(wrap-operation
|
|
(put-operation channel
|
|
`(stats ,reply ,timeout-time))
|
|
(const #t))
|
|
(wrap-operation (sleep-operation timeout)
|
|
(lambda _
|
|
(raise-exception
|
|
(make-resource-pool-timeout-error pool))))))
|
|
|
|
(let ((time-remaining
|
|
(- timeout
|
|
(/ (- (get-internal-real-time)
|
|
start-time)
|
|
internal-time-units-per-second))))
|
|
(if (> time-remaining 0)
|
|
(perform-operation
|
|
(choice-operation
|
|
(get-operation reply)
|
|
(wrap-operation (sleep-operation time-remaining)
|
|
(lambda _
|
|
(raise-exception
|
|
(make-resource-pool-timeout-error pool))))))
|
|
(raise-exception
|
|
(make-resource-pool-timeout-error pool)))))
|
|
(let ((reply (make-channel)))
|
|
(put-message channel
|
|
`(stats ,reply #f))
|
|
(get-message reply))))
|
|
|
|
(define (resource-pool-list-resources pool)
|
|
(define channel
|
|
(resource-pool-channel pool))
|
|
|
|
(unless channel
|
|
(raise-exception
|
|
(make-resource-pool-destroyed-error pool)))
|
|
|
|
(let ((reply (make-channel)))
|
|
(put-message (resource-pool-channel pool)
|
|
(list 'list-resources reply))
|
|
(get-message reply)))
|