diff --git a/pyproject.toml b/pyproject.toml index 6769697..bd86b8f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ license = {file = "LICENSE"} readme = {file = "README.md", content-type = "text/markdown"} dynamic = ["version"] requires-python = ">=3.10" -dependencies = ["duckdb", "glom", "pyESDL[profiles]", "tabulate"] +dependencies = ["duckdb", "pyESDL[profiles]", "tabulate"] [project.optional-dependencies] dev = [ diff --git a/src/esdl4tulipa/mapping.py b/src/esdl4tulipa/mapping.py index d1099ae..0df56cc 100644 --- a/src/esdl4tulipa/mapping.py +++ b/src/esdl4tulipa/mapping.py @@ -1,80 +1,162 @@ """Mapping ESDL to Tulipa terminology.""" -from copy import deepcopy -from dataclasses import field +from dataclasses import dataclass +from dataclasses import fields from dataclasses import is_dataclass -from dataclasses import make_dataclass - -_base = {"name": ("name", str), "id": ("id", str), "state": ("active", bool)} -_producers = { - **_base, - "costInformation.investmentCosts.value": ("investment_cost", float), - "costInformation.variableOperationalAndMaintenanceCosts.value": ( - "variable_cost", - float, - ), - "technicalLifetime": ("lifetime", float), - "power": ("initial_capacity", float), -} -_storage = deepcopy(_producers) -_storage.pop("power") - -ESDL2TULIPA = { - "consumer": { - **_base, - "power": ("peak_demand", float), - }, - "producer": {**_producers}, - "conversion": { - **_producers, - "efficiency": ("efficiency", float), - # "inputOutputRelation": ("efficiency", float), # FIXME: duplicate - }, - "storage": { - **_storage, - "maxDischargeRate": ("capacity", float), - # "maxChargeRate": ("capacity", float), # FIXME: duplicate - "fillLevel": ("initial_storage_level", float), - "capacity": ("initial_storage_capacity", float), - }, -} +from typing import Type +from typing import TypeVar +from typing import Union +T = TypeVar("T") -def make_asset_t(kind: str) -> type: - """Create dataclass from list of fields and types. - Parameters - ---------- - kind : Literal['consumers', 'producers', 'conversion', 'storage'] - The fields are defined in the module level dictionary - `ESDL2TULIPA`. They are separated by asset type: 'consumers', - 'producers', 'conversion', 'storage'. - - Returns - ------- - +def unguarded_is_dataclass(_type: Type[T], /) -> bool: + """Remove :ref:`TypeGuard` from is_dataclass. + see: https://github.com/python/mypy/issues/14941 """ - if kind not in ESDL2TULIPA: - raise ValueError(f"unknown {kind=}, not one of {list(ESDL2TULIPA)}") + return is_dataclass(_type) + + +@dataclass(unsafe_hash=True) +class AssetData: + """Base dataclass to represent :ref:`esdl.esdl.EnergyAsset`.""" + + name: str = "" + id: str = "" + active: bool = False + + @classmethod + def esdl_key(cls, key: str) -> str: + """Get corresponding ESDL attribute.""" + _esdl_key = {"name": "name", "id": "id", "active": "state.value"} + return _esdl_key.get(key, "") + + @classmethod + def fields(cls) -> list[str]: + """Fields of the dataclass.""" + return [fld.name for fld in fields(cls)] # FIXME: add type validation - def __post_init__(self): + def __post_init__(self): # noqa: D105 for key, field_t in self.__annotations__.items(): value = getattr(self, key) - if not is_dataclass(field_t) or isinstance(value, dict): + if not unguarded_is_dataclass(field_t) or isinstance(value, dict): continue setattr(self, key, field_t(**value)) - fields = [ - (*f, field(default=None)) if len(f) == 2 else f - for f in ESDL2TULIPA[kind].values() - ] - return make_dataclass( - f"{kind}_t", fields, namespace={"__post_init__": __post_init__} - ) +@dataclass(unsafe_hash=True) +class hub_t(AssetData): # noqa: D101 + pass + + +@dataclass(unsafe_hash=True) +class consumer_t(AssetData): # noqa: D101 + peak_demand: float | None = None + + @classmethod + def esdl_key(cls, key: str) -> str: # noqa: D102 + if res := super().esdl_key(key): + return res + else: + _esdl_key = {"peak_demand": "power"} + return _esdl_key.get(key, "") + + +@dataclass(unsafe_hash=True) +class _producer_t(AssetData): # noqa: D101 + investment_cost: float | None = None + variable_cost: float | None = None + lifetime: float | None = None + + @classmethod + def esdl_key(cls, key: str) -> str: # noqa: D102 + if res := super().esdl_key(key): + return res + else: + _esdl_key = { + "investment_cost": "costInformation.investmentCosts.value", + "variable_cost": "costInformation.variableOperationalAndMaintenanceCosts.value", # noqa: E501 + "lifetime": "technicalLifetime", + } + return _esdl_key.get(key, "") -# create dataclasses for each kind, so the types can be reused, and -# created instances will be of the same type -asset_types = {kind: make_asset_t(kind) for kind in ESDL2TULIPA} + +@dataclass(unsafe_hash=True) +class producer_t(_producer_t): # noqa: D101 + initial_capacity: float | None = None + + @classmethod + def esdl_key(cls, key: str) -> str: # noqa: D102 + if res := super().esdl_key(key): + return res + else: + _esdl_key = {"initial_capacity": "power"} + return _esdl_key.get(key, "") + + +@dataclass(unsafe_hash=True) +class conversion_t(producer_t): # noqa: D101 + efficiency: float | None = None + + @classmethod + def esdl_key(cls, key: str) -> str: # noqa: D102 + if res := super().esdl_key(key): + return res + else: + _esdl_key = { + "efficiency": "efficiency" + # "efficiency": "inputOutputRelation", # FIXME: duplicate + } + return _esdl_key.get(key, "") + + +@dataclass(unsafe_hash=True) +class storage_t(AssetData): # noqa: D101 + capacity: float | None = None + initial_storage_level: float | None = None + initial_storage_capacity: float | None = None + + @classmethod + def esdl_key(cls, key: str) -> str: # noqa: D102 + if res := super().esdl_key(key): + return res + else: + _esdl_key = { + "capacity": "maxDischargeRate", + # "capacity": "maxChargeRate", # FIXME: duplicate + "initial_storage_level": "fillLevel", + "initial_storage_capacity": "capacity", + } + return _esdl_key.get(key, "") + + +@dataclass(unsafe_hash=True) +class flow_t(_producer_t): # noqa: D101 + from_asset: str = "" + to_asset: str = "" + capacity: float | None = None + efficiency: float | None = None + + @classmethod + def esdl_key(cls, key: str) -> str: # noqa: D102 + if res := super().esdl_key(key): + return res + else: + _esdl_key = { + "capacity": "capacity", + "efficiency": "efficiency", + } + return _esdl_key.get(key, "") + + +ESDLAssets = Union[hub_t, consumer_t, producer_t, conversion_t, storage_t, flow_t] +asset_types: dict[str, type[ESDLAssets]] = { + "energynetwork": hub_t, + "consumer": consumer_t, + "producer": producer_t, + "conversion": conversion_t, + "storage": storage_t, + "transport": flow_t, +} diff --git a/src/esdl4tulipa/parser.py b/src/esdl4tulipa/parser.py index d2223d9..74adc1f 100644 --- a/src/esdl4tulipa/parser.py +++ b/src/esdl4tulipa/parser.py @@ -1,74 +1,134 @@ """Load and parse an ESDL file.""" +import contextlib +from dataclasses import fields +from functools import reduce +from io import StringIO +from itertools import chain +from itertools import islice from typing import Callable +from typing import TypeAlias +from typing import TypeVar from esdl import esdl from esdl.esdl_handler import EnergySystemHandler -from glom import glom -from pyecore.ecore import EObject from pyecore.ecore import EOrderedSet from tabulate import tabulate -from .mapping import ESDL2TULIPA +from .mapping import ESDLAssets from .mapping import asset_types + _HANDLER = EnergySystemHandler() +# Map Tulipa flow/asset kinds to corresponding ESDL base types. _kind_ts = { kind: esdl_type for name in dir(esdl) if not name.startswith("_") - for kind in ESDL2TULIPA - if name.casefold() == kind + for kind in asset_types + if kind == name.casefold() and (esdl_type := getattr(esdl, name)) and isinstance(esdl_type, type) and issubclass(esdl_type, esdl.EnergyAsset) } +# NOTE: EnergyNetwork is a subtype of Transport, this introduces +# ambiguity when trying to detect either. This is mitigated by +# relying on order dependence of case statements in Python's +# structural pattern matching, and adding guard clauses, see: +# https://peps.python.org/pep-0636/#matching-multiple-patterns + + +ESDLNode: TypeAlias = esdl.EnergySystem | esdl.Area | esdl.EnergyAsset + + +def batched(assets: list[esdl.EnergyAsset]): + """Batched iteration; in itertools from Python >=3.12.""" + batch = [] + for asset in assets: + batch.append(asset) + match asset: + case ( + esdl.Consumer() + | esdl.Conversion() + | esdl.Storage() + | esdl.EnergyNetwork() + ): + yield batch + batch = [] + case _: + pass -def fill_asset(asset: EObject, kind: str = ""): +def fill_asset(asset: esdl.EnergyAsset, kind: str = "", **overrides) -> ESDLAssets: """Fill asset dataclasses. Parameters ---------- - asset : EObject + asset : esdl.EnergyAsset ESDL Asset object - kind : Literal['consumers', 'producers', 'conversion', 'storage'] + kind : Literal['consumer', 'producer', 'conversion', 'storage', 'transport'] Kind of assets according to Tulipa; by default infer by string matching against ESDL asset types (see: `_kind_ts`). + **overrides + Any attribute overrides + Returns ------- - + ESDLAssets """ if not kind: - for k, t in _kind_ts.items(): - if isinstance(asset, t): - kind = k - break + (kind, _), *_ = filter(lambda i: isinstance(asset, i[1]), _kind_ts.items()) + + if asset_t := asset_types.get(kind): + args = { + tulipa_key: val + for tulipa_key in asset_t.fields() + if ( + val := reduce( + lambda obj, key: getattr(obj, key, None), + asset_t.esdl_key(tulipa_key).split("."), + asset, + ) + ) + } + return asset_types[kind](**args, **overrides) + else: + raise ValueError(f"unsupported {kind=}, not one of {list(asset_types)}") + - if kind not in asset_types: - raise ValueError(f"unknown {kind=}, not one of {list(asset_types)}") +def merge_assets(asset1: ESDLAssets, asset2: ESDLAssets, **overrides) -> ESDLAssets: + """Merge two assets (from & to) into a flow asset (transport in esdl). - args = { - tulipa_key: val - for esdl_key, (tulipa_key, _) in ESDL2TULIPA[kind].items() - if (val := glom(asset, esdl_key, default=None)) - } - return asset_types[kind](**args) + Logic: merge and discard. + - Merge + - if attribute is missing asset 1 but present in asset 2, accept the 2nd value + - if both assets have set the same value, accept it + - if both assets set the value but they aren't equal, raise an error -def merge_assets(asset1, asset2) -> dict: - """Merge two asset dataclasses. + - Discard if not a valid :ref:`flow_t` attribute (FIXME: should this precede merge?) - Logic: - - if for asset 1 it's missing and asset 2 is present, accept value set with asset 2 - - if asset 1 and asset 2 has set the same value, accept it - - if both assets set the value but they aren't equal, raise an error + Parameters + ---------- + asset1: ESDLAssets + From asset + + asset2: ESDLAssets + To asset + + **overrides + Any attribute overrides Returns ------- - dict + ESDLAssets + + Raises + ------ + ValueError + When attributes have mismatched values """ merged, errs = {}, [] @@ -96,44 +156,225 @@ def merge_assets(asset1, asset2) -> dict: if len(errs) > 0: tbl = tabulate(errs, headers=("column", "from", "to")) raise ValueError(f"mismatching assets: {asset1.name} != {asset2.name}\n {tbl}") - return merged + flow_t = asset_types["transport"] + _fields = {f.name for f in fields(flow_t)} + merged = {k: v for k, v in merged.items() if k in _fields} + return flow_t(**merged, **overrides) -def edge(out_port, in_port) -> dict: - """Find the edge that connects the OutPort to the InPort.""" - # TODO: customisable merge, how to choose kind? - asset1, asset2 = [fill_asset(port.energyasset) for port in (out_port, in_port)] - merged = merge_assets(asset1, asset2) - return {"from": asset1.name, "to": asset2.name, **merged} +def kinds(*assets: esdl.EnergyAsset) -> set[str]: + """Find and return `kinds` for all assets. + + Raises + ------ + RuntimeError + If one asset resolves to multiple kinds + + """ + res: set[str] = set() + for a in assets: + ks = {k for k, t in _kind_ts.items() if isinstance(a, t)} + if len(ks) > 1: + if ks == {"energynetwork", "transport"}: + res = res.union({"energynetwork"}) + else: + raise RuntimeError(f"{a}: unexpected asset type {ks}") + else: + res = res.union(ks) + return res + + +def edge(*assets: esdl.EnergyAsset) -> tuple[ESDLAssets, ESDLAssets, ESDLAssets]: + """Create a Tulipa flow, and assets from ESDL assets. + + Parameters + ---------- + *assets: esdl.EnergyAsset + Set of ESDL assets to link and convert. + + Returns + ------- + tuple[ESDLAssets, ...] + Tuple of (flow, from_asset, to_asset) + + Raises + ------ + ValueError + When connection pattern don't match either of: + + .. code:: + + asset1: producer | conversion | storage | energynetwork + asset2: producer | conversion | storage | energynetwork + + (asset1, asset2) + + link: transport + + (asset1, link, asset) + + """ + # FIXME: some (from, to) pairs are probably unphysical + match assets: + case ( + esdl.Producer() + | esdl.Conversion() + | esdl.Storage() + | esdl.EnergyNetwork() as a1, + esdl.Consumer() + | esdl.Conversion() + | esdl.Storage() + | esdl.EnergyNetwork() as a2, + ) if len(kinds(a1, a2)) == 2: + from_asset, to_asset = map(fill_asset, assets) + flow = merge_assets( + from_asset, to_asset, from_asset=from_asset.name, to_asset=to_asset.name + ) + case ( + esdl.Producer() + | esdl.Conversion() + | esdl.Storage() + | esdl.EnergyNetwork() as a1, + esdl.Transport() as link, + esdl.Consumer() + | esdl.Conversion() + | esdl.Storage() + | esdl.EnergyNetwork() as a2, + ) if not isinstance(link, esdl.EnergyNetwork) and len(kinds(a1, a2)) == 2: + from_asset = fill_asset(a1) + to_asset = fill_asset(a2) + flow = fill_asset(link, from_asset=a1.name, to_asset=a2.name) + case _: + # NOTE: unhandled case: asset, transport, ..., asset + raise ValueError(f"{assets=}: uncharted territory!") + + return (flow, from_asset, to_asset) + + +def itr_edges( + asset: esdl.EnergyAsset, edges: list[esdl.EnergyAsset], depth: int +) -> list[esdl.EnergyAsset]: + """Iterate over outgoing ports from an asset, and find the next asset. + + Parameters + ---------- + asset : esdl.EnergyAsset + Iterate over outgoing ports of this asset -def find_edge(asset: EObject) -> list[dict]: - """Find all out going connections to other assets.""" - return [ - edge(port, in_port) - for port in asset.port - if isinstance(port, esdl.OutPort) - for in_port in port.connectedTo - ] + edges : list[esdl.EnergyAsset] + List accumulating assets that form edges. + + depth : int + Counter keeping track of number of assets encountered; if this + exceeds 2 and recursion continues, raise :ref:`RecursionError`. + Subsequent call to :ref:`hop_edges` increments depth by 1. + + Returns + ------- + list[esdl.EnergyAsset] + + """ + if depth > 3: + raise RecursionError(f"{depth=}: trying to find an edge beyond 2 hops") + + if not isinstance(asset.port, EOrderedSet): + raise ValueError(f"{asset}: unsupported, doesn't have {asset.port=}") + + for _port1 in asset.port: + if isinstance(_port1, esdl.InPort): + continue + for _port2 in _port1.connectedTo: + if _port1 == _port2: # loop, don't know if it's likely + continue + if isinstance(_port2, esdl.OutPort): + continue + _asset = _port2.energyasset + hop_edges(_asset, edges, depth + 1) + return edges + + +def hop_edges( + asset: esdl.EnergyAsset, edges: list[esdl.EnergyAsset], depth: int = 1 +) -> list[esdl.EnergyAsset]: + """Find all the assets that have an incoming flow from initially provided asset. + + Parameters + ---------- + asset : esdl.EnergyAsset + Starting point of edge search, or an intermediate + :ref:`esdl.Transport` asset (excluding + :ref:`esdl.EnergyNetwork`). + + edges : list[esdl.EnergyAsset] + List accumulating assets that form edges. + + depth : int (default: 1) + Counter keeping track of number of assets encountered; if this + exceeds 2 and recursion continues, raise :ref:`RecursionError`. + + Returns + ------- + list[esdl.EnergyAsset] + Accumulated list of assets that form edges (same as the + parameter `edges`) in the format: + + [from_asset, to_asset1, link, to_asset2, ...] + + where the edges are: + + (from_asset, to_asset1), (from_asset, link, to_asset2), ... + + """ + match asset: + case ( + esdl.Producer() | esdl.Conversion() | esdl.Storage() | esdl.EnergyNetwork() + ) if depth == 1: + edges.append(asset) + itr_edges(asset, edges, depth) + case esdl.Transport() if depth == 2 and not isinstance( + asset, esdl.EnergyNetwork + ): + edges.append(asset) + itr_edges(asset, edges, depth) + case ( + esdl.Consumer() | esdl.Conversion() | esdl.Storage() | esdl.EnergyNetwork() + ) if depth > 1: + edges.append(asset) + case _: + raise ValueError(f"{asset}: why am I here? {depth=}") + return edges + + +def find_edges(asset: esdl.EnergyAsset) -> list[tuple[ESDLAssets, ...]]: + """Find all out going flows from the provided asset.""" + if (edges := hop_edges(asset, [])) and len(edges) > 1: + from_asset, *rest = edges + return [edge(from_asset, *assets) for assets in batched(rest)] + else: + return [] + + +res_t = TypeVar("res_t", tuple, list, set, dict, esdl.EnergyAsset) def parse_graph( - obj: EObject | EOrderedSet, - predicate: Callable[[EObject], list[dict]], - res: list[dict], -): + obj: ESDLNode | EOrderedSet, + predicate: Callable[[esdl.EnergyAsset], list[res_t] | res_t], + res: list[res_t], +) -> list[res_t]: """Parse ecore object to extract node attributes and connections. Parameters ---------- - obj : EObject | EOrderedSet + obj : ESDLNode | EOrderedSet ESDL object, or an ordered set of ESDL objects - predicate : Callable[[EObject], list[tuple]] + predicate : Callable[[esdl.EnergyAsset], list[res_t] | res_t] Function predicate that is applied on all assets, to find an edge that originates at that asset. - res : list[tuple] + res : list[res_t] Edges that are found are inserted into this list. Each edge is represented as a tuple: tuple[str, str, ]; the dataclass has edge properties determined by combining the @@ -141,42 +382,48 @@ def parse_graph( Returns ------- - list[tuple] - Same as `res` above. + list[res_t] + Same as `res` above (res_t: dict | tuple | esdl.EnergyAsset). """ match obj: case EOrderedSet(): for el in obj: parse_graph(el, predicate, res) - case EObject(): - if (obj_asset := getattr(obj, "asset", None)) and len(obj_asset) > 0: - for asset in obj_asset: - if not hasattr(asset, "name"): - continue - match predicate(asset): - case list() as _interim: - res.extend(_interim) - case _interim: - res.append(_interim) - - if getattr(obj, "area", None): + case esdl.EnergySystem() if isinstance(obj.instance, esdl.EOrderedSet): + parse_graph(obj.instance, predicate, res) + case esdl.Instance(): + parse_graph(obj.area, predicate, res) + case esdl.Area(): + if isinstance(obj.area, EOrderedSet): # may contain sub-areas parse_graph(obj.area, predicate, res) - - if getattr(obj, "instance", None): - parse_graph(obj.instance, predicate, res) + if isinstance(obj.asset, EOrderedSet): # may also contain assets + parse_graph(obj.asset, predicate, res) + case (esdl.Producer() | esdl.Conversion() | esdl.Storage()) as asset: + if hasattr(asset, "name"): + match predicate(asset): + case list() as _interim: + res.extend(_interim) + case _interim: + res.append(_interim) + case esdl.Transport() | esdl.Consumer(): + pass # only following out going flows case _: raise ValueError(f"{obj}: unsupported value") return res -def load(path: str): - """Load ESDL file and parse nodes.""" - ensys = _HANDLER.load_file(path) - edges = parse_graph(ensys, find_edge, []) - return edges - - -def debug(path: str): +def load(path: str) -> tuple[tuple[ESDLAssets, ...], tuple[ESDLAssets, ...]]: """Load ESDL file and parse nodes.""" - return _HANDLER.load_file(path) + with contextlib.redirect_stdout(StringIO()): + ensys = _HANDLER.load_file(path) + edges = parse_graph(ensys, find_edges, []) + flows = tuple(edge[0] for edge in edges) + assets: tuple[ESDLAssets, ...] = tuple(set(chain(*(edge[1:] for edge in edges)))) + return flows, assets + + +def debug(path: str) -> esdl.EnergySystem: + """Load ESDL file and parse nodes for debugging.""" + with contextlib.redirect_stdout(StringIO()): + return _HANDLER.load_file(path)