diff --git a/knots/parallelism.scm b/knots/parallelism.scm index f8b2b8b..9e80f5b 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -20,6 +20,8 @@ (define-module (knots parallelism) #:use-module (srfi srfi-1) #:use-module (srfi srfi-71) + #:use-module (srfi srfi-9) + #:use-module (srfi srfi-9 gnu) #:use-module (ice-9 match) #:use-module (ice-9 control) #:use-module (ice-9 exceptions) @@ -27,6 +29,7 @@ #:use-module (fibers channels) #:use-module (fibers operations) #:use-module (knots) + #:use-module (knots resource-pool) #:export (fibers-batch-map fibers-map @@ -38,7 +41,13 @@ fibers-parallel fibers-let - fiberize)) + fiberize + + make-parallelism-limiter + parallelism-limiter? + destroy-parallelism-limiter + call-with-parallelism-limiter + with-parallelism-limiter)) (define (defer-to-parallel-fiber thunk) (let ((reply (make-channel))) @@ -287,3 +296,30 @@ (('result . vals) (apply values vals)) (('exception exn) (raise-exception exn)))))) + +(define-record-type + (make-parallelism-limiter-record resource-pool) + parallelism-limiter? + (resource-pool parallelism-limiter-resource-pool)) + +(define* (make-parallelism-limiter limit #:key (name "unnamed")) + (make-parallelism-limiter-record + (make-fixed-size-resource-pool + (iota limit) + #:name name))) + +(define (destroy-parallelism-limiter parallelism-limiter) + (destroy-resource-pool + (parallelism-limiter-resource-pool + parallelism-limiter))) + +(define* (call-with-parallelism-limiter parallelism-limiter thunk) + (call-with-resource-from-pool + (parallelism-limiter-resource-pool parallelism-limiter) + (lambda _ + (thunk)))) + +(define-syntax-rule (with-parallelism-limiter parallelism-limiter exp ...) + (call-with-parallelism-limiter + parallelism-limiter + (lambda () exp ...))) diff --git a/knots/resource-pool.scm b/knots/resource-pool.scm index da52051..382f2ed 100644 --- a/knots/resource-pool.scm +++ b/knots/resource-pool.scm @@ -32,9 +32,10 @@ #:use-module (fibers conditions) #:use-module (knots) #:use-module (knots parallelism) - #:export (resource-pool? - + #:export (make-fixed-size-resource-pool make-resource-pool + + resource-pool? resource-pool-name resource-pool-channel resource-pool-configuration @@ -91,6 +92,421 @@ (resource-pool-name resource-pool)) port))) +(define* (make-fixed-size-resource-pool resources + #:key + (delay-logger (const #f)) + (duration-logger (const #f)) + destructor + scheduler + (name "unnamed") + default-checkout-timeout + default-max-waiters) + (define channel (make-channel)) + (define destroy-condition + (make-condition)) + + (define pool + (make-resource-pool-record + name + channel + destroy-condition + `((delay-logger . ,delay-logger) + (duration-logger . ,duration-logger) + (destructor . ,destructor) + (scheduler . ,scheduler) + (name . ,name) + (default-checkout-timeout . ,default-checkout-timeout) + (default-max-waiters . ,default-max-waiters)))) + + (define checkout-failure-count 0) + + (define (spawn-fiber-to-destroy-resource resource) + (spawn-fiber + (lambda () + (let loop () + (let ((success? + (with-exception-handler + (lambda _ #f) + (lambda () + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running resource pool destructor (~A): ~A\n" + name + destructor) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (start-stack #t (destructor resource)) + #t))) + #:unwind? #t))) + + (if success? + (put-message channel + (list 'remove resource)) + (begin + (sleep 5) + + (loop)))))))) + + (define (spawn-fiber-for-checkout reply-channel + reply-timeout + resource) + (spawn-fiber + (lambda () + (let ((checkout-success? + (perform-operation + (choice-operation + (wrap-operation + (put-operation reply-channel + (cons 'success resource)) + (const #t)) + (wrap-operation (sleep-operation + reply-timeout) + (const #f)))))) + (unless checkout-success? + (put-message + channel + (list 'return-failed-checkout resource))))))) + + (define (destroy-loop resources) + (let loop ((resources resources)) + (match (get-message channel) + (('checkout reply timeout-time max-waiters) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout-time + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op))))) + (loop resources)) + (((and (or 'return + 'return-failed-checkout + 'remove) + return-type) + resource) + (when destructor + (spawn-fiber-to-destroy-resource resource)) + + (let ((index + (list-index (lambda (x) + (eq? x resource)) + resources))) + (define (remove-at-index! lst i) + (let ((start + end + (split-at! lst i))) + (append + start + (cdr end)))) + + (let ((new-resources + (if index + (remove-at-index! resources index) + (begin + (simple-format + (current-error-port) + "resource pool error: unable to remove ~A\n" + resource) + resources)))) + (if (null? new-resources) + (begin + (signal-condition! destroy-condition) + + ;; No loop + *unspecified*) + (loop new-resources))))) + + (('stats reply timeout-time) + (let ((stats + `((resources . ,(length resources)) + (available . 0) + (waiters . 0) + (checkout-failure-count . ,checkout-failure-count)))) + + (spawn-fiber + (lambda () + (let ((op + (put-operation reply stats))) + (perform-operation + (if timeout-time + (choice-operation + op + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second))) + op)))))) + + (loop resources)) + + (('destroy reply) + (loop resources)) + (unknown + (simple-format + (current-error-port) + "unrecognised message to ~A resource pool channel: ~A\n" + name + unknown) + (loop resources))))) + + (define (main-loop) + (let loop ((resources resources) + (available resources) + (waiters '())) + + (match (get-message channel) + (('checkout reply timeout-time max-waiters) + (if (null? available) + (let ((waiters-count + (length waiters))) + (if (and max-waiters + (>= waiters-count + max-waiters)) + (begin + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'too-many-waiters + waiters-count)))) + (perform-operation + (if timeout-time + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op))))) + (loop resources + available + waiters)) + (loop resources + available + (cons (cons reply timeout-time) + waiters)))) + + (if timeout-time + (let ((current-internal-time + (get-internal-real-time))) + ;; If this client is still waiting + (if (> timeout-time + current-internal-time) + (let ((reply-timeout + (/ (- timeout-time + current-internal-time) + internal-time-units-per-second))) + + ;; Don't sleep in this fiber, so spawn a new + ;; fiber to handle handing over the resource, + ;; and returning it if there's a timeout + (spawn-fiber-for-checkout reply + reply-timeout + (car available)) + (loop resources + (cdr available) + waiters)) + (loop resources + available + waiters))) + (begin + (put-message reply (cons 'success + (car available))) + + (loop resources + (cdr available) + waiters))))) + + (((and (or 'return + 'return-failed-checkout) + return-type) + resource) + + (when (eq? 'return-failed-checkout + return-type) + (set! checkout-failure-count + (+ 1 checkout-failure-count))) + + (if (null? waiters) + (loop resources + (cons resource available) + waiters) + + (let* ((current-internal-time (get-internal-real-time)) + (alive-waiters + dead-waiters + (partition! + (match-lambda + ((reply . timeout) + (or (not timeout) + (> timeout current-internal-time)))) + waiters))) + (if (null? alive-waiters) + (loop resources + (cons resource available) + '()) + (match (last alive-waiters) + ((waiter-channel . waiter-timeout) + (if waiter-timeout + (let ((reply-timeout + (/ (- waiter-timeout + current-internal-time) + internal-time-units-per-second))) + ;; Don't sleep in this fiber, so spawn a + ;; new fiber to handle handing over the + ;; resource, and returning it if there's a + ;; timeout + (spawn-fiber-for-checkout waiter-channel + reply-timeout + resource)) + (put-message waiter-channel (cons 'success + resource))) + + (loop resources + available + (drop-right! alive-waiters 1)))))))) + + (('list-resources reply) + (spawn-fiber + (lambda () + (put-message reply (list-copy resources)))) + + (loop resources + available + waiters)) + + (('stats reply timeout-time) + (let ((stats + `((resources . ,(length resources)) + (available . ,(length available)) + (waiters . ,(length waiters)) + (checkout-failure-count . ,checkout-failure-count)))) + + (spawn-fiber + (lambda () + (let ((op + (put-operation reply stats))) + (perform-operation + (if timeout-time + (choice-operation + op + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second))) + op)))))) + + (loop resources + available + waiters)) + + (('destroy) + (if (and (null? resources) + (null? waiters)) + (signal-condition! + destroy-condition) + + (begin + (for-each + (lambda (resource) + (if destructor + (spawn-fiber-to-destroy-resource resource) + (spawn-fiber + (lambda () + (put-message channel + (list 'remove resource))) + #:parallel? #t))) + available) + + (let ((current-internal-time (get-internal-real-time))) + (for-each + (match-lambda + ((reply . timeout) + (when (or (not timeout) + (> timeout current-internal-time)) + (spawn-fiber + (lambda () + (let ((op + (put-operation + reply + (cons 'resource-pool-destroyed + #f)))) + (perform-operation + (if timeout + (choice-operation + op + (wrap-operation + (sleep-operation + (/ (- timeout + (get-internal-real-time)) + internal-time-units-per-second)) + (const #f))) + op)))))))) + waiters)) + + (destroy-loop resources)))) + + (unknown + (simple-format + (current-error-port) + "unrecognised message to ~A resource pool channel: ~A\n" + name + unknown) + (loop resources + available + waiters))))) + + (spawn-fiber + (lambda () + (with-exception-handler + (lambda (exn) + #f) + (lambda () + (with-exception-handler + (lambda (exn) + (let* ((stack (make-stack #t)) + (error-string + (call-with-output-string + (lambda (port) + (display-backtrace stack port 3) + (simple-format + port + "exception in the ~A pool fiber, " name) + (print-exception + port + (stack-ref stack 3) + '%exception + (list exn)))))) + (display error-string + (current-error-port))) + (raise-exception exn)) + (lambda () + (start-stack + #t + (main-loop))))) + #:unwind? #t)) + (or scheduler + (current-scheduler))) + + pool) + (define* (make-resource-pool return-new-resource max-size #:key (min-size 0) (idle-seconds #f) @@ -126,46 +542,52 @@ (define checkout-failure-count 0) - (define spawn-fiber-to-return-new-resource - (if add-resources-parallelism - (let ((thunk - (fiberize - (lambda () - (let ((max-size - (assq-ref (resource-pool-configuration pool) - 'max-size)) - (size (assq-ref (resource-pool-stats pool) - 'resources))) - (unless (= size max-size) - (let ((new-resource - (return-new-resource))) - (put-message channel - (list 'add-resource new-resource)))))) - #:parallelism add-resources-parallelism))) - (lambda () - (spawn-fiber thunk))) - (lambda () - (spawn-fiber - (lambda () - (let ((new-resource + (define return-new-resource/parallelism-limiter + (make-parallelism-limiter + (or add-resources-parallelism + max-size) + #:name + (string-append + name + " resource pool new resource parallelism limiter"))) + + (define (spawn-fiber-to-return-new-resource) + (spawn-fiber + (lambda () + (with-exception-handler + (lambda (exn) + ;; This can happen if the resource pool is destroyed very + ;; quickly + (unless (resource-pool-destroyed-error? exn) + (raise-exception exn))) + (lambda () + (with-parallelism-limiter + return-new-resource/parallelism-limiter + (let ((max-size + (assq-ref (resource-pool-configuration pool) + 'max-size)) + (size (assq-ref (resource-pool-stats pool #:timeout #f) + 'resources))) + (unless (= size max-size) + (with-exception-handler + (lambda _ #f) + (lambda () (with-exception-handler - (lambda _ #f) + (lambda (exn) + (simple-format + (current-error-port) + "exception adding resource to pool ~A: ~A\n\n" + name + return-new-resource) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) (lambda () - (with-exception-handler - (lambda (exn) - (simple-format - (current-error-port) - "exception adding resource to pool ~A: ~A\n\n" - name - return-new-resource) - (print-backtrace-and-exception/knots exn) - (raise-exception exn)) - (lambda () - (start-stack #t (return-new-resource))))) - #:unwind? #t))) - (when new-resource - (put-message channel - (list 'add-resource new-resource))))))))) + (let ((new-resource + (start-stack #t (return-new-resource)))) + (put-message channel + (list 'add-resource new-resource)))))) + #:unwind? #t))))) + #:unwind? #t)))) (define (spawn-fiber-to-destroy-resource resource) (spawn-fiber @@ -276,13 +698,16 @@ resources)))) (if (null? new-resources) (begin + (and=> return-new-resource/parallelism-limiter + destroy-parallelism-limiter) + (signal-condition! destroy-condition) ;; No loop *unspecified*) (loop new-resources))))) - (('stats reply) + (('stats reply timeout-time) (let ((stats `((resources . ,(length resources)) (available . 0) @@ -291,13 +716,17 @@ (spawn-fiber (lambda () - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply stats) - (const #t)) - (wrap-operation (sleep-operation 5) - (const #f))))))) + (let ((op + (put-operation reply stats))) + (perform-operation + (if timeout-time + (choice-operation + op + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second))) + op)))))) (loop resources)) @@ -577,7 +1006,7 @@ waiters resources-last-used)) - (('stats reply) + (('stats reply timeout-time) (let ((stats `((resources . ,(length resources)) (available . ,(length available)) @@ -586,13 +1015,17 @@ (spawn-fiber (lambda () - (perform-operation - (choice-operation - (wrap-operation - (put-operation reply stats) - (const #t)) - (wrap-operation (sleep-operation 5) - (const #f))))))) + (let ((op + (put-operation reply stats))) + (perform-operation + (if timeout-time + (choice-operation + op + (sleep-operation + (/ (- timeout-time + (get-internal-real-time)) + internal-time-units-per-second))) + op)))))) (loop resources available @@ -949,34 +1382,42 @@ available. Return the resource once PROC has returned." (lambda (resource) exp ...))) (define* (resource-pool-stats pool #:key (timeout 5)) - (let ((reply (make-channel)) - (start-time (get-internal-real-time))) - (perform-operation - (choice-operation - (wrap-operation - (put-operation (resource-pool-channel pool) - `(stats ,reply)) - (const #t)) - (wrap-operation (sleep-operation timeout) - (lambda _ - (raise-exception - (make-resource-pool-timeout-error pool)))))) + (if timeout + (let* ((reply (make-channel)) + (start-time (get-internal-real-time)) + (timeout-time + (+ start-time + (* internal-time-units-per-second timeout)))) + (perform-operation + (choice-operation + (wrap-operation + (put-operation (resource-pool-channel pool) + `(stats ,reply ,timeout-time)) + (const #t)) + (wrap-operation (sleep-operation timeout) + (lambda _ + (raise-exception + (make-resource-pool-timeout-error pool)))))) - (let ((time-remaining - (- timeout - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second)))) - (if (> time-remaining 0) - (perform-operation - (choice-operation - (get-operation reply) - (wrap-operation (sleep-operation time-remaining) - (lambda _ - (raise-exception - (make-resource-pool-timeout-error pool)))))) - (raise-exception - (make-resource-pool-timeout-error pool)))))) + (let ((time-remaining + (- timeout + (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second)))) + (if (> time-remaining 0) + (perform-operation + (choice-operation + (get-operation reply) + (wrap-operation (sleep-operation time-remaining) + (lambda _ + (raise-exception + (make-resource-pool-timeout-error pool)))))) + (raise-exception + (make-resource-pool-timeout-error pool))))) + (let ((reply (make-channel))) + (put-message (resource-pool-channel pool) + `(stats ,reply #f)) + (get-message reply)))) (define (resource-pool-list-resources pool) (let ((reply (make-channel))) diff --git a/tests.scm b/tests.scm index 2b24c6a..a58eff0 100644 --- a/tests.scm +++ b/tests.scm @@ -4,7 +4,7 @@ #:export (run-fibers-for-tests assert-no-heap-growth)) -(define (run-fibers-for-tests thunk) +(define* (run-fibers-for-tests thunk #:key (drain? #t)) (let ((result (run-fibers (lambda () @@ -12,6 +12,7 @@ (lambda (exn) exn) (lambda () + (simple-format #t "running ~A\n" thunk) (with-exception-handler (lambda (exn) (backtrace) @@ -20,7 +21,8 @@ #t) #:unwind? #t)) #:hz 0 - #:parallelism 1))) + #:parallelism 1 + #:drain? drain?))) (if (exception? result) (raise-exception result) result))) diff --git a/tests/parallelism.scm b/tests/parallelism.scm index 9881a4d..03ec376 100644 --- a/tests/parallelism.scm +++ b/tests/parallelism.scm @@ -111,4 +111,16 @@ (assert-equal a 1)))) +(run-fibers-for-tests + (lambda () + (let ((parallelism-limiter (make-parallelism-limiter 2))) + (fibers-for-each + (lambda _ + (with-parallelism-limiter + parallelism-limiter + #f)) + (iota 50)) + + (destroy-parallelism-limiter parallelism-limiter)))) + (display "parallelism test finished successfully\n") diff --git a/tests/resource-pool.scm b/tests/resource-pool.scm index 1bc09e5..461d04b 100644 --- a/tests/resource-pool.scm +++ b/tests/resource-pool.scm @@ -19,7 +19,21 @@ (number? (with-resource-from-pool resource-pool res - res)))))) + res))) + + (destroy-resource-pool resource-pool)))) + +(run-fibers-for-tests + (lambda () + (let ((resource-pool (make-fixed-size-resource-pool + (list 1)))) + (assert-true + (number? + (with-resource-from-pool resource-pool + res + res))) + + (destroy-resource-pool resource-pool)))) (run-fibers-for-tests (lambda () @@ -31,7 +45,9 @@ (number? (with-resource-from-pool resource-pool res - res)))))) + res))) + + (destroy-resource-pool resource-pool)))) (let* ((error-constructor (record-constructor &resource-pool-timeout)) @@ -88,10 +104,13 @@ res)) (iota 20)) - (let loop ((stats (resource-pool-stats resource-pool))) + (let loop ((stats (resource-pool-stats resource-pool + #:timeout #f))) (unless (= 0 (assq-ref stats 'resources)) (sleep 0.1) - (loop (resource-pool-stats resource-pool))))))) + (loop (resource-pool-stats resource-pool #:timeout #f)))) + + (destroy-resource-pool resource-pool)))) (run-fibers-for-tests (lambda () @@ -115,7 +134,9 @@ (set! counter (+ 1 counter)) (error "collision detected"))))) 20 - (iota 50))))) + (iota 50)) + + (destroy-resource-pool resource-pool)))) (run-fibers-for-tests (lambda () @@ -129,7 +150,7 @@ (error "collision detected"))) (new-number)) 1 - #:default-checkout-timeout 120))) + #:default-checkout-timeout 5))) (fibers-batch-for-each (lambda _ (with-resource-from-pool @@ -140,7 +161,9 @@ (set! counter (+ 1 counter)) (error "collision detected"))))) 20 - (iota 50))))) + (iota 50)) + + (destroy-resource-pool resource-pool)))) (run-fibers-for-tests (lambda () @@ -164,14 +187,14 @@ (call-with-resource-from-pool resource-pool (lambda (res) - (error 'should-not-be-reached)))) + #f))) #:unwind? #t))) (while (= 0 (assq-ref - (resource-pool-stats resource-pool) + (resource-pool-stats resource-pool #:timeout #f) 'waiters)) - (sleep 0)) + (sleep 0.1)) (with-exception-handler (lambda (exn) @@ -184,6 +207,8 @@ resource-pool (lambda (res) (error 'should-not-be-reached)))) - #:unwind? #t)))))) + #:unwind? #t))) + + (destroy-resource-pool resource-pool)))) (display "resource-pool test finished successfully\n")