guix-data-service/guix-data-service/utils.scm
2024-11-05 09:41:30 +00:00

1351 lines
49 KiB
Scheme

;;; Guix Data Service -- Information about Guix over time
;;; Copyright © 2020 Christopher Baines <mail@cbaines.net>
;;;
;;; This program is free software: you can redistribute it and/or
;;; modify it under the terms of the GNU Affero General Public License
;;; as published by the Free Software Foundation, either version 3 of
;;; the License, or (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;; Affero General Public License for more details.
;;;
;;; You should have received a copy of the GNU Affero General Public
;;; License along with this program. If not, see
;;; <http://www.gnu.org/licenses/>.
(define-module (guix-data-service utils)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-11)
#:use-module (srfi srfi-71)
#:use-module (ice-9 q)
#:use-module (ice-9 ftw)
#:use-module (ice-9 match)
#:use-module (ice-9 atomic)
#:use-module (ice-9 format)
#:use-module (ice-9 threads)
#:use-module (ice-9 exceptions)
#:use-module (ice-9 ports internal)
#:use-module (ice-9 suspendable-ports)
#:use-module (lzlib)
#:use-module ((guix build syscalls)
#:select (set-thread-name))
#:use-module (fibers)
#:use-module (fibers channels)
#:use-module (fibers operations)
#:use-module (fibers timers)
#:use-module (fibers conditions)
#:use-module (fibers scheduler)
#:use-module (prometheus)
#:export (call-with-time-logging
with-time-logging
prevent-inlining-for-tests
resource-pool-default-timeout
resource-pool-retry-checkout-timeout
%resource-pool-timeout-handler
resource-pool-timeout-error?
make-resource-pool
destroy-resource-pool
call-with-resource-from-pool
with-resource-from-pool
resource-pool-stats
call-with-default-io-waiters
make-worker-thread-channel
%worker-thread-default-timeout
call-with-worker-thread
worker-thread-timeout-error?
fiberize
fibers-delay
fibers-force
fibers-promise-reset
fibers-batch-for-each
fibers-for-each
fibers-batch-map
fibers-map
parallel-via-fibers
par-map&
letpar&
fibers-map-with-progress
chunk
chunk!
chunk-for-each!
delete-duplicates/sort!
get-guix-metrics-updater
call-with-sigint
run-server/patched
spawn-port-monitoring-fiber
make-queueing-channel))
(define (call-with-time-logging action thunk)
(simple-format #t "debug: Starting ~A\n" action)
(let ((start-time (current-time)))
(let-values
((result (thunk)))
(let ((time-taken (- (current-time) start-time)))
(simple-format #t "debug: Finished ~A, took ~A seconds\n"
action time-taken))
(apply values result))))
(define-syntax-rule (with-time-logging action exp ...)
"Log under NAME the time taken to evaluate EXP."
(call-with-time-logging action (lambda () exp ...)))
(define-syntax-rule (prevent-inlining-for-tests var)
(set! var var))
(define-record-type <resource-pool>
(make-resource-pool-record name channel)
resource-pool?
(name resource-pool-name)
(channel resource-pool-channel))
(define* (make-resource-pool initializer max-size
#:key (min-size max-size)
(idle-seconds #f)
(delay-logger (const #f))
(duration-logger (const #f))
destructor
lifetime
scheduler
(name "unnamed"))
(define (initializer/safe)
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception running ~A resource pool initializer: ~A:\n ~A\n"
name
initializer
exn)
#f)
(lambda ()
(with-throw-handler #t
initializer
(lambda args
(backtrace))))
#:unwind? #t))
(define (destructor/safe args)
(let ((success?
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception running resource pool destructor (~A): ~A:\n ~A\n"
name
destructor
exn)
#f)
(lambda ()
(with-throw-handler #t
(lambda ()
(destructor args)
#t)
(lambda _
(backtrace))))
#:unwind? #t)))
(or success?
#t
(begin
(sleep 5)
(destructor/safe args)))))
(let ((channel (make-channel))
(checkout-failure-count 0))
(spawn-fiber
(lambda ()
(when idle-seconds
(spawn-fiber
(lambda ()
(while #t
(sleep idle-seconds)
(put-message channel '(check-for-idle-resources))))))
(while #t
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception in the ~A pool fiber: ~A\n"
name
exn))
(lambda ()
(let loop ((resources '())
(available '())
(waiters '())
(resources-last-used '()))
(match (get-message channel)
(('checkout reply)
(if (null? available)
(if (= (length resources) max-size)
(loop resources
available
(cons reply waiters)
resources-last-used)
(let ((new-resource (initializer/safe)))
(if new-resource
(let ((checkout-success?
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply new-resource)
(const #t))
(wrap-operation (sleep-operation 1)
(const #f))))))
(unless checkout-success?
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(loop (cons new-resource resources)
(if checkout-success?
available
(cons new-resource available))
waiters
(cons (get-internal-real-time)
resources-last-used)))
(loop resources
available
(cons reply waiters)
resources-last-used))))
(let ((checkout-success?
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply (car available))
(const #t))
(wrap-operation (sleep-operation 1)
(const #f))))))
(unless checkout-success?
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(if checkout-success?
(loop resources
(cdr available)
waiters
resources-last-used)
(loop resources
available
waiters
resources-last-used)))))
(('return resource)
(if (null? waiters)
(loop resources
(cons resource available)
waiters
(begin
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
(get-internal-real-time))
resources-last-used))
(let ((checkout-success?
(perform-operation
(choice-operation
(wrap-operation
(put-operation (last waiters)
resource)
(const #t))
(wrap-operation (sleep-operation 1)
(const #f))))))
(unless checkout-success?
(set! checkout-failure-count
(+ 1 checkout-failure-count)))
(if checkout-success?
(loop resources
available
(drop-right! waiters 1)
(begin
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
(get-internal-real-time))
resources-last-used))
(begin
(for-each
(lambda (waiter)
(spawn-fiber
(lambda ()
(perform-operation
(choice-operation
(put-operation waiter 'resource-pool-retry-checkout)
(sleep-operation 10))))))
waiters)
(loop resources
(cons resource available)
'()
(begin
(list-set!
resources-last-used
(list-index (lambda (x)
(eq? x resource))
resources)
(get-internal-real-time))
resources-last-used)))))))
(('stats reply)
(let ((stats
`((resources . ,(length resources))
(available . ,(length available))
(waiters . ,(length waiters))
(checkout-failure-count . ,checkout-failure-count))))
(spawn-fiber
(lambda ()
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply stats)
(const #t))
(wrap-operation (sleep-operation 1)
(const #f)))))))
(loop resources
available
waiters
resources-last-used))
(('check-for-idle-resources)
(let* ((resources-last-used-seconds
(map
(lambda (internal-time)
(/ (- (get-internal-real-time) internal-time)
internal-time-units-per-second))
resources-last-used))
(resources-to-destroy
(filter-map
(lambda (resource last-used-seconds)
(if (and (member resource available)
(> last-used-seconds idle-seconds))
resource
#f))
resources
resources-last-used-seconds)))
(for-each
(lambda (resource)
(destructor/safe resource))
resources-to-destroy)
(loop (lset-difference eq? resources resources-to-destroy)
(lset-difference eq? available resources-to-destroy)
waiters
(filter-map
(lambda (resource last-used)
(if (memq resource resources-to-destroy)
#f
last-used))
resources
resources-last-used))))
(('destroy reply)
(if (= (length resources) (length available))
(begin
(for-each
(lambda (resource)
(destructor/safe resource))
resources)
(put-message reply 'destroy-success))
(begin
(spawn-fiber
(lambda ()
(perform-operation
(choice-operation
(put-operation reply 'resource-pool-destroy-failed)
(sleep-operation 10)))))
(loop resources
available
waiters
resources-last-used))))
(unknown
(simple-format
(current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n"
name
unknown)
(loop resources
available
waiters
resources-last-used)))))
#:unwind? #t)))
(or scheduler
(current-scheduler)))
(make-resource-pool-record name channel)))
(define (destroy-resource-pool pool)
(let ((reply (make-channel)))
(put-message (resource-pool-channel pool)
(list 'destroy reply))
(let ((msg (get-message reply)))
(unless (eq? msg 'destroy-success)
(error msg)))))
(define resource-pool-default-timeout
(make-parameter #f))
(define resource-pool-retry-checkout-timeout
(make-parameter 5))
(define &resource-pool-timeout
(make-exception-type '&recource-pool-timeout
&error
'(name)))
(define make-resource-pool-timeout-error
(record-constructor &resource-pool-timeout))
(define resource-pool-timeout-error?
(record-predicate &resource-pool-timeout))
(define %resource-pool-timeout-handler
(make-parameter #f))
(define* (call-with-resource-from-pool pool proc #:key (timeout 'default)
(timeout-handler (%resource-pool-timeout-handler)))
"Call PROC with a resource from POOL, blocking until a resource becomes
available. Return the resource once PROC has returned."
(define retry-timeout
(resource-pool-retry-checkout-timeout))
(define timeout-or-default
(if (eq? timeout 'default)
(resource-pool-default-timeout)
timeout))
(let ((resource
(let ((reply (make-channel)))
(let loop ((start-time (get-internal-real-time)))
(let ((request-success?
(perform-operation
(choice-operation
(wrap-operation
(put-operation (resource-pool-channel pool)
`(checkout ,reply))
(const #t))
(wrap-operation (sleep-operation (or timeout-or-default
retry-timeout))
(const #f))))))
(if request-success?
(let ((time-remaining
(- (or timeout-or-default
retry-timeout)
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second))))
(if (> time-remaining 0)
(let ((response
(perform-operation
(choice-operation
(get-operation reply)
(wrap-operation (sleep-operation time-remaining)
(const #f))))))
(if (or (not response)
(eq? response 'resource-pool-retry-checkout))
(if (> (- (or timeout-or-default
retry-timeout)
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second))
0)
(loop start-time)
(if (eq? timeout-or-default #f)
(loop (get-internal-real-time))
#f))
response))
(if (eq? timeout-or-default #f)
(loop (get-internal-real-time))
#f)))
(if (eq? timeout-or-default #f)
(loop (get-internal-real-time))
#f)))))))
(when (or (not resource)
(eq? resource 'resource-pool-retry-checkout))
(when timeout-handler
(timeout-handler pool proc timeout))
(raise-exception
(make-resource-pool-timeout-error (resource-pool-name pool))))
(with-exception-handler
(lambda (exception)
(put-message (resource-pool-channel pool)
`(return ,resource))
(raise-exception exception))
(lambda ()
(call-with-values
(lambda ()
(with-throw-handler #t
(lambda ()
(proc resource))
(lambda _
(backtrace))))
(lambda vals
(put-message (resource-pool-channel pool)
`(return ,resource))
(apply values vals))))
#:unwind? #t)))
(define-syntax-rule (with-resource-from-pool pool resource exp ...)
(call-with-resource-from-pool
pool
(lambda (resource) exp ...)))
(define* (resource-pool-stats pool #:key (timeout 5))
(let ((reply (make-channel))
(start-time (get-internal-real-time)))
(perform-operation
(choice-operation
(wrap-operation
(put-operation (resource-pool-channel pool)
`(stats ,reply))
(const #t))
(wrap-operation (sleep-operation timeout)
(lambda _
(raise-exception
(make-resource-pool-timeout-error))))))
(let ((time-remaining
(- timeout
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second))))
(if (> time-remaining 0)
(perform-operation
(choice-operation
(get-operation reply)
(wrap-operation (sleep-operation time-remaining)
(lambda _
(raise-exception
(make-resource-pool-timeout-error))))))
(raise-exception
(make-resource-pool-timeout-error))))))
(define (call-with-default-io-waiters thunk)
(parameterize
((current-read-waiter (@@ (ice-9 suspendable-ports)
default-read-waiter))
(current-write-waiter (@@ (ice-9 suspendable-ports)
default-write-waiter)))
(thunk)))
(define %worker-thread-args
(make-parameter #f))
(define* (make-worker-thread-channel initializer
#:key (parallelism 1)
(delay-logger (lambda _ #f))
(duration-logger (const #f))
destructor
lifetime
(log-exception? (const #t))
(expire-on-exception? #f)
(name "unnamed"))
"Return a channel used to offload work to a dedicated thread. ARGS are the
arguments of the worker thread procedure."
(define thread-proc-vector
(make-vector parallelism #f))
(define (initializer/safe)
(let ((args
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception running initializer in worker thread (~A): ~A:\n ~A\n"
name
initializer
exn)
#f)
(lambda ()
(with-throw-handler #t
initializer
(lambda args
(backtrace))))
#:unwind? #t)))
(if args
args
;; never give up, just keep retrying
(begin
(sleep 1)
(initializer/safe)))))
(define (destructor/safe args)
(let ((success?
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception running destructor in worker thread (~A): ~A:\n ~A\n"
name
destructor
exn)
#f)
(lambda ()
(with-throw-handler #t
(lambda ()
(apply destructor args)
#t)
(lambda _
(backtrace))))
#:unwind? #t)))
(or success?
#t
(begin
(sleep 1)
(destructor/safe args)))))
(define (process thread-index channel args)
(let loop ((current-lifetime lifetime))
(let ((exception?
(match (get-message channel)
(((? channel? reply) sent-time (? procedure? proc))
(let ((time-delay
(- (get-internal-real-time)
sent-time)))
(delay-logger (/ time-delay
internal-time-units-per-second))
(let* ((start-time (get-internal-real-time))
(response
(with-exception-handler
(lambda (exn)
(list 'worker-thread-error
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second)
exn))
(lambda ()
(vector-set! thread-proc-vector
thread-index
proc)
(with-throw-handler #t
(lambda ()
(call-with-values
(lambda ()
(start-stack
'worker-thread
(apply proc args)))
(lambda vals
(cons (/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second)
vals))))
(lambda args
(when (match args
(('%exception exn)
(log-exception? exn))
(_ #t))
(simple-format
(current-error-port)
"worker-thread: exception: ~A\n" args)
(backtrace)))))
#:unwind? #t)))
(put-message reply
response)
(vector-set! thread-proc-vector
thread-index
#f)
(match response
(('worker-thread-error duration _)
(when duration-logger
(duration-logger duration proc))
#t)
((duration . _)
(when duration-logger
(duration-logger duration proc))
#f))))))))
(unless (and expire-on-exception?
exception?)
(if (number? current-lifetime)
(unless (< current-lifetime 0)
(loop (if current-lifetime
(- current-lifetime 1)
#f)))
(loop #f))))))
(let ((channel (make-channel)))
(for-each
(lambda (thread-index)
(call-with-new-thread
(lambda ()
(catch 'system-error
(lambda ()
(set-thread-name
(string-append
name " w t "
(number->string thread-index))))
(const #t))
(let init ((args (initializer/safe)))
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"worker-thread-channel: exception: ~A\n" exn))
(lambda ()
(parameterize ((%worker-thread-args args))
(process thread-index channel args)))
#:unwind? #t)
(when destructor
(destructor/safe args))
(init (initializer/safe))))))
(iota parallelism))
(values channel
thread-proc-vector)))
(define &worker-thread-timeout
(make-exception-type '&worker-thread-timeout
&error
'()))
(define make-worker-thread-timeout-error
(record-constructor &worker-thread-timeout))
(define worker-thread-timeout-error?
(record-predicate &worker-thread-timeout))
(define %worker-thread-default-timeout
(make-parameter 30))
(define* (call-with-worker-thread channel proc #:key duration-logger
(timeout (%worker-thread-default-timeout)))
"Send PROC to the worker thread through CHANNEL. Return the result of PROC.
If already in the worker thread, call PROC immediately."
(let ((args (%worker-thread-args)))
(if args
(apply proc args)
(let* ((reply (make-channel))
(operation-success?
(perform-operation
(let ((put
(wrap-operation
(put-operation channel
(list reply
(get-internal-real-time)
proc))
(const #t))))
(if timeout
(choice-operation
put
(wrap-operation (sleep-operation timeout)
(const #f)))
put)))))
(unless operation-success?
(raise-exception
(make-worker-thread-timeout-error)))
(match (get-message reply)
(('worker-thread-error duration exn)
(when duration-logger
(duration-logger duration))
(raise-exception exn))
((duration . result)
(when duration-logger
(duration-logger duration))
(apply values result)))))))
(define* (fiberize proc #:key (parallelism 1))
(let ((channel (make-channel)))
(for-each
(lambda _
(spawn-fiber
(lambda ()
(while #t
(let ((reply-channel args (car+cdr
(get-message channel))))
(put-message
reply-channel
(with-exception-handler
(lambda (exn)
(cons 'exception exn))
(lambda ()
(with-throw-handler #t
(lambda ()
(call-with-values
(lambda ()
(apply proc args))
(lambda vals
(cons 'result vals))))
(lambda _
(backtrace))))
#:unwind? #t)))))
#:parallel? #t))
(iota parallelism))
(lambda args
(let ((reply-channel (make-channel)))
(put-message channel (cons reply-channel args))
(match (get-message reply-channel)
(('result . vals) (apply values vals))
(('exception . exn) (raise-exception exn)))))))
(define-record-type <fibers-promise>
(make-fibers-promise thunk values-box evaluated-condition)
fibers-promise?
(thunk fibers-promise-thunk)
(values-box fibers-promise-values-box)
(evaluated-condition fibers-promise-evaluated-condition))
(define (fibers-delay thunk)
(make-fibers-promise
thunk
(make-atomic-box #f)
(make-condition)))
(define (fibers-force fp)
(let ((res (atomic-box-compare-and-swap!
(fibers-promise-values-box fp)
#f
'started)))
(if (eq? #f res)
(call-with-values
(lambda ()
(with-exception-handler
(lambda (exn)
(atomic-box-set! (fibers-promise-values-box fp)
exn)
(signal-condition!
(fibers-promise-evaluated-condition fp))
(raise-exception exn))
(fibers-promise-thunk fp)
#:unwind? #t))
(lambda vals
(atomic-box-set! (fibers-promise-values-box fp)
vals)
(signal-condition!
(fibers-promise-evaluated-condition fp))
(apply values vals)))
(if (eq? res 'started)
(begin
(wait (fibers-promise-evaluated-condition fp))
(let ((result (atomic-box-ref (fibers-promise-values-box fp))))
(if (exception? result)
(raise-exception result)
(apply values result))))
(if (exception? res)
(raise-exception res)
(apply values res))))))
(define (fibers-promise-reset fp)
(atomic-box-set! (fibers-promise-values-box fp)
#f))
;; Like split-at, but don't care about the order of the resulting lists, and
;; don't error if the list is shorter than i elements
(define (split-at* lst i)
(let lp ((l lst) (n i) (acc '()))
(if (or (<= n 0) (null? l))
(values (reverse! acc) l)
(lp (cdr l) (- n 1) (cons (car l) acc)))))
;; As this can be called with lists with tens of thousands of items in them,
;; batch the
(define (get-batch batch-size lists)
(let ((split-lists
(map (lambda (lst)
(let ((batch rest (split-at* lst batch-size)))
(cons batch rest)))
lists)))
(values (map car split-lists)
(map cdr split-lists))))
(define (defer-to-parallel-fiber thunk)
(let ((reply (make-channel)))
(spawn-fiber
(lambda ()
(with-exception-handler
(lambda (exn)
(put-message reply (cons 'exception exn)))
(lambda ()
(call-with-values
(lambda ()
(with-throw-handler #t
thunk
(lambda _
(backtrace))))
(lambda vals
(put-message reply vals))))
#:unwind? #t))
#:parallel? #t)
reply))
(define (fetch-result-of-defered-thunks . reply-channels)
(let ((responses (map get-message
reply-channels)))
(map
(match-lambda
(('exception . exn)
(raise-exception exn))
(result
(apply values result)))
responses)))
(define (fibers-batch-map proc batch-size . lists)
(let loop ((lists lists)
(result '()))
(let ((batch
rest
(get-batch batch-size lists)))
(if (any null? batch)
result
(let ((response-channels
(apply map
(lambda args
(defer-to-parallel-fiber
(lambda ()
(apply proc args))))
batch)))
(loop rest
(append! result
(apply fetch-result-of-defered-thunks
response-channels))))))))
(define (fibers-map proc . lists)
(apply fibers-batch-map proc 20 lists))
(define (fibers-batch-for-each proc batch-size . lists)
(let loop ((lists lists))
(let ((batch
rest
(get-batch batch-size lists)))
(if (any null? batch)
*unspecified*
(let ((response-channels
(apply map
(lambda args
(defer-to-parallel-fiber
(lambda ()
(apply proc args))))
batch)))
(apply fetch-result-of-defered-thunks
response-channels)
(loop rest))))))
(define (fibers-for-each proc . lists)
(apply fibers-batch-for-each proc 20 lists))
(define-syntax parallel-via-fibers
(lambda (x)
(syntax-case x ()
((_ e0 ...)
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
#'(let ((tmp0 (defer-to-parallel-fiber
(lambda ()
e0)))
...)
(apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...)
(call-with-values
(lambda () (parallel-via-fibers e ...))
(lambda (v ...)
b0 b1 ...)))
(define (par-mapper' mapper cons)
(lambda (proc . lists)
(apply
fetch-result-of-defered-thunks
(let loop ((lists lists))
(match lists
(((heads tails ...) ...)
(let ((tail (loop tails))
(head (defer-to-parallel-fiber
(lambda ()
(apply proc heads)))))
(cons head tail)))
(_
'()))))))
(define par-map& (par-mapper' map cons))
(define* (fibers-map-with-progress proc lists #:key report)
(let loop ((channels-to-results
(apply map
(lambda args
(cons (defer-to-parallel-fiber
(lambda ()
(apply proc args)))
#f))
lists)))
(let ((active-channels
(filter-map car channels-to-results)))
(when report
(report (apply map
list
(map cdr channels-to-results)
lists)))
(if (null? active-channels)
(map
(match-lambda
((#f . ('exception . exn))
(raise-exception exn))
((#f . ('result . val))
val))
channels-to-results)
(loop
(perform-operation
(apply
choice-operation
(filter-map
(lambda (p)
(match p
((channel . _)
(if channel
(wrap-operation
(get-operation channel)
(lambda (result)
(map (match-lambda
((c . r)
(if (eq? channel c)
(cons #f
(match result
(('exception . exn)
result)
(_
(cons 'result result))))
(cons c r))))
channels-to-results)))
#f))))
channels-to-results))))))))
(define (chunk lst max-length)
(if (> (length lst)
max-length)
(call-with-values (lambda ()
(split-at lst max-length))
(lambda (first-lst rest)
(cons first-lst
(chunk rest max-length))))
(list lst)))
(define (chunk! lst max-length)
(if (> (length lst)
max-length)
(call-with-values (lambda ()
(split-at! lst max-length))
(lambda (first-lst rest)
(cons first-lst
(chunk! rest max-length))))
(list lst)))
(define* (chunk-for-each! proc chunk-size #:rest lsts)
(define (do-one-iteration lsts)
(if (> (length (car lsts))
chunk-size)
(let ((chunks-and-rest
(map (lambda (lst)
(call-with-values (lambda ()
(split-at! lst chunk-size))
(lambda (first-lst rest)
(cons first-lst
rest))))
lsts)))
(apply proc
(map car chunks-and-rest))
(do-one-iteration
(map cdr chunks-and-rest)))
(apply proc lsts)))
(let ((list-lengths (map length lsts)))
(unless (eq? 1 (length (delete-duplicates list-lengths)))
(error "lists not equal length"))
(unless (eq? 0 (first list-lengths))
(do-one-iteration lsts)))
#t)
(define* (delete-duplicates/sort! unsorted-lst less #:optional (equal? equal?))
(if (null? unsorted-lst)
unsorted-lst
(let ((sorted-lst (sort! unsorted-lst less)))
(let loop ((lst (cdr sorted-lst))
(last-element (car sorted-lst))
(result (list (car sorted-lst))))
(if (null? lst)
result
(let ((current-element (car lst)))
(if (equal? current-element last-element)
(loop (cdr lst)
last-element
result)
(loop (cdr lst)
current-element
(cons current-element
result)))))))))
(define (get-guix-metrics-updater registry)
(define guix-db "/var/guix/db/db.sqlite")
(define guix-db-wal (string-append guix-db "-wal"))
(let ((guix-db-bytes-metric
(make-gauge-metric registry "guix_db_bytes"))
(guix-db-wal-bytes-metric
(make-gauge-metric registry "guix_db_wal_bytes")))
(lambda ()
(with-exception-handler
(lambda _
#f)
(lambda ()
(metric-set guix-db-bytes-metric (stat:size (stat guix-db)))
(metric-set guix-db-wal-bytes-metric
(if (file-exists? guix-db-wal)
(stat:size (stat guix-db-wal))
0)))
#:unwind? #t))))
;; This variant of run-server from the fibers library supports running
;; multiple servers within one process.
(define run-server/patched
(let ((fibers-web-server-module
(resolve-module '(fibers web server))))
(define set-nonblocking!
(module-ref fibers-web-server-module 'set-nonblocking!))
(define make-default-socket
(module-ref fibers-web-server-module 'make-default-socket))
(define socket-loop
(module-ref fibers-web-server-module 'socket-loop))
(lambda* (handler
#:key
(host #f)
(family AF_INET)
(addr (if host
(inet-pton family host)
INADDR_LOOPBACK))
(port 8080)
(socket (make-default-socket family addr port)))
;; We use a large backlog by default. If the server is suddenly hit
;; with a number of connections on a small backlog, clients won't
;; receive confirmation for their SYN, leading them to retry --
;; probably successfully, but with a large latency.
(listen socket 1024)
(set-nonblocking! socket)
(sigaction SIGPIPE SIG_IGN)
(spawn-fiber (lambda () (socket-loop socket handler))))))
(define &port-timeout
(make-exception-type '&port-timeout
&external-error
'(port)))
(define make-port-timeout-error
(record-constructor &port-timeout))
(define port-timeout-error?
(record-predicate &port-timeout))
(define &port-read-timeout
(make-exception-type '&port-read-timeout
&port-timeout
'()))
(define make-port-read-timeout-error
(record-constructor &port-read-timeout))
(define port-read-timeout-error?
(record-predicate &port-read-timeout))
(define &port-write-timeout
(make-exception-type '&port-write-timeout
&port-timeout
'()))
(define make-port-write-timeout-error
(record-constructor &port-write-timeout))
(define port-write-timeout-error?
(record-predicate &port-write-timeout))
;; These procedure are subject to spurious wakeups.
(define (readable? port)
"Test if PORT is writable."
(match (select (vector port) #() #() 0)
((#() #() #()) #f)
((#(_) #() #()) #t)))
(define (writable? port)
"Test if PORT is writable."
(match (select #() (vector port) #() 0)
((#() #() #()) #f)
((#() #(_) #()) #t)))
(define (make-wait-operation ready? schedule-when-ready port
port-ready-fd this-procedure)
(make-base-operation
#f
(lambda _
(and (ready? (port-ready-fd port)) values))
(lambda (flag sched resume)
(define (commit)
(match (atomic-box-compare-and-swap! flag 'W 'S)
('W (resume values))
('C (commit))
('S #f)))
(schedule-when-ready
sched (port-ready-fd port) commit))))
(define (wait-until-port-readable-operation port)
"Make an operation that will succeed when PORT is readable."
(unless (input-port? port)
(error "refusing to wait forever for input on non-input port"))
(make-wait-operation readable? schedule-task-when-fd-readable port
port-read-wait-fd
wait-until-port-readable-operation))
(define (wait-until-port-writable-operation port)
"Make an operation that will succeed when PORT is writable."
(unless (output-port? port)
(error "refusing to wait forever for output on non-output port"))
(make-wait-operation writable? schedule-task-when-fd-writable port
port-write-wait-fd
wait-until-port-writable-operation))
(define* (with-fibers-port-timeouts thunk
#:key timeout
(read-timeout timeout)
(write-timeout timeout))
(define (no-fibers-wait port mode timeout)
(define poll-timeout-ms 200)
;; When the GC runs, it restarts the poll syscall, but the timeout
;; remains unchanged! When the timeout is longer than the time
;; between the syscall restarting, I think this renders the
;; timeout useless. Therefore, this code uses a short timeout, and
;; repeatedly calls poll while watching the clock to see if it has
;; timed out overall.
(let ((timeout-internal
(+ (get-internal-real-time)
(* internal-time-units-per-second
(/ timeout 1000)))))
(let loop ((poll-value
(port-poll port mode poll-timeout-ms)))
(if (= poll-value 0)
(if (> (get-internal-real-time)
timeout-internal)
(raise-exception
(if (string=? mode "r")
(make-port-read-timeout-error port)
(make-port-write-timeout-error port)))
(loop (port-poll port mode poll-timeout-ms)))
poll-value))))
(parameterize
((current-read-waiter
(lambda (port)
(if (current-scheduler)
(perform-operation
(choice-operation
(wait-until-port-readable-operation port)
(wrap-operation
(sleep-operation read-timeout)
(lambda ()
(raise-exception
(make-port-read-timeout-error thunk port))))))
(no-fibers-wait port "r" read-timeout))))
(current-write-waiter
(lambda (port)
(if (current-scheduler)
(perform-operation
(choice-operation
(wait-until-port-writable-operation port)
(wrap-operation
(sleep-operation write-timeout)
(lambda ()
(raise-exception
(make-port-write-timeout-error thunk port))))))
(no-fibers-wait port "w" write-timeout)))))
(thunk)))
(define (spawn-port-monitoring-fiber port error-condition)
(spawn-fiber
(lambda ()
(while #t
(sleep 20)
(with-exception-handler
(lambda (exn)
(simple-format (current-error-port)
"port monitoring fiber failed to connect to ~A: ~A\n"
port exn)
(signal-condition! error-condition))
(lambda ()
(with-fibers-port-timeouts
(lambda ()
(let ((sock (socket PF_INET SOCK_STREAM 0)))
(connect sock AF_INET INADDR_LOOPBACK port)
(close-port sock)))
#:timeout 20))
#:unwind? #t)))))
;; Copied from (fibers web server)
(define (call-with-sigint thunk cvar)
(let ((handler #f))
(dynamic-wind
(lambda ()
(set! handler
(sigaction SIGINT (lambda (sig) (signal-condition! cvar)))))
thunk
(lambda ()
(if handler
;; restore Scheme handler, SIG_IGN or SIG_DFL.
(sigaction SIGINT (car handler) (cdr handler))
;; restore original C handler.
(sigaction SIGINT #f))))))
(define (make-queueing-channel channel)
(define queue (make-q))
(let ((queue-channel (make-channel)))
(spawn-fiber
(lambda ()
(while #t
(if (q-empty? queue)
(enq! queue
(perform-operation
(get-operation queue-channel)))
(let ((front (q-front queue)))
(perform-operation
(choice-operation
(wrap-operation (get-operation queue-channel)
(lambda (val)
(enq! queue val)))
(wrap-operation (put-operation channel front)
(lambda _
(q-pop! queue))))))))))
queue-channel))