Compare commits

..

No commits in common. "0fa6737a39f866bdbffc11fd16348c5411c11a7c" and "6f6d57b189a7073718407df263bbe3c1245f2e51" have entirely different histories.

View file

@ -22,7 +22,6 @@
#:use-module (srfi srfi-71)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-43)
#:use-module (ice-9 match)
#:use-module (ice-9 control)
#:use-module (ice-9 exceptions)
@ -58,7 +57,7 @@
(lambda (exn)
(put-message
reply
(cons 'exception exn)))
(list 'exception exn)))
(lambda ()
(with-exception-handler
(lambda (exn)
@ -79,7 +78,7 @@
(lambda ()
(start-stack #t (thunk)))
(lambda vals
(put-message reply (cons 'result vals)))))))
(put-message reply vals))))))
#:unwind? #t))
#:parallel? #t)
reply))
@ -89,16 +88,13 @@
reply-channels)))
(map
(match-lambda
(('exception . exn)
(('exception exn)
(raise-exception exn))
(('result . vals)
(apply values vals)))
(result
(apply values result)))
responses)))
(define (fibers-batch-map proc parallelism-limit . lists)
"Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT. If any of
the invocations of PROC raise an exception, this will be raised once
all of the calls to PROC have finished."
(define vecs (map (lambda (list-or-vec)
(if (vector? list-or-vec)
list-or-vec
@ -118,18 +114,9 @@ all of the calls to PROC have finished."
(channel-indexes '()))
(if (and (eq? #f next-to-process-index)
(null? channel-indexes))
(let ((processed-result-vec
(vector-map
(lambda (_ result-or-exn)
(match result-or-exn
(('exception . exn)
(raise-exception exn))
(('result . vals)
(car vals))))
result-vec)))
(if (vector? (first lists))
processed-result-vec
(vector->list processed-result-vec)))
result-vec
(vector->list result-vec))
(if (or (= (length channel-indexes)
(min parallelism-limit vecs-length))
@ -145,13 +132,18 @@ all of the calls to PROC have finished."
(get-operation
(vector-ref result-vec index))
(lambda (result)
(match result
(('exception exn)
(raise-exception exn))
(_
(vector-set! result-vec
index
result)
(first result))
(values next-to-process-index
(lset-difference =
channel-indexes
(list index))))))
(list index))))))))
channel-indexes)))))
(loop new-index
new-channel-indexes))
@ -174,14 +166,9 @@ all of the calls to PROC have finished."
channel-indexes)))))))
(define (fibers-map proc . lists)
"Map PROC over LISTS in parallel, running up to 20 fibers in
PARALLEL. If any of the invocations of PROC raise an exception, this
will be raised once all of the calls to PROC have finished."
(apply fibers-batch-map proc 20 lists))
(define (fibers-batch-for-each proc parallelism-limit . lists)
"Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in
parallel."
(apply fibers-batch-map
(lambda args
(apply proc args)
@ -192,13 +179,10 @@ parallel."
*unspecified*)
(define (fibers-for-each proc . lists)
"Call PROC on LISTS, running up to 20 fibers in parallel."
(apply fibers-batch-for-each proc 20 lists))
(define-syntax fibers-parallel
(lambda (x)
"Run each expression in parallel. If any expression raises an
exception, this will be raised after all exceptions have finished."
(syntax-case x ()
((_ e0 ...)
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
@ -209,16 +193,12 @@ parallel."
(apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
(define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...)
"Let, but run each binding in a fiber in parallel."
(call-with-values
(lambda () (fibers-parallel e ...))
(lambda (v ...)
b0 b1 ...)))
(define* (fibers-map-with-progress proc lists #:key report)
"Map PROC over LISTS, calling #:REPORT if specified after each
invocation of PROC finishes. REPORT is passed the results for each
element of LISTS, or #f if no result has been received yet."
(let loop ((channels-to-results
(apply map
(lambda args
@ -237,10 +217,10 @@ invocation of PROC finishes. REPORT is passed the results for each
(if (null? active-channels)
(map
(match-lambda
((#f . ('exception . exn))
((#f . ('exception exn))
(raise-exception exn))
((#f . ('result . vals))
(car vals)))
((#f . ('result val))
val))
channels-to-results)
(loop
(perform-operation
@ -257,7 +237,12 @@ invocation of PROC finishes. REPORT is passed the results for each
(map (match-lambda
((c . r)
(if (eq? channel c)
(cons #f result)
(cons #f
(match result
(('exception exn)
result)
(_
(list 'result result))))
(cons c r))))
channels-to-results)))
#f))))
@ -278,7 +263,7 @@ invocation of PROC finishes. REPORT is passed the results for each
reply-channel
(with-exception-handler
(lambda (exn)
(cons 'exception exn))
(list 'exception exn))
(lambda ()
(with-exception-handler
(lambda (exn)
@ -309,7 +294,7 @@ invocation of PROC finishes. REPORT is passed the results for each
(put-message input-channel (cons reply-channel args))
(match (get-message reply-channel)
(('result . vals) (apply values vals))
(('exception . exn)
(('exception exn)
(raise-exception exn))))))
(define-record-type <parallelism-limiter>