From 5439159a169661ee4507fa2f565c38e2b14398d8 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Fri, 19 Jul 2024 17:07:10 +0100 Subject: [PATCH] Add more fibers utilities --- guix-data-service/utils.scm | 88 +++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm index a9e8f39..736e24d 100644 --- a/guix-data-service/utils.scm +++ b/guix-data-service/utils.scm @@ -17,7 +17,9 @@ (define-module (guix-data-service utils) #:use-module (srfi srfi-1) + #:use-module (srfi srfi-9) #:use-module (srfi srfi-11) + #:use-module (srfi srfi-71) #:use-module (ice-9 ftw) #:use-module (ice-9 match) #:use-module (ice-9 atomic) @@ -46,6 +48,12 @@ with-resource-from-pool resource-pool-stats + fibers-delay + fibers-force + + fibers-batch-for-each + fibers-for-each + parallel-via-fibers par-map& letpar& @@ -456,6 +464,86 @@ available. Return the resource once PROC has returned." (raise-exception (make-resource-pool-timeout-error)))))) +(define-record-type + (make-fibers-promise thunk values-box evaluated-condition) + fibers-promise? + (thunk fibers-promise-thunk) + (values-box fibers-promise-values-box) + (evaluated-condition fibers-promise-evaluated-condition)) + +(define (fibers-delay thunk) + (make-fibers-promise + thunk + (make-atomic-box #f) + (make-condition))) + +(define (fibers-force fp) + (let ((res (atomic-box-compare-and-swap! + (fibers-promise-values-box fp) + #f + 'started))) + (if (eq? #f res) + (call-with-values + (lambda () + (with-exception-handler + (lambda (exn) + (atomic-box-set! (fibers-promise-values-box fp) + exn) + (signal-condition! + (fibers-promise-evaluated-condition fp)) + (raise-exception exn)) + (fibers-promise-thunk fp) + #:unwind? #t)) + (lambda vals + (atomic-box-set! (fibers-promise-values-box fp) + vals) + (signal-condition! + (fibers-promise-evaluated-condition fp)) + (apply values vals))) + (if (eq? res 'started) + (begin + (wait (fibers-promise-evaluated-condition fp)) + (let ((result (atomic-box-ref (fibers-promise-values-box fp)))) + (if (exception? result) + (raise-exception result) + (apply values result)))) + (if (exception? res) + (raise-exception res) + (apply values res)))))) + +(define (fibers-batch-for-each proc batch-size . lists) + ;; Like split-at, but don't care about the order of the resulting lists, and + ;; don't error if the list is shorter than i elements + (define (split-at* lst i) + (let lp ((l lst) (n i) (acc '())) + (if (or (<= n 0) (null? l)) + (values (reverse! acc) l) + (lp (cdr l) (- n 1) (cons (car l) acc))))) + + ;; As this can be called with lists with tens of thousands of items in them, + ;; batch the + (define (get-batch lists) + (let ((split-lists + (map (lambda (lst) + (let ((batch rest (split-at* lst batch-size))) + (cons batch rest))) + lists))) + (values (map car split-lists) + (map cdr split-lists)))) + + (let loop ((lists lists)) + (call-with-values + (lambda () + (get-batch lists)) + (lambda (batch rest) + (apply par-map& proc batch) + (unless (null? (car rest)) + (loop rest))))) + *unspecified*) + +(define (fibers-for-each proc . lists) + (apply fibers-batch-for-each proc 20 lists)) + (define (defer-to-parallel-fiber thunk) (let ((reply (make-channel))) (spawn-fiber