Mostly to no longer sleep in the main fiber. Now the main fiber just spawns other fibers when it would previously block on put-operation and these other fibers communicate back to the main resource pool fiber when necessary. This should mean that the resource pool is more responsive.
573 lines
21 KiB
Scheme
573 lines
21 KiB
Scheme
;;; Guile Knots
|
|
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
|
|
;;;
|
|
;;; This file is part of Guile Knots.
|
|
;;;
|
|
;;; The Guile Knots is free software; you can redistribute it and/or
|
|
;;; modify it under the terms of the GNU General Public License as
|
|
;;; published by the Free Software Foundation; either version 3 of the
|
|
;;; License, or (at your option) any later version.
|
|
;;;
|
|
;;; The Guile Knots is distributed in the hope that it will be useful,
|
|
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
;;; General Public License for more details.
|
|
;;;
|
|
;;; You should have received a copy of the GNU General Public License
|
|
;;; along with the guix-data-service. If not, see
|
|
;;; <http://www.gnu.org/licenses/>.
|
|
|
|
(define-module (knots resource-pool)
|
|
#:use-module (srfi srfi-1)
|
|
#:use-module (srfi srfi-9)
|
|
#:use-module (srfi srfi-9 gnu)
|
|
#:use-module (ice-9 match)
|
|
#:use-module (ice-9 exceptions)
|
|
#:use-module (fibers)
|
|
#:use-module (fibers timers)
|
|
#:use-module (fibers channels)
|
|
#:use-module (fibers scheduler)
|
|
#:use-module (fibers operations)
|
|
#:use-module (knots parallelism)
|
|
#:export (resource-pool?
|
|
|
|
make-resource-pool
|
|
resource-pool-name
|
|
resource-pool-channel
|
|
resource-pool-configuration
|
|
destroy-resource-pool
|
|
|
|
resource-pool-default-timeout
|
|
resource-pool-retry-checkout-timeout
|
|
|
|
&resource-pool-timeout
|
|
resource-pool-timeout-error-pool
|
|
resource-pool-timeout-error?
|
|
|
|
resource-pool-default-timeout-handler
|
|
|
|
call-with-resource-from-pool
|
|
with-resource-from-pool
|
|
|
|
resource-pool-stats))
|
|
|
|
(define &resource-pool-abort-add-resource
|
|
(make-exception-type '&recource-pool-abort-add-resource
|
|
&error
|
|
'()))
|
|
|
|
(define make-resource-pool-abort-add-resource-error
|
|
(record-constructor &resource-pool-abort-add-resource))
|
|
|
|
(define resource-pool-abort-add-resource-error?
|
|
(record-predicate &resource-pool-abort-add-resource))
|
|
|
|
(define-record-type <resource-pool>
|
|
(make-resource-pool-record name channel configuration)
|
|
resource-pool?
|
|
(name resource-pool-name)
|
|
(channel resource-pool-channel)
|
|
(configuration resource-pool-configuration))
|
|
|
|
(set-record-type-printer!
|
|
<resource-pool>
|
|
(lambda (resource-pool port)
|
|
(display
|
|
(simple-format #f "#<resource-pool name: \"~A\">"
|
|
(resource-pool-name resource-pool))
|
|
port)))
|
|
|
|
(define* (make-resource-pool return-new-resource max-size
|
|
#:key (min-size max-size)
|
|
(idle-seconds #f)
|
|
(delay-logger (const #f))
|
|
(duration-logger (const #f))
|
|
destructor
|
|
lifetime
|
|
scheduler
|
|
(name "unnamed")
|
|
(reply-timeout 0.5)
|
|
add-resources-parallelism)
|
|
(define channel (make-channel))
|
|
|
|
(define pool
|
|
(make-resource-pool-record
|
|
name
|
|
channel
|
|
`((max-size . ,max-size)
|
|
(min-size . ,min-size)
|
|
(idle-seconds . ,idle-seconds)
|
|
(delay-logger . ,delay-logger)
|
|
(duration-logger . ,duration-logger)
|
|
(destructor . ,destructor)
|
|
(lifetime . ,lifetime)
|
|
(scheduler . ,scheduler)
|
|
(name . ,name)
|
|
(reply-timeout . ,reply-timeout))))
|
|
|
|
(define checkout-failure-count 0)
|
|
|
|
(define spawn-fiber-to-return-new-resource
|
|
(let ((thunk
|
|
(if add-resources-parallelism
|
|
(fiberize
|
|
(lambda ()
|
|
(let ((max-size
|
|
(assq-ref (resource-pool-configuration pool)
|
|
'max-size))
|
|
(size (assq-ref (resource-pool-stats pool)
|
|
'resources)))
|
|
(if (= size max-size)
|
|
(raise-exception
|
|
(make-resource-pool-abort-add-resource-error))
|
|
(return-new-resource))))
|
|
#:parallelism add-resources-parallelism
|
|
#:show-backtrace?
|
|
(lambda (key . args)
|
|
(not
|
|
(and (eq? key '%exception)
|
|
(resource-pool-abort-add-resource-error?
|
|
(car args))))))
|
|
return-new-resource)))
|
|
(lambda ()
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(let ((new-resource
|
|
(with-exception-handler
|
|
(lambda (exn)
|
|
(unless (resource-pool-abort-add-resource-error? exn)
|
|
(simple-format
|
|
(current-error-port)
|
|
"exception adding resource to pool ~A: ~A:\n ~A\n"
|
|
name
|
|
return-new-resource
|
|
exn))
|
|
#f)
|
|
(lambda ()
|
|
(with-throw-handler #t
|
|
thunk
|
|
(lambda (key . args)
|
|
(unless (and (eq? key '%exception)
|
|
(resource-pool-abort-add-resource-error?
|
|
(car args)))
|
|
(backtrace)))))
|
|
#:unwind? #t)))
|
|
(when new-resource
|
|
(put-message channel
|
|
(list 'add-resource new-resource)))))))))
|
|
|
|
(define (spawn-fiber-to-destroy-resource resource)
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(let loop ()
|
|
(let ((success?
|
|
(with-exception-handler
|
|
(lambda (exn)
|
|
(simple-format
|
|
(current-error-port)
|
|
"exception running resource pool destructor (~A): ~A:\n ~A\n"
|
|
name
|
|
destructor
|
|
exn)
|
|
#f)
|
|
(lambda ()
|
|
(with-throw-handler #t
|
|
(lambda ()
|
|
(destructor resource)
|
|
#t)
|
|
(lambda _
|
|
(backtrace))))
|
|
#:unwind? #t)))
|
|
|
|
(unless success?
|
|
(sleep 5)
|
|
|
|
(loop)))))))
|
|
|
|
(define (spawn-fiber-for-checkout reply-channel resource)
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(let ((checkout-success?
|
|
(perform-operation
|
|
(choice-operation
|
|
(wrap-operation
|
|
(put-operation reply-channel resource)
|
|
(const #t))
|
|
(wrap-operation (sleep-operation
|
|
reply-timeout)
|
|
(const #f))))))
|
|
(unless checkout-success?
|
|
(put-message
|
|
channel
|
|
(list 'return-failed-checkout resource)))))))
|
|
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(when idle-seconds
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(while #t
|
|
(sleep idle-seconds)
|
|
(put-message channel '(check-for-idle-resources))))))
|
|
|
|
(with-throw-handler #t
|
|
(lambda ()
|
|
(let loop ((resources '())
|
|
(available '())
|
|
(waiters '())
|
|
(resources-last-used '()))
|
|
|
|
(match (get-message channel)
|
|
(('add-resource resource)
|
|
(if (= (length resources) max-size)
|
|
(begin
|
|
(spawn-fiber-to-destroy-resource resource)
|
|
|
|
(loop resources
|
|
available
|
|
waiters
|
|
resources-last-used))
|
|
|
|
(if (null? waiters)
|
|
(loop (cons resource resources)
|
|
(cons resource available)
|
|
waiters
|
|
(cons (get-internal-real-time)
|
|
resources-last-used))
|
|
|
|
(begin
|
|
(if reply-timeout
|
|
;; Don't sleep in this fiber, so spawn a new
|
|
;; fiber to handle handing over the
|
|
;; resource, and returning it if there's a
|
|
;; timeout
|
|
(spawn-fiber-for-checkout (last waiters)
|
|
resource)
|
|
(put-message (last waiters) resource))
|
|
|
|
(loop (cons resource resources)
|
|
available
|
|
(drop-right! waiters 1)
|
|
(cons (get-internal-real-time)
|
|
resources-last-used))))))
|
|
|
|
(('checkout reply)
|
|
(if (null? available)
|
|
(begin
|
|
(unless (= (length resources) max-size)
|
|
(spawn-fiber-to-return-new-resource))
|
|
|
|
(loop resources
|
|
available
|
|
(cons reply waiters)
|
|
resources-last-used))
|
|
|
|
(let ((resource (car available)))
|
|
(if reply-timeout
|
|
;; Don't sleep in this fiber, so spawn a
|
|
;; new fiber to handle handing over the
|
|
;; resource, and returning it if there's a
|
|
;; timeout
|
|
(spawn-fiber-for-checkout reply resource)
|
|
(put-message reply resource))
|
|
|
|
(loop resources
|
|
(cdr available)
|
|
waiters
|
|
resources-last-used))))
|
|
|
|
(((and (or 'return
|
|
'return-failed-checkout)
|
|
return-type)
|
|
resource)
|
|
|
|
(when (eq? 'return-failed-checkout
|
|
return-type)
|
|
(set! checkout-failure-count
|
|
(+ 1 checkout-failure-count)))
|
|
|
|
(if (null? waiters)
|
|
(loop resources
|
|
(cons resource available)
|
|
waiters
|
|
(begin
|
|
(list-set!
|
|
resources-last-used
|
|
(list-index (lambda (x)
|
|
(eq? x resource))
|
|
resources)
|
|
(get-internal-real-time))
|
|
resources-last-used))
|
|
|
|
(begin
|
|
(if reply-timeout
|
|
;; Don't sleep in this fiber, so spawn a new
|
|
;; fiber to handle handing over the
|
|
;; resource, and returning it if there's a
|
|
;; timeout
|
|
(spawn-fiber-for-checkout (last waiters)
|
|
resource)
|
|
(put-message (last waiters) resource))
|
|
|
|
(loop resources
|
|
available
|
|
(drop-right! waiters 1)
|
|
(begin
|
|
(list-set!
|
|
resources-last-used
|
|
(list-index (lambda (x)
|
|
(eq? x resource))
|
|
resources)
|
|
(get-internal-real-time))
|
|
resources-last-used)))))
|
|
|
|
(('stats reply)
|
|
(let ((stats
|
|
`((resources . ,(length resources))
|
|
(available . ,(length available))
|
|
(waiters . ,(length waiters))
|
|
(checkout-failure-count . ,checkout-failure-count))))
|
|
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(perform-operation
|
|
(choice-operation
|
|
(wrap-operation
|
|
(put-operation reply stats)
|
|
(const #t))
|
|
(wrap-operation (sleep-operation
|
|
reply-timeout)
|
|
(const #f)))))))
|
|
|
|
(loop resources
|
|
available
|
|
waiters
|
|
resources-last-used))
|
|
|
|
(('check-for-idle-resources)
|
|
(let* ((resources-last-used-seconds
|
|
(map
|
|
(lambda (internal-time)
|
|
(/ (- (get-internal-real-time) internal-time)
|
|
internal-time-units-per-second))
|
|
resources-last-used))
|
|
(resources-to-destroy
|
|
(filter-map
|
|
(lambda (resource last-used-seconds)
|
|
(if (and (member resource available)
|
|
(> last-used-seconds idle-seconds))
|
|
resource
|
|
#f))
|
|
resources
|
|
resources-last-used-seconds)))
|
|
|
|
(for-each
|
|
(lambda (resource)
|
|
(spawn-fiber-to-destroy-resource resource))
|
|
resources-to-destroy)
|
|
|
|
(loop (lset-difference eq? resources resources-to-destroy)
|
|
(lset-difference eq? available resources-to-destroy)
|
|
waiters
|
|
(filter-map
|
|
(lambda (resource last-used)
|
|
(if (memq resource resources-to-destroy)
|
|
#f
|
|
last-used))
|
|
resources
|
|
resources-last-used))))
|
|
|
|
(('destroy reply)
|
|
(if (= (length resources) (length available))
|
|
(begin
|
|
(for-each
|
|
(lambda (resource)
|
|
(spawn-fiber-to-destroy-resource resource))
|
|
resources)
|
|
(put-message reply 'destroy-success))
|
|
(begin
|
|
(spawn-fiber
|
|
(lambda ()
|
|
(perform-operation
|
|
(choice-operation
|
|
(put-operation reply 'resource-pool-destroy-failed)
|
|
(sleep-operation 10)))))
|
|
(loop resources
|
|
available
|
|
waiters
|
|
resources-last-used))))
|
|
|
|
(unknown
|
|
(simple-format
|
|
(current-error-port)
|
|
"unrecognised message to ~A resource pool channel: ~A\n"
|
|
name
|
|
unknown)
|
|
(loop resources
|
|
available
|
|
waiters
|
|
resources-last-used)))))
|
|
(lambda (key . args)
|
|
(simple-format (current-error-port)
|
|
"exception in the ~A pool fiber\n" name))))
|
|
(or scheduler
|
|
(current-scheduler)))
|
|
|
|
pool)
|
|
|
|
(define (destroy-resource-pool pool)
|
|
(let ((reply (make-channel)))
|
|
(put-message (resource-pool-channel pool)
|
|
(list 'destroy reply))
|
|
(let ((msg (get-message reply)))
|
|
(unless (eq? msg 'destroy-success)
|
|
(error msg)))))
|
|
|
|
(define resource-pool-default-timeout
|
|
(make-parameter #f))
|
|
|
|
(define resource-pool-retry-checkout-timeout
|
|
(make-parameter 5))
|
|
|
|
(define &resource-pool-timeout
|
|
(make-exception-type '&recource-pool-timeout
|
|
&error
|
|
'(pool)))
|
|
|
|
(define resource-pool-timeout-error-pool
|
|
(exception-accessor
|
|
&resource-pool-timeout
|
|
(record-accessor &resource-pool-timeout 'pool)))
|
|
|
|
(define make-resource-pool-timeout-error
|
|
(record-constructor &resource-pool-timeout))
|
|
|
|
(define resource-pool-timeout-error?
|
|
(record-predicate &resource-pool-timeout))
|
|
|
|
(define resource-pool-default-timeout-handler
|
|
(make-parameter #f))
|
|
|
|
(define* (call-with-resource-from-pool
|
|
pool proc #:key (timeout 'default)
|
|
(timeout-handler (resource-pool-default-timeout-handler)))
|
|
"Call PROC with a resource from POOL, blocking until a resource becomes
|
|
available. Return the resource once PROC has returned."
|
|
|
|
(define retry-timeout
|
|
(resource-pool-retry-checkout-timeout))
|
|
|
|
(define timeout-or-default
|
|
(if (eq? timeout 'default)
|
|
(resource-pool-default-timeout)
|
|
timeout))
|
|
|
|
(let ((resource
|
|
(let ((reply (make-channel)))
|
|
(let loop ((start-time (get-internal-real-time)))
|
|
(let ((request-success?
|
|
(perform-operation
|
|
(choice-operation
|
|
(wrap-operation
|
|
(put-operation (resource-pool-channel pool)
|
|
`(checkout ,reply))
|
|
(const #t))
|
|
(wrap-operation (sleep-operation (or timeout-or-default
|
|
retry-timeout))
|
|
(const #f))))))
|
|
(if request-success?
|
|
(let ((time-remaining
|
|
(- (or timeout-or-default
|
|
retry-timeout)
|
|
(/ (- (get-internal-real-time)
|
|
start-time)
|
|
internal-time-units-per-second))))
|
|
(if (> time-remaining 0)
|
|
(let ((response
|
|
(perform-operation
|
|
(choice-operation
|
|
(get-operation reply)
|
|
(wrap-operation (sleep-operation time-remaining)
|
|
(const #f))))))
|
|
(if (or (not response)
|
|
(eq? response 'resource-pool-retry-checkout))
|
|
(if (> (- (or timeout-or-default
|
|
retry-timeout)
|
|
(/ (- (get-internal-real-time)
|
|
start-time)
|
|
internal-time-units-per-second))
|
|
0)
|
|
(loop start-time)
|
|
(if (eq? timeout-or-default #f)
|
|
(loop (get-internal-real-time))
|
|
#f))
|
|
response))
|
|
(if (eq? timeout-or-default #f)
|
|
(loop (get-internal-real-time))
|
|
#f)))
|
|
(if (eq? timeout-or-default #f)
|
|
(loop (get-internal-real-time))
|
|
#f)))))))
|
|
|
|
(when (or (not resource)
|
|
(eq? resource 'resource-pool-retry-checkout))
|
|
(when timeout-handler
|
|
(timeout-handler pool proc timeout))
|
|
|
|
(raise-exception
|
|
(make-resource-pool-timeout-error pool)))
|
|
|
|
(with-exception-handler
|
|
(lambda (exception)
|
|
(put-message (resource-pool-channel pool)
|
|
`(return ,resource))
|
|
(raise-exception exception))
|
|
(lambda ()
|
|
(call-with-values
|
|
(lambda ()
|
|
(with-throw-handler #t
|
|
(lambda ()
|
|
(proc resource))
|
|
(lambda _
|
|
(backtrace))))
|
|
(lambda vals
|
|
(put-message (resource-pool-channel pool)
|
|
`(return ,resource))
|
|
(apply values vals))))
|
|
#:unwind? #t)))
|
|
|
|
(define-syntax-rule (with-resource-from-pool pool resource exp ...)
|
|
(call-with-resource-from-pool
|
|
pool
|
|
(lambda (resource) exp ...)))
|
|
|
|
(define* (resource-pool-stats pool #:key (timeout 5))
|
|
(let ((reply (make-channel))
|
|
(start-time (get-internal-real-time)))
|
|
(perform-operation
|
|
(choice-operation
|
|
(wrap-operation
|
|
(put-operation (resource-pool-channel pool)
|
|
`(stats ,reply))
|
|
(const #t))
|
|
(wrap-operation (sleep-operation timeout)
|
|
(lambda _
|
|
(raise-exception
|
|
(make-resource-pool-timeout-error pool))))))
|
|
|
|
(let ((time-remaining
|
|
(- timeout
|
|
(/ (- (get-internal-real-time)
|
|
start-time)
|
|
internal-time-units-per-second))))
|
|
(if (> time-remaining 0)
|
|
(perform-operation
|
|
(choice-operation
|
|
(get-operation reply)
|
|
(wrap-operation (sleep-operation time-remaining)
|
|
(lambda _
|
|
(raise-exception
|
|
(make-resource-pool-timeout-error pool))))))
|
|
(raise-exception
|
|
(make-resource-pool-timeout-error pool))))))
|
|
|