Rewrite the key parts of loading data to be even more parallel
Use a pool for the database connection, and a fibers promise for the package ids, and run other parts of the process in parallel too. This change also means that inserting starts as soon as some data is available, rather than when all the data is available.
This commit is contained in:
parent
5439159a16
commit
3f1c2ad603
1 changed files with 169 additions and 172 deletions
|
|
@ -902,32 +902,6 @@
|
||||||
lint-checker-ids
|
lint-checker-ids
|
||||||
lint-warnings-data)))
|
lint-warnings-data)))
|
||||||
|
|
||||||
(define (inferior-data->package-derivation-ids
|
|
||||||
conn inf
|
|
||||||
package-ids
|
|
||||||
inferior-packages-system-and-target-to-derivations-alist)
|
|
||||||
(append-map!
|
|
||||||
(lambda (data)
|
|
||||||
(let* ((system-and-target (car data))
|
|
||||||
(derivations-vector (cdr data))
|
|
||||||
(derivation-ids
|
|
||||||
(with-time-logging
|
|
||||||
(simple-format #f "derivation-file-names->derivation-ids (~A)"
|
|
||||||
system-and-target)
|
|
||||||
(derivation-file-names->derivation-ids
|
|
||||||
conn
|
|
||||||
derivations-vector))))
|
|
||||||
|
|
||||||
(with-time-logging
|
|
||||||
(simple-format #f "insert-package-derivations (~A)"
|
|
||||||
system-and-target)
|
|
||||||
(insert-package-derivations conn
|
|
||||||
(car system-and-target)
|
|
||||||
(or (cdr system-and-target) "")
|
|
||||||
package-ids
|
|
||||||
derivation-ids))))
|
|
||||||
inferior-packages-system-and-target-to-derivations-alist))
|
|
||||||
|
|
||||||
(define guix-store-path
|
(define guix-store-path
|
||||||
(let ((store-path #f))
|
(let ((store-path #f))
|
||||||
(lambda (store)
|
(lambda (store)
|
||||||
|
|
@ -1418,8 +1392,8 @@
|
||||||
|
|
||||||
inf))))
|
inf))))
|
||||||
|
|
||||||
(define* (extract-information-from conn guix-revision-id commit
|
(define* (extract-information-from db-conn guix-revision-id commit
|
||||||
guix-source store-path
|
guix-source store-item
|
||||||
#:key skip-system-tests?
|
#:key skip-system-tests?
|
||||||
extra-inferior-environment-variables
|
extra-inferior-environment-variables
|
||||||
parallelism)
|
parallelism)
|
||||||
|
|
@ -1432,7 +1406,7 @@
|
||||||
(string-append
|
(string-append
|
||||||
(with-store-connection
|
(with-store-connection
|
||||||
(lambda (store)
|
(lambda (store)
|
||||||
(glibc-locales-for-guix-store-path store store-path)))
|
(glibc-locales-for-guix-store-path store store-item)))
|
||||||
"/lib/locale"
|
"/lib/locale"
|
||||||
":" (getenv "GUIX_LOCPATH")))
|
":" (getenv "GUIX_LOCPATH")))
|
||||||
|
|
||||||
|
|
@ -1442,7 +1416,7 @@
|
||||||
(let* ((inferior-store (open-store-connection))
|
(let* ((inferior-store (open-store-connection))
|
||||||
(inferior (start-inferior-for-data-extration
|
(inferior (start-inferior-for-data-extration
|
||||||
inferior-store
|
inferior-store
|
||||||
store-path
|
store-item
|
||||||
guix-locpath
|
guix-locpath
|
||||||
extra-inferior-environment-variables)))
|
extra-inferior-environment-variables)))
|
||||||
(ensure-non-blocking-store-connection inferior-store)
|
(ensure-non-blocking-store-connection inferior-store)
|
||||||
|
|
@ -1462,161 +1436,184 @@
|
||||||
(close-connection store)
|
(close-connection store)
|
||||||
(close-inferior inferior)))))
|
(close-inferior inferior)))))
|
||||||
|
|
||||||
(simple-format #t "debug: extract-information-from: ~A\n" store-path)
|
(define postgresql-connection-pool
|
||||||
|
(make-resource-pool
|
||||||
|
(lambda ()
|
||||||
|
(with-time-logging
|
||||||
|
"acquiring advisory transaction lock: load-new-guix-revision-inserts"
|
||||||
|
;; Wait until this is the only transaction inserting data, to
|
||||||
|
;; avoid any concurrency issues
|
||||||
|
(obtain-advisory-transaction-lock db-conn
|
||||||
|
'load-new-guix-revision-inserts))
|
||||||
|
db-conn)
|
||||||
|
1
|
||||||
|
#:min-size 0))
|
||||||
|
|
||||||
(letpar& ((inferior-lint-checkers-and-warnings-data
|
(define package-ids-promise
|
||||||
(let ((inferior-lint-checkers-data
|
(fibers-delay
|
||||||
(with-resource-from-pool inf-and-store-pool res
|
(lambda ()
|
||||||
(match res
|
(let ((packages-data
|
||||||
((inferior . inferior-store)
|
|
||||||
(inferior-lint-checkers inferior))))))
|
|
||||||
(cons
|
|
||||||
inferior-lint-checkers-data
|
|
||||||
(and inferior-lint-checkers-data
|
|
||||||
(par-map&
|
|
||||||
(match-lambda
|
|
||||||
((checker-name _ network-dependent?)
|
|
||||||
(and (and (not network-dependent?)
|
|
||||||
;; Running the derivation linter is
|
|
||||||
;; currently infeasible
|
|
||||||
(not (eq? checker-name 'derivation)))
|
|
||||||
(with-resource-from-pool inf-and-store-pool res
|
|
||||||
(match res
|
|
||||||
((inferior . inferior-store)
|
|
||||||
(inferior-lint-warnings inferior
|
|
||||||
inferior-store
|
|
||||||
checker-name)))))))
|
|
||||||
inferior-lint-checkers-data)))))
|
|
||||||
(inferior-packages-system-and-target-to-derivations-alist
|
|
||||||
(par-map&
|
|
||||||
(match-lambda
|
|
||||||
((system . target)
|
|
||||||
(let loop ((wal-bytes (stat:size (stat "/var/guix/db/db.sqlite-wal"))))
|
|
||||||
(when (> wal-bytes (* 2048 (expt 2 20)))
|
|
||||||
(simple-format #t "debug: guix-daemon WAL is large (~A), waiting\n"
|
|
||||||
wal-bytes)
|
|
||||||
|
|
||||||
(sleep 30)
|
|
||||||
(loop (stat:size (stat "/var/guix/db/db.sqlite-wal")))))
|
|
||||||
|
|
||||||
(with-resource-from-pool inf-and-store-pool res
|
|
||||||
(with-time-logging
|
|
||||||
(simple-format #f "getting derivations for ~A" (cons system target))
|
|
||||||
(match res
|
|
||||||
((inferior . inferior-store)
|
|
||||||
(ensure-gds-inferior-packages-defined! inferior)
|
|
||||||
|
|
||||||
(let ((drvs
|
|
||||||
(inferior-package-derivations
|
|
||||||
inferior-store
|
|
||||||
inferior
|
|
||||||
system
|
|
||||||
target)))
|
|
||||||
|
|
||||||
(cons (cons system target)
|
|
||||||
drvs))))))))
|
|
||||||
(with-resource-from-pool inf-and-store-pool res
|
(with-resource-from-pool inf-and-store-pool res
|
||||||
(match res
|
(match res
|
||||||
((inferior . inferior-store)
|
((inferior . inferior-store)
|
||||||
(inferior-fetch-system-target-pairs inferior))))))
|
(with-time-logging "getting all inferior package data"
|
||||||
(inferior-system-tests
|
(let ((packages
|
||||||
(if skip-system-tests?
|
pkg-to-replacement-hash-table
|
||||||
(begin
|
(inferior-packages-plus-replacements inferior)))
|
||||||
(simple-format #t "debug: skipping system tests\n")
|
(all-inferior-packages-data
|
||||||
'())
|
inferior
|
||||||
(with-resource-from-pool inf-and-store-pool res
|
packages
|
||||||
(match res
|
pkg-to-replacement-hash-table))))))))
|
||||||
((inferior . inferior-store)
|
(with-resource-from-pool postgresql-connection-pool conn
|
||||||
(with-time-logging "getting inferior system tests"
|
(insert-packages conn packages-data))))))
|
||||||
(all-inferior-system-tests inferior inferior-store
|
|
||||||
guix-source commit)))))))
|
|
||||||
(packages-data
|
|
||||||
(with-resource-from-pool inf-and-store-pool res
|
|
||||||
(match res
|
|
||||||
((inferior . inferior-store)
|
|
||||||
(with-time-logging "getting all inferior package data"
|
|
||||||
(let ((packages
|
|
||||||
pkg-to-replacement-hash-table
|
|
||||||
(inferior-packages-plus-replacements inferior)))
|
|
||||||
(all-inferior-packages-data inferior
|
|
||||||
packages
|
|
||||||
pkg-to-replacement-hash-table))))))))
|
|
||||||
|
|
||||||
(destroy-resource-pool inf-and-store-pool)
|
(define (extract-and-store-lint-checkers-and-warnings)
|
||||||
|
(define inferior-lint-checkers-data
|
||||||
|
(with-resource-from-pool inf-and-store-pool res
|
||||||
|
(match res
|
||||||
|
((inferior . inferior-store)
|
||||||
|
(inferior-lint-checkers inferior)))))
|
||||||
|
|
||||||
(simple-format
|
(when inferior-lint-checkers-data
|
||||||
#t "debug: finished loading information from inferior\n")
|
(letpar& ((lint-checker-ids
|
||||||
|
(with-resource-from-pool postgresql-connection-pool conn
|
||||||
|
(lint-checkers->lint-checker-ids
|
||||||
|
conn
|
||||||
|
(map (match-lambda
|
||||||
|
((name descriptions-by-locale network-dependent)
|
||||||
|
(list
|
||||||
|
name
|
||||||
|
network-dependent
|
||||||
|
(lint-checker-description-data->lint-checker-description-set-id
|
||||||
|
conn descriptions-by-locale))))
|
||||||
|
inferior-lint-checkers-data))))
|
||||||
|
(lint-warnings-data
|
||||||
|
(par-map&
|
||||||
|
(match-lambda
|
||||||
|
((checker-name _ network-dependent?)
|
||||||
|
(and (and (not network-dependent?)
|
||||||
|
;; Running the derivation linter is
|
||||||
|
;; currently infeasible
|
||||||
|
(not (eq? checker-name 'derivation)))
|
||||||
|
(with-resource-from-pool inf-and-store-pool res
|
||||||
|
(match res
|
||||||
|
((inferior . inferior-store)
|
||||||
|
(inferior-lint-warnings inferior
|
||||||
|
inferior-store
|
||||||
|
checker-name)))))))
|
||||||
|
inferior-lint-checkers-data)))
|
||||||
|
|
||||||
(with-time-logging
|
(let ((package-ids (fibers-force package-ids-promise)))
|
||||||
"acquiring advisory transaction lock: load-new-guix-revision-inserts"
|
(with-resource-from-pool postgresql-connection-pool conn
|
||||||
;; Wait until this is the only transaction inserting data, to
|
|
||||||
;; avoid any concurrency issues
|
|
||||||
(obtain-advisory-transaction-lock conn
|
|
||||||
'load-new-guix-revision-inserts))
|
|
||||||
(with-time-logging
|
|
||||||
"inserting data"
|
|
||||||
(let* ((package-ids
|
|
||||||
(insert-packages conn packages-data)))
|
|
||||||
(let* ((package-derivation-ids
|
|
||||||
(with-time-logging "inferior-data->package-derivation-ids"
|
|
||||||
(inferior-data->package-derivation-ids
|
|
||||||
conn
|
|
||||||
inf
|
|
||||||
package-ids
|
|
||||||
inferior-packages-system-and-target-to-derivations-alist)))
|
|
||||||
(ids-count
|
|
||||||
(length package-derivation-ids)))
|
|
||||||
(chunk-for-each! (lambda (package-derivation-ids-chunk)
|
|
||||||
(insert-guix-revision-package-derivations
|
|
||||||
conn
|
|
||||||
guix-revision-id
|
|
||||||
package-derivation-ids-chunk))
|
|
||||||
2000
|
|
||||||
package-derivation-ids)
|
|
||||||
(simple-format
|
|
||||||
#t "Successfully loaded ~A package/derivation pairs\n"
|
|
||||||
ids-count))
|
|
||||||
|
|
||||||
(when inferior-lint-warnings
|
|
||||||
(let* ((lint-checker-ids
|
|
||||||
(lint-checkers->lint-checker-ids
|
|
||||||
conn
|
|
||||||
(map (match-lambda
|
|
||||||
((name descriptions-by-locale network-dependent)
|
|
||||||
(list
|
|
||||||
name
|
|
||||||
network-dependent
|
|
||||||
(lint-checker-description-data->lint-checker-description-set-id
|
|
||||||
conn descriptions-by-locale))))
|
|
||||||
(car inferior-lint-checkers-and-warnings-data))))
|
|
||||||
(lint-warning-ids
|
|
||||||
(insert-lint-warnings
|
|
||||||
conn
|
|
||||||
package-ids
|
|
||||||
lint-checker-ids
|
|
||||||
(cdr inferior-lint-checkers-and-warnings-data))))
|
|
||||||
(insert-guix-revision-lint-checkers conn
|
(insert-guix-revision-lint-checkers conn
|
||||||
guix-revision-id
|
guix-revision-id
|
||||||
lint-checker-ids)
|
lint-checker-ids)
|
||||||
|
|
||||||
(chunk-for-each!
|
(let ((lint-warning-ids
|
||||||
(lambda (lint-warning-ids-chunk)
|
(insert-lint-warnings
|
||||||
(insert-guix-revision-lint-warnings conn
|
conn
|
||||||
guix-revision-id
|
package-ids
|
||||||
lint-warning-ids-chunk))
|
lint-checker-ids
|
||||||
5000
|
lint-warnings-data)))
|
||||||
lint-warning-ids)))
|
(chunk-for-each!
|
||||||
|
(lambda (lint-warning-ids-chunk)
|
||||||
|
(insert-guix-revision-lint-warnings conn
|
||||||
|
guix-revision-id
|
||||||
|
lint-warning-ids-chunk))
|
||||||
|
5000
|
||||||
|
lint-warning-ids)))))))
|
||||||
|
|
||||||
(when inferior-system-tests
|
(define (extract-and-store-package-derivations)
|
||||||
(insert-system-tests-for-guix-revision conn
|
(fibers-for-each
|
||||||
guix-revision-id
|
(match-lambda
|
||||||
inferior-system-tests))
|
((system . target)
|
||||||
|
(let loop ((wal-bytes (stat:size (stat "/var/guix/db/db.sqlite-wal"))))
|
||||||
|
(when (> wal-bytes (* 2048 (expt 2 20)))
|
||||||
|
(simple-format #t "debug: guix-daemon WAL is large (~A), waiting\n"
|
||||||
|
wal-bytes)
|
||||||
|
|
||||||
(with-time-logging
|
(sleep 30)
|
||||||
"insert-guix-revision-package-derivation-distribution-counts"
|
(loop (stat:size (stat "/var/guix/db/db.sqlite-wal")))))
|
||||||
(insert-guix-revision-package-derivation-distribution-counts
|
|
||||||
conn
|
(let ((derivations-vector
|
||||||
guix-revision-id))))))
|
(with-resource-from-pool inf-and-store-pool res
|
||||||
|
(with-time-logging
|
||||||
|
(simple-format #f "getting derivations for ~A" (cons system target))
|
||||||
|
(match res
|
||||||
|
((inferior . inferior-store)
|
||||||
|
(ensure-gds-inferior-packages-defined! inferior)
|
||||||
|
|
||||||
|
(inferior-package-derivations
|
||||||
|
inferior-store
|
||||||
|
inferior
|
||||||
|
system
|
||||||
|
target)))))))
|
||||||
|
|
||||||
|
(let ((package-ids (fibers-force package-ids-promise)))
|
||||||
|
(with-resource-from-pool postgresql-connection-pool conn
|
||||||
|
(let* ((derivation-ids
|
||||||
|
(with-time-logging
|
||||||
|
(simple-format #f "derivation-file-names->derivation-ids (~A ~A)"
|
||||||
|
system target)
|
||||||
|
(derivation-file-names->derivation-ids
|
||||||
|
conn
|
||||||
|
derivations-vector))))
|
||||||
|
|
||||||
|
(let ((package-derivation-ids
|
||||||
|
(with-time-logging
|
||||||
|
(simple-format #f "insert-package-derivations (~A ~A)"
|
||||||
|
system target)
|
||||||
|
(insert-package-derivations conn
|
||||||
|
system
|
||||||
|
(or target "")
|
||||||
|
package-ids
|
||||||
|
derivation-ids))))
|
||||||
|
(chunk-for-each! (lambda (package-derivation-ids-chunk)
|
||||||
|
(insert-guix-revision-package-derivations
|
||||||
|
conn
|
||||||
|
guix-revision-id
|
||||||
|
package-derivation-ids-chunk))
|
||||||
|
2000
|
||||||
|
package-derivation-ids))))))))
|
||||||
|
(with-resource-from-pool inf-and-store-pool res
|
||||||
|
(match res
|
||||||
|
((inferior . inferior-store)
|
||||||
|
(inferior-fetch-system-target-pairs inferior)))))
|
||||||
|
|
||||||
|
(with-resource-from-pool postgresql-connection-pool conn
|
||||||
|
(with-time-logging
|
||||||
|
"insert-guix-revision-package-derivation-distribution-counts"
|
||||||
|
(insert-guix-revision-package-derivation-distribution-counts
|
||||||
|
conn
|
||||||
|
guix-revision-id))))
|
||||||
|
|
||||||
|
(define (extract-and-store-system-tests)
|
||||||
|
(if skip-system-tests?
|
||||||
|
(begin
|
||||||
|
(simple-format #t "debug: skipping system tests\n")
|
||||||
|
'())
|
||||||
|
(let ((data
|
||||||
|
(with-resource-from-pool inf-and-store-pool res
|
||||||
|
(match res
|
||||||
|
((inferior . inferior-store)
|
||||||
|
(with-time-logging "getting inferior system tests"
|
||||||
|
(all-inferior-system-tests
|
||||||
|
inferior
|
||||||
|
inferior-store
|
||||||
|
guix-source
|
||||||
|
commit)))))))
|
||||||
|
(when data
|
||||||
|
(with-resource-from-pool postgresql-connection-pool conn
|
||||||
|
(insert-system-tests-for-guix-revision conn
|
||||||
|
guix-revision-id
|
||||||
|
data))))))
|
||||||
|
|
||||||
|
(simple-format #t "debug: extract-information-from: ~A\n" store-path)
|
||||||
|
(parallel-via-fibers
|
||||||
|
(fibers-force package-ids-promise)
|
||||||
|
(extract-and-store-lint-checkers-and-warnings)
|
||||||
|
(extract-and-store-package-derivations)
|
||||||
|
(extract-and-store-system-tests)))
|
||||||
|
|
||||||
(prevent-inlining-for-tests extract-information-from)
|
(prevent-inlining-for-tests extract-information-from)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue