guile-knots/knots/resource-pool.scm

1666 lines
62 KiB
Scheme
Raw Normal View History

2024-11-19 18:43:43 +00:00
;;; Guile Knots
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
;;;
;;; This file is part of Guile Knots.
;;;
;;; Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with Guile Knots.  If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (knots resource-pool)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-43)
2025-01-31 12:33:50 +01:00
#:use-module (srfi srfi-71)
#:use-module (ice-9 q)
2024-11-19 18:43:43 +00:00
#:use-module (ice-9 match)
#:use-module (ice-9 exceptions)
#:use-module (fibers)
#:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers scheduler)
#:use-module (fibers operations)
#:use-module (fibers conditions)
#:use-module (knots)
#:use-module (knots parallelism)
#:export (make-fixed-size-resource-pool
2024-11-19 18:43:43 +00:00
make-resource-pool
resource-pool?
2025-01-08 12:23:08 +00:00
resource-pool-name
resource-pool-channel
resource-pool-configuration
2024-11-19 18:43:43 +00:00
destroy-resource-pool
&resource-pool-timeout
resource-pool-timeout-error-pool
2024-11-19 18:43:43 +00:00
resource-pool-timeout-error?
&resource-pool-too-many-waiters
resource-pool-too-many-waiters-error-pool
resource-pool-too-many-waiters-error-waiters-count
resource-pool-too-many-waiters-error?
&resource-pool-destroyed
resource-pool-destroyed-error-pool
resource-pool-destroyed-error?
&resource-pool-destroy-resource
make-resource-pool-destroy-resource-exception
resource-pool-destroy-resource-exception?
resource-pool-delay-logger
resource-pool-duration-logger
2024-11-19 18:43:43 +00:00
resource-pool-default-timeout-handler
call-with-resource-from-pool
with-resource-from-pool
resource-pool-stats))
;; Exception type with no fields, derived from &error.  It is not
;; exported from this module.
(define &resource-pool-abort-add-resource
  (make-exception-type '&resource-pool-abort-add-resource
                       &error
                       '()))

;; Constructor for &resource-pool-abort-add-resource; takes no arguments.
(define make-resource-pool-abort-add-resource-error
  (record-constructor &resource-pool-abort-add-resource))

;; Predicate matching &resource-pool-abort-add-resource exceptions.
(define resource-pool-abort-add-resource-error?
  (exception-predicate &resource-pool-abort-add-resource))
2024-11-19 18:43:43 +00:00
;; Handle for a resource pool.  NAME is used in messages, CHANNEL is the
;; channel the pool's fiber reads requests from, DESTROY-CONDITION is
;; signalled once the pool has finished being destroyed and CONFIGURATION
;; is an alist of the options the pool was created with.
(define-record-type <resource-pool>
  (make-resource-pool-record name channel destroy-condition configuration)
  resource-pool?
  (name              resource-pool-name)
  ;; Mutable, as the channel is replaced with #f when the pool is
  ;; destroyed
  (channel           resource-pool-channel
                     set-resource-pool-channel!)
  (destroy-condition resource-pool-destroy-condition)
  (configuration     resource-pool-configuration))
;; Attach documentation strings to the record predicate and accessors.
;; Each binding is looked up in the current module and passed through
;; macro-transformer, so the 'documentation property ends up on the
;; transformer procedure behind the binding.
(set-procedure-property!
(macro-transformer (module-ref (current-module) 'resource-pool?))
'documentation
"Return @code{#t} if OBJ is a @code{<resource-pool>}.")
(set-procedure-property!
(macro-transformer (module-ref (current-module) 'resource-pool-name))
'documentation
"Return the name of the resource pool.")
(set-procedure-property!
(macro-transformer (module-ref (current-module) 'resource-pool-channel))
'documentation
"Return the channel used by the resource pool.")
(set-procedure-property!
(macro-transformer (module-ref (current-module) 'resource-pool-configuration))
'documentation
"Return the configuration alist of the resource pool.")
2024-11-19 18:43:43 +00:00
(define (resource-pool-delay-logger resource-pool)
  "Return the @code{delay-logger} entry from the configuration of
RESOURCE-POOL, or @code{#f} if there isn't one."
  (let ((configuration (resource-pool-configuration resource-pool)))
    (assq-ref configuration 'delay-logger)))
(define (resource-pool-duration-logger resource-pool)
  "Return the @code{duration-logger} entry from the configuration of
RESOURCE-POOL, or @code{#f} if there isn't one."
  (let ((configuration (resource-pool-configuration resource-pool)))
    (assq-ref configuration 'duration-logger)))
;; Print resource pools as #<resource-pool name: "NAME">
(set-record-type-printer!
 <resource-pool>
 (lambda (pool port)
   (let ((representation
          (simple-format #f "#<resource-pool name: \"~A\">"
                         (resource-pool-name pool))))
     (display/knots representation port))))
(define (safe-deq q)
  "Remove and return the element at the front of the queue Q, or return
@code{#f} if Q is empty.  Q is a pair with the list of queued elements
in its car, like the queues built with @code{make-q} and @code{enq!}."
  (match (car q)
    (() #f)
    ((front . rest)
     ;; Removing the only element leaves no last pair to point at
     (when (null? rest)
       (set-cdr! q #f))
     (set-car! q rest)
     front)))
;; Bookkeeping record the pools keep for each resource.
;;
;; value           the resource itself
;; checkout-count  number used by make-resource-pool to count checkouts,
;;                 and compared against its #:lifetime
;; last-used       a (get-internal-real-time) value, or #f; compared
;;                 against #:idle-seconds by make-resource-pool
(define-record-type <resource-details>
(make-resource-details value checkout-count last-used)
resource-details?
(value resource-details-value)
(checkout-count resource-details-checkout-count
set-resource-details-checkout-count!)
(last-used resource-details-last-used
set-resource-details-last-used!))
;; Add one to the checkout count stored in RESOURCE, a <resource-details>
;; record.
(define-inlinable (increment-resource-checkout-count! resource)
  (let ((previous-count (resource-details-checkout-count resource)))
    (set-resource-details-checkout-count! resource
                                          (1+ previous-count))))
;; Subtract one from the checkout count stored in RESOURCE, a
;; <resource-details> record.
(define-inlinable (decrement-resource-checkout-count! resource)
  (let ((previous-count (resource-details-checkout-count resource)))
    (set-resource-details-checkout-count! resource
                                          (1- previous-count))))
(define (spawn-fiber-for-checkout channel
                                  reply-channel
                                  reply-timeout
                                  resource-id
                                  resource)
  "Spawn a fiber which tries to send @code{(success RESOURCE-ID RESOURCE)}
on REPLY-CHANNEL.  If that message hasn't been received within
REPLY-TIMEOUT seconds, @code{(return-failed-checkout RESOURCE-ID)} is sent
to CHANNEL instead."
  (define (hand-over)
    ;; #t if the reply was delivered, #f if the timeout fired first
    (let ((deliver
           (wrap-operation
            (put-operation reply-channel
                           (list 'success resource-id resource))
            (const #t)))
          (give-up
           (wrap-operation (sleep-operation reply-timeout)
                           (const #f))))
      (perform-operation
       (choice-operation deliver give-up))))

  (spawn-fiber
   (lambda ()
     ;; The operations are built inside the fiber, so the timeout starts
     ;; counting when the fiber runs
     (or (hand-over)
         (put-message channel
                      (list 'return-failed-checkout resource-id))))))
(define* (make-fixed-size-resource-pool resources-list-or-vector
#:key
(delay-logger #f)
(duration-logger #f)
scheduler
(name "unnamed")
default-checkout-timeout
default-max-waiters)
2026-03-17 21:47:47 +00:00
"Create a resource pool from RESOURCES-LIST-OR-VECTOR, a list or
vector of pre-existing resource values.
Use @code{with-resource-from-pool} or
@code{call-with-resource-from-pool} to borrow a resource and return it
automatically when done.
Optional keyword arguments:
@table @code
@item #:name
An optional string used in log messages.
Defaults to @code{\"unnamed\"}.
@item #:default-checkout-timeout
Default checkout timeout when requesting a resource from the pool,
unset by default.
@item #:default-max-waiters
Maximum number of fibers that may queue waiting for a resource. When
this limit is exceeded, @code{&resource-pool-too-many-waiters} is
raised when a resource is requested. Defaults to @code{#f} (no limit).
@item #:delay-logger
Called as @code{(delay-logger seconds)} with the time spent waiting
for a resource to become available. Defaults to @code{#f} (no
logging).
@item #:duration-logger
Called as @code{(duration-logger seconds)} after the proc passed to
@code{call-with-resource-from-pool} completes, whether it returned
normally or raised an exception. Can be overridden per-call via the
@code{#:duration-logger} keyword argument to
@code{call-with-resource-from-pool}. Defaults to @code{#f} (no
logging).
@item #:scheduler
The Fibers scheduler to use for the pool's internal fiber. Defaults
to the current scheduler.
@end table"
;; Channel the pool's fiber receives all of its messages on
(define channel (make-channel))
;; Signalled once the pool has finished being destroyed
(define destroy-condition
(make-condition))
;; The record returned to the caller; the alist records the options the
;; pool was created with
(define pool
(make-resource-pool-record
name
channel
destroy-condition
`((delay-logger . ,delay-logger)
(duration-logger . ,duration-logger)
(scheduler . ,scheduler)
(name . ,name)
(default-checkout-timeout . ,default-checkout-timeout)
(default-max-waiters . ,default-max-waiters))))
;; Number of return-failed-checkout messages received, reported in the
;; stats
(define checkout-failure-count 0)
;; Vector of <resource-details>, one per resource, with a checkout count
;; of 0 and no last-used time.  The index in to this vector is used as
;; the resource id.  The first argument passed to the lambda by
;; vector-map is ignored.
(define resources
(vector-map
(lambda (_ resource)
(make-resource-details
resource
0
#f))
(if (vector? resources-list-or-vector)
resources-list-or-vector
(list->vector resources-list-or-vector))))
;; Message loop used once the pool has been asked to destroy itself but
;; can't finish straight away.  A slot in RESOURCES is set to #f when its
;; resource is returned, and destruction completes when every slot is
;; #f.
(define (destroy-loop)
;; #t when every slot in RESOURCES is #f
(define (empty?)
(vector-every (lambda (r)
(eq? r #f))
resources))
(let loop ()
(match (get-message channel)
;; Refuse new checkouts, replying from a separate fiber so this loop
;; doesn't block on the requester.  If the request has a timeout, stop
;; trying to reply once it has passed.
(('checkout reply timeout-time max-waiters)
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'resource-pool-destroyed
#f))))
(perform-operation
(if timeout-time
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op)))))
(loop))
;; A resource has come back, so clear its slot and finish if that was
;; the last one
(((and (or 'return
'return-failed-checkout)
return-type)
resource-id)
(vector-set! resources
resource-id
#f)
(if (empty?)
(begin
(set-resource-pool-channel! pool #f)
(signal-condition! destroy-condition)
;; No loop
*unspecified*)
(loop)))
;; Stats while destroying always report nothing available and no
;; waiters
(('stats reply timeout-time)
(let ((stats
`((resources . ,(vector-length resources))
(available . 0)
(waiters . 0)
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
(lambda ()
(let ((op
(put-operation reply stats)))
(perform-operation
(if timeout-time
(choice-operation
op
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second)))
op))))))
(loop))
;; Already destroying, so further destroy requests are ignored
(('destroy)
(loop))
(unknown
(simple-format
(current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n"
name
unknown)
(loop)))))
;; Message loop for the pool's fiber.  AVAILABLE is the list of resource
;; ids (indexes in to RESOURCES) that aren't checked out, and WAITERS is
;; a queue of (reply-channel . timeout-time) pairs for the checkout
;; requests that couldn't be satisfied straight away.  Timeout times are
;; compared against (get-internal-real-time), #f means no timeout.
(define (main-loop)
  (let loop ((available (iota (vector-length resources)))
             (waiters (make-q)))
    (match (get-message channel)
      (('checkout reply timeout-time max-waiters)
       (if (null? available)
           (let ((waiters-count (q-length waiters)))
             (if (and max-waiters
                      (>= waiters-count max-waiters))
                 (begin
                   ;; Reply from a separate fiber so that this loop
                   ;; doesn't block on the requester
                   (spawn-fiber
                    (lambda ()
                      (let ((op
                             (put-operation
                              reply
                              (cons 'too-many-waiters
                                    waiters-count))))
                        (perform-operation
                         (if timeout-time
                             (choice-operation
                              op
                              (wrap-operation
                               (sleep-operation
                                (/ (- timeout-time
                                      (get-internal-real-time))
                                   internal-time-units-per-second))
                               (const #f)))
                             op)))))
                   (loop available
                         waiters))
                 (loop available
                       (enq! waiters (cons reply timeout-time)))))
           (if timeout-time
               (let ((current-internal-time
                      (get-internal-real-time)))
                 ;; If this client is still waiting
                 (if (> timeout-time
                        current-internal-time)
                     (let ((reply-timeout
                            (/ (- timeout-time
                                  current-internal-time)
                               internal-time-units-per-second))
                           (resource-id
                            new-available
                            (car+cdr available)))
                       ;; Don't sleep in this fiber, so spawn a new
                       ;; fiber to handle handing over the resource,
                       ;; and returning it if there's a timeout
                       (spawn-fiber-for-checkout
                        channel
                        reply
                        reply-timeout
                        resource-id
                        (resource-details-value
                         (vector-ref resources
                                     resource-id)))
                       (loop new-available
                             waiters))
                     (loop available
                           waiters)))
               (let* ((resource-id
                       next-available
                       (car+cdr available))
                      (resource-details
                       (vector-ref resources
                                   resource-id)))
                 (put-message reply
                              (list 'success
                                    resource-id
                                    (resource-details-value
                                     resource-details)))
                 (loop next-available
                       waiters)))))

      (((and (or 'return
                 'return-failed-checkout)
             return-type)
        resource-id)
       (when (eq? 'return-failed-checkout
                  return-type)
         (set! checkout-failure-count
               (+ 1 checkout-failure-count)))
       (let ((current-internal-time
              (get-internal-real-time)))
         ;; Hand the resource to the first waiter that hasn't timed
         ;; out, or make it available if there isn't one
         (let waiter-loop ((waiter (safe-deq waiters)))
           (match waiter
             (#f
              (loop (cons resource-id available)
                    waiters))
             ((reply . timeout)
              (if (and timeout
                       (< timeout current-internal-time))
                  ;; This waiter has given up, try the next one.  This
                  ;; has to be a tail call, otherwise a stack frame is
                  ;; kept for every expired waiter and the message
                  ;; loop is resumed from those frames after the pool
                  ;; has been destroyed.
                  (waiter-loop (safe-deq waiters))
                  (begin
                    (if timeout
                        (let ((reply-timeout
                               (/ (- timeout
                                     current-internal-time)
                                  internal-time-units-per-second)))
                          ;; Don't sleep in this fiber, so spawn a
                          ;; new fiber to handle handing over the
                          ;; resource, and returning it if there's
                          ;; a timeout
                          (spawn-fiber-for-checkout
                           channel
                           reply
                           reply-timeout
                           resource-id
                           (resource-details-value
                            (vector-ref resources
                                        resource-id))))
                        (put-message reply
                                     (list 'success
                                           resource-id
                                           (resource-details-value
                                            (vector-ref resources
                                                        resource-id)))))
                    (loop available
                          waiters))))))))

      (('list-resources reply)
       (spawn-fiber
        (lambda ()
          (put-message reply (vector->list resources))))
       (loop available
             waiters))

      (('stats reply timeout-time)
       (let ((stats
              `((resources . ,(vector-length resources))
                (available . ,(length available))
                (waiters . ,(q-length waiters))
                (checkout-failure-count . ,checkout-failure-count))))
         (spawn-fiber
          (lambda ()
            (let ((op
                   (put-operation reply stats)))
              (perform-operation
               (if timeout-time
                   (choice-operation
                    op
                    (sleep-operation
                     (/ (- timeout-time
                           (get-internal-real-time))
                        internal-time-units-per-second)))
                   op))))))
       (loop available
             waiters))

      (('destroy)
       (let ((current-internal-time (get-internal-real-time)))
         ;; Notify all waiters that the pool has been destroyed
         (for-each
          (match-lambda
            ((reply . timeout)
             (when (or (not timeout)
                       (> timeout current-internal-time))
               (spawn-fiber
                (lambda ()
                  (let ((op
                         (put-operation
                          reply
                          (cons 'resource-pool-destroyed
                                #f))))
                    (perform-operation
                     (if timeout
                         (choice-operation
                          op
                          (wrap-operation
                           (sleep-operation
                            (/ (- timeout
                                  (get-internal-real-time))
                               internal-time-units-per-second))
                           (const #f)))
                         op))))))))
          (car waiters))
         (if (= (vector-length resources)
                (length available))
             (begin
               (set-resource-pool-channel! pool #f)
               (signal-condition! destroy-condition)
               ;; No loop
               *unspecified*)
             (begin
               ;; destroy-loop only finishes once every slot in
               ;; RESOURCES is #f, so clear the slots for the
               ;; resources which aren't checked out, as nothing will
               ;; ever return them
               (for-each (lambda (resource-id)
                           (vector-set! resources resource-id #f))
                         available)
               (destroy-loop)))))

      (unknown
       (simple-format
        (current-error-port)
        "unrecognised message to ~A resource pool channel: ~A\n"
        name
        unknown)
       (loop available
             waiters)))))
;; Run main-loop in its own fiber, on SCHEDULER if one was given and the
;; current scheduler otherwise.
(spawn-fiber
(lambda ()
(with-exception-handler
;; Outer handler: unwinds (#:unwind? #t) and returns #f, so an
;; exception ends the fiber's thunk rather than propagating further
(lambda (exn)
#f)
(lambda ()
(with-exception-handler
;; Inner handler: runs without unwinding, so the stack is still
;; there to build a backtrace from; writes the backtrace and the
;; exception to the current error port, then re-raises
(lambda (exn)
(let* ((stack (make-stack #t))
(error-string
(call-with-output-string
(lambda (port)
(display-backtrace stack port 3)
(simple-format
port
"exception in the ~A pool fiber, " name)
(print-exception
port
(stack-ref stack 3)
'%exception
(list exn))))))
(display/knots error-string
(current-error-port)))
(raise-exception exn))
(lambda ()
(start-stack
#t
(main-loop)))))
#:unwind? #t))
(or scheduler
(current-scheduler)))
pool)
(define* (make-resource-pool return-new-resource max-size
#:key (min-size 0)
2024-11-19 18:43:43 +00:00
(idle-seconds #f)
(delay-logger #f)
(duration-logger #f)
2024-11-19 18:43:43 +00:00
destructor
lifetime
scheduler
(name "unnamed")
(add-resources-parallelism 1)
default-checkout-timeout
default-max-waiters)
2026-03-17 21:47:47 +00:00
"Create a dynamic resource pool. RETURN-NEW-RESOURCE is a thunk
called to create each new resource value. MAX-SIZE is the maximum
number of resources the pool will hold simultaneously.
Resources are created on demand when a checkout is requested and the
pool is not yet at MAX-SIZE. Use @code{with-resource-from-pool} or
@code{call-with-resource-from-pool} to request a resource and return
it automatically when done.
Optional keyword arguments:
@table @code
@item #:min-size
Minimum number of resources to keep alive even when idle. Defaults to
@code{0}.
@item #:idle-seconds
Seconds a resource may remain unused before being destroyed, provided
the pool is above @code{#:min-size}. Defaults to @code{#f} (never
expire idle resources).
@item #:lifetime
Maximum number of checkouts a single resource will serve before being
destroyed and replaced by a fresh one. Defaults to @code{#f} (no
limit).
@item #:destructor
A procedure called as @code{(destructor resource)} when a resource is
removed from the pool. Defaults to @code{#f}.
@item #:add-resources-parallelism
Maximum number of concurrent calls to RETURN-NEW-RESOURCE when the
pool needs to grow. Allowing resources to be created in parallel can
result in more resources being created than can fit inside the pool,
if this happens, the surplus resources are destroyed. Defaults to
@code{1}.
@item #:name
A string used in log messages. Defaults to @code{\"unnamed\"}.
@item #:default-checkout-timeout
Default checkout timeout when requesting a resource from the pool,
unset by default.
@item #:default-max-waiters
Maximum number of fibers that may queue waiting for a resource. When
this limit is exceeded, @code{&resource-pool-too-many-waiters} is
raised when a resource is requested. Defaults to @code{#f} (no limit).
@item #:delay-logger
Called as @code{(delay-logger seconds)} with the time spent waiting
for a resource to become available. Defaults to @code{#f} (no
logging).
@item #:duration-logger
Called as @code{(duration-logger seconds)} after the proc passed to
@code{call-with-resource-from-pool} completes, whether it returned
normally or raised an exception. Can be overridden per-call via the
@code{#:duration-logger} keyword argument to
@code{call-with-resource-from-pool}. Defaults to @code{#f} (no
logging).
@item #:scheduler
The Fibers scheduler to use for the pool's internal fiber. Defaults
to the current scheduler.
@end table"
;; Channel the pool's fiber receives all of its messages on
(define channel (make-channel))
;; Signalled once the pool has finished being destroyed
(define destroy-condition
(make-condition))
;; The record returned to the caller; the alist records the options the
;; pool was created with
(define pool
(make-resource-pool-record
name
channel
destroy-condition
`((max-size . ,max-size)
(min-size . ,min-size)
(idle-seconds . ,idle-seconds)
(delay-logger . ,delay-logger)
(duration-logger . ,duration-logger)
(destructor . ,destructor)
(lifetime . ,lifetime)
(scheduler . ,scheduler)
(name . ,name)
(default-checkout-timeout . ,default-checkout-timeout)
(default-max-waiters . ,default-max-waiters))))
;; Number of return-failed-checkout messages received, reported in the
;; stats
(define checkout-failure-count 0)
;; Hash table from resource id to the entry for that resource
(define resources
(make-hash-table))
;; Number of entries in the RESOURCES hash table
(define-inlinable (count-resources resources)
(hash-count (const #t) resources))
;; Limits how many calls to RETURN-NEW-RESOURCE run at once; the limit is
;; MAX-SIZE if ADD-RESOURCES-PARALLELISM is #f
(define return-new-resource/parallelism-limiter
(make-parallelism-limiter
(or add-resources-parallelism
max-size)
#:name
(string-append
name
" resource pool new resource parallelism limiter")))
;; Spawn a fiber which, while holding the parallelism limiter, calls
;; RETURN-NEW-RESOURCE and sends the result to the pool's channel in an
;; add-resource message.  Nothing is created if the pool already holds
;; max-size resources.  If creating the resource raises an exception,
;; it's logged and the fiber tries again after sleeping for a second.
(define (spawn-fiber-to-return-new-resource)
(spawn-fiber
(lambda ()
(with-exception-handler
(lambda (exn)
;; This can happen if the resource pool is destroyed very
;; quickly
(if (resource-pool-destroyed-error? exn)
#f
(raise-exception exn)))
(lambda ()
(let loop ()
;; success? is #t if a resource was added or the pool is already
;; full, and #f if an exception was raised
(let ((success?
(with-parallelism-limiter
return-new-resource/parallelism-limiter
(let ((max-size
(assq-ref (resource-pool-configuration pool)
'max-size))
(size (count-resources resources)))
(or (>= size max-size)
(with-exception-handler
;; Outer handler: unwind and give #f as the result
(lambda _ #f)
(lambda ()
(with-exception-handler
;; Inner handler: log before the stack is unwound, then
;; re-raise to the outer handler
(lambda (exn)
(simple-format
(current-error-port)
"exception adding resource to pool ~A: ~A\n\n"
name
return-new-resource)
(print-backtrace-and-exception/knots exn)
(raise-exception exn))
(lambda ()
(let ((new-resource
(start-stack #t (return-new-resource))))
(put-message channel
(list 'add-resource new-resource)))
#t)))
#:unwind? #t))))))
(unless success?
;; TODO Maybe this should be configurable?
(sleep 1)
;; Important to retry here and eventually create
;; a new resource, as there might be waiters
;; stuck waiting for a resource, especially if
;; the pool is empty.
(loop)))))
#:unwind? #t))))
;; Spawn a fiber which runs DESTRUCTOR on RESOURCE-VALUE and then sends
;; (remove RESOURCE-ID) to the pool's channel.  If the destructor raises
;; an exception, it's logged and the destructor is tried again after
;; sleeping for 5 seconds.
;;
;; When the pool has no destructor, there's nothing to run, so the remove
;; message is sent straight away.  Trying to call #f would raise an
;; exception every time, and this fiber would retry forever without the
;; resource ever being removed from the pool.
(define (spawn-fiber-to-destroy-resource resource-id resource-value)
  (spawn-fiber
   (lambda ()
     (let loop ()
       (let ((success?
              (or (not destructor)
                  (with-exception-handler
                      ;; Outer handler: unwind and give #f as the
                      ;; result
                      (lambda _ #f)
                    (lambda ()
                      (with-exception-handler
                          ;; Inner handler: log before the stack is
                          ;; unwound, then re-raise
                          (lambda (exn)
                            (simple-format
                             (current-error-port)
                             "exception running resource pool destructor (~A): ~A\n"
                             name
                             destructor)
                            (print-backtrace-and-exception/knots exn)
                            (raise-exception exn))
                        (lambda ()
                          (start-stack #t (destructor resource-value))
                          #t)))
                    #:unwind? #t))))
         (if success?
             (put-message channel
                          (list 'remove resource-id))
             (begin
               (sleep 5)
               (loop))))))))
;; Message loop used once the pool has been asked to destroy itself but
;; RESOURCES isn't empty yet.  Entries are taken out of RESOURCES as
;; resources are returned or removed, and destruction completes when
;; none are left.
(define (destroy-loop resources next-resource-id)
(let loop ((next-resource-id next-resource-id))
(match (get-message channel)
;; A resource that was still being created has arrived, so destroy it
;; if there's a destructor.  The entry stored here is the resource
;; itself rather than a <resource-details> record.
(('add-resource resource)
(if destructor
(begin
(spawn-fiber-to-destroy-resource next-resource-id
resource)
(hash-set! resources next-resource-id resource)
(loop (1+ next-resource-id)))
(loop next-resource-id)))
;; Refuse new checkouts, replying from a separate fiber so this loop
;; doesn't block on the requester
(('checkout reply timeout-time max-waiters)
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'resource-pool-destroyed
#f))))
(perform-operation
(if timeout-time
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op)))))
(loop next-resource-id))
;; A returned resource still needs its destructor running, whereas a
;; remove message means that has already happened.  Either way the
;; entry is dropped straight away, and the pool is finished with once
;; no entries are left.
(((and (or 'return
'return-failed-checkout
'remove)
return-type)
resource-id)
(when (and (not (eq? return-type 'remove))
destructor)
(spawn-fiber-to-destroy-resource
resource-id
(resource-details-value
(hash-ref resources resource-id))))
(hash-remove! resources resource-id)
(if (= 0 (count-resources resources))
(begin
(set-resource-pool-channel! pool #f)
(signal-condition! destroy-condition)
;; No loop
*unspecified*)
(loop next-resource-id)))
;; Stats while destroying always report nothing available and no
;; waiters
(('stats reply timeout-time)
(let ((stats
`((resources . ,(count-resources resources))
(available . 0)
(waiters . 0)
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
(lambda ()
(let ((op
(put-operation reply stats)))
(perform-operation
(if timeout-time
(choice-operation
op
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second)))
op))))))
(loop next-resource-id))
;; Idle checks and further destroy requests are ignored while
;; destroying
(('check-for-idle-resources)
(loop next-resource-id))
(('destroy)
(loop next-resource-id))
(unknown
(simple-format
(current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n"
name
unknown)
(loop next-resource-id)))))
;; Message loop for the pool's fiber.  NEXT-RESOURCE-ID is the id to give
;; to the next resource added, AVAILABLE is the list of ids for the
;; resources that aren't checked out, and WAITERS is a queue of
;; (reply-channel . timeout-time) pairs for the checkout requests that
;; couldn't be satisfied straight away.  Timeout times are compared
;; against (get-internal-real-time), #f means no timeout.
(define (main-loop)
  (let loop ((next-resource-id 0)
             (available '())
             (waiters (make-q)))
    (match (get-message channel)
      (('add-resource resource)
       (if (= (count-resources resources) max-size)
           ;; The pool is already full, so this resource is surplus
           (if destructor
               (begin
                 (hash-set! resources
                            next-resource-id
                            (make-resource-details
                             resource
                             0
                             (get-internal-real-time)))
                 (spawn-fiber-to-destroy-resource next-resource-id
                                                  resource)
                 (loop (1+ next-resource-id)
                       available
                       waiters))
               (loop next-resource-id
                     available
                     waiters))
           (let* ((current-internal-time
                   (get-internal-real-time))
                  (resource-details
                   (make-resource-details
                    resource
                    0
                    current-internal-time)))
             (hash-set! resources
                        next-resource-id
                        resource-details)
             ;; Hand the new resource to the first waiter that hasn't
             ;; timed out, or make it available if there isn't one
             (let waiter-loop ((waiter (safe-deq waiters)))
               (match waiter
                 (#f
                  (loop (1+ next-resource-id)
                        (cons next-resource-id available)
                        waiters))
                 ((reply . timeout)
                  (if (and timeout
                           (< timeout current-internal-time))
                      ;; This waiter has given up, try the next one.
                      ;; This has to be a tail call, otherwise a stack
                      ;; frame is kept for every expired waiter and
                      ;; the message loop is resumed from those frames
                      ;; after the pool has been destroyed.
                      (waiter-loop (safe-deq waiters))
                      (begin
                        (if timeout
                            (let ((reply-timeout
                                   (/ (- timeout
                                         current-internal-time)
                                      internal-time-units-per-second)))
                              ;; Don't sleep in this fiber, so spawn a
                              ;; new fiber to handle handing over the
                              ;; resource, and returning it if there's
                              ;; a timeout
                              (spawn-fiber-for-checkout channel
                                                        reply
                                                        reply-timeout
                                                        next-resource-id
                                                        resource))
                            (put-message reply (list 'success
                                                     next-resource-id
                                                     resource)))
                        (set-resource-details-checkout-count!
                         resource-details
                         1)
                        (loop (1+ next-resource-id)
                              available
                              waiters)))))))))

      (('checkout reply timeout-time max-waiters)
       (if (null? available)
           (begin
             (unless (= (count-resources resources) max-size)
               (spawn-fiber-to-return-new-resource))
             (let ((waiters-count
                    (q-length waiters)))
               (if (and max-waiters
                        (>= waiters-count
                            max-waiters))
                   (begin
                     ;; Reply from a separate fiber so that this loop
                     ;; doesn't block on the requester
                     (spawn-fiber
                      (lambda ()
                        (let ((op
                               (put-operation
                                reply
                                (cons 'too-many-waiters
                                      waiters-count))))
                          (perform-operation
                           (if timeout-time
                               (choice-operation
                                op
                                (wrap-operation
                                 (sleep-operation
                                  (/ (- timeout-time
                                        (get-internal-real-time))
                                     internal-time-units-per-second))
                                 (const #f)))
                               op)))))
                     (loop next-resource-id
                           available
                           waiters))
                   (loop next-resource-id
                         available
                         (enq! waiters (cons reply timeout-time))))))
           (if timeout-time
               (let ((current-internal-time
                      (get-internal-real-time)))
                 ;; If this client is still waiting
                 (if (> timeout-time
                        current-internal-time)
                     (let* ((reply-timeout
                             (/ (- timeout-time
                                   current-internal-time)
                                internal-time-units-per-second))
                            (resource-id
                             (car available))
                            (resource-details
                             (hash-ref resources resource-id)))
                       (increment-resource-checkout-count!
                        resource-details)
                       ;; Don't sleep in this fiber, so spawn a new
                       ;; fiber to handle handing over the resource,
                       ;; and returning it if there's a timeout
                       (spawn-fiber-for-checkout channel
                                                 reply
                                                 reply-timeout
                                                 resource-id
                                                 (resource-details-value
                                                  resource-details))
                       (loop next-resource-id
                             (cdr available)
                             waiters))
                     (loop next-resource-id
                           available
                           waiters)))
               (let* ((resource-id
                       next-available
                       (car+cdr available))
                      (resource-details
                       (hash-ref resources
                                 resource-id)))
                 (increment-resource-checkout-count! resource-details)
                 (put-message reply
                              (list 'success
                                    resource-id
                                    (resource-details-value
                                     resource-details)))
                 (loop next-resource-id
                       next-available
                       waiters)))))

      (((and (or 'return
                 'return-failed-checkout)
             return-type)
        resource-id)
       (when (eq? 'return-failed-checkout
                  return-type)
         (set! checkout-failure-count
               (+ 1 checkout-failure-count)))
       (let ((current-internal-time
              (get-internal-real-time))
             (resource-details
              (hash-ref resources resource-id)))
         (if (and lifetime
                  (>= (resource-details-checkout-count resource-details)
                      lifetime))
             ;; This resource has served enough checkouts.  It's not
             ;; put back in AVAILABLE, and the entry in RESOURCES goes
             ;; when the remove message arrives.
             (begin
               (spawn-fiber-to-destroy-resource resource-id
                                                (resource-details-value
                                                 resource-details))
               (loop next-resource-id
                     available
                     waiters))
             (let waiter-loop ((waiter (safe-deq waiters)))
               (match waiter
                 (#f
                  (if (eq? 'return-failed-checkout
                           return-type)
                      (decrement-resource-checkout-count! resource-details)
                      (set-resource-details-last-used!
                       resource-details
                       current-internal-time))
                  (loop next-resource-id
                        (cons resource-id available)
                        waiters))
                 ((reply . timeout)
                  (if (and timeout
                           (< timeout current-internal-time))
                      ;; Tail call, for the same reason as when adding
                      ;; a resource
                      (waiter-loop (safe-deq waiters))
                      (begin
                        (if timeout
                            (let ((reply-timeout
                                   (/ (- timeout
                                         current-internal-time)
                                      internal-time-units-per-second)))
                              ;; Don't sleep in this fiber, so spawn a
                              ;; new fiber to handle handing over the
                              ;; resource, and returning it if there's
                              ;; a timeout
                              (spawn-fiber-for-checkout
                               channel
                               reply
                               reply-timeout
                               resource-id
                               (resource-details-value resource-details)))
                            (put-message reply
                                         (list 'success
                                               resource-id
                                               (resource-details-value
                                                resource-details))))
                        (set-resource-details-last-used! resource-details
                                                         current-internal-time)
                        ;; NOTE(review): the checkout count isn't
                        ;; incremented when a returned resource is
                        ;; handed straight to a waiter, so these
                        ;; checkouts don't count towards #:lifetime.
                        ;; Confirm whether that's intended.
                        (when (eq? 'return-failed-checkout
                                   return-type)
                          (decrement-resource-checkout-count!
                           resource-details))
                        (loop next-resource-id
                              available
                              waiters)))))))))

      (('remove resource-id)
       (hash-remove! resources
                     resource-id)
       (when (and (not (q-empty? waiters))
                  (< (- (count-resources resources) 1)
                     max-size))
         (spawn-fiber-to-return-new-resource))
       (loop next-resource-id
             available              ; resource shouldn't be in this list
             waiters))

      (('destroy resource-id)
       (let ((resource-details
              (hash-ref resources
                        resource-id)))
         (spawn-fiber-to-destroy-resource resource-id
                                          (resource-details-value
                                           resource-details))
         (loop next-resource-id
               available
               waiters)))

      (('list-resources reply)
       (spawn-fiber
        (lambda ()
          (put-message reply (hash-map->list
                              (lambda (_ value) value)
                              resources))))
       (loop next-resource-id
             available
             waiters))

      (('stats reply timeout-time)
       (let ((stats
              `((resources . ,(count-resources resources))
                (available . ,(length available))
                (waiters . ,(q-length waiters))
                (resources-checkout-count
                 . ,(hash-fold
                     (lambda (_ resource-details result)
                       (cons (resource-details-checkout-count
                              resource-details)
                             result))
                     '()
                     resources))
                (checkout-failure-count . ,checkout-failure-count))))
         (spawn-fiber
          (lambda ()
            (let ((op
                   (put-operation reply stats)))
              (perform-operation
               (if timeout-time
                   (choice-operation
                    op
                    (sleep-operation
                     (/ (- timeout-time
                           (get-internal-real-time))
                        internal-time-units-per-second)))
                   op))))))
       (loop next-resource-id
             available
             waiters))

      (('check-for-idle-resources)
       (let* ((internal-real-time
               (get-internal-real-time))
              ;; Available resources that have been unused for more
              ;; than idle-seconds
              (candidate-resource-ids-to-destroy
               (filter-map
                (lambda (resource-id)
                  (let ((resource-details
                         (hash-ref resources resource-id)))
                    (if (> (/ (- internal-real-time
                                 (resource-details-last-used
                                  resource-details))
                              internal-time-units-per-second)
                           idle-seconds)
                        resource-id
                        #f)))
                available))
              ;; Don't take the pool below min-size
              (max-resources-to-destroy
               (max 0
                    (- (count-resources resources)
                       min-size)))
              (resources-to-destroy
               (take candidate-resource-ids-to-destroy
                     (min max-resources-to-destroy
                          (length candidate-resource-ids-to-destroy)))))
         (for-each
          (lambda (resource-id)
            (if destructor
                (spawn-fiber-to-destroy-resource
                 resource-id
                 (resource-details-value
                  (hash-ref resources resource-id)))
                ;; With no destructor to run, no remove message will
                ;; arrive for this resource, so drop it from RESOURCES
                ;; here.  Otherwise it would count towards max-size
                ;; forever while never being available.
                (hash-remove! resources resource-id)))
          resources-to-destroy)
         (loop next-resource-id
               (lset-difference = available resources-to-destroy)
               waiters)))

      (('destroy)
       (let ((current-internal-time (get-internal-real-time)))
         ;; Notify all waiters that the pool has been destroyed
         (for-each
          (match-lambda
            ((reply . timeout)
             (when (or (not timeout)
                       (> timeout current-internal-time))
               (spawn-fiber
                (lambda ()
                  (let ((op
                         (put-operation
                          reply
                          (cons 'resource-pool-destroyed
                                #f))))
                    (perform-operation
                     (if timeout
                         (choice-operation
                          op
                          (wrap-operation
                           (sleep-operation
                            (/ (- timeout
                                  (get-internal-real-time))
                               internal-time-units-per-second))
                           (const #f)))
                         op))))))))
          (car waiters))

         (when destructor
           (for-each
            (lambda (resource-id)
              (spawn-fiber-to-destroy-resource
               resource-id
               (resource-details-value
                (hash-ref resources
                          resource-id))))
            available))

         ;; Do this in parallel to avoid deadlocks between the
         ;; limiter and returning new resources to this pool
         (and=> return-new-resource/parallelism-limiter
                (lambda (limiter)
                  (spawn-fiber
                   (lambda ()
                     (destroy-parallelism-limiter limiter)))))

         (if (or (= 0 (count-resources resources))
                 (not destructor))
             (begin
               (set-resource-pool-channel! pool #f)
               (signal-condition! destroy-condition)
               ;; No loop
               *unspecified*)
             (destroy-loop resources next-resource-id))))

      (unknown
       (simple-format
        (current-error-port)
        "unrecognised message to ~A resource pool channel: ~A\n"
        name
        unknown)
       (loop next-resource-id
             available
             waiters)))))
;; Run main-loop in its own fiber, on SCHEDULER if one was given and the
;; current scheduler otherwise.  When IDLE-SECONDS is set, a second fiber
;; sends a check-for-idle-resources message to the pool every
;; IDLE-SECONDS seconds until the pool is destroyed.
(spawn-fiber
 (lambda ()
   (when idle-seconds
     (spawn-fiber
      (lambda ()
        (let loop ()
          (when (and
                 ;; Give up on sending the message if the pool is
                 ;; destroyed first, since once the pool's fiber has
                 ;; finished nothing will receive it and a plain
                 ;; put-message would block forever
                 (perform-operation
                  (choice-operation
                   (wrap-operation
                    (put-operation channel
                                   '(check-for-idle-resources))
                    (const #t))
                   (wrap-operation
                    (wait-operation destroy-condition)
                    (const #f))))
                 ;; Wait before checking again, unless the pool is
                 ;; destroyed in the meantime
                 (perform-operation
                  (choice-operation
                   (wrap-operation
                    (sleep-operation idle-seconds)
                    (const #t))
                   (wrap-operation
                    (wait-operation destroy-condition)
                    (const #f)))))
            (loop))))))

   (with-exception-handler
       ;; Outer handler: unwinds (#:unwind? #t) and returns #f, so an
       ;; exception ends the fiber's thunk rather than propagating
       ;; further
       (lambda (exn)
         #f)
     (lambda ()
       (with-exception-handler
           ;; Inner handler: runs without unwinding, so the stack is
           ;; still there to build a backtrace from; writes the
           ;; backtrace and the exception to the current error port,
           ;; then re-raises
           (lambda (exn)
             (let* ((stack (make-stack #t))
                    (error-string
                     (call-with-output-string
                       (lambda (port)
                         (display-backtrace stack port 3)
                         (simple-format
                          port
                          "exception in the ~A pool fiber, " name)
                         (print-exception
                          port
                          (stack-ref stack 3)
                          '%exception
                          (list exn))))))
               (display/knots error-string
                              (current-error-port)))
             (raise-exception exn))
         (lambda ()
           (start-stack
            #t
            (main-loop)))))
     #:unwind? #t))
 (or scheduler
     (current-scheduler)))
2024-11-19 18:43:43 +00:00
pool)
2024-11-19 18:43:43 +00:00
(define (destroy-resource-pool pool)
  "Destroy POOL, preventing any new checkouts.  Blocks until all
checked-out resources have been returned, running the pool's
@code{#:destructor} on each.  Any fibers waiting for a resource
receive @code{&resource-pool-destroyed}.

This can be called on a pool that has already been destroyed.  Returns
@code{#t}."
  (let ((channel (resource-pool-channel pool))
        (destroy-condition (resource-pool-destroy-condition pool)))
    (if channel
        (perform-operation
         (choice-operation
          ;; Ask the pool to destroy itself, then wait for it to finish
          (wrap-operation
           (put-operation channel
                          (list 'destroy))
           (lambda _
             (wait destroy-condition)))
          ;; Or, if something else destroys the pool before the message
          ;; is received, stop once that has finished
          (wait-operation destroy-condition)))
        ;; The channel is replaced with #f when the pool is destroyed,
        ;; so there's nothing to send the message to, just wait for the
        ;; condition
        (wait destroy-condition))
    #t))
2024-11-19 18:43:43 +00:00
;; Exception type with a single field, the pool.
(define &resource-pool-timeout
  (make-exception-type '&resource-pool-timeout
                       &error
                       '(pool)))

(define resource-pool-timeout-error-pool
  (exception-accessor
   &resource-pool-timeout
   (record-accessor &resource-pool-timeout 'pool)))
(set-procedure-property! resource-pool-timeout-error-pool 'documentation
  "Return the pool from a @code{&resource-pool-timeout} exception.")

;; Constructor, taking the pool.  Not exported from this module.
(define make-resource-pool-timeout-error
  (record-constructor &resource-pool-timeout))

(define resource-pool-timeout-error?
  (exception-predicate &resource-pool-timeout))
(set-procedure-property! resource-pool-timeout-error? 'documentation
  "Return @code{#t} if OBJ is a @code{&resource-pool-timeout} exception.")
2024-11-19 18:43:43 +00:00
;; Exception type with two fields, the pool and the number of waiters.
(define &resource-pool-too-many-waiters
  (make-exception-type '&resource-pool-too-many-waiters
                       &error
                       '(pool waiters-count)))

(define resource-pool-too-many-waiters-error-pool
  (exception-accessor
   &resource-pool-too-many-waiters
   (record-accessor &resource-pool-too-many-waiters 'pool)))
(set-procedure-property! resource-pool-too-many-waiters-error-pool 'documentation
  "Return the pool from a @code{&resource-pool-too-many-waiters} exception.")

(define resource-pool-too-many-waiters-error-waiters-count
  (exception-accessor
   &resource-pool-too-many-waiters
   (record-accessor &resource-pool-too-many-waiters 'waiters-count)))
(set-procedure-property! resource-pool-too-many-waiters-error-waiters-count 'documentation
  "Return the waiters count from a @code{&resource-pool-too-many-waiters} exception.")

;; Constructor, taking the pool and the waiters count.  Not exported from
;; this module.
(define make-resource-pool-too-many-waiters-error
  (record-constructor &resource-pool-too-many-waiters))

(define resource-pool-too-many-waiters-error?
  (exception-predicate &resource-pool-too-many-waiters))
(set-procedure-property! resource-pool-too-many-waiters-error? 'documentation
  "Return @code{#t} if OBJ is a @code{&resource-pool-too-many-waiters} exception.")
;; Exception type with a single field, the pool.
(define &resource-pool-destroyed
  (make-exception-type '&resource-pool-destroyed
                       &error
                       '(pool)))

(define resource-pool-destroyed-error-pool
  (exception-accessor
   &resource-pool-destroyed
   (record-accessor &resource-pool-destroyed 'pool)))
(set-procedure-property! resource-pool-destroyed-error-pool 'documentation
  "Return the pool from a @code{&resource-pool-destroyed} exception.")

;; Constructor, taking the pool.  Not exported from this module.
(define make-resource-pool-destroyed-error
  (record-constructor &resource-pool-destroyed))

(define resource-pool-destroyed-error?
  (exception-predicate &resource-pool-destroyed))
(set-procedure-property! resource-pool-destroyed-error? 'documentation
  "Return @code{#t} if OBJ is a @code{&resource-pool-destroyed} exception.")
;; Exception type with no fields.  Unlike the other exception types in
;; this module, it's derived from &exception rather than &error.
(define &resource-pool-destroy-resource
  (make-exception-type '&resource-pool-destroy-resource
                       &exception
                       '()))

(define make-resource-pool-destroy-resource-exception
  (record-constructor &resource-pool-destroy-resource))
(set-procedure-property! make-resource-pool-destroy-resource-exception 'documentation
  "Construct a @code{&resource-pool-destroy-resource} exception.")

(define resource-pool-destroy-resource-exception?
  (exception-predicate &resource-pool-destroy-resource))
(set-procedure-property! resource-pool-destroy-resource-exception? 'documentation
  "Return @code{#t} if OBJ is a @code{&resource-pool-destroy-resource} exception.")
2024-11-19 18:43:43 +00:00
;; Parameter providing the default for the #:timeout-handler argument of
;; call-with-resource-from-pool.  Its initial value is #f.
(define resource-pool-default-timeout-handler
(make-parameter #f))
(define* (call-with-resource-from-pool
          pool proc #:key (timeout 'default)
          (timeout-handler (resource-pool-default-timeout-handler))
          (max-waiters 'default)
          (channel (resource-pool-channel pool))
          (destroy-resource-on-exception? #f)
          (delay-logger (resource-pool-delay-logger pool))
          (duration-logger (resource-pool-duration-logger pool)))
  "Call PROC with a resource from POOL, blocking until a resource becomes
available.  Once PROC has returned, the resource is handed back to the
pool and the values returned by PROC are returned.

@code{#:timeout} is the number of seconds to wait for a resource.  The
symbol @code{default} selects the @code{default-checkout-timeout} entry
of the pool configuration, and @code{#f} waits indefinitely.  When the
timeout elapses, @code{#:timeout-handler} (if not @code{#f}) is called
as @code{(timeout-handler pool proc timeout)} and a
@code{&resource-pool-timeout} exception is raised.

@code{#:max-waiters} is sent to the pool with the checkout request.  The
symbol @code{default} selects the @code{default-max-waiters} entry of
the pool configuration.  A @code{&resource-pool-too-many-waiters}
exception is raised if the pool rejects the request for this reason.

A @code{&resource-pool-destroyed} exception is raised if
@code{#:channel} is @code{#f} or the pool replies that it has been
destroyed.

If PROC raises an exception, the resource is handed back to the pool
and the exception is re-raised.  The pool is asked to destroy the
resource rather than reuse it when
@code{#:destroy-resource-on-exception?} is true, or when the exception
is a @code{&resource-pool-destroy-resource} exception.

@code{#:delay-logger} is called as @code{(delay-logger seconds)} with
the time spent waiting for a resource to become available.  Defaults
to the pool's @code{#:delay-logger} if not specified.

@code{#:duration-logger} is called as @code{(duration-logger seconds)}
after PROC completes, whether it returned normally or raised an
exception.  Defaults to the pool's @code{#:duration-logger} if not
specified."
  (define timeout-or-default
    (if (eq? timeout 'default)
        (assq-ref (resource-pool-configuration pool)
                  'default-checkout-timeout)
        timeout))

  (define max-waiters-or-default
    (if (eq? max-waiters 'default)
        (assq-ref (resource-pool-configuration pool)
                  'default-max-waiters)
        max-waiters))

  ;; Seconds elapsed since START-TIME, which is a value previously
  ;; obtained from get-internal-real-time.
  (define (seconds-since start-time)
    (/ (- (get-internal-real-time) start-time)
       internal-time-units-per-second))

  ;; Call LOGGER with SECONDS, ignoring any exception it raises, since
  ;; a failing logger would otherwise break returning the resource.
  (define (call-logger/safe logger seconds)
    (with-exception-handler
        (lambda (exn) #f)
      (lambda ()
        (logger seconds))
      #:unwind? #t))

  (define checkout-start-time (get-internal-real-time))

  ;; Request a resource, giving up once timeout-or-default seconds have
  ;; passed since the first attempt.  Returns the pool's reply, or the
  ;; symbol timeout.
  (define (checkout/timeout)
    (let ((start-time (get-internal-real-time)))
      (define (time-remaining)
        (- timeout-or-default (seconds-since start-time)))

      (let loop ((reply (make-channel)))
        (let ((request-success?
               (perform-operation
                (choice-operation
                 (wrap-operation
                  (put-operation channel
                                 (list 'checkout
                                       reply
                                       ;; Deadline for the request, in
                                       ;; internal time units
                                       (+ start-time
                                          (* timeout-or-default
                                             internal-time-units-per-second))
                                       max-waiters-or-default))
                  (const #t))
                 ;; Only wait for what is left of the timeout, so that
                 ;; retries don't extend the overall wait beyond it
                 (wrap-operation
                  (sleep-operation (max 0 (time-remaining)))
                  (const #f))))))
          (if request-success?
              (let ((remaining (time-remaining)))
                (if (> remaining 0)
                    (let ((response
                           (perform-operation
                            (choice-operation
                             (get-operation reply)
                             (wrap-operation (sleep-operation remaining)
                                             (const #f))))))
                      (if (or (not response)
                              (eq? response 'resource-pool-retry-checkout))
                          ;; No usable reply: retry with a fresh reply
                          ;; channel while there is still time left
                          (if (> (time-remaining) 0)
                              (loop (make-channel))
                              'timeout)
                          response))
                    'timeout))
              'timeout)))))

  ;; Request a resource and wait for the pool's reply without any time
  ;; limit.
  (define (checkout/blocking)
    (let ((reply (make-channel)))
      (put-message channel
                   (list 'checkout
                         reply
                         #f
                         max-waiters-or-default))
      (get-message reply)))

  (unless channel
    (raise-exception
     (make-resource-pool-destroyed-error pool)))

  (match (if timeout-or-default
             (checkout/timeout)
             (checkout/blocking))
    ('timeout
     ;; NOTE(review): the handler receives the TIMEOUT argument as given
     ;; by the caller, which is the symbol default when the pool's
     ;; configured timeout was used -- confirm handlers expect that
     ;; rather than the resolved number of seconds.
     (when timeout-handler
       (timeout-handler pool proc timeout))

     (raise-exception
      (make-resource-pool-timeout-error pool)))
    (('too-many-waiters . count)
     (raise-exception
      (make-resource-pool-too-many-waiters-error pool
                                                 count)))
    (('resource-pool-destroyed . #f)
     (raise-exception
      (make-resource-pool-destroyed-error pool)))
    (('success resource-id resource-value)
     (when delay-logger
       (call-logger/safe delay-logger
                         (seconds-since checkout-start-time)))

     (let ((proc-start-time (get-internal-real-time)))
       (define (log-duration)
         (when duration-logger
           (call-logger/safe duration-logger
                             (seconds-since proc-start-time))))

       (call-with-values
           (lambda ()
             (with-exception-handler
                 (lambda (exn)
                   ;; Unwind the stack before calling put-message, as
                   ;; this avoids inconsistent behaviour with
                   ;; continuation barriers
                   (log-duration)
                   (put-message
                    channel
                    (list (if (or destroy-resource-on-exception?
                                  (resource-pool-destroy-resource-exception? exn))
                              'destroy
                              'return)
                          resource-id))
                   (raise-exception exn))
               (lambda ()
                 (with-exception-handler
                     (lambda (exn)
                       ;; This handler runs before unwinding, so the
                       ;; stack of the failing code can be captured
                       ;; and attached to the exception
                       (let ((stack
                              (match (fluid-ref %stacks)
                                ((stack-tag . prompt-tag)
                                 (make-stack #t
                                             0 prompt-tag
                                             0 (and prompt-tag 1)))
                                (_
                                 (make-stack #t)))))
                         (raise-exception
                          (make-exception
                           exn
                           (make-knots-exception stack)))))
                   (lambda ()
                     (proc resource-value))))
               #:unwind? #t))
         (lambda vals
           (log-duration)
           (put-message channel
                        `(return ,resource-id))
           (apply values vals)))))))
(define-syntax-rule (with-resource-from-pool pool resource exp ...)
  "Evaluate EXP ... with RESOURCE bound to a resource checked out from
POOL.  Syntactic sugar around @code{call-with-resource-from-pool},
called with its default keyword arguments; the values of the last EXP
are returned."
  (call-with-resource-from-pool
   pool
   (lambda (resource) exp ...)))
(define* (resource-pool-stats pool #:key (timeout 5))
  "Return an alist of statistics for POOL with the following keys:

@table @code
@item resources
Total number of resources currently held by the pool.
@item available
Number of resources not currently checked out.
@item waiters
Number of fibers currently queued waiting for a resource.
@item checkout-failure-count
Cumulative number of checkouts where an exception was raised inside
the proc.
@end table

Blocks waiting for the pool fiber to respond.  @code{#:timeout} is
the number of seconds to wait; defaults to @code{5}, and @code{#f}
waits indefinitely.  Raises @code{&resource-pool-timeout} if the pool
does not respond in time, and @code{&resource-pool-destroyed} if POOL
no longer has a channel."
  (define channel
    (resource-pool-channel pool))

  ;; Accepts and ignores any arguments so that it can be used directly
  ;; as the wrapper of a sleep operation.
  (define (raise-timeout . _)
    (raise-exception
     (make-resource-pool-timeout-error pool)))

  (unless channel
    (raise-exception
     (make-resource-pool-destroyed-error pool)))

  (if timeout
      (let* ((reply (make-channel))
             (start-time (get-internal-real-time))
             ;; Deadline sent along with the request, in internal time
             ;; units
             (timeout-time
              (+ start-time
                 (* internal-time-units-per-second timeout))))
        ;; First phase: get the request to the pool fiber
        (perform-operation
         (choice-operation
          (wrap-operation
           (put-operation channel
                          `(stats ,reply ,timeout-time))
           (const #t))
          (wrap-operation (sleep-operation timeout)
                          raise-timeout)))

        ;; Second phase: wait for the reply for whatever is left of
        ;; the timeout
        (let ((time-remaining
               (- timeout
                  (/ (- (get-internal-real-time)
                        start-time)
                     internal-time-units-per-second))))
          (if (> time-remaining 0)
              (perform-operation
               (choice-operation
                (get-operation reply)
                (wrap-operation (sleep-operation time-remaining)
                                raise-timeout)))
              (raise-timeout))))
      (let ((reply (make-channel)))
        (put-message channel
                     `(stats ,reply #f))
        (get-message reply))))
(define (resource-pool-list-resources pool)
  "Ask the fiber managing POOL for its resources and return its reply.
Blocks until the pool fiber responds; there is no timeout.  Raises
@code{&resource-pool-destroyed} if POOL no longer has a channel."
  (define channel
    (resource-pool-channel pool))

  (unless channel
    (raise-exception
     (make-resource-pool-destroyed-error pool)))

  (let ((reply (make-channel)))
    ;; Use the channel that was just checked, rather than reading it
    ;; from the pool a second time
    (put-message channel
                 (list 'list-resources reply))
    (get-message reply)))