Guard against nested transactions
Not sure how to do this in PostgreSQL, so use a parameter.
This commit is contained in:
parent
982121c308
commit
2430bc4307
2 changed files with 111 additions and 93 deletions
|
|
@ -19,6 +19,7 @@
|
|||
#:use-module (system foreign)
|
||||
#:use-module (ice-9 match)
|
||||
#:use-module (ice-9 threads)
|
||||
#:use-module (ice-9 exceptions)
|
||||
#:use-module (squee)
|
||||
#:use-module (prometheus)
|
||||
#:use-module (guix-data-service config)
|
||||
|
|
@ -31,6 +32,7 @@
|
|||
open-postgresql-connection
|
||||
close-postgresql-connection
|
||||
|
||||
%postgresql-in-transaction?
|
||||
with-postgresql-transaction
|
||||
|
||||
check-test-database!
|
||||
|
|
@ -205,8 +207,16 @@
|
|||
(define %postgresql-connections-name
|
||||
(make-parameter #f))
|
||||
|
||||
(define %postgresql-in-transaction?
|
||||
(make-parameter #f))
|
||||
|
||||
(define* (with-postgresql-transaction conn f
|
||||
#:key always-rollback?)
|
||||
(when (%postgresql-in-transaction?)
|
||||
(raise-exception
|
||||
(make-exception-with-message
|
||||
"nested transaction detected")))
|
||||
|
||||
(exec-query conn "BEGIN;")
|
||||
|
||||
(with-exception-handler
|
||||
|
|
@ -219,7 +229,10 @@
|
|||
;; TODO Include the stack in the exception via knots
|
||||
(raise-exception exn))
|
||||
(lambda ()
|
||||
(let ((result (f conn)))
|
||||
(let ((result
|
||||
(parameterize
|
||||
((%postgresql-in-transaction? #t))
|
||||
(f conn))))
|
||||
(exec-query conn (if always-rollback?
|
||||
"ROLLBACK;"
|
||||
"COMMIT;"))
|
||||
|
|
|
|||
|
|
@ -2562,19 +2562,21 @@ SELECT store_path FROM derivation_source_files WHERE id = $1"
|
|||
(define guix-revision-id-promise
|
||||
(fibers-delay
|
||||
(lambda ()
|
||||
(retry-on-missing-store-item
|
||||
(lambda ()
|
||||
(let ((guix-source
|
||||
channel-derivations-by-system
|
||||
(fibers-force channel-derivations-by-system-promise)))
|
||||
(load-channel-instances call-with-utility-thread
|
||||
read-derivations/fiberized
|
||||
derivation-ids-hash-table
|
||||
git-repository-id commit
|
||||
channel-derivations-by-system)))
|
||||
#:on-exception
|
||||
(lambda ()
|
||||
(fibers-promise-reset channel-derivations-by-system-promise))))))
|
||||
(parameterize
|
||||
((%postgresql-in-transaction? #f))
|
||||
(retry-on-missing-store-item
|
||||
(lambda ()
|
||||
(let ((guix-source
|
||||
channel-derivations-by-system
|
||||
(fibers-force channel-derivations-by-system-promise)))
|
||||
(load-channel-instances call-with-utility-thread
|
||||
read-derivations/fiberized
|
||||
derivation-ids-hash-table
|
||||
git-repository-id commit
|
||||
channel-derivations-by-system)))
|
||||
#:on-exception
|
||||
(lambda ()
|
||||
(fibers-promise-reset channel-derivations-by-system-promise)))))))
|
||||
|
||||
;; Prompt getting the guix-revision-id as soon as possible
|
||||
(spawn-fiber
|
||||
|
|
@ -3032,95 +3034,98 @@ SKIP LOCKED")
|
|||
(make-channel))
|
||||
|
||||
(define result
|
||||
(with-postgresql-connection
|
||||
(simple-format #f "load-new-guix-revision ~A" id)
|
||||
(lambda (conn)
|
||||
;; Fix the hash encoding of derivation_output_details. This'll only run
|
||||
;; once on any given database, but is kept here just to make sure any
|
||||
;; instances have the data updated.
|
||||
(fix-derivation-output-details-hash-encoding conn)
|
||||
(parameterize
|
||||
;; Mimic the behaviour of with-postgresql-transaction
|
||||
((%postgresql-in-transaction? #t))
|
||||
(with-postgresql-connection
|
||||
(simple-format #f "load-new-guix-revision ~A" id)
|
||||
(lambda (conn)
|
||||
;; Fix the hash encoding of derivation_output_details. This'll only run
|
||||
;; once on any given database, but is kept here just to make sure any
|
||||
;; instances have the data updated.
|
||||
(fix-derivation-output-details-hash-encoding conn)
|
||||
|
||||
(add-hook! after-gc-hook
|
||||
(lambda ()
|
||||
(simple-format (current-error-port)
|
||||
"after gc\n")))
|
||||
(add-hook! after-gc-hook
|
||||
(lambda ()
|
||||
(simple-format (current-error-port)
|
||||
"after gc\n")))
|
||||
|
||||
(exec-query conn "BEGIN")
|
||||
(exec-query conn "BEGIN")
|
||||
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
(while (perform-operation
|
||||
(choice-operation
|
||||
(wrap-operation (get-operation finished-channel)
|
||||
(const #f))
|
||||
(wrap-operation (sleep-operation 20)
|
||||
(const #t))))
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
(while (perform-operation
|
||||
(choice-operation
|
||||
(wrap-operation (get-operation finished-channel)
|
||||
(const #f))
|
||||
(wrap-operation (sleep-operation 20)
|
||||
(const #t))))
|
||||
|
||||
(let ((stats (gc-stats)))
|
||||
(simple-format
|
||||
(current-error-port)
|
||||
"process-job heap: ~a MiB used (~a MiB heap)~%"
|
||||
(round
|
||||
(/ (- (assoc-ref stats 'heap-size)
|
||||
(assoc-ref stats 'heap-free-size))
|
||||
(expt 2. 20)))
|
||||
(round
|
||||
(/ (assoc-ref stats 'heap-size)
|
||||
(expt 2. 20))))))))
|
||||
(let ((stats (gc-stats)))
|
||||
(simple-format
|
||||
(current-error-port)
|
||||
"process-job heap: ~a MiB used (~a MiB heap)~%"
|
||||
(round
|
||||
(/ (- (assoc-ref stats 'heap-size)
|
||||
(assoc-ref stats 'heap-free-size))
|
||||
(expt 2. 20)))
|
||||
(round
|
||||
(/ (assoc-ref stats 'heap-size)
|
||||
(expt 2. 20))))))))
|
||||
|
||||
(match (select-job-for-update conn id)
|
||||
(((id commit source git-repository-id))
|
||||
(match (select-job-for-update conn id)
|
||||
(((id commit source git-repository-id))
|
||||
|
||||
;; With a separate connection, outside of the transaction so the event
|
||||
;; gets persisted regardless.
|
||||
(with-postgresql-connection
|
||||
(simple-format #f "load-new-guix-revision ~A start-event" id)
|
||||
(lambda (start-event-conn)
|
||||
(record-job-event start-event-conn id "start")))
|
||||
;; With a separate connection, outside of the transaction so the event
|
||||
;; gets persisted regardless.
|
||||
(with-postgresql-connection
|
||||
(simple-format #f "load-new-guix-revision ~A start-event" id)
|
||||
(lambda (start-event-conn)
|
||||
(record-job-event start-event-conn id "start")))
|
||||
|
||||
(simple-format #t "Processing job ~A (commit: ~A, source: ~A)\n\n"
|
||||
id commit source)
|
||||
(simple-format #t "Processing job ~A (commit: ~A, source: ~A)\n\n"
|
||||
id commit source)
|
||||
|
||||
(if (eq?
|
||||
(with-time-logging (string-append "processing revision " commit)
|
||||
(with-exception-handler
|
||||
(const #f)
|
||||
(lambda ()
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(simple-format (current-error-port)
|
||||
"error: load-new-guix-revision: ~A\n"
|
||||
exn)
|
||||
(print-backtrace-and-exception/knots exn)
|
||||
(raise-exception exn))
|
||||
(lambda ()
|
||||
(load-new-guix-revision
|
||||
conn
|
||||
git-repository-id
|
||||
commit
|
||||
#:skip-system-tests? #t
|
||||
#:extra-inferior-environment-variables
|
||||
extra-inferior-environment-variables
|
||||
#:ignore-systems ignore-systems
|
||||
#:ignore-targets ignore-targets
|
||||
#:parallelism parallelism))))
|
||||
#:unwind? #t))
|
||||
#t)
|
||||
(begin
|
||||
(record-job-succeeded conn id)
|
||||
(record-job-event conn id "success")
|
||||
(exec-query conn "COMMIT")
|
||||
(if (eq?
|
||||
(with-time-logging (string-append "processing revision " commit)
|
||||
(with-exception-handler
|
||||
(const #f)
|
||||
(lambda ()
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(simple-format (current-error-port)
|
||||
"error: load-new-guix-revision: ~A\n"
|
||||
exn)
|
||||
(print-backtrace-and-exception/knots exn)
|
||||
(raise-exception exn))
|
||||
(lambda ()
|
||||
(load-new-guix-revision
|
||||
conn
|
||||
git-repository-id
|
||||
commit
|
||||
#:skip-system-tests? #t
|
||||
#:extra-inferior-environment-variables
|
||||
extra-inferior-environment-variables
|
||||
#:ignore-systems ignore-systems
|
||||
#:ignore-targets ignore-targets
|
||||
#:parallelism parallelism))))
|
||||
#:unwind? #t))
|
||||
#t)
|
||||
(begin
|
||||
(record-job-succeeded conn id)
|
||||
(record-job-event conn id "success")
|
||||
(exec-query conn "COMMIT")
|
||||
|
||||
#t)
|
||||
(begin
|
||||
(exec-query conn "ROLLBACK")
|
||||
(record-job-event conn id "failure")
|
||||
#t)
|
||||
(begin
|
||||
(exec-query conn "ROLLBACK")
|
||||
(record-job-event conn id "failure")
|
||||
|
||||
#f)))
|
||||
(()
|
||||
(exec-query conn "ROLLBACK")
|
||||
(simple-format #t "job ~A not found to be processed\n"
|
||||
id))))))
|
||||
#f)))
|
||||
(()
|
||||
(exec-query conn "ROLLBACK")
|
||||
(simple-format #t "job ~A not found to be processed\n"
|
||||
id)))))))
|
||||
|
||||
(when result
|
||||
(fibers-parallel
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue