Skip to content

Commit

Permalink
Added an 'inspect' function to Results. Fixed issues from last merge.…
Browse files Browse the repository at this point in the history
… Updated tests.
  • Loading branch information
eisDNV committed Oct 31, 2024
1 parent 7c4db00 commit 0764816
Show file tree
Hide file tree
Showing 17 changed files with 2,958 additions and 732 deletions.
7 changes: 0 additions & 7 deletions case_study/assertion.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
from typing import Any

from sympy import Symbol, sympify

if self.t_bounce < self.time: # calculate first bounce
self.t_bounce, self.p_bounce = self.next_bounce()

class Assertion:
"""Define Assertion objects for checking expectations with respect to simulation results.
Expand All @@ -21,11 +18,7 @@ class Assertion:
Any unknown symbol within the expression is defined as sympy.Symbol and is expected to match a variable.
"""

<<<<<<< HEAD
ns: dict = {}
=======
ns: dict[str, Any] = {}
>>>>>>> 94746a5a6e13c97614b3070264fdb39028eefb95

def __init__(self, expr: str):
self._expr = Assertion.do_sympify(expr)
Expand Down
211 changes: 137 additions & 74 deletions case_study/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,65 +257,6 @@ def _disect_at_time(self, txt: str, value: Any | None = None) -> tuple[str, str,
else:
return (pre, "set" if Case._num_elements(value) else "get", arg_float)

def _disect_range(self, key: str) -> tuple[str, dict, list | range]:
"""Extract the variable name, definition and explicit variable range, if relevant
(multi-valued variables, where only some all elements are addressed).
Note: since values are not specified for get actions, the validity of values cannot be checked here.
ToDo: handle multi-dimensional arrays (tables, ...).
Args:
key (str): The key as provided in case spec(, with [range] if provided).
Returns
-------
1. The variable name as defined in the 'variables' section of the spec
2. The variable definition, which the name refers to
3. An iterator over indices of the variable, i.e. the range
"""
pre, _, r = key.partition("[")
try:
cvar_info = self.cases.variables[pre]
except KeyError as err:
raise CaseInitError(f"Variable {pre} was not found in list of defined case variables") from err
cvar_len = len(cvar_info["variables"]) # len of the tuple of refs
if len(r): # range among several variables
r = r.rstrip("]").strip() # string version of a non-trivial range
parts_comma = r.split(",")
rng: range | list[int] = []
for i, p in enumerate(parts_comma):
parts_ellipses = p.split("..")
if len(parts_ellipses) == 1: # no ellipses. Should be an index
try:
idx = int(p)
assert 0 <= idx < cvar_len, f"Index {idx} of variable {pre} out of range"
except ValueError as err:
raise CaseInitError(f"Unhandled index {p}[{i}] for variable {pre}") from err
assert isinstance(rng, list), "A list was expected as range here. Found {rng}"
rng.append(idx)
else:
_assert(len(parts_ellipses) == 2, f"RangeError: Exactly two indices expected in {p} of {pre}")
parts_ellipses[1] = parts_ellipses[1].lstrip(".") # facilitates the option to use '...' or '..'
try:
if len(parts_ellipses[0]) == 0:
idx0 = 0
else:
idx0 = int(parts_ellipses[0])
assert 0 <= idx0 <= cvar_len, f"Index {idx0} of variable {pre} out of range"
if len(parts_ellipses[1]) == 0:
idx1 = cvar_len
else:
idx1 = int(parts_ellipses[1])
assert idx0 <= idx1 <= cvar_len, f"Index {idx1} of variable {pre} out of range"
except ValueError as err:
raise CaseInitError(f"Unhandled ellipses '{parts_comma}' for variable {pre}") from err
rng = range(idx0, idx1)
else: # no expicit range
if cvar_len == 1: # scalar variable
rng = [0]
else: # all elements
rng = range(cvar_len)
return (pre, cvar_info, rng)

def read_spec_item(self, key: str, value: Any | None = None):
"""Use the alias variable information (key) and the value to construct an action function,
which is run when this variable is set/read.
Expand Down Expand Up @@ -352,7 +293,7 @@ def read_spec_item(self, key: str, value: Any | None = None):
self.special.update({key: value}) # just keep these as a dictionary so far
else: # expect a variable-alias : value(s) specificator
key, at_time_type, at_time_arg = self._disect_at_time(key, value)
key, cvar_info, rng = self._disect_range(key)
key, cvar_info, rng = self.cases.disect_variable(key)
key = key.strip()
if value is not None: # check also the number of supplied values
if isinstance(value, (str, float, int, bool)): # make sure that there are always lists
Expand Down Expand Up @@ -587,9 +528,6 @@ def __init__(self, spec: str | Path, simulator: SimulatorInterface | None = None
)
except Exception as err:
raise AssertionError(f"'modelFile' needed from spec: {err}") from err
# else:
# raise Exception("'modelFile' needed from spec.") from None
# assert path.exists(), f"'modelFile' {self.spec['modelFile']} not found."
else:
self.simulator = simulator # SimulatorInterface( simulator = simulator)

Expand All @@ -602,9 +540,14 @@ def __init__(self, spec: str | Path, simulator: SimulatorInterface | None = None
self.read_cases()

def get_case_variables(self) -> dict[str, dict]:
"""Read the 'variables' main key, which defines self.variables (case variables) as a dictionary:
{ c_var_name : {'model':model ID, 'instances': tuple of instance names, 'variables': tuple of ValueReference,
'type':CosimVariableType, 'causality':CosimVariableCausality, 'variability': CosimVariableVariability}.
"""Read the 'variables' main key, which defines self.variables (case variables) as a dictionary.
{ c_var_name : {'model':model ID,
'instances': tuple of instance names,
'variables': tuple of ValueReference,
'type':CosimVariableType,
'causality':CosimVariableCausality,
'variability': CosimVariableVariability}.
Optionally a description of the alias variable may be provided (and added to the dictionary).
"""
_assert("variables" in self.spec, f"Mandatory key 'variables' not found in cases specification {self.file}")
Expand Down Expand Up @@ -783,6 +726,76 @@ def case_var_by_ref(self, comp: int | str, ref: int | tuple[int, ...]) -> tuple[
return (var, tuple([info["variables"].index(r) for r in refs]))
return ("", ())

def disect_variable(self, key: str, err_level: int = 2) -> tuple[str, dict, list | range]:
"""Extract the variable name, definition and explicit variable range, if relevant
(multi-valued variables, where only some elements are addressed).
ToDo: handle multi-dimensional arrays (tables, ...).
Args:
key (str): The key as provided in case spec(, with [range] if provided).
Returns
-------
1. The variable name as defined in the 'variables' section of the spec
2. The variable definition, which the name refers to
3. An iterator over indices of the variable, i.e. the range
"""

def handle_error(msg: str, err: Exception | None, level: int):
if level > 0:
if level == 1:
print(msg)
else:
raise AssertionError(msg) from err
return ("", None, range(0, 0))

pre, _, r = key.partition("[")
try:
cvar_info = self.variables[pre]
except KeyError as err:
handle_error(f"Variable {pre} was not found in list of defined case variables", err, err_level)

cvar_len = len(cvar_info["variables"]) # len of the tuple of refs
if len(r): # range among several variables
r = r.rstrip("]").strip() # string version of a non-trivial range
parts_comma = r.split(",")
rng: range | list[int] = []
for i, p in enumerate(parts_comma):
parts_ellipses = p.split("..")
if len(parts_ellipses) == 1: # no ellipses. Should be an index
try:
idx = int(p)
except ValueError as err:
return handle_error(f"Unhandled index {p}[{i}] for variable {pre}", err, err_level)
if not 0 <= idx < cvar_len:
return handle_error(f"Index {idx} of variable {pre} out of range", None, err_level)
if not isinstance(rng, list):
return handle_error(f"A list was expected as range here. Found {rng}", None, err_level)
rng.append(idx)
else:
_assert(len(parts_ellipses) == 2, f"RangeError: Exactly two indices expected in {p} of {pre}")
parts_ellipses[1] = parts_ellipses[1].lstrip(".") # facilitates the option to use '...' or '..'
try:
if len(parts_ellipses[0]) == 0:
idx0 = 0
else:
idx0 = int(parts_ellipses[0])
assert 0 <= idx0 <= cvar_len, f"Index {idx0} of variable {pre} out of range"
if len(parts_ellipses[1]) == 0:
idx1 = cvar_len
else:
idx1 = int(parts_ellipses[1])
assert idx0 <= idx1 <= cvar_len, f"Index {idx1} of variable {pre} out of range"
except ValueError as err:
return handle_error("Unhandled ellipses '{parts_comma}' for variable {pre}", err, err_level)
rng = range(idx0, idx1)
else: # no expicit range
if cvar_len == 1: # scalar variable
rng = [0]
else: # all elements
rng = range(cvar_len)
return (pre, cvar_info, rng)

def info(self, case: Case | None = None, level: int = 0) -> str:
"""Show main infromation and the cases structure as string."""
txt = ""
Expand Down Expand Up @@ -954,19 +967,69 @@ def save(self, jsfile: str | Path = ""):
self._header_transform(tostring=True)
self.res.write(jsfile)

def plot_time_series(self, aliases: list[str], title=""):
"""Extract the provided alias variables and plot the data found in the same plot."""
def inspect(self, component: str | None = None, variable: str | None = None):
"""Inspect the results and return a dictionary on which data are found.
Args:
component (str): Possibility to inspect only data with respect to a given component
variable (str): Possibility to inspect only data with respect to a given variable
Retruns:
A dictionary {<component.variable> : {'len':#data points, 'range':[tMin, tMax], 'info':info-dict}
The info-dict is and element of Cases.variables. See Cases.get_case_variables() for definition.
"""
cont : dict = {}
assert isinstance(self.case, Case)
assert isinstance(self.case.cases, Cases)
for _time, components in self.res.js_py.items():
if _time != "header":
time = float(_time)
for c, variables in components.items():
if component is None or c == component:
for v, _ in variables.items():
if variable is None or variable == v:
ident = c + "." + v
if ident in cont: # already registered
cont[ident]["range"][1] = time # update upper bound
cont[ident]["len"] += 1 # update length
else: # new entry
v_name, v_info, v_range = self.case.cases.disect_variable(v, err_level=0)
assert len(v_name), f"Variable {v} not found in cases spec {self.case.cases.file}"
cont.update(
{
ident: {
"len": 1,
"range": [time, time],
"info": v_info,
}
}
)
return cont

def plot_time_series(self, variables: list[str], title: str = ""):
"""Extract the provided alias variables and plot the data found in the same plot.
Args:
variables (list[str]): list of variable identificators as str.
A variable identificator is the jspath expression after the time, i.e. <component>.<variable>[<element>]
For example 'bb.v[2]' identifies the z-velocity of the component 'bb'
title (str): optional title of the plot
"""
if not len(self.res.js_py) or self.case is None:
return
timefac = self.case.cases.timefac
for var in aliases:
#timefac = self.case.cases.timefac
for var in variables:
times: list = []
values: list = []
for key in self.res.js_py:
if isinstance(key, int): # time value
if var in self.res.js_py[key]:
times.append(key / timefac)
values.append(self.res.js_py[key][var][2][0])
found = self.res.jspath("$['" + str(key) + "']." + var)
if found is not None:
if isinstance(found, list):
raise NotImplementedError("So far not implemented for multi-dimensional plots") from None
else:
times.append(key)
values.append(found)

plt.plot(times, values, label=var, linewidth=3)

if len(title):
Expand Down
12 changes: 0 additions & 12 deletions examples/run_cases.py

This file was deleted.

Loading

0 comments on commit 0764816

Please sign in to comment.