diff --git a/energyml-utils/example/main.py b/energyml-utils/example/main.py index 90d2cf0..abe941c 100644 --- a/energyml-utils/example/main.py +++ b/energyml-utils/example/main.py @@ -1,6 +1,7 @@ # Copyright (c) 2023-2024 Geosiris. # SPDX-License-Identifier: Apache-2.0 import json +import re from dataclasses import fields from energyml.eml.v2_3.commonv2 import * @@ -464,6 +465,57 @@ def test_obj_attribs(): print(get_obj_uri(tr, "coucou")) +def test_copy_values(): + data_in = { + "a": {"b": "v_0", "c": "v_1"}, + "uuid": "215f8219-cabd-4e24-9e4f-e371cabc9622", + "objectVersion": "Resqml 2.0", + "non_existing": 42, + } + data_out = { + "a": None, + "Uuid": "8291afd6-ae01-49f5-bc96-267e7b27450d", + "object_version": "Resqml 2.0", + } + copy_attributes( + obj_in=data_in, + obj_out=data_out, + only_existing_attributes=True, + ignore_case=True, + ) + + +def class_field(): + print(get_class_fields(tr)["citation"]) + print(get_class_pkg_version(tr)) + print(create_energyml_object("resqml22.TriangulatedSetRepresentation")) + ext_20 = create_energyml_object( + "application/x-eml+xml;version=2.0;type=obj_EpcExternalPartReference" + ) + print(ext_20) + print(gen_energyml_object_path(ext_20)) + print(create_external_part_reference("2.0", "my_h5")) + + print( + parse_content_or_qualified_type( + "application/x-eml+xml;version=2.0;type=obj_EpcExternalPartReference" + ) + ) + print( + get_domain_version_from_content_or_qualified_type( + "application/x-eml+xml;version=2.0;type=obj_EpcExternalPartReference" + ) + ) + print( + get_domain_version_from_content_or_qualified_type( + "resqml20.obj_EpcExternalPartReference" + ) + ) + + # print(create_external_part_reference("2.2", "myfile.h5")) + # print(create_external_part_reference("2.0", "myfile.h5")) + + if __name__ == "__main__": tests_0() tests_content_type() @@ -479,3 +531,5 @@ def test_obj_attribs(): test_wellbore_marker_frame_representation() test_obj_attribs() + test_copy_values() + class_field() diff --git a/energyml-utils/src/energyml/utils/constants.py b/energyml-utils/src/energyml/utils/constants.py index 233c274..8a1f471 100644 --- a/energyml-utils/src/energyml/utils/constants.py +++ b/energyml-utils/src/energyml/utils/constants.py @@ -1,7 +1,7 @@ import datetime import re import uuid as uuid_mod -from typing import List +from typing import List, Optional ENERGYML_NAMESPACES = { "eml": "http://www.energistics.org/energyml/data/commonv2", @@ -197,24 +197,59 @@ def flatten_concatenation(matrix) -> List: return flat_list -def parse_content_type(ct: str): +def parse_content_type(ct: str) -> Optional[re.Match[str]]: return re.search(RGX_CONTENT_TYPE, ct) -def parse_qualified_type(ct: str): +def parse_qualified_type(ct: str) -> Optional[re.Match[str]]: return re.search(RGX_QUALIFIED_TYPE, ct) -def now( - time_zone=datetime.timezone(datetime.timedelta(hours=1), "UTC") -) -> float: +def parse_content_or_qualified_type(cqt: str) -> Optional[re.Match[str]]: + """ + Give a re.Match object (or None if failed). + You can access to groups like : "domainVersion", "versionNum", "domain", "type" + + :param cqt: + :return: + """ + parsed = None + try: + parsed = parse_content_type(cqt) + except: + try: + parsed = parse_qualified_type(cqt) + except: + pass + + return parsed + + +def get_domain_version_from_content_or_qualified_type(cqt: str) -> str: + """ + return a version number like "2.2" or "2.0" + + :param cqt: + :return: + """ + try: + parsed = parse_content_type(cqt) + return parsed.group("domainVersion") + except: + try: + parsed = parse_qualified_type(cqt) + return ".".join(parsed.group("domainVersion")) + except: + pass + return None + + +def now(time_zone=datetime.timezone.utc) -> float: """Return an epoch value""" return datetime.datetime.timestamp(datetime.datetime.now(time_zone)) -def epoch( - time_zone=datetime.timezone(datetime.timedelta(hours=1), "UTC") -) -> int: +def epoch(time_zone=datetime.timezone.utc) -> int: return int(now(time_zone)) @@ -228,10 +263,14 @@ def date_to_epoch(date: str) -> int: def epoch_to_date( epoch_value: int, - time_zone=datetime.timezone(datetime.timedelta(hours=1), "UTC"), ) -> str: - date = datetime.datetime.fromtimestamp(epoch_value, time_zone) - return date.strftime("%Y-%m-%dT%H:%M:%S%z") + date = datetime.datetime.fromtimestamp(epoch_value, datetime.timezone.utc) + return date.astimezone(datetime.timezone.utc).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + # date = datetime.datetime.fromtimestamp(epoch_value, datetime.timezone.utc) + # return date.astimezone(datetime.timezone(datetime.timedelta(hours=0), "UTC")).strftime('%Y-%m-%dT%H:%M:%SZ') + # return date.strftime("%Y-%m-%dT%H:%M:%SZ%z") def gen_uuid() -> str: diff --git a/energyml-utils/src/energyml/utils/epc.py b/energyml-utils/src/energyml/utils/epc.py index bc7b3e4..862968d 100644 --- a/energyml-utils/src/energyml/utils/epc.py +++ b/energyml-utils/src/energyml/utils/epc.py @@ -24,11 +24,11 @@ Created, Creator, Identifier, - Keywords1, + Keywords1, TargetMode, ) from xsdata.formats.dataclass.models.generics import DerivedElement -from .constants import RELS_CONTENT_TYPE, RELS_FOLDER_NAME +from .constants import RELS_CONTENT_TYPE, RELS_FOLDER_NAME, RGX_DOMAIN_VERSION from .introspection import ( get_class_from_content_type, get_obj_type, @@ -41,7 +41,8 @@ epoch_to_date, epoch, gen_uuid, - get_obj_identifier, + get_obj_identifier, get_class_from_qualified_type, copy_attributes, get_class_fields, get_obj_attribute_class, + set_attribute_from_path, set_attribute_value, ) from .manager import get_class_pkg, get_class_pkg_version from .serialization import ( @@ -92,15 +93,15 @@ def get_type(self) -> str: case EPCRelsRelationshipType.CORE_PROPERTIES: return "http://schemas.openxmlformats.org/package/2006/relationships/metadata/" + str(self.value) case ( - EPCRelsRelationshipType.CHUNKED_PART - | EPCRelsRelationshipType.DESTINATION_OBJECT - | EPCRelsRelationshipType.SOURCE_OBJECT - | EPCRelsRelationshipType.ML_TO_EXTERNAL_PART_PROXY - | EPCRelsRelationshipType.EXTERNAL_PART_PROXY_TO_ML - | EPCRelsRelationshipType.EXTERNAL_RESOURCE - | EPCRelsRelationshipType.DestinationMedia - | EPCRelsRelationshipType.SOURCE_MEDIA - | _ + EPCRelsRelationshipType.CHUNKED_PART + | EPCRelsRelationshipType.DESTINATION_OBJECT + | EPCRelsRelationshipType.SOURCE_OBJECT + | EPCRelsRelationshipType.ML_TO_EXTERNAL_PART_PROXY + | EPCRelsRelationshipType.EXTERNAL_PART_PROXY_TO_ML + | EPCRelsRelationshipType.EXTERNAL_RESOURCE + | EPCRelsRelationshipType.DestinationMedia + | EPCRelsRelationshipType.SOURCE_MEDIA + | _ ): return "http://schemas.energistics.org/package/2012/relationships/" + str(self.value) @@ -161,10 +162,10 @@ class Epc: def __str__(self): return ( - "EPC file (" - + str(self.export_version) - + ") " - + f"{len(self.energyml_objects)} energyml objects and {len(self.raw_files)} other files {[f.path for f in self.raw_files]}" + "EPC file (" + + str(self.export_version) + + ") " + + f"{len(self.energyml_objects)} energyml objects and {len(self.raw_files)} other files {[f.path for f in self.raw_files]}" # + f"\n{[serialize_json(ar) for ar in self.additional_rels]}" ) @@ -223,7 +224,7 @@ def export_io(self) -> BytesIO: zip_buffer = BytesIO() with zipfile.ZipFile( - zip_buffer, "a", zipfile.ZIP_DEFLATED, False + zip_buffer, "a", zipfile.ZIP_DEFLATED, False ) as zip_file: # CoreProps if self.core_props is None: @@ -323,6 +324,9 @@ def compute_rels(self) -> Dict[str, Relationships]: ) ) + # filtering non-accessible objects from DOR + rels = {k: v for k, v in rels.items() if self.get_object_by_identifier(k) is not None} + map_obj_id_to_obj = { get_obj_identifier(obj): obj for obj in self.energyml_objects } @@ -333,11 +337,11 @@ def compute_rels(self) -> Dict[str, Relationships]: export_version=self.export_version, ): Relationships( relationship=obj_rels - + ( - self.additional_rels[obj_id] - if obj_id in self.additional_rels - else [] - ), + + ( + self.additional_rels[obj_id] + if obj_id in self.additional_rels + else [] + ), ) for obj_id, obj_rels in rels.items() } @@ -388,6 +392,28 @@ def get_epc_file_folder(self) -> Optional[str]: return "" return None + def rels_to_h5_file(self, obj: any, h5_path: str) -> Relationship: + """ + Creates in the epc file, a Relation (in the object .rels file) to link a h5 external file. + Usually this function is used to link an ExternalPartReference to a h5 file. + In practice, the Relation object is added to the "additional_rels" of the current epc file. + :param obj: + :param h5_path: + :return: the Relationship added to the epc.additional_rels dict + """ + obj_ident = get_obj_identifier(obj) + if obj_ident not in self.additional_rels: + self.additional_rels[obj_ident] = [] + + rel = Relationship( + target=h5_path, + type_value=EPCRelsRelationshipType.EXTERNAL_RESOURCE.get_type(), + id="Hdf5File", + target_mode=TargetMode.EXTERNAL.value + ) + self.additional_rels[obj_ident].append(rel) + return rel + # Class methods @classmethod @@ -410,7 +436,7 @@ def read_stream(cls, epc_file_io: BytesIO): # returns an Epc instance additional_rels = {} core_props = None with zipfile.ZipFile( - epc_file_io, "r", zipfile.ZIP_DEFLATED + epc_file_io, "r", zipfile.ZIP_DEFLATED ) as epc_file: content_type_file_name = get_epc_content_type_path() content_type_info = None @@ -421,8 +447,8 @@ def read_stream(cls, epc_file_io: BytesIO): # returns an Epc instance except KeyError: for info in epc_file.infolist(): if ( - info.filename.lower() - == content_type_file_name.lower() + info.filename.lower() + == content_type_file_name.lower() ): content_type_info = info break @@ -441,7 +467,7 @@ def read_stream(cls, epc_file_io: BytesIO): # returns an Epc instance ov_path = ov.part_name # logging.debug(ov_ct) while ov_path.startswith("/") or ov_path.startswith( - "\\" + "\\" ): ov_path = ov_path[1:] if is_energyml_content_type(ov_ct): @@ -467,8 +493,8 @@ def read_stream(cls, epc_file_io: BytesIO): # returns an Epc instance pass # raise e elif ( - get_class_from_content_type(ov_ct) - == CoreProperties + get_class_from_content_type(ov_ct) + == CoreProperties ): _read_files.append(ov_path) core_props = read_energyml_xml_bytes( @@ -492,7 +518,7 @@ def read_stream(cls, epc_file_io: BytesIO): # returns an Epc instance except IOError as e: logging.error(traceback.format_exc()) elif ( - f_info.filename != "_rels/.rels" + f_info.filename != "_rels/.rels" ): # CoreProperties rels file # RELS FILES READING START @@ -511,8 +537,8 @@ def read_stream(cls, epc_file_io: BytesIO): # returns an Epc instance else "" ) obj_file_name = rels_file_name[ - :-5 - ] # removing the ".rels" + :-5 + ] # removing the ".rels" rels_file: Relationships = ( read_energyml_xml_bytes( epc_file.read(f_info.filename), @@ -530,16 +556,16 @@ def read_stream(cls, epc_file_io: BytesIO): # returns an Epc instance for rel in rels_file.relationship: # logging.debug(f"\t\t{rel.type_value}") if ( - rel.type_value - != EPCRelsRelationshipType.DESTINATION_OBJECT.get_type() - and rel.type_value - != EPCRelsRelationshipType.SOURCE_OBJECT.get_type() - and rel.type_value - != EPCRelsRelationshipType.EXTENDED_CORE_PROPERTIES.get_type() + rel.type_value + != EPCRelsRelationshipType.DESTINATION_OBJECT.get_type() + and rel.type_value + != EPCRelsRelationshipType.SOURCE_OBJECT.get_type() + and rel.type_value + != EPCRelsRelationshipType.EXTENDED_CORE_PROPERTIES.get_type() ): # not a computable relation if ( - additional_rels_key - not in additional_rels + additional_rels_key + not in additional_rels ): additional_rels[ additional_rels_key @@ -582,8 +608,74 @@ def read_stream(cls, epc_file_io: BytesIO): # returns an Epc instance # /____//____/ +def create_energyml_object( + content_or_qualified_type: str, + citation: Optional[Any] = None, + uuid: Optional[str] = None, +): + """ + Create an energyml object instance depending on the content-type or qualified-type given in parameter. + The SchemaVersion is automatically assigned. + If no citation is given default one will be used. + If no uuid is given, a random uuid will be used. + :param content_or_qualified_type: + :param citation: + :param uuid: + :return: + """ + if citation is None: + citation = { + "title": "New_Object", + "Creation": epoch_to_date(epoch()), + "LastUpdate": epoch_to_date(epoch()), + "Format": "energyml-utils", + "Originator": "energyml-utils python module", + } + cls = get_class_from_qualified_type(content_or_qualified_type) + obj = cls() + cit = get_obj_attribute_class(cls, "citation")() + copy_attributes( + obj_in=citation, + obj_out=cit, + only_existing_attributes=True, + ignore_case=True, + ) + set_attribute_from_path(obj, "citation", cit) + set_attribute_value(obj, "uuid", uuid or gen_uuid()) + set_attribute_value(obj, "SchemaVersion", get_class_pkg_version(obj)) + + return obj + + +def create_external_part_reference( + eml_version: str, + h5_file_path: str, + citation: Optional[Any] = None, + uuid: Optional[str] = None +): + """ + Create an EpcExternalPartReference depending on the energyml version (should be ["2.0", "2.1", "2.2"]). + The MimeType, ExistenceKind and Filename will be automatically filled. + :param eml_version: + :param h5_file_path: + :param citation: + :return: + """ + version_flat = re.findall(RGX_DOMAIN_VERSION, eml_version)[0][0].replace(".", "").replace("_", "") + obj = create_energyml_object( + content_or_qualified_type="eml" + version_flat + ".EpcExternalPartReference", + citation=citation, + uuid=uuid + ) + set_attribute_value(obj, "MimeType", "application/x-hdf5") + set_attribute_value(obj, "ExistenceKind", "Actual") + set_attribute_value(obj, "Filename", h5_file_path) + + return obj + + def get_reverse_dor_list( - obj_list: List[Any], key_func: Callable = get_obj_identifier + obj_list: List[Any], key_func: Callable = get_obj_identifier ) -> Dict[str, List[Any]]: """ Compute a dict with 'OBJ_UUID.OBJ_VERSION' as Key, and list of DOR that reference it. @@ -595,7 +687,7 @@ def get_reverse_dor_list( rels = {} for obj in obj_list: for dor in search_attribute_matching_type( - obj, "DataObjectReference", return_self=False + obj, "DataObjectReference", return_self=False ): key = key_func(dor) if key not in rels: @@ -608,14 +700,14 @@ def get_reverse_dor_list( def gen_core_props_path( - export_version: EpcExportVersion = EpcExportVersion.CLASSIC, + export_version: EpcExportVersion = EpcExportVersion.CLASSIC, ): return "docProps/core.xml" def gen_energyml_object_path( - energyml_object: Union[str, Any], - export_version: EpcExportVersion = EpcExportVersion.CLASSIC, + energyml_object: Union[str, Any], + export_version: EpcExportVersion = EpcExportVersion.CLASSIC, ): """ Generate a path to store the :param:`energyml_object` into an epc file (depending on the :param:`export_version`) @@ -656,8 +748,8 @@ def get_file_folder_and_name_from_path(path: str) -> Tuple[str, str]: def gen_rels_path( - energyml_object: Any, - export_version: EpcExportVersion = EpcExportVersion.CLASSIC, + energyml_object: Any, + export_version: EpcExportVersion = EpcExportVersion.CLASSIC, ) -> str: """ Generate a path to store the :param:`energyml_object` rels file into an epc file @@ -677,7 +769,7 @@ def gen_rels_path( def get_epc_content_type_path( - export_version: EpcExportVersion = EpcExportVersion.CLASSIC, + export_version: EpcExportVersion = EpcExportVersion.CLASSIC, ) -> str: """ Generate a path to store the "[Content_Types].xml" file into an epc file diff --git a/energyml-utils/src/energyml/utils/introspection.py b/energyml-utils/src/energyml/utils/introspection.py index 619efa0..3e4efa6 100644 --- a/energyml-utils/src/energyml/utils/introspection.py +++ b/energyml-utils/src/energyml/utils/introspection.py @@ -291,9 +291,13 @@ def import_related_module(energyml_module_name: str) -> None: def get_class_fields(cls: Union[type, Any]) -> Dict[str, Field]: """ Return all class fields names, mapped to their :class:`Field` value. + If a dict is given, this function is the identity :param cls: :return: """ + # for dict object, no change + if isinstance(cls, dict): + return cls if not isinstance(cls, type): # if cls is an instance cls = type(cls) try: @@ -367,7 +371,7 @@ def get_object_attribute( if isinstance(obj, list): value = obj[int(current_attrib_name)] elif isinstance(obj, dict): - value = obj[current_attrib_name] + value = obj.get(current_attrib_name, None) else: value = getattr(obj, current_attrib_name) @@ -512,6 +516,11 @@ def search_attribute_matching_type_with_path( if not deep_search: return res + if current_path is None: + current_path = "" + if len(current_path) > 0: + current_path = current_path + "." + if isinstance(obj, list): cpt = 0 for s_o in obj: @@ -521,7 +530,7 @@ def search_attribute_matching_type_with_path( re_flags=re_flags, return_self=True, deep_search=deep_search, - current_path=f"{current_path}.{cpt}", + current_path=f"{current_path}{cpt}", super_class_search=super_class_search, ) cpt = cpt + 1 @@ -533,7 +542,7 @@ def search_attribute_matching_type_with_path( re_flags=re_flags, return_self=True, deep_search=deep_search, - current_path=f"{current_path}.{k}", + current_path=f"{current_path}{k}", super_class_search=super_class_search, ) elif not is_primitive(obj): @@ -544,7 +553,7 @@ def search_attribute_matching_type_with_path( re_flags=re_flags, return_self=True, deep_search=deep_search, - current_path=f"{current_path}.{att_name}", + current_path=f"{current_path}{att_name}", super_class_search=super_class_search, ) @@ -647,7 +656,11 @@ def search_attribute_matching_name_with_path( next_match = ".".join(attrib_list[1:]) res = [] - match_value = None + if current_path is None: + current_path = "" + if len(current_path) > 0: + current_path = current_path + "." + match_path_and_obj = [] not_match_path_and_obj = [] if isinstance(obj, list): @@ -658,9 +671,9 @@ def search_attribute_matching_name_with_path( ) if match is not None: match_value = match.group(0) - match_path_and_obj.append((f"{current_path}.{cpt}", s_o)) + match_path_and_obj.append((f"{current_path}{cpt}", s_o)) else: - not_match_path_and_obj.append((f"{current_path}.{cpt}", s_o)) + not_match_path_and_obj.append((f"{current_path}{cpt}", s_o)) cpt = cpt + 1 elif isinstance(obj, dict): for k, s_o in obj.items(): @@ -669,9 +682,11 @@ def search_attribute_matching_name_with_path( ) if match is not None: match_value = match.group(0) - match_path_and_obj.append((f"{current_path}.{k}", s_o)) + match_path_and_obj.append( + (f"{current_path}{match_value}", s_o) + ) else: - not_match_path_and_obj.append((f"{current_path}.{k}", s_o)) + not_match_path_and_obj.append((f"{current_path}{k}", s_o)) elif not is_primitive(obj): match_value = get_matching_class_attribute_name( obj, current_match.replace("\\.", ".") @@ -679,7 +694,7 @@ def search_attribute_matching_name_with_path( if match_value is not None: match_path_and_obj.append( ( - f"{current_path}.{match_value}", + f"{current_path}{match_value}", get_object_attribute_no_verif(obj, match_value), ) ) @@ -687,7 +702,7 @@ def search_attribute_matching_name_with_path( if att_name != match_value: not_match_path_and_obj.append( ( - f"{current_path}.{att_name}", + f"{current_path}{att_name}", get_object_attribute_no_verif(obj, att_name), ) ) @@ -758,6 +773,92 @@ def search_attribute_matching_name( ] +def set_attribute_from_path(obj: Any, attribute_path: str, value: Any): + """ + Changes the value of a (sub)attribute. + Example : + data = { + "a": { + "b": [ "v_x", { "c": "v_old" } ] + } + } + set_attribute_from_path(data, "a.b.1.c", "v_new") + + # result is : + + data == { + "a": { + "b": [ "v_x", { "c": "v_new" } ] + } + } + + :param obj: + :param attribute_path: + :param value: + :return: + """ + while attribute_path.startswith("."): + attribute_path = attribute_path[1:] + + upper = obj + final_attribute_name = attribute_path + if "." in attribute_path: + upper = get_object_attribute( + obj, attribute_path[: attribute_path.rindex(".")] + ) + final_attribute_name = attribute_path[attribute_path.rindex(".") + 1 :] + try: + upper[final_attribute_name] = value + except Exception: + attrib_class = get_obj_attribute_class(upper, final_attribute_name) + if attrib_class is not None and is_enum(attrib_class): + val_snake = snake_case(value) + setattr( + upper, + final_attribute_name, + list( + filter( + lambda ev: snake_case(ev) == val_snake, + attrib_class._member_names_, + ) + )[0], + ) + else: + setattr(upper, final_attribute_name, value) + + +def set_attribute_value(obj: any, attribute_name_rgx, value: Any): + copy_attributes( + obj_in={attribute_name_rgx: value}, obj_out=obj, ignore_case=True + ) + + +def copy_attributes( + obj_in: any, + obj_out: Any, + only_existing_attributes: bool = True, + ignore_case: bool = True, +): + in_att_list = get_class_attributes(obj_in) + for k_in in in_att_list: + p_list = search_attribute_matching_name_with_path( + obj=obj_out, + name_rgx=k_in, + re_flags=re.IGNORECASE if ignore_case else 0, + deep_search=False, + search_in_sub_obj=False, + ) + path = None + if p_list is not None and len(p_list) > 0: + path, _ = p_list[0] + if not only_existing_attributes or path is not None: + set_attribute_from_path( + obj_out, + path or k_in, + get_object_attribute(obj_in, k_in, False), + ) + + # Utility functions @@ -933,9 +1034,11 @@ def get_content_type_from_class( def get_object_type_for_file_path_from_class(cls) -> str: + if not isinstance(cls, type): + cls = type(cls) classic_type = get_obj_type(cls) - for parent_cls in cls.__class__.__bases__: + for parent_cls in cls.__bases__: try: if ( classic_type.lower() in parent_cls.Meta.name.lower() @@ -953,6 +1056,73 @@ def get_object_type_for_file_path_from_class(cls) -> str: return classic_type +def get_obj_attribute_class( + cls: Any, + attribute_name: Optional[str], + random_for_typing: Optional[bool] = False, + no_abstract: Optional[bool] = True, +): + """ + Return an instantiable class for an attribute of :param cls:. + If the attribute is defined with typing with multiple possibility (like tuple or union), the first one + is selected or a random one (depending on the value of the :param random_for_typing:) + :param cls: + :param attribute_name: + :param random_for_typing: + :param no_abstract: if True, the returned typed will not be an abstract class + :return: + """ + chosen_type = None + if cls is not None: + if not isinstance(cls, type) and cls.__module__ != "typing": + return get_obj_attribute_class( + type(cls), attribute_name, random_for_typing + ) + elif cls.__module__ == "typing": + type_list = list(cls.__args__) + if type(None) in type_list: + type_list.remove( + type(None) + ) # we don't want to generate none value + + if random_for_typing: + chosen_type = type_list[random.randint(0, len(type_list) - 1)] + else: + chosen_type = type_list[0] + return get_obj_attribute_class( + chosen_type, None, random_for_typing + ) + else: + # print(f"attribute_name {attribute_name} > {cls}, {get_class_fields(cls)[attribute_name]}") + if attribute_name is not None and len(attribute_name) > 0: + cls = get_class_from_simple_name( + simple_name=get_class_fields(cls)[attribute_name].type, + energyml_module_context=get_related_energyml_modules_name( + cls + ), + ) + # print(f"attribute_name {attribute_name} > {cls}") + potential_classes = [cls] + get_sub_classes(cls) + # print(f"potential_classes {potential_classes}") + if no_abstract: + potential_classes = list( + filter(lambda _c: not is_abstract(_c), potential_classes) + ) + if random_for_typing: + chosen_type = potential_classes[ + random.randint(0, len(potential_classes) - 1) + ] + else: + chosen_type = potential_classes[0] + # print(f"chosen_type {chosen_type}") + + if cls.__module__ == "typing": + return get_obj_attribute_class( + chosen_type, None, random_for_typing + ) + return chosen_type + + # RANDOM @@ -1092,7 +1262,6 @@ def _random_value_from_class( chosen_type, energyml_module_context, attribute_name, cls ) elif cls.__module__ == "typing": - nb_value_for_list = random.randint(2, 3) type_list = list(cls.__args__) if type(None) in type_list: type_list.remove( @@ -1100,6 +1269,7 @@ def _random_value_from_class( ) # we don't want to generate none value if cls._name == "List": + nb_value_for_list = random.randint(2, 3) lst = [] for i in range(nb_value_for_list): chosen_type = type_list[ diff --git a/energyml-utils/src/energyml/utils/validation.py b/energyml-utils/src/energyml/utils/validation.py index 87cdaab..dab2fbb 100644 --- a/energyml-utils/src/energyml/utils/validation.py +++ b/energyml-utils/src/energyml/utils/validation.py @@ -59,6 +59,14 @@ def __str__(self): return f"{ValidationError.__str__(self)}\n\tMandatory value is None for {get_obj_identifier(self.target_obj)} : '{self.attribute_dot_path}'" +@dataclass +class MissingEntityError(ValidationObjectError): + missing_uuid: str = field(default=None) + + def __str__(self): + return f"{ValidationError.__str__(self)}\n\tMissing entity in {get_obj_identifier(self.target_obj)} at path '{self.attribute_dot_path}'. Missing entity uuid: {self.missing_uuid}" + + def validate_epc(epc: Epc) -> List[ValidationError]: """ Verify if all :param:`epc`'s objects are valid. @@ -106,11 +114,12 @@ def dor_validation(energyml_objects: List[Any]) -> List[ValidationError]: dor_version = get_obj_version(dor) if dor_uuid not in dict_obj_uuid: errs.append( - ValidationObjectError( + MissingEntityError( error_type=ErrorType.CRITICAL, target_obj=obj, attribute_dot_path=dor_path, - msg=f"[DOR ERR] has wrong information. Unkown object with uuid '{dor_uuid}'", + missing_uuid=dor_uuid + # msg=f"[DOR ERR] has wrong information. Unkown object with uuid '{dor_uuid}'", ) ) else: diff --git a/energyml-utils/tests/test_introspection.py b/energyml-utils/tests/test_introspection.py index 2e0293e..478f43c 100644 --- a/energyml-utils/tests/test_introspection.py +++ b/energyml-utils/tests/test_introspection.py @@ -15,6 +15,9 @@ is_enum, get_class_from_name, get_class_from_content_type, + get_object_attribute, + set_attribute_from_path, + copy_attributes, ) @@ -62,3 +65,84 @@ def test_get_class_from_content_type(): ) assert found_type is not None assert found_type == energyml.resqml.v2_0_1.resqmlv2.Grid2DRepresentation + + +def test_set_attribute_from_path(): + data = {"a": {"b": ["v_x", {"c": "v_test"}]}} + assert get_object_attribute(data, "a.b.1.c") == "v_test" + set_attribute_from_path(data, "a.b.1.c", "v_new") + assert get_object_attribute(data, "a.b.1.c") == "v_new" + set_attribute_from_path(data, "a", "v_new") + assert get_object_attribute(data, "a") == "v_new" + + +def test_copy_attributes_existing_ignore_case(): + data_in = { + "a": {"b": "v_0", "c": "v_1"}, + "uuid": "215f8219-cabd-4e24-9e4f-e371cabc9622", + "objectVersion": "Resqml 2.0", + "non_existing": 42, + } + data_out = { + "a": None, + "Uuid": "8291afd6-ae01-49f5-bc96-267e7b27450d", + "object_version": "Resqml 2.0", + } + copy_attributes( + obj_in=data_in, + obj_out=data_out, + only_existing_attributes=True, + ignore_case=True, + ) + assert data_out["a"] == data_in["a"] + assert data_out["Uuid"] == data_in["uuid"] + assert data_out["object_version"] == data_in["objectVersion"] + assert "non_existing" not in data_out + + +def test_copy_attributes_ignore_case(): + data_in = { + "a": {"b": "v_0", "c": "v_1"}, + "uuid": "215f8219-cabd-4e24-9e4f-e371cabc9622", + "objectVersion": "Resqml 2.0", + "non_existing": 42, + } + data_out = { + "a": None, + "Uuid": "8291afd6-ae01-49f5-bc96-267e7b27450d", + "object_version": "Resqml 2.0", + } + copy_attributes( + obj_in=data_in, + obj_out=data_out, + only_existing_attributes=False, + ignore_case=True, + ) + assert data_out["a"] == data_in["a"] + assert data_out["Uuid"] == data_in["uuid"] + assert data_out["object_version"] == data_in["objectVersion"] + assert data_out["non_existing"] == data_in["non_existing"] + + +def test_copy_attributes_case_sensitive(): + data_in = { + "a": {"b": "v_0", "c": "v_1"}, + "uuid": "215f8219-cabd-4e24-9e4f-e371cabc9622", + "objectVersion": "Resqml 2.0", + "non_existing": 42, + } + data_out = { + "a": None, + "Uuid": "8291afd6-ae01-49f5-bc96-267e7b27450d", + "object_version": "Resqml 2.0", + } + copy_attributes( + obj_in=data_in, + obj_out=data_out, + only_existing_attributes=False, + ignore_case=False, + ) + assert data_out["a"] == data_in["a"] + assert data_out["Uuid"] != data_in["uuid"] + assert data_out["object_version"] == data_in["objectVersion"] + assert data_out["non_existing"] == data_in["non_existing"]