From 66793a5568de6fcf9d656f3e7bdd67d9dd845893 Mon Sep 17 00:00:00 2001 From: Christopher Baines Date: Sun, 28 Dec 2025 19:01:39 +0000 Subject: [PATCH] Rework how data is inserted This is the big change needed to allow for parallel revision processing. Previously, a lock was used to prevent this since the parallel transactions could deadlock if each inserted data that the other then went to insert. By defining the order in which inserts happen, both in terms of the order of tables, and the order of rows within the table, this change should guarantee that there won't be deadlocks. I'm also hoping this change will address whatever issue was causing some derivation data to be missing from the database. --- .dir-locals.el | 1 + Makefile.am | 11 +- guix-data-service/database.scm | 13 + .../jobs/load-new-guix-revision.scm | 2455 +++++++++++------ guix-data-service/model/channel-news.scm | 83 +- guix-data-service/model/license-set.scm | 38 - guix-data-service/model/license.scm | 91 - guix-data-service/model/lint-checker.scm | 53 +- .../model/lint-warning-message.scm | 29 +- guix-data-service/model/lint-warning.scm | 34 +- .../model/package-derivation.scm | 34 +- guix-data-service/model/package-metadata.scm | 89 +- guix-data-service/model/package.scm | 8 - guix-data-service/model/system-test.scm | 49 +- guix-data-service/model/utils.scm | 1595 ++++++++++- guix-data-service/utils.scm | 155 +- scripts/guix-data-service-process-job.in | 4 +- tests/jobs-load-new-guix-revision.scm | 26 +- tests/model-derivation.scm | 17 - tests/model-license-set.scm | 47 - tests/model-license.scm | 44 - tests/model-lint-checker.scm | 38 - tests/model-lint-warning-message.scm | 59 - tests/model-package-metadata.scm | 98 - tests/model-package.scm | 125 - 25 files changed, 3371 insertions(+), 1825 deletions(-) delete mode 100644 guix-data-service/model/license-set.scm delete mode 100644 guix-data-service/model/license.scm delete mode 100644 tests/model-derivation.scm delete mode 100644 
tests/model-license-set.scm delete mode 100644 tests/model-license.scm delete mode 100644 tests/model-lint-checker.scm delete mode 100644 tests/model-lint-warning-message.scm delete mode 100644 tests/model-package-metadata.scm delete mode 100644 tests/model-package.scm diff --git a/.dir-locals.el b/.dir-locals.el index cd6ada2..d0ffe3f 100644 --- a/.dir-locals.el +++ b/.dir-locals.el @@ -7,6 +7,7 @@ (scheme-mode (indent-tabs-mode) (eval put 'with-time-logging 'scheme-indent-function 1) + (eval put 'with-delay-logging 'scheme-indent-function 1) (eval put 'make-parameter 'scheme-indent-function 1) (eval put 'fibers-let 'scheme-indent-function 1) (eval put 'call-with-resource-from-pool 'scheme-indent-function 1) diff --git a/Makefile.am b/Makefile.am index 1875e97..14106d8 100644 --- a/Makefile.am +++ b/Makefile.am @@ -98,8 +98,6 @@ SOURCES = \ guix-data-service/model/git-repository.scm \ guix-data-service/model/guix-revision-package-derivation.scm \ guix-data-service/model/guix-revision.scm \ - guix-data-service/model/license-set.scm \ - guix-data-service/model/license.scm \ guix-data-service/model/lint-checker.scm \ guix-data-service/model/lint-warning-message.scm \ guix-data-service/model/lint-warning.scm \ @@ -148,16 +146,9 @@ TESTS = \ tests/branch-updated-emails.scm \ tests/forgejo.scm \ tests/jobs-load-new-guix-revision.scm \ - tests/model-derivation.scm \ tests/model-git-branch.scm \ tests/model-git-commit.scm \ - tests/model-git-repository.scm \ - tests/model-license-set.scm \ - tests/model-license.scm \ - tests/model-lint-checker.scm \ - tests/model-lint-warning-message.scm \ - tests/model-package.scm \ - tests/model-package-metadata.scm + tests/model-git-repository.scm AM_TESTS_ENVIRONMENT = abs_top_srcdir="$(abs_top_srcdir)" diff --git a/guix-data-service/database.scm b/guix-data-service/database.scm index f048c0d..7b266e4 100644 --- a/guix-data-service/database.scm +++ b/guix-data-service/database.scm @@ -16,6 +16,7 @@ ;;; . 
(define-module (guix-data-service database) + #:use-module (srfi srfi-1) #:use-module (system foreign) #:use-module (ice-9 match) #:use-module (ice-9 threads) @@ -36,6 +37,8 @@ %postgresql-in-transaction? with-postgresql-transaction + postgresql-duplicate-key-error? + check-test-database! lock-advisory-session-lock @@ -250,6 +253,16 @@ result)))) #:unwind? #t)) +(define (postgresql-duplicate-key-error? exn) "Return #t if the third element of EXN's exception args is a string starting with PostgreSQL's duplicate key error message, and #f otherwise, including when EXN has no args or fewer than three." + (let ((args (exception-args exn))) + (and args + (>= (length args) 3) + (let ((third-arg (third args))) + (and (string? third-arg) + (string-prefix? + "ERROR: duplicate key value violates unique constraint" + third-arg)))))) + (define (check-test-database! conn) (match (exec-query conn "SELECT current_database()") (((name)) diff --git a/guix-data-service/jobs/load-new-guix-revision.scm b/guix-data-service/jobs/load-new-guix-revision.scm index a41599f..3e9cfe4 100644 --- a/guix-data-service/jobs/load-new-guix-revision.scm +++ b/guix-data-service/jobs/load-new-guix-revision.scm @@ -47,6 +47,7 @@ #:use-module (knots parallelism) #:use-module (knots resource-pool) #:use-module (guix monads) + #:use-module (guix base16) #:use-module (guix base32) #:use-module (guix store) #:use-module (guix channels) @@ -76,8 +77,6 @@ #:use-module (guix-data-service model guix-revision) #:use-module (guix-data-service model package-derivation) #:use-module (guix-data-service model guix-revision-package-derivation) - #:use-module (guix-data-service model license) - #:use-module (guix-data-service model license-set) #:use-module (guix-data-service model lint-checker) #:use-module (guix-data-service model lint-warning) #:use-module (guix-data-service model lint-warning-message) @@ -180,6 +179,357 @@ inf))) string file line column) - (list file - line - column))))) + (vector file + line + column))))) (all-system-tests))))) (catch @@ -315,17 +665,18 @@ (setlocale LC_MESSAGES source-locale) (if (string=? 
description source-description) #f - (cons locale description)))) + (vector locale description)))) (list ,@locales)))) - (cons (cons source-locale source-description) - descriptions-by-locale))) + (list->vector + (cons (vector source-locale source-description) + descriptions-by-locale)))) (map (lambda (checker) (list (lint-checker-name checker) - (lint-descriptions-by-locale checker) (if (memq checker %network-dependent-checkers) #t - #f))) + #f) + (lint-descriptions-by-locale checker))) %all-checkers)) inf))) @@ -347,16 +698,16 @@ (list (match (lint-warning-location lint-warning) (($ file line column) - (list (if (string-prefix? "/gnu/store/" file) - ;; Convert a string like - ;; /gnu/store/53xh0mpigin2rffg31s52x5dc08y0qmr-guix-module-union/share/guile/site/2.2/gnu/packages/xdisorg.scm - ;; - ;; This happens when the checker uses - ;; package-field-location. - (string-join (drop (string-split file #\/) 8) "/") - file) - line - column))) + (vector (if (string-prefix? "/gnu/store/" file) + ;; Convert a string like + ;; /gnu/store/53xh0mpigin2rffg31s52x5dc08y0qmr-guix-module-union/share/guile/site/2.2/gnu/packages/xdisorg.scm + ;; + ;; This happens when the checker uses + ;; package-field-location. + (string-join (drop (string-split file #\/) 8) "/") + file) + line + column))) (let* ((source-locale "en_US.UTF-8") (source-message (begin @@ -379,28 +730,30 @@ (setlocale LC_MESSAGES source-locale) (if (string=? 
message source-message) #f - (cons locale message)))) + (vector locale message)))) (list ,@locales)))) - (cons (cons source-locale source-message) - messages-by-locale)))) + (list->vector + (cons (vector source-locale source-message) + messages-by-locale))))) (vector-map (lambda (_ package) - (map process-lint-warning - (with-exception-handler - (lambda (exn) - (simple-format (current-error-port) - "exception checking ~A with ~A checker: ~A\n" - package checker-name exn) - ;; TODO Record and report this exception - '()) - (lambda () - (if (and lint-checker-requires-store?-defined? - (lint-checker-requires-store? checker)) + (list->vector + (map process-lint-warning + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "exception checking ~A with ~A checker: ~A\n" + package checker-name exn) + ;; TODO Record and report this exception + '()) + (lambda () + (if (and lint-checker-requires-store?-defined? + (lint-checker-requires-store? checker)) - (check package #:store store) - (check package))) - #:unwind? #t))) + (check package #:store store) + (check package))) + #:unwind? #t)))) gds-inferior-packages)))) (ensure-gds-inferior-packages-defined! inf) @@ -857,6 +1210,41 @@ (with-time-logging "ensuring gds-inferior-packages is defined in inferior" (inferior-packages-plus-replacements inf)))) +(define (inferior-packages->license-data inf) + (define proc + `(vector-map + (lambda (_ package) + (match (package-license package) + ((? license? license) + (vector + (vector (license-name license) + (license-uri license) + (license-comment license)))) + ((values ...) + (list->vector + (map (match-lambda + ((? license? 
license) + (vector (license-name license) + (license-uri license) + (license-comment license))) + (x + (simple-format + (current-error-port) + "error: unknown license value ~A for package ~A" + x package) + #f)) + values))) + (x + (simple-format + (current-error-port) + "error: unknown license value ~A for package ~A" + x package) + '()))) + gds-inferior-packages)) + + (inferior-eval '(use-modules (guix licenses)) inf) + (inferior-eval proc inf)) + (define* (all-inferior-packages-data inf packages pkg-to-replacement-hash-table) (define inferior-package-id->packages-index-hash-table (let ((hash-table (make-hash-table))) @@ -917,133 +1305,235 @@ (metadata . ,package-metadata) (replacements . ,package-replacement-data)))) -(define (insert-packages conn inferior-packages-data) +(define (inferior-packages->package-metadata-ids table-manager-coordinator + package-metadata + license-set-ids) + (define (vector-zip . vecs) + (let ((result (make-vector (vector-length (first vecs))))) + (apply vector-map! + (lambda (i . vals) + (list->vector (cdr vals))) + (cons result vecs)) + result)) + + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "package_metadata") + (vector-zip + (vector-map (match-lambda* + ((_ (home-page rest ...)) + (if (string? home-page) + home-page + NULL))) + package-metadata) + (with-time-logging "preparing location ids" + (let* ((locations-vector + (vector-map + (match-lambda* + ((_ (_ location rest ...)) + (match location + (#f #f) + (($ file line column) + (vector file line column))))) + package-metadata)) + (result + (call-with-false-hidden-in-vector + locations-vector + (lambda (locations-vector) + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "locations") + locations-vector))))) + (vector-map! 
+ (lambda (_ val) + (or val NULL)) + result) + result)) + license-set-ids + (with-time-logging "preparing package description set ids" + (let* ((package-description-counts + (make-vector (vector-length package-metadata))) + (all-package-descriptions + (list->vector + (vector-fold-right + (lambda (index result metadata) + (match metadata + ((_ _ package-description-data _) + (vector-set! package-description-counts + index + (length package-description-data)) + (fold + (lambda (locale-and-description result) + (match locale-and-description + ((locale . description) + (cons + (vector locale + ;; \u0000 has appeared in package + ;; descriptions (#71968), so strip it + ;; out here to avoid PostgreSQL throwing + ;; an error + (string-delete-null description)) + result)))) + result + package-description-data)))) + '() + package-metadata))) + (package-descriptions-ids + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "package_descriptions") + all-package-descriptions))) + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "package_description_sets") + (group-ids-by-counts-vector + package-descriptions-ids + package-description-counts)))) + (with-time-logging "preparing package synopsis set ids" + (let* ((package-synopsis-counts + (make-vector (vector-length package-metadata))) + (all-package-synopsis + (list->vector + (vector-fold-right + (lambda (index result metadata) + (match metadata + ((_ _ _ package-synopsis-data) + (vector-set! package-synopsis-counts + index + (length package-synopsis-data)) + (fold + (lambda (locale-and-synopsis result) + (match locale-and-synopsis + ((locale . 
synopsis) + (cons + (vector locale + (string-delete-null synopsis)) + result)))) + result + package-synopsis-data)))) + '() + package-metadata))) + (package-synopsis-ids + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "package_synopsis") + all-package-synopsis))) + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "package_synopsis_sets") + (group-ids-by-counts-vector + package-synopsis-ids + package-synopsis-counts))))))) + +(define (insert-packages postgresql-connection-pool + table-manager-coordinator + inferior-packages-data) (let* ((names (assq-ref inferior-packages-data 'names)) (versions (assq-ref inferior-packages-data 'versions)) (package-license-set-ids (with-time-logging "inserting package license sets" - (inferior-packages->license-set-ids - conn - (inferior-packages->license-id-lists - conn - (assq-ref inferior-packages-data 'license-data))))) + (let* ((license-sets-counts + (make-vector + (vector-length + (assq-ref inferior-packages-data 'license-data)))) + (license-set-ids + ;; TODO Deduplicate + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "licenses") + (list->vector + (vector-fold-right + (lambda (index result license-details-vector) + (vector-set! 
license-sets-counts + index + (vector-length license-details-vector)) + (vector-fold-right + (lambda (_ result details) + (match details + (#(name uri comment) + (cons + (vector name + (or uri NULL) + (or comment NULL)) + result)))) + result + license-details-vector)) + '() + (assq-ref inferior-packages-data 'license-data))))) + (license-set-ids + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "license_sets") + (group-ids-by-counts-vector + license-set-ids + license-sets-counts)))) + (close-table-manager table-manager-coordinator + "licenses") + (close-table-manager table-manager-coordinator + "license_sets") + + license-set-ids))) (all-package-metadata-ids new-package-metadata-ids (with-time-logging "inserting package metadata entries" (inferior-packages->package-metadata-ids - conn + table-manager-coordinator (assq-ref inferior-packages-data 'metadata) package-license-set-ids))) (replacement-package-ids (vector-map (lambda (_ package-index-or-false) (if package-index-or-false - (vector-ref - (inferior-packages->package-ids - conn - (vector - (list (vector-ref names package-index-or-false) - (vector-ref versions package-index-or-false) - (vector-ref all-package-metadata-ids - package-index-or-false) - (cons "integer" NULL)))) - 0) - (cons "integer" NULL))) + (table-manager-add-row + (fetch-table-manager table-manager-coordinator + "packages") + (vector + (vector-ref names package-index-or-false) + (vector-ref versions package-index-or-false) + (vector-ref all-package-metadata-ids + package-index-or-false) + NULL)) + NULL)) (assq-ref inferior-packages-data 'replacements)))) - (unless (= 0 (vector-length new-package-metadata-ids)) - (with-time-logging "inserting package metadata tsvector entries" - (insert-package-metadata-tsvector-entries - conn new-package-metadata-ids))) + (close-table-manager table-manager-coordinator + "package_descriptions") + (close-table-manager table-manager-coordinator + "package_description_sets") + 
(close-table-manager table-manager-coordinator + "package_synopsis") + (close-table-manager table-manager-coordinator + "package_synopsis_sets") + (close-table-manager table-manager-coordinator + "package_metadata") - (with-time-logging "getting package-ids (without replacements)" - (inferior-packages->package-ids - conn - ;; Similar to zip, but generating a vector of lists - (vector-map (lambda (index . vals) vals) - names - versions - all-package-metadata-ids - replacement-package-ids))))) + (let ((new-package-metadata-ids-list + (vector-fold-right + (lambda (_ result id new?) + (if new? + (cons id result) + result)) + '() + all-package-metadata-ids + new-package-metadata-ids)) + (package-ids + (with-time-logging "getting package-ids (without replacements)" + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "packages") + (vector-map (lambda (index . vals) + (list->vector vals)) + names + versions + all-package-metadata-ids + replacement-package-ids))))) -(define (insert-lint-warnings conn - package-ids - lint-checker-ids - lint-warnings-data) - (vector-fold - (lambda (_ result lint-checker-id warnings-per-package) - (if warnings-per-package - (vector-fold - (lambda (_ result package-id warnings) - (if (null? warnings) - result - (cons - (lint-warnings-data->lint-warning-ids - conn - (list->vector - (map - (match-lambda - ((location-data messages-by-locale) - (let ((location-id - (location->location-id - conn - (apply location location-data))) - (lint-warning-message-set-id - (lint-warning-message-data->lint-warning-message-set-id - conn - messages-by-locale))) - (list lint-checker-id - package-id - location-id - lint-warning-message-set-id)))) - warnings))) - result))) - result - package-ids - warnings-per-package) - result)) - '() - lint-checker-ids - lint-warnings-data)) + (close-table-manager table-manager-coordinator + "packages") -(define (update-derivation-ids-hash-table! 
conn - derivation-ids-hash-table - derivations-or-file-names) - (define derivations-count (vector-length derivations-or-file-names)) - - (let ((missing-file-names - (vector-fold - (lambda (_ result file-name-or-drv) - (if file-name-or-drv - (let ((file-name - (if (string? file-name-or-drv) - file-name-or-drv - (derivation-file-name file-name-or-drv)))) - (if (hash-ref derivation-ids-hash-table - file-name) - result - (cons file-name - result))) - result)) - '() - derivations-or-file-names))) - - (simple-format - #t "debug: update-derivation-ids-hash-table!: lookup ~A file-names, ~A not cached\n" - derivations-count (length missing-file-names)) - - (unless (null? missing-file-names) - (chunk-for-each! - (lambda (chunk) - (for-each - (match-lambda - ((id file-name) - (hash-set! derivation-ids-hash-table - file-name - (string->number id)))) - (exec-query conn (select-existing-derivations chunk)))) - 1000 - missing-file-names)))) + (values + package-ids + new-package-metadata-ids-list)))) (define (compute-and-update-derivation-source-file-nar postgresql-connection-pool @@ -1087,247 +1577,6 @@ compressed-nar-bytevector uncompressed-size))))) -(define* (derivations-insert-sources postgresql-connection-pool - call-with-utility-thread - derivations - derivation-ids - #:key (log-tag "unspecified")) - (with-time-logging - (string-append "insert-missing-derivations: inserting sources for " - (number->string (vector-length derivations)) - " derivations (" log-tag ")") - (let ((sources-ids-vector - (with-resource-from-pool postgresql-connection-pool conn - (with-time-logging - (string-append - "insert-missing-derivations: inserting " - (number->string (vector-length derivations)) - " derivation_source_files and derivation_sources" - " (" log-tag ")") - (vector-map - (lambda (_ derivation-id derivation) - (let ((sources (derivation-sources derivation))) - (if (null? 
sources) - #() - (insert-derivation-sources conn - derivation-id - sources)))) - derivation-ids - derivations))))) - (with-time-logging - (string-append - "insert-missing-derivations: inserting " - (number->string (vector-length derivations)) - " derivation_source_file_nars" - " (" log-tag ")") - (fibers-for-each - (lambda (derivation source-ids) - (for-each - (lambda (id source-file) - (when - (with-resource-from-pool postgresql-connection-pool conn - (match - (exec-query - conn - " -SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" - (list (number->string id))) - (() - ;; Insert a placeholder to avoid other fibers - ;; working on this source file - (insert-placeholder-derivation-source-file-nar - conn - id) - #t) - (_ #f))) - ;; Use a utility thread to control concurrency here, to - ;; avoid using too much memory - (call-with-utility-thread - (lambda () - (compute-and-update-derivation-source-file-nar - postgresql-connection-pool - id - source-file))))) - (vector->list source-ids) - (derivation-sources derivation))) - derivations - sources-ids-vector))))) - -(define* (insert-missing-derivations postgresql-connection-pool - call-with-utility-thread - derivation-ids-hash-table - unfiltered-derivations - #:key (log-tag "unspecified")) - - (define (insert-into-derivations conn drvs) - (insert-missing-data-and-return-all-ids - conn - "derivations" - '(file_name builder args env_vars system_id) - (vector-map (match-lambda* - ((_ ($ outputs inputs sources - system builder args env-vars file-name)) - (list file-name - builder - (cons "varchar[]" - (list->vector args)) - (cons "varchar[][]" - (list->vector - (map (match-lambda - ((key . value) - (vector key value))) - env-vars))) - (system->system-id conn system)))) - drvs))) - - (define (insert-derivations) - (with-resource-from-pool postgresql-connection-pool conn - (update-derivation-ids-hash-table! 
- conn - derivation-ids-hash-table - (list->vector unfiltered-derivations)) - - (let ((derivations - ;; Do this while holding the PostgreSQL connection to - ;; avoid conflicts with other fibers - (list->vector - (delete-duplicates - (filter-map (lambda (derivation) - (if (hash-ref derivation-ids-hash-table - (derivation-file-name - derivation)) - #f - derivation)) - unfiltered-derivations))))) - (if (= 0 (vector-length derivations)) - (values #() #()) - (begin - (simple-format - (current-error-port) - "insert-missing-derivations: inserting ~A derivations (~A)\n" - (vector-length derivations) - log-tag) - (let ((derivation-ids - (insert-into-derivations conn derivations))) - - ;; Do this while holding the connection so that other - ;; fibers don't also try inserting the same derivations - (with-time-logging - (string-append "insert-missing-derivations: updating hash table (" log-tag ")") - (vector-for-each (lambda (_ derivation derivation-id) - (hash-set! derivation-ids-hash-table - (derivation-file-name derivation) - derivation-id)) - derivations - derivation-ids)) - - (simple-format - (current-error-port) - "insert-missing-derivations: finished inserting ~A derivations (~A)\n" - (vector-length derivations) - log-tag) - - (values derivations - derivation-ids))))))) - - (define (insert-input-derivations derivations) - (with-time-logging - (string-append - "insert-missing-derivations: ensure-input-derivations-exist (" - log-tag ")") - (let ((input-derivations - (vector-fold - (lambda (_ result drv) - (append! (map derivation-input-derivation - (derivation-inputs drv)) - result)) - '() - derivations))) - (unless (null? input-derivations) - (let loop ((chunk '()) - (count 0) - (rest input-derivations)) - (if (null? rest) - (unless (null? 
chunk) - (insert-missing-derivations - postgresql-connection-pool - call-with-utility-thread - derivation-ids-hash-table - chunk - #:log-tag log-tag)) - (if (= count 1000) - (begin - (simple-format #t "debug: inserting ~A input derivations\n" - count) - (insert-missing-derivations - postgresql-connection-pool - call-with-utility-thread - derivation-ids-hash-table - chunk - #:log-tag log-tag) - (loop '() - 0 - rest)) - (let ((drv (car rest))) - (if (hash-ref derivation-ids-hash-table - (derivation-file-name - drv)) - (loop chunk - count - (cdr rest)) - (loop (cons drv chunk) - (+ 1 count) - (cdr rest))))))))))) - - (let ((derivations - derivation-ids - (insert-derivations))) - - (unless (= 0 (vector-length derivations)) - (fibers-parallel - (derivations-insert-sources postgresql-connection-pool - call-with-utility-thread - derivations - derivation-ids - #:log-tag log-tag) - (with-time-logging - (string-append "insert-missing-derivations: inserting outputs (" - log-tag ")") - (with-resource-from-pool postgresql-connection-pool conn - (vector-for-each - (lambda (_ derivation-id derivation) - (insert-derivation-outputs conn - derivation-id - (derivation-outputs derivation))) - derivation-ids - derivations))) - (insert-input-derivations derivations)) - - (simple-format - (current-error-port) - "debug: insert-missing-derivations: done parallel (~A)\n" log-tag) - (retry-on-missing-derivation-output - (lambda () - (with-resource-from-pool postgresql-connection-pool conn - (with-time-logging - (simple-format - #f "insert-missing-derivations: inserting inputs for ~A derivations (~A)" - (vector-length derivations) - log-tag) - (insert-derivation-inputs conn - derivation-ids - derivations)))) - #:on-exception - (lambda () - ;; If this has happened because derivations have been removed, it - ;; might be necessary to insert them in the database where they - ;; previously existed. 
Clear the hash table while having the - ;; PostgreSQL connection to avoid issues with it being used at the - ;; same time. - (with-resource-from-pool postgresql-connection-pool conn - (hash-clear! derivation-ids-hash-table)) - (insert-input-derivations derivations)))))) - (define (fix-derivation file-name) (define (derivation-missing-inputs? conn drv-id) (let ((inputs (select-derivation-inputs-by-derivation-id @@ -1371,37 +1620,23 @@ SELECT 1 FROM derivation_source_file_nars WHERE derivation_source_file_id = $1" (match (select-derivation-by-file-name conn (derivation-file-name drv)) ((drv-id rest ...) - (when (and (derivation-missing-sources? conn drv-id) - (not (null? (derivation-sources drv)))) + (when (or (and (derivation-missing-sources? conn drv-id) + (not (null? (derivation-sources drv)))) + (and (derivation-missing-inputs? conn drv-id) + (not (null? (derivation-inputs drv))))) (with-postgresql-transaction conn (lambda (conn) - (derivations-insert-sources postgresql-connection-pool - call-with-utility-thread - (vector drv) - (vector drv-id))))) - - (when (and (derivation-missing-inputs? conn drv-id) - (not (null? (derivation-inputs drv)))) - (with-postgresql-transaction - conn - (lambda (conn) - (let ((input-derivations - (map derivation-input-derivation - (derivation-inputs drv)))) - (unless (null? input-derivations) - ;; Ensure all the input derivations exist - (chunk-for-each! - (lambda (chunk) - (insert-missing-derivations - postgresql-connection-pool - call-with-utility-thread - derivation-ids-hash-table - chunk)) - 1000 - input-derivations))))) - - (fix-derivation-inputs conn drv)))) + (let ((table-manager-coordinator + (get-table-manager-coordinator + postgresql-connection-pool))) + (insert-derivations-with-table-managers + table-manager-coordinator + call-with-utility-thread + (vector drv) + #:repair? #t) + (destroy-table-manager-coordinator + table-manager-coordinator))))))) (destroy-resource-pool postgresql-connection-pool))))))) #:unwind? 
#t)) @@ -1782,116 +2017,378 @@ WHERE builder != 'builtin:download' (for-each fix-derivation broken-derivations)))) -(define (flatten-derivation-graph derivations - derivation-ids-hash-table) +(define (flatten-derivation-graph table-manager-coordinator + derivations) (define seen-hash-table (make-hash-table)) - (define (flatten-inputs derivation result) + (define derivations-table-manager + (fetch-table-manager table-manager-coordinator "derivations")) + + (define derivation-outputs-table-manager + (fetch-table-manager table-manager-coordinator "derivation_outputs")) + + (define (add-inputs-recursively result derivation) (fold (lambda (input result) (let ((drv (derivation-input-derivation input))) - (if (or (hash-ref derivation-ids-hash-table - (derivation-file-name drv)) - (hash-ref seen-hash-table - drv)) + (if (or (hash-ref seen-hash-table drv) + (let ((derivation-id + (table-manager-id-for-key + derivations-table-manager + (derivation-file-name drv)))) + (and derivation-id + (every + (lambda (name) + (table-manager-id-for-key + derivation-outputs-table-manager + (vector derivation-id + name))) + (derivation-input-sub-derivations input))))) result (begin (hash-set! seen-hash-table drv #t) (cons drv - (flatten-inputs drv result)))))) + (add-inputs-recursively result drv)))))) result (derivation-inputs derivation))) (reverse! 
- (fold - (lambda (derivation result) - (let ((flat-inputs - (flatten-inputs derivation - result))) - (cons derivation - flat-inputs))) + (vector-fold + (lambda (_ result derivation) + (if derivation + (cons derivation + (add-inputs-recursively result derivation)) + result)) '() derivations))) -(define* (derivation-file-names->derivation-ids postgresql-connection-pool - call-with-utility-thread - read-derivations/serialised - derivation-ids-hash-table - derivation-file-names - #:key (log-tag "unspecified")) - (define derivations-count - (vector-length derivation-file-names)) +(define* (insert-derivations-with-table-managers table-manager-coordinator + call-with-utility-thread + derivations + #:key repair? + store-items-promise) + (define (extract-vector-columns vecs) + (let ((result (make-vector (vector-length (first vecs))))) + (apply vector-map! + (lambda (i . vals) + (list->vector (cdr vals))) + (cons result vecs)) + (apply values (vector->list result)))) - (if (= 0 derivations-count) - #() - (begin - (with-resource-from-pool postgresql-connection-pool conn - (update-derivation-ids-hash-table! - conn - derivation-ids-hash-table - derivation-file-names)) + (define (vector-zip . vecs) + (let ((result (make-vector (vector-length (first vecs))))) + (apply vector-map! + (lambda (i . 
vals) + (list->vector (cdr vals))) + (cons result vecs)) + result)) - (let ((missing-derivation-filenames - (deduplicate-strings - (vector-fold - (lambda (_ result derivation-file-name) - (if (not derivation-file-name) - result - (if (hash-ref derivation-ids-hash-table - derivation-file-name) - result - (cons derivation-file-name result)))) - '() - derivation-file-names)))) - (simple-format - #t "debug: derivation-file-names->derivation-ids: processing ~A missing derivations (~A)\n" - (length missing-derivation-filenames) - log-tag) + (define (insert-derivation-outputs derivations derivation-ids) + (let* ((derivation-output-details-counts + (make-vector + (vector-length derivations))) + (derivations-output-details-derivation-ids + derivations-output-details-names + derivations-output-details + (with-delay-logging "prepare derivations vectors" + (extract-vector-columns + (vector-fold-right + (lambda (index result derivation derivation-id) + (vector-set! derivation-output-details-counts + index + (length (derivation-outputs derivation))) + (fold + (lambda (name-and-output result) + (match name-and-output + ((name . ($ path hash-algo + hash recursive?)) + (cons + (vector + derivation-id + name + (vector path + (or (and=> hash-algo symbol->string) + NULL) + (or (and=> hash bytevector->base16-string) + NULL) + recursive?)) + result)))) + result + (derivation-outputs derivation))) + '() + derivations + derivation-ids)))) + (derivation-output-details-ids + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "derivation_output_details") + derivations-output-details))) - (let ((chunk-counter 0)) - (chunk-for-each! 
- (lambda (missing-derivation-file-names-chunk) - (simple-format - #t "debug: derivation-file-names->derivation-ids: processing chunk ~A (~A)\n" - chunk-counter - log-tag) - (let* ((missing-derivations-chunk - (read-derivations/serialised - missing-derivation-file-names-chunk)) - (flat-missing-derivations - (with-time-logging "flattening missing derivations" - (flatten-derivation-graph - missing-derivations-chunk - derivation-ids-hash-table)))) - (simple-format - #t "debug: derivation-file-names->derivation-ids: processing ~A flat missing derivations (~A)\n" - (length flat-missing-derivations) - log-tag) - (set! chunk-counter (+ 1 chunk-counter)) - (insert-missing-derivations postgresql-connection-pool - call-with-utility-thread - derivation-ids-hash-table - missing-derivations-chunk - #:log-tag log-tag))) - 1000 - missing-derivation-filenames)) + (fibers-parallel + (let ((derivation-output-detail-sets-ids + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "derivation_output_details_sets") + (with-delay-logging "prepare grouped derivation outputs vectors" + (group-ids-by-counts-vector + derivation-output-details-ids + derivation-output-details-counts))))) + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "derivations_by_output_details_set") + (vector-map + (lambda (_ derivation-id derivation-output-detail-set-id) + (vector derivation-id + derivation-output-detail-set-id)) + derivation-ids + derivation-output-detail-sets-ids))) - (let ((all-ids - (vector-map - (lambda (_ derivation-file-name) - (if derivation-file-name - (or (hash-ref derivation-ids-hash-table - derivation-file-name) - (error - (simple-format #f "missing derivation id (~A)" - derivation-file-name))) - #f)) - derivation-file-names))) + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "derivation_outputs") + (with-delay-logging "prepare zipped derivations vectors" + (vector-zip derivations-output-details-derivation-ids + 
derivations-output-details-names + derivation-output-details-ids))))) - all-ids))))) + *unspecified*) -(prevent-inlining-for-tests derivation-file-names->derivation-ids) + (define (insert-derivation-inputs derivations derivation-ids) + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "derivation_inputs") + (with-delay-logging "prepare derivation-inputs vectors" + (list->vector + (vector-fold + (lambda (_ result derivation derivation-id) + (fold + (lambda (input result) + (let* ((input-derivation + (derivation-input-derivation input)) + (input-derivation-filename + (derivation-file-name + input-derivation)) + (input-derivation-id + (table-manager-lookup-id-or-placeholder + (fetch-table-manager table-manager-coordinator + "derivations") + input-derivation-filename))) + (fold + (lambda (name result) + (let ((derivation-output-id + (table-manager-lookup-id-or-placeholder + (fetch-table-manager table-manager-coordinator + "derivation_outputs") + (vector input-derivation-id + name)))) + (cons (vector derivation-id + derivation-output-id) + result))) + result + (derivation-input-sub-derivations input)))) + result + (derivation-inputs derivation))) + '() + derivations + derivation-ids))))) + + (define (get-source-file-details source) + (define (read-source) + (call-with-values + (lambda () + (open-bytevector-output-port)) + (lambda (port get-bytevector) + (unless (file-exists? source) + (raise-exception + (make-missing-store-item-error + source))) + (write-file source port) + (let ((res (get-bytevector))) + (close-port port) ; maybe reduces memory? 
+ res)))) + + (let* ((nar-bytevector + (if store-items-promise + (retry-on-missing-store-item + read-source + #:on-exception + (lambda () + (simple-format #t "debug: missing store item ~A, retrying\n" + source) + (fibers-promise-reset store-items-promise) + (fibers-force store-items-promise))) + (read-source))) + (compressed-nar-bytevector + (call-with-values + (lambda () + (open-bytevector-output-port)) + (lambda (port get-bytevector) + (call-with-lzip-output-port port + (lambda (port) + (put-bytevector port nar-bytevector)) + #:level 9) + (let ((res (get-bytevector))) + (close-port port) ; maybe reduces memory? + res)))) + (hash + (bytevector->nix-base32-string + (sha256 nar-bytevector))) + (uncompressed-size + (bytevector-length nar-bytevector))) + `((hash . ,hash) + (uncompressed-size . ,uncompressed-size) + (data + . ,(string-append + "\\x" + (bytevector->base16-string + compressed-nar-bytevector)))))) + + (define (insert-derivation-sources derivations derivation-ids) + (let* ((derivations-index-and-source + (list->vector + (vector-fold + (lambda (index result derivation derivation-id) + (fold + (lambda (source result) + (cons (cons index source) + result)) + result + (derivation-sources derivation))) + '() + derivations + derivation-ids))) + (derivation-source-file-ids + new-derivation-source-files?-vector + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "derivation_source_files") + (vector-map + (lambda (_ pair) + (vector (cdr pair))) + derivations-index-and-source)))) + + (fibers-parallel + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "derivation_sources") + (vector-map + (lambda (_ index-and-source derivation-source-file-id) + (vector (vector-ref derivation-ids + (car index-and-source)) + derivation-source-file-id)) + derivations-index-and-source + derivation-source-file-ids)) + + (let ((new-derivations-index-and-source + new-derivation-source-file-ids + (if repair? 
+ (values derivations-index-and-source + derivation-source-file-ids) + (filter-vectors-only-new + new-derivation-source-files?-vector + derivations-index-and-source + derivation-source-file-ids)))) + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "derivation_source_file_nars") + (vector-map + (lambda (_ index-and-source derivation-source-file-id) + (define source + (cdr index-and-source)) + + (define promise + (fibers-delay + (lambda () + (call-with-utility-thread + (lambda () + (get-source-file-details source)))))) + + (vector derivation-source-file-id + "lzip" + "sha256" + (make-table-manager-thunk-placeholder + (lambda () + (assq-ref (fibers-force promise) + 'hash))) + (make-table-manager-thunk-placeholder + (lambda () + (assq-ref (fibers-force promise) + 'uncompressed-size))) + (make-table-manager-thunk-placeholder + (lambda () + (assq-ref (fibers-force promise) + 'data))))) + new-derivations-index-and-source + new-derivation-source-file-ids)))))) + + (define (add-derivations-rows derivations) + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "derivations") + (vector-map + (lambda (_ derivation) + (match derivation + (($ outputs inputs sources + system builder args env-vars file-name) + (vector file-name + builder + (cons "varchar[]" + (list->vector args)) + (cons "varchar[][]" + (list->vector + (map (match-lambda + ((key . value) + (vector key value))) + env-vars))) + (table-manager-add-row + (fetch-table-manager table-manager-coordinator + "systems") + (vector system)))))) + derivations))) + + (let ((flat-derivations + (with-delay-logging "flatten derivation graph" + (list->vector + (flatten-derivation-graph table-manager-coordinator + derivations))))) + (unless (= 0 (vector-length flat-derivations)) + (let* ((flat-derivations-ids + flat-derivations-new?-vector + (add-derivations-rows flat-derivations)) + + (new-derivations + new-derivation-ids + (if repair? 
+ (values flat-derivations + flat-derivations-ids) + (filter-vectors-only-new + flat-derivations-new?-vector + flat-derivations + flat-derivations-ids)))) + + (unless (= 0 (vector-length new-derivations)) + (fibers-parallel + (insert-derivation-sources new-derivations + new-derivation-ids) + (begin + ;; Outputs then inputs, so that all outputs for this chunk are + ;; available when inserting inputs. Insert outputs for all + ;; derivations, not just new ones, since that'll ensure all + ;; output information is available when inserting inputs. + (insert-derivation-outputs flat-derivations + flat-derivations-ids) + (insert-derivation-inputs new-derivations + new-derivation-ids))))))) + + (vector-map + (lambda (_ derivation) + (if derivation + (table-manager-lookup-id-or-placeholder + (fetch-table-manager table-manager-coordinator "derivations") + (derivation-file-name derivation)) + #f)) + derivations)) (define guix-store-path (let ((store-path #f)) @@ -2258,6 +2755,9 @@ WHERE builder != 'builtin:download' (prevent-inlining-for-tests channel-derivations-by-system->guix-store-item) (define (glibc-locales-for-guix-store-path store store-path) + (unless (valid-path? store store-path) + (raise-exception + (make-missing-store-item-error store-path))) (let ((inf (if (defined? 'open-inferior/container (resolve-module '(guix inferior))) @@ -2487,10 +2987,14 @@ WHERE builder != 'builtin:download' *unspecified*) -(define* (extract-information-from db-conn guix-revision-id-promise +(define* (extract-information-from postgresql-connection-pool + inf-and-store-pool + table-manager-coordinator + guix-revision-id-promise commit guix-source store-item guix-derivation + channel-for-commit call-with-utility-thread read-derivations/serialised #:key skip-system-tests? 
@@ -2499,83 +3003,8 @@ WHERE builder != 'builtin:download' ignore-systems ignore-targets inferior-memory-limit) - (define guix-locpath - ;; Augment the GUIX_LOCPATH to include glibc-locales from - ;; the Guix at store-path, this should mean that the - ;; inferior Guix works, even if it's build using a different - ;; glibc version - (string-append - (with-store-connection - (lambda (store) - (glibc-locales-for-guix-store-path store store-item))) - "/lib/locale" - ":" (getenv "GUIX_LOCPATH"))) - - (define inf-and-store-pool - (make-resource-pool - (lambda () - (let* ((inferior-store (open-store-connection))) - (unless (valid-path? inferior-store store-item) - (simple-format #t "warning: store item missing (~A)\n" - store-item) - (unless (valid-path? inferior-store guix-derivation) - (simple-format #t "warning: attempting to substitute guix derivation (~A)\n" - guix-derivation) - ;; Wait until the derivations are in the database - (fibers-force guix-revision-id-promise) - (ensure-path inferior-store guix-derivation)) - (simple-format #t "warning: building (~A)\n" - guix-derivation) - (build-derivations inferior-store - (list (read-derivation-from-file - guix-derivation)))) - ;; Use this more to keep the store-path alive so long as there's a - ;; inferior operating - (add-temp-root inferior-store store-item) - - (let ((inferior (start-inferior-for-data-extration - inferior-store - store-item - guix-locpath - extra-inferior-environment-variables))) - (ensure-non-blocking-store-connection inferior-store) - (make-inferior-non-blocking! inferior) - (simple-format #t "debug: started new inferior and store connection\n") - - (cons inferior inferior-store)))) - parallelism - #:min-size 0 - #:idle-seconds 20 - #:name "inferior" - #:destructor - (match-lambda - ((inferior . 
store) - (simple-format - #t "debug: closing inferior and associated store connection\n") - - (close-connection store) - (close-inferior inferior))))) - - (define postgresql-connection-pool - (make-resource-pool - (lambda () - (with-time-logging - "waiting for guix-revision-id" - ;; This uses the transaction lock, so wait until the transaction has - ;; committed - (fibers-force guix-revision-id-promise)) - (with-time-logging - "extract information, acquiring advisory transaction lock: load-new-guix-revision-inserts" - ;; Wait until this is the only transaction inserting data, to - ;; avoid any concurrency issues - (obtain-advisory-transaction-lock db-conn - 'load-new-guix-revision-inserts)) - db-conn) - 1 - #:name "postgres")) - (define package-ids-promise - (fibers-delay + (fibers-delay/eager (lambda () (let ((packages-data (call-with-inferior @@ -2592,8 +3021,9 @@ WHERE builder != 'builtin:download' packages pkg-to-replacement-hash-table)))) #:memory-limit inferior-memory-limit))) - (with-resource-from-pool postgresql-connection-pool conn - (insert-packages conn packages-data)))))) + (insert-packages postgresql-connection-pool + table-manager-coordinator + packages-data))))) (define (extract-and-store-lint-checkers-and-warnings) (define inferior-lint-checkers-data @@ -2606,60 +3036,156 @@ WHERE builder != 'builtin:download' (when inferior-lint-checkers-data (fibers-let ((lint-checker-ids - (with-resource-from-pool postgresql-connection-pool conn - (lint-checkers->lint-checker-ids - conn - (vector-map - (match-lambda* - ((_ (name descriptions-by-locale network-dependent)) - (list - name - network-dependent - (lint-checker-description-data->lint-checker-description-set-id - conn - descriptions-by-locale)))) - inferior-lint-checkers-data)))) - (lint-warnings-data - (fibers-batch-map - (match-lambda - ((checker-name _ network-dependent?) - (and (and (not network-dependent?) - ;; Running the derivation linter is - ;; currently infeasible - (not (eq? 
checker-name 'derivation))) - (begin - (call-with-inferior - inf-and-store-pool - (lambda (inferior inferior-store) - (inferior-lint-warnings inferior - inferior-store - checker-name)) - #:memory-limit inferior-memory-limit))))) - 20 ; TODO - inferior-lint-checkers-data))) + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "lint_checkers") + (vector-map + (lambda (_ lint-checker-details) + (match lint-checker-details + ((name network-dependent descriptions-by-locale) + (vector + (symbol->string name) + network-dependent + (table-manager-add-row + (fetch-table-manager table-manager-coordinator + "lint_checker_description_sets") + (vector + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "lint_checker_descriptions") + descriptions-by-locale))))))) + inferior-lint-checkers-data))) + (lint-warnings-data + (fibers-batch-map + (match-lambda + ((checker-name network-dependent? _) + (and (and (not network-dependent?) + ;; Running the derivation linter is + ;; currently infeasible + (not (eq? 
checker-name 'derivation))) + (begin + (call-with-inferior + inf-and-store-pool + (lambda (inferior inferior-store) + (inferior-lint-warnings inferior + inferior-store + checker-name)) + #:memory-limit inferior-memory-limit))))) + 20 ; TODO + inferior-lint-checkers-data))) - (let ((package-ids (fibers-force package-ids-promise))) - (with-resource-from-pool postgresql-connection-pool conn - (insert-guix-revision-lint-checkers - conn - (fibers-force guix-revision-id-promise) - lint-checker-ids) + (close-table-manager table-manager-coordinator + "lint_checker_descriptions") + (close-table-manager table-manager-coordinator + "lint_checker_description_sets") + (close-table-manager table-manager-coordinator + "lint_checkers") - (let ((lint-warning-id-vectors - (with-time-logging "inserting lint warnings" - (insert-lint-warnings - conn - package-ids - lint-checker-ids - lint-warnings-data)))) - (with-time-logging "inserting guix revision lint warnings" - (for-each - (lambda (lint-warning-ids) - (insert-guix-revision-lint-warnings - conn - (fibers-force guix-revision-id-promise) - lint-warning-ids)) - lint-warning-id-vectors)))))))) + (let ((guix-revision-id + (fibers-force guix-revision-id-promise))) + + (table-manager-add-raw-rows + (fetch-table-manager table-manager-coordinator + "guix_revision_lint_checkers") + (vector-map + (lambda (_ lint-checker-id) + (vector lint-checker-id + guix-revision-id)) + lint-checker-ids))) + + (let* ((package-ids (fibers-force package-ids-promise)) + (lint-warning-ids + (with-time-logging "inserting lint warnings" + (vector-fold + (lambda (_ result lint-checker-id warnings-per-package) + (if warnings-per-package + (vector-fold + (lambda (_ result package-id warnings) + (if (= 0 (vector-length warnings)) + result + (let* ((lint-warning-ids-list + (vector->list + (table-manager-add-rows + (fetch-table-manager + table-manager-coordinator + "lint_warnings") + (vector-map + (lambda (_ data) + (match data + ((location-data messages-by-locale) + 
(vector + lint-checker-id + package-id + (table-manager-add-row + (fetch-table-manager + table-manager-coordinator + "locations") + location-data) + (table-manager-add-row + (fetch-table-manager table-manager-coordinator + "lint_warning_message_sets") + (vector + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "lint_warning_messages") + messages-by-locale))))))) + warnings)))) + (deduplicated-new-lint-warning-ids + (pair-fold + (lambda (pair result) + (if (null? (cdr pair)) + (cons (first pair) result) + (let ((a (first pair)) + (b (second pair))) + (if (eq? a b) + (begin + (simple-format #t "duplicate lint warning ~A\n" + a) + result) + (cons a result))))) + '() + (filter table-manager-placeholder? + lint-warning-ids-list)))) + (append! + ;; TODO Sort properly so pair-fold above + ;; works as intended + (delete-duplicates + deduplicated-new-lint-warning-ids + eq?) + (delete-duplicates/sort! + (filter integer? lint-warning-ids-list) + < + =) + result)))) + result + package-ids + warnings-per-package) + result)) + '() + lint-checker-ids + lint-warnings-data)))) + + (close-table-manager table-manager-coordinator + "lint_warnings") + (close-table-manager table-manager-coordinator + "lint_warning_messages") + (close-table-manager table-manager-coordinator + "lint_warning_message_sets") + + (let ((guix-revision-id + (fibers-force guix-revision-id-promise))) + + (table-manager-add-raw-rows + (fetch-table-manager table-manager-coordinator "guix_revision_lint_warnings") + (list->vector + (map + (lambda (lint-warning-id) + (vector lint-warning-id + guix-revision-id)) + lint-warning-ids)))))) + + (close-table-manager table-manager-coordinator + "guix_revision_lint_warnings"))) (define (extract-and-store-package-derivations) (define packages-count @@ -2671,95 +3197,162 @@ WHERE builder != 'builtin:download' (inferior-eval '(vector-length gds-inferior-packages) inferior)) #:memory-limit inferior-memory-limit)) - (define chunk-size 1000) + (define 
chunk-size 200) (define get-derivations/parallelism-limiter (make-parallelism-limiter parallelism)) + (define insert-derivations/parallelism-limiter + (make-parallelism-limiter parallelism)) (define (get-derivations system target) - ;; Limit concurrency here to keep focused on specific systems until - ;; they've been fully processed + (define (make-chunk-promise chunk-promises + index chunk-count + chunk + store-items-promise) + (fibers-delay/eager + (lambda () + (with-parallelism-limiter + insert-derivations/parallelism-limiter + (let ((chunk-derivations + (with-time-logging + (simple-format + #f "read-derivations/serialised (~A ~A, chunk ~A/~A)" + system target index chunk-count) + (read-derivations/serialised chunk)))) + + (with-time-logging + (simple-format + #f "insert-derivations-with-table-managers (~A ~A, chunk ~A/~A)" + system target index chunk-count) + (insert-derivations-with-table-managers + table-manager-coordinator + call-with-utility-thread + chunk-derivations + #:store-items-promise + store-items-promise))))))) + + (define (get-chunk-promises) + (define indexes-and-counts + (append + (map + (lambda (start-index) + (cons start-index chunk-size)) + (iota (quotient packages-count chunk-size) + 0 + chunk-size)) + (let ((last-chunk-size + (modulo packages-count chunk-size))) + (if (= 0 last-chunk-size) + '() + (list (cons (* (quotient packages-count chunk-size) + chunk-size) + last-chunk-size)))))) + + (define chunk-count + (length indexes-and-counts)) + + (define (get-chunk start-index count) + (call-with-inferior + inf-and-store-pool + (lambda (inferior inferior-store) + (ensure-gds-inferior-packages-defined! inferior) + + (let ((result + (inferior-package-derivations + inferior-store + inferior + system + target + start-index + count))) + + ;; When last chunk? 
+ (when (>= (+ start-index count) packages-count) ; final chunk, even when it is full-sized + (inferior-cleanup inferior)) + + result)) + #:memory-limit inferior-memory-limit)) + + (let loop ((indexes-and-counts indexes-and-counts) + (chunk-promises '())) + (if (null? indexes-and-counts) + (reverse chunk-promises) + (match (car indexes-and-counts) + ((start-index . count) + (let* ((chunk + (get-chunk start-index count)) + (chunk-promise + (make-chunk-promise + chunk-promises + (/ start-index chunk-size) + chunk-count + chunk + (fibers-delay + (lambda () + (get-chunk start-index count)))))) + (loop (cdr indexes-and-counts) + (cons chunk-promise + chunk-promises)))))))) + + ;; Limit concurrency here to keep focused on specific systems + ;; until they've been fully processed (with-parallelism-limiter get-derivations/parallelism-limiter - (let ((derivations-vector (make-vector packages-count))) - (with-time-logging - (simple-format #f "getting derivations for ~A" - (cons system target)) - (let loop ((start-index 0)) - (let* ((last-chunk? - (>= (+ start-index chunk-size) packages-count)) - (count - (if last-chunk? - (- packages-count start-index) - chunk-size)) - (chunk - (call-with-inferior - inf-and-store-pool - (lambda (inferior inferior-store) - (ensure-gds-inferior-packages-defined! inferior) - - (let ((result - (inferior-package-derivations - inferior-store - inferior - system - target - start-index - count))) - - (when last-chunk? - (inferior-cleanup inferior)) - - result)) - #:memory-limit inferior-memory-limit))) - (vector-copy! derivations-vector - start-index - chunk) - (unless last-chunk? - (loop (+ start-index chunk-size)))))) + (let ((chunk-promises + (with-time-logging + (simple-format #f "getting derivations for ~A" + (cons system target)) + (get-chunk-promises))) + (derivations-vector (make-vector packages-count))) + (for-each + (lambda (index chunk-promise) + (vector-copy! 
derivations-vector + (* index chunk-size) + (fibers-force chunk-promise))) + (iota (length chunk-promises)) + chunk-promises) derivations-vector))) - (define derivation-file-names->derivation-ids/parallelism-limiter - (make-parallelism-limiter 2)) (define (process-system-and-target system target) (with-time-logging (simple-format #f "processing derivations for ~A" (cons system target)) - (let* ((derivations-vector (get-derivations system target)) - (derivation-ids - (with-parallelism-limiter - derivation-file-names->derivation-ids/parallelism-limiter - (with-time-logging - (simple-format #f "derivation-file-names->derivation-ids (~A ~A)" - system target) - (derivation-file-names->derivation-ids - postgresql-connection-pool - call-with-utility-thread - read-derivations/serialised - (make-hash-table) - derivations-vector - #:log-tag (simple-format #f "~A:~A" system target))))) + (let* ((derivation-ids + (get-derivations system target)) (guix-revision-id (fibers-force guix-revision-id-promise)) - (package-ids (fibers-force package-ids-promise)) + (package-ids + (fibers-force package-ids-promise)) + (system-id + (table-manager-add-row + (fetch-table-manager table-manager-coordinator "systems") + (vector system))) (package-derivation-ids - (with-resource-from-pool postgresql-connection-pool conn - (with-time-logging - (simple-format #f "insert-package-derivations (~A ~A)" - system target) - (insert-package-derivations conn - system - (or target "") - package-ids - derivation-ids))))) - (chunk-for-each! 
- (lambda (package-derivation-ids-chunk) - (with-resource-from-pool postgresql-connection-pool conn - (insert-guix-revision-package-derivations - conn - guix-revision-id - package-derivation-ids-chunk))) - 2000 - ;; TODO Chunk more efficiently - (vector->list package-derivation-ids)))) + (with-time-logging + (simple-format #f "insert-package-derivations (~A ~A)" + system target) + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator "package_derivations") + (list->vector + (vector-fold-right + (lambda (_ result package-id derivation-id) + (if derivation-id + (cons + (vector package-id + derivation-id + (or target "") + system-id) + result) + result)) + '() + package-ids + derivation-ids)))))) + + (table-manager-add-raw-rows + (fetch-table-manager table-manager-coordinator "guix_revision_package_derivations") + (vector-map + (lambda (_ package-derivation-id) + (vector guix-revision-id + package-derivation-id)) + package-derivation-ids)))) 'finished) @@ -2767,9 +3360,14 @@ WHERE builder != 'builtin:download' (fibers-map-with-progress (match-lambda ((system . 
target) - (retry-on-missing-store-item - (lambda () - (process-system-and-target system target))))) + (with-exception-handler + (lambda (exn) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (retry-on-missing-store-item + (lambda () + (process-system-and-target system target))))))) (list (let ((all-system-target-pairs (call-with-inferior @@ -2801,6 +3399,8 @@ WHERE builder != 'builtin:download' data))) (destroy-parallelism-limiter get-derivations/parallelism-limiter) + (destroy-parallelism-limiter + insert-derivations/parallelism-limiter) #t)) (define (extract-and-store-system-tests) @@ -2820,49 +3420,90 @@ WHERE builder != 'builtin:download' guix-source commit #:ignore-systems ignore-systems))) - #:memory-limit inferior-memory-limit))) + #:memory-limit inferior-memory-limit)) + (guix-revision-id + (fibers-force guix-revision-id-promise))) (when data-with-derivation-file-names - (let ((data-with-derivation-ids - (map (match-lambda - ((name description derivation-file-names-by-system location-data) - (list name - description - (let ((systems - (map car derivation-file-names-by-system)) - (derivation-ids - (vector->list - (derivation-file-names->derivation-ids - postgresql-connection-pool - call-with-utility-thread - read-derivations/serialised - (make-hash-table) - (list->vector - (map cdr derivation-file-names-by-system)) - #:log-tag "channel-instances")))) - (map cons systems derivation-ids)) - location-data))) - data-with-derivation-file-names))) - (with-resource-from-pool postgresql-connection-pool conn - (insert-system-tests-for-guix-revision - conn - (fibers-force guix-revision-id-promise) - data-with-derivation-ids)))))))) + (let ((system-test-ids + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator "system_tests") + (vector-map + (lambda (_ details) + (match details + ((name + description + _ + location-data) + + (vector name + description + (table-manager-add-row + (fetch-table-manager 
table-manager-coordinator "locations") + location-data))))) + data-with-derivation-file-names)))) + + (vector-for-each + (lambda (_ system-test-id details) + (let* ((derivation-file-names-by-system + (list-ref details 2)) ; details is a list, as matched above + (systems + (list->vector + (map car derivation-file-names-by-system))) + (derivation-ids + (insert-derivations-with-table-managers + table-manager-coordinator + call-with-utility-thread + (read-derivations/serialised + (list->vector + (map cdr derivation-file-names-by-system)))))) + (table-manager-add-raw-rows + (fetch-table-manager table-manager-coordinator + "guix_revision_system_test_derivations") + (vector-map + (lambda (_ derivation-id system) + (vector guix-revision-id + system-test-id + derivation-id + system)) + derivation-ids + systems)))) + system-test-ids + data-with-derivation-file-names))))))) + + (define (extract-and-store-channel-news) + (if (defined? 'channel-news-for-commit + (resolve-module '(guix channels))) + (with-time-logging "inserting channel news entries" + (insert-channel-news-entries-for-guix-revision + table-manager-coordinator + (fibers-force guix-revision-id-promise) + (channel-news-for-commit channel-for-commit commit))) + (simple-format + #t "debug: importing channel news not supported\n"))) (with-time-logging (simple-format #f "extract-information-from: ~A\n" store-item) (fibers-parallel - (begin - (fibers-force package-ids-promise) - #f) + (extract-and-store-channel-news) (extract-and-store-package-derivations) (retry-on-missing-store-item extract-and-store-system-tests) (with-time-logging "extract-and-store-lint-checkers-and-warnings" (extract-and-store-lint-checkers-and-warnings)))) - (destroy-resource-pool inf-and-store-pool) - (destroy-resource-pool postgresql-connection-pool) + (close-table-manager table-manager-coordinator + "systems") + (close-table-manager table-manager-coordinator + "derivations") - *unspecified*) + (let ((_ + new-package-metadata-ids + (fibers-force package-ids-promise))) + 
(close-table-manager table-manager-coordinator + "derivation_source_files") + (close-table-manager table-manager-coordinator + "derivation_source_file_nars") + + new-package-metadata-ids)) (prevent-inlining-for-tests extract-information-from) @@ -2878,28 +3519,20 @@ WHERE builder != 'builtin:download' (with-postgresql-transaction channel-instances-conn (lambda (channel-instances-conn) - - (with-time-logging - "channel instances, acquiring advisory transaction lock: load-new-guix-revision-inserts" - ;; Wait until this is the only transaction inserting data, to avoid - ;; any concurrency issues - (obtain-advisory-transaction-lock channel-instances-conn - 'load-new-guix-revision-inserts)) - - (let* ((existing-guix-revision-id - (git-repository-id-and-commit->revision-id channel-instances-conn - git-repository-id - commit)) - (guix-revision-id - (or existing-guix-revision-id - (insert-guix-revision channel-instances-conn - git-repository-id commit))) - (postgresql-connection-pool + (let* ((postgresql-connection-pool (make-fixed-size-resource-pool (list channel-instances-conn) - #:name "postgres"))) + #:name "postgres")) + (table-manager-coordinator + (get-table-manager-coordinator postgresql-connection-pool)) + (guix-revision-id + new-guix-revision-id? + (table-manager-add-row + (fetch-table-manager table-manager-coordinator "guix_revisions") + (vector commit + (string->number git-repository-id))))) - (unless existing-guix-revision-id + (when new-guix-revision-id? (let* ((derivations-by-system (filter-map (match-lambda @@ -2910,33 +3543,98 @@ WHERE builder != 'builtin:download' (lambda (drv) (cons system drv))))) channel-derivations-by-system)) - (derivation-ids-by-system - (fibers-batch-map - (match-lambda - ((system . 
drv) - (cons system - (vector-ref - (derivation-file-names->derivation-ids - postgresql-connection-pool - call-with-utility-thread - read-derivations/serialised - (make-hash-table) - (vector drv)) - 0)))) - 20 ; TODO - derivations-by-system))) + (derivation-ids-vector + (insert-derivations-with-table-managers + table-manager-coordinator + call-with-utility-thread + (read-derivations/serialised + (list->vector (map cdr derivations-by-system)))))) - (insert-channel-instances channel-instances-conn - guix-revision-id - derivation-ids-by-system)) - (simple-format - (current-error-port) - "guix-data-service: saved the channel instance derivations to the database\n")) + (for-each + (match-lambda* + ((system derivation-id) + (table-manager-add-row + (fetch-table-manager table-manager-coordinator + "channel_instances") + (vector guix-revision-id + system + derivation-id)))) + (map car derivations-by-system) + (vector->list derivation-ids-vector)))) - guix-revision-id)))))) + (with-time-logging + "load-channel-instances destroy-table-manager-coordinator" + (destroy-table-manager-coordinator table-manager-coordinator)) + (destroy-resource-pool postgresql-connection-pool) + + (simple-format + (current-error-port) + "guix-data-service: saved the channel instance derivations to the database\n") + + (resolve-table-manager-placeholder guix-revision-id))))))) (prevent-inlining-for-tests load-channel-instances) +(define (insert-channel-news-entries-for-guix-revision table-manager-coordinator + guix-revision-id + channel-news-entries) + (let ((channel-news-entry-ids + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator "channel_news_entries") + (list->vector + (map (lambda (entry) + (vector (or (channel-news-entry-commit entry) + NULL) + (or (channel-news-entry-tag entry) + NULL))) + channel-news-entries))))) + + (table-manager-add-raw-rows + (fetch-table-manager table-manager-coordinator "guix_revision_channel_news_entries") + (vector-map + (lambda (index 
channel-news-entry-id) + (vector guix-revision-id + channel-news-entry-id + index)) + channel-news-entry-ids)) + + (vector-for-each + (lambda (_ entry channel-news-entry-id) + (let ((text-ids + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator "channel_news_entry_text") + (list->vector + (map (match-lambda + ((lang . text) + (vector lang text))) + (channel-news-entry-title entry)))))) + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator "channel_news_entry_titles") + (vector-map + (lambda (_ text-id) + (vector channel-news-entry-id + text-id)) + text-ids))) + + (let ((text-ids + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator "channel_news_entry_text") + (list->vector + (map (match-lambda + ((lang . text) + (vector lang text))) + (channel-news-entry-body entry)))))) + (table-manager-add-rows + (fetch-table-manager table-manager-coordinator + "channel_news_entry_bodies") + (vector-map + (lambda (_ text-id) + (vector channel-news-entry-id + text-id)) + text-ids)))) + (list->vector channel-news-entries) + channel-news-entry-ids))) + (define* (load-new-guix-revision conn git-repository-id commit #:key skip-system-tests? parallelism extra-inferior-environment-variables @@ -2945,30 +3643,26 @@ WHERE builder != 'builtin:download' (define utility-thread-pool ;; Use a thread pool rather than a fixed size thread pool, since this ;; takes care of queuing waiters - (make-thread-pool parallelism)) + (make-thread-pool parallelism + #:name "utility")) (define call-with-utility-thread (lambda (thunk) (call-with-thread utility-thread-pool thunk))) - (define (read-derivations filenames) + (define (read-derivations/serialised filenames) (call-with-utility-thread (lambda () - (map (lambda (filename) - (if (file-exists? 
filename) - (read-derivation-from-file filename) - (raise-exception - (make-missing-store-item-error - filename)))) - filenames)))) - - (define read-derivations/parallelism-limiter - (make-parallelism-limiter 1)) - (define (read-derivations/serialised . args) - (with-parallelism-limiter - read-derivations/parallelism-limiter - (apply read-derivations args))) + (vector-map (lambda (_ filename) + (if (eq? #f filename) + #f + (if (file-exists? filename) + (read-derivation-from-file filename) + (raise-exception + (make-missing-store-item-error + filename))))) + filenames)))) (let* ((git-repository-fields (select-git-repository conn git-repository-id)) @@ -2995,7 +3689,7 @@ WHERE builder != 'builtin:download' #:ignore-systems ignore-systems)))))) (define guix-revision-id-promise - (fibers-delay + (fibers-delay/eager (lambda () (parameterize ((%postgresql-in-transaction? #f)) @@ -3012,57 +3706,133 @@ WHERE builder != 'builtin:download' (lambda () (fibers-promise-reset channel-derivations-by-system-promise))))))) - ;; Prompt getting the guix-revision-id as soon as possible - (spawn-fiber - (lambda () - (with-exception-handler - (lambda _ - ;; Silently handle this exception - #f) - (lambda () - (fibers-force guix-revision-id-promise)) - #:unwind? 
#t))) - (let* ((guix-source channel-derivations-by-system (fibers-force channel-derivations-by-system-promise)) (store-item guix-derivation (channel-derivations-by-system->guix-store-item - channel-derivations-by-system))) + channel-derivations-by-system)) + (postgresql-connection-pool + (make-fixed-size-resource-pool + (list conn) + #:name "postgres")) + (table-manager-coordinator + (get-table-manager-coordinator postgresql-connection-pool)) + (glibc-locales-promise + (fibers-delay + (lambda () + (with-store-connection + (lambda (store) + (glibc-locales-for-guix-store-path store store-item)))))) + (inf-and-store-pool + (make-resource-pool + (lambda () + (define inferior-store (open-store-connection)) - (extract-information-from conn - guix-revision-id-promise - commit guix-source store-item - guix-derivation - call-with-utility-thread - read-derivations/serialised - #:skip-system-tests? - skip-system-tests? - #:extra-inferior-environment-variables - extra-inferior-environment-variables - #:ignore-systems ignore-systems - #:ignore-targets ignore-targets - #:parallelism parallelism - #:inferior-memory-limit - inferior-memory-limit) + (define (ensure-store-item) + (simple-format #t "warning: store item missing (~A)\n" + store-item) + (unless (valid-path? inferior-store guix-derivation) + (simple-format #t "warning: attempting to substitute guix derivation (~A)\n" + guix-derivation) + ;; Wait until the derivations are in the database + (fibers-force guix-revision-id-promise) + (ensure-path inferior-store guix-derivation)) + (simple-format #t "warning: building (~A)\n" + guix-derivation) + (build-derivations inferior-store + (list (read-derivation-from-file + guix-derivation)))) + + (unless (valid-path? 
inferior-store store-item) + (ensure-store-item)) + + ;; Use this more to keep the store-path alive so long as + ;; there's a inferior operating + (add-temp-root inferior-store store-item) + + (let* ((glibc-locales + (retry-on-missing-store-item + (lambda () + (let ((output + (fibers-force glibc-locales-promise))) + (unless (file-exists? output) + (raise-exception + (make-missing-store-item-error + output))) + output)) + #:on-exception + (lambda () + (ensure-store-item) + (fibers-promise-reset glibc-locales-promise)))) + (guix-locpath + ;; Augment the GUIX_LOCPATH to include glibc-locales + ;; from the Guix at store-path, this should mean that + ;; the inferior Guix works, even if it's build using + ;; a different glibc version + (string-append + glibc-locales + "/lib/locale" + ":" (getenv "GUIX_LOCPATH"))) + (inferior (start-inferior-for-data-extration + inferior-store + store-item + guix-locpath + extra-inferior-environment-variables))) + (ensure-non-blocking-store-connection inferior-store) + (make-inferior-non-blocking! inferior) + (simple-format #t "debug: started new inferior and store connection\n") + + (cons inferior inferior-store))) + parallelism + #:min-size 0 + #:idle-seconds 20 + #:name "inferior" + #:destructor + (match-lambda + ((inferior . store) + (simple-format + #t "debug: closing inferior and associated store connection\n") + + (close-connection store) + (close-inferior inferior))))) + (new-package-metadata-ids-list + (extract-information-from + postgresql-connection-pool + inf-and-store-pool + table-manager-coordinator + guix-revision-id-promise + commit guix-source store-item + guix-derivation + channel-for-commit + call-with-utility-thread + read-derivations/serialised + #:skip-system-tests? + skip-system-tests? 
+ #:extra-inferior-environment-variables + extra-inferior-environment-variables + #:ignore-systems ignore-systems + #:ignore-targets ignore-targets + #:parallelism parallelism + #:inferior-memory-limit + inferior-memory-limit))) + + (destroy-table-manager-coordinator table-manager-coordinator) + (destroy-resource-pool inf-and-store-pool) + + (unless (null? new-package-metadata-ids-list) + (with-resource-from-pool postgresql-connection-pool conn + (with-time-logging "inserting package metadata tsvector entries" + (insert-package-metadata-tsvector-entries + conn + (map resolve-table-manager-placeholder + new-package-metadata-ids-list))))) (let ((guix-revision-id (fibers-force guix-revision-id-promise))) - (destroy-parallelism-limiter - read-derivations/parallelism-limiter) - (destroy-thread-pool - utility-thread-pool) - (if (defined? 'channel-news-for-commit - (resolve-module '(guix channels))) - (with-time-logging "inserting channel news entries" - (insert-channel-news-entries-for-guix-revision - conn - guix-revision-id - (channel-news-for-commit channel-for-commit commit))) - (simple-format - #t "debug: importing channel news not supported\n")) + (destroy-thread-pool utility-thread-pool) (with-time-logging "updating builds.derivation_output_details_set_id" (update-builds-derivation-output-details-set-id @@ -3075,6 +3845,8 @@ WHERE builder != 'builtin:download' guix-revision-id commit) + (destroy-resource-pool postgresql-connection-pool) + (let ((stats (gc-stats))) (format (current-error-port) "gc-stats: time taken: ~3fs, times: ~d~%" @@ -3533,7 +4305,8 @@ SKIP LOCKED") (with-postgresql-connection (simple-format #f "load-new-guix-revision ~A record failure" id) (lambda (conn) - (record-job-event conn id "failure")))) + (record-job-event conn id "failure"))) + (primitive-exit 1)) #f) (lambda () (with-exception-handler diff --git a/guix-data-service/model/channel-news.scm b/guix-data-service/model/channel-news.scm index 4bb5625..5dcecd1 100644 --- 
a/guix-data-service/model/channel-news.scm +++ b/guix-data-service/model/channel-news.scm @@ -24,9 +24,7 @@ #:use-module (guix channels) #:use-module (guix-data-service database) #:use-module (guix-data-service model utils) - #:export (select-channel-news-entries-contained-in-guix-revision - - insert-channel-news-entries-for-guix-revision)) + #:export (select-channel-news-entries-contained-in-guix-revision)) (define (select-channel-news-entries-contained-in-guix-revision conn commit) (define query @@ -70,82 +68,3 @@ SELECT channel_news_entries.commit, (vector->list (json-string->scm body_text)))))) (exec-query-with-null-handling conn query (list commit)))) - -(define (insert-channel-news-entry-text conn text) - (insert-missing-data-and-return-all-ids - conn - "channel_news_entry_text" - '(lang text) - (list->vector - (map (match-lambda - ((lang . text) - (list lang text))) - text)))) - -(define (insert-channel-news-entry conn commit tag) - (insert-and-return-id - conn - "channel_news_entries" - '(commit tag) - (list (or commit NULL) - (or tag NULL)))) - -(define (insert-channel-news-entries conn channel-news-entries) - (map - (lambda (entry) - (let ((commit (channel-news-entry-commit entry)) - (tag (channel-news-entry-tag entry)) - (title-ids - (sort (insert-channel-news-entry-text - conn (channel-news-entry-title entry)) - <)) - (body-ids - (sort (insert-channel-news-entry-text - conn - (channel-news-entry-body entry)) - <))) - (let ((channel-news-entry-id - (insert-channel-news-entry conn commit tag))) - (for-each - (lambda (table ids) - (exec-query - conn - (string-append - "INSERT INTO " table - " VALUES " - (string-join - (map (lambda (id) - (simple-format #f "(~A, ~A)" - channel-news-entry-id - id)) - (vector->list ids)) - ", ") - " ON CONFLICT DO NOTHING"))) - '("channel_news_entry_titles" - "channel_news_entry_bodies") - (list title-ids - body-ids)) - - channel-news-entry-id))) - channel-news-entries)) - -(define 
(insert-channel-news-entries-for-guix-revision - conn - guix-revision-id - channel-news-entries) - (unless (null? channel-news-entries) - (let ((channel-news-entry-ids - (insert-channel-news-entries conn channel-news-entries))) - (exec-query - conn - (string-append - "INSERT INTO guix_revision_channel_news_entries " - "(guix_revision_id, channel_news_entry_id, index) VALUES " - (string-join - (map (lambda (id index) - (simple-format #f "(~A,~A,~A)" guix-revision-id id index)) - channel-news-entry-ids - (iota (length channel-news-entries))) - ", ") - " ON CONFLICT DO NOTHING")))) - #t) diff --git a/guix-data-service/model/license-set.scm b/guix-data-service/model/license-set.scm deleted file mode 100644 index 8436875..0000000 --- a/guix-data-service/model/license-set.scm +++ /dev/null @@ -1,38 +0,0 @@ -;;; Guix Data Service -- Information about Guix over time -;;; Copyright © 2019 Christopher Baines -;;; -;;; This program is free software: you can redistribute it and/or -;;; modify it under the terms of the GNU Affero General Public License -;;; as published by the Free Software Foundation, either version 3 of -;;; the License, or (at your option) any later version. -;;; -;;; This program is distributed in the hope that it will be useful, -;;; but WITHOUT ANY WARRANTY; without even the implied warranty of -;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -;;; Affero General Public License for more details. -;;; -;;; You should have received a copy of the GNU Affero General Public -;;; License along with this program. If not, see -;;; . 
- -(define-module (guix-data-service model license-set) - #:use-module (srfi srfi-1) - #:use-module (srfi srfi-43) - #:use-module (ice-9 vlist) - #:use-module (squee) - #:use-module (guix-data-service utils) - #:use-module (guix-data-service model utils) - #:use-module (guix-data-service model license) - #:export (inferior-packages->license-set-ids)) - -(define (inferior-packages->license-set-ids conn license-id-lists) - (insert-missing-data-and-return-all-ids - conn - "license_sets" - '(license_ids) - (vector-map - (lambda (_ license-ids) - (if (= 0 (vector-length license-ids)) - (list (cons "integer[]" license-ids)) - (list (sort license-ids <)))) - license-id-lists))) diff --git a/guix-data-service/model/license.scm b/guix-data-service/model/license.scm deleted file mode 100644 index f16634d..0000000 --- a/guix-data-service/model/license.scm +++ /dev/null @@ -1,91 +0,0 @@ -;;; Guix Data Service -- Information about Guix over time -;;; Copyright © 2019 Christopher Baines -;;; -;;; This program is free software: you can redistribute it and/or -;;; modify it under the terms of the GNU Affero General Public License -;;; as published by the Free Software Foundation, either version 3 of -;;; the License, or (at your option) any later version. -;;; -;;; This program is distributed in the hope that it will be useful, -;;; but WITHOUT ANY WARRANTY; without even the implied warranty of -;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -;;; Affero General Public License for more details. -;;; -;;; You should have received a copy of the GNU Affero General Public -;;; License along with this program. If not, see -;;; . 
- -(define-module (guix-data-service model license) - #:use-module (srfi srfi-1) - #:use-module (srfi srfi-43) - #:use-module (ice-9 vlist) - #:use-module (ice-9 match) - #:use-module (squee) - #:use-module (guix inferior) - #:use-module (guix-data-service database) - #:use-module (guix-data-service model utils) - #:export (inferior-packages->license-id-lists - inferior-packages->license-data)) - -(define inferior-package-id - (@@ (guix inferior) inferior-package-id)) - -(define (inferior-packages->license-data inf) - (define proc - `(vector-map - (lambda (_ package) - (match (package-license package) - ((? license? license) - (list - (list (license-name license) - (license-uri license) - (license-comment license)))) - ((values ...) - (map (match-lambda - ((? license? license) - (list (license-name license) - (license-uri license) - (license-comment license))) - (x - (simple-format - (current-error-port) - "error: unknown license value ~A for package ~A" - x package) - #f)) - values)) - (x - (simple-format - (current-error-port) - "error: unknown license value ~A for package ~A" - x package) - '()))) - gds-inferior-packages)) - - (inferior-eval '(use-modules (guix licenses)) inf) - (inferior-eval proc inf)) - -(define (inferior-packages->license-id-lists conn license-data) - (define (string-or-null v) - (if (string? v) - v - ;; save non string values as NULL - NULL)) - - (vector-map - (lambda (_ license-tuples) - (if (null? 
license-tuples) - #() - (insert-missing-data-and-return-all-ids - conn - "licenses" - `(name uri comment) - (list->vector - (filter-map - (match-lambda - ((name uri comment) - (list name - (string-or-null uri) - (string-or-null comment))) - (#f #f)) - license-tuples))))) - license-data)) diff --git a/guix-data-service/model/lint-checker.scm b/guix-data-service/model/lint-checker.scm index 4ee6521..0ef1ce2 100644 --- a/guix-data-service/model/lint-checker.scm +++ b/guix-data-service/model/lint-checker.scm @@ -21,43 +21,10 @@ #:use-module (ice-9 match) #:use-module (squee) #:use-module (guix-data-service model utils) - #:export (lint-checkers->lint-checker-ids - lint-warning-count-by-lint-checker-for-revision - insert-guix-revision-lint-checkers + #:export (lint-warning-count-by-lint-checker-for-revision lint-checkers-for-revision lint-checker-description-data->lint-checker-description-set-id)) -(define (lint-checkers->lint-checker-ids conn lint-checkers-data) - (insert-missing-data-and-return-all-ids - conn - "lint_checkers" - '(name network_dependent lint_checker_description_set_id) - lint-checkers-data)) - -(define (lint-checker-description-data->lint-checker-description-ids - conn descriptions-by-locale) - (insert-missing-data-and-return-all-ids - conn - "lint_checker_descriptions" - '(locale description) - (list->vector - (map (match-lambda - ((locale . 
description) - (list locale description))) - descriptions-by-locale)))) - -(define (lint-checker-description-data->lint-checker-description-set-id - conn - descriptions-by-locale) - (insert-and-return-id - conn - "lint_checker_description_sets" - '(description_ids) - (list - (lint-checker-description-data->lint-checker-description-ids - conn - descriptions-by-locale)))) - (define (lint-warning-count-by-lint-checker-for-revision conn commit-hash) (define query " @@ -85,24 +52,6 @@ ORDER BY count DESC") (exec-query conn query (list commit-hash))) -(define (insert-guix-revision-lint-checkers conn - guix-revision-id - lint-checker-ids) - (exec-query - conn - (string-append - "INSERT INTO guix_revision_lint_checkers (lint_checker_id, guix_revision_id) " - "VALUES " - (string-join - (map (lambda (lint-checker-id) - (simple-format - #f - "(~A, ~A)" - lint-checker-id - guix-revision-id)) - (vector->list lint-checker-ids)) - ", ")))) - (define (lint-checkers-for-revision conn commit-hash) (exec-query conn diff --git a/guix-data-service/model/lint-warning-message.scm b/guix-data-service/model/lint-warning-message.scm index c44ba8a..d94bc9d 100644 --- a/guix-data-service/model/lint-warning-message.scm +++ b/guix-data-service/model/lint-warning-message.scm @@ -21,34 +21,7 @@ #:use-module (squee) #:use-module (guix-data-service database) #:use-module (guix-data-service model utils) - #:export (lint-warning-message-data->lint-warning-message-ids - lint-warning-message-locales-for-revision - lint-warning-message-data->lint-warning-message-set-id)) - -(define (lint-warning-message-data->lint-warning-message-ids conn - messages-by-locale) - (insert-missing-data-and-return-all-ids - conn - "lint_warning_messages" - '(locale message) - (let ((v (list->vector messages-by-locale))) - (vector-map! (lambda (_ data) - (match data - ((locale . 
message) - (list locale message)))) - v) - v))) - -(define (lint-warning-message-data->lint-warning-message-set-id - conn - messages-by-locale) - (insert-and-return-id - conn - "lint_warning_message_sets" - '(message_ids) - (list (lint-warning-message-data->lint-warning-message-ids - conn - messages-by-locale)))) + #:export (lint-warning-message-locales-for-revision)) (define (lint-warning-message-locales-for-revision conn commit-hash) (exec-query diff --git a/guix-data-service/model/lint-warning.scm b/guix-data-service/model/lint-warning.scm index 4efa186..63b8f89 100644 --- a/guix-data-service/model/lint-warning.scm +++ b/guix-data-service/model/lint-warning.scm @@ -20,43 +20,11 @@ #:use-module (srfi srfi-1) #:use-module (squee) #:use-module (guix-data-service model utils) - #:export (lint-warnings-data->lint-warning-ids - insert-guix-revision-lint-warnings - lint-warnings-for-guix-revision + #:export (lint-warnings-for-guix-revision select-lint-warnings-by-revision-package-name-and-version any-translated-lint-warnings?)) -(define (lint-warnings-data->lint-warning-ids - conn - ;; (lint-checker-id package-id location-id lint-warning-message-set-id) - lint-warnings-data) - (insert-missing-data-and-return-all-ids - conn - "lint_warnings" - '(lint_checker_id package_id location_id lint_warning_message_set_id) - lint-warnings-data)) - -(define (insert-guix-revision-lint-warnings conn - guix-revision-id - lint-warning-ids) - (unless (= 0 (vector-length lint-warning-ids)) - (exec-query - conn - (string-append - "INSERT INTO guix_revision_lint_warnings (lint_warning_id, guix_revision_id) " - "VALUES " - (string-join - (map (lambda (lint-warning-id) - (simple-format - #f - "(~A, ~A)" - lint-warning-id - guix-revision-id)) - (vector->list lint-warning-ids)) - ", ") - " ON CONFLICT DO NOTHING")))) - (define* (lint-warnings-for-guix-revision conn commit-hash #:key locale diff --git a/guix-data-service/model/package-derivation.scm 
b/guix-data-service/model/package-derivation.scm index fc7dca7..1a71dd5 100644 --- a/guix-data-service/model/package-derivation.scm +++ b/guix-data-service/model/package-derivation.scm @@ -23,39 +23,7 @@ #:use-module (squee) #:use-module (guix-data-service model utils) #:use-module (guix-data-service model system) - #:export (insert-package-derivations - count-packages-derivations-in-revision)) - -(define (insert-package-derivations conn - system - target - package-ids - derivation-ids) - (define system-id - (system->system-id conn system)) - - (define data-4-tuples - (vector-fold - (lambda (_ result package-id derivation-id) - (if derivation-id - (cons (list package-id - derivation-id - system-id - target) - result) - result)) - '() - package-ids - derivation-ids)) - - (if (null? data-4-tuples) - #() - (insert-missing-data-and-return-all-ids - conn - "package_derivations" - '(package_id derivation_id system_id target) - (list->vector - data-4-tuples)))) + #:export (count-packages-derivations-in-revision)) (define (count-packages-derivations-in-revision conn commit-hash) (define query diff --git a/guix-data-service/model/package-metadata.scm b/guix-data-service/model/package-metadata.scm index 91a8445..70e6a71 100644 --- a/guix-data-service/model/package-metadata.scm +++ b/guix-data-service/model/package-metadata.scm @@ -25,6 +25,7 @@ #:use-module (json) #:use-module (gcrypt hash) #:use-module (rnrs bytevectors) + #:use-module (guix utils) #:use-module (guix base16) #:use-module (guix packages) #:use-module (guix i18n) @@ -275,92 +276,6 @@ WHERE packages.id IN ( (prevent-inlining-for-tests inferior-packages->translated-package-descriptions-and-synopsis) -(define (inferior-packages->package-metadata-ids conn - package-metadata - license-set-ids) - (define (vector-zip . vecs) - (let ((result (make-vector (vector-length (first vecs))))) - (apply vector-map! - (lambda (i . 
vals) - (cdr vals)) - (cons result vecs)) - result)) - - (insert-missing-data-and-return-all-ids - conn - "package_metadata" - '(home_page - location_id - license_set_id - package_description_set_id - package_synopsis_set_id) - - (vector-zip - (vector-map (match-lambda* - ((_ (home-page rest ...)) - (if (string? home-page) - home-page - NULL))) - package-metadata) - (with-time-logging "preparing location ids" - (vector-map (match-lambda* - ((_ (_ location rest ...)) - (if location - (location->location-id - conn - location) - NULL))) - package-metadata)) - license-set-ids - (with-time-logging "preparing package description set ids" - (vector-map (match-lambda* - ((_ (_ _ package-description-data _)) - (let ((package-description-ids - (insert-missing-data-and-return-all-ids - conn - "package_descriptions" - '(locale description) - (let ((vec (list->vector package-description-data))) - (vector-map! - (match-lambda* - ((_ (locale . description)) - (list locale - ;; \u0000 has appeared in package - ;; descriptions (#71968), so strip it - ;; out here to avoid PostgreSQL throwing - ;; an error - (string-delete-null description)))) - vec) - vec)))) - (insert-and-return-id - conn - "package_description_sets" - '(description_ids) - (list (sort! package-description-ids <)))))) - package-metadata)) - (with-time-logging "preparing package synopsis set ids" - (vector-map (match-lambda* - ((_ (_ _ _ package-synopsis-data)) - (let ((package-synopsis-ids - (insert-missing-data-and-return-all-ids - conn - "package_synopsis" - '(locale synopsis) - (let ((vec - (list->vector package-synopsis-data))) - (vector-map! - (match-lambda* - ((_ (locale . synopsis)) - (list locale synopsis))) - vec) - vec)))) - (insert-and-return-id - conn - "package_synopsis_sets" - '(synopsis_ids) - (list (sort! 
package-synopsis-ids <)))))) - package-metadata))))) - (define (package-description-and-synopsis-locale-options-guix-revision conn revision-id) ;; TODO This no longer uses the revision-id, as that's too expensive. Maybe @@ -480,7 +395,7 @@ INNER JOIN ( OR translated_package_descriptions.locale = 'en_US.UTF-8') WHERE package_metadata.id IN (" (string-join - (map number->string (vector->list package-metadata-ids)) + (map number->string package-metadata-ids) ", ") ")" " ORDER BY package_metadata.id, locale, diff --git a/guix-data-service/model/package.scm b/guix-data-service/model/package.scm index 395cbd4..3812747 100644 --- a/guix-data-service/model/package.scm +++ b/guix-data-service/model/package.scm @@ -27,7 +27,6 @@ select-packages-in-revision search-packages-in-revision count-packages-in-revision - inferior-packages->package-ids select-package-versions-for-revision package-versions-for-branch @@ -250,13 +249,6 @@ WHERE packages.id IN ( (exec-query conn query (list commit-hash))) -(define (inferior-packages->package-ids conn package-entries) - (insert-missing-data-and-return-all-ids - conn - "packages" - '(name version package_metadata_id replacement_package_id) - package-entries)) - (define (select-package-versions-for-revision conn commit package-name) diff --git a/guix-data-service/model/system-test.scm b/guix-data-service/model/system-test.scm index 9bbc228..1f341e8 100644 --- a/guix-data-service/model/system-test.scm +++ b/guix-data-service/model/system-test.scm @@ -23,56 +23,9 @@ #:use-module (guix utils) #:use-module (guix-data-service model utils) #:use-module (guix-data-service model location) - #:export (insert-system-tests-for-guix-revision - - select-system-tests-for-guix-revision + #:export (select-system-tests-for-guix-revision system-test-derivations-for-branch)) -(define (insert-system-tests-for-guix-revision conn - guix-revision-id - system-test-data) - (unless (null? 
system-test-data) - (let* ((system-test-ids - (insert-missing-data-and-return-all-ids - conn - "system_tests" - '(name description location_id) - (list->vector - (map (match-lambda - ((name description derivation-ids-by-system location-data) - (list name - description - (location->location-id - conn - (apply location location-data))))) - system-test-data)))) - (data - (append-map - (lambda (system-test-id derivation-ids-by-system) - (map (lambda (system-and-derivation-id) - (list guix-revision-id - system-test-id - (cdr system-and-derivation-id) - (car system-and-derivation-id))) - derivation-ids-by-system)) - (vector->list system-test-ids) - (map third system-test-data)))) - - (exec-query - conn - (string-append - " -INSERT INTO guix_revision_system_test_derivations - (guix_revision_id, system_test_id, derivation_id, system) -VALUES " - (string-join - (map (lambda (vals) - (apply simple-format #f "(~A, ~A, ~A, '~A')" - vals)) - data) - ", "))))) - #t) - (define (select-system-tests-for-guix-revision conn system commit-hash) diff --git a/guix-data-service/model/utils.scm b/guix-data-service/model/utils.scm index c3b0cee..119a5c0 100644 --- a/guix-data-service/model/utils.scm +++ b/guix-data-service/model/utils.scm @@ -17,10 +17,22 @@ (define-module (guix-data-service model utils) #:use-module (srfi srfi-1) + #:use-module (srfi srfi-9) + #:use-module (srfi srfi-9 gnu) #:use-module (srfi srfi-43) + #:use-module (srfi srfi-71) #:use-module (ice-9 match) #:use-module (ice-9 vlist) #:use-module (ice-9 receive) + #:use-module (ice-9 exceptions) + #:use-module (ice-9 textual-ports) + #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (fibers operations) + #:use-module (fibers conditions) + #:use-module (knots) + #:use-module (knots parallelism) + #:use-module (knots resource-pool) #:use-module (squee) #:use-module (guix-data-service database) #:use-module (guix-data-service utils) @@ -39,7 +51,28 @@ bulk-select bulk-insert insert-and-return-id - 
prepare-insert-and-return-id)) + prepare-insert-and-return-id + + call-with-false-hidden-in-vector + + make-table-manager-thunk-placeholder + spawn-table-manager-fiber + table-manager-id-for-key + table-manager-lookup-id-or-placeholder + table-manager-placeholder? + resolve-table-manager-placeholder + table-manager-add-row + table-manager-add-rows + table-manager-add-raw-rows + table-manager-close + table-manager-insert-all + filter-vectors-only-new + group-ids-by-counts-vector + + spawn-table-manager-coordinator + fetch-table-manager + close-table-manager + destroy-table-manager-coordinator)) (define (char-null? c) (char=? c #\null)) @@ -651,3 +684,1563 @@ EXECUTE " table-name "PreparedInsertSelect(" (string-join (map value->sql field-vals) ", ") ");")) (((id)) id)))))) + +(define (call-with-false-hidden-in-vector v proc) + (let* ((result + (make-vector (vector-length v) #f)) + (indexes-without-#f + (list->vector + (reverse! + (vector-fold + (lambda (index result x) + (if x + (cons index result) + result)) + '() + v)))) + (temp-v + (vector-map + (lambda (_ index) + (vector-ref v index)) + indexes-without-#f)) + (response-data + (proc temp-v))) + + (vector-for-each + (lambda (_ index val) + (vector-set! result index val)) + indexes-without-#f + response-data) + + result)) + +(define-record-type + (make-table-manager table-name columns-and-types dependencies + key-columns key-columns-nullable id-column + keyplaceholder-and-row-hash-table + key->id-hash-table raw-rows + channel state finished-condition) + table-manager? 
+ (table-name table-manager-table-name) + (columns-and-types table-manager-columns-and-types) + (dependencies table-manager-dependencies) + (key-columns table-manager-key-columns) + (key-columns-nullable table-manager-key-columns-nullable) + (id-column table-manager-id-column) + (keyplaceholder-and-row-hash-table + table-manager-key->placeholder-and-row-hash-table) + (key->id-hash-table table-manager-key->id-hash-table) + (raw-rows table-manager-raw-rows + set-table-manager-raw-rows!) + (channel table-manager-channel) + (state table-manager-state + set-table-manager-state!) + (finished-condition table-manager-finished-condition)) + +(define (table-manager-id-for-key table-manager key) + (hash-ref (table-manager-key->id-hash-table table-manager) + key)) + +(set-record-type-printer! + + (lambda (record port) + (simple-format port "#< table-name: ~A>" + (table-manager-table-name record)))) + +(define-record-type + (make-table-manager-id-placeholder table-manager key id) + table-manager-id-placeholder? + (table-manager table-manager-id-placeholder-table-manager) + (key table-manager-id-placeholder-key) + (id table-manager-id-placeholder-id + set-table-manager-id-placeholder-id!)) + +(define unknown-id + (make-symbol "unknown-id")) + +(set-record-type-printer! + + (lambda (record port) + (simple-format port "#< table: ~A key: ~A id: ~A>" + (table-manager-table-name + (table-manager-id-placeholder-table-manager + record)) + (table-manager-id-placeholder-key record) + (table-manager-id-placeholder-id record)))) + +(define-record-type + (make-table-manager-thunk-placeholder thunk) + table-manager-thunk-placeholder? + (thunk table-manager-thunk-placeholder-thunk)) + +(define (table-manager-placeholder? val) + (or (table-manager-id-placeholder? val) + (table-manager-thunk-placeholder? val))) + +(define (table-manager-lookup-id-or-placeholder table-manager unprocessed-key) + (define key + (let ((key-types + (table-manager-key-column-types table-manager))) + (if (and (list? 
key-types) + (member "sorted-integer[]" key-types)) + (vector-map + (lambda (_ type val) + (if (and (string? type) + (string=? type "sorted-integer[]")) + (sort-sorted-integer-array val) + val)) + (list->vector key-types) + unprocessed-key) + unprocessed-key))) + + (match (hash-ref (table-manager-key->id-hash-table table-manager) + key) + (#f + (match (hashx-ref key-hash + key-assoc + (table-manager-key->placeholder-and-row-hash-table + table-manager) + key) + (#f + (let ((reply (make-channel))) + ;; Maybe the hash table is being resized, so ask via the channel + (put-message (table-manager-channel table-manager) + (list 'lookup-id-or-placeholder + key + reply)) + (match (get-message reply) + (#f + (raise-exception + (make-exception-with-message + (simple-format + #f "table-manager-lookup-id-or-placeholder can't find ~A in ~A" + key + table-manager)))) + (val val)))) + ((placeholder . _) placeholder))) + (id id))) + +(define (table-manager-keyless-than type) + (match type + ('string (wrap-null-handlingequality type) + (match type + ('string (wrap-null-handling=? string=?)) + ('integer (wrap-null-handling=? =)) + ('boolean eq?) + ("integer[]" integer-array=?) + ("sorted-integer[]" integer-array=?))) + +(define (table-manager-key-column-types table-manager) + (define columns-and-types + (table-manager-columns-and-types table-manager)) + (define key-columns + (table-manager-key-columns table-manager)) + + (if (list? key-columns) + (map (lambda (col) + (assq-ref columns-and-types + col)) + key-columns) + (assq-ref columns-and-types key-columns))) + +(define (make-keyless-than + (assq-ref columns-and-types key-columns)))) + (lambda (a b) + (let ((a-val + (if (table-manager-id-placeholder? a) + (table-manager-id-placeholder-id a) + a)) + (b-val + (if (table-manager-id-placeholder? 
b) + (table-manager-id-placeholder-id b) + b))) + (less-than a-val b-val)))) + (let ((less-than-procedures + (map (lambda (column) + (type->less-than + (assq-ref columns-and-types column))) + key-columns)) + (equality-procedures + (map (lambda (column) + (type->equality + (assq-ref columns-and-types column))) + key-columns))) + (lambda (initial-a initial-b) + (let loop ((index 0) + (equality-procedures equality-procedures) + (less-than-procedures less-than-procedures)) + (let ((a-val (let ((val (vector-ref initial-a index))) + (if (table-manager-id-placeholder? val) + (table-manager-id-placeholder-id val) + val))) + (b-val (let ((val (vector-ref initial-b index))) + (if (table-manager-id-placeholder? val) + (table-manager-id-placeholder-id val) + val)))) + (if ((car equality-procedures) a-val b-val) + (if (= (- (vector-length initial-a) 1) + index) + #f + (loop (1+ index) + (cdr equality-procedures) + (cdr less-than-procedures))) + ((car less-than-procedures) a-val b-val)))))))) + +(define (table-manager-placeholderlist + cons + (table-manager-key->id-hash-table + (table-manager-id-placeholder-table-manager val)))) + + (raise-exception + (make-exception + (make-exception-with-message "unable to resolve placeholder") + (make-exception-with-irritants + (list val))))) + id))) + ((table-manager-thunk-placeholder? val) + ((table-manager-thunk-placeholder-thunk val))) + ((list? val) + (map resolve-table-manager-placeholder val)) + ((vector? val) + (vector-map + (lambda (_ v) + (resolve-table-manager-placeholder v)) + val)) + (else val))) + +(define (integer-arrayplaceholder-and-row-hash-table + (make-hash-table initial-hash-table-size)) + + (define key->id-hash-table + (make-hash-table initial-hash-table-size)) + + (define finished-condition + (make-condition)) + + (define key-columns-nullable + (call-with-resource-from-pool + connection-pool + (lambda (conn) + (if (symbol? key-columns) + (field-can-be-null? 
conn + table-name + (symbol->string key-columns)) + (map (lambda (column) + (field-can-be-null? conn table-name + (symbol->string column))) + key-columns))))) + + (define table-manager + (make-table-manager table-name + columns-and-types + dependencies + key-columns + key-columns-nullable + id-column + (make-keyplaceholder-and-row-hash-table + key->id-hash-table + #f + channel + #f + finished-condition)) + + (define (perform-value-processing value types) + (cond + ((and (list? types) + (member "sorted-integer[]" types)) + (if (vector? value) + (vector-map + (lambda (_ val type) + (if (and (string? type) + (string=? type "sorted-integer[]")) + (sort! val <) + val)) + value + (list->vector types)) + (map + (lambda (val type) + (if (and (string? type) + (string=? type "sorted-integer[]")) + (sort! val <) + val)) + value + types))) + ((and (string? types) + (string=? types "sorted-integer[]")) + (sort! value <)) + (else value))) + + (define column-names + (map (lambda (column-and-type) + (symbol->string (car column-and-type))) + columns-and-types)) + + (define (build-insert-query/raw-rows raw-rows) + (call-with-output-string + (lambda (port) + (put-string + port + " +INSERT INTO ") + (put-string port table-name) + (put-string port " (") + (put-string port (string-join column-names ", ")) + (put-string port ") VALUES +") + (let* ((row-count + (vector-length raw-rows)) + (last-index + (- row-count 1))) + (vector-for-each + (lambda (index row) + (put-string port "(") + (let ((last-index + (- (vector-length row) 1))) + (vector-for-each + (lambda (index val) + (put-string port (value->sql val)) + (unless (= index last-index) + (put-string port ","))) + row)) + (put-string port ")") + + (unless (= index last-index) + (put-string port ","))) + raw-rows))))) + + (define (build-insert-query sorted-keys-and-rows) + (define column-names + (map (lambda (column-and-type) + (symbol->string (car column-and-type))) + columns-and-types)) + + (call-with-output-string + (lambda (port) + 
(put-string + port + " +INSERT INTO ") + (put-string port table-name) + (put-string port " (") + (put-string port (string-join column-names ", ")) + (put-string port ") VALUES +") + (let loop ((sorted-keys-and-rows + sorted-keys-and-rows)) + (let* ((key-and-row + next + (car+cdr sorted-keys-and-rows)) + (key + row + (car+cdr key-and-row))) + (put-string port "(") + (let ((last-index + (- (vector-length row) 1))) + (vector-for-each + (lambda (index val) + (put-string port (value->sql val)) + (unless (= index last-index) + (put-string port ","))) + row)) + (put-string port ")") + + (if (null? next) + #f ; finished + (begin + (put-string port ",") + (loop next))))) + + (put-string port " +ON CONFLICT DO NOTHING") + (when id-column + (put-string port " +RETURNING ") + (put-string port (symbol->string id-column)))))) + + (define (insert-all/raw-rows) + (for-each table-manager-insert-all + dependencies) + + (let ((count + (vector-length (table-manager-raw-rows table-manager)))) + + (if (= count 0) + (simple-format + (current-error-port) + "no entries to insert for the ~A table\n" + table-name) + (with-time-logging + (simple-format + #f + "inserting ~A entries to the ~A table\n" + count table-name) + + (let ((raw-rows + (table-manager-raw-rows table-manager))) + (with-delay-logging + "table-manager resolve raw-rows placeholders" + (vector-map! + (lambda (_ row) + (vector-map! + (lambda (_ val) + (resolve-table-manager-placeholder val)) + row) + row) + raw-rows)) + + (let ((query + (with-delay-logging + "table-manager insert-all build-query/raw-rows" + (build-insert-query/raw-rows raw-rows)))) + (with-resource-from-pool connection-pool conn + (exec-query conn query))))))) + *unspecified*) + + (define* (insert-all #:key check-for-self-dependents?) + (for-each table-manager-insert-all + dependencies) + + (let* ((keys-and-rows + (hash-fold + (lambda (key value result) + (if (and check-for-self-dependents? + (vector-any + (lambda (val) + (and (table-manager-id-placeholder? 
val) + (eq? + (table-manager-id-placeholder-table-manager + val) + table-manager))) + (cdr value))) + result + (cons + (cons key (cdr value)) + result))) + '() + key->placeholder-and-row-hash-table)) + (count + (length keys-and-rows))) + + (if (= count 0) + (simple-format + (current-error-port) + "no entries to insert for the ~A table\n" + table-name) + (let ((sorted-keys-and-rows + (with-delay-logging + "table-manager insert-all sort rows" + (fibers-sort! + keys-and-rows + (let ((keyplaceholder-and-row-hash-table + (car key-and-row)))) + (set-table-manager-id-placeholder-id! + (car placeholder-and-row) + (string->number (car query-response))) + (hashx-remove! + key-hash + key-assoc + key->placeholder-and-row-hash-table + (car key-and-row)))) + sorted-keys-and-rows-chunk + result-rows) + ;; Don't know which rows the id's correspond + ;; to, so query for them all + (let* ((keys-vector + (list->vector (map car sorted-keys-and-rows-chunk))) + (resolved-keys + (map (lambda (key-and-row) + (perform-value-processing + (resolve-table-manager-placeholder + (car key-and-row)) + (table-manager-key-column-types + table-manager))) + sorted-keys-and-rows-chunk)) + (keys-and-indexes + (map cons + resolved-keys + (iota (length resolved-keys)))) + (select-result-rows + (with-resource-from-pool connection-pool conn + (exec-query conn + (build-select-query-by-keys-and-indexes + table-manager + keys-and-indexes))))) + (simple-format #t "debug: expected ~A rows, got ~A, querying returned ~A rows\n" + (length sorted-keys-and-rows-chunk) + (length result-rows) + (length select-result-rows)) + (for-each + (lambda (cols) + (let* ((index (string->number (first cols))) + (id (string->number (second cols))) + (key (vector-ref keys-vector index))) + (let* ((placeholder-and-row + (hashx-ref + key-hash + key-assoc + key->placeholder-and-row-hash-table + key))) + (set-table-manager-id-placeholder-id! + (car placeholder-and-row) + id) + (hashx-remove! 
+ key-hash + key-assoc + key->placeholder-and-row-hash-table + key)))) + select-result-rows)))) + (with-resource-from-pool connection-pool conn + (exec-query conn query))))) + insert-batch-size + sorted-keys-and-rows)) + + (when check-for-self-dependents? + ;; All the processed rows should be removed from the hash table, + ;; so process the ones with self dependents + (insert-all)) + + (hash-clear! key->placeholder-and-row-hash-table)))) + + *unspecified*) + + (spawn-fiber + (lambda () + (let loop () + (match (get-message channel) + (('insert-all reply) + (put-message + reply + (with-exception-handler + (lambda (exn) + (print-backtrace-and-exception/knots exn) + (cons 'exception exn)) + (lambda () + (with-exception-handler + (lambda (exn) + (let ((stack + (match (fluid-ref %stacks) + ((stack-tag . prompt-tag) + (make-stack #t + 0 prompt-tag + 0 (and prompt-tag 1))) + (_ + (make-stack #t))))) + (raise-exception + (make-exception + exn + (make-knots-exception stack))))) + (lambda () + (when (eq? (table-manager-state table-manager) + 'closed) + (if (table-manager-raw-rows table-manager) + (insert-all/raw-rows) + (insert-all #:check-for-self-dependents? + dependent-on-self?))))) + 'success) + #:unwind? #t)) + + (set-table-manager-state! table-manager 'all-rows-inserted) + (signal-condition! finished-condition) + + (loop)) + + (('close reply) + (set-table-manager-state! table-manager 'closed) + (put-message reply #t) + + (loop)) + + (('destroy reply) + (hash-clear! key->id-hash-table) + (set-table-manager-state! table-manager 'destroyed) + (put-message reply #t) + + ;; No loop + *unspecified*) + + (('add-raw-rows raw-rows reply) + (set-table-manager-raw-rows! + table-manager + (vector-append raw-rows + (or (table-manager-raw-rows table-manager) + (vector)))) + (put-message reply #t) + (loop)) + + (('cache-id key id reply) + (hash-set! 
key->id-hash-table key id) + + (put-message reply #t) + (loop)) + + (('cache-ids data keys-vector reply) + (with-delay-logging + "table-manager add-rows cache-ids" + (for-each + (lambda (cols) + (let* ((index (string->number (first cols))) + (id (string->number (second cols))) + (key (vector-ref keys-vector index))) + (hash-set! key->id-hash-table key id))) + data)) + + (put-message reply #t) + (loop)) + + (('lookup-id-or-placeholder key reply) + (put-message + reply + (match (hash-ref (table-manager-key->id-hash-table table-manager) + key) + (#f + (match (hashx-ref + key-hash + key-assoc + (table-manager-key->placeholder-and-row-hash-table + table-manager) + key) + (#f #f) + ((placeholder . _) placeholder))) + (id id))) + (loop)) + + (('populate-results-vector keys-vector rows + result-vector new?-vector + reply) + (with-delay-logging + "table-manager populate-results-vector" + (let ((result-vector-length (vector-length result-vector))) + (let loop ((index 0)) + (if (= index result-vector-length) + #f ; finished + (let ((res (vector-ref result-vector index))) + (unless res + (vector-set! + result-vector + index + (let* ((key (vector-ref keys-vector index)) + (entry + (hashx-ref + key-hash + key-assoc + key->placeholder-and-row-hash-table + key))) + (if (eq? #f entry) + (let ((placeholder + (make-table-manager-id-placeholder + table-manager + key + unknown-id)) + (row (vector-ref rows index))) + (hashx-set! key-hash + key-assoc + key->placeholder-and-row-hash-table + key + (cons placeholder row)) + (vector-set! new?-vector index #t) + placeholder) + (let ((placeholder (car entry))) + (vector-set! new?-vector index #f) + placeholder))))) + (loop (1+ index))))))) + (put-message reply #t) + (loop)) + + (('add-row key row reply) + ;; Return two values, the placeholder, and whether this placeholder + ;; is new? + (match (hashx-ref key-hash + key-assoc + key->placeholder-and-row-hash-table + key) + ((placeholder . 
row) + (put-message reply (list placeholder #f))) + (#f + (let ((placeholder + (make-table-manager-id-placeholder + table-manager + key + unknown-id))) + (hashx-set! key-hash + key-assoc + key->placeholder-and-row-hash-table + key + (cons placeholder row)) + (put-message reply (list placeholder #t))))) + + (loop))))) + #:parallel? #t) + + table-manager) + +(define (table-manager-row->key table-manager row) + (define columns-and-types + (table-manager-columns-and-types table-manager)) + (define (column-name->val col) + (let ((index (list-index (lambda (col-and-type) + (eq? col + (car col-and-type))) + columns-and-types))) + (vector-ref row index))) + + (let ((key-columns (table-manager-key-columns table-manager))) + (cond + ((symbol? key-columns) + (column-name->val key-columns)) + ;; Directly use the row as the key if all the columns are key columns + ((= (vector-length row) + (length key-columns)) + row) + (else + (let ((vec (list->vector key-columns))) + (vector-map! + (lambda (_ name) + (column-name->val name)) + vec) + vec))))) + +(define (key-contains-placeholder? key) + (cond + ((vector? key) + (vector-any table-manager-placeholder? key)) + ((list? key) + (any (lambda (v) + (if (vector? v) + (vector-any table-manager-placeholder? v) + (table-manager-placeholder? v))) + key)) + (else + (table-manager-placeholder? key)))) + +(define (sort-sorted-integer-array val) + (sort! val + (lambda (a b) + (cond + ((and (integer? a) + (integer? b)) + (< a b)) + ((and (table-manager-placeholder? a) + (table-manager-placeholder? b)) + (table-manager-placeholdervector types) + unprocessed-row) + unprocessed-row))) + +(define (process-rows table-manager unprocessed-rows) + (let ((types + (map cdr (table-manager-columns-and-types table-manager)))) + (if (member "sorted-integer[]" types) + (begin + (vector-map! + (lambda (_ row) + (vector-map! + (lambda (_ val type) + (if (and (string? type) + (string=? 
type "sorted-integer[]") + (> (vector-length val) 1)) + (sort-sorted-integer-array val) + val)) + row + (list->vector types)) + row) + unprocessed-rows) + unprocessed-rows) + unprocessed-rows))) + +(define (query-for-single-row table-manager key) + (define table-name + (table-manager-table-name table-manager)) + (define id-column + (table-manager-id-column table-manager)) + (define key-columns + (table-manager-key-columns table-manager)) + (define key-columns-list + (if (symbol? key-columns) + (list key-columns) + key-columns)) + + (let ((query + (string-append + " +SELECT " (if id-column + (symbol->string id-column) + "1") " +FROM " table-name " +WHERE " (string-join + (map (lambda (index key-column key-value) + (if (NULL? key-value) + (string-append + (symbol->string key-column) + " IS NULL") + (string-append + (symbol->string key-column) + " = $" + (number->string index)))) + (iota (length key-columns-list) 1) + key-columns-list + (if (symbol? key-columns) + (list key) + (vector->list key))) + " AND ") " +FOR KEY SHARE"))) + (match (call-with-resource-from-pool + (table-manager-connection-pool table-manager) + (lambda (conn) + (exec-query-with-null-handling + conn + query + (filter-map + (lambda (val) + (if (NULL? val) + #f + (value->sql-literal val))) + (if (symbol? key-columns) + (list key) + (vector->list key)))))) + (() #f) + (((id-string)) + (string->number id-string))))) + +(define (table-manager-add-row table-manager unprocessed-row) + (define table-name + (table-manager-table-name table-manager)) + (define key-columns + (table-manager-key-columns table-manager)) + (define id-column + (table-manager-id-column table-manager)) + (define key->placeholder-and-row-hash-table + (table-manager-key->placeholder-and-row-hash-table table-manager)) + (define key->id-hash-table + (table-manager-key->id-hash-table table-manager)) + + (define key-columns-list + (if (symbol? 
key-columns) + (list key-columns) + key-columns)) + + (define row (process-row table-manager unprocessed-row)) + + (define key (table-manager-row->key table-manager row)) + + (match (hash-ref key->id-hash-table key) + (#f + (match (hashx-ref key-hash + key-assoc + key->placeholder-and-row-hash-table + key) + ((placeholder . _) + (values placeholder + #f)) + (#f + (if (key-contains-placeholder? key) + (let ((reply (make-channel))) + (put-message (table-manager-channel table-manager) + (list 'add-row key row reply)) + (apply values (get-message reply))) + (let ((id (query-for-single-row table-manager key))) + (if id + (let ((reply (make-channel))) + (put-message (table-manager-channel table-manager) + (list 'cache-id key id reply)) + (get-message reply) + (values id #f)) + (let ((reply (make-channel))) + (put-message (table-manager-channel table-manager) + (list 'add-row key row reply)) + (apply values (get-message reply))))))))) + (id (values id #f)))) + +(define (table-manager-add-raw-rows table-manager raw-rows) + (let ((reply (make-channel))) + (put-message (table-manager-channel table-manager) + (list 'add-raw-rows raw-rows reply)) + + (get-message reply))) + +(define (build-select-query-by-keys-and-indexes table-manager + keys-and-indexes) + (define table-name + (table-manager-table-name table-manager)) + (define id-column + (table-manager-id-column table-manager)) + (define key-columns + (table-manager-key-columns table-manager)) + (define key-columns-list + (if (symbol? key-columns) + (list key-columns) + key-columns)) + (define key-column-types + (let ((types (table-manager-columns-and-types table-manager))) + (map (lambda (key-column) + (assoc-ref types key-column)) + key-columns-list))) + (define key-columns-nullable + (table-manager-key-columns-nullable table-manager)) + (define key-columns-nullable-list + (if (symbol? 
key-columns) + (list key-columns-nullable) + key-columns-nullable)) + + (call-with-output-string + (lambda (port) + (put-string port + " +SELECT vals.tm_index") + (when id-column + (put-string port ", ") + (put-string port table-name) + (put-string port ".") + (put-string port (symbol->string id-column))) + (put-string port " +FROM ") + (put-string port table-name) + (put-string port " +JOIN (VALUES ") + (let loop ((keys-and-indexes keys-and-indexes)) + (let* ((key-and-index + (car keys-and-indexes)) + (key + index + (car+cdr key-and-index))) + (put-string port "(") + (put-string port (number->string index)) + (put-string port ",") + (put-string + port + (string-join + (map (lambda (key type) + (value->sql + (if (eq? key NULL) + (cons + (match type + ('string "varchar") + ('integer "integer")) + NULL) + key))) + (if (symbol? key-columns) + (list key) + (vector->list key)) + key-column-types) + ",")) + (put-string port ")")) + + (if (null? (cdr keys-and-indexes)) + #f ; finished + (begin + (put-string port ",") + (loop (cdr keys-and-indexes))))) + + (put-string port " +) AS vals (tm_index,") + + (put-string port + (string-join + (map symbol->string key-columns-list) + ",")) + + (put-string port ") +ON ") + + (put-string + port + (string-join + (map (lambda (field nullable?) + (string-concatenate + `("(" ,table-name "." ,field " = vals." ,field + ,@(if nullable? + `(" OR (" ,table-name "." ,field " IS NULL AND" + " vals." 
,field " IS NULL" + ")") + '()) + ")"))) + (map symbol->string key-columns-list) + key-columns-nullable-list) + " AND\n "))))) + +(define* (table-manager-add-rows table-manager unprocessed-rows) + (define table-name + (table-manager-table-name table-manager)) + (define key-columns + (table-manager-key-columns table-manager)) + (define key-columns-nullable + (table-manager-key-columns-nullable table-manager)) + (define id-column + (table-manager-id-column table-manager)) + (define key->placeholder-and-row-hash-table + (table-manager-key->placeholder-and-row-hash-table table-manager)) + (define key->id-hash-table + (table-manager-key->id-hash-table table-manager)) + + (define key-columns-list + (if (symbol? key-columns) + (list key-columns) + key-columns)) + + (define key-columns-nullable-list + (if (symbol? key-columns) + (list key-columns-nullable) + key-columns-nullable)) + + (define key-column-types + (let ((types (table-manager-columns-and-types table-manager))) + (map (lambda (key-column) + (assoc-ref types key-column)) + key-columns-list))) + + (define start-time + (current-time)) + + (let* ((row-count (vector-length unprocessed-rows)) + (rows + (with-delay-logging + "table-manager add-rows process-rows" + (process-rows table-manager unprocessed-rows))) + (keys-vector + (with-delay-logging + "table-manager add-rows make keys-vector" + (vector-map + (lambda (_ row) + (table-manager-row->key table-manager row)) + rows))) + (result-vector + (make-vector row-count #f)) + (new?-vector + (make-vector row-count #f)) + (keys-and-indexes + (with-delay-logging + "table-manager add-rows make keys-and-indexes" + (vector-fold + (lambda (index result key) + (match (hash-ref key->id-hash-table key) + (#f + (match (hashx-ref key-hash + key-assoc + key->placeholder-and-row-hash-table + key) + ((placeholder . _) + (vector-set! result-vector index placeholder) + result) + (#f + (if (key-contains-placeholder? 
key) + result + (cons (cons key index) + result))))) + (id + (vector-set! result-vector index id) + result))) + '() + keys-vector)))) + + (let ((data + (if (null? keys-and-indexes) + '() + (let ((query + (with-delay-logging + "table-manager add-rows build-query" + (build-select-query-by-keys-and-indexes + table-manager + keys-and-indexes)))) + (call-with-resource-from-pool + (table-manager-connection-pool table-manager) + (lambda (conn) + (with-delay-logging + "table-manager add-rows exec-query" + (exec-query-with-null-handling conn query)))))))) + + (when id-column + (let ((reply (make-channel))) + (put-message (table-manager-channel table-manager) + (list 'cache-ids + data keys-vector reply)) + (get-message reply))) + + (if id-column + (for-each + (lambda (cols) + (let* ((index (string->number (first cols))) + (id (string->number (second cols)))) + (vector-set! result-vector index id))) + data) + (for-each + (lambda (data-row) + (let ((index (string->number (car data-row)))) + (vector-set! result-vector index #t))) + data)) + + (let ((reply (make-channel))) + (put-message (table-manager-channel table-manager) + (list 'populate-results-vector + keys-vector rows + result-vector new?-vector + reply)) + (get-message reply)) + + (let ((time-taken (- (current-time) start-time))) + (when (> time-taken 0) + (simple-format + #t "table-manager-add-rows to ~A for ~A rows took ~A ~A\n" + table-name + row-count + time-taken + (if (= time-taken 1) + "second" + "seconds")))) + + (values result-vector + new?-vector)))) + +(define (table-manager-closed? table-manager) + (memq (table-manager-state table-manager) + '(closed all-rows-inserted destroyed))) + +(define (table-manager-all-rows-inserted? table-manager) + (memq (table-manager-state table-manager) + '(all-rows-inserted destroyed))) + +(define (table-manager-destroyed? table-manager) + (eq? (table-manager-state table-manager) + 'destroyed)) + +(define (table-manager-close table-manager) + (or (table-manager-closed? 
table-manager) + (let ((reply (make-channel))) + (put-message (table-manager-channel table-manager) + (list 'close reply)) + (get-message reply)))) + +(define (table-manager-insert-all table-manager) + (let ((reply (make-channel))) + (perform-operation + (choice-operation + (wrap-operation (put-operation + (table-manager-channel table-manager) + (list 'insert-all reply)) + (lambda _ + (match (get-message reply) + (('exception . exn) + (raise-exception exn)) + ('success #t)))) + (wait-operation + (table-manager-finished-condition table-manager)))))) + +(define (filter-vectors-only-new new?-vec . vecs) + (let* ((new-length + (vector-count (lambda (_ x) x) new?-vec)) + (dest-vecs + (map (lambda _ (make-vector new-length)) + vecs))) + (let loop ((dest-index 0) + (new?-index 0)) + (if (= dest-index new-length) + (apply values dest-vecs) + (if (vector-ref new?-vec new?-index) + (begin + (for-each + (lambda (source-vec dest-vec) + (vector-set! dest-vec + dest-index + (vector-ref source-vec new?-index))) + vecs + dest-vecs) + (loop (1+ dest-index) + (1+ new?-index))) + (loop dest-index + (1+ new?-index))))))) + +(define (group-ids-by-counts-vector ids-vec counts-vec) + (vector-unfold + (lambda (index ids-vec-index) + (let ((count (vector-ref counts-vec index))) + (values + (vector + (vector-copy + ids-vec + ids-vec-index + (+ ids-vec-index count))) + (+ ids-vec-index count)))) + (vector-length counts-vec) + 0)) + +(define-record-type + (make-table-manager-coordinator table-managers channel) + table-manager-coordinator? + (table-managers table-manager-coordinator-table-managers) + (channel table-manager-coordinator-channel)) + +(define (spawn-table-manager-coordinator seed-table-managers) + (define all-ordered-table-managers + (let loop ((seed-table-managers seed-table-managers) + (result '())) + (if (null? seed-table-managers) + (delete-duplicates result eq?) 
+ (loop (cdr seed-table-managers) + (let ((table-manager + (first seed-table-managers))) + (loop (table-manager-dependencies table-manager) + (cons table-manager result))))))) + + (define table-managers-by-table + (map (lambda (table-manager) + (cons (table-manager-table-name table-manager) + table-manager)) + all-ordered-table-managers)) + + (define table-manager-first-level-dependents + (map (lambda (table-manager) + (cons + table-manager + (filter-map + (lambda (other-table-manager) + (if (eq? table-manager other-table-manager) + #f + (if (memq table-manager + (table-manager-dependencies other-table-manager)) + other-table-manager + #f))) + all-ordered-table-managers))) + all-ordered-table-managers)) + + (define (table-manager-plus-recursive-dependents table-manager) + (delete-duplicates + (cons table-manager + (append-map + table-manager-plus-recursive-dependents + (assq-ref table-manager-first-level-dependents + table-manager))) + eq?)) + + (define table-manager-dependents + (map (lambda (table-manager) + (cons + table-manager + (append-map + (lambda (other-table-manager) + (if (eq? table-manager other-table-manager) + '() + (if (memq table-manager + (table-manager-dependencies other-table-manager)) + ;; Placeholders might refer to dependencies, so + ;; include them all + (table-manager-plus-recursive-dependents + other-table-manager) + '()))) + all-ordered-table-managers))) + all-ordered-table-managers)) + + (define (destroy-table-manager table-manager) + (let ((reply (make-channel))) + (put-message (table-manager-channel table-manager) + (list 'destroy reply)) + (get-message reply))) + + (define (destroy-unneeded-table-managers) + (for-each + (match-lambda + ((table-manager . dependents) + (unless (table-manager-destroyed? table-manager) + (when (and (table-manager-all-rows-inserted? table-manager) + (every table-manager-all-rows-inserted? 
dependents)) + (destroy-table-manager table-manager))))) + table-manager-dependents)) + + (define (close-table-manager table-manager) + (table-manager-close table-manager) + + (destroy-unneeded-table-managers) + + (let* ((table-managers-to-insert-first + (take-while (lambda (tm) + (not (eq? table-manager tm))) + all-ordered-table-managers)) + (open-table-managers-to-insert-first + (remove table-manager-closed? + table-managers-to-insert-first))) + (if (null? open-table-managers-to-insert-first) + (begin + (for-each table-manager-insert-all + table-managers-to-insert-first) + + (table-manager-insert-all table-manager) + + (destroy-unneeded-table-managers)) + (simple-format #t "info: unable to insert to ~A as the following table managers haven't been closed yet: ~A\n" + (table-manager-table-name table-manager) + (map table-manager-table-name + open-table-managers-to-insert-first))))) + + (define channel (make-channel)) + + (define table-manager-coordinator + (make-table-manager-coordinator all-ordered-table-managers + channel)) + + (spawn-fiber + (lambda () + (let loop () + (match (get-message channel) + (('close table reply) + (with-exception-handler + (lambda (exn) + (put-message reply (cons 'exception exn))) + (lambda () + (close-table-manager + (assoc-ref table-managers-by-table + table)) + (put-message reply #t)) + #:unwind? #t) + (loop)) + (('destroy reply) + (for-each + (lambda (table-manager) + (unless (table-manager-closed? table-manager) + (simple-format #t "warning, table manager not closed ~A\n" + table-manager) + (close-table-manager table-manager))) + all-ordered-table-managers) + + (put-message reply #t) + + ;; No loop + *unspecified*))))) + + table-manager-coordinator) + +(define (fetch-table-manager coordinator table-name) + (find (lambda (table-manager) + (string=? 
(table-manager-table-name table-manager) + table-name)) + (table-manager-coordinator-table-managers coordinator))) + +(define (close-table-manager coordinator table-name) + (let ((reply (make-channel))) + (put-message (table-manager-coordinator-channel coordinator) + (list 'close table-name reply)) + (match (get-message reply) + (('exception . exn) + (raise-exception exn)) + (val val)))) + +(define (destroy-table-manager-coordinator coordinator) + (let ((reply (make-channel))) + (put-message (table-manager-coordinator-channel coordinator) + (list 'destroy reply)) + (get-message reply))) diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm index 84b823d..e2651ba 100644 --- a/guix-data-service/utils.scm +++ b/guix-data-service/utils.scm @@ -37,11 +37,23 @@ #:use-module (fibers conditions) #:use-module (fibers scheduler) #:use-module (knots timeout) + #:use-module (knots promise) + #:use-module (knots parallelism) #:use-module (prometheus) #:export (call-with-time-logging with-time-logging + + %delay-threshold + call-with-delay-logging + with-delay-logging + prevent-inlining-for-tests + fibers-delay/eager + fibers-sort! + + try-split-at! + chunk chunk! chunk-for-each! @@ -70,9 +82,103 @@ "Log under NAME the time taken to evaluate EXP." (call-with-time-logging action (lambda () exp ...))) +(define %delay-threshold + (make-parameter 4)) + +(define (call-with-delay-logging action thunk) + (let ((start-time (current-time))) + (let-values + ((result (thunk))) + (let ((time-taken (- (current-time) start-time))) + (when (and=> (%delay-threshold) + (lambda (threshold) + (>= time-taken threshold))) + (simple-format #t "delay detected in ~A, took ~A seconds\n" + action time-taken))) + (apply values result)))) + +(define-syntax-rule (with-delay-logging action exp ...) + "Evaluate EXP, logging under ACTION only when it takes at least (%delay-threshold) seconds." + (call-with-delay-logging action (lambda () exp ...))) + (define-syntax-rule (prevent-inlining-for-tests var) (set! 
var var)) +(define (fibers-delay/eager thunk) + (let ((promise (fibers-delay thunk))) + (spawn-fiber + (lambda () + (with-exception-handler + (lambda _ + ;; Silently handle this exception + #f) + (lambda () + (fibers-force promise)) + #:unwind? #t))) + promise)) + +(define (try-split-at! lst i) + (cond ((< i 0) + (error "negative split size")) + ((or (= i 0) (null? lst)) + (values '() lst)) + (else + (let lp ((l lst) (n (- i 1))) + (if (<= n 0) + (let ((tmp (cdr l))) + (unless (null? tmp) + (set-cdr! l '())) + (values lst tmp)) + (if (or (null? l) + (null? (cdr l))) + (values lst '()) + (lp (cdr l) (- n 1)))))))) + +(define (chunk! lst max-length) + (let loop ((chunks '()) + (lst lst)) + (let ((chunk + rest + (try-split-at! lst max-length))) + (if (null? rest) + (reverse! (cons chunk chunks)) + (loop (cons chunk chunks) + rest))))) + +(define* (fibers-sort! items less #:key parallelism) + (define requested-chunk-count + (or parallelism + (+ 1 (length (scheduler-remote-peers (current-scheduler)))))) + + (define items-length (length items)) + + (if (= 0 items-length) + items + (let* ((chunk-length (ceiling (/ items-length + requested-chunk-count))) + (chunks (chunk! items chunk-length))) + (let loop ((sorted-chunk-promises + (map + (lambda (chunk) + (fibers-delay/eager + (lambda () + (sort! chunk less)))) + chunks))) + (if (null? (cdr sorted-chunk-promises)) + (fibers-force + (first sorted-chunk-promises)) + (loop + (map + (match-lambda + ((items) items) + ((a b) + (fibers-delay/eager + (lambda () + (merge! (fibers-force a) + (fibers-force b) + less))))) + (chunk! sorted-chunk-promises 2)))))))) + (define (chunk lst max-length) (let ((len (length lst))) (cond @@ -86,45 +192,26 @@ (else (list lst))))) -(define (chunk! lst max-length) - (let ((len (length lst))) - (cond - ((= 0 len) '()) - ((> (length lst) max-length) - (call-with-values (lambda () - (split-at! lst max-length)) - (lambda (first-lst rest) - (cons first-lst - (chunk! 
rest max-length))))) - (else - (list lst))))) - (define* (chunk-for-each! proc chunk-size #:rest lsts) - (define (do-one-iteration lsts) - (if (> (length (car lsts)) - chunk-size) - (let ((chunks-and-rest - (map (lambda (lst) - (call-with-values (lambda () - (split-at! lst chunk-size)) - (lambda (first-lst rest) - (cons first-lst - rest)))) - lsts))) - (apply proc - (map car chunks-and-rest)) - (do-one-iteration - (map cdr chunks-and-rest))) - (apply proc lsts))) - (let ((list-lengths (map length lsts))) (unless (= 1 (length (delete-duplicates list-lengths))) - (error "lists not equal length")) + (error "lists not equal length"))) - (unless (= 0 (first list-lengths)) - (do-one-iteration lsts))) + (let loop ((lsts lsts)) + (let ((chunks-and-rest + (map (lambda (lst) + (call-with-values (lambda () + (try-split-at! lst chunk-size)) + (lambda (first-lst rest) + (cons first-lst + rest)))) + lsts))) + (apply proc + (map car chunks-and-rest)) + (unless (null? (cdr (first chunks-and-rest))) + (loop (map cdr chunks-and-rest))))) - #t) + *unspecified*) (define* (delete-duplicates/sort! unsorted-lst less #:optional (equal? equal?)) (if (null? unsorted-lst) diff --git a/scripts/guix-data-service-process-job.in b/scripts/guix-data-service-process-job.in index 488a0b7..eaf38bc 100644 --- a/scripts/guix-data-service-process-job.in +++ b/scripts/guix-data-service-process-job.in @@ -129,6 +129,4 @@ #:parallelism (assq-ref opts 'parallelism))) #:unwind? #t)) #:hz 0 - #:parallelism 1 - ;; Drain to make sure there are no bugs with the use of fibers - #:drain? 
#t)))) + #:parallelism (assq-ref opts 'parallelism))))) diff --git a/tests/jobs-load-new-guix-revision.scm b/tests/jobs-load-new-guix-revision.scm index 84d78e8..0ca26c4 100644 --- a/tests/jobs-load-new-guix-revision.scm +++ b/tests/jobs-load-new-guix-revision.scm @@ -3,11 +3,13 @@ #:use-module (ice-9 match) #:use-module (squee) #:use-module (fibers) + #:use-module (knots) #:use-module (guix utils) #:use-module (guix store) #:use-module (guix tests) #:use-module (guix-data-service database) #:use-module (guix-data-service model git-repository) + #:use-module (guix-data-service model guix-revision) #:use-module (guix-data-service jobs load-new-guix-revision)) (test-begin "jobs-load-new-guix-revision") @@ -65,13 +67,16 @@ ((guix-data-service jobs load-new-guix-revision) extract-information-from (lambda _ - #t)) + '())) (mock - ((guix-data-service model channel-instance) - insert-channel-instances - (lambda (conn guix-revision-id derivations-by-system) - #t)) + ((guix-data-service jobs load-new-guix-revision) + load-channel-instances + (lambda (call-with-utility-thread + read-derivations/serialised + git-repository-id commit + channel-derivations-by-system) + (insert-guix-revision conn git-repository-id commit))) (mock ((guix channels) @@ -81,7 +86,7 @@ (mock ((guix-data-service jobs load-new-guix-revision) - derivation-file-names->derivation-ids + insert-derivations-with-table-managers (lambda _ #(1))) @@ -103,8 +108,13 @@ ((id) (run-fibers (lambda () - (process-load-new-guix-revision-job - id #:parallelism 1)) + (with-exception-handler + (lambda (exn) + (print-backtrace-and-exception/knots exn) + (raise-exception exn)) + (lambda () + (process-load-new-guix-revision-job + id #:parallelism 1)))) #:hz 0 #:parallelism 1 #:drain? 
#t)))))))))))))) diff --git a/tests/model-derivation.scm b/tests/model-derivation.scm deleted file mode 100644 index 59f3f75..0000000 --- a/tests/model-derivation.scm +++ /dev/null @@ -1,17 +0,0 @@ -(define-module (test-model-derivation) - #:use-module (srfi srfi-64) - #:use-module (guix-data-service database) - #:use-module (guix-data-service model derivation)) - -(test-begin "test-model-derivation") - -(with-postgresql-connection - "test-model-derivation" - (lambda (conn) - (check-test-database! conn) - - (test-equal "count-derivations" - '("0") - (count-derivations conn)))) - -(test-end) diff --git a/tests/model-license-set.scm b/tests/model-license-set.scm deleted file mode 100644 index 24cb4e1..0000000 --- a/tests/model-license-set.scm +++ /dev/null @@ -1,47 +0,0 @@ -(define-module (tests model-license-set) - #:use-module (srfi srfi-64) - #:use-module (guix utils) - #:use-module (guix tests) - #:use-module (guix-data-service database) - #:use-module (guix-data-service model license) - #:use-module (guix-data-service model license-set)) - -(test-begin "test-model-license-set") - -(define license-data - '#((("License 1" - "https://gnu.org/licenses/test-1.html" - "https://example.com/why-license-1")) - (("License 1" - "https://gnu.org/licenses/test-1.html" - #f) - ("License 2" - #f - #f)) - ())) - -(with-postgresql-connection - "test-model-license-set" - (lambda (conn) - (check-test-database! conn) - - (with-postgresql-transaction - conn - (lambda (conn) - (test-assert "works" - (inferior-packages->license-set-ids - conn - (inferior-packages->license-id-lists conn license-data)))) - #:always-rollback? #t) - - (with-postgresql-transaction - conn - (lambda (conn) - (let ((license-id-lists - (inferior-packages->license-id-lists conn license-data))) - (test-equal "works repeatedly" - (inferior-packages->license-set-ids conn license-id-lists) - (inferior-packages->license-set-ids conn license-id-lists)))) - #:always-rollback? 
#t))) - -(test-end) diff --git a/tests/model-license.scm b/tests/model-license.scm deleted file mode 100644 index e34b4f8..0000000 --- a/tests/model-license.scm +++ /dev/null @@ -1,44 +0,0 @@ -(define-module (tests model-license) - #:use-module (srfi srfi-64) - #:use-module (guix utils) - #:use-module (guix tests) - #:use-module (guix-data-service database) - #:use-module (guix-data-service model license)) - -(test-begin "test-model-license") - -(define license-data - '#((("License 1" - "https://gnu.org/licenses/test-1.html" - "https://example.com/why-license-1")) - (("License 1" - "https://gnu.org/licenses/test-1.html" - #f) - ("License 2" - "https://gnu.org/licenses/test-2.html" - #f) - ("License 3" - #f - #f)))) - -(with-postgresql-connection - "test-model-license" - (lambda (conn) - (check-test-database! conn) - - (with-postgresql-transaction - conn - (lambda (conn) - (test-assert "works" - (inferior-packages->license-id-lists conn license-data))) - #:always-rollback? #t) - - (with-postgresql-transaction - conn - (lambda (conn) - (test-equal "works repeatedly" - (inferior-packages->license-id-lists conn license-data) - (inferior-packages->license-id-lists conn license-data))) - #:always-rollback? #t))) - -(test-end) diff --git a/tests/model-lint-checker.scm b/tests/model-lint-checker.scm deleted file mode 100644 index 73ac405..0000000 --- a/tests/model-lint-checker.scm +++ /dev/null @@ -1,38 +0,0 @@ -(define-module (tests model-lint-checker) - #:use-module (srfi srfi-64) - #:use-module (ice-9 match) - #:use-module (guix-data-service database) - #:use-module (guix-data-service model lint-checker)) - -(test-begin "test-model-lint-checker") - -(with-postgresql-connection - "test-model-lint-checker" - (lambda (conn) - (check-test-database! conn) - - (test-assert "single insert" - (with-postgresql-transaction - conn - (lambda (conn) - (define data - `#((name-1 - #t - ,(lint-checker-description-data->lint-checker-description-set-id - conn - '(("en_US" . 
"foo")))) - (name-2 - #f - ,(lint-checker-description-data->lint-checker-description-set-id - conn - '(("en_US" . "bar")))))) - - (match (lint-checkers->lint-checker-ids conn data) - (#((? number? id1) (? number? id2)) - (match (lint-checkers->lint-checker-ids conn data) - (#((? number? second-id1) (? number? second-id2)) - (and (= id1 second-id1) - (= id2 second-id2))))))) - #:always-rollback? #t)))) - -(test-end) diff --git a/tests/model-lint-warning-message.scm b/tests/model-lint-warning-message.scm deleted file mode 100644 index 88cedd1..0000000 --- a/tests/model-lint-warning-message.scm +++ /dev/null @@ -1,59 +0,0 @@ -(define-module (tests model-lint-warning-message) - #:use-module (srfi srfi-64) - #:use-module (ice-9 match) - #:use-module (guix-data-service database) - #:use-module (guix-data-service model lint-warning-message)) - -(test-begin "test-model-lint-warning-message") - -(define data - '(("en" . "Test message") - ("es" . "Test message in Spanish"))) - -(with-postgresql-connection - "test-model-lint-checker" - (lambda (conn) - (check-test-database! conn) - - (test-assert "single insert" - (with-postgresql-transaction - conn - (lambda (conn) - (match (lint-warning-message-data->lint-warning-message-ids conn data) - (#((? number? id1) (? number? id2)) - #t))) - #:always-rollback? #t)) - - (test-assert "double insert" - (with-postgresql-transaction - conn - (lambda (conn) - (match (lint-warning-message-data->lint-warning-message-ids conn data) - (#((? number? id1) (? number? id2)) - (match (lint-warning-message-data->lint-warning-message-ids conn data) - (#((? number? second-id1) (? number? second-id2)) - (and (= id1 second-id1) - (= id2 second-id2))))))) - #:always-rollback? #t)) - - (test-assert "single set insert" - (with-postgresql-transaction - conn - (lambda (conn) - (match (lint-warning-message-data->lint-warning-message-set-id conn data) - ((? number? id1) - #t))) - #:always-rollback? 
#t)) - - (test-assert "double set insert" - (with-postgresql-transaction - conn - (lambda (conn) - (match (lint-warning-message-data->lint-warning-message-set-id conn data) - ((? number? id) - (match (lint-warning-message-data->lint-warning-message-set-id conn data) - ((? number? second-id) - (= id second-id)))))) - #:always-rollback? #t)))) - -(test-end) diff --git a/tests/model-package-metadata.scm b/tests/model-package-metadata.scm deleted file mode 100644 index 5e9c897..0000000 --- a/tests/model-package-metadata.scm +++ /dev/null @@ -1,98 +0,0 @@ -(define-module (test-model-package-metadata) - #:use-module (ice-9 match) - #:use-module (srfi srfi-64) - #:use-module (guix utils) - #:use-module (guix tests) - #:use-module (tests mock-inferior) - #:use-module (guix-data-service model license) - #:use-module (guix-data-service model license-set) - #:use-module (guix-data-service model package-metadata) - #:use-module (guix-data-service database)) - -(test-begin "test-model-package-metadata") - -(define mock-inferior-package-foo - (mock-inferior-package - (name "foo") - (version "2") - (synopsis "Foo") - (description "Foo description") - (home-page "https://example.com") - (location (location "file.scm" 5 0)))) - -(define mock-inferior-package-foo-2 - (mock-inferior-package - (name "foo") - (version "2") - (synopsis "Foo") - (description "Foo description") - (home-page #f) - (location #f))) - -(define mock-inferior-packages - (list mock-inferior-package-foo - mock-inferior-package-foo-2)) - -(define mock-package-metadata - (list->vector - (map (lambda (mock-inf-pkg) - (list - (mock-inferior-package-home-page mock-inf-pkg) - (mock-inferior-package-location mock-inf-pkg) - `(("en_US.UTF-8" . "Fake synopsis")) - `(("en_US.UTF-8" . 
"Fake description")))) - mock-inferior-packages))) - -(define (test-license-set-ids conn) - (let ((license-id-lists - (inferior-packages->license-id-lists - conn - '#((("License 1" - "https://gnu.org/licenses/test-1.html" - "https://example.com/why-license-1")) - (("License 1" - "https://gnu.org/licenses/test-1.html" - "https://example.com/why-license-1")))))) - - (inferior-packages->license-set-ids conn license-id-lists))) - -(with-mock-inferior-packages - (lambda () - (use-modules (guix-data-service model package) - (guix-data-service model git-repository) - (guix-data-service model guix-revision) - (guix-data-service model package-metadata)) - - (with-postgresql-connection - "test-model-package-metadata" - (lambda (conn) - (check-test-database! conn) - - (test-assert "inferior-packages->package-metadata-ids" - (with-postgresql-transaction - conn - (lambda (conn) - (match - (inferior-packages->package-metadata-ids - conn - mock-package-metadata - (test-license-set-ids conn)) - (#(x y) (and (number? x) - (number? y))))) - #:always-rollback? #t)) - - (with-postgresql-transaction - conn - (lambda (conn) - (test-equal "inferior-packages->package-metadata-ids" - (inferior-packages->package-metadata-ids - conn - mock-package-metadata - (test-license-set-ids conn)) - (inferior-packages->package-metadata-ids - conn - mock-package-metadata - (test-license-set-ids conn))) - #:always-rollback? 
#t)))))) - -(test-end) diff --git a/tests/model-package.scm b/tests/model-package.scm deleted file mode 100644 index f58b887..0000000 --- a/tests/model-package.scm +++ /dev/null @@ -1,125 +0,0 @@ -(define-module (test-model-package) - #:use-module (ice-9 match) - #:use-module (srfi srfi-1) - #:use-module (srfi srfi-64) - #:use-module (guix utils) - #:use-module (guix tests) - #:use-module (tests mock-inferior) - #:use-module (guix-data-service model utils) - #:use-module (guix-data-service model license) - #:use-module (guix-data-service model license-set) - #:use-module (guix-data-service model package) - #:use-module (guix-data-service model package-metadata) - #:use-module (guix-data-service database)) - -(test-begin "test-model-package") - -(define mock-inferior-package-foo - (mock-inferior-package - (name "foo") - (version "2") - (synopsis "Foo") - (description "Foo description") - (home-page "https://example.com") - (location (location "file.scm" 5 0)))) - -(define mock-inferior-package-foo-2 - (mock-inferior-package - (name "foo") - (version "2") - (synopsis "Foo") - (description "Foo description") - (home-page #f) - (location #f))) - -(define (test-license-set-ids conn) - (let ((license-id-lists - (inferior-packages->license-id-lists - conn - '#((("License 1" - "https://gnu.org/licenses/test-1.html" - "https://example.com/why-license-1")) - (("License 1" - "https://gnu.org/licenses/test-1.html" - "https://example.com/why-license-1")))))) - - (inferior-packages->license-set-ids conn license-id-lists))) - -(define mock-inferior-packages - (list mock-inferior-package-foo - mock-inferior-package-foo-2)) - -(define mock-package-metadata - (list->vector - (map (lambda (mock-inf-pkg) - (list - (mock-inferior-package-home-page mock-inf-pkg) - (mock-inferior-package-location mock-inf-pkg) - `(("en_US.UTF-8" . "Fake synopsis")) - `(("en_US.UTF-8" . 
"Fake description")))) - mock-inferior-packages))) - -(with-mock-inferior-packages - (lambda () - (use-modules (guix-data-service model package) - (guix-data-service model git-repository) - (guix-data-service model guix-revision) - (guix-data-service model package-metadata)) - - (with-postgresql-connection - "test-model-package" - (lambda (conn) - (check-test-database! conn) - - (with-postgresql-transaction - conn - (lambda (conn) - (test-assert "inferior-packages->package-ids works once" - (let ((package-metadata-ids - (inferior-packages->package-metadata-ids - conn - mock-package-metadata - (test-license-set-ids conn))) - (package-replacement-package-ids - (make-list (length mock-inferior-packages) - (cons "integer" NULL)))) - (match (inferior-packages->package-ids - conn - (list->vector - (zip (map mock-inferior-package-name mock-inferior-packages) - (map mock-inferior-package-version mock-inferior-packages) - (vector->list package-metadata-ids) - package-replacement-package-ids))) - (#(x y) (and (number? x) - (number? y))))))) - #:always-rollback? #t) - - (with-postgresql-transaction - conn - (lambda (conn) - (let ((package-metadata-ids - (inferior-packages->package-metadata-ids - conn - mock-package-metadata - (test-license-set-ids conn))) - (package-replacement-package-ids - (make-list (length mock-inferior-packages) - (cons "integer" NULL)))) - (test-equal "inferior-packages->package-ids is idempotent" - (inferior-packages->package-ids - conn - (list->vector - (zip (map mock-inferior-package-name mock-inferior-packages) - (map mock-inferior-package-version mock-inferior-packages) - (vector->list package-metadata-ids) - package-replacement-package-ids))) - (inferior-packages->package-ids - conn - (list->vector - (zip (map mock-inferior-package-name mock-inferior-packages) - (map mock-inferior-package-version mock-inferior-packages) - (vector->list package-metadata-ids) - package-replacement-package-ids)))))) - #:always-rollback? #t))))) - -(test-end)