From 1d3d887f4b92d27ef68a9352cdfa5aa4a189a054 Mon Sep 17 00:00:00 2001 From: Riley Grant Date: Mon, 18 Nov 2024 16:29:15 -0600 Subject: [PATCH] fixup(data-pipeline): PR review, squash into add test file commit to remove from PR entirely --- .../pipeline/test_new_clinvar_xml_format.py | 293 ------------------ 1 file changed, 293 deletions(-) delete mode 100644 data-pipeline/tests/pipeline/test_new_clinvar_xml_format.py diff --git a/data-pipeline/tests/pipeline/test_new_clinvar_xml_format.py b/data-pipeline/tests/pipeline/test_new_clinvar_xml_format.py deleted file mode 100644 index de931daa4..000000000 --- a/data-pipeline/tests/pipeline/test_new_clinvar_xml_format.py +++ /dev/null @@ -1,293 +0,0 @@ -import gzip -import xml.etree.ElementTree as ET -from pathlib import Path -from typing import Iterator, Optional -from tqdm import tqdm -import datetime -import sys - - -# programatically find project root from whatever dir you're in -project_root = Path(__file__).parent -while project_root.name != "gnomad-browser": - project_root = project_root.parent - -sys.path.append(str(project_root / "data-pipeline" / "src")) -from data_pipeline.datasets.clinvar import parse_clinvar_xml_to_tsv, _parse_variant, SkipVariant - - -### === Functionality to downsample ClinVar for use in a small test dataset - - -def count_records(gz_path: Path, target_element: str = "VariationArchive") -> int: - """Count total number of records in the XML file.""" - - print("\nStarting count_records") - - count = 0 - print("Counting total records...") - with tqdm(unit=" records") as pbar: - for _ in stream_xml_elements(gz_path, target_element): - count += 1 - pbar.update(1) - print(count) - return count - - -def stream_xml_elements(gz_path: Path, target_element: str = "VariationArchive") -> Iterator[ET.Element]: - """Stream elements from a gzipped XML file to avoid loading entire file into memory""" - print("\nStarting stream_xml_elements") - print("Opening gzipped file...") - - f = gzip.open(gz_path, "rt") - print("Creating parser...") - - context = ET.iterparse(f, events=("end",)) - print("Starting to iterate...") - - count = 0 - for _, elem in context: - if elem.tag == target_element: - yield elem - elem.clear() - count += 1 - if count % 1000 == 0: - print(f"Parsed {count} elements, current tag: {elem.tag}") - - -def create_subset( - input_file: Path, - output_file: Path, - sample_rate: int = 1000, - max_records: Optional[int] = None, - skip_count: bool = False, -) -> None: - """ - Create a subset of a ClinVar XML file by sampling every nth record or taking the first n records. - - Args: - input_file: Path to input gzipped XML file - output_file: Path to output gzipped XMl file - sample_rate: Take every nth record (default: 100) - max_records: Maximum number of records to include (optional) - skip_count: Skipi counting total records at start (optional) - """ - - print("\nStarting create_subset") - - # Get total count for progress bar if not skipped - total_records = None - if not skip_count: - total_records = count_records(input_file) - expected_records = total_records // sample_rate - if max_records: - expected_records = min(expected_records, max_records) - print(f"\nExpecting to keep {expected_records:,} records") - - count = 0 - kept = 0 - - # Start with output XMl file with the root element - with gzip.open(output_file, "wt") as out: - out.write('\n\n') - - pbar_desc = "Processing records" - with tqdm(total=total_records, desc=pbar_desc, unit=" records") as pbar: - - for element in stream_xml_elements(input_file): - count += 1 - - if count % sample_rate == 0: - kept += 1 - - xml_str = ET.tostring(element, encoding="unicode") - out.write(xml_str + "\n") - - if max_records and kept >= max_records: - break - - pbar.update(1) - pbar.set_postfix({"kept": kept}, refresh=True) - - out.write("") - - print(f"Finished! Processed {count:,} records, kept {kept:,}") - - -# === Test old function and new function produce same TSV - -old_format_clinvar_xml_local_path = ( - "/Users/rgrant/Documents/pseudo-desktop/clinvar_update/old_format/downsampled_1000_kept_10.xml.gz" -) -old_format_output_tsv_path = ( - "/Users/rgrant/Documents/pseudo-desktop/clinvar_update/old_format/output_downsampled_1000_kept_10.tsv" -) -old_format_parse_variant_function = _parse_variant - -print("\n\nrunning old...") -old_format_release_date = parse_clinvar_xml_to_tsv( - input_xml_path=old_format_clinvar_xml_local_path, - output_tsv_path=old_format_output_tsv_path, - parse_variant_function=old_format_parse_variant_function, -) - -CLINVAR_GOLD_STARS = { - None: 0, - "no classification for the single variant": 0, - "no interpretation for the single variant": 0, - "no classification provided": 0, - "no assertion provided": 0, - "no classifications from unflagged records": 0, - "no assertion criteria provided": 0, - "criteria provided, single submitter": 1, - "criteria provided, conflicting classifications": 1, - "criteria provided, conflicting interpretations": 1, - "criteria provided, multiple submitters, no conflicts": 2, - "reviewed by expert panel": 3, - "practice guideline": 4, -} - - -def _NEW_parse_submission(submission_element, trait_mapping_list_element): - submission = {} - - submission["id"] = submission_element.find("./ClinVarAccession").attrib["Accession"] - submission["submitter_name"] = submission_element.find("./ClinVarAccession").attrib["SubmitterName"] - - classification_element = submission_element.find("./Classification") - classification_description_element = classification_element.find("./GermlineClassification") - if classification_description_element is not None: - # TODO: rename to 'germline classification'? - submission["clinical_significance"] = classification_description_element.text - - submission["last_evaluated"] = classification_element.attrib.get("DateLastEvaluated", None) - submission["review_status"] = classification_element.find("./ReviewStatus").text - - submission["conditions"] = [] - trait_elements = submission_element.findall("./TraitSet/Trait") - for trait_element in trait_elements: - preferred_name_element = None - mapping_element = None - - if trait_mapping_list_element is not None: - xref_elements = trait_element.findall("XRef") - - # Trait elements can have several different ways of mapping to a given classification 'MappingTypes', - # match the mappings in decreasing order of specificity - - # 1. Match on XRefs explicitly included in the classification element - for xref_element in xref_elements: - selector = f"./TraitMapping[@ClinicalAssertionID='{submission_element.attrib['ID']}'][@TraitType='{trait_element.attrib['Type']}'][@MappingType='XRef'][@MappingValue='{xref_element.attrib['ID']}']" # noqa - mapping_element = trait_mapping_list_element.find(selector) - if mapping_element is not None: - break - - # 2. Match on the 'Preferred' name of an element, when there's no XRefs defined - if mapping_element is None: - preferred_name_element = trait_element.find("./Name/ElementValue[@Type='Preferred']") - if preferred_name_element is not None and trait_mapping_list_element is not None: - selector = f"./TraitMapping[@ClinicalAssertionID='{submission_element.attrib['ID']}'][@TraitType='{trait_element.attrib['Type']}'][@MappingType='Name'][@MappingValue=\"{preferred_name_element.text}\"]" # noqa - mapping_element = trait_mapping_list_element.find(selector) - - # 3. Match on the name of a classification, when there's no preferred names - if mapping_element is None: - name_elements = trait_element.findall("./Name/ElementValue") - for name_element in name_elements: - if preferred_name_element is None: - preferred_name_element = name_element - - if trait_mapping_list_element is not None: - selector = f"./TraitMapping[@ClinicalAssertionID='{submission_element.attrib['ID']}'][@TraitType='{trait_element.attrib['Type']}'][@MappingType='Name'][@MappingValue=\"{name_element.text}\"]" # noqa - mapping_element = trait_mapping_list_element.find(selector) - if mapping_element: - break - - # After matching Classifications to TraitMappings, iterate over mapping elements and add the MedGen IDs - if mapping_element is not None: - medgen_element = mapping_element.find("./MedGen") - submission["conditions"].append( - {"name": medgen_element.attrib["Name"], "medgen_id": medgen_element.attrib["CUI"]} - ) - - # 4. If there's no mappings that can be made confidently, add a single entry without a MedGen ID - elif preferred_name_element is not None: - submission["conditions"].append({"name": preferred_name_element.text, "medgen_id": None}) - - return submission - - -def _NEW_parse_variant(variant_element): - variant = {} - - if variant_element.find("./ClassifiedRecord") is None: - print("no ./ClassifiedRecord, skipping!") - raise SkipVariant - - variant["locations"] = {} - location_elements = variant_element.findall("./ClassifiedRecord/SimpleAllele/Location/SequenceLocation") - for element in location_elements: - try: - chromosome = element.attrib["Chr"] - # A release in August 2022 introduced several Variants with a Chromosome of 'Un' - # which caused failure of this pipeline when compared to the reference genome - if chromosome == "Un": - variant["locations"] = {} - allele_element = variant_element.findall("./ClassifiedRecord/SimpleAllele") - print( - f' Skipping variant with Allele ID: {allele_element[0].attrib["AlleleID"]} due to anomalous Chromosome value of "Un"' # noqa - ) - break - - variant["locations"][element.attrib["Assembly"]] = { - "locus": chromosome + ":" + element.attrib["positionVCF"], - "alleles": [element.attrib["referenceAlleleVCF"], element.attrib["alternateAlleleVCF"]], - } - except KeyError: - pass - - if not variant["locations"]: - raise SkipVariant - - variant["clinvar_variation_id"] = variant_element.attrib["VariationID"] - - variant["rsid"] = None - rsid_element = variant_element.find("./ClassifiedRecord/SimpleAllele/XRefList/XRef[@DB='dbSNP']") - if rsid_element is not None: - variant["rsid"] = rsid_element.attrib["ID"] - - germline_classification_element = variant_element.find("./ClassifiedRecord/Classifications/GermlineClassification") - variant["review_status"] = germline_classification_element.find("./ReviewStatus").text - variant["gold_stars"] = CLINVAR_GOLD_STARS[variant["review_status"]] - - # TODO: change this to 'classification', ClinVar's new preferred term - variant["clinical_significance"] = germline_classification_element.find("./Description").text - - variant["last_evaluated"] = germline_classification_element.attrib.get("DateLastEvaluated") - - submission_elements = variant_element.findall("./ClassifiedRecord/ClinicalAssertionList/ClinicalAssertion") - trait_mapping_list_element = variant_element.find("./ClassifiedRecord/TraitMappingList") - variant["submissions"] = [ - _NEW_parse_submission(submission_element, trait_mapping_list_element) - for submission_element in submission_elements - ] - - return variant - - -new_format_clinvar_xml_local_path = ( - "/Users/rgrant/Documents/pseudo-desktop/clinvar_update/new_format/downsampled_1000_kept_1000.xml.gz" -) -new_format_output_tsv_path = ( - "/Users/rgrant/Documents/pseudo-desktop/clinvar_update/new_format/output_downsampled_1000_kept_1000.tsv" -) -new_format_parse_variant_function = _NEW_parse_variant - -print("\n\nrunning new...") -new_format_release_date = parse_clinvar_xml_to_tsv( - input_xml_path=new_format_clinvar_xml_local_path, - output_tsv_path=new_format_output_tsv_path, - parse_variant_function=new_format_parse_variant_function, -) - - -assert old_format_release_date == new_format_release_date