Skip to content

Commit

Permalink
healthcheck on metro container; update env vars on readme; wrf_code o…
Browse files Browse the repository at this point in the history
…n station - issue noi-techpark#31
  • Loading branch information
Marco Angheben committed Sep 26, 2024
1 parent 2c91be1 commit bbcc290
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 56 deletions.
61 changes: 32 additions & 29 deletions pollution_v2/README.md

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions pollution_v2/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,12 @@ services:

metro:
image: road_weather:latest
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:80/"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always

volumes:
Expand Down
9 changes: 8 additions & 1 deletion pollution_v2/src/common/data_model/station.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
class Station:

code: str
wrf_code: Optional[str]
active: bool
available: bool
coordinates: dict
Expand Down Expand Up @@ -50,6 +51,7 @@ def sensor_type(self) -> float:
def from_odh_repr(cls, raw_data: dict):
return cls(
code=raw_data["scode"],
wrf_code=raw_data.get("wrf_code"),
active=raw_data["sactive"],
available=raw_data["savailable"],
coordinates=raw_data["scoordinate"],
Expand All @@ -62,6 +64,7 @@ def from_odh_repr(cls, raw_data: dict):
def to_json(self) -> dict:
return {
"code": self.code,
"wrf_code": self.wrf_code,
"active": self.active,
"available": self.available,
"coordinates": self.coordinates,
Expand All @@ -73,8 +76,9 @@ def to_json(self) -> dict:

@classmethod
def from_json(cls, dict_data) -> Station:
return Station(
res = Station(
code=dict_data["code"],
wrf_code=dict_data.get("wrf_code"),
active=dict_data["active"],
available=dict_data["available"],
coordinates=dict_data["coordinates"],
Expand All @@ -83,6 +87,9 @@ def from_json(cls, dict_data) -> Station:
station_type=dict_data["station_type"],
origin=dict_data["origin"]
)
if "wrf_code" in dict_data.keys():
res.wrf_code = dict_data["wrf_code"]
return res


StationType = TypeVar("StationType", bound=Station)
Expand Down
23 changes: 17 additions & 6 deletions pollution_v2/src/dags/aiaas_road_weather.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def get_stations_list(**kwargs) -> list[dict]:
with open(ROAD_WEATHER_CONFIG_FILE, 'r') as file:
config = yaml.safe_load(file)
whitelist = config.get('whitelist', [])
station_mapping = {str(k): str(v) for k, v in config['mappings'].items()}

if whitelist:
whitelist = list(map(str, whitelist))
Expand All @@ -109,11 +110,24 @@ def get_stations_list(**kwargs) -> list[dict]:
# Serialization and deserialization is dependent on speed.
# Use built-in functions like dict as much as you can and stay away
# from using classes and other complex structures.
station_dicts = [station.to_json() for station in stations_list]
station_dicts = []
for station in stations_list:
if str(station.code) not in station_mapping:
logger.error(f"Station code [{station.code}] not found in the mapping [{ROAD_WEATHER_CONFIG_FILE}]")
raise ValueError(f"Station code [{station.code}] not found in the mapping")

logger.info("Found mapping for ODH station code "
"[" + str(str(station.code)) + "] -> CISMA station code "
"[" + str(station_mapping[station.code]) + "]")
logger.info(f"Downloading forecast data for station [{station.code}] from CISMA")
wrf_station_code = station_mapping[str(station.code)]
station.wrf_code = wrf_station_code

station_dicts.append(station.to_json())

logger.info(f"Retrieved {len(station_dicts)} stations")

# TODO restore full list
return station_dicts[:2]
return station_dicts


@task
Expand All @@ -133,12 +147,9 @@ def process_station(station_dict: dict, **kwargs):
# min_from_date, max_to_date = dag.init_date_range(None, None)

computation_start_dt = datetime.now()
# logger.info(f"Running computation from [{min_from_date}] to [{max_to_date}]")
logger.info(f"Running computation")
manager.run_computation_for_single_station(station)

# TODO: run container with METRo

computation_end_dt = datetime.now()
logger.info(f"Completed computation in [{(computation_end_dt - computation_start_dt).seconds}]")

Expand Down
22 changes: 3 additions & 19 deletions pollution_v2/src/road_weather/manager/road_weather.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,31 +63,15 @@ def _download_forecast_data(self, traffic_station: Station) -> Tuple[str, str]:
# !!!: temporarily downloading forecast data of WRF from CISMA
# TODO: replace with the actual forecast data from ODH when available

station_code = traffic_station.code
xml_url = f"https://www.cisma.bz.it/wrf-alpha/CR/1{traffic_station.wrf_code}.xml"

with open(ROAD_WEATHER_CONFIG_FILE, 'r') as file:
config = yaml.safe_load(file)
# ODH station code -> WRF station code
station_mapping = config['mappings']

station_mapping = {str(k): str(v) for k, v in station_mapping.items()}
if str(station_code) not in station_mapping:
logger.error(f"Station code [{station_code}] not found in the mapping [{ROAD_WEATHER_CONFIG_FILE}]")
raise ValueError(f"Station code [{station_code}] not found in the mapping")

logger.info("Found mapping for ODH station code [" + str(str(station_code)) + "] -> "
"CISMA station code [" + str(station_mapping[station_code]) + "]")
logger.info(f"Downloading forecast data for station [{station_code}] from CISMA")
wrf_station_code = station_mapping[str(station_code)]
xml_url = f"https://www.cisma.bz.it/wrf-alpha/CR/1{wrf_station_code}.xml"

forecast = Forecast(wrf_station_code)
forecast = Forecast(traffic_station.wrf_code)
forecast.download_xml(xml_url)
forecast.interpolate_hourly()
forecast.negative_radiation_filter()
roadcast_start = forecast.start
logger.info('forecast - XML processed correctly')
forecast_filename = f"{TMP_DIR}/forecast_{wrf_station_code}_{roadcast_start}.xml"
forecast_filename = f"{TMP_DIR}/forecast_{traffic_station.wrf_code}_{roadcast_start}.xml"
forecast.to_xml(forecast_filename)
logger.info(f'forecast - XML saved in {forecast_filename} ')
return forecast_filename, roadcast_start
Expand Down
2 changes: 1 addition & 1 deletion pollution_v2/src/road_weather/model/road_weather_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def compute_data(self, observation: RoadWeatherObservationMeasureCollection,
logger.info(f"Forecast start: {forecast_start}")

# TODO cablato
url = "http://metro:80/predict/?station_code=101"
url = f"http://metro:80/predict/?station_code=1{station.wrf_code}"

# List of files to upload
files_to_upload = [forecast_filename, observation_filename]
Expand Down

0 comments on commit bbcc290

Please sign in to comment.