guile-knots/knots/resource-pool.scm
Christopher Baines 5260c38b5e
All checks were successful
/ test (push) Successful in 5s
Address issue with failures when creating resource pool resources
Previously failures could lead to no resources in the pool, and
waiters which will never get a resource. Retrying here fixes that
issue, although maybe another approach is needed that keeps track of
new resources being created, as that'll allow keeping track of this
when destroying resource pools.
2026-01-12 16:26:24 +00:00

1446 lines
53 KiB
Scheme

;;; Guile Knots
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (knots resource-pool)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-43)
#:use-module (srfi srfi-71)
#:use-module (ice-9 q)
#:use-module (ice-9 match)
#:use-module (ice-9 exceptions)
#:use-module (fibers)
#:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers scheduler)
#:use-module (fibers operations)
#:use-module (fibers conditions)
#:use-module (knots)
#:use-module (knots parallelism)
#:export (make-fixed-size-resource-pool
make-resource-pool
resource-pool?
resource-pool-name
resource-pool-channel
resource-pool-configuration
destroy-resource-pool
&resource-pool-timeout
resource-pool-timeout-error-pool
resource-pool-timeout-error?
&resource-pool-too-many-waiters
resource-pool-too-many-waiters-error-pool
resource-pool-too-many-waiters-error-waiters-count
resource-pool-too-many-waiters-error?
&resource-pool-destroyed
resource-pool-destroyed-error-pool
resource-pool-destroyed-error?
&resource-pool-destroy-resource
make-resource-pool-destroy-resource-exception
resource-pool-destroy-resource-exception?
resource-pool-default-timeout-handler
call-with-resource-from-pool
with-resource-from-pool
resource-pool-stats))
(define &resource-pool-abort-add-resource
(make-exception-type '&recource-pool-abort-add-resource
&error
'()))
(define make-resource-pool-abort-add-resource-error
(record-constructor &resource-pool-abort-add-resource))
(define resource-pool-abort-add-resource-error?
(exception-predicate &resource-pool-abort-add-resource))
(define-record-type <resource-pool>
(make-resource-pool-record name channel destroy-condition configuration)
resource-pool?
(name resource-pool-name)
(channel resource-pool-channel
set-resource-pool-channel!)
(destroy-condition resource-pool-destroy-condition)
(configuration resource-pool-configuration))
(set-record-type-printer!
<resource-pool>
(lambda (resource-pool port)
(display/knots
(simple-format #f "#<resource-pool name: \"~A\">"
(resource-pool-name resource-pool))
port)))
(define (safe-deq q)
(if (null? (car q))
#f
(let ((it (caar q))
(next (cdar q)))
(if (null? next)
(set-cdr! q #f))
(set-car! q next)
it)))
(define-record-type <resource-details>
(make-resource-details value checkout-count last-used)
resource-details?
(value resource-details-value)
(checkout-count resource-details-checkout-count
set-resource-details-checkout-count!)
(last-used resource-details-last-used
set-resource-details-last-used!))
(define-inlinable (increment-resource-checkout-count! resource)
(set-resource-details-checkout-count!
resource
(1+ (resource-details-checkout-count resource))))
(define-inlinable (decrement-resource-checkout-count! resource)
(set-resource-details-checkout-count!
resource
(1+ (resource-details-checkout-count resource))))
(define (spawn-fiber-for-checkout channel
reply-channel
reply-timeout
resource-id
resource)
(spawn-fiber
(lambda ()
(let ((checkout-success?
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply-channel
(list 'success resource-id resource))
(const #t))
(wrap-operation (sleep-operation
reply-timeout)
(const #f))))))
(unless checkout-success?
(put-message
channel
(list 'return-failed-checkout resource-id)))))))
(define* (make-fixed-size-resource-pool resources-list-or-vector
#:key
(delay-logger (const #f))
(duration-logger (const #f))
scheduler
(name "unnamed")
default-checkout-timeout
default-max-waiters)
(define channel (make-channel))
(define destroy-condition
(make-condition))
(define pool
(make-resource-pool-record
name
channel
destroy-condition
`((delay-logger . ,delay-logger)
(duration-logger . ,duration-logger)
(scheduler . ,scheduler)
(name . ,name)
(default-checkout-timeout . ,default-checkout-timeout)
(default-max-waiters . ,default-max-waiters))))
(define checkout-failure-count 0)
(define resources
(vector-map
(lambda (_ resource)
(make-resource-details
resource
0
#f))
(if (vector? resources-list-or-vector)
resources-list-or-vector
(list->vector resources-list-or-vector))))
(define (destroy-loop)
(define (empty?)
(vector-every (lambda (r)
(eq? r #f))
resources))
(let loop ()
(match (get-message channel)
(('checkout reply timeout-time max-waiters)
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'resource-pool-destroyed
#f))))
(perform-operation
(if timeout-time
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op)))))
(loop))
(((and (or 'return
'return-failed-checkout)
return-type)
resource-id)
(vector-set! resources
resource-id
#f)
(if (empty?)
(begin
(set-resource-pool-channel! pool #f)
(signal-condition! destroy-condition)
;; No loop
*unspecified*)
(loop)))
(('stats reply timeout-time)
(let ((stats
`((resources . ,(vector-length resources))
(available . 0)
(waiters . 0)
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
(lambda ()
(let ((op
(put-operation reply stats)))
(perform-operation
(if timeout-time
(choice-operation
op
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second)))
op))))))
(loop))
(('destroy)
(loop))
(unknown
(simple-format
(current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n"
name
unknown)
(loop)))))
(define (main-loop)
(let loop ((available (iota (vector-length resources)))
(waiters (make-q)))
(match (get-message channel)
(('checkout reply timeout-time max-waiters)
(if (null? available)
(let ((waiters-count
(q-length waiters)))
(if (and max-waiters
(>= waiters-count
max-waiters))
(begin
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'too-many-waiters
waiters-count))))
(perform-operation
(if timeout-time
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op)))))
(loop available
waiters))
(loop available
(enq! waiters (cons reply timeout-time)))))
(if timeout-time
(let ((current-internal-time
(get-internal-real-time)))
;; If this client is still waiting
(if (> timeout-time
current-internal-time)
(let ((reply-timeout
(/ (- timeout-time
current-internal-time)
internal-time-units-per-second))
(resource-id
new-available
(car+cdr available)))
;; Don't sleep in this fiber, so spawn a new
;; fiber to handle handing over the resource,
;; and returning it if there's a timeout
(spawn-fiber-for-checkout
channel
reply
reply-timeout
resource-id
(resource-details-value
(vector-ref resources
resource-id)))
(loop new-available
waiters))
(loop available
waiters)))
(let* ((resource-id
next-available
(car+cdr available))
(resource-details
(vector-ref resources
resource-id)))
(put-message reply
(list 'success
resource-id
(resource-details-value
resource-details)))
(loop next-available
waiters)))))
(((and (or 'return
'return-failed-checkout)
return-type)
resource-id)
(when (eq? 'return-failed-checkout
return-type)
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(let ((current-internal-time
(get-internal-real-time)))
(let waiter-loop ((waiter (safe-deq waiters)))
(match waiter
(#f
(loop (cons resource-id available)
waiters))
((reply . timeout)
(if (and timeout
(< timeout current-internal-time))
(waiter-loop (safe-deq waiters))
(if timeout
(let ((reply-timeout
(/ (- timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's
;; a timeout
(spawn-fiber-for-checkout
channel
reply
reply-timeout
resource-id
(resource-details-value
(vector-ref resources
resource-id))))
(put-message reply
(list 'success
resource-id
(resource-details-value
(vector-ref resources
resource-id))))))
(loop available
waiters))))))
(('list-resources reply)
(spawn-fiber
(lambda ()
(put-message reply (vector->list resources))))
(loop available
waiters))
(('stats reply timeout-time)
(let ((stats
`((resources . ,(vector-length resources))
(available . ,(length available))
(waiters . ,(q-length waiters))
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
(lambda ()
(let ((op
(put-operation reply stats)))
(perform-operation
(if timeout-time
(choice-operation
op
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second)))
op))))))
(loop available
waiters))
(('destroy)
(let ((current-internal-time (get-internal-real-time)))
;; Notify all waiters that the pool has been destroyed
(for-each
(match-lambda
((reply . timeout)
(when (or (not timeout)
(> timeout current-internal-time))
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'resource-pool-destroyed
#f))))
(perform-operation
(if timeout
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op))))))))
(car waiters))
(if (= (vector-length resources)
(length available))
(begin
(set-resource-pool-channel! pool #f)
(signal-condition! destroy-condition)
;; No loop
*unspecified*)
(destroy-loop))))
(unknown
(simple-format
(current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n"
name
unknown)
(loop available
waiters)))))
(spawn-fiber
(lambda ()
(with-exception-handler
(lambda (exn)
#f)
(lambda ()
(with-exception-handler
(lambda (exn)
(let* ((stack (make-stack #t))
(error-string
(call-with-output-string
(lambda (port)
(display-backtrace stack port 3)
(simple-format
port
"exception in the ~A pool fiber, " name)
(print-exception
port
(stack-ref stack 3)
'%exception
(list exn))))))
(display/knots error-string
(current-error-port)))
(raise-exception exn))
(lambda ()
(start-stack
#t
(main-loop)))))
#:unwind? #t))
(or scheduler
(current-scheduler)))
pool)
(define* (make-resource-pool return-new-resource max-size
#:key (min-size 0)
(idle-seconds #f)
(delay-logger (const #f))
(duration-logger (const #f))
destructor
lifetime
scheduler
(name "unnamed")
(add-resources-parallelism 1)
default-checkout-timeout
default-max-waiters)
(define channel (make-channel))
(define destroy-condition
(make-condition))
(define pool
(make-resource-pool-record
name
channel
destroy-condition
`((max-size . ,max-size)
(min-size . ,min-size)
(idle-seconds . ,idle-seconds)
(delay-logger . ,delay-logger)
(duration-logger . ,duration-logger)
(destructor . ,destructor)
(lifetime . ,lifetime)
(scheduler . ,scheduler)
(name . ,name)
(default-checkout-timeout . ,default-checkout-timeout)
(default-max-waiters . ,default-max-waiters))))
(define checkout-failure-count 0)
(define resources
(make-hash-table))
(define-inlinable (count-resources resources)
(hash-count (const #t) resources))
(define return-new-resource/parallelism-limiter
(make-parallelism-limiter
(or add-resources-parallelism
max-size)
#:name
(string-append
name
" resource pool new resource parallelism limiter")))
(define (spawn-fiber-to-return-new-resource)
(spawn-fiber
(lambda ()
(with-exception-handler
(lambda (exn)
;; This can happen if the resource pool is destroyed very
;; quickly
(if (resource-pool-destroyed-error? exn)
#f
(raise-exception exn)))
(lambda ()
(let loop ()
(let ((success?
(with-parallelism-limiter
return-new-resource/parallelism-limiter
(let ((max-size
(assq-ref (resource-pool-configuration pool)
'max-size))
(size (count-resources resources)))
(or (>= size max-size)
(with-exception-handler
(lambda _ #f)
(lambda ()
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception adding resource to pool ~A: ~A\n\n"
name
return-new-resource)
(print-backtrace-and-exception/knots exn)
(raise-exception exn))
(lambda ()
(let ((new-resource
(start-stack #t (return-new-resource))))
(put-message channel
(list 'add-resource new-resource)))
#t)))
#:unwind? #t))))))
(unless success?
;; TODO Maybe this should be configurable?
(sleep 1)
;; Important to retry here and eventually create
;; a new resource, as there might be waiters
;; stuck waiting for a resource, especially if
;; the pool is empty.
(loop)))))
#:unwind? #t))))
(define (spawn-fiber-to-destroy-resource resource-id resource-value)
(spawn-fiber
(lambda ()
(let loop ()
(let* ((success?
(with-exception-handler
(lambda _ #f)
(lambda ()
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception running resource pool destructor (~A): ~A\n"
name
destructor)
(print-backtrace-and-exception/knots exn)
(raise-exception exn))
(lambda ()
(start-stack #t (destructor resource-value))
#t)))
#:unwind? #t)))
(if success?
(put-message channel
(list 'remove resource-id))
(begin
(sleep 5)
(loop))))))))
(define (destroy-loop resources next-resource-id)
(let loop ((next-resource-id next-resource-id))
(match (get-message channel)
(('add-resource resource)
(if destructor
(begin
(spawn-fiber-to-destroy-resource next-resource-id
resource)
(hash-set! resources next-resource-id resource)
(loop (1+ next-resource-id)))
(loop next-resource-id)))
(('checkout reply timeout-time max-waiters)
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'resource-pool-destroyed
#f))))
(perform-operation
(if timeout-time
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op)))))
(loop next-resource-id))
(((and (or 'return
'return-failed-checkout
'remove)
return-type)
resource-id)
(when (and (not (eq? return-type 'remove))
destructor)
(spawn-fiber-to-destroy-resource
resource-id
(resource-details-value
(hash-ref resources resource-id))))
(hash-remove! resources resource-id)
(if (= 0 (count-resources resources))
(begin
(set-resource-pool-channel! pool #f)
(signal-condition! destroy-condition)
;; No loop
*unspecified*)
(loop next-resource-id)))
(('stats reply timeout-time)
(let ((stats
`((resources . ,(count-resources resources))
(available . 0)
(waiters . 0)
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
(lambda ()
(let ((op
(put-operation reply stats)))
(perform-operation
(if timeout-time
(choice-operation
op
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second)))
op))))))
(loop next-resource-id))
(('check-for-idle-resources)
(loop next-resource-id))
(('destroy)
(loop next-resource-id))
(unknown
(simple-format
(current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n"
name
unknown)
(loop next-resource-id)))))
(define (main-loop)
(let loop ((next-resource-id 0)
(available '())
(waiters (make-q)))
(match (get-message channel)
(('add-resource resource)
(if (= (count-resources resources) max-size)
(if destructor
(begin
(hash-set! resources
next-resource-id
(make-resource-details
resource
0
(get-internal-real-time)))
(spawn-fiber-to-destroy-resource next-resource-id
resource)
(loop (1+ next-resource-id)
available
waiters))
(loop next-resource-id
available
waiters))
(let* ((current-internal-time
(get-internal-real-time))
(resource-details
(make-resource-details
resource
0
current-internal-time)))
(hash-set! resources
next-resource-id
resource-details)
(let waiter-loop ((waiter (safe-deq waiters)))
(match waiter
(#f
(loop (1+ next-resource-id)
(cons next-resource-id available)
waiters))
((reply . timeout)
(if (and timeout
(< timeout current-internal-time))
(waiter-loop (safe-deq waiters))
(if timeout
(let ((reply-timeout
(/ (- timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's
;; a timeout
(spawn-fiber-for-checkout channel
reply
reply-timeout
next-resource-id
resource))
(put-message reply (list 'success
next-resource-id
resource))))
(set-resource-details-checkout-count! resource-details
1)
(loop (1+ next-resource-id)
available
waiters)))))))
(('checkout reply timeout-time max-waiters)
(if (null? available)
(begin
(unless (= (count-resources resources) max-size)
(spawn-fiber-to-return-new-resource))
(let ((waiters-count
(q-length waiters)))
(if (and max-waiters
(>= waiters-count
max-waiters))
(begin
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'too-many-waiters
waiters-count))))
(perform-operation
(if timeout-time
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op)))))
(loop next-resource-id
available
waiters))
(loop next-resource-id
available
(enq! waiters (cons reply timeout-time))))))
(if timeout-time
(let ((current-internal-time
(get-internal-real-time)))
;; If this client is still waiting
(if (> timeout-time
current-internal-time)
(let* ((reply-timeout
(/ (- timeout-time
current-internal-time)
internal-time-units-per-second))
(resource-id
(car available))
(resource-details
(hash-ref resources resource-id)))
(increment-resource-checkout-count!
resource-details)
;; Don't sleep in this fiber, so spawn a new
;; fiber to handle handing over the resource,
;; and returning it if there's a timeout
(spawn-fiber-for-checkout channel
reply
reply-timeout
resource-id
(resource-details-value
resource-details))
(loop next-resource-id
(cdr available)
waiters))
(loop next-resource-id
available
waiters)))
(let* ((resource-id
next-available
(car+cdr available))
(resource-details
(hash-ref resources
resource-id)))
(increment-resource-checkout-count! resource-details)
(put-message reply
(list 'success
resource-id
(resource-details-value
resource-details)))
(loop next-resource-id
next-available
waiters)))))
(((and (or 'return
'return-failed-checkout)
return-type)
resource-id)
(when (eq? 'return-failed-checkout
return-type)
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(let ((current-internal-time
(get-internal-real-time))
(resource-details
(hash-ref resources resource-id)))
(if (and lifetime
(>= (resource-details-checkout-count resource-details)
lifetime))
(begin
(spawn-fiber-to-destroy-resource resource-id
(resource-details-value
resource-details))
(loop next-resource-id
available
waiters))
(let waiter-loop ((waiter (safe-deq waiters)))
(match waiter
(#f
(if (eq? 'return-failed-checkout
return-type)
(decrement-resource-checkout-count! resource-details)
(set-resource-details-last-used!
resource-details
current-internal-time))
(loop next-resource-id
(cons resource-id available)
waiters))
((reply . timeout)
(if (and timeout
(< timeout current-internal-time))
(waiter-loop (safe-deq waiters))
(if timeout
(let ((reply-timeout
(/ (- timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's
;; a timeout
(spawn-fiber-for-checkout
channel
reply
reply-timeout
resource-id
(resource-details-value resource-details)))
(put-message reply
(list 'success
resource-id
(resource-details-value
resource-details)))))
(set-resource-details-last-used! resource-details
current-internal-time)
(when (eq? 'return-failed-checkout
return-type)
(decrement-resource-checkout-count! resource-details))
(loop next-resource-id
available
waiters)))))))
(('remove resource-id)
(hash-remove! resources
resource-id)
(when (and (not (q-empty? waiters))
(< (- (count-resources resources) 1)
max-size))
(spawn-fiber-to-return-new-resource))
(loop next-resource-id
available ; resource shouldn't be in this list
waiters))
(('destroy resource-id)
(let ((resource-details
(hash-ref resources
resource-id)))
(spawn-fiber-to-destroy-resource resource-id
(resource-details-value
resource-details))
(loop next-resource-id
available
waiters)))
(('list-resources reply)
(spawn-fiber
(lambda ()
(put-message reply (hash-map->list
(lambda (_ value) value)
resources))))
(loop next-resource-id
available
waiters))
(('stats reply timeout-time)
(let ((stats
`((resources . ,(count-resources resources))
(available . ,(length available))
(waiters . ,(q-length waiters))
(resources-checkout-count
. ,(hash-fold
(lambda (_ resource-details result)
(cons (resource-details-checkout-count
resource-details)
result))
'()
resources))
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
(lambda ()
(let ((op
(put-operation reply stats)))
(perform-operation
(if timeout-time
(choice-operation
op
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second)))
op))))))
(loop next-resource-id
available
waiters))
(('check-for-idle-resources)
(let* ((internal-real-time
(get-internal-real-time))
(candidate-resource-ids-to-destroy
(filter-map
(lambda (resource-id)
(let ((resource-details
(hash-ref resources resource-id)))
(if (> (/ (- internal-real-time
(resource-details-last-used
resource-details))
internal-time-units-per-second)
idle-seconds)
resource-id
#f)))
available))
(max-resources-to-destroy
(max 0
(- (count-resources resources)
min-size)))
(resources-to-destroy
(take candidate-resource-ids-to-destroy
(min max-resources-to-destroy
(length candidate-resource-ids-to-destroy)))))
(when destructor
(for-each
(lambda (resource-id)
(spawn-fiber-to-destroy-resource
resource-id
(resource-details-value
(hash-ref resources resource-id))))
resources-to-destroy))
(loop next-resource-id
(lset-difference = available resources-to-destroy)
waiters)))
(('destroy)
(let ((current-internal-time (get-internal-real-time)))
(for-each
(match-lambda
((reply . timeout)
(when (or (not timeout)
(> timeout current-internal-time))
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'resource-pool-destroyed
#f))))
(perform-operation
(if timeout
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op))))))))
(car waiters))
(when destructor
(for-each
(lambda (resource-id)
(spawn-fiber-to-destroy-resource
resource-id
(resource-details-value
(hash-ref resources
resource-id))))
available))
;; Do this in parallel to avoid deadlocks between the
;; limiter and returning new resources to this pool
(and=> return-new-resource/parallelism-limiter
(lambda (limiter)
(spawn-fiber
(lambda ()
(destroy-parallelism-limiter limiter)))))
(if (or (= 0 (count-resources resources))
(not destructor))
(begin
(set-resource-pool-channel! pool #f)
(signal-condition! destroy-condition)
;; No loop
*unspecified*)
(destroy-loop resources next-resource-id))))
(unknown
(simple-format
(current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n"
name
unknown)
(loop next-resource-id
available
waiters)))))
(spawn-fiber
(lambda ()
(when idle-seconds
(spawn-fiber
(lambda ()
(let loop ()
(put-message channel '(check-for-idle-resources))
(when (perform-operation
(choice-operation
(wrap-operation
(sleep-operation idle-seconds)
(const #t))
(wrap-operation
(wait-operation destroy-condition)
(const #f))))
(loop))))))
(with-exception-handler
(lambda (exn)
#f)
(lambda ()
(with-exception-handler
(lambda (exn)
(let* ((stack (make-stack #t))
(error-string
(call-with-output-string
(lambda (port)
(display-backtrace stack port 3)
(simple-format
port
"exception in the ~A pool fiber, " name)
(print-exception
port
(stack-ref stack 3)
'%exception
(list exn))))))
(display/knots error-string
(current-error-port)))
(raise-exception exn))
(lambda ()
(start-stack
#t
(main-loop)))))
#:unwind? #t))
(or scheduler
(current-scheduler)))
pool)
(define (destroy-resource-pool pool)
(perform-operation
(choice-operation
(wrap-operation
(put-operation (resource-pool-channel pool)
(list 'destroy))
(lambda _
(wait
(resource-pool-destroy-condition pool))))
(wait-operation
(resource-pool-destroy-condition pool))))
#t)
(define &resource-pool-timeout
(make-exception-type '&recource-pool-timeout
&error
'(pool)))
(define resource-pool-timeout-error-pool
(exception-accessor
&resource-pool-timeout
(record-accessor &resource-pool-timeout 'pool)))
(define make-resource-pool-timeout-error
(record-constructor &resource-pool-timeout))
(define resource-pool-timeout-error?
(exception-predicate &resource-pool-timeout))
(define &resource-pool-too-many-waiters
(make-exception-type '&recource-pool-too-many-waiters
&error
'(pool waiters-count)))
(define resource-pool-too-many-waiters-error-pool
(exception-accessor
&resource-pool-too-many-waiters
(record-accessor &resource-pool-too-many-waiters 'pool)))
(define resource-pool-too-many-waiters-error-waiters-count
(exception-accessor
&resource-pool-too-many-waiters
(record-accessor &resource-pool-too-many-waiters 'waiters-count)))
(define make-resource-pool-too-many-waiters-error
(record-constructor &resource-pool-too-many-waiters))
(define resource-pool-too-many-waiters-error?
(exception-predicate &resource-pool-too-many-waiters))
(define &resource-pool-destroyed
(make-exception-type '&recource-pool-destroyed
&error
'(pool)))
(define resource-pool-destroyed-error-pool
(exception-accessor
&resource-pool-destroyed
(record-accessor &resource-pool-destroyed 'pool)))
(define make-resource-pool-destroyed-error
(record-constructor &resource-pool-destroyed))
(define resource-pool-destroyed-error?
(exception-predicate &resource-pool-destroyed))
(define &resource-pool-destroy-resource
(make-exception-type '&recource-pool-destroy-resource
&exception
'()))
(define make-resource-pool-destroy-resource-exception
(record-constructor &resource-pool-destroy-resource))
(define resource-pool-destroy-resource-exception?
(exception-predicate &resource-pool-destroy-resource))
(define resource-pool-default-timeout-handler
(make-parameter #f))
(define* (call-with-resource-from-pool
pool proc #:key (timeout 'default)
(timeout-handler (resource-pool-default-timeout-handler))
(max-waiters 'default)
(channel (resource-pool-channel pool))
(destroy-resource-on-exception? #f))
"Call PROC with a resource from POOL, blocking until a resource becomes
available. Return the resource once PROC has returned."
(define timeout-or-default
(if (eq? timeout 'default)
(assq-ref (resource-pool-configuration pool)
'default-checkout-timeout)
timeout))
(define max-waiters-or-default
(if (eq? max-waiters 'default)
(assq-ref (resource-pool-configuration pool)
'default-max-waiters)
max-waiters))
(unless channel
(raise-exception
(make-resource-pool-destroyed-error pool)))
(let ((reply
(if timeout-or-default
(let loop ((reply (make-channel))
(start-time (get-internal-real-time)))
(let ((request-success?
(perform-operation
(choice-operation
(wrap-operation
(put-operation channel
(list 'checkout
reply
(+ start-time
(* timeout-or-default
internal-time-units-per-second))
max-waiters-or-default))
(const #t))
(wrap-operation (sleep-operation timeout-or-default)
(const #f))))))
(if request-success?
(let ((time-remaining
(- timeout-or-default
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second))))
(if (> time-remaining 0)
(let ((response
(perform-operation
(choice-operation
(get-operation reply)
(wrap-operation (sleep-operation time-remaining)
(const #f))))))
(if (or (not response)
(eq? response 'resource-pool-retry-checkout))
(if (> (- timeout-or-default
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second))
0)
(loop (make-channel)
start-time)
'timeout)
response))
'timeout))
'timeout)))
(let ((reply (make-channel)))
(put-message channel
(list 'checkout
reply
#f
max-waiters-or-default))
(get-message reply)))))
(match reply
('timeout
(when timeout-handler
(timeout-handler pool proc timeout))
(raise-exception
(make-resource-pool-timeout-error pool)))
(('too-many-waiters . count)
(raise-exception
(make-resource-pool-too-many-waiters-error pool
count)))
(('resource-pool-destroyed . #f)
(raise-exception
(make-resource-pool-destroyed-error pool)))
(('success resource-id resource-value)
(call-with-values
(lambda ()
(with-exception-handler
(lambda (exn)
;; Unwind the stack before calling put-message, as
;; this avoids inconsistent behaviour with
;; continuation barriers
(put-message
channel
(list (if (or destroy-resource-on-exception?
(resource-pool-destroy-resource-exception? exn))
'destroy
'return)
resource-id))
(raise-exception exn))
(lambda ()
(with-exception-handler
(lambda (exn)
(let ((stack
(match (fluid-ref %stacks)
((stack-tag . prompt-tag)
(make-stack #t
0 prompt-tag
0 (and prompt-tag 1)))
(_
(make-stack #t)))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))
(lambda ()
(proc resource-value))))
#:unwind? #t))
(lambda vals
(put-message channel
`(return ,resource-id))
(apply values vals)))))))
(define-syntax-rule (with-resource-from-pool pool resource exp ...)
(call-with-resource-from-pool
pool
(lambda (resource) exp ...)))
(define* (resource-pool-stats pool #:key (timeout 5))
(define channel
(resource-pool-channel pool))
(unless channel
(raise-exception
(make-resource-pool-destroyed-error pool)))
(if timeout
(let* ((reply (make-channel))
(start-time (get-internal-real-time))
(timeout-time
(+ start-time
(* internal-time-units-per-second timeout))))
(perform-operation
(choice-operation
(wrap-operation
(put-operation channel
`(stats ,reply ,timeout-time))
(const #t))
(wrap-operation (sleep-operation timeout)
(lambda _
(raise-exception
(make-resource-pool-timeout-error pool))))))
(let ((time-remaining
(- timeout
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second))))
(if (> time-remaining 0)
(perform-operation
(choice-operation
(get-operation reply)
(wrap-operation (sleep-operation time-remaining)
(lambda _
(raise-exception
(make-resource-pool-timeout-error pool))))))
(raise-exception
(make-resource-pool-timeout-error pool)))))
(let ((reply (make-channel)))
(put-message channel
`(stats ,reply #f))
(get-message reply))))
(define (resource-pool-list-resources pool)
(define channel
(resource-pool-channel pool))
(unless channel
(raise-exception
(make-resource-pool-destroyed-error pool)))
(let ((reply (make-channel)))
(put-message (resource-pool-channel pool)
(list 'list-resources reply))
(get-message reply)))