;;; Guile Knots
;;; Copyright © 2020 Christopher Baines
;;;
;;; This file is part of Guile Knots.
;;;
;;; The Guile Knots is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU General Public License as
;;; published by the Free Software Foundation; either version 3 of the
;;; License, or (at your option) any later version.
;;;
;;; The Guile Knots is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with Guile Knots.  If not, see
;;; <http://www.gnu.org/licenses/>.

(define-module (knots parallelism)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-71)
  #:use-module (srfi srfi-9)
  #:use-module (srfi srfi-9 gnu)
  #:use-module (srfi srfi-43)
  #:use-module (ice-9 match)
  #:use-module (ice-9 control)
  #:use-module (ice-9 exceptions)
  #:use-module (fibers)
  #:use-module (fibers channels)
  #:use-module (fibers operations)
  #:use-module (knots)
  #:use-module (knots resource-pool)
  #:export (fibers-batch-map
            fibers-map
            fibers-map-with-progress
            fibers-batch-for-each
            fibers-for-each
            fibers-parallel
            fibers-let
            fiberize

            make-parallelism-limiter
            parallelism-limiter?
            destroy-parallelism-limiter
            call-with-parallelism-limiter
            with-parallelism-limiter))

;; Run THUNK in a newly spawned fiber (spawned with #:parallel? #t) and
;; return a channel.  Exactly one message is put on that channel:
;;
;;   (result . VALS)     THUNK returned the values VALS
;;   (exception . EXN)   THUNK raised EXN
;;
;; The inner handler runs at the point of the raise (no unwinding), so it
;; can capture the stack and re-raise the exception combined with a knots
;; exception holding that stack.  The outer handler (#:unwind? #t) then
;; delivers the combined exception over the channel.
(define (defer-to-parallel-fiber thunk)
  (let ((reply (make-channel)))
    (spawn-fiber
     (lambda ()
       (with-exception-handler
           (lambda (exn)
             (put-message reply (cons 'exception exn)))
         (lambda ()
           (with-exception-handler
               (lambda (exn)
                 (let ((stack
                        (match (fluid-ref %stacks)
                          ((stack-tag . prompt-tag)
                           (make-stack #t
                                       0 prompt-tag
                                       0 (and prompt-tag 1)))
                          (_
                           (make-stack #t)))))
                   (raise-exception
                    (make-exception
                     exn
                     (make-knots-exception stack)))))
             (lambda ()
               (call-with-values
                   (lambda ()
                     (start-stack #t (thunk)))
                 (lambda vals
                   (put-message reply (cons 'result vals)))))))
         #:unwind? #t))
     #:parallel? #t)
    reply))

;; Read one message from each of REPLY-CHANNELS (channels as returned by
;; defer-to-parallel-fiber) and return a list of the results.  All the
;; messages are read before any is inspected, so every deferred thunk has
;; finished by the time an exception is re-raised here.
;;
;; NOTE(review): each result is passed through `values' inside `map', so
;; presumably only the first value of a multi-valued result is kept --
;; confirm.
(define (fetch-result-of-defered-thunks . reply-channels)
  (let ((responses (map get-message
                        reply-channels)))
    (map
     (match-lambda
       (('exception . exn)
        (raise-exception exn))
       (('result . vals)
        (apply values vals)))
     responses)))

(define (fibers-batch-map proc parallelism-limit . lists)
  "Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT.  If any of
the invocations of PROC raise an exception, this will be raised once all
of the calls to PROC have finished.

Each element of LISTS may be a list or a vector.  The result is a vector
if the first element of LISTS is a vector, and a list otherwise.  Only
the first value returned by each call to PROC is kept."

  ;; Work on vectors internally so elements can be fetched by index.
  (define vecs
    (map (lambda (list-or-vec)
           (if (vector? list-or-vec)
               list-or-vec
               (list->vector list-or-vec)))
         lists))

  ;; The length of the first input decides how many calls are made.
  (define vecs-length
    (vector-length (first vecs)))

  ;; Slot I first holds the reply channel for call I, and is then
  ;; overwritten with the (result . VALS) or (exception . EXN) message
  ;; read from that channel.
  (define result-vec
    (make-vector vecs-length))

  ;; NEXT-TO-PROCESS-INDEX is the index of the next call to start, or #f
  ;; once every call has been started.  CHANNEL-INDEXES lists the indexes
  ;; of calls that have been started but not yet finished.
  ;;
  ;; NOTE(review): a PARALLELISM-LIMIT of 0 with non-empty input appears
  ;; to wait on an empty choice-operation -- confirm callers always pass
  ;; a positive limit.
  (let loop ((next-to-process-index
              (if (= 0 vecs-length)
                  #f
                  0))
             (channel-indexes '()))
    (if (and (eq? #f next-to-process-index)
             (null? channel-indexes))
        ;; Everything has been started and has finished: unpack the
        ;; messages, raising the first exception found.
        (let ((processed-result-vec
               (vector-map
                (lambda (_ result-or-exn)
                  (match result-or-exn
                    (('exception . exn)
                     (raise-exception exn))
                    (('result . vals)
                     (car vals))))
                result-vec)))
          (if (vector? (first lists))
              processed-result-vec
              (vector->list processed-result-vec)))

        (if (or (= (length channel-indexes)
                   (min parallelism-limit vecs-length))
                (eq? #f next-to-process-index))
            ;; At the limit, or nothing left to start: wait for any one
            ;; of the outstanding calls to finish.
            (let ((new-index
                   new-channel-indexes
                   (perform-operation
                    (apply
                     choice-operation
                     (map
                      (lambda (index)
                        (wrap-operation
                         (get-operation
                          (vector-ref result-vec index))
                         (lambda (result)
                           (vector-set! result-vec
                                        index
                                        result)
                           (values next-to-process-index
                                   (lset-difference =
                                                    channel-indexes
                                                    (list index))))))
                      channel-indexes)))))
              (loop new-index
                    new-channel-indexes))

            ;; Below the limit: start the next call.
            (loop
             (if (= (+ 1 next-to-process-index)
                    vecs-length)
                 #f
                 (+ 1 next-to-process-index))
             (begin
               (vector-set!
                result-vec
                next-to-process-index
                (defer-to-parallel-fiber
                 (lambda ()
                   (apply proc
                          (map (lambda (vec)
                                 (vector-ref vec next-to-process-index))
                               vecs)))))
               (cons next-to-process-index
                     channel-indexes)))))))

(define (fibers-map proc . lists)
  "Map PROC over LISTS in parallel, running up to 20 fibers in
parallel.  If any of the invocations of PROC raise an exception, this
will be raised once all of the calls to PROC have finished."
  (apply fibers-batch-map proc 20 lists))

(define (fibers-batch-for-each proc parallelism-limit . lists)
  "Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in
parallel.  The return value is unspecified."
  (apply fibers-batch-map
         ;; Discard whatever PROC returns, so that a single unspecified
         ;; value is what gets collected for each call.
         (lambda args
           (apply proc args)
           *unspecified*)
         parallelism-limit
         lists)

  *unspecified*)

(define (fibers-for-each proc . lists)
  "Call PROC on LISTS, running up to 20 fibers in parallel."
  (apply fibers-batch-for-each proc 20 lists))

(define-syntax fibers-parallel
  (lambda (x)
    "Run each expression in parallel.  If any expression raises an
exception, this will be raised after all expressions have finished."
    (syntax-case x ()
      ((_ e0 ...)
       (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
         ;; Start every expression first, binding each reply channel to a
         ;; temporary, and only then collect the results.
         #'(let ((tmp0 (defer-to-parallel-fiber
                        (lambda ()
                          e0)))
                 ...)
             (apply values (fetch-result-of-defered-thunks tmp0 ...))))))))

(define-syntax-rule (fibers-let ((v e) ...)
                      b0 b1 ...)
  "Let, but run each binding in a fiber in parallel."
  (call-with-values
      (lambda ()
        (fibers-parallel e ...))
    (lambda (v ...)
      b0 b1 ...)))

(define* (fibers-map-with-progress proc lists #:key report)
  "Map PROC over LISTS, calling #:REPORT if specified after each
invocation of PROC finishes.  REPORT is passed the results for each
element of LISTS, or #f if no result has been received yet."
  ;; CHANNELS-TO-RESULTS is a list of pairs, one per call to PROC:
  ;;   (CHANNEL . #f)   while the call is still running
  ;;   (#f . MESSAGE)   once MESSAGE has been read from its channel
  ;; All calls are started up front; there is no parallelism limit here.
  (let loop ((channels-to-results
              (apply map
                     (lambda args
                       (cons (defer-to-parallel-fiber
                              (lambda ()
                                (apply proc args)))
                             #f))
                     lists)))
    (let ((active-channels
           (filter-map car channels-to-results)))
      (when report
        (report (apply map
                       list
                       (map cdr channels-to-results)
                       lists)))
      (if (null? active-channels)
          ;; Every call has finished: unpack the messages, raising the
          ;; first exception found and keeping the first value otherwise.
          (map
           (match-lambda
             ((#f . ('exception . exn))
              (raise-exception exn))
             ((#f . ('result . vals))
              (car vals)))
           channels-to-results)
          ;; Wait for any one running call to finish and loop with its
          ;; pair replaced by (#f . MESSAGE).
          (loop
           (perform-operation
            (apply
             choice-operation
             (filter-map
              (lambda (p)
                (match p
                  ((channel . _)
                   (if channel
                       (wrap-operation
                        (get-operation channel)
                        (lambda (result)
                          (map (match-lambda
                                 ((c . r)
                                  (if (eq? channel c)
                                      (cons #f result)
                                      (cons c r))))
                               channels-to-results)))
                       #f))))
              channels-to-results))))))))

(define* (fiberize proc #:key (parallelism 1)
                   (input-channel (make-channel))
                   (process-channel input-channel))
  "Convert PROC into a procedure backed by @code{#:parallelism}
(default: 1) background fibers.  Returns a wrapper that sends its
arguments to one of the fibers and blocks until the result is returned.

@code{#:input-channel} is the channel that callers write requests to;
defaults to a fresh channel.  @code{#:process-channel} is the channel the
fibers read from; defaults to @code{#:input-channel}.  Setting them
differently allows external parties to bypass the wrapper and write
directly to @code{process-channel}."
  ;; Spawn the worker fibers.  Each request read from PROCESS-CHANNEL is
  ;; a pair (REPLY-CHANNEL . ARGS); the worker answers on REPLY-CHANNEL
  ;; with (result . VALS) or (exception . EXN).
  (for-each
   (lambda _
     (spawn-fiber
      (lambda ()
        (while #t
          (let ((reply-channel args (car+cdr
                                     (get-message process-channel))))
            (put-message
             reply-channel
             (with-exception-handler
                 (lambda (exn)
                   (cons 'exception exn))
               (lambda ()
                 ;; The inner handler runs at the point of the raise, so
                 ;; it can capture the stack before the outer handler
                 ;; unwinds.
                 (with-exception-handler
                     (lambda (exn)
                       (let ((stack
                              (match (fluid-ref %stacks)
                                ((stack-tag . prompt-tag)
                                 (make-stack #t
                                             0 prompt-tag
                                             0 (and prompt-tag 1)))
                                (_
                                 (make-stack #t)))))
                         (raise-exception
                          (make-exception
                           exn
                           (make-knots-exception stack)))))
                   (lambda ()
                     (call-with-values
                         (lambda ()
                           (start-stack #t (apply proc args)))
                       (lambda vals
                         (cons 'result vals))))))
               #:unwind? #t)))))
      #:parallel? #t))
   (iota parallelism))

  ;; The wrapper: send the request with a fresh reply channel, then
  ;; return the values or re-raise the exception from the reply.
  (lambda args
    (let ((reply-channel (make-channel)))
      (put-message input-channel (cons reply-channel args))
      (match (get-message reply-channel)
        (('result . vals) (apply values vals))
        (('exception . exn)
         (raise-exception exn))))))

;; A parallelism limiter wraps a resource pool; holding a resource from
;; the pool corresponds to holding one slot of the limiter.
(define-record-type <parallelism-limiter>
  (make-parallelism-limiter-record resource-pool)
  parallelism-limiter?
  (resource-pool parallelism-limiter-resource-pool))

;; The predicate is looked up as a macro binding here, so the
;; documentation is attached to its transformer procedure.
(set-procedure-property!
 (macro-transformer (module-ref (current-module) 'parallelism-limiter?))
 'documentation
 "Return @code{#t} if OBJ is a @code{<parallelism-limiter>}.")

(define* (make-parallelism-limiter limit #:key (name "unnamed"))
  "Return a parallelism limiter that allows at most LIMIT concurrent
fibers to execute within @code{with-parallelism-limiter} at the same
time.  Further fibers block until a slot becomes free.

@code{#:name} is a string used in log messages.  Defaults to
@code{\"unnamed\"}."
  (make-parallelism-limiter-record
   ;; The pool holds LIMIT placeholder resources (the integers from
   ;; iota); the values themselves are never passed on to callers.
   (make-fixed-size-resource-pool
    (iota limit)
    #:name name)))

(define (destroy-parallelism-limiter parallelism-limiter)
  "Destroy PARALLELISM-LIMITER, releasing its underlying resource pool."
  (destroy-resource-pool
   (parallelism-limiter-resource-pool parallelism-limiter)))

(define* (call-with-parallelism-limiter parallelism-limiter thunk)
  "Acquire a slot from PARALLELISM-LIMITER, call THUNK, release the slot,
and return the values from THUNK.  Blocks if no slot is currently
available."
  (call-with-resource-from-pool
      (parallelism-limiter-resource-pool parallelism-limiter)
    ;; The resource handed over by the pool is ignored.
    (lambda _
      (thunk))))

(define-syntax-rule (with-parallelism-limiter parallelism-limiter exp ...)
  "Evaluate EXP ... while holding a slot from PARALLELISM-LIMITER.
Syntactic sugar around @code{call-with-parallelism-limiter}."
  (call-with-parallelism-limiter
   parallelism-limiter
   (lambda () exp ...)))