;;; Guile Knots
;;; Copyright © 2020 Christopher Baines
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (knots resource-pool)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-9)
  #:use-module (ice-9 match)
  #:use-module (ice-9 exceptions)
  #:use-module (fibers)
  #:use-module (fibers timers)
  #:use-module (fibers channels)
  #:use-module (fibers scheduler)
  #:use-module (fibers operations)
  #:export (resource-pool?

            make-resource-pool
            destroy-resource-pool

            resource-pool-default-timeout
            resource-pool-retry-checkout-timeout

            &resource-pool-timeout
            resource-pool-timeout-error-pool
            resource-pool-timeout-error?

            resource-pool-default-timeout-handler

            call-with-resource-from-pool
            with-resource-from-pool

            resource-pool-stats))

;; A pool is just a name (used in log messages) plus the channel on
;; which the pool fiber receives its requests.
(define-record-type <resource-pool>
  (make-resource-pool-record name channel)
  resource-pool?
  (name    resource-pool-name)
  (channel resource-pool-channel))

(define* (make-resource-pool initializer max-size
                             #:key (min-size max-size)
                             (idle-seconds #f)
                             (delay-logger (const #f))
                             (duration-logger (const #f))
                             destructor
                             lifetime
                             scheduler
                             (name "unnamed")
                             ;; Add options for customizing timeouts
                             )
  "Create and return a resource pool holding at most MAX-SIZE resources.

A fiber is spawned on SCHEDULER (or the current scheduler when SCHEDULER
is not given) which owns all of the pool state and is driven by messages
sent over the pool's channel.

INITIALIZER is a thunk called to create a new resource when a checkout
is requested, nothing is available and fewer than MAX-SIZE resources
exist.  If it raises an exception or returns #f, the requester is queued
as a waiter instead.

When IDLE-SECONDS is given, the pool is checked every IDLE-SECONDS
seconds and available resources which have not been used for longer
than IDLE-SECONDS are passed to DESTRUCTOR and dropped from the pool.

DESTRUCTOR, when given, is a procedure of one argument (the resource).
NAME is only used in messages written to the current error port.

MIN-SIZE, DELAY-LOGGER, DURATION-LOGGER and LIFETIME are accepted but
are not used by this implementation."

  ;; Run INITIALIZER, logging (with a backtrace) and returning #f if it
  ;; raises, so that a failing initializer can't take down the pool fiber.
  (define (initializer/safe)
    (with-exception-handler
        (lambda (exn)
          (simple-format
           (current-error-port)
           "exception running ~A resource pool initializer: ~A:\n ~A\n"
           name
           initializer
           exn)
          #f)
      (lambda ()
        (with-throw-handler #t
          initializer
          (lambda args
            (backtrace))))
      #:unwind? #t))

  ;; Run DESTRUCTOR on RESOURCE.  Failures are logged (with a backtrace)
  ;; and not retried.  Nothing is done when no destructor was supplied.
  ;; Always returns #t.
  (define (destructor/safe resource)
    (when destructor
      (with-exception-handler
          (lambda (exn)
            (simple-format
             (current-error-port)
             "exception running resource pool destructor (~A): ~A:\n ~A\n"
             name
             destructor
             exn)
            #f)
        (lambda ()
          (with-throw-handler #t
            (lambda ()
              (destructor resource)
              #t)
            (lambda _
              (backtrace))))
        #:unwind? #t))
    #t)

  (let ((channel (make-channel))
        (checkout-failure-count 0))

    ;; Try to put MESSAGE on TARGET, giving up after SECONDS.  Returns
    ;; #t if the message was delivered and #f on timeout.
    (define (put-with-timeout target message seconds)
      (perform-operation
       (choice-operation
        (wrap-operation (put-operation target message)
                        (const #t))
        (wrap-operation (sleep-operation seconds)
                        (const #f)))))

    ;; Hand RESOURCE to the requester listening on REPLY, waiting at
    ;; most one second.  A failed hand-over (the requester has most
    ;; likely given up) is counted in the stats.  Returns #t on success.
    (define (hand-over reply resource)
      (let ((delivered? (put-with-timeout reply resource 1)))
        (unless delivered?
          (set! checkout-failure-count
                (+ 1 checkout-failure-count)))
        delivered?))

    ;; Record that RESOURCE was used just now.  RESOURCES and
    ;; RESOURCES-LAST-USED are parallel lists; the latter is updated in
    ;; place and returned.
    (define (touch! resource resources resources-last-used)
      (list-set! resources-last-used
                 (list-index (lambda (x)
                               (eq? x resource))
                             resources)
                 (get-internal-real-time))
      resources-last-used)

    (spawn-fiber
     (lambda ()
       (when idle-seconds
         ;; Ticker fiber asking the pool to look for idle resources.
         (spawn-fiber
          (lambda ()
            (while #t
              (sleep idle-seconds)
              (put-message channel '(check-for-idle-resources))))))

       ;; The inner loop only returns after a successful destroy or,
       ;; through the handler, after an exception; in both cases it is
       ;; started again here with empty state.
       (while #t
         (with-exception-handler
             (lambda (exn)
               (simple-format
                (current-error-port)
                "exception in the ~A pool fiber: ~A\n"
                name
                exn))
           (lambda ()
             ;; RESOURCES:           every resource the pool knows about
             ;; AVAILABLE:           the subset not currently checked out
             ;; WAITERS:             reply channels, newest first
             ;; RESOURCES-LAST-USED: internal real times, parallel to
             ;;                      RESOURCES
             (let loop ((resources '())
                        (available '())
                        (waiters '())
                        (resources-last-used '()))
               (match (get-message channel)
                 (('checkout reply)
                  (cond
                   ((not (null? available))
                    (if (hand-over reply (car available))
                        (loop resources
                              (cdr available)
                              waiters
                              resources-last-used)
                        (loop resources
                              available
                              waiters
                              resources-last-used)))
                   ((= (length resources) max-size)
                    ;; Pool is full, queue the requester.
                    (loop resources
                          available
                          (cons reply waiters)
                          resources-last-used))
                   (else
                    (let ((new-resource (initializer/safe)))
                      (if new-resource
                          (let ((checkout-success?
                                 (hand-over reply new-resource)))
                            ;; Keep the new resource even if the
                            ;; requester went away.
                            (loop (cons new-resource resources)
                                  (if checkout-success?
                                      available
                                      (cons new-resource available))
                                  waiters
                                  (cons (get-internal-real-time)
                                        resources-last-used)))
                          (loop resources
                                available
                                (cons reply waiters)
                                resources-last-used))))))

                 (('return resource)
                  (cond
                   ((null? waiters)
                    (loop resources
                          (cons resource available)
                          waiters
                          (touch! resource resources resources-last-used)))
                   ;; The longest waiting requester is at the end.
                   ((hand-over (last waiters) resource)
                    (loop resources
                          available
                          (drop-right! waiters 1)
                          (touch! resource resources resources-last-used)))
                   (else
                    ;; The oldest waiter didn't respond, so ask every
                    ;; waiter to request a checkout again and start
                    ;; with an empty queue.
                    (for-each
                     (lambda (waiter)
                       (spawn-fiber
                        (lambda ()
                          (put-with-timeout waiter
                                            'resource-pool-retry-checkout
                                            10))))
                     waiters)
                    (loop resources
                          (cons resource available)
                          '()
                          (touch! resource resources resources-last-used)))))

                 (('stats reply)
                  (let ((stats
                         `((resources . ,(length resources))
                           (available . ,(length available))
                           (waiters . ,(length waiters))
                           (checkout-failure-count
                            . ,checkout-failure-count))))
                    ;; Reply from a separate fiber so a slow requester
                    ;; can't hold up the pool.
                    (spawn-fiber
                     (lambda ()
                       (put-with-timeout reply stats 1))))
                  (loop resources
                        available
                        waiters
                        resources-last-used))

                 (('check-for-idle-resources)
                  (let* ((now (get-internal-real-time))
                         (resources-to-destroy
                          (filter-map
                           (lambda (resource last-used)
                             ;; Compare by identity, like the rest of
                             ;; the pool, so a checked out resource is
                             ;; never mistaken for an available one
                             ;; that happens to be equal? to it.
                             (and (memq resource available)
                                  (> (/ (- now last-used)
                                        internal-time-units-per-second)
                                     idle-seconds)
                                  resource))
                           resources
                           resources-last-used)))
                    (for-each destructor/safe resources-to-destroy)
                    (loop (lset-difference eq? resources resources-to-destroy)
                          (lset-difference eq? available resources-to-destroy)
                          waiters
                          (filter-map
                           (lambda (resource last-used)
                             (if (memq resource resources-to-destroy)
                                 #f
                                 last-used))
                           resources
                           resources-last-used))))

                 (('destroy reply)
                  (if (= (length resources) (length available))
                      ;; Nothing is checked out.
                      (begin
                        (for-each destructor/safe resources)
                        (put-message reply 'destroy-success))
                      (begin
                        (spawn-fiber
                         (lambda ()
                           (put-with-timeout reply
                                             'resource-pool-destroy-failed
                                             10)))
                        (loop resources
                              available
                              waiters
                              resources-last-used))))

                 (unknown
                  (simple-format
                   (current-error-port)
                   "unrecognised message to ~A resource pool channel: ~A\n"
                   name
                   unknown)
                  (loop resources
                        available
                        waiters
                        resources-last-used)))))
           #:unwind? #t)))
     (or scheduler (current-scheduler)))

    (make-resource-pool-record name channel)))

(define (destroy-resource-pool pool)
  "Ask POOL to run its destructor on every resource.  This only succeeds
when no resource is checked out; otherwise an error is raised with the
message received from the pool."
  (let ((reply (make-channel)))
    (put-message (resource-pool-channel pool)
                 (list 'destroy reply))
    (let ((msg (get-message reply)))
      (unless (eq? msg 'destroy-success)
        (error msg)))))

;; Default for the #:timeout of call-with-resource-from-pool, in
;; seconds; #f means keep trying until a resource is obtained.
(define resource-pool-default-timeout
  (make-parameter #f))

;; Length in seconds of a single checkout attempt when there is no
;; overall timeout.
(define resource-pool-retry-checkout-timeout
  (make-parameter 5))

(define &resource-pool-timeout
  (make-exception-type '&resource-pool-timeout
                       &error
                       '(pool)))

(define resource-pool-timeout-error-pool
  (exception-accessor
   &resource-pool-timeout
   (record-accessor &resource-pool-timeout 'pool)))

(define make-resource-pool-timeout-error
  (record-constructor &resource-pool-timeout))

;; exception-predicate also recognises the condition when it is part of
;; a compound exception, matching the accessor above.
(define resource-pool-timeout-error?
  (exception-predicate &resource-pool-timeout))

;; Default for the #:timeout-handler of call-with-resource-from-pool.
(define resource-pool-default-timeout-handler
  (make-parameter #f))

(define* (call-with-resource-from-pool pool proc
                                       #:key (timeout 'default)
                                       (timeout-handler
                                        (resource-pool-default-timeout-handler)))
  "Call PROC with a resource from POOL, blocking until a resource becomes
available.  The resource is given back to POOL once PROC has returned or
raised an exception.  Return the values returned by PROC.

TIMEOUT is a number of seconds, or #f to keep trying until a resource is
obtained.  The default, the symbol default, means use the value of the
resource-pool-default-timeout parameter.  When no resource is obtained
in time, TIMEOUT-HANDLER (unless it is #f) is called with POOL, PROC and
TIMEOUT, and then a &resource-pool-timeout exception is raised."

  (define retry-timeout
    (resource-pool-retry-checkout-timeout))

  (define timeout-or-default
    (if (eq? timeout 'default)
        (resource-pool-default-timeout)
        timeout))

  ;; Length in seconds of a single checkout attempt.
  (define attempt-timeout
    (or timeout-or-default retry-timeout))

  (define (seconds-remaining start-time)
    (- attempt-timeout
       (/ (- (get-internal-real-time) start-time)
          internal-time-units-per-second)))

  (let ((resource
         (let ((reply (make-channel)))
           (let loop ((start-time (get-internal-real-time)))
             ;; Without an overall timeout a new attempt is started,
             ;; otherwise #f signals the timeout.
             (define (restart-or-give-up)
               (if (eq? timeout-or-default #f)
                   (loop (get-internal-real-time))
                   #f))

             (let ((request-success?
                    (perform-operation
                     (choice-operation
                      (wrap-operation
                       (put-operation (resource-pool-channel pool)
                                      `(checkout ,reply))
                       (const #t))
                      (wrap-operation (sleep-operation attempt-timeout)
                                      (const #f))))))
               (if request-success?
                   (let ((time-remaining (seconds-remaining start-time)))
                     (if (> time-remaining 0)
                         (let ((response
                                (perform-operation
                                 (choice-operation
                                  (get-operation reply)
                                  (wrap-operation
                                   (sleep-operation time-remaining)
                                   (const #f))))))
                           (cond
                            ((and response
                                  (not (eq? response
                                            'resource-pool-retry-checkout)))
                             response)
                            ;; Asked to retry, or no reply yet: request
                            ;; again within the same attempt if there's
                            ;; time left.
                            ((> (seconds-remaining start-time) 0)
                             (loop start-time))
                            (else
                             (restart-or-give-up))))
                         (restart-or-give-up)))
                   (restart-or-give-up)))))))

    (when (or (not resource)
              (eq? resource 'resource-pool-retry-checkout))
      (when timeout-handler
        (timeout-handler pool proc timeout))

      (raise-exception
       (make-resource-pool-timeout-error pool)))

    (with-exception-handler
        (lambda (exception)
          ;; Give the resource back before propagating the exception.
          (put-message (resource-pool-channel pool)
                       `(return ,resource))
          (raise-exception exception))
      (lambda ()
        (call-with-values
            (lambda ()
              (with-throw-handler #t
                (lambda ()
                  (proc resource))
                (lambda _
                  (backtrace))))
          (lambda vals
            (put-message (resource-pool-channel pool)
                         `(return ,resource))
            (apply values vals))))
      #:unwind? #t)))

;; Evaluate EXP ... with RESOURCE bound to a resource from POOL, using
;; the default timeout settings of call-with-resource-from-pool.
(define-syntax-rule (with-resource-from-pool pool resource exp ...)
  (call-with-resource-from-pool
      pool
    (lambda (resource) exp ...)))

(define* (resource-pool-stats pool #:key (timeout 5))
  "Return an association list describing POOL, with the keys resources,
available, waiters and checkout-failure-count.  Raise a
&resource-pool-timeout exception if the pool does not accept the request
and reply within TIMEOUT seconds."

  (define (raise-timeout . _)
    (raise-exception
     (make-resource-pool-timeout-error pool)))

  (let ((reply (make-channel))
        (start-time (get-internal-real-time)))
    (perform-operation
     (choice-operation
      (wrap-operation
       (put-operation (resource-pool-channel pool)
                      `(stats ,reply))
       (const #t))
      (wrap-operation (sleep-operation timeout)
                      raise-timeout)))

    ;; Whatever is left of TIMEOUT is spent waiting for the reply.
    (let ((time-remaining
           (- timeout
              (/ (- (get-internal-real-time) start-time)
                 internal-time-units-per-second))))
      (if (> time-remaining 0)
          (perform-operation
           (choice-operation
            (get-operation reply)
            (wrap-operation (sleep-operation time-remaining)
                            raise-timeout)))
          (raise-timeout)))))