diff --git a/knots/parallelism.scm b/knots/parallelism.scm index 7631055..c98ca3f 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -22,7 +22,6 @@ #:use-module (srfi srfi-71) #:use-module (srfi srfi-9) #:use-module (srfi srfi-9 gnu) - #:use-module (srfi srfi-43) #:use-module (ice-9 match) #:use-module (ice-9 control) #:use-module (ice-9 exceptions) @@ -58,7 +57,7 @@ (lambda (exn) (put-message reply - (cons 'exception exn))) + (list 'exception exn))) (lambda () (with-exception-handler (lambda (exn) @@ -79,7 +78,7 @@ (lambda () (start-stack #t (thunk))) (lambda vals - (put-message reply (cons 'result vals))))))) + (put-message reply vals)))))) #:unwind? #t)) #:parallel? #t) reply)) @@ -89,16 +88,13 @@ reply-channels))) (map (match-lambda - (('exception . exn) + (('exception exn) (raise-exception exn)) - (('result . vals) - (apply values vals))) + (result + (apply values result))) responses))) (define (fibers-batch-map proc parallelism-limit . lists) - "Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT. If any of -the invocations of PROC raise an exception, this will be raised once -all of the calls to PROC have finished." (define vecs (map (lambda (list-or-vec) (if (vector? list-or-vec) list-or-vec @@ -118,18 +114,9 @@ all of the calls to PROC have finished." (channel-indexes '())) (if (and (eq? #f next-to-process-index) (null? channel-indexes)) - (let ((processed-result-vec - (vector-map - (lambda (_ result-or-exn) - (match result-or-exn - (('exception . exn) - (raise-exception exn)) - (('result . vals) - (car vals)))) - result-vec))) - (if (vector? (first lists)) - processed-result-vec - (vector->list processed-result-vec))) + (if (vector? (first lists)) + result-vec + (vector->list result-vec)) (if (or (= (length channel-indexes) (min parallelism-limit vecs-length)) @@ -145,13 +132,18 @@ all of the calls to PROC have finished." (get-operation (vector-ref result-vec index)) (lambda (result) - (vector-set! result-vec - index - result) - (values next-to-process-index - (lset-difference = - channel-indexes - (list index)))))) + (match result + (('exception exn) + (raise-exception exn)) + (_ + (vector-set! result-vec + index + (first result)) + + (values next-to-process-index + (lset-difference = + channel-indexes + (list index)))))))) channel-indexes))))) (loop new-index new-channel-indexes)) @@ -174,14 +166,9 @@ all of the calls to PROC have finished." channel-indexes))))))) (define (fibers-map proc . lists) - "Map PROC over LISTS in parallel, running up to 20 fibers in - PARALLEL. If any of the invocations of PROC raise an exception, this -will be raised once all of the calls to PROC have finished." (apply fibers-batch-map proc 20 lists)) (define (fibers-batch-for-each proc parallelism-limit . lists) - "Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in -parallel." (apply fibers-batch-map (lambda args (apply proc args) @@ -192,13 +179,10 @@ parallel." *unspecified*) (define (fibers-for-each proc . lists) - "Call PROC on LISTS, running up to 20 fibers in parallel." (apply fibers-batch-for-each proc 20 lists)) (define-syntax fibers-parallel (lambda (x) - "Run each expression in parallel. If any expression raises an - exception, this will be raised after all exceptions have finished." (syntax-case x () ((_ e0 ...) (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) @@ -209,16 +193,12 @@ parallel." (apply values (fetch-result-of-defered-thunks tmp0 ...)))))))) (define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...) - "Let, but run each binding in a fiber in parallel." (call-with-values (lambda () (fibers-parallel e ...)) (lambda (v ...) b0 b1 ...))) (define* (fibers-map-with-progress proc lists #:key report) - "Map PROC over LISTS, calling #:REPORT if specified after each -invocation of PROC finishes. REPORT is passed the results for each - element of LISTS, or #f if no result has been received yet." (let loop ((channels-to-results (apply map (lambda args @@ -237,10 +217,10 @@ invocation of PROC finishes. REPORT is passed the results for each (if (null? active-channels) (map (match-lambda - ((#f . ('exception . exn)) + ((#f . ('exception exn)) (raise-exception exn)) - ((#f . ('result . vals)) - (car vals))) + ((#f . ('result val)) + val)) channels-to-results) (loop (perform-operation @@ -257,7 +237,12 @@ invocation of PROC finishes. REPORT is passed the results for each (map (match-lambda ((c . r) (if (eq? channel c) - (cons #f result) + (cons #f + (match result + (('exception exn) + result) + (_ + (list 'result result)))) (cons c r)))) channels-to-results))) #f)))) @@ -278,7 +263,7 @@ invocation of PROC finishes. REPORT is passed the results for each reply-channel (with-exception-handler (lambda (exn) - (cons 'exception exn)) + (list 'exception exn)) (lambda () (with-exception-handler (lambda (exn) @@ -309,7 +294,7 @@ invocation of PROC finishes. REPORT is passed the results for each (put-message input-channel (cons reply-channel args)) (match (get-message reply-channel) (('result . vals) (apply values vals)) - (('exception . exn) + (('exception exn) (raise-exception exn)))))) (define-record-type