From c04fa66dce3412f379d95727fe7f0b73f52e357d Mon Sep 17 00:00:00 2001 From: Pip Liggins Date: Wed, 29 May 2024 15:32:52 +0100 Subject: [PATCH 1/4] Basic method for applying correct datetime format - Will strip times out when a midnight time has been provided, not ideal. --- fhirflat/ingest.py | 7 ++++++- fhirflat/resources/base.py | 25 ++++++++++++++++++++----- tests/test_ingest.py | 22 ++++++++++++++-------- 3 files changed, 40 insertions(+), 14 deletions(-) diff --git a/fhirflat/ingest.py b/fhirflat/ingest.py index 4df3da4..4c83625 100644 --- a/fhirflat/ingest.py +++ b/fhirflat/ingest.py @@ -254,6 +254,8 @@ def convert_data_to_flat( mapping_files_types: tuple[dict, dict] | None = None, sheet_id: str | None = None, subject_id="subjid", + date_format="%Y-%m-%d", + timezone=None, ): """ Takes raw clinical data (currently assumed to be a one-row-per-patient format like @@ -334,5 +336,8 @@ def convert_data_to_flat( raise ValueError(f"Unknown mapping type {t}") resource.ingest_to_flat( - df, os.path.join(folder_name, resource.__name__.lower()) + df, + os.path.join(folder_name, resource.__name__.lower()), + date_format, + timezone, ) diff --git a/fhirflat/resources/base.py b/fhirflat/resources/base.py index 7719212..4e1cc15 100644 --- a/fhirflat/resources/base.py +++ b/fhirflat/resources/base.py @@ -3,6 +3,7 @@ from fhir.resources.domainresource import DomainResource as _DomainResource import pandas as pd +import numpy as np import orjson from fhirflat.fhir2flat import fhir2flat @@ -127,7 +128,9 @@ def fhir_format(row: pd.Series) -> pd.Series: return condensed_mapped_data @classmethod - def ingest_to_flat(cls, data: pd.DataFrame, filename: str): + def ingest_to_flat( + cls, data: pd.DataFrame, filename: str, date_format: str, timezone: str + ): """ Takes a pandas dataframe and populates the resource with the data. Creates a FHIRflat parquet file for the resources. 
@@ -146,13 +149,25 @@ def ingest_to_flat(cls, data: pd.DataFrame, filename: str): # flattens resources back out flat_df = data["fhir"].apply(lambda x: x.to_flat()) - # Stops parquet conversion from stripping the time from mixed date/datetime - # columns + # create FHIR expected date format for date_cols in [ x for x in flat_df.columns if "date" in x.lower() or "period" in x.lower() ]: - flat_df[date_cols] = flat_df[date_cols].astype(str) - flat_df[date_cols] = flat_df[date_cols].replace("nan", None) + dti = pd.to_datetime(flat_df[date_cols], format=date_format) + dti = dti.dt.tz_localize(timezone) + flat_df[date_cols] = dti.dt.strftime("%Y-%m-%dT%H:%M:%S%z") + + # replace nan with None + flat_df[date_cols] = flat_df[date_cols].replace(np.nan, None) + + # remove time & timezone info if none was provided + flat_df[date_cols] = flat_df[date_cols].apply( + lambda x: ( + (x.split("T")[0] if "T00:00:00" in x else x) + if x is not None + else None + ) + ) for coding_column in [ x diff --git a/tests/test_ingest.py b/tests/test_ingest.py index 5d6716a..ac807d0 100644 --- a/tests/test_ingest.py +++ b/tests/test_ingest.py @@ -118,7 +118,7 @@ def test_create_dict_one_to_one_single_row(): }, ], "subject": "Patient/2", - "actualPeriod.start": "2021-04-01 18:00:00", + "actualPeriod.start": "2021-04-01T18:00:00-0300", "actualPeriod.end": "2021-04-10", "admission.dischargeDisposition.code": "https://snomed.info/sct|371827001", "admission.dischargeDisposition.text": "Patient discharged alive (finding)", @@ -136,7 +136,9 @@ def test_load_data_one_to_one_single_row(): ) assert df is not None - Encounter.ingest_to_flat(df, "encounter_ingestion_single") + Encounter.ingest_to_flat( + df, "encounter_ingestion_single", "%Y-%m-%d", "Brazil/East" + ) assert_frame_equal( pd.read_parquet("encounter_ingestion_single.parquet"), @@ -294,12 +296,12 @@ def test_load_data_one_to_one_single_row(): "id": ["10", "11", "12", "13"], "actualPeriod.start": [ "2020-05-01", - "2021-04-01 18:00:00", - 
"2021-05-10 17:30:00", - "2022-06-15 21:00:00", + "2021-04-01T18:00:00-0300", + "2021-05-10T17:30:00-0300", + "2022-06-15T21:00:00-0300", ], "actualPeriod.end": [ - "2020-05-01", + "2020-05-01", # don't want this "2021-04-10", "2021-05-15", "2022-06-20", @@ -340,7 +342,7 @@ def test_load_data_one_to_one_multi_row(): ) assert df is not None - Encounter.ingest_to_flat(df, "encounter_ingestion_multi") + Encounter.ingest_to_flat(df, "encounter_ingestion_multi", "%Y-%m-%d", "Brazil/East") assert_frame_equal( pd.read_parquet("encounter_ingestion_multi.parquet"), @@ -432,7 +434,9 @@ def test_load_data_one_to_many_multi_row(): ) assert df is not None - Observation.ingest_to_flat(df.dropna(), "observation_ingestion") + Observation.ingest_to_flat( + df.dropna(), "observation_ingestion", "%Y-%m-%d", "Brazil/East" + ) full_df = pd.read_parquet("observation_ingestion.parquet") @@ -461,6 +465,8 @@ def test_convert_data_to_flat_local_mapping(): "tests/dummy_data/combined_dummy_data.csv", mapping_files_types=(mappings, resource_types), folder_name=output_folder, + date_format="%Y-%m-%d", + timezone="Brazil/East", ) encounter_df = pd.read_parquet("tests/ingestion_output/encounter.parquet") From 829830c95960322de069dce3acb01019563f634c Mon Sep 17 00:00:00 2001 From: Pip Liggins Date: Thu, 30 May 2024 12:59:11 +0100 Subject: [PATCH 2/4] Move date formatting into ingestion pipeline --- fhirflat/ingest.py | 101 ++++++++++++++++++++++++++++++------- fhirflat/resources/base.py | 13 ++--- pyproject.toml | 2 + tests/test_ingest.py | 48 +++++++++++++++--- 4 files changed, 129 insertions(+), 35 deletions(-) diff --git a/fhirflat/ingest.py b/fhirflat/ingest.py index 4c83625..724ed20 100644 --- a/fhirflat/ingest.py +++ b/fhirflat/ingest.py @@ -5,6 +5,9 @@ import pandas as pd import numpy as np +from datetime import datetime +import dateutil.parser +from zoneinfo import ZoneInfo import warnings import os from math import isnan @@ -24,44 +27,84 @@ """ -def find_field_value(row, response, mapp, 
raw_data=None): +def find_field_value( + row, response, fhir_attr, mapp, date_format, timezone, raw_data=None +): """ Returns the data for a given field, given the mapping. For one to many resources the raw data is provided to allow for searching for other fields than in the melted data. """ if mapp == "": - return response + return_val = response elif "+" in mapp: mapp = mapp.split("+") - results = [find_field_value(row, response, m, raw_data) for m in mapp] + results = [ + find_field_value(row, response, "", m, date_format, timezone, raw_data) + for m in mapp + ] results = [str(x) for x in results if not (isinstance(x, float) and isnan(x))] - return " ".join(results) if "/" not in results[0] else "".join(results) + return_val = " ".join(results) if "/" not in results[0] else "".join(results) elif "if not" in mapp: mapp = mapp.replace(" ", "").split("ifnot") - results = [find_field_value(row, response, m, raw_data) for m in mapp] + results = [ + find_field_value(row, response, "", m, date_format, timezone, raw_data) + for m in mapp + ] x, y = results if isinstance(y, float): - return x if isnan(y) else None + return_val = x if isnan(y) else None else: - return x if not y else None + return_val = x if not y else None elif "<" in mapp: col = mapp.lstrip("<").rstrip(">") try: - return row[col] + return_val = row[col] except KeyError: if raw_data is not None: try: - return raw_data.loc[row["index"], col] + return_val = raw_data.loc[row["index"], col] except KeyError: raise KeyError(f"Column {col} not found in data") else: raise KeyError(f"Column {col} not found in the filtered data") else: - return mapp + return_val = mapp + + if "date" in fhir_attr.lower() or "period" in fhir_attr.lower(): + return format_dates(return_val, date_format, timezone) + return return_val -def create_dict_wide(row: pd.Series, map_df: pd.DataFrame) -> dict: +def format_dates(date_str: str, date_format: str, timezone=str) -> dict: + """ + Converts dates into ISO8601 format with timezone 
information. + """ + + if date_str is None: + return date_str + + new_tz = ZoneInfo(timezone) + + try: + date_time = datetime.strptime(date_str, date_format) + date_time_aware = date_time.replace(tzinfo=new_tz) + if "%H" not in date_format: + date_time_aware = date_time_aware.date() + except ValueError: + # Unconverted data remains in the string (i.e. time is present) + date, time = date_str.split(" ") + date = datetime.strptime(date, date_format) + time = dateutil.parser.parse(time).time() + date_time = datetime.combine(date, time) + date_time_aware = date_time.replace(tzinfo=new_tz) + + return date_time_aware.isoformat() + + +def create_dict_wide( + row: pd.Series, map_df: pd.DataFrame, date_format: str, timezone: str +) -> dict: """ Takes a wide-format dataframe and iterates through the columns of the row, applying the mapping to each column and produces a fhirflat-like dictionary to @@ -83,7 +126,9 @@ def create_dict_wide(row: pd.Series, map_df: pd.DataFrame) -> dict: k: ( v if "<" not in str(v) - else find_field_value(row, response, v) + else find_field_value( + row, response, k, v, date_format, timezone + ) ) for k, v in mapping.items() } @@ -119,7 +164,11 @@ def create_dict_wide(row: pd.Series, map_df: pd.DataFrame) -> dict: def create_dict_long( - row: pd.Series, full_df: pd.DataFrame, map_df: pd.DataFrame + row: pd.Series, + full_df: pd.DataFrame, + map_df: pd.DataFrame, + date_format: str, + timezone: str, ) -> dict | None: """ Takes a long-format dataframe and a mapping file, and produces a fhirflat-like @@ -139,7 +188,9 @@ def create_dict_long( k: ( v if "<" not in str(v) - else find_field_value(row, response, v, raw_data=full_df) + else find_field_value( + row, response, k, v, date_format, timezone, raw_data=full_df + ) ) for k, v in mapping.items() } @@ -160,6 +211,8 @@ def create_dictionary( resource: str, one_to_one=False, subject_id="subjid", + date_format="%Y-%m-%d", + timezone=None, ) -> pd.DataFrame | None: """ Given a data file and a single 
mapping file for one FHIR resource type, @@ -179,6 +232,10 @@ def create_dictionary( Whether the resource should be mapped as one-to-one or one-to-many. subject_id: str The name of the column containing the subject ID in the data file. + date_format: str + The format of the dates in the data file. E.g. "%Y-%m-%d" + timezone: str + The timezone of the dates in the data file. E.g. "Europe/London" """ data: pd.DataFrame = pd.read_csv(data_file, header=0) @@ -238,12 +295,12 @@ def condense(x): # Generate the flat_like dictionary if one_to_one: filtered_data["flat_dict"] = filtered_data.apply( - create_dict_wide, args=[map_df], axis=1 + create_dict_wide, args=[map_df, date_format, timezone], axis=1 ) return filtered_data else: melted_data["flat_dict"] = melted_data.apply( - create_dict_long, args=[data, map_df], axis=1 + create_dict_long, args=[data, map_df, date_format, timezone], axis=1 ) return melted_data["flat_dict"].to_frame() @@ -251,11 +308,11 @@ def condense(x): def convert_data_to_flat( data: str, folder_name: str, + date_format: str, + timezone: str, mapping_files_types: tuple[dict, dict] | None = None, sheet_id: str | None = None, subject_id="subjid", - date_format="%Y-%m-%d", - timezone=None, ): """ Takes raw clinical data (currently assumed to be a one-row-per-patient format like @@ -268,6 +325,10 @@ def convert_data_to_flat( The path to the raw clinical data file. folder_name: str The name of the folder to store the FHIRflat files. + date_format: str + The format of the dates in the data file. E.g. "%Y-%m-%d" + timezone: str + The timezone of the dates in the data file. E.g. 
"Europe/London" mapping_files_types: tuple[dict, dict] | None A tuple containing two dictionaries, one with the mapping files for each resource type and one with the mapping type (either one-to-one or one-to-many) @@ -317,6 +378,8 @@ def convert_data_to_flat( resource.__name__, one_to_one=True, subject_id=subject_id, + date_format=date_format, + timezone=timezone, ) if df is None: continue @@ -327,6 +390,8 @@ def convert_data_to_flat( resource.__name__, one_to_one=False, subject_id=subject_id, + date_format=date_format, + timezone=timezone, ) if df is None: continue diff --git a/fhirflat/resources/base.py b/fhirflat/resources/base.py index 4e1cc15..b432a66 100644 --- a/fhirflat/resources/base.py +++ b/fhirflat/resources/base.py @@ -153,20 +153,13 @@ def ingest_to_flat( for date_cols in [ x for x in flat_df.columns if "date" in x.lower() or "period" in x.lower() ]: - dti = pd.to_datetime(flat_df[date_cols], format=date_format) - dti = dti.dt.tz_localize(timezone) - flat_df[date_cols] = dti.dt.strftime("%Y-%m-%dT%H:%M:%S%z") - # replace nan with None flat_df[date_cols] = flat_df[date_cols].replace(np.nan, None) - # remove time & timezone info if none was provided + # convert datetime objects to ISO strings + # (stops unwanted parquet conversions) flat_df[date_cols] = flat_df[date_cols].apply( - lambda x: ( - (x.split("T")[0] if "T00:00:00" in x else x) - if x is not None - else None - ) + lambda x: (x.isoformat() if x is not None else None) ) for coding_column in [ diff --git a/pyproject.toml b/pyproject.toml index 5641cd8..2491b5a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,8 @@ dependencies = [ "pyarrow==15.0.0", "pydantic==2.6.1", "pydantic_core==2.16.2", + "tzdata", + "python-dateutil" ] [project.optional-dependencies] diff --git a/tests/test_ingest.py b/tests/test_ingest.py index ac807d0..27898b9 100644 --- a/tests/test_ingest.py +++ b/tests/test_ingest.py @@ -1,6 +1,7 @@ from fhirflat.ingest import ( create_dictionary, convert_data_to_flat, + 
format_dates,
 )
 from fhirflat.resources.encounter import Encounter
 from fhirflat.resources.observation import Observation
@@ -10,12 +11,37 @@
 import shutil
 from decimal import Decimal
 import numpy as np
+import pytest
+
+
+@pytest.mark.parametrize(
+    "date_str, format, tz, expected",
+    [
+        ("2021-04-01", "%Y-%m-%d", "Brazil/East", "2021-04-01"),
+        ("2021-04-01 18:00", "%Y-%m-%d", "Brazil/East", "2021-04-01T18:00:00-03:00"),
+        ("2021-04-01 00:30", "%Y-%m-%d", "UTC", "2021-04-01T00:30:00+00:00"),
+        (
+            "2021-04-01 12:00",
+            "%Y-%m-%d %H:%M",
+            "Brazil/East",
+            "2021-04-01T12:00:00-03:00",
+        ),
+        (None, "%Y-%m-%d", "Brazil/East", None),
+    ],
+)
+def test_format_dates(date_str, format, tz, expected):
+    assert format_dates(date_str, format, tz) == expected
+
+
+def test_format_dates_error():
+    with pytest.raises(ValueError):
+        format_dates("2021-04-01", "%m/%d/%Y", "Brazil/East")
 
 
 ENCOUNTER_DICT_OUT = {
     "id": 11,
     "subject": "Patient/2",
-    "actualPeriod.start": "2021-04-01 18:00",
+    "actualPeriod.start": "2021-04-01T18:00:00-03:00",
     "actualPeriod.end": "2021-04-10",
     "extension.timingPhase.system": "https://snomed.info/sct",
     "extension.timingPhase.code": 278307001,
@@ -50,6 +76,8 @@ def test_create_dict_one_to_one_single_row():
         "tests/dummy_data/encounter_dummy_mapping.csv",
         "Encounter",
         one_to_one=True,
+        date_format="%Y-%m-%d",
+        timezone="Brazil/East",
     )
     assert df is not None
 
@@ -118,7 +146,7 @@ def test_create_dict_one_to_one_single_row():
             },
         ],
         "subject": "Patient/2",
-        "actualPeriod.start": "2021-04-01T18:00:00-0300",
+        "actualPeriod.start": "2021-04-01T18:00:00-03:00",
         "actualPeriod.end": "2021-04-10",
         "admission.dischargeDisposition.code": "https://snomed.info/sct|371827001",
         "admission.dischargeDisposition.text": "Patient discharged alive (finding)",
@@ -133,6 +161,8 @@ def test_load_data_one_to_one_single_row():
         "tests/dummy_data/encounter_dummy_mapping.csv",
         "Encounter",
         one_to_one=True,
+        date_format="%Y-%m-%d",
+        timezone="Brazil/East",
     )
     assert df is not None
 
@@ -296,12 +326,12 @@ def test_load_data_one_to_one_single_row():
             "id": ["10", "11", "12", "13"],
             "actualPeriod.start": [
                 "2020-05-01",
-                "2021-04-01T18:00:00-0300",
-                "2021-05-10T17:30:00-0300",
-                "2022-06-15T21:00:00-0300",
+                "2021-04-01T18:00:00-03:00",
+                "2021-05-10T17:30:00-03:00",
+                "2022-06-15T21:00:00-03:00",
             ],
             "actualPeriod.end": [
-                "2020-05-01",  # don't want this
+                "2020-05-01",
                 "2021-04-10",
                 "2021-05-15",
                 "2022-06-20",
@@ -339,6 +369,8 @@ def test_load_data_one_to_one_multi_row():
         "tests/dummy_data/encounter_dummy_mapping.csv",
         "Encounter",
         one_to_one=True,
+        date_format="%Y-%m-%d",
+        timezone="Brazil/East",
     )
     assert df is not None
 
@@ -431,6 +463,8 @@ def test_load_data_one_to_many_multi_row():
         "tests/dummy_data/observation_dummy_mapping.csv",
         "Observation",
         one_to_one=False,
+        date_format="%Y-%m-%d",
+        timezone="Brazil/East",
     )
     assert df is not None
 
@@ -463,10 +497,10 @@ def test_convert_data_to_flat_local_mapping():
 
     convert_data_to_flat(
         "tests/dummy_data/combined_dummy_data.csv",
-        mapping_files_types=(mappings, resource_types),
         folder_name=output_folder,
         date_format="%Y-%m-%d",
         timezone="Brazil/East",
+        mapping_files_types=(mappings, resource_types),
    )
 
     encounter_df = pd.read_parquet("tests/ingestion_output/encounter.parquet")

From aa4d8e5da4ec927fc45c65a5cfcd724fea6d860c Mon Sep 17 00:00:00 2001
From: Pip Liggins
Date: Thu, 30 May 2024 14:27:36 +0100
Subject: [PATCH 3/4] hint fixes

---
 fhirflat/ingest.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/fhirflat/ingest.py b/fhirflat/ingest.py
index 724ed20..72844c4 100644
--- a/fhirflat/ingest.py
+++ b/fhirflat/ingest.py
@@ -76,7 +76,7 @@ def find_field_value(
     return return_val
 
 
-def format_dates(date_str: str, date_format: str, timezone=str) -> dict:
+def format_dates(date_str: str, date_format: str, timezone: str) -> str | None:
     """
     Converts dates into ISO8601 format with timezone information.
""" @@ -212,7 +212,7 @@ def create_dictionary( one_to_one=False, subject_id="subjid", date_format="%Y-%m-%d", - timezone=None, + timezone="UTC", ) -> pd.DataFrame | None: """ Given a data file and a single mapping file for one FHIR resource type, From 5bb4d54cb05336591bd4907c13959585025e1b68 Mon Sep 17 00:00:00 2001 From: Pip Liggins Date: Thu, 30 May 2024 14:41:53 +0100 Subject: [PATCH 4/4] Add check for fields like recievedTime, allow only existing date/datetime fields to be transformed Accounts for extension fields with period/date in title which aren't datetime fields --- fhirflat/resources/base.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/fhirflat/resources/base.py b/fhirflat/resources/base.py index b432a66..32069dd 100644 --- a/fhirflat/resources/base.py +++ b/fhirflat/resources/base.py @@ -2,6 +2,7 @@ from __future__ import annotations from fhir.resources.domainresource import DomainResource as _DomainResource +import datetime import pandas as pd import numpy as np import orjson @@ -151,15 +152,22 @@ def ingest_to_flat( # create FHIR expected date format for date_cols in [ - x for x in flat_df.columns if "date" in x.lower() or "period" in x.lower() + x + for x in flat_df.columns + if ("date" in x.lower() or "period" in x.lower() or "time" in x.lower()) ]: # replace nan with None flat_df[date_cols] = flat_df[date_cols].replace(np.nan, None) # convert datetime objects to ISO strings # (stops unwanted parquet conversions) + # but skips over extensions that have floats/strings rather than dates flat_df[date_cols] = flat_df[date_cols].apply( - lambda x: (x.isoformat() if x is not None else None) + lambda x: ( + x.isoformat() + if isinstance(x, datetime.datetime) or isinstance(x, datetime.date) + else x + ) ) for coding_column in [