Compare commits
5 commits
trunk
...
actions-te
| Author | SHA1 | Date | |
|---|---|---|---|
| 003c5aa6b0 | |||
| eadfa53b36 | |||
| 81dd3370e6 | |||
| 7f5f05ef2b | |||
| 7c2c6f2de9 |
16 changed files with 663 additions and 1764 deletions
|
|
@ -1,7 +1,7 @@
|
|||
on:
|
||||
push:
|
||||
branches:
|
||||
- trunk
|
||||
- actions-test
|
||||
jobs:
|
||||
test:
|
||||
runs-on: host
|
||||
|
|
@ -10,17 +10,13 @@ jobs:
|
|||
- run: git clone --depth=1 https://$FORGEJO_TOKEN@forge.cbaines.net/cbaines/guile-knots.git --branch=pages knots-pages
|
||||
- run: |
|
||||
cd knots-trunk
|
||||
guix shell -D -f guix-dev.scm -- documenta api "knots.scm knots/"
|
||||
guix shell -D -f guix-dev.scm -- documenta api knots
|
||||
guix shell texinfo -- makeinfo --css-ref=https://luis-felipe.gitlab.io/texinfo-css/static/css/texinfo-7.css --no-split --html -c SHOW_TITLE=true -o ../knots-pages/index.html doc/index.texi
|
||||
|
||||
- run: |
|
||||
cd knots-pages
|
||||
git add .
|
||||
if [[ -z "$(git status -s)" ]]; then
|
||||
echo "Nothing to push"
|
||||
else
|
||||
git config user.email ""
|
||||
git config user.name "Automatic website updater"
|
||||
git commit -m "Automatic website update"
|
||||
git push
|
||||
fi
|
||||
git config user.email ""
|
||||
git config user.name "Automatic website updater"
|
||||
git commit -m "Automatic website update"
|
||||
git push
|
||||
18
Makefile.am
18
Makefile.am
|
|
@ -7,22 +7,20 @@ SOURCES = \
|
|||
knots/promise.scm \
|
||||
knots/queue.scm \
|
||||
knots/resource-pool.scm \
|
||||
knots/sort.scm \
|
||||
knots/thread-pool.scm \
|
||||
knots/timeout.scm \
|
||||
knots/web-server.scm
|
||||
knots/web-server.scm \
|
||||
knots/thread-pool.scm
|
||||
|
||||
SCM_TESTS = \
|
||||
tests/non-blocking.scm \
|
||||
tests/non-blocking.scm \
|
||||
tests/parallelism.scm \
|
||||
tests/promise.scm \
|
||||
tests/queue.scm \
|
||||
tests/resource-pool.scm \
|
||||
tests/sort.scm \
|
||||
tests/thread-pool.scm \
|
||||
tests/timeout.scm \
|
||||
tests/web-server.scm
|
||||
tests/non-blocking.scm \
|
||||
tests/queue.scm \
|
||||
tests/web-server.scm \
|
||||
tests/parallelism.scm \
|
||||
tests/resource-pool.scm \
|
||||
tests/thread-pool.scm
|
||||
|
||||
TESTS_GOBJECTS = $(SCM_TESTS:%.scm=%.go)
|
||||
|
||||
|
|
|
|||
232
knots.scm
232
knots.scm
|
|
@ -1,12 +1,7 @@
|
|||
(define-module (knots)
|
||||
#:use-module (srfi srfi-1)
|
||||
#:use-module (ice-9 match)
|
||||
#:use-module (ice-9 threads)
|
||||
#:use-module (ice-9 binary-ports)
|
||||
#:use-module (ice-9 suspendable-ports)
|
||||
#:use-module (rnrs bytevectors)
|
||||
#:use-module (fibers)
|
||||
#:use-module (fibers channels)
|
||||
#:use-module (fibers conditions)
|
||||
#:use-module (system repl debug)
|
||||
#:export (call-with-default-io-waiters
|
||||
|
|
@ -15,18 +10,12 @@
|
|||
|
||||
call-with-sigint
|
||||
|
||||
display/knots
|
||||
simple-format/knots
|
||||
format/knots
|
||||
|
||||
&knots-exception
|
||||
make-knots-exception
|
||||
knots-exception?
|
||||
knots-exception-stack
|
||||
|
||||
print-backtrace-and-exception/knots
|
||||
|
||||
spawn-fiber/knots))
|
||||
print-backtrace-and-exception/knots))
|
||||
|
||||
(define (call-with-default-io-waiters thunk)
|
||||
(parameterize
|
||||
|
|
@ -59,70 +48,6 @@
|
|||
;; restore original C handler.
|
||||
(sigaction SIGINT #f))))))
|
||||
|
||||
(define (call-with-temporary-thread thunk)
|
||||
(let ((channel (make-channel)))
|
||||
(call-with-new-thread
|
||||
(lambda ()
|
||||
(call-with-default-io-waiters
|
||||
(lambda ()
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(put-message channel `(exception . ,exn)))
|
||||
(lambda ()
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(let ((stack
|
||||
(match (fluid-ref %stacks)
|
||||
((stack-tag . prompt-tag)
|
||||
(make-stack #t
|
||||
0 prompt-tag
|
||||
0 (and prompt-tag 1)))
|
||||
(_
|
||||
(make-stack #t)))))
|
||||
(raise-exception
|
||||
(make-exception
|
||||
exn
|
||||
(make-knots-exception stack)))))
|
||||
(lambda ()
|
||||
(call-with-values thunk
|
||||
(lambda values
|
||||
(put-message channel `(values ,@values)))))))
|
||||
#:unwind? #t)))))
|
||||
|
||||
(match (get-message channel)
|
||||
(('values . results)
|
||||
(apply values results))
|
||||
(('exception . exn)
|
||||
(raise-exception exn)))))
|
||||
|
||||
(define* (display/knots obj #:optional (port (current-output-port)))
|
||||
(put-bytevector
|
||||
port
|
||||
(string->utf8
|
||||
(call-with-output-string
|
||||
(lambda (port)
|
||||
(display obj port))))))
|
||||
|
||||
(define (simple-format/knots port s . args)
|
||||
(let ((str (apply simple-format #f s args)))
|
||||
(if (eq? #f port)
|
||||
str
|
||||
(display/knots
|
||||
str
|
||||
(if (eq? #t port)
|
||||
(current-output-port)
|
||||
port)))))
|
||||
|
||||
(define (format/knots port s . args)
|
||||
(let ((str (apply format #f s args)))
|
||||
(if (eq? #f port)
|
||||
str
|
||||
(display/knots
|
||||
str
|
||||
(if (eq? #t port)
|
||||
(current-output-port)
|
||||
port)))))
|
||||
|
||||
(define &knots-exception
|
||||
(make-exception-type '&knots-exception
|
||||
&exception
|
||||
|
|
@ -142,58 +67,6 @@
|
|||
(define* (print-backtrace-and-exception/knots
|
||||
exn
|
||||
#:key (port (current-error-port)))
|
||||
(define (get-string port stack)
|
||||
(define stack-len
|
||||
(stack-length stack))
|
||||
|
||||
(let ((knots-stacks
|
||||
(map knots-exception-stack
|
||||
(filter knots-exception?
|
||||
(simple-exceptions exn)))))
|
||||
|
||||
(let* ((stack-vec
|
||||
(stack->vector stack))
|
||||
(stack-vec-length
|
||||
(vector-length stack-vec)))
|
||||
(print-frames (list->vector
|
||||
(drop
|
||||
(vector->list stack-vec)
|
||||
(if (< stack-vec-length 5)
|
||||
0
|
||||
4)))
|
||||
port
|
||||
#:count (stack-length stack)))
|
||||
(for-each
|
||||
(lambda (stack)
|
||||
(let* ((stack-vec
|
||||
(stack->vector stack))
|
||||
(stack-vec-length
|
||||
(vector-length stack-vec)))
|
||||
(print-frames (list->vector
|
||||
(drop
|
||||
(vector->list stack-vec)
|
||||
(if (< stack-vec-length 4)
|
||||
0
|
||||
3)))
|
||||
port
|
||||
#:count (stack-length stack))))
|
||||
knots-stacks)
|
||||
(print-exception
|
||||
port
|
||||
(if (null? knots-stacks)
|
||||
(stack-ref stack
|
||||
(if (< stack-len 4)
|
||||
stack-len
|
||||
4))
|
||||
(let* ((stack (last knots-stacks))
|
||||
(stack-len (stack-length stack)))
|
||||
(stack-ref stack
|
||||
(if (< stack-len 3)
|
||||
stack-len
|
||||
3))))
|
||||
'%exception
|
||||
(list exn))))
|
||||
|
||||
(let* ((stack
|
||||
(match (fluid-ref %stacks)
|
||||
((stack-tag . prompt-tag)
|
||||
|
|
@ -202,59 +75,56 @@
|
|||
0 (and prompt-tag 1)))
|
||||
(_
|
||||
(make-stack #t))))
|
||||
(string-port
|
||||
(open-output-string))
|
||||
(stack-len
|
||||
(stack-length stack))
|
||||
(error-string
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(display/knots (get-output-string string-port)
|
||||
port)
|
||||
(close-output-port string-port)
|
||||
(display/knots "\n\n" port)
|
||||
(call-with-output-string
|
||||
(lambda (port)
|
||||
(let ((knots-stacks
|
||||
(map knots-exception-stack
|
||||
(filter knots-exception?
|
||||
(simple-exceptions exn)))))
|
||||
|
||||
(let* ((stack (make-stack #t))
|
||||
(backtrace
|
||||
(call-with-output-string
|
||||
(lambda (port)
|
||||
(display-backtrace stack port)
|
||||
(newline port)))))
|
||||
(display/knots backtrace))
|
||||
(simple-format/knots
|
||||
(let* ((stack-vec
|
||||
(stack->vector stack))
|
||||
(stack-vec-length
|
||||
(vector-length stack-vec)))
|
||||
(print-frames (list->vector
|
||||
(drop
|
||||
(vector->list stack-vec)
|
||||
(if (< stack-vec-length 5)
|
||||
0
|
||||
4)))
|
||||
port
|
||||
#:count (stack-length stack)))
|
||||
(for-each
|
||||
(lambda (stack)
|
||||
(let* ((stack-vec
|
||||
(stack->vector stack))
|
||||
(stack-vec-length
|
||||
(vector-length stack-vec)))
|
||||
(print-frames (list->vector
|
||||
(drop
|
||||
(vector->list stack-vec)
|
||||
(if (< stack-vec-length 4)
|
||||
0
|
||||
3)))
|
||||
port
|
||||
#:count (stack-length stack))))
|
||||
knots-stacks)
|
||||
(print-exception
|
||||
port
|
||||
"\nexception in print-backtrace-and-exception/knots: ~A\n"
|
||||
exn)
|
||||
(raise-exception exn))
|
||||
(lambda ()
|
||||
(get-string string-port stack)
|
||||
(let ((str (get-output-string string-port)))
|
||||
(close-output-port string-port)
|
||||
str)))))
|
||||
(display/knots error-string port)))
|
||||
|
||||
(define* (spawn-fiber/knots thunk #:optional scheduler #:key parallel?)
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(display/knots "Uncaught exception in task:\n"
|
||||
(current-error-port))
|
||||
(print-backtrace-and-exception/knots exn))
|
||||
(lambda ()
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(let ((stack
|
||||
(match (fluid-ref %stacks)
|
||||
((stack-tag . prompt-tag)
|
||||
(make-stack #t
|
||||
0 prompt-tag
|
||||
0 (and prompt-tag 1)))
|
||||
(_
|
||||
(make-stack #t)))))
|
||||
(raise-exception
|
||||
(make-exception
|
||||
exn
|
||||
(make-knots-exception stack)))))
|
||||
thunk))
|
||||
#:unwind? #t))
|
||||
scheduler
|
||||
#:parallel? parallel?))
|
||||
(if (null? knots-stacks)
|
||||
(stack-ref stack
|
||||
(if (< stack-len 4)
|
||||
stack-len
|
||||
4))
|
||||
(let* ((stack (last knots-stacks))
|
||||
(stack-len (stack-length stack)))
|
||||
(stack-ref stack
|
||||
(if (< stack-len 3)
|
||||
stack-len
|
||||
3))))
|
||||
'%exception
|
||||
(list exn)))))))
|
||||
(display error-string port)))
|
||||
|
|
|
|||
|
|
@ -20,9 +20,6 @@
|
|||
(define-module (knots parallelism)
|
||||
#:use-module (srfi srfi-1)
|
||||
#:use-module (srfi srfi-71)
|
||||
#:use-module (srfi srfi-9)
|
||||
#:use-module (srfi srfi-9 gnu)
|
||||
#:use-module (srfi srfi-43)
|
||||
#:use-module (ice-9 match)
|
||||
#:use-module (ice-9 control)
|
||||
#:use-module (ice-9 exceptions)
|
||||
|
|
@ -30,7 +27,6 @@
|
|||
#:use-module (fibers channels)
|
||||
#:use-module (fibers operations)
|
||||
#:use-module (knots)
|
||||
#:use-module (knots resource-pool)
|
||||
#:export (fibers-batch-map
|
||||
fibers-map
|
||||
|
||||
|
|
@ -42,13 +38,7 @@
|
|||
fibers-parallel
|
||||
fibers-let
|
||||
|
||||
fiberize
|
||||
|
||||
make-parallelism-limiter
|
||||
parallelism-limiter?
|
||||
destroy-parallelism-limiter
|
||||
call-with-parallelism-limiter
|
||||
with-parallelism-limiter))
|
||||
fiberize))
|
||||
|
||||
(define (defer-to-parallel-fiber thunk)
|
||||
(let ((reply (make-channel)))
|
||||
|
|
@ -58,7 +48,7 @@
|
|||
(lambda (exn)
|
||||
(put-message
|
||||
reply
|
||||
(cons 'exception exn)))
|
||||
(list 'exception exn)))
|
||||
(lambda ()
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
|
|
@ -79,7 +69,7 @@
|
|||
(lambda ()
|
||||
(start-stack #t (thunk)))
|
||||
(lambda vals
|
||||
(put-message reply (cons 'result vals)))))))
|
||||
(put-message reply vals))))))
|
||||
#:unwind? #t))
|
||||
#:parallel? #t)
|
||||
reply))
|
||||
|
|
@ -89,16 +79,13 @@
|
|||
reply-channels)))
|
||||
(map
|
||||
(match-lambda
|
||||
(('exception . exn)
|
||||
(('exception exn)
|
||||
(raise-exception exn))
|
||||
(('result . vals)
|
||||
(apply values vals)))
|
||||
(result
|
||||
(apply values result)))
|
||||
responses)))
|
||||
|
||||
(define (fibers-batch-map proc parallelism-limit . lists)
|
||||
"Map PROC over LISTS in parallel, with a PARALLELISM-LIMIT. If any of
|
||||
the invocations of PROC raise an exception, this will be raised once
|
||||
all of the calls to PROC have finished."
|
||||
(define vecs (map (lambda (list-or-vec)
|
||||
(if (vector? list-or-vec)
|
||||
list-or-vec
|
||||
|
|
@ -118,18 +105,9 @@ all of the calls to PROC have finished."
|
|||
(channel-indexes '()))
|
||||
(if (and (eq? #f next-to-process-index)
|
||||
(null? channel-indexes))
|
||||
(let ((processed-result-vec
|
||||
(vector-map
|
||||
(lambda (_ result-or-exn)
|
||||
(match result-or-exn
|
||||
(('exception . exn)
|
||||
(raise-exception exn))
|
||||
(('result . vals)
|
||||
(car vals))))
|
||||
result-vec)))
|
||||
(if (vector? (first lists))
|
||||
processed-result-vec
|
||||
(vector->list processed-result-vec)))
|
||||
(if (vector? (first lists))
|
||||
result-vec
|
||||
(vector->list result-vec))
|
||||
|
||||
(if (or (= (length channel-indexes)
|
||||
(min parallelism-limit vecs-length))
|
||||
|
|
@ -145,13 +123,18 @@ all of the calls to PROC have finished."
|
|||
(get-operation
|
||||
(vector-ref result-vec index))
|
||||
(lambda (result)
|
||||
(vector-set! result-vec
|
||||
index
|
||||
result)
|
||||
(values next-to-process-index
|
||||
(lset-difference =
|
||||
channel-indexes
|
||||
(list index))))))
|
||||
(match result
|
||||
(('exception exn)
|
||||
(raise-exception exn))
|
||||
(_
|
||||
(vector-set! result-vec
|
||||
index
|
||||
(first result))
|
||||
|
||||
(values next-to-process-index
|
||||
(lset-difference =
|
||||
channel-indexes
|
||||
(list index))))))))
|
||||
channel-indexes)))))
|
||||
(loop new-index
|
||||
new-channel-indexes))
|
||||
|
|
@ -174,14 +157,9 @@ all of the calls to PROC have finished."
|
|||
channel-indexes)))))))
|
||||
|
||||
(define (fibers-map proc . lists)
|
||||
"Map PROC over LISTS in parallel, running up to 20 fibers in
|
||||
PARALLEL. If any of the invocations of PROC raise an exception, this
|
||||
will be raised once all of the calls to PROC have finished."
|
||||
(apply fibers-batch-map proc 20 lists))
|
||||
|
||||
(define (fibers-batch-for-each proc parallelism-limit . lists)
|
||||
"Call PROC on LISTS, running up to PARALLELISM-LIMIT fibers in
|
||||
parallel."
|
||||
(apply fibers-batch-map
|
||||
(lambda args
|
||||
(apply proc args)
|
||||
|
|
@ -192,13 +170,10 @@ parallel."
|
|||
*unspecified*)
|
||||
|
||||
(define (fibers-for-each proc . lists)
|
||||
"Call PROC on LISTS, running up to 20 fibers in parallel."
|
||||
(apply fibers-batch-for-each proc 20 lists))
|
||||
|
||||
(define-syntax fibers-parallel
|
||||
(lambda (x)
|
||||
"Run each expression in parallel. If any expression raises an
|
||||
exception, this will be raised after all exceptions have finished."
|
||||
(syntax-case x ()
|
||||
((_ e0 ...)
|
||||
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
|
||||
|
|
@ -209,16 +184,12 @@ parallel."
|
|||
(apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
|
||||
|
||||
(define-syntax-rule (fibers-let ((v e) ...) b0 b1 ...)
|
||||
"Let, but run each binding in a fiber in parallel."
|
||||
(call-with-values
|
||||
(lambda () (fibers-parallel e ...))
|
||||
(lambda (v ...)
|
||||
b0 b1 ...)))
|
||||
|
||||
(define* (fibers-map-with-progress proc lists #:key report)
|
||||
"Map PROC over LISTS, calling #:REPORT if specified after each
|
||||
invocation of PROC finishes. REPORT is passed the results for each
|
||||
element of LISTS, or #f if no result has been received yet."
|
||||
(let loop ((channels-to-results
|
||||
(apply map
|
||||
(lambda args
|
||||
|
|
@ -239,8 +210,8 @@ invocation of PROC finishes. REPORT is passed the results for each
|
|||
(match-lambda
|
||||
((#f . ('exception . exn))
|
||||
(raise-exception exn))
|
||||
((#f . ('result . vals))
|
||||
(car vals)))
|
||||
((#f . ('result . val))
|
||||
val))
|
||||
channels-to-results)
|
||||
(loop
|
||||
(perform-operation
|
||||
|
|
@ -257,7 +228,12 @@ invocation of PROC finishes. REPORT is passed the results for each
|
|||
(map (match-lambda
|
||||
((c . r)
|
||||
(if (eq? channel c)
|
||||
(cons #f result)
|
||||
(cons #f
|
||||
(match result
|
||||
(('exception . exn)
|
||||
result)
|
||||
(_
|
||||
(cons 'result result))))
|
||||
(cons c r))))
|
||||
channels-to-results)))
|
||||
#f))))
|
||||
|
|
@ -278,7 +254,7 @@ invocation of PROC finishes. REPORT is passed the results for each
|
|||
reply-channel
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(cons 'exception exn))
|
||||
(list 'exception exn))
|
||||
(lambda ()
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
|
|
@ -309,32 +285,5 @@ invocation of PROC finishes. REPORT is passed the results for each
|
|||
(put-message input-channel (cons reply-channel args))
|
||||
(match (get-message reply-channel)
|
||||
(('result . vals) (apply values vals))
|
||||
(('exception . exn)
|
||||
(('exception exn)
|
||||
(raise-exception exn))))))
|
||||
|
||||
(define-record-type <parallelism-limiter>
|
||||
(make-parallelism-limiter-record resource-pool)
|
||||
parallelism-limiter?
|
||||
(resource-pool parallelism-limiter-resource-pool))
|
||||
|
||||
(define* (make-parallelism-limiter limit #:key (name "unnamed"))
|
||||
(make-parallelism-limiter-record
|
||||
(make-fixed-size-resource-pool
|
||||
(iota limit)
|
||||
#:name name)))
|
||||
|
||||
(define (destroy-parallelism-limiter parallelism-limiter)
|
||||
(destroy-resource-pool
|
||||
(parallelism-limiter-resource-pool
|
||||
parallelism-limiter)))
|
||||
|
||||
(define* (call-with-parallelism-limiter parallelism-limiter thunk)
|
||||
(call-with-resource-from-pool
|
||||
(parallelism-limiter-resource-pool parallelism-limiter)
|
||||
(lambda _
|
||||
(thunk))))
|
||||
|
||||
(define-syntax-rule (with-parallelism-limiter parallelism-limiter exp ...)
|
||||
(call-with-parallelism-limiter
|
||||
parallelism-limiter
|
||||
(lambda () exp ...)))
|
||||
|
|
|
|||
|
|
@ -28,7 +28,6 @@
|
|||
#:export (fibers-promise?
|
||||
|
||||
fibers-delay
|
||||
fibers-delay/eager
|
||||
fibers-force
|
||||
fibers-promise-reset
|
||||
fibers-promise-result-available?))
|
||||
|
|
@ -83,10 +82,7 @@
|
|||
(make-exception
|
||||
exn
|
||||
(make-knots-exception stack)))))
|
||||
(lambda ()
|
||||
(start-stack
|
||||
#t
|
||||
((fibers-promise-thunk fp))))))
|
||||
(fibers-promise-thunk fp)))
|
||||
#:unwind? #t))
|
||||
(lambda vals
|
||||
(atomic-box-set! (fibers-promise-values-box fp)
|
||||
|
|
@ -106,20 +102,6 @@
|
|||
(raise-exception res)
|
||||
(apply values res))))))
|
||||
|
||||
|
||||
(define (fibers-delay/eager thunk)
|
||||
(let ((promise (fibers-delay thunk)))
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
(with-exception-handler
|
||||
(lambda _
|
||||
;; Silently handle this exception
|
||||
#f)
|
||||
(lambda ()
|
||||
(fibers-force promise))
|
||||
#:unwind? #t)))
|
||||
promise))
|
||||
|
||||
(define (fibers-promise-reset fp)
|
||||
(atomic-box-set! (fibers-promise-values-box fp)
|
||||
#f))
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -1,88 +0,0 @@
|
|||
;;; Guile Knots
|
||||
;;; Copyright © 2020, 2025 Christopher Baines <mail@cbaines.net>
|
||||
;;;
|
||||
;;; This file is part of Guile Knots.
|
||||
;;;
|
||||
;;; The Guile Knots is free software; you can redistribute it and/or
|
||||
;;; modify it under the terms of the GNU General Public License as
|
||||
;;; published by the Free Software Foundation; either version 3 of the
|
||||
;;; License, or (at your option) any later version.
|
||||
;;;
|
||||
;;; The Guile Knots is distributed in the hope that it will be useful,
|
||||
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
;;; General Public License for more details.
|
||||
;;;
|
||||
;;; You should have received a copy of the GNU General Public License
|
||||
;;; along with the guix-data-service. If not, see
|
||||
;;; <http://www.gnu.org/licenses/>.
|
||||
|
||||
(define-module (knots sort)
|
||||
#:use-module (srfi srfi-1)
|
||||
#:use-module (srfi srfi-71)
|
||||
#:use-module (ice-9 match)
|
||||
#:use-module (fibers scheduler)
|
||||
#:use-module (knots promise)
|
||||
#:export (fibers-sort!))
|
||||
|
||||
(define (try-split-at! lst i)
|
||||
(cond ((< i 0)
|
||||
(error "negitive split size"))
|
||||
((= i 0)
|
||||
(values '() lst))
|
||||
(else
|
||||
(let lp ((l lst) (n (- i 1)))
|
||||
(if (<= n 0)
|
||||
(let ((tmp (cdr l)))
|
||||
(unless (null? tmp)
|
||||
(set-cdr! l '()))
|
||||
(values lst tmp))
|
||||
(if (or (null? l)
|
||||
(null? (cdr l)))
|
||||
(values lst '())
|
||||
(lp (cdr l) (- n 1))))))))
|
||||
|
||||
(define (chunk! lst max-length)
|
||||
(let loop ((chunks '())
|
||||
(lst lst))
|
||||
(let ((chunk
|
||||
rest
|
||||
(try-split-at! lst max-length)))
|
||||
(if (null? rest)
|
||||
(reverse! (cons chunk chunks))
|
||||
(loop (cons chunk chunks)
|
||||
rest)))))
|
||||
|
||||
(define* (fibers-sort! items less #:key parallelism)
|
||||
(define requested-chunk-count
|
||||
(or parallelism
|
||||
(+ 1 (length (scheduler-remote-peers (current-scheduler))))))
|
||||
|
||||
(define items-length (length items))
|
||||
|
||||
(if (= 0 items-length)
|
||||
items
|
||||
(let* ((chunk-length (ceiling (/ items-length
|
||||
requested-chunk-count)))
|
||||
(chunks (chunk! items chunk-length)))
|
||||
(let loop ((sorted-chunk-promises
|
||||
(map
|
||||
(lambda (chunk)
|
||||
(fibers-delay/eager
|
||||
(lambda ()
|
||||
(sort! chunk less))))
|
||||
chunks)))
|
||||
(if (null? (cdr sorted-chunk-promises))
|
||||
(fibers-force
|
||||
(first sorted-chunk-promises))
|
||||
(loop
|
||||
(map
|
||||
(match-lambda
|
||||
((items) items)
|
||||
((a b)
|
||||
(fibers-delay/eager
|
||||
(lambda ()
|
||||
(merge! (fibers-force a)
|
||||
(fibers-force b)
|
||||
less)))))
|
||||
(chunk! sorted-chunk-promises 2))))))))
|
||||
|
|
@ -198,7 +198,7 @@ from there, or #f if that would be an empty string."
|
|||
(record-accessor &thread-pool-timeout-error 'pool)))
|
||||
|
||||
(define thread-pool-timeout-error?
|
||||
(exception-predicate &thread-pool-timeout-error))
|
||||
(record-predicate &thread-pool-timeout-error))
|
||||
|
||||
(define* (make-fixed-size-thread-pool size
|
||||
#:key
|
||||
|
|
@ -269,8 +269,8 @@ from there, or #f if that would be an empty string."
|
|||
(sleep 1)
|
||||
(destructor/safe args)))))
|
||||
|
||||
(define (process thread-index channel args)
|
||||
(let loop ((lifetime thread-lifetime))
|
||||
(define (process channel args)
|
||||
(let loop ()
|
||||
(match (get-message channel)
|
||||
('destroy #f)
|
||||
((reply sent-time proc)
|
||||
|
|
@ -292,9 +292,6 @@ from there, or #f if that would be an empty string."
|
|||
internal-time-units-per-second)
|
||||
exn))
|
||||
(lambda ()
|
||||
(vector-set! thread-proc-vector
|
||||
thread-index
|
||||
proc)
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(let ((stack
|
||||
|
|
@ -322,10 +319,6 @@ from there, or #f if that would be an empty string."
|
|||
vals))))))
|
||||
#:unwind? #t)))
|
||||
|
||||
(vector-set! thread-proc-vector
|
||||
thread-index
|
||||
#f)
|
||||
|
||||
(put-message reply
|
||||
response)
|
||||
|
||||
|
|
@ -342,11 +335,7 @@ from there, or #f if that would be an empty string."
|
|||
(if (and exception?
|
||||
expire-on-exception?)
|
||||
#t
|
||||
(if lifetime
|
||||
(if (<= lifetime 1)
|
||||
#t
|
||||
(loop (- lifetime 1)))
|
||||
(loop lifetime)))))))))
|
||||
(loop))))))))
|
||||
|
||||
(define (start-thread index channel)
|
||||
(call-with-new-thread
|
||||
|
|
@ -369,7 +358,7 @@ from there, or #f if that would be an empty string."
|
|||
"knots: thread-pool: internal exception: ~A\n" exn))
|
||||
(lambda ()
|
||||
(parameterize ((param args))
|
||||
(process index channel args)))
|
||||
(process channel args)))
|
||||
#:unwind? #t)))
|
||||
|
||||
(when thread-destructor
|
||||
|
|
@ -406,8 +395,7 @@ from there, or #f if that would be an empty string."
|
|||
(expire-on-exception? #f)
|
||||
(name "unnamed")
|
||||
(use-default-io-waiters? #t)
|
||||
default-checkout-timeout
|
||||
default-max-waiters)
|
||||
default-checkout-timeout)
|
||||
"Return a channel used to offload work to a dedicated thread. ARGS are the
|
||||
arguments of the thread pool procedure."
|
||||
(define param
|
||||
|
|
@ -420,6 +408,7 @@ arguments of the thread pool procedure."
|
|||
1
|
||||
#:thread-initializer thread-initializer
|
||||
#:thread-destructor thread-destructor
|
||||
#:thread-lifetime thread-lifetime
|
||||
#:expire-on-exception? expire-on-exception?
|
||||
#:name name
|
||||
#:use-default-io-waiters? use-default-io-waiters?))
|
||||
|
|
@ -427,11 +416,9 @@ arguments of the thread pool procedure."
|
|||
#:destructor destroy-thread-pool
|
||||
#:min-size min-size
|
||||
#:delay-logger delay-logger
|
||||
#:lifetime thread-lifetime
|
||||
#:scheduler scheduler
|
||||
#:duration-logger duration-logger
|
||||
#:default-checkout-timeout default-checkout-timeout
|
||||
#:default-max-waiters default-max-waiters)))
|
||||
#:default-checkout-timeout default-checkout-timeout)))
|
||||
|
||||
(thread-pool resource-pool
|
||||
param)))
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@
|
|||
(record-constructor &port-timeout-error))
|
||||
|
||||
(define port-timeout-error?
|
||||
(exception-predicate &port-timeout-error))
|
||||
(record-predicate &port-timeout-error))
|
||||
|
||||
(define &port-read-timeout-error
|
||||
(make-exception-type '&port-read-timeout-error
|
||||
|
|
@ -96,7 +96,7 @@
|
|||
(record-constructor &port-read-timeout-error))
|
||||
|
||||
(define port-read-timeout-error?
|
||||
(exception-predicate &port-read-timeout-error))
|
||||
(record-predicate &port-read-timeout-error))
|
||||
|
||||
(define &port-write-timeout-error
|
||||
(make-exception-type '&port-write-timeout-error
|
||||
|
|
@ -107,7 +107,7 @@
|
|||
(record-constructor &port-write-timeout-error))
|
||||
|
||||
(define port-write-timeout-error?
|
||||
(exception-predicate &port-write-timeout-error))
|
||||
(record-predicate &port-write-timeout-error))
|
||||
|
||||
(define (readable? port)
|
||||
"Test if PORT is writable."
|
||||
|
|
|
|||
|
|
@ -63,14 +63,6 @@
|
|||
(bind sock family addr port)
|
||||
sock))
|
||||
|
||||
(define crlf-bv
|
||||
(string->utf8 "\r\n"))
|
||||
|
||||
(define (chunked-output-port-overhead-bytes write-size)
|
||||
(+ (string-length (number->string write-size 16))
|
||||
(bytevector-length crlf-bv)
|
||||
(bytevector-length crlf-bv)))
|
||||
|
||||
(define* (make-chunked-output-port/knots port #:key (keep-alive? #f)
|
||||
(buffering 1200))
|
||||
"Returns a new port which translates non-encoded data into a HTTP
|
||||
|
|
@ -82,12 +74,10 @@ when done, as it will output the remaining data, and encode the final
|
|||
zero chunk. When the port is closed it will also close PORT, unless
|
||||
KEEP-ALIVE? is true."
|
||||
(define (write! bv start count)
|
||||
(let ((len-string
|
||||
(number->string count 16)))
|
||||
(put-string port len-string))
|
||||
(put-bytevector port crlf-bv 0 2)
|
||||
(put-string port (number->string count 16))
|
||||
(put-string port "\r\n")
|
||||
(put-bytevector port bv start count)
|
||||
(put-bytevector port crlf-bv 0 2)
|
||||
(put-string port "\r\n")
|
||||
(force-output port)
|
||||
count)
|
||||
|
||||
|
|
@ -140,7 +130,7 @@ closes PORT, unless KEEP-ALIVE? is true."
|
|||
(record-constructor &request-body-ended-prematurely))
|
||||
|
||||
(define request-body-ended-prematurely-error?
|
||||
(exception-predicate &request-body-ended-prematurely))
|
||||
(record-predicate &request-body-ended-prematurely))
|
||||
|
||||
(define (request-body-port/knots r)
|
||||
(cond
|
||||
|
|
@ -228,6 +218,8 @@ on the procedure being called at any particular time."
|
|||
(adapt-response-version response
|
||||
(request-version request))
|
||||
body))
|
||||
((not body)
|
||||
(values response #vu8()))
|
||||
((string? body)
|
||||
(let* ((type (response-content-type response
|
||||
'(text/plain)))
|
||||
|
|
@ -241,15 +233,16 @@ on the procedure being called at any particular time."
|
|||
`(,@type (charset . ,charset))))
|
||||
(string->bytevector body charset))))
|
||||
((not (or (bytevector? body)
|
||||
(procedure? body)
|
||||
(eq? #f body)))
|
||||
(procedure? body)))
|
||||
(raise-exception
|
||||
(make-exception-with-irritants
|
||||
(list (make-exception-with-message
|
||||
"unexpected body type")
|
||||
body))))
|
||||
((and (response-must-not-include-body? response)
|
||||
body)
|
||||
body
|
||||
;; FIXME make this stricter: even an empty body should be prohibited.
|
||||
(not (zero? (bytevector-length body))))
|
||||
(raise-exception
|
||||
(make-exception-with-irritants
|
||||
(list (make-exception-with-message
|
||||
|
|
@ -259,24 +252,25 @@ on the procedure being called at any particular time."
|
|||
;; check length; assert type; add other required fields?
|
||||
(values (response-maybe-add-connection-header-value
|
||||
request
|
||||
(cond
|
||||
((procedure? body)
|
||||
(if (response-content-length response)
|
||||
response
|
||||
(extend-response response
|
||||
'transfer-encoding
|
||||
'((chunked)))))
|
||||
((bytevector? body)
|
||||
(let ((rlen (response-content-length response))
|
||||
(blen (bytevector-length body)))
|
||||
(cond
|
||||
(rlen (if (= rlen blen)
|
||||
response
|
||||
(error "bad content-length" rlen blen)))
|
||||
(else (extend-response response 'content-length blen)))))
|
||||
(else response)))
|
||||
(if (procedure? body)
|
||||
(if (response-content-length response)
|
||||
response
|
||||
(extend-response response
|
||||
'transfer-encoding
|
||||
'((chunked))))
|
||||
(let ((rlen (response-content-length response))
|
||||
(blen (bytevector-length body)))
|
||||
(cond
|
||||
(rlen (if (= rlen blen)
|
||||
response
|
||||
(error "bad content-length" rlen blen)))
|
||||
(else (extend-response response 'content-length blen))))))
|
||||
(if (eq? (request-method request) 'HEAD)
|
||||
#f
|
||||
(raise-exception
|
||||
(make-exception-with-irritants
|
||||
(list (make-exception-with-message
|
||||
"unexpected body type")
|
||||
body)))
|
||||
body)))))
|
||||
|
||||
(define (with-stack-and-prompt thunk)
|
||||
|
|
@ -289,7 +283,7 @@ on the procedure being called at any particular time."
|
|||
(not (memq 'close (response-connection response))))
|
||||
|
||||
(define (default-read-request-exception-handler exn)
|
||||
(display/knots "While reading request:\n" (current-error-port))
|
||||
(display "While reading request:\n" (current-error-port))
|
||||
(print-exception
|
||||
(current-error-port)
|
||||
#f
|
||||
|
|
@ -302,12 +296,12 @@ on the procedure being called at any particular time."
|
|||
(if (and (exception-with-origin? exn)
|
||||
(string=? (exception-origin exn)
|
||||
"fport_write"))
|
||||
(simple-format/knots
|
||||
(simple-format
|
||||
(current-error-port)
|
||||
"~A ~A: error replying to client\n"
|
||||
(request-method request)
|
||||
(uri-path (request-uri request)))
|
||||
(simple-format/knots
|
||||
(simple-format
|
||||
(current-error-port)
|
||||
"knots web server: ~A ~A: exception replying to client: ~A\n"
|
||||
(request-method request)
|
||||
|
|
@ -329,27 +323,23 @@ on the procedure being called at any particular time."
|
|||
(print-backtrace-and-exception/knots
|
||||
exn
|
||||
#:port port)))))
|
||||
(display/knots error-string
|
||||
(current-error-port)))
|
||||
(display error-string
|
||||
(current-error-port)))
|
||||
|
||||
(values (build-response #:code 500)
|
||||
;; TODO Make this configurable
|
||||
(string->utf8
|
||||
"internal server error")))
|
||||
|
||||
(define* (handle-request handler client
|
||||
read-request-exception-handler
|
||||
write-response-exception-handler
|
||||
buffer-size
|
||||
#:key post-request-hook)
|
||||
(define (handle-request handler client
|
||||
read-request-exception-handler
|
||||
write-response-exception-handler)
|
||||
(let ((request
|
||||
(with-exception-handler
|
||||
read-request-exception-handler
|
||||
(lambda ()
|
||||
(read-request client))
|
||||
#:unwind? #t))
|
||||
(read-request-time
|
||||
(get-internal-real-time)))
|
||||
#:unwind? #t)))
|
||||
(let ((response
|
||||
body
|
||||
(cond
|
||||
|
|
@ -398,59 +388,41 @@ on the procedure being called at any particular time."
|
|||
(lambda ()
|
||||
(write-response response client)
|
||||
|
||||
(let ((response-start-time
|
||||
(get-internal-real-time))
|
||||
(body-written?
|
||||
(cond
|
||||
((and (procedure? body)
|
||||
(not
|
||||
(eq? (request-method request)
|
||||
'HEAD)))
|
||||
(let* ((type (response-content-type response
|
||||
'(text/plain)))
|
||||
(declared-charset (assq-ref (cdr type) 'charset))
|
||||
(charset (or declared-charset "ISO-8859-1"))
|
||||
(body-port
|
||||
(if (response-content-length response)
|
||||
client
|
||||
(make-chunked-output-port/knots
|
||||
client
|
||||
#:keep-alive? #t
|
||||
#:buffering
|
||||
(- buffer-size
|
||||
(chunked-output-port-overhead-bytes
|
||||
buffer-size))))))
|
||||
(set-port-encoding! body-port charset)
|
||||
(let ((body-written?
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
#f)
|
||||
(lambda ()
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(print-backtrace-and-exception/knots exn)
|
||||
(raise-exception exn))
|
||||
(lambda ()
|
||||
(body body-port)))
|
||||
#t)
|
||||
#:unwind? #t)))
|
||||
(unless (response-content-length response)
|
||||
(close-port body-port))
|
||||
body-written?)))
|
||||
((bytevector? body)
|
||||
(put-bytevector client body)
|
||||
#t)
|
||||
(else
|
||||
;; No body to write
|
||||
#t))))
|
||||
(let ((body-written?
|
||||
(if (procedure? body)
|
||||
(let* ((type (response-content-type response
|
||||
'(text/plain)))
|
||||
(declared-charset (assq-ref (cdr type) 'charset))
|
||||
(charset (or declared-charset "ISO-8859-1"))
|
||||
(body-port
|
||||
(if (response-content-length response)
|
||||
client
|
||||
(make-chunked-output-port/knots
|
||||
client
|
||||
#:keep-alive? #t))))
|
||||
(set-port-encoding! body-port charset)
|
||||
(let ((body-written?
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
#f)
|
||||
(lambda ()
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(print-backtrace-and-exception/knots exn)
|
||||
(raise-exception exn))
|
||||
(lambda ()
|
||||
(body body-port)))
|
||||
#t)
|
||||
#:unwind? #t)))
|
||||
(unless (response-content-length response)
|
||||
(close-port body-port))
|
||||
body-written?))
|
||||
(begin
|
||||
(put-bytevector client body)
|
||||
#t))))
|
||||
(if body-written?
|
||||
(begin
|
||||
(force-output client)
|
||||
(when post-request-hook
|
||||
(post-request-hook request
|
||||
#:read-request-time read-request-time
|
||||
#:response-start-time response-start-time
|
||||
#:response-end-time (get-internal-real-time)))
|
||||
(when (and (procedure? body)
|
||||
(response-content-length response))
|
||||
(set-port-encoding! client "ISO-8859-1"))
|
||||
|
|
@ -462,8 +434,7 @@ on the procedure being called at any particular time."
|
|||
read-request-exception-handler
|
||||
write-response-exception-handler
|
||||
connection-idle-timeout
|
||||
buffer-size
|
||||
post-request-hook)
|
||||
buffer-size)
|
||||
;; Always disable Nagle's algorithm, as we handle buffering
|
||||
;; ourselves; when we force-output, we really want the data to go
|
||||
;; out.
|
||||
|
|
@ -476,17 +447,13 @@ on the procedure being called at any particular time."
|
|||
(unless (and (exception-with-origin? exn)
|
||||
(string=? (exception-origin exn)
|
||||
"fport_read"))
|
||||
(display/knots "knots web-server, exception in client loop:\n"
|
||||
(current-error-port))
|
||||
(display/knots
|
||||
(call-with-output-string
|
||||
(lambda (port)
|
||||
(print-exception
|
||||
port
|
||||
#f
|
||||
'%exception
|
||||
(list exn))))
|
||||
(current-error-port)))
|
||||
(display "knots web-server, exception in client loop:\n"
|
||||
(current-error-port))
|
||||
(print-exception
|
||||
(current-error-port)
|
||||
#f
|
||||
'%exception
|
||||
(list exn)))
|
||||
#t)
|
||||
(lambda ()
|
||||
(or
|
||||
|
|
@ -505,29 +472,11 @@ on the procedure being called at any particular time."
|
|||
(else
|
||||
(let ((keep-alive? (handle-request handler client
|
||||
read-request-exception-handler
|
||||
write-response-exception-handler
|
||||
buffer-size
|
||||
#:post-request-hook
|
||||
post-request-hook)))
|
||||
write-response-exception-handler)))
|
||||
(if keep-alive?
|
||||
(loop)
|
||||
(close-port client)))))))
|
||||
|
||||
(define (post-request-hook/safe post-request-hook)
|
||||
(if post-request-hook
|
||||
(lambda args
|
||||
(with-exception-handler
|
||||
(lambda (exn) #f)
|
||||
(lambda ()
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(print-backtrace-and-exception/knots exn)
|
||||
(raise-exception exn))
|
||||
(lambda ()
|
||||
(apply post-request-hook args))))
|
||||
#:unwind? #t))
|
||||
#f))
|
||||
|
||||
(define-record-type <web-server>
|
||||
(make-web-server socket port)
|
||||
web-server?
|
||||
|
|
@ -547,8 +496,7 @@ on the procedure being called at any particular time."
|
|||
(write-response-exception-handler
|
||||
default-write-response-exception-handler)
|
||||
(connection-idle-timeout #f)
|
||||
(connection-buffer-size 1024)
|
||||
post-request-hook)
|
||||
(connection-buffer-size 1024))
|
||||
"Run the knots web server.
|
||||
|
||||
HANDLER should be a procedure that takes one argument, the HTTP
|
||||
|
|
@ -584,9 +532,7 @@ before sending back to the client."
|
|||
read-request-exception-handler
|
||||
write-response-exception-handler
|
||||
connection-idle-timeout
|
||||
connection-buffer-size
|
||||
(post-request-hook/safe
|
||||
post-request-hook)))
|
||||
connection-buffer-size))
|
||||
#:parallel? #t)
|
||||
(loop))))))
|
||||
|
||||
|
|
|
|||
12
tests.scm
12
tests.scm
|
|
@ -1,11 +1,10 @@
|
|||
(define-module (tests)
|
||||
#:use-module (ice-9 exceptions)
|
||||
#:use-module (fibers)
|
||||
#:use-module (knots)
|
||||
#:export (run-fibers-for-tests
|
||||
assert-no-heap-growth))
|
||||
|
||||
(define* (run-fibers-for-tests thunk #:key (drain? #t))
|
||||
(define (run-fibers-for-tests thunk)
|
||||
(let ((result
|
||||
(run-fibers
|
||||
(lambda ()
|
||||
|
|
@ -13,18 +12,15 @@
|
|||
(lambda (exn)
|
||||
exn)
|
||||
(lambda ()
|
||||
(simple-format #t "running ~A\n" thunk)
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(print-backtrace-and-exception/knots exn)
|
||||
(backtrace)
|
||||
(raise-exception exn))
|
||||
(lambda ()
|
||||
(start-stack #t (thunk))))
|
||||
thunk)
|
||||
#t)
|
||||
#:unwind? #t))
|
||||
#:hz 0
|
||||
#:parallelism 1
|
||||
#:drain? drain?)))
|
||||
#:parallelism 1)))
|
||||
(if (exception? result)
|
||||
(raise-exception result)
|
||||
result)))
|
||||
|
|
|
|||
|
|
@ -61,24 +61,6 @@
|
|||
identity
|
||||
'(()))))
|
||||
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(unless (and (exception-with-message? exn)
|
||||
(string=? (exception-message exn)
|
||||
"foo"))
|
||||
(raise-exception exn)))
|
||||
(lambda ()
|
||||
(fibers-map-with-progress
|
||||
(lambda _
|
||||
(raise-exception
|
||||
(make-exception-with-message "foo")))
|
||||
'((1)))
|
||||
|
||||
(error 'should-not-reach-here))
|
||||
#:unwind? #t)))
|
||||
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(with-exception-handler
|
||||
|
|
@ -129,16 +111,4 @@
|
|||
|
||||
(assert-equal a 1))))
|
||||
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(let ((parallelism-limiter (make-parallelism-limiter 2)))
|
||||
(fibers-for-each
|
||||
(lambda _
|
||||
(with-parallelism-limiter
|
||||
parallelism-limiter
|
||||
#f))
|
||||
(iota 50))
|
||||
|
||||
(destroy-parallelism-limiter parallelism-limiter))))
|
||||
|
||||
(display "parallelism test finished successfully\n")
|
||||
|
|
|
|||
|
|
@ -1,33 +1,9 @@
|
|||
(use-modules (tests)
|
||||
(fibers)
|
||||
(fibers channels)
|
||||
(unit-test)
|
||||
(knots parallelism)
|
||||
(knots resource-pool))
|
||||
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(let ((parallelism-limiter (make-parallelism-limiter
|
||||
1)))
|
||||
(with-parallelism-limiter parallelism-limiter
|
||||
#f)
|
||||
|
||||
(destroy-parallelism-limiter parallelism-limiter))))
|
||||
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(let ((parallelism-limiter (make-parallelism-limiter
|
||||
1))
|
||||
(channel
|
||||
(make-channel)))
|
||||
(spawn-fiber
|
||||
(lambda ()
|
||||
(with-parallelism-limiter parallelism-limiter
|
||||
(put-message channel #t)
|
||||
(sleep 1))))
|
||||
(get-message channel)
|
||||
(destroy-parallelism-limiter parallelism-limiter))))
|
||||
|
||||
(define new-number
|
||||
(let ((val 0))
|
||||
(lambda ()
|
||||
|
|
@ -43,21 +19,7 @@
|
|||
(number?
|
||||
(with-resource-from-pool resource-pool
|
||||
res
|
||||
res)))
|
||||
|
||||
(destroy-resource-pool resource-pool))))
|
||||
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(let ((resource-pool (make-fixed-size-resource-pool
|
||||
(list 1))))
|
||||
(assert-true
|
||||
(number?
|
||||
(with-resource-from-pool resource-pool
|
||||
res
|
||||
res)))
|
||||
|
||||
(destroy-resource-pool resource-pool))))
|
||||
res))))))
|
||||
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
|
|
@ -69,9 +31,7 @@
|
|||
(number?
|
||||
(with-resource-from-pool resource-pool
|
||||
res
|
||||
res)))
|
||||
|
||||
(destroy-resource-pool resource-pool))))
|
||||
res))))))
|
||||
|
||||
(let* ((error-constructor
|
||||
(record-constructor &resource-pool-timeout))
|
||||
|
|
@ -128,13 +88,10 @@
|
|||
res))
|
||||
(iota 20))
|
||||
|
||||
(let loop ((stats (resource-pool-stats resource-pool
|
||||
#:timeout #f)))
|
||||
(let loop ((stats (resource-pool-stats resource-pool)))
|
||||
(unless (= 0 (assq-ref stats 'resources))
|
||||
(sleep 0.1)
|
||||
(loop (resource-pool-stats resource-pool #:timeout #f))))
|
||||
|
||||
(destroy-resource-pool resource-pool))))
|
||||
(loop (resource-pool-stats resource-pool)))))))
|
||||
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
|
|
@ -158,9 +115,7 @@
|
|||
(set! counter (+ 1 counter))
|
||||
(error "collision detected")))))
|
||||
20
|
||||
(iota 50))
|
||||
|
||||
(destroy-resource-pool resource-pool))))
|
||||
(iota 50)))))
|
||||
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
|
|
@ -174,7 +129,7 @@
|
|||
(error "collision detected")))
|
||||
(new-number))
|
||||
1
|
||||
#:default-checkout-timeout 5)))
|
||||
#:default-checkout-timeout 120)))
|
||||
(fibers-batch-for-each
|
||||
(lambda _
|
||||
(with-resource-from-pool
|
||||
|
|
@ -185,9 +140,7 @@
|
|||
(set! counter (+ 1 counter))
|
||||
(error "collision detected")))))
|
||||
20
|
||||
(iota 50))
|
||||
|
||||
(destroy-resource-pool resource-pool))))
|
||||
(iota 50)))))
|
||||
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
|
|
@ -211,14 +164,14 @@
|
|||
(call-with-resource-from-pool
|
||||
resource-pool
|
||||
(lambda (res)
|
||||
#f)))
|
||||
(error 'should-not-be-reached))))
|
||||
#:unwind? #t)))
|
||||
|
||||
(while (= 0
|
||||
(assq-ref
|
||||
(resource-pool-stats resource-pool #:timeout #f)
|
||||
(resource-pool-stats resource-pool)
|
||||
'waiters))
|
||||
(sleep 0.1))
|
||||
(sleep 0))
|
||||
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
|
|
@ -231,55 +184,6 @@
|
|||
resource-pool
|
||||
(lambda (res)
|
||||
(error 'should-not-be-reached))))
|
||||
#:unwind? #t)))
|
||||
|
||||
(destroy-resource-pool resource-pool))))
|
||||
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(let ((resource-pool (make-resource-pool
|
||||
(const 'foo)
|
||||
1
|
||||
#:lifetime 1
|
||||
#:destructor
|
||||
(const #t))))
|
||||
(for-each
|
||||
(lambda _
|
||||
(with-resource-from-pool resource-pool
|
||||
res
|
||||
res))
|
||||
(iota 20))
|
||||
|
||||
(destroy-resource-pool resource-pool))))
|
||||
|
||||
;; Test allocating resources to waiters and destroying resources
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(let ((resource-pool (make-resource-pool
|
||||
(lambda ()
|
||||
(sleep 1)
|
||||
'res)
|
||||
2
|
||||
#:idle-seconds 1
|
||||
#:add-resources-parallelism 10
|
||||
#:destructor
|
||||
(const #t))))
|
||||
(fibers-for-each
|
||||
(lambda _
|
||||
(with-resource-from-pool resource-pool
|
||||
res
|
||||
res))
|
||||
(iota 20))
|
||||
|
||||
(sleep 2)
|
||||
|
||||
(fibers-for-each
|
||||
(lambda _
|
||||
(with-resource-from-pool resource-pool
|
||||
res
|
||||
res))
|
||||
(iota 20))
|
||||
|
||||
(destroy-resource-pool resource-pool))))
|
||||
#:unwind? #t))))))
|
||||
|
||||
(display "resource-pool test finished successfully\n")
|
||||
|
|
|
|||
|
|
@ -1,28 +0,0 @@
|
|||
(use-modules (tests)
|
||||
(fibers)
|
||||
(unit-test)
|
||||
(knots sort))
|
||||
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(assert-equal
|
||||
'()
|
||||
(fibers-sort! '() <))
|
||||
|
||||
(assert-equal
|
||||
'(1)
|
||||
(fibers-sort! (list 1) <))
|
||||
|
||||
(assert-equal
|
||||
'(1)
|
||||
(fibers-sort! (list 1) < #:parallelism 10))
|
||||
|
||||
(assert-equal
|
||||
'(1 2)
|
||||
(fibers-sort! (list 2 1) <))
|
||||
|
||||
(assert-equal
|
||||
(sort (reverse! (iota 100)) <)
|
||||
(fibers-sort! (reverse! (iota 100)) < #:parallelism 10))))
|
||||
|
||||
(display "sort test finished successfully\n")
|
||||
|
|
@ -1,5 +1,4 @@
|
|||
(use-modules (tests)
|
||||
(ice-9 atomic)
|
||||
(srfi srfi-71)
|
||||
(fibers)
|
||||
(unit-test)
|
||||
|
|
@ -86,60 +85,4 @@
|
|||
(+ 1 'a))))
|
||||
#:unwind? #t)))))
|
||||
|
||||
(let ((thread-pool
|
||||
(make-fixed-size-thread-pool
|
||||
1
|
||||
#:thread-lifetime 1
|
||||
#:thread-initializer
|
||||
(lambda ()
|
||||
(list (make-atomic-box #t))))))
|
||||
|
||||
(for-each
|
||||
(lambda _
|
||||
(call-with-thread
|
||||
thread-pool
|
||||
(lambda (box)
|
||||
(if (atomic-box-ref box)
|
||||
(atomic-box-set! box #f)
|
||||
(error (atomic-box-ref box))))))
|
||||
(iota 10)))
|
||||
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(let ((thread-pool
|
||||
(make-thread-pool 1 #:thread-lifetime 1)))
|
||||
|
||||
(for-each
|
||||
(lambda _
|
||||
(call-with-thread
|
||||
thread-pool
|
||||
(lambda () #f)))
|
||||
(iota 10)))))
|
||||
|
||||
(let ((thread-pool
|
||||
(make-fixed-size-thread-pool
|
||||
1
|
||||
#:thread-lifetime 2
|
||||
#:thread-initializer
|
||||
(lambda ()
|
||||
(list (make-atomic-box 2))))))
|
||||
|
||||
(define (ref-and-decrement box)
|
||||
(let ((val (atomic-box-ref box)))
|
||||
(atomic-box-set! box (- val 1))
|
||||
val))
|
||||
|
||||
(unless (= 2 (call-with-thread
|
||||
thread-pool
|
||||
ref-and-decrement))
|
||||
(error))
|
||||
(unless (= 1 (call-with-thread
|
||||
thread-pool
|
||||
ref-and-decrement))
|
||||
(error))
|
||||
(unless (= 2 (call-with-thread
|
||||
thread-pool
|
||||
ref-and-decrement))
|
||||
(error)))
|
||||
|
||||
(display "thread-pool test finished successfully\n")
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
(use-modules (srfi srfi-71)
|
||||
(rnrs bytevectors)
|
||||
(ice-9 match)
|
||||
(ice-9 binary-ports)
|
||||
(ice-9 textual-ports)
|
||||
(tests)
|
||||
|
|
@ -234,68 +233,4 @@
|
|||
(assert-equal (get-message exception-handled-sucecssfully-channel)
|
||||
#t))))
|
||||
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(let* ((web-server
|
||||
(run-knots-web-server
|
||||
(lambda (request)
|
||||
(match (split-and-decode-uri-path
|
||||
(uri-path (request-uri request)))
|
||||
(("head-no-body")
|
||||
(values '((content-type . (text/plain)))
|
||||
#f))
|
||||
(("head-empty-body")
|
||||
(values '((content-type . (text/plain)))
|
||||
""))
|
||||
(("head-no-body-with-content-length")
|
||||
(values '((content-type . (text/plain))
|
||||
(content-length . 10))
|
||||
#f))
|
||||
(("head-with-body")
|
||||
(values '((content-type . (text/plain)))
|
||||
"foo"))
|
||||
(("head-procedure-body")
|
||||
(values '((content-type . (text/plain)))
|
||||
(lambda _
|
||||
(error "should not be run"))))
|
||||
(("head-procedure-body-with-content-length")
|
||||
(values '((content-type . (text/plain))
|
||||
(content-length . 10))
|
||||
(lambda _
|
||||
(error "should not be run"))))))
|
||||
#:port 0)) ;; Bind to any port
|
||||
(port
|
||||
(web-server-port web-server)))
|
||||
|
||||
(define* (head path)
|
||||
(let ((uri
|
||||
(build-uri 'http #:host "127.0.0.1" #:port port
|
||||
#:path path)))
|
||||
(http-head
|
||||
uri
|
||||
#:port (non-blocking-open-socket-for-uri uri))))
|
||||
|
||||
(let ((response
|
||||
(head "/head-no-body")))
|
||||
(assert-equal 200 (response-code response)))
|
||||
(let ((response
|
||||
(head "/head-empty-body")))
|
||||
(assert-equal 200 (response-code response))
|
||||
(assert-equal 0 (response-content-length response)))
|
||||
(let ((response
|
||||
(head "/head-no-body-with-content-length")))
|
||||
(assert-equal 200 (response-code response))
|
||||
(assert-equal 10 (response-content-length response)))
|
||||
(let ((response
|
||||
(head "/head-with-body")))
|
||||
(assert-equal 200 (response-code response))
|
||||
(assert-equal 3 (response-content-length response)))
|
||||
(let ((response
|
||||
(head "/head-procedure-body")))
|
||||
(assert-equal 200 (response-code response)))
|
||||
(let ((response
|
||||
(head "/head-procedure-body-with-content-length")))
|
||||
(assert-equal 200 (response-code response))
|
||||
(assert-equal 10 (response-content-length response))))))
|
||||
|
||||
(display "web-server test finished successfully\n")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue