Switch to processing jobs in parallel

This should speed up processing new revisions, reduce latency between finding
out about new revisions and processing them, as well as help manage memory
usage, by processing each job in a process that then exits.
This commit is contained in:
Christopher Baines 2019-07-12 19:58:37 +01:00
parent 09d927cb99
commit 83ef624b97
3 changed files with 194 additions and 69 deletions

View file

@ -1,11 +1,103 @@
(define-module (guix-data-service jobs)
#:use-module (ice-9 match)
#:use-module (ice-9 format)
#:use-module (guix-data-service jobs load-new-guix-revision)
#:export (process-jobs))
(define (process-jobs conn)
(define (fetch-new-jobs)
(fetch-unlocked-jobs conn))
(define (process-job job-id)
(execlp "guix-data-service-process-job"
"guix-data-service-process-job"
job-id))
(process-jobs-concurrently fetch-new-jobs
process-job))
(define default-max-processes
(max (round (/ (current-processor-count)
4))
1))
(define* (process-jobs-concurrently fetch-new-jobs
process-job
#:key (max-processes
default-max-processes))
(define processes
(make-hash-table))
(define (display-status)
(display
(string-append
"\n\n"
(let ((running-jobs (hash-count (const #t) processes)))
(cond
((eq? running-jobs 0)
"status: 0 running jobs")
((eq? running-jobs 1)
"status: 1 running job")
(else
(simple-format #f "status: ~A running jobs"
running-jobs))))
"\n"
(string-concatenate
(hash-map->list
(lambda (pid job-args)
(format #f " pid: ~5d job args: ~a\n"
pid job-args))
processes))
"\n")))
(define (wait-on-processes)
(catch
#t
(lambda ()
(match (waitpid WAIT_ANY WNOHANG)
((0 . status)
;; No process to wait for
#f)
((pid . status)
(let ((job-args (hashv-ref processes pid)))
(hashv-remove! processes pid)
(simple-format
(current-error-port)
"pid ~A failed with status ~A\n"
pid status))
(wait-on-processes))))
(lambda (key . args)
(simple-format #t "key ~A args ~A\n"
key args))))
(define (fork-and-process-job job-args)
(match (primitive-fork)
(0
(dynamic-wind
(const #t)
(lambda ()
(apply process-job job-args))
(lambda ()
(primitive-exit 127))))
(pid
(hashv-set! processes pid job-args)
#t)))
(while #t
(match (process-next-load-new-guix-revision-job conn)
(#f (unless (eq? 0 (sleep 5))
(exit 0)))
(_ (simple-format #t "\nFinished processing job\n\n")))))
(wait-on-processes)
(display-status)
(match (fetch-new-jobs)
(()
;; Nothing to do
#f)
((jobs ...)
(for-each
(lambda (job-args)
(let ((current-processes
(hash-count (const #t) processes)))
(when (< current-processes
max-processes)
(fork-and-process-job job-args))))
jobs)))
(unless (eq? 0 (sleep 15))
(exit 0))))

View file

@ -24,7 +24,8 @@
#:use-module (guix-data-service model package-metadata)
#:use-module (guix-data-service model derivation)
#:export (log-for-job
process-next-load-new-guix-revision-job
fetch-unlocked-jobs
process-load-new-guix-revision-job
select-job-for-commit
select-jobs-and-events
enqueue-load-new-guix-revision-job
@ -671,18 +672,20 @@ ORDER BY load_new_guix_revision_jobs.id DESC")
(list (number->string n)))))
result))
(define (select-next-job-to-process conn)
(define (select-job-for-update conn id)
(exec-query
conn
(string-append
"SELECT id, commit, source, git_repository_id "
"FROM load_new_guix_revision_jobs "
"WHERE succeeded_at IS NULL AND NOT EXISTS ("
"WHERE id = $1 AND succeeded_at IS NULL AND NOT EXISTS ("
"SELECT 1 "
"FROM load_new_guix_revision_job_events "
;; Skip jobs that have failed, to avoid trying them over and over again
"WHERE job_id = load_new_guix_revision_jobs.id AND event = 'failure'"
") ORDER BY id DESC LIMIT 1")))
") ORDER BY id DESC "
"FOR NO KEY UPDATE SKIP LOCKED")
(list id)))
(define (record-job-event conn job-id event)
(exec-query
@ -701,20 +704,48 @@ ORDER BY load_new_guix_revision_jobs.id DESC")
"WHERE id = $1 ")
(list id)))
(define (process-next-load-new-guix-revision-job conn)
(match (select-next-job-to-process conn)
(define (fetch-unlocked-jobs conn)
(exec-query
conn
"
SELECT id FROM load_new_guix_revision_jobs
WHERE
succeeded_at IS NULL AND
NOT EXISTS (
SELECT 1
FROM load_new_guix_revision_job_events
-- Skip jobs that have failed, to avoid trying them over and over again
WHERE job_id = load_new_guix_revision_jobs.id AND event = 'failure'
)
ORDER BY id DESC
FOR NO KEY UPDATE SKIP LOCKED"))
(define (process-load-new-guix-revision-job id)
(with-postgresql-connection
(simple-format #f "load-new-guix-revision ~A" id)
(lambda (conn)
(exec-query conn "BEGIN")
(match (select-job-for-update conn id)
(((id commit source git-repository-id))
(let ((previous-output-port (current-output-port))
(previous-error-port (current-error-port)))
(record-job-event conn id "start")
;; With a separate connection, outside of the transaction so the event
;; gets persisted regardless.
(with-postgresql-connection
(simple-format #f "load-new-guix-revision ~A start-event" id)
(lambda (start-event-conn)
(record-job-event start-event-conn id "start")))
(simple-format #t "Processing job ~A (commit: ~A, source: ~A)\n\n"
id commit source)
(exec-query conn "BEGIN")
(if (or (guix-revision-exists? conn git-repository-id commit)
(eq? (log-time
(string-append "loading revision " commit)
(lambda ()
(let ((result
(let* ((previous-output-port (current-output-port))
(previous-error-port (current-error-port))
(result
(with-postgresql-connection
(simple-format #f "load-new-guix-revision ~A logging" id)
(lambda (logging-conn)
@ -739,5 +770,7 @@ ORDER BY load_new_guix_revision_jobs.id DESC")
(begin
(exec-query conn "ROLLBACK")
(record-job-event conn id "failure")
#f))))
(_ #f)))
#f)))
(()
(simple-format #t "job ~A not found to be processed\n"
id))))))

View file

@ -32,13 +32,13 @@
(lambda (conn git-repository-id commit store-path)
#t))
(enqueue-load-new-guix-revision-job
(match (enqueue-load-new-guix-revision-job
conn
(git-repository-url->git-repository-id conn "test-url")
"test-commit"
"test-source")
(process-next-load-new-guix-revision-job conn))))
((id)
(process-load-new-guix-revision-job id))))))
(test-equal "test build store item failure"
#f
@ -48,13 +48,13 @@
(lambda (conn git-repository-id commit)
#f))
(enqueue-load-new-guix-revision-job
(match (enqueue-load-new-guix-revision-job
conn
(git-repository-url->git-repository-id conn "test-url")
"test-commit"
"test-source")
(process-next-load-new-guix-revision-job conn)))
((id)
(process-load-new-guix-revision-job id)))))
(test-equal "test extract information failure"
#f
@ -70,12 +70,12 @@
(lambda (conn git-repository-id commit store-path)
#f))
(enqueue-load-new-guix-revision-job
(match (enqueue-load-new-guix-revision-job
conn
(git-repository-url->git-repository-id conn "test-url")
"test-commit"
"test-source")
(process-next-load-new-guix-revision-job conn))))))
((id)
(process-load-new-guix-revision-job id))))))))
(test-end)