Use drain? #t for fibers when loading revisions

To check that there's no left over fibers.
This commit is contained in:
Christopher Baines 2025-06-26 00:19:13 +02:00
parent f7f4e70d28
commit 0dd14c0a67
5 changed files with 178 additions and 129 deletions

View file

@ -22,6 +22,7 @@
#:use-module (ice-9 exceptions) #:use-module (ice-9 exceptions)
#:use-module (squee) #:use-module (squee)
#:use-module (prometheus) #:use-module (prometheus)
#:use-module (knots)
#:use-module (guix-data-service config) #:use-module (guix-data-service config)
#:export (get-database-config #:export (get-database-config
%database-metrics-registry %database-metrics-registry
@ -226,17 +227,31 @@
(lambda () (lambda ()
(exec-query conn "ROLLBACK;")) (exec-query conn "ROLLBACK;"))
#:unwind? #t) #:unwind? #t)
;; TODO Include the stack in the exception via knots
(raise-exception exn)) (raise-exception exn))
(lambda () (lambda ()
(let ((result (with-exception-handler
(parameterize (lambda (exn)
((%postgresql-in-transaction? #t)) (let ((stack
(f conn)))) (match (fluid-ref %stacks)
(exec-query conn (if always-rollback? ((stack-tag . prompt-tag)
"ROLLBACK;" (make-stack #t
"COMMIT;")) 0 prompt-tag
result)) 0 (and prompt-tag 1)))
(_
(make-stack #t)))))
(raise-exception
(make-exception
exn
(make-knots-exception stack)))))
(lambda ()
(let ((result
(parameterize
((%postgresql-in-transaction? #t))
(f conn))))
(exec-query conn (if always-rollback?
"ROLLBACK;"
"COMMIT;"))
result))))
#:unwind? #t)) #:unwind? #t))
(define (check-test-database! conn) (define (check-test-database! conn)

View file

@ -177,9 +177,13 @@
inf))) inf)))
string<?)) string<?))
(define (all-inferior-system-tests inf store guix-source guix-commit) (define* (all-inferior-system-tests inf store guix-source guix-commit
#:key (ignore-systems '()))
(define inf-systems (define inf-systems
(inferior-guix-systems inf)) (lset-difference
string=?
(inferior-guix-systems inf)
ignore-systems))
(define extract (define extract
`(lambda (store) `(lambda (store)
@ -1304,9 +1308,8 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(lambda (conn) (lambda (conn)
(let ((drv (read-derivation-from-file file-name)) (let ((drv (read-derivation-from-file file-name))
(postgresql-connection-pool (postgresql-connection-pool
(make-resource-pool (make-fixed-size-resource-pool
(const conn) (list conn)
1
#:name "postgres")) #:name "postgres"))
(call-with-utility-thread (call-with-utility-thread
(lambda (thunk) (lambda (thunk)
@ -1346,10 +1349,13 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
1000 1000
input-derivations))))) input-derivations)))))
(fix-derivation-inputs conn drv)))))))))) (fix-derivation-inputs conn drv))))
(destroy-resource-pool postgresql-connection-pool)))))))
#:unwind? #t)) #:unwind? #t))
#:hz 0 #:hz 0
#:parallelism 1)) #:parallelism 1
#:drain? #t))
(define (fix-derivation-source-file-nar id) (define (fix-derivation-source-file-nar id)
(run-fibers (run-fibers
@ -1358,9 +1364,8 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
"fix" "fix"
(lambda (conn) (lambda (conn)
(let ((postgresql-connection-pool (let ((postgresql-connection-pool
(make-resource-pool (make-fixed-size-resource-pool
(const conn) (list conn)
1
#:name "postgres"))) #:name "postgres")))
(match (exec-query (match (exec-query
conn conn
@ -1371,13 +1376,16 @@ SELECT store_path FROM derivation_source_files WHERE id = $1"
(compute-and-update-derivation-source-file-nar (compute-and-update-derivation-source-file-nar
postgresql-connection-pool postgresql-connection-pool
id id
store-path))))))) store-path)))
(destroy-resource-pool postgresql-connection-pool)))))
#:hz 0 #:hz 0
#:parallelism 1)) #:parallelism 1
#:drain? #t))
(define* (derivation-file-names->derivation-ids postgresql-connection-pool (define* (derivation-file-names->derivation-ids postgresql-connection-pool
call-with-utility-thread call-with-utility-thread
read-derivations/fiberized read-derivations/serialised
derivation-ids-hash-table derivation-ids-hash-table
derivation-file-names derivation-file-names
#:key (log-tag "unspecified")) #:key (log-tag "unspecified"))
@ -1414,7 +1422,7 @@ SELECT store_path FROM derivation_source_files WHERE id = $1"
(chunk-for-each! (chunk-for-each!
(lambda (missing-derivation-file-names-chunk) (lambda (missing-derivation-file-names-chunk)
(let ((missing-derivations-chunk (let ((missing-derivations-chunk
(read-derivations/fiberized (read-derivations/serialised
missing-derivation-file-names-chunk))) missing-derivation-file-names-chunk)))
(simple-format (simple-format
#t "debug: derivation-file-names->derivation-ids: processing chunk ~A (~A)\n" #t "debug: derivation-file-names->derivation-ids: processing chunk ~A (~A)\n"
@ -1752,6 +1760,8 @@ SELECT store_path FROM derivation_source_files WHERE id = $1"
#:unwind? #t))))) #:unwind? #t)))))
systems))) systems)))
(destroy-resource-pool inferior-and-store-pool)
(cons (cons
(channel-instance-checkout channel-instance) (channel-instance-checkout channel-instance)
result))) result)))
@ -1972,7 +1982,7 @@ SELECT store_path FROM derivation_source_files WHERE id = $1"
guix-source store-item guix-source store-item
guix-derivation guix-derivation
call-with-utility-thread call-with-utility-thread
read-derivations/fiberized read-derivations/serialised
derivation-ids-hash-table derivation-ids-hash-table
#:key skip-system-tests? #:key skip-system-tests?
extra-inferior-environment-variables extra-inferior-environment-variables
@ -2268,43 +2278,50 @@ SELECT store_path FROM derivation_source_files WHERE id = $1"
*unspecified*) *unspecified*)
(define get-derivations/parallelism-limiter
(make-parallelism-limiter parallelism))
(define (get-derivations system target) (define (get-derivations system target)
(let ((derivations-vector (make-vector packages-count))) ;; Limit concurrency here to keep focused on specific systems until
(with-time-logging ;; they've been fully processed
(simple-format #f "getting derivations for ~A" (cons system target)) (with-parallelism-limiter
(let loop ((start-index 0)) get-derivations/parallelism-limiter
(let* ((last-chunk? (let ((derivations-vector (make-vector packages-count)))
(>= (+ start-index chunk-size) packages-count)) (with-time-logging
(count (simple-format #f "getting derivations for ~A"
(if last-chunk? (cons system target))
(- packages-count start-index) (let loop ((start-index 0))
chunk-size)) (let* ((last-chunk?
(chunk (>= (+ start-index chunk-size) packages-count))
(call-with-inferior (count
(lambda (inferior inferior-store) (if last-chunk?
(ensure-gds-inferior-packages-defined! inferior) (- packages-count start-index)
chunk-size))
(chunk
(call-with-inferior
(lambda (inferior inferior-store)
(ensure-gds-inferior-packages-defined! inferior)
(let ((result (let ((result
(inferior-package-derivations (inferior-package-derivations
inferior-store inferior-store
inferior inferior
system system
target target
start-index start-index
count))) count)))
(when last-chunk? (when last-chunk?
(inferior-cleanup inferior)) (inferior-cleanup inferior))
result))))) result)))))
(vector-copy! derivations-vector (vector-copy! derivations-vector
start-index start-index
chunk) chunk)
(unless last-chunk? (unless last-chunk?
(loop (+ start-index chunk-size)))))) (loop (+ start-index chunk-size))))))
derivations-vector)) derivations-vector)))
(define (process-system-and-target system target get-derivations) (define (process-system-and-target system target)
(with-time-logging (with-time-logging
(simple-format #f "processing derivations for ~A" (cons system target)) (simple-format #f "processing derivations for ~A" (cons system target))
(let* ((derivations-vector (get-derivations system target)) (let* ((derivations-vector (get-derivations system target))
@ -2315,7 +2332,7 @@ SELECT store_path FROM derivation_source_files WHERE id = $1"
(derivation-file-names->derivation-ids (derivation-file-names->derivation-ids
postgresql-connection-pool postgresql-connection-pool
call-with-utility-thread call-with-utility-thread
read-derivations/fiberized read-derivations/serialised
derivation-ids-hash-table derivation-ids-hash-table
derivations-vector derivations-vector
#:log-tag (simple-format #f "~A:~A" system target)))) #:log-tag (simple-format #f "~A:~A" system target))))
@ -2345,46 +2362,43 @@ SELECT store_path FROM derivation_source_files WHERE id = $1"
'finished) 'finished)
(let ((get-derivations/fiberized (with-time-logging "extract-and-store-package-derivations"
(fiberize get-derivations (fibers-map-with-progress
;; Limit concurrency here to keep focused on specific (match-lambda
;; systems until they've been fully processed ((system . target)
#:parallelism parallelism))) (retry-on-missing-store-item
(with-time-logging "extract-and-store-package-derivations" (lambda ()
(fibers-map-with-progress (process-system-and-target system target)))))
(match-lambda (list
((system . target) (let ((all-system-target-pairs
(retry-on-missing-store-item (call-with-inferior
(lambda () (lambda (inferior inferior-store)
(process-system-and-target system target (inferior-fetch-system-target-pairs inferior)))))
get-derivations/fiberized))))) (filter
(list (match-lambda
(let ((all-system-target-pairs ((system . target)
(call-with-inferior (if (or (member system ignore-systems)
(lambda (inferior inferior-store) (member target ignore-targets))
(inferior-fetch-system-target-pairs inferior))))) (begin
(filter (simple-format
(match-lambda (current-error-port)
((system . target) "ignoring ~A ~A for package derivations\n"
(if (or (member system ignore-systems) system
(member target ignore-targets)) target)
(begin #f)
(simple-format #t)))
(current-error-port) all-system-target-pairs)))
"ignoring ~A ~A for package derivations\n" #:report
system (lambda (data)
target) (for-each
#f) (match-lambda
#t))) ((result (system . target))
all-system-target-pairs))) (simple-format #t "~A ~A: ~A\n"
#:report system target result)))
(lambda (data) data)))
(for-each (destroy-parallelism-limiter
(match-lambda get-derivations/parallelism-limiter)
((result (system . target)) #t))
(simple-format #t "~A ~A: ~A\n"
system target result)))
data))))))
(define (extract-and-store-system-tests) (define (extract-and-store-system-tests)
(if skip-system-tests? (if skip-system-tests?
@ -2400,7 +2414,8 @@ SELECT store_path FROM derivation_source_files WHERE id = $1"
inferior inferior
inferior-store inferior-store
guix-source guix-source
commit)))))) commit
#:ignore-systems ignore-systems))))))
(when data-with-derivation-file-names (when data-with-derivation-file-names
(let ((data-with-derivation-ids (let ((data-with-derivation-ids
(map (match-lambda (map (match-lambda
@ -2414,7 +2429,7 @@ SELECT store_path FROM derivation_source_files WHERE id = $1"
(derivation-file-names->derivation-ids (derivation-file-names->derivation-ids
postgresql-connection-pool postgresql-connection-pool
call-with-utility-thread call-with-utility-thread
read-derivations/fiberized read-derivations/serialised
derivation-ids-hash-table derivation-ids-hash-table
(list->vector (list->vector
(map cdr derivation-file-names-by-system)) (map cdr derivation-file-names-by-system))
@ -2439,12 +2454,15 @@ SELECT store_path FROM derivation_source_files WHERE id = $1"
(with-time-logging "extract-and-store-lint-checkers-and-warnings" (with-time-logging "extract-and-store-lint-checkers-and-warnings"
(extract-and-store-lint-checkers-and-warnings)))) (extract-and-store-lint-checkers-and-warnings))))
(destroy-resource-pool inf-and-store-pool)
(destroy-resource-pool postgresql-connection-pool)
#t) #t)
(prevent-inlining-for-tests extract-information-from) (prevent-inlining-for-tests extract-information-from)
(define (load-channel-instances call-with-utility-thread (define (load-channel-instances call-with-utility-thread
read-derivations/fiberized read-derivations/serialised
derivation-ids-hash-table derivation-ids-hash-table
git-repository-id commit git-repository-id commit
channel-derivations-by-system) channel-derivations-by-system)
@ -2473,9 +2491,8 @@ SELECT store_path FROM derivation_source_files WHERE id = $1"
(insert-guix-revision channel-instances-conn (insert-guix-revision channel-instances-conn
git-repository-id commit))) git-repository-id commit)))
(postgresql-connection-pool (postgresql-connection-pool
(make-resource-pool (make-fixed-size-resource-pool
(const channel-instances-conn) (list channel-instances-conn)
1
#:name "postgres"))) #:name "postgres")))
(unless existing-guix-revision-id (unless existing-guix-revision-id
@ -2493,7 +2510,7 @@ SELECT store_path FROM derivation_source_files WHERE id = $1"
(derivation-file-names->derivation-ids (derivation-file-names->derivation-ids
postgresql-connection-pool postgresql-connection-pool
call-with-utility-thread call-with-utility-thread
read-derivations/fiberized read-derivations/serialised
derivation-ids-hash-table derivation-ids-hash-table
(list->vector (map cdr derivations-by-system))))) (list->vector (map cdr derivations-by-system)))))
@ -2537,11 +2554,13 @@ SELECT store_path FROM derivation_source_files WHERE id = $1"
(make-missing-store-item-error (make-missing-store-item-error
filename)))) filename))))
filenames)))) filenames))))
(define read-derivations/fiberized
(fiberize read-derivations (define read-derivations/parallelism-limiter
;; Don't do this in parallel as there's caching involved with (make-parallelism-limiter 1))
;; read-derivation-from-file (define (read-derivations/serialised . args)
#:parallelism 1)) (with-parallelism-limiter
read-derivations/parallelism-limiter
(apply read-derivations args)))
(define derivation-ids-hash-table (define derivation-ids-hash-table
(make-hash-table)) (make-hash-table))
@ -2581,7 +2600,7 @@ SELECT store_path FROM derivation_source_files WHERE id = $1"
channel-derivations-by-system channel-derivations-by-system
(fibers-force channel-derivations-by-system-promise))) (fibers-force channel-derivations-by-system-promise)))
(load-channel-instances call-with-utility-thread (load-channel-instances call-with-utility-thread
read-derivations/fiberized read-derivations/serialised
derivation-ids-hash-table derivation-ids-hash-table
git-repository-id commit git-repository-id commit
channel-derivations-by-system))) channel-derivations-by-system)))
@ -2608,7 +2627,7 @@ SELECT store_path FROM derivation_source_files WHERE id = $1"
commit guix-source store-item commit guix-source store-item
guix-derivation guix-derivation
call-with-utility-thread call-with-utility-thread
read-derivations/fiberized read-derivations/serialised
derivation-ids-hash-table derivation-ids-hash-table
#:skip-system-tests? #:skip-system-tests?
skip-system-tests? skip-system-tests?
@ -2619,6 +2638,9 @@ SELECT store_path FROM derivation_source_files WHERE id = $1"
#:parallelism parallelism) #:parallelism parallelism)
(let ((guix-revision-id (let ((guix-revision-id
(fibers-force guix-revision-id-promise))) (fibers-force guix-revision-id-promise)))
(destroy-parallelism-limiter
read-derivations/parallelism-limiter)
(and (and
(if (defined? 'channel-news-for-commit (if (defined? 'channel-news-for-commit
(resolve-module '(guix channels))) (resolve-module '(guix channels)))
@ -3133,7 +3155,9 @@ SKIP LOCKED")
#:parallelism parallelism)) #:parallelism parallelism))
(record-job-succeeded conn id) (record-job-succeeded conn id)
(record-job-event conn id "success")) (record-job-event conn id "success")
#t)
(() (()
(raise-exception (raise-exception
job-not-found-exception)))))))) job-not-found-exception))))))))

View file

@ -42,7 +42,7 @@
(srfi srfi-1)) (srfi srfi-1))
(define guile-knots (define guile-knots
(let ((commit "016f37f108ca19da3664516baa97e907aa972b90") (let ((commit "ab5411da423043f2b8a0e27c7507f8d9c34686a2")
(revision "1")) (revision "1"))
(package (package
(name "guile-knots") (name "guile-knots")
@ -54,7 +54,7 @@
(commit commit))) (commit commit)))
(sha256 (sha256
(base32 (base32
"12j3l9p4acf47cjpfzm41ddxyxs1v6vlfa2vrymdd4gdday62xfn")) "0v39yd9cfcwc23cmb4h89kvp9m96xdg47nbj2k80a43fbalfd9aq"))
(file-name (string-append name "-" version "-checkout")))) (file-name (string-append name "-" version "-checkout"))))
(build-system gnu-build-system) (build-system gnu-build-system)
(native-inputs (native-inputs

View file

@ -93,18 +93,27 @@
(with-fluids ((%file-port-name-canonicalization 'none)) (with-fluids ((%file-port-name-canonicalization 'none))
(run-fibers (run-fibers
(lambda () (lambda ()
(process-load-new-guix-revision-job (with-exception-handler
job (lambda (exn)
#:skip-system-tests? (assq-ref opts 'skip-system-tests) ;; Exit if exceptions get this far, as not all fibers are
#:extra-inferior-environment-variables ;; guaranteed to finish
(filter-map (primitive-exit 1))
(match-lambda (lambda ()
(('inferior-environment-variable key val) (process-load-new-guix-revision-job
(cons key val)) job
(_ #f)) #:skip-system-tests? (assq-ref opts 'skip-system-tests)
opts) #:extra-inferior-environment-variables
#:ignore-systems (assq-ref opts 'ignore-systems) (filter-map
#:ignore-targets (assq-ref opts 'ignore-targets) (match-lambda
#:parallelism (assq-ref opts 'parallelism))) (('inferior-environment-variable key val)
(cons key val))
(_ #f))
opts)
#:ignore-systems (assq-ref opts 'ignore-systems)
#:ignore-targets (assq-ref opts 'ignore-targets)
#:parallelism (assq-ref opts 'parallelism)))
#:unwind? #t))
#:hz 0 #:hz 0
#:parallelism 1))))) #:parallelism 1
;; Drain to make sure there are no bugs with the use of fibers
#:drain? #t)))))

View file

@ -106,7 +106,8 @@
(process-load-new-guix-revision-job (process-load-new-guix-revision-job
id #:parallelism 1)) id #:parallelism 1))
#:hz 0 #:hz 0
#:parallelism 1)))))))))))))) #:parallelism 1
#:drain? #t))))))))))))))
(exec-query conn "TRUNCATE guix_revisions CASCADE") (exec-query conn "TRUNCATE guix_revisions CASCADE")
(exec-query conn "TRUNCATE load_new_guix_revision_jobs CASCADE") (exec-query conn "TRUNCATE load_new_guix_revision_jobs CASCADE")