More consistently handle results and exceptions

In the parallelism module.
This commit is contained in:
Christopher Baines 2025-06-27 22:43:25 +02:00
parent 6f6d57b189
commit 4140ef0bd6

View file

@ -22,6 +22,7 @@
#:use-module (srfi srfi-71) #:use-module (srfi srfi-71)
#:use-module (srfi srfi-9) #:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu) #:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-43)
#:use-module (ice-9 match) #:use-module (ice-9 match)
#:use-module (ice-9 control) #:use-module (ice-9 control)
#:use-module (ice-9 exceptions) #:use-module (ice-9 exceptions)
@ -57,7 +58,7 @@
(lambda (exn) (lambda (exn)
(put-message (put-message
reply reply
(list 'exception exn))) (cons 'exception exn)))
(lambda () (lambda ()
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
@ -78,7 +79,7 @@
(lambda () (lambda ()
(start-stack #t (thunk))) (start-stack #t (thunk)))
(lambda vals (lambda vals
(put-message reply vals)))))) (put-message reply (cons 'result vals)))))))
#:unwind? #t)) #:unwind? #t))
#:parallel? #t) #:parallel? #t)
reply)) reply))
@ -88,10 +89,10 @@
reply-channels))) reply-channels)))
(map (map
(match-lambda (match-lambda
(('exception exn) (('exception . exn)
(raise-exception exn)) (raise-exception exn))
(result (('result . vals)
(apply values result))) (apply values vals)))
responses))) responses)))
(define (fibers-batch-map proc parallelism-limit . lists) (define (fibers-batch-map proc parallelism-limit . lists)
@ -114,9 +115,18 @@
(channel-indexes '())) (channel-indexes '()))
(if (and (eq? #f next-to-process-index) (if (and (eq? #f next-to-process-index)
(null? channel-indexes)) (null? channel-indexes))
(let ((processed-result-vec
(vector-map
(lambda (_ result-or-exn)
(match result-or-exn
(('exception . exn)
(raise-exception exn))
(('result . vals)
(car vals))))
result-vec)))
(if (vector? (first lists)) (if (vector? (first lists))
result-vec processed-result-vec
(vector->list result-vec)) (vector->list processed-result-vec)))
(if (or (= (length channel-indexes) (if (or (= (length channel-indexes)
(min parallelism-limit vecs-length)) (min parallelism-limit vecs-length))
@ -132,18 +142,13 @@
(get-operation (get-operation
(vector-ref result-vec index)) (vector-ref result-vec index))
(lambda (result) (lambda (result)
(match result
(('exception exn)
(raise-exception exn))
(_
(vector-set! result-vec (vector-set! result-vec
index index
(first result)) result)
(values next-to-process-index (values next-to-process-index
(lset-difference = (lset-difference =
channel-indexes channel-indexes
(list index)))))))) (list index))))))
channel-indexes))))) channel-indexes)))))
(loop new-index (loop new-index
new-channel-indexes)) new-channel-indexes))
@ -217,10 +222,10 @@
(if (null? active-channels) (if (null? active-channels)
(map (map
(match-lambda (match-lambda
((#f . ('exception exn)) ((#f . ('exception . exn))
(raise-exception exn)) (raise-exception exn))
((#f . ('result val)) ((#f . ('result . vals))
val)) (car vals)))
channels-to-results) channels-to-results)
(loop (loop
(perform-operation (perform-operation
@ -237,12 +242,7 @@
(map (match-lambda (map (match-lambda
((c . r) ((c . r)
(if (eq? channel c) (if (eq? channel c)
(cons #f (cons #f result)
(match result
(('exception exn)
result)
(_
(list 'result result))))
(cons c r)))) (cons c r))))
channels-to-results))) channels-to-results)))
#f)))) #f))))
@ -263,7 +263,7 @@
reply-channel reply-channel
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
(list 'exception exn)) (cons 'exception exn))
(lambda () (lambda ()
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
@ -294,7 +294,7 @@
(put-message input-channel (cons reply-channel args)) (put-message input-channel (cons reply-channel args))
(match (get-message reply-channel) (match (get-message reply-channel)
(('result . vals) (apply values vals)) (('result . vals) (apply values vals))
(('exception exn) (('exception . exn)
(raise-exception exn)))))) (raise-exception exn))))))
(define-record-type <parallelism-limiter> (define-record-type <parallelism-limiter>