;;; Guile Knots
;;; Copyright © 2020 Christopher Baines
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (knots resource-pool)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-9)
  #:use-module (srfi srfi-9 gnu)
  #:use-module (srfi srfi-43)
  #:use-module (srfi srfi-71)
  #:use-module (ice-9 q)
  #:use-module (ice-9 match)
  #:use-module (ice-9 exceptions)
  #:use-module (fibers)
  #:use-module (fibers timers)
  #:use-module (fibers channels)
  #:use-module (fibers scheduler)
  #:use-module (fibers operations)
  #:use-module (fibers conditions)
  #:use-module (knots)
  #:use-module (knots parallelism)
  #:export (make-fixed-size-resource-pool
            make-resource-pool

            resource-pool?
            resource-pool-name
            resource-pool-channel
            resource-pool-configuration
            destroy-resource-pool

            &resource-pool-timeout
            resource-pool-timeout-error-pool
            resource-pool-timeout-error?

            &resource-pool-too-many-waiters
            resource-pool-too-many-waiters-error-pool
            resource-pool-too-many-waiters-error-waiters-count
            resource-pool-too-many-waiters-error?

            &resource-pool-destroyed
            resource-pool-destroyed-error-pool
            resource-pool-destroyed-error?

            &resource-pool-destroy-resource
            make-resource-pool-destroy-resource-exception
            resource-pool-destroy-resource-exception?

            resource-pool-default-timeout-handler

            call-with-resource-from-pool
            with-resource-from-pool

            resource-pool-stats))

;; Exception type with no fields.  It is defined here but not raised by
;; any code in this module.
(define &resource-pool-abort-add-resource
  (make-exception-type '&resource-pool-abort-add-resource
                       &error
                       '()))

(define make-resource-pool-abort-add-resource-error
  (record-constructor &resource-pool-abort-add-resource))

(define resource-pool-abort-add-resource-error?
  (exception-predicate &resource-pool-abort-add-resource))

;; Handle given to users of a pool.  CHANNEL is the channel the pool
;; fiber reads requests from, it is set to #f once the pool has been
;; destroyed.  DESTROY-CONDITION is signalled at that same point.
;; CONFIGURATION is an alist of the options the pool was created with.
(define-record-type <resource-pool>
  (make-resource-pool-record name channel destroy-condition configuration)
  resource-pool?
  (name              resource-pool-name)
  (channel           resource-pool-channel
                     set-resource-pool-channel!)
  (destroy-condition resource-pool-destroy-condition)
  (configuration     resource-pool-configuration))

(set-record-type-printer!
 <resource-pool>
 (lambda (resource-pool port)
   (display
    (simple-format #f "#<resource-pool name: ~A>"
                   (resource-pool-name resource-pool))
    port)))

;; Remove and return the item at the front of the (ice-9 q) queue Q,
;; or return #f if Q holds no items.
(define (safe-deq q)
  (if (null? (car q))
      #f
      (let ((it (caar q))
            (next (cdar q)))
        ;; Removing the only item, so there's no last pair any more
        (if (null? next)
            (set-cdr! q #f))
        (set-car! q next)
        it)))

;; Per resource bookkeeping kept by the pool fibers: the resource
;; itself (VALUE), a count of checkouts and the internal real time it
;; was last used.
(define-record-type <resource-details>
  (make-resource-details value checkout-count last-used)
  resource-details?
  (value          resource-details-value)
  (checkout-count resource-details-checkout-count
                  set-resource-details-checkout-count!)
  (last-used      resource-details-last-used
                  set-resource-details-last-used!))

(define-inlinable (increment-resource-checkout-count! resource)
  (set-resource-details-checkout-count!
   resource
   (1+ (resource-details-checkout-count resource))))

;; Used to undo an increment when a checkout failed to reach the
;; requester.
(define-inlinable (decrement-resource-checkout-count! resource)
  (set-resource-details-checkout-count!
   resource
   (1- (resource-details-checkout-count resource))))

;; In a new fiber, try to put a 'success reply carrying RESOURCE-ID and
;; RESOURCE on REPLY-CHANNEL.  If that hasn't happened within
;; REPLY-TIMEOUT seconds, send a 'return-failed-checkout message for
;; RESOURCE-ID to CHANNEL (the pool channel) instead.
(define (spawn-fiber-for-checkout channel
                                  reply-channel
                                  reply-timeout
                                  resource-id
                                  resource)
  (spawn-fiber
   (lambda ()
     (let ((checkout-success?
            (perform-operation
             (choice-operation
              (wrap-operation
               (put-operation reply-channel
                              (list 'success resource-id resource))
               (const #t))
              (wrap-operation (sleep-operation reply-timeout)
                              (const #f))))))
       (unless checkout-success?
         (put-message
          channel
          (list 'return-failed-checkout resource-id)))))))

;; Create a pool over the fixed set of resources in
;; RESOURCES-LIST-OR-VECTOR and return the <resource-pool> record for
;; it.  A fiber is spawned (on SCHEDULER if given, otherwise the current
;; scheduler) which owns the pool state and handles the messages sent
;; to the pool channel.
;;
;; The keyword arguments are stored in the pool configuration alist.
;; DELAY-LOGGER and DURATION-LOGGER are not otherwise used by this
;; procedure.
(define* (make-fixed-size-resource-pool resources-list-or-vector
                                        #:key
                                        (delay-logger (const #f))
                                        (duration-logger (const #f))
                                        scheduler
                                        (name "unnamed")
                                        default-checkout-timeout
                                        default-max-waiters)
  (define channel (make-channel))
  (define destroy-condition
    (make-condition))

  (define pool
    (make-resource-pool-record
     name
     channel
     destroy-condition
     `((delay-logger . ,delay-logger)
       (duration-logger . ,duration-logger)
       (scheduler . ,scheduler)
       (name . ,name)
       (default-checkout-timeout . ,default-checkout-timeout)
       (default-max-waiters . ,default-max-waiters))))

  (define checkout-failure-count 0)

  ;; Vector of <resource-details>, resource ids are indexes in to this
  ;; vector
  (define resources
    (vector-map
     (lambda (_ resource)
       (make-resource-details resource 0 #f))
     (if (vector? resources-list-or-vector)
         resources-list-or-vector
         (list->vector resources-list-or-vector))))

  ;; Message loop used once a destroy has been requested while
  ;; resources are still checked out.  Slots in RESOURCES are set to #f
  ;; as resources come back, and the pool is finished once every slot
  ;; is #f.
  (define (destroy-loop)
    (define (empty?)
      (vector-every (lambda (r)
                      (eq? r #f))
                    resources))

    (let loop ()
      (match (get-message channel)
        (('checkout reply timeout-time max-waiters)
         (spawn-fiber
          (lambda ()
            (let ((op
                   (put-operation
                    reply
                    (cons 'resource-pool-destroyed #f))))
              (perform-operation
               (if timeout-time
                   (choice-operation
                    op
                    (wrap-operation
                     (sleep-operation
                      (/ (- timeout-time
                            (get-internal-real-time))
                         internal-time-units-per-second))
                     (const #f)))
                   op)))))
         (loop))
        (((and (or 'return 'return-failed-checkout) return-type)
          resource-id)
         (vector-set! resources resource-id #f)

         (if (empty?)
             (begin
               (set-resource-pool-channel! pool #f)
               (signal-condition! destroy-condition)

               ;; No loop
               *unspecified*)
             (loop)))
        (('stats reply timeout-time)
         (let ((stats
                `((resources . ,(vector-length resources))
                  (available . 0)
                  (waiters   . 0)
                  (checkout-failure-count . ,checkout-failure-count))))

           (spawn-fiber
            (lambda ()
              (let ((op (put-operation reply stats)))
                (perform-operation
                 (if timeout-time
                     (choice-operation
                      op
                      (sleep-operation
                       (/ (- timeout-time
                             (get-internal-real-time))
                          internal-time-units-per-second)))
                     op))))))

         (loop))
        (('destroy)
         (loop))
        (unknown
         (simple-format
          (current-error-port)
          "unrecognised message to ~A resource pool channel: ~A\n"
          name
          unknown)
         (loop)))))

  ;; Normal message loop.  AVAILABLE is the list of resource ids not
  ;; currently checked out, WAITERS is a queue of (reply . timeout-time)
  ;; pairs for checkouts that are waiting for a resource.
  (define (main-loop)
    (let loop ((available (iota (vector-length resources)))
               (waiters (make-q)))

      (match (get-message channel)
        (('checkout reply timeout-time max-waiters)
         (if (null? available)
             (let ((waiters-count (q-length waiters)))
               (if (and max-waiters
                        (>= waiters-count max-waiters))
                   (begin
                     (spawn-fiber
                      (lambda ()
                        (let ((op
                               (put-operation
                                reply
                                (cons 'too-many-waiters
                                      waiters-count))))
                          (perform-operation
                           (if timeout-time
                               (choice-operation
                                op
                                (wrap-operation
                                 (sleep-operation
                                  (/ (- timeout-time
                                        (get-internal-real-time))
                                     internal-time-units-per-second))
                                 (const #f)))
                               op)))))
                     (loop available waiters))
                   (loop available
                         (enq! waiters (cons reply timeout-time)))))

             (if timeout-time
                 (let ((current-internal-time
                        (get-internal-real-time)))
                   ;; If this client is still waiting
                   (if (> timeout-time current-internal-time)
                       (let ((reply-timeout
                              (/ (- timeout-time
                                    current-internal-time)
                                 internal-time-units-per-second))
                             (resource-id new-available
                                          (car+cdr available)))

                         ;; Don't sleep in this fiber, so spawn a new
                         ;; fiber to handle handing over the resource,
                         ;; and returning it if there's a timeout
                         (spawn-fiber-for-checkout
                          channel
                          reply
                          reply-timeout
                          resource-id
                          (resource-details-value
                           (vector-ref resources resource-id)))
                         (loop new-available waiters))
                       (loop available waiters)))
                 (let* ((resource-id next-available
                                     (car+cdr available))
                        (resource-details
                         (vector-ref resources resource-id)))
                   (put-message
                    reply
                    (list 'success
                          resource-id
                          (resource-details-value resource-details)))
                   (loop next-available waiters)))))

        (((and (or 'return 'return-failed-checkout) return-type)
          resource-id)

         (when (eq? 'return-failed-checkout
                    return-type)
           (set! checkout-failure-count
                 (+ 1 checkout-failure-count)))

         (let ((current-internal-time
                (get-internal-real-time)))
           (let waiter-loop ((waiter (safe-deq waiters)))
             (match waiter
               (#f
                (loop (cons resource-id available)
                      waiters))
               ((reply . timeout)
                (if (and timeout
                         (< timeout current-internal-time))
                    ;; This waiter has already timed out, so try the
                    ;; next one.  This needs to be a tail call, so
                    ;; that nothing runs here once the loop finishes.
                    (waiter-loop (safe-deq waiters))
                    (begin
                      (if timeout
                          (let ((reply-timeout
                                 (/ (- timeout
                                       current-internal-time)
                                    internal-time-units-per-second)))
                            ;; Don't sleep in this fiber, so spawn a
                            ;; new fiber to handle handing over the
                            ;; resource, and returning it if there's
                            ;; a timeout
                            (spawn-fiber-for-checkout
                             channel
                             reply
                             reply-timeout
                             resource-id
                             (resource-details-value
                              (vector-ref resources
                                          resource-id))))
                          (put-message
                           reply
                           (list 'success
                                 resource-id
                                 (resource-details-value
                                  (vector-ref resources
                                              resource-id)))))
                      (loop available waiters))))))))

        (('list-resources reply)
         ;; NOTE(review): RESOURCES is a vector here, not a list,
         ;; confirm that list-copy does what's intended
         (spawn-fiber
          (lambda ()
            (put-message reply (list-copy resources))))

         (loop available waiters))

        (('stats reply timeout-time)
         (let ((stats
                `((resources . ,(vector-length resources))
                  (available . ,(length available))
                  (waiters   . ,(q-length waiters))
                  (checkout-failure-count . ,checkout-failure-count))))

           (spawn-fiber
            (lambda ()
              (let ((op (put-operation reply stats)))
                (perform-operation
                 (if timeout-time
                     (choice-operation
                      op
                      (sleep-operation
                       (/ (- timeout-time
                             (get-internal-real-time))
                          internal-time-units-per-second)))
                     op))))))

         (loop available waiters))

        (('destroy)
         (let ((current-internal-time (get-internal-real-time)))
           ;; Notify all waiters that the pool has been destroyed
           (for-each
            (match-lambda
              ((reply . timeout)
               (when (or (not timeout)
                         (> timeout current-internal-time))
                 (spawn-fiber
                  (lambda ()
                    (let ((op
                           (put-operation
                            reply
                            (cons 'resource-pool-destroyed #f))))
                      (perform-operation
                       (if timeout
                           (choice-operation
                            op
                            (wrap-operation
                             (sleep-operation
                              (/ (- timeout
                                    (get-internal-real-time))
                                 internal-time-units-per-second))
                             (const #f)))
                           op))))))))
            (car waiters))

           (if (= (vector-length resources)
                  (length available))
               (begin
                 (set-resource-pool-channel! pool #f)
                 (signal-condition! destroy-condition)

                 ;; No loop
                 *unspecified*)
               (begin
                 ;; The destroy loop finishes once every slot is #f,
                 ;; and resources that aren't checked out will never
                 ;; be returned, so clear their slots now
                 (for-each
                  (lambda (resource-id)
                    (vector-set! resources resource-id #f))
                  available)

                 (destroy-loop)))))

        (unknown
         (simple-format
          (current-error-port)
          "unrecognised message to ~A resource pool channel: ~A\n"
          name
          unknown)
         (loop available waiters)))))

  (spawn-fiber
   (lambda ()
     (with-exception-handler
         (lambda (exn)
           #f)
       (lambda ()
         (with-exception-handler
             (lambda (exn)
               ;; Print the backtrace and exception before the stack
               ;; is unwound by the outer handler
               (let* ((stack (make-stack #t))
                      (error-string
                       (call-with-output-string
                         (lambda (port)
                           (display-backtrace stack port 3)
                           (simple-format
                            port
                            "exception in the ~A pool fiber, "
                            name)
                           (print-exception
                            port
                            (stack-ref stack 3)
                            '%exception
                            (list exn))))))
                 (display error-string
                          (current-error-port)))
               (raise-exception exn))
           (lambda ()
             (start-stack
              #t
              (main-loop)))))
       #:unwind? #t))
   (or scheduler
       (current-scheduler)))

  pool)

;; Create a pool which calls RETURN-NEW-RESOURCE (a thunk) to make
;; resources as they're needed, holding at most MAX-SIZE of them, and
;; return the <resource-pool> record for it.  A fiber is spawned (on
;; SCHEDULER if given, otherwise the current scheduler) which owns the
;; pool state and handles the messages sent to the pool channel.
;;
;; DESTRUCTOR, if given, is called with a resource value when the pool
;; gets rid of that resource.  LIFETIME, if given, is the checkout count
;; at which a returned resource is destroyed rather than reused.  When
;; IDLE-SECONDS is given, available resources which haven't been used
;; for longer than that are periodically removed, keeping at least
;; MIN-SIZE resources.  ADD-RESOURCES-PARALLELISM limits how many calls
;; to RETURN-NEW-RESOURCE run at once.
;;
;; The keyword arguments are stored in the pool configuration alist.
;; DELAY-LOGGER and DURATION-LOGGER are not otherwise used by this
;; procedure.
(define* (make-resource-pool return-new-resource max-size
                             #:key (min-size 0)
                             (idle-seconds #f)
                             (delay-logger (const #f))
                             (duration-logger (const #f))
                             destructor
                             lifetime
                             scheduler
                             (name "unnamed")
                             (add-resources-parallelism 1)
                             default-checkout-timeout
                             default-max-waiters)
  (define channel (make-channel))
  (define destroy-condition
    (make-condition))

  (define pool
    (make-resource-pool-record
     name
     channel
     destroy-condition
     `((max-size . ,max-size)
       (min-size . ,min-size)
       (idle-seconds . ,idle-seconds)
       (delay-logger . ,delay-logger)
       (duration-logger . ,duration-logger)
       (destructor . ,destructor)
       (lifetime . ,lifetime)
       (scheduler . ,scheduler)
       (name . ,name)
       (default-checkout-timeout . ,default-checkout-timeout)
       (default-max-waiters . ,default-max-waiters))))

  (define checkout-failure-count 0)

  ;; Hash table of resource id to <resource-details>
  (define resources
    (make-hash-table))

  (define-inlinable (count-resources resources)
    (hash-count (const #t) resources))

  (define return-new-resource/parallelism-limiter
    (make-parallelism-limiter
     (or add-resources-parallelism
         max-size)
     #:name
     (string-append
      name
      " resource pool new resource parallelism limiter")))

  ;; In a new fiber, and subject to the parallelism limiter, call
  ;; RETURN-NEW-RESOURCE and send the result to the pool fiber as an
  ;; 'add-resource message.  Nothing is done if the pool already holds
  ;; max-size resources.  Exceptions from RETURN-NEW-RESOURCE are
  ;; printed and otherwise ignored.
  (define (spawn-fiber-to-return-new-resource)
    (spawn-fiber
     (lambda ()
       (with-exception-handler
           (lambda (exn)
             ;; This can happen if the resource pool is destroyed very
             ;; quickly
             (if (resource-pool-destroyed-error? exn)
                 #f
                 (raise-exception exn)))
         (lambda ()
           (with-parallelism-limiter
            return-new-resource/parallelism-limiter
            (let ((max-size
                   (assq-ref (resource-pool-configuration pool)
                             'max-size))
                  (size (count-resources resources)))
              (unless (>= size max-size)
                (with-exception-handler
                    (lambda _ #f)
                  (lambda ()
                    (with-exception-handler
                        (lambda (exn)
                          (simple-format
                           (current-error-port)
                           "exception adding resource to pool ~A: ~A\n\n"
                           name
                           return-new-resource)
                          (print-backtrace-and-exception/knots exn)
                          (raise-exception exn))
                      (lambda ()
                        (let ((new-resource
                               (start-stack #t (return-new-resource))))
                          (put-message
                           channel
                           (list 'add-resource new-resource))))))
                  #:unwind? #t)))))
         #:unwind? #t))))

  ;; In a new fiber, call DESTRUCTOR on RESOURCE-VALUE, then send a
  ;; 'remove message for RESOURCE-ID to the pool fiber.  If DESTRUCTOR
  ;; raises an exception, it's printed and the call is retried after 5
  ;; seconds, for as long as it keeps failing.
  (define (spawn-fiber-to-destroy-resource resource-id resource-value)
    (spawn-fiber
     (lambda ()
       (let loop ()
         (let* ((success?
                 (with-exception-handler
                     (lambda _ #f)
                   (lambda ()
                     (with-exception-handler
                         (lambda (exn)
                           (simple-format
                            (current-error-port)
                            "exception running resource pool destructor (~A): ~A\n"
                            name
                            destructor)
                           (print-backtrace-and-exception/knots exn)
                           (raise-exception exn))
                       (lambda ()
                         (start-stack #t (destructor resource-value))
                         #t)))
                   #:unwind? #t)))

           (if success?
               (put-message channel
                            (list 'remove resource-id))
               (begin
                 (sleep 5)

                 (loop))))))))

  ;; Message loop used once a destroy has been requested while the pool
  ;; still holds resources.  The pool is finished once RESOURCES is
  ;; empty.
  (define (destroy-loop resources next-resource-id)
    (let loop ((next-resource-id next-resource-id))
      (match (get-message channel)
        (('add-resource resource)
         ;; A resource that was being created when the destroy
         ;; started, so get rid of it
         (if destructor
             (begin
               (spawn-fiber-to-destroy-resource next-resource-id
                                                resource)
               (hash-set! resources next-resource-id resource)

               (loop (1+ next-resource-id)))
             (loop next-resource-id)))
        (('checkout reply timeout-time max-waiters)
         (spawn-fiber
          (lambda ()
            (let ((op
                   (put-operation
                    reply
                    (cons 'resource-pool-destroyed #f))))
              (perform-operation
               (if timeout-time
                   (choice-operation
                    op
                    (wrap-operation
                     (sleep-operation
                      (/ (- timeout-time
                            (get-internal-real-time))
                         internal-time-units-per-second))
                     (const #f)))
                   op)))))
         (loop next-resource-id))
        (((and (or 'return 'return-failed-checkout 'remove) return-type)
          resource-id)
         (when (and (not (eq? return-type 'remove))
                    destructor)
           (spawn-fiber-to-destroy-resource
            resource-id
            (resource-details-value
             (hash-ref resources resource-id))))

         (hash-remove! resources resource-id)

         (if (= 0 (count-resources resources))
             (begin
               (set-resource-pool-channel! pool #f)
               (signal-condition! destroy-condition)

               ;; No loop
               *unspecified*)
             (loop next-resource-id)))
        (('stats reply timeout-time)
         (let ((stats
                `((resources . ,(count-resources resources))
                  (available . 0)
                  (waiters   . 0)
                  (checkout-failure-count . ,checkout-failure-count))))

           (spawn-fiber
            (lambda ()
              (let ((op (put-operation reply stats)))
                (perform-operation
                 (if timeout-time
                     (choice-operation
                      op
                      (sleep-operation
                       (/ (- timeout-time
                             (get-internal-real-time))
                          internal-time-units-per-second)))
                     op))))))

         (loop next-resource-id))
        (('check-for-idle-resources)
         (loop next-resource-id))
        (('destroy)
         (loop next-resource-id))
        (unknown
         (simple-format
          (current-error-port)
          "unrecognised message to ~A resource pool channel: ~A\n"
          name
          unknown)
         (loop next-resource-id)))))

  ;; Normal message loop.  NEXT-RESOURCE-ID is the id to give to the
  ;; next resource added, AVAILABLE is the list of resource ids not
  ;; currently checked out, WAITERS is a queue of (reply . timeout-time)
  ;; pairs for checkouts that are waiting for a resource.
  (define (main-loop)
    (let loop ((next-resource-id 0)
               (available '())
               (waiters (make-q)))

      (match (get-message channel)
        (('add-resource resource)
         (if (= (count-resources resources) max-size)
             ;; The pool is already full, so this resource isn't
             ;; needed
             (if destructor
                 (begin
                   (hash-set! resources
                              next-resource-id
                              (make-resource-details
                               resource
                               0
                               (get-internal-real-time)))
                   (spawn-fiber-to-destroy-resource next-resource-id
                                                    resource)
                   (loop (1+ next-resource-id)
                         available
                         waiters))
                 (loop next-resource-id
                       available
                       waiters))

             (let* ((current-internal-time
                     (get-internal-real-time))
                    (resource-details
                     (make-resource-details
                      resource
                      0
                      current-internal-time)))
               (hash-set! resources
                          next-resource-id
                          resource-details)

               (let waiter-loop ((waiter (safe-deq waiters)))
                 (match waiter
                   (#f
                    (loop (1+ next-resource-id)
                          (cons next-resource-id available)
                          waiters))
                   ((reply . timeout)
                    (if (and timeout
                             (< timeout current-internal-time))
                        ;; This waiter has already timed out, so try
                        ;; the next one.  This needs to be a tail
                        ;; call, so that nothing runs here once the
                        ;; loop finishes.
                        (waiter-loop (safe-deq waiters))
                        (begin
                          (if timeout
                              (let ((reply-timeout
                                     (/ (- timeout
                                           current-internal-time)
                                        internal-time-units-per-second)))
                                ;; Don't sleep in this fiber, so spawn
                                ;; a new fiber to handle handing over
                                ;; the resource, and returning it if
                                ;; there's a timeout
                                (spawn-fiber-for-checkout
                                 channel
                                 reply
                                 reply-timeout
                                 next-resource-id
                                 resource))
                              (put-message
                               reply
                               (list 'success
                                     next-resource-id
                                     resource)))

                          (set-resource-details-checkout-count!
                           resource-details
                           1)
                          (loop (1+ next-resource-id)
                                available
                                waiters)))))))))

        (('checkout reply timeout-time max-waiters)
         (if (null? available)
             (begin
               (unless (= (count-resources resources) max-size)
                 (spawn-fiber-to-return-new-resource))

               (let ((waiters-count (q-length waiters)))
                 (if (and max-waiters
                          (>= waiters-count max-waiters))
                     (begin
                       (spawn-fiber
                        (lambda ()
                          (let ((op
                                 (put-operation
                                  reply
                                  (cons 'too-many-waiters
                                        waiters-count))))
                            (perform-operation
                             (if timeout-time
                                 (choice-operation
                                  op
                                  (wrap-operation
                                   (sleep-operation
                                    (/ (- timeout-time
                                          (get-internal-real-time))
                                       internal-time-units-per-second))
                                   (const #f)))
                                 op)))))
                       (loop next-resource-id
                             available
                             waiters))
                     (loop next-resource-id
                           available
                           (enq! waiters (cons reply timeout-time))))))

             (if timeout-time
                 (let ((current-internal-time
                        (get-internal-real-time)))
                   ;; If this client is still waiting
                   (if (> timeout-time current-internal-time)
                       (let* ((reply-timeout
                               (/ (- timeout-time
                                     current-internal-time)
                                  internal-time-units-per-second))
                              (resource-id
                               (car available))
                              (resource-details
                               (hash-ref resources resource-id)))

                         (increment-resource-checkout-count!
                          resource-details)

                         ;; Don't sleep in this fiber, so spawn a new
                         ;; fiber to handle handing over the resource,
                         ;; and returning it if there's a timeout
                         (spawn-fiber-for-checkout
                          channel
                          reply
                          reply-timeout
                          resource-id
                          (resource-details-value resource-details))
                         (loop next-resource-id
                               (cdr available)
                               waiters))
                       (loop next-resource-id
                             available
                             waiters)))
                 (let* ((resource-id next-available
                                     (car+cdr available))
                        (resource-details
                         (hash-ref resources resource-id)))
                   (increment-resource-checkout-count!
                    resource-details)
                   (put-message
                    reply
                    (list 'success
                          resource-id
                          (resource-details-value resource-details)))
                   (loop next-resource-id
                         next-available
                         waiters)))))

        (((and (or 'return 'return-failed-checkout) return-type)
          resource-id)

         (when (eq? 'return-failed-checkout
                    return-type)
           (set! checkout-failure-count
                 (+ 1 checkout-failure-count)))

         (let ((current-internal-time
                (get-internal-real-time))
               (resource-details
                (hash-ref resources resource-id)))
           (if (and lifetime
                    (>= (resource-details-checkout-count
                         resource-details)
                        lifetime))
               ;; This resource has been checked out enough times, so
               ;; get rid of it rather than making it available
               (begin
                 (spawn-fiber-to-destroy-resource
                  resource-id
                  (resource-details-value resource-details))
                 (loop next-resource-id
                       available
                       waiters))
               (let waiter-loop ((waiter (safe-deq waiters)))
                 (match waiter
                   (#f
                    (if (eq? 'return-failed-checkout
                             return-type)
                        (decrement-resource-checkout-count!
                         resource-details)
                        (set-resource-details-last-used!
                         resource-details
                         current-internal-time))
                    (loop next-resource-id
                          (cons resource-id available)
                          waiters))
                   ((reply . timeout)
                    (if (and timeout
                             (< timeout current-internal-time))
                        ;; This waiter has already timed out, so try
                        ;; the next one.  This needs to be a tail
                        ;; call, so that nothing runs here once the
                        ;; loop finishes.
                        (waiter-loop (safe-deq waiters))
                        (begin
                          (if timeout
                              (let ((reply-timeout
                                     (/ (- timeout
                                           current-internal-time)
                                        internal-time-units-per-second)))
                                ;; Don't sleep in this fiber, so spawn
                                ;; a new fiber to handle handing over
                                ;; the resource, and returning it if
                                ;; there's a timeout
                                (spawn-fiber-for-checkout
                                 channel
                                 reply
                                 reply-timeout
                                 resource-id
                                 (resource-details-value
                                  resource-details)))
                              (put-message
                               reply
                               (list 'success
                                     resource-id
                                     (resource-details-value
                                      resource-details))))

                          (set-resource-details-last-used!
                           resource-details
                           current-internal-time)
                          (when (eq? 'return-failed-checkout
                                     return-type)
                            (decrement-resource-checkout-count!
                             resource-details))

                          (loop next-resource-id
                                available
                                waiters)))))))))

        (('remove resource-id)
         (hash-remove! resources resource-id)

         (when (and (not (q-empty? waiters))
                    (< (- (count-resources resources) 1)
                       max-size))
           (spawn-fiber-to-return-new-resource))

         (loop next-resource-id
               available ; resource shouldn't be in this list
               waiters))

        (('destroy resource-id)
         (let ((resource-details
                (hash-ref resources resource-id)))
           (spawn-fiber-to-destroy-resource
            resource-id
            (resource-details-value resource-details))

           (loop next-resource-id
                 available
                 waiters)))

        (('list-resources reply)
         ;; NOTE(review): RESOURCES is a hash table here, not a list,
         ;; confirm that list-copy does what's intended
         (spawn-fiber
          (lambda ()
            (put-message reply (list-copy resources))))

         (loop next-resource-id
               available
               waiters))

        (('stats reply timeout-time)
         (let ((stats
                `((resources . ,(count-resources resources))
                  (available . ,(length available))
                  (waiters   . ,(q-length waiters))
                  (resources-checkout-count
                   . ,(hash-fold
                       (lambda (_ resource-details result)
                         (cons (resource-details-checkout-count
                                resource-details)
                               result))
                       '()
                       resources))
                  (checkout-failure-count . ,checkout-failure-count))))

           (spawn-fiber
            (lambda ()
              (let ((op (put-operation reply stats)))
                (perform-operation
                 (if timeout-time
                     (choice-operation
                      op
                      (sleep-operation
                       (/ (- timeout-time
                             (get-internal-real-time))
                          internal-time-units-per-second)))
                     op))))))

         (loop next-resource-id
               available
               waiters))

        (('check-for-idle-resources)
         (let* ((internal-real-time
                 (get-internal-real-time))
                (candidate-resource-ids-to-destroy
                 (filter-map
                  (lambda (resource-id)
                    (let ((resource-details
                           (hash-ref resources resource-id)))
                      (if (> (/ (- internal-real-time
                                   (resource-details-last-used
                                    resource-details))
                                internal-time-units-per-second)
                             idle-seconds)
                          resource-id
                          #f)))
                  available))
                ;; Don't go below min-size resources
                (max-resources-to-destroy
                 (max 0
                      (- (count-resources resources)
                         min-size)))
                (resources-to-destroy
                 (take candidate-resource-ids-to-destroy
                       (min max-resources-to-destroy
                            (length
                             candidate-resource-ids-to-destroy)))))
           (when destructor
             (for-each
              (lambda (resource-id)
                (spawn-fiber-to-destroy-resource
                 resource-id
                 (resource-details-value
                  (hash-ref resources resource-id))))
              resources-to-destroy))

           (loop next-resource-id
                 (lset-difference = available resources-to-destroy)
                 waiters)))

        (('destroy)
         (let ((current-internal-time (get-internal-real-time)))
           ;; Notify all waiters that the pool has been destroyed
           (for-each
            (match-lambda
              ((reply . timeout)
               (when (or (not timeout)
                         (> timeout current-internal-time))
                 (spawn-fiber
                  (lambda ()
                    (let ((op
                           (put-operation
                            reply
                            (cons 'resource-pool-destroyed #f))))
                      (perform-operation
                       (if timeout
                           (choice-operation
                            op
                            (wrap-operation
                             (sleep-operation
                              (/ (- timeout
                                    (get-internal-real-time))
                                 internal-time-units-per-second))
                             (const #f)))
                           op))))))))
            (car waiters))

           (when destructor
             (for-each
              (lambda (resource-id)
                (spawn-fiber-to-destroy-resource
                 resource-id
                 (resource-details-value
                  (hash-ref resources resource-id))))
              available))

           ;; Do this in parallel to avoid deadlocks between the
           ;; limiter and returning new resources to this pool
           (and=> return-new-resource/parallelism-limiter
                  (lambda (limiter)
                    (spawn-fiber
                     (lambda ()
                       (destroy-parallelism-limiter limiter)))))

           (if (or (= 0 (count-resources resources))
                   (not destructor))
               (begin
                 (set-resource-pool-channel! pool #f)
                 (signal-condition! destroy-condition)

                 ;; No loop
                 *unspecified*)
               (destroy-loop resources next-resource-id))))

        (unknown
         (simple-format
          (current-error-port)
          "unrecognised message to ~A resource pool channel: ~A\n"
          name
          unknown)
         (loop next-resource-id
               available
               waiters)))))

  (spawn-fiber
   (lambda ()
     (when idle-seconds
       ;; Prompt the pool fiber to check for idle resources every
       ;; IDLE-SECONDS, stopping once the pool has been destroyed
       (spawn-fiber
        (lambda ()
          (let loop ()
            (put-message channel '(check-for-idle-resources))
            ;; The operation has to be performed, otherwise there's
            ;; no waiting and the operation object is always true
            (when (perform-operation
                   (choice-operation
                    (wrap-operation
                     (sleep-operation idle-seconds)
                     (const #t))
                    (wrap-operation
                     (wait-operation destroy-condition)
                     (const #f))))
              (loop))))))

     (with-exception-handler
         (lambda (exn)
           #f)
       (lambda ()
         (with-exception-handler
             (lambda (exn)
               ;; Print the backtrace and exception before the stack
               ;; is unwound by the outer handler
               (let* ((stack (make-stack #t))
                      (error-string
                       (call-with-output-string
                         (lambda (port)
                           (display-backtrace stack port 3)
                           (simple-format
                            port
                            "exception in the ~A pool fiber, "
                            name)
                           (print-exception
                            port
                            (stack-ref stack 3)
                            '%exception
                            (list exn))))))
                 (display error-string
                          (current-error-port)))
               (raise-exception exn))
           (lambda ()
             (start-stack
              #t
              (main-loop)))))
       #:unwind? #t))
   (or scheduler
       (current-scheduler)))

  pool)

;; Ask the fiber for POOL to destroy the pool, and wait until the
;; destroy condition for POOL has been signalled.  Returns #t.
(define (destroy-resource-pool pool)
  (let ((channel (resource-pool-channel pool))
        (destroy-condition (resource-pool-destroy-condition pool)))
    (if channel
        (perform-operation
         (choice-operation
          (wrap-operation
           (put-operation channel (list 'destroy))
           (lambda _
             (wait destroy-condition)))
          ;; In case the pool is destroyed by something else before
          ;; the message is received
          (wait-operation destroy-condition)))
        ;; The channel is set to #f when the pool is destroyed, so
        ;; there's nothing to send the message to
        (wait destroy-condition)))
  #t)

(define &resource-pool-timeout
  (make-exception-type '&resource-pool-timeout
                       &error
                       '(pool)))

(define resource-pool-timeout-error-pool
  (exception-accessor
   &resource-pool-timeout
   (record-accessor &resource-pool-timeout 'pool)))

(define make-resource-pool-timeout-error
  (record-constructor &resource-pool-timeout))

(define resource-pool-timeout-error?
  (exception-predicate &resource-pool-timeout))

(define &resource-pool-too-many-waiters
  (make-exception-type '&resource-pool-too-many-waiters
                       &error
                       '(pool waiters-count)))

(define resource-pool-too-many-waiters-error-pool
  (exception-accessor
   &resource-pool-too-many-waiters
   (record-accessor &resource-pool-too-many-waiters 'pool)))

(define resource-pool-too-many-waiters-error-waiters-count
  (exception-accessor
   &resource-pool-too-many-waiters
   (record-accessor &resource-pool-too-many-waiters 'waiters-count)))

(define make-resource-pool-too-many-waiters-error
  (record-constructor &resource-pool-too-many-waiters))

(define resource-pool-too-many-waiters-error?
  (exception-predicate &resource-pool-too-many-waiters))

(define &resource-pool-destroyed
  (make-exception-type '&resource-pool-destroyed
                       &error
                       '(pool)))

(define resource-pool-destroyed-error-pool
  (exception-accessor
   &resource-pool-destroyed
   (record-accessor &resource-pool-destroyed 'pool)))

(define make-resource-pool-destroyed-error
  (record-constructor &resource-pool-destroyed))

(define resource-pool-destroyed-error?
  (exception-predicate &resource-pool-destroyed))

;; When the procedure passed to call-with-resource-from-pool raises an
;; exception of this type, the pool is asked to destroy the resource
;; rather than take it back.
(define &resource-pool-destroy-resource
  (make-exception-type '&resource-pool-destroy-resource
                       &exception
                       '()))

(define make-resource-pool-destroy-resource-exception
  (record-constructor &resource-pool-destroy-resource))

(define resource-pool-destroy-resource-exception?
  (exception-predicate &resource-pool-destroy-resource))

;; Default for the #:timeout-handler argument of
;; call-with-resource-from-pool.
(define resource-pool-default-timeout-handler
  (make-parameter #f))

(define* (call-with-resource-from-pool
          pool proc #:key (timeout 'default)
          (timeout-handler (resource-pool-default-timeout-handler))
          (max-waiters 'default)
          (channel (resource-pool-channel pool))
          (destroy-resource-on-exception? #f))
  "Call PROC with a resource checked out from POOL, blocking until a
resource becomes available.  The resource is given back to POOL once
PROC has returned, and the values returned by PROC are returned.

TIMEOUT is the number of seconds to wait for a resource, and
MAX-WAITERS is the number of waiting checkouts beyond which this
checkout is refused.  Both default to the values POOL was configured
with.

Raises &resource-pool-timeout if no resource is available in time,
after calling TIMEOUT-HANDLER if there is one.  Raises
&resource-pool-too-many-waiters if the checkout is refused and
&resource-pool-destroyed if POOL has been destroyed.

If PROC raises an exception, it's re-raised once the pool has been
told what to do with the resource.  The resource is destroyed if
DESTROY-RESOURCE-ON-EXCEPTION? is true, or the exception is a
&resource-pool-destroy-resource exception, otherwise it's given back to
POOL."

  (define timeout-or-default
    (if (eq? timeout 'default)
        (assq-ref (resource-pool-configuration pool)
                  'default-checkout-timeout)
        timeout))

  (define max-waiters-or-default
    (if (eq? max-waiters 'default)
        (assq-ref (resource-pool-configuration pool)
                  'default-max-waiters)
        max-waiters))

  ;; The channel is set to #f when the pool is destroyed
  (unless channel
    (raise-exception
     (make-resource-pool-destroyed-error pool)))

  (let ((reply
         (if timeout-or-default
             (let loop ((reply (make-channel))
                        (start-time (get-internal-real-time)))
               (let ((request-success?
                      (perform-operation
                       (choice-operation
                        (wrap-operation
                         (put-operation
                          channel
                          (list 'checkout
                                reply
                                (+ start-time
                                   (* timeout-or-default
                                      internal-time-units-per-second))
                                max-waiters-or-default))
                         (const #t))
                        (wrap-operation
                         (sleep-operation timeout-or-default)
                         (const #f))))))
                 (if request-success?
                     (let ((time-remaining
                            (- timeout-or-default
                               (/ (- (get-internal-real-time)
                                     start-time)
                                  internal-time-units-per-second))))
                       (if (> time-remaining 0)
                           (let ((response
                                  (perform-operation
                                   (choice-operation
                                    (get-operation reply)
                                    (wrap-operation
                                     (sleep-operation time-remaining)
                                     (const #f))))))
                             (if (or (not response)
                                     (eq? response
                                          'resource-pool-retry-checkout))
                                 ;; Try again if there's still time,
                                 ;; the deadline stays the same as
                                 ;; start-time isn't changed
                                 (if (> (- timeout-or-default
                                           (/ (- (get-internal-real-time)
                                                 start-time)
                                              internal-time-units-per-second))
                                        0)
                                     (loop (make-channel)
                                           start-time)
                                     'timeout)
                                 response))
                           'timeout))
                     'timeout)))
             (let ((reply (make-channel)))
               (put-message channel
                            (list 'checkout
                                  reply
                                  #f
                                  max-waiters-or-default))
               (get-message reply)))))

    (match reply
      ('timeout
       ;; NOTE(review): this passes the timeout argument as given,
       ;; which may be the symbol default, rather than
       ;; timeout-or-default, confirm which the handlers expect
       (when timeout-handler
         (timeout-handler pool proc timeout))

       (raise-exception
        (make-resource-pool-timeout-error pool)))
      (('too-many-waiters . count)
       (raise-exception
        (make-resource-pool-too-many-waiters-error pool
                                                   count)))
      (('resource-pool-destroyed . #f)
       (raise-exception
        (make-resource-pool-destroyed-error pool)))
      (('success resource-id resource-value)
       (call-with-values
           (lambda ()
             (with-exception-handler
                 (lambda (exn)
                   ;; Unwind the stack before calling put-message, as
                   ;; this avoids inconsistent behaviour with
                   ;; continuation barriers
                   (put-message
                    channel
                    (list (if (or destroy-resource-on-exception?
                                  (resource-pool-destroy-resource-exception?
                                   exn))
                              'destroy
                              'return)
                          resource-id))
                   (raise-exception exn))
               (lambda ()
                 (with-exception-handler
                     (lambda (exn)
                       ;; Capture the stack while it's still
                       ;; available, and attach it to the exception
                       (let ((stack
                              (match (fluid-ref %stacks)
                                ((stack-tag . prompt-tag)
                                 (make-stack #t
                                             0 prompt-tag
                                             0 (and prompt-tag 1)))
                                (_
                                 (make-stack #t)))))
                         (raise-exception
                          (make-exception
                           exn
                           (make-knots-exception stack)))))
                   (lambda ()
                     (proc resource-value))))
               #:unwind? #t))
         (lambda vals
           (put-message channel
                        `(return ,resource-id))
           (apply values vals)))))))

;; Evaluate EXP ... with RESOURCE bound to a resource checked out from
;; POOL, using call-with-resource-from-pool with its default options.
(define-syntax-rule (with-resource-from-pool pool resource exp ...)
  (call-with-resource-from-pool
   pool
   (lambda (resource) exp ...)))

;; Return the stats alist sent back by the fiber for POOL.  If TIMEOUT
;; (in seconds) is not #f and the request or the response takes longer
;; than that, &resource-pool-timeout is raised.
;; &resource-pool-destroyed is raised if POOL has been destroyed.
(define* (resource-pool-stats pool #:key (timeout 5))
  (define channel
    (resource-pool-channel pool))

  (unless channel
    (raise-exception
     (make-resource-pool-destroyed-error pool)))

  (if timeout
      (let* ((reply (make-channel))
             (start-time (get-internal-real-time))
             (timeout-time
              (+ start-time
                 (* internal-time-units-per-second timeout))))
        (perform-operation
         (choice-operation
          (wrap-operation
           (put-operation channel
                          `(stats ,reply ,timeout-time))
           (const #t))
          (wrap-operation
           (sleep-operation timeout)
           (lambda _
             (raise-exception
              (make-resource-pool-timeout-error pool))))))

        (let ((time-remaining
               (- timeout
                  (/ (- (get-internal-real-time)
                        start-time)
                     internal-time-units-per-second))))
          (if (> time-remaining 0)
              (perform-operation
               (choice-operation
                (get-operation reply)
                (wrap-operation
                 (sleep-operation time-remaining)
                 (lambda _
                   (raise-exception
                    (make-resource-pool-timeout-error pool))))))
              (raise-exception
               (make-resource-pool-timeout-error pool)))))
      (let ((reply (make-channel)))
        (put-message channel
                     `(stats ,reply #f))
        (get-message reply))))

;; Send a list-resources request to the fiber for POOL and return the
;; response.  &resource-pool-destroyed is raised if POOL has been
;; destroyed.
(define (resource-pool-list-resources pool)
  (define channel
    (resource-pool-channel pool))

  (unless channel
    (raise-exception
     (make-resource-pool-destroyed-error pool)))

  (let ((reply (make-channel)))
    (put-message channel
                 (list 'list-resources reply))
    (get-message reply)))