Source code for geonetworkx.geometry_operations

# -*- coding: utf-8 -*-
import numpy as np
import networkx as nx
import geopandas as gpd
from shapely.geometry import Point, MultiPoint, LineString, MultiLineString
from scipy.spatial import cKDTree
from collections import defaultdict
from typing import Union, Iterable
import pyproj
import geonetworkx.settings as settings
import geonetworkx as gnx


PointCoordinatesLike = Iterable[float]
PointsCoordinatesLike = Union[Iterable[PointCoordinatesLike], MultiPoint]


[docs]class Extremity: """Represents an extremity of a line. It's useful to parse and deal with lines given as input.""" nb_extremity = 0 def __init__(self, shape_id, position, coords): self.id = Extremity.nb_extremity Extremity.nb_extremity += 1 self.shape_id = shape_id self.position = position self.coords = coords self.related_extremity = None self.close_extremities = [] self.closest_station = None self.is_merged = False self.is_added_as_shape = False self.matching_items = dict()
[docs]def merge_two_shape(e1: Extremity, e2: Extremity, line1: LineString, line2: LineString) -> LineString: """Merge two lines (line1 and line2) with the given extremities (e1 and e2).""" if e1.position == -1 and e2.position == 0: new_line = LineString(list(line1.coords) + list(line2.coords)) elif e1.position == 0 and e2.position == 0: new_line = LineString(list(reversed(line1.coords)) + list(line2.coords)) elif e1.position == 0 and e2.position == -1: new_line = LineString(list(reversed(line1.coords)) + list(reversed(line2.coords))) elif e1.position == -1 and e2.position == -1: new_line = LineString(list(line1.coords) + list(reversed(line2.coords))) else: assert False, 'Invalid position attribute' return new_line
[docs]def merge_two_lines_with_closest_extremities(first_line: LineString, second_line: LineString) -> LineString: """Merge two lines with their closest extremities. Euclidian distance is used here.""" combinations = [(0, 0), (-1, 0), (0, -1), (-1, -1)] extremities = np.array([[first_line.coords[i1], second_line.coords[i2]] for (i1, i2) in combinations]) distances = np.linalg.norm(extremities[:, 0, :] - extremities[:, 1, :], axis=1) merge_couple_index = int(np.argmin(distances)) merge_couple = combinations[merge_couple_index] if merge_couple == (-1, 0): merged_line = LineString(list(first_line.coords) + list(second_line.coords)) elif merge_couple == (0, 0): merged_line = LineString(list(reversed(first_line.coords)) + list(second_line.coords)) elif merge_couple == (0, -1): merged_line = LineString(list(reversed(first_line.coords)) + list(reversed(second_line.coords))) else: # merge_couple == (-1, -1) merged_line = LineString(list(first_line.coords) + list(reversed(second_line.coords))) return merged_line
[docs]def get_shape_extremities(shape: LineString, shape_id: int): """Return the extremities of a shape in the network_shapes_gdf.""" assert isinstance(shape, LineString), "Shape must be a LineString" first_vertex = shape.coords[0] last_vertex = shape.coords[-1] e1 = Extremity(shape_id, 0, first_vertex) e2 = Extremity(shape_id, -1, last_vertex) e1.related_extremity = e2 e2.related_extremity = e1 return e1, e2
[docs]def convert_multilinestring_to_linestring(gdf: gpd.GeoDataFrame) -> int: """Convert all geometry attribute being a 'MultiLineString' to a 'LineString'. The created line is a merge of all sublines. Parameters ---------- gdf : gpd.GeoDataFrame A GeoDataFrame with a 'geometry' column to modify Returns ------- int The number of converted 'MultiLineString' Raises ------ RuntimeError If an input shape is not a LineString or a MultiLineString """ nb_converted_multilinestring = 0 for s in gdf.index: if isinstance(gdf.at[s, 'geometry'], LineString): continue elif isinstance(gdf.at[s, 'geometry'], MultiLineString): all_lines = gdf.at[s, 'geometry'].geoms new_line = all_lines[0] for i in range(1, len(all_lines)): new_line = merge_two_lines_with_closest_extremities(new_line, all_lines[i]) gdf.at[s, 'geometry'] = new_line nb_converted_multilinestring += 1 else: raise RuntimeError("Unknown shape type for shape input at index : ", s) return nb_converted_multilinestring
[docs]def discretize_line(line: LineString, discretization_tol) -> list: """Takes a shapely LineString and discretize it into a list of shapely Points. Each point is at most at the discretization tolerance distance of the following point. Parameters ---------- line : LineString Line to discretize discretization_tol : float Maximum distance between two points on the line. Returns ------- list An ordered list of shapely Point See Also -------- discretize_lines """ if discretization_tol <= 0.0: raise ValueError("Discretization tolerance must be strictly positive.") points_list = [] current_dist = discretization_tol line_length = line.length points_list.append(Point(list(line.coords)[0])) while current_dist < line_length: points_list.append(line.interpolate(current_dist)) current_dist += discretization_tol points_list.append(Point(list(line.coords)[-1])) return points_list
[docs]def discretize_lines(lines: Iterable[LineString], discretization_tol): """Discretize some line into points. Parameters ---------- lines: Iterable[LineString] : Lines to discretize discretization_tol : float Maximum distance between two points on the line. Returns ------- MultiPoint and defaultdict Return all the discretized points as a shapely MultiPoint and a dictionary to map the discretized points for each line. See Also -------- discretize_line """ points_line_association = defaultdict(list) all_points = [] nb_points = 0 for line_index, line in enumerate(lines): discretized_points = discretize_line(line, discretization_tol) nb_discretized_points = len(discretized_points) all_points.extend(discretized_points) points_line_association[line_index] = list(range(nb_points, nb_points + nb_discretized_points)) nb_points += nb_discretized_points all_points_as_mp = MultiPoint(all_points) return all_points_as_mp, points_line_association
[docs]def get_closest_point_from_points(points_from: PointsCoordinatesLike, points_to: list = None, kd_tree: cKDTree = None): """Compute the closest point among the ``points_from`` list for each point in the ``points_to`` list. Parameters ---------- points_from : PointsCoordinatesLike Iterable of points coordinates points_to : list or None Iterable of points coordinates (Default value = None) kd_tree : cKDTree a constructed kd tree representing ``points_from`` (Default value = None) Returns ------- array of floats distances ndarray of ints indexes """ if points_to is None and kd_tree is None: raise ValueError("Must provide at least argument 'points_to' or 'kd_tree'") if kd_tree is None: kd_tree = cKDTree(points_to) return kd_tree.query(points_from)
[docs]def get_closest_point_from_line(line_from: LineString, discretization_tol: float, points_to: list = None, kd_tree: cKDTree = None): """Return the closest point from a given line and its distance. Parameters ---------- line_from : LineString A shapely LineString (Default value = None) discretization_tol : float Maximum distance between two discretized points on the line. points_to : list A list of points among which the closest to the line has to be found (optional is ``kdtree`` is given) kd_tree : cKDTree A kd-tree representing the points among which the closest to the line has to be found (optional if ``points_to`` is given) (Default value = None) Returns ------- float closest distance int index of the closest point """ if points_to is None and kd_tree is None: raise ValueError("Must provide at least argument 'points_to' or 'kd_tree'") if kd_tree is None: kd_tree = cKDTree(points_to) discretized_points = discretize_line(line_from, discretization_tol) distances, closest_points_indexes = kd_tree.query(MultiPoint(discretized_points)) smallest_distance_index = np.argmin(distances) return distances[smallest_distance_index], closest_points_indexes[smallest_distance_index]
[docs]def get_closest_point_from_multi_shape(multi_shape, points_to=None, kd_tree=None): """Computes the closest point to the multi shape (i.e. the point that has the smallest projection distance on the entire multi shape object. Parameters ---------- multi_shape : MultiPoint or MultiLineString The multi shape object can be any shapely object among: MultiPoint, MultiLineString points_to : list A list of points among which to find the closest to the multi shape (Default value = None) kd_tree : cKDTree A kdtree representing the points among which the closest to the multishape has to be found (optional if 'points_to' is given) (Default value = None) Returns ------- float distance int index of the closest point """ if isinstance(multi_shape, MultiPoint): distances_and_points_indexes = get_closest_point_from_points(multi_shape, points_to, kd_tree) elif isinstance(multi_shape, MultiLineString): distances_and_points_indexes = [] for line in multi_shape: distances_and_points_indexes.append(get_closest_point_from_line(line, points_to, kd_tree)) else: raise TypeError("Method not implemented for shape of type '%s', expected MultiPoint" "or MultiLineString" % str(type(multi_shape))) smallest_distance_index = np.argmin(d_ix[0] for d_ix in distances_and_points_indexes) return distances_and_points_indexes[smallest_distance_index]
[docs]def get_closest_point_from_shape(shape: Union[Point, LineString, MultiPoint, MultiLineString], points_to: Union[MultiPoint, np.ndarray, list] = None, kd_tree: cKDTree = None): """Compute the closest point to the given shape. Parameters ---------- shape : Point, MultiPoint, LineString or MultiLineString Any shapely shape points_to : list A list of points among which to find the closest to the multi shape (Default value = None) kd_tree : cKDTree A kdtree representing the points among which the closest to the shape has to be found (optional if 'points_to' is given) (Default value = None) Returns ------- float distance int index of the closest point """ if isinstance(shape, Point): result = get_closest_point_from_points(MultiPoint([shape]), points_to, kd_tree=kd_tree) return result[0][0], result[1][0] elif isinstance(shape, LineString): return get_closest_point_from_line(shape, points_to, kd_tree=kd_tree) elif isinstance(shape, (MultiPoint, MultiLineString)): return get_closest_point_from_multi_shape(shape, points_to, kd_tree=kd_tree) else: raise TypeError("Method not implemented for shape of type '%s', expected Point, " "LineString or MultiLineString" % str(type(shape)))
[docs]def get_closest_point_from_shapes(shapes_from, points_to): """Compute the closest point for each given shape. Parameters ---------- shapes_from : An iterable of shapes (Point, MultiPoint, LineString, MultiLineString) points_to : list A list of points among which to find the closest to the multi shape Returns ------- float distance int index of the closest point """ results = [] kd_tree = cKDTree(points_to) for shape in shapes_from: distance, closest_points_index = get_closest_point_from_shape(shape, kd_tree=kd_tree) results.append((distance, closest_points_index)) return results
[docs]def get_closest_line_from_point(point_from: PointCoordinatesLike, lines_to=None, discretization_tol=None, kd_tree=None, points_line_association=None): """Find the closest line from a given point. Parameters ---------- point_from : PointCoordinatesLike Point coordinate to find the closest line. lines_to : list Group of lines among which the closest has to be found (optional if ``kdtree`` and ``points_line_association`` are given). (Default value = None) discretization_tol: float Maximum distance between discretized points (optional if ``kdtree`` and ``points_line_association`` are given). (Default value = None) kd_tree : cKDTree An optional pre-computed kd_tree of discretized lines. (Default value = None) points_line_association : dict An optional pre-computed dictionary matching lines and discretized points. (Default value = None) Returns ------- float distance int index of the closest line """ if lines_to is None and kd_tree is None: raise ValueError("Must provide at least argument 'points_to' or 'kd_tree'") elif kd_tree is None and points_line_association is None: raise ValueError("If a kd-tree is given, a point line association dictionary must provided") if kd_tree is None: if discretization_tol is None: raise ValueError("If no kd-tree is given, a discretization tolerance must be provided.") points_to, points_line_association = discretize_lines(lines_to, discretization_tol) kd_tree = cKDTree(points_to) distance, closest_point_index = get_closest_point_from_points([point_from], kd_tree=kd_tree) for line_index in points_line_association: if closest_point_index in points_line_association[line_index]: return distance, line_index assert False
[docs]def get_closest_line_from_points(points_from, lines_to, discretization_tol): """Find the closest line for each given points. Parameters ---------- points_from : list Points coordinates. lines_to : list Group of lines among which the closest has to be found. discretization_tol: float Maximum distance between discretized points Returns ------- list A list of closest lines indexes. """ points_to, points_line_association = discretize_lines(lines_to, discretization_tol) kd_tree = cKDTree(points_to) lines_indexes = [] for point in points_from: result = get_closest_line_from_point(point, kd_tree=kd_tree, points_line_association=points_line_association) lines_indexes.append(result[1]) return lines_indexes
[docs]def get_polygons_neighborhood(polygons): """Returns for each polygon a set of intersecting polygons.""" nb_polygons = len(polygons) neighborhoods = [set() for c in range(nb_polygons)] for ix1, p1 in enumerate(polygons): for ix2 in range(ix1 + 1, nb_polygons): p2 = polygons[ix2] if p1.intersects(p2): neighborhoods[ix1].add(ix2) neighborhoods[ix2].add(ix1) return neighborhoods
[docs]def split_line(line, distance): """Cuts a line in two at a distance from its starting point.""" coords = list(line.coords) if distance <= 0.0: return [LineString([coords[0]]*2), LineString(line)] if distance >= line.length: return [LineString(line), LineString([coords[-1]]*2)] for i, p in enumerate(coords): pd = line.project(Point(p)) if pd == distance: return [LineString(coords[:i + 1]), LineString(coords[i:])] if pd > distance: cp = line.interpolate(distance) return [LineString(coords[:i] + [(cp.x, cp.y)]), LineString([(cp.x, cp.y)] + coords[i:])]
[docs]def coordinates_almost_equal(c1: Iterable, c2: Iterable, tolerance=1e-8) -> bool: """Returns true if the two given list of coordinates equals within a given tolerance. Parameters ---------- c1: Iterable First point coordinates c2: Iterable Second point coordinates tolerance : float Tolerance comparison (Default value = 1e-8) Returns ------- bool True if the coordinates almost equal, false otherwise. """ for i, j in zip(c1, c2): if abs(i - j) > tolerance: return False return True
[docs]def almost_equally_located(p1: Point, p2: Point, tolerance=1e-8) -> bool: """Test if two point are loacated at the same place within a tolerance. Parameters ---------- p1: Point First point to compare p2: Point Second point to compare tolerance : Comparison tolerance (Default value = 1e-8) Returns ------- bool True if the two points have the same coordinates. """ return coordinates_almost_equal([p1.x, p1.y], [p2.x, p2.y], tolerance)
[docs]def insert_point_in_line(line: LineString, point_coords: list, position: int) -> LineString: """Insert a new point in a line given its coordinates.""" new_line_coordinates = line.coords[0:position] new_line_coordinates.append((point_coords[0], point_coords[1])) new_line_coordinates.extend(line.coords[position:]) return LineString(new_line_coordinates)
[docs]def get_default_discretization_tolerance(crs): """Return a discretization tolerance with the right order of magnitude for the given crs. Examples -------- >>> import geonetworkx as gnx >>> print(gnx.get_default_discretization_tolerance("epsg:3857")) 3.0 """ if not gnx.is_null_crs(crs): pp_crs = pyproj.CRS(crs) if gnx.crs_equals(pp_crs, gnx.WGS84_CRS): return 1e-4 # in degree elif pp_crs.axis_info is not None and len(pp_crs.axis_info) > 0: first_axis = pp_crs.axis_info[0] if first_axis.unit_name == "metre": return 3.0 # in meters raise ValueError("Impossible to provide a valid discretization tolerance" "for the given crs. A custom one can be set.")