Compare commits

...

2 commits

Author SHA1 Message Date
0fa6737a39 Document some things
All checks were successful
/ test (push) Successful in 9s
2025-06-27 23:28:47 +02:00
4140ef0bd6 More consistently handle results and exceptions
In the parallelism module.
2025-06-27 22:43:25 +02:00

View file

@ -22,6 +22,7 @@
#:use-module (srfi srfi-71)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-43)
#:use-module (ice-9 match)
#:use-module (ice-9 control)
#:use-module (ice-9 exceptions)
@ -57,7 +58,7 @@
(lambda (exn)
(put-message
reply
(list 'exception exn)))
(cons 'exception exn)))
(lambda ()
(with-exception-handler
(lambda (exn)
@ -78,7 +79,7 @@
(lambda ()
(start-stack #t (thunk)))
(lambda vals
(put-message reply vals))))))
(put-message reply (cons 'result vals)))))))
#:unwind? #t))
#:parallel? #t)
reply))
@ -88,13 +89,16 @@
reply-channels)))
(map
(match-lambda
(('exception exn)
(('exception . exn)
(raise-exception exn))
(result
(apply values result)))
(('result . vals)
(apply values vals)))
responses)))
(define (fibers-batch-map proc parallelism-limit . lists)
"Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT. If any of
the invocations of PROC raise an exception, this will be raised once
all of the calls to PROC have finished."
(define vecs (map (lambda (list-or-vec)
(if (vector? list-or-vec)
list-or-vec
@ -114,9 +118,18 @@
(channel-indexes '()))
(if (and (eq? #f next-to-process-index)
(null? channel-indexes))
(let ((processed-result-vec
(vector-map
(lambda (_ result-or-exn)
(match result-or-exn
(('exception . exn)
(raise-exception exn))
(('result . vals)
(car vals))))
result-vec)))
(if (vector? (first lists))
result-vec
(vector->list result-vec))
processed-result-vec
(vector->list processed-result-vec)))
(if (or (= (length channel-indexes)
(min parallelism-limit vecs-length))
@ -132,18 +145,13 @@
(get-operation
(vector-ref result-vec index))
(lambda (result)
(match result
(('exception exn)
(raise-exception exn))
(_
(vector-set! result-vec
index
(first result))
result)
(values next-to-process-index
(lset-difference =
channel-indexes
(list index))))))))
(list index))))))
channel-indexes)))))
(loop new-index
new-channel-indexes))
@ -166,9 +174,14 @@
channel-indexes)))))))
(define (fibers-map proc . lists)
"Map PROC over LISTS in parallel, running up to 20 fibers in
PARALLEL. If any of the invocations of PROC raise an exception, this
will be raised once all of the calls to PROC have finished."
(apply fibers-batch-map proc 20 lists))
(define (fibers-batch-for-each proc parallelism-limit . lists)
"Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in
parallel."
(apply fibers-batch-map
(lambda args
(apply proc args)
@ -179,10 +192,13 @@
*unspecified*)
(define (fibers-for-each proc . lists)
"Call PROC on LISTS, running up to 20 fibers in parallel."
(apply fibers-batch-for-each proc 20 lists))
(define-syntax fibers-parallel
(lambda (x)
"Run each expression in parallel. If any expression raises an
exception, this will be raised after all exceptions have finished."
(syntax-case x ()
((_ e0 ...)
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
@ -193,12 +209,16 @@
(apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
(define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...)
"Let, but run each binding in a fiber in parallel."
(call-with-values
(lambda () (fibers-parallel e ...))
(lambda (v ...)
b0 b1 ...)))
(define* (fibers-map-with-progress proc lists #:key report)
"Map PROC over LISTS, calling #:REPORT if specified after each
invocation of PROC finishes. REPORT is passed the results for each
element of LISTS, or #f if no result has been received yet."
(let loop ((channels-to-results
(apply map
(lambda args
@ -217,10 +237,10 @@
(if (null? active-channels)
(map
(match-lambda
((#f . ('exception exn))
((#f . ('exception . exn))
(raise-exception exn))
((#f . ('result val))
val))
((#f . ('result . vals))
(car vals)))
channels-to-results)
(loop
(perform-operation
@ -237,12 +257,7 @@
(map (match-lambda
((c . r)
(if (eq? channel c)
(cons #f
(match result
(('exception exn)
result)
(_
(list 'result result))))
(cons #f result)
(cons c r))))
channels-to-results)))
#f))))
@ -263,7 +278,7 @@
reply-channel
(with-exception-handler
(lambda (exn)
(list 'exception exn))
(cons 'exception exn))
(lambda ()
(with-exception-handler
(lambda (exn)
@ -294,7 +309,7 @@
(put-message input-channel (cons reply-channel args))
(match (get-message reply-channel)
(('result . vals) (apply values vals))
(('exception exn)
(('exception . exn)
(raise-exception exn))))))
(define-record-type <parallelism-limiter>