guile-knots/knots/resource-pool.scm
Christopher Baines dcb56ee2c5 Tweak the resource pool
Mostly to no longer sleep in the main fiber. Now the main fiber just
spawns other fibers when it would previously block on put-operation
and these other fibers communicate back to the main resource pool
fiber when necessary.

This should mean that the resource pool is more responsive.
2025-01-09 09:34:11 +00:00

573 lines
21 KiB
Scheme

;;; Guile Knots
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (knots resource-pool)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu)
#:use-module (ice-9 match)
#:use-module (ice-9 exceptions)
#:use-module (fibers)
#:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers scheduler)
#:use-module (fibers operations)
#:use-module (knots parallelism)
#:export (resource-pool?
make-resource-pool
resource-pool-name
resource-pool-channel
resource-pool-configuration
destroy-resource-pool
resource-pool-default-timeout
resource-pool-retry-checkout-timeout
&resource-pool-timeout
resource-pool-timeout-error-pool
resource-pool-timeout-error?
resource-pool-default-timeout-handler
call-with-resource-from-pool
with-resource-from-pool
resource-pool-stats))
(define &resource-pool-abort-add-resource
(make-exception-type '&recource-pool-abort-add-resource
&error
'()))
(define make-resource-pool-abort-add-resource-error
(record-constructor &resource-pool-abort-add-resource))
(define resource-pool-abort-add-resource-error?
(record-predicate &resource-pool-abort-add-resource))
(define-record-type <resource-pool>
(make-resource-pool-record name channel configuration)
resource-pool?
(name resource-pool-name)
(channel resource-pool-channel)
(configuration resource-pool-configuration))
(set-record-type-printer!
<resource-pool>
(lambda (resource-pool port)
(display
(simple-format #f "#<resource-pool name: \"~A\">"
(resource-pool-name resource-pool))
port)))
(define* (make-resource-pool return-new-resource max-size
#:key (min-size max-size)
(idle-seconds #f)
(delay-logger (const #f))
(duration-logger (const #f))
destructor
lifetime
scheduler
(name "unnamed")
(reply-timeout 0.5)
add-resources-parallelism)
(define channel (make-channel))
(define pool
(make-resource-pool-record
name
channel
`((max-size . ,max-size)
(min-size . ,min-size)
(idle-seconds . ,idle-seconds)
(delay-logger . ,delay-logger)
(duration-logger . ,duration-logger)
(destructor . ,destructor)
(lifetime . ,lifetime)
(scheduler . ,scheduler)
(name . ,name)
(reply-timeout . ,reply-timeout))))
(define checkout-failure-count 0)
(define spawn-fiber-to-return-new-resource
(let ((thunk
(if add-resources-parallelism
(fiberize
(lambda ()
(let ((max-size
(assq-ref (resource-pool-configuration pool)
'max-size))
(size (assq-ref (resource-pool-stats pool)
'resources)))
(if (= size max-size)
(raise-exception
(make-resource-pool-abort-add-resource-error))
(return-new-resource))))
#:parallelism add-resources-parallelism
#:show-backtrace?
(lambda (key . args)
(not
(and (eq? key '%exception)
(resource-pool-abort-add-resource-error?
(car args))))))
return-new-resource)))
(lambda ()
(spawn-fiber
(lambda ()
(let ((new-resource
(with-exception-handler
(lambda (exn)
(unless (resource-pool-abort-add-resource-error? exn)
(simple-format
(current-error-port)
"exception adding resource to pool ~A: ~A:\n ~A\n"
name
return-new-resource
exn))
#f)
(lambda ()
(with-throw-handler #t
thunk
(lambda (key . args)
(unless (and (eq? key '%exception)
(resource-pool-abort-add-resource-error?
(car args)))
(backtrace)))))
#:unwind? #t)))
(when new-resource
(put-message channel
(list 'add-resource new-resource)))))))))
(define (spawn-fiber-to-destroy-resource resource)
(spawn-fiber
(lambda ()
(let loop ()
(let ((success?
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception running resource pool destructor (~A): ~A:\n ~A\n"
name
destructor
exn)
#f)
(lambda ()
(with-throw-handler #t
(lambda ()
(destructor resource)
#t)
(lambda _
(backtrace))))
#:unwind? #t)))
(unless success?
(sleep 5)
(loop)))))))
(define (spawn-fiber-for-checkout reply-channel resource)
(spawn-fiber
(lambda ()
(let ((checkout-success?
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply-channel resource)
(const #t))
(wrap-operation (sleep-operation
reply-timeout)
(const #f))))))
(unless checkout-success?
(put-message
channel
(list 'return-failed-checkout resource)))))))
(spawn-fiber
(lambda ()
(when idle-seconds
(spawn-fiber
(lambda ()
(while #t
(sleep idle-seconds)
(put-message channel '(check-for-idle-resources))))))
(with-throw-handler #t
(lambda ()
(let loop ((resources '())
(available '())
(waiters '())
(resources-last-used '()))
(match (get-message channel)
(('add-resource resource)
(if (= (length resources) max-size)
(begin
(spawn-fiber-to-destroy-resource resource)
(loop resources
available
waiters
resources-last-used))
(if (null? waiters)
(loop (cons resource resources)
(cons resource available)
waiters
(cons (get-internal-real-time)
resources-last-used))
(begin
(if reply-timeout
;; Don't sleep in this fiber, so spawn a new
;; fiber to handle handing over the
;; resource, and returning it if there's a
;; timeout
(spawn-fiber-for-checkout (last waiters)
resource)
(put-message (last waiters) resource))
(loop (cons resource resources)
available
(drop-right! waiters 1)
(cons (get-internal-real-time)
resources-last-used))))))
(('checkout reply)
(if (null? available)
(begin
(unless (= (length resources) max-size)
(spawn-fiber-to-return-new-resource))
(loop resources
available
(cons reply waiters)
resources-last-used))
(let ((resource (car available)))
(if reply-timeout
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's a
;; timeout
(spawn-fiber-for-checkout reply resource)
(put-message reply resource))
(loop resources
(cdr available)
waiters
resources-last-used))))
(((and (or 'return
'return-failed-checkout)
return-type)
resource)
(when (eq? 'return-failed-checkout
return-type)
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(if (null? waiters)
(loop resources
(cons resource available)
waiters
(begin
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
(get-internal-real-time))
resources-last-used))
(begin
(if reply-timeout
;; Don't sleep in this fiber, so spawn a new
;; fiber to handle handing over the
;; resource, and returning it if there's a
;; timeout
(spawn-fiber-for-checkout (last waiters)
resource)
(put-message (last waiters) resource))
(loop resources
available
(drop-right! waiters 1)
(begin
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
(get-internal-real-time))
resources-last-used)))))
(('stats reply)
(let ((stats
`((resources . ,(length resources))
(available . ,(length available))
(waiters . ,(length waiters))
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
(lambda ()
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply stats)
(const #t))
(wrap-operation (sleep-operation
reply-timeout)
(const #f)))))))
(loop resources
available
waiters
resources-last-used))
(('check-for-idle-resources)
(let* ((resources-last-used-seconds
(map
(lambda (internal-time)
(/ (- (get-internal-real-time) internal-time)
internal-time-units-per-second))
resources-last-used))
(resources-to-destroy
(filter-map
(lambda (resource last-used-seconds)
(if (and (member resource available)
(> last-used-seconds idle-seconds))
resource
#f))
resources
resources-last-used-seconds)))
(for-each
(lambda (resource)
(spawn-fiber-to-destroy-resource resource))
resources-to-destroy)
(loop (lset-difference eq? resources resources-to-destroy)
(lset-difference eq? available resources-to-destroy)
waiters
(filter-map
(lambda (resource last-used)
(if (memq resource resources-to-destroy)
#f
last-used))
resources
resources-last-used))))
(('destroy reply)
(if (= (length resources) (length available))
(begin
(for-each
(lambda (resource)
(spawn-fiber-to-destroy-resource resource))
resources)
(put-message reply 'destroy-success))
(begin
(spawn-fiber
(lambda ()
(perform-operation
(choice-operation
(put-operation reply 'resource-pool-destroy-failed)
(sleep-operation 10)))))
(loop resources
available
waiters
resources-last-used))))
(unknown
(simple-format
(current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n"
name
unknown)
(loop resources
available
waiters
resources-last-used)))))
(lambda (key . args)
(simple-format (current-error-port)
"exception in the ~A pool fiber\n" name))))
(or scheduler
(current-scheduler)))
pool)
(define (destroy-resource-pool pool)
(let ((reply (make-channel)))
(put-message (resource-pool-channel pool)
(list 'destroy reply))
(let ((msg (get-message reply)))
(unless (eq? msg 'destroy-success)
(error msg)))))
(define resource-pool-default-timeout
(make-parameter #f))
(define resource-pool-retry-checkout-timeout
(make-parameter 5))
(define &resource-pool-timeout
(make-exception-type '&recource-pool-timeout
&error
'(pool)))
(define resource-pool-timeout-error-pool
(exception-accessor
&resource-pool-timeout
(record-accessor &resource-pool-timeout 'pool)))
(define make-resource-pool-timeout-error
(record-constructor &resource-pool-timeout))
(define resource-pool-timeout-error?
(record-predicate &resource-pool-timeout))
(define resource-pool-default-timeout-handler
(make-parameter #f))
(define* (call-with-resource-from-pool
pool proc #:key (timeout 'default)
(timeout-handler (resource-pool-default-timeout-handler)))
"Call PROC with a resource from POOL, blocking until a resource becomes
available. Return the resource once PROC has returned."
(define retry-timeout
(resource-pool-retry-checkout-timeout))
(define timeout-or-default
(if (eq? timeout 'default)
(resource-pool-default-timeout)
timeout))
(let ((resource
(let ((reply (make-channel)))
(let loop ((start-time (get-internal-real-time)))
(let ((request-success?
(perform-operation
(choice-operation
(wrap-operation
(put-operation (resource-pool-channel pool)
`(checkout ,reply))
(const #t))
(wrap-operation (sleep-operation (or timeout-or-default
retry-timeout))
(const #f))))))
(if request-success?
(let ((time-remaining
(- (or timeout-or-default
retry-timeout)
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second))))
(if (> time-remaining 0)
(let ((response
(perform-operation
(choice-operation
(get-operation reply)
(wrap-operation (sleep-operation time-remaining)
(const #f))))))
(if (or (not response)
(eq? response 'resource-pool-retry-checkout))
(if (> (- (or timeout-or-default
retry-timeout)
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second))
0)
(loop start-time)
(if (eq? timeout-or-default #f)
(loop (get-internal-real-time))
#f))
response))
(if (eq? timeout-or-default #f)
(loop (get-internal-real-time))
#f)))
(if (eq? timeout-or-default #f)
(loop (get-internal-real-time))
#f)))))))
(when (or (not resource)
(eq? resource 'resource-pool-retry-checkout))
(when timeout-handler
(timeout-handler pool proc timeout))
(raise-exception
(make-resource-pool-timeout-error pool)))
(with-exception-handler
(lambda (exception)
(put-message (resource-pool-channel pool)
`(return ,resource))
(raise-exception exception))
(lambda ()
(call-with-values
(lambda ()
(with-throw-handler #t
(lambda ()
(proc resource))
(lambda _
(backtrace))))
(lambda vals
(put-message (resource-pool-channel pool)
`(return ,resource))
(apply values vals))))
#:unwind? #t)))
(define-syntax-rule (with-resource-from-pool pool resource exp ...)
(call-with-resource-from-pool
pool
(lambda (resource) exp ...)))
(define* (resource-pool-stats pool #:key (timeout 5))
(let ((reply (make-channel))
(start-time (get-internal-real-time)))
(perform-operation
(choice-operation
(wrap-operation
(put-operation (resource-pool-channel pool)
`(stats ,reply))
(const #t))
(wrap-operation (sleep-operation timeout)
(lambda _
(raise-exception
(make-resource-pool-timeout-error pool))))))
(let ((time-remaining
(- timeout
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second))))
(if (> time-remaining 0)
(perform-operation
(choice-operation
(get-operation reply)
(wrap-operation (sleep-operation time-remaining)
(lambda _
(raise-exception
(make-resource-pool-timeout-error pool))))))
(raise-exception
(make-resource-pool-timeout-error pool))))))