Skip to content

Commit

Permalink
Speed up ingestion (#64)
Browse files Browse the repository at this point in the history
* Use joblib to parallelise across resources

* Add parallel option to CLI for data conversion

* Add in backbone elements for all resources

* Changes suggested during review
  • Loading branch information
pipliggins authored Aug 22, 2024
1 parent 9c15c76 commit 2830531
Show file tree
Hide file tree
Showing 16 changed files with 193 additions and 39 deletions.
87 changes: 58 additions & 29 deletions fhirflat/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import dateutil.parser
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from pyarrow.lib import ArrowTypeError

import fhirflat
Expand Down Expand Up @@ -349,6 +350,7 @@ def condense(x):
if not one_to_one:
filtered_data = filtered_data.reset_index()
melted_data = filtered_data.melt(id_vars="index", var_name="column")
melted_data.dropna(subset=["value"], inplace=True)

# set up the mappings -------------------------------------------------------

Expand Down Expand Up @@ -440,6 +442,7 @@ def convert_data_to_flat(
subject_id="subjid",
validate: bool = True,
compress_format: None | str = None,
parallel: bool = False,
):
"""
Takes raw clinical data (currently assumed to be a one-row-per-patient format like
Expand All @@ -448,29 +451,31 @@ def convert_data_to_flat(
Parameters
----------
data: str
data
The path to the raw clinical data file.
date_format: str
date_format
The format of the dates in the data file. E.g. "%Y-%m-%d"
timezone: str
timezone
The timezone of the dates in the data file. E.g. "Europe/London"
folder_name: str
folder_name
The name of the folder to store the FHIRflat files.
mapping_files_types: tuple[dict, dict] | None
mapping_files_types
A tuple containing two dictionaries, one with the mapping files for each
resource type and one with the mapping type (either one-to-one or one-to-many)
for each resource type.
sheet_id: str | None
sheet_id
The Google Sheet ID containing the mapping files. The first sheet must contain
the mapping types - one column listing the resource name, and another describing
whether the mapping is one-to-one or one-to-many. The subsequent sheets must
be named by resource, and contain the mapping for that resource.
subject_id: str
subject_id
The name of the column containing the subject ID in the data file.
validate: bool
validate
Whether to validate the FHIRflat files after creation.
compress_format: optional str
compress_format
If the output folder should be zipped, and if so with what format.
parallel
Whether to parallelize the data conversion over different resources.
"""

if not mapping_files_types and not sheet_id:
Expand Down Expand Up @@ -501,40 +506,37 @@ def convert_data_to_flat(
for r, i in sheet_keys.items()
}

for resource, map_file in mappings.items():
def convert_resource(
resource, data, map_file, t, subject_id, date_format, timezone
):
start_time = timeit.default_timer()
t = types[resource.__name__]
if t == "one-to-one":
df = create_dictionary(
data,
map_file,
resource.__name__,
one_to_one=True,
subject_id=subject_id,
date_format=date_format,
timezone=timezone,
)
if df is None:
continue
elif t == "one-to-many":
o2o = t == "one-to-one"

if t in ["one-to-one", "one-to-many"]:
df = create_dictionary(
data,
map_file,
resource.__name__,
one_to_one=False,
one_to_one=o2o,
subject_id=subject_id,
date_format=date_format,
timezone=timezone,
)
if df is None:
continue
else:
df = df.dropna().reset_index(drop=True)
return None
else:
raise ValueError(f"Unknown mapping type {t}")

dict_time = timeit.default_timer()
print(
f"creates {resource.__name__} dictionary in " + str(dict_time - start_time)
)

flat_nonvalidated = resource.ingest_to_flat(df)

ingest_time = timeit.default_timer()
print(f"{resource.__name__} ingestion in " + str(ingest_time - dict_time))

if validate:
valid_flat, errors = resource.validate_fhirflat(flat_nonvalidated)

Expand All @@ -555,7 +557,10 @@ def convert_data_to_flat(
UserWarning,
stacklevel=2,
)
continue
return None

valid_time = timeit.default_timer()
print(f"{resource.__name__} validation in " + str(valid_time - dict_time))

end_time = timeit.default_timer()
total_time = end_time - start_time
Expand All @@ -574,6 +579,22 @@ def convert_data_to_flat(
f"Errors saved to {resource.__name__.lower()}_errors.csv"
)

total_t = timeit.default_timer()
_ = Parallel(n_jobs=-1 if parallel else 1)(
delayed(convert_resource)(
resource,
data,
map_file,
types[resource.__name__],
subject_id,
date_format,
timezone,
)
for resource, map_file in mappings.items()
)

print(f"Total time: {timeit.default_timer() - total_t}")

write_metadata(*generate_metadata(folder_name), Path(folder_name) / "fhirflat.toml")
if compress_format:
shutil.make_archive(folder_name, compress_format, folder_name)
Expand Down Expand Up @@ -677,6 +698,13 @@ def main():
choices=["zip", "tar", "gztar", "bztar", "xztar"],
)

parser.add_argument(
"-p",
"--parallel",
        help="Parallelize the data conversion over different resources",
action="store_true",
)

args = parser.parse_args()

convert_data_to_flat(
Expand All @@ -688,6 +716,7 @@ def main():
subject_id=args.subject_id,
validate=args.validate,
compress_format=args.compress,
parallel=args.parallel,
)


Expand Down
5 changes: 3 additions & 2 deletions fhirflat/resources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ def validate_fhirflat(

flat_df = df.copy()

flat_df["fhir"] = flat_df.apply(
fhir_json = flat_df.apply(
lambda row: row.to_json(date_format="iso", date_unit="s"), axis=1
).apply(lambda x: cls.create_fhir_resource(x))
)
flat_df["fhir"] = fhir_json.apply(lambda x: cls.create_fhir_resource(x))

if len(flat_df) == 1 and return_frames is False:
resource = flat_df["fhir"].iloc[0]
Expand Down
6 changes: 6 additions & 0 deletions fhirflat/resources/condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from fhir.resources import fhirtypes
from fhir.resources.condition import Condition as _Condition
from fhir.resources.condition import ConditionParticipant, ConditionStage
from pydantic.v1 import Field, validator

from .base import FHIRFlatBase
Expand Down Expand Up @@ -47,6 +48,11 @@ class Condition(_Condition, FHIRFlatBase):
"participant",
}

backbone_elements: ClassVar[dict] = {
"participant": ConditionParticipant,
"stage": ConditionStage,
}

# required attributes that are not present in the FHIRflat representation
flat_defaults: ClassVar[list[str]] = [*FHIRFlatBase.flat_defaults, "clinicalStatus"]

Expand Down
13 changes: 13 additions & 0 deletions fhirflat/resources/immunization.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

from fhir.resources import fhirtypes
from fhir.resources.immunization import Immunization as _Immunization
from fhir.resources.immunization import (
ImmunizationPerformer,
ImmunizationProgramEligibility,
ImmunizationProtocolApplied,
ImmunizationReaction,
)
from pydantic.v1 import Field, validator

from .base import FHIRFlatBase
Expand Down Expand Up @@ -54,6 +60,13 @@ class Immunization(_Immunization, FHIRFlatBase):
# required attributes that are not present in the FHIRflat representation
flat_defaults: ClassVar[list[str]] = [*FHIRFlatBase.flat_defaults, "status"]

backbone_elements: ClassVar[dict] = {
"performer": ImmunizationPerformer,
"programEligibility": ImmunizationProgramEligibility,
"reaction": ImmunizationReaction,
"protocolApplied": ImmunizationProtocolApplied,
}

@validator("extension")
def validate_extension_contents(cls, extensions):
phase_count = sum(isinstance(item, timingPhase) for item in extensions)
Expand Down
3 changes: 3 additions & 0 deletions fhirflat/resources/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import ClassVar, TypeAlias

from fhir.resources.location import Location as _Location
from fhir.resources.location import LocationPosition

from .base import FHIRFlatBase

Expand All @@ -19,6 +20,8 @@ class Location(_Location, FHIRFlatBase):
"hoursOfOperation",
}

backbone_elements: ClassVar[dict] = {"position": LocationPosition}

@classmethod
def cleanup(cls, data: dict) -> dict:
"""
Expand Down
9 changes: 9 additions & 0 deletions fhirflat/resources/medicationadministration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
from fhir.resources.medicationadministration import (
MedicationAdministration as _MedicationAdministration,
)
from fhir.resources.medicationadministration import (
MedicationAdministrationDosage,
MedicationAdministrationPerformer,
)

from .base import FHIRFlatBase

Expand All @@ -24,6 +28,11 @@ class MedicationAdministration(_MedicationAdministration, FHIRFlatBase):
# required attributes that are not present in the FHIRflat representation
flat_defaults: ClassVar[list[str]] = [*FHIRFlatBase.flat_defaults, "status"]

backbone_elements: ClassVar[dict] = {
"performer": MedicationAdministrationPerformer,
"dosage": MedicationAdministrationDosage,
}

@classmethod
def cleanup(cls, data: dict) -> dict:
"""
Expand Down
5 changes: 5 additions & 0 deletions fhirflat/resources/medicationstatement.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from fhir.resources.medicationstatement import (
MedicationStatement as _MedicationStatement,
)
from fhir.resources.medicationstatement import (
MedicationStatementAdherence,
)

from .base import FHIRFlatBase

Expand All @@ -23,6 +26,8 @@ class MedicationStatement(_MedicationStatement, FHIRFlatBase):
# required attributes that are not present in the FHIRflat representation
flat_defaults: ClassVar[list[str]] = [*FHIRFlatBase.flat_defaults, "status"]

backbone_elements: ClassVar[dict] = {"adherence": MedicationStatementAdherence}

@classmethod
def cleanup(cls, data: dict) -> dict:
"""
Expand Down
14 changes: 13 additions & 1 deletion fhirflat/resources/observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@

from fhir.resources import fhirtypes
from fhir.resources.observation import Observation as _Observation
from fhir.resources.observation import ObservationComponent as _ObservationComponent
from fhir.resources.observation import (
ObservationComponent as _ObservationComponent,
)
from fhir.resources.observation import (
ObservationReferenceRange,
ObservationTriggeredBy,
)
from pydantic.v1 import Field, validator

from .base import FHIRFlatBase
Expand Down Expand Up @@ -80,6 +86,12 @@ class Observation(_Observation, FHIRFlatBase):
# required attributes that are not present in the FHIRflat representation
flat_defaults: ClassVar[list[str]] = [*FHIRFlatBase.flat_defaults, "status"]

backbone_elements: ClassVar[dict] = {
"triggeredBy": ObservationTriggeredBy,
"referenceRange": ObservationReferenceRange,
"component": _ObservationComponent,
}

@validator("extension")
def validate_extension_contents(cls, extensions):
phase_count = sum(isinstance(item, timingPhase) for item in extensions)
Expand Down
9 changes: 8 additions & 1 deletion fhirflat/resources/organization.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@

from typing import ClassVar, TypeAlias

from fhir.resources.organization import Organization as _Organization
from fhir.resources.organization import (
Organization as _Organization,
)
from fhir.resources.organization import (
OrganizationQualification,
)

from .base import FHIRFlatBase

Expand All @@ -18,6 +23,8 @@ class Organization(_Organization, FHIRFlatBase):
"contact", # phone numbers, addresses
}

backbone_elements: ClassVar[dict] = {"qualification": OrganizationQualification}

@classmethod
def cleanup(cls, data: dict) -> dict:
"""
Expand Down
15 changes: 14 additions & 1 deletion fhirflat/resources/patient.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@
from typing import ClassVar, TypeAlias, Union

from fhir.resources import fhirtypes
from fhir.resources.patient import Patient as _Patient
from fhir.resources.patient import (
Patient as _Patient,
)
from fhir.resources.patient import (
PatientCommunication,
PatientContact,
PatientLink,
)
from pydantic.v1 import Field, validator

from .base import FHIRFlatBase
Expand Down Expand Up @@ -43,6 +50,12 @@ class Patient(_Patient, FHIRFlatBase):
"link",
}

backbone_elements: ClassVar[dict] = {
"contact": PatientContact,
"communication": PatientCommunication,
"link": PatientLink,
}

@validator("extension")
def validate_extension_contents(cls, extensions):
age_count = sum(isinstance(item, Age) for item in extensions)
Expand Down
Loading

0 comments on commit 2830531

Please sign in to comment.