Add class with helper methods to fill in missing records in database #856

Merged · 9 commits · Jan 11, 2025
16 changes: 16 additions & 0 deletions nmdc_runtime/site/graphs.py
@@ -57,6 +57,9 @@
    get_ncbi_export_pipeline_inputs,
    ncbi_submission_xml_from_nmdc_study,
    ncbi_submission_xml_asset,
    get_database_updater_inputs,
    nmdc_study_id_filename,
    missing_data_generation_repair,
)
from nmdc_runtime.site.export.study_metadata import get_biosamples_by_study_id

@@ -467,3 +470,16 @@ def nmdc_study_to_ncbi_submission_export():
        all_instruments,
    )
    ncbi_submission_xml_asset(xml_data)


@graph
def fill_missing_data_generation_data_object_records():
Collaborator: To me "fill" implies that it's going to actually commit the results to Mongo. Is there any reason this Graph doesn't do that?

Collaborator (author): Hm, so we want to see the JSON export that this job produces, visually QC it, and then ingest it into the system. So it's not doing the "filling" automatically. Is "stitch_" a good prefix?

    (study_id, gold_nmdc_instrument_mapping_file_url) = get_database_updater_inputs()
    gold_nmdc_instrument_map_df = get_df_from_url(gold_nmdc_instrument_mapping_file_url)

    database = missing_data_generation_repair(study_id, gold_nmdc_instrument_map_df)

    database_dict = nmdc_schema_object_to_dict(database)
    filename = nmdc_study_id_filename(study_id)
    outputs = export_json_to_drs(database_dict, filename)
    add_output_run_event(outputs)
62 changes: 61 additions & 1 deletion nmdc_runtime/site/ops.py
@@ -91,7 +91,12 @@
from nmdc_runtime.site.translation.submission_portal_translator import (
    SubmissionPortalTranslator,
)
from nmdc_runtime.site.util import run_and_log, schema_collection_has_index_on_id
from nmdc_runtime.site.repair.database_updater import DatabaseUpdater
from nmdc_runtime.site.util import (
    run_and_log,
    schema_collection_has_index_on_id,
    nmdc_study_id_to_filename,
)
from nmdc_runtime.util import (
    drs_object_in_for,
    get_names_of_classes_in_effective_range_of_slot,
@@ -1241,3 +1246,58 @@ def ncbi_submission_xml_from_nmdc_study(
        all_instruments,
    )
    return ncbi_xml


@op
def nmdc_study_id_filename(nmdc_study_id: str) -> str:
Collaborator: I think this needs a more specific name since it produces a very specific file name.

Collaborator (author): Will address it!

    filename = nmdc_study_id_to_filename(nmdc_study_id)
    return f"missing_database_records_for_{filename}.json"


@op(
    config_schema={
        "nmdc_study_id": str,
        "gold_nmdc_instrument_mapping_file_url": str,
    },
    out={
        "nmdc_study_id": Out(str),
        "gold_nmdc_instrument_mapping_file_url": Out(str),
    },
)
def get_database_updater_inputs(context: OpExecutionContext) -> Tuple[str, str]:
    return (
        context.op_config["nmdc_study_id"],
        context.op_config["gold_nmdc_instrument_mapping_file_url"],
    )


@op(
    required_resource_keys={
        "runtime_api_user_client",
        "runtime_api_site_client",
        "gold_api_client",
    }
)
def missing_data_generation_repair(
    context: OpExecutionContext,
    nmdc_study_id: str,
    gold_nmdc_instrument_map_df: pd.DataFrame,
) -> nmdc.Database:
    runtime_api_user_client: RuntimeApiUserClient = (
        context.resources.runtime_api_user_client
    )
    runtime_api_site_client: RuntimeApiSiteClient = (
        context.resources.runtime_api_site_client
    )
    gold_api_client: GoldApiClient = context.resources.gold_api_client

    database_updater = DatabaseUpdater(
        runtime_api_user_client,
        runtime_api_site_client,
        gold_api_client,
        nmdc_study_id,
        gold_nmdc_instrument_map_df,
    )
    database = database_updater.create_missing_dg_records()

    return database
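Because the op resolves its clients through required_resource_keys, it can be exercised outside a full Dagster run. A minimal sketch, assuming stub client objects and a made-up study ID (build_op_context is Dagster's standard helper for direct op invocation):

from dagster import build_op_context

# stub_user_client, stub_site_client, and stub_gold_client are assumed
# test doubles for the three resource clients.
context = build_op_context(
    resources={
        "runtime_api_user_client": stub_user_client,
        "runtime_api_site_client": stub_site_client,
        "gold_api_client": stub_gold_client,
    }
)
database = missing_data_generation_repair(
    context, "nmdc:sty-11-aygzgv51", pd.DataFrame()
)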
nmdc_runtime/site/repair/__init__.py (empty file)
151 changes: 151 additions & 0 deletions nmdc_runtime/site/repair/database_updater.py
@@ -0,0 +1,151 @@
from functools import lru_cache
from typing import Any, Dict, List
import pandas as pd
from nmdc_runtime.site.resources import (
    RuntimeApiUserClient,
    RuntimeApiSiteClient,
    GoldApiClient,
)
from nmdc_runtime.site.translation.gold_translator import GoldStudyTranslator
from nmdc_schema import nmdc


class DatabaseUpdater:
Collaborator: Is there any value in this being a class? Since it's not inheriting from anything and the internal state never really gets modified, could create_missing_dg_records just be a top-level function instead?

Collaborator (author): Hm, there will be other methods that I add to this "interface" (class) in the future, so we have to decide between "collecting" all of these methods under that class or having them be top-level functions.

    def __init__(
        self,
        runtime_api_user_client: RuntimeApiUserClient,
        runtime_api_site_client: RuntimeApiSiteClient,
        gold_api_client: GoldApiClient,
        study_id: str,
        gold_nmdc_instrument_map_df: pd.DataFrame = pd.DataFrame(),
    ):
        """This class serves as an API for repairing connections in the database by
        adding records that are essentially missing "links"/"connections". As we identify
        common use cases for adding missing records to the database, we can
        add helper methods to this class.

        :param runtime_api_user_client: An object of RuntimeApiUserClient which can be
            used to retrieve instance records from the NMDC database.
        :param runtime_api_site_client: An object of RuntimeApiSiteClient which can be
            used to mint new IDs for the repaired records that need to be added into the NMDC database.
        :param gold_api_client: An object of GoldApiClient which can be used to retrieve
            records from GOLD via the GOLD API.
        :param study_id: NMDC study ID for which the missing records need to be added.
        :param gold_nmdc_instrument_map_df: A dataframe originally stored as a TSV mapping file in the
            NMDC schema repo, which maps GOLD instrument IDs to IDs of NMDC instrument_set records.
        """
        self.runtime_api_user_client = runtime_api_user_client
        self.runtime_api_site_client = runtime_api_site_client
        self.gold_api_client = gold_api_client
        self.study_id = study_id
        self.gold_nmdc_instrument_map_df = gold_nmdc_instrument_map_df

    @lru_cache
    def _fetch_gold_biosample(self, gold_biosample_id: str) -> List[Dict[str, Any]]:
        """Fetch response from GOLD /biosamples API for a given biosample id.

        :param gold_biosample_id: GOLD biosample ID.
        :return: List of dictionaries containing the response from the GOLD /biosamples API.
        """
        return self.gold_api_client.fetch_biosample_by_biosample_id(gold_biosample_id)

    @lru_cache
    def _fetch_gold_projects(self, gold_biosample_id: str):
        """Fetch response from GOLD /projects API for a given biosample id.

        :param gold_biosample_id: GOLD biosample ID.
        :return: List of dictionaries containing the response from the GOLD /projects API.
        """
        return self.gold_api_client.fetch_projects_by_biosample(gold_biosample_id)

    @lru_cache
    def create_missing_dg_records(self):
sujaypatil96 marked this conversation as resolved.

Collaborator: Similar to the above comment, I'm a little hesitant about the "missing" part of this name. To me this is generate_data_generation_set_from_gold_api.

Collaborator (author): Makes sense! "missing" is a little misleading; "generate" is better.

"""This method creates missing data generation records for a given study in the NMDC database using
metadata from GOLD. The way the logic works is, it first fetches all the biosamples associated
with the study from the NMDC database. Then, it fetches all the biosample and project data data
associated with the individual biosamples from the GOLD API using the NMDC-GOLD biosample id
mappings on the "gold_biosample_identifiers" key/slot. We use the GoldStudyTranslator class
to mint the required number of `nmdc:DataGeneration` (`nmdc:NucleotideSequencing`) records based
on the number of GOLD sequencing projects, and then reimplement only the part of logic from that
class which is responsible for making data_generation_set records.

:return: An instance of `nmdc:Database` object which is JSON-ified and rendered on the frontend.
"""
        database = nmdc.Database()

        biosample_set = self.runtime_api_user_client.get_biosamples_for_study(
            self.study_id
        )

        all_gold_biosamples = []
        all_gold_projects = []
        for biosample in biosample_set:
            gold_biosample_identifiers = biosample.get("gold_biosample_identifiers")
            if gold_biosample_identifiers:
                for gold_biosample_id in gold_biosample_identifiers:
                    gold_biosample = self._fetch_gold_biosample(gold_biosample_id)[0]
                    gold_projects = self._fetch_gold_projects(gold_biosample_id)
                    gold_biosample["projects"] = gold_projects

                    all_gold_biosamples.append(gold_biosample)
                    all_gold_projects.extend(gold_projects)

        gold_study_translator = GoldStudyTranslator(
            biosamples=all_gold_biosamples,
            projects=all_gold_projects,
            gold_nmdc_instrument_map_df=self.gold_nmdc_instrument_map_df,
        )

        # The GoldStudyTranslator class has some pre-processing logic which filters out
        # invalid biosamples and projects (based on `sequencingStrategy`, `projectStatus`, etc.)
        filtered_biosamples = gold_study_translator.biosamples
        filtered_projects = gold_study_translator.projects

        gold_project_ids = [project["projectGoldId"] for project in filtered_projects]
        nmdc_nucleotide_sequencing_ids = self.runtime_api_site_client.mint_id(
            "nmdc:NucleotideSequencing", len(gold_project_ids)
        ).json()
        gold_project_to_nmdc_nucleotide_sequencing_ids = dict(
            zip(gold_project_ids, nmdc_nucleotide_sequencing_ids)
        )

        gold_to_nmdc_biosample_ids = {}

        for biosample in biosample_set:
            gold_ids = biosample.get("gold_biosample_identifiers", [])
            for gold_id in gold_ids:
                # GOLD identifiers are stored as CURIEs (e.g., "gold:Gb..."); strip the
                # prefix so they can be matched against the "biosampleGoldId" values
                # returned by the GOLD API.
                gold_id_stripped = gold_id.replace("gold:", "")
                gold_to_nmdc_biosample_ids[gold_id_stripped] = biosample["id"]

        database.data_generation_set = []
        # Similar to the logic in GoldStudyTranslator, the number of nmdc:NucleotideSequencing records
        # created is based on the number of GOLD sequencing projects
        for project in filtered_projects:
            # map the projectGoldId to the NMDC biosample ID
            biosample_gold_id = next(
                (
                    biosample["biosampleGoldId"]
                    for biosample in filtered_biosamples
                    if any(
                        p["projectGoldId"] == project["projectGoldId"]
                        for p in biosample.get("projects", [])
                    )
                ),
                None,
            )

            if biosample_gold_id:
                nmdc_biosample_id = gold_to_nmdc_biosample_ids.get(biosample_gold_id)
                if nmdc_biosample_id:
                    database.data_generation_set.append(
                        gold_study_translator._translate_nucleotide_sequencing(
                            project,
                            nmdc_nucleotide_sequencing_id=gold_project_to_nmdc_nucleotide_sequencing_ids[
                                project["projectGoldId"]
                            ],
                            nmdc_biosample_id=nmdc_biosample_id,
                            nmdc_study_id=self.study_id,
                        )
                    )

        return database
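Putting the pieces together, a minimal sketch of how the class is meant to be driven (the three client objects are assumed to be configured instances, the study ID is made up, and the instrument-mapping dataframe comes from the TSV referenced in repository.py below):

database_updater = DatabaseUpdater(
    runtime_api_user_client,  # assumed: authenticated RuntimeApiUserClient
    runtime_api_site_client,  # assumed: authenticated RuntimeApiSiteClient
    gold_api_client,          # assumed: configured GoldApiClient
    "nmdc:sty-11-aygzgv51",   # made-up NMDC study ID
    gold_nmdc_instrument_map_df,
)
database = database_updater.create_missing_dg_records()
# database.data_generation_set now holds the minted nmdc:NucleotideSequencing
# records, ready to be JSON-ified and exported for manual QC.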
49 changes: 49 additions & 0 deletions nmdc_runtime/site/repository.py
@@ -44,6 +44,7 @@
    ingest_neon_surface_water_metadata,
    ensure_alldocs,
    nmdc_study_to_ncbi_submission_export,
    fill_missing_data_generation_data_object_records,
)
from nmdc_runtime.site.resources import (
    get_mongo,
@@ -922,6 +923,54 @@ def biosample_export():
    ]


@repository
def database_record_repair():
    normal_resources = run_config_frozen__normal_env["resources"]
    return [
        fill_missing_data_generation_data_object_records.to_job(
            resource_defs=resource_defs,
            config={
                "resources": merge(
                    unfreeze(normal_resources),
                    {
                        "runtime_api_user_client": {
                            "config": {
                                "base_url": {"env": "API_HOST"},
                                "username": {"env": "API_ADMIN_USER"},
                                "password": {"env": "API_ADMIN_PASS"},
                            },
                        },
                        "runtime_api_site_client": {
                            "config": {
                                "base_url": {"env": "API_HOST"},
                                "client_id": {"env": "API_SITE_CLIENT_ID"},
                                "client_secret": {"env": "API_SITE_CLIENT_SECRET"},
                                "site_id": {"env": "API_SITE_ID"},
                            },
                        },
                        "gold_api_client": {
                            "config": {
                                "base_url": {"env": "GOLD_API_BASE_URL"},
                                "username": {"env": "GOLD_API_USERNAME"},
                                "password": {"env": "GOLD_API_PASSWORD"},
                            },
                        },
                    },
                ),
                "ops": {
                    "get_database_updater_inputs": {
                        "config": {
                            "nmdc_study_id": "",
                            "gold_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/gold_seqMethod_to_nmdc_instrument_set.tsv",
Collaborator: You might have already explained this to me and I've forgotten, but why do we keep this file in the nmdc-schema repository?

Collaborator (author): No no, that's a good question. I actually don't know why we keep it in the nmdc-schema repo. We were debating between having those files be "close" to the source files that use them in the runtime or keeping them in the schema repo, and I guess we just left the conversation there. We can talk about potentially moving them into this repo, so they are "close" to the Python scripts consuming them, at our internal BBPO NMDC call.

                        }
                    },
                    "export_json_to_drs": {"config": {"username": ""}},
                },
            },
        ),
    ]
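At launch time, the only value a user must fill in is the study ID (the mapping-file URL already defaults to the schema-repo TSV above). A sketch of the run config a launchpad submission would supply — the study ID and username are placeholders:

run_config = {
    "ops": {
        "get_database_updater_inputs": {
            "config": {
                "nmdc_study_id": "nmdc:sty-11-aygzgv51",  # placeholder study ID
                "gold_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/gold_seqMethod_to_nmdc_instrument_set.tsv",
            }
        },
        "export_json_to_drs": {"config": {"username": "some_user"}},  # placeholder
    },
}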


# @repository
# def validation():
# graph_jobs = [validate_jgi_job, validate_gold_job, validate_emsl_job]
29 changes: 24 additions & 5 deletions nmdc_runtime/site/resources.py
@@ -129,16 +129,23 @@ def get_omics_processing_records_by_gold_project_id(self, gold_project_id: str):
        return response.json()["cursor"]["firstBatch"]

    def get_biosamples_for_study(self, study_id: str):
        # TODO: 10000 is an arbitrarily large number that has been chosen for the max_page_size param.
        # The /nmdcschema/{collection-name} endpoint implements pagination via the page_token mechanism,
        # but the tradeoff there is that we would need to make multiple requests to step through
        # each of the pages. By picking a large number for max_page_size, we can get all the results
        # in a single request.
        # This method previously used the /queries:run endpoint, but the problem with that was that
        # it used to truncate the number of results returned to 100.
        response = self.request(
            "POST",
            f"/queries:run",
            "GET",
            f"/nmdcschema/biosample_set",
            {
                "find": "biosample_set",
                "filter": {"part_of": {"$elemMatch": {"$eq": study_id}}},
                "filter": json.dumps({"associated_studies": study_id}),
                "max_page_size": 10000,
            },
        )
        response.raise_for_status()
        return response.json()["cursor"]["firstBatch"]
        return response.json()["resources"]
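If a study ever outgrows the 10000-record assumption, the page_token mechanism the TODO mentions would look roughly like this — a sketch only, assuming the endpoint returns a next_page_token field alongside resources and accepts a page_token query parameter:

    def get_all_biosamples_for_study(self, study_id: str):
        # Hypothetical paginated variant (not part of this PR): follow
        # next_page_token until no further pages remain.
        resources, page_token = [], None
        while True:
            params = {
                "filter": json.dumps({"associated_studies": study_id}),
                "max_page_size": 1000,
            }
            if page_token:
                params["page_token"] = page_token
            response = self.request("GET", "/nmdcschema/biosample_set", params)
            response.raise_for_status()
            body = response.json()
            resources.extend(body["resources"])
            page_token = body.get("next_page_token")
            if not page_token:
                return resources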

    def get_omics_processing_by_name(self, name: str):
        response = self.request(

@@ -370,6 +377,18 @@ def fetch_study(self, id: str) -> Union[Dict[str, Any], None]:
            return None
        return results[0]

    def fetch_projects_by_biosample(self, biosample_id: str) -> List[Dict[str, Any]]:
        id = self._normalize_id(biosample_id)
        results = self.request("/projects", params={"biosampleGoldId": id})
        return results

    def fetch_biosample_by_biosample_id(
        self, biosample_id: str
    ) -> List[Dict[str, Any]]:
        id = self._normalize_id(biosample_id)
        results = self.request("/biosamples", params={"biosampleGoldId": id})
        return results
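A quick illustration of the two new GOLD client methods (the CURIE "gold:Gb0123456" is made up, and _normalize_id is assumed to strip the "gold:" prefix before the API call, mirroring the stripping done in DatabaseUpdater):

biosample_records = gold_api_client.fetch_biosample_by_biosample_id("gold:Gb0123456")
project_records = gold_api_client.fetch_projects_by_biosample("gold:Gb0123456")
# Both return the parsed JSON list from the corresponding GOLD endpoint.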


@resource(
    config_schema={
9 changes: 7 additions & 2 deletions nmdc_runtime/site/util.py
@@ -1,8 +1,9 @@
import os
from functools import lru_cache
from subprocess import Popen, PIPE, STDOUT, CalledProcessError

from dagster import op
from functools import lru_cache
from pymongo.database import Database as MongoDatabase
from subprocess import Popen, PIPE, STDOUT, CalledProcessError

from nmdc_runtime.api.db.mongo import get_collection_names_from_schema
from nmdc_runtime.site.resources import mongo_resource
@@ -47,3 +48,7 @@ def schema_collection_has_index_on_id(mdb: MongoDatabase) -> dict:

def get_basename(filename: str) -> str:
    return os.path.basename(filename)


def nmdc_study_id_to_filename(nmdc_study_id: str) -> str:
    return nmdc_study_id.replace(":", "_").replace("-", "_")
2 changes: 1 addition & 1 deletion nmdc_runtime/site/workspace.yaml
@@ -13,7 +13,7 @@ load_from:
      attribute: biosample_submission_ingest
  - python_package:
      package_name: nmdc_runtime.site.repository
      attribute: biosample_export
      attribute: database_record_repair
  # - python_package:
  #     package_name: nmdc_runtime.site.repository
  #     attribute: validation