Compare commits
	
		
			No commits in common. "78d22d1accce74191bbfd68dce9e6ac11bc04fcc" and "7ba77010ae98e675340a7ea22b400f0dcc20ef65" have entirely different histories.
		
	
	
		
			78d22d1acc
			...
			7ba77010ae
		
	
		
					 5 changed files with 454 additions and 447 deletions
				
			
		
							
								
								
									
										34
									
								
								knots.scm
									
										
									
									
									
								
							
							
						
						
									
										34
									
								
								knots.scm
									
										
									
									
									
								
							| 
						 | 
				
			
			@ -75,8 +75,6 @@
 | 
			
		|||
                         0 (and prompt-tag 1)))
 | 
			
		||||
            (_
 | 
			
		||||
             (make-stack #t))))
 | 
			
		||||
         (stack-len
 | 
			
		||||
          (stack-length stack))
 | 
			
		||||
         (error-string
 | 
			
		||||
          (call-with-output-string
 | 
			
		||||
            (lambda (port)
 | 
			
		||||
| 
						 | 
				
			
			@ -85,46 +83,30 @@
 | 
			
		|||
                          (filter knots-exception?
 | 
			
		||||
                                  (simple-exceptions exn)))))
 | 
			
		||||
 | 
			
		||||
                (let* ((stack-vec
 | 
			
		||||
                        (stack->vector stack))
 | 
			
		||||
                       (stack-vec-length
 | 
			
		||||
                        (vector-length stack-vec)))
 | 
			
		||||
                (let ((stack-vec
 | 
			
		||||
                       (stack->vector stack)))
 | 
			
		||||
                  (print-frames (list->vector
 | 
			
		||||
                                 (drop
 | 
			
		||||
                                  (vector->list stack-vec)
 | 
			
		||||
                                  (if (< stack-vec-length 5)
 | 
			
		||||
                                      0
 | 
			
		||||
                                      4)))
 | 
			
		||||
                                  6))
 | 
			
		||||
                                port
 | 
			
		||||
                                #:count (stack-length stack)))
 | 
			
		||||
                (for-each
 | 
			
		||||
                 (lambda (stack)
 | 
			
		||||
                   (let* ((stack-vec
 | 
			
		||||
                           (stack->vector stack))
 | 
			
		||||
                          (stack-vec-length
 | 
			
		||||
                           (vector-length stack-vec)))
 | 
			
		||||
                   (let ((stack-vec
 | 
			
		||||
                          (stack->vector stack)))
 | 
			
		||||
                     (print-frames (list->vector
 | 
			
		||||
                                    (drop
 | 
			
		||||
                                     (vector->list stack-vec)
 | 
			
		||||
                                     (if (< stack-vec-length 4)
 | 
			
		||||
                                         0
 | 
			
		||||
                                         3)))
 | 
			
		||||
                                     3))
 | 
			
		||||
                                   port
 | 
			
		||||
                                   #:count (stack-length stack))))
 | 
			
		||||
                 knots-stacks)
 | 
			
		||||
                (print-exception
 | 
			
		||||
                 port
 | 
			
		||||
                 (if (null? knots-stacks)
 | 
			
		||||
                     (stack-ref stack
 | 
			
		||||
                                (if (< stack-len 4)
 | 
			
		||||
                                    stack-len
 | 
			
		||||
                                    4))
 | 
			
		||||
                     (let* ((stack (last knots-stacks))
 | 
			
		||||
                            (stack-len (stack-length stack)))
 | 
			
		||||
                       (stack-ref stack
 | 
			
		||||
                                  (if (< stack-len 3)
 | 
			
		||||
                                      stack-len
 | 
			
		||||
                                      3))))
 | 
			
		||||
                     (stack-ref stack 1)
 | 
			
		||||
                     (stack-ref (last knots-stacks) 3))
 | 
			
		||||
                 '%exception
 | 
			
		||||
                 (list exn)))))))
 | 
			
		||||
    (display error-string port)))
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -19,15 +19,10 @@
 | 
			
		|||
 | 
			
		||||
(define-module (knots promise)
 | 
			
		||||
  #:use-module (srfi srfi-9)
 | 
			
		||||
  #:use-module (ice-9 match)
 | 
			
		||||
  #:use-module (ice-9 atomic)
 | 
			
		||||
  #:use-module (ice-9 exceptions)
 | 
			
		||||
  #:use-module (fibers)
 | 
			
		||||
  #:use-module (fibers conditions)
 | 
			
		||||
  #:use-module (knots)
 | 
			
		||||
  #:export (fibers-promise?
 | 
			
		||||
 | 
			
		||||
            fibers-delay
 | 
			
		||||
  #:export (fibers-delay
 | 
			
		||||
            fibers-force
 | 
			
		||||
            fibers-promise-reset
 | 
			
		||||
            fibers-promise-result-available?))
 | 
			
		||||
| 
						 | 
				
			
			@ -46,18 +41,11 @@
 | 
			
		|||
   (make-condition)))
 | 
			
		||||
 | 
			
		||||
(define (fibers-force fp)
 | 
			
		||||
  (unless (fibers-promise? fp)
 | 
			
		||||
    (raise-exception
 | 
			
		||||
     (make-exception
 | 
			
		||||
      (make-exception-with-message "fibers-force: not a fibers promise")
 | 
			
		||||
      (make-exception-with-irritants fp))))
 | 
			
		||||
 | 
			
		||||
  (let ((res (atomic-box-compare-and-swap!
 | 
			
		||||
              (fibers-promise-values-box fp)
 | 
			
		||||
              #f
 | 
			
		||||
              'started)))
 | 
			
		||||
    (cond
 | 
			
		||||
     ((eq? #f res)
 | 
			
		||||
    (if (eq? #f res)
 | 
			
		||||
        (call-with-values
 | 
			
		||||
            (lambda ()
 | 
			
		||||
              (with-exception-handler
 | 
			
		||||
| 
						 | 
				
			
			@ -67,37 +55,21 @@
 | 
			
		|||
                    (signal-condition!
 | 
			
		||||
                     (fibers-promise-evaluated-condition fp))
 | 
			
		||||
                    (raise-exception exn))
 | 
			
		||||
              (lambda ()
 | 
			
		||||
                (with-exception-handler
 | 
			
		||||
                    (lambda (exn)
 | 
			
		||||
                      (let ((stack
 | 
			
		||||
                             (match (fluid-ref %stacks)
 | 
			
		||||
                               ((stack-tag . prompt-tag)
 | 
			
		||||
                                (make-stack #t
 | 
			
		||||
                                            0 prompt-tag
 | 
			
		||||
                                            0 (and prompt-tag 1)))
 | 
			
		||||
                               (_
 | 
			
		||||
                                (make-stack #t)))))
 | 
			
		||||
                        (raise-exception
 | 
			
		||||
                         (make-exception
 | 
			
		||||
                          exn
 | 
			
		||||
                          (make-knots-exception stack)))))
 | 
			
		||||
                  (fibers-promise-thunk fp)))
 | 
			
		||||
                (fibers-promise-thunk fp)
 | 
			
		||||
                #:unwind? #t))
 | 
			
		||||
          (lambda vals
 | 
			
		||||
            (atomic-box-set! (fibers-promise-values-box fp)
 | 
			
		||||
                             vals)
 | 
			
		||||
            (signal-condition!
 | 
			
		||||
             (fibers-promise-evaluated-condition fp))
 | 
			
		||||
          (apply values vals))))
 | 
			
		||||
      ((eq? res 'started)
 | 
			
		||||
            (apply values vals)))
 | 
			
		||||
        (if (eq? res 'started)
 | 
			
		||||
            (begin
 | 
			
		||||
              (wait (fibers-promise-evaluated-condition fp))
 | 
			
		||||
              (let ((result (atomic-box-ref (fibers-promise-values-box fp))))
 | 
			
		||||
                (if (exception? result)
 | 
			
		||||
                    (raise-exception result)
 | 
			
		||||
               (apply values result)))))
 | 
			
		||||
      (else
 | 
			
		||||
                    (apply values result))))
 | 
			
		||||
            (if (exception? res)
 | 
			
		||||
                (raise-exception res)
 | 
			
		||||
                (apply values res))))))
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -27,38 +27,29 @@
 | 
			
		|||
  #:use-module (rnrs bytevectors)
 | 
			
		||||
  #:use-module (ice-9 q)
 | 
			
		||||
  #:use-module (ice-9 match)
 | 
			
		||||
  #:use-module (ice-9 atomic)
 | 
			
		||||
  #:use-module (ice-9 threads)
 | 
			
		||||
  #:use-module (fibers)
 | 
			
		||||
  #:use-module (fibers timers)
 | 
			
		||||
  #:use-module (fibers channels)
 | 
			
		||||
  #:use-module (fibers operations)
 | 
			
		||||
  #:use-module (knots)
 | 
			
		||||
  #:use-module (knots resource-pool)
 | 
			
		||||
  #:export (set-thread-name
 | 
			
		||||
            thread-name
 | 
			
		||||
 | 
			
		||||
            &thread-pool-timeout-error
 | 
			
		||||
            thread-pool-timeout-error-pool
 | 
			
		||||
            thread-pool-timeout-error?
 | 
			
		||||
            thread-pool?
 | 
			
		||||
            thread-pool-channel
 | 
			
		||||
            thread-pool-arguments-parameter
 | 
			
		||||
            thread-pool-proc-vector
 | 
			
		||||
 | 
			
		||||
            make-thread-pool
 | 
			
		||||
            thread-pool?
 | 
			
		||||
            thread-pool-resource-pool
 | 
			
		||||
            call-with-thread
 | 
			
		||||
 | 
			
		||||
            make-fixed-size-thread-pool
 | 
			
		||||
            fixed-size-thread-pool?
 | 
			
		||||
            fixed-size-thread-pool-channel
 | 
			
		||||
            fixed-size-thread-pool-current-procedures
 | 
			
		||||
            &thread-pool-timeout
 | 
			
		||||
            thread-pool-timeout-error?
 | 
			
		||||
 | 
			
		||||
            ;; These procedures work for thread pools and fixed size
 | 
			
		||||
            ;; thread pools
 | 
			
		||||
            thread-pool-arguments-parameter
 | 
			
		||||
            thread-pool-default-checkout-timeout
 | 
			
		||||
            %thread-pool-default-timeout
 | 
			
		||||
 | 
			
		||||
            destroy-thread-pool
 | 
			
		||||
 | 
			
		||||
            call-with-thread))
 | 
			
		||||
            create-work-queue))
 | 
			
		||||
 | 
			
		||||
(define* (syscall->procedure return-type name argument-types
 | 
			
		||||
                             #:key library)
 | 
			
		||||
| 
						 | 
				
			
			@ -156,64 +147,28 @@ from there, or #f if that would be an empty string."
 | 
			
		|||
      (const "")))
 | 
			
		||||
 | 
			
		||||
(define-record-type <thread-pool>
 | 
			
		||||
  (thread-pool resource-pool arguments-parameter)
 | 
			
		||||
  (thread-pool channel arguments-parameter proc-vector)
 | 
			
		||||
  thread-pool?
 | 
			
		||||
  (resource-pool       thread-pool-resource-pool)
 | 
			
		||||
  (arguments-parameter thread-pool-arguments-parameter-accessor))
 | 
			
		||||
  (channel             thread-pool-channel)
 | 
			
		||||
  (arguments-parameter thread-pool-arguments-parameter)
 | 
			
		||||
  (proc-vector         thread-pool-proc-vector)
 | 
			
		||||
  (default-checkout-timeout
 | 
			
		||||
    thread-pool-default-checkout-timeout))
 | 
			
		||||
 | 
			
		||||
(define-record-type <fixed-size-thread-pool>
 | 
			
		||||
  (fixed-size-thread-pool channel arguments-parameter current-procedures
 | 
			
		||||
                          default-checkout-timeout)
 | 
			
		||||
  fixed-size-thread-pool?
 | 
			
		||||
  (channel             fixed-size-thread-pool-channel)
 | 
			
		||||
  (arguments-parameter fixed-size-thread-pool-arguments-parameter)
 | 
			
		||||
  (current-procedures  fixed-size-thread-pool-current-procedures)
 | 
			
		||||
  (default-checkout-timeout fixed-size-thread-pool-default-checkout-timeout))
 | 
			
		||||
 | 
			
		||||
;; Since both thread pool records have this field, use a procedure
 | 
			
		||||
;; than handles the appropriate accessor
 | 
			
		||||
(define (thread-pool-arguments-parameter pool)
 | 
			
		||||
  (if (fixed-size-thread-pool? pool)
 | 
			
		||||
      (fixed-size-thread-pool-arguments-parameter pool)
 | 
			
		||||
      (thread-pool-arguments-parameter-accessor pool)))
 | 
			
		||||
 | 
			
		||||
(define (thread-pool-default-checkout-timeout pool)
 | 
			
		||||
  (if (fixed-size-thread-pool? pool)
 | 
			
		||||
      (fixed-size-thread-pool-default-checkout-timeout pool)
 | 
			
		||||
      (assq-ref (resource-pool-configuration
 | 
			
		||||
                 (thread-pool-resource-pool pool))
 | 
			
		||||
                'default-checkout-timeout)))
 | 
			
		||||
 | 
			
		||||
(define &thread-pool-timeout
 | 
			
		||||
  (make-exception-type '&thread-pool-timeout
 | 
			
		||||
                       &error
 | 
			
		||||
                       '(pool)))
 | 
			
		||||
 | 
			
		||||
(define make-thread-pool-timeout-error
 | 
			
		||||
  (record-constructor &thread-pool-timeout))
 | 
			
		||||
 | 
			
		||||
(define thread-pool-timeout-error-pool
 | 
			
		||||
  (exception-accessor
 | 
			
		||||
   &thread-pool-timeout
 | 
			
		||||
   (record-accessor &thread-pool-timeout 'pool)))
 | 
			
		||||
 | 
			
		||||
(define thread-pool-timeout-error?
 | 
			
		||||
  (record-predicate &thread-pool-timeout))
 | 
			
		||||
 | 
			
		||||
(define* (make-fixed-size-thread-pool size
 | 
			
		||||
(define* (make-thread-pool size
 | 
			
		||||
                           #:key
 | 
			
		||||
                           thread-initializer
 | 
			
		||||
                           thread-destructor
 | 
			
		||||
                                      delay-logger
 | 
			
		||||
                                      duration-logger
 | 
			
		||||
                           (delay-logger (lambda _ #f))
 | 
			
		||||
                           (duration-logger (const #f))
 | 
			
		||||
                           thread-lifetime
 | 
			
		||||
                           (log-exception? (const #t))
 | 
			
		||||
                           (expire-on-exception? #f)
 | 
			
		||||
                           (name "unnamed")
 | 
			
		||||
                           (use-default-io-waiters? #t)
 | 
			
		||||
                           default-checkout-timeout)
 | 
			
		||||
  (define channel
 | 
			
		||||
    (make-channel))
 | 
			
		||||
 | 
			
		||||
  "Return a channel used to offload work to a dedicated thread.  ARGS are the
 | 
			
		||||
arguments of the thread pool procedure."
 | 
			
		||||
  (define param
 | 
			
		||||
    (make-parameter #f))
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -269,18 +224,17 @@ from there, or #f if that would be an empty string."
 | 
			
		|||
            (sleep 1)
 | 
			
		||||
            (destructor/safe args)))))
 | 
			
		||||
 | 
			
		||||
  (define (process channel args)
 | 
			
		||||
    (let loop ()
 | 
			
		||||
  (define (process thread-index channel args)
 | 
			
		||||
    (let loop ((current-lifetime thread-lifetime))
 | 
			
		||||
      (let ((exception?
 | 
			
		||||
             (match (get-message channel)
 | 
			
		||||
        ('destroy #f)
 | 
			
		||||
        ((reply sent-time proc)
 | 
			
		||||
         (when delay-logger
 | 
			
		||||
               (((? channel? reply) sent-time (? procedure? proc))
 | 
			
		||||
                (let ((time-delay
 | 
			
		||||
                       (- (get-internal-real-time)
 | 
			
		||||
                          sent-time)))
 | 
			
		||||
                  (delay-logger (/ time-delay
 | 
			
		||||
                                   internal-time-units-per-second)
 | 
			
		||||
                           proc)))
 | 
			
		||||
                                proc)
 | 
			
		||||
 | 
			
		||||
                  (let* ((start-time (get-internal-real-time))
 | 
			
		||||
                         (response
 | 
			
		||||
| 
						 | 
				
			
			@ -292,6 +246,9 @@ from there, or #f if that would be an empty string."
 | 
			
		|||
                                         internal-time-units-per-second)
 | 
			
		||||
                                      exn))
 | 
			
		||||
                            (lambda ()
 | 
			
		||||
                              (vector-set! thread-proc-vector
 | 
			
		||||
                                           thread-index
 | 
			
		||||
                                           proc)
 | 
			
		||||
                              (with-exception-handler
 | 
			
		||||
                                  (lambda (exn)
 | 
			
		||||
                                    (let ((stack
 | 
			
		||||
| 
						 | 
				
			
			@ -318,11 +275,13 @@ from there, or #f if that would be an empty string."
 | 
			
		|||
                                               internal-time-units-per-second)
 | 
			
		||||
                                            vals))))))
 | 
			
		||||
                            #:unwind? #t)))
 | 
			
		||||
 | 
			
		||||
                    (put-message reply
 | 
			
		||||
                                 response)
 | 
			
		||||
 | 
			
		||||
           (let ((exception?
 | 
			
		||||
                    (vector-set! thread-proc-vector
 | 
			
		||||
                                 thread-index
 | 
			
		||||
                                 #f)
 | 
			
		||||
 | 
			
		||||
                    (match response
 | 
			
		||||
                      (('thread-pool-error duration _)
 | 
			
		||||
                       (when duration-logger
 | 
			
		||||
| 
						 | 
				
			
			@ -331,26 +290,32 @@ from there, or #f if that would be an empty string."
 | 
			
		|||
                      ((duration . _)
 | 
			
		||||
                       (when duration-logger
 | 
			
		||||
                         (duration-logger duration proc))
 | 
			
		||||
                     #f))))
 | 
			
		||||
             (if (and exception?
 | 
			
		||||
                      expire-on-exception?)
 | 
			
		||||
                 #t
 | 
			
		||||
                 (loop))))))))
 | 
			
		||||
                       #f))))))))
 | 
			
		||||
        (unless (and expire-on-exception?
 | 
			
		||||
                     exception?)
 | 
			
		||||
          (if (number? current-lifetime)
 | 
			
		||||
              (unless (< current-lifetime 0)
 | 
			
		||||
                (loop (if current-lifetime
 | 
			
		||||
                          (- current-lifetime 1)
 | 
			
		||||
                          #f)))
 | 
			
		||||
              (loop #f))))))
 | 
			
		||||
 | 
			
		||||
  (define (start-thread index channel)
 | 
			
		||||
  (define (start-threads channel)
 | 
			
		||||
    (for-each
 | 
			
		||||
     (lambda (thread-index)
 | 
			
		||||
       (call-with-new-thread
 | 
			
		||||
        (lambda ()
 | 
			
		||||
          (catch 'system-error
 | 
			
		||||
            (lambda ()
 | 
			
		||||
              (set-thread-name
 | 
			
		||||
               (string-append
 | 
			
		||||
             name " w t " (number->string index))))
 | 
			
		||||
                name " w t "
 | 
			
		||||
                (number->string thread-index))))
 | 
			
		||||
            (const #t))
 | 
			
		||||
 | 
			
		||||
          (let init ((args (if thread-initializer
 | 
			
		||||
                               (initializer/safe)
 | 
			
		||||
                               '())))
 | 
			
		||||
         (let ((continue?
 | 
			
		||||
            (with-exception-handler
 | 
			
		||||
                (lambda (exn)
 | 
			
		||||
                  (simple-format
 | 
			
		||||
| 
						 | 
				
			
			@ -358,96 +323,54 @@ from there, or #f if that would be an empty string."
 | 
			
		|||
                   "knots: thread-pool: internal exception: ~A\n" exn))
 | 
			
		||||
              (lambda ()
 | 
			
		||||
                (parameterize ((param args))
 | 
			
		||||
                      (process channel args)))
 | 
			
		||||
                  #:unwind? #t)))
 | 
			
		||||
                  (process thread-index channel args)))
 | 
			
		||||
              #:unwind? #t)
 | 
			
		||||
 | 
			
		||||
            (when thread-destructor
 | 
			
		||||
              (destructor/safe args))
 | 
			
		||||
 | 
			
		||||
           (when continue?
 | 
			
		||||
             (init (if thread-initializer
 | 
			
		||||
                       (initializer/safe)
 | 
			
		||||
                       '()))))))))
 | 
			
		||||
            (init (initializer/safe))))))
 | 
			
		||||
     (iota size)))
 | 
			
		||||
 | 
			
		||||
  (for-each
 | 
			
		||||
   (lambda (i)
 | 
			
		||||
  (let ((channel (make-channel)))
 | 
			
		||||
    (if use-default-io-waiters?
 | 
			
		||||
        (call-with-default-io-waiters
 | 
			
		||||
         (lambda ()
 | 
			
		||||
            (start-thread i channel)))
 | 
			
		||||
         (start-thread i channel)))
 | 
			
		||||
   (iota size))
 | 
			
		||||
           (start-threads channel)))
 | 
			
		||||
        (start-threads channel))
 | 
			
		||||
 | 
			
		||||
  (fixed-size-thread-pool channel
 | 
			
		||||
    (thread-pool channel
 | 
			
		||||
                 param
 | 
			
		||||
                          thread-proc-vector
 | 
			
		||||
                          default-checkout-timeout))
 | 
			
		||||
                 thread-proc-vector)))
 | 
			
		||||
 | 
			
		||||
(define* (make-thread-pool max-size
 | 
			
		||||
                           #:key
 | 
			
		||||
                           (min-size max-size)
 | 
			
		||||
                           scheduler
 | 
			
		||||
                           thread-initializer
 | 
			
		||||
                           thread-destructor
 | 
			
		||||
                           (delay-logger (lambda _ #f))
 | 
			
		||||
                           (duration-logger (const #f))
 | 
			
		||||
                           thread-lifetime
 | 
			
		||||
                           (expire-on-exception? #f)
 | 
			
		||||
                           (name "unnamed")
 | 
			
		||||
                           (use-default-io-waiters? #t)
 | 
			
		||||
                           default-checkout-timeout)
 | 
			
		||||
  "Return a channel used to offload work to a dedicated thread.  ARGS are the
 | 
			
		||||
arguments of the thread pool procedure."
 | 
			
		||||
  (define param
 | 
			
		||||
    (make-parameter #f))
 | 
			
		||||
(define &thread-pool-timeout
 | 
			
		||||
  (make-exception-type '&thread-pool-timeout
 | 
			
		||||
                       &error
 | 
			
		||||
                       '()))
 | 
			
		||||
 | 
			
		||||
  (let ((resource-pool
 | 
			
		||||
         (make-resource-pool
 | 
			
		||||
          (lambda ()
 | 
			
		||||
            (make-fixed-size-thread-pool
 | 
			
		||||
             1
 | 
			
		||||
             #:thread-initializer thread-initializer
 | 
			
		||||
             #:thread-destructor thread-destructor
 | 
			
		||||
             #:thread-lifetime thread-lifetime
 | 
			
		||||
             #:expire-on-exception? expire-on-exception?
 | 
			
		||||
             #:name name
 | 
			
		||||
             #:use-default-io-waiters? use-default-io-waiters?))
 | 
			
		||||
          max-size
 | 
			
		||||
          #:destructor destroy-thread-pool
 | 
			
		||||
          #:min-size min-size
 | 
			
		||||
          #:delay-logger delay-logger
 | 
			
		||||
          #:scheduler scheduler
 | 
			
		||||
          #:duration-logger duration-logger
 | 
			
		||||
          #:default-checkout-timeout default-checkout-timeout)))
 | 
			
		||||
(define make-thread-pool-timeout-error
 | 
			
		||||
  (record-constructor &thread-pool-timeout))
 | 
			
		||||
 | 
			
		||||
    (thread-pool resource-pool
 | 
			
		||||
                 param)))
 | 
			
		||||
(define thread-pool-timeout-error?
 | 
			
		||||
  (record-predicate &thread-pool-timeout))
 | 
			
		||||
 | 
			
		||||
(define* (call-with-thread thread-pool
 | 
			
		||||
                           proc
 | 
			
		||||
                           #:key
 | 
			
		||||
                           duration-logger
 | 
			
		||||
                           checkout-timeout
 | 
			
		||||
                           channel
 | 
			
		||||
                           destroy-thread-on-exception?
 | 
			
		||||
                           (max-waiters 'default))
 | 
			
		||||
(define* (call-with-thread record proc #:key duration-logger
 | 
			
		||||
                           (timeout (thread-pool-default-checkout-timeout
 | 
			
		||||
                                     record))
 | 
			
		||||
                           (channel (thread-pool-channel record)))
 | 
			
		||||
  "Send PROC to the thread pool through CHANNEL.  Return the result of PROC.
 | 
			
		||||
If already in the thread pool, call PROC immediately."
 | 
			
		||||
  (define (handle-proc fixed-size-thread-pool
 | 
			
		||||
                       reply-channel
 | 
			
		||||
                       start-time
 | 
			
		||||
                       timeout)
 | 
			
		||||
    (let* ((request-channel
 | 
			
		||||
            (or channel
 | 
			
		||||
                (fixed-size-thread-pool-channel
 | 
			
		||||
                 fixed-size-thread-pool)))
 | 
			
		||||
  (let ((args ((thread-pool-arguments-parameter record))))
 | 
			
		||||
    (if args
 | 
			
		||||
        (apply proc args)
 | 
			
		||||
        (let* ((reply (make-channel))
 | 
			
		||||
               (operation-success?
 | 
			
		||||
                (perform-operation
 | 
			
		||||
                 (let ((put
 | 
			
		||||
                        (wrap-operation
 | 
			
		||||
                     (put-operation request-channel
 | 
			
		||||
                                    (list reply-channel
 | 
			
		||||
                                          start-time
 | 
			
		||||
                         (put-operation channel
 | 
			
		||||
                                        (list reply
 | 
			
		||||
                                              (get-internal-real-time)
 | 
			
		||||
                                              proc))
 | 
			
		||||
                         (const #t))))
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -462,8 +385,7 @@ If already in the thread pool, call PROC immediately."
 | 
			
		|||
            (raise-exception
 | 
			
		||||
             (make-thread-pool-timeout-error)))
 | 
			
		||||
 | 
			
		||||
    (let ((reply (get-message reply-channel)))
 | 
			
		||||
      (match reply
 | 
			
		||||
          (match (get-message reply)
 | 
			
		||||
            (('thread-pool-error duration exn)
 | 
			
		||||
             (when duration-logger
 | 
			
		||||
               (duration-logger duration))
 | 
			
		||||
| 
						 | 
				
			
			@ -471,54 +393,213 @@ If already in the thread pool, call PROC immediately."
 | 
			
		|||
            ((duration . result)
 | 
			
		||||
             (when duration-logger
 | 
			
		||||
               (duration-logger duration))
 | 
			
		||||
         (apply values result))))))
 | 
			
		||||
             (apply values result)))))))
 | 
			
		||||
 | 
			
		||||
  (let ((args ((thread-pool-arguments-parameter thread-pool))))
 | 
			
		||||
    (if args
 | 
			
		||||
        (apply proc args)
 | 
			
		||||
        (let ((start-time (get-internal-real-time))
 | 
			
		||||
              (reply-channel (make-channel)))
 | 
			
		||||
          (if (fixed-size-thread-pool? thread-pool)
 | 
			
		||||
              (handle-proc thread-pool
 | 
			
		||||
                           reply-channel
 | 
			
		||||
                           start-time
 | 
			
		||||
                           checkout-timeout)
 | 
			
		||||
(define* (create-work-queue thread-count-parameter proc
 | 
			
		||||
                            #:key thread-start-delay
 | 
			
		||||
                            (thread-stop-delay
 | 
			
		||||
                             (make-time time-duration 0 0))
 | 
			
		||||
                            (name "unnamed")
 | 
			
		||||
                            priority<?)
 | 
			
		||||
  (let ((queue (make-q))
 | 
			
		||||
        (queue-mutex (make-mutex))
 | 
			
		||||
        (job-available (make-condition-variable))
 | 
			
		||||
        (running-job-args (make-hash-table)))
 | 
			
		||||
 | 
			
		||||
    (define get-thread-count
 | 
			
		||||
      (cond
 | 
			
		||||
       ((number? thread-count-parameter)
 | 
			
		||||
        (const thread-count-parameter))
 | 
			
		||||
       ((eq? thread-count-parameter #f)
 | 
			
		||||
        ;; Run one thread per job
 | 
			
		||||
        (lambda ()
 | 
			
		||||
          (+ (q-length queue)
 | 
			
		||||
             (hash-count (lambda (index val)
 | 
			
		||||
                           (list? val))
 | 
			
		||||
                         running-job-args))))
 | 
			
		||||
       (else
 | 
			
		||||
        thread-count-parameter)))
 | 
			
		||||
 | 
			
		||||
    (define process-job
 | 
			
		||||
      (if priority<?
 | 
			
		||||
          (lambda* (args #:key priority)
 | 
			
		||||
            (with-mutex queue-mutex
 | 
			
		||||
              (enq! queue (cons priority args))
 | 
			
		||||
              (set-car!
 | 
			
		||||
               queue
 | 
			
		||||
               (stable-sort! (car queue)
 | 
			
		||||
                             (lambda (a b)
 | 
			
		||||
                               (priority<?
 | 
			
		||||
                                (car a)
 | 
			
		||||
                                (car b)))))
 | 
			
		||||
              (sync-q! queue)
 | 
			
		||||
              (start-new-threads-if-necessary (get-thread-count))
 | 
			
		||||
              (signal-condition-variable job-available)))
 | 
			
		||||
          (lambda args
 | 
			
		||||
            (with-mutex queue-mutex
 | 
			
		||||
              (enq! queue args)
 | 
			
		||||
              (start-new-threads-if-necessary (get-thread-count))
 | 
			
		||||
              (signal-condition-variable job-available)))))
 | 
			
		||||
 | 
			
		||||
    (define (count-threads)
 | 
			
		||||
      (with-mutex queue-mutex
 | 
			
		||||
        (hash-count (const #t) running-job-args)))
 | 
			
		||||
 | 
			
		||||
    (define (count-jobs)
 | 
			
		||||
      (with-mutex queue-mutex
 | 
			
		||||
        (+ (q-length queue)
 | 
			
		||||
           (hash-count (lambda (index val)
 | 
			
		||||
                         (list? val))
 | 
			
		||||
                       running-job-args))))
 | 
			
		||||
 | 
			
		||||
    (define (list-jobs)
 | 
			
		||||
      (with-mutex queue-mutex
 | 
			
		||||
        (append (if priority<?
 | 
			
		||||
                    (map cdr (car queue))
 | 
			
		||||
                    (list-copy (car queue)))
 | 
			
		||||
                (hash-fold (lambda (key val result)
 | 
			
		||||
                             (if val
 | 
			
		||||
                                 (cons val result)
 | 
			
		||||
                                 result))
 | 
			
		||||
                           '()
 | 
			
		||||
                           running-job-args))))
 | 
			
		||||
 | 
			
		||||
    (define (thread-process-job job-args)
 | 
			
		||||
      (with-exception-handler
 | 
			
		||||
          (lambda _ #f)
 | 
			
		||||
        (lambda ()
 | 
			
		||||
          (with-exception-handler
 | 
			
		||||
              (lambda (exn)
 | 
			
		||||
                    (if (and (resource-pool-timeout-error? exn)
 | 
			
		||||
                             (eq? (resource-pool-timeout-error-pool exn)
 | 
			
		||||
                                  (thread-pool-resource-pool thread-pool)))
 | 
			
		||||
                        (raise-exception
 | 
			
		||||
                         (make-thread-pool-timeout-error thread-pool))
 | 
			
		||||
                        (raise-exception exn)))
 | 
			
		||||
                (simple-format (current-error-port)
 | 
			
		||||
                               "~A work queue, job raised exception ~A\n"
 | 
			
		||||
                               name job-args)
 | 
			
		||||
                (print-backtrace-and-exception/knots exn)
 | 
			
		||||
                (raise-exception exn))
 | 
			
		||||
            (lambda ()
 | 
			
		||||
                  (call-with-resource-from-pool (thread-pool-resource-pool
 | 
			
		||||
                                                 thread-pool)
 | 
			
		||||
                    (lambda (fixed-size-thread-pool)
 | 
			
		||||
                      (if checkout-timeout
 | 
			
		||||
                          (let ((remaining-time
 | 
			
		||||
                                 (/ (- (get-internal-real-time) start-time)
 | 
			
		||||
                                    internal-time-units-per-second)))
 | 
			
		||||
                            (if (< remaining-time checkout-timeout)
 | 
			
		||||
                                (handle-proc fixed-size-thread-pool
 | 
			
		||||
                                             reply-channel
 | 
			
		||||
                                             start-time
 | 
			
		||||
                                             remaining-time)
 | 
			
		||||
                                (raise-exception
 | 
			
		||||
                                 (make-thread-pool-timeout-error thread-pool))))
 | 
			
		||||
                          (handle-proc fixed-size-thread-pool
 | 
			
		||||
                                       reply-channel
 | 
			
		||||
                                       start-time
 | 
			
		||||
                                       #f)))
 | 
			
		||||
                    #:max-waiters max-waiters
 | 
			
		||||
                    #:timeout checkout-timeout
 | 
			
		||||
                    #:destroy-resource-on-exception?
 | 
			
		||||
                    destroy-thread-on-exception?))))))))
 | 
			
		||||
              (apply proc job-args))))
 | 
			
		||||
        #:unwind? #t))
 | 
			
		||||
 | 
			
		||||
(define (destroy-thread-pool pool)
 | 
			
		||||
  (if (fixed-size-thread-pool? pool)
 | 
			
		||||
      (put-message
 | 
			
		||||
       (fixed-size-thread-pool-channel pool)
 | 
			
		||||
       'destroy)
 | 
			
		||||
      (destroy-resource-pool
 | 
			
		||||
       (thread-pool-resource-pool pool))))
 | 
			
		||||
    (define (start-thread thread-index)
 | 
			
		||||
      (define (too-many-threads?)
 | 
			
		||||
        (let ((running-jobs-count
 | 
			
		||||
               (hash-count (lambda (index val)
 | 
			
		||||
                             (list? val))
 | 
			
		||||
                           running-job-args))
 | 
			
		||||
              (desired-thread-count (get-thread-count)))
 | 
			
		||||
 | 
			
		||||
          (>= running-jobs-count
 | 
			
		||||
              desired-thread-count)))
 | 
			
		||||
 | 
			
		||||
      (define (thread-idle-for-too-long? last-job-finished-at)
 | 
			
		||||
        (time>=?
 | 
			
		||||
         (time-difference (current-time time-monotonic)
 | 
			
		||||
                          last-job-finished-at)
 | 
			
		||||
         thread-stop-delay))
 | 
			
		||||
 | 
			
		||||
      (define (stop-thread)
 | 
			
		||||
        (hash-remove! running-job-args
 | 
			
		||||
                      thread-index)
 | 
			
		||||
        (unlock-mutex queue-mutex))
 | 
			
		||||
 | 
			
		||||
      (call-with-new-thread
 | 
			
		||||
       (lambda ()
 | 
			
		||||
         (catch 'system-error
 | 
			
		||||
           (lambda ()
 | 
			
		||||
             (set-thread-name
 | 
			
		||||
              (string-append name " q t "
 | 
			
		||||
                             (number->string thread-index))))
 | 
			
		||||
           (const #t))
 | 
			
		||||
 | 
			
		||||
         (let loop ((last-job-finished-at (current-time time-monotonic)))
 | 
			
		||||
           (lock-mutex queue-mutex)
 | 
			
		||||
 | 
			
		||||
           (if (too-many-threads?)
 | 
			
		||||
               (stop-thread)
 | 
			
		||||
               (let ((job-args
 | 
			
		||||
                      (if (q-empty? queue)
 | 
			
		||||
                          ;; #f from wait-condition-variable indicates a timeout
 | 
			
		||||
                          (if (wait-condition-variable
 | 
			
		||||
                               job-available
 | 
			
		||||
                               queue-mutex
 | 
			
		||||
                               (+ 9 (time-second (current-time))))
 | 
			
		||||
                              ;; Another thread could have taken
 | 
			
		||||
                              ;; the job in the mean time
 | 
			
		||||
                              (if (q-empty? queue)
 | 
			
		||||
                                  #f
 | 
			
		||||
                                  (if priority<?
 | 
			
		||||
                                      (cdr (deq! queue))
 | 
			
		||||
                                      (deq! queue)))
 | 
			
		||||
                              #f)
 | 
			
		||||
                          (if priority<?
 | 
			
		||||
                              (cdr (deq! queue))
 | 
			
		||||
                              (deq! queue)))))
 | 
			
		||||
 | 
			
		||||
                 (if job-args
 | 
			
		||||
                     (begin
 | 
			
		||||
                       (hash-set! running-job-args
 | 
			
		||||
                                  thread-index
 | 
			
		||||
                                  job-args)
 | 
			
		||||
 | 
			
		||||
                       (unlock-mutex queue-mutex)
 | 
			
		||||
                       (thread-process-job job-args)
 | 
			
		||||
 | 
			
		||||
                       (with-mutex queue-mutex
 | 
			
		||||
                         (hash-set! running-job-args
 | 
			
		||||
                                    thread-index
 | 
			
		||||
                                    #f))
 | 
			
		||||
 | 
			
		||||
                       (loop (current-time time-monotonic)))
 | 
			
		||||
                     (if (thread-idle-for-too-long? last-job-finished-at)
 | 
			
		||||
                         (stop-thread)
 | 
			
		||||
                         (begin
 | 
			
		||||
                           (unlock-mutex queue-mutex)
 | 
			
		||||
 | 
			
		||||
                           (loop last-job-finished-at))))))))))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    (define start-new-threads-if-necessary
 | 
			
		||||
      (let ((previous-thread-started-at (make-time time-monotonic 0 0)))
 | 
			
		||||
        (lambda (desired-count)
 | 
			
		||||
          (let* ((thread-count
 | 
			
		||||
                  (hash-count (const #t) running-job-args))
 | 
			
		||||
                 (threads-to-start
 | 
			
		||||
                  (- desired-count thread-count)))
 | 
			
		||||
            (when (> threads-to-start 0)
 | 
			
		||||
              (for-each
 | 
			
		||||
               (lambda (thread-index)
 | 
			
		||||
                 (when (eq? (hash-ref running-job-args
 | 
			
		||||
                                      thread-index
 | 
			
		||||
                                      'slot-free)
 | 
			
		||||
                            'slot-free)
 | 
			
		||||
                   (let* ((now     (current-time time-monotonic))
 | 
			
		||||
                          (elapsed (time-difference now
 | 
			
		||||
                                                    previous-thread-started-at)))
 | 
			
		||||
                     (when (or (eq? #f thread-start-delay)
 | 
			
		||||
                               (time>=? elapsed thread-start-delay))
 | 
			
		||||
                       (set! previous-thread-started-at now)
 | 
			
		||||
                       (hash-set! running-job-args
 | 
			
		||||
                                  thread-index
 | 
			
		||||
                                  #f)
 | 
			
		||||
                       (start-thread thread-index)))))
 | 
			
		||||
               (iota desired-count)))))))
 | 
			
		||||
 | 
			
		||||
    (if (procedure? thread-count-parameter)
 | 
			
		||||
        (call-with-new-thread
 | 
			
		||||
         (lambda ()
 | 
			
		||||
           (catch 'system-error
 | 
			
		||||
             (lambda ()
 | 
			
		||||
               (set-thread-name
 | 
			
		||||
                (string-append name " q t")))
 | 
			
		||||
             (const #t))
 | 
			
		||||
 | 
			
		||||
           (while #t
 | 
			
		||||
             (sleep 15)
 | 
			
		||||
             (with-mutex queue-mutex
 | 
			
		||||
               (let ((idle-threads (hash-count (lambda (index val)
 | 
			
		||||
                                                 (eq? #f val))
 | 
			
		||||
                                               running-job-args)))
 | 
			
		||||
                 (when (= 0 idle-threads)
 | 
			
		||||
                   (start-new-threads-if-necessary (get-thread-count))))))))
 | 
			
		||||
        (start-new-threads-if-necessary (get-thread-count)))
 | 
			
		||||
 | 
			
		||||
    (values process-job count-jobs count-threads list-jobs)))
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -45,8 +45,6 @@
 | 
			
		|||
            &request-body-ended-prematurely
 | 
			
		||||
            request-body-ended-prematurely-error?
 | 
			
		||||
 | 
			
		||||
            sanitize-response
 | 
			
		||||
 | 
			
		||||
            request-body-port/knots
 | 
			
		||||
            read-request-body/knots
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -6,47 +6,10 @@
 | 
			
		|||
             (knots thread-pool))
 | 
			
		||||
 | 
			
		||||
(let ((thread-pool
 | 
			
		||||
       (make-fixed-size-thread-pool 2)))
 | 
			
		||||
 | 
			
		||||
  (assert-equal
 | 
			
		||||
   (call-with-thread
 | 
			
		||||
    thread-pool
 | 
			
		||||
    (lambda ()
 | 
			
		||||
      4))
 | 
			
		||||
   4))
 | 
			
		||||
 | 
			
		||||
(let ((thread-pool
 | 
			
		||||
       (make-fixed-size-thread-pool
 | 
			
		||||
        2
 | 
			
		||||
        #:thread-initializer (const '(2)))))
 | 
			
		||||
 | 
			
		||||
  (assert-equal
 | 
			
		||||
   (call-with-thread
 | 
			
		||||
    thread-pool
 | 
			
		||||
    (lambda (num)
 | 
			
		||||
      (* 2 num)))
 | 
			
		||||
   4))
 | 
			
		||||
 | 
			
		||||
(let ((thread-pool
 | 
			
		||||
       (make-fixed-size-thread-pool 2)))
 | 
			
		||||
 | 
			
		||||
  (assert-equal
 | 
			
		||||
   #t
 | 
			
		||||
   (with-exception-handler
 | 
			
		||||
       (lambda (exn)
 | 
			
		||||
         (knots-exception? exn))
 | 
			
		||||
     (lambda ()
 | 
			
		||||
       (call-with-thread
 | 
			
		||||
        thread-pool
 | 
			
		||||
        (lambda ()
 | 
			
		||||
          (+ 1 'a))))
 | 
			
		||||
     #:unwind? #t)))
 | 
			
		||||
 | 
			
		||||
(run-fibers-for-tests
 | 
			
		||||
 (lambda ()
 | 
			
		||||
   (let ((thread-pool
 | 
			
		||||
       (make-thread-pool 2)))
 | 
			
		||||
 | 
			
		||||
  (run-fibers-for-tests
 | 
			
		||||
   (lambda ()
 | 
			
		||||
     (assert-equal
 | 
			
		||||
      (call-with-thread
 | 
			
		||||
       thread-pool
 | 
			
		||||
| 
						 | 
				
			
			@ -54,13 +17,13 @@
 | 
			
		|||
         4))
 | 
			
		||||
      4))))
 | 
			
		||||
 | 
			
		||||
(run-fibers-for-tests
 | 
			
		||||
 (lambda ()
 | 
			
		||||
   (let ((thread-pool
 | 
			
		||||
(let ((thread-pool
 | 
			
		||||
       (make-thread-pool
 | 
			
		||||
        2
 | 
			
		||||
        #:thread-initializer (const '(2)))))
 | 
			
		||||
 | 
			
		||||
  (run-fibers-for-tests
 | 
			
		||||
   (lambda ()
 | 
			
		||||
     (assert-equal
 | 
			
		||||
      (call-with-thread
 | 
			
		||||
       thread-pool
 | 
			
		||||
| 
						 | 
				
			
			@ -68,11 +31,22 @@
 | 
			
		|||
         (* 2 num)))
 | 
			
		||||
      4))))
 | 
			
		||||
 | 
			
		||||
(run-fibers-for-tests
 | 
			
		||||
 (lambda ()
 | 
			
		||||
   (let ((thread-pool
 | 
			
		||||
(let ((process-job
 | 
			
		||||
       count-jobs
 | 
			
		||||
       count-threads
 | 
			
		||||
       list-jobs
 | 
			
		||||
       (create-work-queue
 | 
			
		||||
        2
 | 
			
		||||
        (lambda (i)
 | 
			
		||||
          (* i 2)))))
 | 
			
		||||
 | 
			
		||||
  (process-job 3))
 | 
			
		||||
 | 
			
		||||
(let ((thread-pool
 | 
			
		||||
       (make-thread-pool 2)))
 | 
			
		||||
 | 
			
		||||
  (run-fibers-for-tests
 | 
			
		||||
   (lambda ()
 | 
			
		||||
     (assert-equal
 | 
			
		||||
      #t
 | 
			
		||||
      (with-exception-handler
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue