Copy the port monitoring fiber from the build coordinator
As the data service has the same issue where it stops listening on the port.
This commit is contained in:
parent
76712e2b00
commit
26a751570c
2 changed files with 168 additions and 1 deletions
|
|
@ -20,13 +20,19 @@
|
||||||
#:use-module (srfi srfi-11)
|
#:use-module (srfi srfi-11)
|
||||||
#:use-module (ice-9 ftw)
|
#:use-module (ice-9 ftw)
|
||||||
#:use-module (ice-9 match)
|
#:use-module (ice-9 match)
|
||||||
|
#:use-module (ice-9 atomic)
|
||||||
#:use-module (ice-9 format)
|
#:use-module (ice-9 format)
|
||||||
#:use-module (ice-9 threads)
|
#:use-module (ice-9 threads)
|
||||||
|
#:use-module (ice-9 exceptions)
|
||||||
|
#:use-module (ice-9 ports internal)
|
||||||
|
#:use-module (ice-9 suspendable-ports)
|
||||||
|
#:use-module (lzlib)
|
||||||
#:use-module (fibers)
|
#:use-module (fibers)
|
||||||
#:use-module (fibers channels)
|
#:use-module (fibers channels)
|
||||||
#:use-module (fibers operations)
|
#:use-module (fibers operations)
|
||||||
#:use-module (fibers timers)
|
#:use-module (fibers timers)
|
||||||
#:use-module (fibers conditions)
|
#:use-module (fibers conditions)
|
||||||
|
#:use-module (fibers scheduler)
|
||||||
#:use-module (prometheus)
|
#:use-module (prometheus)
|
||||||
#:export (call-with-time-logging
|
#:export (call-with-time-logging
|
||||||
with-time-logging
|
with-time-logging
|
||||||
|
|
@ -53,7 +59,9 @@
|
||||||
get-port-metrics-updater
|
get-port-metrics-updater
|
||||||
|
|
||||||
call-with-sigint
|
call-with-sigint
|
||||||
run-server/patched))
|
run-server/patched
|
||||||
|
|
||||||
|
spawn-port-monitoring-fiber))
|
||||||
|
|
||||||
(define (call-with-time-logging action thunk)
|
(define (call-with-time-logging action thunk)
|
||||||
(simple-format #t "debug: Starting ~A\n" action)
|
(simple-format #t "debug: Starting ~A\n" action)
|
||||||
|
|
@ -653,6 +661,160 @@ available. Return the resource once PROC has returned."
|
||||||
(sigaction SIGPIPE SIG_IGN)
|
(sigaction SIGPIPE SIG_IGN)
|
||||||
(spawn-fiber (lambda () (socket-loop socket handler))))))
|
(spawn-fiber (lambda () (socket-loop socket handler))))))
|
||||||
|
|
||||||
|
(define &port-timeout
|
||||||
|
(make-exception-type '&port-timeout
|
||||||
|
&external-error
|
||||||
|
'(port)))
|
||||||
|
|
||||||
|
(define make-port-timeout-error
|
||||||
|
(record-constructor &port-timeout))
|
||||||
|
|
||||||
|
(define port-timeout-error?
|
||||||
|
(record-predicate &port-timeout))
|
||||||
|
|
||||||
|
(define &port-read-timeout
|
||||||
|
(make-exception-type '&port-read-timeout
|
||||||
|
&port-timeout
|
||||||
|
'()))
|
||||||
|
|
||||||
|
(define make-port-read-timeout-error
|
||||||
|
(record-constructor &port-read-timeout))
|
||||||
|
|
||||||
|
(define port-read-timeout-error?
|
||||||
|
(record-predicate &port-read-timeout))
|
||||||
|
|
||||||
|
(define &port-write-timeout
|
||||||
|
(make-exception-type '&port-write-timeout
|
||||||
|
&port-timeout
|
||||||
|
'()))
|
||||||
|
|
||||||
|
(define make-port-write-timeout-error
|
||||||
|
(record-constructor &port-write-timeout))
|
||||||
|
|
||||||
|
(define port-write-timeout-error?
|
||||||
|
(record-predicate &port-write-timeout))
|
||||||
|
|
||||||
|
;; These procedure are subject to spurious wakeups.
|
||||||
|
|
||||||
|
(define (readable? port)
|
||||||
|
"Test if PORT is writable."
|
||||||
|
(match (select (vector port) #() #() 0)
|
||||||
|
((#() #() #()) #f)
|
||||||
|
((#(_) #() #()) #t)))
|
||||||
|
|
||||||
|
(define (writable? port)
|
||||||
|
"Test if PORT is writable."
|
||||||
|
(match (select #() (vector port) #() 0)
|
||||||
|
((#() #() #()) #f)
|
||||||
|
((#() #(_) #()) #t)))
|
||||||
|
|
||||||
|
(define (make-wait-operation ready? schedule-when-ready port
|
||||||
|
port-ready-fd this-procedure)
|
||||||
|
(make-base-operation
|
||||||
|
#f
|
||||||
|
(lambda _
|
||||||
|
(and (ready? (port-ready-fd port)) values))
|
||||||
|
(lambda (flag sched resume)
|
||||||
|
(define (commit)
|
||||||
|
(match (atomic-box-compare-and-swap! flag 'W 'S)
|
||||||
|
('W (resume values))
|
||||||
|
('C (commit))
|
||||||
|
('S #f)))
|
||||||
|
(schedule-when-ready
|
||||||
|
sched (port-ready-fd port) commit))))
|
||||||
|
|
||||||
|
(define (wait-until-port-readable-operation port)
|
||||||
|
"Make an operation that will succeed when PORT is readable."
|
||||||
|
(unless (input-port? port)
|
||||||
|
(error "refusing to wait forever for input on non-input port"))
|
||||||
|
(make-wait-operation readable? schedule-task-when-fd-readable port
|
||||||
|
port-read-wait-fd
|
||||||
|
wait-until-port-readable-operation))
|
||||||
|
|
||||||
|
(define (wait-until-port-writable-operation port)
|
||||||
|
"Make an operation that will succeed when PORT is writable."
|
||||||
|
(unless (output-port? port)
|
||||||
|
(error "refusing to wait forever for output on non-output port"))
|
||||||
|
(make-wait-operation writable? schedule-task-when-fd-writable port
|
||||||
|
port-write-wait-fd
|
||||||
|
wait-until-port-writable-operation))
|
||||||
|
|
||||||
|
(define* (with-fibers-port-timeouts thunk
|
||||||
|
#:key timeout
|
||||||
|
(read-timeout timeout)
|
||||||
|
(write-timeout timeout))
|
||||||
|
(define (no-fibers-wait port mode timeout)
|
||||||
|
(define poll-timeout-ms 200)
|
||||||
|
|
||||||
|
;; When the GC runs, it restarts the poll syscall, but the timeout
|
||||||
|
;; remains unchanged! When the timeout is longer than the time
|
||||||
|
;; between the syscall restarting, I think this renders the
|
||||||
|
;; timeout useless. Therefore, this code uses a short timeout, and
|
||||||
|
;; repeatedly calls poll while watching the clock to see if it has
|
||||||
|
;; timed out overall.
|
||||||
|
(let ((timeout-internal
|
||||||
|
(+ (get-internal-real-time)
|
||||||
|
(* internal-time-units-per-second
|
||||||
|
(/ timeout 1000)))))
|
||||||
|
(let loop ((poll-value
|
||||||
|
(port-poll port mode poll-timeout-ms)))
|
||||||
|
(if (= poll-value 0)
|
||||||
|
(if (> (get-internal-real-time)
|
||||||
|
timeout-internal)
|
||||||
|
(raise-exception
|
||||||
|
(if (string=? mode "r")
|
||||||
|
(make-port-read-timeout-error port)
|
||||||
|
(make-port-write-timeout-error port)))
|
||||||
|
(loop (port-poll port mode poll-timeout-ms)))
|
||||||
|
poll-value))))
|
||||||
|
|
||||||
|
(parameterize
|
||||||
|
((current-read-waiter
|
||||||
|
(lambda (port)
|
||||||
|
(if (current-scheduler)
|
||||||
|
(perform-operation
|
||||||
|
(choice-operation
|
||||||
|
(wait-until-port-readable-operation port)
|
||||||
|
(wrap-operation
|
||||||
|
(sleep-operation read-timeout)
|
||||||
|
(lambda ()
|
||||||
|
(raise-exception
|
||||||
|
(make-port-read-timeout-error thunk port))))))
|
||||||
|
(no-fibers-wait port "r" read-timeout))))
|
||||||
|
(current-write-waiter
|
||||||
|
(lambda (port)
|
||||||
|
(if (current-scheduler)
|
||||||
|
(perform-operation
|
||||||
|
(choice-operation
|
||||||
|
(wait-until-port-writable-operation port)
|
||||||
|
(wrap-operation
|
||||||
|
(sleep-operation write-timeout)
|
||||||
|
(lambda ()
|
||||||
|
(raise-exception
|
||||||
|
(make-port-write-timeout-error thunk port))))))
|
||||||
|
(no-fibers-wait port "w" write-timeout)))))
|
||||||
|
(thunk)))
|
||||||
|
|
||||||
|
(define (spawn-port-monitoring-fiber port error-condition)
|
||||||
|
(spawn-fiber
|
||||||
|
(lambda ()
|
||||||
|
(while #t
|
||||||
|
(with-exception-handler
|
||||||
|
(lambda (exn)
|
||||||
|
(simple-format (current-error-port)
|
||||||
|
"port monitoring fiber failed to connect to ~A: ~A\n"
|
||||||
|
port exn)
|
||||||
|
(signal-condition! error-condition))
|
||||||
|
(lambda ()
|
||||||
|
(with-fibers-port-timeouts
|
||||||
|
(lambda ()
|
||||||
|
(let ((sock (socket PF_INET SOCK_STREAM 0)))
|
||||||
|
(connect sock AF_INET INADDR_LOOPBACK port)
|
||||||
|
(close-port sock)))
|
||||||
|
#:timeout 20))
|
||||||
|
#:unwind? #t)
|
||||||
|
(sleep 20)))))
|
||||||
|
|
||||||
;; Copied from (fibers web server)
|
;; Copied from (fibers web server)
|
||||||
(define (call-with-sigint thunk cvar)
|
(define (call-with-sigint thunk cvar)
|
||||||
(let ((handler #f))
|
(let ((handler #f))
|
||||||
|
|
|
||||||
|
|
@ -144,6 +144,11 @@ port. Also, the port used can be changed by passing the --port option.\n"
|
||||||
#:host host
|
#:host host
|
||||||
#:port port))
|
#:port port))
|
||||||
#:unwind? #t)))
|
#:unwind? #t)))
|
||||||
|
|
||||||
|
;; Guile sometimes just seems to stop listening on the port, so try
|
||||||
|
;; and detect this and quit
|
||||||
|
(spawn-port-monitoring-fiber port finished?)
|
||||||
|
|
||||||
(wait finished?))
|
(wait finished?))
|
||||||
#:parallelism 4))
|
#:parallelism 4))
|
||||||
finished?)))
|
finished?)))
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue