Source code for reproject_geometry.reproject

from typing import Any, Dict, List, Optional, Tuple

import numpy as np
from rasterio import warp
from rasterio.crs import CRS
from shapely.geometry import MultiPolygon, Polygon, mapping, shape

DEFAULT_PRECISION = 3


[docs]def reproject_geometry( geojson: Dict[str, Any], src_crs: str, dst_crs: str, dst_tolerance: Optional[float] = None, precision: int = DEFAULT_PRECISION, ) -> Dict[str, Any]: """Reprojects a GeoJSON-like geometry from a source to destination CRS with the option to limit reprojection error to approximately ``dst_tolerance``. If ``dst_tolerance`` is specified, additional vertices are inserted into the geometry polygon(s) prior to reprojection to capture projection distortion. The projected geometries are then simplified by removing vertices until the simplificaton imparts an error that exceeds ``dst_tolerance``. The supplied ``dst_tolerance`` must be in the linear unit (e.g., meters, feet, degrees) of the destination CRS. Args: geojson (Dict[str, Any]): A GeoJSON-like dictionary containing a polygon or multipolygon to be reprojected. src_crs (str): Source CRS string, e.g., an EPSG code or WKT2. dst_crs (str): Destination CRS string. dst_tolerance (float, optional): Maximum acceptable "error" of the reprojected Polygon(s) in the destination CRS. Must be in the linear unit of the destination CRS. precision (int, optional): The number of decimal places to include in the coordinates for the reprojected geometry. Defaults to 3 decimal places. Returns: Dict[str, Any]: A GeoJSON-like dictionary containg the reprojected Polygon(s). """ geometry = shape(geojson) if isinstance(geometry, Polygon): multipolygon = False elif isinstance(geometry, MultiPolygon): multipolygon = True else: raise ValueError( f"Can only reproject Polygons or MultiPolygons, geometry={geometry}" ) if multipolygon: reprojected_geometry = reproject_multipolygon( geometry, src_crs, dst_crs, dst_tolerance, precision ) else: reprojected_geometry = reproject_polygon( geometry, src_crs, dst_crs, dst_tolerance, precision ) result: Dict[str, Any] = mapping(reprojected_geometry) return result
[docs]def reproject_multipolygon( multipolygon: MultiPolygon, src_crs: str, dst_crs: str, dst_tolerance: Optional[float] = None, precision: int = DEFAULT_PRECISION, ) -> MultiPolygon: """Reprojects each polygon in a multipolygon from a source to destination CRS with the option to limit reprojection error to approximately ``dst_tolerance``. If ``dst_tolerance`` is specified, additional vertices are inserted into the geometry polygon(s) prior to reprojection to capture projection distortion. The projected geometries are then simplified by removing vertices until the simplificaton imparts an error that exceeds ``dst_tolerance``. The supplied ``dst_tolerance`` must be in the linear unit (e.g., meters, feet, degrees) of the destination CRS. Args: multipolygon (MultiPolygon): The multipolygon to be reprojected. src_crs (str): Source CRS string, e.g., an EPSG code or WKT2. dst_crs (str): Destination CRS string. dst_tolerance (float, optional): Maximum acceptable "error" of the reprojected Polygon(s) in the destination CRS. Must be in the linear unit of the destination CRS. precision (int, optional): The number of decimal places to include in the coordinates for the reprojected geometry. Defaults to 3 decimal places. Returns: MultiPolygon: The reprojected multipolygon. """ polygons = [] for polygon in multipolygon.geoms: polygons.extend( reproject_polygon(polygon, src_crs, dst_crs, dst_tolerance, precision) ) return MultiPolygon(polygons)
[docs]def reproject_polygon( polygon: Polygon, src_crs: str, dst_crs: str, dst_tolerance: Optional[float] = None, precision: int = DEFAULT_PRECISION, ) -> Polygon: """Reprojects a polygon from a source to destination CRS with the option to limit reprojection error to approximately ``dst_tolerance``. If ``dst_tolerance`` is specified, additional vertices are inserted into the geometry polygon(s) prior to reprojection to capture projection distortion. The projected geometries are then simplified by removing vertices until the simplificaton imparts an error that exceeds ``dst_tolerance``. The supplied ``dst_tolerance`` must be in the linear unit (e.g., meters, feet, degrees) of the destination CRS. Args: polygon (Polygon): The polygon to be reprojected. src_crs (str): Source CRS string, e.g., an EPSG code or WKT2. dst_crs (str): Destination CRS string. dst_tolerance (float, optional): Maximum acceptable "error" of the reprojected Polygon(s) in the destination CRS. Must be in the linear unit of the destination CRS. precision (int, optional): The number of decimal places to include in the coordinates for the reprojected geometry. Defaults to 3 decimal places. Returns: Polygon: The reprojected polygon. """ if dst_tolerance is not None: src_bbox = polygon.bounds src_tolerance = _src_tol(src_crs, src_bbox, dst_crs, dst_tolerance) polygon = Polygon(_densify_by_distance(polygon.exterior.coords, src_tolerance)) polygon = shape(warp.transform_geom(src_crs, dst_crs, polygon, precision=precision)) if dst_tolerance is not None: return polygon.simplify(dst_tolerance).simplify(0) else: return polygon.simplify(0)
def _src_tol( src_crs: str, src_bbox: List[float], dst_crs: str, dst_tol: float ) -> float: """Converts a tolerance (a distance) from a source CRS linear unit to a destination CRS linear unit. Longitudinal ground distances vary with latitude, which means the conversion of the tolerance (distance) between destination and source units also varies with latitude. A mid-latitude value of the bounding box is therefore used to 'average' this discrepancy when converting between geographic and projected units. Since this is not an exact computation, a spherical approximation is used for the shape of the Earth for simplicity. Args: src_crs (str): CRS of the source geometry. src_bbox (List[float]): Bounding box of the geometry in the source CRS. dst_crs (str): CRS of the destination geometry. dst_tolerance (float, optional): Maximum acceptable "error" of the reprojected Polygon(s) in the destination CRS. Must be in the linear unit of the destination CRS. Returns: float: The destination tolerance (distance) in the source CRS linear units. """ d_crs = CRS.from_string(dst_crs) s_crs = CRS.from_string(src_crs) src_tol: float if s_crs.is_geographic and d_crs.is_geographic: src_tol = dst_tol elif s_crs.is_projected and d_crs.is_projected: src_tol = ( d_crs.linear_units_factor[1] / s_crs.linear_units_factor[1] ) * dst_tol elif s_crs.is_projected and d_crs.is_geographic: dst_bbox = warp.transform_bounds(src_crs, dst_crs, *src_bbox) mid_latitude = (dst_bbox[1] + dst_bbox[3]) / 2 meters_per_degree = 111320 * np.cos(np.deg2rad(mid_latitude)) meters_per_src_unit = s_crs.linear_units_factor[1] src_units_per_degree = meters_per_degree / meters_per_src_unit tol_src_units = src_units_per_degree * dst_tol src_tol = tol_src_units elif s_crs.is_geographic and d_crs.is_projected: mid_latitude = (src_bbox[1] + src_bbox[3]) / 2 meters_per_degree = 111320 * np.cos(np.deg2rad(mid_latitude)) meters_per_dst_unit = d_crs.linear_units_factor[1] tol_meters = meters_per_dst_unit * dst_tol tol_degree = tol_meters / meters_per_degree src_tol = tol_degree return src_tol def _densify_by_distance( point_list: List[Tuple[float, float]], densification_length: float ) -> List[Tuple[float, float]]: """Densifies the number of points in a list of points by inserting points at densification_length intervals along the polygon formed by the points. Args: point_list (List[Tuple[float, float]]): The list of points to be densified. densification_length (int): The interval at which to insert additional points. Returns: List[Tuple[float, float]]: The list of densified points. """ points: Any = np.asarray(point_list) dxdy = points[1:, :] - points[:-1, :] segment_lengths = np.sqrt(np.sum(np.square(dxdy), axis=1)) total_length = np.sum(segment_lengths) cum_segment_lengths = np.cumsum(segment_lengths) cum_segment_lengths = np.insert(cum_segment_lengths, 0, [0]) cum_interp_lengths = np.arange(0, total_length, densification_length) cum_interp_lengths = np.append(cum_interp_lengths, [total_length]) interp_x = np.interp(cum_interp_lengths, cum_segment_lengths, points[:, 0]) interp_y = np.interp(cum_interp_lengths, cum_segment_lengths, points[:, 1]) return [(x, y) for x, y in zip(interp_x, interp_y)]