diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm index da08566..8852139 100644 --- a/guix-data-service/jobs/load-new-guix-revision.scm +++ b/guix-data-service/jobs/load-new-guix-revision.scm @@ -35,7 +35,9 @@ #:use-module (squee) #:use-module (gcrypt hash) #:use-module (fibers) + #:use-module (fibers timers) #:use-module (fibers channels) + #:use-module (fibers operations) #:use-module (guix monads) #:use-module (guix base32) #:use-module (guix store) @@ -117,7 +119,9 @@ (missing-store-item-error-item exn) thunk) (when on-exception (on-exception)) - (retry-on-missing-store-item thunk)) + (retry-on-missing-store-item + thunk + #:on-exception on-exception)) (raise-exception exn))) thunk #:unwind? #t)) @@ -929,21 +933,19 @@ (define (update-derivation-ids-hash-table! conn derivation-ids-hash-table - derivations) - (define derivations-count (length derivations)) + derivation-file-names) + (define derivations-count (vector-length derivation-file-names)) - (simple-format #t "debug: update-derivation-ids-hash-table!: ~A file-names\n" - derivations-count) (let ((missing-file-names - (fold - (lambda (drv result) + (vector-fold + (lambda (_ result file-name) (if (hash-ref derivation-ids-hash-table - (derivation-file-name drv)) + file-name) result - (cons (derivation-file-name drv) + (cons file-name result))) '() - derivations))) + derivation-file-names))) (simple-format #t "debug: update-derivation-ids-hash-table!: lookup ~A file-names, ~A not cached\n" @@ -961,29 +963,11 @@ (exec-query conn (select-existing-derivations chunk)))) (chunk! missing-file-names 1000))))) -(define (insert-missing-derivations postgresql-connection-pool - utility-thread-channel - derivation-ids-hash-table - unfiltered-derivations) - - (define (ensure-input-derivations-exist input-derivation-file-names) - (unless (null? input-derivation-file-names) - ;; Ensure all the input derivations exist - (for-each - (lambda (chunk) - (simple-format - #t "debug: ensure-input-derivations-exist: processing ~A derivations\n" - (length chunk)) - - (insert-missing-derivations - postgresql-connection-pool - utility-thread-channel - derivation-ids-hash-table - (call-with-worker-thread - utility-thread-channel - (lambda () - (map read-derivation-from-file chunk))))) - (chunk! input-derivation-file-names 1000)))) +(define* (insert-missing-derivations postgresql-connection-pool + utility-thread-channel + derivation-ids-hash-table + unfiltered-derivations + #:key (log-tag "unspecified")) (define (insert-into-derivations conn drvs) (string-append @@ -1011,163 +995,203 @@ " RETURNING id" ";")) - (with-time-logging - (simple-format - #f "insert-missing-derivations: inserting ~A derivations" - (length unfiltered-derivations)) - (let ((derivations - derivation-ids - (with-resource-from-pool postgresql-connection-pool conn - (update-derivation-ids-hash-table! conn - derivation-ids-hash-table - unfiltered-derivations) + (define (insert-derivations) + (with-resource-from-pool postgresql-connection-pool conn + (update-derivation-ids-hash-table! + conn + derivation-ids-hash-table + (let ((file-names-vector + (make-vector (length unfiltered-derivations)))) + (for-each + (lambda (i drv) + (vector-set! file-names-vector + i + (derivation-file-name drv))) + (iota (vector-length file-names-vector)) + unfiltered-derivations) + file-names-vector)) - (let ((derivations - ;; Do this while holding the PostgreSQL connection to - ;; avoid conflicts with other fibers - (filter-map (lambda (derivation) - (if (hash-ref derivation-ids-hash-table - (derivation-file-name - derivation)) - #f - derivation)) - unfiltered-derivations))) - (if (null? derivations) - (values '() '()) - (let ((derivation-ids - (append-map! - (lambda (chunk) - (map (lambda (result) - (string->number (car result))) - (exec-query conn (insert-into-derivations conn chunk)))) - (chunk derivations 500)))) - - ;; Do this while holding the connection so that other - ;; fibers don't also try inserting the same derivations - (with-time-logging - "insert-missing-derivations: updating hash table" - (for-each (lambda (derivation derivation-id) - (hash-set! derivation-ids-hash-table - (derivation-file-name derivation) - derivation-id)) - derivations - derivation-ids)) - - (values derivations - derivation-ids))))))) - - (unless (null? derivations) - (parallel-via-fibers - (with-time-logging - "insert-missing-derivations: inserting sources" - (fibers-for-each - (lambda (derivation-id derivation) - (let ((sources (derivation-sources derivation))) - (unless (null? sources) - (let ((sources-ids - (with-resource-from-pool postgresql-connection-pool conn - (insert-derivation-sources conn - derivation-id - sources)))) - (par-map& - (lambda (id source-file) - (when - (with-resource-from-pool postgresql-connection-pool conn - (match - (exec-query - conn - " -SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" - (list (number->string id))) - (() - ;; Insert a placeholder to avoid other fibers - ;; working on this source file - (insert-placeholder-derivation-source-file-nar - conn - id) - #t) - (_ #f))) - (let ((nar-bytevector - (call-with-worker-thread - utility-thread-channel - (lambda () - (call-with-values - (lambda () - (open-bytevector-output-port)) - (lambda (port get-bytevector) - (unless (file-exists? source-file) - (raise-exception - (make-missing-store-item-error - source-file))) - (write-file source-file port) - (let ((res (get-bytevector))) - (close-port port) ; maybe reduces memory? - res))))))) - (letpar& - ((compressed-nar-bytevector - (call-with-worker-thread - utility-thread-channel - (lambda () - (call-with-values - (lambda () - (open-bytevector-output-port)) - (lambda (port get-bytevector) - (call-with-lzip-output-port port - (lambda (port) - (put-bytevector port nar-bytevector)) - #:level 9) - (let ((res (get-bytevector))) - (close-port port) ; maybe reduces memory? - res)))))) - (hash - (call-with-worker-thread - utility-thread-channel - (lambda () - (bytevector->nix-base32-string - (sha256 nar-bytevector))))) - (uncompressed-size (bytevector-length nar-bytevector))) - (with-resource-from-pool postgresql-connection-pool conn - (update-derivation-source-file-nar - conn - id - hash - compressed-nar-bytevector - uncompressed-size)))))) - sources-ids - sources))))) - derivation-ids - derivations)) - - (with-resource-from-pool postgresql-connection-pool conn - (with-time-logging - "insert-missing-derivations: inserting outputs" - (for-each (lambda (derivation-id derivation) - (insert-derivation-outputs conn - derivation-id - (derivation-outputs derivation))) - derivation-ids - derivations))) - - (with-time-logging - "insert-missing-derivations: ensure-input-derivations-exist" - (ensure-input-derivations-exist (deduplicate-strings - (map derivation-input-path - (append-map derivation-inputs - derivations)))))) - - (with-resource-from-pool postgresql-connection-pool conn - (with-time-logging + (let ((derivations + ;; Do this while holding the PostgreSQL connection to + ;; avoid conflicts with other fibers + (delete-duplicates + (filter-map (lambda (derivation) + (if (hash-ref derivation-ids-hash-table + (derivation-file-name + derivation)) + #f + derivation)) + unfiltered-derivations)))) + (if (null? derivations) + (values '() '()) + (begin (simple-format - #f "insert-missing-derivations: inserting inputs for ~A derivations" - (length derivations)) - (insert-derivation-inputs conn - derivation-ids - derivations))))))) + (current-error-port) + "insert-missing-derivations: inserting ~A derivations (~A)\n" + (length unfiltered-derivations) + log-tag) + (let ((derivation-ids + (append-map! + (lambda (chunk) + (map (lambda (result) + (string->number (car result))) + (exec-query conn (insert-into-derivations conn chunk)))) + (chunk derivations 500)))) -(define (derivation-file-names->derivation-ids postgresql-connection-pool - utility-thread-channel - derivation-ids-hash-table - derivation-file-names) + ;; Do this while holding the connection so that other + ;; fibers don't also try inserting the same derivations + (with-time-logging + (string-append "insert-missing-derivations: updating hash table (" log-tag ")") + (for-each (lambda (derivation derivation-id) + (hash-set! derivation-ids-hash-table + (derivation-file-name derivation) + derivation-id)) + derivations + derivation-ids)) + + (simple-format + (current-error-port) + "insert-missing-derivations: finished inserting ~A derivations (~A)\n" + (length unfiltered-derivations) + log-tag) + + (values derivations + derivation-ids))))))) + + (define (insert-sources derivations derivation-ids) + (with-time-logging + (string-append "insert-missing-derivations: inserting sources (" log-tag ")") + (fibers-for-each + (lambda (derivation-id derivation) + (let ((sources (derivation-sources derivation))) + (unless (null? sources) + (let ((sources-ids + (with-resource-from-pool postgresql-connection-pool conn + (insert-derivation-sources conn + derivation-id + sources)))) + (fibers-for-each + (lambda (id source-file) + (when + (with-resource-from-pool postgresql-connection-pool conn + (match + (exec-query + conn + " +SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" + (list (number->string id))) + (() + ;; Insert a placeholder to avoid other fibers + ;; working on this source file + (insert-placeholder-derivation-source-file-nar + conn + id) + #t) + (_ #f))) + ;; Use the utility-thread-channel to control concurrency here, + ;; to avoid using too much memory + (call-with-worker-thread + utility-thread-channel + (lambda () + (let ((nar-bytevector + (call-with-values + (lambda () + (open-bytevector-output-port)) + (lambda (port get-bytevector) + (unless (file-exists? source-file) + (raise-exception + (make-missing-store-item-error + source-file))) + (write-file source-file port) + (let ((res (get-bytevector))) + (close-port port) ; maybe reduces memory? + res))))) + (let ((compressed-nar-bytevector + (call-with-values + (lambda () + (open-bytevector-output-port)) + (lambda (port get-bytevector) + (call-with-lzip-output-port port + (lambda (port) + (put-bytevector port nar-bytevector)) + #:level 9) + (let ((res (get-bytevector))) + (close-port port) ; maybe reduces memory? + res)))) + (hash + (bytevector->nix-base32-string + (sha256 nar-bytevector))) + (uncompressed-size + (bytevector-length nar-bytevector))) + (with-resource-from-pool postgresql-connection-pool conn + (update-derivation-source-file-nar + conn + id + hash + compressed-nar-bytevector + uncompressed-size)))))))) + sources-ids + sources))))) + derivation-ids + derivations))) + + (let ((derivations + derivation-ids + (insert-derivations))) + + (unless (null? derivations) + (parallel-via-fibers + (insert-sources derivations + derivation-ids) + (with-time-logging + (string-append "insert-missing-derivations: inserting outputs (" + log-tag ")") + (with-resource-from-pool postgresql-connection-pool conn + (for-each (lambda (derivation-id derivation) + (insert-derivation-outputs conn + derivation-id + (derivation-outputs derivation))) + derivation-ids + derivations))) + + (with-time-logging + (string-append + "insert-missing-derivations: ensure-input-derivations-exist (" + log-tag ")") + (let ((input-derivations + (map + derivation-input-derivation + (append-map derivation-inputs + derivations)))) + (unless (null? input-derivations) + ;; Ensure all the input derivations exist + (for-each + (lambda (chunk) + (insert-missing-derivations + postgresql-connection-pool + utility-thread-channel + derivation-ids-hash-table + chunk + #:log-tag log-tag)) + (chunk! input-derivations 1000)))))) + + (string-append "insert-missing-derivations: done parallel (" log-tag ")") + (with-resource-from-pool postgresql-connection-pool conn + (with-time-logging + (simple-format + #f "insert-missing-derivations: inserting inputs for ~A derivations (~A)" + (length derivations) + log-tag) + (insert-derivation-inputs conn + derivation-ids + derivations)))))) + +(define* (derivation-file-names->derivation-ids postgresql-connection-pool + utility-thread-channel + read-derivations/fiberized + derivation-ids-hash-table + derivation-file-names + #:key (log-tag "unspecified")) (define derivations-count (vector-length derivation-file-names)) @@ -1175,8 +1199,9 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" #() (begin (simple-format - #t "debug: derivation-file-names->derivation-ids: processing ~A derivations\n" - derivations-count) + #t "debug: derivation-file-names->derivation-ids: processing ~A derivations (~A)\n" + derivations-count + log-tag) (let* ((missing-derivation-filenames (deduplicate-strings @@ -1189,32 +1214,24 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" result (cons derivation-file-name result)))) '() - derivation-file-names))) - (missing-derivations-chunked-promises - (map - (lambda (chunk) - (fibers-delay - (lambda () - (map (lambda (filename) - (if (file-exists? filename) - (read-derivation-from-file filename) - (raise-exception - (make-missing-store-item-error - filename)))) - chunk)))) - (chunk! missing-derivation-filenames 1000)))) - - (for-each - (lambda (missing-derivation-chunk-promise) - (let ((missing-derivations-chunk - (fibers-force - missing-derivation-chunk-promise))) - (unless (null? missing-derivations-chunk) + derivation-file-names)))) + (let ((chunks (chunk! missing-derivation-filenames 1000))) + (for-each + (lambda (i missing-derivation-file-names-chunk) + (let ((missing-derivations-chunk + (read-derivations/fiberized + missing-derivation-file-names-chunk))) + (simple-format + #t "debug: derivation-file-names->derivation-ids: processing chunk ~A (~A)\n" + i + log-tag) (insert-missing-derivations postgresql-connection-pool utility-thread-channel derivation-ids-hash-table - missing-derivations-chunk)))) - missing-derivations-chunked-promises) + missing-derivations-chunk + #:log-tag log-tag))) + (iota (length chunks)) + chunks)) (let ((all-ids (vector-map @@ -1222,34 +1239,12 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" (if derivation-file-name (or (hash-ref derivation-ids-hash-table derivation-file-name) - ;; If a derivation ID can't be found, update the - ;; hash table then check again - (with-resource-from-pool postgresql-connection-pool conn - (for-each - (lambda (missing-derivations-chunked-promise) - (update-derivation-ids-hash-table! - conn - derivation-ids-hash-table - (fibers-force missing-derivations-chunked-promise))) - missing-derivations-chunked-promises) - (or (hash-ref derivation-ids-hash-table - derivation-file-name) (error (simple-format #f "missing derivation id (~A)" - derivation-file-name))))) + derivation-file-name))) #f)) derivation-file-names))) - (with-resource-from-pool postgresql-connection-pool conn - (simple-format - (current-error-port) - "guix-data-service: clearing the derivation-ids-hash-table\n") - (hash-clear! derivation-ids-hash-table)) - - ;; Just in case this helps clear memory - (for-each fibers-promise-reset - missing-derivations-chunked-promises) - all-ids))))) (prevent-inlining-for-tests derivation-file-names->derivation-ids) @@ -1489,6 +1484,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" (cons inferior inferior-store))) parallelism #:min-size 0 + #:name "inferior" #:idle-seconds 30 #:destructor (match-lambda ((inferior . store) @@ -1501,7 +1497,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" (inferior-eval '(@ (guix packages) %supported-systems) inferior))))) (result - (par-map& + (fibers-map (lambda (system) (with-resource-from-pool inferior-and-store-pool res (match res @@ -1725,6 +1721,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" guix-source store-item guix-derivation utility-thread-channel + read-derivations/fiberized derivation-ids-hash-table #:key skip-system-tests? extra-inferior-environment-variables @@ -1776,6 +1773,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" parallelism #:min-size 0 #:idle-seconds 20 + #:name "inferior" #:destructor (match-lambda ((inferior . store) @@ -1853,6 +1851,8 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" 'load-new-guix-revision-inserts)) db-conn) 1 + #:name "postgres" + #:assume-reliable-waiters? #t #:min-size 0)) (define package-ids-promise @@ -1892,7 +1892,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" conn descriptions-by-locale)))) inferior-lint-checkers-data)))) (lint-warnings-data - (par-map& + (fibers-map (match-lambda ((checker-name _ network-dependent?) (and (and (not network-dependent?) @@ -1978,8 +1978,10 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" (derivation-file-names->derivation-ids postgresql-connection-pool utility-thread-channel + read-derivations/fiberized derivation-ids-hash-table - derivations-vector))) + derivations-vector + #:log-tag (simple-format #f "~A:~A" system target)))) (guix-revision-id (fibers-force guix-revision-id-promise)) (package-ids (fibers-force package-ids-promise)) @@ -2013,75 +2015,93 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" (fibers-force guix-revision-id-promise) (number->string (system->system-id conn system)) - (or target ""))))) + (or target "")))) + + 'finished) (let ((get-derivations/fiberized (fiberize get-derivations ;; Limit concurrency here to keep focused on specific ;; systems until they've been fully processed #:parallelism parallelism))) - (par-map& - (match-lambda - ((system . target) - (retry-on-missing-store-item - (lambda () - (process-system-and-target system target - get-derivations/fiberized))))) - (call-with-inferior - (lambda (inferior inferior-store) - (inferior-fetch-system-target-pairs inferior)))))) + (with-time-logging "extract-and-store-package-derivations" + (fibers-map-with-progress + (match-lambda + ((system . target) + (retry-on-missing-store-item + (lambda () + (process-system-and-target system target + get-derivations/fiberized))))) + (list + (call-with-inferior + (lambda (inferior inferior-store) + (inferior-fetch-system-target-pairs inferior)))) + #:report + (lambda (data) + (for-each + (match-lambda + ((result (system . target)) + (simple-format #t "~A ~A: ~A\n" + system target result))) + data)))))) (define (extract-and-store-system-tests) (if skip-system-tests? (begin (simple-format #t "debug: skipping system tests\n") '()) - (let ((data-with-derivation-file-names - (call-with-inferior - (lambda (inferior inferior-store) - (with-time-logging "getting inferior system tests" - (all-inferior-system-tests - inferior - inferior-store - guix-source - commit)))))) - (when data-with-derivation-file-names - (let ((data-with-derivation-ids - (map (match-lambda - ((name description derivation-file-names-by-system location-data) - (list name - description - (let ((systems - (map car derivation-file-names-by-system)) - (derivation-ids - (derivation-file-names->derivation-ids - postgresql-connection-pool - utility-thread-channel - derivation-ids-hash-table - (list->vector - (map cdr derivation-file-names-by-system))))) - (map cons systems derivation-ids)) - location-data))) - data-with-derivation-file-names))) - (with-resource-from-pool postgresql-connection-pool conn - (insert-system-tests-for-guix-revision - conn - (fibers-force guix-revision-id-promise) - data-with-derivation-ids))))))) + (with-time-logging "extract-and-store-system-tests" + (let ((data-with-derivation-file-names + (call-with-inferior + (lambda (inferior inferior-store) + (with-time-logging "getting inferior system tests" + (all-inferior-system-tests + inferior + inferior-store + guix-source + commit)))))) + (when data-with-derivation-file-names + (let ((data-with-derivation-ids + (map (match-lambda + ((name description derivation-file-names-by-system location-data) + (list name + description + (let ((systems + (map car derivation-file-names-by-system)) + (derivation-ids + (derivation-file-names->derivation-ids + postgresql-connection-pool + utility-thread-channel + read-derivations/fiberized + derivation-ids-hash-table + (list->vector + (map cdr derivation-file-names-by-system))))) + (map cons systems derivation-ids)) + location-data))) + data-with-derivation-file-names))) + (with-resource-from-pool postgresql-connection-pool conn + (insert-system-tests-for-guix-revision + conn + (fibers-force guix-revision-id-promise) + data-with-derivation-ids)))))))) (with-time-logging (simple-format #f "extract-information-from: ~A\n" store-item) (parallel-via-fibers - (fibers-force package-ids-promise) + (begin + (fibers-force package-ids-promise) + #f) (extract-and-store-package-derivations) (retry-on-missing-store-item extract-and-store-system-tests) - (extract-and-store-lint-checkers-and-warnings))) + (with-time-logging "extract-and-store-lint-checkers-and-warnings" + (extract-and-store-lint-checkers-and-warnings)))) #t) (prevent-inlining-for-tests extract-information-from) (define (load-channel-instances utility-thread-channel + read-derivations/fiberized derivation-ids-hash-table git-repository-id commit channel-derivations-by-system) @@ -2113,6 +2133,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" (make-resource-pool (const channel-instances-conn) 1 + #:name "postgres" #:min-size 0))) (unless existing-guix-revision-id @@ -2130,6 +2151,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" (derivation-file-names->derivation-ids postgresql-connection-pool utility-thread-channel + read-derivations/fiberized derivation-ids-hash-table (list->vector (map cdr derivations-by-system))))) @@ -2150,15 +2172,34 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" #:key skip-system-tests? parallelism extra-inferior-environment-variables) (define utility-thread-channel - (make-worker-thread-channel - (const '()) - #:parallelism parallelism)) + ;; There might be high demand for this, so order the requests + (make-queueing-channel + (call-with-default-io-waiters + (lambda () + (make-worker-thread-channel + (const '()) + #:parallelism parallelism))))) + + (define (read-derivations filenames) + (call-with-worker-thread + utility-thread-channel + (lambda () + (map (lambda (filename) + (if (file-exists? filename) + (read-derivation-from-file filename) + (raise-exception + (make-missing-store-item-error + filename)))) + filenames)))) + (define read-derivations/fiberized + (fiberize read-derivations + ;; Don't do this in parallel as there's caching involved with + ;; read-derivation-from-file + #:parallelism 1)) (define derivation-ids-hash-table (make-hash-table)) - (%worker-thread-default-timeout #f) - (let* ((git-repository-fields (select-git-repository conn git-repository-id)) (git-repository-url @@ -2188,6 +2229,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" channel-derivations-by-system (fibers-force channel-derivations-by-system-promise))) (load-channel-instances utility-thread-channel + read-derivations/fiberized derivation-ids-hash-table git-repository-id commit channel-derivations-by-system))) @@ -2214,6 +2256,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" commit guix-source store-item guix-derivation utility-thread-channel + read-derivations/fiberized derivation-ids-hash-table #:skip-system-tests? skip-system-tests? @@ -2633,6 +2676,9 @@ SKIP LOCKED") (define* (process-load-new-guix-revision-job id #:key skip-system-tests? extra-inferior-environment-variables parallelism) + (define finished-channel + (make-channel)) + (define result (with-postgresql-connection (simple-format #f "load-new-guix-revision ~A" id) @@ -2642,12 +2688,20 @@ SKIP LOCKED") ;; instances have the data updated. (fix-derivation-output-details-hash-encoding conn) + (%worker-thread-default-timeout #f) + + (resource-pool-retry-checkout-timeout 120) + (exec-query conn "BEGIN") (spawn-fiber (lambda () - (while #t - (sleep 30) + (while (perform-operation + (choice-operation + (wrap-operation (get-operation finished-channel) + (const #f)) + (wrap-operation (sleep-operation 20) + (const #t)))) (let ((stats (gc-stats))) (simple-format @@ -2752,4 +2806,5 @@ SKIP LOCKED") "update-derivation-outputs-statistics" (update-derivation-outputs-statistics conn)))))) + (put-message finished-channel #t) result)