diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index ac536cc..094eadb 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -200,6 +200,246 @@ channel (list 'return-failed-checkout resource))))))) + (define (main-loop) + (let loop ((resources '()) + (available '()) + (waiters '()) + (resources-last-used '())) + + (match (get-message channel) + (('add-resource resource) + (if (= (length resources) max-size) + (begin + (if destructor + (begin + (spawn-fiber-to-destroy-resource resource) + + (loop (cons resource resources) + available + waiters + (cons (get-internal-real-time) + resources-last-used))) + (loop resources + available + waiters + (cons (get-internal-real-time) + resources-last-used)))) + + (if (null? waiters) + (loop (cons resource resources) + (cons resource available) + waiters + (cons (get-internal-real-time) + resources-last-used)) + + (begin + (if reply-timeout + ;; Don't sleep in this fiber, so spawn a new + ;; fiber to handle handing over the + ;; resource, and returning it if there's a + ;; timeout + (spawn-fiber-for-checkout (last waiters) + resource) + (put-message (last waiters) resource)) + + (loop (cons resource resources) + available + (drop-right! waiters 1) + (cons (get-internal-real-time) + resources-last-used)))))) + + (('checkout reply) + (if (null? available) + (begin + (unless (= (length resources) max-size) + (spawn-fiber-to-return-new-resource)) + + (loop resources + available + (cons reply waiters) + resources-last-used)) + + (let ((resource (car available))) + (if reply-timeout + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's a + ;; timeout + (spawn-fiber-for-checkout reply resource) + (put-message reply resource)) + + (loop resources + (cdr available) + waiters + resources-last-used)))) + + (((and (or 'return + 'return-failed-checkout) + return-type) + resource) + + (when (eq? 'return-failed-checkout + return-type) + (set! checkout-failure-count + (+ 1 checkout-failure-count))) + + (if (null? waiters) + (loop resources + (cons resource available) + waiters + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + (get-internal-real-time)) + resources-last-used)) + + (begin + (if reply-timeout + ;; Don't sleep in this fiber, so spawn a new + ;; fiber to handle handing over the + ;; resource, and returning it if there's a + ;; timeout + (spawn-fiber-for-checkout (last waiters) + resource) + (put-message (last waiters) resource)) + + (loop resources + available + (drop-right! waiters 1) + (begin + (list-set! + resources-last-used + (list-index (lambda (x) + (eq? x resource)) + resources) + (get-internal-real-time)) + resources-last-used))))) + + (('remove resource) + (let ((index + (list-index (lambda (x) + (eq? x resource)) + resources))) + (define (remove-at-index! lst i) + (let ((start + end + (split-at! lst i))) + (append + start + (cdr end)))) + + (loop (if index + (remove-at-index! resources index) + (begin + (simple-format + (current-error-port) + "resource pool error: unable to remove ~A\n" + resource) + resources)) + available ; resource shouldn't be in this list + waiters + (remove-at-index! + resources-last-used + index)))) + + (('stats reply) + (let ((stats + `((resources . ,(length resources)) + (available . ,(length available)) + (waiters . ,(length waiters)) + (checkout-failure-count . ,checkout-failure-count)))) + + (spawn-fiber + (lambda () + (perform-operation + (choice-operation + (wrap-operation + (put-operation reply stats) + (const #t)) + (wrap-operation (sleep-operation + reply-timeout) + (const #f))))))) + + (loop resources + available + waiters + resources-last-used)) + + (('check-for-idle-resources) + (let* ((resources-last-used-seconds + (map + (lambda (internal-time) + (/ (- (get-internal-real-time) internal-time) + internal-time-units-per-second)) + resources-last-used)) + (resources-to-destroy + (filter-map + (lambda (resource last-used-seconds) + (if (and (member resource available) + (> last-used-seconds idle-seconds)) + resource + #f)) + resources + resources-last-used-seconds))) + + (when destructor + (for-each + (lambda (resource) + (spawn-fiber-to-destroy-resource resource)) + resources-to-destroy)) + + (loop (lset-difference eq? resources resources-to-destroy) + (lset-difference eq? available resources-to-destroy) + waiters + (filter-map + (lambda (resource last-used) + (if (memq resource resources-to-destroy) + #f + last-used)) + resources + resources-last-used)))) + + (('destroy reply) + (if (null? resources) + (put-message reply 'destroy-success) + + (begin + (for-each + (lambda (resource) + (if destructor + (spawn-fiber-to-destroy-resource resource) + (spawn-fiber + (lambda () + (put-message channel + (list 'remove resource))) + #:parallel? #t))) + available) + + (spawn-fiber + (lambda () + (sleep 0.1) + (put-message channel + (list 'destroy reply)))) + + (loop resources + '() + waiters + resources-last-used)))) + + (unknown + (simple-format + (current-error-port) + "unrecognised message to ~A resource pool channel: ~A\n" + name + unknown) + (loop resources + available + waiters + resources-last-used))))) + (spawn-fiber (lambda () (when idle-seconds @@ -209,249 +449,33 @@ (sleep idle-seconds) (put-message channel '(check-for-idle-resources)))))) - (with-throw-handler #t + (with-exception-handler + (lambda (exn) + #f) (lambda () - (let loop ((resources '()) - (available '()) - (waiters '()) - (resources-last-used '())) - - (match (get-message channel) - (('add-resource resource) - (if (= (length resources) max-size) - (begin - (if destructor - (begin - (spawn-fiber-to-destroy-resource resource) - - (loop (cons resource resources) - available - waiters - (cons (get-internal-real-time) - resources-last-used))) - (loop resources - available - waiters - (cons (get-internal-real-time) - resources-last-used)))) - - (if (null? waiters) - (loop (cons resource resources) - (cons resource available) - waiters - (cons (get-internal-real-time) - resources-last-used)) - - (begin - (if reply-timeout - ;; Don't sleep in this fiber, so spawn a new - ;; fiber to handle handing over the - ;; resource, and returning it if there's a - ;; timeout - (spawn-fiber-for-checkout (last waiters) - resource) - (put-message (last waiters) resource)) - - (loop (cons resource resources) - available - (drop-right! waiters 1) - (cons (get-internal-real-time) - resources-last-used)))))) - - (('checkout reply) - (if (null? available) - (begin - (unless (= (length resources) max-size) - (spawn-fiber-to-return-new-resource)) - - (loop resources - available - (cons reply waiters) - resources-last-used)) - - (let ((resource (car available))) - (if reply-timeout - ;; Don't sleep in this fiber, so spawn a - ;; new fiber to handle handing over the - ;; resource, and returning it if there's a - ;; timeout - (spawn-fiber-for-checkout reply resource) - (put-message reply resource)) - - (loop resources - (cdr available) - waiters - resources-last-used)))) - - (((and (or 'return - 'return-failed-checkout) - return-type) - resource) - - (when (eq? 'return-failed-checkout - return-type) - (set! checkout-failure-count - (+ 1 checkout-failure-count))) - - (if (null? waiters) - (loop resources - (cons resource available) - waiters - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used)) - - (begin - (if reply-timeout - ;; Don't sleep in this fiber, so spawn a new - ;; fiber to handle handing over the - ;; resource, and returning it if there's a - ;; timeout - (spawn-fiber-for-checkout (last waiters) - resource) - (put-message (last waiters) resource)) - - (loop resources - available - (drop-right! waiters 1) - (begin - (list-set! - resources-last-used - (list-index (lambda (x) - (eq? x resource)) - resources) - (get-internal-real-time)) - resources-last-used))))) - - (('remove resource) - (let ((index - (list-index (lambda (x) - (eq? x resource)) - resources))) - (define (remove-at-index! lst i) - (let ((start - end - (split-at! lst i))) - (append - start - (cdr end)))) - - (loop (if index - (remove-at-index! resources index) - (begin - (simple-format - (current-error-port) - "resource pool error: unable to remove ~A\n" - resource) - resources)) - available ; resource shouldn't be in this list - waiters - (remove-at-index! - resources-last-used - index)))) - - (('stats reply) - (let ((stats - `((resources . ,(length resources)) - (available . ,(length available)) - (waiters . ,(length waiters)) - (checkout-failure-count . ,checkout-failure-count)))) - - (spawn-fiber - (lambda () - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply stats) - (const #t)) - (wrap-operation (sleep-operation - reply-timeout) - (const #f))))))) - - (loop resources - available - waiters - resources-last-used)) - - (('check-for-idle-resources) - (let* ((resources-last-used-seconds - (map - (lambda (internal-time) - (/ (- (get-internal-real-time) internal-time) - internal-time-units-per-second)) - resources-last-used)) - (resources-to-destroy - (filter-map - (lambda (resource last-used-seconds) - (if (and (member resource available) - (> last-used-seconds idle-seconds)) - resource - #f)) - resources - resources-last-used-seconds))) - - (when destructor - (for-each - (lambda (resource) - (spawn-fiber-to-destroy-resource resource)) - resources-to-destroy)) - - (loop (lset-difference eq? resources resources-to-destroy) - (lset-difference eq? available resources-to-destroy) - waiters - (filter-map - (lambda (resource last-used) - (if (memq resource resources-to-destroy) - #f - last-used)) - resources - resources-last-used)))) - - (('destroy reply) - (if (null? resources) - (put-message reply 'destroy-success) - - (begin - (for-each - (lambda (resource) - (if destructor - (spawn-fiber-to-destroy-resource resource) - (spawn-fiber - (lambda () - (put-message channel - (list 'remove resource))) - #:parallel? #t))) - available) - - (spawn-fiber - (lambda () - (sleep 0.1) - (put-message channel - (list 'destroy reply)))) - - (loop resources - '() - waiters - resources-last-used)))) - - (unknown - (simple-format - (current-error-port) - "unrecognised message to ~A resource pool channel: ~A\n" - name - unknown) - (loop resources - available - waiters - resources-last-used))))) - (lambda (key . args) - (simple-format (current-error-port) - "exception in the ~A pool fiber\n" name)))) + (with-exception-handler + (lambda (exn) + (let* ((stack (make-stack #t)) + (error-string + (call-with-output-string + (lambda (port) + (simple-format + port + "exception in the ~A pool fiber, " name) + (print-exception + port + (stack-ref stack 3) + '%exception + (list exn)) + (display-backtrace stack port 3))))) + (display error-string + (current-error-port))) + (raise-exception exn)) + (lambda () + (start-stack + #t + (main-loop))))) + #:unwind? #t)) (or scheduler (current-scheduler)))