Stop using a pool of threads for database operations

Now that squee cooperates with suspendable ports, this is unnecessary. Use a
connection pool to still support running queries in parallel using multiple
connections.
This commit is contained in:
Christopher Baines 2023-07-09 16:52:35 +01:00
parent 672ee6216e
commit 7251c7d653
15 changed files with 1292 additions and 1310 deletions

View file

@ -31,10 +31,12 @@
with-time-logging
prevent-inlining-for-tests
thread-pool-channel
thread-pool-request-timeout
make-thread-pool-channel
parallel-via-thread-pool-channel
resource-pool-default-timeout
make-resource-pool
call-with-resource-from-pool
with-resource-from-pool
parallel-via-fibers
par-map&
letpar&
@ -44,7 +46,10 @@
delete-duplicates/sort!
get-gc-metrics-updater))
get-gc-metrics-updater
call-with-sigint
run-server/patched))
(define (call-with-time-logging action thunk)
(simple-format #t "debug: Starting ~A\n" action)
@ -63,113 +68,206 @@
(define-syntax-rule (prevent-inlining-for-tests var)
(set! var var))
(define* (make-thread-pool-channel threads
#:key
idle-thunk
idle-seconds)
(define (delay-logger seconds-delayed)
(when (> seconds-delayed 1)
(format
(current-error-port)
"warning: thread pool delayed by ~1,2f seconds~%"
seconds-delayed)))
(define* (make-resource-pool initializer max-size
#:key (min-size max-size)
(idle-duration #f)
(delay-logger (const #f))
(duration-logger (const #f))
destructor
lifetime
(name "unnamed"))
(define (initializer/safe)
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"exception running ~A resource pool initializer: ~A:\n ~A\n"
name
initializer
exn)
#f)
(lambda ()
(with-throw-handler #t
initializer
(lambda args
(backtrace))))
#:unwind? #t))
(let ((channel (make-channel)))
(for-each
(lambda _
(call-with-new-thread
(lambda ()
(let loop ()
(match (if idle-seconds
(spawn-fiber
(lambda ()
(let loop ((resources '())
(available '())
(waiters '()))
(match (get-message channel)
(('checkout reply)
(if (null? available)
(if (= (length resources) max-size)
(loop resources
available
(cons reply waiters))
(let ((new-resource (initializer/safe)))
(if new-resource
(let ((checkout-success?
(perform-operation
(choice-operation
(wrap-operation
(put-operation reply new-resource)
(const #t))
(wrap-operation (sleep-operation 0.2)
(const #f))))))
(loop (cons new-resource resources)
(if checkout-success?
available
(cons new-resource available))
waiters))
(loop resources
available
(cons reply waiters)))))
(let ((checkout-success?
(perform-operation
(choice-operation
(get-operation channel)
(wrap-operation (sleep-operation idle-seconds)
(const 'timeout))))
(get-message channel))
('timeout
(when idle-thunk
(with-exception-handler
(lambda (exn)
(simple-format (current-error-port)
"worker thread idle thunk exception: ~A\n"
exn))
idle-thunk
#:unwind? #t))
(wrap-operation
(put-operation reply (car available))
(const #t))
(wrap-operation (sleep-operation 0.2)
(const #f))))))
(if checkout-success?
(loop resources
(cdr available)
waiters)
(loop resources
available
waiters)))))
(('return resource)
;; When a resource is returned, prompt all the waiters to request
;; again. This is to avoid the pool waiting on channels that may
;; be dead.
(for-each
(lambda (waiter)
(spawn-fiber
(lambda ()
(perform-operation
(choice-operation
(put-operation waiter 'resource-pool-retry-checkout)
(sleep-operation 0.2))))))
waiters)
(loop))
(loop resources
(cons resource available)
;; clear waiters, as they've been notified
'()))
(unknown
(simple-format
(current-error-port)
"unrecognised message to ~A resource pool channel: ~A\n"
name
unknown)
(loop resources
available
waiters))))))
(((? channel? reply) sent-time (? procedure? proc))
(let ((time-delay
(- (get-internal-real-time)
sent-time)))
(delay-logger (/ time-delay
internal-time-units-per-second))
(put-message
reply
(with-exception-handler
(lambda (exn)
(cons 'worker-thread-error exn))
(lambda ()
(with-exception-handler
(lambda (exn)
(simple-format
(current-error-port)
"worker thread: exception: ~A\n"
exn)
(backtrace)
(raise-exception exn))
(lambda ()
(call-with-values
proc
(lambda vals
vals)))))
#:unwind? #t)))
(loop))
(_ #f))))))
(iota threads))
channel))
(define &thread-pool-request-timeout
(make-exception-type '&thread-pool-request-timeout
(define resource-pool-default-timeout
(make-parameter #f))
(define &resource-pool-timeout
(make-exception-type '&recource-pool-timeout
&error
'()))
(define make-thread-pool-request-timeout-error
(record-constructor &thread-pool-request-timeout))
(define make-resource-pool-timeout-error
(record-constructor &resource-pool-timeout))
(define thread-pool-request-timeout-error?
(record-predicate &thread-pool-request-timeout))
(define resource-pool-timeout-error?
(record-predicate &resource-pool-timeout))
(define thread-pool-channel
(make-parameter #f))
(define* (call-with-resource-from-pool pool proc #:key (timeout 'default))
"Call PROC with a resource from POOL, blocking until a resource becomes
available. Return the resource once PROC has returned."
(define thread-pool-request-timeout
(make-parameter #f))
(define timeout-or-default
(if (eq? timeout 'default)
(resource-pool-default-timeout)
timeout))
(define (defer-to-thread-pool-channel thunk)
(let ((resource
(let ((reply (make-channel)))
(if timeout-or-default
(let loop ((start-time (get-internal-real-time)))
(perform-operation
(choice-operation
(wrap-operation
(put-operation pool `(checkout ,reply))
(const #t))
(wrap-operation (sleep-operation timeout-or-default)
(const #f))))
(let ((time-remaining
(- timeout-or-default
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second))))
(if (> time-remaining 0)
(let ((response
(perform-operation
(choice-operation
(get-operation reply)
(wrap-operation (sleep-operation time-remaining)
(const #f))))))
(if (or (not response)
(eq? response 'resource-pool-retry-checkout))
(if (> (- timeout-or-default
(/ (- (get-internal-real-time)
start-time)
internal-time-units-per-second))
0)
(loop start-time)
#f)
response))
#f)))
(begin
(put-message pool `(checkout ,reply))
(get-message reply))))))
(when (or (not resource)
(eq? resource 'resource-pool-retry-checkout))
(raise-exception
(make-resource-pool-timeout-error)))
(with-exception-handler
(lambda (exception)
(put-message pool `(return ,resource))
(raise-exception exception))
(lambda ()
(call-with-values
(lambda ()
(proc resource))
(lambda vals
(put-message pool `(return ,resource))
(apply values vals))))
#:unwind? #t)))
(define-syntax-rule (with-resource-from-pool pool resource exp ...)
(call-with-resource-from-pool
pool
(lambda (resource) exp ...)))
(define (defer-to-parallel-fiber thunk)
(let ((reply (make-channel)))
(spawn-fiber
(lambda ()
(let ((val
(perform-operation
(let ((put
(wrap-operation
(put-operation (thread-pool-channel)
(list reply
(get-internal-real-time)
thunk))
(const 'success))))
(or
(and=> (thread-pool-request-timeout)
(lambda (timeout)
(choice-operation
put
(wrap-operation (sleep-operation timeout)
(const 'request-timeout)))))
put)))))
(when (eq? val 'request-timeout)
(put-message reply val)))))
(with-exception-handler
(lambda (exn)
(put-message reply (cons 'exception exn)))
(lambda ()
(call-with-values thunk
(lambda vals
(put-message reply vals))))
#:unwind? #t))
#:parallel? #t)
reply))
(define (fetch-result-of-defered-thunks . reply-channels)
@ -177,21 +275,18 @@
reply-channels)))
(map
(match-lambda
('request-timeout
(raise-exception
(make-thread-pool-request-timeout-error)))
(('worker-thread-error . exn)
(('exception . exn)
(raise-exception exn))
(result
(apply values result)))
responses)))
(define-syntax parallel-via-thread-pool-channel
(define-syntax parallel-via-fibers
(lambda (x)
(syntax-case x ()
((_ e0 ...)
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
#'(let ((tmp0 (defer-to-thread-pool-channel
#'(let ((tmp0 (defer-to-parallel-fiber
(lambda ()
e0)))
...)
@ -199,7 +294,7 @@
(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...)
(call-with-values
(lambda () (parallel-via-thread-pool-channel e ...))
(lambda () (parallel-via-fibers e ...))
(lambda (v ...)
b0 b1 ...)))
@ -209,7 +304,7 @@
(match lists
(((heads tails ...) ...)
(let ((tail (loop tails))
(head (defer-to-thread-pool-channel
(head (defer-to-parallel-fiber
(lambda ()
(apply proc heads)))))
(cons (fetch-result-of-defered-thunks head) tail)))
@ -311,3 +406,50 @@
(metric-set metric value))))
metrics))))
;; This variant of run-server from the fibers library supports running
;; multiple servers within one process.
(define run-server/patched
(let ((fibers-web-server-module
(resolve-module '(fibers web server))))
(define set-nonblocking!
(module-ref fibers-web-server-module 'set-nonblocking!))
(define make-default-socket
(module-ref fibers-web-server-module 'make-default-socket))
(define socket-loop
(module-ref fibers-web-server-module 'socket-loop))
(lambda* (handler
#:key
(host #f)
(family AF_INET)
(addr (if host
(inet-pton family host)
INADDR_LOOPBACK))
(port 8080)
(socket (make-default-socket family addr port)))
;; We use a large backlog by default. If the server is suddenly hit
;; with a number of connections on a small backlog, clients won't
;; receive confirmation for their SYN, leading them to retry --
;; probably successfully, but with a large latency.
(listen socket 1024)
(set-nonblocking! socket)
(sigaction SIGPIPE SIG_IGN)
(spawn-fiber (lambda () (socket-loop socket handler))))))
;; Copied from (fibers web server)
(define (call-with-sigint thunk cvar)
(let ((handler #f))
(dynamic-wind
(lambda ()
(set! handler
(sigaction SIGINT (lambda (sig) (signal-condition! cvar)))))
thunk
(lambda ()
(if handler
;; restore Scheme handler, SIG_IGN or SIG_DFL.
(sigaction SIGINT (car handler) (cdr handler))
;; restore original C handler.
(sigaction SIGINT #f))))))