Rework inserting derivations

To add more parallelism.
This commit is contained in:
Christopher Baines 2024-10-27 14:02:57 +00:00
parent 1e0407e9b6
commit c650fc6e7a

View file

@ -929,25 +929,25 @@
(define (update-derivation-ids-hash-table! conn (define (update-derivation-ids-hash-table! conn
derivation-ids-hash-table derivation-ids-hash-table
file-names) derivations)
(define file-names-count (vector-length file-names)) (define derivations-count (length derivations))
(simple-format #t "debug: update-derivation-ids-hash-table!: ~A file-names\n" (simple-format #t "debug: update-derivation-ids-hash-table!: ~A file-names\n"
file-names-count) derivations-count)
(let ((missing-file-names (let ((missing-file-names
(vector-fold (fold
(lambda (_ result file-name) (lambda (drv result)
(if (and file-name (if (hash-ref derivation-ids-hash-table
(hash-ref derivation-ids-hash-table (derivation-file-name drv))
file-name))
result result
(cons file-name result))) (cons (derivation-file-name drv)
result)))
'() '()
file-names))) derivations)))
(simple-format (simple-format
#t "debug: update-derivation-ids-hash-table!: lookup ~A file-names, ~A not cached\n" #t "debug: update-derivation-ids-hash-table!: lookup ~A file-names, ~A not cached\n"
file-names-count (length missing-file-names)) derivations-count (length missing-file-names))
(unless (null? missing-file-names) (unless (null? missing-file-names)
(for-each (for-each
@ -964,42 +964,26 @@
(define (insert-missing-derivations postgresql-connection-pool (define (insert-missing-derivations postgresql-connection-pool
utility-thread-channel utility-thread-channel
derivation-ids-hash-table derivation-ids-hash-table
derivations) unfiltered-derivations)
(define (ensure-input-derivations-exist input-derivation-file-names) (define (ensure-input-derivations-exist input-derivation-file-names)
(unless (null? input-derivation-file-names) (unless (null? input-derivation-file-names)
(simple-format ;; Ensure all the input derivations exist
#t "debug: ensure-input-derivations-exist: processing ~A derivations\n" (for-each
(length input-derivation-file-names)) (lambda (chunk)
(simple-format
#t "debug: ensure-input-derivations-exist: processing ~A derivations\n"
(length chunk))
(with-resource-from-pool postgresql-connection-pool conn (insert-missing-derivations
(update-derivation-ids-hash-table! conn postgresql-connection-pool
derivation-ids-hash-table utility-thread-channel
(list->vector derivation-ids-hash-table
input-derivation-file-names))) (call-with-worker-thread
(simple-format
#t
"debug: ensure-input-derivations-exist: checking for missing input derivations\n")
(let ((missing-derivations-filenames
(remove (lambda (derivation-file-name)
(hash-ref derivation-ids-hash-table
derivation-file-name))
input-derivation-file-names)))
(unless (null? missing-derivations-filenames)
(simple-format
#f
"debug: ensure-input-derivations-exist: inserting missing input derivations\n")
;; Ensure all the input derivations exist
(insert-missing-derivations
postgresql-connection-pool
utility-thread-channel utility-thread-channel
derivation-ids-hash-table (lambda ()
(call-with-worker-thread (map read-derivation-from-file chunk)))))
utility-thread-channel (chunk! input-derivation-file-names 1000))))
(lambda ()
(map read-derivation-from-file
missing-derivations-filenames))))))))
(define (insert-into-derivations conn drvs) (define (insert-into-derivations conn drvs)
(string-append (string-append
@ -1030,121 +1014,155 @@
(with-time-logging (with-time-logging
(simple-format (simple-format
#f "insert-missing-derivations: inserting ~A derivations" #f "insert-missing-derivations: inserting ~A derivations"
(length derivations)) (length unfiltered-derivations))
(let* ((chunks (chunk derivations 500)) (let ((derivations
(derivation-ids derivation-ids
(with-resource-from-pool postgresql-connection-pool conn (with-resource-from-pool postgresql-connection-pool conn
(append-map! (update-derivation-ids-hash-table! conn
(lambda (chunk) derivation-ids-hash-table
(map (lambda (result) unfiltered-derivations)
(string->number (car result)))
(exec-query conn (insert-into-derivations conn chunk))))
chunks))))
(with-time-logging (let ((derivations
"insert-missing-derivations: updating hash table" ;; Do this while holding the PostgreSQL connection to
(for-each (lambda (derivation derivation-id) ;; avoid conflicts with other fibers
(hash-set! derivation-ids-hash-table (filter-map (lambda (derivation)
(derivation-file-name derivation) (if (hash-ref derivation-ids-hash-table
derivation-id)) (derivation-file-name
derivations derivation))
derivation-ids)) #f
derivation))
unfiltered-derivations)))
(if (null? derivations)
(values '() '())
(let ((derivation-ids
(append-map!
(lambda (chunk)
(map (lambda (result)
(string->number (car result)))
(exec-query conn (insert-into-derivations conn chunk))))
(chunk derivations 500))))
(with-time-logging ;; Do this while holding the connection so that other
"insert-missing-derivations: inserting sources" ;; fibers don't also try inserting the same derivations
(for-each (with-time-logging
(lambda (derivation-id derivation) "insert-missing-derivations: updating hash table"
(let ((sources (derivation-sources derivation))) (for-each (lambda (derivation derivation-id)
(unless (null? sources) (hash-set! derivation-ids-hash-table
(let ((sources-ids (derivation-file-name derivation)
(with-resource-from-pool postgresql-connection-pool conn derivation-id))
(insert-derivation-sources conn derivations
derivation-id derivation-ids))
sources))))
(par-map&
(lambda (id source-file)
(match
(with-resource-from-pool postgresql-connection-pool conn
(exec-query
conn
"
SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(list (number->string id))))
(()
(let ((nar-bytevector
(call-with-worker-thread
utility-thread-channel
(lambda ()
(call-with-values
(lambda ()
(open-bytevector-output-port))
(lambda (port get-bytevector)
(unless (file-exists? source-file)
(raise-exception
(make-missing-store-item-error
source-file)))
(write-file source-file port)
(get-bytevector)))))))
(letpar&
((compressed-nar-bytevector
(call-with-worker-thread
utility-thread-channel
(lambda ()
(call-with-values
(lambda ()
(open-bytevector-output-port))
(lambda (port get-bytevector)
(call-with-lzip-output-port port
(lambda (port)
(put-bytevector port nar-bytevector))
#:level 9)
(get-bytevector))))))
(hash
(call-with-worker-thread
utility-thread-channel
(lambda ()
(bytevector->nix-base32-string
(sha256 nar-bytevector)))))
(uncompressed-size (bytevector-length nar-bytevector)))
(values derivations
derivation-ids)))))))
(unless (null? derivations)
(parallel-via-fibers
(with-time-logging
"insert-missing-derivations: inserting sources"
(fibers-for-each
(lambda (derivation-id derivation)
(let ((sources (derivation-sources derivation)))
(unless (null? sources)
(let ((sources-ids
(with-resource-from-pool postgresql-connection-pool conn
(insert-derivation-sources conn
derivation-id
sources))))
(par-map&
(lambda (id source-file)
(when
(with-resource-from-pool postgresql-connection-pool conn (with-resource-from-pool postgresql-connection-pool conn
(insert-derivation-source-file-nar (match
conn (exec-query
id conn
hash "
compressed-nar-bytevector SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
uncompressed-size))))) (list (number->string id)))
(_ #f))) (()
sources-ids ;; Insert a placeholder to avoid other fibers
sources))))) ;; working on this source file
derivation-ids (insert-placeholder-derivation-source-file-nar
derivations)) conn
id)
#t)
(_ #f)))
(let ((nar-bytevector
(call-with-worker-thread
utility-thread-channel
(lambda ()
(call-with-values
(lambda ()
(open-bytevector-output-port))
(lambda (port get-bytevector)
(unless (file-exists? source-file)
(raise-exception
(make-missing-store-item-error
source-file)))
(write-file source-file port)
(let ((res (get-bytevector)))
(close-port port) ; maybe reduces memory?
res)))))))
(letpar&
((compressed-nar-bytevector
(call-with-worker-thread
utility-thread-channel
(lambda ()
(call-with-values
(lambda ()
(open-bytevector-output-port))
(lambda (port get-bytevector)
(call-with-lzip-output-port port
(lambda (port)
(put-bytevector port nar-bytevector))
#:level 9)
(let ((res (get-bytevector)))
(close-port port) ; maybe reduces memory?
res))))))
(hash
(call-with-worker-thread
utility-thread-channel
(lambda ()
(bytevector->nix-base32-string
(sha256 nar-bytevector)))))
(uncompressed-size (bytevector-length nar-bytevector)))
(with-resource-from-pool postgresql-connection-pool conn
(update-derivation-source-file-nar
conn
id
hash
compressed-nar-bytevector
uncompressed-size))))))
sources-ids
sources)))))
derivation-ids
derivations))
(with-resource-from-pool postgresql-connection-pool conn (with-resource-from-pool postgresql-connection-pool conn
(with-time-logging (with-time-logging
"insert-missing-derivations: inserting outputs" "insert-missing-derivations: inserting outputs"
(for-each (lambda (derivation-id derivation) (for-each (lambda (derivation-id derivation)
(insert-derivation-outputs conn (insert-derivation-outputs conn
derivation-id derivation-id
(derivation-outputs derivation))) (derivation-outputs derivation)))
derivation-ids derivation-ids
derivations))) derivations)))
(with-time-logging (with-time-logging
"insert-missing-derivations: ensure-input-derivations-exist" "insert-missing-derivations: ensure-input-derivations-exist"
(ensure-input-derivations-exist (deduplicate-strings (ensure-input-derivations-exist (deduplicate-strings
(map derivation-input-path (map derivation-input-path
(append-map derivation-inputs (append-map derivation-inputs
derivations))))) derivations))))))
(with-resource-from-pool postgresql-connection-pool conn (with-resource-from-pool postgresql-connection-pool conn
(with-time-logging (with-time-logging
(simple-format (simple-format
#f "insert-missing-derivations: inserting inputs for ~A derivations" #f "insert-missing-derivations: inserting inputs for ~A derivations"
(length derivations)) (length derivations))
(insert-derivation-inputs conn (insert-derivation-inputs conn
derivation-ids derivation-ids
derivations)))))) derivations)))))))
(define (derivation-file-names->derivation-ids postgresql-connection-pool (define (derivation-file-names->derivation-ids postgresql-connection-pool
utility-thread-channel utility-thread-channel
@ -1160,11 +1178,6 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
#t "debug: derivation-file-names->derivation-ids: processing ~A derivations\n" #t "debug: derivation-file-names->derivation-ids: processing ~A derivations\n"
derivations-count) derivations-count)
(with-resource-from-pool postgresql-connection-pool conn
(update-derivation-ids-hash-table! conn
derivation-ids-hash-table
derivation-file-names))
(let* ((missing-derivation-filenames (let* ((missing-derivation-filenames
(deduplicate-strings (deduplicate-strings
(vector-fold (vector-fold
@ -1192,36 +1205,52 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(chunk! missing-derivation-filenames 1000)))) (chunk! missing-derivation-filenames 1000))))
(for-each (for-each
(lambda (missing-derivation-filenames-chunk) (lambda (missing-derivation-chunk-promise)
(let ((missing-derivations-chunk (let ((missing-derivations-chunk
;; Do the filter again, since processing the last chunk (fibers-force
;; might have inserted some of the derivations in this missing-derivation-chunk-promise)))
;; chunk
(remove! (lambda (derivation)
(hash-ref derivation-ids-hash-table
(derivation-file-name
derivation)))
(fibers-force
missing-derivation-filenames-chunk))))
(unless (null? missing-derivations-chunk) (unless (null? missing-derivations-chunk)
(insert-missing-derivations postgresql-connection-pool (insert-missing-derivations postgresql-connection-pool
utility-thread-channel utility-thread-channel
derivation-ids-hash-table derivation-ids-hash-table
missing-derivations-chunk)))) missing-derivations-chunk))))
missing-derivations-chunked-promises)) missing-derivations-chunked-promises)
(let ((all-ids (let ((all-ids
(vector-map (vector-map
(lambda (_ derivation-file-name) (lambda (_ derivation-file-name)
(if derivation-file-name (if derivation-file-name
(or (hash-ref derivation-ids-hash-table (or (hash-ref derivation-ids-hash-table
derivation-file-name) derivation-file-name)
(error "missing derivation id")) ;; If a derivation ID can't be found, update the
#f)) ;; hash table then check again
derivation-file-names))) (with-resource-from-pool postgresql-connection-pool conn
(for-each
(lambda (missing-derivations-chunked-promise)
(update-derivation-ids-hash-table!
conn
derivation-ids-hash-table
(fibers-force missing-derivations-chunked-promise)))
missing-derivations-chunked-promises)
(or (hash-ref derivation-ids-hash-table
derivation-file-name)
(error
(simple-format #f "missing derivation id (~A)"
derivation-file-name)))))
#f))
derivation-file-names)))
all-ids)))) (with-resource-from-pool postgresql-connection-pool conn
(simple-format
(current-error-port)
"guix-data-service: clearing the derivation-ids-hash-table\n")
(hash-clear! derivation-ids-hash-table))
;; Just in case this helps clear memory
(for-each fibers-promise-reset
missing-derivations-chunked-promises)
all-ids)))))
(prevent-inlining-for-tests derivation-file-names->derivation-ids) (prevent-inlining-for-tests derivation-file-names->derivation-ids)