;;; Guile Knots
;;; Copyright © 2020 Christopher Baines
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with Guile Knots.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (knots resource-pool)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-9)
  #:use-module (srfi srfi-9 gnu)
  #:use-module (srfi srfi-71)
  #:use-module (ice-9 match)
  #:use-module (ice-9 exceptions)
  #:use-module (fibers)
  #:use-module (fibers timers)
  #:use-module (fibers channels)
  #:use-module (fibers scheduler)
  #:use-module (fibers operations)
  #:use-module (fibers conditions)
  #:use-module (knots)
  #:use-module (knots parallelism)
  #:export (resource-pool?

            make-resource-pool
            resource-pool-name
            resource-pool-channel
            resource-pool-configuration
            destroy-resource-pool

            &resource-pool-timeout
            resource-pool-timeout-error-pool
            resource-pool-timeout-error?

            &resource-pool-too-many-waiters
            resource-pool-too-many-waiters-error-pool
            resource-pool-too-many-waiters-error-waiters-count
            resource-pool-too-many-waiters-error?

            &resource-pool-destroyed
            resource-pool-destroyed-error-pool
            resource-pool-destroyed-error?

            resource-pool-default-timeout-handler

            call-with-resource-from-pool
            with-resource-from-pool

            resource-pool-stats))

;; Exception type for aborting the addition of a resource.  Nothing in
;; this file raises it; it is kept for code that refers to it.
(define &resource-pool-abort-add-resource
  (make-exception-type '&resource-pool-abort-add-resource
                       &error
                       '()))

(define make-resource-pool-abort-add-resource-error
  (record-constructor &resource-pool-abort-add-resource))

(define resource-pool-abort-add-resource-error?
  (record-predicate &resource-pool-abort-add-resource))

;; A pool is a handle onto the fiber that owns the pool state: all
;; interaction happens by sending messages over CHANNEL.
;; DESTROY-CONDITION is signalled once the pool has been destroyed.
(define-record-type <resource-pool>
  (make-resource-pool-record name channel destroy-condition configuration)
  resource-pool?
  (name              resource-pool-name)
  (channel           resource-pool-channel)
  (destroy-condition resource-pool-destroy-condition)
  (configuration     resource-pool-configuration))

(set-record-type-printer!
 <resource-pool>
 (lambda (resource-pool port)
   (display
    (simple-format #f "#<resource-pool name: ~A>"
                   (resource-pool-name resource-pool))
    port)))

(define* (make-resource-pool return-new-resource max-size
                             #:key (min-size 0)
                             (idle-seconds #f)
                             (delay-logger (const #f))
                             (duration-logger (const #f))
                             destructor
                             lifetime
                             scheduler
                             (name "unnamed")
                             (add-resources-parallelism 1)
                             default-checkout-timeout
                             default-max-waiters)
  "Create a resource pool and spawn the fiber that manages it, returning
the pool record.

RETURN-NEW-RESOURCE is a thunk called (in a separate fiber) to create a
resource when a checkout finds none available and the pool holds fewer
than MAX-SIZE resources.

When IDLE-SECONDS is set, available resources that have been unused for
longer than that are periodically removed, keeping at least MIN-SIZE
resources.  DESTRUCTOR, when given, is called with each resource being
removed, and retried every 5 seconds until it returns without raising.

ADD-RESOURCES-PARALLELISM is passed to fiberize to bound concurrent
calls to RETURN-NEW-RESOURCE; when #f a plain fiber is spawned for each
attempt.  The pool fiber runs on SCHEDULER, or the current scheduler.
NAME is used in diagnostic messages.

DEFAULT-CHECKOUT-TIMEOUT and DEFAULT-MAX-WAITERS are stored in the pool
configuration and used as defaults by call-with-resource-from-pool.
DELAY-LOGGER, DURATION-LOGGER and LIFETIME are stored in the
configuration but not otherwise used by this procedure."
  (define channel (make-channel))
  (define destroy-condition
    (make-condition))

  (define pool
    (make-resource-pool-record
     name
     channel
     destroy-condition
     `((max-size . ,max-size)
       (min-size . ,min-size)
       (idle-seconds . ,idle-seconds)
       (delay-logger . ,delay-logger)
       (duration-logger . ,duration-logger)
       (destructor . ,destructor)
       (lifetime . ,lifetime)
       (scheduler . ,scheduler)
       (name . ,name)
       (default-checkout-timeout . ,default-checkout-timeout)
       (default-max-waiters . ,default-max-waiters))))

  (define checkout-failure-count 0)

  ;; Return LST without the element at index I.  LST may be mutated.
  (define (remove-at-index! lst i)
    (let ((start end (split-at! lst i)))
      (append start
              (cdr end))))

  ;; Return RESOURCES without RESOURCE (compared with eq?), or log and
  ;; return RESOURCES unchanged if it isn't present.
  (define (forget-resource resources resource)
    (let ((index (list-index (lambda (x) (eq? x resource)) resources)))
      (if index
          (remove-at-index! resources index)
          (begin
            (simple-format
             (current-error-port)
             "resource pool error: unable to remove ~A\n"
             resource)
            resources))))

  ;; Thunk that spawns a fiber which creates a new resource and sends
  ;; it to the pool fiber in an add-resource message.
  (define spawn-fiber-to-return-new-resource
    (if add-resources-parallelism
        (let ((thunk
               (fiberize
                (lambda ()
                  (let ((max-size
                         (assq-ref (resource-pool-configuration pool)
                                   'max-size))
                        (size (assq-ref (resource-pool-stats pool)
                                        'resources)))
                    ;; >= rather than =, as the pool can briefly hold
                    ;; more than max-size resources while a surplus
                    ;; one is waiting to be destroyed
                    (unless (>= size max-size)
                      (let ((new-resource (return-new-resource)))
                        (put-message channel
                                     (list 'add-resource new-resource))))))
                #:parallelism add-resources-parallelism)))
          (lambda ()
            (spawn-fiber thunk)))
        (lambda ()
          (spawn-fiber
           (lambda ()
             (let ((new-resource
                    (with-exception-handler
                        (lambda _ #f)
                      (lambda ()
                        (with-exception-handler
                            (lambda (exn)
                              (simple-format
                               (current-error-port)
                               "exception adding resource to pool ~A: ~A\n\n"
                               name
                               return-new-resource)
                              (print-backtrace-and-exception/knots exn)
                              (raise-exception exn))
                          (lambda ()
                            (start-stack #t (return-new-resource)))))
                      #:unwind? #t)))
               (when new-resource
                 (put-message channel
                              (list 'add-resource new-resource)))))))))

  ;; Spawn a fiber that calls DESTRUCTOR on RESOURCE, retrying every 5
  ;; seconds until it succeeds, then tells the pool fiber to remove it.
  (define (spawn-fiber-to-destroy-resource resource)
    (spawn-fiber
     (lambda ()
       (let loop ()
         (let ((success?
                (with-exception-handler
                    (lambda _ #f)
                  (lambda ()
                    (with-exception-handler
                        (lambda (exn)
                          (simple-format
                           (current-error-port)
                           "exception running resource pool destructor (~A): ~A\n"
                           name
                           destructor)
                          (print-backtrace-and-exception/knots exn)
                          (raise-exception exn))
                      (lambda ()
                        (start-stack #t (destructor resource))
                        #t)))
                  #:unwind? #t)))
           (if success?
               (put-message channel
                            (list 'remove resource))
               (begin
                 (sleep 5)
                 (loop))))))))

  ;; Arrange for RESOURCE to leave the pool: through the destructor when
  ;; there is one, otherwise by sending the remove message directly.
  (define (spawn-fiber-to-dispose-of-resource resource)
    (if destructor
        (spawn-fiber-to-destroy-resource resource)
        (spawn-fiber
         (lambda ()
           (put-message channel
                        (list 'remove resource)))
         #:parallel? #t)))

  ;; Spawn a fiber that offers RESOURCE on REPLY-CHANNEL for
  ;; REPLY-TIMEOUT seconds, handing it back to the pool if nobody takes
  ;; it in time.
  (define (spawn-fiber-for-checkout reply-channel
                                    reply-timeout
                                    resource)
    (spawn-fiber
     (lambda ()
       (let ((checkout-success?
              (perform-operation
               (choice-operation
                (wrap-operation
                 (put-operation reply-channel
                                (cons 'success resource))
                 (const #t))
                (wrap-operation (sleep-operation reply-timeout)
                                (const #f))))))
         (unless checkout-success?
           (put-message
            channel
            (list 'return-failed-checkout resource)))))))

  ;; Spawn a fiber that sends MESSAGE on REPLY, giving up at
  ;; TIMEOUT-TIME (internal real time units) when that is not #f.
  (define (spawn-fiber-to-send-reply reply message timeout-time)
    (spawn-fiber
     (lambda ()
       (let ((op (put-operation reply message)))
         (perform-operation
          (if timeout-time
              (choice-operation
               op
               (wrap-operation
                (sleep-operation
                 (/ (- timeout-time (get-internal-real-time))
                    internal-time-units-per-second))
                (const #f)))
              op))))))

  ;; Spawn a fiber that sends STATS on REPLY, giving up after 5 seconds.
  (define (spawn-fiber-to-send-stats reply stats)
    (spawn-fiber
     (lambda ()
       (perform-operation
        (choice-operation
         (wrap-operation
          (put-operation reply stats)
          (const #t))
         (wrap-operation (sleep-operation 5)
                         (const #f)))))))

  ;; Return the waiters in WAITERS that have no timeout, or whose
  ;; timeout is after CURRENT-INTERNAL-TIME.  WAITERS may be mutated.
  (define (alive-waiters! waiters current-internal-time)
    (filter!
     (match-lambda
       ((reply . timeout)
        (or (not timeout)
            (> timeout current-internal-time))))
     waiters))

  ;; Give RESOURCE to WAITER, a (channel . timeout) pair.
  (define (hand-over-resource waiter resource current-internal-time)
    (match waiter
      ((waiter-channel . waiter-timeout)
       (if waiter-timeout
           ;; Don't sleep in the pool fiber, so spawn a new fiber to
           ;; handle handing over the resource, and returning it if
           ;; there's a timeout
           (spawn-fiber-for-checkout
            waiter-channel
            (/ (- waiter-timeout current-internal-time)
               internal-time-units-per-second)
            resource)
           (put-message waiter-channel
                        (cons 'success resource))))))

  ;; Record that RESOURCE was used just now, returning
  ;; RESOURCES-LAST-USED, which is kept parallel to RESOURCES.
  (define (touch-last-used! resources resources-last-used resource)
    (list-set! resources-last-used
               (list-index (lambda (x) (eq? x resource)) resources)
               (get-internal-real-time))
    resources-last-used)

  ;; Message loop used once the pool is being destroyed.  RESOURCES
  ;; holds the resources still to be accounted for; the destroy
  ;; condition is signalled, and the loop ends, once there are none.
  (define (destroy-loop resources)
    (let loop ((resources resources))
      (if (null? resources)
          (signal-condition! destroy-condition)
          (match (get-message channel)
            (('add-resource resource)
             (if destructor
                 (begin
                   ;; Keep track of the resource until the destructor
                   ;; fiber reports back with a remove message
                   (spawn-fiber-to-destroy-resource resource)
                   (loop (cons resource resources)))
                 (loop resources)))
            (('checkout reply timeout-time max-waiters)
             (spawn-fiber-to-send-reply reply
                                        (cons 'resource-pool-destroyed #f)
                                        timeout-time)
             (loop resources))
            (((and (or 'return
                       'return-failed-checkout
                       'remove)
                   return-type)
              resource)
             (if (and destructor
                      (not (eq? return-type 'remove)))
                 ;; The resource is forgotten when the remove message
                 ;; arrives, which is sent once the destructor has run
                 (begin
                   (spawn-fiber-to-destroy-resource resource)
                   (loop resources))
                 (loop (forget-resource resources resource))))
            (('stats reply)
             (spawn-fiber-to-send-stats
              reply
              `((resources . ,(length resources))
                (available . 0)
                (waiters   . 0)
                (checkout-failure-count . ,checkout-failure-count)))
             (loop resources))
            (('check-for-idle-resources)
             (loop resources))
            (('destroy . _)
             ;; Already being destroyed
             (loop resources))
            (unknown
             (simple-format
              (current-error-port)
              "unrecognised message to ~A resource pool channel: ~A\n"
              name
              unknown)
             (loop resources))))))

  ;; Message loop for normal operation.  RESOURCES is every resource
  ;; owned by the pool, RESOURCES-LAST-USED the parallel list of
  ;; internal real times, AVAILABLE the resources not checked out and
  ;; WAITERS the (reply-channel . timeout-time) pairs, newest first.
  (define (main-loop)
    (let loop ((resources '())
               (available '())
               (waiters '())
               (resources-last-used '()))

      (match (get-message channel)
        (('add-resource resource)
         (if (>= (length resources) max-size)
             (if destructor
                 (begin
                   ;; Surplus resource, track it until the destructor
                   ;; fiber sends the remove message
                   (spawn-fiber-to-destroy-resource resource)
                   (loop (cons resource resources)
                         available
                         waiters
                         (cons (get-internal-real-time)
                               resources-last-used)))
                 ;; Surplus resource that's simply dropped, so leave
                 ;; both parallel lists alone
                 (loop resources
                       available
                       waiters
                       resources-last-used))
             (let* ((current-internal-time (get-internal-real-time))
                    (alive-waiters
                     (alive-waiters! waiters current-internal-time)))
               (if (null? alive-waiters)
                   (loop (cons resource resources)
                         (cons resource available)
                         '()
                         (cons (get-internal-real-time)
                               resources-last-used))
                   (begin
                     ;; The longest waiting waiter is last in the list
                     (hand-over-resource (last alive-waiters)
                                         resource
                                         current-internal-time)
                     (loop (cons resource resources)
                           available
                           (drop-right! alive-waiters 1)
                           (cons (get-internal-real-time)
                                 resources-last-used)))))))

        (('checkout reply timeout-time max-waiters)
         (if (null? available)
             (begin
               (unless (>= (length resources) max-size)
                 (spawn-fiber-to-return-new-resource))

               (let ((waiters-count (length waiters)))
                 (if (and max-waiters
                          (>= waiters-count
                              max-waiters))
                     (begin
                       (spawn-fiber-to-send-reply
                        reply
                        (cons 'too-many-waiters waiters-count)
                        timeout-time)
                       (loop resources
                             available
                             waiters
                             resources-last-used))
                     (loop resources
                           available
                           (cons (cons reply timeout-time)
                                 waiters)
                           resources-last-used))))

             (if timeout-time
                 (let ((current-internal-time
                        (get-internal-real-time)))
                   ;; If this client is still waiting
                   (if (> timeout-time
                          current-internal-time)
                       (let ((reply-timeout
                              (/ (- timeout-time
                                    current-internal-time)
                                 internal-time-units-per-second)))

                         ;; Don't sleep in this fiber, so spawn a new
                         ;; fiber to handle handing over the resource,
                         ;; and returning it if there's a timeout
                         (spawn-fiber-for-checkout reply
                                                   reply-timeout
                                                   (car available))
                         (loop resources
                               (cdr available)
                               waiters
                               resources-last-used))
                       (loop resources
                             available
                             waiters
                             resources-last-used)))
                 (begin
                   (put-message reply (cons 'success
                                            (car available)))

                   (loop resources
                         (cdr available)
                         waiters
                         resources-last-used)))))

        (((and (or 'return
                   'return-failed-checkout)
               return-type)
          resource)

         (when (eq? 'return-failed-checkout
                    return-type)
           (set! checkout-failure-count
                 (+ 1 checkout-failure-count)))

         (if (null? waiters)
             (loop resources
                   (cons resource available)
                   waiters
                   (touch-last-used! resources
                                     resources-last-used
                                     resource))

             (let* ((current-internal-time (get-internal-real-time))
                    (alive-waiters
                     (alive-waiters! waiters current-internal-time)))
               (if (null? alive-waiters)
                   (loop resources
                         (cons resource available)
                         '()
                         (if (eq? return-type 'return)
                             (touch-last-used! resources
                                               resources-last-used
                                               resource)
                             resources-last-used))

                   (begin
                     (hand-over-resource (last alive-waiters)
                                         resource
                                         current-internal-time)
                     (loop resources
                           available
                           (drop-right! alive-waiters 1)
                           (touch-last-used! resources
                                             resources-last-used
                                             resource)))))))

        (('remove resource)
         (let ((index
                (list-index (lambda (x)
                              (eq? x resource))
                            resources)))
           (if index
               (loop (remove-at-index! resources index)
                     available ; resource shouldn't be in this list
                     waiters
                     (remove-at-index! resources-last-used index))
               (begin
                 ;; Unknown resource, so leave the state alone rather
                 ;; than erroring and taking down the pool fiber
                 (simple-format
                  (current-error-port)
                  "resource pool error: unable to remove ~A\n"
                  resource)
                 (loop resources
                       available
                       waiters
                       resources-last-used)))))

        (('stats reply)
         (spawn-fiber-to-send-stats
          reply
          `((resources . ,(length resources))
            (available . ,(length available))
            (waiters   . ,(length waiters))
            (checkout-failure-count . ,checkout-failure-count)))

         (loop resources
               available
               waiters
               resources-last-used))

        (('check-for-idle-resources)
         (let* ((resources-last-used-seconds
                 (map
                  (lambda (internal-time)
                    (/ (- (get-internal-real-time) internal-time)
                       internal-time-units-per-second))
                  resources-last-used))
                (candidate-resources-to-destroy
                 (filter-map
                  (lambda (resource last-used-seconds)
                    (if (and (memq resource available)
                             (> last-used-seconds idle-seconds))
                        resource
                        #f))
                  resources
                  resources-last-used-seconds))
                (available-resources-to-destroy
                 (lset-intersection eq?
                                    available
                                    candidate-resources-to-destroy))
                (max-resources-to-destroy
                 (max 0
                      (- (length resources)
                         min-size)))
                (resources-to-destroy
                 (take available-resources-to-destroy
                       (min max-resources-to-destroy
                            (length available-resources-to-destroy)))))

           ;; Without a destructor the resources still need removing
           ;; from the pool, otherwise they'd count towards max-size
           ;; forever
           (for-each spawn-fiber-to-dispose-of-resource
                     resources-to-destroy)

           (loop resources
                 (lset-difference eq? available resources-to-destroy)
                 waiters
                 resources-last-used)))

        (('destroy)
         (if (and (null? resources)
                  (null? waiters))
             (signal-condition!
              destroy-condition)

             (begin
               (for-each spawn-fiber-to-dispose-of-resource
                         available)

               (let ((current-internal-time (get-internal-real-time)))
                 (for-each
                  (match-lambda
                    ((reply . timeout)
                     (when (or (not timeout)
                               (> timeout current-internal-time))
                       (spawn-fiber-to-send-reply
                        reply
                        (cons 'resource-pool-destroyed #f)
                        timeout))))
                  waiters))

               (destroy-loop resources))))

        (unknown
         (simple-format
          (current-error-port)
          "unrecognised message to ~A resource pool channel: ~A\n"
          name
          unknown)
         (loop resources
               available
               waiters
               resources-last-used)))))

  (spawn-fiber
   (lambda ()
     (when idle-seconds
       (spawn-fiber
        (lambda ()
          (while #t
            (sleep idle-seconds)
            (put-message channel '(check-for-idle-resources))))))

     (with-exception-handler
         (lambda (exn)
           #f)
       (lambda ()
         (with-exception-handler
             (lambda (exn)
               (let* ((stack (make-stack #t))
                      (error-string
                       (call-with-output-string
                         (lambda (port)
                           (display-backtrace stack port 3)
                           (simple-format
                            port
                            "exception in the ~A pool fiber, " name)
                           (print-exception
                            port
                            (stack-ref stack 3)
                            '%exception
                            (list exn))))))
                 (display error-string
                          (current-error-port)))
               (raise-exception exn))
           (lambda ()
             (start-stack
              #t
              (main-loop)))))
       #:unwind? #t))
   (or scheduler
       (current-scheduler)))

  pool)

(define (destroy-resource-pool pool)
  "Ask POOL to destroy itself and wait until its destroy condition has
been signalled.  Returns #t."
  (perform-operation
   (choice-operation
    (wrap-operation
     (put-operation (resource-pool-channel pool)
                    (list 'destroy))
     (lambda _
       (wait
        (resource-pool-destroy-condition pool))))
    ;; If the pool has already been destroyed, nothing is reading from
    ;; the channel, so also wait on the condition directly
    (wait-operation
     (resource-pool-destroy-condition pool))))
  #t)

(define &resource-pool-timeout
  (make-exception-type '&resource-pool-timeout
                       &error
                       '(pool)))

(define resource-pool-timeout-error-pool
  (exception-accessor
   &resource-pool-timeout
   (record-accessor &resource-pool-timeout 'pool)))

(define make-resource-pool-timeout-error
  (record-constructor &resource-pool-timeout))

(define resource-pool-timeout-error?
  (record-predicate &resource-pool-timeout))

(define &resource-pool-too-many-waiters
  (make-exception-type '&resource-pool-too-many-waiters
                       &error
                       '(pool waiters-count)))

(define resource-pool-too-many-waiters-error-pool
  (exception-accessor
   &resource-pool-too-many-waiters
   (record-accessor &resource-pool-too-many-waiters 'pool)))

(define resource-pool-too-many-waiters-error-waiters-count
  (exception-accessor
   &resource-pool-too-many-waiters
   (record-accessor &resource-pool-too-many-waiters 'waiters-count)))

(define make-resource-pool-too-many-waiters-error
  (record-constructor &resource-pool-too-many-waiters))

(define resource-pool-too-many-waiters-error?
  (record-predicate &resource-pool-too-many-waiters))

(define &resource-pool-destroyed
  (make-exception-type '&resource-pool-destroyed
                       &error
                       '(pool)))

(define resource-pool-destroyed-error-pool
  (exception-accessor
   &resource-pool-destroyed
   (record-accessor &resource-pool-destroyed 'pool)))

(define make-resource-pool-destroyed-error
  (record-constructor &resource-pool-destroyed))

(define resource-pool-destroyed-error?
  (record-predicate &resource-pool-destroyed))

;; Default for the #:timeout-handler argument of
;; call-with-resource-from-pool.
(define resource-pool-default-timeout-handler
  (make-parameter #f))

(define* (call-with-resource-from-pool
          pool proc #:key (timeout 'default)
          (timeout-handler (resource-pool-default-timeout-handler))
          (max-waiters 'default)
          (channel (resource-pool-channel pool)))
  "Call PROC with a resource from POOL, blocking until a resource becomes
available.  The resource is returned to POOL once PROC has returned or
raised an exception, and the values returned by PROC are returned.

TIMEOUT is in seconds, with #f meaning wait indefinitely; TIMEOUT and
MAX-WAITERS default to the values in the configuration of POOL.  The
checkout request is sent over CHANNEL.

Raises &resource-pool-timeout if no resource is obtained in time (after
calling TIMEOUT-HANDLER, if not #f, with POOL, PROC and TIMEOUT),
&resource-pool-too-many-waiters if POOL already has MAX-WAITERS waiters,
and &resource-pool-destroyed if POOL is being destroyed."

  (define timeout-or-default
    (if (eq? timeout 'default)
        (assq-ref (resource-pool-configuration pool)
                  'default-checkout-timeout)
        timeout))

  (define max-waiters-or-default
    (if (eq? max-waiters 'default)
        (assq-ref (resource-pool-configuration pool)
                  'default-max-waiters)
        max-waiters))

  ;; Seconds left until the checkout started at START-TIME times out
  (define (time-remaining start-time)
    (- timeout-or-default
       (/ (- (get-internal-real-time)
             start-time)
          internal-time-units-per-second)))

  (let ((reply
         (if timeout-or-default
             (let loop ((reply (make-channel))
                        (start-time (get-internal-real-time)))
               (let ((request-success?
                      (perform-operation
                       (choice-operation
                        (wrap-operation
                         (put-operation channel
                                        (list 'checkout
                                              reply
                                              (+ start-time
                                                 (* timeout-or-default
                                                    internal-time-units-per-second))
                                              max-waiters-or-default))
                         (const #t))
                        (wrap-operation (sleep-operation timeout-or-default)
                                        (const #f))))))
                 (if request-success?
                     (let ((remaining (time-remaining start-time)))
                       (if (> remaining 0)
                           (let ((response
                                  (perform-operation
                                   (choice-operation
                                    (get-operation reply)
                                    (wrap-operation
                                     (sleep-operation remaining)
                                     (const #f))))))
                             (if (or (not response)
                                     (eq? response
                                          'resource-pool-retry-checkout))
                                 (if (> (time-remaining start-time) 0)
                                     (loop (make-channel)
                                           start-time)
                                     'timeout)
                                 response))
                           'timeout))
                     ;; The pool didn't even accept the request in time
                     'timeout)))
             (let ((reply (make-channel)))
               (put-message channel
                            (list 'checkout
                                  reply
                                  #f
                                  max-waiters-or-default))
               (get-message reply)))))

    (match reply
      ('timeout
       ;; NOTE(review): the handler receives TIMEOUT as passed by the
       ;; caller, which is the symbol 'default when the pool default
       ;; was used - confirm that's intended
       (when timeout-handler
         (timeout-handler pool proc timeout))

       (raise-exception
        (make-resource-pool-timeout-error pool)))
      (('too-many-waiters . count)
       (raise-exception
        (make-resource-pool-too-many-waiters-error pool
                                                   count)))
      (('resource-pool-destroyed . #f)
       (raise-exception
        (make-resource-pool-destroyed-error pool)))
      (('success . resource)
       (call-with-values
           (lambda ()
             (with-exception-handler
                 (lambda (exn)
                   ;; Unwind the stack before calling put-message, as
                   ;; this avoids inconsistent behaviour with
                   ;; continuation barriers
                   (put-message
                    (resource-pool-channel pool)
                    `(return ,resource))
                   (raise-exception exn))
               (lambda ()
                 (with-exception-handler
                     (lambda (exn)
                       ;; Capture the stack before unwinding, and
                       ;; attach it to the exception
                       (match (fluid-ref %stacks)
                         ((stack-tag . prompt-tag)
                          (let ((stack (make-stack #t
                                                   0 prompt-tag
                                                   0 (and prompt-tag 1))))
                            (raise-exception
                             (make-exception
                              exn
                              (make-knots-exception stack)))))))
                   (lambda ()
                     (proc resource))))
               #:unwind? #t))
         (lambda vals
           (put-message (resource-pool-channel pool)
                        `(return ,resource))
           (apply values vals)))))))

;; Evaluate EXP ... with RESOURCE bound to a resource checked out from
;; POOL, see call-with-resource-from-pool.
(define-syntax-rule (with-resource-from-pool pool resource exp ...)
  (call-with-resource-from-pool
   pool
   (lambda (resource) exp ...)))

(define* (resource-pool-stats pool #:key (timeout 5))
  "Return an association list of statistics for POOL, with the keys
resources, available, waiters and checkout-failure-count.  Raises
&resource-pool-timeout if the pool doesn't respond within TIMEOUT
seconds."
  (let ((reply (make-channel))
        (start-time (get-internal-real-time)))
    (perform-operation
     (choice-operation
      (wrap-operation
       (put-operation (resource-pool-channel pool)
                      `(stats ,reply))
       (const #t))
      (wrap-operation (sleep-operation timeout)
                      (lambda _
                        (raise-exception
                         (make-resource-pool-timeout-error pool))))))

    (let ((time-remaining
           (- timeout
              (/ (- (get-internal-real-time)
                    start-time)
                 internal-time-units-per-second))))
      (if (> time-remaining 0)
          (perform-operation
           (choice-operation
            (get-operation reply)
            (wrap-operation (sleep-operation time-remaining)
                            (lambda _
                              (raise-exception
                               (make-resource-pool-timeout-error pool))))))
          (raise-exception
           (make-resource-pool-timeout-error pool))))))