guile-knots/knots/resource-pool.scm

1435 lines
52 KiB
Scheme
Raw Permalink Normal View History

2024-11-19 18:43:43 +00:00
;;; Guile Knots
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with Guile Knots. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (knots resource-pool)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-43)
2025-01-31 12:33:50 +01:00
#:use-module (srfi srfi-71)
#:use-module (ice-9 q)
2024-11-19 18:43:43 +00:00
#:use-module (ice-9 match)
#:use-module (ice-9 exceptions)
#:use-module (fibers)
#:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers scheduler)
#:use-module (fibers operations)
#:use-module (fibers conditions)
#:use-module (knots)
#:use-module (knots parallelism)
#:export (make-fixed-size-resource-pool
2024-11-19 18:43:43 +00:00
make-resource-pool
resource-pool?
2025-01-08 12:23:08 +00:00
resource-pool-name
resource-pool-channel
resource-pool-configuration
2024-11-19 18:43:43 +00:00
destroy-resource-pool
&resource-pool-timeout
resource-pool-timeout-error-pool
2024-11-19 18:43:43 +00:00
resource-pool-timeout-error?
&resource-pool-too-many-waiters
resource-pool-too-many-waiters-error-pool
resource-pool-too-many-waiters-error-waiters-count
resource-pool-too-many-waiters-error?
&resource-pool-destroyed
resource-pool-destroyed-error-pool
resource-pool-destroyed-error?
&resource-pool-destroy-resource
make-resource-pool-destroy-resource-exception
resource-pool-destroy-resource-exception?
2024-11-19 18:43:43 +00:00
resource-pool-default-timeout-handler
call-with-resource-from-pool
with-resource-from-pool
resource-pool-stats))
;; Exception type derived from &error, carrying no fields.  Nothing in
;; the visible part of this file raises it.  The type name previously
;; read "&recource-..." (typo), which showed up in printed exceptions.
(define &resource-pool-abort-add-resource
  (make-exception-type '&resource-pool-abort-add-resource
                       &error
                       '()))

;; Zero-argument constructor for &resource-pool-abort-add-resource.
(define make-resource-pool-abort-add-resource-error
  (record-constructor &resource-pool-abort-add-resource))

;; Predicate matching &resource-pool-abort-add-resource exceptions.
(define resource-pool-abort-add-resource-error?
  (exception-predicate &resource-pool-abort-add-resource))
2024-11-19 18:43:43 +00:00
;; Handle given to users of a pool.
;;
;;  name               value printed by the record printer
;;  channel            channel the pool fiber reads requests from; the
;;                     pool fibers set it to #f once the pool has been
;;                     fully destroyed
;;  destroy-condition  condition signalled when destruction completes
;;  configuration      alist of the options the pool was created with
(define-record-type <resource-pool>
  (make-resource-pool-record name channel destroy-condition configuration)
  resource-pool?
  (name              resource-pool-name)
  (channel           resource-pool-channel
                     set-resource-pool-channel!)
  (destroy-condition resource-pool-destroy-condition)
  (configuration     resource-pool-configuration))
2024-11-19 18:43:43 +00:00
;; Print pools as #<resource-pool name: "NAME">, writing straight to
;; the port instead of building the string first.
(set-record-type-printer!
 <resource-pool>
 (lambda (pool port)
   (simple-format port
                  "#<resource-pool name: \"~A\">"
                  (resource-pool-name pool))))
(define (safe-deq q)
  "Remove and return the element at the front of the (ice-9 q) queue Q,
or return #f when Q holds no elements.  When the last element is
removed the queue's tail pointer (its cdr) is reset to #f."
  (match (car q)
    (() #f)
    ((front . remaining)
     (when (null? remaining)
       (set-cdr! q #f))
     (set-car! q remaining)
     front)))
;; Per-resource bookkeeping kept by the pool fibers in this file.
(define-record-type <resource-details>
(make-resource-details value checkout-count last-used)
resource-details?
;; The pooled resource itself, as handed to callers on checkout.
(value resource-details-value)
;; Mutable counter; make-resource-pool compares it against its
;; `lifetime' option when a resource is returned.
(checkout-count resource-details-checkout-count
set-resource-details-checkout-count!)
;; Mutable.  make-resource-pool stores (get-internal-real-time) values
;; here; make-fixed-size-resource-pool initialises it to #f.
(last-used resource-details-last-used
set-resource-details-last-used!))
;; Add one to the checkout counter stored in the <resource-details>
;; record RESOURCE.
(define-inlinable (increment-resource-checkout-count! resource)
  (let ((checkouts-so-far (resource-details-checkout-count resource)))
    (set-resource-details-checkout-count! resource
                                          (+ checkouts-so-far 1))))
;; Subtract one from the checkout counter stored in the
;; <resource-details> record RESOURCE.  Used to undo the count for a
;; checkout that was never delivered to the requester.
;;
;; This previously used `1+', so a "decrement" actually incremented the
;; counter.
(define-inlinable (decrement-resource-checkout-count! resource)
  (set-resource-details-checkout-count!
   resource
   (1- (resource-details-checkout-count resource))))
(define (spawn-fiber-for-checkout channel
                                  reply-channel
                                  reply-timeout
                                  resource-id
                                  resource)
  "Spawn a fiber that tries to deliver (success RESOURCE-ID RESOURCE) on
REPLY-CHANNEL.  If nobody receives it within REPLY-TIMEOUT seconds, the
fiber instead sends (return-failed-checkout RESOURCE-ID) on CHANNEL."
  (spawn-fiber
   (lambda ()
     (let* ((hand-over
             (wrap-operation
              (put-operation reply-channel
                             (list 'success resource-id resource))
              (const #t)))
            (give-up
             (wrap-operation (sleep-operation reply-timeout)
                             (const #f)))
            (delivered?
             (perform-operation
              (choice-operation hand-over give-up))))
       (when (not delivered?)
         (put-message channel
                      (list 'return-failed-checkout resource-id)))))))
(define* (make-fixed-size-resource-pool resources-list-or-vector
                                        #:key
                                        (delay-logger (const #f))
                                        (duration-logger (const #f))
                                        scheduler
                                        (name "unnamed")
                                        default-checkout-timeout
                                        default-max-waiters)
  "Return a resource pool serving exactly the resources in
RESOURCES-LIST-OR-VECTOR (a list or a vector).  A fiber is spawned on
SCHEDULER (or the current scheduler) to process pool messages.

NAME is used when printing the pool and in diagnostics.  All keyword
arguments are recorded in the pool's configuration alist;
DEFAULT-CHECKOUT-TIMEOUT and DEFAULT-MAX-WAITERS are read from there by
call-with-resource-from-pool.  DELAY-LOGGER and DURATION-LOGGER are only
stored, this procedure does not call them."
  (define channel (make-channel))

  (define destroy-condition
    (make-condition))

  (define pool
    (make-resource-pool-record
     name
     channel
     destroy-condition
     `((delay-logger . ,delay-logger)
       (duration-logger . ,duration-logger)
       (scheduler . ,scheduler)
       (name . ,name)
       (default-checkout-timeout . ,default-checkout-timeout)
       (default-max-waiters . ,default-max-waiters))))

  (define checkout-failure-count 0)

  ;; Vector of <resource-details>, indexed by resource id.  While the
  ;; pool is being destroyed, a slot is set to #f once its resource is
  ;; no longer held by anyone.
  (define resources
    (vector-map
     (lambda (_ resource)
       (make-resource-details
        resource
        0
        #f))
     (if (vector? resources-list-or-vector)
         resources-list-or-vector
         (list->vector resources-list-or-vector))))

  (define (spawn-fiber-to-reply reply message timeout-time)
    ;; Deliver MESSAGE on the channel REPLY from a separate fiber, so
    ;; the pool fiber never blocks on a slow or absent receiver.  When
    ;; TIMEOUT-TIME (in internal time units) is set, give up once that
    ;; time has been reached.
    (spawn-fiber
     (lambda ()
       (let ((op (put-operation reply message)))
         (perform-operation
          (if timeout-time
              (choice-operation
               op
               (sleep-operation
                (/ (- timeout-time
                      (get-internal-real-time))
                   internal-time-units-per-second)))
              op))))))

  (define (finish-destroy!)
    ;; Mark the pool as destroyed and wake up everything waiting in
    ;; destroy-resource-pool.
    (set-resource-pool-channel! pool #f)
    (signal-condition! destroy-condition)
    *unspecified*)

  ;; Message loop used once destruction has started: refuse checkouts
  ;; and wait for every outstanding resource to come back.
  (define (destroy-loop)
    (define (empty?)
      (vector-every (lambda (r)
                      (eq? r #f))
                    resources))

    (let loop ()
      (match (get-message channel)
        (('checkout reply timeout-time max-waiters)
         (spawn-fiber-to-reply reply
                               (cons 'resource-pool-destroyed #f)
                               timeout-time)
         (loop))
        (((and (or 'return
                   'return-failed-checkout)
               return-type)
          resource-id)
         (vector-set! resources
                      resource-id
                      #f)

         (if (empty?)
             ;; No loop
             (finish-destroy!)
             (loop)))
        (('stats reply timeout-time)
         (spawn-fiber-to-reply
          reply
          `((resources . ,(vector-length resources))
            (available . 0)
            (waiters . 0)
            (checkout-failure-count . ,checkout-failure-count))
          timeout-time)
         (loop))
        (('destroy)
         (loop))
        (unknown
         (simple-format
          (current-error-port)
          "unrecognised message to ~A resource pool channel: ~A\n"
          name
          unknown)
         (loop)))))

  ;; Normal message loop.  AVAILABLE is the list of resource ids not
  ;; currently checked out; WAITERS is a queue of (reply . timeout-time)
  ;; pairs for requesters waiting for a resource.
  ;;
  ;; NOTE(review): call-with-resource-from-pool can send
  ;; (destroy RESOURCE-ID); this pool has no handler for it, so such a
  ;; message is reported as unrecognised and the resource is never made
  ;; available again.  Confirm the intended behaviour.
  (define (main-loop)
    (let loop ((available (iota (vector-length resources)))
               (waiters (make-q)))
      (match (get-message channel)
        (('checkout reply timeout-time max-waiters)
         (if (null? available)
             (let ((waiters-count
                    (q-length waiters)))
               (if (and max-waiters
                        (>= waiters-count
                            max-waiters))
                   (begin
                     (spawn-fiber-to-reply reply
                                           (cons 'too-many-waiters
                                                 waiters-count)
                                           timeout-time)
                     (loop available
                           waiters))
                   (loop available
                         (enq! waiters (cons reply timeout-time)))))

             (if timeout-time
                 (let ((current-internal-time
                        (get-internal-real-time)))
                   ;; If this client is still waiting
                   (if (> timeout-time
                          current-internal-time)
                       (let ((reply-timeout
                              (/ (- timeout-time
                                    current-internal-time)
                                 internal-time-units-per-second))
                             (resource-id
                              new-available
                              (car+cdr available)))
                         ;; Don't sleep in this fiber, so spawn a new
                         ;; fiber to handle handing over the resource,
                         ;; and returning it if there's a timeout
                         (spawn-fiber-for-checkout
                          channel
                          reply
                          reply-timeout
                          resource-id
                          (resource-details-value
                           (vector-ref resources
                                       resource-id)))
                         (loop new-available
                               waiters))
                       (loop available
                             waiters)))
                 (let* ((resource-id
                         next-available
                         (car+cdr available))
                        (resource-details
                         (vector-ref resources
                                     resource-id)))
                   (put-message reply
                                (list 'success
                                      resource-id
                                      (resource-details-value
                                       resource-details)))
                   (loop next-available
                         waiters)))))

        (((and (or 'return
                   'return-failed-checkout)
               return-type)
          resource-id)

         (when (eq? 'return-failed-checkout
                    return-type)
           (set! checkout-failure-count
                 (+ 1 checkout-failure-count)))

         (let ((current-internal-time
                (get-internal-real-time)))
           (let waiter-loop ((waiter (safe-deq waiters)))
             (match waiter
               (#f
                (loop (cons resource-id available)
                      waiters))
               ((reply . timeout)
                (if (and timeout
                         (< timeout current-internal-time))
                    ;; This waiter has given up, try the next one.
                    ;; This must be a tail call: previously the code
                    ;; after this `if' also ran once the nested call
                    ;; returned, re-entering the main loop.
                    (waiter-loop (safe-deq waiters))
                    (begin
                      (if timeout
                          (let ((reply-timeout
                                 (/ (- timeout
                                       current-internal-time)
                                    internal-time-units-per-second)))
                            ;; Don't sleep in this fiber, so spawn a
                            ;; new fiber to handle handing over the
                            ;; resource, and returning it if there's
                            ;; a timeout
                            (spawn-fiber-for-checkout
                             channel
                             reply
                             reply-timeout
                             resource-id
                             (resource-details-value
                              (vector-ref resources
                                          resource-id))))
                          (put-message reply
                                       (list 'success
                                             resource-id
                                             (resource-details-value
                                              (vector-ref resources
                                                          resource-id)))))
                      (loop available
                            waiters))))))))

        (('list-resources reply)
         (spawn-fiber
          (lambda ()
            (put-message reply (vector->list resources))))

         (loop available
               waiters))

        (('stats reply timeout-time)
         (spawn-fiber-to-reply
          reply
          `((resources . ,(vector-length resources))
            (available . ,(length available))
            (waiters . ,(q-length waiters))
            (checkout-failure-count . ,checkout-failure-count))
          timeout-time)

         (loop available
               waiters))

        (('destroy)
         (let ((current-internal-time (get-internal-real-time)))
           ;; Notify all waiters that the pool has been destroyed
           (for-each
            (match-lambda
              ((reply . timeout)
               (when (or (not timeout)
                         (> timeout current-internal-time))
                 (spawn-fiber-to-reply reply
                                       (cons 'resource-pool-destroyed
                                             #f)
                                       timeout))))
            (car waiters))

           (if (= (vector-length resources)
                  (length available))
               ;; Nothing is checked out.  No loop
               (finish-destroy!)
               (begin
                 ;; Resources that are not checked out will never be
                 ;; returned, so clear their slots now.  Otherwise the
                 ;; destroy loop's emptiness check can never succeed
                 ;; and destroying the pool would hang.
                 (for-each
                  (lambda (resource-id)
                    (vector-set! resources resource-id #f))
                  available)
                 (destroy-loop)))))

        (unknown
         (simple-format
          (current-error-port)
          "unrecognised message to ~A resource pool channel: ~A\n"
          name
          unknown)
         (loop available
               waiters)))))

  (spawn-fiber
   (lambda ()
     (with-exception-handler
         (lambda (exn)
           #f)
       (lambda ()
         (with-exception-handler
             (lambda (exn)
               ;; Report the exception with a backtrace before
               ;; unwinding to the outer handler
               (let* ((stack (make-stack #t))
                      (error-string
                       (call-with-output-string
                         (lambda (port)
                           (display-backtrace stack port 3)
                           (simple-format
                            port
                            "exception in the ~A pool fiber, " name)
                           (print-exception
                            port
                            (stack-ref stack 3)
                            '%exception
                            (list exn))))))
                 (display error-string
                          (current-error-port)))
               (raise-exception exn))
           (lambda ()
             (start-stack
              #t
              (main-loop)))))
       #:unwind? #t))
   (or scheduler
       (current-scheduler)))

  pool)
(define* (make-resource-pool return-new-resource max-size
                             #:key (min-size 0)
                             (idle-seconds #f)
                             (delay-logger (const #f))
                             (duration-logger (const #f))
                             destructor
                             lifetime
                             scheduler
                             (name "unnamed")
                             (add-resources-parallelism 1)
                             default-checkout-timeout
                             default-max-waiters)
  "Return a resource pool that creates resources on demand by calling
the thunk RETURN-NEW-RESOURCE, holding at most MAX-SIZE of them.  A
fiber is spawned on SCHEDULER (or the current scheduler) to process pool
messages.

When IDLE-SECONDS is set, available resources unused for longer than
that are periodically removed, keeping at least MIN-SIZE resources.
DESTRUCTOR, when given, is called with a resource's value when it is
removed; failed calls are retried every 5 seconds.  When LIFETIME is
set, a resource is removed on return once its checkout count has reached
LIFETIME.  ADD-RESOURCES-PARALLELISM limits how many calls to
RETURN-NEW-RESOURCE run at once.  NAME is used when printing the pool
and in diagnostics.

Most keyword arguments are recorded in the pool's configuration alist;
DEFAULT-CHECKOUT-TIMEOUT and DEFAULT-MAX-WAITERS are read from there by
call-with-resource-from-pool.  DELAY-LOGGER and DURATION-LOGGER are only
stored, this procedure does not call them."
  (define channel (make-channel))

  (define destroy-condition
    (make-condition))

  (define pool
    (make-resource-pool-record
     name
     channel
     destroy-condition
     `((max-size . ,max-size)
       (min-size . ,min-size)
       (idle-seconds . ,idle-seconds)
       (delay-logger . ,delay-logger)
       (duration-logger . ,duration-logger)
       (destructor . ,destructor)
       (lifetime . ,lifetime)
       (scheduler . ,scheduler)
       (name . ,name)
       (default-checkout-timeout . ,default-checkout-timeout)
       (default-max-waiters . ,default-max-waiters))))

  (define checkout-failure-count 0)

  ;; Hash table from resource id to <resource-details>
  (define resources
    (make-hash-table))

  (define-inlinable (count-resources resources)
    (hash-count (const #t) resources))

  (define return-new-resource/parallelism-limiter
    (make-parallelism-limiter
     (or add-resources-parallelism
         max-size)
     #:name
     (string-append
      name
      " resource pool new resource parallelism limiter")))

  (define (spawn-fiber-to-reply reply message timeout-time)
    ;; Deliver MESSAGE on the channel REPLY from a separate fiber, so
    ;; the pool fiber never blocks on a slow or absent receiver.  When
    ;; TIMEOUT-TIME (in internal time units) is set, give up once that
    ;; time has been reached.
    (spawn-fiber
     (lambda ()
       (let ((op (put-operation reply message)))
         (perform-operation
          (if timeout-time
              (choice-operation
               op
               (sleep-operation
                (/ (- timeout-time
                      (get-internal-real-time))
                   internal-time-units-per-second)))
              op))))))

  (define (finish-destroy!)
    ;; Mark the pool as destroyed and wake up everything waiting in
    ;; destroy-resource-pool.
    (set-resource-pool-channel! pool #f)
    (signal-condition! destroy-condition)
    *unspecified*)

  ;; Create a resource in a new fiber (subject to the parallelism
  ;; limiter) and send it to the pool fiber as an add-resource message.
  ;; Exceptions from RETURN-NEW-RESOURCE are printed and then dropped.
  (define (spawn-fiber-to-return-new-resource)
    (spawn-fiber
     (lambda ()
       (with-exception-handler
           (lambda (exn)
             ;; This can happen if the resource pool is destroyed very
             ;; quickly
             (if (resource-pool-destroyed-error? exn)
                 #f
                 (raise-exception exn)))
         (lambda ()
           (with-parallelism-limiter
            return-new-resource/parallelism-limiter
            (let ((max-size
                   (assq-ref (resource-pool-configuration pool)
                             'max-size))
                  (size (count-resources resources)))
              (unless (>= size max-size)
                (with-exception-handler
                    (lambda _ #f)
                  (lambda ()
                    (with-exception-handler
                        (lambda (exn)
                          (simple-format
                           (current-error-port)
                           "exception adding resource to pool ~A: ~A\n\n"
                           name
                           return-new-resource)
                          (print-backtrace-and-exception/knots exn)
                          (raise-exception exn))
                      (lambda ()
                        (let ((new-resource
                               (start-stack #t (return-new-resource))))
                          (put-message channel
                                       (list 'add-resource new-resource))))))
                  #:unwind? #t)))))
         #:unwind? #t))))

  ;; Run DESTRUCTOR on RESOURCE-VALUE in a new fiber, retrying every 5
  ;; seconds until it succeeds, then tell the pool fiber to forget
  ;; RESOURCE-ID with a remove message.
  (define (spawn-fiber-to-destroy-resource resource-id resource-value)
    (spawn-fiber
     (lambda ()
       (let loop ()
         (let* ((success?
                 ;; With no destructor there's nothing to run, the
                 ;; resource just needs removing from the pool.
                 ;; Previously #f was called as a procedure here,
                 ;; failing and retrying forever.
                 (or (not destructor)
                     (with-exception-handler
                         (lambda _ #f)
                       (lambda ()
                         (with-exception-handler
                             (lambda (exn)
                               (simple-format
                                (current-error-port)
                                "exception running resource pool destructor (~A): ~A\n"
                                name
                                destructor)
                               (print-backtrace-and-exception/knots exn)
                               (raise-exception exn))
                           (lambda ()
                             (start-stack #t (destructor resource-value))
                             #t)))
                       #:unwind? #t))))

           (if success?
               (put-message channel
                            (list 'remove resource-id))
               (begin
                 (sleep 5)

                 (loop))))))))

  ;; Message loop used once destruction has started (only entered when
  ;; there is a destructor): refuse checkouts, destroy resources as
  ;; they come back and finish once the hash table is empty.
  (define (destroy-loop resources next-resource-id)
    (let loop ((next-resource-id next-resource-id))
      (match (get-message channel)
        (('add-resource resource)
         ;; A resource created while the pool was being destroyed
         (if destructor
             (begin
               (spawn-fiber-to-destroy-resource next-resource-id
                                                resource)
               (hash-set! resources next-resource-id resource)
               (loop (1+ next-resource-id)))
             (loop next-resource-id)))
        (('checkout reply timeout-time max-waiters)
         (spawn-fiber-to-reply reply
                               (cons 'resource-pool-destroyed #f)
                               timeout-time)
         (loop next-resource-id))
        ;; (destroy RESOURCE-ID) is sent by
        ;; call-with-resource-from-pool, treat it like a return here
        ;; rather than leaving the resource in the table forever.
        (((and (or 'return
                   'return-failed-checkout
                   'destroy
                   'remove)
               return-type)
          resource-id)
         (if (and destructor
                  (not (eq? return-type 'remove)))
             (begin
               ;; Keep the entry until the destructor fiber reports
               ;; back with a remove message, so that the pool isn't
               ;; signalled as destroyed before the destructor has
               ;; run, and the destructor fiber isn't left blocked
               ;; sending to a pool fiber that has exited.
               (spawn-fiber-to-destroy-resource
                resource-id
                (resource-details-value
                 (hash-ref resources resource-id)))
               (loop next-resource-id))
             (begin
               (hash-remove! resources resource-id)

               (if (= 0 (count-resources resources))
                   ;; No loop
                   (finish-destroy!)
                   (loop next-resource-id)))))
        (('stats reply timeout-time)
         (spawn-fiber-to-reply
          reply
          `((resources . ,(count-resources resources))
            (available . 0)
            (waiters . 0)
            (checkout-failure-count . ,checkout-failure-count))
          timeout-time)
         (loop next-resource-id))
        (('check-for-idle-resources)
         (loop next-resource-id))
        (('destroy)
         (loop next-resource-id))
        (unknown
         (simple-format
          (current-error-port)
          "unrecognised message to ~A resource pool channel: ~A\n"
          name
          unknown)
         (loop next-resource-id)))))

  ;; Normal message loop.  NEXT-RESOURCE-ID is the id for the next
  ;; resource added, AVAILABLE is the list of resource ids not
  ;; currently checked out and WAITERS is a queue of
  ;; (reply . timeout-time) pairs for requesters waiting for a
  ;; resource.
  (define (main-loop)
    (let loop ((next-resource-id 0)
               (available '())
               (waiters (make-q)))
      (match (get-message channel)
        (('add-resource resource)
         (if (= (count-resources resources) max-size)
             ;; The pool is already full, so discard this resource
             (if destructor
                 (begin
                   (hash-set! resources
                              next-resource-id
                              (make-resource-details
                               resource
                               0
                               (get-internal-real-time)))
                   (spawn-fiber-to-destroy-resource next-resource-id
                                                    resource)
                   (loop (1+ next-resource-id)
                         available
                         waiters))
                 (loop next-resource-id
                       available
                       waiters))

             (let* ((current-internal-time
                     (get-internal-real-time))
                    (resource-details
                     (make-resource-details
                      resource
                      0
                      current-internal-time)))
               (hash-set! resources
                          next-resource-id
                          resource-details)

               (let waiter-loop ((waiter (safe-deq waiters)))
                 (match waiter
                   (#f
                    (loop (1+ next-resource-id)
                          (cons next-resource-id available)
                          waiters))
                   ((reply . timeout)
                    (if (and timeout
                             (< timeout current-internal-time))
                        ;; This waiter has given up, try the next one.
                        ;; This must be a tail call: previously the
                        ;; code after this `if' also ran once the
                        ;; nested call returned, re-entering the main
                        ;; loop.
                        (waiter-loop (safe-deq waiters))
                        (begin
                          (if timeout
                              (let ((reply-timeout
                                     (/ (- timeout
                                           current-internal-time)
                                        internal-time-units-per-second)))
                                ;; Don't sleep in this fiber, so spawn
                                ;; a new fiber to handle handing over
                                ;; the resource, and returning it if
                                ;; there's a timeout
                                (spawn-fiber-for-checkout channel
                                                          reply
                                                          reply-timeout
                                                          next-resource-id
                                                          resource))
                              (put-message reply (list 'success
                                                       next-resource-id
                                                       resource)))
                          (set-resource-details-checkout-count! resource-details
                                                                1)
                          (loop (1+ next-resource-id)
                                available
                                waiters)))))))))

        (('checkout reply timeout-time max-waiters)
         (if (null? available)
             (begin
               (unless (= (count-resources resources) max-size)
                 (spawn-fiber-to-return-new-resource))

               (let ((waiters-count
                      (q-length waiters)))
                 (if (and max-waiters
                          (>= waiters-count
                              max-waiters))
                     (begin
                       (spawn-fiber-to-reply reply
                                             (cons 'too-many-waiters
                                                   waiters-count)
                                             timeout-time)
                       (loop next-resource-id
                             available
                             waiters))
                     (loop next-resource-id
                           available
                           (enq! waiters (cons reply timeout-time))))))

             (if timeout-time
                 (let ((current-internal-time
                        (get-internal-real-time)))
                   ;; If this client is still waiting
                   (if (> timeout-time
                          current-internal-time)
                       (let* ((reply-timeout
                               (/ (- timeout-time
                                     current-internal-time)
                                  internal-time-units-per-second))
                              (resource-id
                               (car available))
                              (resource-details
                               (hash-ref resources resource-id)))

                         (increment-resource-checkout-count!
                          resource-details)

                         ;; Don't sleep in this fiber, so spawn a new
                         ;; fiber to handle handing over the resource,
                         ;; and returning it if there's a timeout
                         (spawn-fiber-for-checkout channel
                                                   reply
                                                   reply-timeout
                                                   resource-id
                                                   (resource-details-value
                                                    resource-details))
                         (loop next-resource-id
                               (cdr available)
                               waiters))
                       (loop next-resource-id
                             available
                             waiters)))
                 (let* ((resource-id
                         next-available
                         (car+cdr available))
                        (resource-details
                         (hash-ref resources
                                   resource-id)))
                   (increment-resource-checkout-count! resource-details)

                   (put-message reply
                                (list 'success
                                      resource-id
                                      (resource-details-value
                                       resource-details)))

                   (loop next-resource-id
                         next-available
                         waiters)))))

        (((and (or 'return
                   'return-failed-checkout)
               return-type)
          resource-id)

         (when (eq? 'return-failed-checkout
                    return-type)
           (set! checkout-failure-count
                 (+ 1 checkout-failure-count)))

         (let ((current-internal-time
                (get-internal-real-time))
               (resource-details
                (hash-ref resources resource-id)))
           (if (and lifetime
                    (>= (resource-details-checkout-count resource-details)
                        lifetime))
               (begin
                 (spawn-fiber-to-destroy-resource resource-id
                                                  (resource-details-value
                                                   resource-details))
                 (loop next-resource-id
                       available
                       waiters))
               (let waiter-loop ((waiter (safe-deq waiters)))
                 (match waiter
                   (#f
                    (if (eq? 'return-failed-checkout
                             return-type)
                        (decrement-resource-checkout-count! resource-details)
                        (set-resource-details-last-used!
                         resource-details
                         current-internal-time))
                    (loop next-resource-id
                          (cons resource-id available)
                          waiters))
                   ((reply . timeout)
                    (if (and timeout
                             (< timeout current-internal-time))
                        ;; This waiter has given up, try the next one
                        ;; (as a tail call, see the add-resource
                        ;; handler)
                        (waiter-loop (safe-deq waiters))
                        (begin
                          (if timeout
                              (let ((reply-timeout
                                     (/ (- timeout
                                           current-internal-time)
                                        internal-time-units-per-second)))
                                ;; Don't sleep in this fiber, so spawn
                                ;; a new fiber to handle handing over
                                ;; the resource, and returning it if
                                ;; there's a timeout
                                (spawn-fiber-for-checkout
                                 channel
                                 reply
                                 reply-timeout
                                 resource-id
                                 (resource-details-value resource-details)))
                              (put-message reply
                                           (list 'success
                                                 resource-id
                                                 (resource-details-value
                                                  resource-details))))

                          ;; NOTE(review): handing the resource
                          ;; straight to a waiter doesn't increment the
                          ;; checkout count, unlike the checkout
                          ;; handler; confirm whether that's intended.
                          (set-resource-details-last-used! resource-details
                                                           current-internal-time)
                          (when (eq? 'return-failed-checkout
                                     return-type)
                            (decrement-resource-checkout-count! resource-details))

                          (loop next-resource-id
                                available
                                waiters)))))))))

        (('remove resource-id)
         (hash-remove! resources
                       resource-id)

         (when (and (not (q-empty? waiters))
                    (< (- (count-resources resources) 1)
                       max-size))
           (spawn-fiber-to-return-new-resource))

         (loop next-resource-id
               available     ; resource shouldn't be in this list
               waiters))

        (('destroy resource-id)
         (let ((resource-details
                (hash-ref resources
                          resource-id)))
           (spawn-fiber-to-destroy-resource resource-id
                                            (resource-details-value
                                             resource-details))

           (loop next-resource-id
                 available
                 waiters)))

        (('list-resources reply)
         (spawn-fiber
          (lambda ()
            (put-message reply (hash-map->list
                                (lambda (_ value) value)
                                resources))))

         (loop next-resource-id
               available
               waiters))

        (('stats reply timeout-time)
         (spawn-fiber-to-reply
          reply
          `((resources . ,(count-resources resources))
            (available . ,(length available))
            (waiters . ,(q-length waiters))
            (resources-checkout-count
             . ,(hash-fold
                 (lambda (_ resource-details result)
                   (cons (resource-details-checkout-count
                          resource-details)
                         result))
                 '()
                 resources))
            (checkout-failure-count . ,checkout-failure-count))
          timeout-time)

         (loop next-resource-id
               available
               waiters))

        (('check-for-idle-resources)
         (let* ((internal-real-time
                 (get-internal-real-time))
                (candidate-resource-ids-to-destroy
                 (filter-map
                  (lambda (resource-id)
                    (let ((resource-details
                           (hash-ref resources resource-id)))
                      (if (> (/ (- internal-real-time
                                   (resource-details-last-used
                                    resource-details))
                                internal-time-units-per-second)
                             idle-seconds)
                          resource-id
                          #f)))
                  available))
                (max-resources-to-destroy
                 (max 0
                      (- (count-resources resources)
                         min-size)))
                (resources-to-destroy
                 (take candidate-resource-ids-to-destroy
                       (min max-resources-to-destroy
                            (length candidate-resource-ids-to-destroy)))))
           ;; Do this even without a destructor, as the fiber then
           ;; just sends the remove message.  Previously idle
           ;; resources were dropped from the available list but kept
           ;; in the hash table when there was no destructor, so they
           ;; counted towards max-size forever.
           (for-each
            (lambda (resource-id)
              (spawn-fiber-to-destroy-resource
               resource-id
               (resource-details-value
                (hash-ref resources resource-id))))
            resources-to-destroy)

           (loop next-resource-id
                 (lset-difference = available resources-to-destroy)
                 waiters)))

        (('destroy)
         (let ((current-internal-time (get-internal-real-time)))
           ;; Notify all waiters that the pool has been destroyed
           (for-each
            (match-lambda
              ((reply . timeout)
               (when (or (not timeout)
                         (> timeout current-internal-time))
                 (spawn-fiber-to-reply reply
                                       (cons 'resource-pool-destroyed
                                             #f)
                                       timeout))))
            (car waiters))

           (when destructor
             (for-each
              (lambda (resource-id)
                (spawn-fiber-to-destroy-resource
                 resource-id
                 (resource-details-value
                  (hash-ref resources
                            resource-id))))
              available))

           ;; Do this in parallel to avoid deadlocks between the
           ;; limiter and returning new resources to this pool
           (and=> return-new-resource/parallelism-limiter
                  (lambda (limiter)
                    (spawn-fiber
                     (lambda ()
                       (destroy-parallelism-limiter limiter)))))

           (if (or (= 0 (count-resources resources))
                   (not destructor))
               ;; No loop
               (finish-destroy!)
               (destroy-loop resources next-resource-id))))

        (unknown
         (simple-format
          (current-error-port)
          "unrecognised message to ~A resource pool channel: ~A\n"
          name
          unknown)
         (loop next-resource-id
               available
               waiters)))))

  (spawn-fiber
   (lambda ()
     (when idle-seconds
       ;; Prompt the pool fiber to look for idle resources every
       ;; IDLE-SECONDS, until the pool is destroyed
       (spawn-fiber
        (lambda ()
          (let loop ()
            (put-message channel '(check-for-idle-resources))
            (when (perform-operation
                   (choice-operation
                    (wrap-operation
                     (sleep-operation idle-seconds)
                     (const #t))
                    (wrap-operation
                     (wait-operation destroy-condition)
                     (const #f))))
              (loop))))))

     (with-exception-handler
         (lambda (exn)
           #f)
       (lambda ()
         (with-exception-handler
             (lambda (exn)
               ;; Report the exception with a backtrace before
               ;; unwinding to the outer handler
               (let* ((stack (make-stack #t))
                      (error-string
                       (call-with-output-string
                         (lambda (port)
                           (display-backtrace stack port 3)
                           (simple-format
                            port
                            "exception in the ~A pool fiber, " name)
                           (print-exception
                            port
                            (stack-ref stack 3)
                            '%exception
                            (list exn))))))
                 (display error-string
                          (current-error-port)))
               (raise-exception exn))
           (lambda ()
             (start-stack
              #t
              (main-loop)))))
       #:unwind? #t))
   (or scheduler
       (current-scheduler)))

  pool)
2024-11-19 18:43:43 +00:00
(define (destroy-resource-pool pool)
  "Ask POOL to destroy itself and block until its destroy condition has
been signalled.  Return #t.

This can be called on a pool that has already been destroyed (its
channel is then #f); previously that case passed #f to put-operation."
  (let ((channel (resource-pool-channel pool))
        (destroy-condition (resource-pool-destroy-condition pool)))
    (if channel
        (perform-operation
         (choice-operation
          (wrap-operation
           (put-operation channel
                          (list 'destroy))
           (lambda _
             (wait destroy-condition)))
          ;; Something else may finish destroying the pool while this
          ;; fiber is still trying to send the message
          (wait-operation destroy-condition)))
        ;; The channel is cleared just before the condition is
        ;; signalled, so wait for it rather than assuming
        (wait destroy-condition))
    #t))
2024-11-19 18:43:43 +00:00
;; Raised by call-with-resource-from-pool and resource-pool-stats when
;; the pool doesn't respond in time.  The `pool' field holds the pool.
;; The type name previously read "&recource-..." (typo).
(define &resource-pool-timeout
  (make-exception-type '&resource-pool-timeout
                       &error
                       '(pool)))

;; Return the pool stored in a &resource-pool-timeout exception.
(define resource-pool-timeout-error-pool
  (exception-accessor
   &resource-pool-timeout
   (record-accessor &resource-pool-timeout 'pool)))

;; Constructor taking the pool.
(define make-resource-pool-timeout-error
  (record-constructor &resource-pool-timeout))

;; Predicate matching &resource-pool-timeout exceptions.
(define resource-pool-timeout-error?
  (exception-predicate &resource-pool-timeout))
2024-11-19 18:43:43 +00:00
;; Raised by call-with-resource-from-pool when the pool replies that
;; the waiter limit has been reached.  Fields: `pool' and
;; `waiters-count' (the number of waiters reported by the pool).
;; The type name previously read "&recource-..." (typo).
(define &resource-pool-too-many-waiters
  (make-exception-type '&resource-pool-too-many-waiters
                       &error
                       '(pool waiters-count)))

;; Return the pool stored in the exception.
(define resource-pool-too-many-waiters-error-pool
  (exception-accessor
   &resource-pool-too-many-waiters
   (record-accessor &resource-pool-too-many-waiters 'pool)))

;; Return the waiters count stored in the exception.
(define resource-pool-too-many-waiters-error-waiters-count
  (exception-accessor
   &resource-pool-too-many-waiters
   (record-accessor &resource-pool-too-many-waiters 'waiters-count)))

;; Constructor taking the pool and the waiters count.
(define make-resource-pool-too-many-waiters-error
  (record-constructor &resource-pool-too-many-waiters))

;; Predicate matching &resource-pool-too-many-waiters exceptions.
(define resource-pool-too-many-waiters-error?
  (exception-predicate &resource-pool-too-many-waiters))
;; Raised when an operation is attempted on a pool that has been, or
;; is being, destroyed.  The `pool' field holds the pool.
;; The type name previously read "&recource-..." (typo).
(define &resource-pool-destroyed
  (make-exception-type '&resource-pool-destroyed
                       &error
                       '(pool)))

;; Return the pool stored in a &resource-pool-destroyed exception.
(define resource-pool-destroyed-error-pool
  (exception-accessor
   &resource-pool-destroyed
   (record-accessor &resource-pool-destroyed 'pool)))

;; Constructor taking the pool.
(define make-resource-pool-destroyed-error
  (record-constructor &resource-pool-destroyed))

;; Predicate matching &resource-pool-destroyed exceptions.
(define resource-pool-destroyed-error?
  (exception-predicate &resource-pool-destroyed))
;; Exception type (derived from &exception rather than &error, with no
;; fields) that code running inside call-with-resource-from-pool can
;; raise to have the pool sent a destroy message for the checked out
;; resource instead of a return message.
;; The type name previously read "&recource-..." (typo).
(define &resource-pool-destroy-resource
  (make-exception-type '&resource-pool-destroy-resource
                       &exception
                       '()))

;; Zero-argument constructor.
(define make-resource-pool-destroy-resource-exception
  (record-constructor &resource-pool-destroy-resource))

;; Predicate matching &resource-pool-destroy-resource exceptions.
(define resource-pool-destroy-resource-exception?
  (exception-predicate &resource-pool-destroy-resource))
2024-11-19 18:43:43 +00:00
;; Parameter supplying the default #:timeout-handler for
;; call-with-resource-from-pool.  It defaults to #f, in which case no
;; handler is called before the timeout exception is raised.
(define resource-pool-default-timeout-handler
(make-parameter #f))
(define* (call-with-resource-from-pool
          pool proc #:key (timeout 'default)
          (timeout-handler (resource-pool-default-timeout-handler))
          (max-waiters 'default)
          (channel (resource-pool-channel pool))
          (destroy-resource-on-exception? #f))
  "Call PROC with a resource from POOL, blocking until a resource becomes
available.  Return the resource to POOL once PROC has returned, and
return the values PROC returned.

TIMEOUT is the number of seconds to wait for a resource, or #f to wait
indefinitely.  TIMEOUT and MAX-WAITERS default to the pool's
default-checkout-timeout and default-max-waiters configuration.

If the timeout is reached, TIMEOUT-HANDLER (if set) is called with POOL,
PROC and the timeout in effect, then a &resource-pool-timeout exception
is raised.  A &resource-pool-too-many-waiters or
&resource-pool-destroyed exception is raised when the pool replies with
those.

If PROC raises an exception, the pool is sent a return message for the
resource, or a destroy message if DESTROY-RESOURCE-ON-EXCEPTION? is true
or the exception is a &resource-pool-destroy-resource, and the exception
is re-raised with a knots exception holding the stack attached."
  (define timeout-or-default
    (if (eq? timeout 'default)
        (assq-ref (resource-pool-configuration pool)
                  'default-checkout-timeout)
        timeout))

  (define max-waiters-or-default
    (if (eq? max-waiters 'default)
        (assq-ref (resource-pool-configuration pool)
                  'default-max-waiters)
        max-waiters))

  ;; The channel is #f once the pool has been destroyed
  (unless channel
    (raise-exception
     (make-resource-pool-destroyed-error pool)))

  (let ((reply
         (if timeout-or-default
             (let loop ((reply (make-channel))
                        (start-time (get-internal-real-time)))
               (let ((request-success?
                      (perform-operation
                       (choice-operation
                        (wrap-operation
                         (put-operation channel
                                        (list 'checkout
                                              reply
                                              (+ start-time
                                                 (* timeout-or-default
                                                    internal-time-units-per-second))
                                              max-waiters-or-default))
                         (const #t))
                        (wrap-operation (sleep-operation timeout-or-default)
                                        (const #f))))))
                 (if request-success?
                     (let ((time-remaining
                            (- timeout-or-default
                               (/ (- (get-internal-real-time)
                                     start-time)
                                  internal-time-units-per-second))))
                       (if (> time-remaining 0)
                           (let ((response
                                  (perform-operation
                                   (choice-operation
                                    (get-operation reply)
                                    (wrap-operation (sleep-operation time-remaining)
                                                    (const #f))))))
                             (if (or (not response)
                                     (eq? response 'resource-pool-retry-checkout))
                                 ;; Retry with a fresh reply channel
                                 ;; if there's still time left
                                 (if (> (- timeout-or-default
                                           (/ (- (get-internal-real-time)
                                                 start-time)
                                              internal-time-units-per-second))
                                        0)
                                     (loop (make-channel)
                                           start-time)
                                     'timeout)
                                 response))
                           'timeout))
                     'timeout)))
             (let ((reply (make-channel)))
               (put-message channel
                            (list 'checkout
                                  reply
                                  #f
                                  max-waiters-or-default))
               (get-message reply)))))
    (match reply
      ('timeout
       (when timeout-handler
         ;; Pass the timeout that actually applied, rather than the
         ;; symbol `default' when #:timeout wasn't specified
         (timeout-handler pool proc timeout-or-default))

       (raise-exception
        (make-resource-pool-timeout-error pool)))
      (('too-many-waiters . count)
       (raise-exception
        (make-resource-pool-too-many-waiters-error pool
                                                   count)))
      (('resource-pool-destroyed . #f)
       (raise-exception
        (make-resource-pool-destroyed-error pool)))
      (('success resource-id resource-value)
       (call-with-values
           (lambda ()
             (with-exception-handler
                 (lambda (exn)
                   ;; Unwind the stack before calling put-message, as
                   ;; this avoids inconsistent behaviour with
                   ;; continuation barriers
                   (put-message
                    channel
                    (list (if (or destroy-resource-on-exception?
                                  (resource-pool-destroy-resource-exception? exn))
                              'destroy
                              'return)
                          resource-id))
                   (raise-exception exn))
               (lambda ()
                 (with-exception-handler
                     (lambda (exn)
                       ;; Capture the stack while it's still
                       ;; available and attach it to the exception
                       (let ((stack
                              (match (fluid-ref %stacks)
                                ((stack-tag . prompt-tag)
                                 (make-stack #t
                                             0 prompt-tag
                                             0 (and prompt-tag 1)))
                                (_
                                 (make-stack #t)))))
                         (raise-exception
                          (make-exception
                           exn
                           (make-knots-exception stack)))))
                   (lambda ()
                     (proc resource-value))))
               #:unwind? #t))
         (lambda vals
           (put-message channel
                        `(return ,resource-id))
           (apply values vals)))))))
2024-11-19 18:43:43 +00:00
;; (with-resource-from-pool POOL RESOURCE EXP ...) evaluates EXP ...
;; with RESOURCE bound to a resource checked out from POOL, by way of
;; call-with-resource-from-pool with its default keyword arguments.
(define-syntax with-resource-from-pool
  (syntax-rules ()
    ((_ pool resource exp ...)
     (call-with-resource-from-pool
      pool
      (lambda (resource) exp ...)))))
(define* (resource-pool-stats pool #:key (timeout 5))
  "Return the statistics alist reported by POOL's fiber.  TIMEOUT is the
number of seconds to wait overall, or #f to wait indefinitely.  Raise a
&resource-pool-destroyed exception if POOL has no channel, and a
&resource-pool-timeout exception if the timeout is reached."
  (define channel
    (resource-pool-channel pool))

  (define (raise-timeout . _)
    (raise-exception
     (make-resource-pool-timeout-error pool)))

  (define (seconds-since internal-time)
    (/ (- (get-internal-real-time)
          internal-time)
       internal-time-units-per-second))

  (unless channel
    (raise-exception
     (make-resource-pool-destroyed-error pool)))

  (cond
   ((not timeout)
    (let ((reply (make-channel)))
      (put-message channel
                   `(stats ,reply #f))
      (get-message reply)))
   (else
    (let* ((reply (make-channel))
           (start-time (get-internal-real-time))
           (timeout-time
            (+ start-time
               (* internal-time-units-per-second timeout))))
      ;; First, get the request to the pool fiber
      (perform-operation
       (choice-operation
        (wrap-operation
         (put-operation channel
                        `(stats ,reply ,timeout-time))
         (const #t))
        (wrap-operation (sleep-operation timeout)
                        raise-timeout)))

      ;; Then wait for the reply, for whatever time is left
      (let ((time-remaining
             (- timeout (seconds-since start-time))))
        (if (> time-remaining 0)
            (perform-operation
             (choice-operation
              (get-operation reply)
              (wrap-operation (sleep-operation time-remaining)
                              raise-timeout)))
            (raise-timeout)))))))
2024-11-19 18:43:43 +00:00
(define (resource-pool-list-resources pool)
  "Return the list of resource details held by POOL's fiber, blocking
until it replies.  Raise a &resource-pool-destroyed exception if POOL
has no channel."
  (define channel
    (resource-pool-channel pool))

  (unless channel
    (raise-exception
     (make-resource-pool-destroyed-error pool)))

  (let ((reply (make-channel)))
    ;; Use the channel that was just checked, rather than reading it
    ;; from the pool again, as it's set to #f when the pool is
    ;; destroyed
    (put-message channel
                 (list 'list-resources reply))
    (get-message reply)))