diff --git a/consistency_checks.py b/consistency_checks.py
index 2c7c8fc..17e9d0a 100644
--- a/consistency_checks.py
+++ b/consistency_checks.py
@@ -1,3 +1,4 @@
+import datetime
 import json
 import gzip
 import random
@@ -203,6 +204,45 @@ def _skip_errors(self, stats):
         else:
             return True
 
+    def _output_errors(self, stats):
+        """Write error statistics and offending statement IDs to a JSON file.
+
+        The file is named ``errors-<ddmmyy>.json`` (today's date) and is
+        opened relative to the current working directory. It contains a
+        copy of ``stats`` plus three lists of statement IDs parsed from
+        the messages in ``self.error_log``: ``ref_errors``, ``dup_errors``
+        and ``mis_errors``.
+
+        Args:
+            stats: Error statistics; copied with ``.copy()``, not modified.
+        """
+        out = stats.copy()
+        referencing = []
+        duplicates = []
+        missing = []
+        for error in self.error_log:
+            if error.startswith("BODS referencing error"):
+                # ID is the text after the last "Statement", up to "not found".
+                statement_id = error.split("Statement")[-1].split("not found")[0].strip()
+                referencing.append(statement_id)
+            elif error.startswith("BODS duplicate error"):
+                # ID is the text after the last "(", up to the next ")".
+                statement_id = error.split("(")[-1].split(")")[0].strip()
+                duplicates.append(statement_id)
+            elif "missing" in error.lower():
+                # NOTE(review): this branch previously re-tested the
+                # duplicate-error prefix, so every duplicate was also
+                # listed (mis-parsed) under "mis_errors". The exact wording
+                # of missing-field messages is not visible here; matching
+                # on the word "missing" is an assumption - confirm it.
+                statement_id = error.split(":")[-1].split(")")[0].strip()
+                missing.append(statement_id)
+        out["ref_errors"] = referencing
+        out["dup_errors"] = duplicates
+        out["mis_errors"] = missing
+        with open(f"errors-{datetime.date.today().strftime('%d%m%y')}.json", "w") as out_file:
+            json.dump(out, out_file, indent=4)
+
     def _process_errors(self):
         """Check for any errors in log"""
         for error in self.error_log[:self.error_limit]:
@@ -211,6 +251,7 @@ def _process_errors(self):
             output_text(self.console, f"{len(self.error_log)} errors: truncated at {self.error_limit}", "red")
         if len(self.error_log) > 0:
             stats = self._error_stats()
+            self._output_errors(stats)
             if not self._skip_errors(stats):
                 estats = []
                 for e in stats:
diff --git a/setup.py b/setup.py
index fd32240..db341a9 100644
--- a/setup.py
+++ b/setup.py
@@ -23,6 +23,7 @@
         "jsonref",
         "ipython",
         "flatterer",
+        "Werkzeug<3",
         "frozen-flask",
         "bootstrap-flask",
         "markdown",
diff --git a/tests/test_stages.py b/tests/test_stages.py
index 4a47763..b1541e3 100644
--- a/tests/test_stages.py
+++ b/tests/test_stages.py
@@ -139,7 +139,14 @@ def test_json_zip(self, temp_dir, output_dir, source_dir):
             data = output_file.readlines()
             print(data)
             assert len(data) == 20
-            assert json.loads(data[0].strip())['interestedParty']['describedByPersonStatement'] == '14105856581894595060'
+            count = 0
+            for d in data:
+                json_data = json.loads(d.strip())
+                if json_data["statementID"] == "8359172029532323967":
+                    count += 1
+                    print(json_data)
+                    assert json_data['interestedParty']['describedByPersonStatement'] == '14105856581894595060'
+            assert count == 2
 
     def test_sqlite_zip(self, temp_dir, output_dir, source_dir):
         """Test creation of output sqlite.db.gz file"""