Increase parallelism when loading revisions

This commit is contained in:
Christopher Baines 2024-10-17 17:10:25 +02:00
parent f1071cbd4d
commit b6551842d1

View file

@ -107,7 +107,7 @@
missing-store-item-error? missing-store-item-error?
(item missing-store-item-error-item)) (item missing-store-item-error-item))
(define (retry-on-missing-store-item thunk) (define* (retry-on-missing-store-item thunk #:key on-exception)
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
(if (missing-store-item-error? exn) (if (missing-store-item-error? exn)
@ -116,6 +116,7 @@
"missing store item ~A, retrying ~A\n" "missing store item ~A, retrying ~A\n"
(missing-store-item-error-item exn) (missing-store-item-error-item exn)
thunk) thunk)
(when on-exception (on-exception))
(retry-on-missing-store-item thunk)) (retry-on-missing-store-item thunk))
(raise-exception exn))) (raise-exception exn)))
thunk thunk
@ -1691,7 +1692,8 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
inf)))) inf))))
(define* (extract-information-from db-conn guix-revision-id commit (define* (extract-information-from db-conn guix-revision-id-promise
commit
guix-source store-item guix-source store-item
guix-derivation guix-derivation
utility-thread-channel utility-thread-channel
@ -1885,9 +1887,10 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(let ((package-ids (fibers-force package-ids-promise))) (let ((package-ids (fibers-force package-ids-promise)))
(with-resource-from-pool postgresql-connection-pool conn (with-resource-from-pool postgresql-connection-pool conn
(insert-guix-revision-lint-checkers conn (insert-guix-revision-lint-checkers
guix-revision-id conn
lint-checker-ids) (fibers-force guix-revision-id-promise)
lint-checker-ids)
(let ((lint-warning-ids (let ((lint-warning-ids
(insert-lint-warnings (insert-lint-warnings
@ -1897,9 +1900,10 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
lint-warnings-data))) lint-warnings-data)))
(chunk-for-each! (chunk-for-each!
(lambda (lint-warning-ids-chunk) (lambda (lint-warning-ids-chunk)
(insert-guix-revision-lint-warnings conn (insert-guix-revision-lint-warnings
guix-revision-id conn
lint-warning-ids-chunk)) (fibers-force guix-revision-id-promise)
lint-warning-ids-chunk))
5000 5000
lint-warning-ids))))))) lint-warning-ids)))))))
@ -1913,62 +1917,66 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(define chunk-size 1000) (define chunk-size 1000)
(define (process-system-and-target system target) (define (get-derivations system target)
(let ((derivations-vector (make-vector packages-count)))
(with-time-logging
(simple-format #f "getting derivations for ~A" (cons system target))
(let loop ((start-index 0))
(let* ((count
(if (>= (+ start-index chunk-size) packages-count)
(- packages-count start-index)
chunk-size))
(chunk
(call-with-inferior
(lambda (inferior inferior-store)
(ensure-gds-inferior-packages-defined! inferior)
(inferior-package-derivations
inferior-store
inferior
system
target
start-index
count)))))
(vector-copy! derivations-vector
start-index
chunk)
(unless (>= (+ start-index chunk-size) packages-count)
(loop (+ start-index chunk-size))))))
derivations-vector))
(define (process-system-and-target system target get-derivations)
(with-time-logging (with-time-logging
(simple-format #f "processing derivations for ~A" (cons system target)) (simple-format #f "processing derivations for ~A" (cons system target))
(let ((derivations-vector (make-vector packages-count))) (let* ((derivations-vector (get-derivations system target))
(with-time-logging (derivation-ids
(simple-format #f "getting derivations for ~A" (cons system target)) (with-time-logging
(let loop ((start-index 0)) (simple-format #f "derivation-file-names->derivation-ids (~A ~A)"
(let* ((count system target)
(if (>= (+ start-index chunk-size) packages-count) (derivation-file-names->derivation-ids/fiberized
(- packages-count start-index) derivations-vector)))
chunk-size)) (guix-revision-id
(chunk (fibers-force guix-revision-id-promise))
(call-with-inferior (package-ids (fibers-force package-ids-promise))
(lambda (inferior inferior-store) (package-derivation-ids
(ensure-gds-inferior-packages-defined! inferior) (with-resource-from-pool postgresql-connection-pool conn
(inferior-package-derivations
inferior-store
inferior
system
target
start-index
count)))))
(vector-copy! derivations-vector
start-index
chunk)
(unless (>= (+ start-index chunk-size) packages-count)
(loop (+ start-index chunk-size))))))
(let* ((derivation-ids
(with-time-logging (with-time-logging
(simple-format #f "derivation-file-names->derivation-ids (~A ~A)" (simple-format #f "insert-package-derivations (~A ~A)"
system target) system target)
(derivation-file-names->derivation-ids/fiberized (insert-package-derivations conn
derivations-vector)))) system
(or target "")
(let* ((package-ids (fibers-force package-ids-promise)) package-ids
(package-derivation-ids derivation-ids)))))
(with-resource-from-pool postgresql-connection-pool conn (chunk-for-each!
(with-time-logging (lambda (package-derivation-ids-chunk)
(simple-format #f "insert-package-derivations (~A ~A)" (with-resource-from-pool postgresql-connection-pool conn
system target) (insert-guix-revision-package-derivations
(insert-package-derivations conn conn
system guix-revision-id
(or target "") package-derivation-ids-chunk)))
package-ids 2000
derivation-ids))))) package-derivation-ids)))
(chunk-for-each!
(lambda (package-derivation-ids-chunk)
(with-resource-from-pool postgresql-connection-pool conn
(insert-guix-revision-package-derivations
conn
guix-revision-id
package-derivation-ids-chunk)))
2000
package-derivation-ids)))))
(with-resource-from-pool postgresql-connection-pool conn (with-resource-from-pool postgresql-connection-pool conn
(with-time-logging (with-time-logging
@ -1977,23 +1985,24 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
system target) system target)
(insert-guix-revision-package-derivation-distribution-counts (insert-guix-revision-package-derivation-distribution-counts
conn conn
guix-revision-id (fibers-force guix-revision-id-promise)
(number->string (number->string
(system->system-id conn system)) (system->system-id conn system))
(or target ""))))) (or target "")))))
(let ((process-system-and-target/fiberized (let ((get-derivations/fiberized
(fiberize process-system-and-target (fiberize get-derivations
#:parallelism parallelism))) #:parallelism parallelism)))
(par-map& (par-map&
(match-lambda (match-lambda
((system . target) ((system . target)
(retry-on-missing-store-item (retry-on-missing-store-item
(lambda () (lambda ()
(process-system-and-target/fiberized system target))))) (process-system-and-target system target
(call-with-inferior get-derivations/fiberized)))))
(lambda (inferior inferior-store) (call-with-inferior
(inferior-fetch-system-target-pairs inferior)))))) (lambda (inferior inferior-store)
(inferior-fetch-system-target-pairs inferior))))))
(define (extract-and-store-system-tests) (define (extract-and-store-system-tests)
(if skip-system-tests? (if skip-system-tests?
@ -2027,7 +2036,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(with-resource-from-pool postgresql-connection-pool conn (with-resource-from-pool postgresql-connection-pool conn
(insert-system-tests-for-guix-revision (insert-system-tests-for-guix-revision
conn conn
guix-revision-id (fibers-force guix-revision-id-promise)
data-with-derivation-ids))))))) data-with-derivation-ids)))))))
(with-time-logging (with-time-logging
@ -2124,34 +2133,48 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(channel-for-commit (channel-for-commit
(channel (name 'guix) (channel (name 'guix)
(url git-repository-url) (url git-repository-url)
(commit commit))) (commit commit))))
(guix-source
channel-derivations-by-system (define channel-derivations-by-system-promise
guix-revision-id (fibers-delay
(retry-on-missing-store-item (lambda ()
(lambda () (channel->source-and-derivations-by-system
(let ((guix-source conn
channel-derivations-by-system channel-for-commit
(channel->source-and-derivations-by-system fetch-with-authentication?
conn #:parallelism parallelism))))
channel-for-commit
fetch-with-authentication? (define guix-revision-id-promise
#:parallelism parallelism))) (fibers-delay
(let ((guix-revision-id (lambda ()
(load-channel-instances utility-thread-channel (retry-on-missing-store-item
git-repository-id commit (lambda ()
channel-derivations-by-system))) (let ((guix-source
(values guix-source channel-derivations-by-system
channel-derivations-by-system (fibers-force channel-derivations-by-system-promise)))
guix-revision-id))))))) (load-channel-instances utility-thread-channel
(let ((store-item git-repository-id commit
guix-derivation channel-derivations-by-system)))
(channel-derivations-by-system->guix-store-item #:on-exception
channel-derivations-by-system))) (lambda ()
(fibers-promise-reset channel-derivations-by-system-promise))))))
;; Prompt getting the guix-revision-id as soon as possible
(spawn-fiber
(lambda ()
(fibers-force guix-revision-id-promise)))
(let* ((guix-source
channel-derivations-by-system
(fibers-force channel-derivations-by-system-promise))
(store-item
guix-derivation
(channel-derivations-by-system->guix-store-item
channel-derivations-by-system)))
(if store-item (if store-item
(and (and
(extract-information-from conn (extract-information-from conn
guix-revision-id guix-revision-id-promise
commit guix-source store-item commit guix-source store-item
guix-derivation guix-derivation
utility-thread-channel utility-thread-channel
@ -2166,21 +2189,22 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(with-time-logging "inserting channel news entries" (with-time-logging "inserting channel news entries"
(insert-channel-news-entries-for-guix-revision (insert-channel-news-entries-for-guix-revision
conn conn
guix-revision-id (fibers-force guix-revision-id-promise)
(channel-news-for-commit channel-for-commit commit))) (channel-news-for-commit channel-for-commit commit)))
(begin (begin
(simple-format (simple-format
#t "debug: importing channel news not supported\n") #t "debug: importing channel news not supported\n")
#t)) #t))
(update-package-derivations-table conn (update-package-derivations-table
git-repository-id conn
guix-revision-id git-repository-id
commit) (fibers-force guix-revision-id-promise)
commit)
(with-time-logging "updating builds.derivation_output_details_set_id" (with-time-logging "updating builds.derivation_output_details_set_id"
(update-builds-derivation-output-details-set-id (update-builds-derivation-output-details-set-id
conn conn
(string->number guix-revision-id)))) (string->number (fibers-force guix-revision-id-promise)))))
(begin (begin
(simple-format #t "Failed to generate store item for ~A\n" (simple-format #t "Failed to generate store item for ~A\n"
commit) commit)
@ -2572,109 +2596,123 @@ SKIP LOCKED")
(define* (process-load-new-guix-revision-job id #:key skip-system-tests? (define* (process-load-new-guix-revision-job id #:key skip-system-tests?
extra-inferior-environment-variables extra-inferior-environment-variables
parallelism) parallelism)
(with-postgresql-connection (define result
(simple-format #f "load-new-guix-revision ~A" id) (with-postgresql-connection
(lambda (conn) (simple-format #f "load-new-guix-revision ~A" id)
;; Fix the hash encoding of derivation_output_details. This'll only run (lambda (conn)
;; once on any given database, but is kept here just to make sure any ;; Fix the hash encoding of derivation_output_details. This'll only run
;; instances have the data updated. ;; once on any given database, but is kept here just to make sure any
(fix-derivation-output-details-hash-encoding conn) ;; instances have the data updated.
(fix-derivation-output-details-hash-encoding conn)
(exec-query conn "BEGIN") (exec-query conn "BEGIN")
(spawn-fiber (spawn-fiber
(lambda () (lambda ()
(while #t (while #t
(sleep 30) (sleep 30)
(let ((stats (gc-stats))) (let ((stats (gc-stats)))
(simple-format (simple-format
(current-error-port) (current-error-port)
"process-job heap: ~a MiB used (~a MiB heap)~%" "process-job heap: ~a MiB used (~a MiB heap)~%"
(round (round
(/ (- (assoc-ref stats 'heap-size) (/ (- (assoc-ref stats 'heap-size)
(assoc-ref stats 'heap-free-size)) (assoc-ref stats 'heap-free-size))
(expt 2. 20))) (expt 2. 20)))
(round (round
(/ (assoc-ref stats 'heap-size) (/ (assoc-ref stats 'heap-size)
(expt 2. 20)))))))) (expt 2. 20))))))))
(match (select-job-for-update conn id) (match (select-job-for-update conn id)
(((id commit source git-repository-id)) (((id commit source git-repository-id))
;; With a separate connection, outside of the transaction so the event ;; With a separate connection, outside of the transaction so the event
;; gets persisted regardless. ;; gets persisted regardless.
(with-postgresql-connection (with-postgresql-connection
(simple-format #f "load-new-guix-revision ~A start-event" id) (simple-format #f "load-new-guix-revision ~A start-event" id)
(lambda (start-event-conn) (lambda (start-event-conn)
(record-job-event start-event-conn id "start"))) (record-job-event start-event-conn id "start")))
(simple-format #t "Processing job ~A (commit: ~A, source: ~A)\n\n" (simple-format #t "Processing job ~A (commit: ~A, source: ~A)\n\n"
id commit source) id commit source)
(if (eq? (if (eq?
(with-time-logging (string-append "processing revision " commit) (with-time-logging (string-append "processing revision " commit)
(with-exception-handler (with-exception-handler
(const #f) (const #f)
(lambda () (lambda ()
(with-throw-handler #t (with-throw-handler #t
(lambda () (lambda ()
(load-new-guix-revision (load-new-guix-revision
conn conn
git-repository-id git-repository-id
commit commit
#:skip-system-tests? #t #:skip-system-tests? #t
#:extra-inferior-environment-variables #:extra-inferior-environment-variables
extra-inferior-environment-variables extra-inferior-environment-variables
#:parallelism parallelism)) #:parallelism parallelism))
(lambda (key . args) (lambda (key . args)
(simple-format (current-error-port) (simple-format (current-error-port)
"error: load-new-guix-revision: ~A ~A\n" "error: load-new-guix-revision: ~A ~A\n"
key args) key args)
(backtrace)))) (backtrace))))
#:unwind? #t)) #:unwind? #t))
#t) #t)
(begin (begin
(record-job-succeeded conn id) (record-job-succeeded conn id)
(record-job-event conn id "success") (record-job-event conn id "success")
(exec-query conn "COMMIT") (exec-query conn "COMMIT")
(with-time-logging #t)
"vacuuming package derivations by guix revision range table" (begin
(vacuum-package-derivations-table conn)) (exec-query conn "ROLLBACK")
(record-job-event conn id "failure")
(with-time-logging #f)))
"vacuum-derivation-inputs-table" (()
(vacuum-derivation-inputs-table conn)) (exec-query conn "ROLLBACK")
(simple-format #t "job ~A not found to be processed\n"
id))))))
(match (exec-query (when result
conn (parallel-via-fibers
"SELECT reltuples::bigint FROM pg_class WHERE relname = 'derivation_inputs'") (with-postgresql-connection
(((rows)) (simple-format #f "post load-new-guix-revision ~A" id)
;; Don't attempt counting distinct values if there are too (lambda (conn)
;; many rows, as that is far to slow and could use up all the (with-time-logging
;; disk space. "vacuuming package derivations by guix revision range table"
(when (< (string->number rows) (vacuum-package-derivations-table conn))))
1000000000)
(with-time-logging
"update-derivation-inputs-statistics"
(update-derivation-inputs-statistics conn)))))
(with-time-logging (with-postgresql-connection
"vacuum-derivation-outputs-table" (simple-format #f "post load-new-guix-revision ~A" id)
(vacuum-derivation-outputs-table conn)) (lambda (conn)
(with-time-logging
"vacuum-derivation-inputs-table"
(vacuum-derivation-inputs-table conn))
(with-time-logging (match (exec-query
"update-derivation-outputs-statistics" conn
(update-derivation-outputs-statistics conn)) "SELECT reltuples::bigint FROM pg_class WHERE relname = 'derivation_inputs'")
(((rows))
;; Don't attempt counting distinct values if there are too
;; many rows, as that is far to slow and could use up all the
;; disk space.
(when (< (string->number rows)
1000000000)
(with-time-logging
"update-derivation-inputs-statistics"
(update-derivation-inputs-statistics conn)))))))
#t) (with-postgresql-connection
(begin (simple-format #f "post load-new-guix-revision ~A" id)
(exec-query conn "ROLLBACK") (lambda (conn)
(record-job-event conn id "failure") (with-time-logging
"vacuum-derivation-outputs-table"
(vacuum-derivation-outputs-table conn))
#f))) (with-time-logging
(() "update-derivation-outputs-statistics"
(exec-query conn "ROLLBACK") (update-derivation-outputs-statistics conn))))))
(simple-format #t "job ~A not found to be processed\n"
id)))))) result)