diff --git a/knots/parallelism.scm b/knots/parallelism.scm index 8f3ec39..c829254 100644 --- a/knots/parallelism.scm +++ b/knots/parallelism.scm @@ -33,7 +33,9 @@ fibers-for-each fibers-parallel - fibers-let)) + fibers-let + + fiberize)) (define (defer-to-parallel-fiber thunk) (let ((reply (make-channel))) @@ -219,3 +221,40 @@ channels-to-results))) #f)))) channels-to-results)))))))) + +(define* (fiberize proc #:key (parallelism 1) + (show-backtrace? (const #t))) + (let ((channel (make-channel))) + (for-each + (lambda _ + (spawn-fiber + (lambda () + (while #t + (let ((reply-channel args (car+cdr + (get-message channel)))) + (put-message + reply-channel + (with-exception-handler + (lambda (exn) + (cons 'exception exn)) + (lambda () + (with-throw-handler #t + (lambda () + (call-with-values + (lambda () + (apply proc args)) + (lambda vals + (cons 'result vals)))) + (lambda args + (when (apply show-backtrace? args) + (backtrace))))) + #:unwind? #t))))) + #:parallel? #t)) + (iota parallelism)) + + (lambda args + (let ((reply-channel (make-channel))) + (put-message channel (cons reply-channel args)) + (match (get-message reply-channel) + (('result . vals) (apply values vals)) + (('exception . exn) (raise-exception exn)))))))