Add more fibers utilities

This commit is contained in:
Christopher Baines 2024-07-19 17:07:10 +01:00
parent 587277f347
commit 5439159a16

View file

@ -17,7 +17,9 @@
(define-module (guix-data-service utils) (define-module (guix-data-service utils)
#:use-module (srfi srfi-1) #:use-module (srfi srfi-1)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-11) #:use-module (srfi srfi-11)
#:use-module (srfi srfi-71)
#:use-module (ice-9 ftw) #:use-module (ice-9 ftw)
#:use-module (ice-9 match) #:use-module (ice-9 match)
#:use-module (ice-9 atomic) #:use-module (ice-9 atomic)
@ -46,6 +48,12 @@
with-resource-from-pool with-resource-from-pool
resource-pool-stats resource-pool-stats
fibers-delay
fibers-force
fibers-batch-for-each
fibers-for-each
parallel-via-fibers parallel-via-fibers
par-map& par-map&
letpar& letpar&
@ -456,6 +464,86 @@ available. Return the resource once PROC has returned."
(raise-exception (raise-exception
(make-resource-pool-timeout-error)))))) (make-resource-pool-timeout-error))))))
(define-record-type <fibers-promise>
(make-fibers-promise thunk values-box evaluated-condition)
fibers-promise?
(thunk fibers-promise-thunk)
(values-box fibers-promise-values-box)
(evaluated-condition fibers-promise-evaluated-condition))
(define (fibers-delay thunk)
(make-fibers-promise
thunk
(make-atomic-box #f)
(make-condition)))
(define (fibers-force fp)
(let ((res (atomic-box-compare-and-swap!
(fibers-promise-values-box fp)
#f
'started)))
(if (eq? #f res)
(call-with-values
(lambda ()
(with-exception-handler
(lambda (exn)
(atomic-box-set! (fibers-promise-values-box fp)
exn)
(signal-condition!
(fibers-promise-evaluated-condition fp))
(raise-exception exn))
(fibers-promise-thunk fp)
#:unwind? #t))
(lambda vals
(atomic-box-set! (fibers-promise-values-box fp)
vals)
(signal-condition!
(fibers-promise-evaluated-condition fp))
(apply values vals)))
(if (eq? res 'started)
(begin
(wait (fibers-promise-evaluated-condition fp))
(let ((result (atomic-box-ref (fibers-promise-values-box fp))))
(if (exception? result)
(raise-exception result)
(apply values result))))
(if (exception? res)
(raise-exception res)
(apply values res))))))
(define (fibers-batch-for-each proc batch-size . lists)
;; Like split-at, but don't care about the order of the resulting lists, and
;; don't error if the list is shorter than i elements
(define (split-at* lst i)
(let lp ((l lst) (n i) (acc '()))
(if (or (<= n 0) (null? l))
(values (reverse! acc) l)
(lp (cdr l) (- n 1) (cons (car l) acc)))))
;; As this can be called with lists with tens of thousands of items in them,
;; batch the
(define (get-batch lists)
(let ((split-lists
(map (lambda (lst)
(let ((batch rest (split-at* lst batch-size)))
(cons batch rest)))
lists)))
(values (map car split-lists)
(map cdr split-lists))))
(let loop ((lists lists))
(call-with-values
(lambda ()
(get-batch lists))
(lambda (batch rest)
(apply par-map& proc batch)
(unless (null? (car rest))
(loop rest)))))
*unspecified*)
(define (fibers-for-each proc . lists)
(apply fibers-batch-for-each proc 20 lists))
(define (defer-to-parallel-fiber thunk) (define (defer-to-parallel-fiber thunk)
(let ((reply (make-channel))) (let ((reply (make-channel)))
(spawn-fiber (spawn-fiber