Make resource pool changes and add parallelism limiter
All checks were successful
/ test (push) Successful in 11s

This was motivated by trying to allow for completely cleaning up
resource pools, which involved removing their use of fiberize which
currently has no destroy mechanism.

As part of this, there's a new parallelism limiter mechanism using
resource pools rather than fibers, and also a fixed size resource
pool.

The tests now run with #:drain? enabled and destroy the resource pools,
to check that everything is cleaned up.
This commit is contained in:
Christopher Baines 2025-06-25 18:46:46 +02:00
parent edf62414ee
commit 37280c95f1
5 changed files with 613 additions and 97 deletions

View file

@ -20,6 +20,8 @@
(define-module (knots parallelism) (define-module (knots parallelism)
#:use-module (srfi srfi-1) #:use-module (srfi srfi-1)
#:use-module (srfi srfi-71) #:use-module (srfi srfi-71)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu)
#:use-module (ice-9 match) #:use-module (ice-9 match)
#:use-module (ice-9 control) #:use-module (ice-9 control)
#:use-module (ice-9 exceptions) #:use-module (ice-9 exceptions)
@ -27,6 +29,7 @@
#:use-module (fibers channels) #:use-module (fibers channels)
#:use-module (fibers operations) #:use-module (fibers operations)
#:use-module (knots) #:use-module (knots)
#:use-module (knots resource-pool)
#:export (fibers-batch-map #:export (fibers-batch-map
fibers-map fibers-map
@ -38,7 +41,13 @@
fibers-parallel fibers-parallel
fibers-let fibers-let
fiberize)) fiberize
make-parallelism-limiter
parallelism-limiter?
destroy-parallelism-limiter
call-with-parallelism-limiter
with-parallelism-limiter))
(define (defer-to-parallel-fiber thunk) (define (defer-to-parallel-fiber thunk)
(let ((reply (make-channel))) (let ((reply (make-channel)))
@ -287,3 +296,30 @@
(('result . vals) (apply values vals)) (('result . vals) (apply values vals))
(('exception exn) (('exception exn)
(raise-exception exn)))))) (raise-exception exn))))))
;; Record type for a parallelism limiter.  Its single field holds the
;; resource pool the limiter is built on; make-parallelism-limiter-record
;; is the raw constructor used by make-parallelism-limiter.
(define-record-type <parallelism-limiter>
(make-parallelism-limiter-record resource-pool)
parallelism-limiter?
;; The underlying resource pool
(resource-pool parallelism-limiter-resource-pool))
(define* (make-parallelism-limiter limit #:key (name "unnamed"))
  "Return a parallelism limiter backed by a fixed size resource pool
called NAME.  The pool holds LIMIT placeholder resources (the integers
from 0 up to LIMIT - 1), one of which is checked out for each thunk run
through the limiter."
  (let* ((placeholders (iota limit))
         (pool (make-fixed-size-resource-pool placeholders
                                              #:name name)))
    (make-parallelism-limiter-record pool)))
(define (destroy-parallelism-limiter parallelism-limiter)
  "Destroy the resource pool that PARALLELISM-LIMITER is built on."
  (let ((pool (parallelism-limiter-resource-pool parallelism-limiter)))
    (destroy-resource-pool pool)))
(define (call-with-parallelism-limiter parallelism-limiter thunk)
  "Call THUNK while holding a resource checked out from the resource
pool of PARALLELISM-LIMITER.  The result is whatever
call-with-resource-from-pool returns for the call."
  (let ((pool (parallelism-limiter-resource-pool parallelism-limiter)))
    (call-with-resource-from-pool
     pool
     ;; The checked out resource is only a placeholder, so it's ignored
     (lambda _placeholder
       (thunk)))))
;; Evaluate EXP ... inside a thunk passed to
;; call-with-parallelism-limiter for PARALLELISM-LIMITER.
(define-syntax with-parallelism-limiter
  (syntax-rules ()
    ((_ parallelism-limiter exp ...)
     (call-with-parallelism-limiter
      parallelism-limiter
      (lambda () exp ...)))))

View file

@ -32,9 +32,10 @@
#:use-module (fibers conditions) #:use-module (fibers conditions)
#:use-module (knots) #:use-module (knots)
#:use-module (knots parallelism) #:use-module (knots parallelism)
#:export (resource-pool? #:export (make-fixed-size-resource-pool
make-resource-pool make-resource-pool
resource-pool?
resource-pool-name resource-pool-name
resource-pool-channel resource-pool-channel
resource-pool-configuration resource-pool-configuration
@ -91,6 +92,421 @@
(resource-pool-name resource-pool)) (resource-pool-name resource-pool))
port))) port)))
;; Create and return a resource pool containing exactly the given list of
;; RESOURCES.  The pool is managed by a fiber (spawned at the end of this
;; procedure) which receives messages on the pool channel.
;;
;; Keyword arguments:
;;   #:destructor  if not #f, a procedure called with a resource when that
;;                 resource is being destroyed as part of destroying the
;;                 pool
;;   #:scheduler   the scheduler to spawn the pool fiber in, defaulting to
;;                 (current-scheduler)
;;   #:name        used in the pool record and in messages written to the
;;                 current error port
;;   #:delay-logger, #:duration-logger, #:default-checkout-timeout and
;;   #:default-max-waiters are only stored in the pool configuration here
;;   -- presumably they're read by the checkout code elsewhere in this
;;   module (TODO confirm)
(define* (make-fixed-size-resource-pool resources
#:key
(delay-logger (const #f))
(duration-logger (const #f))
destructor
scheduler
(name "unnamed")
default-checkout-timeout
default-max-waiters)
;; Channel on which the pool fiber receives all of its messages
(define channel (make-channel))
;; Signalled once the pool has finished being destroyed
(define destroy-condition
(make-condition))
;; The record returned to the caller, with the options stored as an
;; association list
(define pool
(make-resource-pool-record
name
channel
destroy-condition
`((delay-logger . ,delay-logger)
(duration-logger . ,duration-logger)
(destructor . ,destructor)
(scheduler . ,scheduler)
(name . ,name)
(default-checkout-timeout . ,default-checkout-timeout)
(default-max-waiters . ,default-max-waiters))))
;; Incremented for each 'return-failed-checkout message, and included in
;; the stats
(define checkout-failure-count 0)
(define (spawn-fiber-to-destroy-resource resource)
  ;; Spawn a fiber which calls the destructor on RESOURCE.  If the
  ;; destructor raises an exception, this is reported on the current
  ;; error port and the destructor is tried again after 5 seconds.  Once
  ;; the destructor returns, a 'remove message for RESOURCE is sent to
  ;; the pool channel.
  (define (report-and-reraise exn)
    (simple-format
     (current-error-port)
     "exception running resource pool destructor (~A): ~A\n"
     name
     destructor)
    (print-backtrace-and-exception/knots exn)
    (raise-exception exn))

  (define (destroyed-successfully?)
    ;; #t if the destructor returned, #f if it raised an exception
    (with-exception-handler
        (lambda _ #f)
      (lambda ()
        (with-exception-handler
            report-and-reraise
          (lambda ()
            (start-stack #t (destructor resource))
            #t)))
      #:unwind? #t))

  (spawn-fiber
   (lambda ()
     (let retry ()
       (cond
        ((destroyed-successfully?)
         (put-message channel
                      (list 'remove resource)))
        (else
         (sleep 5)
         (retry)))))))
(define (spawn-fiber-for-checkout reply-channel
                                  reply-timeout
                                  resource)
  ;; Spawn a fiber which tries to hand RESOURCE over on REPLY-CHANNEL.
  ;; If nothing receives it within REPLY-TIMEOUT seconds, the resource is
  ;; sent back to the pool channel in a 'return-failed-checkout message.
  (spawn-fiber
   (lambda ()
     ;; The operations are created inside the fiber, so the timeout is
     ;; measured from when the fiber runs
     (let* ((hand-over
             (wrap-operation
              (put-operation reply-channel
                             (cons 'success resource))
              (const #t)))
            (give-up
             (wrap-operation (sleep-operation reply-timeout)
                             (const #f)))
            (handed-over?
             (perform-operation
              (choice-operation hand-over give-up))))
       (when (not handed-over?)
         (put-message
          channel
          (list 'return-failed-checkout resource)))))))
(define (destroy-loop resources)
  ;; Message loop used once destruction of the pool has started.
  ;; RESOURCES is the list of resources still to be removed.  The loop
  ;; finishes, signalling the destroy condition, once that list is empty.
  ;;
  ;; Messages handled:
  ;;   (checkout reply timeout-time max-waiters)
  ;;       reply with (resource-pool-destroyed . #f)
  ;;   (return resource) / (return-failed-checkout resource)
  ;;       a checked out resource has come back: destroy it if there's a
  ;;       destructor, otherwise remove it straight away
  ;;   (remove resource)
  ;;       the resource needs no further handling, so remove it
  ;;   (stats reply timeout-time)
  ;;       reply with the stats
  ;;   (destroy ...)
  ;;       ignored, the pool is already being destroyed

  (define (remove-at-index! lst i)
    (let ((start
           end
           (split-at! lst i)))
      (append
       start
       (cdr end))))

  ;; Return the resources without RESOURCE.  If RESOURCE isn't present,
  ;; report this and return the resources unchanged.
  (define (remove-resource resources resource)
    (let ((index
           (list-index (lambda (x)
                         (eq? x resource))
                       resources)))
      (if index
          (remove-at-index! resources index)
          (begin
            (simple-format
             (current-error-port)
             "resource pool error: unable to remove ~A\n"
             resource)
            resources))))

  (if (null? resources)
      ;; There's nothing to wait for, so the pool is destroyed already.
      ;; Without this check, the condition would never be signalled for
      ;; a pool with no resources.
      (signal-condition! destroy-condition)
      (let loop ((resources resources))
        (define (loop-unless-empty new-resources)
          (if (null? new-resources)
              (begin
                (signal-condition! destroy-condition)
                ;; No loop
                *unspecified*)
              (loop new-resources)))

        (match (get-message channel)
          (('checkout reply timeout-time max-waiters)
           ;; Don't block this fiber on the reply being received
           (spawn-fiber
            (lambda ()
              (let ((op
                     (put-operation
                      reply
                      (cons 'resource-pool-destroyed
                            #f))))
                (perform-operation
                 (if timeout-time
                     (choice-operation
                      op
                      (wrap-operation
                       (sleep-operation
                        (/ (- timeout-time
                              (get-internal-real-time))
                           internal-time-units-per-second))
                       (const #f)))
                     op)))))
           (loop resources))

          (((or 'return
                'return-failed-checkout)
            resource)
           (if destructor
               ;; Keep the resource in the list until the destructor has
               ;; succeeded, at which point the destroying fiber sends a
               ;; 'remove message.  This way the destroy condition isn't
               ;; signalled while destructors are still running, and this
               ;; loop is still receiving when the 'remove message is
               ;; sent.
               (begin
                 (spawn-fiber-to-destroy-resource resource)
                 (loop resources))
               (loop-unless-empty
                (remove-resource resources resource))))

          (('remove resource)
           ;; This message is sent after the destructor has run (or when
           ;; there's no destructor), so the destructor must not be run
           ;; again here
           (loop-unless-empty
            (remove-resource resources resource)))

          (('stats reply timeout-time)
           (let ((stats
                  `((resources . ,(length resources))
                    (available . 0)
                    (waiters . 0)
                    (checkout-failure-count . ,checkout-failure-count))))

             (spawn-fiber
              (lambda ()
                (let ((op
                       (put-operation reply stats)))
                  (perform-operation
                   (if timeout-time
                       (choice-operation
                        op
                        (sleep-operation
                         (/ (- timeout-time
                               (get-internal-real-time))
                            internal-time-units-per-second)))
                       op))))))

           (loop resources))

          ;; The main loop receives this message as (destroy), so accept
          ;; it with or without further elements
          (('destroy . _)
           (loop resources))

          (unknown
           (simple-format
            (current-error-port)
            "unrecognised message to ~A resource pool channel: ~A\n"
            name
            unknown)
           (loop resources))))))
;; Message loop for the pool while it's in normal operation.  The state
;; passed around the loop is:
;;
;;   resources  all of the resources in the pool
;;   available  the resources which aren't currently checked out
;;   waiters    (reply-channel . timeout-time) pairs for checkouts waiting
;;              for a resource, most recently added first
;;
;; The loop ends on a 'destroy message, either by signalling the destroy
;; condition or by handing over to destroy-loop.
(define (main-loop)
(let loop ((resources resources)
(available resources)
(waiters '()))
(match (get-message channel)
;; A request to check out a resource
(('checkout reply timeout-time max-waiters)
(if (null? available)
;; Nothing is available, so either reject the request because
;; there are too many waiters, or add it to the waiters
(let ((waiters-count
(length waiters)))
(if (and max-waiters
(>= waiters-count
max-waiters))
(begin
;; Reply from a new fiber, giving up at timeout-time if
;; there is one
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'too-many-waiters
waiters-count))))
(perform-operation
(if timeout-time
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op)))))
(loop resources
available
waiters))
(loop resources
available
(cons (cons reply timeout-time)
waiters))))
(if timeout-time
(let ((current-internal-time
(get-internal-real-time)))
;; If this client is still waiting
(if (> timeout-time
current-internal-time)
(let ((reply-timeout
(/ (- timeout-time
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a new
;; fiber to handle handing over the resource,
;; and returning it if there's a timeout
(spawn-fiber-for-checkout reply
reply-timeout
(car available))
(loop resources
(cdr available)
waiters))
;; The timeout has already passed, so the request is
;; dropped and the resource stays available
(loop resources
available
waiters)))
(begin
;; No timeout, so hand the resource over directly from
;; this fiber
(put-message reply (cons 'success
(car available)))
(loop resources
(cdr available)
waiters)))))
;; A resource coming back, either after use ('return) or because
;; handing it over timed out ('return-failed-checkout)
(((and (or 'return
'return-failed-checkout)
return-type)
resource)
(when (eq? 'return-failed-checkout
return-type)
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(if (null? waiters)
(loop resources
(cons resource available)
waiters)
;; Split the waiters in to those whose timeout hasn't passed
;; (or who have no timeout), and the rest, which are discarded
(let* ((current-internal-time (get-internal-real-time))
(alive-waiters
dead-waiters
(partition!
(match-lambda
((reply . timeout)
(or (not timeout)
(> timeout current-internal-time))))
waiters)))
(if (null? alive-waiters)
(loop resources
(cons resource available)
'())
;; New waiters are added at the front of the list, so the
;; last one has been waiting the longest
(match (last alive-waiters)
((waiter-channel . waiter-timeout)
(if waiter-timeout
(let ((reply-timeout
(/ (- waiter-timeout
current-internal-time)
internal-time-units-per-second)))
;; Don't sleep in this fiber, so spawn a
;; new fiber to handle handing over the
;; resource, and returning it if there's a
;; timeout
(spawn-fiber-for-checkout waiter-channel
reply-timeout
resource))
(put-message waiter-channel (cons 'success
resource)))
(loop resources
available
(drop-right! alive-waiters 1))))))))
;; Reply with a copy of the list of resources, from a new fiber
(('list-resources reply)
(spawn-fiber
(lambda ()
(put-message reply (list-copy resources))))
(loop resources
available
waiters))
;; Reply with the stats from a new fiber, giving up at timeout-time
;; if there is one
(('stats reply timeout-time)
(let ((stats
`((resources . ,(length resources))
(available . ,(length available))
(waiters . ,(length waiters))
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
(lambda ()
(let ((op
(put-operation reply stats)))
(perform-operation
(if timeout-time
(choice-operation
op
(sleep-operation
(/ (- timeout-time
(get-internal-real-time))
internal-time-units-per-second)))
op))))))
(loop resources
available
waiters))
;; Start destroying the pool
(('destroy)
(if (and (null? resources)
(null? waiters))
;; There's nothing to clean up
(signal-condition!
destroy-condition)
(begin
;; Deal with the resources which aren't checked out: run the
;; destructor if there is one, otherwise just send the 'remove
;; message (from a new fiber, as this fiber is the one that
;; receives from the channel)
(for-each
(lambda (resource)
(if destructor
(spawn-fiber-to-destroy-resource resource)
(spawn-fiber
(lambda ()
(put-message channel
(list 'remove resource)))
#:parallel? #t)))
available)
;; Tell the waiters whose timeout hasn't passed that the pool
;; has been destroyed
(let ((current-internal-time (get-internal-real-time)))
(for-each
(match-lambda
((reply . timeout)
(when (or (not timeout)
(> timeout current-internal-time))
(spawn-fiber
(lambda ()
(let ((op
(put-operation
reply
(cons 'resource-pool-destroyed
#f))))
(perform-operation
(if timeout
(choice-operation
op
(wrap-operation
(sleep-operation
(/ (- timeout
(get-internal-real-time))
internal-time-units-per-second))
(const #f)))
op))))))))
waiters))
;; Handle the remaining messages with destroy-loop, rather than
;; continuing with this loop
(destroy-loop resources))))
(unknown
(simple-format
(current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n"
name
unknown)
(loop resources
available
waiters)))))
;; Spawn the fiber which runs the main loop for the pool, in the given
;; scheduler if there is one, otherwise the current scheduler.
(spawn-fiber
(lambda ()
(with-exception-handler
;; The exception has been reported by the inner handler by the time
;; this handler is called, so just return #f
(lambda (exn)
#f)
(lambda ()
(with-exception-handler
;; Write the backtrace and the exception to the current error port,
;; then raise the exception again so that it reaches the outer
;; handler
(lambda (exn)
(let* ((stack (make-stack #t))
(error-string
(call-with-output-string
(lambda (port)
(display-backtrace stack port 3)
(simple-format
port
"exception in the ~A pool fiber, " name)
(print-exception
port
(stack-ref stack 3)
'%exception
(list exn))))))
(display error-string
(current-error-port)))
(raise-exception exn))
(lambda ()
(start-stack
#t
(main-loop)))))
#:unwind? #t))
(or scheduler
(current-scheduler)))
;; The return value of make-fixed-size-resource-pool
pool)
(define* (make-resource-pool return-new-resource max-size (define* (make-resource-pool return-new-resource max-size
#:key (min-size 0) #:key (min-size 0)
(idle-seconds #f) (idle-seconds #f)
@ -126,28 +542,33 @@
(define checkout-failure-count 0) (define checkout-failure-count 0)
(define spawn-fiber-to-return-new-resource (define return-new-resource/parallelism-limiter
(if add-resources-parallelism (make-parallelism-limiter
(let ((thunk (or add-resources-parallelism
(fiberize max-size)
#:name
(string-append
name
" resource pool new resource parallelism limiter")))
(define (spawn-fiber-to-return-new-resource)
(spawn-fiber
(lambda () (lambda ()
(with-exception-handler
(lambda (exn)
;; This can happen if the resource pool is destroyed very
;; quickly
(unless (resource-pool-destroyed-error? exn)
(raise-exception exn)))
(lambda ()
(with-parallelism-limiter
return-new-resource/parallelism-limiter
(let ((max-size (let ((max-size
(assq-ref (resource-pool-configuration pool) (assq-ref (resource-pool-configuration pool)
'max-size)) 'max-size))
(size (assq-ref (resource-pool-stats pool) (size (assq-ref (resource-pool-stats pool #:timeout #f)
'resources))) 'resources)))
(unless (= size max-size) (unless (= size max-size)
(let ((new-resource
(return-new-resource)))
(put-message channel
(list 'add-resource new-resource))))))
#:parallelism add-resources-parallelism)))
(lambda ()
(spawn-fiber thunk)))
(lambda ()
(spawn-fiber
(lambda ()
(let ((new-resource
(with-exception-handler (with-exception-handler
(lambda _ #f) (lambda _ #f)
(lambda () (lambda ()
@ -161,11 +582,12 @@
(print-backtrace-and-exception/knots exn) (print-backtrace-and-exception/knots exn)
(raise-exception exn)) (raise-exception exn))
(lambda () (lambda ()
(start-stack #t (return-new-resource))))) (let ((new-resource
#:unwind? #t))) (start-stack #t (return-new-resource))))
(when new-resource
(put-message channel (put-message channel
(list 'add-resource new-resource))))))))) (list 'add-resource new-resource))))))
#:unwind? #t)))))
#:unwind? #t))))
(define (spawn-fiber-to-destroy-resource resource) (define (spawn-fiber-to-destroy-resource resource)
(spawn-fiber (spawn-fiber
@ -276,13 +698,16 @@
resources)))) resources))))
(if (null? new-resources) (if (null? new-resources)
(begin (begin
(and=> return-new-resource/parallelism-limiter
destroy-parallelism-limiter)
(signal-condition! destroy-condition) (signal-condition! destroy-condition)
;; No loop ;; No loop
*unspecified*) *unspecified*)
(loop new-resources))))) (loop new-resources)))))
(('stats reply) (('stats reply timeout-time)
(let ((stats (let ((stats
`((resources . ,(length resources)) `((resources . ,(length resources))
(available . 0) (available . 0)
@ -291,13 +716,17 @@
(spawn-fiber (spawn-fiber
(lambda () (lambda ()
(let ((op
(put-operation reply stats)))
(perform-operation (perform-operation
(if timeout-time
(choice-operation (choice-operation
(wrap-operation op
(put-operation reply stats) (sleep-operation
(const #t)) (/ (- timeout-time
(wrap-operation (sleep-operation 5) (get-internal-real-time))
(const #f))))))) internal-time-units-per-second)))
op))))))
(loop resources)) (loop resources))
@ -577,7 +1006,7 @@
waiters waiters
resources-last-used)) resources-last-used))
(('stats reply) (('stats reply timeout-time)
(let ((stats (let ((stats
`((resources . ,(length resources)) `((resources . ,(length resources))
(available . ,(length available)) (available . ,(length available))
@ -586,13 +1015,17 @@
(spawn-fiber (spawn-fiber
(lambda () (lambda ()
(let ((op
(put-operation reply stats)))
(perform-operation (perform-operation
(if timeout-time
(choice-operation (choice-operation
(wrap-operation op
(put-operation reply stats) (sleep-operation
(const #t)) (/ (- timeout-time
(wrap-operation (sleep-operation 5) (get-internal-real-time))
(const #f))))))) internal-time-units-per-second)))
op))))))
(loop resources (loop resources
available available
@ -949,13 +1382,17 @@ available. Return the resource once PROC has returned."
(lambda (resource) exp ...))) (lambda (resource) exp ...)))
(define* (resource-pool-stats pool #:key (timeout 5)) (define* (resource-pool-stats pool #:key (timeout 5))
(let ((reply (make-channel)) (if timeout
(start-time (get-internal-real-time))) (let* ((reply (make-channel))
(start-time (get-internal-real-time))
(timeout-time
(+ start-time
(* internal-time-units-per-second timeout))))
(perform-operation (perform-operation
(choice-operation (choice-operation
(wrap-operation (wrap-operation
(put-operation (resource-pool-channel pool) (put-operation (resource-pool-channel pool)
`(stats ,reply)) `(stats ,reply ,timeout-time))
(const #t)) (const #t))
(wrap-operation (sleep-operation timeout) (wrap-operation (sleep-operation timeout)
(lambda _ (lambda _
@ -976,7 +1413,11 @@ available. Return the resource once PROC has returned."
(raise-exception (raise-exception
(make-resource-pool-timeout-error pool)))))) (make-resource-pool-timeout-error pool))))))
(raise-exception (raise-exception
(make-resource-pool-timeout-error pool)))))) (make-resource-pool-timeout-error pool)))))
(let ((reply (make-channel)))
(put-message (resource-pool-channel pool)
`(stats ,reply #f))
(get-message reply))))
(define (resource-pool-list-resources pool) (define (resource-pool-list-resources pool)
(let ((reply (make-channel))) (let ((reply (make-channel)))

View file

@ -4,7 +4,7 @@
#:export (run-fibers-for-tests #:export (run-fibers-for-tests
assert-no-heap-growth)) assert-no-heap-growth))
(define (run-fibers-for-tests thunk) (define* (run-fibers-for-tests thunk #:key (drain? #t))
(let ((result (let ((result
(run-fibers (run-fibers
(lambda () (lambda ()
@ -12,6 +12,7 @@
(lambda (exn) (lambda (exn)
exn) exn)
(lambda () (lambda ()
(simple-format #t "running ~A\n" thunk)
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
(backtrace) (backtrace)
@ -20,7 +21,8 @@
#t) #t)
#:unwind? #t)) #:unwind? #t))
#:hz 0 #:hz 0
#:parallelism 1))) #:parallelism 1
#:drain? drain?)))
(if (exception? result) (if (exception? result)
(raise-exception result) (raise-exception result)
result))) result)))

View file

@ -111,4 +111,16 @@
(assert-equal a 1)))) (assert-equal a 1))))
;; Run 50 jobs through a parallelism limiter with a limit of 2, then
;; destroy the limiter.
(run-fibers-for-tests
 (lambda ()
   (let* ((limiter (make-parallelism-limiter 2))
          (run-limited
           (lambda _
             (with-parallelism-limiter
              limiter
              #f))))
     (fibers-for-each run-limited (iota 50))

     (destroy-parallelism-limiter limiter))))
(display "parallelism test finished successfully\n") (display "parallelism test finished successfully\n")

View file

@ -19,7 +19,21 @@
(number? (number?
(with-resource-from-pool resource-pool (with-resource-from-pool resource-pool
res res
res)))))) res)))
(destroy-resource-pool resource-pool))))
;; Check out the only resource from a fixed size resource pool, check
;; it's the number the pool was created with, then destroy the pool.
(run-fibers-for-tests
 (lambda ()
   (let ((pool (make-fixed-size-resource-pool (list 1))))
     (assert-true
      (number?
       (with-resource-from-pool pool
         resource
         resource)))

     (destroy-resource-pool pool))))
(run-fibers-for-tests (run-fibers-for-tests
(lambda () (lambda ()
@ -31,7 +45,9 @@
(number? (number?
(with-resource-from-pool resource-pool (with-resource-from-pool resource-pool
res res
res)))))) res)))
(destroy-resource-pool resource-pool))))
(let* ((error-constructor (let* ((error-constructor
(record-constructor &resource-pool-timeout)) (record-constructor &resource-pool-timeout))
@ -88,10 +104,13 @@
res)) res))
(iota 20)) (iota 20))
(let loop ((stats (resource-pool-stats resource-pool))) (let loop ((stats (resource-pool-stats resource-pool
#:timeout #f)))
(unless (= 0 (assq-ref stats 'resources)) (unless (= 0 (assq-ref stats 'resources))
(sleep 0.1) (sleep 0.1)
(loop (resource-pool-stats resource-pool))))))) (loop (resource-pool-stats resource-pool #:timeout #f))))
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests (run-fibers-for-tests
(lambda () (lambda ()
@ -115,7 +134,9 @@
(set! counter (+ 1 counter)) (set! counter (+ 1 counter))
(error "collision detected"))))) (error "collision detected")))))
20 20
(iota 50))))) (iota 50))
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests (run-fibers-for-tests
(lambda () (lambda ()
@ -129,7 +150,7 @@
(error "collision detected"))) (error "collision detected")))
(new-number)) (new-number))
1 1
#:default-checkout-timeout 120))) #:default-checkout-timeout 5)))
(fibers-batch-for-each (fibers-batch-for-each
(lambda _ (lambda _
(with-resource-from-pool (with-resource-from-pool
@ -140,7 +161,9 @@
(set! counter (+ 1 counter)) (set! counter (+ 1 counter))
(error "collision detected"))))) (error "collision detected")))))
20 20
(iota 50))))) (iota 50))
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests (run-fibers-for-tests
(lambda () (lambda ()
@ -164,14 +187,14 @@
(call-with-resource-from-pool (call-with-resource-from-pool
resource-pool resource-pool
(lambda (res) (lambda (res)
(error 'should-not-be-reached)))) #f)))
#:unwind? #t))) #:unwind? #t)))
(while (= 0 (while (= 0
(assq-ref (assq-ref
(resource-pool-stats resource-pool) (resource-pool-stats resource-pool #:timeout #f)
'waiters)) 'waiters))
(sleep 0)) (sleep 0.1))
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
@ -184,6 +207,8 @@
resource-pool resource-pool
(lambda (res) (lambda (res)
(error 'should-not-be-reached)))) (error 'should-not-be-reached))))
#:unwind? #t)))))) #:unwind? #t)))
(destroy-resource-pool resource-pool))))
(display "resource-pool test finished successfully\n") (display "resource-pool test finished successfully\n")