From 8cff54ea437881e39729a08cdd3258e07698fcb4 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Wed, 18 Mar 2026 09:51:54 +0000 Subject: [PATCH] Add (knots web) --- Makefile.am | 2 + knots/web.scm | 204 +++++++++++++++++++++++++++++++++++++++ tests/web.scm | 223 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 429 insertions(+) create mode 100644 knots/web.scm create mode 100644 tests/web.scm diff --git a/Makefile.am b/Makefile.am index 5551fbe..7942955 100644 --- a/Makefile.am +++ b/Makefile.am @@ -5,6 +5,7 @@ SOURCES = \ knots/non-blocking.scm \ knots/parallelism.scm \ knots/promise.scm \ + knots/web.scm \ knots/queue.scm \ knots/resource-pool.scm \ knots/sort.scm \ @@ -18,6 +19,7 @@ SCM_TESTS = \ tests/parallelism.scm \ tests/promise.scm \ tests/queue.scm \ + tests/web.scm \ tests/resource-pool.scm \ tests/sort.scm \ tests/thread-pool.scm \ diff --git a/knots/web.scm b/knots/web.scm new file mode 100644 index 0000000..73edf37 --- /dev/null +++ b/knots/web.scm @@ -0,0 +1,204 @@ +;;; Guile Knots +;;; Copyright © 2026 Christopher Baines +;;; +;;; This file is part of Guile Knots. +;;; +;;; The Guile Knots is free software; you can redistribute it and/or +;;; modify it under the terms of the GNU General Public License as +;;; published by the Free Software Foundation; either version 3 of the +;;; License, or (at your option) any later version. +;;; +;;; The Guile Knots is distributed in the hope that it will be useful, +;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;; General Public License for more details. +;;; +;;; You should have received a copy of the GNU General Public License +;;; along with Guile Knots. If not, see +;;; . 
+ +(define-module (knots web) + #:use-module (srfi srfi-1) + #:use-module (srfi srfi-71) + #:use-module (ice-9 match) + #:use-module (ice-9 exceptions) + #:use-module (web uri) + #:use-module (web request) + #:use-module (web response) + #:use-module (knots) + #:use-module (knots non-blocking) + #:use-module (knots resource-pool) + #:export (make-connection-cache + call-with-connection-cache + call-with-cached-connection + http-fold-requests)) + +(define* (make-connection-cache uri + max-cached-connections + #:key (verify-certificate? #t)) + "Create a resource pool of up to MAX-CACHED-CONNECTIONS +to URI." + (make-resource-pool + (lambda () + ;; Open the socket in a temporary thread so that the blocking + ;; connection attempt does not stall the fiber scheduler. + (call-with-temporary-thread + (lambda () + (non-blocking-open-socket-for-uri + uri + #:verify-certificate? verify-certificate?)))) + max-cached-connections + #:destructor close-port)) + +(define* (call-with-connection-cache uri + max-cached-connections + proc + #:key (verify-certificate? #t)) + "Create a connection cache for URI with up to MAX-CACHED-CONNECTIONS, +call @code{(proc cache)}, then destroy the cache and return +the values returned by PROC." + (let ((cache (make-connection-cache + uri + max-cached-connections + #:verify-certificate? verify-certificate?))) + (call-with-values + (lambda () + (proc cache)) + (lambda vals + (destroy-resource-pool cache) + (apply values vals))))) + +(define* (call-with-cached-connection + cache proc + #:key (close-connection-on-exception? #t)) + "Check out a connection port from CACHE and call @code{(proc port)}, +returning the result. The port is returned to the cache when PROC +returns, or closed on exception if CLOSE-CONNECTION-ON-EXCEPTION? is +true (the default)." + (with-exception-handler + (lambda (exn) + (if (resource-pool-destroy-resource-exception? exn) + (call-with-cached-connection + cache + proc + #:close-connection-on-exception? 
+ close-connection-on-exception?) + (raise-exception exn))) + (lambda () + (with-exception-handler + (lambda (exn) + (let ((stack + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))) + (_ + (make-stack #t))))) + (raise-exception + (make-exception + exn + (make-knots-exception stack))))) + (lambda () + (call-with-resource-from-pool cache + (lambda (port) + (when (port-closed? port) + (raise-exception + (make-resource-pool-destroy-resource-exception))) + (proc port)) + #:destroy-resource-on-exception? close-connection-on-exception?)))) + #:unwind? #t)) + +(define* (http-fold-requests connection-cache proc seed requests + #:key + (batch-size 1000)) + "Fold PROC over HTTP request/response pairs using CONNECTION-CACHE +for connections. PROC is called as +@code{(proc request response body-port accumulator)} and its return +value becomes the new accumulator. Requests are sent in batches of +up to BATCH-SIZE before responses are read (HTTP pipelining). + +When the server closes the connection mid-batch the remaining requests +are retried on a fresh connection from the cache." + + (define &send-error + (make-exception-type '&send-error &exception '())) + (define make-send-error + (record-constructor &send-error)) + (define send-error? + (exception-predicate &send-error)) + + (define (read-responses port batch result) + (let loop ((request (car batch)) + (remaining-requests (cdr batch)) + (result result)) + (let ((response + (with-exception-handler + (lambda (exn) + (close-port port) + #f) + (lambda () + (read-response port)) + #:unwind? #t))) + (if (not response) + (values (cons request remaining-requests) result) + (let* ((body (response-body-port response)) + (new-result (proc request response body result))) + (if (memq 'close (response-connection response)) + (begin + (close-port port) + (values remaining-requests new-result)) + (if (null? 
remaining-requests) + (values '() new-result) + (loop (car remaining-requests) + (cdr remaining-requests) + new-result)))))))) + + ;; Send up to BATCH-SIZE requests then hand off to read-responses. + ;; If writing fails the connection has dropped; raise &send-error so the + ;; outer loop retries all remaining requests on a fresh connection. + (define (send-batch port batch) + (with-exception-handler + (lambda (exn) + (close-port port) + (raise-exception (make-send-error))) + (lambda () + (for-each (lambda (req) + (write-request req port)) + batch) + (force-output port)) + #:unwind? #t)) + + (let loop ((remaining-requests requests) + (result seed)) + (if (null? remaining-requests) + result + (let ((next-remaining-requests + next-result + (with-exception-handler + (lambda (exn) + (if (or (send-error? exn) + (resource-pool-destroy-resource-exception? exn)) + (values remaining-requests result) + (raise-exception exn))) + (lambda () + (call-with-resource-from-pool connection-cache + (lambda (port) + (if (port-closed? port) + (raise-exception + (make-resource-pool-destroy-resource-exception)) + (let ((batch + pending + (split-at + remaining-requests + (min batch-size (length + remaining-requests))))) + (send-batch port batch) + (let ((remaining-requests + next-result + (read-responses port batch result))) + (values (append remaining-requests pending) + next-result))))) + #:destroy-resource-on-exception? #t)) + #:unwind? #t))) + (loop next-remaining-requests next-result))))) diff --git a/tests/web.scm b/tests/web.scm new file mode 100644 index 0000000..836f4ca --- /dev/null +++ b/tests/web.scm @@ -0,0 +1,223 @@ +(use-modules (tests) + (fibers) + (srfi srfi-71) + (ice-9 rdelim) + (ice-9 exceptions) + (unit-test) + (web uri) + (web client) + (web request) + (web response) + (knots resource-pool) + (knots web-server) + (knots web)) + +;; Test that call-with-cached-connection passes the port to proc and +;; returns its result. 
+(run-fibers-for-tests + (lambda () + (let* ((port (open-input-string "")) + (cache (make-fixed-size-resource-pool (list port)))) + (assert-equal + 'ok + (call-with-cached-connection cache (lambda (p) 'ok))) + (destroy-resource-pool cache)))) + +;; Test that call-with-cached-connection retries when the checked-out +;; port is already closed, using a fresh connection from the pool. +(run-fibers-for-tests + (lambda () + (let* ((n 0) + (cache (make-resource-pool + (lambda () + (set! n (+ n 1)) + (if (= n 1) + (let ((p (open-input-string ""))) + (close-port p) + p) + (open-input-string ""))) + 1 + ;; Without a destructor, the resource pool calls (#f port) + ;; when destroying the closed-port resource, looping forever. + #:destructor (const #t)))) + (assert-equal + 'ok + (call-with-cached-connection cache (lambda (p) 'ok))) + (destroy-resource-pool cache)))) + +;; Test that call-with-connection-cache provides a working cache and +;; destroys it after the body returns. +(run-fibers-for-tests + (lambda () + (let* ((web-server + (run-knots-web-server + (lambda (request) + (values '((content-type . (text/plain))) "ok")) + #:port 0)) + (server-port (web-server-port web-server)) + (uri (build-uri 'http #:host "127.0.0.1" #:port server-port))) + (assert-equal + 200 + (call-with-connection-cache + uri 1 + (lambda (cache) + (call-with-cached-connection cache + (lambda (p) + (let ((response body + (http-get uri #:port p #:keep-alive? #t))) + (response-code response)))))))))) + +;; Test that http-fold-requests sends requests and folds over responses. +;; The proc must drain the body port between responses so that HTTP +;; pipelining works correctly. +(run-fibers-for-tests + (lambda () + (let* ((web-server + (run-knots-web-server + (lambda (request) + (values '((content-type . 
(text/plain))) "ok")) + #:port 0)) + (server-port (web-server-port web-server)) + (uri (build-uri 'http #:host "127.0.0.1" #:port server-port)) + (cache (make-connection-cache uri 1)) + (requests (list (build-request uri) + (build-request uri)))) + (let ((codes + (http-fold-requests + cache + (lambda (req resp body result) + (read-string body) ; drain body before next pipelined response + (cons (response-code resp) result)) + '() + requests))) + (assert-equal '(200 200) codes)) + (destroy-resource-pool cache)))) + +;; Test that http-fold-requests reconnects and retries remaining requests when +;; the server closes the connection mid-batch via Connection: close. Three +;; requests are sent in one batch; the server closes after the first response, +;; so the remaining two must be retried on a fresh connection. +(run-fibers-for-tests + (lambda () + (let* ((n 0) + (web-server + (run-knots-web-server + (lambda (request) + (set! n (1+ n)) + (if (= n 1) + (values '((content-type . (text/plain)) + (connection . (close))) + "ok") + (values '((content-type . (text/plain))) "ok"))) + #:port 0)) + (server-port (web-server-port web-server)) + (uri (build-uri 'http #:host "127.0.0.1" #:port server-port)) + (cache (make-connection-cache uri 1)) + (requests (list (build-request uri) + (build-request uri) + (build-request uri)))) + (let ((codes + (http-fold-requests + cache + (lambda (req resp body result) + (read-string body) + (cons (response-code resp) result)) + '() + requests))) + (assert-equal '(200 200 200) codes)) + (destroy-resource-pool cache)))) + +;; Test that write errors in send-batch are handled gracefully. Each request +;; carries a large header so that the batch data exceeds the TCP send buffer, +;; causing write-request to fail while the server has already closed the +;; connection after the first response. +(run-fibers-for-tests + (lambda () + (let* ((n 0) + (web-server + (run-knots-web-server + (lambda (request) + (set! 
n (1+ n)) + (if (= n 1) + (values '((content-type . (text/plain)) + (connection . (close))) + "ok") + (values '((content-type . (text/plain))) "ok"))) + #:port 0)) + (server-port (web-server-port web-server)) + (uri (build-uri 'http #:host "127.0.0.1" #:port server-port)) + (cache (make-connection-cache uri 1)) + (n-requests 100) + ;; 100 requests x ~100 KB of headers each = ~10 MB, well above + ;; the typical TCP send buffer, so writes fail mid-batch. + (large-request + (build-request uri + #:headers + `((x-padding . ,(make-string 100000 #\a))))) + (requests (make-list n-requests large-request))) + (let ((codes + (http-fold-requests + cache + (lambda (req resp body result) + (read-string body) + (cons (response-code resp) result)) + '() + requests))) + (assert-equal (make-list n-requests 200) codes)) + (destroy-resource-pool cache)))) + +;; Test that http-fold-requests processes multiple batches. With batch-size 2 +;; and 5 requests, three batches are needed; without the pending fix only the +;; first batch would be processed. +(run-fibers-for-tests + (lambda () + (let* ((web-server + (run-knots-web-server + (lambda (request) + (values '((content-type . (text/plain))) "ok")) + #:port 0)) + (server-port (web-server-port web-server)) + (uri (build-uri 'http #:host "127.0.0.1" #:port server-port)) + (cache (make-connection-cache uri 1)) + (requests (make-list 5 (build-request uri)))) + (let ((codes + (http-fold-requests + cache + (lambda (req resp body result) + (read-string body) + (cons (response-code resp) result)) + '() + requests + #:batch-size 2))) + (assert-equal (make-list 5 200) codes)) + (destroy-resource-pool cache)))) + +;; Test that an exception raised by proc propagates out of http-fold-requests. +(run-fibers-for-tests + (lambda () + (let* ((web-server + (run-knots-web-server + (lambda (request) + (values '((content-type . 
(text/plain))) "ok")) + #:port 0)) + (server-port (web-server-port web-server)) + (uri (build-uri 'http #:host "127.0.0.1" #:port server-port)) + (cache (make-connection-cache uri 1)) + (requests (list (build-request uri)))) + (assert-equal + 'proc-exception + (exception-message + (with-exception-handler + (lambda (exn) exn) + (lambda () + (http-fold-requests + cache + (lambda (req resp body result) + (raise-exception + (make-exception-with-message 'proc-exception))) + '() + requests)) + #:unwind? #t))) + (destroy-resource-pool cache)))) + +(display "web test finished successfully\n")