From 57c7226241374537ac044edf127e9aac422cdf15 Mon Sep 17 00:00:00 2001 From: Stan Soldatov Date: Sun, 24 Nov 2024 04:42:40 +0100 Subject: [PATCH] Background generation WIP. --- Dockerfile | 2 +- dev/requirements.txt | 3 +- maps4fs/generator/background.py | 220 ++++++++++++++++++++++++++++++++ maps4fs/generator/dem.py | 68 +++++++--- maps4fs/generator/map.py | 8 ++ maps4fs/generator/tile.py | 28 ++++ pyproject.toml | 2 + requirements.txt | 2 + 8 files changed, 313 insertions(+), 20 deletions(-) create mode 100644 maps4fs/generator/background.py create mode 100644 maps4fs/generator/tile.py diff --git a/Dockerfile b/Dockerfile index b34b9437..8032ac87 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,7 +10,7 @@ WORKDIR /usr/src/app COPY data /usr/src/app/data COPY webui /usr/src/app/webui -RUN pip install "opencv-python" "folium" "osmnx<2.0.0" "rasterio" "tqdm" "streamlit" "maps4fs" +RUN pip install "opencv-python" "folium" "geopy" "osmnx<2.0.0" "rasterio" "trimesh" "tqdm" "streamlit" "maps4fs" EXPOSE 8501 diff --git a/dev/requirements.txt b/dev/requirements.txt index 7f7bb634..3dcf1413 100644 --- a/dev/requirements.txt +++ b/dev/requirements.txt @@ -9,4 +9,5 @@ types-tqdm pandas-stubs types-requests pytest -folium \ No newline at end of file +folium +geopy \ No newline at end of file diff --git a/maps4fs/generator/background.py b/maps4fs/generator/background.py new file mode 100644 index 00000000..a07238a4 --- /dev/null +++ b/maps4fs/generator/background.py @@ -0,0 +1,220 @@ +from __future__ import annotations + +import os +from typing import TYPE_CHECKING, NamedTuple + +import cv2 +import numpy as np +import trimesh +from geopy.distance import distance + +from maps4fs.generator.tile import Tile + +if TYPE_CHECKING: + from maps4fs.generator.map import Map + +DEFAULT_DISTANCE = 2048 + + +class PathInfo(NamedTuple): + code: str + angle: int + distance: int + size: tuple[int, int] + + +class Background: + def __init__(self, map: Map): + self.map = map + self.center_latitude = map.coordinates[0] + self.center_longitude = map.coordinates[1] + self.map_height = map.height + self.map_width = map.width + self.map_directory = map.map_directory + self.logger = map.logger + + self.tiles: list[Tile] = [] + self.register_tiles() + + def register_tiles(self): + # Move clockwise from N and calculate coordinates and sizes for each tile. + origin = (self.center_latitude, self.center_longitude) + half_width = int(self.map_width / 2) + half_height = int(self.map_height / 2) + + paths = [ + PathInfo( + "N", 0, half_height + DEFAULT_DISTANCE / 2, (self.map_width, DEFAULT_DISTANCE) + ), + PathInfo( + "NE", 90, half_width + DEFAULT_DISTANCE / 2, (DEFAULT_DISTANCE, DEFAULT_DISTANCE) + ), + PathInfo( + "E", 180, half_height + DEFAULT_DISTANCE / 2, (self.map_height, DEFAULT_DISTANCE) + ), + PathInfo( + "SE", 180, half_height + DEFAULT_DISTANCE / 2, (DEFAULT_DISTANCE, DEFAULT_DISTANCE) + ), + PathInfo( + "S", 270, half_width + DEFAULT_DISTANCE / 2, (self.map_width, DEFAULT_DISTANCE) + ), + PathInfo( + "SW", 270, half_width + DEFAULT_DISTANCE / 2, (DEFAULT_DISTANCE, DEFAULT_DISTANCE) + ), + PathInfo( + "W", 0, half_height + DEFAULT_DISTANCE / 2, (self.map_height, DEFAULT_DISTANCE) + ), + PathInfo( + "NW", 0, half_height + DEFAULT_DISTANCE / 2, (DEFAULT_DISTANCE, DEFAULT_DISTANCE) + ), + ] + + for path in paths: + destination = distance(meters=path.distance).destination(origin, path.angle) + tile_coordinates = (destination.latitude, destination.longitude) + + tile = Tile( + game=self.map.game, + coordinates=tile_coordinates, + map_height=path.size[1], + map_width=path.size[0], + map_directory=self.map_directory, + logger=self.logger, + tile_code=path.code, + auto_process=False, + blur_radius=0, + multiplier=10, + ) + + origin = tile_coordinates + self.tiles.append(tile) + + def process(self): + for tile in self.tiles: + tile.process() + + self.debug() + self.generate_obj_files() + + def generate_obj_files(self): + for tile in self.tiles: + # Read DEM data from the tile. + dem_path = tile._dem_path + base_directory = os.path.dirname(dem_path) + save_path = os.path.join(base_directory, f"{tile.code}.obj") + dem_data = cv2.imread(tile._dem_path, cv2.IMREAD_UNCHANGED) + self.plane_from_np(dem_data, save_path) + + def plane_from_np(self, dem_data: np.ndarray, save_path: str): + # We need to apply gaussian blur to the DEM data to make it smooth. + dem_data = cv2.normalize(dem_data, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U) + + # resize to 25% of the original size + dem_data = cv2.resize(dem_data, (0, 0), fx=0.25, fy=0.25) + + dem_data = cv2.GaussianBlur(dem_data, (51, 51), sigmaX=40, sigmaY=40) + + rows, cols = dem_data.shape + x = np.linspace(0, cols - 1, cols) + y = np.linspace(0, rows - 1, rows) + x, y = np.meshgrid(x, y) + z = dem_data + + vertices = np.column_stack([x.ravel(), y.ravel(), z.ravel()]) + faces = [] + + for i in range(rows - 1): + for j in range(cols - 1): + top_left = i * cols + j + top_right = top_left + 1 + bottom_left = top_left + cols + bottom_right = bottom_left + 1 + + faces.append([top_left, bottom_left, bottom_right]) + faces.append([top_left, bottom_right, top_right]) + + faces = np.array(faces) + mesh = trimesh.Trimesh(vertices=vertices, faces=faces) + mesh.export(save_path) + + def debug(self): + # Merge all tiles into one image for debugging purposes. + # Center tile not exists, fill it with black color. + + image_height = self.map_height + DEFAULT_DISTANCE * 2 + image_width = self.map_width + DEFAULT_DISTANCE * 2 + + image = np.zeros((image_height, image_width, 3), np.uint8) + + for tile in self.tiles: + tile_image = cv2.imread(tile._dem_path) + if tile.code == "N": + x = DEFAULT_DISTANCE + y = 0 + elif tile.code == "NE": + x = image_width - DEFAULT_DISTANCE + y = 0 + elif tile.code == "E": + x = image_width - DEFAULT_DISTANCE + y = DEFAULT_DISTANCE + elif tile.code == "SE": + x = image_width - DEFAULT_DISTANCE + y = image_height - DEFAULT_DISTANCE + elif tile.code == "S": + x = DEFAULT_DISTANCE + y = image_height - DEFAULT_DISTANCE + elif tile.code == "SW": + x = 0 + y = image_height - DEFAULT_DISTANCE + elif tile.code == "W": + x = 0 + y = DEFAULT_DISTANCE + elif tile.code == "NW": + x = 0 + y = 0 + + image[y : y + tile.map_height, x : x + tile.map_width] = tile_image + + # Save image to the map directory. + cv2.imwrite(f"{self.map_directory}/background.png", image) + + # def parse_code(self, tile_code: str): + # half_width = int(self.map_width / 2) + # half_height = int(self.map_height / 2) + # offset = DEFAULT_DISTANCE / 2 + + +# Creates tiles around the map. +# The one on corners 2048x2048, on sides and in the middle map_size x 2048. +# So 2048 is a distance FROM the edge of the map, but the other size depends on the map size. +# But for corner tiles it's always 2048. + +# In the beginning we have coordinates of the central point of the map and it's size. +# We need to calculate the coordinates of central points all 8 tiles around the map. + +# Latitude is a vertical line, Longitude is a horizontal line. + +# 2048 +# | | +# ____________________|_________|___ +# | | | | +# | NW | N | NE | 2048 +# |_________|_________|_________|___ +# | | | | +# | W | C | E | +# |_________|_________|_________| +# | | | | +# | SW | S | SE | +# |_________|_________|_________| +# +# N = C map_height / 2 + 1024; N_width = map_width; N_height = 2048 +# NW = N - map_width / 2 - 1024; NW_width = 2048; NW_height = 2048 +# and so on... + +# lat, lon = 45.28565000315636, 20.237121355049904 +# dst = 1024 + +# # N +# destination = distance(meters=dst).destination((lat, lon), 0) +# lat, lon = destination.latitude, destination.longitude +# print(lat, lon) diff --git a/maps4fs/generator/dem.py b/maps4fs/generator/dem.py index 60547e61..121a0441 100644 --- a/maps4fs/generator/dem.py +++ b/maps4fs/generator/dem.py @@ -40,7 +40,10 @@ def preprocess(self) -> None: self.multiplier = self.kwargs.get("multiplier", DEFAULT_MULTIPLIER) blur_radius = self.kwargs.get("blur_radius", DEFAULT_BLUR_RADIUS) - if blur_radius % 2 == 0: + if blur_radius <= 0: + # We'll disable blur if the radius is 0 or negative. + blur_radius = 0 + elif blur_radius % 2 == 0: blur_radius += 1 self.blur_radius = blur_radius self.logger.debug( @@ -49,12 +52,12 @@ def preprocess(self) -> None: self.auto_process = self.kwargs.get("auto_process", False) - # pylint: disable=no-member - def process(self) -> None: - """Reads SRTM file, crops it to map size, normalizes and blurs it, - saves to map directory.""" - north, south, east, west = self.bbox + def get_output_resolution(self) -> tuple[int, int]: + """Get output resolution for DEM data. + Returns: + tuple[int, int]: Output resolution for DEM data. + """ dem_height = int((self.map_height / 2) * self.game.dem_multipliyer + 1) dem_width = int((self.map_width / 2) * self.game.dem_multipliyer + 1) self.logger.debug( @@ -63,7 +66,15 @@ def process(self) -> None: dem_height, dem_width, ) - dem_output_resolution = (dem_width, dem_height) + return dem_width, dem_height + + # pylint: disable=no-member + def process(self) -> None: + """Reads SRTM file, crops it to map size, normalizes and blurs it, + saves to map directory.""" + north, south, east, west = self.bbox + + dem_output_resolution = self.get_output_resolution() self.logger.debug("DEM output resolution: %s.", dem_output_resolution) tile_path = self._srtm_tile() @@ -122,13 +133,15 @@ def process(self) -> None: resampled_data.max(), ) - resampled_data = cv2.GaussianBlur( - resampled_data, (self.blur_radius, self.blur_radius), sigmaX=40, sigmaY=40 - ) - self.logger.debug( - "Gaussion blur applied to DEM data with kernel size %s.", - self.blur_radius, - ) + if self.blur_radius > 0: + resampled_data = cv2.GaussianBlur( + resampled_data, (self.blur_radius, self.blur_radius), sigmaX=40, sigmaY=40 + ) + self.logger.debug( + "Gaussion blur applied to DEM data with kernel size %s.", + self.blur_radius, + ) + self.logger.debug( "DEM data was blurred. Shape: %s, dtype: %s. Min: %s, max: %s.", resampled_data.shape, @@ -141,10 +154,20 @@ def process(self) -> None: self.logger.debug("DEM data was saved to %s.", self._dem_path) if self.game.additional_dem_name is not None: - dem_directory = os.path.dirname(self._dem_path) - additional_dem_path = os.path.join(dem_directory, self.game.additional_dem_name) - shutil.copyfile(self._dem_path, additional_dem_path) - self.logger.debug("Additional DEM data was copied to %s.", additional_dem_path) + self.make_copy(self.game.additional_dem_name) + + def make_copy(self, dem_name: str) -> None: + """Copies DEM data to additional DEM file. + + Args: + dem_name (str): Name of the additional DEM file. + """ + dem_directory = os.path.dirname(self._dem_path) + + additional_dem_path = os.path.join(dem_directory, dem_name) + + shutil.copyfile(self._dem_path, additional_dem_path) + self.logger.debug("Additional DEM data was copied to %s.", additional_dem_path) def _tile_info(self, lat: float, lon: float) -> tuple[str, str]: """Returns latitude band and tile name for SRTM tile from coordinates. @@ -292,6 +315,15 @@ def previews(self) -> list[str]: return [self.grayscale_preview(), self.colored_preview()] def _get_scaling_factor(self, maximum_deviation: int) -> float: + """Calculate scaling factor for DEM data normalization. + NOTE: Needs reconsideration for the implementation. + + Args: + maximum_deviation (int): Maximum deviation in DEM data. + + Returns: + float: Scaling factor for DEM data normalization. + """ ESTIMATED_MAXIMUM_DEVIATION = 1000 # pylint: disable=C0103 scaling_factor = maximum_deviation / ESTIMATED_MAXIMUM_DEVIATION return scaling_factor if scaling_factor < 1 else 1 diff --git a/maps4fs/generator/map.py b/maps4fs/generator/map.py index 69f6355e..b01fbe07 100644 --- a/maps4fs/generator/map.py +++ b/maps4fs/generator/map.py @@ -1,11 +1,14 @@ """This module contains Map class, which is used to generate map using all components.""" +from __future__ import annotations + import os import shutil from typing import Any from tqdm import tqdm +from maps4fs.generator.background import Background from maps4fs.generator.component import Component from maps4fs.generator.game import Game from maps4fs.logger import Logger @@ -58,6 +61,9 @@ def __init__( # pylint: disable=R0917 except Exception as e: raise RuntimeError(f"Can not unpack map template due to error: {e}") from e + self.background = Background(self) + # self.background.process() + def generate(self) -> None: """Launch map generation using all components.""" with tqdm(total=len(self.game.components), desc="Generating map...") as pbar: @@ -84,6 +90,8 @@ def generate(self) -> None: pbar.update(1) + self.background.process() + def previews(self) -> list[str]: """Get list of preview images. diff --git a/maps4fs/generator/tile.py b/maps4fs/generator/tile.py new file mode 100644 index 00000000..6ff9184d --- /dev/null +++ b/maps4fs/generator/tile.py @@ -0,0 +1,28 @@ +import os + +from maps4fs.generator.dem import DEM + + +class Tile(DEM): + def preprocess(self) -> None: + super().preprocess() + self.code = self.kwargs.get("tile_code") + if not self.code: + raise ValueError("Tile code was not provided") + + self.logger.debug(f"Generating tile {self.code}") + + tiles_directory = os.path.join(self.map_directory, "objects", "tiles") + os.makedirs(tiles_directory, exist_ok=True) + + self._dem_path = os.path.join(tiles_directory, f"{self.code}.png") + self.logger.debug(f"DEM path for tile {self.code} is {self._dem_path}") + + def get_output_resolution(self) -> tuple[int, int]: + return self.map_width, self.map_height + + def process(self) -> None: + super().process() + + def make_copy(self, *args, **kwargs) -> None: + pass diff --git a/pyproject.toml b/pyproject.toml index 590ee238..265ed4dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,8 @@ dependencies = [ "rasterio", "tqdm", "folium", + "geopy", + "trimesh", ] [project.urls] diff --git a/requirements.txt b/requirements.txt index 2a79d532..ba0963fb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,6 @@ rasterio tqdm streamlit folium +geopy +trimesh maps4fs \ No newline at end of file