Add new fibers utilities

The new fibers-map uses the same batching approach that fibers-for-each uses,
and fibers-map-with-progress allows tracking on the results while the map is
happening.
This commit is contained in:
Christopher Baines 2024-10-31 16:43:13 +00:00
parent f8ac6e3dd9
commit 55af7c82e8

View file

@ -65,10 +65,13 @@
fibers-batch-for-each
fibers-for-each
fibers-batch-map
fibers-map
parallel-via-fibers
par-map&
letpar&
fibers-map-with-progress
chunk
chunk!
@ -805,38 +808,24 @@ If already in the worker thread, call PROC immediately."
(atomic-box-set! (fibers-promise-values-box fp)
#f))
(define (fibers-batch-for-each proc batch-size . lists)
;; Like split-at, but don't care about the order of the resulting lists, and
;; don't error if the list is shorter than i elements
(define (split-at* lst i)
(let lp ((l lst) (n i) (acc '()))
(if (or (<= n 0) (null? l))
(values (reverse! acc) l)
(lp (cdr l) (- n 1) (cons (car l) acc)))))
;; Like split-at, but don't care about the order of the resulting lists, and
;; don't error if the list is shorter than i elements
(define (split-at* lst i)
(let lp ((l lst) (n i) (acc '()))
(if (or (<= n 0) (null? l))
(values (reverse! acc) l)
(lp (cdr l) (- n 1) (cons (car l) acc)))))
;; As this can be called with lists with tens of thousands of items in them,
;; batch the
(define (get-batch lists)
(let ((split-lists
(map (lambda (lst)
(let ((batch rest (split-at* lst batch-size)))
(cons batch rest)))
lists)))
(values (map car split-lists)
(map cdr split-lists))))
(let loop ((lists lists))
(call-with-values
(lambda ()
(get-batch lists))
(lambda (batch rest)
(apply par-map& proc batch)
(unless (null? (car rest))
(loop rest)))))
*unspecified*)
(define (fibers-for-each proc . lists)
(apply fibers-batch-for-each proc 20 lists))
;; As this can be called with lists with tens of thousands of items in them,
;; batch the
(define (get-batch batch-size lists)
(let ((split-lists
(map (lambda (lst)
(let ((batch rest (split-at* lst batch-size)))
(cons batch rest)))
lists)))
(values (map car split-lists)
(map cdr split-lists))))
(define (defer-to-parallel-fiber thunk)
(let ((reply (make-channel)))
@ -869,6 +858,50 @@ If already in the worker thread, call PROC immediately."
(apply values result)))
responses)))
(define (fibers-batch-map proc batch-size . lists)
(let loop ((lists lists)
(result '()))
(let ((batch
rest
(get-batch batch-size lists)))
(if (any null? batch)
result
(let ((response-channels
(apply map
(lambda args
(defer-to-parallel-fiber
(lambda ()
(apply proc args))))
batch)))
(loop rest
(append! result
(apply fetch-result-of-defered-thunks
response-channels))))))))
(define (fibers-map proc . lists)
(apply fibers-batch-map proc 20 lists))
(define (fibers-batch-for-each proc batch-size . lists)
(let loop ((lists lists))
(let ((batch
rest
(get-batch batch-size lists)))
(if (any null? batch)
*unspecified*
(let ((response-channels
(apply map
(lambda args
(defer-to-parallel-fiber
(lambda ()
(apply proc args))))
batch)))
(apply fetch-result-of-defered-thunks
response-channels)
(loop rest))))))
(define (fibers-for-each proc . lists)
(apply fibers-batch-for-each proc 20 lists))
(define-syntax parallel-via-fibers
(lambda (x)
(syntax-case x ()
@ -903,6 +936,56 @@ If already in the worker thread, call PROC immediately."
(define par-map& (par-mapper' map cons))
(define* (fibers-map-with-progress proc lists #:key report)
(let loop ((channels-to-results
(apply map
(lambda args
(cons (defer-to-parallel-fiber
(lambda ()
(apply proc args)))
#f))
lists)))
(let ((active-channels
(filter-map car channels-to-results)))
(when report
(report (apply map
list
(map cdr channels-to-results)
lists)))
(if (null? active-channels)
(map
(match-lambda
((#f . ('exception . exn))
(raise-exception exn))
((#f . ('result . val))
val))
channels-to-results)
(loop
(perform-operation
(apply
choice-operation
(filter-map
(lambda (p)
(match p
((channel . _)
(if channel
(wrap-operation
(get-operation channel)
(lambda (result)
(map (match-lambda
((c . r)
(if (eq? channel c)
(cons #f
(match result
(('exception . exn)
result)
(_
(cons 'result result))))
(cons c r))))
channels-to-results)))
#f))))
channels-to-results))))))))
(define (chunk lst max-length)
(if (> (length lst)
max-length)