diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4d78db6..a550ce7 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -27,7 +27,7 @@ jobs: python-version: "3.11" - name: Install dependencies run: | - run: python3 -m pip install -r requirements.txt + python3 -m pip install -r requirements.txt - name: Test with pytest run: | python3 -m pytest --cov diff --git a/fhirflat/fhir2flat.py b/fhirflat/fhir2flat.py index 07384b4..973d527 100644 --- a/fhirflat/fhir2flat.py +++ b/fhirflat/fhir2flat.py @@ -11,29 +11,37 @@ from .resources.base import FHIRFlatBase -def flatten_column(df: pd.DataFrame, column_name: str) -> pd.DataFrame: +def flatten_column( + data: pd.DataFrame | pd.Series, column_name: str +) -> pd.DataFrame | pd.Series: """ - Takes a column of a dataframe containing dictionaries and flattens it into multiple - columns. + Takes a column of a dataframe or series containing dictionaries and flattens it + into multiple columns. """ - i = df.columns.get_loc(column_name) - - expanded_col = pd.json_normalize(df[column_name]) + expanded_col: pd.DataFrame = pd.json_normalize(data[column_name]) expanded_col.columns = [ column_name + "." + str(col) for col in expanded_col.columns ] - df = df.drop(column_name, axis=1) - - new_df = pd.concat([df.iloc[:, :i], expanded_col, df.iloc[:, i:]], axis=1) - return new_df + if isinstance(data, pd.DataFrame): + i = data.columns.get_loc(column_name) + data = data.drop(column_name, axis=1) + new_data = pd.concat([data.iloc[:, :i], expanded_col, data.iloc[:, i:]], axis=1) + return new_data + elif isinstance(data, pd.Series): + data = data.drop(column_name) + new_data = pd.concat([data, expanded_col.iloc[0]], axis=0) + return new_data + else: + raise ValueError("Input data must be a pandas DataFrame or Series.") def explode_and_flatten(df, list_cols): """ Recursively explodes and flattens a dataframe. - Columns containing a 'coding' list are left intact for later processing. 
+ Columns containing a 'coding' or 'extension' list are left intact for later + processing. df: flattened fhir resource lists: list of columns containing lists in the dataframe @@ -49,13 +57,19 @@ def explode_and_flatten(df, list_cols): df = flatten_column(df, lc) else: raise NotImplementedError("Can't handle lists with more than one concept yet") + # for lc in list_cols: + # df = flatten_column(df, lc) - # check if any columns remain containing lists that aren't 'coding' chunks + # check if any cols remain containing lists that aren't 'coding' chunks or extension list_columns = df.map(lambda x: isinstance(x, list)) new_list_cols = [ col for col in df.columns - if (list_columns[col].any() and not col.endswith("coding")) + if ( + list_columns[col].any() + and not col.endswith("coding") + and not col.endswith("extension") + ) ] if new_list_cols: df = explode_and_flatten(df, new_list_cols) @@ -63,6 +77,35 @@ def explode_and_flatten(df, list_cols): return df +def implode(df: pd.DataFrame) -> pd.DataFrame: + """ + Implodes a dataframe back to one row per resource instance. 
+ """ + + def single_or_list(x): + if x.apply(lambda x: isinstance(x, list)).any(): + x_unique = x.drop_duplicates() + if len(x_unique) == 1: + return x_unique + elif len(x_unique.dropna()) == 1: + return x_unique.dropna() + else: + return list(x) + else: + # Check if the column contains nan values + if x.isnull().any(): + # If the column contains a single non-nan value, return it + non_nan_values = x.dropna() + if non_nan_values.nunique() == 1: + return non_nan_values + else: + return list(non_nan_values) + else: + return x.iat[0] if x.nunique() == 1 else list(x) + + return df.groupby(df.index).agg(single_or_list) + + def expandCoding(df: pd.DataFrame, column_name: str) -> pd.DataFrame: """ Turns a column containing a list of dictionaries with coding information into @@ -147,6 +190,73 @@ def condenseSystem(df: pd.DataFrame, col_name: str) -> pd.DataFrame: return df +def flattenExtensions(df: pd.DataFrame, extension: str) -> pd.DataFrame: + """ + Flattens extensions in a FHIR resource. + + [ + {"url": "relativeDay", "valueInteger": 2}, + {"url":"approximateDate", "valueDate": "2012-09"} + ] + becomes + [2], [ "2012-09" ] + + """ + + def expand_and_redefine(df, extension): + + def redefine(row: pd.Series, extension: str) -> pd.Series: + """Expands out simple extensions and leaves complex ones as is. + To be dealt with later in the pipeline.""" + + ext = row[extension] + + name = extension.removesuffix(".extension") + "." 
+ ext["url"] + + if "extension" in ext.keys(): + row[extension] = ext["extension"] + row.rename({extension: name}, inplace=True) + row = expand_and_redefine(row, name) + + if isinstance(row, pd.DataFrame): + row = implode(row) + assert len(row) == 1 + return row.iloc[0] + + try: + # The fixed index will probably cause issues + value = ext[[key for key in ext if key.startswith("value")][0]] + except IndexError: + raise IndexError("Extension does not contain a single value.") + + row[name] = value + + if type(row[name]) is dict or issubclass(type(row[name]), dict): + row = flatten_column(row, name) + + return row + + if isinstance(df, pd.DataFrame): + df_ext = df.explode(extension) + + elif isinstance(df, pd.Series): + # convert to dataframe, transpose then explode + df_ext = df.to_frame().T.explode(extension) + + df_ext = df_ext.apply(lambda x: redefine(x, extension), axis=1) + df_ext.drop( + columns=extension, inplace=True, errors="ignore" + ) # will stay silent if column doesn't exist + + return df_ext + + df_ext = expand_and_redefine(df, extension) + + df_ext_single = implode(df_ext) + + return df_ext_single + + def fhir2flat(resource: FHIRFlatBase, lists: list | None = None) -> pd.DataFrame: """ Converts a FHIR JSON file into a FHIRflat file. 
@@ -164,6 +274,10 @@ def fhir2flat(resource: FHIRFlatBase, lists: list | None = None) -> pd.DataFrame if list_cols: df = explode_and_flatten(df, list_cols) + # condense all extensions + for ext in df.columns[df.columns.str.endswith("extension")]: + df = flattenExtensions(df, ext) + # expand all instances of the "coding" list for coding in df.columns[df.columns.str.endswith("coding")]: df = expandCoding(df, coding) diff --git a/fhirflat/flat2fhir.py b/fhirflat/flat2fhir.py index afe26c7..afddfea 100644 --- a/fhirflat/flat2fhir.py +++ b/fhirflat/flat2fhir.py @@ -1,16 +1,31 @@ # Converts FHIRflat files into FHIR resources -from .util import group_keys, get_fhirtype +from .util import group_keys, get_fhirtype, get_local_extension_type from fhir.resources.quantity import Quantity from fhir.resources.codeableconcept import CodeableConcept from fhir.resources.period import Period -import fhir.resources as fr +from fhir.resources.fhirprimitiveextension import FHIRPrimitiveExtension +from fhir.resources.datatype import DataType as _DataType +from fhir.resources.domainresource import DomainResource as _DomainResource +from fhir.resources.backbonetype import BackboneType as _BackboneType + +from pydantic.v1.error_wrappers import ValidationError def create_codeable_concept( old_dict: dict[str, list[str] | str], name: str ) -> dict[str, list[str]]: """Re-creates a codeableConcept structure from the FHIRflat representation.""" - codes = old_dict[name + ".code"] + codes = old_dict.get(name + ".code") + + if codes is None: + return { + "text": ( + old_dict[name + ".text"][0] + if isinstance(old_dict[name + ".text"], list) + else old_dict[name + ".text"] + ) + } + if len(codes) == 1: system, code = codes[0].split("|") display = ( @@ -54,22 +69,112 @@ def createQuantity(df, group): return quant -def expand_concepts( - data: dict, data_class: type[fr.domainresource.DomainResource] -) -> dict: +def createExtension(exts: dict): + """ + Searches through the schema of the extensions to 
find the correct datatype + + Covers the scenario where there is a list of extensions,e.g. + [{'type': 'approximateDate'}, {'type': 'relativeDay'}, {'type': 'Extension'}] + and finds the appropriate class for the data provided. + + Args: + exts: dict + e.g. {"relativeDay": 3, "approximateDate": "month 6"} + """ + + extensions = [] + + extension_classes = {e: get_local_extension_type(e) for e in exts.keys()} + + for e, v in exts.items(): + properties = extension_classes[e].schema()["properties"] + data_options = [key for key in properties.keys() if key.startswith("value")] + if len(data_options) == 1: + extensions.append({"url": e, data_options[0]: v}) + else: + for opt in data_options: + try: + extension_classes[e](**{opt: v}) + extensions.append({"url": e, opt: v}) + break + except ValidationError: + continue + + return extensions + + +def set_datatypes(k, v_dict, klass) -> dict: + if klass == Quantity: + return createQuantity(v_dict, k) + elif klass == CodeableConcept: + return create_codeable_concept(v_dict, k) + elif klass == Period: + return {"start": v_dict.get(k + ".start"), "end": v_dict.get(k + ".end")} + elif issubclass(klass, FHIRPrimitiveExtension): + return { + "extension": createExtension( + {s.split(".", 1)[1]: v_dict[s] for s in v_dict} + ), + } + elif issubclass(klass, _DataType) and not issubclass(klass, _BackboneType): + # not quite + prop = klass.schema()["properties"] + value_type = [key for key in prop.keys() if key.startswith("value")] + if not value_type: + # nested extension + return { + "url": k, + "extension": createExtension( + {s.split(".", 1)[1]: v_dict[s] for s in v_dict} + ), + } + + data_type = prop[value_type[0]]["type"] + data_class = get_fhirtype(data_type) + return {"url": k, f"{value_type[0]}": set_datatypes(k, v_dict, data_class)} + + return {s.split(".", 1)[1]: v_dict[s] for s in v_dict} + + +def expand_concepts(data: dict, data_class: type[_DomainResource]) -> dict: """ Combines columns containing flattened FHIR concepts 
back into JSON-like structures. """ groups = group_keys(data.keys()) - group_classes = { - k: ( - data_class.schema()["properties"][k].get("items").get("type") - if data_class.schema()["properties"][k].get("items") is not None - else data_class.schema()["properties"][k].get("type") - ) - for k in groups.keys() - } + group_classes = {} + + for k in groups.keys(): + + if isinstance(data_class, list): + title_matches = [ + k.lower() == c.schema()["title"].lower() for c in data_class + ] + result = [x for x, y in zip(data_class, title_matches) if y] + if len(result) == 1: + group_classes[k] = k + continue + else: + raise ValueError( + f"Couldn't find a matching class for {k} in {data_class}" + ) + + else: + k_schema = data_class.schema()["properties"].get(k) + + group_classes[k] = ( + k_schema.get("items").get("type") + if k_schema.get("items") is not None + else k_schema.get("type") + ) + + if group_classes[k] is None: + assert k_schema.get("type") == "array" + + group_classes[k] = [ + opt.get("type") for opt in k_schema["items"]["anyOf"] + ] + group_classes = {k: get_fhirtype(v) for k, v in group_classes.items()} expanded = {} @@ -89,18 +194,13 @@ def expand_concepts( # coming back out of nested recursion expanded[k] = {s.split(".", 1)[1]: v_dict[s] for s in v_dict} if data_class.schema()["properties"][k].get("type") == "array": - expanded[k] = [expanded[k]] - - elif group_classes[k] == Quantity: - expanded[k] = createQuantity(v_dict, k) - elif group_classes[k] == CodeableConcept: - v = create_codeable_concept(v_dict, k) - expanded[k] = v - elif group_classes[k] == Period: - v = {"start": data.get(k + ".start"), "end": data.get(k + ".end")} - expanded[k] = v + if k == "extension": + expanded[k] = [v for v in expanded[k].values()] + else: + expanded[k] = [expanded[k]] + else: - expanded[k] = {s.split(".", 1)[1]: v_dict[s] for s in v_dict} + expanded[k] = set_datatypes(k, v_dict, group_classes[k]) for k in keys_to_replace: data.pop(k) diff --git 
a/fhirflat/resources/base.py b/fhirflat/resources/base.py index dc0a8c5..d97ec29 100644 --- a/fhirflat/resources/base.py +++ b/fhirflat/resources/base.py @@ -66,6 +66,7 @@ def from_flat(cls, file: str) -> FHIRFlatBase | list[FHIRFlatBase]: df["json_data"] = df.apply( lambda row: row.to_json(date_format="iso", date_unit="s"), axis=1 ) + # Creates a columns of FHIR resource instances df["fhir"] = df["json_data"].apply(lambda x: cls.cleanup(x)) if len(df) == 1: diff --git a/fhirflat/resources/encounter.py b/fhirflat/resources/encounter.py index 5f1d092..26592b3 100644 --- a/fhirflat/resources/encounter.py +++ b/fhirflat/resources/encounter.py @@ -4,13 +4,36 @@ import orjson from ..flat2fhir import expand_concepts -from typing import TypeAlias, ClassVar + +from .extensions import relativePeriod, timingPhase +from .extension_types import relativePeriodType, timingPhaseType +from pydantic.v1 import Field, validator +from typing import TypeAlias, ClassVar, Union +from fhir.resources import fhirtypes JsonString: TypeAlias = str class Encounter(_Encounter, FHIRFlatBase): + extension: list[ + Union[relativePeriodType, timingPhaseType, fhirtypes.ExtensionType] + ] = Field( + None, + alias="extension", + title="List of `Extension` items (represented as `dict` in JSON)", + description=( + """ + Contains the G.H 'eventTiming' and 'relativePeriod' extensions, and allows + extensions from other implementations to be included. + """ + ), + # if property is element of this resource. 
+ element_property=True, + # this trys to match the type of the object to each of the union types + union_mode="smart", + ) + # attributes to exclude from the flat representation flat_exclusions: ClassVar[set[str]] = FHIRFlatBase.flat_exclusions + ( "id", @@ -26,6 +49,16 @@ class Encounter(_Encounter, FHIRFlatBase): # required attributes that are not present in the FHIRflat representation flat_defaults: ClassVar[list[str]] = FHIRFlatBase.flat_defaults + ["status"] + @validator("extension") + def validate_extension_contents(cls, extensions): + rel_phase_count = sum(isinstance(item, relativePeriod) for item in extensions) + tim_phase_count = sum(isinstance(item, timingPhase) for item in extensions) + + if rel_phase_count > 1 or tim_phase_count > 1: + raise ValueError("relativePeriod and timingPhase can only appear once.") + + return extensions + @classmethod def cleanup(cls, data: JsonString) -> Encounter: """ diff --git a/fhirflat/resources/extension_types.py b/fhirflat/resources/extension_types.py new file mode 100644 index 0000000..60b6d33 --- /dev/null +++ b/fhirflat/resources/extension_types.py @@ -0,0 +1,47 @@ +from __future__ import annotations +from fhir.resources.fhirtypes import AbstractType as _AbstractType + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from pydantic.v1.types import CallableGenerator + + +class AbstractType(_AbstractType): + @classmethod + def __get_validators__(cls) -> "CallableGenerator": + from . 
import extension_validators as validators + + yield getattr(validators, cls.__resource_type__.lower() + "_validator") + + +class timingPhaseType(AbstractType): + __resource_type__ = "timingPhase" + + +class relativeDayType(AbstractType): + __resource_type__ = "relativeDay" + + +class relativeStartType(AbstractType): + __resource_type__ = "relativeStart" + + +class relativeEndType(AbstractType): + __resource_type__ = "relativeEnd" + + +class relativePeriodType(AbstractType): + __resource_type__ = "relativePeriod" + + +class approximateDateType(AbstractType): + __resource_type__ = "approximateDate" + + +class durationType(AbstractType): + __resource_type__ = "Duration" + + +class dateTimeExtensionType(AbstractType): + __resource_type__ = "dateTimeExtension" diff --git a/fhirflat/resources/extension_validators.py b/fhirflat/resources/extension_validators.py new file mode 100644 index 0000000..897ab23 --- /dev/null +++ b/fhirflat/resources/extension_validators.py @@ -0,0 +1,221 @@ +""" +This file is modified from https://github.com/nazrulworld/fhir.resources +to support custom extension types. Original license below: + +BSD License + +Copyright (c) 2019, Md Nazrul Islam +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, this + list of conditions and the following disclaimer in the documentation and/or + other materials provided with the distribution. + +* Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from this + software without specific prior written permission. 
+ +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, +INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE +OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED +OF THE POSSIBILITY OF SUCH DAMAGE. + +""" + +import importlib +import typing +from pathlib import Path +from typing import Union, Type, TYPE_CHECKING + +from pydantic.v1.class_validators import make_generic_validator +from pydantic.v1.error_wrappers import ErrorWrapper, ValidationError +from pydantic.v1.types import StrBytes +from pydantic.v1.utils import ROOT_KEY + +from fhir.resources.core.fhirabstractmodel import FHIRAbstractModel + +if typing.TYPE_CHECKING: + from pydantic.v1 import BaseModel + + +class Validators: + def __init__(self): + self.MODEL_CLASSES = { + "timingPhase": (None, ".extensions"), + "relativeDay": (None, ".extensions"), + "relativeStart": (None, ".extensions"), + "relativeEnd": (None, ".extensions"), + "relativePeriod": (None, ".extensions"), + "approximateDate": (None, ".extensions"), + "Duration": (None, ".extensions"), + "dateTimeExtension": (None, ".extensions"), + } + + def get_fhir_model_class(self, model_name: str) -> Type[FHIRAbstractModel]: + """ + Returns the extension class by finding the 'datetimeextension' file and + importing the type class. + Will probably need changing. 
+ """ + klass, module_name = self.MODEL_CLASSES[model_name] + if klass is not None: + return klass + module = importlib.import_module(module_name, package=__package__) + klass = getattr(module, model_name) + self.MODEL_CLASSES[model_name] = (klass, module_name) + return klass + + def run_validator_for_fhir_type(self, model_type_cls, v, values, config, field): + """ """ + cls = self.get_fhir_model_class(model_type_cls.__resource_type__) + for validator in model_type_cls.__get_validators__(): + func = make_generic_validator(validator) + v = func(cls, v, values, config, field) + return v + + def fhir_model_validator( + self, model_name: str, v: Union[StrBytes, dict, Path, FHIRAbstractModel] + ): + """ """ + model_class: Type[BaseModel] | Type[FHIRAbstractModel] = ( + self.get_fhir_model_class(model_name) + ) + + if isinstance(v, (str, bytes)): + try: + v = model_class.parse_raw(v) + except ValidationError as exc: + if TYPE_CHECKING: + model_class = typing.cast(Type[BaseModel], model_class) + errors = exc.errors() + if ( + len(errors) == 1 + and errors[0]["type"] == "value_error.jsondecode" + and errors[0]["loc"][0] == ROOT_KEY + ): + raise ValidationError( + [ + ErrorWrapper( + ValueError( + "Invalid json str value has been provided for " + f"class {model_class}" + ), + loc=ROOT_KEY, + ) + ], + model_class, + ) + + raise + + elif isinstance(v, Path): + _p = v + try: + v = model_class.parse_file(_p) + except (ValueError, TypeError) as exc: + if exc.__class__.__name__ in ("JSONDecodeError", "UnicodeDecodeError"): + raise ValidationError( + [ + ErrorWrapper( + ValueError( + f"Provided file '{_p}' for class " + "'{model_class.__name__}' " + "as value, contains invalid json data. errors from " + f"decoder-> ''{str(exc)}''" + ), + loc=ROOT_KEY, + ) + ], + model_class, + ) + + raise + + except FileNotFoundError: + raise ValidationError( + [ + ErrorWrapper( + ValueError( + f"Provided file '{_p}' for class {model_class} " + "as value, doesn't exists." 
+ ), + loc=ROOT_KEY, + ) + ], + model_class, + ) + + elif isinstance(v, dict): + v = model_class.parse_obj(v) + + if not isinstance(v, model_class): + raise ValidationError( + [ + ErrorWrapper( + ValueError( + "Value is expected from the instance of " + f"{model_class}, but got type {type(v)}" + ), + loc=ROOT_KEY, + ) + ], + model_class, + ) + if model_name != v.resource_type: + raise ValidationError( + [ + ErrorWrapper( + ValueError( + f"Expected resource_type is '{model_name}', " + f"but value has resource_type '{v.resource_type}'" + ), + loc=ROOT_KEY, + ) + ], + model_class, + ) + return v + + +def timingphase_validator(v: Union[StrBytes, dict, Path, FHIRAbstractModel]): + return Validators().fhir_model_validator("timingPhase", v) + + +def relativeday_validator(v: Union[StrBytes, dict, Path, FHIRAbstractModel]): + return Validators().fhir_model_validator("relativeDay", v) + + +def relativestart_validator(v: Union[StrBytes, dict, Path, FHIRAbstractModel]): + return Validators().fhir_model_validator("relativeStart", v) + + +def relativeend_validator(v: Union[StrBytes, dict, Path, FHIRAbstractModel]): + return Validators().fhir_model_validator("relativeEnd", v) + + +def relativeperiod_validator(v: Union[StrBytes, dict, Path, FHIRAbstractModel]): + return Validators().fhir_model_validator("relativePeriod", v) + + +def approximatedate_validator(v: Union[StrBytes, dict, Path, FHIRAbstractModel]): + return Validators().fhir_model_validator("approximateDate", v) + + +def duration_validator(v: Union[StrBytes, dict, Path, FHIRAbstractModel]): + return Validators().fhir_model_validator("Duration", v) + + +def datetimeextension_validator(v: Union[StrBytes, dict, Path, FHIRAbstractModel]): + return Validators().fhir_model_validator("dateTimeExtension", v) diff --git a/fhirflat/resources/extensions.py b/fhirflat/resources/extensions.py new file mode 100644 index 0000000..a9c754a --- /dev/null +++ b/fhirflat/resources/extensions.py @@ -0,0 +1,380 @@ +from __future__ import 
annotations + +from fhir.resources.datatype import DataType as _DataType +from fhir.resources.fhirprimitiveextension import ( + FHIRPrimitiveExtension as _FHIRPrimitiveExtension, +) +from fhir.resources import fhirtypes +from pydantic.v1 import Field, validator, root_validator +from typing import Union, Any + +from . import extension_types as et + +# --------- extensions ------------------------------ + + +class timingPhase(_DataType): + """ + An ISARIC extension collecting data on the phase of admission an event occurred. + This is typically one of: + - Pre-admission + - Admission (i.e. during the hospital stay) + - Follow-up + with an appropriate SNOMED (or similar) code. + """ + + resource_type = Field("timingPhase", const=True) + + url = Field("timingPhase", const=True, alias="url") + + valueCodeableConcept: fhirtypes.CodeableConceptType = Field( + None, + alias="valueCodeableConcept", + title="Value of extension", + description=( + "Value of extension - must be one of a constrained set of the data " + "types (see [Extensibility](extensibility.html) for a list)." + ), + # if property is element of this resource. + element_property=True, + element_required=True, + ) + + @classmethod + def elements_sequence(cls): + """returning all elements names from + ``Extension`` according specification, + with preserving original sequence order. + """ + return [ + "id", + "extension", + "url", + "valueCodeableConcept", + ] + + +class relativeDay(_DataType): + """ + An ISARIC extension recording the day an event occurred relative to the admission + date. For a resources such as Encounter or Procedure, use relativePeriod to record + both the relative start and end dates instead. 
+ """ + + resource_type = Field("relativeDay", const=True) + + url = Field("relativeDay", const=True, alias="url") + + valueInteger: fhirtypes.Integer = Field( + None, + alias="valueInteger", + title="Value of extension", + description=( + "Value of extension - must be one of a constrained set of the data " + "types (see [Extensibility](extensibility.html) for a list)." + ), + # if property is element of this resource. + element_property=True, + element_required=True, + ) + + @classmethod + def elements_sequence(cls): + """returning all elements names from + ``Extension`` according specification, + with preserving original sequence order. + """ + return [ + "id", + "extension", + "url", + "valueInteger", + ] + + +class relativeStart(_DataType): + """ + An ISARIC extension for use inside the complex `relativePeriod` extension. + """ + + resource_type = Field("relativeStart", const=True) + + url = Field("relativeStart", const=True, alias="url") + + valueInteger: fhirtypes.Integer = Field( + None, + alias="valueInteger", + title="Value of extension", + description=( + "Value of extension - must be one of a constrained set of the data " + "types (see [Extensibility](extensibility.html) for a list)." + ), + # if property is element of this resource. + element_property=True, + element_required=True, + ) + + @classmethod + def elements_sequence(cls): + """returning all elements names from + ``Extension`` according specification, + with preserving original sequence order. + """ + return [ + "id", + "extension", + "url", + "valueInteger", + ] + + +class relativeEnd(_DataType): + """ + An ISARIC extension for use inside the complex `relativePeriod` extension. 
+ """ + + resource_type = Field("relativeEnd", const=True) + + url = Field("relativeEnd", const=True, alias="url") + + valueInteger: fhirtypes.Integer = Field( + None, + alias="valueInteger", + title="Value of extension", + description=( + "Value of extension - must be one of a constrained set of the data " + "types (see [Extensibility](extensibility.html) for a list)." + ), + # if property is element of this resource. + element_property=True, + element_required=True, + ) + + @classmethod + def elements_sequence(cls): + """returning all elements names from + ``Extension`` according specification, + with preserving original sequence order. + """ + return [ + "id", + "extension", + "url", + "valueInteger", + ] + + +class relativePeriod(_DataType): + """ + An ISARIC extension recording the start and end dates an event occurred relative to + the admission date. + + E.g. a an Encounter that starts on the 1st of Jan, the same day as admission, and + ends on the 5th, would have a relativePeriod extension where relativeStart is 1 and + relativeEnd is 5. + """ + + resource_type = Field("relativePeriod", const=True) + + url = Field("relativePeriod", const=True, alias="url") + + extension: list[Union[et.relativeStartType, et.relativeEndType]] = Field( + None, + alias="extension", + title="List of `Extension` items (represented as `dict` in JSON)", + description="Additional content defined by implementations", + # if property is element of this resource. 
+ element_property=True, + # this trys to match the type of the object to each of the union types + union_mode="smart", + ) + + @validator("extension") + def validate_extension_contents(cls, extensions): + start_count = sum(isinstance(item, relativeStart) for item in extensions) + end_count = sum(isinstance(item, relativeEnd) for item in extensions) + + if start_count > 1 or end_count > 1: + raise ValueError("relativeStart and relativeEnd can only appear once.") + + return extensions + + @classmethod + def elements_sequence(cls): + """returning all elements names from + ``Extension`` according specification, + with preserving original sequence order. + """ + return [ + "id", + "extension", + "url", + ] + + +class approximateDate(_DataType): + """ + An ISARIC extension for recording the approximate date (if the true date is unknown) + or timeframe of an event. + + E.g. a Follow-up encounter that occured 3 months after admission would have an + approximateDate extension with a valueString of "3 months". + """ + + resource_type = Field("approximateDate", const=True) + + url = Field("approximateDate", const=True, alias="url") + + valueDate: fhirtypes.Date = Field( + None, + alias="valueDate", + title="Value of extension", + description=( + "Value of extension - must be one of a constrained set of the data " + "types (see [Extensibility](extensibility.html) for a list)." + ), + # if property is element of this resource. + element_property=True, + # Choice of Data Types. i.e value[x] + one_of_many="value", + one_of_many_required=True, + ) + + valueString: fhirtypes.String = Field( + None, + alias="valueString", + title="Value of extension", + description=( + "Value of extension - must be one of a constrained set of the data " + "types (see [Extensibility](extensibility.html) for a list)." + ), + # if property is element of this resource. + element_property=True, + # Choice of Data Types. 
i.e value[x] + one_of_many="value", + one_of_many_required=True, + ) + + @classmethod + def elements_sequence(cls): + """returning all elements names from + ``Extension`` according specification, + with preserving original sequence order. + """ + return ["id", "extension", "url", "valueDate", "valueString"] + + @root_validator(pre=True, allow_reuse=True) + def validate_one_of_many_1136(cls, values: dict[str, Any]) -> dict[str, Any]: + """https://www.hl7.org/fhir/formats.html#choice + A few elements have a choice of more than one data type for their content. + All such elements have a name that takes the form nnn[x]. + The "nnn" part of the name is constant, and the "[x]" is replaced with + the title-cased name of the type that is actually used. + The table view shows each of these names explicitly. + + Elements that have a choice of data type cannot repeat - they must have a + maximum cardinality of 1. When constructing an instance of an element with a + choice of types, the authoring system must create a single element with a + data type chosen from among the list of permitted data types. + """ + one_of_many_fields = { + "value": [ + "valueDate", + "valueString", + ] + } + for prefix, fields in one_of_many_fields.items(): + assert cls.__fields__[fields[0]].field_info.extra["one_of_many"] == prefix + required = ( + cls.__fields__[fields[0]].field_info.extra["one_of_many_required"] + is True + ) + found = False + for field in fields: + if field in values and values[field] is not None: + if found is True: + raise ValueError( + "Any of one field value is expected from " + f"this list {fields}, but got multiple!" + ) + else: + found = True + if required is True and found is False: + raise ValueError(f"Expect any of field value from this list {fields}.") + + return values + + +class Duration(_DataType): + """ + An ISARIC extension for recording the length of an event (e.g. 5 days) where + duration is not an option in the base FHIR specification. 
+ """ + + resource_type = Field("Duration", const=True) + + url = Field("Duration", const=True, alias="url") + + valueQuantity: fhirtypes.QuantityType = Field( + None, + alias="valueQuantity", + title="Value of extension", + description=( + "Value of extension - must be one of a constrained set of the data " + "types (see [Extensibility](extensibility.html) for a list)." + ), + # if property is element of this resource. + element_property=True, + element_required=True, + ) + + @classmethod + def elements_sequence(cls): + """returning all elements names from + ``Extension`` according specification, + with preserving original sequence order. + """ + return [ + "id", + "extension", + "url", + "valueQuantity", + ] + + +# ------------------- extension types ------------------------------ + + +class dateTimeExtension(_FHIRPrimitiveExtension): + """ + A G.Health specific extension to the FHIR dateTime type + Allows dates to be specified as either approximate, and/or number of days relative + to the current date. + """ + + resource_type = Field("dateTimeExtension", const=True) + + extension: list[ + Union[et.approximateDateType, et.relativeDayType, fhirtypes.ExtensionType] + ] = Field( + None, + alias="extension", + title="List of `Extension` items (represented as `dict` in JSON)", + description="Additional content defined by implementations", + # if property is element of this resource. 
+ element_property=True, + # this tries to match the type of the object to each of the union types + union_mode="smart", + ) + + @validator("extension") + def validate_extension_contents(cls, extensions): + approx_date_count = sum( + isinstance(item, approximateDate) for item in extensions + ) + rel_day_count = sum(isinstance(item, relativeDay) for item in extensions) + + if approx_date_count > 1 or rel_day_count > 1: + raise ValueError("approximateDate and relativeDay can only appear once.") + + return extensions diff --git a/fhirflat/resources/immunization.py b/fhirflat/resources/immunization.py index 70f6034..eb5a9d0 100644 --- a/fhirflat/resources/immunization.py +++ b/fhirflat/resources/immunization.py @@ -1,16 +1,41 @@ from __future__ import annotations from fhir.resources.immunization import Immunization as _Immunization from .base import FHIRFlatBase +from .extensions import timingPhase +from .extension_types import timingPhaseType, dateTimeExtensionType +from pydantic.v1 import Field, validator import orjson from ..flat2fhir import expand_concepts -from typing import TypeAlias, ClassVar +from typing import TypeAlias, ClassVar, Union +from fhir.resources import fhirtypes JsonString: TypeAlias = str class Immunization(_Immunization, FHIRFlatBase): + extension: list[Union[timingPhaseType, fhirtypes.ExtensionType]] = Field( + None, + alias="extension", + title="List of `Extension` items (represented as `dict` in JSON)", + description=( + """ + Contains the G.H 'eventPhase' extension, and allows extensions from other + implementations to be included.""" + ), + # if property is element of this resource.
+ element_property=True, + # this tries to match the type of the object to each of the union types + union_mode="smart", + ) + + occurrenceDateTime__ext: dateTimeExtensionType = Field( + None, + alias="_occurrenceDateTime", + title="Extension field for ``occurrenceDateTime``.", + ) + # attributes to exclude from the flat representation flat_exclusions: ClassVar[set[str]] = FHIRFlatBase.flat_exclusions + ( "id", @@ -30,6 +55,15 @@ class Immunization(_Immunization, FHIRFlatBase): # required attributes that are not present in the FHIRflat representation flat_defaults: ClassVar[list[str]] = FHIRFlatBase.flat_defaults + ["status"] + @validator("extension") + def validate_extension_contents(cls, extensions): + phase_count = sum(isinstance(item, timingPhase) for item in extensions) + + if phase_count > 1: + raise ValueError("timingPhase can only appear once.") + + return extensions + @classmethod def cleanup(cls, data: JsonString) -> Immunization: """ diff --git a/fhirflat/resources/observation.py b/fhirflat/resources/observation.py index 71543f2..6d8eb7d 100644 --- a/fhirflat/resources/observation.py +++ b/fhirflat/resources/observation.py @@ -1,15 +1,71 @@ from __future__ import annotations from fhir.resources.observation import Observation as _Observation +from fhir.resources.observation import ObservationComponent as _ObservationComponent + from .base import FHIRFlatBase +from .extension_types import dateTimeExtensionType, timingPhaseType +from .extensions import timingPhase +from pydantic.v1 import Field, validator import orjson +from fhir.resources import fhirtypes from ..flat2fhir import expand_concepts -from typing import TypeAlias, ClassVar +from typing import TypeAlias, ClassVar, Union JsonString: TypeAlias = str +class ObservationComponent(_ObservationComponent): + """ + Adds the dateTime extension into the Observation.component class + """ + + valueDateTime__ext: dateTimeExtensionType = Field( + None, + alias="_effectiveDateTime", + title="Extension field for
``effectiveDateTime``.", + ) + + class Observation(_Observation, FHIRFlatBase): + + extension: list[Union[timingPhaseType, fhirtypes.ExtensionType]] = Field( + None, + alias="extension", + title="List of `Extension` items (represented as `dict` in JSON)", + description=( + """ + Contains the G.H 'eventPhase' extension, and allows extensions from other + implementations to be included.""" + ), + # if property is element of this resource. + element_property=True, + # this tries to match the type of the object to each of the union types + union_mode="smart", + ) + + effectiveDateTime__ext: dateTimeExtensionType = Field( + None, + alias="_effectiveDateTime", + title="Extension field for ``effectiveDateTime``.", + ) + + # Update component to include the dateTime extension + component: list[ObservationComponent] = Field( + None, + alias="component", + title="Component results", + description=( + "Some observations have multiple component observations. These " + "component observations are expressed as separate code value pairs that" + " share the same attributes. Examples include systolic and diastolic " + "component observations for blood pressure measurement and multiple " + "component observations for genetics observations." + ), + # if property is element of this resource.
+ element_property=True, + ) + # attributes to exclude from the flat representation flat_exclusions: ClassVar[set[str]] = FHIRFlatBase.flat_exclusions + ( "id", @@ -26,6 +82,15 @@ class Observation(_Observation, FHIRFlatBase): # required attributes that are not present in the FHIRflat representation flat_defaults: ClassVar[list[str]] = FHIRFlatBase.flat_defaults + ["status"] + @validator("extension") + def validate_extension_contents(cls, extensions): + phase_count = sum(isinstance(item, timingPhase) for item in extensions) + + if phase_count > 1: + raise ValueError("timingPhase can only appear once.") + + return extensions + @classmethod def cleanup(cls, data: JsonString) -> Observation: """ diff --git a/fhirflat/resources/procedure.py b/fhirflat/resources/procedure.py index d845ac5..1da6534 100644 --- a/fhirflat/resources/procedure.py +++ b/fhirflat/resources/procedure.py @@ -1,15 +1,52 @@ from __future__ import annotations from fhir.resources.procedure import Procedure as _Procedure from .base import FHIRFlatBase + +from .extension_types import ( + dateTimeExtensionType, + relativePeriodType, + durationType, + timingPhaseType, +) + +from .extensions import Duration, timingPhase, relativePeriod + +from pydantic.v1 import Field, validator import orjson from ..flat2fhir import expand_concepts -from typing import TypeAlias, ClassVar +from typing import TypeAlias, ClassVar, Union +from fhir.resources import fhirtypes JsonString: TypeAlias = str class Procedure(_Procedure, FHIRFlatBase): + + extension: list[ + Union[ + durationType, timingPhaseType, relativePeriodType, fhirtypes.ExtensionType + ] + ] = Field( + None, + alias="extension", + title="Additional content defined by implementations", + description=( + """ + Contains the G.H 'timingPhase', 'relativePeriod' and 'Duration' extensions, + and allows extensions from other implementations to be included.""" + ), + # if property is element of this resource. 
+ element_property=True, + union_mode="smart", + ) + + occurrenceDateTime__ext: dateTimeExtensionType = Field( + None, + alias="_occurrenceDateTime", + title="Extension field for ``occurrenceDateTime``.", + ) + # attributes to exclude from the flat representation flat_exclusions: ClassVar[set[str]] = FHIRFlatBase.flat_exclusions + ( "id", @@ -30,6 +67,19 @@ class Procedure(_Procedure, FHIRFlatBase): # required attributes that are not present in the FHIRflat representation flat_defaults: ClassVar[list[str]] = FHIRFlatBase.flat_defaults + ["status"] + @validator("extension") + def validate_extension_contents(cls, extensions): + duration_count = sum(isinstance(item, Duration) for item in extensions) + tim_phase_count = sum(isinstance(item, timingPhase) for item in extensions) + rel_phase_count = sum(isinstance(item, relativePeriod) for item in extensions) + + if duration_count > 1 or tim_phase_count > 1 or rel_phase_count > 1: + raise ValueError( + "Duration, timingPhase and relativePeriod can only appear once." + ) + + return extensions + @classmethod def cleanup(cls, data: JsonString) -> Procedure: """ diff --git a/fhirflat/util.py b/fhirflat/util.py index 65a4569..c56c62c 100644 --- a/fhirflat/util.py +++ b/fhirflat/util.py @@ -1,7 +1,10 @@ # Utility functions for FHIRflat from itertools import groupby -import fhir.resources as fr +import fhir.resources import re +import importlib + +from .resources import extensions def group_keys(data_keys: list[str]) -> list[dict[str, list[str]]]: @@ -21,18 +24,41 @@ def group_keys(data_keys: list[str]) -> list[dict[str, list[str]]]: return groups -def get_fhirtype(t: str): +def get_fhirtype(t: str | list[str]): """ Finds the relevent class from fhir.resources for a given string. 
""" - try: - return getattr(getattr(fr, t.lower()), t) - except AttributeError: - file_words = re.findall(r"[A-Z](?:[a-z]+|[A-Z]*(?=[A-Z]|$))", t) - file = "".join(file_words[:-1]).lower() + if isinstance(t, list): + return [get_fhirtype(x) for x in t] + if not hasattr(extensions, t): try: - return getattr(getattr(fr, file), t) + return getattr(getattr(fhir.resources, t.lower()), t) except AttributeError: - raise AttributeError(f"Could not find {t} in fhir.resources") + file_words = re.findall(r"[A-Z](?:[a-z]+|[A-Z]*(?=[A-Z]|$))", t) + file = "".join(file_words[:-1]).lower() + + try: + return getattr(getattr(fhir.resources, file), t) + except AttributeError: + try: + module = importlib.import_module(f"fhir.resources.{t.lower()}") + return getattr(module, t) + except ImportError or ModuleNotFoundError: + # Handle the case where the module does not exist. + raise AttributeError(f"Could not find {t} in fhir.resources") + + else: + return get_local_extension_type(t) + + +def get_local_extension_type(t: str): + """ + Finds the relevent class from fhir.resources for a given string. + """ + + try: + return getattr(extensions, t) + except AttributeError: + raise AttributeError(f"Could not find {t} in fhirflat extensions") diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..03f586d --- /dev/null +++ b/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +pythonpath = . 
\ No newline at end of file diff --git a/requirements.txt b/requirements.txt index a0fb494..1755bfb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,5 @@ pyarrow==15.0.0 pydantic==2.6.1 pydantic_core==2.16.2 pytest==8.0.0 +pytest-cov +pytest-unordered diff --git a/tests/data/encounter_flat.parquet b/tests/data/encounter_flat.parquet index adfd3f3..c6a949a 100644 Binary files a/tests/data/encounter_flat.parquet and b/tests/data/encounter_flat.parquet differ diff --git a/tests/data/immunization_flat.parquet b/tests/data/immunization_flat.parquet index 8bbd8e8..258ec45 100644 Binary files a/tests/data/immunization_flat.parquet and b/tests/data/immunization_flat.parquet differ diff --git a/tests/data/observation_flat.parquet b/tests/data/observation_flat.parquet index c968cde..0948956 100644 Binary files a/tests/data/observation_flat.parquet and b/tests/data/observation_flat.parquet differ diff --git a/tests/data/procedure_flat.parquet b/tests/data/procedure_flat.parquet index ad317d4..5c12c21 100644 Binary files a/tests/data/procedure_flat.parquet and b/tests/data/procedure_flat.parquet differ diff --git a/tests/test_encounter_resource.py b/tests/test_encounter_resource.py index 5a0f3d9..0c786df 100644 --- a/tests/test_encounter_resource.py +++ b/tests/test_encounter_resource.py @@ -9,6 +9,27 @@ "id": "f203", "identifier": [{"use": "temp", "value": "Encounter_Roel_20130311"}], "status": "completed", + "extension": [ + { + "url": "timingPhase", + "valueCodeableConcept": { + "coding": [ + { + "system": "http://snomed.info/sct", + "code": 278307001, + "display": "on admission", + } + ] + }, + }, + { + "url": "relativePeriod", + "extension": [ + {"url": "relativeStart", "valueInteger": 2}, + {"url": "relativeEnd", "valueInteger": 5}, + ], + }, + ], "class": [ { "coding": [ @@ -155,6 +176,10 @@ ENCOUNTER_FLAT = { "resourceType": "Encounter", + "extension.timingPhase.code": "http://snomed.info/sct|278307001", + "extension.timingPhase.text": "on admission", + 
"extension.relativePeriod.relativeStart": 2, + "extension.relativePeriod.relativeEnd": 5, "class.code": "http://terminology.hl7.org/CodeSystem/v3-ActCode|IMP", "class.text": "inpatient encounter", "type.code": "http://snomed.info/sct|183807002", @@ -182,6 +207,27 @@ ENCOUNTER_DICT_OUT = { "resourceType": "Encounter", "status": "completed", + "extension": [ + { + "url": "relativePeriod", + "extension": [ + {"url": "relativeEnd", "valueInteger": 5}, + {"url": "relativeStart", "valueInteger": 2}, + ], + }, + { + "url": "timingPhase", + "valueCodeableConcept": { + "coding": [ + { + "system": "http://snomed.info/sct", + "code": 278307001, + "display": "on admission", + } + ] + }, + }, + ], "class": [ { "coding": [ @@ -219,6 +265,17 @@ "partOf": {"reference": "Encounter/f203"}, "serviceProvider": {"reference": "Organization/2"}, "actualPeriod": {"start": "2013-03-11T00:00:00", "end": "2013-03-20T00:00:00"}, + "reason": [ + { + "value": [ + { + "concept": { + "text": "The patient seems to suffer from bilateral pneumonia and renal insufficiency, most likely due to chemotherapy." 
# noqa: E501 + } + } + ] + } + ], "admission": { "origin": {"reference": "Location/2"}, "admitSource": { @@ -246,6 +303,7 @@ def test_encounter_to_flat(): pd.DataFrame(ENCOUNTER_FLAT, index=[0]), # Date types are off otherwise, pyarrow uses pytz and pandas uses dateutil check_dtype=False, + check_like=True, # ignore column order ) os.remove("test_encounter.parquet") diff --git a/tests/test_extensions.py b/tests/test_extensions.py new file mode 100644 index 0000000..7704811 --- /dev/null +++ b/tests/test_extensions.py @@ -0,0 +1,170 @@ +import pytest +import datetime +from fhir.resources.extension import Extension +from fhir.resources.datatype import DataType +from fhir.resources.fhirprimitiveextension import FHIRPrimitiveExtension +from fhir.resources.codeableconcept import CodeableConcept as _CodeableConcept +from fhir.resources.quantity import Quantity as _Quantity +from fhirflat.resources.extensions import ( + timingPhase, + relativeDay, + relativeStart, + relativeEnd, + relativePeriod, + approximateDate, + Duration, + dateTimeExtension, +) +from pydantic.v1.error_wrappers import ValidationError + +timing_phase_data = { + "url": "timingPhase", + "valueCodeableConcept": { + "coding": [ + { + "system": "http://snomed.info/sct", + "code": "307168008", + "display": "During admission (qualifier value)", + } + ] + }, +} + + +def test_timingPhase(): + timing_phase = timingPhase(**timing_phase_data) + assert isinstance(timing_phase, DataType) + assert timing_phase.resource_type == "timingPhase" + assert timing_phase.url == "timingPhase" + assert type(timing_phase.valueCodeableConcept) is _CodeableConcept + + +rel_day = {"url": "relativeDay", "valueInteger": 3} + + +def test_relativeDay(): + relative_day = relativeDay(**rel_day) + assert isinstance(relative_day, DataType) + assert relative_day.resource_type == "relativeDay" + assert relative_day.url == "relativeDay" + assert type(relative_day.valueInteger) is int + + +start_date = {"url": "relativeStart", "valueInteger": 
3} + + +def test_relativeStart(): + relative_start = relativeStart(**start_date) + assert isinstance(relative_start, DataType) + assert relative_start.resource_type == "relativeStart" + assert relative_start.url == "relativeStart" + assert type(relative_start.valueInteger) is int + + +end_date = {"url": "relativeEnd", "valueInteger": 5} + + +def test_relativeEnd(): + relative_end = relativeEnd(**end_date) + assert isinstance(relative_end, DataType) + assert relative_end.resource_type == "relativeEnd" + assert relative_end.url == "relativeEnd" + assert type(relative_end.valueInteger) is int + + +relative_phase_data = {"url": "relativePeriod", "extension": [start_date, end_date]} + + +def test_relativePeriod(): + relative_phase = relativePeriod(**relative_phase_data) + assert isinstance(relative_phase, DataType) + assert relative_phase.resource_type == "relativePeriod" + assert relative_phase.url == "relativePeriod" + assert isinstance(relative_phase.extension, list) + assert all( + isinstance(ext, (relativeStart, relativeEnd)) + for ext in relative_phase.extension + ) + + +@pytest.mark.parametrize( + "data, expected_type_date, expected_type_str", + [ + ( + {"url": "approximateDate", "valueDate": "2021-01-01"}, + datetime.date, + type(None), + ), + ({"url": "approximateDate", "valueString": "month 3"}, type(None), str), + ], +) +def test_approximateDate(data, expected_type_date, expected_type_str): + approximate_date = approximateDate(**data) + assert isinstance(approximate_date, DataType) + assert approximate_date.resource_type == "approximateDate" + assert approximate_date.url == "approximateDate" + assert type(approximate_date.valueDate) is expected_type_date + assert type(approximate_date.valueString) is expected_type_str + + +dur = {"url": "Duration", "valueQuantity": {"value": 3, "unit": "days"}} + + +def test_Duration(): + duration = Duration(**dur) + assert isinstance(duration, DataType) + assert duration.resource_type == "Duration" + assert duration.url == 
"Duration" + assert type(duration.valueQuantity) is _Quantity + + +dte = {"extension": [{"url": "approximateDate", "valueDate": "2021-01-01"}, rel_day]} + + +def test_dateTimeExtension(): + date_time_extension = dateTimeExtension(**dte) + assert isinstance(date_time_extension, FHIRPrimitiveExtension) + assert date_time_extension.resource_type == "dateTimeExtension" + assert isinstance(date_time_extension.extension, list) + assert all( + isinstance(ext, (approximateDate, relativeDay, Extension)) + for ext in date_time_extension.extension + ) + + +@pytest.mark.parametrize( + "ext_class, data", + [ + (timingPhase, {"url": "timing"}), + (relativeDay, {"url": "day"}), + (relativeStart, {"url": "startdate"}), + (relativeEnd, {"url": "enddate"}), + (relativePeriod, {"url": "phase"}), + (approximateDate, {"url": "approx"}), + (Duration, {"url": "dur"}), + ], +) +def test_extension_name_error(ext_class, data): + with pytest.raises(ValueError): + ext_class(**data) + + +@pytest.mark.parametrize( + "ext_class, data", + [ + (timingPhase, {"valueQuantity": {}}), + (relativeDay, {"valueFloat": 2.5}), + (relativeStart, {"valueInteger": "startdate"}), + (relativeEnd, {"valueFloat": 2.5}), + (relativePeriod, {"valueFloat": 2.5}), + # not date format + (approximateDate, {"valueDate": "month 3"}), + # can't have both + (approximateDate, {"valueDate": "2021-09", "valueString": "month 3"}), + (Duration, {"valuePeriod": "middle"}), + (dateTimeExtension, {"extension": [{"valueDate": "month 3"}]}), + ], +) +def test_extension_validation_error(ext_class, data): + with pytest.raises(ValidationError): + ext_class(**data)(**data) diff --git a/tests/test_flat2fhir_units.py b/tests/test_flat2fhir_units.py index 039175e..ccd2fb0 100644 --- a/tests/test_flat2fhir_units.py +++ b/tests/test_flat2fhir_units.py @@ -60,6 +60,15 @@ ] }, ), + ( + ( + {"concept.text": ["Test"]}, + "concept", + ), + { + "text": "Test", + }, + ), ], ) def test_create_codeable_concept(data_groups, expected): diff --git 
a/tests/test_immunization_resource.py b/tests/test_immunization_resource.py index 5e71f10..5c4beaa 100644 --- a/tests/test_immunization_resource.py +++ b/tests/test_immunization_resource.py @@ -14,6 +14,20 @@ } ], "status": "completed", + "extension": [ + { + "url": "timingPhase", + "valueCodeableConcept": { + "coding": [ + { + "system": "http://snomed.info/sct", + "code": 278307001, + "display": "on admission", + } + ] + }, + }, + ], "vaccineCode": { "coding": [{"system": "http://hl7.org/fhir/sid/cvx", "code": "175"}], "text": "Rabies - IM Diploid cell culture", @@ -24,6 +38,12 @@ "patient": {"reference": "Patient/example"}, "encounter": {"reference": "Encounter/example"}, "occurrenceDateTime": "2021-09-12", + "_occurrenceDateTime": { + "extension": [ + {"url": "approximateDate", "valueString": "month 3"}, + {"url": "relativeDay", "valueInteger": 3}, + ] + }, "primarySource": True, "location": {"reference": "Location/1"}, "site": { @@ -83,7 +103,11 @@ IMMUNIZATION_FLAT = { "resourceType": "Immunization", + "extension.timingPhase.code": "http://snomed.info/sct|278307001", + "extension.timingPhase.text": "on admission", "occurrenceDateTime": datetime.date(2021, 9, 12), + "_occurrenceDateTime.relativeDay": 3.0, + "_occurrenceDateTime.approximateDate": "month 3", "reason.reference": "Observation/example", "isSubpotent": False, "reaction.date": datetime.date(2021, 9, 12), @@ -106,6 +130,20 @@ IMMUNIZATION_DICT_OUT = { "resourceType": "Immunization", "status": "completed", + "extension": [ + { + "url": "timingPhase", + "valueCodeableConcept": { + "coding": [ + { + "system": "http://snomed.info/sct", + "code": 278307001, + "display": "on admission", + } + ] + }, + }, + ], "vaccineCode": { "coding": [ { @@ -119,6 +157,12 @@ "patient": {"reference": "Patient/example"}, "encounter": {"reference": "Encounter/example"}, "occurrenceDateTime": "2021-09-12T00:00:00", + "_occurrenceDateTime": { + "extension": [ + {"url": "approximateDate", "valueString": "month 3"}, + {"url": 
"relativeDay", "valueInteger": 3}, + ] + }, "location": {"reference": "Location/1"}, "site": { "coding": [ @@ -161,6 +205,7 @@ def test_immunization_to_flat(): pd.DataFrame(IMMUNIZATION_FLAT, index=[0]), # Date types are off otherwise, pyarrow uses pytz and pandas uses dateutil check_dtype=False, + check_like=True, # ignore column order ) os.remove("test_immunization.parquet") diff --git a/tests/test_observation_resource.py b/tests/test_observation_resource.py index ac81444..931598c 100644 --- a/tests/test_observation_resource.py +++ b/tests/test_observation_resource.py @@ -82,6 +82,20 @@ OBSERVATION_DICT_INPUT = { "resourceType": "Observation", "status": "final", + "extension": [ + { + "url": "timingPhase", + "valueCodeableConcept": { + "coding": [ + { + "system": "http://snomed.info/sct", + "code": 278307001, + "display": "on admission", + } + ] + }, + }, + ], "category": [ { "coding": [ @@ -105,6 +119,12 @@ }, "subject": {"reference": "Patient/example"}, "effectiveDateTime": "2012-09-17", + "_effectiveDateTime": { + "extension": [ + {"url": "relativeDay", "valueInteger": 2}, + {"url": "approximateDate", "valueDate": "2012-09"}, + ] + }, "performer": [{"reference": "Practitioner/example"}], "interpretation": [ { @@ -134,6 +154,10 @@ "category.code": "http://terminology.hl7.org/CodeSystem/observation-category|vital-signs", # noqa: E501 "category.text": "Vital Signs", "effectiveDateTime": datetime.date(2012, 9, 17), + "_effectiveDateTime.relativeDay": 2.0, + "_effectiveDateTime.approximateDate": "2012-09", + "extension.timingPhase.code": "http://snomed.info/sct|278307001", + "extension.timingPhase.text": "on admission", "performer": "Practitioner/example", "interpretation.code": "http://terminology.hl7.org/CodeSystem/v3-ObservationInterpretation|L", # noqa: E501 "interpretation.text": "Below low normal", @@ -147,6 +171,20 @@ OBSERVATION_DICT_OUT = { "resourceType": "Observation", "status": "final", + "extension": [ + { + "url": "timingPhase", + 
"valueCodeableConcept": { + "coding": [ + { + "system": "http://snomed.info/sct", + "code": 278307001, + "display": "on admission", + } + ] + }, + }, + ], "category": [ { "coding": [ @@ -169,6 +207,12 @@ }, "subject": {"reference": "Patient/example"}, "effectiveDateTime": "2012-09-17T00:00:00", + "_effectiveDateTime": { + "extension": [ + {"url": "approximateDate", "valueDate": "2012-09"}, + {"url": "relativeDay", "valueInteger": 2}, + ] + }, "performer": [{"reference": "Practitioner/example"}], "interpretation": [ { @@ -201,6 +245,7 @@ def test_observation_to_flat(): assert_frame_equal( pd.read_parquet("test_observation.parquet"), pd.DataFrame(OBSERVATION_FLAT, index=[0]), + check_like=True, # ignore column order ) os.remove("test_observation.parquet") diff --git a/tests/test_procedure_resource.py b/tests/test_procedure_resource.py index f99437b..976832b 100644 --- a/tests/test_procedure_resource.py +++ b/tests/test_procedure_resource.py @@ -9,6 +9,22 @@ "id": "f201", "instantiatesCanonical": ["http://example.org/fhir/PlanDefinition/KDN5"], "status": "completed", + "extension": [ + {"url": "Duration", "valueQuantity": {"value": 1, "unit": "d"}}, + { + "url": "timingPhase", + "valueCodeableConcept": { + "coding": [{"system": "timing.com", "code": "1234"}] + }, + }, + { + "url": "relativePeriod", + "extension": [ + {"url": "relativeStart", "valueInteger": 2}, + {"url": "relativeEnd", "valueInteger": 5}, + ], + }, + ], "code": { "coding": [ { @@ -27,6 +43,12 @@ "start": "2013-01-28T13:31:00+01:00", "end": "2013-01-28T14:27:00+01:00", }, + # "_occurrenceDateTime": { + # "extension": [ + # {"url": "approximateDate", "valueString": "month 3"}, + # {"url": "relativeDay", "valueInteger": 3}, + # ] + # }, "performer": [ { "function": { @@ -57,6 +79,12 @@ PROCEDURE_FLAT = { "resourceType": "Procedure", + "extension.Duration.value": 1, + "extension.Duration.unit": "d", + "extension.timingPhase.code": "timing.com|1234", + "extension.timingPhase.text": None, + 
"extension.relativePeriod.relativeStart": 2, + "extension.relativePeriod.relativeEnd": 5, "bodySite.code": "http://snomed.info/sct|272676008", "bodySite.text": "Sphenoid bone", "code.code": "http://snomed.info/sct|367336001", @@ -74,6 +102,22 @@ PROCEDURE_DICT_OUT = { "resourceType": "Procedure", "status": "completed", + "extension": [ + {"url": "Duration", "valueQuantity": {"value": 1.0, "unit": "d"}}, + { + "url": "relativePeriod", + "extension": [ + {"url": "relativeEnd", "valueInteger": 5}, + {"url": "relativeStart", "valueInteger": 2}, + ], + }, + { + "url": "timingPhase", + "valueCodeableConcept": { + "coding": [{"system": "timing.com", "code": "1234"}] + }, + }, + ], "code": { "coding": [ { @@ -113,11 +157,12 @@ def test_procedure_to_flat(): pd.DataFrame(PROCEDURE_FLAT, index=[0]), # Date types are off otherwise, pyarrow uses pytz and pandas uses dateutil check_dtype=False, + check_like=True, # ignore column order ) os.remove("test_procedure.parquet") -def test_observation_from_flat(): +def test_procedure_from_flat(): chemo = Procedure(**PROCEDURE_DICT_OUT) flat_chemo = Procedure.from_flat("tests/data/procedure_flat.parquet") diff --git a/tests/test_utils.py b/tests/test_utils.py index 73f4edb..d600602 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -5,6 +5,8 @@ from fhir.resources.codeableconcept import CodeableConcept from fhir.resources.medicationstatement import MedicationStatementAdherence +from fhirflat.resources.extensions import dateTimeExtension + def test_group_keys(): data = [ @@ -39,8 +41,14 @@ def test_group_keys(): ("Quantity", Quantity), ("CodeableConcept", CodeableConcept), ("MedicationStatementAdherence", MedicationStatementAdherence), + ("dateTimeExtension", dateTimeExtension), ], ) def test_get_fhirtype(input, expected): result = get_fhirtype(input) assert result == expected + + +def test_get_fhirtype_raises(): + with pytest.raises(AttributeError): + get_fhirtype("NotARealType")