Re-work the fibers scheduling

Use a single thread for receiving and responding to requests, and delegate the
processing of the requests to a separate set of threads.

I'm hoping this will avoid the processing of requests affecting accepting new
connections, or the sending of responses.
This commit is contained in:
Christopher Baines 2024-07-16 21:50:36 +01:00
parent d8e9de4ed6
commit e81c6377bf

View file

@ -20,12 +20,14 @@
#:use-module (srfi srfi-1) #:use-module (srfi srfi-1)
#:use-module (srfi srfi-11) #:use-module (srfi srfi-11)
#:use-module (ice-9 match) #:use-module (ice-9 match)
#:use-module (ice-9 threads)
#:use-module (web http) #:use-module (web http)
#:use-module (web request) #:use-module (web request)
#:use-module (web uri) #:use-module (web uri)
#:use-module (system repl error-handling) #:use-module (system repl error-handling)
#:use-module (ice-9 atomic) #:use-module (ice-9 atomic)
#:use-module (fibers) #:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers scheduler) #:use-module (fibers scheduler)
#:use-module (fibers conditions) #:use-module (fibers conditions)
#:use-module ((guix build syscalls) #:use-module ((guix build syscalls)
@ -98,101 +100,133 @@
(%guix-data-service-metrics-registry registry) (%guix-data-service-metrics-registry registry)
(let ((finished? (make-condition))) (let ((finished? (make-condition))
(render-metrics (make-render-metrics registry))
(request-scheduler #f))
(call-with-sigint (call-with-sigint
(lambda () (lambda ()
(call-with-new-thread
(lambda ()
(run-fibers
(lambda ()
(let* ((current (current-scheduler))
(schedulers
(cons current (scheduler-remote-peers current))))
(set! request-scheduler current)
(for-each
(lambda (i sched)
(spawn-fiber
(lambda ()
(catch 'system-error
(lambda ()
(set-thread-name
(string-append "rp " (number->string i))))
(const #t)))
sched))
(iota (length schedulers))
schedulers))
(wait finished?))
#:hz 0
#:parallelism 4)))
(run-fibers (run-fibers
(lambda () (lambda ()
(let* ((current (current-scheduler)) (catch 'system-error
(schedulers (lambda ()
(cons current (scheduler-remote-peers current)))) (set-thread-name
(for-each (string-append "server")))
(lambda (i sched) (const #t))
(spawn-fiber
(lambda ()
(catch 'system-error
(lambda ()
(set-thread-name
(string-append "fibers " (number->string i))))
(const #t)))
sched))
(iota (length schedulers))
schedulers))
(parameterize (while (not request-scheduler)
((connection-pool (sleep 0.1))
(make-resource-pool
(lambda ()
(open-postgresql-connection
"web"
postgresql-statement-timeout))
(floor (/ postgresql-connections 2))
#:idle-seconds 30
#:destructor
(lambda (conn)
(close-postgresql-connection conn "web"))))
(reserved-connection-pool (let ((requests-metric
(make-resource-pool (make-counter-metric registry "requests_total")))
(lambda ()
(open-postgresql-connection
"web-reserved"
postgresql-statement-timeout))
(floor (/ postgresql-connections 2))
#:idle-seconds 600
#:destructor
(lambda (conn)
(close-postgresql-connection conn "web-reserved"))))
(resource-pool-default-timeout 5)) (with-exception-handler
(lambda (exn)
(let ((resource-pool-checkout-failures-metric (simple-format
(make-counter-metric registry (current-error-port)
"resource_pool_checkout_timeouts_total" "\n
#:labels '(pool_name))))
(%resource-pool-timeout-handler
(lambda (pool proc timeout)
(let ((pool-name
(cond
((eq? pool (connection-pool)) "normal")
((eq? pool (reserved-connection-pool)) "reserved")
(else #f))))
(when pool-name
(metric-increment
resource-pool-checkout-failures-metric
#:label-values `((pool_name . ,pool-name))))))))
(spawn-fiber
(lambda ()
(with-resource-from-pool (connection-pool) conn
(backfill-guix-revision-package-derivation-distribution-counts
conn))))
(let ((render-metrics
(make-render-metrics registry))
(requests-metric
(make-counter-metric registry "requests_total")))
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"\n
error: guix-data-service could not start: ~A error: guix-data-service could not start: ~A
Check if it's already running, or whether another process is using that Check if it's already running, or whether another process is using that
port. Also, the port used can be changed by passing the --port option.\n" port. Also, the port used can be changed by passing the --port option.\n"
exn) exn)
(primitive-exit 1)) (primitive-exit 1))
(lambda () (lambda ()
(parameterize
((connection-pool
(make-resource-pool
(lambda ()
(open-postgresql-connection
"web"
postgresql-statement-timeout))
(floor (/ postgresql-connections 2))
#:idle-seconds 30
#:destructor
(lambda (conn)
(close-postgresql-connection conn "web"))))
(reserved-connection-pool
(make-resource-pool
(lambda ()
(open-postgresql-connection
"web-reserved"
postgresql-statement-timeout))
(floor (/ postgresql-connections 2))
#:idle-seconds 600
#:destructor
(lambda (conn)
(close-postgresql-connection conn "web-reserved"))))
(resource-pool-default-timeout 5))
(let ((resource-pool-checkout-failures-metric
(make-counter-metric registry
"resource_pool_checkout_timeouts_total"
#:labels '(pool_name))))
(%resource-pool-timeout-handler
(lambda (pool proc timeout)
(let ((pool-name
(cond
((eq? pool (connection-pool)) "normal")
((eq? pool (reserved-connection-pool)) "reserved")
(else #f))))
(when pool-name
(metric-increment
resource-pool-checkout-failures-metric
#:label-values `((pool_name . ,pool-name))))))))
(spawn-fiber
(lambda ()
(with-resource-from-pool (connection-pool) conn
(backfill-guix-revision-package-derivation-distribution-counts
conn)))
request-scheduler)
(run-server/patched (run-server/patched
(lambda (request body) (lambda (request body)
(metric-increment requests-metric) (metric-increment requests-metric)
(handler request finished? body controller (let ((reply (make-channel)))
secret-key-base (spawn-fiber
startup-completed (lambda ()
render-metrics)) (call-with-values
(lambda ()
(handler request finished? body controller
secret-key-base
startup-completed
render-metrics))
(lambda vals
(put-message reply vals))))
request-scheduler
#:parallel? #t)
(apply values (get-message reply))))
#:host host #:host host
#:port port)) #:port port))
#:unwind? #t))) #:unwind? #t)))
@ -202,5 +236,6 @@ port. Also, the port used can be changed by passing the --port option.\n"
(spawn-port-monitoring-fiber port finished?) (spawn-port-monitoring-fiber port finished?)
(wait finished?)) (wait finished?))
#:parallelism 4)) #:hz 5
#:parallelism 1))
finished?))) finished?)))