Improve resource pool destruction

This commit is contained in:
Christopher Baines 2025-01-31 12:33:50 +01:00
parent eebb42e7a7
commit 61451907a9
2 changed files with 116 additions and 40 deletions

View file

@ -21,6 +21,7 @@
#:use-module (srfi srfi-1) #:use-module (srfi srfi-1)
#:use-module (srfi srfi-9) #:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu) #:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-71)
#:use-module (ice-9 match) #:use-module (ice-9 match)
#:use-module (ice-9 exceptions) #:use-module (ice-9 exceptions)
#:use-module (fibers) #:use-module (fibers)
@ -179,10 +180,13 @@
(backtrace)))) (backtrace))))
#:unwind? #t))) #:unwind? #t)))
(unless success? (if success?
(sleep 5) (put-message channel
(list 'remove resource))
(begin
(sleep 5)
(loop))))))) (loop))))))))
(define (spawn-fiber-for-checkout reply-channel resource) (define (spawn-fiber-for-checkout reply-channel resource)
(spawn-fiber (spawn-fiber
@ -221,12 +225,20 @@
(('add-resource resource) (('add-resource resource)
(if (= (length resources) max-size) (if (= (length resources) max-size)
(begin (begin
(spawn-fiber-to-destroy-resource resource) (if destructor
(begin
(spawn-fiber-to-destroy-resource resource)
(loop resources (loop (cons resource resources)
available available
waiters waiters
resources-last-used)) (cons (get-internal-real-time)
resources-last-used)))
(loop resources
available
waiters
(cons (get-internal-real-time)
resources-last-used))))
(if (null? waiters) (if (null? waiters)
(loop (cons resource resources) (loop (cons resource resources)
@ -321,6 +333,26 @@
(get-internal-real-time)) (get-internal-real-time))
resources-last-used))))) resources-last-used)))))
(('remove resource)
(let ((index
(list-index (lambda (x)
(eq? x resource))
resources)))
(define (remove-at-index! lst i)
(let ((start
end
(split-at! lst i)))
(append
start
(cdr end))))
(loop (remove-at-index! resources index)
available ; resource shouldn't be in this list
waiters
(remove-at-index!
resources-last-used
index))))
(('stats reply) (('stats reply)
(let ((stats (let ((stats
`((resources . ,(length resources)) `((resources . ,(length resources))
@ -361,10 +393,11 @@
resources resources
resources-last-used-seconds))) resources-last-used-seconds)))
(for-each (when destructor
(lambda (resource) (for-each
(spawn-fiber-to-destroy-resource resource)) (lambda (resource)
resources-to-destroy) (spawn-fiber-to-destroy-resource resource))
resources-to-destroy))
(loop (lset-difference eq? resources resources-to-destroy) (loop (lset-difference eq? resources resources-to-destroy)
(lset-difference eq? available resources-to-destroy) (lset-difference eq? available resources-to-destroy)
@ -378,22 +411,29 @@
resources-last-used)))) resources-last-used))))
(('destroy reply) (('destroy reply)
(if (= (length resources) (length available)) (if (null? resources)
(put-message reply 'destroy-success)
(begin (begin
(for-each (for-each
(lambda (resource) (lambda (resource)
(spawn-fiber-to-destroy-resource resource)) (if destructor
resources) (spawn-fiber-to-destroy-resource resource)
(put-message reply 'destroy-success)) (spawn-fiber
(begin (lambda ()
(put-message channel
(list 'remove resource)))
#:parallel? #t)))
available)
(spawn-fiber (spawn-fiber
(lambda () (lambda ()
(perform-operation (sleep 0.1)
(choice-operation (put-message channel
(put-operation reply 'resource-pool-destroy-failed) (list 'destroy reply))))
(sleep-operation 10)))))
(loop resources (loop resources
available '()
waiters waiters
resources-last-used)))) resources-last-used))))

View file

@ -1,32 +1,37 @@
(use-modules (tests) (use-modules (tests)
(fibers) (fibers)
(unit-test) (unit-test)
(knots parallelism)
(knots resource-pool)) (knots resource-pool))
(run-fibers-for-tests (define new-number
(lambda () (let ((val 0))
(let ((resource-pool (make-resource-pool (lambda ()
(lambda () (set! val (1+ val))
2) val)))
1)))
(assert-equal
(with-resource-from-pool resource-pool
res
res)
2))))
(run-fibers-for-tests (run-fibers-for-tests
(lambda () (lambda ()
(let ((resource-pool (make-resource-pool (let ((resource-pool (make-resource-pool
(lambda () new-number
2) 1)))
(assert-true
(number?
(with-resource-from-pool resource-pool
res
res))))))
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-resource-pool
new-number
1 1
#:add-resources-parallelism 1))) #:add-resources-parallelism 1)))
(assert-equal (assert-true
(with-resource-from-pool resource-pool (number?
res (with-resource-from-pool resource-pool
res) res
2)))) res))))))
(let* ((error-constructor (let* ((error-constructor
(record-constructor &resource-pool-timeout)) (record-constructor &resource-pool-timeout))
@ -36,4 +41,35 @@
(resource-pool-timeout-error-pool err) (resource-pool-timeout-error-pool err)
'foo)) 'foo))
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-resource-pool
new-number
2)))
(fibers-for-each
(lambda _
(with-resource-from-pool resource-pool
res
res))
(iota 20))
(destroy-resource-pool resource-pool))))
(run-fibers-for-tests
(lambda ()
(let ((resource-pool (make-resource-pool
new-number
2
#:destructor
(lambda (res)
#t))))
(fibers-for-each
(lambda _
(with-resource-from-pool resource-pool
res
res))
(iota 20))
(destroy-resource-pool resource-pool))))
(display "resource-pool test finished successfully\n") (display "resource-pool test finished successfully\n")