From e5c5a9223c05982c217e82dd47e40f6b6cfe7679 Mon Sep 17 00:00:00 2001 From: Jules Dejaeghere Date: Tue, 26 Dec 2023 16:01:32 +0100 Subject: [PATCH] Remove YAML config and setup UI config --- custom_components/irm_kmi/__init__.py | 42 +++- custom_components/irm_kmi/api.py | 18 +- custom_components/irm_kmi/config_flow.py | 41 ++++ custom_components/irm_kmi/const.py | 6 + custom_components/irm_kmi/coordinator.py | 249 ++++++++++++----------- custom_components/irm_kmi/data.py | 9 + custom_components/irm_kmi/manifest.json | 1 + custom_components/irm_kmi/weather.py | 50 +++-- 8 files changed, 267 insertions(+), 149 deletions(-) create mode 100644 custom_components/irm_kmi/config_flow.py create mode 100644 custom_components/irm_kmi/data.py diff --git a/custom_components/irm_kmi/__init__.py b/custom_components/irm_kmi/__init__.py index 2c04bb3..29af938 100644 --- a/custom_components/irm_kmi/__init__.py +++ b/custom_components/irm_kmi/__init__.py @@ -1,7 +1,39 @@ """Integration for IRM KMI weather""" -from homeassistant.components.weather import Forecast -class IrmKmiForecast(Forecast): - """Forecast class with additional attributes for IRM KMI""" - text_fr: str | None - text_nl: str | None +# File inspired from https://github.com/ludeeus/integration_blueprint + +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant +from homeassistant.const import CONF_ZONE + +from .const import DOMAIN, PLATFORMS +from .coordinator import IrmKmiCoordinator +from .weather import IrmKmiWeather + + +async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Set up this integration using UI.""" + hass.data.setdefault(DOMAIN, {}) + + hass.data[DOMAIN][entry.entry_id] = coordinator = IrmKmiCoordinator(hass, entry.data[CONF_ZONE]) + + # https://developers.home-assistant.io/docs/integration_fetching_data#coordinated-single-api-poll-for-data-for-all-entities + await coordinator.async_config_entry_first_refresh() + + await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) + entry.async_on_unload(entry.add_update_listener(async_reload_entry)) + + return True + + +async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Handle removal of an entry.""" + if unloaded := await hass.config_entries.async_unload_platforms(entry, PLATFORMS): + hass.data[DOMAIN].pop(entry.entry_id) + return unloaded + + +async def async_reload_entry(hass: HomeAssistant, entry: ConfigEntry) -> None: + """Reload config entry.""" + await async_unload_entry(hass, entry) + await async_setup_entry(hass, entry) diff --git a/custom_components/irm_kmi/api.py b/custom_components/irm_kmi/api.py index c3680de..1e25a8f 100644 --- a/custom_components/irm_kmi/api.py +++ b/custom_components/irm_kmi/api.py @@ -1,6 +1,7 @@ """API Client for IRM KMI weather""" from __future__ import annotations +import logging import asyncio import socket @@ -9,6 +10,8 @@ import hashlib from datetime import datetime +_LOGGER = logging.getLogger(__name__) + class IrmKmiApiError(Exception): """Exception to indicate a general API error.""" @@ -26,12 +29,6 @@ class IrmKmiApiParametersError( """Exception to indicate a parameter error.""" -class IrmKmiApiAuthenticationError( - IrmKmiApiError -): - """Exception to indicate an authentication error.""" - - def _api_key(method_name: str): """Get API key.""" return hashlib.md5(f"r9EnW374jkJ9acc;{method_name};{datetime.now().strftime('%d/%m/%Y')}".encode()).hexdigest() @@ -39,7 +36,7 @@ def _api_key(method_name: str): class IrmKmiApiClient: """Sample API Client.""" - + COORD_DECIMALS = 6 def __init__(self, session: aiohttp.ClientSession) -> None: """Sample API Client.""" self._session = session @@ -54,6 +51,11 @@ async def get_forecasts_city(self, city_id: int) -> any: async def get_forecasts_coord(self, coord: dict) -> any: """Get forecasts for given city.""" + assert 'lat' in coord + assert 'long' in coord + coord['lat'] = round(coord['lat'], self.COORD_DECIMALS) + coord['long'] = round(coord['long'], self.COORD_DECIMALS) + return await self._api_wrapper( params={"s": "getForecasts"} | coord ) @@ -75,6 +77,7 @@ async def _api_wrapper( try: async with async_timeout.timeout(10): + _LOGGER.debug(f"Calling for {params}") response = await self._session.request( method=method, url=f"{self._base_url}{path}", @@ -82,6 +85,7 @@ async def _api_wrapper( json=data, params=params ) + _LOGGER.debug(f"API status code {response.status}") response.raise_for_status() return await response.json() diff --git a/custom_components/irm_kmi/config_flow.py b/custom_components/irm_kmi/config_flow.py new file mode 100644 index 0000000..517938a --- /dev/null +++ b/custom_components/irm_kmi/config_flow.py @@ -0,0 +1,41 @@ +import logging +import voluptuous as vol + +from homeassistant.components.zone import DOMAIN as ZONE_DOMAIN +from homeassistant.config_entries import ConfigFlow +from homeassistant.const import CONF_ZONE +from homeassistant.data_entry_flow import FlowResult +from homeassistant.helpers.selector import EntitySelector, EntitySelectorConfig + +from .const import DOMAIN + +_LOGGER = logging.getLogger(__name__) + + +class IrmKmiConfigFlow(ConfigFlow, domain=DOMAIN): + VERSION = 1 + + async def async_step_user(self, user_input: dict | None = None) -> FlowResult: + + if user_input is not None: + _LOGGER.debug(f"Provided config user is: {user_input}") + + await self.async_set_unique_id(user_input[CONF_ZONE]) + self._abort_if_unique_id_configured() + + state = self.hass.states.get(user_input[CONF_ZONE]) + return self.async_create_entry( + title=state.name if state else "IRM KMI", + data={CONF_ZONE: user_input[CONF_ZONE]}, + ) + + return self.async_show_form( + step_id="user", + data_schema=vol.Schema( + { + vol.Required(CONF_ZONE): EntitySelector( + EntitySelectorConfig(domain=ZONE_DOMAIN), + ), + } + ), + ) diff --git a/custom_components/irm_kmi/const.py b/custom_components/irm_kmi/const.py index a5f0c41..392f749 100644 --- a/custom_components/irm_kmi/const.py +++ b/custom_components/irm_kmi/const.py @@ -11,8 +11,14 @@ ATTR_CONDITION_CLEAR_NIGHT, ATTR_CONDITION_EXCEPTIONAL ) +from homeassistant.const import Platform DOMAIN = 'irm_kmi' +PLATFORMS: list[Platform] = [Platform.WEATHER] +OUT_OF_BENELUX = ["außerhalb der Benelux (Brussels)", + "Hors de Belgique (Bxl)", + "Outside the Benelux (Brussels)", + "Buiten de Benelux (Brussel)"] # map ('ww', 'dayNight') tuple from IRM KMI to HA conditions IRM_KMI_TO_HA_CONDITION_MAP = { diff --git a/custom_components/irm_kmi/coordinator.py b/custom_components/irm_kmi/coordinator.py index 513b463..b7a3783 100644 --- a/custom_components/irm_kmi/coordinator.py +++ b/custom_components/irm_kmi/coordinator.py @@ -7,101 +7,24 @@ import async_timeout from homeassistant.components.weather import Forecast +from homeassistant.const import ATTR_LATITUDE, ATTR_LONGITUDE from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.update_coordinator import ( DataUpdateCoordinator, UpdateFailed, ) -from . import IrmKmiForecast -from .const import IRM_KMI_TO_HA_CONDITION_MAP as CDT_MAP +from .data import IrmKmiForecast +from .const import OUT_OF_BENELUX, IRM_KMI_TO_HA_CONDITION_MAP as CDT_MAP from .api import IrmKmiApiClient, IrmKmiApiError _LOGGER = logging.getLogger(__name__) -def hourly_list_to_forecast(data: List[dict] | None) -> List[Forecast] | None: - if data is None or not isinstance(data, list) or len(data) == 0: - return None - - forecasts = list() - day = datetime.now() - - for f in data: - if 'dateShow' in f: - day = day + timedelta(days=1) - - hour = f.get('hour', None) - if hour is None: - continue - - precipitation_probability = None - if f.get('precipChance', None) is not None: - precipitation_probability = int(f.get('precipChance')) - - ww = None - if f.get('ww', None) is not None: - ww = int(f.get('ww')) - - forecast = Forecast( - datetime=day.strftime(f'%Y-%m-%dT{hour}:00:00'), - condition=CDT_MAP.get((ww, f.get('dayNight', None)), None), - native_precipitation=f.get('precipQuantity', None), - native_temperature=f.get('temp', None), - native_templow=None, - native_wind_gust_speed=f.get('windPeakSpeedKm', None), - native_wind_speed=f.get('windSpeedKm', None), - precipitation_probability=precipitation_probability, - wind_bearing=f.get('windDirectionText', {}).get('en'), - native_pressure=f.get('pressure', None), - is_daytime=f.get('dayNight', None) == 'd' - ) - - forecasts.append(forecast) - - return forecasts - - -def daily_list_to_forecast(data: List[dict] | None) -> List[Forecast] | None: - if data is None or not isinstance(data, list) or len(data) == 0: - return None - - forecasts = list() - n_days = 0 - - for f in data: - precipitation = None - if f.get('precipQuantity', None) is not None: - precipitation = float(f.get('precipQuantity')) - - is_daytime = f.get('dayNight', None) == 'd' - - forecast = IrmKmiForecast( - datetime=(datetime.now() + timedelta(days=n_days)).strftime('%Y-%m-%d') - if is_daytime else datetime.now().strftime('%Y-%m-%d'), - condition=CDT_MAP.get((f.get('ww1', None), f.get('dayNight', None)), None), - native_precipitation=precipitation, - native_temperature=f.get('tempMax', None), - native_templow=f.get('tempMin', None), - native_wind_gust_speed=f.get('wind', {}).get('peakSpeed'), - native_wind_speed=f.get('wind', {}).get('speed'), - precipitation_probability=f.get('precipChance', None), - wind_bearing=f.get('wind', {}).get('dirText', {}).get('en'), - is_daytime=is_daytime, - text_fr=f.get('text', {}).get('fr'), - text_nl=f.get('text', {}).get('nl') - ) - forecasts.append(forecast) - if is_daytime: - n_days += 1 - - return forecasts - - class IrmKmiCoordinator(DataUpdateCoordinator): """Coordinator to update data from IRM KMI""" - def __init__(self, hass, coord: dict): + def __init__(self, hass, zone): """Initialize my coordinator.""" super().__init__( hass, @@ -112,7 +35,7 @@ def __init__(self, hass, coord: dict): update_interval=timedelta(seconds=30), ) self._api_client = IrmKmiApiClient(session=async_get_clientsession(hass)) - self._coord = coord + self._zone = zone async def _async_update_data(self): """Fetch data from API endpoint. @@ -120,47 +43,137 @@ async def _async_update_data(self): This is the place to pre-process the data to lookup tables so entities can quickly look up their data. """ + if (zone := self.hass.states.get(self._zone)) is None: + raise UpdateFailed(f"Zone '{self._zone}' not found") + try: # Note: asyncio.TimeoutError and aiohttp.ClientError are already # handled by the data update coordinator. async with async_timeout.timeout(10): - api_data = await self._api_client.get_forecasts_coord(self._coord) + api_data = await self._api_client.get_forecasts_coord( + {'lat': zone.attributes[ATTR_LATITUDE], + 'long': zone.attributes[ATTR_LONGITUDE]} + ) _LOGGER.debug(f"Observation for {api_data.get('cityName', '')}: {api_data.get('obs', '{}')}") - # Process data to get current hour forecast - now_hourly = None - hourly_forecast_data = api_data.get('for', {}).get('hourly') - if not (hourly_forecast_data is None - or not isinstance(hourly_forecast_data, list) - or len(hourly_forecast_data) == 0): - - for current in hourly_forecast_data[:2]: - if datetime.now().strftime('%H') == current['hour']: - now_hourly = current - # Get UV index - module_data = api_data.get('module', None) - uv_index = None - if not (module_data is None or not isinstance(module_data, list)): - for module in module_data: - if module.get('type', None) == 'uv': - uv_index = module.get('data', {}).get('levelValue') - - # Put everything together - processed_data = { - 'current_weather': { - 'condition': CDT_MAP.get( - (api_data.get('obs', {}).get('ww'), api_data.get('obs', {}).get('dayNight')), None), - 'temperature': api_data.get('obs', {}).get('temp'), - 'wind_speed': now_hourly.get('windSpeedKm', None) if now_hourly is not None else None, - 'wind_gust_speed': now_hourly.get('windPeakSpeedKm', None) if now_hourly is not None else None, - 'wind_bearing': now_hourly.get('windDirectionText', {}).get('en') if now_hourly is not None else None, - 'pressure': now_hourly.get('pressure', None) if now_hourly is not None else None, - 'uv_index': uv_index - }, - 'daily_forecast': daily_list_to_forecast(api_data.get('for', {}).get('daily')), - 'hourly_forecast': hourly_list_to_forecast(api_data.get('for', {}).get('hourly')) - } - - return processed_data except IrmKmiApiError as err: raise UpdateFailed(f"Error communicating with API: {err}") + + if api_data.get('cityName', None) in OUT_OF_BENELUX: + raise UpdateFailed(f"Zone '{self._zone}' is out of Benelux and forecast is only available in the Benelux") + + return self.process_api_data(api_data) + + @staticmethod + def process_api_data(api_data): + # Process data to get current hour forecast + now_hourly = None + hourly_forecast_data = api_data.get('for', {}).get('hourly') + if not (hourly_forecast_data is None + or not isinstance(hourly_forecast_data, list) + or len(hourly_forecast_data) == 0): + + for current in hourly_forecast_data[:2]: + if datetime.now().strftime('%H') == current['hour']: + now_hourly = current + # Get UV index + module_data = api_data.get('module', None) + uv_index = None + if not (module_data is None or not isinstance(module_data, list)): + for module in module_data: + if module.get('type', None) == 'uv': + uv_index = module.get('data', {}).get('levelValue') + # Put everything together + processed_data = { + 'current_weather': { + 'condition': CDT_MAP.get( + (api_data.get('obs', {}).get('ww'), api_data.get('obs', {}).get('dayNight')), None), + 'temperature': api_data.get('obs', {}).get('temp'), + 'wind_speed': now_hourly.get('windSpeedKm', None) if now_hourly is not None else None, + 'wind_gust_speed': now_hourly.get('windPeakSpeedKm', None) if now_hourly is not None else None, + 'wind_bearing': now_hourly.get('windDirectionText', {}).get('en') if now_hourly is not None else None, + 'pressure': now_hourly.get('pressure', None) if now_hourly is not None else None, + 'uv_index': uv_index + }, + 'daily_forecast': IrmKmiCoordinator.daily_list_to_forecast(api_data.get('for', {}).get('daily')), + 'hourly_forecast': IrmKmiCoordinator.hourly_list_to_forecast(api_data.get('for', {}).get('hourly')) + } + return processed_data + + @staticmethod + def hourly_list_to_forecast(data: List[dict] | None) -> List[Forecast] | None: + if data is None or not isinstance(data, list) or len(data) == 0: + return None + + forecasts = list() + day = datetime.now() + + for f in data: + if 'dateShow' in f: + day = day + timedelta(days=1) + + hour = f.get('hour', None) + if hour is None: + continue + + precipitation_probability = None + if f.get('precipChance', None) is not None: + precipitation_probability = int(f.get('precipChance')) + + ww = None + if f.get('ww', None) is not None: + ww = int(f.get('ww')) + + forecast = Forecast( + datetime=day.strftime(f'%Y-%m-%dT{hour}:00:00'), + condition=CDT_MAP.get((ww, f.get('dayNight', None)), None), + native_precipitation=f.get('precipQuantity', None), + native_temperature=f.get('temp', None), + native_templow=None, + native_wind_gust_speed=f.get('windPeakSpeedKm', None), + native_wind_speed=f.get('windSpeedKm', None), + precipitation_probability=precipitation_probability, + wind_bearing=f.get('windDirectionText', {}).get('en'), + native_pressure=f.get('pressure', None), + is_daytime=f.get('dayNight', None) == 'd' + ) + + forecasts.append(forecast) + + return forecasts + + @staticmethod + def daily_list_to_forecast(data: List[dict] | None) -> List[Forecast] | None: + if data is None or not isinstance(data, list) or len(data) == 0: + return None + + forecasts = list() + n_days = 0 + + for f in data: + precipitation = None + if f.get('precipQuantity', None) is not None: + precipitation = float(f.get('precipQuantity')) + + is_daytime = f.get('dayNight', None) == 'd' + + forecast = IrmKmiForecast( + datetime=(datetime.now() + timedelta(days=n_days)).strftime('%Y-%m-%d') + if is_daytime else datetime.now().strftime('%Y-%m-%d'), + condition=CDT_MAP.get((f.get('ww1', None), f.get('dayNight', None)), None), + native_precipitation=precipitation, + native_temperature=f.get('tempMax', None), + native_templow=f.get('tempMin', None), + native_wind_gust_speed=f.get('wind', {}).get('peakSpeed'), + native_wind_speed=f.get('wind', {}).get('speed'), + precipitation_probability=f.get('precipChance', None), + wind_bearing=f.get('wind', {}).get('dirText', {}).get('en'), + is_daytime=is_daytime, + text_fr=f.get('text', {}).get('fr'), + text_nl=f.get('text', {}).get('nl') + ) + forecasts.append(forecast) + if is_daytime: + n_days += 1 + + return forecasts diff --git a/custom_components/irm_kmi/data.py b/custom_components/irm_kmi/data.py new file mode 100644 index 0000000..3be4f96 --- /dev/null +++ b/custom_components/irm_kmi/data.py @@ -0,0 +1,9 @@ +from homeassistant.components.weather import Forecast + + +class IrmKmiForecast(Forecast): + """Forecast class with additional attributes for IRM KMI""" + + # TODO: add condition_2 as well and evolution to match data from the API? + text_fr: str | None + text_nl: str | None diff --git a/custom_components/irm_kmi/manifest.json b/custom_components/irm_kmi/manifest.json index 4bf85c4..e159b69 100644 --- a/custom_components/irm_kmi/manifest.json +++ b/custom_components/irm_kmi/manifest.json @@ -2,6 +2,7 @@ "domain": "irm_kmi", "name": "IRM KMI Weather Belgium", "codeowners": ["@jdejaegh"], + "config_flow": true, "dependencies": [], "documentation": "https://github.com/jdejaegh/irm-kmi-ha/", "integration_type": "service", diff --git a/custom_components/irm_kmi/weather.py b/custom_components/irm_kmi/weather.py index 8485356..dcef02a 100644 --- a/custom_components/irm_kmi/weather.py +++ b/custom_components/irm_kmi/weather.py @@ -2,37 +2,52 @@ from typing import List from homeassistant.components.weather import WeatherEntity, WeatherEntityFeature, Forecast +from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo +from homeassistant.config_entries import ConfigEntry from homeassistant.const import UnitOfTemperature, UnitOfSpeed, UnitOfPrecipitationDepth, UnitOfPressure +from homeassistant.core import HomeAssistant +from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.update_coordinator import ( CoordinatorEntity, ) +from . import DOMAIN from .coordinator import IrmKmiCoordinator _LOGGER = logging.getLogger(__name__) -async def async_setup_platform(hass, config, async_add_entities, discovery_info=None): - _LOGGER.debug(f"IRM KMI setup. Config: {config}") - coordinator = IrmKmiCoordinator(hass, coord={'lat': config.get("lat"), 'long': config.get("lon")}) +async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback): + """Set up the weather entry.""" + _LOGGER.debug(f'async_setup_entry entry is: {entry}') + coordinator = hass.data[DOMAIN][entry.entry_id] await coordinator.async_config_entry_first_refresh() - - async_add_entities([IrmKmiWeather( - coordinator, - config.get("name", "IRM KMI Weather") - )]) + async_add_entities( + [IrmKmiWeather(coordinator, entry)] + ) class IrmKmiWeather(CoordinatorEntity, WeatherEntity): - def __init__(self, coordinator: IrmKmiCoordinator, name: str) -> None: + def __init__(self, + coordinator: IrmKmiCoordinator, + entry: ConfigEntry + ) -> None: super().__init__(coordinator) - self._name = name + self._name = entry.title + self._attr_unique_id = entry.entry_id + self._attr_device_info = DeviceInfo( + entry_type=DeviceEntryType.SERVICE, + identifiers={(DOMAIN, entry.entry_id)}, + manufacturer="IRM KMI", + name=entry.title + ) @property def supported_features(self) -> WeatherEntityFeature: features = WeatherEntityFeature(0) + features |= WeatherEntityFeature.FORECAST_DAILY features |= WeatherEntityFeature.FORECAST_TWICE_DAILY features |= WeatherEntityFeature.FORECAST_HOURLY return features @@ -85,17 +100,14 @@ def native_pressure_unit(self) -> str | None: def uv_index(self) -> float | None: return self.coordinator.data.get('current_weather').get('uv_index') - @property - def forecast(self) -> list[Forecast] | None: - result = list() - if self.coordinator.data.get('daily_forecast') is not None: - result += self.coordinator.data.get('daily_forecast') - if self.coordinator.data.get('hourly_forecast') is not None: - result += self.coordinator.data.get('hourly_forecast') - return result - async def async_forecast_twice_daily(self) -> List[Forecast] | None: return self.coordinator.data.get('daily_forecast') + async def async_forecast_daily(self) -> list[Forecast] | None: + data: list[Forecast] = self.coordinator.data.get('daily_forecast') + if not isinstance(data, list): + return None + return [f for f in data if f.get('is_daytime')] + async def async_forecast_hourly(self) -> list[Forecast] | None: return self.coordinator.data.get('hourly_forecast')