diff --git a/fhirflat/ingest.py b/fhirflat/ingest.py index 7ff35ff..8969a67 100644 --- a/fhirflat/ingest.py +++ b/fhirflat/ingest.py @@ -19,6 +19,7 @@ import dateutil.parser import numpy as np import pandas as pd +from joblib import Parallel, delayed from pyarrow.lib import ArrowTypeError import fhirflat @@ -349,6 +350,7 @@ def condense(x): if not one_to_one: filtered_data = filtered_data.reset_index() melted_data = filtered_data.melt(id_vars="index", var_name="column") + melted_data.dropna(subset=["value"], inplace=True) # set up the mappings ------------------------------------------------------- @@ -440,6 +442,7 @@ def convert_data_to_flat( subject_id="subjid", validate: bool = True, compress_format: None | str = None, + parallel: bool = False, ): """ Takes raw clinical data (currently assumed to be a one-row-per-patient format like @@ -448,29 +451,31 @@ def convert_data_to_flat( Parameters ---------- - data: str + data The path to the raw clinical data file. - date_format: str + date_format The format of the dates in the data file. E.g. "%Y-%m-%d" - timezone: str + timezone The timezone of the dates in the data file. E.g. "Europe/London" - folder_name: str + folder_name The name of the folder to store the FHIRflat files. - mapping_files_types: tuple[dict, dict] | None + mapping_files_types A tuple containing two dictionaries, one with the mapping files for each resource type and one with the mapping type (either one-to-one or one-to-many) for each resource type. - sheet_id: str | None + sheet_id The Google Sheet ID containing the mapping files. The first sheet must contain the mapping types - one column listing the resource name, and another describing whether the mapping is one-to-one or one-to-many. The subsequent sheets must be named by resource, and contain the mapping for that resource. - subject_id: str + subject_id The name of the column containing the subject ID in the data file. - validate: bool + validate Whether to validate the FHIRflat files after creation. - compress_format: optional str + compress_format If the output folder should be zipped, and if so with what format. + parallel + Whether to parallelize the data conversion over different resources. """ if not mapping_files_types and not sheet_id: @@ -501,40 +506,37 @@ def convert_data_to_flat( for r, i in sheet_keys.items() } - for resource, map_file in mappings.items(): + def convert_resource( + resource, data, map_file, t, subject_id, date_format, timezone + ): start_time = timeit.default_timer() - t = types[resource.__name__] - if t == "one-to-one": - df = create_dictionary( - data, - map_file, - resource.__name__, - one_to_one=True, - subject_id=subject_id, - date_format=date_format, - timezone=timezone, - ) - if df is None: - continue - elif t == "one-to-many": + o2o = t == "one-to-one" + + if t in ["one-to-one", "one-to-many"]: df = create_dictionary( data, map_file, resource.__name__, - one_to_one=False, + one_to_one=o2o, subject_id=subject_id, date_format=date_format, timezone=timezone, ) if df is None: - continue - else: - df = df.dropna().reset_index(drop=True) + return None else: raise ValueError(f"Unknown mapping type {t}") + dict_time = timeit.default_timer() + print( + f"creates {resource.__name__} dictionary in " + str(dict_time - start_time) + ) + flat_nonvalidated = resource.ingest_to_flat(df) + ingest_time = timeit.default_timer() + print(f"{resource.__name__} ingestion in " + str(ingest_time - dict_time)) + if validate: valid_flat, errors = resource.validate_fhirflat(flat_nonvalidated) @@ -555,7 +557,10 @@ def convert_data_to_flat( UserWarning, stacklevel=2, ) - continue + return None + + valid_time = timeit.default_timer() + print(f"{resource.__name__} validation in " + str(valid_time - dict_time)) end_time = timeit.default_timer() total_time = end_time - start_time @@ -574,6 +579,22 @@ def convert_data_to_flat( f"Errors saved to {resource.__name__.lower()}_errors.csv" ) + total_t = timeit.default_timer() + _ = Parallel(n_jobs=-1 if parallel else 1)( + delayed(convert_resource)( + resource, + data, + map_file, + types[resource.__name__], + subject_id, + date_format, + timezone, + ) + for resource, map_file in mappings.items() + ) + + print(f"Total time: {timeit.default_timer() - total_t}") + write_metadata(*generate_metadata(folder_name), Path(folder_name) / "fhirflat.toml") if compress_format: shutil.make_archive(folder_name, compress_format, folder_name) @@ -677,6 +698,13 @@ def main(): choices=["zip", "tar", "gztar", "bztar", "xztar"], ) + parser.add_argument( + "-p", + "--parallel", + help="Parallelize the data conversion over different reosurces", + action="store_true", + ) + args = parser.parse_args() convert_data_to_flat( @@ -688,6 +716,7 @@ def main(): subject_id=args.subject_id, validate=args.validate, compress_format=args.compress, + parallel=args.parallel, ) diff --git a/fhirflat/resources/base.py b/fhirflat/resources/base.py index 7477317..8d6d73f 100644 --- a/fhirflat/resources/base.py +++ b/fhirflat/resources/base.py @@ -119,9 +119,10 @@ def validate_fhirflat( flat_df = df.copy() - flat_df["fhir"] = flat_df.apply( + fhir_json = flat_df.apply( lambda row: row.to_json(date_format="iso", date_unit="s"), axis=1 - ).apply(lambda x: cls.create_fhir_resource(x)) + ) + flat_df["fhir"] = fhir_json.apply(lambda x: cls.create_fhir_resource(x)) if len(flat_df) == 1 and return_frames is False: resource = flat_df["fhir"].iloc[0] diff --git a/fhirflat/resources/condition.py b/fhirflat/resources/condition.py index c76338f..c65dbf0 100644 --- a/fhirflat/resources/condition.py +++ b/fhirflat/resources/condition.py @@ -4,6 +4,7 @@ from fhir.resources import fhirtypes from fhir.resources.condition import Condition as _Condition +from fhir.resources.condition import ConditionParticipant, ConditionStage from pydantic.v1 import Field, validator from .base import FHIRFlatBase @@ -47,6 +48,11 @@ class Condition(_Condition, FHIRFlatBase): "participant", } + backbone_elements: ClassVar[dict] = { + "participant": ConditionParticipant, + "stage": ConditionStage, + } + # required attributes that are not present in the FHIRflat representation flat_defaults: ClassVar[list[str]] = [*FHIRFlatBase.flat_defaults, "clinicalStatus"] diff --git a/fhirflat/resources/immunization.py b/fhirflat/resources/immunization.py index c927e21..d80a73c 100644 --- a/fhirflat/resources/immunization.py +++ b/fhirflat/resources/immunization.py @@ -4,6 +4,12 @@ from fhir.resources import fhirtypes from fhir.resources.immunization import Immunization as _Immunization +from fhir.resources.immunization import ( + ImmunizationPerformer, + ImmunizationProgramEligibility, + ImmunizationProtocolApplied, + ImmunizationReaction, +) from pydantic.v1 import Field, validator from .base import FHIRFlatBase @@ -54,6 +60,13 @@ class Immunization(_Immunization, FHIRFlatBase): # required attributes that are not present in the FHIRflat representation flat_defaults: ClassVar[list[str]] = [*FHIRFlatBase.flat_defaults, "status"] + backbone_elements: ClassVar[dict] = { + "performer": ImmunizationPerformer, + "programEligibility": ImmunizationProgramEligibility, + "reaction": ImmunizationReaction, + "protocolApplied": ImmunizationProtocolApplied, + } + @validator("extension") def validate_extension_contents(cls, extensions): phase_count = sum(isinstance(item, timingPhase) for item in extensions) diff --git a/fhirflat/resources/location.py b/fhirflat/resources/location.py index a24c7de..5a16eb0 100644 --- a/fhirflat/resources/location.py +++ b/fhirflat/resources/location.py @@ -3,6 +3,7 @@ from typing import ClassVar, TypeAlias from fhir.resources.location import Location as _Location +from fhir.resources.location import LocationPosition from .base import FHIRFlatBase @@ -19,6 +20,8 @@ class Location(_Location, FHIRFlatBase): "hoursOfOperation", } + backbone_elements: ClassVar[dict] = {"position": LocationPosition} + @classmethod def cleanup(cls, data: dict) -> dict: """ diff --git a/fhirflat/resources/medicationadministration.py b/fhirflat/resources/medicationadministration.py index 1e686ea..138dc26 100644 --- a/fhirflat/resources/medicationadministration.py +++ b/fhirflat/resources/medicationadministration.py @@ -5,6 +5,10 @@ from fhir.resources.medicationadministration import ( MedicationAdministration as _MedicationAdministration, ) +from fhir.resources.medicationadministration import ( + MedicationAdministrationDosage, + MedicationAdministrationPerformer, +) from .base import FHIRFlatBase @@ -24,6 +28,11 @@ class MedicationAdministration(_MedicationAdministration, FHIRFlatBase): # required attributes that are not present in the FHIRflat representation flat_defaults: ClassVar[list[str]] = [*FHIRFlatBase.flat_defaults, "status"] + backbone_elements: ClassVar[dict] = { + "performer": MedicationAdministrationPerformer, + "dosage": MedicationAdministrationDosage, + } + @classmethod def cleanup(cls, data: dict) -> dict: """ diff --git a/fhirflat/resources/medicationstatement.py b/fhirflat/resources/medicationstatement.py index 50f25e3..3d19589 100644 --- a/fhirflat/resources/medicationstatement.py +++ b/fhirflat/resources/medicationstatement.py @@ -5,6 +5,9 @@ from fhir.resources.medicationstatement import ( MedicationStatement as _MedicationStatement, ) +from fhir.resources.medicationstatement import ( + MedicationStatementAdherence, +) from .base import FHIRFlatBase @@ -23,6 +26,8 @@ class MedicationStatement(_MedicationStatement, FHIRFlatBase): # required attributes that are not present in the FHIRflat representation flat_defaults: ClassVar[list[str]] = [*FHIRFlatBase.flat_defaults, "status"] + backbone_elements: ClassVar[dict] = {"adherence": MedicationStatementAdherence} + @classmethod def cleanup(cls, data: dict) -> dict: """ diff --git a/fhirflat/resources/observation.py b/fhirflat/resources/observation.py index 49c5811..aeb7623 100644 --- a/fhirflat/resources/observation.py +++ b/fhirflat/resources/observation.py @@ -4,7 +4,13 @@ from fhir.resources import fhirtypes from fhir.resources.observation import Observation as _Observation -from fhir.resources.observation import ObservationComponent as _ObservationComponent +from fhir.resources.observation import ( + ObservationComponent as _ObservationComponent, +) +from fhir.resources.observation import ( + ObservationReferenceRange, + ObservationTriggeredBy, +) from pydantic.v1 import Field, validator from .base import FHIRFlatBase @@ -80,6 +86,12 @@ class Observation(_Observation, FHIRFlatBase): # required attributes that are not present in the FHIRflat representation flat_defaults: ClassVar[list[str]] = [*FHIRFlatBase.flat_defaults, "status"] + backbone_elements: ClassVar[dict] = { + "triggeredBy": ObservationTriggeredBy, + "referenceRange": ObservationReferenceRange, + "component": _ObservationComponent, + } + @validator("extension") def validate_extension_contents(cls, extensions): phase_count = sum(isinstance(item, timingPhase) for item in extensions) diff --git a/fhirflat/resources/organization.py b/fhirflat/resources/organization.py index d9499a7..32b6b8c 100644 --- a/fhirflat/resources/organization.py +++ b/fhirflat/resources/organization.py @@ -2,7 +2,12 @@ from typing import ClassVar, TypeAlias -from fhir.resources.organization import Organization as _Organization +from fhir.resources.organization import ( + Organization as _Organization, +) +from fhir.resources.organization import ( + OrganizationQualification, +) from .base import FHIRFlatBase @@ -18,6 +23,8 @@ class Organization(_Organization, FHIRFlatBase): "contact", # phone numbers, addresses } + backbone_elements: ClassVar[dict] = {"qualification": OrganizationQualification} + @classmethod def cleanup(cls, data: dict) -> dict: """ diff --git a/fhirflat/resources/patient.py b/fhirflat/resources/patient.py index fa7d20e..a951a48 100644 --- a/fhirflat/resources/patient.py +++ b/fhirflat/resources/patient.py @@ -3,7 +3,14 @@ from typing import ClassVar, TypeAlias, Union from fhir.resources import fhirtypes -from fhir.resources.patient import Patient as _Patient +from fhir.resources.patient import ( + Patient as _Patient, +) +from fhir.resources.patient import ( + PatientCommunication, + PatientContact, + PatientLink, +) from pydantic.v1 import Field, validator from .base import FHIRFlatBase @@ -43,6 +50,12 @@ class Patient(_Patient, FHIRFlatBase): "link", } + backbone_elements: ClassVar[dict] = { + "contact": PatientContact, + "communication": PatientCommunication, + "link": PatientLink, + } + @validator("extension") def validate_extension_contents(cls, extensions): age_count = sum(isinstance(item, Age) for item in extensions) diff --git a/fhirflat/resources/procedure.py b/fhirflat/resources/procedure.py index 7934c72..8297d5d 100644 --- a/fhirflat/resources/procedure.py +++ b/fhirflat/resources/procedure.py @@ -3,7 +3,13 @@ from typing import ClassVar, TypeAlias, Union from fhir.resources import fhirtypes -from fhir.resources.procedure import Procedure as _Procedure +from fhir.resources.procedure import ( + Procedure as _Procedure, +) +from fhir.resources.procedure import ( + ProcedureFocalDevice, + ProcedurePerformer, +) from pydantic.v1 import Field, validator from .base import FHIRFlatBase @@ -63,6 +69,11 @@ class Procedure(_Procedure, FHIRFlatBase): # required attributes that are not present in the FHIRflat representation flat_defaults: ClassVar[list[str]] = [*FHIRFlatBase.flat_defaults, "status"] + backbone_elements: ClassVar[dict] = { + "performer": ProcedurePerformer, + "focalDevice": ProcedureFocalDevice, + } + @validator("extension") def validate_extension_contents(cls, extensions): duration_count = sum(isinstance(item, Duration) for item in extensions) diff --git a/fhirflat/resources/researchsubject.py b/fhirflat/resources/researchsubject.py index 00c3b20..f5ee2ba 100644 --- a/fhirflat/resources/researchsubject.py +++ b/fhirflat/resources/researchsubject.py @@ -2,7 +2,12 @@ from typing import ClassVar, TypeAlias -from fhir.resources.researchsubject import ResearchSubject as _ResearchSubject +from fhir.resources.researchsubject import ( + ResearchSubject as _ResearchSubject, +) +from fhir.resources.researchsubject import ( + ResearchSubjectProgress, +) from .base import FHIRFlatBase @@ -19,6 +24,8 @@ class ResearchSubject(_ResearchSubject, FHIRFlatBase): # required attributes that are not present in the FHIRflat representation flat_defaults: ClassVar[list[str]] = [*FHIRFlatBase.flat_defaults, "status"] + backbone_elements: ClassVar[dict] = {"progress": ResearchSubjectProgress} + @classmethod def cleanup(cls, data: dict) -> dict: """ diff --git a/fhirflat/resources/specimen.py b/fhirflat/resources/specimen.py index 816c274..6745fd6 100644 --- a/fhirflat/resources/specimen.py +++ b/fhirflat/resources/specimen.py @@ -2,7 +2,15 @@ from typing import ClassVar, TypeAlias -from fhir.resources.specimen import Specimen as _Specimen +from fhir.resources.specimen import ( + Specimen as _Specimen, +) +from fhir.resources.specimen import ( + SpecimenCollection, + SpecimenContainer, + SpecimenFeature, + SpecimenProcessing, +) from fhirflat.flat2fhir import expand_concepts @@ -21,6 +29,13 @@ class Specimen(_Specimen, FHIRFlatBase): "note", } + backbone_elements: ClassVar[dict] = { + "feature": SpecimenFeature, + "collection": SpecimenCollection, + "processing": SpecimenProcessing, + "container": SpecimenContainer, + } + @classmethod def cleanup(cls, data: dict) -> dict: """ diff --git a/pyproject.toml b/pyproject.toml index ac05852..f8777df 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,12 +21,13 @@ dependencies = [ "fhir.resources==7.1.0", "numpy==1.26.4", "orjson==3.9.13", - "pandas>=2.2.0", + "pandas[performance]>=2.2.0", "pyarrow==15.0.0", "pydantic==2.6.1", "pydantic_core==2.16.2", "tzdata", "python-dateutil", + "joblib", ] [project.optional-dependencies] diff --git a/tests/test_base.py b/tests/test_base.py index 88c523c..2e344a5 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -3,7 +3,6 @@ import pandas as pd import pytest from pydantic.v1 import ValidationError -from pathlib import Path def test_flat_fields(): diff --git a/tests/test_ingest.py b/tests/test_ingest.py index ae272a4..815613f 100644 --- a/tests/test_ingest.py +++ b/tests/test_ingest.py @@ -1038,6 +1038,29 @@ def test_convert_data_to_flat_local_mapping(): shutil.rmtree(output_folder) +def test_convert_data_to_flat_local_mapping_parallel(): + output_folder = "tests/ingestion_output" + mappings = { + Encounter: "tests/dummy_data/encounter_dummy_mapping.csv", + Observation: "tests/dummy_data/observation_dummy_mapping.csv", + } + resource_types = {"Encounter": "one-to-one", "Observation": "one-to-many"} + + convert_data_to_flat( + "tests/dummy_data/combined_dummy_data.csv", + folder_name=output_folder, + date_format="%Y-%m-%d", + timezone="Brazil/East", + mapping_files_types=(mappings, resource_types), + parallel=True, + ) + + assert os.path.exists("tests/ingestion_output/encounter.parquet") + assert os.path.exists("tests/ingestion_output/observation.parquet") + + shutil.rmtree(output_folder) + + def test_convert_data_to_flat_local_mapping_zipped(): output_folder = "tests/ingestion_output" mappings = {