import os
import math
import numpy as np
from pandas import DataFrame
from geopandas import GeoDataFrame
from shapely.geometry.polygon import Polygon
from .constant import COMPRESSED_SUB_PATH, ORIGNAL_SUB_PATH
def file_content_2_data_frame(file_content: str) -> DataFrame:
    """
    Create a ``DataFrame`` from the given ``file_content``.

    Each line must hold ``"<x> <y> <height>"`` separated by single spaces.

    Args:
        file_content (str): content of file, either as one newline-separated
            string or as an iterable of lines.

    Returns:
        DataFrame: Content of file as DataFrame with columns ``x``, ``y``, ``height``.
    """
    # BUG FIX: the original iterated ``file_content`` directly; for an actual
    # str (as annotated) that yields single characters which can never unpack
    # into (x, y, height). Split a string into lines first, matching the
    # behaviour of ``download_2_file_content``; iterables of lines pass through.
    if isinstance(file_content, str):
        file_content = file_content.splitlines()
    parsed = [row.split(" ") for row in file_content]
    # passing ``columns=`` to the constructor also works for empty input,
    # where a later ``.columns = [...]`` assignment would raise
    return DataFrame(
        [[int(x), int(y), float(height)] for x, y, height in parsed],
        columns=["x", "y", "height"],
    )
def data_frame_2_file_content(data_frame: DataFrame) -> list:
    """
    Converts the given ``data_frame`` into a list of lines as file content.

    Handles both plain ``DataFrame``s (x, y, height) and ``GeoDataFrame``s
    (x, y, height, geometry): only the first three columns are serialized.

    Args:
        data_frame: to be serialized; its first three columns must be
            x, y and height, in that order.

    Returns:
        list: list of ``"<x> <y> <height>"`` lines for serialization as file.
    """
    # BUG FIX: the original distinguished DataFrame vs GeoDataFrame only to
    # ignore a trailing geometry column, and silently returned None for any
    # other input. Slicing the first three value columns reproduces both
    # branches exactly and raises instead of returning None on bad input.
    return [
        f"{int(x)} {int(y)} {round(float(height), 2)}"
        for x, y, height in data_frame.values[:, :3]
    ]
def download_2_file_content(download_content: str) -> list:
    """
    Converts the ``download_content`` into proper format.

    Args:
        download_content (str): content of download, newline-separated rows of
            ``"<x> <y> <height>"``.

    Returns:
        list: list of lines for serialization as file.
    """
    lines = []
    for raw_row in download_content.splitlines():
        # coordinates are serialized as integers, heights rounded to 2 decimals
        x, y, height = raw_row.split(" ")
        lines.append(f"{int(x)} {int(y)} {round(float(height), 2)}")
    return lines
def compress_data_frame(data_frame: DataFrame, tile_size: int) -> DataFrame:
    """
    Compress the given ``DataFrame`` by averaging windows of
    :math:`tile\\_size \\times tile\\_size` cells.

    The mean of each window is computed as a convolution: every window is
    flattened into one row of a matrix, which is multiplied with a constant
    mean-kernel vector. TODO: see documentation to get an intuition about the
    convolution implementation.

    Note:
        The input ``data_frame`` is modified in place (columns renamed).
        Assumes the frame describes a full square grid: its row count must be
        a perfect square whose edge length is divisible by ``tile_size``
        -- TODO confirm with callers.

    Args:
        data_frame (DataFrame): Original DataFrame with columns x, y, height.
        tile_size (int): window edge length in grid cells.

    Returns:
        DataFrame: The compressed pandas.DataFrame (float32).
    """
    columns = ["x", "y", "height"]
    data_frame.columns = columns
    # sort row-major (by y, then x) so the positional slices below form grid rows
    data_frame = data_frame.sort_values(["y", "x"])
    y_min = data_frame["y"].min()
    y_max = data_frame["y"].max()
    x_min = data_frame["x"].min()
    x_max = data_frame["x"].max()
    # original x and y coordinates to restore them (top-left corner of each window).
    # NOTE(review): this presumes a grid spacing of 1 unit; because the ranges
    # exclude x_max/y_max, tile_size == 1 would yield one fewer coordinate per
    # axis than there are windows and the zip below would silently truncate
    # -- verify against actual input spacing.
    original_x_y = [(x, y) for y in range(y_min, y_max, tile_size) for x in range(x_min, x_max, tile_size)]
    # edge length of the (assumed square) grid -- presumes shape[0] is a perfect square
    original_size = int(math.sqrt(data_frame.shape[0]))
    new_size = int(original_size / tile_size)
    # reshape the flat height column into an original_size x original_size matrix
    # via consecutive positional slices of the sorted frame
    df_as_matrix = np.array([data_frame[i:i+original_size]["height"] for i in range(0, data_frame.shape[0], original_size)])
    convolution_matrix = np.zeros((new_size**2, tile_size**2))
    mean_kernel_vector = np.full((tile_size**2, 1), 1/(tile_size**2))  # convolution kernel as vector to mean cells in window
    # reshape windows into rows of the ``convolution_matrix``
    row_number = 0
    for y in range(new_size):
        for x in range(new_size):
            convolution_matrix[row_number, :] = df_as_matrix[y * tile_size:y * tile_size + tile_size, x * tile_size:x * tile_size + tile_size].flatten()
            row_number += 1
    # calculate the actual convolution as dot product: one mean per window
    convolution_result = convolution_matrix.dot(mean_kernel_vector)
    # create compressed ``DataFrame``: pair each window mean with its coordinate
    compressed_data_frame = DataFrame([[x_y[0], x_y[1], height] for x_y, height in zip(original_x_y, convolution_result[:,0])], dtype=np.float32)
    compressed_data_frame.columns = columns
    return compressed_data_frame
def create_directories(download_path: str, keep_original: bool, compress: int, file_formats: tuple) -> (str, str):
    """
    Simple helper function that creates all necessary directories.

    Args:
        download_path (str): download path.
        keep_original (bool): indicates whether the original directory (and its
            per-format sub-directories) is necessary or not.
        compress (int): compression rate; compressed directories are only
            created when this is > 0.
        file_formats (tuple): indicates the file types to save; one lowercase
            sub-directory is created per format.

    Returns:
        str, str: path for original files, path for compressed files.
    """
    original_path = os.path.join(download_path, ORIGNAL_SUB_PATH)
    compressed_path = os.path.join(download_path, COMPRESSED_SUB_PATH)
    # makedirs(exist_ok=True) replaces the racy exists()+mkdir pattern
    if keep_original:
        os.makedirs(original_path, exist_ok=True)
    if compress > 0:
        os.makedirs(compressed_path, exist_ok=True)
    for file_format in file_formats:
        sub_dir = file_format.lower()
        if keep_original:
            os.makedirs(os.path.join(original_path, sub_dir), exist_ok=True)
        # BUG FIX: the original created compressed format directories
        # unconditionally, which raised FileNotFoundError whenever
        # compress <= 0 (the parent ``compressed_path`` was never created)
        if compress > 0:
            os.makedirs(os.path.join(compressed_path, sub_dir), exist_ok=True)
    return original_path, compressed_path
def data_frame_2_geo_data_frame(data_frame: DataFrame) -> GeoDataFrame:
    """
    Creates a ``GeoDataFrame`` from the given ``DataFrame``.

    Every (x, y, height) row becomes one square ``Polygon`` tile whose edge
    length is derived from the input's x coordinates.

    Args:
        data_frame (DataFrame): ``DataFrame`` (columns x, y, height) that is
            used to create a ``GeoDataFrame`` with geometric information.

    Returns:
        GeoDataFrame: Created ``GeoDataFrame`` with columns x, y, height, geometry.
    """
    data_frame_sorted = data_frame["x"].sort_values()
    # NOTE(review): Series indexing with ints is label-based, so [1] and [0]
    # refer to the rows labelled 1 and 0 of the ORIGINAL frame, not to the two
    # smallest x values after sorting; this also requires a default 0..n index.
    # If rows 0 and 1 share the same x this yields a tile size of 0 -- confirm
    # against the callers' input layout.
    tile_size = data_frame_sorted[1] - data_frame_sorted[0]
    # one closed square ring per grid point, anchored at (x, y) as its corner
    geo_data_frame = GeoDataFrame([[x, y, height, Polygon(((x, y), (x + tile_size, y), (x + tile_size, y + tile_size), (x, y + tile_size), (x, y)))] for x, y, height in data_frame.values], crs={"init": "epsg:25833"})  # Coordinate System Source: https://www.stadtentwicklung.berlin.de/geoinformation/landesvermessung/atkis/de/dgm.shtml
    geo_data_frame.columns = ["x", "y", "height", "geometry"]
    return geo_data_frame