Improve the job logging

Switch to using a sequence for the ids in the log parts table, and spawn a
thread to listen for output from the inferior processes, and enter it in to
the database.
This commit is contained in:
Christopher Baines 2020-02-15 17:42:07 +00:00
parent 40f6de27f6
commit 36254f98e3

View file

@ -18,6 +18,7 @@
(define-module (guix-data-service jobs load-new-guix-revision)
#:use-module (srfi srfi-1)
#:use-module (ice-9 match)
#:use-module (ice-9 textual-ports)
#:use-module (ice-9 hash-table)
#:use-module (rnrs exceptions)
#:use-module (json)
@ -65,11 +66,17 @@
enqueue-load-new-guix-revision-job
most-recent-n-load-new-guix-revision-jobs))
(define* (log-port job-id conn #:key delete-existing-log-parts?)
(define output-port
(current-output-port))
(define (log-part-sequence-name job-id)
(simple-format #f "load_new_guix_revision_job_log_parts_id_seq_~A" job-id))
(define* (log-port job-id conn
#:key
delete-existing-log-parts?
real-output-port)
(define output-port
(or real-output-port
(current-output-port)))
(define id 0)
(define buffer "")
(define (insert job_id s)
@ -77,18 +84,21 @@
conn
(string-append
"INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents) "
"VALUES ($1, $2, $3)")
(list (number->string id) job_id s)))
"VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)")
(list job_id s)))
(define (log-string s)
(if (string-contains s "\n")
(let ((output (string-append buffer s)))
(set! id (+ 1 id)) ; increment id
(set! buffer "") ; clear the buffer
(insert job-id output)
(display output output-port))
(set! buffer (string-append buffer s))))
(exec-query
conn
(simple-format #f "CREATE SEQUENCE IF NOT EXISTS ~A"
(log-part-sequence-name job-id)))
(when delete-existing-log-parts?
;; TODO, this is useful when re-running jobs, but I'm not sure that should
;; be a thing, jobs should probably be only attempted once.
@ -113,6 +123,34 @@
(setvbuf port 'line)
port))
(define (setup-port-for-inferior-error-output job-id real-output-port)
(define (insert conn job_id s)
(exec-query
conn
(string-append "
INSERT INTO load_new_guix_revision_job_log_parts (id, job_id, contents)
VALUES (nextval('" (log-part-sequence-name job_id) "'), $1, $2)")
(list job_id s)))
(match (pipe)
((port-to-read-from . port-to-write-to)
(setvbuf port-to-read-from 'line)
(setvbuf port-to-write-to 'line)
(call-with-new-thread
(lambda ()
(with-postgresql-connection
(simple-format #f "~A inferior error logging" job-id)
(lambda (logging-conn)
(let loop ((line (get-line port-to-read-from)))
(let ((line-with-newline
(string-append line "\n")))
(insert logging-conn job-id line-with-newline)
(display line-with-newline real-output-port))
(loop (get-line port-to-read-from)))))))3
port-to-write-to)))
(define real-error-port
(make-parameter (current-error-port)))
@ -206,6 +244,13 @@ WHERE job_id = $1"
"DELETE FROM load_new_guix_revision_job_log_parts WHERE job_id = $1"
(list job-id)))))
(define (drop-log-parts-sequence conn job-id)
(exec-query
conn
(string-append
"DROP SEQUENCE "
(log-part-sequence-name job-id))))
(define inferior-package-id
(@@ (guix inferior) inferior-package-id))
@ -1554,7 +1599,8 @@ SKIP LOCKED")
(let ((result
(parameterize ((current-build-output-port logging-port)
(real-error-port previous-error-port)
(inferior-error-port previous-error-port))
(inferior-error-port
(setup-port-for-inferior-error-output id previous-error-port)))
(catch #t
(lambda ()
(with-store-connection
@ -1570,6 +1616,7 @@ SKIP LOCKED")
key args)
#f)))))
(combine-log-parts! logging-conn id)
(drop-log-parts-sequence logging-conn id)
;; This can happen with GC, so do it explicitly
(close-port logging-port)