Add some utilities to use PostgreSQL/Squee through a channel

To allow for some concurrency.
This commit is contained in:
Christopher Baines 2020-10-01 19:13:30 +01:00
parent 3330f034a4
commit 614f9888a5

View file

@ -18,9 +18,19 @@
(define-module (guix-data-service database) (define-module (guix-data-service database)
#:use-module (system foreign) #:use-module (system foreign)
#:use-module (ice-9 match) #:use-module (ice-9 match)
#:use-module (ice-9 threads)
#:use-module (squee) #:use-module (squee)
#:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers conditions)
#:use-module (guix-data-service config) #:use-module (guix-data-service config)
#:export (with-postgresql-connection #:export (with-postgresql-connection
make-postgresql-connection-channel
close-postgresql-connection-channel
exec-query/through-channel
with-postgresql-transaction/through-channel
with-postgresql-transaction with-postgresql-transaction
check-test-database! check-test-database!
@ -61,6 +71,98 @@
(lambda (key . args) (lambda (key . args)
(pg-conn-finish conn))))) (pg-conn-finish conn)))))
(define* (make-postgresql-connection-channel name
#:key
(statement-timeout #f)
(threads 4))
(parameterize (((@@ (fibers internal) current-fiber) #f))
(let ((channel (make-channel)))
(for-each
(lambda _
(call-with-new-thread
(lambda ()
(with-postgresql-connection
name
(lambda (conn)
(let loop ()
(match (get-message channel)
(((? channel? reply) f (? boolean? allways-rollback?))
(put-message
reply
(with-exception-handler
(lambda (exn)
(cons 'worker-thread-error exn))
(lambda ()
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"postgresql connection thread: exception: ~A\n"
exn)
(backtrace)
(raise-exception exn))
(lambda ()
(call-with-values
(lambda ()
(with-postgresql-transaction
conn
(lambda (conn)
(f conn))))
(lambda vals vals)))))
#:unwind? #t))
(loop))
(((? channel? reply) . (? list? args))
(put-message
reply
(with-exception-handler
(lambda (exn)
(cons 'worker-thread-error exn))
(lambda ()
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"postgresql connection thread: exception: ~A\n"
exn)
(backtrace)
(raise-exception exn))
(lambda ()
(call-with-values
(lambda ()
(apply exec-query
conn
args))
(lambda vals vals)))))
#:unwind? #t))
(loop))
(_ #f))))
#:statement-timeout statement-timeout))))
(iota threads))
channel)))
(define (close-postgresql-connection-channel channel)
(put-message channel #f))
(define (exec-query/through-channel channel . args)
(let ((reply (make-channel)))
(put-message channel (cons reply args))
(match (get-message reply)
(('worker-thread-error . exn)
(raise-exception exn))
(result
(apply values result)))))
(define* (with-postgresql-transaction/through-channel channel
f
#:key always-rollback?)
(let ((reply (make-channel)))
(put-message channel (list reply f always-rollback?))
(match (get-message reply)
(('worker-thread-error . exn)
(raise-exception exn))
(result
(apply values result)))))
(define* (with-postgresql-transaction conn f (define* (with-postgresql-transaction conn f
#:key always-rollback?) #:key always-rollback?)
(exec-query conn "BEGIN;") (exec-query conn "BEGIN;")