From 1cfc66a13f4fdada81fcf1c0a2cfe62aa5a66087 Mon Sep 17 00:00:00 2001 From: Alexander Kiel Date: Mon, 3 Feb 2025 15:23:09 +0100 Subject: [PATCH] Fix Issues with Rebuilding the PatientLastChange Index Closes: #2372 --- .github/workflows/build.yml | 3 ++ .../db/impl/index/patient_last_change.clj | 2 +- modules/db/src/blaze/db/node.clj | 42 +++++-------------- .../db/node/patient_last_change_index.clj | 13 ------ .../node/patient_last_change_index_spec.clj | 14 ------- .../node/patient_last_change_index_test.clj | 42 ------------------- 6 files changed, 14 insertions(+), 102 deletions(-) delete mode 100644 modules/db/src/blaze/db/node/patient_last_change_index.clj delete mode 100644 modules/db/test/blaze/db/node/patient_last_change_index_spec.clj delete mode 100644 modules/db/test/blaze/db/node/patient_last_change_index_test.clj diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 9e73d7d05..e8716345c 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -2077,6 +2077,9 @@ jobs: - name: Wait for Blaze run: .github/scripts/wait-for-url.sh http://localhost:8080/health + - name: Docker Logs + run: docker logs blaze + - name: Ensure that the State of PatientLastChange Index is Current run: .github/scripts/check-patient-last-change-index-state.sh current diff --git a/modules/db/src/blaze/db/impl/index/patient_last_change.clj b/modules/db/src/blaze/db/impl/index/patient_last_change.clj index 8f5421678..88b7ba78e 100644 --- a/modules/db/src/blaze/db/impl/index/patient_last_change.clj +++ b/modules/db/src/blaze/db/impl/index/patient_last_change.clj @@ -45,7 +45,7 @@ (bb/put-long! t) bb/array))) -(defn decode-state [bytes] +(defn- decode-state [bytes] (let [buf (bb/wrap bytes)] (if (zero? (bb/get-byte! buf)) {:type :current} diff --git a/modules/db/src/blaze/db/node.clj b/modules/db/src/blaze/db/node.clj index fd01c1b96..d163bf975 100644 --- a/modules/db/src/blaze/db/node.clj +++ b/modules/db/src/blaze/db/node.clj @@ -16,7 +16,6 @@ [blaze.db.impl.index.tx-success :as tx-success] [blaze.db.impl.protocols :as p] [blaze.db.kv :as kv] - [blaze.db.node.patient-last-change-index :as node-plc] [blaze.db.node.protocols :as np] [blaze.db.node.resource-indexer :as resource-indexer] [blaze.db.node.resource-indexer.spec] @@ -494,40 +493,19 @@ (fn sync-standalone [^Node node] (ac/completed-future (db/db node (:t @(.-state node))))))) -(defn- index-patient-last-change-index! - [{:keys [kv-store] :as node} current-t {:keys [t] :as tx-data}] - (log/trace "Build PatientLastChange index with t =" t) - (when-ok [entries (node-plc/index-entries node tx-data)] - (store-tx-entries! kv-store entries)) - (vreset! current-t t)) - -(defn- poll-and-index-patient-last-change-index! - [node queue current-t poll-timeout] - (run! (partial index-patient-last-change-index! node current-t) - (poll-tx-queue! queue poll-timeout))) +(defn- initial-plc-index-entries [{:keys [state] :as node}] + (into + [(plc/state-index-entry {:type :current})] + (map (fn [{:keys [id]}] (plc/index-entry (codec/id-byte-string id) (:t @state)))) + (d/type-list (d/db node) "Patient"))) (defn build-patient-last-change-index - [key {:keys [tx-log kv-store run? state poll-timeout] :as node}] - (let [{:keys [type t]} (plc/state kv-store)] + [key {:keys [kv-store] :as node}] + (let [{:keys [type]} (plc/state kv-store)] (when (identical? :building type) - (let [start-t (inc t) - end-t (:t @state) - current-t (volatile! start-t)] - (log/info "Building PatientLastChange index of" (node-util/component-name key "node") "starting at t =" start-t) - (with-open [queue (tx-log/new-queue tx-log start-t)] - (while (and @run? (< @current-t end-t)) - (try - (poll-and-index-patient-last-change-index! node queue current-t poll-timeout) - (catch Exception e - (log/error (format "Error while building the PatientLastChange index of %s." - (node-util/component-name key "node")) e))))) - (if (>= @current-t end-t) - (do - (store-tx-entries! kv-store [(plc/state-index-entry {:type :current})]) - (log/info (format "Finished building PatientLastChange index of %s." (node-util/component-name key "node")))) - (log/info (format "Partially build PatientLastChange index of %s up to t =" - (node-util/component-name key "node")) @current-t - "at a goal of t =" end-t "Will continue at next start.")))))) + (log/info "Building PatientLastChange index of" (node-util/component-name key "node")) + (store-tx-entries! kv-store (initial-plc-index-entries node)) + (log/info (format "Finished building PatientLastChange index of %s." (node-util/component-name key "node")))))) (defmethod m/pre-init-spec :blaze.db/node [_] (s/keys diff --git a/modules/db/src/blaze/db/node/patient_last_change_index.clj b/modules/db/src/blaze/db/node/patient_last_change_index.clj deleted file mode 100644 index 9245d6f2c..000000000 --- a/modules/db/src/blaze/db/node/patient_last_change_index.clj +++ /dev/null @@ -1,13 +0,0 @@ -(ns blaze.db.node.patient-last-change-index - (:require - [blaze.anomaly :refer [when-ok]] - [blaze.db.impl.db :as db] - [blaze.db.impl.index.patient-last-change :as plc] - [blaze.db.node.tx-indexer :as tx-indexer])) - -(defn index-entries - {:arglists '([node tx-data])} - [node {:keys [t] :as tx-data}] - (when-ok [entries (tx-indexer/index-tx (db/db node (dec t)) tx-data)] - (-> (filterv (comp #{:patient-last-change-index} first) entries) - (conj (plc/state-index-entry {:type :building :t t}))))) diff --git a/modules/db/test/blaze/db/node/patient_last_change_index_spec.clj b/modules/db/test/blaze/db/node/patient_last_change_index_spec.clj deleted file mode 100644 index 038817cb2..000000000 --- a/modules/db/test/blaze/db/node/patient_last_change_index_spec.clj +++ /dev/null @@ -1,14 +0,0 @@ -(ns blaze.db.node.patient-last-change-index-spec - (:require - [blaze.db.kv.spec] - [blaze.db.node.patient-last-change-index :as node-plc] - [blaze.db.node.tx-indexer-spec] - [blaze.db.spec] - [blaze.db.tx-log.spec] - [clojure.spec.alpha :as s] - [cognitect.anomalies :as anom])) - -(s/fdef node-plc/index-entries - :args (s/cat :node :blaze.db/node - :tx-data :blaze.db/tx-data) - :ret (s/or :entries (s/coll-of :blaze.db.kv/put-entry) :anomaly ::anom/anomaly)) diff --git a/modules/db/test/blaze/db/node/patient_last_change_index_test.clj b/modules/db/test/blaze/db/node/patient_last_change_index_test.clj deleted file mode 100644 index 189b1f30a..000000000 --- a/modules/db/test/blaze/db/node/patient_last_change_index_test.clj +++ /dev/null @@ -1,42 +0,0 @@ -(ns blaze.db.node.patient-last-change-index-test - (:require - [blaze.db.impl.index.patient-last-change :as plc] - [blaze.db.impl.index.patient-last-change-test-util :as plc-tu] - [blaze.db.node.patient-last-change-index :as node-plc] - [blaze.db.node.patient-last-change-index-spec] - [blaze.db.test-util :refer [config with-system-data]] - [blaze.fhir.hash :as hash] - [blaze.test-util :as tu] - [clojure.spec.test.alpha :as st] - [clojure.test :as test :refer [deftest]] - [juxt.iota :refer [given]]) - (:import - [java.nio.charset StandardCharsets] - [java.time Instant])) - -(st/instrument) - -(test/use-fixtures :each tu/fixture) - -(def patient-0 {:fhir/type :fhir/Patient :id "0"}) -(def observation-0 {:fhir/type :fhir/Observation :id "0" - :subject #fhir/Reference{:reference "Patient/0"}}) -(def hash-observation-0 (hash/generate observation-0)) - -(deftest patient-last-change-index-entries-test - (with-system-data [{:blaze.db/keys [node]} config] - [[[:put patient-0]]] - - (given (node-plc/index-entries - node - {:t 2 - :instant Instant/EPOCH - :tx-cmds [{:op "put" :type "Observation" :id "0" - :hash hash-observation-0 :refs [["Patient" "0"]]}]}) - count := 2 - [0 0] := :patient-last-change-index - [0 1 plc-tu/decode-key] := {:patient-id "0" :t 2} - - [1 0] := :default - [1 1 #(String. ^bytes % StandardCharsets/ISO_8859_1)] := "patient-last-change-state" - [1 2 plc/decode-state] := {:type :building :t 2})))