From d8cfd40f5b99c189afbafa91367ac3aaed82077c Mon Sep 17 00:00:00 2001 From: Eisinger Date: Fri, 13 Dec 2024 16:01:48 +0100 Subject: [PATCH] Inclusion of assertions in cases, such that results can be automatically checked --- src/sim_explorer/assertion.py | 451 ++++++++++++++++++ src/sim_explorer/case.py | 224 +++++---- src/sim_explorer/simulator_interface.py | 2 +- .../data/BouncingBall3D/BouncingBall3D.cases | 20 +- tests/test_assertion.py | 256 ++++++++++ tests/test_bouncing_ball_3d.py | 10 +- tests/test_results.py | 26 +- 7 files changed, 878 insertions(+), 111 deletions(-) create mode 100644 src/sim_explorer/assertion.py create mode 100644 tests/test_assertion.py diff --git a/src/sim_explorer/assertion.py b/src/sim_explorer/assertion.py new file mode 100644 index 0000000..f253734 --- /dev/null +++ b/src/sim_explorer/assertion.py @@ -0,0 +1,451 @@ +import ast +from enum import Enum +from typing import Any, Callable, Iterable + +import numpy as np + + +class Temporal(Enum): + UNDEFINED = 0 + A = 1 + ALWAYS = 1 + F = 2 + FINALLY = 2 + T = 3 + TIME = 3 + + +class Assertion: + """Defines a common Assertion object for checking expectations with respect to simulation results. + + The class uses eval/exec, where the symbols are + + * the independent variable t (time) + * all variables defined as variables in cases file, + * functions from any loaded module + + These can then be combined to boolean expressions and be checked against + single points of a data series (see `assert_single()` or against a whole series (see `assert_series()`). + + Single assertion expressions are stored in the dict self._expr with their key as given in cases file. + All assertions have a common symbol basis in self._symbols + + Args: + funcs (dict) : Dictionary of module : of allowed functions inside assertion expressions. + """ + + def __init__(self, imports: dict | None = None): + if imports is None: + self._imports = {"math": ["sin", "cos", "sqrt"]} # default imports + else: + self._imports = imports + self._symbols = {"t": 1} # list of all symbols and their length + self._functions: list = [] # list of all functions used in expressions + # per expression as key: + self._syms: dict = {} # the symbols used in expression + self._funcs: dict = {} # the functions used in expression + self._expr: dict = {} # the raw expression + self._compiled: dict = {} # the byte-compiled expression + self._temporal: dict = {} # additional information for evaluation as time series + self._description: dict = {} + self._cases_variables: dict = {} # is set to Cases.variables when calling self.register_vars + self._assertions: dict = {} # assertion results, set by do_assert + + def info(self, sym: str, typ: str = "instance") -> str | int: + """Retrieve detailed information related to the registered symbol 'sym'.""" + if sym == "t": # the independent variable + return {"instance": "none", "variable": "t", "length": 1, "model": "none"}[typ] # type: ignore + + parts = sym.split("_") + var = parts.pop() + while True: + if var in self._cases_variables: # found the variable + if not len(parts): # abbreviated variable without instance information + assert len(self._cases_variables[var]["instances"]) == 1, f"Non-unique instance for variable {var}" + instance = self._cases_variables[var]["instances"][0] # use the unique instance + else: + instance = parts[0] + "".join("_" + x for x in parts[1:]) + assert instance in self._cases_variables[var]["instances"], f"No instance {instance} of {var}" + break + else: + if not len(parts): + raise KeyError(f"The symbol {sym} does not seem to represent a registered variable") from None + var = parts.pop() + "_" + var + if typ == "instance": # get the instance + return instance + elif typ == "variable": # get the generic variable name + return var + elif typ == "length": # get the number of elements + return len(self._cases_variables[var]["variables"]) + elif typ == "model": # get the basic (FMU) model + return self._cases_variables[var]["model"] + else: + raise KeyError(f"Unknown typ {typ} within info()") from None + + def symbol(self, name: str, length: int = 1): + """Get or set a symbol. + + Args: + key (str): The symbol identificator (name) + length (int)=1: Optional length. 1,2,3 allowed. + Vectors are registered as # + for the whole vector + + Returns: The sympy Symbol corresponding to the name 'key' + """ + try: + sym = self._symbols[name] + except KeyError: # not yet registered + assert length > 0, f"Vector length should be positive. Found {length}" + if length > 1: + self._symbols.update({name: np.ones(length, dtype=float)}) # type: ignore + else: + self._symbols.update({name: 1}) + sym = self._symbols[name] + return sym + + def expr(self, key: str, ex: str | None = None): + """Get or set an expression. + + Args: + key (str): the expression identificator + ex (str): Optional expression as string. If not None, register/update the expression as key + + Returns: the sympified expression + """ + + def make_func(name: str, args: dict, body: str): + """Make a python function from the body.""" + code = "def _" + name + "(" + for a in args: + code += a + ", " + code += "):\n" + # code += " print('dir:', dir())\n" + code += " return " + body + "\n" + return code + + if ex is None: # getter + try: + ex = self._expr[key] + except KeyError as err: + raise Exception(f"Expression with identificator {key} is not found") from err + else: + return ex + else: # setter + syms, funcs = self.expr_get_symbols_functions(ex) + self._syms.update({key: syms}) + self._funcs.update({key: funcs}) + code = make_func(key, syms, ex) + try: + # print("GLOBALS", globals()) + # print("LOCALS", locals()) + # exec( code, globals(), locals()) # compile using the defined symbols + compiled = compile(code, "", "exec") # compile using the defined symbols + except ValueError as err: + raise Exception(f"Something wrong with expression {ex}: {err}|. Cannot compile.") from None + else: + self._expr.update({key: ex}) + self._compiled.update({key: compiled}) + # print("KEY", key, ex, syms, compiled) + return compiled + + def syms(self, key: str): + """Get the symbols of the expression 'key'.""" + try: + syms = self._syms[key] + except KeyError as err: + raise Exception(f"Expression {key} was not found") from err + else: + return syms + + def expr_get_symbols_functions(self, expr: str) -> tuple: + """Get the symbols used in the expression. + + 1. Symbol as listed in expression and function body. In general _[] + 2. Argument as used in the argument list of the function call. In general _ + 3. Fully qualified symbol: (, , |None) + + If there is only a single instance, it is allowed to skip in 1 and 2 + + Returns + ------- + tuple of (syms, funcs), + where syms is a dict {_ : fully-qualified-symbol tuple, ...} + + funcs is a list of functions used in the expression. + """ + + def ast_walk(node: ast.AST, syms: list | None = None, funcs: list | None = None): + """Recursively walk an ast node (width first) and collect symbol and function names.""" + if syms is None: + syms = [] + if funcs is None: + funcs = [] + for n in ast.iter_child_nodes(node): + if isinstance(n, ast.Name): + if n.id in self._symbols: + if isinstance(syms, list) and n.id not in syms: + syms.append(n.id) + elif isinstance(node, ast.Call): + if isinstance(funcs, list) and n.id not in funcs: + funcs.append(n.id) + else: + raise KeyError(f"Unknown symbol {n.id}") + syms, funcs = ast_walk(n, syms, funcs) + return (syms, funcs) + + if expr in self._expr: # assume that actually a key is queried + expr = self._expr[expr] + syms, funcs = ast_walk(ast.parse(expr, "", "exec")) + syms = sorted(syms, key=list(self._symbols.keys()).index) + return (syms, funcs) + + def temporal(self, key: str, typ: Temporal | str | None = None, args: tuple | None = None): + """Get or set a temporal instruction. + + Args: + key (str): the assert key + typ (str): optional temporal type + """ + if typ is None: # getter + try: + temp = self._temporal[key] + except KeyError as err: + raise Exception(f"Temporal instruction for {key} is not found") from err + else: + return temp + else: # setter + if isinstance(typ, Temporal): + self._temporal.update({key: {"type": typ, "args": args}}) + elif isinstance(typ, str): + self._temporal.update({key: {"type": Temporal[typ], "args": args}}) + else: + raise ValueError(f"Unknown temporal type {typ}") from None + return self._temporal[key] + + def description(self, key: str, descr: str | None = None): + """Get or set a description.""" + if descr is None: # getter + try: + _descr = self._description[key] + except KeyError as err: + raise Exception(f"Description for {key} not found") from err + else: + return _descr + else: # setter + self._description.update({key: descr}) + return descr + + def assertions(self, key: str, res: bool | None = None, details: str | None = None): + """Get or set an assertion result.""" + if res is None: # getter + try: + _res = self._assertions[key] + except KeyError as err: + raise Exception(f"Assertion results for {key} not found") from err + else: + return _res + else: # setter + self._assertions.update({key: {"passed": res, "details": details}}) + return self._assertions[key] + + def register_vars(self, variables: dict): + """Register the variables in varnames as symbols. + + Can be used directly from Cases with varnames = tuple( Cases.variables.keys()) + """ + self._cases_variables = variables # remember the full dict for retrieval of details + for key, info in variables.items(): + for inst in info["instances"]: + if len(info["instances"]) == 1: # the instance is unique + self.symbol(key, len(info["variables"])) # we allow to use the 'short name' if unique + self.symbol(inst + "_" + key, len(info["variables"])) # fully qualified name can always be used + + def make_locals(self, loc: dict): + """Adapt the locals with 'allowed' functions.""" + from importlib import import_module + + for modulename, funclist in self._imports.items(): + module = import_module(modulename) + for func in funclist: + loc.update({func: getattr(module, func)}) + loc.update({"np": import_module("numpy")}) + return loc + + def _eval(self, func: Callable, kvargs: dict | list | tuple): + """Call a function of multiple arguments and return the single result. + All internal vecor arguments are transformed to np.arrays. + """ + if isinstance(kvargs, dict): + for k, v in kvargs.items(): + if isinstance(v, Iterable): + kvargs[k] = np.array(v, float) + return func(**kvargs) + elif isinstance(kvargs, list): + for i, v in enumerate(kvargs): + if isinstance(v, Iterable): + kvargs[i] = np.array(v, dtype=float) + return func(*kvargs) + elif isinstance(kvargs, tuple): + _args = [] # make new, because tuple is not mutable + for v in kvargs: + if isinstance(v, Iterable): + _args.append(np.array(v, dtype=float)) + else: + _args.append(v) + return func(*_args) + + def eval_single(self, key: str, kvargs: dict | list | tuple): + """Perform assertion of 'key' on a single data point. + + Args: + key (str): The expression identificator to be used + kvargs (dict|list|tuple): variable substitution kvargs as dict or args as tuple/list + All required variables for the evaluation shall be listed. + Results: + (bool) result of assertion + """ + assert key in self._compiled, f"Expression {key} not found" + loc = self.make_locals(locals()) + exec(self._compiled[key], loc, loc) + # print("kvargs", kvargs, self._syms[key], self.expr_get_symbols_functions(key)) + return self._eval(locals()["_" + key], kvargs) + + def eval_series(self, key: str, data: list[list], ret: float | str | Callable | None = None): + """Perform assertion on a (time) series. + + Args: + key (str): Expression identificator + data (tuple): data table with arguments as columns and series in rows, + where the independent variable (normally the time) shall be listed first in each row. + All required variables for the evaluation shall be listed (columns) + The names of variables correspond to self._syms[key], but is taken as given here. + ret (str)='bool': Determines how to return the result of the assertion: + + float : Linear interpolation of result at the given float time + `bool` : (time, True/False) for first row evaluating to True. + `bool-list` : (times, True/False) for all data points in the series + `A` : Always true for the whole time-series. Same as 'bool' + `F` : is True at end of time series. + Callable : run the given callable on times, expr(data) + None : Use the internal 'temporal(key)' setting + Results: + tuple of (time(s), value(s)), depending on `ret` parameter + """ + times = [] # return the independent variable values (normally time) + results = [] # return the scalar results at all times + bool_type = (ret is None and self.temporal(key)["type"] in (Temporal.A, Temporal.F)) or ( + isinstance(ret, str) and (ret in ["A", "F"] or ret.startswith("bool")) + ) + argnames = self._syms[key] + loc = self.make_locals(locals()) + exec(self._compiled[key], loc, loc) # the function is then available as _ among locals() + func = locals()["_" + key] # scalar function of all used arguments + _temp = self._temporal[key]["type"] if ret is None else Temporal.UNDEFINED + + for row in data: + if not isinstance(row, Iterable): # can happen if the time itself is evaluated + time = row + row = [row] + elif "t" not in argnames: # the independent variable is not explicitly used in the expression + time = row[0] + row = row[1:] + assert len(row), f"Time data in eval_series seems to be lacking. Data:{data}, Argnames:{argnames}" + else: # time used also explicitly in the expression + time = row[0] + res = func(*row) + if bool_type: + res = bool(res) + + times.append(time) + results.append(res) # Note: res is always a scalar result + + if (ret is None and _temp == Temporal.A) or (isinstance(ret, str) and ret in ("A", "bool")): # always True + for t, v in zip(times, results, strict=False): + if v: + return (t, True) + return (times[-1], False) + elif (ret is None and _temp == Temporal.F) or (isinstance(ret, str) and ret == "F"): # finally True + t_true = times[-1] + for t, v in zip(times, results, strict=False): + if v and t_true > t: + t_true = t + elif not v and t_true < t: # detected False after expression became True + t_true = times[-1] + return (t_true, t_true < times[-1]) + elif isinstance(ret, str) and ret == "bool-list": + return (times, results) + elif (ret is None and _temp == Temporal.T) or (isinstance(ret, float)): + if isinstance(ret, float): + t0 = ret + else: + assert len(self._temporal[key]["args"]), "Need a temporal argument (time at which to interpolate)" + t0 = self._temporal[key]["args"][0] + # idx = min(range(len(times)), key=lambda i: abs(times[i]-t0)) + # print("INDEX", t0, idx, results[idx-10:idx+10]) + # return (t0, results[idx]) + # else: + interpolated = np.interp(t0, times, results) + return (t0, bool(interpolated) if all(isinstance(res, bool) for res in results) else interpolated) + elif callable(ret): + return (times, ret(results)) + else: + raise ValueError(f"Unknown return type '{ret}'") from None + + def do_assert(self, key: str, result: Any): + """Perform assert action 'key' on data of 'result' object.""" + assert isinstance(key, str) and key in self._temporal, f"Assertion key {key} not found" + from sim_explorer.case import Results + + assert isinstance(result, Results), f"Results object expected. Found {result}" + inst = [] + var = [] + for sym in self._syms[key]: + inst.append(self.info(sym, "instance")) + var.append(self.info(sym, "variable")) + assert len(var), "No variables to retrieve" + if var[0] == "t": # the independent variable is always the first column in data + inst.pop(0) + var.pop(0) + + data = result.retrieve(zip(inst, var, strict=False)) + res = self.eval_series(key, data, ret=None) + if self._temporal[key]["type"] == Temporal.A: + self.assertions(key, res[1]) + elif self._temporal[key]["type"] == Temporal.F: + self.assertions(key, res[1], f"@{res[0]}") + elif self._temporal[key]["type"] == Temporal.T: + self.assertions(key, res[1], f"@{res[0]} (interpolated)") + return res[1] + + def do_assert_case(self, result: Any) -> list[int]: + """Perform all assertions defined for the case related to the result object.""" + count = [0, 0] + for key in result.case.asserts: + self.do_assert(key, result) + count[0] += self._assertions[key]["passed"] + count[1] += 1 + return count + + def report(self, case: Any = None): + """Report on all registered asserts. + If case denotes a case object, only the results for this case are reported. + """ + + def do_report(key: str): + return { + "key": key, + "description": self._description[key], + "temporal": self._temporal[key], + "expression": self._expr[key], + "passed": self._assertions[key].get("passed", "unknown"), + "assert-details": self._assertions[key].get("passed", "none"), + } + + from sim_explorer.case import Case + + if isinstance(case, Case): + for key in case.asserts: + yield do_report(key) + else: # report all + for key in self._assertions: + yield do_report(key) diff --git a/src/sim_explorer/case.py b/src/sim_explorer/case.py index 752891d..41d56b3 100644 --- a/src/sim_explorer/case.py +++ b/src/sim_explorer/case.py @@ -1,4 +1,3 @@ -# pyright: reportMissingImports=false, reportGeneralTypeIssues=false from __future__ import annotations import os @@ -6,13 +5,13 @@ from datetime import datetime from functools import partial from pathlib import Path -from typing import Any +from typing import Any, Iterable import matplotlib.pyplot as plt import numpy as np -from libcosimpy.CosimLogging import CosimLogLevel, log_output_level +from libcosimpy.CosimLogging import CosimLogLevel, log_output_level # type: ignore -from sim_explorer.assertion import Assertion +from sim_explorer.assertion import Assertion, Temporal from sim_explorer.exceptions import CaseInitError from sim_explorer.json5 import Json5 from sim_explorer.simulator_interface import SimulatorInterface @@ -203,13 +202,56 @@ def _num_elements(obj) -> int: else: return 1 - def _disect_at_time(self, txt: str, value: Any | None = None, tl: bool = False) -> tuple[str, str, float]: + def _disect_at_time_tl(self, txt: str, value: Any | None = None) -> tuple[str, Temporal, tuple]: + """Disect the @txt argument into 'at_time_type' and 'at_time_arg' for Temporal specification. + + Args: + txt (str): The key text after '@' and before ':' + value (Any): the value argument. Needed to distinguish the action type + + Returns + ------- + tuple of pre, type, arg, where + pre is the text before '@', + type is the Temporal type, + args is the tuple of temporal arguments (may be empty) + """ + + def time_spec(at: str): + """Analyse the specification after '@' and disect into typ and arg.""" + try: + arg_float = float(at) + return (Temporal["T"], (arg_float,)) + except ValueError: + for i in range(len(at) - 1, -1, -1): + try: + typ = Temporal[at[i]] + except KeyError: + pass + else: + if at[i + 1 :].strip() == "": + return (typ, ()) + elif typ == Temporal.T: + return (typ, (float(at[i + 1 :].strip()),)) + else: + return (typ, (at[i + 1 :].strip(),)) + raise ValueError(f"Unknown Temporal specification {at}") from None + + pre, _, at = txt.partition("@") + assert len(pre), f"'{txt}' is not allowed as basis for _disect_at_time" + assert isinstance(value, list), f"Assertion spec expected: [expression, description]. Found {value}" + if not len(at): # no @time spec. Assume 'A'lways + return (pre, Temporal.ALWAYS, ()) + else: + typ, arg = time_spec(at) + return (pre, typ, arg) + + def _disect_at_time_spec(self, txt: str, value: Any | None = None) -> tuple[str, str, float]: """Disect the @txt argument into 'at_time_type' and 'at_time_arg'. Args: txt (str): The key text after '@' and before ':' value (Any): the value argument. Needed to distinguish the action type - tl (bool)=False: expect a Temporal Logic type of '@' specification (for assertion) Returns ------- @@ -219,52 +261,37 @@ def _disect_at_time(self, txt: str, value: Any | None = None, tl: bool = False) arg is the time argument, or -1 """ - def time_spec(at: str, tl: bool): + def time_spec(at: str): """Analyse the specification after '@' and disect into typ and arg.""" try: arg_float = float(at) - except Exception: + return ("set" if Case._num_elements(value) else "get", arg_float) + except ValueError: arg_float = float("-inf") - if tl: - typ = at[0] if arg_float == float("-inf") else "T" - assert typ in ("T", "G", "F"), f"Unknown temporal type {typ}" - return (typ, arg_float) - else: - if arg_float == float("-inf"): - if at.startswith("step"): - try: - return ("step", float(at[4:])) - except Exception: - return ("step", -1) # this means 'all macro steps' - else: - raise AssertionError(f"Unknown '@{txt}'. Case:{self.name}, value:'{value}'") from None + if at.startswith("step"): + try: + return ("step", float(at[4:])) + except Exception: + return ("step", -1) # this means 'all macro steps' else: - return ("set" if Case._num_elements(value) else "get", arg_float) + raise AssertionError(f"Unknown '@{txt}'. Case:{self.name}, value:'{value}'") from None pre, _, at = txt.partition("@") assert len(pre), f"'{txt}' is not allowed as basis for _disect_at_time" - if tl: # temporal logic specification - assert isinstance(value, str), f"String value expected. Found {value}" - if not len(at): # no @time spec. Assume 'G'lobal - return (pre, "G", float("-inf")) + if value in ("result", "res"): # mark variable specification as 'get' or 'step' action + value = None + if not len(at): # no @time spec + if value is None: + return (pre, "get", self.special["stopTime"]) # report final value else: - typ, arg = time_spec(at, tl) - return (pre, typ, arg) - else: - if value in ("result", "res"): # mark variable specification as 'get' or 'step' action - value = None - if not len(at): # no @time spec - if value is None: - return (pre, "get", self.special["stopTime"]) # report final value - else: - msg = f"Value required for 'set' in _disect_at_time('{txt}','{self.name}','{value}')" - assert Case._num_elements(value), msg - return (pre, "set", 0) # set at startTime - else: # time spec provided - typ, arg = time_spec(at, tl) - return (pre, typ, arg) - - def read_assertion(self, key: str, expr: Any | None = None): + msg = f"Value required for 'set' in _disect_at_time('{txt}','{self.name}','{value}')" + assert Case._num_elements(value), msg + return (pre, "set", 0) # set at startTime + else: # time spec provided + typ, arg = time_spec(at) + return (pre, typ, arg) + + def read_assertion(self, key: str, expr_descr: list | None = None): """Read an assert statement, compile as sympy expression, register and store the key.. Args: @@ -272,18 +299,19 @@ def read_assertion(self, key: str, expr: Any | None = None): Also assertion keys can have temporal specifications (@...) with the following possibilities: - * @G : The expression is expected to be globally (always) true - * @F : The expression is expected to be true at some point in time - * @: The expression is expected to be true at the specific time value - expr: A sympy expression using available variables + * @A : The expression is expected to be Always (globally) true + * @F : The expression is expected to be true during the end of the simulation + * @ or @T: The expression is expected to be true at the specific time value + expr: A python expression using available variables """ - key, at_time_type, at_time_arg = self._disect_at_time(key, expr, tl=True) + key, at_time_type, at_time_arg = self._disect_at_time_tl(key, expr_descr) + assert isinstance(expr_descr, list), f"Assertion expression {expr_descr} should include a description." + expr, descr = expr_descr self.cases.assertion.expr(key, expr) - if at_time_type in ("G", "F"): # no time argument - self.cases.assertion.temporal(key, (at_time_type,)) - elif at_time_type in ("T",): - self.cases.assertion.temporal(key, (at_time_type, at_time_arg)) - self.asserts.append(key) + self.cases.assertion.description(key, descr) + self.cases.assertion.temporal(key, at_time_type, at_time_arg) + if key not in self.asserts: + self.asserts.append(key) return key def read_spec_item(self, key: str, value: Any | None = None): @@ -329,7 +357,7 @@ def read_spec_item(self, key: str, value: Any | None = None): if key in ("startTime", "stopTime", "stepSize"): self.special.update({key: value}) # just keep these as a dictionary so far else: # expect a variable-alias : value(s) specificator - key, at_time_type, at_time_arg = self._disect_at_time(key, value) + key, at_time_type, at_time_arg = self._disect_at_time_spec(key, value) if at_time_type in ("get", "step"): value = None key, cvar_info, rng = self.cases.disect_variable(key) @@ -1046,7 +1074,8 @@ def inspect(self, component: str | None = None, variable: str | None = None): component (str): Possibility to inspect only data with respect to a given component variable (str): Possibility to inspect only data with respect to a given variable - Retruns: + Returns + ------- A dictionary { : {'len':#data points, 'range':[tMin, tMax], 'info':info-dict} The info-dict is and element of Cases.variables. See Cases.get_case_variables() for definition. """ @@ -1078,49 +1107,66 @@ def inspect(self, component: str | None = None, variable: str | None = None): ) return cont - def time_series(self, variable: str): - """Extract the provided alias variables and make them available as two lists 'times' and 'values' - of equal length. + def retrieve(self, comp_var: Iterable) -> list: + """Retrieve from results js5-dict the variables and return (times, values). Args: - variable (str): variable identificator as str. - A variable identificator is the jspath expression after the time, i.e. .[] - For example 'bb.v[2]' identifies the z-velocity of the component 'bb' - - Returns - ------- - tuple of two lists (times, values) + comp_var (Iterable): iterable of (, [, element]) + Alternatively, the jspath syntax .[[element]] can be used as comp_var. + Time is not explicitly including in comp_var + A record is only included if all variable are found for a given time + Returns: + Data table (list of lists), time and one column per variable """ - if not len(self.res.js_py) or self.case is None: - return - times: list = [] - values: list = [] - for key in self.res.js_py: - found = self.res.jspath("$['" + str(key) + "']." + variable) - if found is not None: - if isinstance(found, list): - raise NotImplementedError("So far not implemented for multi-dimensional retrievals") from None - else: - times.append(float(key)) - values.append(found) - return (times, values) + data = [] + _comp_var = [] + for _cv in comp_var: + el = None + if isinstance(_cv, str): # expect . syntax + comp, var = _cv.split(".") + if "[" in var and var[-1] == "]": # explicit element + var, _el = var.split("[") + el = int(_el[:-1]) + else: # expect (, ) syntax + comp, var = _cv + _comp_var.append((comp, var, el)) + + for key, values in self.res.js_py.items(): + if key != "header": + time = float(key) + record = [time] + is_complete = True + for comp, var, el in _comp_var: + try: + _rec = values[comp][var] + except KeyError: + is_complete = False + break # give up + else: + record.append(_rec if el is None else _rec[el]) + + if is_complete: + data.append(record) + return data - def plot_time_series(self, variables: str | list[str], title: str = ""): + def plot_time_series(self, comp_var: Iterable, title: str = ""): """Extract the provided alias variables and plot the data found in the same plot. Args: - variables (list[str]): list of variable identificators as str. - A variable identificator is the jspath expression after the time, i.e. .[] - For example 'bb.v[2]' identifies the z-velocity of the component 'bb' + comp_var (Iterable): Iterable of (,) tuples (as used in retrieve) + Alternatively, the jspath syntax . is also accepted title (str): optional title of the plot """ - if not isinstance(variables, list): - variables = [ - variables, - ] - for var in variables: - times, values = self.time_series(var) - + data = self.retrieve(comp_var) + times = [rec[0] for rec in data] + for i, var in enumerate(comp_var): + if isinstance(var, str): + label = var + else: + label = var[0] + "." + var[1] + if len(var) > 2: + label += "[" + var[2] + "]" + values = [rec[i + 1] for rec in data] plt.plot(times, values, label=var, linewidth=3) if len(title): diff --git a/src/sim_explorer/simulator_interface.py b/src/sim_explorer/simulator_interface.py index f16b5ef..cf85c09 100644 --- a/src/sim_explorer/simulator_interface.py +++ b/src/sim_explorer/simulator_interface.py @@ -6,7 +6,7 @@ from libcosimpy.CosimEnums import CosimVariableCausality, CosimVariableType, CosimVariableVariability # type: ignore from libcosimpy.CosimExecution import CosimExecution # type: ignore -from libcosimpy.CosimLogging import CosimLogLevel, log_output_level +from libcosimpy.CosimLogging import CosimLogLevel, log_output_level # type: ignore from libcosimpy.CosimManipulator import CosimManipulator # type: ignore from libcosimpy.CosimObserver import CosimObserver # type: ignore diff --git a/tests/data/BouncingBall3D/BouncingBall3D.cases b/tests/data/BouncingBall3D/BouncingBall3D.cases index 53c8ebe..c41aa16 100644 --- a/tests/data/BouncingBall3D/BouncingBall3D.cases +++ b/tests/data/BouncingBall3D/BouncingBall3D.cases @@ -21,12 +21,8 @@ base : { x[2] : 39.37007874015748, # this is in inch => 1m! x@step : 'result', v@step : 'result', - x_b[0]@step : 'res', - }, -# assert: { -# 1 : 'abs(g-9.81)<1e-9' -# } - }, + x_b@step : 'res', + }}, restitution : { description : "Smaller coefficient of restitution e", spec: { @@ -37,9 +33,17 @@ restitutionAndGravity : { parent : 'restitution', spec : { g : 1.5 - }}, + }, + assert: { + 1@A : ['g==1.5', 'Check setting of gravity (about 1/7 of earth)'], + 2@ALWAYS : ['e==0.5', 'Check setting of restitution'], + 3@F : ['x[2] < 3.0', 'For long times the z-position of the ball remains small (loss of energy)'], + 4@T1.1547 : ['abs(x[2]) < 0.4', 'Close to bouncing time the ball should be close to the floor'], + } +}, gravity : { description : "Gravity like on the moon", spec : { g : 1.5 - }}} + }, +}} diff --git a/tests/test_assertion.py b/tests/test_assertion.py new file mode 100644 index 0000000..78f442a --- /dev/null +++ b/tests/test_assertion.py @@ -0,0 +1,256 @@ +import ast +from math import cos, sin +from pathlib import Path + +import matplotlib.pyplot as plt +import numpy as np +import pytest + +from sim_explorer.assertion import Assertion, Temporal +from sim_explorer.case import Cases, Results + +_t = [0.1 * float(x) for x in range(100)] +_x = [0.3 * sin(t) for t in _t] +_y = [1.0 * cos(t) for t in _t] + + +def test_globals_locals(): + """Test the usage of the globals and locals arguments within exec.""" + from importlib import __import__ + + module = __import__("math", fromlist=["sin"]) + locals().update({"sin": module.sin}) + code = "def f(x):\n return sin(x)" + compiled = compile(code, "", "exec") + exec(compiled, locals(), locals()) + # print(f"locals:{locals()}") + assert abs(locals()["f"](3.0) - sin(3.0)) < 1e-15 + + +def test_ast(show): + expr = "1+2+x.dot(x) + sin(y)" + if show: + a = ast.parse(expr, "", "exec") + print(a, ast.dump(a, indent=4)) + + ass = Assertion() + ass.register_vars( + {"x": {"instances": ("dummy",), "variables": (1, 2, 3)}, "y": {"instances": ("dummy2",), "variables": (1,)}} + ) + syms, funcs = ass.expr_get_symbols_functions(expr) + assert syms == ["x", "y"], f"SYMS: {syms}" + assert funcs == ["sin"], f"FUNCS: {funcs}" + + expr = "abs(y-4)<0.11" + if show: + a = a = ast.parse(expr) + print(a, ast.dump(a, indent=4)) + syms, funcs = ass.expr_get_symbols_functions(expr) + assert syms == ["y"] + assert funcs == ["abs"] + + ass = Assertion() + ass.symbol("t", 1) + ass.symbol("x", 3) + ass.symbol("y", 1) + ass.expr("1", "1+2+x.dot(x) + sin(y)") + syms, funcs = ass.expr_get_symbols_functions("1") + assert syms == ["x", "y"] + assert funcs == ["sin"] + syms, funcs = ass.expr_get_symbols_functions("abs(y-4)<0.11") + assert syms == ["y"] + assert funcs == ["abs"] + + ass = Assertion() + ass.register_vars( + {"g": {"instances": ("bb",), "variables": (1,)}, "x": {"instances": ("bb",), "variables": (2, 3, 4)}} + ) + expr = "sqrt(2*bb_x[2] / bb_g)" # fully qualified variables with components + a = ast.parse(expr, "", "exec") + if show: + print(a, ast.dump(a, indent=4)) + syms, funcs = ass.expr_get_symbols_functions(expr) + assert syms == ["bb_g", "bb_x"] + assert funcs == ["sqrt"] + + +def show_data(): + fig, ax = plt.subplots() + ax.plot(_x, _y) + plt.title("Data (_x, _y)", loc="left") + plt.show() + + +def test_temporal(): + print(Temporal.ALWAYS.name) + for name, member in Temporal.__members__.items(): + print(name, member, member.value) + assert Temporal["A"] == Temporal.A, "Set through name as string" + assert Temporal["ALWAYS"] == Temporal.A, "Alias works also" + with pytest.raises(KeyError) as err: + _ = Temporal["G"] + assert str(err.value) == "'G'", f"Found:{err.value}" + + +def test_assertion(): + # show_data()print("Analyze", analyze( "t>8 & x>0.1")) + ass = Assertion() + ass.symbol("t") + ass.register_vars( + { + "x": {"instances": ("dummy",), "variables": (2,)}, + "y": {"instances": ("dummy",), "variables": (3,)}, + "z": {"instances": ("dummy",), "variables": (4, 5)}, + } + ) + ass.expr("1", "t>8") + assert ass.eval_single("1", {"t": 9.0}) + assert not ass.eval_single("1", {"t": 7}) + times, results = ass.eval_series("1", _t, "bool-list") + assert True in results, "There is at least one point where the assertion is True" + assert results.index(True) == 81, f"Element {results.index(True)} is True" + assert all(results[i] for i in range(81, 100)), "Assertion remains True" + assert ass.eval_series("1", _t, max)[1] + assert results == ass.eval_series("1", _t, "bool-list")[1] + assert ass.eval_series("1", _t, "F") == (8.1, True), "Finally True" + ass.symbol("x") + ass.expr("2", "(t>8) and (x>0.1)") + times, results = ass.eval_series("2", zip(_t, _x, strict=True), "bool") + assert times == 8.1, f"Should be 'True' (at some point). Found {times}, {results}. Expr: {ass.expr('2')}" + times, results = ass.eval_series("2", zip(_t, _x, strict=True), "bool-list") + time_interval = [r[0] for r in filter(lambda res: res[1], zip(times, results, strict=False))] + assert (time_interval[0], time_interval[-1]) == (8.1, 9.0) + assert len(time_interval) == 10 + with pytest.raises(ValueError, match="Unknown return type 'Hello'") as err: + ass.eval_series("2", zip(_t, _x, strict=True), "Hello") + assert str(err.value) == "Unknown return type 'Hello'" + # Checking equivalence. '==' does not work + ass.symbol("y") + ass.expr("3", "(y<=4) & (y>=4)") + expected = ["t", "x", "dummy_x", "y", "dummy_y", "z", "dummy_z"] + assert list(ass._symbols.keys()) == expected, f"Found: {list(ass._symbols.keys())}" + assert ass.expr_get_symbols_functions("3") == (["y"], []) + assert ass.eval_single("3", {"y": 4}) + assert not ass.eval_series("3", zip(_t, _y, strict=True), ret="bool")[1] + ass.expr("4", "y==4"), "Also equivalence check is allowed here" + assert ass.eval_single("4", {"y": 4}) + ass.expr("5", "abs(y-4)<0.11") # abs function can also be used + assert ass.eval_single("5", (4.1,)) + ass.expr("6", "sin(t)**2 + cos(t)**2") + assert abs(ass.eval_series("6", _t, ret=max)[1] - 1.0) < 1e-15, "sin and cos accepted" + ass.expr("7", "sqrt(t)") + assert abs(ass.eval_series("7", _t, ret=max)[1] ** 2 - _t[-1]) < 1e-14, "Also sqrt works out of the box" + ass.expr("8", "dummy_x*dummy_y") + assert abs(ass.eval_series("8", zip(_t, _x, _y, strict=False), ret=max)[1] - 0.14993604045622577) < 1e-14 + ass.expr("9", "dummy_x*dummy_y* z[0]") + assert ( + abs( + ass.eval_series("9", zip(_t, _x, _y, zip(_x, _y, strict=False), strict=False), ret=max)[1] + - 0.03455981729517478 + ) + < 1e-14 + ) + + +def test_assertion_spec(): + cases = Cases(Path(__file__).parent / "data" / "SimpleTable" / "test.cases") + _c = cases.case_by_name("case1") + _c.read_assertion("3@9.85", ["x*t", "Description"]) + assert _c.cases.assertion.expr_get_symbols_functions("3") == (["t", "x"], []) + res = _c.cases.assertion.eval_series("3", zip(_t, _x, strict=False), ret=9.85) + assert _c.cases.assertion.info("x", "instance") == "tab" + _c.read_assertion("1", ["t-1", "Description"]) + assert _c.asserts == ["3", "1"] + assert _c.cases.assertion.temporal("1")["type"] == Temporal.A + assert _c.cases.assertion.eval_single("1", (1,)) == 0 + with pytest.raises(AssertionError) as err: + _c.read_assertion("2@F", "t-1") + assert str(err.value).startswith("Assertion spec expected: [expression, description]. Found") + _c.read_assertion("2@F", ["t-1", "Subtract 1 from time"]) + + assert _c.cases.assertion.temporal("2")["type"] == Temporal.F + assert _c.cases.assertion.temporal("2")["args"] == () + assert _c.cases.assertion.eval_single("2", (1,)) == 0 + _c.cases.assertion.symbol("y") + found = list(_c.cases.assertion._symbols.keys()) + assert found == ["t", "x", "tab_x", "i", "tab_i", "y"], f"Found: {found}" + _c.read_assertion("3@9.85", ["x*t", "Test assertion"]) + assert _c.asserts == ["3", "1", "2"], f"Found: {_c.asserts}" + assert _c.cases.assertion.temporal("3")["type"] == Temporal.T + assert _c.cases.assertion.temporal("3")["args"][0] == 9.85 + assert _c.cases.assertion.expr_get_symbols_functions("3") == (["t", "x"], []) + res = _c.cases.assertion.eval_series("3", zip(_t, _x, strict=False), ret=9.85) + assert res[0] == 9.85 + assert abs(res[1] - 0.5 * (_x[-1] * _t[-1] + _x[-2] * _t[-2])) < 1e-10 + + +def test_vector(): + """Test sympy vector operations.""" + ass = Assertion() + ass.symbol("x", length=3) + print("Symbol x", ass.symbol("x"), type(ass.symbol("x"))) + ass.expr("1", "x.dot(x)") + assert ass.expr_get_symbols_functions("1") == (["x"], []) + ass.eval_single("1", ((1, 2, 3),)) + ass.eval_single("1", {"x": (1, 2, 3)}) + assert ass.symbol("x").dot(ass.symbol("x")) == 3.0, "Initialized as ones" + assert ass.symbol("x").dot(np.array((0, 1, 0), dtype=float)) == 1.0, "Initialized as ones" + ass.symbol("y", 3) # a vector without explicit components + assert all(ass.symbol("y")[i] == 1.0 for i in range(3)) + y = ass.symbol("y") + assert y.dot(y) == 3.0, "Initialized as ones" + + +def test_do_assert(show): + cases = Cases(spec=Path(__file__).parent / "data" / "BouncingBall3D" / "BouncingBall3D.cases") + case = cases.case_by_name("restitutionAndGravity") + case.run() + #res = Results(file=Path(__file__).parent / "data" / "BouncingBall3D" / "restitutionAndGravity.js5") + res = case.res + # cases = res.case.cases + assert res.case.name == "restitutionAndGravity" + assert cases.file.name == "BouncingBall3D.cases" + for key, inf in res.inspect().items(): + print(key, inf["len"], inf["range"]) + info = res.inspect()["bb.v"] + assert info["len"] == 300 + assert info["range"] == [0.01, 3.0] + ass = cases.assertion + # ass.vector('x', (1,0,0)) + # ass.vector('v', (0,1,0)) + _ = ass.expr("0", "x.dot(v)") # additional expression (not in .cases) + assert ass._syms["0"] == ["x", "v"] + assert all(ass.symbol("x")[i] == np.ones(3, dtype=float)[i] for i in range(3)), "Initialized to ones" + assert ass.eval_single("0", ((1, 2, 3), (4, 5, 6))) == 32 + assert ass.expr("1") == "g==1.5" + assert ass.temporal("1")["type"] == Temporal.A + assert ass.syms("1") == ["g"] + assert ass.do_assert("1", res) + assert ass.assertions("1") == {"passed": True, "details": None} + ass.do_assert("2", res) + assert ass.assertions("2") == {"passed": True, "details": None}, f"Found {ass.assertions('2')}" + if show: + res.plot_time_series(["bb.x[2]"]) + ass.do_assert("3", res) + assert ass.assertions("3") == {"passed": True, "details": "@2.22"}, f"Found {ass.assertions('3')}" + ass.do_assert("4", res) + assert ass.assertions("4") == {"passed": True, "details": "@1.1547 (interpolated)"}, f"Found {ass.assertions('4')}" + count = ass.do_assert_case(res) # do all + assert count == [4, 4], "Expected 4 of 4 passed" + for rep in ass.report(): + print(rep) + + +if __name__ == "__main__": + # retcode = pytest.main(["-rA", "-v", __file__, "--show", "False"]) + # assert retcode == 0, f"Non-zero return code {retcode}" + import os + + os.chdir(Path(__file__).parent.absolute() / "test_working_directory") + # test_temporal() + # test_ast( show=True) + # test_globals_locals() + # test_assertion() + # test_assertion_spec() + # test_vector() + test_do_assert(show=True) diff --git a/tests/test_bouncing_ball_3d.py b/tests/test_bouncing_ball_3d.py index f5a83e5..4e6a84e 100644 --- a/tests/test_bouncing_ball_3d.py +++ b/tests/test_bouncing_ball_3d.py @@ -74,7 +74,7 @@ def check_case( e = val elif k == "x[2]": x[2] = val - elif k in ("x@step", "v@step", "x_b[0]@step"): + elif k in ("x@step", "v@step", "x_b@step"): pass # get actions else: raise KeyError(f"Unknown key {k}") @@ -95,8 +95,8 @@ def check_case( res=results.res.jspath(path="$['0.01'].bb.v"), expected=(v[0], 0, -g * dt), ) - x_b = results.res.jspath(path="$.['0.01'].bb.['x_b[0]']") - assert abs(x_b - x_bounce) < 1e-9 + x_b = results.res.jspath(path="$.['0.01'].bb.['x_b']") + assert abs(x_b[0] - x_bounce) < 1e-9 # just before bounce t_before = int(t_bounce * tfac) / tfac # * dt # just before bounce if t_before == t_bounce: # at the interval border @@ -110,7 +110,7 @@ def check_case( res=results.res.jspath(path=f"$['{t_before}'].bb.v"), expected=(v[0], 0, -g * t_before), ) - assert abs(results.res.jspath(f"$['{t_before}'].bb.['x_b[0]']") - x_bounce) < 1e-9 + assert abs(results.res.jspath(f"$['{t_before}'].bb.['x_b']")[0] - x_bounce) < 1e-9 # just after bounce ddt = t_before + dt - t_bounce # time from bounce to end of step x_bounce2 = x_bounce + 2 * v_bounce * e * 1.0 * e / g @@ -127,7 +127,7 @@ def check_case( res=results.res.jspath(path=f"$['{t_before+dt}'].bb.v"), expected=(e * v[0], 0, (v_bounce * e - g * ddt)), ) - assert abs(results.res.jspath(path=f"$['{t_before+dt}'].bb.['x_b[0]']") - x_bounce2) < 1e-9 + assert abs(results.res.jspath(path=f"$['{t_before+dt}'].bb.['x_b']")[0] - x_bounce2) < 1e-9 # from bounce to bounce v_x, v_z, t_b, x_b = ( v[0], diff --git a/tests/test_results.py b/tests/test_results.py index 266858b..a73ff88 100644 --- a/tests/test_results.py +++ b/tests/test_results.py @@ -1,8 +1,6 @@ from datetime import datetime from pathlib import Path -import pytest - from sim_explorer.case import Cases, Results @@ -39,7 +37,7 @@ def test_plot_time_series(show): assert file.exists(), f"File {file} not found" res = Results(file=file) if show: - res.plot_time_series(variables=["bb.x[2]", "bb.v[2]"], title="Test plot") + res.plot_time_series(comp_var=["bb.x[2]", "bb.v[2]"], title="Test plot") def test_inspect(): @@ -56,12 +54,24 @@ def test_inspect(): assert cont["bb.x"]["info"]["variables"] == (0, 1, 2), "ValueReferences" +def test_retrieve(): + file = Path(__file__).parent / "data" / "BouncingBall3D" / "test_results" + res = Results(file=file) + data = res.retrieve((("bb", "g"), ("bb", "e"))) + assert data == [[0.01, 9.81, 0.5]] + data = res.retrieve((("bb", "x"), ("bb", "v"))) + assert len(data) == 300 + assert data[0] == [0.01, [0.01, 0.0, 39.35076771653544], [1.0, 0.0, -0.0981]] + + if __name__ == "__main__": - retcode = pytest.main(["-rA", "-v", __file__, "--show", "True"]) - assert retcode == 0, f"Non-zero return code {retcode}" - # import os - # os.chdir(Path(__file__).parent.absolute() / "test_working_directory") + # retcode = pytest.main(["-rA", "-v", __file__, "--show", "True"]) + # assert retcode == 0, f"Non-zero return code {retcode}" + import os + + os.chdir(Path(__file__).parent.absolute() / "test_working_directory") + # test_retrieve() # test_init() # test_add() - # test_plot_time_series() + test_plot_time_series(show=True) # test_inspect()