Merge pull request #19 from globaldothealth/extensions
Adds extensions to FHIR Resources
pipliggins authored Apr 29, 2024
2 parents e759599 + c5c9ba5 commit fd237d7
Showing 25 changed files with 1,508 additions and 53 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -27,7 +27,7 @@ jobs:
          python-version: "3.11"
      - name: Install dependencies
        run: |
-          run: python3 -m pip install -r requirements.txt
+          python3 -m pip install -r requirements.txt
      - name: Test with pytest
        run: |
          python3 -m pytest --cov
140 changes: 127 additions & 13 deletions fhirflat/fhir2flat.py
@@ -11,29 +11,37 @@
from .resources.base import FHIRFlatBase


-def flatten_column(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
+def flatten_column(
+    data: pd.DataFrame | pd.Series, column_name: str
+) -> pd.DataFrame | pd.Series:
    """
-    Takes a column of a dataframe containing dictionaries and flattens it into multiple
-    columns.
+    Takes a column of a dataframe or series containing dictionaries and flattens it
+    into multiple columns.
    """

-    i = df.columns.get_loc(column_name)
-
-    expanded_col = pd.json_normalize(df[column_name])
+    expanded_col: pd.DataFrame = pd.json_normalize(data[column_name])
    expanded_col.columns = [
        column_name + "." + str(col) for col in expanded_col.columns
    ]
-    df = df.drop(column_name, axis=1)
-
-    new_df = pd.concat([df.iloc[:, :i], expanded_col, df.iloc[:, i:]], axis=1)
-
-    return new_df
+
+    if isinstance(data, pd.DataFrame):
+        i = data.columns.get_loc(column_name)
+        data = data.drop(column_name, axis=1)
+        new_data = pd.concat([data.iloc[:, :i], expanded_col, data.iloc[:, i:]], axis=1)
+        return new_data
+    elif isinstance(data, pd.Series):
+        data = data.drop(column_name)
+        new_data = pd.concat([data, expanded_col.iloc[0]], axis=0)
+        return new_data
+    else:
+        raise ValueError("Input data must be a pandas DataFrame or Series.")


def explode_and_flatten(df, list_cols):
    """
    Recursively explodes and flattens a dataframe.
-    Columns containing a 'coding' list are left intact for later processing.
+    Columns containing a 'coding' or 'extension' list are left intact for later
+    processing.

    df: flattened fhir resource
    lists: list of columns containing lists in the dataframe
@@ -49,20 +57,55 @@ def explode_and_flatten(df, list_cols):
            df = flatten_column(df, lc)
        else:
            raise NotImplementedError("Can't handle lists with more than one concept yet")
-    # for lc in list_cols:
-    #     df = flatten_column(df, lc)

-    # check if any columns remain containing lists that aren't 'coding' chunks
+    # check if any cols remain containing lists that aren't 'coding' chunks or extension
    list_columns = df.map(lambda x: isinstance(x, list))
    new_list_cols = [
        col
        for col in df.columns
-        if (list_columns[col].any() and not col.endswith("coding"))
+        if (
+            list_columns[col].any()
+            and not col.endswith("coding")
+            and not col.endswith("extension")
+        )
    ]
    if new_list_cols:
        df = explode_and_flatten(df, new_list_cols)

    return df


+def implode(df: pd.DataFrame) -> pd.DataFrame:
+    """
+    Implodes a dataframe back to one row per resource instance.
+    """
+
+    def single_or_list(x):
+        if x.apply(lambda x: isinstance(x, list)).any():
+            x_unique = x.drop_duplicates()
+            if len(x_unique) == 1:
+                return x_unique
+            elif len(x_unique.dropna()) == 1:
+                return x_unique.dropna()
+            else:
+                return list(x)
+        else:
+            # Check if the column contains nan values
+            if x.isnull().any():
+                # If the column contains a single non-nan value, return it
+                non_nan_values = x.dropna()
+                if non_nan_values.nunique() == 1:
+                    return non_nan_values
+                else:
+                    return list(non_nan_values)
+            else:
+                return x.iat[0] if x.nunique() == 1 else list(x)
+
+    return df.groupby(df.index).agg(single_or_list)


def expandCoding(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
    """
    Turns a column containing a list of dictionaries with coding information into
@@ -147,6 +190,73 @@ def condenseSystem(df: pd.DataFrame, col_name: str) -> pd.DataFrame:
    return df


+def flattenExtensions(df: pd.DataFrame, extension: str) -> pd.DataFrame:
+    """
+    Flattens extensions in a FHIR resource.
+
+    [
+        {"url": "relativeDay", "valueInteger": 2},
+        {"url": "approximateDate", "valueDate": "2012-09"}
+    ]
+    becomes
+    [2], ["2012-09"]
+    """
+
+    def expand_and_redefine(df, extension):
+
+        def redefine(row: pd.Series, extension: str) -> pd.Series:
+            """Expands out simple extensions and leaves complex ones as is.
+            To be dealt with later in the pipeline."""
+
+            ext = row[extension]
+
+            name = extension.removesuffix(".extension") + "." + ext["url"]
+
+            if "extension" in ext.keys():
+                row[extension] = ext["extension"]
+                row.rename({extension: name}, inplace=True)
+                row = expand_and_redefine(row, name)
+
+                if isinstance(row, pd.DataFrame):
+                    row = implode(row)
+                    assert len(row) == 1
+                    return row.iloc[0]
+
+            try:
+                # The fixed index will probably cause issues
+                value = ext[[key for key in ext if key.startswith("value")][0]]
+            except IndexError:
+                raise IndexError("Extension does not contain a single value.")
+
+            row[name] = value
+
+            if type(row[name]) is dict or issubclass(type(row[name]), dict):
+                row = flatten_column(row, name)
+
+            return row
+
+        if isinstance(df, pd.DataFrame):
+            df_ext = df.explode(extension)
+
+        elif isinstance(df, pd.Series):
+            # convert to dataframe, transpose then explode
+            df_ext = df.to_frame().T.explode(extension)
+
+        df_ext = df_ext.apply(lambda x: redefine(x, extension), axis=1)
+        df_ext.drop(
+            columns=extension, inplace=True, errors="ignore"
+        )  # will stay silent if column doesn't exist
+
+        return df_ext
+
+    df_ext = expand_and_redefine(df, extension)
+
+    df_ext_single = implode(df_ext)
+
+    return df_ext_single


def fhir2flat(resource: FHIRFlatBase, lists: list | None = None) -> pd.DataFrame:
    """
    Converts a FHIR JSON file into a FHIRflat file.
@@ -164,6 +274,10 @@ def fhir2flat(resource: FHIRFlatBase, lists: list | None = None) -> pd.DataFrame
    if list_cols:
        df = explode_and_flatten(df, list_cols)

+    # condense all extensions
+    for ext in df.columns[df.columns.str.endswith("extension")]:
+        df = flattenExtensions(df, ext)
+
    # expand all instances of the "coding" list
    for coding in df.columns[df.columns.str.endswith("coding")]:
        df = expandCoding(df, coding)
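A minimal pandas sketch of the explode -> flatten -> implode round-trip that the new flatten_column, flattenExtensions and implode helpers above build on. The frame, the "period.extension" column and the "enc1" id are hypothetical, and to_flat is a simplification for illustration, not the library's API:

import pandas as pd

# Hypothetical one-row resource frame, mirroring the docstring example
# in flattenExtensions above.
df = pd.DataFrame(
    {
        "id": ["enc1"],
        "period.extension": [
            [
                {"url": "relativeDay", "valueInteger": 2},
                {"url": "approximateDate", "valueDate": "2012-09"},
            ]
        ],
    }
)

# One row per extension entry; the index still identifies the resource.
exploded = df.explode("period.extension")

def to_flat(ext: dict) -> pd.Series:
    # Rename the single value[x] field to "<parent>.<url>".
    value_key = next(k for k in ext if k.startswith("value"))
    return pd.Series({"period." + ext["url"]: ext[value_key]})

flat = exploded["period.extension"].apply(to_flat)
flat["id"] = exploded["id"].to_numpy()

# "Implode" back to one row per resource instance, keeping the single
# non-null value in each column, as implode() does above.
result = flat.groupby(flat.index).agg(
    lambda col: col.dropna().iloc[0] if col.notna().any() else None
)
# result is one row with columns id, period.relativeDay and
# period.approximateDate -- the flat shape the real pipeline emits.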
150 changes: 125 additions & 25 deletions fhirflat/flat2fhir.py
@@ -1,16 +1,31 @@
# Converts FHIRflat files into FHIR resources
-from .util import group_keys, get_fhirtype
+from .util import group_keys, get_fhirtype, get_local_extension_type
from fhir.resources.quantity import Quantity
from fhir.resources.codeableconcept import CodeableConcept
from fhir.resources.period import Period
-import fhir.resources as fr
+from fhir.resources.fhirprimitiveextension import FHIRPrimitiveExtension
+from fhir.resources.datatype import DataType as _DataType
+from fhir.resources.domainresource import DomainResource as _DomainResource
+from fhir.resources.backbonetype import BackboneType as _BackboneType
+
+from pydantic.v1.error_wrappers import ValidationError


def create_codeable_concept(
    old_dict: dict[str, list[str] | str], name: str
) -> dict[str, list[str]]:
    """Re-creates a codeableConcept structure from the FHIRflat representation."""
-    codes = old_dict[name + ".code"]
+    codes = old_dict.get(name + ".code")
+
+    if codes is None:
+        return {
+            "text": (
+                old_dict[name + ".text"][0]
+                if isinstance(old_dict[name + ".text"], list)
+                else old_dict[name + ".text"]
+            )
+        }

    if len(codes) == 1:
        system, code = codes[0].split("|")
        display = (
@@ -54,22 +69,112 @@ def createQuantity(df, group):
    return quant


-def expand_concepts(
-    data: dict, data_class: type[fr.domainresource.DomainResource]
-) -> dict:
+def createExtension(exts: dict):
+    """
+    Searches through the schema of the extensions to find the correct datatype
+
+    Covers the scenario where there is a list of extensions, e.g.
+    [{'type': 'approximateDate'}, {'type': 'relativeDay'}, {'type': 'Extension'}]
+    and finds the appropriate class for the data provided.
+
+    Args:
+        exts: dict
+            e.g. {"relativeDay": 3, "approximateDate": "month 6"}
+    """
+
+    extensions = []
+
+    extension_classes = {e: get_local_extension_type(e) for e in exts.keys()}
+
+    for e, v in exts.items():
+        properties = extension_classes[e].schema()["properties"]
+        data_options = [key for key in properties.keys() if key.startswith("value")]
+        if len(data_options) == 1:
+            extensions.append({"url": e, data_options[0]: v})
+        else:
+            for opt in data_options:
+                try:
+                    extension_classes[e](**{opt: v})
+                    extensions.append({"url": e, opt: v})
+                    break
+                except ValidationError:
+                    continue
+
+    return extensions
+
+
+def set_datatypes(k, v_dict, klass) -> dict:
+    if klass == Quantity:
+        return createQuantity(v_dict, k)
+    elif klass == CodeableConcept:
+        return create_codeable_concept(v_dict, k)
+    elif klass == Period:
+        return {"start": v_dict.get(k + ".start"), "end": v_dict.get(k + ".end")}
+    elif issubclass(klass, FHIRPrimitiveExtension):
+        return {
+            "extension": createExtension(
+                {s.split(".", 1)[1]: v_dict[s] for s in v_dict}
+            ),
+        }
+    elif issubclass(klass, _DataType) and not issubclass(klass, _BackboneType):
+        # not quite
+        prop = klass.schema()["properties"]
+        value_type = [key for key in prop.keys() if key.startswith("value")]
+        if not value_type:
+            # nested extension
+            return {
+                "url": k,
+                "extension": createExtension(
+                    {s.split(".", 1)[1]: v_dict[s] for s in v_dict}
+                ),
+            }
+
+        data_type = prop[value_type[0]]["type"]
+        data_class = get_fhirtype(data_type)
+        return {"url": k, f"{value_type[0]}": set_datatypes(k, v_dict, data_class)}
+
+    return {s.split(".", 1)[1]: v_dict[s] for s in v_dict}
+
+
+def expand_concepts(data: dict, data_class: type[_DomainResource]) -> dict:
    """
    Combines columns containing flattened FHIR concepts back into
    JSON-like structures.
    """

    groups = group_keys(data.keys())
-    group_classes = {
-        k: (
-            data_class.schema()["properties"][k].get("items").get("type")
-            if data_class.schema()["properties"][k].get("items") is not None
-            else data_class.schema()["properties"][k].get("type")
-        )
-        for k in groups.keys()
-    }
+    group_classes = {}
+
+    for k in groups.keys():
+
+        if isinstance(data_class, list):
+            title_matches = [
+                k.lower() == c.schema()["title"].lower() for c in data_class
+            ]
+            result = [x for x, y in zip(data_class, title_matches) if y]
+            if len(result) == 1:
+                group_classes[k] = k
+                continue
+            else:
+                raise ValueError(
+                    f"Couldn't find a matching class for {k} in {data_class}"
+                )
+
+        else:
+            k_schema = data_class.schema()["properties"].get(k)
+
+            group_classes[k] = (
+                k_schema.get("items").get("type")
+                if k_schema.get("items") is not None
+                else k_schema.get("type")
+            )
+
+            if group_classes[k] is None:
+                assert k_schema.get("type") == "array"
+
+                group_classes[k] = [
+                    opt.get("type") for opt in k_schema["items"]["anyOf"]
+                ]

    group_classes = {k: get_fhirtype(v) for k, v in group_classes.items()}

    expanded = {}
@@ -89,18 +194,13 @@ def expand_concepts(
            # coming back out of nested recursion
            expanded[k] = {s.split(".", 1)[1]: v_dict[s] for s in v_dict}

            if data_class.schema()["properties"][k].get("type") == "array":
-                expanded[k] = [expanded[k]]
-
-        elif group_classes[k] == Quantity:
-            expanded[k] = createQuantity(v_dict, k)
-        elif group_classes[k] == CodeableConcept:
-            v = create_codeable_concept(v_dict, k)
-            expanded[k] = v
-        elif group_classes[k] == Period:
-            v = {"start": data.get(k + ".start"), "end": data.get(k + ".end")}
-            expanded[k] = v
+                if k == "extension":
+                    expanded[k] = [v for v in expanded[k].values()]
+                else:
+                    expanded[k] = [expanded[k]]

        else:
-            expanded[k] = {s.split(".", 1)[1]: v_dict[s] for s in v_dict}
+            expanded[k] = set_datatypes(k, v_dict, group_classes[k])

    for k in keys_to_replace:
        data.pop(k)
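To illustrate the value[x] resolution that createExtension performs above, here is a self-contained sketch. The VALUE_FIELDS table is a hypothetical stand-in for the extension schemas that get_local_extension_type would return, and the isinstance check stands in for pydantic validation:

# Each extension URL maps to the value[x] fields its class accepts
# (illustrative assumption, not the real schemas).
VALUE_FIELDS = {
    "relativeDay": {"valueInteger": int},
    "approximateDate": {"valueDate": str, "valueString": str},
}

def create_extension_sketch(exts: dict) -> list[dict]:
    """Pick the single value[x] field, or the first one that validates --
    the same fallback createExtension implements by catching ValidationError."""
    extensions = []
    for url, value in exts.items():
        fields = VALUE_FIELDS[url]
        if len(fields) == 1:
            (field,) = fields
            extensions.append({"url": url, field: value})
            continue
        for field, accepted in fields.items():
            if isinstance(value, accepted):  # stands in for pydantic validation
                extensions.append({"url": url, field: value})
                break
    return extensions

print(create_extension_sketch({"relativeDay": 3, "approximateDate": "2012-09"}))
# [{'url': 'relativeDay', 'valueInteger': 3},
#  {'url': 'approximateDate', 'valueDate': '2012-09'}]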
1 change: 1 addition & 0 deletions fhirflat/resources/base.py
@@ -66,6 +66,7 @@ def from_flat(cls, file: str) -> FHIRFlatBase | list[FHIRFlatBase]:
df["json_data"] = df.apply(
lambda row: row.to_json(date_format="iso", date_unit="s"), axis=1
)
# Creates a columns of FHIR resource instances
df["fhir"] = df["json_data"].apply(lambda x: cls.cleanup(x))

if len(df) == 1:
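The comment added above documents the row -> JSON -> resource pattern used by from_flat. A rough sketch of the idea, with a hypothetical two-column frame and json.loads standing in for cls.cleanup(x), which builds the actual resource instance:

import json
import pandas as pd

df = pd.DataFrame({"id": ["enc1"], "status": ["finished"]})

df["json_data"] = df.apply(
    lambda row: row.to_json(date_format="iso", date_unit="s"), axis=1
)
# Creates a column of parsed objects; the real code calls cls.cleanup(x)
# here to build FHIR resource instances instead.
df["fhir"] = df["json_data"].apply(json.loads)
print(df["fhir"].iloc[0])  # {'id': 'enc1', 'status': 'finished'}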