Allow passing custom channels to fiberize
This allows customising the behevaiour, for example by using a queue.
This commit is contained in:
parent
ca3d5a1781
commit
99245034ea
1 changed files with 43 additions and 41 deletions
|
@ -238,45 +238,47 @@
|
|||
#f))))
|
||||
channels-to-results))))))))
|
||||
|
||||
(define* (fiberize proc #:key (parallelism 1))
|
||||
(let ((channel (make-channel)))
|
||||
(for-each
|
||||
(lambda _
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
(while #t
|
||||
(let ((reply-channel args (car+cdr
|
||||
(get-message channel))))
|
||||
(put-message
|
||||
reply-channel
|
||||
(call-with-escape-continuation
|
||||
(lambda (return)
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(match (fluid-ref %stacks)
|
||||
((stack-tag . prompt-tag)
|
||||
(let ((stack (make-stack #t
|
||||
0 prompt-tag
|
||||
0 (and prompt-tag 1))))
|
||||
(return (list 'exception exn stack))))))
|
||||
(lambda ()
|
||||
(call-with-values
|
||||
(lambda ()
|
||||
(start-stack #t (apply proc args)))
|
||||
(lambda vals
|
||||
(cons 'result vals)))))))))))
|
||||
#:parallel? #t))
|
||||
(iota parallelism))
|
||||
(define* (fiberize proc
|
||||
#:key (parallelism 1)
|
||||
(input-channel (make-channel))
|
||||
(process-channel input-channel))
|
||||
(for-each
|
||||
(lambda _
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
(while #t
|
||||
(let ((reply-channel args (car+cdr
|
||||
(get-message process-channel))))
|
||||
(put-message
|
||||
reply-channel
|
||||
(call-with-escape-continuation
|
||||
(lambda (return)
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(match (fluid-ref %stacks)
|
||||
((stack-tag . prompt-tag)
|
||||
(let ((stack (make-stack #t
|
||||
0 prompt-tag
|
||||
0 (and prompt-tag 1))))
|
||||
(return (list 'exception exn stack))))))
|
||||
(lambda ()
|
||||
(call-with-values
|
||||
(lambda ()
|
||||
(start-stack #t (apply proc args)))
|
||||
(lambda vals
|
||||
(cons 'result vals)))))))))))
|
||||
#:parallel? #t))
|
||||
(iota parallelism))
|
||||
|
||||
(lambda args
|
||||
(let ((reply-channel (make-channel)))
|
||||
(put-message channel (cons reply-channel args))
|
||||
(match (get-message reply-channel)
|
||||
(('result . vals) (apply values vals))
|
||||
(('exception exn stack)
|
||||
(let ((knots-exn
|
||||
(make-knots-exception stack)))
|
||||
(raise-exception
|
||||
(make-exception
|
||||
knots-exn
|
||||
exn)))))))))
|
||||
(lambda args
|
||||
(let ((reply-channel (make-channel)))
|
||||
(put-message input-channel (cons reply-channel args))
|
||||
(match (get-message reply-channel)
|
||||
(('result . vals) (apply values vals))
|
||||
(('exception exn stack)
|
||||
(let ((knots-exn
|
||||
(make-knots-exception stack)))
|
||||
(raise-exception
|
||||
(make-exception
|
||||
knots-exn
|
||||
exn))))))))
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue