From 4140ef0bd67dfed419203713df497f94b0ac2e45 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Fri, 27 Jun 2025 22:43:25 +0200 Subject: [PATCH 1/2] More consistently handle results and exceptions In the parallelism module. --- knots/parallelism.scm | 62 +++++++++++++++++++++---------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/knots/parallelism.scm b/knots/parallelism.scm index c98ca3f..f15dbe8 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -22,6 +22,7 @@ #:use-module (srfi srfi-71) #:use-module (srfi srfi-9) #:use-module (srfi srfi-9 gnu) + #:use-module (srfi srfi-43) #:use-module (ice-9 match) #:use-module (ice-9 control) #:use-module (ice-9 exceptions) @@ -57,7 +58,7 @@ (lambda (exn) (put-message reply - (list 'exception exn))) + (cons 'exception exn))) (lambda () (with-exception-handler (lambda (exn) @@ -78,7 +79,7 @@ (lambda () (start-stack #t (thunk))) (lambda vals - (put-message reply vals)))))) + (put-message reply (cons 'result vals))))))) #:unwind? #t)) #:parallel? #t) reply)) @@ -88,10 +89,10 @@ reply-channels))) (map (match-lambda - (('exception exn) + (('exception . exn) (raise-exception exn)) - (result - (apply values result))) + (('result . vals) + (apply values vals))) responses))) (define (fibers-batch-map proc parallelism-limit . lists) @@ -114,9 +115,18 @@ (channel-indexes '())) (if (and (eq? #f next-to-process-index) (null? channel-indexes)) - (if (vector? (first lists)) - result-vec - (vector->list result-vec)) + (let ((processed-result-vec + (vector-map + (lambda (_ result-or-exn) + (match result-or-exn + (('exception . exn) + (raise-exception exn)) + (('result . vals) + (car vals)))) + result-vec))) + (if (vector? (first lists)) + processed-result-vec + (vector->list processed-result-vec))) (if (or (= (length channel-indexes) (min parallelism-limit vecs-length)) @@ -132,18 +142,13 @@ (get-operation (vector-ref result-vec index)) (lambda (result) - (match result - (('exception exn) - (raise-exception exn)) - (_ - (vector-set! result-vec - index - (first result)) - - (values next-to-process-index - (lset-difference = - channel-indexes - (list index)))))))) + (vector-set! result-vec + index + result) + (values next-to-process-index + (lset-difference = + channel-indexes + (list index)))))) channel-indexes))))) (loop new-index new-channel-indexes)) @@ -217,10 +222,10 @@ (if (null? active-channels) (map (match-lambda - ((#f . ('exception exn)) + ((#f . ('exception . exn)) (raise-exception exn)) - ((#f . ('result val)) - val)) + ((#f . ('result . vals)) + (car vals))) channels-to-results) (loop (perform-operation @@ -237,12 +242,7 @@ (map (match-lambda ((c . r) (if (eq? channel c) - (cons #f - (match result - (('exception exn) - result) - (_ - (list 'result result)))) + (cons #f result) (cons c r)))) channels-to-results))) #f)))) @@ -263,7 +263,7 @@ reply-channel (with-exception-handler (lambda (exn) - (list 'exception exn)) + (cons 'exception exn)) (lambda () (with-exception-handler (lambda (exn) @@ -294,7 +294,7 @@ (put-message input-channel (cons reply-channel args)) (match (get-message reply-channel) (('result . vals) (apply values vals)) - (('exception exn) + (('exception . exn) (raise-exception exn)))))) (define-record-type From 0fa6737a39f866bdbffc11fd16348c5411c11a7c Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Fri, 27 Jun 2025 23:28:47 +0200 Subject: [PATCH 2/2] Document some things --- knots/parallelism.scm | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/knots/parallelism.scm b/knots/parallelism.scm index f15dbe8..7631055 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -96,6 +96,9 @@ responses))) (define (fibers-batch-map proc parallelism-limit . lists) + "Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT. If any of +the invocations of PROC raise an exception, this will be raised once +all of the calls to PROC have finished." (define vecs (map (lambda (list-or-vec) (if (vector? list-or-vec) list-or-vec @@ -171,9 +174,14 @@ channel-indexes))))))) (define (fibers-map proc . lists) + "Map PROC over LISTS in parallel, running up to 20 fibers in + PARALLEL. If any of the invocations of PROC raise an exception, this +will be raised once all of the calls to PROC have finished." (apply fibers-batch-map proc 20 lists)) (define (fibers-batch-for-each proc parallelism-limit . lists) + "Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in +parallel." (apply fibers-batch-map (lambda args (apply proc args) @@ -184,10 +192,13 @@ *unspecified*) (define (fibers-for-each proc . lists) + "Call PROC on LISTS, running up to 20 fibers in parallel." (apply fibers-batch-for-each proc 20 lists)) (define-syntax fibers-parallel (lambda (x) + "Run each expression in parallel. If any expression raises an + exception, this will be raised after all exceptions have finished." (syntax-case x () ((_ e0 ...) (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) @@ -198,12 +209,16 @@ (apply values (fetch-result-of-defered-thunks tmp0 ...)))))))) (define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...) + "Let, but run each binding in a fiber in parallel." (call-with-values (lambda () (fibers-parallel e ...)) (lambda (v ...) b0 b1 ...))) (define* (fibers-map-with-progress proc lists #:key report) + "Map PROC over LISTS, calling #:REPORT if specified after each +invocation of PROC finishes. REPORT is passed the results for each + element of LISTS, or #f if no result has been received yet." (let loop ((channels-to-results (apply map (lambda args