Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Format dates to ISO8601 in ingestion #37

Merged
merged 4 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 87 additions & 17 deletions fhirflat/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

import pandas as pd
import numpy as np
from datetime import datetime
import dateutil.parser
from zoneinfo import ZoneInfo
import warnings
import os
from math import isnan
Expand All @@ -24,44 +27,84 @@
"""


def find_field_value(
    row, response, fhir_attr, mapp, date_format, timezone, raw_data=None
):
    """
    Returns the data for a given field, given the mapping.

    For one-to-many resources the raw data is provided to allow for searching
    for other fields than in the melted data.

    Parameters
    ----------
    row
        The current row of the (possibly melted) data being mapped.
    response
        The raw response value for this row; substituted when the mapping is
        the literal "<FIELD>".
    fhir_attr
        The FHIR attribute being populated; when it looks date-like
        ("date"/"period" in the name) the result is normalised via
        ``format_dates``.
    mapp
        The mapping instruction: "<FIELD>", "<x>+<y>" (concatenation),
        "<x> if not <y>" (conditional), "<col>" (column lookup), or a
        literal value.
    date_format
        strptime-style format of source dates, forwarded to ``format_dates``.
    timezone
        IANA timezone name, forwarded to ``format_dates``.
    raw_data
        The full (unmelted) dataframe, searched when ``col`` is absent from
        ``row`` (one-to-many resources only).

    Raises
    ------
    KeyError
        If a referenced column is present in neither ``row`` nor ``raw_data``.
    """
    if mapp == "<FIELD>":
        # Substitute the raw response value directly.
        return_val = response
    elif "+" in mapp:
        # Concatenation of several mapped values, e.g. "<dob>+<time>".
        parts = mapp.split("+")
        results = [
            find_field_value(row, response, "", m, date_format, timezone, raw_data)
            for m in parts
        ]
        results = [str(x) for x in results if not (isinstance(x, float) and isnan(x))]
        if not results:
            # Every component was missing (NaN): nothing to concatenate.
            # (Previously this fell through to results[0] -> IndexError.)
            return_val = None
        else:
            # Values containing "/" (e.g. URLs, partial dates) join unspaced.
            return_val = (
                " ".join(results) if "/" not in results[0] else "".join(results)
            )
    elif "if not" in mapp:
        # "<x> if not <y>": use x only when y is absent/falsy.
        x_map, y_map = mapp.replace(" ", "").split("ifnot")
        x = find_field_value(row, response, "", x_map, date_format, timezone, raw_data)
        y = find_field_value(row, response, "", y_map, date_format, timezone, raw_data)
        if isinstance(y, float):
            return_val = x if isnan(y) else None
        else:
            return_val = x if not y else None
    elif "<" in mapp:
        # "<col>": look the column up in the row, falling back to raw_data.
        col = mapp.lstrip("<").rstrip(">")
        try:
            return_val = row[col]
        except KeyError:
            if raw_data is not None:
                try:
                    return_val = raw_data.loc[row["index"], col]
                except KeyError:
                    raise KeyError(f"Column {col} not found in data")
            else:
                raise KeyError(f"Column {col} not found in the filtered data")
    else:
        # Literal value taken straight from the mapping file.
        return_val = mapp

    # Date-like attributes are converted to ISO 8601 with timezone info.
    if "date" in fhir_attr.lower() or "period" in fhir_attr.lower():
        return format_dates(return_val, date_format, timezone)
    return return_val


def create_dict_wide(row: pd.Series, map_df: pd.DataFrame) -> dict:
def format_dates(date_str: str, date_format: str, timezone: str) -> dict:
"""
Converts dates into ISO8601 format with timezone information.
"""

if date_str is None:
return date_str

new_tz = ZoneInfo(timezone)

try:
date_time = datetime.strptime(date_str, date_format)
date_time_aware = date_time.replace(tzinfo=new_tz)
if "%H" not in date_format:
date_time_aware = date_time_aware.date()
except ValueError:
# Unconverted data remains in the string (i.e. time is present)
date, time = date_str.split(" ")
date = datetime.strptime(date, date_format)
time = dateutil.parser.parse(time).time()
date_time = datetime.combine(date, time)
date_time_aware = date_time.replace(tzinfo=new_tz)

return date_time_aware.isoformat()


def create_dict_wide(
row: pd.Series, map_df: pd.DataFrame, date_format: str, timezone: str
) -> dict:
"""
Takes a wide-format dataframe and iterates through the columns of the row,
applying the mapping to each column and produces a fhirflat-like dictionary to
Expand All @@ -83,7 +126,9 @@ def create_dict_wide(row: pd.Series, map_df: pd.DataFrame) -> dict:
k: (
v
if "<" not in str(v)
else find_field_value(row, response, v)
else find_field_value(
row, response, k, v, date_format, timezone
)
)
for k, v in mapping.items()
}
Expand Down Expand Up @@ -119,7 +164,11 @@ def create_dict_wide(row: pd.Series, map_df: pd.DataFrame) -> dict:


def create_dict_long(
row: pd.Series, full_df: pd.DataFrame, map_df: pd.DataFrame
row: pd.Series,
full_df: pd.DataFrame,
map_df: pd.DataFrame,
date_format: str,
timezone: str,
) -> dict | None:
"""
Takes a long-format dataframe and a mapping file, and produces a fhirflat-like
Expand All @@ -139,7 +188,9 @@ def create_dict_long(
k: (
v
if "<" not in str(v)
else find_field_value(row, response, v, raw_data=full_df)
else find_field_value(
row, response, k, v, date_format, timezone, raw_data=full_df
)
)
for k, v in mapping.items()
}
Expand All @@ -160,6 +211,8 @@ def create_dictionary(
resource: str,
one_to_one=False,
subject_id="subjid",
date_format="%Y-%m-%d",
timezone="UTC",
) -> pd.DataFrame | None:
"""
Given a data file and a single mapping file for one FHIR resource type,
Expand All @@ -179,6 +232,10 @@ def create_dictionary(
Whether the resource should be mapped as one-to-one or one-to-many.
subject_id: str
The name of the column containing the subject ID in the data file.
date_format: str
The format of the dates in the data file. E.g. "%Y-%m-%d"
timezone: str
The timezone of the dates in the data file. E.g. "Europe/London"
"""

data: pd.DataFrame = pd.read_csv(data_file, header=0)
Expand Down Expand Up @@ -238,19 +295,21 @@ def condense(x):
# Generate the flat_like dictionary
if one_to_one:
filtered_data["flat_dict"] = filtered_data.apply(
create_dict_wide, args=[map_df], axis=1
create_dict_wide, args=[map_df, date_format, timezone], axis=1
)
return filtered_data
else:
melted_data["flat_dict"] = melted_data.apply(
create_dict_long, args=[data, map_df], axis=1
create_dict_long, args=[data, map_df, date_format, timezone], axis=1
)
return melted_data["flat_dict"].to_frame()


def convert_data_to_flat(
data: str,
folder_name: str,
date_format: str,
timezone: str,
mapping_files_types: tuple[dict, dict] | None = None,
sheet_id: str | None = None,
subject_id="subjid",
Expand All @@ -266,6 +325,10 @@ def convert_data_to_flat(
The path to the raw clinical data file.
folder_name: str
The name of the folder to store the FHIRflat files.
date_format: str
The format of the dates in the data file. E.g. "%Y-%m-%d"
timezone: str
The timezone of the dates in the data file. E.g. "Europe/London"
mapping_files_types: tuple[dict, dict] | None
A tuple containing two dictionaries, one with the mapping files for each
resource type and one with the mapping type (either one-to-one or one-to-many)
Expand Down Expand Up @@ -315,6 +378,8 @@ def convert_data_to_flat(
resource.__name__,
one_to_one=True,
subject_id=subject_id,
date_format=date_format,
timezone=timezone,
)
if df is None:
continue
Expand All @@ -325,6 +390,8 @@ def convert_data_to_flat(
resource.__name__,
one_to_one=False,
subject_id=subject_id,
date_format=date_format,
timezone=timezone,
)
if df is None:
continue
Expand All @@ -334,5 +401,8 @@ def convert_data_to_flat(
raise ValueError(f"Unknown mapping type {t}")

resource.ingest_to_flat(
df, os.path.join(folder_name, resource.__name__.lower())
df,
os.path.join(folder_name, resource.__name__.lower()),
date_format,
timezone,
)
28 changes: 22 additions & 6 deletions fhirflat/resources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
from __future__ import annotations
from fhir.resources.domainresource import DomainResource as _DomainResource

import datetime
import pandas as pd
import numpy as np
import orjson

from fhirflat.fhir2flat import fhir2flat
Expand Down Expand Up @@ -127,7 +129,9 @@ def fhir_format(row: pd.Series) -> pd.Series:
return condensed_mapped_data

@classmethod
def ingest_to_flat(cls, data: pd.DataFrame, filename: str):
def ingest_to_flat(
cls, data: pd.DataFrame, filename: str, date_format: str, timezone: str
):
"""
Takes a pandas dataframe and populates the resource with the data.
Creates a FHIRflat parquet file for the resources.
Expand All @@ -146,13 +150,25 @@ def ingest_to_flat(cls, data: pd.DataFrame, filename: str):
# flattens resources back out
flat_df = data["fhir"].apply(lambda x: x.to_flat())

# Stops parquet conversion from stripping the time from mixed date/datetime
# columns
# create FHIR expected date format
for date_cols in [
x for x in flat_df.columns if "date" in x.lower() or "period" in x.lower()
x
for x in flat_df.columns
if ("date" in x.lower() or "period" in x.lower() or "time" in x.lower())
]:
flat_df[date_cols] = flat_df[date_cols].astype(str)
flat_df[date_cols] = flat_df[date_cols].replace("nan", None)
# replace nan with None
flat_df[date_cols] = flat_df[date_cols].replace(np.nan, None)

# convert datetime objects to ISO strings
# (stops unwanted parquet conversions)
# but skips over extensions that have floats/strings rather than dates
flat_df[date_cols] = flat_df[date_cols].apply(
lambda x: (
x.isoformat()
if isinstance(x, datetime.datetime) or isinstance(x, datetime.date)
else x
)
)

for coding_column in [
x
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ dependencies = [
"pyarrow==15.0.0",
"pydantic==2.6.1",
"pydantic_core==2.16.2",
"tzdata",
"python-dateutil"
]

[project.optional-dependencies]
Expand Down
Loading
Loading