diff --git a/src/sim_explorer/case.py b/src/sim_explorer/case.py index b5838a8..6e865a7 100644 --- a/src/sim_explorer/case.py +++ b/src/sim_explorer/case.py @@ -9,13 +9,13 @@ import matplotlib.pyplot as plt import numpy as np -from libcosimpy.CosimLogging import CosimLogLevel, log_output_level # type: ignore from sim_explorer.assertion import Assertion # type: ignore from sim_explorer.exceptions import CaseInitError from sim_explorer.json5 import Json5 from sim_explorer.models import AssertionResult, Temporal -from sim_explorer.simulator_interface import SimulatorInterface +from sim_explorer.system_interface import SystemInterface +from sim_explorer.system_interface_osp import SystemInterfaceOSP from sim_explorer.utils.misc import from_xml from sim_explorer.utils.paths import get_path, relative_path @@ -96,21 +96,22 @@ def __init__( self.act_get = Case._actions_copy(self.parent.act_get) self.act_set = Case._actions_copy(self.parent.act_set) - for k, v in self.js.jspath("$.spec", dict, True).items(): - self.read_spec_item(k, v) - _results = self.js.jspath("$.results", list) - if _results is not None: - for _res in _results: - self.read_spec_item(_res) - self.asserts: list = [] # list of assert keys - _assert = self.js.jspath("$.assert", dict) - if _assert is not None: - for k, v in _assert.items(): - _ = self.read_assertion(k, v) - if self.name == "base": - self.special = self._ensure_specials(self.special) # must specify for base case - self.act_get = dict(sorted(self.act_get.items())) - self.act_set = dict(sorted(self.act_set.items())) + if self.cases.simulator.full_simulator_available: + for k, v in self.js.jspath("$.spec", dict, True).items(): + self.read_spec_item(k, v) + _results = self.js.jspath("$.results", list) + if _results is not None: + for _res in _results: + self.read_spec_item(_res) + self.asserts: list = [] # list of assert keys + _assert = self.js.jspath("$.assert", dict) + if _assert is not None: + for k, v in _assert.items(): + _ = self.read_assertion(k, v) + if self.name == "base": + self.special = self._ensure_specials(self.special) # must specify for base case + self.act_get = dict(sorted(self.act_get.items())) + self.act_set = dict(sorted(self.act_set.items())) # self.res represents the Results object and is added when collecting results or when evaluating results def add_results_object(self, res: Results): @@ -575,8 +576,8 @@ class Cases: Args: spec (Path): file name for cases specification - simulator (SimulatorInterface)=None: Optional (pre-instantiated) SimulatorInterface object. - If that is None, the spec shall contain a modelFile to be used to instantiate the simulator. + simulator_type (SystemInterface)=SystemInterfaceOSP: Optional possibility to choose system simulator details + Default is OSP (libcosimpy), but when only results are read the basic SystemInterface is sufficient. """ __slots__ = ( @@ -593,28 +594,25 @@ class Cases: ) assertion_results: List[AssertionResult] = [] - def __init__(self, spec: str | Path, simulator: SimulatorInterface | None = None): + def __init__(self, spec: str | Path, simulator_type: type = SystemInterfaceOSP): self.file = Path(spec) # everything relative to the folder of this file! assert self.file.exists(), f"Cases spec file {spec} not found" self.js = Json5(spec) # del log_level = CosimLogLevel[self.js.jspath("$.header.logLevel") or "FATAL"] log_level = self.js.jspath("$.header.logLevel") or "fatal" - if simulator is None: - modelfile = self.js.jspath("$.header.modelFile", str) or "OspSystemStructure.xml" - path = self.file.parent / modelfile - assert path.exists(), f"OSP system structure file {path} not found" - try: - self.simulator = SimulatorInterface( - system=path, - name=self.js.jspath("$.header.name", str) or "", - description=self.js.jspath("$.header.description", str) or "", - log_level=log_level.upper(), - ) - except Exception as err: - raise AssertionError(f"'modelFile' needed from spec: {err}") from err - else: - self.simulator = simulator # SimulatorInterface( simulator = simulator) - log_output_level(CosimLogLevel[log_level.upper()]) + modelfile = self.js.jspath("$.header.modelFile", str) or "OspSystemStructure.xml" + path = self.file.parent / modelfile + assert path.exists(), f"OSP system structure file {path} not found" + print("SIM_type", simulator_type) + try: + self.simulator = simulator_type( + structure_file=path, + name=self.js.jspath("$.header.name", str) or "", + description=self.js.jspath("$.header.description", str) or "", + log_level=log_level, + ) + except Exception as err: + raise AssertionError(f"'modelFile' needed from spec: {err}") from err self.timefac = self._get_time_unit() * 1e9 # internally OSP uses pico-seconds as integer! # read the 'variables' section and generate dict { alias : { (instances), (variables)}}: @@ -637,6 +635,7 @@ def get_case_variables(self) -> dict[str, dict]: Optionally a description of the alias variable may be provided (and added to the dictionary). """ variables = {} + model_vars = {} # cache of variables of models for k, v in self.js.jspath("$.header.variables", dict, True).items(): if not isinstance(v, list): raise CaseInitError(f"List of 'component(s)' and 'variable(s)' expected. Found {v}") from None @@ -650,31 +649,32 @@ def get_case_variables(self) -> dict[str, dict]: assert len(comp) > 0, f"No component model instances '{v[0]}' found for alias variable '{k}'" assert isinstance(v[1], str), f"Second argument of variable sped: Variable name(s)! Found {v[1]}" _vars = self.simulator.match_variables(comp[0], v[1]) # tuple of matching var refs + if model not in model_vars: # ensure that model is included in the cache + model_vars.update({model: self.simulator.variables(comp[0])}) + prototype = model_vars[model][self.simulator.variable_name_from_ref(comp[0], _vars[0])] var: dict = { "model": model, "instances": comp, "variables": _vars, # variables from same model! } - assert len(var["variables"]) > 0, f"No matching variables found for alias {k}:{v}, component '{comp}'" + assert len(var["variables"]), f"No matching variables found for alias {k}:{v}, component '{comp}'" if len(v) > 2: var.update({"description": v[2]}) # We add also the more detailed variable info from the simulator (the FMU) # The type, causality and variability shall be equal for all variables. - # The 'reference' element is the same as 'variables'. - # next( iter( ...)) is used to get the first dict value - var0 = next(iter(self.simulator.get_variables(model, _vars[0]).values())) # prototype - for i in range(1, len(var["variables"])): - var_i = next(iter(self.simulator.get_variables(model, _vars[i]).values())) + for ref in _vars: + _var = model_vars[model][self.simulator.variable_name_from_ref(comp[0], ref)] for test in ["type", "causality", "variability"]: _assert( - var_i[test] == var0[test], - f"Variable with ref {var['variables'][i]} not same {test} as {var0} in model {model}", + _var[test] == prototype[test], + f"Variable with ref {ref} not same {test} as {prototype} in model {model}", ) var.update( { - "type": var0["type"], - "causality": var0["causality"], - "variability": var0["variability"], + "type": prototype["type"], + "causality": prototype["causality"], + "variability": prototype["variability"], + "initial": prototype.get("initial", ""), } ) variables.update({k: var}) @@ -955,7 +955,7 @@ def _init_from_existing(self, file: str | Path): self.res = Json5(self.file) case = Path(self.file.parent / (self.res.jspath("$.header.cases", str, True) + ".cases")) try: - cases = Cases(Path(case)) + cases = Cases(Path(case), simulator_type=SystemInterface) except ValueError: raise CaseInitError(f"Cases {Path(case)} instantiation error") from ValueError self.case: Case | None = cases.case_by_name(name=self.res.jspath(path="$.header.case", typ=str, errorMsg=True)) diff --git a/src/sim_explorer/simulator_interface.py b/src/sim_explorer/simulator_interface.py deleted file mode 100644 index 2f6a30f..0000000 --- a/src/sim_explorer/simulator_interface.py +++ /dev/null @@ -1,599 +0,0 @@ -# pyright: reportMissingImports=false, reportGeneralTypeIssues=false -import xml.etree.ElementTree as ET # noqa: N817 -from enum import Enum -from pathlib import Path -from typing import TypeAlias, cast - -from libcosimpy.CosimEnums import CosimVariableCausality, CosimVariableType, CosimVariableVariability # type: ignore -from libcosimpy.CosimExecution import CosimExecution # type: ignore -from libcosimpy.CosimLogging import CosimLogLevel, log_output_level # type: ignore -from libcosimpy.CosimManipulator import CosimManipulator # type: ignore -from libcosimpy.CosimObserver import CosimObserver # type: ignore - -from sim_explorer.utils.misc import from_xml, match_with_wildcard - -# type definitions -PyVal: TypeAlias = str | float | int | bool # simple python types / Json5 atom -Json5: TypeAlias = dict[str, "Json5Val"] # Json5 object -Json5List: TypeAlias = list["Json5Val"] # Json5 list -Json5Val: TypeAlias = PyVal | Json5 | Json5List # Json5 values - - -""" -sim_explorer module for definition and execution of simulation experiments -* read and compile the case definitions from configuration file - Note that Json5 is here restriced to 'ordered keys' and 'unique keys within an object' -* set the start variables for a given case -* manipulate variables according to conditions during the simulation run -* save requested variables at given communication points during a simulation run -* check the validity of results when saving variables - -With respect to MVx in general, this module serves the preparation of start conditions for smart testing. -""" - - -class CaseInitError(Exception): - """Special error indicating that something is wrong during initialization of cases.""" - - pass - - -class CaseUseError(Exception): - """Special error indicating that something is wrong during usage of cases.""" - - pass - - -class SimulatorInterface: - """Class providing the interface to the simulator itself. - This is designed for OSP and needs to be overridden for other types of simulator. - - Provides the following functions: - - * set_variable_value: Set variable values initially or at communication points - * get_variable_value: Get variable values at communication points - * match_components: Identify component instances based on (tuple of) (short) names - * match_variables: Identify component variables of component (instance) matching a (short) name - - - A system model might be defined through an instantiated simulator or explicitly through the .fmu files. - Unfortunately, an instantiated simulator does not seem to have functions to access the FMU, - therefore only the (reduced) info from the simulator is used here (FMUs not used directly). - - Args: - system (Path): Path to system model definition file - name (str)="System": Possibility to provide an explicit system name (if not provided by system file) - description (str)="": Optional possibility to provide a system description - simulator (CosimExecution)=None: Optional possibility to insert an existing simulator object. - Otherwise this is generated through CosimExecution.from_osp_config_file(). - log_level (str) = 'fatal': Per default the level is set to 'fatal', - but it can be set to 'trace', 'debug', 'info', 'warning', 'error' or 'fatal' (e.g. for debugging purposes) - fmus_available (bool) = False: Optional possibility to state that all FMUs shall be explicitly available, - and the interface object is thus able to attain all interface information from the modelDescription files. - The OSP interface has such information pre-loaded and the FMUs are thus not explicitly necessary. - """ - - def __init__( - self, - system: Path | str = "", - name: str | None = None, - description: str = "", - simulator: CosimExecution | None = None, - log_level: str = "fatal", - fmus_available: bool = False, - ): - self.name = name # overwrite if the system includes that - self.description = description # overwrite if the system includes that - self.sysconfig: Path | None = None - log_output_level(CosimLogLevel[log_level.upper()]) - self.simulator: CosimExecution - if simulator is None: # instantiate the simulator through the system config file - self.sysconfig = Path(system) - assert self.sysconfig.exists(), f"File {self.sysconfig.name} not found" - ck, msg = self._check_system_structure(self.sysconfig) - assert ck, msg - self.simulator = cast(CosimExecution, self._simulator_from_config(self.sysconfig)) - else: - self.simulator = simulator - self.components = self.get_components() # dict of {component name : modelId} - # Instantiate a suitable manipulator for changing variables. - self.manipulator = CosimManipulator.create_override() - assert self.simulator.add_manipulator(manipulator=self.manipulator), "Could not add manipulator object" - - # Instantiate a suitable observer for collecting results. - self.observer = CosimObserver.create_last_value() - assert self.simulator.add_observer(observer=self.observer), "Could not add observer object" - self.message = "" # possibility to save additional message for (optional) retrieval by client - - @property - def path(self): - return self.sysconfig.resolve().parent if self.sysconfig is not None else None - - def _check_system_structure(self, file: Path): - """Check the OspSystemStructure file. Used in cases where the simulatorInterface is instantiated from Cases.""" - el = from_xml(file) - assert isinstance(el, ET.Element), f"ElementTree element expected. Found {el}" - ns = el.tag.split("{")[1].split("}")[0] - msg = "" - for s in el.findall(".//{*}Simulator"): - if not Path(Path(file).parent / s.get("source", "??")).exists(): - msg += f"Component {s.get('name')}, source {s.get('source','??')} not found. NS:{ns}" - return (not len(msg), msg) - - def reset(self): # , cases:Cases): - """Reset the simulator interface, so that a new simulation can be run.""" - assert isinstance(self.sysconfig, Path), "Simulator resetting does not work with explicitly supplied simulator." - assert self.sysconfig.exists(), "Simulator resetting does not work with explicitly supplied simulator." - assert isinstance(self.manipulator, CosimManipulator) - assert isinstance(self.observer, CosimObserver) - # self.simulator = self._simulator_from_config(self.sysconfig) - self.simulator = CosimExecution.from_osp_config_file(str(self.sysconfig)) - assert self.simulator.add_manipulator(manipulator=self.manipulator), "Could not add manipulator object" - assert self.simulator.add_observer(observer=self.observer), "Could not add observer object" - # for case in cases: - - def _simulator_from_config(self, file: Path): - """Instantiate a simulator object through the a suitable configuration file. - Intended for use case 1 when Cases are in charge. - """ - if file.is_file(): - _type = "ssp" if file.name.endswith(".ssp") else "osp" - # file = file.parent - else: # a directory. Find type - _type = "osp" - for child in file.iterdir(): - if child.is_file(): - if child.name.endswith(".ssp"): - _type = "ssp" - file = file / child - break - elif child.name.endswith(".xml"): - file = file / child - xml = from_xml(file) - assert isinstance(xml, ET.Element), f"An ET.Element is ixpected here. Found {xml}" - if xml.tag.endswith("OspSystemStructure"): - break - if _type == "osp": - xml = from_xml(file) - assert isinstance(xml, ET.Element), f"An ET.Element is ixpected here. Found {xml}" - assert xml.tag.endswith("OspSystemStructure"), f"File {file} not an OSP structure file" - return CosimExecution.from_osp_config_file(str(file)) - else: - return CosimExecution.from_ssp_file(str(file)) - - def same_model(self, ref: int, refs: list[int] | set[int]): - ref_vars = self.get_variables(ref) - for r in refs: - r_vars = self.get_variables(r) - yield (r, r_vars == ref_vars) - - def get_components(self, model: int = -1) -> dict: - """Provide a dict of `{ component_instances_name : model_ID, ...}` in the system model. - For each component a unique ID per basic model (FMU) is used. - In this way, if comps[x]==comps[y] the components x and y relate to the same basic model. - If model != -1, only the components (instances) related to model are returned. - """ - comps = {} - if self.simulator is None: - pass # nothing to do we return an empty dict - - elif model >= 0: # use self.components to extract only components related to the provided model - for comp, mod in self.components.items(): - if mod == model: - comps.update({comp: self.components[comp]}) - - else: - comp_infos = self.simulator.slave_infos() - for comp in comp_infos: - for r, same in self.same_model(comp.index, set(comps.values())): - if same: - comps.update({comp.name.decode(): r}) - break - if comp.name.decode() not in comps: # new model - comps.update({comp.name.decode(): comp.index}) - return comps - - def get_models(self) -> list: - """Get the list of basic models based on self.components.""" - models = [] - for _, m in self.components.items(): - if m not in models: - models.append(m) - return models - - def match_components(self, comps: str | tuple[str, ...]) -> tuple[str, tuple]: - """Identify component (instances) based on 'comps' (component alias or tuple of aliases). - comps can be a (tuple of) full component names or component names with wildcards. - Returned components shall be based on the same model. - """ - if isinstance(comps, str): - comps = (comps,) - collect = [] - model = "" - for c in comps: - for k, v in self.components.items(): - if match_with_wildcard(c, k): - if not len(model): - model = v - if v == model and k not in collect: - collect.append(k) - return (model, tuple(collect)) - - def match_variables(self, component: str, varname: str) -> tuple[int]: - """Based on an example component (instance), identify unique variables starting with 'varname'. - The returned information applies to all instances of the same model. - The variables shall all be of the same type, causality and variability. - - Args: - component: component instance varname. - varname (str): the varname to search for. This can be the full varname or only the start of the varname - If only the start of the varname is supplied, all matching variables are collected. - - Returns - ------- - Tuple of value references - """ - - def accept_as_alias(org: str) -> bool: - """Decide whether the alias can be accepted with respect to org (uniqueness).""" - if not org.startswith(varname): # necessary requirement - return False - rest = org[len(varname) :] - if not len(rest) or any(rest.startswith(c) for c in ("[", ".")): - return True - return False - - var = [] - assert len(self.components), "Need the dictionary of components before maching variables" - - accepted = None - variables = self.get_variables(component) - for k, v in variables.items(): - if accept_as_alias(k): - if accepted is None: - accepted = v - assert all( - v[e] == accepted[e] for e in ("type", "causality", "variability") - ), f"Variable {k} matches {varname}, but properties do not match" - var.append(v["reference"]) - # for sv in model.findall(".//ScalarVariable"): - # if sv.get("varname", "").startswith(varname): - # if len(sv.get("varname")) == len(varname): # full varname. Not part of vector - # return (sv,) - # if len(var): # check if the var are compliant so that they fit into a 'vector' - # for prop in ("causality", "variability", "initial"): - # assert var[0].get(prop, "") == sv.get( - # prop, "" - # ), f"Model {model.get('modelvarname')}, alias {varname}: The property {prop} of variable {var[0].get('varname')} and {sv.get('varname')} are not compliant with combining them in a 'vector'" - # assert ( - # var[0][0].tag == sv[0].tag - # ), f"Model {model.get('modelName')}, alias {varname}: The variable types of {var[0].get('name')} and {sv.get('name')} shall be equal if they are combined in a 'vector'" - # var.append(sv) - return tuple(var) - - def is_output_var(self, comp: int, ref: int) -> bool: - for idx in range(self.simulator.num_slave_variables(comp)): - struct = self.simulator.slave_variables(comp)[idx] - if struct.reference == ref: - return struct.causality == 2 - return False - - def get_variables(self, comp: str | int, single: int | str | None = None, as_numbers: bool = True) -> dict: - """Get the registered variables for a given component from the simulator. - - Args: - component (str, int): The component name or its index within the model - single (int,str): Optional possibility to return a single variable. - If int: by valueReference, else by name. - as_numbers (bool): Return the enumerations as integer numbers (if True) or as names (if False) - - Returns - ------- - A dictionary of variable {names:info, ...}, where info is a dictionary containing reference, type, causality and variability - """ - if isinstance(comp, str): - component = self.simulator.slave_index_from_instance_name(comp) - if component is None: # component not found - return {} - elif isinstance(comp, int): - if comp < 0 or comp >= self.simulator.num_slaves(): # invalid id - return {} - component = comp - else: - raise AssertionError(f"Unallowed argument {comp} in 'get_variables'") - variables = {} - for idx in range(self.simulator.num_slave_variables(component)): - struct = self.simulator.slave_variables(component)[idx] - if ( - single is None - or (isinstance(single, int) and struct.reference == single) - or struct.name.decode() == single - ): - typ = struct.type if as_numbers else CosimVariableType(struct.type).name - causality = struct.causality if as_numbers else CosimVariableCausality(struct.causality).name - variability = struct.variability if as_numbers else CosimVariableVariability(struct.variability).name - variables.update( - { - struct.name.decode(): { - "reference": struct.reference, - "type": typ, - "causality": causality, - "variability": variability, - } - } - ) - return variables - - # def identify_variable_groups(self, component: str, include_all: bool = False) -> dict[str, any]: - # """Try to identify variable groups of the 'component', based on the assumption that variable names are structured. - # - # This function is experimental and designed as an aid to define variable aliases in case studies. - # Rule: variables must be of same type, causality and variability and must start with a common name to be in the same group. - # Note: The function assumes access to component model fmu files. - # """ - # - # def max_match(txt1: str, txt2: str) -> int: - # """Check equality of txt1 and txt2 letter for letter and return the position of first divergence.""" - # i = 0 - # for i, c in enumerate(txt1): - # if txt2[i] != c: - # return i - # return i - # - # assert component in self.components, f"Component {component} was not found in the system model" - # - # if not isinstance(self.components[component], Path): - # print(f"The fmu of of {component} does not seem to be accessible. {component} is registered as {self.components[component]}", - # ): - # return {} - # variables = from_xml(self.components[component], "modelDescription.xml").findall(".//ScalarVariable") - # groups = {} - # for i, var in enumerate(variables): - # if var is not None: # treated elements are set to None! - # group_name = "" - # group = [] - # for k in range(i + 1, len(variables)): # go through all other variables - # if variables[k] is not None: - # if ( - # var.attrib["causality"] == variables[k].attrib["causality"] - # and var.attrib["variability"] == variables[k].attrib["variability"] - # and var[0].tag == variables[k][0].tag - # and variables[k].attrib["name"].startswith(group_name) - # ): # is a candidate - # pos = max_match(var.attrib["name"], variables[k].attrib["name"]) - # if pos > len(group_name): # there is more commonality than so far identified - # group_name = var.attrib["name"][:pos] - # group = [i, k] - # elif len(group_name) and pos == len(group_name): # same commonality than so far identified - # group.append(k) - # if len(group_name): # var is in a group - # groups.update( - # { - # group_name: { - # "members": (variables[k].attrib["name"] for k in group), - # "description": var.get("description", ""), - # "references": (variables[k].attrib["valueReference"] for k in group), - # } - # } - # ) - # for k in group: - # variables[k] = None # treated - # if include_all: - # for var in variables: - # if var is not None: # non-grouped variable. Add that since include_all has been chosen - # groups.update( - # { - # var.attrib["name"]: { - # "members": (var.attrib["name"],), - # "description": var.get("description", ""), - # "references": (var.attrib["valueReference"],), - # } - # } - # ) - # return groups - - # def set_initial(self, instance: int, typ: int, var_refs: tuple[int], var_vals: tuple[PyVal]): - def set_initial(self, instance: int, typ: int, var_ref: int, var_val: PyVal): - """Provide an _initial_value set function (OSP only allows simple variables). - The signature is the same as the manipulator functions slave_real_values()..., - only that variables are set individually and the type is added as argument. - """ - if typ == CosimVariableType.REAL.value: - return self.simulator.real_initial_value(instance, var_ref, self.pytype(typ, var_val)) - elif typ == CosimVariableType.INTEGER.value: - return self.simulator.integer_initial_value(instance, var_ref, self.pytype(typ, var_val)) - elif typ == CosimVariableType.STRING.value: - return self.simulator.string_initial_value(instance, var_ref, self.pytype(typ, var_val)) - elif typ == CosimVariableType.BOOLEAN.value: - return self.simulator.boolean_initial_value(instance, var_ref, self.pytype(typ, var_val)) - - def set_variable_value(self, instance: int, typ: int, var_refs: tuple[int], var_vals: tuple[PyVal]) -> bool: - """Provide a manipulator function which sets the 'variable' (of the given 'instance' model) to 'value'. - - Args: - instance (int): identifier of the instance model for which the variable is to be set - var_refs (tuple): Tuple of variable references for which the values shall be set - var_vals (tuple): Tuple of values (of the correct type), used to set model variables - """ - _vals = [self.pytype(typ, x) for x in var_vals] # ensure list and correct type - if typ == CosimVariableType.REAL.value: - return self.manipulator.slave_real_values(instance, list(var_refs), _vals) - elif typ == CosimVariableType.INTEGER.value: - return self.manipulator.slave_integer_values(instance, list(var_refs), _vals) - elif typ == CosimVariableType.BOOLEAN.value: - return self.manipulator.slave_boolean_values(instance, list(var_refs), _vals) - elif typ == CosimVariableType.STRING.value: - return self.manipulator.slave_string_values(instance, list(var_refs), _vals) - else: - raise CaseUseError(f"Unknown type {typ}") from None - - def get_variable_value(self, instance: int, typ: int, var_refs: tuple[int, ...]): - """Provide an observer function which gets the 'variable' value (of the given 'instance' model) at the time when called. - - Args: - instance (int): identifier of the instance model for which the variable is to be set - var_refs (tuple): Tuple of variable references for which the values shall be retrieved - """ - if typ == CosimVariableType.REAL.value: - return self.observer.last_real_values(instance, list(var_refs)) - elif typ == CosimVariableType.INTEGER.value: - return self.observer.last_integer_values(instance, list(var_refs)) - elif typ == CosimVariableType.BOOLEAN.value: - return self.observer.last_boolean_values(instance, list(var_refs)) - elif typ == CosimVariableType.STRING.value: - return self.observer.last_string_values(instance, list(var_refs)) - else: - raise CaseUseError(f"Unknown type {typ}") from None - - @staticmethod - def pytype(fmu_type: str | int, val: PyVal | None = None): - """Return the python type of the FMU type provided as string or int (CosimEnums). - If val is None, the python type object is returned. Else if boolean, true or false is returned. - """ - fmu_type_str = CosimVariableType(fmu_type).name if isinstance(fmu_type, int) else fmu_type - typ = { - "real": float, - "integer": int, - "boolean": bool, - "string": str, - "enumeration": Enum, - }[fmu_type_str.lower()] - - if val is None: - return typ - elif typ is bool: - if isinstance(val, str): - return "true" in val.lower() # should be fmi2True and fmi2False - elif isinstance(val, int): - return bool(val) - else: - raise CaseInitError(f"The value {val} could not be converted to boolean") - else: - return typ(val) - - @staticmethod - def default_initial(causality: int, variability: int, max_possible: bool = False) -> int: - """Return default initial setting as int, as initial setting is not explicitly available in OSP. See p.50 FMI2. - maxPossible = True chooses the the initial setting with maximum allowance. - - * Causality: input=0, parameter=1, output=2, calc.par.=3, local=4, independent=5 (within OSP) - * Initial: exact=0, approx=1, calculated=2, none=3. - """ - code = ( - (3, 3, 0, 3, 0, 3), - (3, 0, 3, 1, 1, 3), - (3, 0, 3, 1, 1, 3), - (3, 3, 2, 3, 2, 3), - (3, 3, 2, 3, 2, 3), - )[variability][causality] - if max_possible: - return (0, 1, 0, 3)[code] # first 'possible value' in table - else: - return (0, 2, 2, 3)[code] # default value in table - - def allowed_action(self, action: str, comp: int | str, var: int | str | tuple, time: float): - """Check whether the action would be allowed according to FMI2 rules, see FMI2.01, p.49. - - * Unfortunately, the OSP interface does not explicitly provide the 'initial' setting, - such that we need to assume the default value as listed on p.50. - * OSP does not provide explicit access to 'before initialization' and 'during initialization'. - The rules for these two stages are therefore not distinguished - * if a tuple of variables is provided, the variables shall have equal properties - in addition to the normal allowed rules. - - Args: - action (str): Action type, 'set', 'get', including init actions (set at time 0) - comp (int,str): The instantiated component within the system (as index or name) - var (int,str,tuple): The variable(s) (of component) as reference or name - time (float): The time at which the action will be performed - """ - - def _description(name: str, info: dict, initial: int) -> str: - descr = f"Variable {name}, causality {CosimVariableCausality(info['causality']).name}" - descr += f", variability {CosimVariableVariability(var_info['variability']).name}" - descr += f", initial {('exact','approx','calculated','none')[initial]}" - return descr - - def _check(cond, msg): - if cond: - self.message = msg - return True - return False - - _type, _causality, _variability = (-1, -1, -1) # unknown - if isinstance(var, (int, str)): - var = (var,) - for v in var: - variables = self.get_variables(comp, v) - if _check(len(variables) != 1, f"Variable {v} of component {comp} was not found"): - return False - name, var_info = next(variables.items().__iter__()) - if _type < 0 or _causality < 0 or _variability < 0: # define the properties and check whether allowed - _type = var_info["type"] - _causality = var_info["causality"] - _variability = var_info["variability"] - initial = SimulatorInterface.default_initial(_causality, _variability, True) - - if action == "get": # no restrictions on get - pass - elif action == "set": - if _check( - _variability == 0, - f"Variable {name} is defined as 'constant' and cannot be set", - ): - return False - if _check( - _variability == 0, - f"Variable {name} is defined as 'constant' and cannot be set", - ): - return False - - if time == 0: # initialization - # initial settings 'exact', 'approx' or 'input' - if _check( - not (initial in (0, 1) or _causality == 0), - _description(name, var_info, initial) + " cannot be set before or during initialization.", - ): - return False - else: # at communication points - # 'parameter', 'tunable' or 'input - if _check( - not ((_causality == 1 and _variability == 2) or _causality == 0), - _description(name, var_info, initial) + " cannot be set at communication points.", - ): - return False - else: # check whether the properties are equal - if _check( - _type != var_info["type"], - _description(name, var_info, initial) + f" != type {_type}", - ): - return False - if _check( - _causality != var_info["causality"], - _description(name, var_info, initial) + f" != causality { _causality}", - ): - return False - if _check( - _variability != var_info["variability"], - _description(name, var_info, initial) + f" != variability {_variability}", - ): - return False - return True - - def variable_name_from_ref(self, comp: int | str, ref: int) -> str: - for name, info in self.get_variables(comp).items(): - if info["reference"] == ref: - return name - return "" - - def component_name_from_id(self, idx: int) -> str: - """Retrieve the component name from the given index, or an empty string if not found.""" - for slave_info in self.simulator.slave_infos(): - if slave_info.index == idx: - return slave_info.name.decode() - return "" - - def component_id_from_name(self, name: str) -> int: - """Get the component id from the name. -1 if not found.""" - id = self.simulator.slave_index_from_instance_name(name) - return id if id is not None else -1 diff --git a/src/sim_explorer/system_interface.py b/src/sim_explorer/system_interface.py new file mode 100644 index 0000000..17c9ed8 --- /dev/null +++ b/src/sim_explorer/system_interface.py @@ -0,0 +1,413 @@ +from enum import Enum +from pathlib import Path +from typing import Any, TypeAlias + +from sim_explorer.json5 import Json5 +from sim_explorer.utils.misc import from_xml, match_with_wildcard +from sim_explorer.utils.osp import read_system_structure_xml + +PyVal: TypeAlias = str | float | int | bool # simple python types / Json5 atom + + +class SystemInterface: + """Class providing the interface to the system itself, i.e. system information, component interface information + and the system co-simulation orchestrator (if simulations shall be run). + + This model provides the base class and is able to read system and component information. + To run simulations it must be overridden by a super class, e.g. SystemInterfaceOSP + + Provides the following functions: + + * set_variable_value: Set variable values initially or at communication points + * get_variable_value: Get variable values at communication points + * match_components: Identify component instances based on (tuple of) (short) names + * match_variables: Identify component variables of component (instance) matching a (short) name + + + A system model might be defined through an instantiated simulator or explicitly through the .fmu files. + Unfortunately, an instantiated simulator does not seem to have functions to access the FMU, + therefore only the (reduced) info from the simulator is used here (FMUs not used directly). + + Args: + structure_file (Path): Path to system model definition file + name (str)="System": Possibility to provide an explicit system name (if not provided by system file) + description (str)="": Optional possibility to provide a system description + log_level (str) = 'fatal': Per default the level is set to 'fatal', + but it can be set to 'trace', 'debug', 'info', 'warning', 'error' or 'fatal' (e.g. for debugging purposes) + """ + + def __init__( + self, + structure_file: Path | str = "", + name: str | None = None, + description: str = "", + log_level: str = "fatal", + ): + self.structure_file = Path(structure_file) + self.name = name # overwrite if the system includes that + self.description = description # overwrite if the system includes that + self.system_structure = SystemInterface.read_system_structure(self.structure_file) + self._models = self._get_models() + # self.simulator=None # derived classes override this to instantiate the system simulator + self.message = "" # possibility to save additional message for (optional) retrieval by client + self.full_simulator_available = False # only system and components specification available. No simulation! + + @property + def path(self): + return self.structure_file.resolve().parent + + @staticmethod + def read_system_structure(file: Path, fmus_exist: bool = True): + """Read the systemStructure file and perform checks. + + Returns + ------- + The system structure as (json) dict as if the structure was read through osp_system_structure_from_js5 + """ + assert file.exists(), f"System structure {file} not found" + if file.suffix == ".xml": # assume the standard OspSystemStructure.xml file + system_structure = read_system_structure_xml(file) + elif file.suffix in (".js5", ".json"): # assume the js5 variant of the OspSystemStructure + system_structure = Json5(file).js_py + elif file.suffix == ".ssp": + # see https://ssp-standard.org/publications/SSP10/SystemStructureAndParameterization10.pdf + raise NotImplementedError("The SSP file variant is not yet implemented") from None + else: + raise KeyError(f"Unknown file type {file.suffix} for System Structure file") from None + for comp in system_structure["Simulators"].values(): + comp["source"] = (file.parent / comp["source"]).resolve() + assert not fmus_exist or comp["source"].exists(), f"FMU {comp['source']} not found" + return system_structure + + @property + def components(self): + """Return an iterator over all components (instances). + Each component is represented by a dict , together with the stem of their fmu files. + + Note: there can be several instances per model (FMU) + """ + for k, v in self.system_structure["Simulators"].items(): + source = v["source"] + yield (k, {"model": source.stem, "source": source}) + + def _get_models(self) -> dict: + """Get a dict of the models in the system: + { : {'source':, 'components':[component-list], 'variables':{variables-dict}. + """ + mods = {} + for k, v in self.components: + if v["model"] not in mods: + mods.update( + { + v["model"]: { + "source": v["source"], + "components": [k], + "variables": self._get_variables(v["source"]), + } + } + ) + else: + mods[v["model"]]["components"].append(k) + return mods + + @property + def models(self) -> dict: + return self._models + + def match_components(self, comps: str | tuple[str, ...]) -> tuple[str, tuple]: + """Identify component (instances) based on 'comps' (component alias or tuple of aliases). + comps can be a (tuple of) full component names or component names with wildcards. + Returned components shall be based on the same model. + """ + if isinstance(comps, str): + comps = (comps,) + collect = [] + model = None + for c in comps: + for k, v in self.components: + if match_with_wildcard(c, k): + if model is None: + model = v["model"] + if v["model"] == model and k not in collect: + collect.append(k) + assert model is not None and len(collect), f"No component match for {comps}" + return (model, tuple(collect)) + + def _get_variables(self, source: Path) -> dict[str, dict]: + """Get the registered variables for a given model (added to _models dict). + + Returns + ------- + A dictionary of variable {names:info, ...}, where info is a dictionary containing reference, type, causality and variability + """ + assert source.exists() and source.suffix == ".fmu", f"FMU file {source} not found or wrong suffix" + md = from_xml(source, sub="modelDescription.xml") + variables = {} + for sv in md.findall(".//ScalarVariable"): + name = sv.attrib.pop("name") + vr = int(sv.attrib.pop("valueReference")) + var: dict[str, Any] = {k: v for k, v in sv.attrib.items()} + var.update({"reference": vr}) + typ = sv[0] + var.update({"type": SystemInterface.pytype(typ.tag)}) + var.update(typ.attrib) + variables.update({name: var}) + return variables + + def variables(self, comp: str | int) -> dict: + """Get the registered variables for a given component from the system. + + Args: + comp (str, int): The component name or its index within the model + + Returns + ------- + A dictionary of variable {names:info, ...}, where info is a dictionary containing reference, type, causality and variability + """ + if isinstance(comp, str): + for k, c in self.components: + if k == comp: + return self.models[c["model"]]["variables"] + elif isinstance(comp, int): + for i, (_, c) in enumerate(self.components): + if i == comp: + return self.models[c["model"]]["variables"] + else: + raise AssertionError(f"Unallowed argument {comp} in 'variables'") + raise KeyError(f"Component {comp} not found. Avalable components: {list(self.components)}") from None + + def variable_iter(self, variables: dict, flt: int | str | tuple | list): + """Get the variable dicts of the variables refered to by ids. + + Returns: Iterator over the dicts of the selected variables + """ + if isinstance(flt, (int, str)): + ids = [flt] + elif isinstance(flt, (tuple, list)): + ids = list(flt) + else: + raise ValueError(f"Unknown filter specification {flt} for variables") from None + if isinstance(ids[0], str): # by name + for i in ids: + if i in variables: + yield (i, variables[i]) + else: # by reference + for v, info in variables.items(): + if info["reference"] in ids: + yield (v, info) + + def match_variables(self, component: str, varname: str) -> tuple[int]: + """Based on an example component (instance), identify unique variables starting with 'varname'. + The returned information applies to all instances of the same model. + The variables shall all be of the same type, causality and variability. + + Args: + component: component instance varname. + varname (str): the varname to search for. This can be the full varname or only the start of the varname + If only the start of the varname is supplied, all matching variables are collected. + + Returns + ------- + Tuple of value references + """ + + def accept_as_alias(org: str) -> bool: + """Decide whether the alias can be accepted with respect to org (uniqueness).""" + if not org.startswith(varname): # necessary requirement + return False + rest = org[len(varname) :] + if not len(rest) or any(rest.startswith(c) for c in ("[", ".")): + return True + return False + + var = [] + assert hasattr(self, "components"), "Need the dictionary of components before maching variables" + + accepted = None + for k, v in self.variables(component).items(): + if accept_as_alias(k): + if accepted is None: + accepted = v + assert all( + v[e] == accepted[e] for e in ("type", "causality", "variability") + ), f"Variable {k} matches {varname}, but properties do not match" + var.append(v["reference"]) + return tuple(var) + + def variable_name_from_ref(self, comp: int | str, ref: int) -> str: + """Get the variable name from its component instant (id or name) and its valueReference.""" + for name, info in self.variables(comp).items(): + if info["reference"] == ref: + return name + return "" + + def component_name_from_id(self, idx: int) -> str: + """Retrieve the component name from the given index. + Return an empty string if not found. + """ + for i, (k, _) in enumerate(self.components): + if i == idx: + return k + return "" + + def component_id_from_name(self, name: str) -> int: + """Get the component id from the name. -1 if not found.""" + for i, (k, _) in enumerate(self.components): + if k == name: + return i + return -1 + + @staticmethod + def pytype(fmu_type: str, val: PyVal | None = None): + """Return the python type of the FMU type provided as string. + If val is None, the python type object is returned. Else if boolean, true or false is returned. + """ + typ = { + "real": float, + "integer": int, + "boolean": bool, + "string": str, + "enumeration": Enum, + }[fmu_type.lower()] + + if val is None: + return typ + elif typ is bool: + if isinstance(val, str): + return "true" in val.lower() # should be fmi2True and fmi2False + elif isinstance(val, int): + return bool(val) + else: + raise KeyError(f"The value {val} could not be converted to boolean") + else: + return typ(val) + + @staticmethod + def default_initial(causality: str, variability: str, only_default: bool = True) -> str | int | tuple: + """Return default initial setting as str. See p.50 FMI2. + With only_default, the single allowed value, or '' is returned. + Otherwise a tuple of possible values is returned where the default value is always listed first. + """ + col = {"parameter": 0, "calculated_parameter": 1, "input": 2, "output": 3, "local": 4, "independent": 5}[ + causality + ] + row = {"constant": 0, "fixed": 1, "tunable": 2, "discrete": 3, "continuous": 4}[variability] + init = ( + (-1, -1, -1, 7, 10, -3), + (1, 3, -4, -5, 11, -3), + (2, 4, -4, -5, 12, -3), + (-2, -2, 5, 8, 13, -3), + (-2, -2, 6, 9, 14, 15), + )[row][col] + if init < 0: # "Unallowed combination {variability}, {causality}. See '{chr(96-init)}' in FMI standard" + return init if only_default else (init,) + elif init in (1, 2, 7, 10): + return "exact" if only_default else ("exact",) + elif init in (3, 4, 11, 12): + return "calculated" if only_default else ("calculated", "approx") + elif init in (8, 9, 13, 14): + return "calculated" if only_default else ("calculated", "exact", "approx") + else: + return init if only_default else (init,) + + def allowed_action(self, action: str, comp: int | str, var: int | str | tuple | list, time: float): + """Check whether the action would be allowed according to FMI2 rules, see FMI2.01, p.49. + + * if a tuple of variables is provided, the variables shall have equal properties + in addition to the normal allowed rules. + + Args: + action (str): Action type, 'set', 'get', including init actions (set at time 0) + comp (int,str): The instantiated component within the system (as index or name) + var (int,str,tuple): The variable(s) (of component) as reference or name + time (float): The time at which the action will be performed + """ + + def _description(name: str, info: dict, initial: int) -> str: + descr = f"Variable {name}, causality {var_info['causality']}" + descr += f", variability {var_info['variability']}" + descr += f", initial {_initial}" + return descr + + def _check(cond, msg): + if not cond: + self.message = msg + return False + return True + + _type, _causality, _variability = ("", "", "") # unknown + + variables = self.variables(comp) + for name, var_info in self.variable_iter(variables, var): + # if not _check(name in variables, f"Variable {name} of component {comp} was not found"): + # print("VARIABLES", variables) + # return False + # var_info = variables[name] + if _type == "" or _causality == "" or _variability == "": # define the properties and check whether allowed + _type = var_info["type"] + _causality = var_info["causality"] + _variability = var_info["variability"] + _initial = var_info.get("initial", SystemInterface.default_initial(_causality, _variability)) + + if action == "get": # no restrictions on get + pass + elif action == "set": + if ( + time < 0 # before EnterInitializationMode + and not _check( + (_variability != "constant" and _initial in ("exact", "approx")), + f"Change of {name} before EnterInitialization", + ) + ): + return False + + elif ( + time == 0 # before ExitInitializationMode + and not _check( + (_variability != "constant" and (_initial == "exact" or _causality == "input")), + f"Change of {name} during Initialization", + ) + ): + return False + elif ( + time > 0 # at communication points + and not _check( + (_causality == "parameter" and _variability == "tunable") or _causality == "input", + f"Change of {name} at communication point", + ) + ): + return False + # additional rule for ModelExchange, not listed here + else: # check whether the properties are equal + if not _check(_type == var_info["type"], _description(name, var_info, _initial) + f" != type {_type}"): + return False + if not _check( + _causality == var_info["causality"], + _description(name, var_info, _initial) + f" != causality { _causality}", + ): + return False + if not _check( + _variability == var_info["variability"], + _description(name, var_info, _initial) + f" != variability {_variability}", + ): + return False + return True + + def set_variable_value(self, instance: int, typ: type, var_refs: tuple[int, ...], var_vals: tuple) -> bool: + """Provide a manipulator function which sets the 'variable' (of the given 'instance' model) to 'value'. + + Args: + instance (int): identifier of the instance model for which the variable is to be set + var_refs (tuple): Tuple of variable references for which the values shall be set + var_vals (tuple): Tuple of values (of the correct type), used to set model variables + """ + raise NotImplementedError("The method 'set_variable_value()' cannot be used in SystemInterface") from None + + def get_variable_value(self, instance: int, typ: type, var_refs: tuple[int, ...]): + """Provide an observer function which gets the 'variable' value (of the given 'instance' model) at the time when called. + + Args: + instance (int): identifier of the instance model for which the variable is to be set + var_refs (tuple): Tuple of variable references for which the values shall be retrieved + """ + raise NotImplementedError("The method 'get_variable_value()' cannot be used in SystemInterface") from None diff --git a/src/sim_explorer/system_interface_osp.py b/src/sim_explorer/system_interface_osp.py new file mode 100644 index 0000000..87df2d0 --- /dev/null +++ b/src/sim_explorer/system_interface_osp.py @@ -0,0 +1,107 @@ +from pathlib import Path + +from libcosimpy.CosimExecution import CosimExecution +from libcosimpy.CosimLogging import CosimLogLevel, log_output_level # type: ignore +from libcosimpy.CosimManipulator import CosimManipulator # type: ignore +from libcosimpy.CosimObserver import CosimObserver # type: ignore + +from sim_explorer.system_interface import SystemInterface + + +class SystemInterfaceOSP(SystemInterface): + """Implements the SystemInterface as a OSP. + + Args: + structure_file (Path): Path to system model definition file + name (str)="System": Possibility to provide an explicit system name (if not provided by system file) + description (str)="": Optional possibility to provide a system description + log_level (str) = 'fatal': Per default the level is set to 'fatal', + but it can be set to 'trace', 'debug', 'info', 'warning', 'error' or 'fatal' (e.g. for debugging purposes) + """ + + def __init__( + self, + structure_file: Path | str = "", + name: str | None = None, + description: str = "", + log_level: str = "fatal", + ): + super().__init__(structure_file, name, description, log_level) + log_output_level(CosimLogLevel[log_level.upper()]) + # ck, msg = self._check_system_structure(self.sysconfig) + # assert ck, msg + self.full_simulator_available = True # system and components specification + simulation capabilities + self.simulator = CosimExecution.from_osp_config_file(str(self.structure_file)) + assert isinstance(self.simulator, CosimExecution) + # Instantiate a suitable manipulator for changing variables. + self.manipulator = CosimManipulator.create_override() + assert self.simulator.add_manipulator(manipulator=self.manipulator), "Could not add manipulator object" + + # Instantiate a suitable observer for collecting results. + self.observer = CosimObserver.create_last_value() + assert self.simulator.add_observer(observer=self.observer), "Could not add observer object" + + def reset(self): # , cases:Cases): + """Reset the simulator interface, so that a new simulation can be run.""" + assert isinstance( + self.structure_file, Path + ), "Simulator resetting does not work with explicitly supplied simulator." + assert self.structure_file.exists(), "Simulator resetting does not work with explicitly supplied simulator." + assert isinstance(self.manipulator, CosimManipulator) + assert isinstance(self.observer, CosimObserver) + # self.simulator = self._simulator_from_config(self.sysconfig) + self.simulator = CosimExecution.from_osp_config_file(str(self.structure_file)) + assert self.simulator.add_manipulator(manipulator=self.manipulator), "Could not add manipulator object" + assert self.simulator.add_observer(observer=self.observer), "Could not add observer object" + + def set_initial(self, instance: int, typ: type, var_ref: int, var_val: str | float | int | bool): + """Provide an _initial_value set function (OSP only allows simple variables). + The signature is the same as the manipulator functions slave_real_values()..., + only that variables are set individually and the type is added as argument. + """ + if typ is float: + return self.simulator.real_initial_value(instance, var_ref, typ(var_val)) + elif typ is int: + return self.simulator.integer_initial_value(instance, var_ref, typ(var_val)) + elif typ is str: + return self.simulator.string_initial_value(instance, var_ref, typ(var_val)) + elif typ is bool: + return self.simulator.boolean_initial_value(instance, var_ref, typ(var_val)) + + def set_variable_value(self, instance: int, typ: type, var_refs: tuple[int, ...], var_vals: tuple) -> bool: + """Provide a manipulator function which sets the 'variable' (of the given 'instance' model) to 'value'. + + Args: + instance (int): identifier of the instance model for which the variable is to be set + var_refs (tuple): Tuple of variable references for which the values shall be set + var_vals (tuple): Tuple of values (of the correct type), used to set model variables + """ + _vals = [typ(x) for x in var_vals] # ensure list and correct type + if typ is float: + return self.manipulator.slave_real_values(instance, list(var_refs), _vals) + elif typ is int: + return self.manipulator.slave_integer_values(instance, list(var_refs), _vals) + elif typ is bool: + return self.manipulator.slave_boolean_values(instance, list(var_refs), _vals) + elif typ is str: + return self.manipulator.slave_string_values(instance, list(var_refs), _vals) + else: + raise ValueError(f"Unknown type {typ}") from None + + def get_variable_value(self, instance: int, typ: type, var_refs: tuple[int, ...]): + """Provide an observer function which gets the 'variable' value (of the given 'instance' model) at the time when called. + + Args: + instance (int): identifier of the instance model for which the variable is to be set + var_refs (tuple): Tuple of variable references for which the values shall be retrieved + """ + if typ is float: + return self.observer.last_real_values(instance, list(var_refs)) + elif typ is int: + return self.observer.last_integer_values(instance, list(var_refs)) + elif typ is bool: + return self.observer.last_boolean_values(instance, list(var_refs)) + elif typ is str: + return self.observer.last_string_values(instance, list(var_refs)) + else: + raise ValueError(f"Unknown type {typ}") from None diff --git a/tests/data/BouncingBall0/BouncingBall.fmu b/tests/data/BouncingBall0/BouncingBall.fmu index 8ff6b1b..fe755e6 100644 Binary files a/tests/data/BouncingBall0/BouncingBall.fmu and b/tests/data/BouncingBall0/BouncingBall.fmu differ diff --git a/tests/data/BouncingBall3D/BouncingBall3D.cases b/tests/data/BouncingBall3D/BouncingBall3D.cases index d43fb64..be9cfd4 100644 --- a/tests/data/BouncingBall3D/BouncingBall3D.cases +++ b/tests/data/BouncingBall3D/BouncingBall3D.cases @@ -2,7 +2,7 @@ name : 'BouncingBall3D', description : 'Simple sim explorer with the 3D BouncingBall FMU (3D position and speed', modelFile : "OspSystemStructure.xml", - logLevel : "FATAL", + logLevel : "fatal", timeUnit : "second", variables : { g : ['bb', 'g', "Gravity acting on the ball"], diff --git a/tests/data/BouncingBall3D/crane_table.xml b/tests/data/BouncingBall3D/crane_table.xml new file mode 100644 index 0000000..e15cd3b --- /dev/null +++ b/tests/data/BouncingBall3D/crane_table.xml @@ -0,0 +1,15 @@ + + 0.0 + 0.01 + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/data/BouncingBall3D/systemModel.xml b/tests/data/BouncingBall3D/systemModel.xml new file mode 100644 index 0000000..e15cd3b --- /dev/null +++ b/tests/data/BouncingBall3D/systemModel.xml @@ -0,0 +1,15 @@ + + 0.0 + 0.01 + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/data/Oscillator/HarmonicOscillator.fmu b/tests/data/Oscillator/HarmonicOscillator.fmu index 4c69a35..2ad3e37 100644 Binary files a/tests/data/Oscillator/HarmonicOscillator.fmu and b/tests/data/Oscillator/HarmonicOscillator.fmu differ diff --git a/tests/test_BouncingBall.py b/tests/test_BouncingBall.py index 46a51de..e141e9b 100644 --- a/tests/test_BouncingBall.py +++ b/tests/test_BouncingBall.py @@ -15,7 +15,7 @@ def nearly_equal(res: tuple, expected: tuple, eps=1e-7): def test_run_fmpy(show): """Test and validate the basic BouncingBall using fmpy and not using OSP or sim_explorer.""" - path = Path(Path(__file__).parent, "data/BouncingBall0/BouncingBall.fmu") + path = Path(__file__).parent / "data" / "BouncingBall0" / "BouncingBall.fmu" assert path.exists(), f"File {path} does not exist" stepsize = 0.01 result = simulate_fmu( @@ -61,3 +61,4 @@ def test_run_fmpy(show): if __name__ == "__main__": retcode = pytest.main(["-rA", "-v", __file__, "--show", "True"]) assert retcode == 0, f"Non-zero return code {retcode}" + # test_run_fmpy(show=True) diff --git a/tests/test_case.py b/tests/test_case.py index 12ee142..c53edf6 100644 --- a/tests/test_case.py +++ b/tests/test_case.py @@ -6,7 +6,7 @@ from sim_explorer.case import Case, Cases from sim_explorer.json5 import Json5 -from sim_explorer.simulator_interface import SimulatorInterface +from sim_explorer.system_interface import SystemInterface @pytest.fixture @@ -72,7 +72,7 @@ def _make_cases(): } js = Json5(json5) js.write("data/test.cases") - _ = SimulatorInterface("data/OspSystemStructure.xml", "testSystem") + _ = SystemInterface("data/OspSystemStructure.xml") _ = Cases("data/test.cases") @@ -189,17 +189,17 @@ def test_case_set_get(simpletable): # print(f"ACT_SET: {caseX.act_set[0.0][0]}") #! set_initial, therefore no tuples! assert caseX.act_set[0.0][0].func.__name__ == "set_initial", "function name" assert caseX.act_set[0.0][0].args[0] == 0, "model instance" - assert caseX.act_set[0.0][0].args[1] == 3, f"variable type {caseX.act_set[0.0][0].args[1]}" + assert caseX.act_set[0.0][0].args[1] is bool, f"variable type {caseX.act_set[0.0][0].args[1]}" assert caseX.act_set[0.0][0].args[2] == 3, f"variable ref {caseX.act_set[0.0][0].args[2]}" assert caseX.act_set[0.0][0].args[3], f"variable value {caseX.act_set[0.0][0].args[3]}" # print(caseX.act_set[0.0][0]) assert caseX.act_set[0.0][0].args[0] == 0, "model instance" - assert caseX.act_set[0.0][0].args[1] == 3, f"variable type {caseX.act_set[0.0][0].args[1]}" + assert caseX.act_set[0.0][0].args[1] is bool, f"variable type {caseX.act_set[0.0][0].args[1]}" assert caseX.act_set[0.0][0].args[2] == 3, f"variable ref {caseX.act_set[0.0][0].args[2]}" assert caseX.act_set[0.0][0].args[3] is True, f"variable value {caseX.act_set[0.0][0].args[3]}" # print(f"ACT_GET: {caseX.act_get}") assert caseX.act_get[1e9][0].args[0] == 0, "model instance" - assert caseX.act_get[1e9][0].args[1] == 0, "variable type" + assert caseX.act_get[1e9][0].args[1] is float, "variable type" assert caseX.act_get[1e9][0].args[2] == (0,), f"variable refs {caseX.act_get[1e9][0].args[2]}" # print( "PRINT", caseX.act_get[-1][0].args[2]) assert caseX.act_get[-1][0].args[2] == ( diff --git a/tests/test_cases.py b/tests/test_cases.py index 33f4050..28089be 100644 --- a/tests/test_cases.py +++ b/tests/test_cases.py @@ -3,7 +3,7 @@ import pytest from sim_explorer.case import Case, Cases -from sim_explorer.simulator_interface import SimulatorInterface +from sim_explorer.system_interface_osp import SystemInterfaceOSP # def test_tuple_iter(): # """Test of the features provided by the Case class""" @@ -34,8 +34,9 @@ def test_cases_management(): def test_cases(): """Test of the features provided by the Cases class""" - sim = SimulatorInterface(str(Path(__file__).parent / "data" / "BouncingBall0" / "OspSystemStructure.xml")) - cases = Cases(Path(__file__).parent / "data" / "BouncingBall0" / "BouncingBall.cases", sim) + cases = Cases( + Path(__file__).parent / "data" / "BouncingBall0" / "BouncingBall.cases", simulator_type=SystemInterfaceOSP + ) c: str | Case print(cases.info()) @@ -74,9 +75,9 @@ def test_cases(): assert cases.variables["h"]["instances"] == ("bb",) assert cases.variables["h"]["variables"] == (1,) assert cases.variables["h"]["description"] == "Position (z) of the ball" - assert cases.variables["h"]["type"] == 0 - assert cases.variables["h"]["causality"] == 2 - assert cases.variables["h"]["variability"] == 4 + assert cases.variables["h"]["type"] is float + assert cases.variables["h"]["causality"] == "output", f"Found {cases.variables['h']['causality']}" + assert cases.variables["h"]["variability"] == "continuous", f"Found {cases.variables['h']['variability']}" vs = dict((k, v) for k, v in cases.variables.items() if k.startswith("v")) assert all(x in vs for x in ("v_min", "v_z", "v")) diff --git a/tests/test_results.py b/tests/test_results.py index a73ff88..56a5e8d 100644 --- a/tests/test_results.py +++ b/tests/test_results.py @@ -1,6 +1,8 @@ from datetime import datetime from pathlib import Path +import pytest + from sim_explorer.case import Cases, Results @@ -65,13 +67,12 @@ def test_retrieve(): if __name__ == "__main__": - # retcode = pytest.main(["-rA", "-v", __file__, "--show", "True"]) - # assert retcode == 0, f"Non-zero return code {retcode}" - import os - - os.chdir(Path(__file__).parent.absolute() / "test_working_directory") + retcode = pytest.main(["-rA", "-v", __file__, "--show", "True"]) + assert retcode == 0, f"Non-zero return code {retcode}" + # import os + # os.chdir(Path(__file__).parent.absolute() / "test_working_directory") # test_retrieve() # test_init() # test_add() - test_plot_time_series(show=True) + # test_plot_time_series(show=True) # test_inspect() diff --git a/tests/test_run_bouncingball0.py b/tests/test_run_bouncingball0.py index 6e234be..386b732 100644 --- a/tests/test_run_bouncingball0.py +++ b/tests/test_run_bouncingball0.py @@ -6,7 +6,7 @@ from sim_explorer.case import Case, Cases from sim_explorer.json5 import Json5 -from sim_explorer.simulator_interface import SimulatorInterface +from sim_explorer.system_interface_osp import SystemInterfaceOSP def expected_actions(case: Case, act: dict, expect: dict): @@ -35,7 +35,7 @@ def expected_actions(case: Case, act: dict, expect: dict): args[k] = (action.args[k],) # type: ignore[call-overload] arg = [ sim.component_name_from_id(action.args[0]), - SimulatorInterface.pytype(action.args[1]), + action.args[1], tuple(sim.variable_name_from_ref(comp=action.args[0], ref=ref) for ref in args[2]), # type: ignore[attr-defined] ] for k in range(1, len(action.args)): @@ -75,7 +75,7 @@ def test_step_by_step(): """Do the simulation step-by step, only using libcosimpy""" path = Path(Path(__file__).parent, "data/BouncingBall0/OspSystemStructure.xml") assert path.exists(), "System structure file not found" - sim = SimulatorInterface(path) + sim = SystemInterfaceOSP(path) assert sim.simulator.real_initial_value(0, 6, 0.35), "Setting of 'e' did not work" for t in np.linspace(1, 1e9, 100): sim.simulator.simulate_until(t) @@ -92,17 +92,17 @@ def test_step_by_step_interface(): """Do the simulation step by step, using the simulatorInterface""" path = Path(Path(__file__).parent, "data/BouncingBall0/OspSystemStructure.xml") assert path.exists(), "System structure file not found" - sim = SimulatorInterface(path) + sim = SystemInterfaceOSP(path) # Commented out as order of variables and models are not guaranteed in different OS # assert sim.components["bb"] == 0 # print(f"Variables: {sim.get_variables( 0, as_numbers = False)}") # assert sim.get_variables(0)["e"] == {"reference": 6, "type": 0, "causality": 1, "variability": 2} - sim.set_initial(0, 0, 6, 0.35) + sim.set_variable_value(0, float, (6,), (0.35,)) for t in np.linspace(1, 1e9, 1): sim.simulator.simulate_until(t) - print(sim.get_variable_value(instance=0, typ=0, var_refs=(0, 1, 6))) + assert sim.get_variable_value(instance=0, typ=float, var_refs=(0, 1, 6)) == [0.01, 0.99955855, 0.35] if t == int(0.11 * 1e9): - assert sim.get_variable_value(instance=0, typ=0, var_refs=(0, 1, 6)) == [ + assert sim.get_variable_value(instance=0, typ=float, var_refs=(0, 1, 6)) == [ 0.11, 0.9411890500000001, 0.35, @@ -243,3 +243,5 @@ def test_run_cases(): retcode = pytest.main(["-rA", "-v", __file__]) assert retcode == 0, f"Non-zero return code {retcode}" # test_run_cases() + # test_step_by_step() + # test_step_by_step_interface() diff --git a/tests/test_run_mobilecrane.py b/tests/test_run_mobilecrane.py index 8c9354a..130ee37 100644 --- a/tests/test_run_mobilecrane.py +++ b/tests/test_run_mobilecrane.py @@ -10,11 +10,15 @@ from sim_explorer.case import Case, Cases from sim_explorer.json5 import Json5 -from sim_explorer.simulator_interface import SimulatorInterface +from sim_explorer.system_interface_osp import SystemInterfaceOSP @pytest.fixture(scope="session") def mobile_crane_fmu(): + return _mobile_crane_fmu() + + +def _mobile_crane_fmu(): return Path(__file__).parent / "data" / "MobileCrane" / "MobileCrane.fmu" @@ -110,15 +114,15 @@ def set_initial(name: str, value: float, slave: int = 0): sim.simulate_until(step_count * 1e9) -# @pytest.mark.skip("Alternative step-by step, using SimulatorInterface and Cases") +# @pytest.mark.skip("Alternative step-by step, using SystemInterfaceOSP and Cases") def test_step_by_step_cases(mobile_crane_fmu): - sim: SimulatorInterface + sim: SystemInterfaceOSP cosim: CosimExecution def get_ref(name: str): - variable = cases.simulator.get_variables(0, name) + variable = cases.simulator.variables(0)[name] assert len(variable), f"Variable {name} not found" - return next(iter(variable.values()))["reference"] + return variable["reference"] def set_initial(name: str, value: float, slave: int = 0): for idx in range(cosim.num_slave_variables(slave_index=slave)): @@ -126,16 +130,25 @@ def set_initial(name: str, value: float, slave: int = 0): return cosim.real_initial_value(slave_index=slave, variable_reference=idx, value=value) def initial_settings(): - cases.simulator.set_initial(0, 0, get_ref("pedestal_boom[0]"), 3.0) - cases.simulator.set_initial(0, 0, get_ref("boom_boom[0]"), 8.0) - cases.simulator.set_initial(0, 0, get_ref("boom_boom[1]"), 0.7854) - cases.simulator.set_initial(0, 0, get_ref("rope_boom[0]"), 1e-6) - cases.simulator.set_initial(0, 0, get_ref("dLoad"), 50.0) + cases.simulator.set_variable_value( + 0, + float, + ( + get_ref("pedestal_boom[0]"), + get_ref("boom_boom[0]"), + get_ref("boom_boom[1]"), + get_ref("rope_boom[0]"), + get_ref("dLoad"), + ), + (3.0, 8.0, 0.7854, 1e-6, 50.0), + ) system = Path(Path(__file__).parent / "data" / "MobileCrane" / "OspSystemStructure.xml") assert system.exists(), f"OspSystemStructure file {system} not found" - sim = SimulatorInterface(system) - assert sim.get_components() == {"mobileCrane": 0}, f"Found component {sim.get_components()}" + sim = SystemInterfaceOSP(system) + print("COMP", {k: v for k, v in sim.components}) + expected = {k: v for k, v in sim.components} + assert isinstance(expected["mobileCrane"], dict), f"Found components {expected}" path = Path(Path(__file__).parent, "data/MobileCrane/MobileCrane.cases") assert path.exists(), "Cases file not found" @@ -157,7 +170,7 @@ def initial_settings(): "timeUnit", "variables", ] - cases = Cases(path, sim) + cases = Cases(path) print("INFO", cases.info()) static = cases.case_by_name("static") assert static is not None @@ -169,10 +182,10 @@ def initial_settings(): } assert static.act_get[-1][0].args == ( 0, - 0, + float, (10, 11, 12), ), f"Step action arguments {static.act_get[-1][0].args}" - assert sim.get_variable_value(0, 0, (10, 11, 12)) == [ + assert sim.get_variable_value(0, float, (10, 11, 12)) == [ 0.0, 0.0, 0.0, @@ -250,15 +263,14 @@ def initial_settings(): # cases.simulator.set_variable_value(0, 0, (get_ref("boom_angularVelocity"),), (0.7,)) -# @pytest.mark.skip("Alternative only using SimulatorInterface") +# @pytest.mark.skip("Alternative only using SystemInterfaceOSP") def test_run_basic(): path = Path(Path(__file__).parent / "data" / "MobileCrane" / "OspSystemStructure.xml") assert path.exists(), "System structure file not found" - sim = SimulatorInterface(path) + sim = SystemInterfaceOSP(path) sim.simulator.simulate_until(1e9) -# @pytest.mark.skip("So far not working. Need to look into that: Run all cases defined in MobileCrane.cases") def test_run_cases(): path = Path(Path(__file__).parent / "data" / "MobileCrane" / "MobileCrane.cases") # system_structure = Path(Path(__file__).parent, "data/MobileCrane/OspSystemStructure.xml") @@ -270,10 +282,10 @@ def test_run_cases(): static = cases.case_by_name("static") assert static is not None assert static.act_get[-1][0].func.__name__ == "get_variable_value" - assert static.act_get[-1][0].args == (0, 0, (10, 11, 12)) - assert static.act_get[-1][1].args == (0, 0, (21, 22, 23)) - assert static.act_get[-1][2].args == (0, 0, (37, 38, 39)) - assert static.act_get[-1][3].args == (0, 0, (53, 54, 55)) + assert static.act_get[-1][0].args == (0, float, (10, 11, 12)) + assert static.act_get[-1][1].args == (0, float, (21, 22, 23)) + assert static.act_get[-1][2].args == (0, float, (37, 38, 39)) + assert static.act_get[-1][3].args == (0, float, (53, 54, 55)) print("Running case 'base'...") case = cases.case_by_name("base") @@ -315,4 +327,4 @@ def test_run_cases(): # test_step_by_step_cosim(_mobile_crane_fmu()) # test_step_by_step_cases(_mobile_crane_fmu()) # test_run_basic(_mobile_crane_fmu()) - # test_run_cases(_mobile_crane_fmu()) + # test_run_cases() diff --git a/tests/test_simulator_interface.py b/tests/test_simulator_interface.py deleted file mode 100644 index d07859c..0000000 --- a/tests/test_simulator_interface.py +++ /dev/null @@ -1,134 +0,0 @@ -from pathlib import Path - -import pytest -from libcosimpy.CosimExecution import CosimExecution - -from sim_explorer.simulator_interface import SimulatorInterface -from sim_explorer.utils.misc import match_with_wildcard - - -def test_match_with_wildcard(): - assert match_with_wildcard("Hello World", "Hello World"), "Match expected" - assert not match_with_wildcard("Hello World", "Helo World"), "No match expected" - assert match_with_wildcard("*o World", "Hello World"), "Match expected" - assert not match_with_wildcard("*o W*ld", "Hello Word"), "No match expected" - assert match_with_wildcard("*o W*ld", "Hello World"), "Two wildcard matches expected" - - -def test_pytype(): - assert SimulatorInterface.pytype("REAL", "2.3") == 2.3, "Expected 2.3 as float type" - assert SimulatorInterface.pytype("Integer", "99") == 99, "Expected 99 as int type" - assert SimulatorInterface.pytype("Boolean", "fmi2True"), "Expected True as bool type" - assert not SimulatorInterface.pytype("Boolean", "fmi2false"), "Expected True as bool type" - assert SimulatorInterface.pytype("String", "fmi2False") == "fmi2False", "Expected fmi2False as str type" - with pytest.raises(ValueError) as err: - SimulatorInterface.pytype("Real", "fmi2False") - assert str(err.value).startswith("could not convert string to float:"), "No error raised as expected" - assert SimulatorInterface.pytype(0) is float - assert SimulatorInterface.pytype(1) is int - assert SimulatorInterface.pytype(2) is str - assert SimulatorInterface.pytype(3) is bool - assert SimulatorInterface.pytype(1, 2.3) == 2 - - -def test_component_variable_name(): - path = Path(Path(__file__).parent, "data/BouncingBall0/OspSystemStructure.xml") - system = SimulatorInterface(str(path), name="BouncingBall") - """ - Slave order is not guaranteed in different OS - assert 1 == system.simulator.slave_index_from_instance_name("bb") - assert 0 == system.simulator.slave_index_from_instance_name("bb2") - assert 2 == system.simulator.slave_index_from_instance_name("bb3") - assert system.components["bb"] == 0, f"Error in unique model index. Found {system.components['bb']}" - """ - assert system.variable_name_from_ref("bb", 0) == "time" - assert system.variable_name_from_ref("bb", 1) == "h" - assert system.variable_name_from_ref("bb", 2) == "der(h)" - assert system.variable_name_from_ref("bb", 3) == "v" - assert system.variable_name_from_ref("bb", 4) == "der(v)" - assert system.variable_name_from_ref("bb", 5) == "g" - assert system.variable_name_from_ref("bb", 6) == "e" - assert system.variable_name_from_ref("bb", 7) == "v_min" - assert system.variable_name_from_ref("bb", 8) == "" - - -def test_default_initial(): - print("DIR", dir(SimulatorInterface)) - assert SimulatorInterface.default_initial(0, 0) == 3, f"Found {SimulatorInterface.default_initial( 0, 0)}" - assert SimulatorInterface.default_initial(1, 0) == 3, f"Found {SimulatorInterface.default_initial( 1, 0)}" - assert SimulatorInterface.default_initial(2, 0) == 0, f"Found {SimulatorInterface.default_initial( 2, 0)}" - assert SimulatorInterface.default_initial(3, 0) == 3, f"Found {SimulatorInterface.default_initial( 3, 0)}" - assert SimulatorInterface.default_initial(4, 0) == 0, f"Found {SimulatorInterface.default_initial( 4, 0)}" - assert SimulatorInterface.default_initial(5, 0) == 3, f"Found {SimulatorInterface.default_initial( 5, 0)}" - assert SimulatorInterface.default_initial(1, 1) == 0, f"Found {SimulatorInterface.default_initial( 1, 1)}" - assert SimulatorInterface.default_initial(1, 2) == 0, f"Found {SimulatorInterface.default_initial( 1, 1)}" - assert SimulatorInterface.default_initial(1, 3) == 3, f"Found {SimulatorInterface.default_initial( 1, 1)}" - assert SimulatorInterface.default_initial(1, 4) == 3, f"Found {SimulatorInterface.default_initial( 1, 1)}" - assert SimulatorInterface.default_initial(2, 0) == 0, f"Found {SimulatorInterface.default_initial( 2, 0)}" - assert SimulatorInterface.default_initial(5, 4) == 3, f"Found {SimulatorInterface.default_initial( 5, 4)}" - assert SimulatorInterface.default_initial(3, 2) == 2, f"Found {SimulatorInterface.default_initial( 3, 2)}" - assert SimulatorInterface.default_initial(4, 2) == 2, f"Found {SimulatorInterface.default_initial( 4, 2)}" - - -def test_simulator_from_system_structure(): - """SimulatorInterface from OspSystemStructure.xml""" - path = Path(Path(__file__).parent, "data/BouncingBall0/OspSystemStructure.xml") - system = SimulatorInterface(str(path), name="BouncingBall") - assert system.name == "BouncingBall", f"System.name should be BouncingBall. Found {system.name}" - assert "bb" in system.components, f"Instance name 'bb' expected. Found instances {system.components}" - # assert system.get_models()[0] == 0, f"Component model {system.get_models()[0]}" - assert "bb" in system.get_components() - - -def test_simulator_reset(): - """SimulatorInterface from OspSystemStructure.xml""" - path = Path(Path(__file__).parent, "data/BouncingBall0/OspSystemStructure.xml") - system = SimulatorInterface(str(path), name="BouncingBall") - system.simulator.simulate_until(1e9) - # print("STATUS", system.simulator.status()) - assert system.simulator.status().current_time == 1e9 - system.reset() - assert system.simulator.status().current_time == 0 - - -def test_simulator_instantiated(): - """Start with an instantiated simulator.""" - path = Path(Path(__file__).parent, "data/BouncingBall0/OspSystemStructure.xml") - sim = CosimExecution.from_osp_config_file(str(path)) - # print("STATUS", sim.status()) - simulator = SimulatorInterface( - system=str(path), - name="BouncingBall System", - description="Testing info retrieval from simulator (without OspSystemStructure)", - simulator=sim, - ) - # simulator.check_instances_variables() - assert len(simulator.components) == 3, "Three instantiated (identical) components" - variables = simulator.get_variables("bb") - assert variables["g"] == { - "reference": 5, - "type": 0, - "causality": 1, - "variability": 1, - } - assert simulator.allowed_action("set", "bb", "g", 0) - assert not simulator.allowed_action("set", "bb", "g", 100) - assert simulator.message.startswith("Variable g, causality PARAMETER,") - assert simulator.allowed_action("set", "bb", "e", 100), simulator.message - assert simulator.allowed_action("set", "bb", "h", 0), simulator.message - assert not simulator.allowed_action("set", "bb", "h", 100), simulator.message - assert simulator.allowed_action("set", "bb", "der(h)", 0), simulator.message - assert not simulator.allowed_action("set", "bb", "der(h)", 100), simulator.message - assert simulator.allowed_action("set", "bb", "v", 0), simulator.message - assert not simulator.allowed_action("set", "bb", "v", 100), simulator.message - assert simulator.allowed_action("set", "bb", "der(v)", 0), simulator.message - assert not simulator.allowed_action("set", "bb", "der(v)", 100), simulator.message - assert not simulator.allowed_action("set", "bb", "v_min", 0), simulator.message - assert simulator.allowed_action("set", "bb", (1, 3), 0), simulator.message # combination of h,v - assert not simulator.allowed_action("set", "bb", (1, 3), 100), simulator.message # combination of h,v - - -if __name__ == "__main__": - retcode = pytest.main(["-rA", "-v", __file__]) - assert retcode == 0, f"Return code {retcode}" - # test_component_variable_name() diff --git a/tests/test_system_interface.py b/tests/test_system_interface.py new file mode 100644 index 0000000..6e5366d --- /dev/null +++ b/tests/test_system_interface.py @@ -0,0 +1,74 @@ +from enum import Enum +from pathlib import Path + +import pytest + +from sim_explorer.system_interface import SystemInterface + + +def test_read_system_structure(): + for file in ("crane_table.js5", "crane_table.xml"): + s = SystemInterface.read_system_structure(Path(__file__).parent / "data" / "MobileCrane" / file) + # print(file, s) + assert s["header"]["version"] == "0.1", f"Found {s['header']['version']}" + assert s["header"]["xmlns"] == "http://opensimulationplatform.com/MSMI/OSPSystemStructure" + assert s["header"]["StartTime"] == 0.0 + assert s["header"]["Algorithm"] == "fixedStep" + assert s["header"]["BaseStepSize"] == 0.01 + assert len(s["Simulators"]) == 2 + assert ( + s["Simulators"]["simpleTable"]["source"] + == Path(__file__).parent / "data" / "SimpleTable" / "SimpleTable.fmu" + ) + assert s["Simulators"]["mobileCrane"]["pedestal.pedestalMass"] == 5000.0 + + +def test_pytype(): + assert SystemInterface.pytype("Real") is float + assert SystemInterface.pytype("Enumeration") is Enum + assert SystemInterface.pytype("Real", 1) == 1.0 + assert SystemInterface.pytype("Integer", 1) == 1 + assert SystemInterface.pytype("String", 1.0) == "1.0" + + +def test_interface(): + sys = SystemInterface(Path(__file__).parent / "data" / "MobileCrane" / "crane_table.js5") + st = Path(__file__).parent / "data" / "SimpleTable" / "SimpleTable.fmu" + sys.system_structure["Simulators"].update({"simpleTable2": {"source": st, "stepSize": "0.01"}}) + assert [k for k, _ in sys.components] == ["simpleTable", "mobileCrane", "simpleTable2"] + assert len(sys.models) == 2 + assert tuple(sys.models.keys()) == ("SimpleTable", "MobileCrane"), f"Found:{sys.models}" + m = sys.match_components("simple*") + assert m[0] == "SimpleTable", f"Found {m[0]}" + assert m[1] == ("simpleTable", "simpleTable2") + for k, _ in sys.components: + assert sys.component_name_from_id(sys.component_id_from_name(k)) == k + + vars = sys.variables("simpleTable") + assert vars["interpolate"]["causality"] == "parameter" + assert vars["interpolate"]["type"] is bool, f"Found {vars['interpolate']['type']}" + assert sys.match_variables("simpleTable", "outs") == (0, 1, 2) + assert sys.match_variables("simpleTable", "interpolate") == (3,) + assert sys.variable_name_from_ref("simpleTable", 2) == "outs[2]" + assert sys.variable_name_from_ref("simpleTable", 100) == "", "Not existent" + default = SystemInterface.default_initial("output", "fixed") + assert default == -5, f"Found:{default}" + assert SystemInterface.default_initial("parameter", "fixed") == "exact" + assert sys.allowed_action("Set", "simpleTable", "interpolate", 0) + assert sys.allowed_action("Get", "simpleTable", "outs", 0) + # assert sys.message, "Variable outs of component simpleTable was not found" + + with pytest.raises(NotImplementedError) as err: + sys.set_variable_value(0, float, (0, 1), (1.0, 2.0)) + assert str(err.value) == "The method 'set_variable_value()' cannot be used in SystemInterface" + with pytest.raises(NotImplementedError) as err: + _ = sys.get_variable_value(0, float, (0, 1)) + assert str(err.value) == "The method 'get_variable_value()' cannot be used in SystemInterface" + + +if __name__ == "__main__": + retcode = pytest.main(["-rA", "-v", __file__]) + assert retcode == 0, f"Non-zero return code {retcode}" + # test_read_system_structure() + # test_pytype() + # test_interface() diff --git a/tests/test_system_interface_osp.py b/tests/test_system_interface_osp.py new file mode 100644 index 0000000..a873812 --- /dev/null +++ b/tests/test_system_interface_osp.py @@ -0,0 +1,151 @@ +from pathlib import Path + +import pytest +from libcosimpy.CosimExecution import CosimExecution + +from sim_explorer.system_interface_osp import SystemInterfaceOSP +from sim_explorer.utils.misc import match_with_wildcard + + +def test_match_with_wildcard(): + assert match_with_wildcard("Hello World", "Hello World"), "Match expected" + assert not match_with_wildcard("Hello World", "Helo World"), "No match expected" + assert match_with_wildcard("*o World", "Hello World"), "Match expected" + assert not match_with_wildcard("*o W*ld", "Hello Word"), "No match expected" + assert match_with_wildcard("*o W*ld", "Hello World"), "Two wildcard matches expected" + + +def test_pytype(): + assert SystemInterfaceOSP.pytype("REAL", "2.3") == 2.3, "Expected 2.3 as float type" + assert SystemInterfaceOSP.pytype("Integer", "99") == 99, "Expected 99 as int type" + assert SystemInterfaceOSP.pytype("Boolean", "fmi2True"), "Expected True as bool type" + assert not SystemInterfaceOSP.pytype("Boolean", "fmi2false"), "Expected True as bool type" + assert SystemInterfaceOSP.pytype("String", "fmi2False") == "fmi2False", "Expected fmi2False as str type" + with pytest.raises(ValueError) as err: + SystemInterfaceOSP.pytype("Real", "fmi2False") + assert str(err.value).startswith("could not convert string to float:"), "No error raised as expected" + assert SystemInterfaceOSP.pytype("Real", 0) == 0.0 + assert SystemInterfaceOSP.pytype("Integer", 1) == 1 + assert SystemInterfaceOSP.pytype("String", 2) == "2" + assert SystemInterfaceOSP.pytype("Boolean", 3) + + +def test_component_variable_name(): + path = Path(Path(__file__).parent, "data/BouncingBall0/OspSystemStructure.xml") + system = SystemInterfaceOSP(path, name="BouncingBall") + """ + Slave order is not guaranteed in different OS + assert 1 == system.simulator.slave_index_from_instance_name("bb") + assert 0 == system.simulator.slave_index_from_instance_name("bb2") + assert 2 == system.simulator.slave_index_from_instance_name("bb3") + assert system.components["bb"] == 0, f"Error in unique model index. Found {system.components['bb']}" + """ + assert system.variable_name_from_ref("bb", 0) == "time" + assert system.variable_name_from_ref("bb", 1) == "h" + assert system.variable_name_from_ref("bb", 2) == "der(h)" + assert system.variable_name_from_ref("bb", 3) == "v" + assert system.variable_name_from_ref("bb", 4) == "der(v)" + assert system.variable_name_from_ref("bb", 5) == "g" + assert system.variable_name_from_ref("bb", 6) == "e" + assert system.variable_name_from_ref("bb", 7) == "v_min" + assert system.variable_name_from_ref("bb", 8) == "" + + +def test_default_initial(): + def di(var: str, caus: str, expected: str | int | tuple, only_default: bool = True): + res = SystemInterfaceOSP.default_initial(caus, var, only_default) + assert res == expected, f"default_initial({var}, {caus}): Found {res} but expected {expected}" + + di("constant", "parameter", -1) + di("constant", "calculated_parameter", -1) + di("constant", "input", -1) + di("constant", "output", "exact") + di("constant", "local", "exact") + di("constant", "independent", -3) + di("fixed", "parameter", "exact") + di("fixed", "calculated_parameter", "calculated") + di("fixed", "local", "calculated") + di("fixed", "input", -4) + di("tunable", "parameter", "exact") + di("tunable", "calculated_parameter", "calculated") + di("tunable", "output", -5) + di("tunable", "local", "calculated") + di("tunable", "input", -4) + di("discrete", "calculated_parameter", -2) + di("discrete", "input", 5) + di("discrete", "output", "calculated") + di("discrete", "local", "calculated") + di("continuous", "calculated_parameter", -2) + di("continuous", "independent", 15) + di("discrete", "output", ("calculated", "exact", "approx"), False) + + +def test_simulator_from_system_structure(): + """SystemInterfaceOSP from OspSystemStructure.xml""" + path = Path(Path(__file__).parent, "data/BouncingBall0/OspSystemStructure.xml") + system = SystemInterfaceOSP(str(path), name="BouncingBall") + assert system.name == "BouncingBall", f"System.name should be BouncingBall. Found {system.name}" + comps = {k: v for (k, v) in system.components} + assert "bb" in comps, f"Instance name 'bb' expected. Found instances {list(comps.keys())}" + assert len(comps) == 3 + assert len(system.models) == 1 + assert "BouncingBall" in system.models + # system.check_instances_variables() + variables = system.variables("bb") + print(f"g: {variables['g']}") + assert variables["g"]["reference"] == 5 + assert variables["g"]["type"] is float + assert variables["g"]["causality"] == "parameter" + assert variables["g"]["variability"] == "fixed" + + assert system.allowed_action("set", "bb", "g", 0) + assert not system.allowed_action("set", "bb", "g", 100) + assert system.message == "Change of g at communication point" + assert system.allowed_action("set", "bb", "e", 100), system.message + assert system.allowed_action("set", "bb", "h", 0), system.message + assert not system.allowed_action("set", "bb", "h", 100), system.message + assert not system.allowed_action("set", "bb", "der(h)", 0), system.message + assert not system.allowed_action("set", "bb", "der(h)", 100), system.message + assert system.allowed_action("set", "bb", "v", 0), system.message + assert not system.allowed_action("set", "bb", "v", 100), system.message + assert not system.allowed_action("set", "bb", "der(v)", 0), system.message + assert not system.allowed_action("set", "bb", "der(v)", 100), system.message + assert system.allowed_action("set", "bb", "v_min", 0), system.message + assert system.allowed_action("set", "bb", (1, 3), 0), system.message # combination of h,v + assert not system.allowed_action("set", "bb", (1, 3), 100), system.message # combination of h,v + + +def test_simulator_reset(): + """SystemInterfaceOSP from OspSystemStructure.xml""" + path = Path(Path(__file__).parent, "data/BouncingBall0/OspSystemStructure.xml") + system = SystemInterfaceOSP(str(path), name="BouncingBall") + system.simulator.simulate_until(1e9) + # print("STATUS", system.simulator.status()) + assert system.simulator.status().current_time == 1e9 + system.reset() + assert system.simulator.status().current_time == 0 + + +def test_simulator_instantiated(): + """Start with an instantiated simulator.""" + path = Path(Path(__file__).parent, "data/BouncingBall0/OspSystemStructure.xml") + sim = CosimExecution.from_osp_config_file(str(path)) + assert sim.status().current_time == 0 + simulator = SystemInterfaceOSP( + structure_file=str(path), + name="BouncingBall System", + description="Testing info retrieval from simulator (without OspSystemStructure)", + log_level="warning", + ) + assert isinstance(simulator, SystemInterfaceOSP) + + +if __name__ == "__main__": + retcode = pytest.main(["-rA", "-v", __file__]) + assert retcode == 0, f"Return code {retcode}" + # test_pytype() + # test_component_variable_name() + # test_default_initial() + # test_simulator_from_system_structure() + # test_simulator_reset() + # test_simulator_instantiated()