Skip to content

Commit

Permalink
Inclusion of assertions in cases, such that results can be automatica…
Browse files Browse the repository at this point in the history
…lly checked
  • Loading branch information
eisDNV committed Dec 13, 2024
1 parent 8767be8 commit d8cfd40
Show file tree
Hide file tree
Showing 7 changed files with 878 additions and 111 deletions.
451 changes: 451 additions & 0 deletions src/sim_explorer/assertion.py

Large diffs are not rendered by default.

224 changes: 135 additions & 89 deletions src/sim_explorer/case.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
# pyright: reportMissingImports=false, reportGeneralTypeIssues=false
from __future__ import annotations

import os
from collections.abc import Callable
from datetime import datetime
from functools import partial
from pathlib import Path
from typing import Any
from typing import Any, Iterable

import matplotlib.pyplot as plt
import numpy as np
from libcosimpy.CosimLogging import CosimLogLevel, log_output_level
from libcosimpy.CosimLogging import CosimLogLevel, log_output_level # type: ignore

from sim_explorer.assertion import Assertion
from sim_explorer.assertion import Assertion, Temporal
from sim_explorer.exceptions import CaseInitError
from sim_explorer.json5 import Json5
from sim_explorer.simulator_interface import SimulatorInterface
Expand Down Expand Up @@ -203,13 +202,56 @@ def _num_elements(obj) -> int:
else:
return 1

def _disect_at_time(self, txt: str, value: Any | None = None, tl: bool = False) -> tuple[str, str, float]:
def _disect_at_time_tl(self, txt: str, value: Any | None = None) -> tuple[str, Temporal, tuple]:
"""Disect the @txt argument into 'at_time_type' and 'at_time_arg' for Temporal specification.
Args:
txt (str): The key text after '@' and before ':'
value (Any): the value argument. Needed to distinguish the action type
Returns
-------
tuple of pre, type, arg, where
pre is the text before '@',
type is the Temporal type,
args is the tuple of temporal arguments (may be empty)
"""

def time_spec(at: str):
"""Analyse the specification after '@' and disect into typ and arg."""
try:
arg_float = float(at)
return (Temporal["T"], (arg_float,))
except ValueError:
for i in range(len(at) - 1, -1, -1):
try:
typ = Temporal[at[i]]
except KeyError:
pass
else:
if at[i + 1 :].strip() == "":
return (typ, ())
elif typ == Temporal.T:
return (typ, (float(at[i + 1 :].strip()),))
else:
return (typ, (at[i + 1 :].strip(),))
raise ValueError(f"Unknown Temporal specification {at}") from None

pre, _, at = txt.partition("@")
assert len(pre), f"'{txt}' is not allowed as basis for _disect_at_time"
assert isinstance(value, list), f"Assertion spec expected: [expression, description]. Found {value}"
if not len(at): # no @time spec. Assume 'A'lways
return (pre, Temporal.ALWAYS, ())
else:
typ, arg = time_spec(at)
return (pre, typ, arg)

def _disect_at_time_spec(self, txt: str, value: Any | None = None) -> tuple[str, str, float]:
"""Disect the @txt argument into 'at_time_type' and 'at_time_arg'.
Args:
txt (str): The key text after '@' and before ':'
value (Any): the value argument. Needed to distinguish the action type
tl (bool)=False: expect a Temporal Logic type of '@' specification (for assertion)
Returns
-------
Expand All @@ -219,71 +261,57 @@ def _disect_at_time(self, txt: str, value: Any | None = None, tl: bool = False)
arg is the time argument, or -1
"""

def time_spec(at: str, tl: bool):
def time_spec(at: str):
"""Analyse the specification after '@' and disect into typ and arg."""
try:
arg_float = float(at)
except Exception:
return ("set" if Case._num_elements(value) else "get", arg_float)
except ValueError:
arg_float = float("-inf")
if tl:
typ = at[0] if arg_float == float("-inf") else "T"
assert typ in ("T", "G", "F"), f"Unknown temporal type {typ}"
return (typ, arg_float)
else:
if arg_float == float("-inf"):
if at.startswith("step"):
try:
return ("step", float(at[4:]))
except Exception:
return ("step", -1) # this means 'all macro steps'
else:
raise AssertionError(f"Unknown '@{txt}'. Case:{self.name}, value:'{value}'") from None
if at.startswith("step"):
try:
return ("step", float(at[4:]))
except Exception:
return ("step", -1) # this means 'all macro steps'
else:
return ("set" if Case._num_elements(value) else "get", arg_float)
raise AssertionError(f"Unknown '@{txt}'. Case:{self.name}, value:'{value}'") from None

pre, _, at = txt.partition("@")
assert len(pre), f"'{txt}' is not allowed as basis for _disect_at_time"
if tl: # temporal logic specification
assert isinstance(value, str), f"String value expected. Found {value}"
if not len(at): # no @time spec. Assume 'G'lobal
return (pre, "G", float("-inf"))
if value in ("result", "res"): # mark variable specification as 'get' or 'step' action
value = None
if not len(at): # no @time spec
if value is None:
return (pre, "get", self.special["stopTime"]) # report final value
else:
typ, arg = time_spec(at, tl)
return (pre, typ, arg)
else:
if value in ("result", "res"): # mark variable specification as 'get' or 'step' action
value = None
if not len(at): # no @time spec
if value is None:
return (pre, "get", self.special["stopTime"]) # report final value
else:
msg = f"Value required for 'set' in _disect_at_time('{txt}','{self.name}','{value}')"
assert Case._num_elements(value), msg
return (pre, "set", 0) # set at startTime
else: # time spec provided
typ, arg = time_spec(at, tl)
return (pre, typ, arg)

def read_assertion(self, key: str, expr: Any | None = None):
msg = f"Value required for 'set' in _disect_at_time('{txt}','{self.name}','{value}')"
assert Case._num_elements(value), msg
return (pre, "set", 0) # set at startTime
else: # time spec provided
typ, arg = time_spec(at)
return (pre, typ, arg)

def read_assertion(self, key: str, expr_descr: list | None = None):
"""Read an assert statement, compile as sympy expression, register and store the key..
Args:
key (str): Identification key for the assertion. Should be unique. Recommended to use numbers
Also assertion keys can have temporal specifications (@...) with the following possibilities:
* @G : The expression is expected to be globally (always) true
* @F : The expression is expected to be true at some point in time
* @<val>: The expression is expected to be true at the specific time value
expr: A sympy expression using available variables
* @A : The expression is expected to be Always (globally) true
* @F : The expression is expected to be true during the end of the simulation
* @<val> or @T<val>: The expression is expected to be true at the specific time value
expr: A python expression using available variables
"""
key, at_time_type, at_time_arg = self._disect_at_time(key, expr, tl=True)
key, at_time_type, at_time_arg = self._disect_at_time_tl(key, expr_descr)
assert isinstance(expr_descr, list), f"Assertion expression {expr_descr} should include a description."
expr, descr = expr_descr
self.cases.assertion.expr(key, expr)
if at_time_type in ("G", "F"): # no time argument
self.cases.assertion.temporal(key, (at_time_type,))
elif at_time_type in ("T",):
self.cases.assertion.temporal(key, (at_time_type, at_time_arg))
self.asserts.append(key)
self.cases.assertion.description(key, descr)
self.cases.assertion.temporal(key, at_time_type, at_time_arg)
if key not in self.asserts:
self.asserts.append(key)
return key

def read_spec_item(self, key: str, value: Any | None = None):
Expand Down Expand Up @@ -329,7 +357,7 @@ def read_spec_item(self, key: str, value: Any | None = None):
if key in ("startTime", "stopTime", "stepSize"):
self.special.update({key: value}) # just keep these as a dictionary so far
else: # expect a variable-alias : value(s) specificator
key, at_time_type, at_time_arg = self._disect_at_time(key, value)
key, at_time_type, at_time_arg = self._disect_at_time_spec(key, value)
if at_time_type in ("get", "step"):
value = None
key, cvar_info, rng = self.cases.disect_variable(key)
Expand Down Expand Up @@ -1046,7 +1074,8 @@ def inspect(self, component: str | None = None, variable: str | None = None):
component (str): Possibility to inspect only data with respect to a given component
variable (str): Possibility to inspect only data with respect to a given variable
Retruns:
Returns
-------
A dictionary {<component.variable> : {'len':#data points, 'range':[tMin, tMax], 'info':info-dict}
The info-dict is and element of Cases.variables. See Cases.get_case_variables() for definition.
"""
Expand Down Expand Up @@ -1078,49 +1107,66 @@ def inspect(self, component: str | None = None, variable: str | None = None):
)
return cont

def time_series(self, variable: str):
"""Extract the provided alias variables and make them available as two lists 'times' and 'values'
of equal length.
def retrieve(self, comp_var: Iterable) -> list:
"""Retrieve from results js5-dict the variables and return (times, values).
Args:
variable (str): variable identificator as str.
A variable identificator is the jspath expression after the time, i.e. <component>.<variable>[<element>]
For example 'bb.v[2]' identifies the z-velocity of the component 'bb'
Returns
-------
tuple of two lists (times, values)
comp_var (Iterable): iterable of (<component-name>, <variable_name>[, element])
Alternatively, the jspath syntax <component-name>.<variable_name>[[element]] can be used as comp_var.
Time is not explicitly including in comp_var
A record is only included if all variable are found for a given time
Returns:
Data table (list of lists), time and one column per variable
"""
if not len(self.res.js_py) or self.case is None:
return
times: list = []
values: list = []
for key in self.res.js_py:
found = self.res.jspath("$['" + str(key) + "']." + variable)
if found is not None:
if isinstance(found, list):
raise NotImplementedError("So far not implemented for multi-dimensional retrievals") from None
else:
times.append(float(key))
values.append(found)
return (times, values)
data = []
_comp_var = []
for _cv in comp_var:
el = None
if isinstance(_cv, str): # expect <component-name>.<variable_name> syntax
comp, var = _cv.split(".")
if "[" in var and var[-1] == "]": # explicit element
var, _el = var.split("[")
el = int(_el[:-1])
else: # expect (<component-name>, <variable_name>) syntax
comp, var = _cv
_comp_var.append((comp, var, el))

for key, values in self.res.js_py.items():
if key != "header":
time = float(key)
record = [time]
is_complete = True
for comp, var, el in _comp_var:
try:
_rec = values[comp][var]
except KeyError:
is_complete = False
break # give up
else:
record.append(_rec if el is None else _rec[el])

if is_complete:
data.append(record)
return data

def plot_time_series(self, variables: str | list[str], title: str = ""):
def plot_time_series(self, comp_var: Iterable, title: str = ""):
"""Extract the provided alias variables and plot the data found in the same plot.
Args:
variables (list[str]): list of variable identificators as str.
A variable identificator is the jspath expression after the time, i.e. <component>.<variable>[<element>]
For example 'bb.v[2]' identifies the z-velocity of the component 'bb'
comp_var (Iterable): Iterable of (<component-instance>,<variable>) tuples (as used in retrieve)
Alternatively, the jspath syntax <component>.<variable> is also accepted
title (str): optional title of the plot
"""
if not isinstance(variables, list):
variables = [
variables,
]
for var in variables:
times, values = self.time_series(var)

data = self.retrieve(comp_var)
times = [rec[0] for rec in data]
for i, var in enumerate(comp_var):
if isinstance(var, str):
label = var
else:
label = var[0] + "." + var[1]
if len(var) > 2:
label += "[" + var[2] + "]"
values = [rec[i + 1] for rec in data]
plt.plot(times, values, label=var, linewidth=3)

if len(title):
Expand Down
2 changes: 1 addition & 1 deletion src/sim_explorer/simulator_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from libcosimpy.CosimEnums import CosimVariableCausality, CosimVariableType, CosimVariableVariability # type: ignore
from libcosimpy.CosimExecution import CosimExecution # type: ignore
from libcosimpy.CosimLogging import CosimLogLevel, log_output_level
from libcosimpy.CosimLogging import CosimLogLevel, log_output_level # type: ignore
from libcosimpy.CosimManipulator import CosimManipulator # type: ignore
from libcosimpy.CosimObserver import CosimObserver # type: ignore

Expand Down
20 changes: 12 additions & 8 deletions tests/data/BouncingBall3D/BouncingBall3D.cases
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,8 @@ base : {
x[2] : 39.37007874015748, # this is in inch => 1m!
x@step : 'result',
v@step : 'result',
x_b[0]@step : 'res',
},
# assert: {
# 1 : 'abs(g-9.81)<1e-9'
# }
},
x_b@step : 'res',
}},
restitution : {
description : "Smaller coefficient of restitution e",
spec: {
Expand All @@ -37,9 +33,17 @@ restitutionAndGravity : {
parent : 'restitution',
spec : {
g : 1.5
}},
},
assert: {
1@A : ['g==1.5', 'Check setting of gravity (about 1/7 of earth)'],
2@ALWAYS : ['e==0.5', 'Check setting of restitution'],
3@F : ['x[2] < 3.0', 'For long times the z-position of the ball remains small (loss of energy)'],
4@T1.1547 : ['abs(x[2]) < 0.4', 'Close to bouncing time the ball should be close to the floor'],
}
},
gravity : {
description : "Gravity like on the moon",
spec : {
g : 1.5
}}}
},
}}
Loading

0 comments on commit d8cfd40

Please sign in to comment.