Finish chasing the call-with-resource-pool bug

This took a while to find because process-job would just get stuck. The hang
wasn't directly related to any particular change; using more fibers simply
increased the chance of hitting it.

This commit includes lots of the things I changed while debugging.
This commit is contained in:
Christopher Baines 2024-10-31 16:56:30 +00:00
parent af93bdcf5e
commit e67edf54bc

View file

@ -35,7 +35,9 @@
#:use-module (squee)
#:use-module (gcrypt hash)
#:use-module (fibers)
#:use-module (fibers timers)
#:use-module (fibers channels)
#:use-module (fibers operations)
#:use-module (guix monads)
#:use-module (guix base32)
#:use-module (guix store)
@ -117,7 +119,9 @@
(missing-store-item-error-item exn)
thunk)
(when on-exception (on-exception))
(retry-on-missing-store-item thunk))
(retry-on-missing-store-item
thunk
#:on-exception on-exception))
(raise-exception exn)))
thunk
#:unwind? #t))
@ -929,21 +933,19 @@
(define (update-derivation-ids-hash-table! conn
derivation-ids-hash-table
derivations)
(define derivations-count (length derivations))
derivation-file-names)
(define derivations-count (vector-length derivation-file-names))
(simple-format #t "debug: update-derivation-ids-hash-table!: ~A file-names\n"
derivations-count)
(let ((missing-file-names
(fold
(lambda (drv result)
(vector-fold
(lambda (_ result file-name)
(if (hash-ref derivation-ids-hash-table
(derivation-file-name drv))
file-name)
result
(cons (derivation-file-name drv)
(cons file-name
result)))
'()
derivations)))
derivation-file-names)))
(simple-format
#t "debug: update-derivation-ids-hash-table!: lookup ~A file-names, ~A not cached\n"
@ -961,29 +963,11 @@
(exec-query conn (select-existing-derivations chunk))))
(chunk! missing-file-names 1000)))))
(define (insert-missing-derivations postgresql-connection-pool
(define* (insert-missing-derivations postgresql-connection-pool
utility-thread-channel
derivation-ids-hash-table
unfiltered-derivations)
(define (ensure-input-derivations-exist input-derivation-file-names)
(unless (null? input-derivation-file-names)
;; Ensure all the input derivations exist
(for-each
(lambda (chunk)
(simple-format
#t "debug: ensure-input-derivations-exist: processing ~A derivations\n"
(length chunk))
(insert-missing-derivations
postgresql-connection-pool
utility-thread-channel
derivation-ids-hash-table
(call-with-worker-thread
utility-thread-channel
(lambda ()
(map read-derivation-from-file chunk)))))
(chunk! input-derivation-file-names 1000))))
unfiltered-derivations
#:key (log-tag "unspecified"))
(define (insert-into-derivations conn drvs)
(string-append
@ -1011,29 +995,41 @@
" RETURNING id"
";"))
(with-time-logging
(simple-format
#f "insert-missing-derivations: inserting ~A derivations"
(length unfiltered-derivations))
(let ((derivations
derivation-ids
(define (insert-derivations)
(with-resource-from-pool postgresql-connection-pool conn
(update-derivation-ids-hash-table! conn
(update-derivation-ids-hash-table!
conn
derivation-ids-hash-table
(let ((file-names-vector
(make-vector (length unfiltered-derivations))))
(for-each
(lambda (i drv)
(vector-set! file-names-vector
i
(derivation-file-name drv)))
(iota (vector-length file-names-vector))
unfiltered-derivations)
file-names-vector))
(let ((derivations
;; Do this while holding the PostgreSQL connection to
;; avoid conflicts with other fibers
(delete-duplicates
(filter-map (lambda (derivation)
(if (hash-ref derivation-ids-hash-table
(derivation-file-name
derivation))
#f
derivation))
unfiltered-derivations)))
unfiltered-derivations))))
(if (null? derivations)
(values '() '())
(begin
(simple-format
(current-error-port)
"insert-missing-derivations: inserting ~A derivations (~A)\n"
(length unfiltered-derivations)
log-tag)
(let ((derivation-ids
(append-map!
(lambda (chunk)
@ -1045,7 +1041,7 @@
;; Do this while holding the connection so that other
;; fibers don't also try inserting the same derivations
(with-time-logging
"insert-missing-derivations: updating hash table"
(string-append "insert-missing-derivations: updating hash table (" log-tag ")")
(for-each (lambda (derivation derivation-id)
(hash-set! derivation-ids-hash-table
(derivation-file-name derivation)
@ -1053,13 +1049,18 @@
derivations
derivation-ids))
(simple-format
(current-error-port)
"insert-missing-derivations: finished inserting ~A derivations (~A)\n"
(length unfiltered-derivations)
log-tag)
(values derivations
derivation-ids)))))))
(unless (null? derivations)
(parallel-via-fibers
(define (insert-sources derivations derivation-ids)
(with-time-logging
"insert-missing-derivations: inserting sources"
(string-append "insert-missing-derivations: inserting sources (" log-tag ")")
(fibers-for-each
(lambda (derivation-id derivation)
(let ((sources (derivation-sources derivation)))
@ -1069,7 +1070,7 @@
(insert-derivation-sources conn
derivation-id
sources))))
(par-map&
(fibers-for-each
(lambda (id source-file)
(when
(with-resource-from-pool postgresql-connection-pool conn
@ -1087,10 +1088,12 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
id)
#t)
(_ #f)))
(let ((nar-bytevector
;; Use the utility-thread-channel to control concurrency here,
;; to avoid using too much memory
(call-with-worker-thread
utility-thread-channel
(lambda ()
(let ((nar-bytevector
(call-with-values
(lambda ()
(open-bytevector-output-port))
@ -1102,12 +1105,8 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(write-file source-file port)
(let ((res (get-bytevector)))
(close-port port) ; maybe reduces memory?
res)))))))
(letpar&
((compressed-nar-bytevector
(call-with-worker-thread
utility-thread-channel
(lambda ()
res)))))
(let ((compressed-nar-bytevector
(call-with-values
(lambda ()
(open-bytevector-output-port))
@ -1118,29 +1117,36 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
#:level 9)
(let ((res (get-bytevector)))
(close-port port) ; maybe reduces memory?
res))))))
res))))
(hash
(call-with-worker-thread
utility-thread-channel
(lambda ()
(bytevector->nix-base32-string
(sha256 nar-bytevector)))))
(uncompressed-size (bytevector-length nar-bytevector)))
(sha256 nar-bytevector)))
(uncompressed-size
(bytevector-length nar-bytevector)))
(with-resource-from-pool postgresql-connection-pool conn
(update-derivation-source-file-nar
conn
id
hash
compressed-nar-bytevector
uncompressed-size))))))
uncompressed-size))))))))
sources-ids
sources)))))
derivation-ids
derivations))
derivations)))
(with-resource-from-pool postgresql-connection-pool conn
(let ((derivations
derivation-ids
(insert-derivations)))
(unless (null? derivations)
(parallel-via-fibers
(insert-sources derivations
derivation-ids)
(with-time-logging
"insert-missing-derivations: inserting outputs"
(string-append "insert-missing-derivations: inserting outputs ("
log-tag ")")
(with-resource-from-pool postgresql-connection-pool conn
(for-each (lambda (derivation-id derivation)
(insert-derivation-outputs conn
derivation-id
@ -1149,25 +1155,43 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
derivations)))
(with-time-logging
"insert-missing-derivations: ensure-input-derivations-exist"
(ensure-input-derivations-exist (deduplicate-strings
(map derivation-input-path
(string-append
"insert-missing-derivations: ensure-input-derivations-exist ("
log-tag ")")
(let ((input-derivations
(map
derivation-input-derivation
(append-map derivation-inputs
derivations))))))
derivations))))
(unless (null? input-derivations)
;; Ensure all the input derivations exist
(for-each
(lambda (chunk)
(insert-missing-derivations
postgresql-connection-pool
utility-thread-channel
derivation-ids-hash-table
chunk
#:log-tag log-tag))
(chunk! input-derivations 1000))))))
(string-append "insert-missing-derivations: done parallel (" log-tag ")")
(with-resource-from-pool postgresql-connection-pool conn
(with-time-logging
(simple-format
#f "insert-missing-derivations: inserting inputs for ~A derivations"
(length derivations))
#f "insert-missing-derivations: inserting inputs for ~A derivations (~A)"
(length derivations)
log-tag)
(insert-derivation-inputs conn
derivation-ids
derivations)))))))
derivations))))))
(define (derivation-file-names->derivation-ids postgresql-connection-pool
(define* (derivation-file-names->derivation-ids postgresql-connection-pool
utility-thread-channel
read-derivations/fiberized
derivation-ids-hash-table
derivation-file-names)
derivation-file-names
#:key (log-tag "unspecified"))
(define derivations-count
(vector-length derivation-file-names))
@ -1175,8 +1199,9 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
#()
(begin
(simple-format
#t "debug: derivation-file-names->derivation-ids: processing ~A derivations\n"
derivations-count)
#t "debug: derivation-file-names->derivation-ids: processing ~A derivations (~A)\n"
derivations-count
log-tag)
(let* ((missing-derivation-filenames
(deduplicate-strings
@ -1189,67 +1214,37 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
result
(cons derivation-file-name result))))
'()
derivation-file-names)))
(missing-derivations-chunked-promises
(map
(lambda (chunk)
(fibers-delay
(lambda ()
(map (lambda (filename)
(if (file-exists? filename)
(read-derivation-from-file filename)
(raise-exception
(make-missing-store-item-error
filename))))
chunk))))
(chunk! missing-derivation-filenames 1000))))
derivation-file-names))))
(let ((chunks (chunk! missing-derivation-filenames 1000)))
(for-each
(lambda (missing-derivation-chunk-promise)
(lambda (i missing-derivation-file-names-chunk)
(let ((missing-derivations-chunk
(fibers-force
missing-derivation-chunk-promise)))
(unless (null? missing-derivations-chunk)
(read-derivations/fiberized
missing-derivation-file-names-chunk)))
(simple-format
#t "debug: derivation-file-names->derivation-ids: processing chunk ~A (~A)\n"
i
log-tag)
(insert-missing-derivations postgresql-connection-pool
utility-thread-channel
derivation-ids-hash-table
missing-derivations-chunk))))
missing-derivations-chunked-promises)
missing-derivations-chunk
#:log-tag log-tag)))
(iota (length chunks))
chunks))
(let ((all-ids
(vector-map
(lambda (_ derivation-file-name)
(if derivation-file-name
(or (hash-ref derivation-ids-hash-table
derivation-file-name)
;; If a derivation ID can't be found, update the
;; hash table then check again
(with-resource-from-pool postgresql-connection-pool conn
(for-each
(lambda (missing-derivations-chunked-promise)
(update-derivation-ids-hash-table!
conn
derivation-ids-hash-table
(fibers-force missing-derivations-chunked-promise)))
missing-derivations-chunked-promises)
(or (hash-ref derivation-ids-hash-table
derivation-file-name)
(error
(simple-format #f "missing derivation id (~A)"
derivation-file-name)))))
derivation-file-name)))
#f))
derivation-file-names)))
(with-resource-from-pool postgresql-connection-pool conn
(simple-format
(current-error-port)
"guix-data-service: clearing the derivation-ids-hash-table\n")
(hash-clear! derivation-ids-hash-table))
;; Just in case this helps clear memory
(for-each fibers-promise-reset
missing-derivations-chunked-promises)
all-ids)))))
(prevent-inlining-for-tests derivation-file-names->derivation-ids)
@ -1489,6 +1484,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(cons inferior inferior-store)))
parallelism
#:min-size 0
#:name "inferior"
#:idle-seconds 30
#:destructor (match-lambda
((inferior . store)
@ -1501,7 +1497,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(inferior-eval '(@ (guix packages) %supported-systems)
inferior)))))
(result
(par-map&
(fibers-map
(lambda (system)
(with-resource-from-pool inferior-and-store-pool res
(match res
@ -1725,6 +1721,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
guix-source store-item
guix-derivation
utility-thread-channel
read-derivations/fiberized
derivation-ids-hash-table
#:key skip-system-tests?
extra-inferior-environment-variables
@ -1776,6 +1773,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
parallelism
#:min-size 0
#:idle-seconds 20
#:name "inferior"
#:destructor
(match-lambda
((inferior . store)
@ -1853,6 +1851,8 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
'load-new-guix-revision-inserts))
db-conn)
1
#:name "postgres"
#:assume-reliable-waiters? #t
#:min-size 0))
(define package-ids-promise
@ -1892,7 +1892,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
conn descriptions-by-locale))))
inferior-lint-checkers-data))))
(lint-warnings-data
(par-map&
(fibers-map
(match-lambda
((checker-name _ network-dependent?)
(and (and (not network-dependent?)
@ -1978,8 +1978,10 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(derivation-file-names->derivation-ids
postgresql-connection-pool
utility-thread-channel
read-derivations/fiberized
derivation-ids-hash-table
derivations-vector)))
derivations-vector
#:log-tag (simple-format #f "~A:~A" system target))))
(guix-revision-id
(fibers-force guix-revision-id-promise))
(package-ids (fibers-force package-ids-promise))
@ -2013,29 +2015,42 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(fibers-force guix-revision-id-promise)
(number->string
(system->system-id conn system))
(or target "")))))
(or target ""))))
'finished)
(let ((get-derivations/fiberized
(fiberize get-derivations
;; Limit concurrency here to keep focused on specific
;; systems until they've been fully processed
#:parallelism parallelism)))
(par-map&
(with-time-logging "extract-and-store-package-derivations"
(fibers-map-with-progress
(match-lambda
((system . target)
(retry-on-missing-store-item
(lambda ()
(process-system-and-target system target
get-derivations/fiberized)))))
(list
(call-with-inferior
(lambda (inferior inferior-store)
(inferior-fetch-system-target-pairs inferior))))))
(inferior-fetch-system-target-pairs inferior))))
#:report
(lambda (data)
(for-each
(match-lambda
((result (system . target))
(simple-format #t "~A ~A: ~A\n"
system target result)))
data))))))
(define (extract-and-store-system-tests)
(if skip-system-tests?
(begin
(simple-format #t "debug: skipping system tests\n")
'())
(with-time-logging "extract-and-store-system-tests"
(let ((data-with-derivation-file-names
(call-with-inferior
(lambda (inferior inferior-store)
@ -2057,6 +2072,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(derivation-file-names->derivation-ids
postgresql-connection-pool
utility-thread-channel
read-derivations/fiberized
derivation-ids-hash-table
(list->vector
(map cdr derivation-file-names-by-system)))))
@ -2067,21 +2083,25 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(insert-system-tests-for-guix-revision
conn
(fibers-force guix-revision-id-promise)
data-with-derivation-ids)))))))
data-with-derivation-ids))))))))
(with-time-logging
(simple-format #f "extract-information-from: ~A\n" store-item)
(parallel-via-fibers
(begin
(fibers-force package-ids-promise)
#f)
(extract-and-store-package-derivations)
(retry-on-missing-store-item extract-and-store-system-tests)
(extract-and-store-lint-checkers-and-warnings)))
(with-time-logging "extract-and-store-lint-checkers-and-warnings"
(extract-and-store-lint-checkers-and-warnings))))
#t)
(prevent-inlining-for-tests extract-information-from)
(define (load-channel-instances utility-thread-channel
read-derivations/fiberized
derivation-ids-hash-table
git-repository-id commit
channel-derivations-by-system)
@ -2113,6 +2133,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(make-resource-pool
(const channel-instances-conn)
1
#:name "postgres"
#:min-size 0)))
(unless existing-guix-revision-id
@ -2130,6 +2151,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
(derivation-file-names->derivation-ids
postgresql-connection-pool
utility-thread-channel
read-derivations/fiberized
derivation-ids-hash-table
(list->vector (map cdr derivations-by-system)))))
@ -2150,15 +2172,34 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
#:key skip-system-tests? parallelism
extra-inferior-environment-variables)
(define utility-thread-channel
;; There might be high demand for this, so order the requests
(make-queueing-channel
(call-with-default-io-waiters
(lambda ()
(make-worker-thread-channel
(const '())
#:parallelism parallelism))
#:parallelism parallelism)))))
;; Read the derivation for each entry of FILENAMES.  The reading is done
;; inside a thunk passed to call-with-worker-thread on
;; utility-thread-channel, not directly in the caller.
;; NOTE(review): presumably the list built by `map' is what
;; call-with-worker-thread hands back to the caller -- confirm.
(define (read-derivations filenames)
(call-with-worker-thread
utility-thread-channel
(lambda ()
(map (lambda (filename)
;; Check the file exists before reading it, so that a missing file
;; raises the condition built by make-missing-store-item-error
;; (carrying FILENAME) instead of being passed to
;; read-derivation-from-file.
(if (file-exists? filename)
(read-derivation-from-file filename)
(raise-exception
(make-missing-store-item-error
filename))))
filenames))))
;; read-derivations wrapped with fiberize, using a parallelism of 1.
(define read-derivations/fiberized
(fiberize read-derivations
;; Don't do this in parallel as there's caching involved with
;; read-derivation-from-file
#:parallelism 1))
(define derivation-ids-hash-table
(make-hash-table))
(%worker-thread-default-timeout #f)
(let* ((git-repository-fields
(select-git-repository conn git-repository-id))
(git-repository-url
@ -2188,6 +2229,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
channel-derivations-by-system
(fibers-force channel-derivations-by-system-promise)))
(load-channel-instances utility-thread-channel
read-derivations/fiberized
derivation-ids-hash-table
git-repository-id commit
channel-derivations-by-system)))
@ -2214,6 +2256,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1"
commit guix-source store-item
guix-derivation
utility-thread-channel
read-derivations/fiberized
derivation-ids-hash-table
#:skip-system-tests?
skip-system-tests?
@ -2633,6 +2676,9 @@ SKIP LOCKED")
(define* (process-load-new-guix-revision-job id #:key skip-system-tests?
extra-inferior-environment-variables
parallelism)
(define finished-channel
(make-channel))
(define result
(with-postgresql-connection
(simple-format #f "load-new-guix-revision ~A" id)
@ -2642,12 +2688,20 @@ SKIP LOCKED")
;; instances have the data updated.
(fix-derivation-output-details-hash-encoding conn)
(%worker-thread-default-timeout #f)
(resource-pool-retry-checkout-timeout 120)
(exec-query conn "BEGIN")
(spawn-fiber
(lambda ()
(while #t
(sleep 30)
(while (perform-operation
(choice-operation
(wrap-operation (get-operation finished-channel)
(const #f))
(wrap-operation (sleep-operation 20)
(const #t))))
(let ((stats (gc-stats)))
(simple-format
@ -2752,4 +2806,5 @@ SKIP LOCKED")
"update-derivation-outputs-statistics"
(update-derivation-outputs-statistics conn))))))
(put-message finished-channel #t)
result)