Skip to content

Commit

Permalink
json timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinRinas committed Dec 28, 2024
1 parent a3eaee9 commit 466d1e7
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 6 deletions.
2 changes: 2 additions & 0 deletions packages/modules/vehicles/json/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ def __init__(
url: Optional[str] = None,
soc_pattern: Optional[str] = None,
range_pattern: Optional[str] = None,
timestamp_pattern: Optional[str] = None,
timeout: Optional[int] = None,
calculate_soc: bool = False
):
self.url = url
self.soc_pattern = soc_pattern
self.range_pattern = range_pattern
self.timestamp_pattern = timestamp_pattern
self.timeout = timeout
self.calculate_soc = calculate_soc

Expand Down
37 changes: 31 additions & 6 deletions packages/modules/vehicles/json/soc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,70 @@
from modules.common.configurable_vehicle import ConfigurableVehicle
from modules.vehicles.json.config import JsonSocSetup, JsonSocConfiguration
from typing import Any, Dict
from datetime import datetime


log = logging.getLogger(__name__)


def extract_to_epoch(input_string: str) -> float:
# If already an integer, return it
if isinstance(input_string, int) or isinstance(input_string, float):
return int(input_string)

# Try parsing as UTC formatted time
try:
dt = datetime.strptime(input_string, "%Y-%m-%dT%H:%M:%SZ")
return int(dt.timestamp())
except ValueError:
log.exception(f"Kein UTC formatiertes Datum in '{input_string}' gefunden.")
return None


def parse_data(data: Dict[str, Any], pattern: str) -> float:
log.debug(f"parse_data: data='{data}' pattern='{pattern}'")

if pattern == "":
raise ValueError("Please provide pattern to parse data")
raise ValueError("Kein Pattern zum extrahieren der Daten definiert. Bitte in der Konfiguration aktualisieren.")

result = jq.compile(pattern).input(data).first()
if result is None:
raise ValueError(f"Pattern {pattern} did not match any data in {data}")
raise ValueError(f"Pattern {pattern} hat keine Ergebnisse in '{data}' geliefert.")

log.debug(f"result='{result}'")
return float(result)


def fetch_soc(config: JsonSocSetup) -> CarState:
url = config.configuration.url
soc_pattern = config.configuration.soc_pattern
range_pattern = config.configuration.range_pattern
timestamp_pattern = config.configuration.timestamp_pattern
timeout = config.configuration.timeout if isinstance(config.configuration.timeout, int) else None

if url is None or url == "":
log.warning("url not defined, set soc to 0")
log.warning("URL nicht definiert, setze SOC auf 0")
return CarState(0, 0)
else:
raw_data: Dict[str, Any] = req.get_http_session().get(url, timeout=timeout).json()

soc = parse_data(raw_data, soc_pattern)

if range_pattern is None or range_pattern == "":
log.warning("range_pattern not defined, set range to 0")
range = 0
log.debug("Kein Pattern für Range angegeben, setze Range auf None.")
range = None
else:
range = int(parse_data(raw_data, range_pattern))

return CarState(soc, range)
if timestamp_pattern is None or timestamp_pattern == "":
log.debug("Kein Pattern für Timestamp angegeben, setze Timestamp auf None.")
timestamp = None
else:
log.debug(f"timestamp_pattern='{timestamp_pattern}'")
timestamp = parse_data(raw_data, timestamp_pattern)
timestamp = extract_to_epoch(timestamp)

return CarState(soc=soc, range=range, soc_timestamp=timestamp)


def create_vehicle(vehicle_config: JsonSocSetup, vehicle: int):
Expand Down

0 comments on commit 466d1e7

Please sign in to comment.