diff --git a/CHANGELOG.md b/CHANGELOG.md
index d8a7b4f..3cbf3a3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,10 +3,13 @@
All notable changes to the [sim-explorer] project will be documented in this file.
The changelog format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
-## [Unreleased]
-
-/-
+## [0.2.0] - 2024-12-18
+New Assertions release:
+
+* Added support for assertions in each of the cases to have some kind of evaluation being run after every simulation.
+* Display features to show the results of the assertions in a developer friendly format.
## [0.1.0] - 2024-11-08
diff --git a/docs/source/conf.py b/docs/source/conf.py
index c72e540..1b4c676 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -26,7 +26,7 @@
author = "Siegfried Eisinger, DNV Simulation Technology Team, SEACo project team"
# The full version, including alpha/beta/rc tags
-release = "0.1.0"
+release = "0.2.0"
# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
diff --git a/docs/source/sim-explorer.pptx b/docs/source/sim-explorer.pptx
index 85cbc74..3969175 100644
Binary files a/docs/source/sim-explorer.pptx and b/docs/source/sim-explorer.pptx differ
diff --git a/pyproject.toml b/pyproject.toml
index 770f23e..34d073b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -22,7 +22,7 @@ packages = [
[project]
name = "sim-explorer"
-version = "0.1.2"
+version = "0.2.0"
description = "Experimentation tools on top of OSP simulation models."
readme = "README.rst"
requires-python = ">= 3.10"
@@ -61,6 +61,8 @@ dependencies = [
"fmpy>=0.3.21",
"component-model>=0.1.0",
"plotly>=5.24.1",
+ "pydantic>=2.10.3",
+ "rich>=13.9.4",
]
[project.optional-dependencies]
diff --git a/src/sim_explorer/assertion.py b/src/sim_explorer/assertion.py
index fb69479..3b23f16 100644
--- a/src/sim_explorer/assertion.py
+++ b/src/sim_explorer/assertion.py
@@ -1,160 +1,449 @@
-from sympy import Symbol, sympify # type: ignore
-from sympy.vector import CoordSys3D # type: ignore
+# type: ignore
+
+import ast
+from typing import Any, Callable, Iterable, Iterator
+
+import numpy as np
+
+from sim_explorer.models import AssertionResult, Temporal
class Assertion:
- """Define Assertion objects for checking expectations with respect to simulation results.
+ """Defines a common Assertion object for checking expectations with respect to simulation results.
+
+ The class uses eval/exec, where the symbols are
+
+ * the independent variable t (time)
+ * all variables defined as variables in cases file,
+ * functions from any loaded module
- The class uses sympy, where the symbols are expected to be results variables,
- as defined in the variable definition section of Cases.
These can then be combined to boolean expressions and be checked against
single points of a data series (see `assert_single()` or against a whole series (see `assert_series()`).
- The symbols used in the expression are accessible as `.symbols` (dict of `name : symbol`).
- All symbols used by all defined Assertion objects are accessible as Assertion.ns
+ Single assertion expressions are stored in the dict self._expr with their key as given in cases file.
+ All assertions have a common symbol basis in self._symbols
Args:
- expr (str): The boolean expression definition as string.
- Any unknown symbol within the expression is defined as sympy.Symbol and is expected to match a variable.
+ funcs (dict) : Dictionary of module : of allowed functions inside assertion expressions.
"""
- ns: dict = {}
- N = CoordSys3D("N")
+ def __init__(self, imports: dict | None = None):
+ if imports is None:
+ self._imports = {"math": ["sin", "cos", "sqrt"]} # default imports
+ else:
+ self._imports = imports
+ self._symbols = {"t": 1} # list of all symbols and their length
+ self._functions: list = [] # list of all functions used in expressions
+ # per expression as key:
+ self._syms: dict = {} # the symbols used in expression
+ self._funcs: dict = {} # the functions used in expression
+ self._expr: dict = {} # the raw expression
+ self._compiled: dict = {} # the byte-compiled expression
+ self._temporal: dict = {} # additional information for evaluation as time series
+ self._description: dict = {}
+ self._cases_variables: dict = {} # is set to Cases.variables when calling self.register_vars
+ self._assertions: dict = {} # assertion results, set by do_assert
- def __init__(self, expr: str):
- self._expr = Assertion.do_sympify(expr)
- self._symbols = self.get_symbols()
- # t = Symbol('t', positive=True) # default symbol for time
- # self._symbols.update( {'t':t})
- Assertion.update_namespace(self._symbols)
+ def info(self, sym: str, typ: str = "instance") -> str | int:
+ """Retrieve detailed information related to the registered symbol 'sym'."""
+ if sym == "t": # the independent variable
+ return {"instance": "none", "variable": "t", "length": 1, "model": "none"}[typ] # type: ignore
- @property
- def expr(self):
- return self._expr
+ parts = sym.split("_")
+ var = parts.pop()
+ while True:
+ if var in self._cases_variables: # found the variable
+ if not len(parts): # abbreviated variable without instance information
+ assert len(self._cases_variables[var]["instances"]) == 1, f"Non-unique instance for variable {var}"
+ instance = self._cases_variables[var]["instances"][0] # use the unique instance
+ else:
+ instance = parts[0] + "".join("_" + x for x in parts[1:])
+ assert instance in self._cases_variables[var]["instances"], f"No instance {instance} of {var}"
+ break
+ else:
+ if not len(parts):
+ raise KeyError(f"The symbol {sym} does not seem to represent a registered variable") from None
+ var = parts.pop() + "_" + var
+ if typ == "instance": # get the instance
+ return instance
+ elif typ == "variable": # get the generic variable name
+ return var
+ elif typ == "length": # get the number of elements
+ return len(self._cases_variables[var]["variables"])
+ elif typ == "model": # get the basic (FMU) model
+ return self._cases_variables[var]["model"]
+ else:
+ raise KeyError(f"Unknown typ {typ} within info()") from None
+
+ def symbol(self, name: str, length: int = 1):
+ """Get or set a symbol.
- @property
- def symbols(self):
- return self._symbols
+ Args:
+ key (str): The symbol identificator (name)
+ length (int)=1: Optional length. 1,2,3 allowed.
+ Vectors are registered as # + for the whole vector
- def symbol(self, name: str):
+ Returns: The sympy Symbol corresponding to the name 'key'
+ """
try:
- return self._symbols[name]
- except KeyError:
- return None
-
- @staticmethod
- def do_sympify(_expr):
- """Evaluate the initial expression as sympy expression.
- Return the sympified expression or throw an error if sympification is not possible.
+ sym = self._symbols[name]
+ except KeyError: # not yet registered
+ assert length > 0, f"Vector length should be positive. Found {length}"
+ if length > 1:
+ self._symbols.update({name: np.ones(length, dtype=float)}) # type: ignore
+ else:
+ self._symbols.update({name: 1})
+ sym = self._symbols[name]
+ return sym
+
+ def expr(self, key: str, ex: str | None = None):
+ """Get or set an expression.
+
+ Args:
+ key (str): the expression identificator
+ ex (str): Optional expression as string. If not None, register/update the expression as key
+
+ Returns: the sympified expression
"""
- if "==" in _expr:
- raise ValueError("'==' cannot be used to check equivalence. Use 'a-b' and check against 0") from None
+
+ def make_func(name: str, args: dict, body: str):
+ """Make a python function from the body."""
+ code = "def _" + name + "("
+ for a in args:
+ code += a + ", "
+ code += "):\n"
+ # code += " print('dir:', dir())\n"
+ code += " return " + body + "\n"
+ return code
+
+ if ex is None: # getter
+ try:
+ ex = self._expr[key]
+ except KeyError as err:
+ raise Exception(f"Expression with identificator {key} is not found") from err
+ else:
+ return ex
+ else: # setter
+ syms, funcs = self.expr_get_symbols_functions(ex)
+ self._syms.update({key: syms})
+ self._funcs.update({key: funcs})
+ code = make_func(key, syms, ex)
+ try:
+ # print("GLOBALS", globals())
+ # print("LOCALS", locals())
+ # exec( code, globals(), locals()) # compile using the defined symbols
+ compiled = compile(code, "", "exec") # compile using the defined symbols
+ except ValueError as err:
+ raise Exception(f"Something wrong with expression {ex}: {err}|. Cannot compile.") from None
+ else:
+ self._expr.update({key: ex})
+ self._compiled.update({key: compiled})
+ # print("KEY", key, ex, syms, compiled)
+ return compiled
+
+ def syms(self, key: str):
+ """Get the symbols of the expression 'key'."""
try:
- expr = sympify(_expr)
- except ValueError as err:
- raise Exception(f"Something wrong with expression {_expr}: {err}|. Cannot sympify.") from None
- return expr
+ syms = self._syms[key]
+ except KeyError as err:
+ raise Exception(f"Expression {key} was not found") from err
+ else:
+ return syms
- def get_symbols(self):
- """Get the atom symbols used in the expression. Return the symbols as dict of `name : symbol`."""
- syms = self._expr.atoms(Symbol)
- return {s.name: s for s in syms}
+ def expr_get_symbols_functions(self, expr: str) -> tuple:
+ """Get the symbols used in the expression.
- @staticmethod
- def casesvar_to_symbol(variables: dict):
- """Register all variables defined in cases as sympy symbols.
+ 1. Symbol as listed in expression and function body. In general _[]
+ 2. Argument as used in the argument list of the function call. In general _
+ 3. Fully qualified symbol: (, , |None)
- Args:
- variables (dict): The variables dict as registered in Cases
+ If there is only a single instance, it is allowed to skip in 1 and 2
+
+ Returns
+ -------
+ tuple of (syms, funcs),
+ where syms is a dict {_ : fully-qualified-symbol tuple, ...}
+
+ funcs is a list of functions used in the expression.
"""
- for var in variables:
- sym = sympify(var)
- Assertion.update_namespace({var: sym})
- @staticmethod
- def reset():
- """Reset the global dictionary of symbols used by all Assertions."""
- Assertion.ns = {}
+ def ast_walk(node: ast.AST, syms: list | None = None, funcs: list | None = None):
+ """Recursively walk an ast node (width first) and collect symbol and function names."""
+ if syms is None:
+ syms = []
+ if funcs is None:
+ funcs = []
+ for n in ast.iter_child_nodes(node):
+ if isinstance(n, ast.Name):
+ if n.id in self._symbols:
+ if isinstance(syms, list) and n.id not in syms:
+ syms.append(n.id)
+ elif isinstance(node, ast.Call):
+ if isinstance(funcs, list) and n.id not in funcs:
+ funcs.append(n.id)
+ else:
+ raise KeyError(f"Unknown symbol {n.id}")
+ syms, funcs = ast_walk(n, syms, funcs)
+ return (syms, funcs)
- @staticmethod
- def update_namespace(sym: dict):
- """Ensure that the symbols of this expression are registered in the global namespace `ns`
- and include all global namespace symbols in the symbol list of this class.
+ if expr in self._expr: # assume that actually a key is queried
+ expr = self._expr[expr]
+ syms, funcs = ast_walk(ast.parse(expr, "", "exec"))
+ syms = sorted(syms, key=list(self._symbols.keys()).index)
+ return (syms, funcs)
+
+ def temporal(self, key: str, typ: Temporal | str | None = None, args: tuple | None = None):
+ """Get or set a temporal instruction.
Args:
- sym (dict): dict of {symbol-name : symbol}
+ key (str): the assert key
+ typ (str): optional temporal type
"""
- for n, s in sym.items():
- if n not in Assertion.ns:
- Assertion.ns.update({n: s})
+ if typ is None: # getter
+ try:
+ temp = self._temporal[key]
+ except KeyError as err:
+ raise Exception(f"Temporal instruction for {key} is not found") from err
+ else:
+ return temp
+ else: # setter
+ if isinstance(typ, Temporal):
+ self._temporal.update({key: {"type": typ, "args": args}})
+ elif isinstance(typ, str):
+ self._temporal.update({key: {"type": Temporal[typ], "args": args}})
+ else:
+ raise ValueError(f"Unknown temporal type {typ}") from None
+ return self._temporal[key]
+
+ def description(self, key: str, descr: str | None = None):
+ """Get or set a description."""
+ if descr is None: # getter
+ try:
+ _descr = self._description[key]
+ except KeyError as err:
+ raise Exception(f"Description for {key} not found") from err
+ else:
+ return _descr
+ else: # setter
+ self._description.update({key: descr})
+ return descr
+
+ def assertions(self, key: str, res: bool | None = None, details: str | None = None, case_name: str | None = None):
+ """Get or set an assertion result."""
+ if res is None: # getter
+ try:
+ _res = self._assertions[key]
+ except KeyError as err:
+ raise Exception(f"Assertion results for {key} not found") from err
+ else:
+ return _res
+ else: # setter
+ self._assertions.update({key: {"passed": res, "details": details, "case": case_name}})
+ return self._assertions[key]
- # for name, sym in Assertion.ns:
- # if name not in self._symbols:
- # sym = sympify( name)
- # self._symbols.update( {name : sym})
+ def register_vars(self, variables: dict):
+ """Register the variables in varnames as symbols.
- @staticmethod
- def vector(x: tuple | list):
- assert isinstance(x, (tuple, list)) and len(x) == 3, f"Vector of length 3 expected. Found {x}"
- return x[0] * Assertion.N.i + x[1] * Assertion.N.j + x[2] * Assertion.N.k # type: ignore
+ Can be used directly from Cases with varnames = tuple( Cases.variables.keys())
+ """
+ self._cases_variables = variables # remember the full dict for retrieval of details
+ for key, info in variables.items():
+ for inst in info["instances"]:
+ if len(info["instances"]) == 1: # the instance is unique
+ self.symbol(key, len(info["variables"])) # we allow to use the 'short name' if unique
+ self.symbol(inst + "_" + key, len(info["variables"])) # fully qualified name can always be used
+
+ def make_locals(self, loc: dict):
+ """Adapt the locals with 'allowed' functions."""
+ from importlib import import_module
+
+ for modulename, funclist in self._imports.items():
+ module = import_module(modulename)
+ for func in funclist:
+ loc.update({func: getattr(module, func)})
+ loc.update({"np": import_module("numpy")})
+ return loc
+
+ def _eval(self, func: Callable, kvargs: dict | list | tuple):
+ """Call a function of multiple arguments and return the single result.
+ All internal vecor arguments are transformed to np.arrays.
+ """
+ if isinstance(kvargs, dict):
+ for k, v in kvargs.items():
+ if isinstance(v, Iterable):
+ kvargs[k] = np.array(v, float)
+ return func(**kvargs)
+ elif isinstance(kvargs, list):
+ for i, v in enumerate(kvargs):
+ if isinstance(v, Iterable):
+ kvargs[i] = np.array(v, dtype=float)
+ return func(*kvargs)
+ elif isinstance(kvargs, tuple):
+ _args = [] # make new, because tuple is not mutable
+ for v in kvargs:
+ if isinstance(v, Iterable):
+ _args.append(np.array(v, dtype=float))
+ else:
+ _args.append(v)
+ return func(*_args)
- def assert_single(self, subs: list[tuple]):
- """Perform assertion on a single data point.
+ def eval_single(self, key: str, kvargs: dict | list | tuple):
+ """Perform assertion of 'key' on a single data point.
Args:
- subs (list): list of tuples of `(variable-name, value)`,
- where the independent variable (normally the time) shall be listed first.
+ key (str): The expression identificator to be used
+ kvargs (dict|list|tuple): variable substitution kvargs as dict or args as tuple/list
All required variables for the evaluation shall be listed.
- The variable-name provided as string is translated to its symbol before evaluation.
Results:
(bool) result of assertion
"""
- _subs = [(self._symbols[s[0]], s[1]) for s in subs]
- return self._expr.subs(_subs)
+ assert key in self._compiled, f"Expression {key} not found"
+ loc = self.make_locals(locals())
+ exec(self._compiled[key], loc, loc)
+ # print("kvargs", kvargs, self._syms[key], self.expr_get_symbols_functions(key))
+ return self._eval(locals()["_" + key], kvargs)
- def assert_series(self, subs: list[tuple], ret: str = "bool"):
+ def eval_series(self, key: str, data: list[Any], ret: float | str | Callable | None = None):
"""Perform assertion on a (time) series.
Args:
- subs (list): list of tuples of `(variable-symbol, list-of-values)`,
- where the independent variable (normally the time) shall be listed first.
- All required variables for the evaluation shall be listed
- The variable-name provided as string is translated to its symbol before evaluation.
+ key (str): Expression identificator
+ data (tuple): data table with arguments as columns and series in rows,
+ where the independent variable (normally the time) shall be listed first in each row.
+ All required variables for the evaluation shall be listed (columns)
+ The names of variables correspond to self._syms[key], but is taken as given here.
ret (str)='bool': Determines how to return the result of the assertion:
- `bool` : True if any element of the assertion of the series is evaluated to True
- `bool-list` : List of True/False for each data point in the series
- `interval` : tuple of interval of indices for which the assertion is True
- `count` : Count the number of points where the assertion is True
+ float : Linear interpolation of result at the given float time
+ `bool` : (time, True/False) for first row evaluating to True.
+ `bool-list` : (times, True/False) for all data points in the series
+ `A` : Always true for the whole time-series. Same as 'bool'
+ `F` : is True at end of time series.
+ Callable : run the given callable on times, expr(data)
+ None : Use the internal 'temporal(key)' setting
Results:
- bool, list[bool], tuple[int] or int, depending on `ret` parameter.
- Default: True/False on whether at least one record is found where the assertion is True.
+ tuple of (time(s), value(s)), depending on `ret` parameter
"""
- _subs = [(self._symbols[s[0]], s[1]) for s in subs]
- length = len(subs[0][1])
- result = [False] * length
-
- for i in range(length):
- s = []
- for k in range(len(_subs)): # number of variables in substitution
- s.append((_subs[k][0], _subs[k][1][i]))
- res = self._expr.subs(s)
- if res:
- result[i] = True
- if ret == "bool":
- return True in result
- elif ret == "bool-list":
- return result
- elif ret == "interval":
- if True in result:
- idx0 = result.index(True)
- if False in result[idx0:]:
- return (idx0, idx0 + result[idx0:].index(False))
- else:
- return (idx0, length)
+ times = [] # return the independent variable values (normally time)
+ results = [] # return the scalar results at all times
+ bool_type = (ret is None and self.temporal(key)["type"] in (Temporal.A, Temporal.F)) or (
+ isinstance(ret, str) and (ret in ["A", "F"] or ret.startswith("bool"))
+ )
+ argnames = self._syms[key]
+ loc = self.make_locals(locals())
+ exec(self._compiled[key], loc, loc) # the function is then available as _ among locals()
+ func = locals()["_" + key] # scalar function of all used arguments
+ _temp = self._temporal[key]["type"] if ret is None else Temporal.UNDEFINED
+
+ for row in data:
+ if not isinstance(row, Iterable): # can happen if the time itself is evaluated
+ time = row
+ row = [row]
+ elif "t" not in argnames: # the independent variable is not explicitly used in the expression
+ time = row[0]
+ row = row[1:]
+ assert len(row), f"Time data in eval_series seems to be lacking. Data:{data}, Argnames:{argnames}"
+ else: # time used also explicitly in the expression
+ time = row[0]
+ res = func(*row)
+ if bool_type:
+ res = bool(res)
+
+ times.append(time)
+ results.append(res) # Note: res is always a scalar result
+
+ if (ret is None and _temp == Temporal.A) or (isinstance(ret, str) and ret in ("A", "bool")): # always True
+ for t, v in zip(times, results, strict=False):
+ if v:
+ return (t, True)
+ return (times[-1], False)
+ elif (ret is None and _temp == Temporal.F) or (isinstance(ret, str) and ret == "F"): # finally True
+ t_true = times[-1]
+ for t, v in zip(times, results, strict=False):
+ if v and t_true > t:
+ t_true = t
+ elif not v and t_true < t: # detected False after expression became True
+ t_true = times[-1]
+ return (t_true, t_true < times[-1])
+ elif isinstance(ret, str) and ret == "bool-list":
+ return (times, results)
+ elif (ret is None and _temp == Temporal.T) or (isinstance(ret, float)):
+ if isinstance(ret, float):
+ t0 = ret
else:
- return None
- elif ret == "count":
- return sum(x for x in result)
+ assert len(self._temporal[key]["args"]), "Need a temporal argument (time at which to interpolate)"
+ t0 = self._temporal[key]["args"][0]
+ # idx = min(range(len(times)), key=lambda i: abs(times[i]-t0))
+ # print("INDEX", t0, idx, results[idx-10:idx+10])
+ # return (t0, results[idx])
+ # else:
+ interpolated = np.interp(t0, times, results)
+ return (t0, bool(interpolated) if all(isinstance(res, bool) for res in results) else interpolated)
+ elif callable(ret):
+ return (times, ret(results))
else:
raise ValueError(f"Unknown return type '{ret}'") from None
+
+ def do_assert(self, key: str, result: Any, case_name: str | None = None):
+ """Perform assert action 'key' on data of 'result' object."""
+ assert isinstance(key, str) and key in self._temporal, f"Assertion key {key} not found"
+ from sim_explorer.case import Results
+
+ assert isinstance(result, Results), f"Results object expected. Found {result}"
+ inst = []
+ var = []
+ for sym in self._syms[key]:
+ inst.append(self.info(sym, "instance"))
+ var.append(self.info(sym, "variable"))
+ assert len(var), "No variables to retrieve"
+ if var[0] == "t": # the independent variable is always the first column in data
+ inst.pop(0)
+ var.pop(0)
+
+ data = result.retrieve(zip(inst, var, strict=False))
+ res = self.eval_series(key, data, ret=None)
+ if self._temporal[key]["type"] == Temporal.A:
+ self.assertions(key, res[1], None, case_name)
+ elif self._temporal[key]["type"] == Temporal.F:
+ self.assertions(key, res[1], f"@{res[0]}", case_name)
+ elif self._temporal[key]["type"] == Temporal.T:
+ self.assertions(key, res[1], f"@{res[0]} (interpolated)", case_name)
+ return res[1]
+
+ def do_assert_case(self, result: Any) -> list[int]:
+ """Perform all assertions defined for the case related to the result object."""
+ count = [0, 0]
+ for key in result.case.asserts:
+ self.do_assert(key, result, result.case.name)
+ count[0] += self._assertions[key]["passed"]
+ count[1] += 1
+ return count
+
+ def report(self, case: Any = None) -> Iterator[AssertionResult]:
+ """Report on all registered asserts.
+ If case denotes a case object, only the results for this case are reported.
+ """
+
+ def do_report(key: str):
+ time_arg = self._temporal[key].get("args", None)
+ return AssertionResult(
+ key=key,
+ expression=self._expr[key],
+ time=time_arg[0]
+ if len(time_arg) > 0 and (isinstance(time_arg[0], int) or isinstance(time_arg[0], float))
+ else None,
+ result=self._assertions[key].get("passed", False),
+ description=self._description[key],
+ temporal=self._temporal[key].get("type", None),
+ case=self._assertions[key].get("case", None),
+ details="No details",
+ )
+
+ from sim_explorer.case import Case
+
+ if isinstance(case, Case):
+ for key in case.asserts:
+ yield do_report(key)
+ else: # report all
+ for key in self._assertions:
+ yield do_report(key)
diff --git a/src/sim_explorer/case.py b/src/sim_explorer/case.py
index 4bbde3e..a154c13 100644
--- a/src/sim_explorer/case.py
+++ b/src/sim_explorer/case.py
@@ -1,20 +1,20 @@
-# pyright: reportMissingImports=false, reportGeneralTypeIssues=false
from __future__ import annotations
-import math
import os
from collections.abc import Callable
from datetime import datetime
from functools import partial
from pathlib import Path
-from typing import Any
+from typing import Any, Iterable, List
import matplotlib.pyplot as plt
import numpy as np
-from libcosimpy.CosimLogging import CosimLogLevel, log_output_level
+from libcosimpy.CosimLogging import CosimLogLevel, log_output_level # type: ignore
+from sim_explorer.assertion import Assertion # type: ignore
from sim_explorer.exceptions import CaseInitError
from sim_explorer.json5 import Json5
+from sim_explorer.models import AssertionResult, Temporal
from sim_explorer.simulator_interface import SimulatorInterface
from sim_explorer.utils.misc import from_xml
from sim_explorer.utils.paths import get_path, relative_path
@@ -102,7 +102,11 @@ def __init__(
if _results is not None:
for _res in _results:
self.read_spec_item(_res)
-
+ self.asserts: list = [] # list of assert keys
+ _assert = self.js.jspath("$.assert", dict)
+ if _assert is not None:
+ for k, v in _assert.items():
+ _ = self.read_assertion(k, v)
if self.name == "base":
self.special = self._ensure_specials(self.special) # must specify for base case
self.act_get = dict(sorted(self.act_get.items()))
@@ -199,7 +203,51 @@ def _num_elements(obj) -> int:
else:
return 1
- def _disect_at_time(self, txt: str, value: Any | None = None) -> tuple[str, str, float]:
+ def _disect_at_time_tl(self, txt: str, value: Any | None = None) -> tuple[str, Temporal, tuple]:
+ """Disect the @txt argument into 'at_time_type' and 'at_time_arg' for Temporal specification.
+
+ Args:
+ txt (str): The key text after '@' and before ':'
+ value (Any): the value argument. Needed to distinguish the action type
+
+ Returns
+ -------
+ tuple of pre, type, arg, where
+ pre is the text before '@',
+ type is the Temporal type,
+ args is the tuple of temporal arguments (may be empty)
+ """
+
+ def time_spec(at: str):
+ """Analyse the specification after '@' and disect into typ and arg."""
+ try:
+ arg_float = float(at)
+ return (Temporal["T"], (arg_float,))
+ except ValueError:
+ for i in range(len(at) - 1, -1, -1):
+ try:
+ typ = Temporal[at[i]]
+ except KeyError:
+ pass
+ else:
+ if at[i + 1 :].strip() == "":
+ return (typ, ())
+ elif typ == Temporal.T:
+ return (typ, (float(at[i + 1 :].strip()),))
+ else:
+ return (typ, (at[i + 1 :].strip(),))
+ raise ValueError(f"Unknown Temporal specification {at}") from None
+
+ pre, _, at = txt.partition("@")
+ assert len(pre), f"'{txt}' is not allowed as basis for _disect_at_time"
+ assert isinstance(value, list), f"Assertion spec expected: [expression, description]. Found {value}"
+ if not len(at): # no @time spec. Assume 'A'lways
+ return (pre, Temporal.ALWAYS, ())
+ else:
+ typ, arg = time_spec(at)
+ return (pre, typ, arg)
+
+ def _disect_at_time_spec(self, txt: str, value: Any | None = None) -> tuple[str, str, float]:
"""Disect the @txt argument into 'at_time_type' and 'at_time_arg'.
Args:
@@ -213,12 +261,25 @@ def _disect_at_time(self, txt: str, value: Any | None = None) -> tuple[str, str,
type is the type of action (get, set, step),
arg is the time argument, or -1
"""
+
+ def time_spec(at: str):
+ """Analyse the specification after '@' and disect into typ and arg."""
+ try:
+ arg_float = float(at)
+ return ("set" if Case._num_elements(value) else "get", arg_float)
+ except ValueError:
+ arg_float = float("-inf")
+ if at.startswith("step"):
+ try:
+ return ("step", float(at[4:]))
+ except Exception:
+ return ("step", -1) # this means 'all macro steps'
+ else:
+ raise AssertionError(f"Unknown '@{txt}'. Case:{self.name}, value:'{value}'") from None
+
pre, _, at = txt.partition("@")
assert len(pre), f"'{txt}' is not allowed as basis for _disect_at_time"
- if value in (
- "result",
- "res",
- ): # marking a normal variable specification as 'get' or 'step' action
+ if value in ("result", "res"): # mark variable specification as 'get' or 'step' action
value = None
if not len(at): # no @time spec
if value is None:
@@ -228,29 +289,31 @@ def _disect_at_time(self, txt: str, value: Any | None = None) -> tuple[str, str,
assert Case._num_elements(value), msg
return (pre, "set", 0) # set at startTime
else: # time spec provided
- try:
- arg_float = float(at)
- except Exception:
- arg_float = float("nan")
- if math.isnan(arg_float):
- if at.startswith("step"):
- try:
- return (pre, "step", float(at[4:]))
- except Exception:
- return (pre, "step", -1) # this means 'all macro steps'
- else:
- raise AssertionError(f"Unknown @time instruction {txt}. Case:{self.name}, value:'{value}'")
- else:
- return (pre, "set" if Case._num_elements(value) else "get", arg_float)
+ typ, arg = time_spec(at)
+ return (pre, typ, arg)
- def read_assertion(self, key: str, expr: Any | None = None):
- """Read an assert statement, compile as sympy expression and return the Assertion object.
+ def read_assertion(self, key: str, expr_descr: list | None = None):
+ """Read an assert statement, compile as sympy expression, register and store the key..
Args:
key (str): Identification key for the assertion. Should be unique. Recommended to use numbers
- expr: A sympy expression using available variables
+
+ Also assertion keys can have temporal specifications (@...) with the following possibilities:
+
+ * @A : The expression is expected to be Always (globally) true
+ * @F : The expression is expected to be true during the end of the simulation
+ * @ or @T: The expression is expected to be true at the specific time value
+ expr: A python expression using available variables
"""
- return
+ key, at_time_type, at_time_arg = self._disect_at_time_tl(key, expr_descr)
+ assert isinstance(expr_descr, list), f"Assertion expression {expr_descr} should include a description."
+ expr, descr = expr_descr
+ self.cases.assertion.expr(key, expr)
+ self.cases.assertion.description(key, descr)
+ self.cases.assertion.temporal(key, at_time_type, at_time_arg)
+ if key not in self.asserts:
+ self.asserts.append(key)
+ return key
def read_spec_item(self, key: str, value: Any | None = None):
"""Use the alias variable information (key) and the value to construct an action function,
@@ -295,7 +358,7 @@ def read_spec_item(self, key: str, value: Any | None = None):
if key in ("startTime", "stopTime", "stepSize"):
self.special.update({key: value}) # just keep these as a dictionary so far
else: # expect a variable-alias : value(s) specificator
- key, at_time_type, at_time_arg = self._disect_at_time(key, value)
+ key, at_time_type, at_time_arg = self._disect_at_time_spec(key, value)
if at_time_type in ("get", "step"):
value = None
key, cvar_info, rng = self.cases.disect_variable(key)
@@ -533,10 +596,11 @@ class Cases:
"timefac",
"variables",
"base",
- "results",
+ "assertion",
"_comp_refs_to_case_var_cache",
"results_print_type",
)
+ assertion_results: List[AssertionResult] = []
def __init__(self, spec: str | Path, simulator: SimulatorInterface | None = None):
self.file = Path(spec) # everything relative to the folder of this file!
@@ -563,9 +627,9 @@ def __init__(self, spec: str | Path, simulator: SimulatorInterface | None = None
self.timefac = self._get_time_unit() * 1e9 # internally OSP uses pico-seconds as integer!
# read the 'variables' section and generate dict { alias : { (instances), (variables)}}:
self.variables = self.get_case_variables()
- self._comp_refs_to_case_var_cache: dict = (
- dict()
- ) # cache of results indices translations used by comp_refs_to_case_var()
+ self.assertion = Assertion()
+ self.assertion.register_vars(self.variables) # register variables as symbols
+ self._comp_refs_to_case_var_cache: dict = dict() # cache used by comp_refs_to_case_var()
self.read_cases()
def get_case_variables(self) -> dict[str, dict]:
@@ -675,9 +739,7 @@ def read_cases(self):
if k not in ("header", "base"):
_ = Case(self, k, spec=self.js.jspath(f"$.{k}", dict, True))
else:
- raise CaseInitError(
- f"Mandatory main section 'base' is needed. Found {list(self.js.js_py.keys())}"
- ) from None
+ raise CaseInitError(f"Main section 'base' is needed. Found {list(self.js.js_py.keys())}") from None
def case_by_name(self, name: str) -> Case | None:
"""Find the case 'name' amoung all defined cases. Return None if not found.
@@ -843,7 +905,7 @@ def comp_refs_to_case_var(self, comp: int, refs: tuple[int, ...]):
self._comp_refs_to_case_var_cache[comp].update({refs: (component, var)})
return component, var
- def run_case(self, name: str | Case, dump: str | None = "", run_subs: bool = False):
+ def run_case(self, name: str | Case, dump: str | None = "", run_subs: bool = False, run_assertions: bool = False):
"""Initiate case run. If done from here, the case name can be chosen.
If run_subs = True, also the sub-cases are run.
"""
@@ -856,8 +918,16 @@ def run_case(self, name: str | Case, dump: str | None = "", run_subs: bool = Fal
raise ValueError(f"Invalid argument name:{name}") from None
c.run(dump)
+
+ if run_assertions and c:
+ # Run assertions on every case after running the case -> results will be saved in memory for now
+ self.assertion.do_assert_case(c.res)
+
+ if not run_subs:
+ return None
+
for _c in c.subs:
- self.run_case(_c, dump)
+ self.run_case(_c, dump, run_subs, run_assertions)
class Results:
@@ -1014,7 +1084,8 @@ def inspect(self, component: str | None = None, variable: str | None = None):
component (str): Possibility to inspect only data with respect to a given component
variable (str): Possibility to inspect only data with respect to a given variable
- Retruns:
+ Returns
+ -------
A dictionary { : {'len':#data points, 'range':[tMin, tMax], 'info':info-dict}
The info-dict is and element of Cases.variables. See Cases.get_case_variables() for definition.
"""
@@ -1046,49 +1117,66 @@ def inspect(self, component: str | None = None, variable: str | None = None):
)
return cont
- def time_series(self, variable: str):
- """Extract the provided alias variables and make them available as two lists 'times' and 'values'
- of equal length.
+ def retrieve(self, comp_var: Iterable) -> list:
+ """Retrieve from results js5-dict the variables and return (times, values).
Args:
- variable (str): variable identificator as str.
- A variable identificator is the jspath expression after the time, i.e. .[]
- For example 'bb.v[2]' identifies the z-velocity of the component 'bb'
-
- Returns
- -------
- tuple of two lists (times, values)
+ comp_var (Iterable): iterable of (, [, element])
+ Alternatively, the jspath syntax .[[element]] can be used as comp_var.
+ Time is not explicitly including in comp_var
+ A record is only included if all variable are found for a given time
+ Returns:
+ Data table (list of lists), time and one column per variable
"""
- if not len(self.res.js_py) or self.case is None:
- return
- times: list = []
- values: list = []
- for key in self.res.js_py:
- found = self.res.jspath("$['" + str(key) + "']." + variable)
- if found is not None:
- if isinstance(found, list):
- raise NotImplementedError("So far not implemented for multi-dimensional plots") from None
- else:
- times.append(float(key))
- values.append(found)
- return (times, values)
-
- def plot_time_series(self, variables: str | list[str], title: str = ""):
+ data = []
+ _comp_var = []
+ for _cv in comp_var:
+ el = None
+ if isinstance(_cv, str): # expect . syntax
+ comp, var = _cv.split(".")
+ if "[" in var and var[-1] == "]": # explicit element
+ var, _el = var.split("[")
+ el = int(_el[:-1])
+ else: # expect (, ) syntax
+ comp, var = _cv
+ _comp_var.append((comp, var, el))
+
+ for key, values in self.res.js_py.items():
+ if key != "header":
+ time = float(key)
+ record = [time]
+ is_complete = True
+ for comp, var, el in _comp_var:
+ try:
+ _rec = values[comp][var]
+ except KeyError:
+ is_complete = False
+ break # give up
+ else:
+ record.append(_rec if el is None else _rec[el])
+
+ if is_complete:
+ data.append(record)
+ return data
+
+ def plot_time_series(self, comp_var: Iterable, title: str = ""):
"""Extract the provided alias variables and plot the data found in the same plot.
Args:
- variables (list[str]): list of variable identificators as str.
- A variable identificator is the jspath expression after the time, i.e. .[]
- For example 'bb.v[2]' identifies the z-velocity of the component 'bb'
+ comp_var (Iterable): Iterable of (,) tuples (as used in retrieve)
+ Alternatively, the jspath syntax . is also accepted
title (str): optional title of the plot
"""
- if not isinstance(variables, list):
- variables = [
- variables,
- ]
- for var in variables:
- times, values = self.time_series(var)
-
+ data = self.retrieve(comp_var)
+ times = [rec[0] for rec in data]
+ for i, var in enumerate(comp_var):
+ if isinstance(var, str):
+ label = var
+ else:
+ label = var[0] + "." + var[1]
+ if len(var) > 2:
+ label += "[" + var[2] + "]"
+ values = [rec[i + 1] for rec in data]
plt.plot(times, values, label=var, linewidth=3)
if len(title):
diff --git a/src/sim_explorer/cli/display_results.py b/src/sim_explorer/cli/display_results.py
new file mode 100644
index 0000000..c01c178
--- /dev/null
+++ b/src/sim_explorer/cli/display_results.py
@@ -0,0 +1,85 @@
+from rich.console import Console
+from rich.panel import Panel
+
+from sim_explorer.models import AssertionResult
+
+console = Console()
+
+
+def reconstruct_assertion_name(result: AssertionResult) -> str:
+ """
+ Reconstruct the assertion name from the key and expression.
+
+ :param result: Assertion result.
+ :return: Reconstructed assertion name.
+ """
+ time = result.time if result.time is not None else ""
+ return f"{result.key}@{result.temporal.name}{time}({result.expression})"
+
+
+def log_assertion_results(results: dict[str, list[AssertionResult]]):
+ """
+ Log test scenarios and results in a visually appealing bullet-point list format.
+
+ :param scenarios: Dictionary where keys are scenario names and values are lists of test results.
+ Each test result is a tuple (test_name, status, details).
+ Status is True for pass, False for fail.
+ """
+ total_passed = 0
+ total_failed = 0
+
+ console.print()
+
+ # Print results for each assertion executed in each of the cases ran
+ for case_name, assertions in results.items():
+ # Show case name first
+ console.print(f"[bold magenta]• {case_name}[/bold magenta]")
+ for assertion in assertions:
+ if assertion.result:
+ total_passed += 1
+ else:
+ total_failed += 1
+
+ # Print assertion status, details and error message if failed
+ status_icon = "✅" if assertion.result else "❌"
+ status_color = "green" if assertion.result else "red"
+ assertion_name = reconstruct_assertion_name(assertion)
+
+ # Need to add some padding to show that the assertion belongs to a case
+ console.print(f" [{status_color}]{status_icon}[/] [cyan]{assertion_name}[/cyan]: {assertion.description}")
+
+ if not assertion.result:
+ console.print(" [red]⚠️ Error:[/] [dim]Assertion has failed[/dim]")
+
+ console.print() # Add spacing between scenarios
+
+ if total_failed == 0 and total_passed == 0:
+ return
+
+ # Summary at the end
+ passed_tests = f"[green]✅ {total_passed} tests passed[/green] 😎" if total_passed > 0 else ""
+ failed_tests = f"[red]❌ {total_failed} tests failed[/red] 😭" if total_failed > 0 else ""
+ padding = " " if total_passed > 0 and total_failed > 0 else ""
+ console.print(
+ Panel.fit(
+ f"{passed_tests}{padding}{failed_tests}", title="[bold blue]Test Summary[/bold blue]", border_style="blue"
+ )
+ )
+
+
+def group_assertion_results(results: list[AssertionResult]) -> dict[str, list[AssertionResult]]:
+ """
+ Group test results by case name.
+
+ :param results: list of assertion results.
+ :return: Dictionary where keys are case names and values are lists of assertion results.
+ """
+ grouped_results: dict[str, list[AssertionResult]] = {}
+ for result in results:
+ case_name = result.case
+ if case_name and case_name not in grouped_results:
+ grouped_results[case_name] = []
+
+ if case_name:
+ grouped_results[case_name].append(result)
+ return grouped_results
diff --git a/src/sim_explorer/cli/sim_explorer.py b/src/sim_explorer/cli/sim_explorer.py
index f90c6d3..ee9a407 100644
--- a/src/sim_explorer/cli/sim_explorer.py
+++ b/src/sim_explorer/cli/sim_explorer.py
@@ -7,6 +7,10 @@
import sys
from pathlib import Path
+from sim_explorer.case import Case, Cases
+from sim_explorer.cli.display_results import group_assertion_results, log_assertion_results
+from sim_explorer.utils.logging import configure_logging
+
# Remove current directory from Python search path.
# Only through this trick it is possible that the current CLI file 'sim_explorer.py'
# carries the same name as the package 'sim_explorer' we import from in the next lines.
@@ -14,8 +18,6 @@
# Python would start searching for the imported names within the current file (sim_explorer.py)
# instead of the package 'sim_explorer' (and the import statements fail).
sys.path = [path for path in sys.path if Path(path) != Path(__file__).parent]
-from sim_explorer.case import Case, Cases
-from sim_explorer.utils.logging import configure_logging
logger = logging.getLogger(__name__)
@@ -153,12 +155,19 @@ def main() -> None:
elif args.run is not None:
case = cases.case_by_name(args.run)
+
if case is None:
logger.error(f"Case {args.run} not found in {args.cases}")
return
+
logger.info(f"{log_msg_stub}\t option: run \t\t\t{args.run}\n")
# Invoke API
- case.run()
+ cases.run_case(case, run_subs=False, run_assertions=True)
+
+ # Display assertion results
+ assertion_results = [assertion for assertion in cases.assertion.report()]
+ grouped_results = group_assertion_results(assertion_results)
+ log_assertion_results(grouped_results)
elif args.Run is not None:
case = cases.case_by_name(args.Run)
@@ -167,7 +176,12 @@ def main() -> None:
return
logger.info(f"{log_msg_stub}\t --Run \t\t\t{args.Run}\n")
# Invoke API
- cases.run_case(case, run_subs=True)
+ cases.run_case(case, run_subs=True, run_assertions=True)
+
+ # Display assertion results
+ assertion_results = [assertion for assertion in cases.assertion.report()]
+ grouped_results = group_assertion_results(assertion_results)
+ log_assertion_results(grouped_results)
if __name__ == "__main__":
diff --git a/src/sim_explorer/models.py b/src/sim_explorer/models.py
new file mode 100644
index 0000000..caa4770
--- /dev/null
+++ b/src/sim_explorer/models.py
@@ -0,0 +1,24 @@
+from enum import IntEnum
+
+from pydantic import BaseModel
+
+
+class Temporal(IntEnum):
+ UNDEFINED = 0
+ A = 1
+ ALWAYS = 1
+ F = 2
+ FINALLY = 2
+ T = 3
+ TIME = 3
+
+
+class AssertionResult(BaseModel):
+ key: str
+ expression: str
+ result: bool
+ temporal: Temporal
+ time: float | int | None
+ description: str
+ case: str | None
+ details: str
diff --git a/src/sim_explorer/simulator_interface.py b/src/sim_explorer/simulator_interface.py
index f16b5ef..8b87827 100644
--- a/src/sim_explorer/simulator_interface.py
+++ b/src/sim_explorer/simulator_interface.py
@@ -6,7 +6,7 @@
from libcosimpy.CosimEnums import CosimVariableCausality, CosimVariableType, CosimVariableVariability # type: ignore
from libcosimpy.CosimExecution import CosimExecution # type: ignore
-from libcosimpy.CosimLogging import CosimLogLevel, log_output_level
+from libcosimpy.CosimLogging import CosimLogLevel, log_output_level # type: ignore
from libcosimpy.CosimManipulator import CosimManipulator # type: ignore
from libcosimpy.CosimObserver import CosimObserver # type: ignore
diff --git a/src/sim_explorer/utils/osp.py b/src/sim_explorer/utils/osp.py
new file mode 100644
index 0000000..f42f778
--- /dev/null
+++ b/src/sim_explorer/utils/osp.py
@@ -0,0 +1,208 @@
+import xml.etree.ElementTree as ET # noqa: N817
+from pathlib import Path
+
+from sim_explorer.json5 import Json5
+
+
+# ==========================================
+# Open Simulation Platform related functions
+# ==========================================
+def make_osp_system_structure(
+ name: str = "OspSystemStructure",
+ version: str = "0.1",
+ start: float = 0.0,
+ base_step: float = 0.01,
+ algorithm: str = "fixedStep",
+ simulators: dict | None = None,
+ functions_linear: dict | None = None,
+ functions_sum: dict | None = None,
+ functions_vectorsum: dict | None = None,
+ connections_variable: tuple = (),
+ connections_signal: tuple = (),
+ connections_group: tuple = (),
+ connections_signalgroup: tuple = (),
+ path: Path | str = ".",
+):
+ """Prepare a OspSystemStructure xml file according to `OSP configuration specification `_.
+
+ Args:
+ name (str)='OspSystemStructure': the name of the system model, used also as file name
+ version (str)='0.1': The version of the OspSystemConfiguration xmlns
+ start (float)=0.0: The simulation start time
+ base_step (float)=0.01: The base stepSize of the simulation. The exact usage depends on the algorithm chosen
+ algorithm (str)='fixedStep': The name of the algorithm
+ simulators (dict)={}: dict of models (in OSP called 'simulators'). Per simulator:
+ : {source: , stepSize: , : value, ...} (values as python types)
+ functions_linear (dict)={}: dict of LinearTransformation function. Per function:
+ : {factor: , offset: }
+ functions_sum (dict)={}: dict of Sum functions. Per function:
+ : {inputCount: } (number of inputs to sum over)
+ functions_vectorsum (dict)={}: dict of VectorSum functions. Per function:
+ : {inputCount: , numericType: , dimension: }
+ connections_variable (tuple)=(): tuple of model connections.
+ Each connection is defined through (model, out-variable, model, in-variable)
+ connections_signal (tuple)=(): tuple of signal connections:
+ Each connection is defined through (model, variable, function, signal)
+ connections_group (tuple)=(): tuple of group connections:
+ Each connection is defined through (model, group, model, group)
+ connections_signalgroup (tuple)=(): tuple of signal group connections:
+ Each connection is defined through (model, group, function, signal-group)
+ dest (Path,str)='.': the path where the file should be saved
+
+ Returns
+ -------
+ The absolute path of the file as Path object
+
+ .. todo:: better stepSize control in dependence on algorithm selected, e.g. with fixedStep we should probably set all step sizes to the minimum of everything?
+ """
+
+ def element_text(tag: str, attr: dict | None = None, text: str | None = None):
+ el = ET.Element(tag, {} if attr is None else attr)
+ if text is not None:
+ el.text = text
+ return el
+
+ def make_simulators(simulators: dict | None):
+ """Make the element (list of component models)."""
+
+ def make_initial_value(var: str, val: bool | int | float | str):
+ """Make a element from the provided var dict."""
+ typ = {bool: "Boolean", int: "Integer", float: "Real", str: "String"}[type(val)]
+ initial = ET.Element("InitialValue", {"variable": var})
+ ET.SubElement(initial, typ, {"value": str(val)})
+ return initial
+
+ _simulators = ET.Element("Simulators")
+ if simulators is not None:
+ for m, props in simulators.items():
+ simulator = ET.Element(
+ "Simulator",
+ {
+ "name": m,
+ "source": props.get("source", m[0].upper() + m[1:] + ".fmu"),
+ "stepSize": str(props.get("stepSize", base_step)),
+ },
+ )
+ if "initialValues" in props:
+ initial = ET.SubElement(simulator, "InitialValues")
+ for var, value in props["initialValues"].items():
+ initial.append(make_initial_value(var, value))
+ _simulators.append(simulator)
+ # print(f"Model {m}: {simulator}. Length {len(simulators)}")
+ # ET.ElementTree(simulators).write("Test.xml")
+ return _simulators
+
+ def make_functions(f_linear: dict | None, f_sum: dict | None, f_vectorsum: dict | None):
+ _functions = ET.Element("Functions")
+ if f_linear is not None:
+ for key, val in f_linear:
+ _functions.append(
+ ET.Element("LinearTransformation", {"name": key, "factor": val["factor"], "offset": val["offset"]})
+ )
+ if f_sum is not None:
+ for key, val in f_sum:
+ _functions.append(ET.Element("Sum", {"name": key, "inputCount": val["inputCount"]}))
+ if f_vectorsum is not None:
+ for key, val in f_vectorsum:
+ _functions.append(
+ ET.Element(
+ "VectorSum",
+ {
+ "name": key,
+ "inputCount": val["inputCount"],
+ "numericType": val["numericType"],
+ "dimension": val["dimension"],
+ },
+ )
+ )
+ return _functions
+
+ def make_connections(c_variable: tuple, c_signal: tuple, c_group: tuple, c_signalgroup: tuple):
+ """Make the element from the provided con."""
+
+ def make_connection(main: str, sub1: str, attr1: dict, sub2: str, attr2: dict):
+ el = ET.Element(main)
+ ET.SubElement(el, sub1, attr1)
+ ET.SubElement(el, sub2, attr2)
+ return el
+
+ _cons = ET.Element("Connections")
+ for m1, v1, m2, v2 in c_variable:
+ _cons.append(
+ make_connection(
+ "VariableConnection",
+ "Variable",
+ {"simulator": m1, "name": v1},
+ "Variable",
+ {"simulator": m2, "name": v2},
+ )
+ )
+ for m1, v1, f, v2 in c_signal:
+ _cons.append(
+ make_connection(
+ "SignalConnection", "Variable", {"simulator": m1, "name": v1}, "Signal", {"function": f, "name": v2}
+ )
+ )
+ for m1, g1, m2, g2 in c_group:
+ _cons.append(
+ make_connection(
+ "VariableGroupConnection",
+ "VariableGroup",
+ {"simulator": m1, "name": g1},
+ "VariableGroup",
+ {"simulator": m2, "name": g2},
+ )
+ )
+ for m1, g1, f, g2 in c_signalgroup:
+ _cons.append(
+ make_connection(
+ "SignalGroupConnection",
+ "VariableGroup",
+ {"simulator": m1, "name": g1},
+ "SignalGroup",
+ {"function": f, "name": g2},
+ )
+ )
+ return _cons
+
+ osp = ET.Element(
+ "OspSystemStructure", {"xmlns": "http://opensimulationplatform.com/MSMI/OSPSystemStructure", "version": version}
+ )
+ osp.append(element_text("StartTime", text=str(start)))
+ osp.append(element_text("BaseStepSize", text=str(base_step)))
+ osp.append(make_simulators(simulators))
+ osp.append(make_functions(functions_linear, functions_sum, functions_vectorsum))
+ osp.append(make_connections(connections_variable, connections_signal, connections_group, connections_signalgroup))
+ tree = ET.ElementTree(osp)
+ ET.indent(tree, space=" ", level=0)
+ file = Path(path).absolute() / (name + ".xml")
+ tree.write(file, encoding="utf-8")
+ return file
+
+
+def osp_system_structure_from_js5(file: Path, dest: Path | None = None):
+ """Make a OspSystemStructure file from a js5 specification.
+ The js5 specification is closely related to the make_osp_systemStructure() function (and uses it).
+ """
+ assert file.exists(), f"File {file} not found"
+ assert file.name.endswith(".js5"), f"Json5 file expected. Found {file.name}"
+ js = Json5(file)
+
+ ss = make_osp_system_structure(
+ name=file.name[:-4],
+ version=js.jspath("$.header.version", str) or "0.1",
+ start=js.jspath("$.header.StartTime", float) or 0.0,
+ base_step=js.jspath("$.header.BaseStepSize", float) or 0.01,
+ algorithm=js.jspath("$.header.algorithm", str) or "fixedStep",
+ simulators=js.jspath("$.Simulators", dict) or {},
+ functions_linear=js.jspath("$.FunctionsLinear", dict) or {},
+ functions_sum=js.jspath("$.FunctionsSum", dict) or {},
+ functions_vectorsum=js.jspath("$.FunctionsVectorSum", dict) or {},
+ connections_variable=tuple(js.jspath("$.ConnectionsVariable", list) or []),
+ connections_signal=tuple(js.jspath("$.ConnectionsSignal", list) or []),
+ connections_group=tuple(js.jspath("$.ConnectionsGroup", list) or []),
+ connections_signalgroup=tuple(js.jspath("$.ConnectionsSignalGroup", list) or []),
+ path=dest or Path(file).parent,
+ )
+
+ return ss
diff --git a/tests/data/BouncingBall3D/BouncingBall3D.cases b/tests/data/BouncingBall3D/BouncingBall3D.cases
index 53c8ebe..d43fb64 100644
--- a/tests/data/BouncingBall3D/BouncingBall3D.cases
+++ b/tests/data/BouncingBall3D/BouncingBall3D.cases
@@ -21,12 +21,8 @@ base : {
x[2] : 39.37007874015748, # this is in inch => 1m!
x@step : 'result',
v@step : 'result',
- x_b[0]@step : 'res',
- },
-# assert: {
-# 1 : 'abs(g-9.81)<1e-9'
-# }
- },
+ x_b@step : 'res',
+ }},
restitution : {
description : "Smaller coefficient of restitution e",
spec: {
@@ -37,9 +33,20 @@ restitutionAndGravity : {
parent : 'restitution',
spec : {
g : 1.5
- }},
+ },
+ assert: {
+ 1@A : ['g==1.5', 'Check setting of gravity (about 1/7 of earth)'],
+ 2@ALWAYS : ['e==0.5', 'Check setting of restitution'],
+ 3@F : ['x[2] < 3.0', 'For long times the z-position of the ball remains small (loss of energy)'],
+ 4@T1.1547 : ['abs(x[2]) < 0.4', 'Close to bouncing time the ball should be close to the floor'],
+ }
+},
gravity : {
description : "Gravity like on the moon",
spec : {
g : 1.5
- }}}
+ },
+ assert: {
+ 6@ALWAYS: ['g==9.81', 'Check wrong gravity.']
+ }
+}}
diff --git a/tests/data/MobileCrane/MobileCrane.fmu b/tests/data/MobileCrane/MobileCrane.fmu
index bbe67a3..482b5b1 100644
Binary files a/tests/data/MobileCrane/MobileCrane.fmu and b/tests/data/MobileCrane/MobileCrane.fmu differ
diff --git a/tests/data/Oscillator/ForcedOscillator.xml b/tests/data/Oscillator/ForcedOscillator.xml
index adc0065..79b6700 100644
--- a/tests/data/Oscillator/ForcedOscillator.xml
+++ b/tests/data/Oscillator/ForcedOscillator.xml
@@ -1,12 +1,11 @@
+ 0.0
+ 0.01
-
-
-
-
-
-
+
+
+
diff --git a/tests/data/Oscillator/HarmonicOscillator.fmu b/tests/data/Oscillator/HarmonicOscillator.fmu
index b09cd16..d2974bf 100644
Binary files a/tests/data/Oscillator/HarmonicOscillator.fmu and b/tests/data/Oscillator/HarmonicOscillator.fmu differ
diff --git a/tests/data/crane_table.js5 b/tests/data/crane_table.js5
new file mode 100644
index 0000000..46812d6
--- /dev/null
+++ b/tests/data/crane_table.js5
@@ -0,0 +1,16 @@
+{
+header : {
+ xmlns : "http://opensimulationplatform.com/MSMI/OSPSystemStructure",
+ version : "0.1",
+ StartTime : 0.0,
+ BaseStepSize : 0.01,
+ },
+Simulators : {
+ simpleTable : {source: "SimpleTable.fmu", interpolate: True},
+ mobileCrane : {source: "MobileCrane.fmu" stepSize: 0.01,
+ pedestal.pedestalMass: 5000.0, boom.boom[0]: 20.0},
+ },
+ConnectionsVariable : [
+ ["simpleTable", "outputs[0]", "mobileCrane", "pedestal.angularVelocity"],
+ ],
+}
\ No newline at end of file
diff --git a/tests/test_assertion.py b/tests/test_assertion.py
index 0393b8a..a9b1e31 100644
--- a/tests/test_assertion.py
+++ b/tests/test_assertion.py
@@ -1,17 +1,81 @@
+# type: ignore
+
+import ast
from math import cos, sin
+from pathlib import Path
import matplotlib.pyplot as plt
+import numpy as np
import pytest
-from sympy import symbols
-from sympy.vector import CoordSys3D
-from sim_explorer.assertion import Assertion
+from sim_explorer.assertion import Assertion, Temporal
+from sim_explorer.case import Cases, Results
_t = [0.1 * float(x) for x in range(100)]
_x = [0.3 * sin(t) for t in _t]
_y = [1.0 * cos(t) for t in _t]
+def test_globals_locals():
+ """Test the usage of the globals and locals arguments within exec."""
+ from importlib import __import__
+
+ module = __import__("math", fromlist=["sin"])
+ locals().update({"sin": module.sin})
+ code = "def f(x):\n return sin(x)"
+ compiled = compile(code, "", "exec")
+ exec(compiled, locals(), locals())
+ # print(f"locals:{locals()}")
+ assert abs(locals()["f"](3.0) - sin(3.0)) < 1e-15
+
+
+def test_ast(show):
+ expr = "1+2+x.dot(x) + sin(y)"
+ if show:
+ a = ast.parse(expr, "