Skip to content

Commit

Permalink
Merge pull request #835 from microbiomedata/831-implement-refscan-bas…
Browse files Browse the repository at this point in the history
…ed-real-time-referential-integrity-validation-in-runtime

Implement `refscan`-based real-time referential integrity checking on `/metadata/json:validate`
  • Loading branch information
eecavanna authored Jan 14, 2025
2 parents 3e7a36a + 90c7070 commit d3e146b
Show file tree
Hide file tree
Showing 8 changed files with 457 additions and 62 deletions.
2 changes: 1 addition & 1 deletion nmdc_runtime/api/endpoints/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ async def validate_json_nmdcdb(docs: dict, mdb: MongoDatabase = Depends(get_mong
"""

return validate_json(docs, mdb)
return validate_json(docs, mdb, check_inter_document_references=True)


@router.post("/metadata/json:submit", name="Submit JSON")
Expand Down
16 changes: 16 additions & 0 deletions nmdc_runtime/api/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import re
from contextlib import asynccontextmanager
from functools import cache
from importlib import import_module
from importlib.metadata import version
from typing import Annotated
Expand All @@ -19,6 +20,7 @@

from nmdc_runtime.api.analytics import Analytics
from nmdc_runtime.util import (
get_allowed_references,
ensure_unique_id_indexes,
REPO_ROOT_DIR,
)
Expand Down Expand Up @@ -356,9 +358,23 @@ def ensure_default_api_perms():

@asynccontextmanager
async def lifespan(app: FastAPI):
    r"""
    Runs the application's startup tasks, then hands control back to the caller.

    Everything above the `yield` statement is startup logic. The
    [FastAPI documentation](https://fastapi.tiangolo.com/advanced/events/#lifespan-function)
    describes such logic as follows:

    > You can define logic (code) that should be executed before the application starts up. This means that
    > this code will be executed once, before the application starts receiving requests.

    Note: The original author of this function observed (but did not confirm) that it seems to get called
          when the first request starts coming in, rather than while the application is still idle.

    :param app: The FastAPI application (this function does not use it)
    """
    # Run the startup tasks one at a time, in the order listed.
    startup_tasks = (
        ensure_initial_resources_on_boot,
        ensure_attribute_indexes,
        ensure_default_api_perms,
    )
    for run_startup_task in startup_tasks:
        run_startup_task()

    # Call this function once now (discarding its return value) so that its memoization cache
    # is already primed, which speeds up all future invocations.
    get_allowed_references()

    yield


Expand Down
148 changes: 147 additions & 1 deletion nmdc_runtime/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
from pydantic import Field, BaseModel
from pymongo.database import Database as MongoDatabase
from pymongo.errors import OperationFailure
from refscan.lib.helpers import identify_references
from refscan.lib.Finder import Finder
from refscan.lib.ReferenceList import ReferenceList
from refscan.scanner import scan_outgoing_references
from toolz import merge, unique

from nmdc_runtime.api.core.util import sha256hash_from_file
Expand Down Expand Up @@ -120,6 +124,23 @@ def get_class_names_from_collection_spec(
return class_names


@lru_cache
def get_allowed_references() -> ReferenceList:
    r"""
    Identifies the inter-document references that the NMDC Schema permits a
    schema-compliant MongoDB database to contain, and returns them as a `ReferenceList`.

    Note: This function takes no arguments and is decorated with `lru_cache`, so once an
          invocation has returned, later invocations reuse that result instead of
          identifying the references (and printing the message) again.
    """

    print("Identifying schema-allowed references.")

    # Derive the references from the schema view and the collection-name-to-class-names mapping.
    return identify_references(
        schema_view=nmdc_schema_view(),
        collection_name_to_class_names=collection_name_to_class_names,
    )


@lru_cache
def get_type_collections() -> dict:
"""Returns a dictionary mapping class names to Mongo collection names."""
Expand Down Expand Up @@ -353,6 +374,14 @@ def nmdc_database_collection_instance_class_names():

@lru_cache
def nmdc_database_collection_names():
r"""
TODO: Document this function.
TODO: Assuming this function was designed to return a list of names of all Database slots that represent database
collections, use the function named `get_collection_names_from_schema` in `nmdc_runtime/api/db/mongo.py`
instead, since (a) it includes documentation and (b) it performs the additional checks the lead schema
maintainer expects (e.g. checking whether a slot is `multivalued` and `inlined_as_list`).
"""
names = []
view = nmdc_schema_view()
all_classes = set(view.all_classes())
Expand Down Expand Up @@ -541,6 +570,13 @@ def __enter__(self):
def __exit__(self, exc_type, exc_value, traceback):
    r"""
    Drops the database whose name is `self._top_db.name`, via the client of `self._bottom_db`.

    The three parameters are the standard context manager exit arguments; this method ignores
    them. Since it returns `None`, an exception raised inside the `with` block is not suppressed.
    """
    mongo_client = self._bottom_db.client
    mongo_client.drop_database(self._top_db.name)

def get_collection(self, coll_name: str):
    r"""
    Looks up the collection named `coll_name` in `self._top_db` and returns it.

    :param coll_name: The name of the collection you want a reference to
    :raises OverlayDBError: If the lookup raises an `OperationFailure`; the error message
                            is the string form of that failure's `details`
    """
    try:
        collection = self._top_db[coll_name]
    except OperationFailure as failure:
        raise OverlayDBError(str(failure.details))
    return collection

def replace_or_insert_many(self, coll_name, documents: list):
try:
self._top_db[coll_name].insert_many(documents)
Expand Down Expand Up @@ -591,14 +627,42 @@ def merge_find(self, coll_name, find_spec: dict):
yield doc


def validate_json(in_docs: dict, mdb: MongoDatabase):
def validate_json(
in_docs: dict, mdb: MongoDatabase, check_inter_document_references: bool = False
):
r"""
Checks whether the specified dictionary represents a valid instance of the `Database` class
defined in the NMDC Schema. Referential integrity checking is performed on an opt-in basis.
Example dictionary:
{
"biosample_set": [
{"id": "nmdc:bsm-00-000001", ...},
{"id": "nmdc:bsm-00-000002", ...}
],
"study_set": [
{"id": "nmdc:sty-00-000001", ...},
{"id": "nmdc:sty-00-000002", ...}
]
}
:param in_docs: The dictionary you want to validate
:param mdb: A reference to a MongoDB database
:param check_inter_document_references: Whether you want this function to check whether every document that
is referenced by any of the documents passed in would, indeed, exist
in the database, if the documents passed in were to be inserted into
the database. In other words, set this to `True` if you want this
function to perform referential integrity checks.
"""
validator = Draft7Validator(get_nmdc_jsonschema_dict())
docs = deepcopy(in_docs)
validation_errors = {}

known_coll_names = set(nmdc_database_collection_names())
for coll_name, coll_docs in docs.items():
if coll_name not in known_coll_names:
# FIXME: Document what `@type` is (conceptually; e.g., why this function accepts it as a collection name).
# See: https://github.com/microbiomedata/nmdc-runtime/discussions/858
if coll_name == "@type" and coll_docs in ("Database", "nmdc:Database"):
continue
else:
Expand Down Expand Up @@ -631,6 +695,88 @@ def validate_json(in_docs: dict, mdb: MongoDatabase):
except Exception as e:
return {"result": "errors", "detail": str(e)}

# Third pass (if enabled): Check inter-document references.
if check_inter_document_references is True:
# Insert all documents specified for all collections specified, into the OverlayDB.
#
# Note: This will allow us to validate referential integrity in the database's _final_ state. If we were to,
# instead, validate it after processing _each_ collection, we would get a false positive if a document
# inserted into an earlier-processed collection happened to reference a document slated for insertion
# into a later-processed collection. By waiting until all documents in all collections specified have
# been inserted, we avoid that situation.
#
with OverlayDB(mdb) as overlay_db:
print(f"Inserting documents into the OverlayDB.")
for collection_name, documents_to_insert in docs.items():
# Insert the documents into the OverlayDB.
#
# Note: The `isinstance(..., list)` check is here to work around the fact that the previous
# validation stages allow for the request payload to specify a collection named "@type" whose
# value is a string, as opposed to a list of dictionaries.
#
# I don't know why those stages do that. I posed the question in this GitHub Discussion:
# https://github.com/microbiomedata/nmdc-runtime/discussions/858
#
# The `len(...) > 0` check is here because pymongo complains when `insert_many` is called
# with an empty list.
#
if (
isinstance(documents_to_insert, list)
and len(documents_to_insert) > 0
):
try:
overlay_db.replace_or_insert_many(
collection_name, documents_to_insert
)
except OverlayDBError as error:
validation_errors[collection_name].append(str(error))

# Now that the OverlayDB contains all the specified documents, we will check whether
# every document referenced by any of the inserted documents exists.
finder = Finder(database=overlay_db)
references = get_allowed_references()
reference_field_names_by_source_class_name = (
references.get_reference_field_names_by_source_class_name()
)
for source_collection_name, documents_inserted in docs.items():
# If `documents_inserted` is not a list (which is a scenario that the previous validation stages
# allow), abort processing this collection and proceed to processing the next collection.
if not isinstance(documents_inserted, list):
continue

# Check the referential integrity of the replaced or inserted documents.
print(
f"Checking references emanating from documents inserted into '{source_collection_name}'."
)
for document in documents_inserted:
violations = scan_outgoing_references(
document=document,
schema_view=nmdc_schema_view(),
reference_field_names_by_source_class_name=reference_field_names_by_source_class_name,
references=references,
finder=finder,
collection_names=nmdc_database_collection_names(),
source_collection_name=source_collection_name,
user_wants_to_locate_misplaced_documents=False,
)
for violation in violations:
violation_as_str = (
f"Document '{violation.source_document_id}' "
f"in collection '{violation.source_collection_name}' "
f"has a field '{violation.source_field_name}' that "
f"references a document having id "
f"'{violation.target_id}', but the latter document "
f"does not exist in any of the collections the "
f"NMDC Schema says it can exist in."
)
validation_errors[source_collection_name].append(
violation_as_str
)

# If any collection's error list is not empty, return an error response.
if any(len(v) > 0 for v in validation_errors.values()):
return {"result": "errors", "detail": validation_errors}

return {"result": "All Okay!"}
else:
return {"result": "errors", "detail": validation_errors}
3 changes: 3 additions & 0 deletions requirements/main.in
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ python-jose[cryptography]
# Reference: https://github.com/microbiomedata/nmdc-runtime/security/dependabot/8
python-multipart>=0.0.18
pyyaml
# Note: We use `refscan` to get information about inter-document references from the schema and database.
# Reference: https://pypi.org/project/refscan/
refscan==0.2.0
requests
semver
setuptools-scm
Expand Down
Loading

0 comments on commit d3e146b

Please sign in to comment.