;;; Guile Knots
;;; Copyright © 2020 Christopher Baines
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with the guix-data-service.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (knots parallelism)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-71)
  #:use-module (srfi srfi-9)
  #:use-module (srfi srfi-9 gnu)
  #:use-module (srfi srfi-43)
  #:use-module (ice-9 match)
  #:use-module (ice-9 control)
  #:use-module (ice-9 exceptions)
  #:use-module (fibers)
  #:use-module (fibers channels)
  #:use-module (fibers operations)
  #:use-module (knots)
  #:use-module (knots resource-pool)
  #:export (fibers-batch-map
            fibers-map

            fibers-map-with-progress

            fibers-batch-for-each
            fibers-for-each

            fibers-parallel
            fibers-let

            fiberize

            make-parallelism-limiter
            parallelism-limiter?
            destroy-parallelism-limiter
            call-with-parallelism-limiter
            with-parallelism-limiter))

(define (defer-to-parallel-fiber thunk)
  "Spawn a fiber (with #:parallel? #t) that calls THUNK, and return a
channel.  Exactly one message is put on the channel: either
(result . VALS), where VALS is the list of values THUNK returned, or
(exception . EXN) if THUNK raised an exception."
  (let ((reply (make-channel)))
    (spawn-fiber
     (lambda ()
       (with-exception-handler
           ;; Outer handler: runs after unwinding (#:unwind? #t) and
           ;; forwards the exception to whoever reads REPLY.
           (lambda (exn)
             (put-message reply (cons 'exception exn)))
         (lambda ()
           (with-exception-handler
               ;; Inner handler: runs without unwinding, so the stack
               ;; can still be captured.  The captured stack is attached
               ;; to the exception via make-knots-exception and the
               ;; compound exception is re-raised to the outer handler.
               (lambda (exn)
                 (let ((stack
                        (match (fluid-ref %stacks)
                          ((stack-tag . prompt-tag)
                           (make-stack #t
                                       0 prompt-tag
                                       0 (and prompt-tag 1)))
                          (_
                           (make-stack #t)))))
                   (raise-exception
                    (make-exception
                     exn
                     (make-knots-exception stack)))))
             (lambda ()
               (call-with-values
                   (lambda ()
                     (start-stack #t (thunk)))
                 (lambda vals
                   (put-message reply
                                (cons 'result vals)))))))
         #:unwind? #t))
     #:parallel? #t)
    reply))

(define (fetch-result-of-defered-thunks . reply-channels)
  "Read one message from each of REPLY-CHANNELS (as returned by
defer-to-parallel-fiber) and return a list of the results.  All
channels are read before any response is inspected, so an exception is
only re-raised once every deferred thunk has replied."
  (let ((responses (map get-message
                        reply-channels)))
    (map
     (match-lambda
       (('exception . exn)
        (raise-exception exn))
       (('result . vals)
        ;; NOTE(review): this is evaluated inside map, so each thunk
        ;; is presumably expected to produce a single value.
        (apply values vals)))
     responses)))

(define (fibers-batch-map proc parallelism-limit . lists)
  "Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT.  If any of
the invocations of PROC raise an exception, this will be raised once all
of the calls to PROC have finished.

Each element of LISTS may be a list or a vector.  The number of calls
to PROC is the length of the first of LISTS.  The result is a vector
if the first of LISTS is a vector, and a list otherwise.  Only the
first value returned by each call to PROC is kept."
  (define vecs
    (map (lambda (list-or-vec)
           (if (vector? list-or-vec)
               list-or-vec
               (list->vector list-or-vec)))
         lists))

  (define vecs-length
    (vector-length (first vecs)))

  ;; Each slot first holds the reply channel for that index, and is
  ;; later overwritten with the message received from that channel.
  (define result-vec
    (make-vector vecs-length))

  ;; NEXT-TO-PROCESS-INDEX is the next index to start, or #f once
  ;; everything has been started.  CHANNEL-INDEXES lists the indexes
  ;; which have been started but have not replied yet.
  (let loop ((next-to-process-index
              (if (= 0 vecs-length)
                  #f
                  0))
             (channel-indexes '()))
    (if (and (eq? #f next-to-process-index)
             (null? channel-indexes))
        ;; Everything has been started and has replied: unpack the
        ;; responses, re-raising the first exception found (if any).
        (let ((processed-result-vec
               ;; SRFI-43 vector-map passes the index as well as the
               ;; element, hence the ignored first argument.
               (vector-map
                (lambda (_ result-or-exn)
                  (match result-or-exn
                    (('exception . exn)
                     (raise-exception exn))
                    (('result . vals)
                     (car vals))))
                result-vec)))
          (if (vector? (first lists))
              processed-result-vec
              (vector->list processed-result-vec)))

        (if (or (= (length channel-indexes)
                   (min parallelism-limit vecs-length))
                (eq? #f next-to-process-index))
            ;; At the limit, or nothing left to start: wait for any
            ;; one of the in-flight fibers to reply.
            (let ((new-index
                   new-channel-indexes
                   (perform-operation
                    (apply
                     choice-operation
                     (map
                      (lambda (index)
                        (wrap-operation
                         (get-operation
                          (vector-ref result-vec index))
                         (lambda (result)
                           (vector-set! result-vec
                                        index
                                        result)
                           (values next-to-process-index
                                   (lset-difference =
                                                    channel-indexes
                                                    (list index))))))
                      channel-indexes)))))
              (loop new-index
                    new-channel-indexes))

            ;; Below the limit: start the next call to PROC.
            (loop
             (if (= (+ 1 next-to-process-index) vecs-length)
                 #f
                 (+ 1 next-to-process-index))
             (begin
               (vector-set!
                result-vec
                next-to-process-index
                (defer-to-parallel-fiber
                  (lambda ()
                    (apply proc
                           (map (lambda (vec)
                                  (vector-ref vec next-to-process-index))
                                vecs)))))
               (cons next-to-process-index
                     channel-indexes)))))))

(define (fibers-map proc . lists)
  "Map PROC over LISTS in parallel, running up to 20 fibers in
PARALLEL.  If any of the invocations of PROC raise an exception, this
will be raised once all of the calls to PROC have finished."
  (apply fibers-batch-map proc 20 lists))

(define (fibers-batch-for-each proc parallelism-limit . lists)
  "Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in
parallel."
  (apply fibers-batch-map
         ;; Discard whatever PROC returns, so that fibers-batch-map
         ;; always has exactly one value to collect.
         (lambda args
           (apply proc args)
           *unspecified*)
         parallelism-limit
         lists)

  *unspecified*)

(define (fibers-for-each proc . lists)
  "Call PROC on LISTS, running up to 20 fibers in parallel."
  (apply fibers-batch-for-each proc 20 lists))

(define-syntax fibers-parallel
  (lambda (x)
    "Run each expression in parallel.  If any expression raises an
exception, this will be raised after all expressions have finished."
    (syntax-case x ()
      ((_ e0 ...)
       (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
         #'(let ((tmp0 (defer-to-parallel-fiber
                         (lambda ()
                           e0)))
                 ...)
             (apply values (fetch-result-of-defered-thunks tmp0 ...))))))))

(define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...)
  "Let, but run each binding in a fiber in parallel."
  (call-with-values
      (lambda ()
        (fibers-parallel e ...))
    (lambda (v ...)
      b0 b1 ...)))

(define* (fibers-map-with-progress proc lists #:key report)
  "Map PROC over LISTS (a list of lists), starting one fiber per
element, and return the list of results.  Only the first value returned
by each call to PROC is kept.  If any call raised an exception, it is
re-raised once all calls have finished.

If #:REPORT is specified, it is called once before waiting for any
result and again each time a result is received.  It is passed a list
with one entry per element: a list whose first item is the response
received for that element, (result . VALS) or (exception . EXN), or #f
if no response has been received yet, followed by the corresponding
elements of LISTS."
  ;; CHANNELS-TO-RESULTS is a list of (CHANNEL . RESPONSE) pairs.  A
  ;; pair starts as (CHANNEL . #f) and becomes (#f . RESPONSE) once
  ;; the fiber has replied.
  (let loop ((channels-to-results
              (apply map
                     (lambda args
                       (cons (defer-to-parallel-fiber
                               (lambda ()
                                 (apply proc args)))
                             #f))
                     lists)))
    (let ((active-channels
           (filter-map car channels-to-results)))
      (when report
        (report
         (apply map
                list
                (map cdr channels-to-results)
                lists)))
      (if (null? active-channels)
          (map
           (match-lambda
             ((#f . ('exception . exn))
              (raise-exception exn))
             ((#f . ('result . vals))
              (car vals)))
           channels-to-results)
          ;; Wait for any outstanding channel, then loop with that
          ;; entry replaced by its response.
          (loop
           (perform-operation
            (apply
             choice-operation
             (filter-map
              (lambda (p)
                (match p
                  ((channel . _)
                   (if channel
                       (wrap-operation
                        (get-operation channel)
                        (lambda (result)
                          (map (match-lambda
                                 ((c . r)
                                  (if (eq? channel c)
                                      (cons #f result)
                                      (cons c r))))
                               channels-to-results)))
                       #f))))
              channels-to-results))))))))

(define* (fiberize proc #:key (parallelism 1)
                   (input-channel (make-channel))
                   (process-channel input-channel))
  "Spawn PARALLELISM fibers (with #:parallel? #t) which each loop
forever, reading (REPLY-CHANNEL . ARGS) messages from PROCESS-CHANNEL,
applying PROC to ARGS and putting the outcome on REPLY-CHANNEL.

Return a procedure which sends its arguments to INPUT-CHANNEL, waits
for the reply, and then either returns the values PROC returned or
re-raises the exception PROC raised.  PROCESS-CHANNEL defaults to
INPUT-CHANNEL."
  (for-each
   (lambda _
     (spawn-fiber
      (lambda ()
        (while #t
          (let ((reply-channel args (car+cdr
                                     (get-message process-channel))))
            (put-message
             reply-channel
             (with-exception-handler
                 ;; Outer handler: runs after unwinding and turns the
                 ;; exception into the reply message.
                 (lambda (exn)
                   (cons 'exception exn))
               (lambda ()
                 (with-exception-handler
                     ;; Inner handler: runs without unwinding so the
                     ;; stack can be captured and attached to the
                     ;; exception before it is re-raised.
                     (lambda (exn)
                       (let ((stack
                              (match (fluid-ref %stacks)
                                ((stack-tag . prompt-tag)
                                 (make-stack #t
                                             0 prompt-tag
                                             0 (and prompt-tag 1)))
                                (_
                                 (make-stack #t)))))
                         (raise-exception
                          (make-exception
                           exn
                           (make-knots-exception stack)))))
                   (lambda ()
                     (call-with-values
                         (lambda ()
                           (start-stack #t (apply proc args)))
                       (lambda vals
                         (cons 'result vals))))))
               #:unwind? #t)))))
      #:parallel? #t))
   (iota parallelism))

  (lambda args
    (let ((reply-channel (make-channel)))
      (put-message input-channel (cons reply-channel args))
      (match (get-message reply-channel)
        (('result . vals) (apply values vals))
        (('exception . exn)
         (raise-exception exn))))))

;; Wraps a resource pool from (knots resource-pool).  The type name
;; is required by SRFI-9 define-record-type.
(define-record-type <parallelism-limiter>
  (make-parallelism-limiter-record resource-pool)
  parallelism-limiter?
  (resource-pool parallelism-limiter-resource-pool))

(define* (make-parallelism-limiter limit #:key (name "unnamed"))
  "Return a parallelism limiter backed by a fixed size resource pool
holding LIMIT resources, with NAME passed on as the name of the pool."
  (make-parallelism-limiter-record
   (make-fixed-size-resource-pool
    (iota limit)
    #:name name)))

(define (destroy-parallelism-limiter parallelism-limiter)
  "Destroy the resource pool underlying PARALLELISM-LIMITER."
  (destroy-resource-pool
   (parallelism-limiter-resource-pool
    parallelism-limiter)))

(define* (call-with-parallelism-limiter parallelism-limiter thunk)
  "Call THUNK while holding a resource from the pool underlying
PARALLELISM-LIMITER."
  (call-with-resource-from-pool
      (parallelism-limiter-resource-pool parallelism-limiter)
    ;; The resource itself is only a token, so it is ignored.
    (lambda _
      (thunk))))

(define-syntax-rule (with-parallelism-limiter parallelism-limiter exp ...)
  (call-with-parallelism-limiter
   parallelism-limiter
   (lambda () exp ...)))