Compare commits

...

2 commits

Author SHA1 Message Date
0fa6737a39 Document some things
All checks were successful
/ test (push) Successful in 9s
2025-06-27 23:28:47 +02:00
4140ef0bd6 More consistently handle results and exceptions
In the parallelism module.
2025-06-27 22:43:25 +02:00

View file

@ -22,6 +22,7 @@
#:use-module (srfi srfi-71) #:use-module (srfi srfi-71)
#:use-module (srfi srfi-9) #:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu) #:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-43)
#:use-module (ice-9 match) #:use-module (ice-9 match)
#:use-module (ice-9 control) #:use-module (ice-9 control)
#:use-module (ice-9 exceptions) #:use-module (ice-9 exceptions)
@ -57,7 +58,7 @@
(lambda (exn) (lambda (exn)
(put-message (put-message
reply reply
(list 'exception exn))) (cons 'exception exn)))
(lambda () (lambda ()
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
@ -78,7 +79,7 @@
(lambda () (lambda ()
(start-stack #t (thunk))) (start-stack #t (thunk)))
(lambda vals (lambda vals
(put-message reply vals)))))) (put-message reply (cons 'result vals)))))))
#:unwind? #t)) #:unwind? #t))
#:parallel? #t) #:parallel? #t)
reply)) reply))
@ -88,13 +89,16 @@
reply-channels))) reply-channels)))
(map (map
(match-lambda (match-lambda
(('exception exn) (('exception . exn)
(raise-exception exn)) (raise-exception exn))
(result (('result . vals)
(apply values result))) (apply values vals)))
responses))) responses)))
(define (fibers-batch-map proc parallelism-limit . lists) (define (fibers-batch-map proc parallelism-limit . lists)
"Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT. If any of
the invocations of PROC raise an exception, this will be raised once
all of the calls to PROC have finished."
(define vecs (map (lambda (list-or-vec) (define vecs (map (lambda (list-or-vec)
(if (vector? list-or-vec) (if (vector? list-or-vec)
list-or-vec list-or-vec
@ -114,9 +118,18 @@
(channel-indexes '())) (channel-indexes '()))
(if (and (eq? #f next-to-process-index) (if (and (eq? #f next-to-process-index)
(null? channel-indexes)) (null? channel-indexes))
(let ((processed-result-vec
(vector-map
(lambda (_ result-or-exn)
(match result-or-exn
(('exception . exn)
(raise-exception exn))
(('result . vals)
(car vals))))
result-vec)))
(if (vector? (first lists)) (if (vector? (first lists))
result-vec processed-result-vec
(vector->list result-vec)) (vector->list processed-result-vec)))
(if (or (= (length channel-indexes) (if (or (= (length channel-indexes)
(min parallelism-limit vecs-length)) (min parallelism-limit vecs-length))
@ -132,18 +145,13 @@
(get-operation (get-operation
(vector-ref result-vec index)) (vector-ref result-vec index))
(lambda (result) (lambda (result)
(match result
(('exception exn)
(raise-exception exn))
(_
(vector-set! result-vec (vector-set! result-vec
index index
(first result)) result)
(values next-to-process-index (values next-to-process-index
(lset-difference = (lset-difference =
channel-indexes channel-indexes
(list index)))))))) (list index))))))
channel-indexes))))) channel-indexes)))))
(loop new-index (loop new-index
new-channel-indexes)) new-channel-indexes))
@ -166,9 +174,14 @@
channel-indexes))))))) channel-indexes)))))))
(define (fibers-map proc . lists) (define (fibers-map proc . lists)
"Map PROC over LISTS in parallel, running up to 20 fibers in
PARALLEL. If any of the invocations of PROC raise an exception, this
will be raised once all of the calls to PROC have finished."
(apply fibers-batch-map proc 20 lists)) (apply fibers-batch-map proc 20 lists))
(define (fibers-batch-for-each proc parallelism-limit . lists) (define (fibers-batch-for-each proc parallelism-limit . lists)
"Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in
parallel."
(apply fibers-batch-map (apply fibers-batch-map
(lambda args (lambda args
(apply proc args) (apply proc args)
@ -179,10 +192,13 @@
*unspecified*) *unspecified*)
(define (fibers-for-each proc . lists) (define (fibers-for-each proc . lists)
"Call PROC on LISTS, running up to 20 fibers in parallel."
(apply fibers-batch-for-each proc 20 lists)) (apply fibers-batch-for-each proc 20 lists))
(define-syntax fibers-parallel (define-syntax fibers-parallel
(lambda (x) (lambda (x)
"Run each expression in parallel. If any expression raises an
exception, this will be raised after all exceptions have finished."
(syntax-case x () (syntax-case x ()
((_ e0 ...) ((_ e0 ...)
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
@ -193,12 +209,16 @@
(apply values (fetch-result-of-defered-thunks tmp0 ...)))))))) (apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
(define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...) (define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...)
"Let, but run each binding in a fiber in parallel."
(call-with-values (call-with-values
(lambda () (fibers-parallel e ...)) (lambda () (fibers-parallel e ...))
(lambda (v ...) (lambda (v ...)
b0 b1 ...))) b0 b1 ...)))
(define* (fibers-map-with-progress proc lists #:key report) (define* (fibers-map-with-progress proc lists #:key report)
"Map PROC over LISTS, calling #:REPORT if specified after each
invocation of PROC finishes. REPORT is passed the results for each
element of LISTS, or #f if no result has been received yet."
(let loop ((channels-to-results (let loop ((channels-to-results
(apply map (apply map
(lambda args (lambda args
@ -217,10 +237,10 @@
(if (null? active-channels) (if (null? active-channels)
(map (map
(match-lambda (match-lambda
((#f . ('exception exn)) ((#f . ('exception . exn))
(raise-exception exn)) (raise-exception exn))
((#f . ('result val)) ((#f . ('result . vals))
val)) (car vals)))
channels-to-results) channels-to-results)
(loop (loop
(perform-operation (perform-operation
@ -237,12 +257,7 @@
(map (match-lambda (map (match-lambda
((c . r) ((c . r)
(if (eq? channel c) (if (eq? channel c)
(cons #f (cons #f result)
(match result
(('exception exn)
result)
(_
(list 'result result))))
(cons c r)))) (cons c r))))
channels-to-results))) channels-to-results)))
#f)))) #f))))
@ -263,7 +278,7 @@
reply-channel reply-channel
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
(list 'exception exn)) (cons 'exception exn))
(lambda () (lambda ()
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
@ -294,7 +309,7 @@
(put-message input-channel (cons reply-channel args)) (put-message input-channel (cons reply-channel args))
(match (get-message reply-channel) (match (get-message reply-channel)
(('result . vals) (apply values vals)) (('result . vals) (apply values vals))
(('exception exn) (('exception . exn)
(raise-exception exn)))))) (raise-exception exn))))))
(define-record-type <parallelism-limiter> (define-record-type <parallelism-limiter>