Compare commits

..

No commits in common. "0fa6737a39f866bdbffc11fd16348c5411c11a7c" and "6f6d57b189a7073718407df263bbe3c1245f2e51" have entirely different histories.

View file

@ -22,7 +22,6 @@
#:use-module (srfi srfi-71) #:use-module (srfi srfi-71)
#:use-module (srfi srfi-9) #:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu) #:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-43)
#:use-module (ice-9 match) #:use-module (ice-9 match)
#:use-module (ice-9 control) #:use-module (ice-9 control)
#:use-module (ice-9 exceptions) #:use-module (ice-9 exceptions)
@ -58,7 +57,7 @@
(lambda (exn) (lambda (exn)
(put-message (put-message
reply reply
(cons 'exception exn))) (list 'exception exn)))
(lambda () (lambda ()
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
@ -79,7 +78,7 @@
(lambda () (lambda ()
(start-stack #t (thunk))) (start-stack #t (thunk)))
(lambda vals (lambda vals
(put-message reply (cons 'result vals))))))) (put-message reply vals))))))
#:unwind? #t)) #:unwind? #t))
#:parallel? #t) #:parallel? #t)
reply)) reply))
@ -89,16 +88,13 @@
reply-channels))) reply-channels)))
(map (map
(match-lambda (match-lambda
(('exception . exn) (('exception exn)
(raise-exception exn)) (raise-exception exn))
(('result . vals) (result
(apply values vals))) (apply values result)))
responses))) responses)))
(define (fibers-batch-map proc parallelism-limit . lists) (define (fibers-batch-map proc parallelism-limit . lists)
"Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT. If any of
the invocations of PROC raise an exception, this will be raised once
all of the calls to PROC have finished."
(define vecs (map (lambda (list-or-vec) (define vecs (map (lambda (list-or-vec)
(if (vector? list-or-vec) (if (vector? list-or-vec)
list-or-vec list-or-vec
@ -118,18 +114,9 @@ all of the calls to PROC have finished."
(channel-indexes '())) (channel-indexes '()))
(if (and (eq? #f next-to-process-index) (if (and (eq? #f next-to-process-index)
(null? channel-indexes)) (null? channel-indexes))
(let ((processed-result-vec
(vector-map
(lambda (_ result-or-exn)
(match result-or-exn
(('exception . exn)
(raise-exception exn))
(('result . vals)
(car vals))))
result-vec)))
(if (vector? (first lists)) (if (vector? (first lists))
processed-result-vec result-vec
(vector->list processed-result-vec))) (vector->list result-vec))
(if (or (= (length channel-indexes) (if (or (= (length channel-indexes)
(min parallelism-limit vecs-length)) (min parallelism-limit vecs-length))
@ -145,13 +132,18 @@ all of the calls to PROC have finished."
(get-operation (get-operation
(vector-ref result-vec index)) (vector-ref result-vec index))
(lambda (result) (lambda (result)
(match result
(('exception exn)
(raise-exception exn))
(_
(vector-set! result-vec (vector-set! result-vec
index index
result) (first result))
(values next-to-process-index (values next-to-process-index
(lset-difference = (lset-difference =
channel-indexes channel-indexes
(list index)))))) (list index))))))))
channel-indexes))))) channel-indexes)))))
(loop new-index (loop new-index
new-channel-indexes)) new-channel-indexes))
@ -174,14 +166,9 @@ all of the calls to PROC have finished."
channel-indexes))))))) channel-indexes)))))))
(define (fibers-map proc . lists) (define (fibers-map proc . lists)
"Map PROC over LISTS in parallel, running up to 20 fibers in
PARALLEL. If any of the invocations of PROC raise an exception, this
will be raised once all of the calls to PROC have finished."
(apply fibers-batch-map proc 20 lists)) (apply fibers-batch-map proc 20 lists))
(define (fibers-batch-for-each proc parallelism-limit . lists) (define (fibers-batch-for-each proc parallelism-limit . lists)
"Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in
parallel."
(apply fibers-batch-map (apply fibers-batch-map
(lambda args (lambda args
(apply proc args) (apply proc args)
@ -192,13 +179,10 @@ parallel."
*unspecified*) *unspecified*)
(define (fibers-for-each proc . lists) (define (fibers-for-each proc . lists)
"Call PROC on LISTS, running up to 20 fibers in parallel."
(apply fibers-batch-for-each proc 20 lists)) (apply fibers-batch-for-each proc 20 lists))
(define-syntax fibers-parallel (define-syntax fibers-parallel
(lambda (x) (lambda (x)
"Run each expression in parallel. If any expression raises an
exception, this will be raised after all exceptions have finished."
(syntax-case x () (syntax-case x ()
((_ e0 ...) ((_ e0 ...)
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
@ -209,16 +193,12 @@ parallel."
(apply values (fetch-result-of-defered-thunks tmp0 ...)))))))) (apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
(define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...) (define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...)
"Let, but run each binding in a fiber in parallel."
(call-with-values (call-with-values
(lambda () (fibers-parallel e ...)) (lambda () (fibers-parallel e ...))
(lambda (v ...) (lambda (v ...)
b0 b1 ...))) b0 b1 ...)))
(define* (fibers-map-with-progress proc lists #:key report) (define* (fibers-map-with-progress proc lists #:key report)
"Map PROC over LISTS, calling #:REPORT if specified after each
invocation of PROC finishes. REPORT is passed the results for each
element of LISTS, or #f if no result has been received yet."
(let loop ((channels-to-results (let loop ((channels-to-results
(apply map (apply map
(lambda args (lambda args
@ -237,10 +217,10 @@ invocation of PROC finishes. REPORT is passed the results for each
(if (null? active-channels) (if (null? active-channels)
(map (map
(match-lambda (match-lambda
((#f . ('exception . exn)) ((#f . ('exception exn))
(raise-exception exn)) (raise-exception exn))
((#f . ('result . vals)) ((#f . ('result val))
(car vals))) val))
channels-to-results) channels-to-results)
(loop (loop
(perform-operation (perform-operation
@ -257,7 +237,12 @@ invocation of PROC finishes. REPORT is passed the results for each
(map (match-lambda (map (match-lambda
((c . r) ((c . r)
(if (eq? channel c) (if (eq? channel c)
(cons #f result) (cons #f
(match result
(('exception exn)
result)
(_
(list 'result result))))
(cons c r)))) (cons c r))))
channels-to-results))) channels-to-results)))
#f)))) #f))))
@ -278,7 +263,7 @@ invocation of PROC finishes. REPORT is passed the results for each
reply-channel reply-channel
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
(cons 'exception exn)) (list 'exception exn))
(lambda () (lambda ()
(with-exception-handler (with-exception-handler
(lambda (exn) (lambda (exn)
@ -309,7 +294,7 @@ invocation of PROC finishes. REPORT is passed the results for each
(put-message input-channel (cons reply-channel args)) (put-message input-channel (cons reply-channel args))
(match (get-message reply-channel) (match (get-message reply-channel)
(('result . vals) (apply values vals)) (('result . vals) (apply values vals))
(('exception . exn) (('exception exn)
(raise-exception exn)))))) (raise-exception exn))))))
(define-record-type <parallelism-limiter> (define-record-type <parallelism-limiter>