;;; Guile Knots ;;; Copyright © 2020 Christopher Baines ;;; ;;; This file is part of Guile Knots. ;;; ;;; The Guile Knots is free software; you can redistribute it and/or ;;; modify it under the terms of the GNU General Public License as ;;; published by the Free Software Foundation; either version 3 of the ;;; License, or (at your option) any later version. ;;; ;;; The Guile Knots is distributed in the hope that it will be useful, ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ;;; General Public License for more details. ;;; ;;; You should have received a copy of the GNU General Public License ;;; along with the guix-data-service. If not, see ;;; . (define-module (knots resource-pool) #:use-module (srfi srfi-1) #:use-module (srfi srfi-9) #:use-module (srfi srfi-9 gnu) #:use-module (srfi srfi-43) #:use-module (srfi srfi-71) #:use-module (ice-9 q) #:use-module (ice-9 match) #:use-module (ice-9 exceptions) #:use-module (fibers) #:use-module (fibers timers) #:use-module (fibers channels) #:use-module (fibers scheduler) #:use-module (fibers operations) #:use-module (fibers conditions) #:use-module (knots) #:use-module (knots parallelism) #:export (make-fixed-size-resource-pool make-resource-pool resource-pool? resource-pool-name resource-pool-channel resource-pool-configuration destroy-resource-pool &resource-pool-timeout resource-pool-timeout-error-pool resource-pool-timeout-error? &resource-pool-too-many-waiters resource-pool-too-many-waiters-error-pool resource-pool-too-many-waiters-error-waiters-count resource-pool-too-many-waiters-error? &resource-pool-destroyed resource-pool-destroyed-error-pool resource-pool-destroyed-error? &resource-pool-destroy-resource make-resource-pool-destroy-resource-exception resource-pool-destroy-resource-exception? resource-pool-default-timeout-handler call-with-resource-from-pool with-resource-from-pool resource-pool-stats)) (define &resource-pool-abort-add-resource (make-exception-type '&recource-pool-abort-add-resource &error '())) (define make-resource-pool-abort-add-resource-error (record-constructor &resource-pool-abort-add-resource)) (define resource-pool-abort-add-resource-error? (exception-predicate &resource-pool-abort-add-resource)) (define-record-type (make-resource-pool-record name channel destroy-condition configuration) resource-pool? (name resource-pool-name) (channel resource-pool-channel set-resource-pool-channel!) (destroy-condition resource-pool-destroy-condition) (configuration resource-pool-configuration)) (set-record-type-printer! (lambda (resource-pool port) (display/knots (simple-format #f "#" (resource-pool-name resource-pool)) port))) (define (safe-deq q) (if (null? (car q)) #f (let ((it (caar q)) (next (cdar q))) (if (null? next) (set-cdr! q #f)) (set-car! q next) it))) (define-record-type (make-resource-details value checkout-count last-used) resource-details? (value resource-details-value) (checkout-count resource-details-checkout-count set-resource-details-checkout-count!) (last-used resource-details-last-used set-resource-details-last-used!)) (define-inlinable (increment-resource-checkout-count! resource) (set-resource-details-checkout-count! resource (1+ (resource-details-checkout-count resource)))) (define-inlinable (decrement-resource-checkout-count! resource) (set-resource-details-checkout-count! resource (1+ (resource-details-checkout-count resource)))) (define (spawn-fiber-for-checkout channel reply-channel reply-timeout resource-id resource) (spawn-fiber (lambda () (let ((checkout-success? (perform-operation (choice-operation (wrap-operation (put-operation reply-channel (list 'success resource-id resource)) (const #t)) (wrap-operation (sleep-operation reply-timeout) (const #f)))))) (unless checkout-success? (put-message channel (list 'return-failed-checkout resource-id))))))) (define* (make-fixed-size-resource-pool resources-list-or-vector #:key (delay-logger (const #f)) (duration-logger (const #f)) scheduler (name "unnamed") default-checkout-timeout default-max-waiters) "Create a resource pool from RESOURCES-LIST-OR-VECTOR, a list or vector of pre-existing resource values. Use @code{with-resource-from-pool} or @code{call-with-resource-from-pool} to borrow a resource and return it automatically when done. Optional keyword arguments: @table @code @item #:name A optional string used in log messages. Defaults to @code{\"unnamed\"}. @item #:default-checkout-timeout Default checkout timeout when requesting a resource from the pool, unset by default. @item #:default-max-waiters Maximum number of fibers that may queue waiting for a resource. When this limit is exceeded, @code{&resource-pool-too-many-waiters} is raised when a resource is requested. Defaults to @code{#f} (no limit). @item #:scheduler The Fibers scheduler to use for the pool's internal fiber. Defaults to the current scheduler. @end table" (define channel (make-channel)) (define destroy-condition (make-condition)) (define pool (make-resource-pool-record name channel destroy-condition `((delay-logger . ,delay-logger) (duration-logger . ,duration-logger) (scheduler . ,scheduler) (name . ,name) (default-checkout-timeout . ,default-checkout-timeout) (default-max-waiters . ,default-max-waiters)))) (define checkout-failure-count 0) (define resources (vector-map (lambda (_ resource) (make-resource-details resource 0 #f)) (if (vector? resources-list-or-vector) resources-list-or-vector (list->vector resources-list-or-vector)))) (define (destroy-loop) (define (empty?) (vector-every (lambda (r) (eq? r #f)) resources)) (let loop () (match (get-message channel) (('checkout reply timeout-time max-waiters) (spawn-fiber (lambda () (let ((op (put-operation reply (cons 'resource-pool-destroyed #f)))) (perform-operation (if timeout-time (choice-operation op (wrap-operation (sleep-operation (/ (- timeout-time (get-internal-real-time)) internal-time-units-per-second)) (const #f))) op))))) (loop)) (((and (or 'return 'return-failed-checkout) return-type) resource-id) (vector-set! resources resource-id #f) (if (empty?) (begin (set-resource-pool-channel! pool #f) (signal-condition! destroy-condition) ;; No loop *unspecified*) (loop))) (('stats reply timeout-time) (let ((stats `((resources . ,(vector-length resources)) (available . 0) (waiters . 0) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber (lambda () (let ((op (put-operation reply stats))) (perform-operation (if timeout-time (choice-operation op (sleep-operation (/ (- timeout-time (get-internal-real-time)) internal-time-units-per-second))) op)))))) (loop)) (('destroy) (loop)) (unknown (simple-format (current-error-port) "unrecognised message to ~A resource pool channel: ~A\n" name unknown) (loop))))) (define (main-loop) (let loop ((available (iota (vector-length resources))) (waiters (make-q))) (match (get-message channel) (('checkout reply timeout-time max-waiters) (if (null? available) (let ((waiters-count (q-length waiters))) (if (and max-waiters (>= waiters-count max-waiters)) (begin (spawn-fiber (lambda () (let ((op (put-operation reply (cons 'too-many-waiters waiters-count)))) (perform-operation (if timeout-time (choice-operation op (wrap-operation (sleep-operation (/ (- timeout-time (get-internal-real-time)) internal-time-units-per-second)) (const #f))) op))))) (loop available waiters)) (loop available (enq! waiters (cons reply timeout-time))))) (if timeout-time (let ((current-internal-time (get-internal-real-time))) ;; If this client is still waiting (if (> timeout-time current-internal-time) (let ((reply-timeout (/ (- timeout-time current-internal-time) internal-time-units-per-second)) (resource-id new-available (car+cdr available))) ;; Don't sleep in this fiber, so spawn a new ;; fiber to handle handing over the resource, ;; and returning it if there's a timeout (spawn-fiber-for-checkout channel reply reply-timeout resource-id (resource-details-value (vector-ref resources resource-id))) (loop new-available waiters)) (loop available waiters))) (let* ((resource-id next-available (car+cdr available)) (resource-details (vector-ref resources resource-id))) (put-message reply (list 'success resource-id (resource-details-value resource-details))) (loop next-available waiters))))) (((and (or 'return 'return-failed-checkout) return-type) resource-id) (when (eq? 'return-failed-checkout return-type) (set! checkout-failure-count (+ 1 checkout-failure-count))) (let ((current-internal-time (get-internal-real-time))) (let waiter-loop ((waiter (safe-deq waiters))) (match waiter (#f (loop (cons resource-id available) waiters)) ((reply . timeout) (if (and timeout (< timeout current-internal-time)) (waiter-loop (safe-deq waiters)) (if timeout (let ((reply-timeout (/ (- timeout current-internal-time) internal-time-units-per-second))) ;; Don't sleep in this fiber, so spawn a ;; new fiber to handle handing over the ;; resource, and returning it if there's ;; a timeout (spawn-fiber-for-checkout channel reply reply-timeout resource-id (resource-details-value (vector-ref resources resource-id)))) (put-message reply (list 'success resource-id (resource-details-value (vector-ref resources resource-id)))))) (loop available waiters)))))) (('list-resources reply) (spawn-fiber (lambda () (put-message reply (vector->list resources)))) (loop available waiters)) (('stats reply timeout-time) (let ((stats `((resources . ,(vector-length resources)) (available . ,(length available)) (waiters . ,(q-length waiters)) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber (lambda () (let ((op (put-operation reply stats))) (perform-operation (if timeout-time (choice-operation op (sleep-operation (/ (- timeout-time (get-internal-real-time)) internal-time-units-per-second))) op)))))) (loop available waiters)) (('destroy) (let ((current-internal-time (get-internal-real-time))) ;; Notify all waiters that the pool has been destroyed (for-each (match-lambda ((reply . timeout) (when (or (not timeout) (> timeout current-internal-time)) (spawn-fiber (lambda () (let ((op (put-operation reply (cons 'resource-pool-destroyed #f)))) (perform-operation (if timeout (choice-operation op (wrap-operation (sleep-operation (/ (- timeout (get-internal-real-time)) internal-time-units-per-second)) (const #f))) op)))))))) (car waiters)) (if (= (vector-length resources) (length available)) (begin (set-resource-pool-channel! pool #f) (signal-condition! destroy-condition) ;; No loop *unspecified*) (destroy-loop)))) (unknown (simple-format (current-error-port) "unrecognised message to ~A resource pool channel: ~A\n" name unknown) (loop available waiters))))) (spawn-fiber (lambda () (with-exception-handler (lambda (exn) #f) (lambda () (with-exception-handler (lambda (exn) (let* ((stack (make-stack #t)) (error-string (call-with-output-string (lambda (port) (display-backtrace stack port 3) (simple-format port "exception in the ~A pool fiber, " name) (print-exception port (stack-ref stack 3) '%exception (list exn)))))) (display/knots error-string (current-error-port))) (raise-exception exn)) (lambda () (start-stack #t (main-loop))))) #:unwind? #t)) (or scheduler (current-scheduler))) pool) (define* (make-resource-pool return-new-resource max-size #:key (min-size 0) (idle-seconds #f) (delay-logger (const #f)) (duration-logger (const #f)) destructor lifetime scheduler (name "unnamed") (add-resources-parallelism 1) default-checkout-timeout default-max-waiters) "Create a dynamic resource pool. RETURN-NEW-RESOURCE is a thunk called to create each new resource value. MAX-SIZE is the maximum number of resources the pool will hold simultaneously. Resources are created on demand when a checkout is requested and the pool is not yet at MAX-SIZE. Use @code{with-resource-from-pool} or @code{call-with-resource-from-pool} to request a resource and return it automatically when done. Optional keyword arguments: @table @code @item #:min-size Minimum number of resources to keep alive even when idle. Defaults to @code{0}. @item #:idle-seconds Seconds a resource may remain unused before being destroyed, provided the pool is above @code{#:min-size}. Defaults to @code{#f} (never expire idle resources). @item #:lifetime Maximum number of checkouts a single resource will serve before being destroyed and replaced by a fresh one. Defaults to @code{#f} (no limit). @item #:destructor A procedure called as @code{(destructor resource)} when a resource is removed from the pool. Defaults to @code{#f}. @item #:add-resources-parallelism Maximum number of concurrent calls to RETURN-NEW-RESOURCE when the pool needs to grow. Allowing resources to be created in parallel can result in more resources being created than can fit inside the pool, if this happens, the surplus resources are destroyed. Defaults to @code{1}. @item #:name A string used in log messages. Defaults to @code{\"unnamed\"}. @item #:default-checkout-timeout Default checkout timeout when requesting a resource from the pool, unset by default. @item #:default-max-waiters Maximum number of fibers that may queue waiting for a resource. When this limit is exceeded, @code{&resource-pool-too-many-waiters} is raised when a resource is requested. Defaults to @code{#f} (no limit). @item #:scheduler The Fibers scheduler to use for the pool's internal fiber. Defaults to the current scheduler. @end table" (define channel (make-channel)) (define destroy-condition (make-condition)) (define pool (make-resource-pool-record name channel destroy-condition `((max-size . ,max-size) (min-size . ,min-size) (idle-seconds . ,idle-seconds) (delay-logger . ,delay-logger) (duration-logger . ,duration-logger) (destructor . ,destructor) (lifetime . ,lifetime) (scheduler . ,scheduler) (name . ,name) (default-checkout-timeout . ,default-checkout-timeout) (default-max-waiters . ,default-max-waiters)))) (define checkout-failure-count 0) (define resources (make-hash-table)) (define-inlinable (count-resources resources) (hash-count (const #t) resources)) (define return-new-resource/parallelism-limiter (make-parallelism-limiter (or add-resources-parallelism max-size) #:name (string-append name " resource pool new resource parallelism limiter"))) (define (spawn-fiber-to-return-new-resource) (spawn-fiber (lambda () (with-exception-handler (lambda (exn) ;; This can happen if the resource pool is destroyed very ;; quickly (if (resource-pool-destroyed-error? exn) #f (raise-exception exn))) (lambda () (let loop () (let ((success? (with-parallelism-limiter return-new-resource/parallelism-limiter (let ((max-size (assq-ref (resource-pool-configuration pool) 'max-size)) (size (count-resources resources))) (or (>= size max-size) (with-exception-handler (lambda _ #f) (lambda () (with-exception-handler (lambda (exn) (simple-format (current-error-port) "exception adding resource to pool ~A: ~A\n\n" name return-new-resource) (print-backtrace-and-exception/knots exn) (raise-exception exn)) (lambda () (let ((new-resource (start-stack #t (return-new-resource)))) (put-message channel (list 'add-resource new-resource))) #t))) #:unwind? #t)))))) (unless success? ;; TODO Maybe this should be configurable? (sleep 1) ;; Important to retry here and eventually create ;; a new resource, as there might be waiters ;; stuck waiting for a resource, especially if ;; the pool is empty. (loop))))) #:unwind? #t)))) (define (spawn-fiber-to-destroy-resource resource-id resource-value) (spawn-fiber (lambda () (let loop () (let* ((success? (with-exception-handler (lambda _ #f) (lambda () (with-exception-handler (lambda (exn) (simple-format (current-error-port) "exception running resource pool destructor (~A): ~A\n" name destructor) (print-backtrace-and-exception/knots exn) (raise-exception exn)) (lambda () (start-stack #t (destructor resource-value)) #t))) #:unwind? #t))) (if success? (put-message channel (list 'remove resource-id)) (begin (sleep 5) (loop)))))))) (define (destroy-loop resources next-resource-id) (let loop ((next-resource-id next-resource-id)) (match (get-message channel) (('add-resource resource) (if destructor (begin (spawn-fiber-to-destroy-resource next-resource-id resource) (hash-set! resources next-resource-id resource) (loop (1+ next-resource-id))) (loop next-resource-id))) (('checkout reply timeout-time max-waiters) (spawn-fiber (lambda () (let ((op (put-operation reply (cons 'resource-pool-destroyed #f)))) (perform-operation (if timeout-time (choice-operation op (wrap-operation (sleep-operation (/ (- timeout-time (get-internal-real-time)) internal-time-units-per-second)) (const #f))) op))))) (loop next-resource-id)) (((and (or 'return 'return-failed-checkout 'remove) return-type) resource-id) (when (and (not (eq? return-type 'remove)) destructor) (spawn-fiber-to-destroy-resource resource-id (resource-details-value (hash-ref resources resource-id)))) (hash-remove! resources resource-id) (if (= 0 (count-resources resources)) (begin (set-resource-pool-channel! pool #f) (signal-condition! destroy-condition) ;; No loop *unspecified*) (loop next-resource-id))) (('stats reply timeout-time) (let ((stats `((resources . ,(count-resources resources)) (available . 0) (waiters . 0) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber (lambda () (let ((op (put-operation reply stats))) (perform-operation (if timeout-time (choice-operation op (sleep-operation (/ (- timeout-time (get-internal-real-time)) internal-time-units-per-second))) op)))))) (loop next-resource-id)) (('check-for-idle-resources) (loop next-resource-id)) (('destroy) (loop next-resource-id)) (unknown (simple-format (current-error-port) "unrecognised message to ~A resource pool channel: ~A\n" name unknown) (loop next-resource-id))))) (define (main-loop) (let loop ((next-resource-id 0) (available '()) (waiters (make-q))) (match (get-message channel) (('add-resource resource) (if (= (count-resources resources) max-size) (if destructor (begin (hash-set! resources next-resource-id (make-resource-details resource 0 (get-internal-real-time))) (spawn-fiber-to-destroy-resource next-resource-id resource) (loop (1+ next-resource-id) available waiters)) (loop next-resource-id available waiters)) (let* ((current-internal-time (get-internal-real-time)) (resource-details (make-resource-details resource 0 current-internal-time))) (hash-set! resources next-resource-id resource-details) (let waiter-loop ((waiter (safe-deq waiters))) (match waiter (#f (loop (1+ next-resource-id) (cons next-resource-id available) waiters)) ((reply . timeout) (if (and timeout (< timeout current-internal-time)) (waiter-loop (safe-deq waiters)) (if timeout (let ((reply-timeout (/ (- timeout current-internal-time) internal-time-units-per-second))) ;; Don't sleep in this fiber, so spawn a ;; new fiber to handle handing over the ;; resource, and returning it if there's ;; a timeout (spawn-fiber-for-checkout channel reply reply-timeout next-resource-id resource)) (put-message reply (list 'success next-resource-id resource)))) (set-resource-details-checkout-count! resource-details 1) (loop (1+ next-resource-id) available waiters))))))) (('checkout reply timeout-time max-waiters) (if (null? available) (begin (unless (= (count-resources resources) max-size) (spawn-fiber-to-return-new-resource)) (let ((waiters-count (q-length waiters))) (if (and max-waiters (>= waiters-count max-waiters)) (begin (spawn-fiber (lambda () (let ((op (put-operation reply (cons 'too-many-waiters waiters-count)))) (perform-operation (if timeout-time (choice-operation op (wrap-operation (sleep-operation (/ (- timeout-time (get-internal-real-time)) internal-time-units-per-second)) (const #f))) op))))) (loop next-resource-id available waiters)) (loop next-resource-id available (enq! waiters (cons reply timeout-time)))))) (if timeout-time (let ((current-internal-time (get-internal-real-time))) ;; If this client is still waiting (if (> timeout-time current-internal-time) (let* ((reply-timeout (/ (- timeout-time current-internal-time) internal-time-units-per-second)) (resource-id (car available)) (resource-details (hash-ref resources resource-id))) (increment-resource-checkout-count! resource-details) ;; Don't sleep in this fiber, so spawn a new ;; fiber to handle handing over the resource, ;; and returning it if there's a timeout (spawn-fiber-for-checkout channel reply reply-timeout resource-id (resource-details-value resource-details)) (loop next-resource-id (cdr available) waiters)) (loop next-resource-id available waiters))) (let* ((resource-id next-available (car+cdr available)) (resource-details (hash-ref resources resource-id))) (increment-resource-checkout-count! resource-details) (put-message reply (list 'success resource-id (resource-details-value resource-details))) (loop next-resource-id next-available waiters))))) (((and (or 'return 'return-failed-checkout) return-type) resource-id) (when (eq? 'return-failed-checkout return-type) (set! checkout-failure-count (+ 1 checkout-failure-count))) (let ((current-internal-time (get-internal-real-time)) (resource-details (hash-ref resources resource-id))) (if (and lifetime (>= (resource-details-checkout-count resource-details) lifetime)) (begin (spawn-fiber-to-destroy-resource resource-id (resource-details-value resource-details)) (loop next-resource-id available waiters)) (let waiter-loop ((waiter (safe-deq waiters))) (match waiter (#f (if (eq? 'return-failed-checkout return-type) (decrement-resource-checkout-count! resource-details) (set-resource-details-last-used! resource-details current-internal-time)) (loop next-resource-id (cons resource-id available) waiters)) ((reply . timeout) (if (and timeout (< timeout current-internal-time)) (waiter-loop (safe-deq waiters)) (if timeout (let ((reply-timeout (/ (- timeout current-internal-time) internal-time-units-per-second))) ;; Don't sleep in this fiber, so spawn a ;; new fiber to handle handing over the ;; resource, and returning it if there's ;; a timeout (spawn-fiber-for-checkout channel reply reply-timeout resource-id (resource-details-value resource-details))) (put-message reply (list 'success resource-id (resource-details-value resource-details))))) (set-resource-details-last-used! resource-details current-internal-time) (when (eq? 'return-failed-checkout return-type) (decrement-resource-checkout-count! resource-details)) (loop next-resource-id available waiters))))))) (('remove resource-id) (hash-remove! resources resource-id) (when (and (not (q-empty? waiters)) (< (- (count-resources resources) 1) max-size)) (spawn-fiber-to-return-new-resource)) (loop next-resource-id available ; resource shouldn't be in this list waiters)) (('destroy resource-id) (let ((resource-details (hash-ref resources resource-id))) (spawn-fiber-to-destroy-resource resource-id (resource-details-value resource-details)) (loop next-resource-id available waiters))) (('list-resources reply) (spawn-fiber (lambda () (put-message reply (hash-map->list (lambda (_ value) value) resources)))) (loop next-resource-id available waiters)) (('stats reply timeout-time) (let ((stats `((resources . ,(count-resources resources)) (available . ,(length available)) (waiters . ,(q-length waiters)) (resources-checkout-count . ,(hash-fold (lambda (_ resource-details result) (cons (resource-details-checkout-count resource-details) result)) '() resources)) (checkout-failure-count . ,checkout-failure-count)))) (spawn-fiber (lambda () (let ((op (put-operation reply stats))) (perform-operation (if timeout-time (choice-operation op (sleep-operation (/ (- timeout-time (get-internal-real-time)) internal-time-units-per-second))) op)))))) (loop next-resource-id available waiters)) (('check-for-idle-resources) (let* ((internal-real-time (get-internal-real-time)) (candidate-resource-ids-to-destroy (filter-map (lambda (resource-id) (let ((resource-details (hash-ref resources resource-id))) (if (> (/ (- internal-real-time (resource-details-last-used resource-details)) internal-time-units-per-second) idle-seconds) resource-id #f))) available)) (max-resources-to-destroy (max 0 (- (count-resources resources) min-size))) (resources-to-destroy (take candidate-resource-ids-to-destroy (min max-resources-to-destroy (length candidate-resource-ids-to-destroy))))) (when destructor (for-each (lambda (resource-id) (spawn-fiber-to-destroy-resource resource-id (resource-details-value (hash-ref resources resource-id)))) resources-to-destroy)) (loop next-resource-id (lset-difference = available resources-to-destroy) waiters))) (('destroy) (let ((current-internal-time (get-internal-real-time))) (for-each (match-lambda ((reply . timeout) (when (or (not timeout) (> timeout current-internal-time)) (spawn-fiber (lambda () (let ((op (put-operation reply (cons 'resource-pool-destroyed #f)))) (perform-operation (if timeout (choice-operation op (wrap-operation (sleep-operation (/ (- timeout (get-internal-real-time)) internal-time-units-per-second)) (const #f))) op)))))))) (car waiters)) (when destructor (for-each (lambda (resource-id) (spawn-fiber-to-destroy-resource resource-id (resource-details-value (hash-ref resources resource-id)))) available)) ;; Do this in parallel to avoid deadlocks between the ;; limiter and returning new resources to this pool (and=> return-new-resource/parallelism-limiter (lambda (limiter) (spawn-fiber (lambda () (destroy-parallelism-limiter limiter))))) (if (or (= 0 (count-resources resources)) (not destructor)) (begin (set-resource-pool-channel! pool #f) (signal-condition! destroy-condition) ;; No loop *unspecified*) (destroy-loop resources next-resource-id)))) (unknown (simple-format (current-error-port) "unrecognised message to ~A resource pool channel: ~A\n" name unknown) (loop next-resource-id available waiters))))) (spawn-fiber (lambda () (when idle-seconds (spawn-fiber (lambda () (let loop () (put-message channel '(check-for-idle-resources)) (when (perform-operation (choice-operation (wrap-operation (sleep-operation idle-seconds) (const #t)) (wrap-operation (wait-operation destroy-condition) (const #f)))) (loop)))))) (with-exception-handler (lambda (exn) #f) (lambda () (with-exception-handler (lambda (exn) (let* ((stack (make-stack #t)) (error-string (call-with-output-string (lambda (port) (display-backtrace stack port 3) (simple-format port "exception in the ~A pool fiber, " name) (print-exception port (stack-ref stack 3) '%exception (list exn)))))) (display/knots error-string (current-error-port))) (raise-exception exn)) (lambda () (start-stack #t (main-loop))))) #:unwind? #t)) (or scheduler (current-scheduler))) pool) (define (destroy-resource-pool pool) (perform-operation (choice-operation (wrap-operation (put-operation (resource-pool-channel pool) (list 'destroy)) (lambda _ (wait (resource-pool-destroy-condition pool)))) (wait-operation (resource-pool-destroy-condition pool)))) #t) (define &resource-pool-timeout (make-exception-type '&recource-pool-timeout &error '(pool))) (define resource-pool-timeout-error-pool (exception-accessor &resource-pool-timeout (record-accessor &resource-pool-timeout 'pool))) (define make-resource-pool-timeout-error (record-constructor &resource-pool-timeout)) (define resource-pool-timeout-error? (exception-predicate &resource-pool-timeout)) (define &resource-pool-too-many-waiters (make-exception-type '&recource-pool-too-many-waiters &error '(pool waiters-count))) (define resource-pool-too-many-waiters-error-pool (exception-accessor &resource-pool-too-many-waiters (record-accessor &resource-pool-too-many-waiters 'pool))) (define resource-pool-too-many-waiters-error-waiters-count (exception-accessor &resource-pool-too-many-waiters (record-accessor &resource-pool-too-many-waiters 'waiters-count))) (define make-resource-pool-too-many-waiters-error (record-constructor &resource-pool-too-many-waiters)) (define resource-pool-too-many-waiters-error? (exception-predicate &resource-pool-too-many-waiters)) (define &resource-pool-destroyed (make-exception-type '&recource-pool-destroyed &error '(pool))) (define resource-pool-destroyed-error-pool (exception-accessor &resource-pool-destroyed (record-accessor &resource-pool-destroyed 'pool))) (define make-resource-pool-destroyed-error (record-constructor &resource-pool-destroyed)) (define resource-pool-destroyed-error? (exception-predicate &resource-pool-destroyed)) (define &resource-pool-destroy-resource (make-exception-type '&recource-pool-destroy-resource &exception '())) (define make-resource-pool-destroy-resource-exception (record-constructor &resource-pool-destroy-resource)) (define resource-pool-destroy-resource-exception? (exception-predicate &resource-pool-destroy-resource)) (define resource-pool-default-timeout-handler (make-parameter #f)) (define* (call-with-resource-from-pool pool proc #:key (timeout 'default) (timeout-handler (resource-pool-default-timeout-handler)) (max-waiters 'default) (channel (resource-pool-channel pool)) (destroy-resource-on-exception? #f)) "Call PROC with a resource from POOL, blocking until a resource becomes available. Return the resource once PROC has returned." (define timeout-or-default (if (eq? timeout 'default) (assq-ref (resource-pool-configuration pool) 'default-checkout-timeout) timeout)) (define max-waiters-or-default (if (eq? max-waiters 'default) (assq-ref (resource-pool-configuration pool) 'default-max-waiters) max-waiters)) (unless channel (raise-exception (make-resource-pool-destroyed-error pool))) (let ((reply (if timeout-or-default (let loop ((reply (make-channel)) (start-time (get-internal-real-time))) (let ((request-success? (perform-operation (choice-operation (wrap-operation (put-operation channel (list 'checkout reply (+ start-time (* timeout-or-default internal-time-units-per-second)) max-waiters-or-default)) (const #t)) (wrap-operation (sleep-operation timeout-or-default) (const #f)))))) (if request-success? (let ((time-remaining (- timeout-or-default (/ (- (get-internal-real-time) start-time) internal-time-units-per-second)))) (if (> time-remaining 0) (let ((response (perform-operation (choice-operation (get-operation reply) (wrap-operation (sleep-operation time-remaining) (const #f)))))) (if (or (not response) (eq? response 'resource-pool-retry-checkout)) (if (> (- timeout-or-default (/ (- (get-internal-real-time) start-time) internal-time-units-per-second)) 0) (loop (make-channel) start-time) 'timeout) response)) 'timeout)) 'timeout))) (let ((reply (make-channel))) (put-message channel (list 'checkout reply #f max-waiters-or-default)) (get-message reply))))) (match reply ('timeout (when timeout-handler (timeout-handler pool proc timeout)) (raise-exception (make-resource-pool-timeout-error pool))) (('too-many-waiters . count) (raise-exception (make-resource-pool-too-many-waiters-error pool count))) (('resource-pool-destroyed . #f) (raise-exception (make-resource-pool-destroyed-error pool))) (('success resource-id resource-value) (call-with-values (lambda () (with-exception-handler (lambda (exn) ;; Unwind the stack before calling put-message, as ;; this avoids inconsistent behaviour with ;; continuation barriers (put-message channel (list (if (or destroy-resource-on-exception? (resource-pool-destroy-resource-exception? exn)) 'destroy 'return) resource-id)) (raise-exception exn)) (lambda () (with-exception-handler (lambda (exn) (let ((stack (match (fluid-ref %stacks) ((stack-tag . prompt-tag) (make-stack #t 0 prompt-tag 0 (and prompt-tag 1))) (_ (make-stack #t))))) (raise-exception (make-exception exn (make-knots-exception stack))))) (lambda () (proc resource-value)))) #:unwind? #t)) (lambda vals (put-message channel `(return ,resource-id)) (apply values vals))))))) (define-syntax-rule (with-resource-from-pool pool resource exp ...) (call-with-resource-from-pool pool (lambda (resource) exp ...))) (define* (resource-pool-stats pool #:key (timeout 5)) (define channel (resource-pool-channel pool)) (unless channel (raise-exception (make-resource-pool-destroyed-error pool))) (if timeout (let* ((reply (make-channel)) (start-time (get-internal-real-time)) (timeout-time (+ start-time (* internal-time-units-per-second timeout)))) (perform-operation (choice-operation (wrap-operation (put-operation channel `(stats ,reply ,timeout-time)) (const #t)) (wrap-operation (sleep-operation timeout) (lambda _ (raise-exception (make-resource-pool-timeout-error pool)))))) (let ((time-remaining (- timeout (/ (- (get-internal-real-time) start-time) internal-time-units-per-second)))) (if (> time-remaining 0) (perform-operation (choice-operation (get-operation reply) (wrap-operation (sleep-operation time-remaining) (lambda _ (raise-exception (make-resource-pool-timeout-error pool)))))) (raise-exception (make-resource-pool-timeout-error pool))))) (let ((reply (make-channel))) (put-message channel `(stats ,reply #f)) (get-message reply)))) (define (resource-pool-list-resources pool) (define channel (resource-pool-channel pool)) (unless channel (raise-exception (make-resource-pool-destroyed-error pool))) (let ((reply (make-channel))) (put-message (resource-pool-channel pool) (list 'list-resources reply)) (get-message reply)))