Reserve some capacity for processing revisions that are the tip of a branch. This should reduce the time between a new revision appearing and that revision being processed.
145 lines
4.2 KiB
Scheme
145 lines
4.2 KiB
Scheme
(define-module (guix-data-service jobs)
|
|
#:use-module (ice-9 match)
|
|
#:use-module (ice-9 format)
|
|
#:use-module (guix-data-service jobs load-new-guix-revision)
|
|
#:export (process-jobs))
|
|
|
|
(define (process-jobs conn)
  "Process unlocked jobs using the database connection CONN.

Jobs are obtained with fetch-unlocked-jobs, each one is run by exec'ing the
guix-data-service-process-job program with the job id as its argument, and
failures reported by process-jobs-concurrently are recorded against the job."

  (define (record-failure job-id)
    (record-job-event conn job-id "failure")
    (simple-format (current-error-port)
                   "recording failure for job ~A\n"
                   job-id)
    ;; Only combine the log when at least one part has been stored.
    (when (positive? (count-log-parts conn job-id))
      (combine-log-parts! conn job-id)))

  (process-jobs-concurrently
   ;; Thunk returning the jobs which could be started.
   (lambda ()
     (fetch-unlocked-jobs conn))
   ;; Replace the calling process with the job processing program; the
   ;; program name is given twice as it is also passed as argv[0].
   (lambda (job-id)
     (execlp "guix-data-service-process-job"
             "guix-data-service-process-job"
             job-id))
   record-failure))
|
|
|
|
;; Default limit on the number of job processes: a quarter of the value
;; returned by current-processor-count, rounded, and never less than one.
(define default-max-processes
  (let ((quarter-of-processors
         (round (/ (current-processor-count) 4))))
    (max 1 quarter-of-processors)))
|
|
|
|
;; Default time limit for a job process, in seconds (24 hours).
(define default-timeout
  (let ((seconds-per-hour (* 60 60)))
    (* 24 seconds-per-hour)))
|
|
|
|
(define* (process-jobs-concurrently fetch-new-jobs
                                    process-job
                                    handle-job-failure
                                    #:key (max-processes
                                           default-max-processes)
                                    (timeout default-timeout))
  "Run jobs in forked child processes, looping until interrupted.

FETCH-NEW-JOBS is a thunk returning a list of (JOB-ID PRIORITY?) entries.
PROCESS-JOB is applied to the job id inside the forked child.
HANDLE-JOB-FAILURE is called in the parent, once per child, with the id of a
job whose process has been running for more than TIMEOUT seconds; that
process is also sent SIGTERM.

Ordinary jobs are only started while fewer than MAX-PROCESSES children are
running; jobs with a true PRIORITY? are started while fewer than twice that
many are running.  The loop repeats every 15 seconds and calls exit when the
sleep returns a non-zero value."

  ;; pid -> (start-time job-args), for each child forked and not yet reaped.
  (define processes
    (make-hash-table))

  ;; pids for which HANDLE-JOB-FAILURE has already been called because of a
  ;; timeout.  A child stays in PROCESSES until waitpid reaps it, which can
  ;; be one or more iterations after it was signalled, so without this the
  ;; same failure would be recorded on every iteration in between.
  (define timed-out-pids
    (make-hash-table))

  (define (running-process-count)
    (hash-count (const #t) processes))

  (define (display-status)
    (display
     (string-append
      "\n\n"
      (let ((running-jobs (running-process-count)))
        (cond
         ((= running-jobs 0)
          "status: 0 running jobs")
         ((= running-jobs 1)
          "status: 1 running job")
         (else
          (simple-format #f "status: ~A running jobs"
                         running-jobs))))
      "\n"
      (string-concatenate
       (hash-map->list
        (match-lambda*
          ((pid (start-time job-args))
           (format #f " pid: ~5d job args: ~a\n"
                   pid job-args)))
        processes))
      "\n")))

  (define (wait-on-processes)
    ;; Reap every child which has terminated, without blocking.  Any error
    ;; from waitpid is caught and printed rather than ending the loop.
    (catch
      #t
      (lambda ()
        (match (waitpid WAIT_ANY WNOHANG)
          ((0 . status)
           ;; There are children, but none of them has finished yet.
           #f)
          ((pid . status)
           (hashv-remove! processes pid)
           (hashv-remove! timed-out-pids pid)

           ;; Only describe the process as failed when it did not exit
           ;; normally with status 0.
           (if (eqv? 0 (status:exit-val status))
               (simple-format (current-error-port)
                              "pid ~A finished successfully\n"
                              pid)
               (simple-format (current-error-port)
                              "pid ~A failed with status ~A\n"
                              pid status))

           ;; Recurse, to check for other finished processes.
           (wait-on-processes))))
      (lambda (key . args)
        (simple-format #t "key ~A args ~A\n"
                       key args))))

  (define (kill-long-running-processes)
    ;; Send SIGTERM to each child which has been running for longer than
    ;; TIMEOUT seconds, and report its job as failed the first time this
    ;; happens.
    (hash-for-each
     (lambda (pid details)
       (match details
         ((start-time job-args)
          (let ((running-for
                 (- (current-time) start-time)))
            (when (> running-for timeout)
              (display
               (simple-format
                #f "sending SIGTERM to pid ~A started at ~A, now running for ~A\n"
                pid start-time running-for)
               (current-error-port))
              (kill pid SIGTERM)

              (unless (hashv-ref timed-out-pids pid)
                (hashv-set! timed-out-pids pid #t)
                (match job-args
                  ((id)
                   (handle-job-failure id)))))))))
     processes))

  (define (fork-and-process-job job-args)
    (match (primitive-fork)
      (0
       ;; In the child.  Should PROCESS-JOB return, or be left through a
       ;; non-local exit, exit with status 127 rather than carrying on
       ;; running a copy of the parent's loop.
       (dynamic-wind
         (const #t)
         (lambda ()
           (apply process-job job-args))
         (lambda ()
           (primitive-exit 127))))
      (pid
       ;; In the parent, remember when the child was started and for which
       ;; job.
       (hashv-set! processes pid
                   (list (current-time) job-args))
       #t)))

  (while #t
    (kill-long-running-processes)
    (wait-on-processes)
    (display-status)
    (match (fetch-new-jobs)
      (()
       ;; Nothing to do
       #f)
      ((jobs ...)
       (for-each
        (match-lambda
          ((job-id priority?)
           (let ((current-processes (running-process-count)))
             (when (< current-processes
                      (if priority?
                          ;; For priority jobs, burst up to twice the number
                          ;; of max processes
                          (* 2 max-processes)
                          max-processes))
               (fork-and-process-job (list job-id))))))
        jobs)))
    ;; sleep returns the number of seconds left when it is cut short; stop
    ;; looping in that case.
    (unless (zero? (sleep 15))
      (exit 0))))
|