This commit is contained in:
parent
db9b549e59
commit
8cff54ea43
3 changed files with 429 additions and 0 deletions
|
|
@ -5,6 +5,7 @@ SOURCES = \
|
|||
knots/non-blocking.scm \
|
||||
knots/parallelism.scm \
|
||||
knots/promise.scm \
|
||||
knots/web.scm \
|
||||
knots/queue.scm \
|
||||
knots/resource-pool.scm \
|
||||
knots/sort.scm \
|
||||
|
|
@ -18,6 +19,7 @@ SCM_TESTS = \
|
|||
tests/parallelism.scm \
|
||||
tests/promise.scm \
|
||||
tests/queue.scm \
|
||||
tests/web.scm \
|
||||
tests/resource-pool.scm \
|
||||
tests/sort.scm \
|
||||
tests/thread-pool.scm \
|
||||
|
|
|
|||
204
knots/web.scm
Normal file
204
knots/web.scm
Normal file
|
|
@ -0,0 +1,204 @@
|
|||
;;; Guile Knots
|
||||
;;; Copyright © 2026 Christopher Baines <mail@cbaines.net>
|
||||
;;;
|
||||
;;; This file is part of Guile Knots.
|
||||
;;;
|
||||
;;; The Guile Knots is free software; you can redistribute it and/or
|
||||
;;; modify it under the terms of the GNU General Public License as
|
||||
;;; published by the Free Software Foundation; either version 3 of the
|
||||
;;; License, or (at your option) any later version.
|
||||
;;;
|
||||
;;; The Guile Knots is distributed in the hope that it will be useful,
|
||||
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
;;; General Public License for more details.
|
||||
;;;
|
||||
;;; You should have received a copy of the GNU General Public License
|
||||
;;; along with the guix-data-service. If not, see
|
||||
;;; <http://www.gnu.org/licenses/>.
|
||||
|
||||
(define-module (knots web)
|
||||
#:use-module (srfi srfi-1)
|
||||
#:use-module (srfi srfi-71)
|
||||
#:use-module (ice-9 match)
|
||||
#:use-module (ice-9 exceptions)
|
||||
#:use-module (web uri)
|
||||
#:use-module (web request)
|
||||
#:use-module (web response)
|
||||
#:use-module (knots)
|
||||
#:use-module (knots non-blocking)
|
||||
#:use-module (knots resource-pool)
|
||||
#:export (make-connection-cache
|
||||
call-with-connection-cache
|
||||
call-with-cached-connection
|
||||
http-fold-requests))
|
||||
|
||||
(define* (make-connection-cache uri
|
||||
max-cached-connections
|
||||
#:key (verify-certificate? #t))
|
||||
"Create a resource pool of up to MAX-CACHED-CONNECTIONS
|
||||
to URI."
|
||||
(make-resource-pool
|
||||
(lambda ()
|
||||
;; Open the socket in a temporary thread so that the blocking
|
||||
;; connection attempt does not stall the fiber scheduler.
|
||||
(call-with-temporary-thread
|
||||
(lambda ()
|
||||
(non-blocking-open-socket-for-uri
|
||||
uri
|
||||
#:verify-certificate? verify-certificate?))))
|
||||
max-cached-connections
|
||||
#:destructor close-port))
|
||||
|
||||
(define* (call-with-connection-cache uri
|
||||
max-cached-connections
|
||||
proc
|
||||
#:key (verify-certificate? #t))
|
||||
"Create a connection cache for URI with up to MAX-CACHED-CONNECTIONS,
|
||||
call @code{(proc cache)}, then destroy the cache and return
|
||||
the values returned by PROC."
|
||||
(let ((cache (make-connection-cache
|
||||
uri
|
||||
max-cached-connections
|
||||
#:verify-certificate? verify-certificate?)))
|
||||
(call-with-values
|
||||
(lambda ()
|
||||
(proc cache))
|
||||
(lambda vals
|
||||
(destroy-resource-pool cache)
|
||||
(apply values vals)))))
|
||||
|
||||
(define* (call-with-cached-connection
|
||||
cache proc
|
||||
#:key (close-connection-on-exception? #t))
|
||||
"Check out a connection port from CACHE and call @code{(proc port)},
|
||||
returning the result. The port is returned to the cache when PROC
|
||||
returns, or closed on exception if CLOSE-CONNECTION-ON-EXCEPTION? is
|
||||
true (the default)."
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(if (resource-pool-destroy-resource-exception? exn)
|
||||
(call-with-cached-connection
|
||||
cache
|
||||
proc
|
||||
#:close-connection-on-exception?
|
||||
close-connection-on-exception?)
|
||||
(raise-exception exn)))
|
||||
(lambda ()
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(let ((stack
|
||||
(match (fluid-ref %stacks)
|
||||
((stack-tag . prompt-tag)
|
||||
(make-stack #t
|
||||
0 prompt-tag
|
||||
0 (and prompt-tag 1)))
|
||||
(_
|
||||
(make-stack #t)))))
|
||||
(raise-exception
|
||||
(make-exception
|
||||
exn
|
||||
(make-knots-exception stack)))))
|
||||
(lambda ()
|
||||
(call-with-resource-from-pool cache
|
||||
(lambda (port)
|
||||
(when (port-closed? port)
|
||||
(raise-exception
|
||||
(make-resource-pool-destroy-resource-exception)))
|
||||
(proc port))
|
||||
#:destroy-resource-on-exception? close-connection-on-exception?))))
|
||||
#:unwind? #t))
|
||||
|
||||
(define* (http-fold-requests connection-cache proc seed requests
|
||||
#:key
|
||||
(batch-size 1000))
|
||||
"Fold PROC over HTTP request/response pairs using CONNECTION-CACHE
|
||||
for connections. PROC is called as
|
||||
@code{(proc request response body-port accumulator)} and its return
|
||||
value becomes the new accumulator. Requests are sent in batches of
|
||||
up to BATCH-SIZE before responses are read (HTTP pipelining).
|
||||
|
||||
When the server closes the connection mid-batch the remaining requests
|
||||
are retried on a fresh connection from the cache."
|
||||
|
||||
(define &send-error
|
||||
(make-exception-type '&send-error &exception '()))
|
||||
(define make-send-error
|
||||
(record-constructor &send-error))
|
||||
(define send-error?
|
||||
(exception-predicate &send-error))
|
||||
|
||||
(define (read-responses port batch result)
|
||||
(let loop ((request (car batch))
|
||||
(remaining-requests (cdr batch))
|
||||
(result result))
|
||||
(let ((response
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(close-port port)
|
||||
#f)
|
||||
(lambda ()
|
||||
(read-response port))
|
||||
#:unwind? #t)))
|
||||
(if (not response)
|
||||
(values (cons request remaining-requests) result)
|
||||
(let* ((body (response-body-port response))
|
||||
(new-result (proc request response body result)))
|
||||
(if (memq 'close (response-connection response))
|
||||
(begin
|
||||
(close-port port)
|
||||
(values remaining-requests new-result))
|
||||
(if (null? remaining-requests)
|
||||
(values '() new-result)
|
||||
(loop (car remaining-requests)
|
||||
(cdr remaining-requests)
|
||||
new-result))))))))
|
||||
|
||||
;; Send up to BATCH-SIZE requests then hand off to read-responses.
|
||||
;; If writing fails the connection has dropped; raise &send-error so the
|
||||
;; outer loop retries all remaining requests on a fresh connection.
|
||||
(define (send-batch port batch)
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(close-port port)
|
||||
(raise-exception (make-send-error)))
|
||||
(lambda ()
|
||||
(for-each (lambda (req)
|
||||
(write-request req port))
|
||||
batch)
|
||||
(force-output port))
|
||||
#:unwind? #t))
|
||||
|
||||
(let loop ((remaining-requests requests)
|
||||
(result seed))
|
||||
(if (null? remaining-requests)
|
||||
result
|
||||
(let ((next-remaining-requests
|
||||
next-result
|
||||
(with-exception-handler
|
||||
(lambda (exn)
|
||||
(if (or (send-error? exn)
|
||||
(resource-pool-destroy-resource-exception? exn))
|
||||
(values remaining-requests result)
|
||||
(raise-exception exn)))
|
||||
(lambda ()
|
||||
(call-with-resource-from-pool connection-cache
|
||||
(lambda (port)
|
||||
(if (port-closed? port)
|
||||
(raise-exception
|
||||
(make-resource-pool-destroy-resource-exception))
|
||||
(let ((batch
|
||||
pending
|
||||
(split-at
|
||||
remaining-requests
|
||||
(min batch-size (length
|
||||
remaining-requests)))))
|
||||
(send-batch port batch)
|
||||
(let ((remaining-requests
|
||||
next-result
|
||||
(read-responses port batch result)))
|
||||
(values (append remaining-requests pending)
|
||||
next-result)))))
|
||||
#:destroy-resource-on-exception? #t))
|
||||
#:unwind? #t)))
|
||||
(loop next-remaining-requests next-result)))))
|
||||
223
tests/web.scm
Normal file
223
tests/web.scm
Normal file
|
|
@ -0,0 +1,223 @@
|
|||
(use-modules (tests)
|
||||
(fibers)
|
||||
(srfi srfi-71)
|
||||
(ice-9 rdelim)
|
||||
(ice-9 exceptions)
|
||||
(unit-test)
|
||||
(web uri)
|
||||
(web client)
|
||||
(web request)
|
||||
(web response)
|
||||
(knots resource-pool)
|
||||
(knots web-server)
|
||||
(knots web))
|
||||
|
||||
;; Test that call-with-cached-connection passes the port to proc and
|
||||
;; returns its result.
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(let* ((port (open-input-string ""))
|
||||
(cache (make-fixed-size-resource-pool (list port))))
|
||||
(assert-equal
|
||||
'ok
|
||||
(call-with-cached-connection cache (lambda (p) 'ok)))
|
||||
(destroy-resource-pool cache))))
|
||||
|
||||
;; Test that call-with-cached-connection retries when the checked-out
|
||||
;; port is already closed, using a fresh connection from the pool.
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(let* ((n 0)
|
||||
(cache (make-resource-pool
|
||||
(lambda ()
|
||||
(set! n (+ n 1))
|
||||
(if (= n 1)
|
||||
(let ((p (open-input-string "")))
|
||||
(close-port p)
|
||||
p)
|
||||
(open-input-string "")))
|
||||
1
|
||||
;; Without a destructor, the resource pool calls (#f port)
|
||||
;; when destroying the closed-port resource, looping forever.
|
||||
#:destructor (const #t))))
|
||||
(assert-equal
|
||||
'ok
|
||||
(call-with-cached-connection cache (lambda (p) 'ok)))
|
||||
(destroy-resource-pool cache))))
|
||||
|
||||
;; Test that call-with-connection-cache provides a working cache and
|
||||
;; destroys it after the body returns.
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(let* ((web-server
|
||||
(run-knots-web-server
|
||||
(lambda (request)
|
||||
(values '((content-type . (text/plain))) "ok"))
|
||||
#:port 0))
|
||||
(server-port (web-server-port web-server))
|
||||
(uri (build-uri 'http #:host "127.0.0.1" #:port server-port)))
|
||||
(assert-equal
|
||||
200
|
||||
(call-with-connection-cache
|
||||
uri 1
|
||||
(lambda (cache)
|
||||
(call-with-cached-connection cache
|
||||
(lambda (p)
|
||||
(let ((response body
|
||||
(http-get uri #:port p #:keep-alive? #t)))
|
||||
(response-code response))))))))))
|
||||
|
||||
;; Test that http-fold-requests sends requests and folds over responses.
|
||||
;; The proc must drain the body port between responses so that HTTP
|
||||
;; pipelining works correctly.
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(let* ((web-server
|
||||
(run-knots-web-server
|
||||
(lambda (request)
|
||||
(values '((content-type . (text/plain))) "ok"))
|
||||
#:port 0))
|
||||
(server-port (web-server-port web-server))
|
||||
(uri (build-uri 'http #:host "127.0.0.1" #:port server-port))
|
||||
(cache (make-connection-cache uri 1))
|
||||
(requests (list (build-request uri)
|
||||
(build-request uri))))
|
||||
(let ((codes
|
||||
(http-fold-requests
|
||||
cache
|
||||
(lambda (req resp body result)
|
||||
(read-string body) ; drain body before next pipelined response
|
||||
(cons (response-code resp) result))
|
||||
'()
|
||||
requests)))
|
||||
(assert-equal '(200 200) codes))
|
||||
(destroy-resource-pool cache))))
|
||||
|
||||
;; Test that http-fold-requests reconnects and retries remaining requests when
|
||||
;; the server closes the connection mid-batch via Connection: close. Three
|
||||
;; requests are sent in one batch; the server closes after the first response,
|
||||
;; so the remaining two must be retried on a fresh connection.
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(let* ((n 0)
|
||||
(web-server
|
||||
(run-knots-web-server
|
||||
(lambda (request)
|
||||
(set! n (1+ n))
|
||||
(if (= n 1)
|
||||
(values '((content-type . (text/plain))
|
||||
(connection . (close)))
|
||||
"ok")
|
||||
(values '((content-type . (text/plain))) "ok")))
|
||||
#:port 0))
|
||||
(server-port (web-server-port web-server))
|
||||
(uri (build-uri 'http #:host "127.0.0.1" #:port server-port))
|
||||
(cache (make-connection-cache uri 1))
|
||||
(requests (list (build-request uri)
|
||||
(build-request uri)
|
||||
(build-request uri))))
|
||||
(let ((codes
|
||||
(http-fold-requests
|
||||
cache
|
||||
(lambda (req resp body result)
|
||||
(read-string body)
|
||||
(cons (response-code resp) result))
|
||||
'()
|
||||
requests)))
|
||||
(assert-equal '(200 200 200) codes))
|
||||
(destroy-resource-pool cache))))
|
||||
|
||||
;; Test that write errors in send-batch are handled gracefully. Each request
|
||||
;; carries a large header so that the batch data exceeds the TCP send buffer,
|
||||
;; causing write-request to fail while the server has already closed the
|
||||
;; connection after the first response.
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(let* ((n 0)
|
||||
(web-server
|
||||
(run-knots-web-server
|
||||
(lambda (request)
|
||||
(set! n (1+ n))
|
||||
(if (= n 1)
|
||||
(values '((content-type . (text/plain))
|
||||
(connection . (close)))
|
||||
"ok")
|
||||
(values '((content-type . (text/plain))) "ok")))
|
||||
#:port 0))
|
||||
(server-port (web-server-port web-server))
|
||||
(uri (build-uri 'http #:host "127.0.0.1" #:port server-port))
|
||||
(cache (make-connection-cache uri 1))
|
||||
(n-requests 100)
|
||||
;; 100 requests x ~100 KB of headers each = ~10 MB, well above
|
||||
;; the typical TCP send buffer, so writes fail mid-batch.
|
||||
(large-request
|
||||
(build-request uri
|
||||
#:headers
|
||||
`((x-padding . ,(make-string 100000 #\a)))))
|
||||
(requests (make-list n-requests large-request)))
|
||||
(let ((codes
|
||||
(http-fold-requests
|
||||
cache
|
||||
(lambda (req resp body result)
|
||||
(read-string body)
|
||||
(cons (response-code resp) result))
|
||||
'()
|
||||
requests)))
|
||||
(assert-equal (make-list n-requests 200) codes))
|
||||
(destroy-resource-pool cache))))
|
||||
|
||||
;; Test that http-fold-requests processes multiple batches. With batch-size 2
|
||||
;; and 5 requests, three batches are needed; without the pending fix only the
|
||||
;; first batch would be processed.
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(let* ((web-server
|
||||
(run-knots-web-server
|
||||
(lambda (request)
|
||||
(values '((content-type . (text/plain))) "ok"))
|
||||
#:port 0))
|
||||
(server-port (web-server-port web-server))
|
||||
(uri (build-uri 'http #:host "127.0.0.1" #:port server-port))
|
||||
(cache (make-connection-cache uri 1))
|
||||
(requests (make-list 5 (build-request uri))))
|
||||
(let ((codes
|
||||
(http-fold-requests
|
||||
cache
|
||||
(lambda (req resp body result)
|
||||
(read-string body)
|
||||
(cons (response-code resp) result))
|
||||
'()
|
||||
requests
|
||||
#:batch-size 2)))
|
||||
(assert-equal (make-list 5 200) codes))
|
||||
(destroy-resource-pool cache))))
|
||||
|
||||
;; Test that an exception raised by proc propagates out of http-fold-requests.
|
||||
(run-fibers-for-tests
|
||||
(lambda ()
|
||||
(let* ((web-server
|
||||
(run-knots-web-server
|
||||
(lambda (request)
|
||||
(values '((content-type . (text/plain))) "ok"))
|
||||
#:port 0))
|
||||
(server-port (web-server-port web-server))
|
||||
(uri (build-uri 'http #:host "127.0.0.1" #:port server-port))
|
||||
(cache (make-connection-cache uri 1))
|
||||
(requests (list (build-request uri))))
|
||||
(assert-equal
|
||||
'proc-exception
|
||||
(exception-message
|
||||
(with-exception-handler
|
||||
(lambda (exn) exn)
|
||||
(lambda ()
|
||||
(http-fold-requests
|
||||
cache
|
||||
(lambda (req resp body result)
|
||||
(raise-exception
|
||||
(make-exception-with-message 'proc-exception)))
|
||||
'()
|
||||
requests))
|
||||
#:unwind? #t)))
|
||||
(destroy-resource-pool cache))))
|
||||
|
||||
(display "web test finished successfully\n")
|
||||
Loading…
Add table
Add a link
Reference in a new issue