Close postgresql connections when the thread pool thread is idle
I think the idle connections associated with idle threads are still taking up memory, so especially now that you can configure an arbitrary number of threads (and thus connections), I think it's good to close them regularly.
This commit is contained in:
parent
aaec813cba
commit
d06230fcf4
3 changed files with 52 additions and 2 deletions
|
|
@ -27,6 +27,7 @@
|
||||||
|
|
||||||
with-postgresql-connection-per-thread
|
with-postgresql-connection-per-thread
|
||||||
with-thread-postgresql-connection
|
with-thread-postgresql-connection
|
||||||
|
close-thread-postgresql-connection
|
||||||
|
|
||||||
with-postgresql-transaction
|
with-postgresql-transaction
|
||||||
|
|
||||||
|
|
@ -146,6 +147,15 @@
|
||||||
|
|
||||||
(f conn)))))
|
(f conn)))))
|
||||||
|
|
||||||
|
(define (close-thread-postgresql-connection)
|
||||||
|
(let ((conn (fluid-ref %thread-postgresql-connection)))
|
||||||
|
(when conn
|
||||||
|
(pg-conn-finish conn)
|
||||||
|
(hash-remove! (%postgresql-connections-hash-table)
|
||||||
|
(current-thread))
|
||||||
|
(fluid-set! %thread-postgresql-connection
|
||||||
|
conn))))
|
||||||
|
|
||||||
(define* (with-postgresql-transaction conn f
|
(define* (with-postgresql-transaction conn f
|
||||||
#:key always-rollback?)
|
#:key always-rollback?)
|
||||||
(exec-query conn "BEGIN;")
|
(exec-query conn "BEGIN;")
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,8 @@
|
||||||
#:use-module (ice-9 threads)
|
#:use-module (ice-9 threads)
|
||||||
#:use-module (fibers)
|
#:use-module (fibers)
|
||||||
#:use-module (fibers channels)
|
#:use-module (fibers channels)
|
||||||
|
#:use-module (fibers operations)
|
||||||
|
#:use-module (fibers timers)
|
||||||
#:use-module (fibers conditions)
|
#:use-module (fibers conditions)
|
||||||
#:use-module (prometheus)
|
#:use-module (prometheus)
|
||||||
#:export (call-with-time-logging
|
#:export (call-with-time-logging
|
||||||
|
|
@ -30,6 +32,8 @@
|
||||||
prevent-inlining-for-tests
|
prevent-inlining-for-tests
|
||||||
|
|
||||||
%thread-pool-threads
|
%thread-pool-threads
|
||||||
|
%thread-pool-idle-seconds
|
||||||
|
%thread-pool-idle-thunk
|
||||||
parallel-via-thread-pool-channel
|
parallel-via-thread-pool-channel
|
||||||
par-map&
|
par-map&
|
||||||
letpar&
|
letpar&
|
||||||
|
|
@ -62,6 +66,12 @@
|
||||||
(define %thread-pool-threads
|
(define %thread-pool-threads
|
||||||
(make-parameter 8))
|
(make-parameter 8))
|
||||||
|
|
||||||
|
(define %thread-pool-idle-seconds
|
||||||
|
(make-parameter #f))
|
||||||
|
|
||||||
|
(define %thread-pool-idle-thunk
|
||||||
|
(make-parameter #f))
|
||||||
|
|
||||||
(define* (make-thread-pool-channel threads)
|
(define* (make-thread-pool-channel threads)
|
||||||
(define (delay-logger seconds-delayed)
|
(define (delay-logger seconds-delayed)
|
||||||
(when (> seconds-delayed 1)
|
(when (> seconds-delayed 1)
|
||||||
|
|
@ -70,13 +80,37 @@
|
||||||
"warning: thread pool delayed by ~1,2f seconds~%"
|
"warning: thread pool delayed by ~1,2f seconds~%"
|
||||||
seconds-delayed)))
|
seconds-delayed)))
|
||||||
|
|
||||||
|
(define idle-thunk
|
||||||
|
(%thread-pool-idle-thunk))
|
||||||
|
|
||||||
|
(define idle-seconds
|
||||||
|
(%thread-pool-idle-seconds))
|
||||||
|
|
||||||
(let ((channel (make-channel)))
|
(let ((channel (make-channel)))
|
||||||
(for-each
|
(for-each
|
||||||
(lambda _
|
(lambda _
|
||||||
(call-with-new-thread
|
(call-with-new-thread
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(let loop ()
|
(let loop ()
|
||||||
(match (get-message channel)
|
(match (if idle-seconds
|
||||||
|
(perform-operation
|
||||||
|
(choice-operation
|
||||||
|
(get-operation channel)
|
||||||
|
(wrap-operation (sleep-operation idle-seconds)
|
||||||
|
(const 'timeout))))
|
||||||
|
(get-message channel))
|
||||||
|
('timeout
|
||||||
|
(when idle-thunk
|
||||||
|
(with-exception-handler
|
||||||
|
(lambda (exn)
|
||||||
|
(simple-format (current-error-port)
|
||||||
|
"worker thread idle thunk exception: ~A\n"
|
||||||
|
exn))
|
||||||
|
idle-thunk
|
||||||
|
#:unwind? #t))
|
||||||
|
|
||||||
|
(loop))
|
||||||
|
|
||||||
(((? channel? reply) sent-time (? procedure? proc))
|
(((? channel? reply) sent-time (? procedure? proc))
|
||||||
(let ((time-delay
|
(let ((time-delay
|
||||||
(- (get-internal-real-time)
|
(- (get-internal-real-time)
|
||||||
|
|
|
||||||
|
|
@ -182,7 +182,13 @@
|
||||||
(assoc-ref opts 'show-error-details))
|
(assoc-ref opts 'show-error-details))
|
||||||
|
|
||||||
(%thread-pool-threads
|
(%thread-pool-threads
|
||||||
(assoc-ref opts 'thread-pool-threads)))
|
(assoc-ref opts 'thread-pool-threads))
|
||||||
|
(%thread-pool-idle-seconds
|
||||||
|
120)
|
||||||
|
(%thread-pool-idle-thunk
|
||||||
|
(lambda ()
|
||||||
|
(close-thread-postgresql-connection))))
|
||||||
|
|
||||||
|
|
||||||
(let* ((startup-completed
|
(let* ((startup-completed
|
||||||
(make-atomic-box
|
(make-atomic-box
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue