guile-knots/knots/resource-pool.scm

698 lines
26 KiB
Scheme

;;; Guile Knots
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with Guile Knots. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (knots resource-pool)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-71)
#:use-module (ice-9 match)
#:use-module (ice-9 exceptions)
#:use-module (fibers)
#:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers scheduler)
#:use-module (fibers operations)
#:use-module (knots)
#:use-module (knots parallelism)
#:export (resource-pool?
make-resource-pool
resource-pool-name
resource-pool-channel
resource-pool-configuration
destroy-resource-pool
&resource-pool-timeout
resource-pool-timeout-error-pool
resource-pool-timeout-error?
resource-pool-default-timeout-handler
call-with-resource-from-pool
with-resource-from-pool
resource-pool-stats))
;; Exception type (a sub-type of &error, with no fields) together with
;; its constructor and predicate.
;; NOTE(review): nothing visible in this file raises or tests for this
;; exception; presumably it is meant to let a resource constructor abort
;; adding a resource -- confirm before relying on it.
(define &resource-pool-abort-add-resource
  (make-exception-type '&resource-pool-abort-add-resource
                       &error
                       '()))

;; Takes no arguments, as the exception type has no fields.
(define make-resource-pool-abort-add-resource-error
  (record-constructor &resource-pool-abort-add-resource))

;; Use exception-predicate rather than record-predicate so that the
;; exception is also recognised when it is part of a compound exception.
(define resource-pool-abort-add-resource-error?
  (exception-predicate &resource-pool-abort-add-resource))
;; Record representing a resource pool, as handed back by
;; make-resource-pool.
;;
;; name          - the pool name, used by the record printer and in
;;                 messages written to the error port
;; channel       - the channel on which the pool fiber receives its
;;                 messages (checkout, return, stats, destroy, ...)
;; configuration - association list of the settings the pool was
;;                 created with
(define-record-type <resource-pool>
(make-resource-pool-record name channel configuration)
resource-pool?
(name resource-pool-name)
(channel resource-pool-channel)
(configuration resource-pool-configuration))
;; Print resource pools as #<resource-pool name: "NAME">, rather than
;; dumping the channel and configuration fields.
(set-record-type-printer!
 <resource-pool>
 (lambda (pool port)
   (simple-format port
                  "#<resource-pool name: \"~A\">"
                  (resource-pool-name pool))))
(define* (make-resource-pool return-new-resource max-size
                             #:key (min-size 0)
                             (idle-seconds #f)
                             (delay-logger (const #f))
                             (duration-logger (const #f))
                             destructor
                             lifetime
                             scheduler
                             (name "unnamed")
                             (add-resources-parallelism 1)
                             default-checkout-timeout)
  "Create a resource pool and return the corresponding <resource-pool>
record.  A fiber is spawned (on SCHEDULER if given, otherwise on the
current scheduler) which owns the pool state and serves the messages
sent to the pool channel.

RETURN-NEW-RESOURCE is a thunk called to create a resource when a
checkout finds none available and the pool holds fewer than MAX-SIZE
resources.

When IDLE-SECONDS is set, the pool is checked every IDLE-SECONDS seconds
and available resources that have been unused for longer than that are
removed, as long as the pool keeps at least MIN-SIZE resources.

DESTRUCTOR, if given, is called with a resource that is being removed
from the pool; it is retried every 5 seconds until it succeeds.

NAME is used in messages written to the error port.

ADD-RESOURCES-PARALLELISM is passed as #:parallelism to fiberize for the
procedure that creates resources; when #f, each resource is created in
a plain fiber and exceptions from RETURN-NEW-RESOURCE are printed and
otherwise ignored.

DEFAULT-CHECKOUT-TIMEOUT is stored in the pool configuration and used by
call-with-resource-from-pool when no explicit timeout is given.

DELAY-LOGGER, DURATION-LOGGER and LIFETIME are stored in the pool
configuration but not otherwise used by this procedure."
  (define channel (make-channel))

  (define pool
    (make-resource-pool-record
     name
     channel
     `((max-size . ,max-size)
       (min-size . ,min-size)
       (idle-seconds . ,idle-seconds)
       (delay-logger . ,delay-logger)
       (duration-logger . ,duration-logger)
       (destructor . ,destructor)
       (lifetime . ,lifetime)
       (scheduler . ,scheduler)
       (name . ,name)
       (default-checkout-timeout . ,default-checkout-timeout))))

  ;; Number of times a resource could not be handed to a waiter before
  ;; the waiter's timeout, reported through the stats message.
  (define checkout-failure-count 0)

  ;; Thunk which starts creating a new resource in a separate fiber, the
  ;; resource being sent back to the pool in an add-resource message.
  (define spawn-fiber-to-return-new-resource
    (if add-resources-parallelism
        (let ((thunk
               (fiberize
                (lambda ()
                  (let ((max-size
                         (assq-ref (resource-pool-configuration pool)
                                   'max-size))
                        (size (assq-ref (resource-pool-stats pool)
                                        'resources)))
                    ;; The pool can temporarily hold more than max-size
                    ;; resources (surplus ones waiting to be destroyed),
                    ;; so compare with < rather than testing equality.
                    (when (< size max-size)
                      (let ((new-resource
                             (return-new-resource)))
                        (put-message channel
                                     (list 'add-resource new-resource))))))
                #:parallelism add-resources-parallelism)))
          (lambda ()
            (spawn-fiber thunk)))
        (lambda ()
          (spawn-fiber
           (lambda ()
             (let ((new-resource
                    (with-exception-handler
                        (lambda _ #f)
                      (lambda ()
                        (with-exception-handler
                            (lambda (exn)
                              (simple-format
                               (current-error-port)
                               "exception adding resource to pool ~A: ~A\n\n"
                               name
                               return-new-resource)
                              (print-backtrace-and-exception/knots exn)
                              (raise-exception exn))
                          (lambda ()
                            (start-stack #t (return-new-resource)))))
                      #:unwind? #t)))
               (when new-resource
                 (put-message channel
                              (list 'add-resource new-resource)))))))))

  ;; Run the destructor on RESOURCE in a separate fiber, retrying every
  ;; 5 seconds until it succeeds, then ask the pool to remove RESOURCE.
  (define (spawn-fiber-to-destroy-resource resource)
    (spawn-fiber
     (lambda ()
       (let loop ()
         (let ((success?
                (with-exception-handler
                    (lambda _ #f)
                  (lambda ()
                    (with-exception-handler
                        (lambda (exn)
                          (simple-format
                           (current-error-port)
                           "exception running resource pool destructor (~A): ~A\n"
                           name
                           destructor)
                          (print-backtrace-and-exception/knots exn)
                          (raise-exception exn))
                      (lambda ()
                        (start-stack #t (destructor resource))
                        #t)))
                  #:unwind? #t)))
           (if success?
               (put-message channel
                            (list 'remove resource))
               (begin
                 (sleep 5)
                 (loop))))))))

  ;; Get rid of RESOURCE: destroy it if there is a destructor, otherwise
  ;; just ask the pool to remove it.  The remove message is sent from a
  ;; separate fiber since the pool fiber can't send to its own channel.
  (define (spawn-fiber-to-discard-resource resource)
    (if destructor
        (spawn-fiber-to-destroy-resource resource)
        (spawn-fiber
         (lambda ()
           (put-message channel
                        (list 'remove resource)))
         #:parallel? #t)))

  ;; Offer RESOURCE on REPLY-CHANNEL for up to REPLY-TIMEOUT seconds, in
  ;; a separate fiber so the pool fiber doesn't block.  If nobody takes
  ;; the resource in time, hand it back to the pool.
  (define (spawn-fiber-for-checkout reply-channel
                                    reply-timeout
                                    resource)
    (spawn-fiber
     (lambda ()
       (let ((checkout-success?
              (perform-operation
               (choice-operation
                (wrap-operation
                 (put-operation reply-channel resource)
                 (const #t))
                (wrap-operation (sleep-operation
                                 reply-timeout)
                                (const #f))))))
         (unless checkout-success?
           (put-message
            channel
            (list 'return-failed-checkout resource)))))))

  ;; Return the WAITERS, which are (reply-channel . timeout-time) pairs,
  ;; whose timeout is unset or still later than CURRENT-INTERNAL-TIME.
  (define (remove-expired-waiters waiters current-internal-time)
    (filter
     (match-lambda
       ((reply . timeout)
        (or (not timeout)
            (> timeout current-internal-time))))
     waiters))

  ;; Give RESOURCE to WAITER, a (reply-channel . timeout-time) pair.
  (define (hand-over-to-waiter waiter resource current-internal-time)
    (match waiter
      ((waiter-channel . waiter-timeout)
       (if waiter-timeout
           ;; Don't sleep in the pool fiber, so spawn a new fiber to
           ;; handle handing over the resource, and returning it if
           ;; there's a timeout
           (spawn-fiber-for-checkout
            waiter-channel
            (/ (- waiter-timeout
                  current-internal-time)
               internal-time-units-per-second)
            resource)
           (put-message waiter-channel resource)))))

  ;; Record the current time as the last use of RESOURCE.  RESOURCES and
  ;; RESOURCES-LAST-USED are parallel lists.
  (define (mark-resource-used! resources resources-last-used resource)
    (let ((index (list-index (lambda (x)
                               (eq? x resource))
                             resources)))
      (when index
        (list-set! resources-last-used
                   index
                   (get-internal-real-time)))))

  ;; Return LST without the element at index I.
  (define (remove-at-index! lst i)
    (let ((start
           end
           (split-at! lst i)))
      (append
       start
       (cdr end))))

  ;; The loop run by the pool fiber.  RESOURCES holds every resource in
  ;; the pool, AVAILABLE those not checked out, WAITERS the pending
  ;; checkouts (most recent first) and RESOURCES-LAST-USED the internal
  ;; real time each resource was last used, in the order of RESOURCES.
  (define (main-loop)
    (let loop ((resources '())
               (available '())
               (waiters '())
               (resources-last-used '()))
      (match (get-message channel)
        (('add-resource resource)
         (if (>= (length resources) max-size)
             ;; The pool is full, so this resource is surplus
             (if destructor
                 (begin
                   ;; Track the resource until the destructor has run,
                   ;; the remove message then drops it again
                   (spawn-fiber-to-destroy-resource resource)
                   (loop (cons resource resources)
                         available
                         waiters
                         (cons (get-internal-real-time)
                               resources-last-used)))
                 ;; Drop the resource, leaving both lists untouched so
                 ;; they stay in step with each other
                 (loop resources
                       available
                       waiters
                       resources-last-used))
             (let* ((current-internal-time (get-internal-real-time))
                    (alive-waiters
                     (remove-expired-waiters waiters
                                             current-internal-time)))
               (if (null? alive-waiters)
                   (loop (cons resource resources)
                         (cons resource available)
                         '()
                         (cons (get-internal-real-time)
                               resources-last-used))
                   (begin
                     ;; The longest waiting request is last in the list
                     (hand-over-to-waiter (last alive-waiters)
                                          resource
                                          current-internal-time)
                     (loop (cons resource resources)
                           available
                           (drop-right! alive-waiters 1)
                           (cons (get-internal-real-time)
                                 resources-last-used)))))))

        (('checkout reply timeout-time)
         (cond
          ((null? available)
           (when (< (length resources) max-size)
             (spawn-fiber-to-return-new-resource))
           (loop resources
                 available
                 (cons (cons reply timeout-time)
                       waiters)
                 resources-last-used))
          ((not timeout-time)
           (put-message reply (car available))
           (loop resources
                 (cdr available)
                 waiters
                 resources-last-used))
          (else
           (let ((current-internal-time (get-internal-real-time)))
             ;; If this client is still waiting
             (if (> timeout-time current-internal-time)
                 (begin
                   ;; Don't sleep in this fiber, so spawn a new fiber
                   ;; to handle handing over the resource, and
                   ;; returning it if there's a timeout
                   (spawn-fiber-for-checkout
                    reply
                    (/ (- timeout-time
                          current-internal-time)
                       internal-time-units-per-second)
                    (car available))
                   (loop resources
                         (cdr available)
                         waiters
                         resources-last-used))
                 (loop resources
                       available
                       waiters
                       resources-last-used))))))

        (((and (or 'return
                   'return-failed-checkout)
               return-type)
          resource)
         (when (eq? 'return-failed-checkout
                    return-type)
           (set! checkout-failure-count
                 (+ 1 checkout-failure-count)))
         (if (memq resource resources)
             (let* ((current-internal-time (get-internal-real-time))
                    (alive-waiters
                     (remove-expired-waiters waiters
                                             current-internal-time)))
               (if (null? alive-waiters)
                   (begin
                     ;; A failed checkout doesn't count as a use of the
                     ;; resource
                     (when (eq? return-type 'return)
                       (mark-resource-used! resources
                                            resources-last-used
                                            resource))
                     (loop resources
                           (cons resource available)
                           '()
                           resources-last-used))
                   (begin
                     (hand-over-to-waiter (last alive-waiters)
                                          resource
                                          current-internal-time)
                     (mark-resource-used! resources
                                          resources-last-used
                                          resource)
                     (loop resources
                           available
                           (drop-right! alive-waiters 1)
                           resources-last-used))))
             (begin
               ;; Ignore things that don't belong to the pool, rather
               ;; than handing them out to later checkouts
               (simple-format
                (current-error-port)
                "resource pool error: ~A returned to ~A is not in the pool\n"
                resource
                name)
               (loop resources
                     available
                     waiters
                     resources-last-used))))

        (('remove resource)
         (let ((index
                (list-index (lambda (x)
                              (eq? x resource))
                            resources)))
           (if index
               (loop (remove-at-index! resources index)
                     available     ; resource shouldn't be in this list
                     waiters
                     (remove-at-index! resources-last-used index))
               (begin
                 (simple-format
                  (current-error-port)
                  "resource pool error: unable to remove ~A\n"
                  resource)
                 (loop resources
                       available
                       waiters
                       resources-last-used)))))

        (('stats reply)
         (let ((stats
                `((resources . ,(length resources))
                  (available . ,(length available))
                  (waiters . ,(length waiters))
                  (checkout-failure-count . ,checkout-failure-count))))
           ;; Reply from a separate fiber, giving up after 5 seconds, so
           ;; that the pool fiber doesn't block on the requester
           (spawn-fiber
            (lambda ()
              (perform-operation
               (choice-operation
                (wrap-operation
                 (put-operation reply stats)
                 (const #t))
                (wrap-operation (sleep-operation 5)
                                (const #f)))))))
         (loop resources
               available
               waiters
               resources-last-used))

        (('check-for-idle-resources)
         (let* ((current-internal-time (get-internal-real-time))
                (candidate-resources-to-destroy
                 (filter-map
                  (lambda (resource last-used)
                    (and (memq resource available)
                         (> (/ (- current-internal-time last-used)
                               internal-time-units-per-second)
                            idle-seconds)
                         resource))
                  resources
                  resources-last-used))
                (available-resources-to-destroy
                 (lset-intersection eq?
                                    available
                                    candidate-resources-to-destroy))
                (max-resources-to-destroy
                 (max 0
                      (- (length resources)
                         min-size)))
                (resources-to-destroy
                 (take available-resources-to-destroy
                       (min max-resources-to-destroy
                            (length available-resources-to-destroy)))))
           ;; Without a destructor the resources still have to be
           ;; removed from the pool, otherwise they would count towards
           ;; max-size forever
           (for-each spawn-fiber-to-discard-resource
                     resources-to-destroy)
           (loop resources
                 (lset-difference eq? available resources-to-destroy)
                 waiters
                 resources-last-used)))

        (('destroy reply)
         (if (null? resources)
             ;; Nothing left, reply and stop the loop
             (put-message reply 'destroy-success)
             (begin
               (for-each spawn-fiber-to-discard-resource
                         available)
               ;; Try again shortly, by which time resources may have
               ;; been removed or returned
               (spawn-fiber
                (lambda ()
                  (sleep 0.1)
                  (put-message channel
                               (list 'destroy reply))))
               (loop resources
                     '()
                     waiters
                     resources-last-used))))

        (unknown
         (simple-format
          (current-error-port)
          "unrecognised message to ~A resource pool channel: ~A\n"
          name
          unknown)
         (loop resources
               available
               waiters
               resources-last-used)))))

  (spawn-fiber
   (lambda ()
     (when idle-seconds
       (spawn-fiber
        (lambda ()
          (while #t
            (sleep idle-seconds)
            (put-message channel '(check-for-idle-resources))))))

     ;; Print a backtrace for any exception in the main loop, then
     ;; unwind and let this fiber finish
     (with-exception-handler
         (lambda (exn)
           #f)
       (lambda ()
         (with-exception-handler
             (lambda (exn)
               (let* ((stack (make-stack #t))
                      (error-string
                       (call-with-output-string
                         (lambda (port)
                           (display-backtrace stack port 3)
                           (simple-format
                            port
                            "exception in the ~A pool fiber, " name)
                           (print-exception
                            port
                            (stack-ref stack 3)
                            '%exception
                            (list exn))))))
                 (display error-string
                          (current-error-port)))
               (raise-exception exn))
           (lambda ()
             (start-stack
              #t
              (main-loop)))))
       #:unwind? #t))
   (or scheduler
       (current-scheduler)))

  pool)
(define (destroy-resource-pool pool)
  "Send a destroy message to POOL and wait for the reply.  An error is
signalled if the reply is anything other than the symbol
destroy-success."
  (define reply-channel (make-channel))

  (put-message (resource-pool-channel pool)
               `(destroy ,reply-channel))

  (let ((outcome (get-message reply-channel)))
    (when (not (eq? outcome 'destroy-success))
      (error outcome))))
;; Exception raised when a request to a resource pool times out.  It is
;; a sub-type of &error with a single field, the pool in question.
(define &resource-pool-timeout
  (make-exception-type '&resource-pool-timeout
                       &error
                       '(pool)))

;; Return the pool from a &resource-pool-timeout exception, including
;; when it's part of a compound exception.
(define resource-pool-timeout-error-pool
  (exception-accessor
   &resource-pool-timeout
   (record-accessor &resource-pool-timeout 'pool)))

;; Takes one argument, the pool.
(define make-resource-pool-timeout-error
  (record-constructor &resource-pool-timeout))

;; Use exception-predicate rather than record-predicate, so that this
;; agrees with the accessor above for compound exceptions.
(define resource-pool-timeout-error?
  (exception-predicate &resource-pool-timeout))
;; Parameter holding the default value for the #:timeout-handler
;; argument of call-with-resource-from-pool.  The initial value is #f,
;; in which case no handler is called before the timeout exception is
;; raised.
(define resource-pool-default-timeout-handler
(make-parameter #f))
(define* (call-with-resource-from-pool
          pool proc #:key (timeout 'default)
          (timeout-handler (resource-pool-default-timeout-handler)))
  "Call PROC with a resource from POOL, waiting until a resource becomes
available.  The resource is sent back to POOL once PROC has returned or
raised an exception, and the values returned by PROC are returned.

TIMEOUT is the number of seconds to wait for a resource.  The default,
the symbol default, means the default-checkout-timeout from the
configuration of POOL is used, and #f means wait indefinitely.

If no resource is obtained in time, TIMEOUT-HANDLER is called with POOL,
PROC and TIMEOUT (unless it's #f), and then a &resource-pool-timeout
exception is raised."
  (define timeout-or-default
    (if (eq? timeout 'default)
        (assq-ref (resource-pool-configuration pool)
                  'default-checkout-timeout)
        timeout))

  ;; Return a resource, or #f if none was received before the timeout.
  (define (checkout-with-timeout)
    (let ((start-time (get-internal-real-time)))
      (define (time-remaining)
        (- timeout-or-default
           (/ (- (get-internal-real-time)
                 start-time)
              internal-time-units-per-second)))

      (let loop ((reply (make-channel)))
        (let ((request-success?
               (perform-operation
                (choice-operation
                 (wrap-operation
                  (put-operation (resource-pool-channel pool)
                                 (list 'checkout
                                       reply
                                       (+ start-time
                                          (* timeout-or-default
                                             internal-time-units-per-second))))
                  (const #t))
                 (wrap-operation (sleep-operation timeout-or-default)
                                 (const #f))))))
          ;; If the request couldn't even be sent in time, the result
          ;; must be #f, rather than an unspecified (and true) value
          ;; which would be mistaken for a resource
          (and request-success?
               (let ((remaining (time-remaining)))
                 (and (> remaining 0)
                      (let ((response
                             (perform-operation
                              (choice-operation
                               (get-operation reply)
                               (wrap-operation (sleep-operation remaining)
                                               (const #f))))))
                        (if (or (not response)
                                (eq? response 'resource-pool-retry-checkout))
                            ;; Retry with a fresh reply channel while
                            ;; there's still time
                            (and (> (time-remaining) 0)
                                 (loop (make-channel)))
                            response)))))))))

  (define (checkout-without-timeout)
    (let ((reply (make-channel)))
      (put-message (resource-pool-channel pool)
                   (list 'checkout
                         reply
                         #f))
      (get-message reply)))

  (let ((resource
         (if timeout-or-default
             (checkout-with-timeout)
             (checkout-without-timeout))))

    (when (not resource)
      (when timeout-handler
        (timeout-handler pool proc timeout))

      (raise-exception
       (make-resource-pool-timeout-error pool)))

    (call-with-values
        (lambda ()
          (with-exception-handler
              (lambda (exn)
                ;; Give the resource back before passing the exception
                ;; on to the outer handlers
                (print-backtrace-and-exception/knots exn)
                (put-message (resource-pool-channel pool)
                             `(return ,resource))
                (raise-exception exn))
            (lambda ()
              (proc resource))))
      (lambda vals
        (put-message (resource-pool-channel pool)
                     `(return ,resource))
        (apply values vals)))))
;; (with-resource-from-pool POOL RESOURCE EXP ...) evaluates EXP ... with
;; RESOURCE bound to a resource checked out from POOL, by passing a
;; procedure to call-with-resource-from-pool with its default keyword
;; arguments.
(define-syntax with-resource-from-pool
  (syntax-rules ()
    ((_ pool resource exp ...)
     (call-with-resource-from-pool
      pool
      (lambda (resource) exp ...)))))
(define* (resource-pool-stats pool #:key (timeout 5))
  "Ask POOL for its statistics and return the reply.  TIMEOUT is the
number of seconds allowed in total for sending the request and receiving
the reply; a &resource-pool-timeout exception is raised if that is
exceeded."
  (define (raise-timeout . _)
    (raise-exception
     (make-resource-pool-timeout-error pool)))

  ;; Perform OPERATION, raising the timeout exception if it hasn't
  ;; completed within SECONDS.
  (define (perform-or-timeout operation seconds)
    (perform-operation
     (choice-operation
      operation
      (wrap-operation (sleep-operation seconds)
                      raise-timeout))))

  (let ((reply (make-channel))
        (start-time (get-internal-real-time)))
    (perform-or-timeout
     (wrap-operation
      (put-operation (resource-pool-channel pool)
                     `(stats ,reply))
      (const #t))
     timeout)

    ;; Only what's left of TIMEOUT can be spent waiting for the reply
    (let ((time-remaining
           (- timeout
              (/ (- (get-internal-real-time)
                    start-time)
                 internal-time-units-per-second))))
      (if (> time-remaining 0)
          (perform-or-timeout (get-operation reply)
                              time-remaining)
          (raise-timeout)))))