Rewrite deleting unreferenced derivations

Use fibers more, leaning in on the non-blocking use of Squee for parallelism.
This commit is contained in:
Christopher Baines 2023-07-21 21:03:32 +01:00
parent 348fe36b55
commit bbc53deb1f

View file

@ -22,6 +22,7 @@
#:use-module (ice-9 threads) #:use-module (ice-9 threads)
#:use-module (squee) #:use-module (squee)
#:use-module (fibers) #:use-module (fibers)
#:use-module (fibers channels)
#:use-module (guix-data-service utils) #:use-module (guix-data-service utils)
#:use-module (guix-data-service database) #:use-module (guix-data-service database)
#:use-module (guix-data-service model git-branch) #:use-module (guix-data-service model git-branch)
@ -538,7 +539,10 @@ DELETE FROM derivations WHERE id = $1"
1))) 1)))
(define (delete-batch conn connection-pool) (define deleted-count 0)
(define channel (make-channel))
(define (delete-batch conn)
(let* ((derivations (let* ((derivations
(with-time-logging "fetching batch of derivations" (with-time-logging "fetching batch of derivations"
(map car (map car
@ -566,22 +570,38 @@ WHERE NOT EXISTS (
) LIMIT $1" ) LIMIT $1"
(list (number->string batch-size)))))) (list (number->string batch-size))))))
(derivations-count (length derivations))) (derivations-count (length derivations)))
(let ((deleted-count 0))
(with-time-logging (with-time-logging
(simple-format #f (simple-format #f "Looking at ~A derivations" derivations-count)
"Looking at ~A derivations"
derivations-count) (set! deleted-count 0)
(n-par-for-each (for-each
8
(lambda (derivation-id) (lambda (derivation-id)
(put-message channel derivation-id))
derivations))
(simple-format (current-error-port)
"Deleted ~A derivations\n"
deleted-count)
deleted-count))
(run-fibers
(lambda ()
;; First spawn some fibers to delete the derivations
(for-each
(lambda _
(spawn-fiber
(lambda ()
(with-postgresql-connection
"data-deletion"
(lambda (conn)
(let loop ((derivation-id (get-message channel)))
(unless (string->number derivation-id) (unless (string->number derivation-id)
(error (error
(simple-format #f "derivation-id: ~A is not a number" (simple-format #f "derivation-id: ~A is not a number"
derivation-id))) derivation-id)))
(let ((val (let ((val
(call-with-resource-from-pool connection-pool
(lambda (conn)
(catch 'psql-query-error (catch 'psql-query-error
(lambda () (lambda ()
(with-postgresql-transaction (with-postgresql-transaction
@ -602,24 +622,14 @@ SET CONSTRAINTS derivations_by_output_details_set_derivation_id_fkey DEFERRED")
"error when attempting to delete derivation: ~A ~A\n" "error when attempting to delete derivation: ~A ~A\n"
key args) key args)
0)))))) 0))))
(monitor
;; This is safe as all fibers are in the same
;; thread and cooperative.
(set! deleted-count (set! deleted-count
(+ val deleted-count))))) (+ val deleted-count)))
derivations)) (loop (get-message channel))))))))
(iota 12))
(simple-format (current-error-port)
"Deleted ~A derivations\n"
deleted-count)
deleted-count)))
(run-fibers
(lambda ()
(let* ((connection-pool
(make-resource-pool
(lambda ()
(open-postgresql-connection "data-deletion" #f))
8)))
(with-postgresql-connection (with-postgresql-connection
"data-deletion" "data-deletion"
@ -629,7 +639,7 @@ SET CONSTRAINTS derivations_by_output_details_set_derivation_id_fkey DEFERRED")
'delete-unreferenced-derivations) 'delete-unreferenced-derivations)
(let loop ((total-deleted 0)) (let loop ((total-deleted 0))
(let ((batch-deleted-count (delete-batch conn connection-pool))) (let ((batch-deleted-count (delete-batch conn)))
(if (eq? 0 batch-deleted-count) (if (eq? 0 batch-deleted-count)
(begin (begin
(with-time-logging (with-time-logging
@ -639,4 +649,6 @@ SET CONSTRAINTS derivations_by_output_details_set_derivation_id_fkey DEFERRED")
(current-output-port) (current-output-port)
"Finished deleting derivations, deleted ~A in total\n" "Finished deleting derivations, deleted ~A in total\n"
total-deleted)) total-deleted))
(loop (+ total-deleted batch-deleted-count))))))))))) (loop (+ total-deleted batch-deleted-count))))))))
#:hz 0
#:parallelism 1))