Compare commits
2 commits
6f6d57b189
...
0fa6737a39
Author | SHA1 | Date | |
---|---|---|---|
0fa6737a39 | |||
4140ef0bd6 |
1 changed files with 46 additions and 31 deletions
|
@ -22,6 +22,7 @@
|
|||
#:use-module (srfi srfi-71)
|
||||
#:use-module (srfi srfi-9)
|
||||
#:use-module (srfi srfi-9 gnu)
|
||||
#:use-module (srfi srfi-43)
|
||||
#:use-module (ice-9 match)
|
||||
#:use-module (ice-9 control)
|
||||
#:use-module (ice-9 exceptions)
|
||||
|
@ -57,7 +58,7 @@
|
|||
(lambda (exn)
|
||||
(put-message
|
||||
reply
|
||||
(list 'exception exn)))
|
||||
(cons 'exception exn)))
|
||||
(lambda ()
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
|
@ -78,7 +79,7 @@
|
|||
(lambda ()
|
||||
(start-stack #t (thunk)))
|
||||
(lambda vals
|
||||
(put-message reply vals))))))
|
||||
(put-message reply (cons 'result vals)))))))
|
||||
#:unwind? #t))
|
||||
#:parallel? #t)
|
||||
reply))
|
||||
|
@ -88,13 +89,16 @@
|
|||
reply-channels)))
|
||||
(map
|
||||
(match-lambda
|
||||
(('exception exn)
|
||||
(('exception . exn)
|
||||
(raise-exception exn))
|
||||
(result
|
||||
(apply values result)))
|
||||
(('result . vals)
|
||||
(apply values vals)))
|
||||
responses)))
|
||||
|
||||
(define (fibers-batch-map proc parallelism-limit . lists)
|
||||
"Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT. If any of
|
||||
the invocations of PROC raise an exception, this will be raised once
|
||||
all of the calls to PROC have finished."
|
||||
(define vecs (map (lambda (list-or-vec)
|
||||
(if (vector? list-or-vec)
|
||||
list-or-vec
|
||||
|
@ -114,9 +118,18 @@
|
|||
(channel-indexes '()))
|
||||
(if (and (eq? #f next-to-process-index)
|
||||
(null? channel-indexes))
|
||||
(let ((processed-result-vec
|
||||
(vector-map
|
||||
(lambda (_ result-or-exn)
|
||||
(match result-or-exn
|
||||
(('exception . exn)
|
||||
(raise-exception exn))
|
||||
(('result . vals)
|
||||
(car vals))))
|
||||
result-vec)))
|
||||
(if (vector? (first lists))
|
||||
result-vec
|
||||
(vector->list result-vec))
|
||||
processed-result-vec
|
||||
(vector->list processed-result-vec)))
|
||||
|
||||
(if (or (= (length channel-indexes)
|
||||
(min parallelism-limit vecs-length))
|
||||
|
@ -132,18 +145,13 @@
|
|||
(get-operation
|
||||
(vector-ref result-vec index))
|
||||
(lambda (result)
|
||||
(match result
|
||||
(('exception exn)
|
||||
(raise-exception exn))
|
||||
(_
|
||||
(vector-set! result-vec
|
||||
index
|
||||
(first result))
|
||||
|
||||
result)
|
||||
(values next-to-process-index
|
||||
(lset-difference =
|
||||
channel-indexes
|
||||
(list index))))))))
|
||||
(list index))))))
|
||||
channel-indexes)))))
|
||||
(loop new-index
|
||||
new-channel-indexes))
|
||||
|
@ -166,9 +174,14 @@
|
|||
channel-indexes)))))))
|
||||
|
||||
(define (fibers-map proc . lists)
|
||||
"Map PROC over LISTS in parallel, running up to 20 fibers in
|
||||
PARALLEL. If any of the invocations of PROC raise an exception, this
|
||||
will be raised once all of the calls to PROC have finished."
|
||||
(apply fibers-batch-map proc 20 lists))
|
||||
|
||||
(define (fibers-batch-for-each proc parallelism-limit . lists)
|
||||
"Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in
|
||||
parallel."
|
||||
(apply fibers-batch-map
|
||||
(lambda args
|
||||
(apply proc args)
|
||||
|
@ -179,10 +192,13 @@
|
|||
*unspecified*)
|
||||
|
||||
(define (fibers-for-each proc . lists)
|
||||
"Call PROC on LISTS, running up to 20 fibers in parallel."
|
||||
(apply fibers-batch-for-each proc 20 lists))
|
||||
|
||||
(define-syntax fibers-parallel
|
||||
(lambda (x)
|
||||
"Run each expression in parallel. If any expression raises an
|
||||
exception, this will be raised after all exceptions have finished."
|
||||
(syntax-case x ()
|
||||
((_ e0 ...)
|
||||
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
|
||||
|
@ -193,12 +209,16 @@
|
|||
(apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
|
||||
|
||||
(define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...)
|
||||
"Let, but run each binding in a fiber in parallel."
|
||||
(call-with-values
|
||||
(lambda () (fibers-parallel e ...))
|
||||
(lambda (v ...)
|
||||
b0 b1 ...)))
|
||||
|
||||
(define* (fibers-map-with-progress proc lists #:key report)
|
||||
"Map PROC over LISTS, calling #:REPORT if specified after each
|
||||
invocation of PROC finishes. REPORT is passed the results for each
|
||||
element of LISTS, or #f if no result has been received yet."
|
||||
(let loop ((channels-to-results
|
||||
(apply map
|
||||
(lambda args
|
||||
|
@ -217,10 +237,10 @@
|
|||
(if (null? active-channels)
|
||||
(map
|
||||
(match-lambda
|
||||
((#f . ('exception exn))
|
||||
((#f . ('exception . exn))
|
||||
(raise-exception exn))
|
||||
((#f . ('result val))
|
||||
val))
|
||||
((#f . ('result . vals))
|
||||
(car vals)))
|
||||
channels-to-results)
|
||||
(loop
|
||||
(perform-operation
|
||||
|
@ -237,12 +257,7 @@
|
|||
(map (match-lambda
|
||||
((c . r)
|
||||
(if (eq? channel c)
|
||||
(cons #f
|
||||
(match result
|
||||
(('exception exn)
|
||||
result)
|
||||
(_
|
||||
(list 'result result))))
|
||||
(cons #f result)
|
||||
(cons c r))))
|
||||
channels-to-results)))
|
||||
#f))))
|
||||
|
@ -263,7 +278,7 @@
|
|||
reply-channel
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(list 'exception exn))
|
||||
(cons 'exception exn))
|
||||
(lambda ()
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
|
@ -294,7 +309,7 @@
|
|||
(put-message input-channel (cons reply-channel args))
|
||||
(match (get-message reply-channel)
|
||||
(('result . vals) (apply values vals))
|
||||
(('exception exn)
|
||||
(('exception . exn)
|
||||
(raise-exception exn))))))
|
||||
|
||||
(define-record-type <parallelism-limiter>
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue