Skip to content

Commit

Permalink
Moved functions match_with_wildcards and from_fmu to utils/misc.py
Browse files Browse the repository at this point in the history
  • Loading branch information
eisDNV committed Nov 8, 2024
1 parent 34d5551 commit 865cb1d
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 107 deletions.
1 change: 1 addition & 0 deletions src/sim_explorer/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from sim_explorer.json5 import Json5
from sim_explorer.simulator_interface import SimulatorInterface, from_xml
from sim_explorer.utils.paths import get_path, relative_path
from sim_explorer.utils.misc import from_xml

"""
sim_explorer module for definition and execution of simulation experiments
Expand Down
204 changes: 124 additions & 80 deletions src/sim_explorer/simulator_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pathlib import Path
from typing import TypeAlias, cast
from zipfile import BadZipFile, ZipFile, is_zipfile
from sim_explorer.utils.misc import match_with_wildcard, from_xml

from libcosimpy.CosimEnums import CosimVariableCausality, CosimVariableType, CosimVariableVariability # type: ignore
from libcosimpy.CosimExecution import CosimExecution # type: ignore
Expand Down Expand Up @@ -83,22 +84,30 @@ def __init__(
self.sysconfig: Path | None = None
log_output_level(log_level)
self.simulator: CosimExecution
if simulator is None: # instantiate the simulator through the system config file
if (
simulator is None
): # instantiate the simulator through the system config file
self.sysconfig = Path(system)
assert self.sysconfig.exists(), f"File {self.sysconfig.name} not found"
ck, msg = self._check_system_structure(self.sysconfig)
assert ck, msg
self.simulator = cast(CosimExecution, self._simulator_from_config(self.sysconfig))
self.simulator = cast(
CosimExecution, self._simulator_from_config(self.sysconfig)
)
else:
self.simulator = simulator
self.components = self.get_components() # dict of {component name : modelId}
# Instantiate a suitable manipulator for changing variables.
self.manipulator = CosimManipulator.create_override()
assert self.simulator.add_manipulator(manipulator=self.manipulator), "Could not add manipulator object"
assert self.simulator.add_manipulator(
manipulator=self.manipulator
), "Could not add manipulator object"

# Instantiate a suitable observer for collecting results.
self.observer = CosimObserver.create_last_value()
assert self.simulator.add_observer(observer=self.observer), "Could not add observer object"
assert self.simulator.add_observer(
observer=self.observer
), "Could not add observer object"
self.message = "" # possibility to save additional message for (optional) retrieval by client

@property
Expand All @@ -118,14 +127,22 @@ def _check_system_structure(self, file: Path):

def reset(self): # , cases:Cases):
"""Reset the simulator interface, so that a new simulation can be run."""
assert isinstance(self.sysconfig, Path), "Simulator resetting does not work with explicitly supplied simulator."
assert self.sysconfig.exists(), "Simulator resetting does not work with explicitly supplied simulator."
assert isinstance(
self.sysconfig, Path
), "Simulator resetting does not work with explicitly supplied simulator."
assert (
self.sysconfig.exists()
), "Simulator resetting does not work with explicitly supplied simulator."
assert isinstance(self.manipulator, CosimManipulator)
assert isinstance(self.observer, CosimObserver)
# self.simulator = self._simulator_from_config(self.sysconfig)
self.simulator = CosimExecution.from_osp_config_file(str(self.sysconfig))
assert self.simulator.add_manipulator(manipulator=self.manipulator), "Could not add manipulator object"
assert self.simulator.add_observer(observer=self.observer), "Could not add observer object"
assert self.simulator.add_manipulator(
manipulator=self.manipulator
), "Could not add manipulator object"
assert self.simulator.add_observer(
observer=self.observer
), "Could not add observer object"
# for case in cases:

def _simulator_from_config(self, file: Path):
Expand Down Expand Up @@ -162,7 +179,9 @@ def get_components(self, model: int = -1) -> dict:
if self.simulator is None:
pass # nothing to do we return an empty dict

elif model >= 0: # use self.components to extract only components related to the provided model
elif (
model >= 0
): # use self.components to extract only components related to the provided model
for comp, mod in self.components.items():
if mod == model:
comps.update({comp: self.components[comp]})
Expand Down Expand Up @@ -229,7 +248,9 @@ def accept_as_alias(org: str) -> bool:
return False

var = []
assert len(self.components), "Need the dictionary of components before maching variables"
assert len(
self.components
), "Need the dictionary of components before maching variables"

accepted = None
variables = self.get_variables(component)
Expand Down Expand Up @@ -263,7 +284,9 @@ def is_output_var(self, comp: int, ref: int) -> bool:
return struct.causality == 2
return False

def get_variables(self, comp: str | int, single: int | str | None = None, as_numbers: bool = True) -> dict:
def get_variables(
self, comp: str | int, single: int | str | None = None, as_numbers: bool = True
) -> dict:
"""Get the registered variables for a given component from the simulator.
Args:
Expand Down Expand Up @@ -295,8 +318,16 @@ def get_variables(self, comp: str | int, single: int | str | None = None, as_num
or struct.name.decode() == single
):
typ = struct.type if as_numbers else CosimVariableType(struct.type).name
causality = struct.causality if as_numbers else CosimVariableCausality(struct.causality).name
variability = struct.variability if as_numbers else CosimVariableVariability(struct.variability).name
causality = (
struct.causality
if as_numbers
else CosimVariableCausality(struct.causality).name
)
variability = (
struct.variability
if as_numbers
else CosimVariableVariability(struct.variability).name
)
variables.update(
{
struct.name.decode(): {
Expand Down Expand Up @@ -384,15 +415,25 @@ def set_initial(self, instance: int, typ: int, var_ref: int, var_val: PyVal):
only that variables are set individually and the type is added as argument.
"""
if typ == CosimVariableType.REAL.value:
return self.simulator.real_initial_value(instance, var_ref, self.pytype(typ, var_val))
return self.simulator.real_initial_value(
instance, var_ref, self.pytype(typ, var_val)
)
elif typ == CosimVariableType.INTEGER.value:
return self.simulator.integer_initial_value(instance, var_ref, self.pytype(typ, var_val))
return self.simulator.integer_initial_value(
instance, var_ref, self.pytype(typ, var_val)
)
elif typ == CosimVariableType.STRING.value:
return self.simulator.string_initial_value(instance, var_ref, self.pytype(typ, var_val))
return self.simulator.string_initial_value(
instance, var_ref, self.pytype(typ, var_val)
)
elif typ == CosimVariableType.BOOLEAN.value:
return self.simulator.boolean_initial_value(instance, var_ref, self.pytype(typ, var_val))
return self.simulator.boolean_initial_value(
instance, var_ref, self.pytype(typ, var_val)
)

def set_variable_value(self, instance: int, typ: int, var_refs: tuple[int], var_vals: tuple[PyVal]) -> bool:
def set_variable_value(
self, instance: int, typ: int, var_refs: tuple[int], var_vals: tuple[PyVal]
) -> bool:
"""Provide a manipulator function which sets the 'variable' (of the given 'instance' model) to 'value'.
Args:
Expand All @@ -404,9 +445,13 @@ def set_variable_value(self, instance: int, typ: int, var_refs: tuple[int], var_
if typ == CosimVariableType.REAL.value:
return self.manipulator.slave_real_values(instance, list(var_refs), _vals)
elif typ == CosimVariableType.INTEGER.value:
return self.manipulator.slave_integer_values(instance, list(var_refs), _vals)
return self.manipulator.slave_integer_values(
instance, list(var_refs), _vals
)
elif typ == CosimVariableType.BOOLEAN.value:
return self.manipulator.slave_boolean_values(instance, list(var_refs), _vals)
return self.manipulator.slave_boolean_values(
instance, list(var_refs), _vals
)
elif typ == CosimVariableType.STRING.value:
return self.manipulator.slave_string_values(instance, list(var_refs), _vals)
else:
Expand Down Expand Up @@ -435,8 +480,16 @@ def pytype(fmu_type: str | int, val: PyVal | None = None):
"""Return the python type of the FMU type provided as string or int (CosimEnums).
If val is None, the python type object is returned. Else if boolean, true or false is returned.
"""
fmu_type_str = CosimVariableType(fmu_type).name if isinstance(fmu_type, int) else fmu_type
typ = {"real": float, "integer": int, "boolean": bool, "string": str, "enumeration": Enum}[fmu_type_str.lower()]
fmu_type_str = (
CosimVariableType(fmu_type).name if isinstance(fmu_type, int) else fmu_type
)
typ = {
"real": float,
"integer": int,
"boolean": bool,
"string": str,
"enumeration": Enum,
}[fmu_type_str.lower()]

if val is None:
return typ
Expand All @@ -446,27 +499,37 @@ def pytype(fmu_type: str | int, val: PyVal | None = None):
elif isinstance(val, int):
return bool(val)
else:
raise CaseInitError(f"The value {val} could not be converted to boolean")
raise CaseInitError(
f"The value {val} could not be converted to boolean"
)
else:
return typ(val)

@staticmethod
def default_initial(causality: int, variability: int, max_possible: bool = False) -> int:
def default_initial(
causality: int, variability: int, max_possible: bool = False
) -> int:
"""Return default initial setting as int, as initial setting is not explicitly available in OSP. See p.50 FMI2.
maxPossible = True chooses the the initial setting with maximum allowance.
* Causality: input=0, parameter=1, output=2, calc.par.=3, local=4, independent=5 (within OSP)
* Initial: exact=0, approx=1, calculated=2, none=3.
"""
code = ((3, 3, 0, 3, 0, 3), (3, 0, 3, 1, 1, 3), (3, 0, 3, 1, 1, 3), (3, 3, 2, 3, 2, 3), (3, 3, 2, 3, 2, 3))[
variability
][causality]
code = (
(3, 3, 0, 3, 0, 3),
(3, 0, 3, 1, 1, 3),
(3, 0, 3, 1, 1, 3),
(3, 3, 2, 3, 2, 3),
(3, 3, 2, 3, 2, 3),
)[variability][causality]
if max_possible:
return (0, 1, 0, 3)[code] # first 'possible value' in table
else:
return (0, 2, 2, 3)[code] # default value in table

def allowed_action(self, action: str, comp: int | str, var: int | str | tuple, time: float):
def allowed_action(
self, action: str, comp: int | str, var: int | str | tuple, time: float
):
"""Check whether the action would be allowed according to FMI2 rules, see FMI2.01, p.49.
* Unfortunately, the OSP interface does not explicitly provide the 'initial' setting,
Expand Down Expand Up @@ -500,48 +563,70 @@ def _check(cond, msg):
var = (var,)
for v in var:
variables = self.get_variables(comp, v)
if _check(len(variables) != 1, f"Variable {v} of component {comp} was not found"):
if _check(
len(variables) != 1, f"Variable {v} of component {comp} was not found"
):
return False
name, var_info = next(variables.items().__iter__())
if _type < 0 or _causality < 0 or _variability < 0: # define the properties and check whether allowed
if (
_type < 0 or _causality < 0 or _variability < 0
): # define the properties and check whether allowed
_type = var_info["type"]
_causality = var_info["causality"]
_variability = var_info["variability"]
initial = SimulatorInterface.default_initial(_causality, _variability, True)
initial = SimulatorInterface.default_initial(
_causality, _variability, True
)

if action == "get": # no restrictions on get
pass
elif action == "set":
if _check(_variability == 0, f"Variable {name} is defined as 'constant' and cannot be set"):
if _check(
_variability == 0,
f"Variable {name} is defined as 'constant' and cannot be set",
):
return False
if _check(_variability == 0, f"Variable {name} is defined as 'constant' and cannot be set"):
if _check(
_variability == 0,
f"Variable {name} is defined as 'constant' and cannot be set",
):
return False

if time == 0: # initialization
# initial settings 'exact', 'approx' or 'input'
if _check(
not (initial in (0, 1) or _causality == 0),
_description(name, var_info, initial) + " cannot be set before or during initialization.",
_description(name, var_info, initial)
+ " cannot be set before or during initialization.",
):
return False
else: # at communication points
# 'parameter', 'tunable' or 'input
if _check(
not ((_causality == 1 and _variability == 2) or _causality == 0),
_description(name, var_info, initial) + " cannot be set at communication points.",
not (
(_causality == 1 and _variability == 2)
or _causality == 0
),
_description(name, var_info, initial)
+ " cannot be set at communication points.",
):
return False
else: # check whether the properties are equal
if _check(_type != var_info["type"], _description(name, var_info, initial) + f" != type {_type}"):
if _check(
_type != var_info["type"],
_description(name, var_info, initial) + f" != type {_type}",
):
return False
if _check(
_causality != var_info["causality"],
_description(name, var_info, initial) + f" != causality { _causality}",
_description(name, var_info, initial)
+ f" != causality { _causality}",
):
return False
if _check(
_variability != var_info["variability"],
_description(name, var_info, initial) + f" != variability {_variability}",
_description(name, var_info, initial)
+ f" != variability {_variability}",
):
return False
return True
Expand All @@ -564,44 +649,3 @@ def component_id_from_name(self, name: str) -> int:
id = self.simulator.slave_index_from_instance_name(name)
return id if id is not None else -1


def match_with_wildcard(findtxt: str, matchtxt: str) -> bool:
"""Check whether 'findtxt' matches 'matchtxt'.
Args:
findtxt (str): the text string which is checked. It can contain wildcard characters '*', matching zero or more of any character.
matchtxt (str): the text agains findtxt is checked
Returns: True/False
"""
if "*" not in findtxt: # no wildcard characters
return matchtxt == findtxt
else: # there are wildcards
m = re.search(findtxt.replace("*", ".*"), matchtxt)
return m is not None


def from_xml(file: Path, sub: str | None = None, xpath: str | None = None) -> ET.Element | list[ET.Element]:
"""Retrieve the Element root from a zipped file (retrieve sub), or an xml file (sub unused).
If xpath is provided only the xpath matching element (using findall) is returned.
"""
if is_zipfile(file) and sub is not None: # expect a zipped archive containing xml file 'sub'
with ZipFile(file) as zp:
try:
xml = zp.read(sub).decode("utf-8")
except BadZipFile as err:
raise CaseInitError(f"File '{sub}' not found in {file}: {err}") from err
elif not is_zipfile(file) and file.exists() and sub is None: # expect an xml file
with open(file, encoding="utf-8") as f:
xml = f.read()
else:
raise CaseInitError(f"It was not possible to read an XML from file {file}, sub {sub}")

try:
et = ET.fromstring(xml)
except ET.ParseError as err:
raise CaseInitError(f"File '{file}' does not seem to be a proper xml file") from err

if xpath is None:
return et
else:
return et.findall(xpath)
Loading

0 comments on commit 865cb1d

Please sign in to comment.