;;; Guile Knots ;;; Copyright © 2020 Christopher Baines ;;; ;;; This file is part of Guile Knots. ;;; ;;; The Guile Knots is free software; you can redistribute it and/or ;;; modify it under the terms of the GNU General Public License as ;;; published by the Free Software Foundation; either version 3 of the ;;; License, or (at your option) any later version. ;;; ;;; The Guile Knots is distributed in the hope that it will be useful, ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ;;; General Public License for more details. ;;; ;;; You should have received a copy of the GNU General Public License ;;; along with the guix-data-service. If not, see ;;; . (define-module (knots resource-pool) #:use-module (srfi srfi-1) #:use-module (srfi srfi-9) #:use-module (srfi srfi-9 gnu) #:use-module (ice-9 match) #:use-module (ice-9 exceptions) #:use-module (fibers) #:use-module (fibers timers) #:use-module (fibers channels) #:use-module (fibers scheduler) #:use-module (fibers operations) #:use-module (knots parallelism) #:export (resource-pool? make-resource-pool resource-pool-name resource-pool-channel resource-pool-configuration destroy-resource-pool resource-pool-default-timeout resource-pool-retry-checkout-timeout &resource-pool-timeout resource-pool-timeout-error-pool resource-pool-timeout-error? resource-pool-default-timeout-handler call-with-resource-from-pool with-resource-from-pool resource-pool-stats)) (define &resource-pool-abort-add-resource (make-exception-type '&recource-pool-abort-add-resource &error '())) (define make-resource-pool-abort-add-resource-error (record-constructor &resource-pool-abort-add-resource)) (define resource-pool-abort-add-resource-error? (record-predicate &resource-pool-abort-add-resource)) (define-record-type (make-resource-pool-record name channel configuration) resource-pool? (name resource-pool-name) (channel resource-pool-channel) (configuration resource-pool-configuration)) (set-record-type-printer! (lambda (resource-pool port) (display (simple-format #f "#" (resource-pool-name resource-pool)) port))) (define* (make-resource-pool return-new-resource max-size #:key (min-size max-size) (idle-seconds #f) (delay-logger (const #f)) (duration-logger (const #f)) destructor lifetime scheduler (name "unnamed") (reply-timeout 0.5) (add-resources-parallelism 1)) (define channel (make-channel)) (define pool (make-resource-pool-record name channel `((max-size . ,max-size) (min-size . ,min-size) (idle-seconds . ,idle-seconds) (delay-logger . ,delay-logger) (duration-logger . ,duration-logger) (destructor . ,destructor) (lifetime . ,lifetime) (scheduler . ,scheduler) (name . ,name) (reply-timeout . ,reply-timeout)))) (define checkout-failure-count 0) (define spawn-fiber-to-return-new-resource (let ((thunk (if add-resources-parallelism (fiberize (lambda () (let ((max-size (assq-ref (resource-pool-configuration pool) 'max-size)) (size (assq-ref (resource-pool-stats pool) 'resources))) (if (= size max-size) (raise-exception (make-resource-pool-abort-add-resource-error)) (return-new-resource)))) #:parallelism add-resources-parallelism #:show-backtrace? (lambda (key . args) (not (and (eq? key '%exception) (resource-pool-abort-add-resource-error? (car args)))))) return-new-resource))) (lambda () (spawn-fiber (lambda () (let ((new-resource (with-exception-handler (lambda (exn) (unless (resource-pool-abort-add-resource-error? exn) (simple-format (current-error-port) "exception adding resource to pool ~A: ~A:\n ~A\n" name return-new-resource exn)) #f) (lambda () (with-throw-handler #t thunk (lambda (key . args) (unless (and (eq? key '%exception) (resource-pool-abort-add-resource-error? (car args))) (backtrace))))) #:unwind? #t))) (when new-resource (put-message channel (list 'add-resource new-resource))))))))) (define (spawn-fiber-to-destroy-resource resource) (spawn-fiber (lambda () (let loop () (let ((success? (with-exception-handler (lambda (exn) (simple-format (current-error-port) "exception running resource pool destructor (~A): ~A:\n ~A\n" name destructor exn) #f) (lambda () (with-throw-handler #t (lambda () (destructor resource) #t) (lambda _ (backtrace)))) #:unwind? #t))) (unless success? (sleep 5) (loop))))))) (define (spawn-fiber-for-checkout reply-channel resource) (spawn-fiber (lambda () (let ((checkout-success? (perform-operation (choice-operation (wrap-operation (put-operation reply-channel resource) (const #t)) (wrap-operation (sleep-operation reply-timeout) (const #f)))))) (unless checkout-success? (put-message channel (list 'return-failed-checkout resource))))))) (spawn-fiber (lambda () (when idle-seconds (spawn-fiber (lambda () (while #t (sleep idle-seconds) (put-message channel '(check-for-idle-resources)))))) (with-throw-handler #t (lambda () (let loop ((resources '()) (available '()) (waiters '()) (resources-last-used '())) (match (get-message channel) (('add-resource resource) (if (= (length resources) max-size) (begin (spawn-fiber-to-destroy-resource resource) (loop resources available waiters resources-last-used)) (if (null? waiters) (loop (cons resource resources) (cons resource available) waiters (cons (get-internal-real-time) resources-last-used)) (begin (if reply-timeout ;; Don't sleep in this fiber, so spawn a new ;; fiber to handle handing over the ;; resource, and returning it if there's a ;; timeout (spawn-fiber-for-checkout (last waiters) resource) (put-message (last waiters) resource)) (loop (cons resource resources) available (drop-right! waiters 1) (cons (get-internal-real-time) resources-last-used)))))) (('checkout reply) (if (null? available) (begin (unless (= (length resources) max-size) (spawn-fiber-to-return-new-resource)) (loop resources available (cons reply waiters) resources-last-used)) (let ((resource (car available))) (if reply-timeout ;; Don't sleep in this fiber, so spawn a ;; new fiber to handle handing over the ;; resource, and returning it if there's a ;; timeout (spawn-fiber-for-checkout reply resource) (put-message reply resource)) (loop resources (cdr available) waiters resources-last-used)))) (((and (or 'return 'return-failed-checkout) return-type) resource) (when (eq? 'return-failed-checkout return-type) (set! checkout-failure-count (+ 1 checkout-failure-count))) (if (null? waiters) (loop resources (cons resource available) waiters (begin (list-set! resources-last-used (list-index (lambda (x) (eq? x resource)) resources) (get-internal-real-time)) resources-last-used)) (begin (if reply-timeout ;; Don't sleep in this fiber, so spawn a new ;; fiber to handle handing over the ;; resource, and returning it if there's a ;; timeout (spawn-fiber-for-checkout (last waiters) resource) (put-message (last waiters) resource)) (loop resources available (drop-right! waiters 1) (begin (list-set! resources-last-used (list-index (lambda (x) (eq? x resource)) resources) (get-internal-real-time)) resources-last-used))))) (('stats reply) (let ((stats `((resources . ,(length resources)) (available . ,(length available)) (waiters . ,(length waiters)) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber (lambda () (perform-operation (choice-operation (wrap-operation (put-operation reply stats) (const #t)) (wrap-operation (sleep-operation reply-timeout) (const #f))))))) (loop resources available waiters resources-last-used)) (('check-for-idle-resources) (let* ((resources-last-used-seconds (map (lambda (internal-time) (/ (- (get-internal-real-time) internal-time) internal-time-units-per-second)) resources-last-used)) (resources-to-destroy (filter-map (lambda (resource last-used-seconds) (if (and (member resource available) (> last-used-seconds idle-seconds)) resource #f)) resources resources-last-used-seconds))) (for-each (lambda (resource) (spawn-fiber-to-destroy-resource resource)) resources-to-destroy) (loop (lset-difference eq? resources resources-to-destroy) (lset-difference eq? available resources-to-destroy) waiters (filter-map (lambda (resource last-used) (if (memq resource resources-to-destroy) #f last-used)) resources resources-last-used)))) (('destroy reply) (if (= (length resources) (length available)) (begin (for-each (lambda (resource) (spawn-fiber-to-destroy-resource resource)) resources) (put-message reply 'destroy-success)) (begin (spawn-fiber (lambda () (perform-operation (choice-operation (put-operation reply 'resource-pool-destroy-failed) (sleep-operation 10))))) (loop resources available waiters resources-last-used)))) (unknown (simple-format (current-error-port) "unrecognised message to ~A resource pool channel: ~A\n" name unknown) (loop resources available waiters resources-last-used))))) (lambda (key . args) (simple-format (current-error-port) "exception in the ~A pool fiber\n" name)))) (or scheduler (current-scheduler))) pool) (define (destroy-resource-pool pool) (let ((reply (make-channel))) (put-message (resource-pool-channel pool) (list 'destroy reply)) (let ((msg (get-message reply))) (unless (eq? msg 'destroy-success) (error msg))))) (define resource-pool-default-timeout (make-parameter #f)) (define resource-pool-retry-checkout-timeout (make-parameter 5)) (define &resource-pool-timeout (make-exception-type '&recource-pool-timeout &error '(pool))) (define resource-pool-timeout-error-pool (exception-accessor &resource-pool-timeout (record-accessor &resource-pool-timeout 'pool))) (define make-resource-pool-timeout-error (record-constructor &resource-pool-timeout)) (define resource-pool-timeout-error? (record-predicate &resource-pool-timeout)) (define resource-pool-default-timeout-handler (make-parameter #f)) (define* (call-with-resource-from-pool pool proc #:key (timeout 'default) (timeout-handler (resource-pool-default-timeout-handler))) "Call PROC with a resource from POOL, blocking until a resource becomes available. Return the resource once PROC has returned." (define retry-timeout (resource-pool-retry-checkout-timeout)) (define timeout-or-default (if (eq? timeout 'default) (resource-pool-default-timeout) timeout)) (let ((resource (let ((reply (make-channel))) (let loop ((start-time (get-internal-real-time))) (let ((request-success? (perform-operation (choice-operation (wrap-operation (put-operation (resource-pool-channel pool) `(checkout ,reply)) (const #t)) (wrap-operation (sleep-operation (or timeout-or-default retry-timeout)) (const #f)))))) (if request-success? (let ((time-remaining (- (or timeout-or-default retry-timeout) (/ (- (get-internal-real-time) start-time) internal-time-units-per-second)))) (if (> time-remaining 0) (let ((response (perform-operation (choice-operation (get-operation reply) (wrap-operation (sleep-operation time-remaining) (const #f)))))) (if (or (not response) (eq? response 'resource-pool-retry-checkout)) (if (> (- (or timeout-or-default retry-timeout) (/ (- (get-internal-real-time) start-time) internal-time-units-per-second)) 0) (loop start-time) (if (eq? timeout-or-default #f) (loop (get-internal-real-time)) #f)) response)) (if (eq? timeout-or-default #f) (loop (get-internal-real-time)) #f))) (if (eq? timeout-or-default #f) (loop (get-internal-real-time)) #f))))))) (when (or (not resource) (eq? resource 'resource-pool-retry-checkout)) (when timeout-handler (timeout-handler pool proc timeout)) (raise-exception (make-resource-pool-timeout-error pool))) (with-exception-handler (lambda (exception) (put-message (resource-pool-channel pool) `(return ,resource)) (raise-exception exception)) (lambda () (call-with-values (lambda () (with-throw-handler #t (lambda () (proc resource)) (lambda _ (backtrace)))) (lambda vals (put-message (resource-pool-channel pool) `(return ,resource)) (apply values vals)))) #:unwind? #t))) (define-syntax-rule (with-resource-from-pool pool resource exp ...) (call-with-resource-from-pool pool (lambda (resource) exp ...))) (define* (resource-pool-stats pool #:key (timeout 5)) (let ((reply (make-channel)) (start-time (get-internal-real-time))) (perform-operation (choice-operation (wrap-operation (put-operation (resource-pool-channel pool) `(stats ,reply)) (const #t)) (wrap-operation (sleep-operation timeout) (lambda _ (raise-exception (make-resource-pool-timeout-error pool)))))) (let ((time-remaining (- timeout (/ (- (get-internal-real-time) start-time) internal-time-units-per-second)))) (if (> time-remaining 0) (perform-operation (choice-operation (get-operation reply) (wrap-operation (sleep-operation time-remaining) (lambda _ (raise-exception (make-resource-pool-timeout-error pool)))))) (raise-exception (make-resource-pool-timeout-error pool))))))