Compare commits
2 commits
6f6d57b189
...
0fa6737a39
Author | SHA1 | Date | |
---|---|---|---|
0fa6737a39 | |||
4140ef0bd6 |
1 changed files with 46 additions and 31 deletions
|
@ -22,6 +22,7 @@
|
||||||
#:use-module (srfi srfi-71)
|
#:use-module (srfi srfi-71)
|
||||||
#:use-module (srfi srfi-9)
|
#:use-module (srfi srfi-9)
|
||||||
#:use-module (srfi srfi-9 gnu)
|
#:use-module (srfi srfi-9 gnu)
|
||||||
|
#:use-module (srfi srfi-43)
|
||||||
#:use-module (ice-9 match)
|
#:use-module (ice-9 match)
|
||||||
#:use-module (ice-9 control)
|
#:use-module (ice-9 control)
|
||||||
#:use-module (ice-9 exceptions)
|
#:use-module (ice-9 exceptions)
|
||||||
|
@ -57,7 +58,7 @@
|
||||||
(lambda (exn)
|
(lambda (exn)
|
||||||
(put-message
|
(put-message
|
||||||
reply
|
reply
|
||||||
(list 'exception exn)))
|
(cons 'exception exn)))
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
(lambda (exn)
|
(lambda (exn)
|
||||||
|
@ -78,7 +79,7 @@
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(start-stack #t (thunk)))
|
(start-stack #t (thunk)))
|
||||||
(lambda vals
|
(lambda vals
|
||||||
(put-message reply vals))))))
|
(put-message reply (cons 'result vals)))))))
|
||||||
#:unwind? #t))
|
#:unwind? #t))
|
||||||
#:parallel? #t)
|
#:parallel? #t)
|
||||||
reply))
|
reply))
|
||||||
|
@ -88,13 +89,16 @@
|
||||||
reply-channels)))
|
reply-channels)))
|
||||||
(map
|
(map
|
||||||
(match-lambda
|
(match-lambda
|
||||||
(('exception exn)
|
(('exception . exn)
|
||||||
(raise-exception exn))
|
(raise-exception exn))
|
||||||
(result
|
(('result . vals)
|
||||||
(apply values result)))
|
(apply values vals)))
|
||||||
responses)))
|
responses)))
|
||||||
|
|
||||||
(define (fibers-batch-map proc parallelism-limit . lists)
|
(define (fibers-batch-map proc parallelism-limit . lists)
|
||||||
|
"Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT. If any of
|
||||||
|
the invocations of PROC raise an exception, this will be raised once
|
||||||
|
all of the calls to PROC have finished."
|
||||||
(define vecs (map (lambda (list-or-vec)
|
(define vecs (map (lambda (list-or-vec)
|
||||||
(if (vector? list-or-vec)
|
(if (vector? list-or-vec)
|
||||||
list-or-vec
|
list-or-vec
|
||||||
|
@ -114,9 +118,18 @@
|
||||||
(channel-indexes '()))
|
(channel-indexes '()))
|
||||||
(if (and (eq? #f next-to-process-index)
|
(if (and (eq? #f next-to-process-index)
|
||||||
(null? channel-indexes))
|
(null? channel-indexes))
|
||||||
(if (vector? (first lists))
|
(let ((processed-result-vec
|
||||||
result-vec
|
(vector-map
|
||||||
(vector->list result-vec))
|
(lambda (_ result-or-exn)
|
||||||
|
(match result-or-exn
|
||||||
|
(('exception . exn)
|
||||||
|
(raise-exception exn))
|
||||||
|
(('result . vals)
|
||||||
|
(car vals))))
|
||||||
|
result-vec)))
|
||||||
|
(if (vector? (first lists))
|
||||||
|
processed-result-vec
|
||||||
|
(vector->list processed-result-vec)))
|
||||||
|
|
||||||
(if (or (= (length channel-indexes)
|
(if (or (= (length channel-indexes)
|
||||||
(min parallelism-limit vecs-length))
|
(min parallelism-limit vecs-length))
|
||||||
|
@ -132,18 +145,13 @@
|
||||||
(get-operation
|
(get-operation
|
||||||
(vector-ref result-vec index))
|
(vector-ref result-vec index))
|
||||||
(lambda (result)
|
(lambda (result)
|
||||||
(match result
|
(vector-set! result-vec
|
||||||
(('exception exn)
|
index
|
||||||
(raise-exception exn))
|
result)
|
||||||
(_
|
(values next-to-process-index
|
||||||
(vector-set! result-vec
|
(lset-difference =
|
||||||
index
|
channel-indexes
|
||||||
(first result))
|
(list index))))))
|
||||||
|
|
||||||
(values next-to-process-index
|
|
||||||
(lset-difference =
|
|
||||||
channel-indexes
|
|
||||||
(list index))))))))
|
|
||||||
channel-indexes)))))
|
channel-indexes)))))
|
||||||
(loop new-index
|
(loop new-index
|
||||||
new-channel-indexes))
|
new-channel-indexes))
|
||||||
|
@ -166,9 +174,14 @@
|
||||||
channel-indexes)))))))
|
channel-indexes)))))))
|
||||||
|
|
||||||
(define (fibers-map proc . lists)
|
(define (fibers-map proc . lists)
|
||||||
|
"Map PROC over LISTS in parallel, running up to 20 fibers in
|
||||||
|
PARALLEL. If any of the invocations of PROC raise an exception, this
|
||||||
|
will be raised once all of the calls to PROC have finished."
|
||||||
(apply fibers-batch-map proc 20 lists))
|
(apply fibers-batch-map proc 20 lists))
|
||||||
|
|
||||||
(define (fibers-batch-for-each proc parallelism-limit . lists)
|
(define (fibers-batch-for-each proc parallelism-limit . lists)
|
||||||
|
"Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in
|
||||||
|
parallel."
|
||||||
(apply fibers-batch-map
|
(apply fibers-batch-map
|
||||||
(lambda args
|
(lambda args
|
||||||
(apply proc args)
|
(apply proc args)
|
||||||
|
@ -179,10 +192,13 @@
|
||||||
*unspecified*)
|
*unspecified*)
|
||||||
|
|
||||||
(define (fibers-for-each proc . lists)
|
(define (fibers-for-each proc . lists)
|
||||||
|
"Call PROC on LISTS, running up to 20 fibers in parallel."
|
||||||
(apply fibers-batch-for-each proc 20 lists))
|
(apply fibers-batch-for-each proc 20 lists))
|
||||||
|
|
||||||
(define-syntax fibers-parallel
|
(define-syntax fibers-parallel
|
||||||
(lambda (x)
|
(lambda (x)
|
||||||
|
"Run each expression in parallel. If any expression raises an
|
||||||
|
exception, this will be raised after all exceptions have finished."
|
||||||
(syntax-case x ()
|
(syntax-case x ()
|
||||||
((_ e0 ...)
|
((_ e0 ...)
|
||||||
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
|
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
|
||||||
|
@ -193,12 +209,16 @@
|
||||||
(apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
|
(apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
|
||||||
|
|
||||||
(define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...)
|
(define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...)
|
||||||
|
"Let, but run each binding in a fiber in parallel."
|
||||||
(call-with-values
|
(call-with-values
|
||||||
(lambda () (fibers-parallel e ...))
|
(lambda () (fibers-parallel e ...))
|
||||||
(lambda (v ...)
|
(lambda (v ...)
|
||||||
b0 b1 ...)))
|
b0 b1 ...)))
|
||||||
|
|
||||||
(define* (fibers-map-with-progress proc lists #:key report)
|
(define* (fibers-map-with-progress proc lists #:key report)
|
||||||
|
"Map PROC over LISTS, calling #:REPORT if specified after each
|
||||||
|
invocation of PROC finishes. REPORT is passed the results for each
|
||||||
|
element of LISTS, or #f if no result has been received yet."
|
||||||
(let loop ((channels-to-results
|
(let loop ((channels-to-results
|
||||||
(apply map
|
(apply map
|
||||||
(lambda args
|
(lambda args
|
||||||
|
@ -217,10 +237,10 @@
|
||||||
(if (null? active-channels)
|
(if (null? active-channels)
|
||||||
(map
|
(map
|
||||||
(match-lambda
|
(match-lambda
|
||||||
((#f . ('exception exn))
|
((#f . ('exception . exn))
|
||||||
(raise-exception exn))
|
(raise-exception exn))
|
||||||
((#f . ('result val))
|
((#f . ('result . vals))
|
||||||
val))
|
(car vals)))
|
||||||
channels-to-results)
|
channels-to-results)
|
||||||
(loop
|
(loop
|
||||||
(perform-operation
|
(perform-operation
|
||||||
|
@ -237,12 +257,7 @@
|
||||||
(map (match-lambda
|
(map (match-lambda
|
||||||
((c . r)
|
((c . r)
|
||||||
(if (eq? channel c)
|
(if (eq? channel c)
|
||||||
(cons #f
|
(cons #f result)
|
||||||
(match result
|
|
||||||
(('exception exn)
|
|
||||||
result)
|
|
||||||
(_
|
|
||||||
(list 'result result))))
|
|
||||||
(cons c r))))
|
(cons c r))))
|
||||||
channels-to-results)))
|
channels-to-results)))
|
||||||
#f))))
|
#f))))
|
||||||
|
@ -263,7 +278,7 @@
|
||||||
reply-channel
|
reply-channel
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
(lambda (exn)
|
(lambda (exn)
|
||||||
(list 'exception exn))
|
(cons 'exception exn))
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(with-exception-handler
|
(with-exception-handler
|
||||||
(lambda (exn)
|
(lambda (exn)
|
||||||
|
@ -294,7 +309,7 @@
|
||||||
(put-message input-channel (cons reply-channel args))
|
(put-message input-channel (cons reply-channel args))
|
||||||
(match (get-message reply-channel)
|
(match (get-message reply-channel)
|
||||||
(('result . vals) (apply values vals))
|
(('result . vals) (apply values vals))
|
||||||
(('exception exn)
|
(('exception . exn)
|
||||||
(raise-exception exn))))))
|
(raise-exception exn))))))
|
||||||
|
|
||||||
(define-record-type <parallelism-limiter>
|
(define-record-type <parallelism-limiter>
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue