Source code for geonetworkx.simplify

# -*- coding: utf-8 -*-
import networkx as nx
from shapely.geometry import Polygon, MultiPolygon, LineString
from geonetworkx import GeoGraph, GeoDiGraph
from geonetworkx.utils import is_nan
from geonetworkx.geometry_operations import merge_two_lines_with_closest_extremities
from typing import Union
from networkx.classes.filters import no_filter


[docs]def remove_isolates(graph: nx.Graph) -> int: """Removes all isolates nodes in the given graph. Parameters ---------- graph : nx.Graph A graph on which to remove all isolates Returns ------- int Number of removed isolates """ isolates = list(nx.isolates(graph)) graph.remove_nodes_from(isolates) return len(isolates)
[docs]def remove_self_loop_edges(graph: nx.Graph) -> int: """Remove self loop edges on nodes of the given graph. Parameters ---------- graph : nx.Graph A graph on which to remove all self loops. Returns ------- int The number of removed self loops """ self_loops_edges = list(nx.selfloop_edges(graph)) graph.remove_edges_from(self_loops_edges) return len(self_loops_edges)
[docs]def remove_small_connected_components(graph: nx.Graph, minimum_allowed_size: int) -> int: """Remove all connected components having strictly less than ``minimum_allowed_size``. Parameters ---------- graph : nx.Graph The graph on which to remove connected components minimum_allowed_size : int The minimum number of nodes where a connected component is kept. Returns ------- int The number of removed connected components """ connected_components = list(nx.connected_components(graph)) nb_removed_cc = 0 for c_ix, cc in enumerate(connected_components): if len(cc) < minimum_allowed_size: graph.remove_nodes_from(cc) nb_removed_cc += 1 else: for n in cc: graph.nodes[n]['cc'] = c_ix return nb_removed_cc
[docs]def trim_graph_with_polygon(graph: GeoGraph, polygon: Union[Polygon, MultiPolygon], as_view=False, method="intersects"): """Trim a graph with a given polygon. Keep only the nodes that intersect (or are within) the polygon. Parameters ---------- graph : GeoGraph, GeoDiGraph, GeoMultiGraph or GeoMultiDiGraph A GeoGraph (or subclass) polygon : Polygon or MultiPolygon A ``shapely.Polygon`` describing the area to keep as_view : bool If ``True``, a view of the given graph is returned method : str If set to ``"intersects"``, the ``shapely.intersects`` is used (keeps nodes and edges that intersects the polygon). If set to ``"within"``, the ``shapely.within`` is used (keep nodes and edges that are strictly into the polygon). (Default value = "intersects") Returns ------- None or GeoGraph The modified graph if ``as_view`` is ``True``. """ if method not in ["intersects", "within"]: raise ValueError("Unknown method for trimming : '%s'" % str(method)) nodes_series = graph.get_nodes_as_point_series() if method == 'intersects': nodes_criteria = nodes_series.intersects(polygon) else: nodes_criteria = nodes_series.within(polygon) if as_view: return graph.subgraph(nodes_series[nodes_criteria].index) else: nodes_to_remove = nodes_series[~ nodes_criteria].index graph.remove_nodes_from(nodes_to_remove)
[docs]def remove_nan_attributes(graph: nx.Graph, remove_nan=True, remove_none=True, copy=False): """Remove the `nan` and `None` values from nodes and edges attributes. Parameters ---------- graph : nx.Graph Graph (or subclass) remove_nan : If true, remove the `nan` values (test is ``val is np.nan``) (Default value = True) remove_none : If true, remove the ``None`` values (test is ``val is None``) (Default value = True) copy : If True, a copy of the graph is returned, otherwise the graph is modified inplace. (Default value = False) Returns ------- None or nx.Graph The modified graph if ``copy`` is true. """ if copy: used_graph = graph.copy() else: used_graph = graph def trim_data(d): keys_to_remove = set() for k, val in d.items(): if remove_none and val is None: keys_to_remove.add(k) if remove_nan and is_nan(val): keys_to_remove.add(k) for k in keys_to_remove: del d[k] for n, data in used_graph.nodes(data=True): trim_data(data) for u, v, data in used_graph.edges(data=True): trim_data(data) if copy: return used_graph
[docs]def get_dead_ends(graph: nx.Graph, node_filter=no_filter, only_strict=False) -> list: """Return the list of dead end in the given graph. A dead end is defined as a node having only one neighbor. For directed graphs, a strict dead end is a node having a unique predecessor and no successors. A weak dead end is a node having a unique predecessor that is also its unique successor. Parameters ---------- graph : nx.Graph Graph to parse. node_filter : Evaluates to true if a node can be considered as dead end, false otherwise. (Default value = no_filter) only_strict : If true, remove only strict dead ends. Used only for directed graphs. (Default value = False) Returns ------- list List of node name that are dead ends. """ if graph.is_directed(): dead_ends = [] for n in graph.nodes(): if not node_filter(n): continue nb_predecessors = len(graph.pred[n]) if nb_predecessors != 1: continue nb_successors = len(graph.succ[n]) if nb_successors == 0: dead_ends.append(n) elif not only_strict and nb_successors == 1: pred = next(iter(graph.pred[n])) if pred in graph.successors(n): dead_ends.append(n) return dead_ends else: return [n for n in graph.nodes() if node_filter(n) and len(graph.neighbors(n)) == 1]
[docs]def remove_dead_ends(graph: nx.Graph, node_filter=no_filter, only_strict=False): """Remove dead ends from a given graph. A dead end is defined as a node having only one neighbor. For directed graphs, a strict dead end is a node having a unique predecessor and no successors. A weak dead end is a node having a unique predecessor that is also its unique successor. Parameters ---------- graph : nx.Graph Graph to simplify node_filter : Evaluates to true if a node can be removed, false otherwise. (Default value = no_filter) only_strict : If true, remove only strict dead ends. Used only for directed graphs. (Default value = False) """ nodes_to_remove = get_dead_ends(graph, node_filter, only_strict) while nodes_to_remove: graph.remove_nodes_from(nodes_to_remove) nodes_to_remove = get_dead_ends(graph, node_filter, only_strict)
[docs]def _clean_merge_mapping(edge_mapping: dict, new_edge: tuple, old_edges: list, directed: bool): """For the two-degree node merge operation, it cleans the new-old edges mapping dictionary by reporting original edges to the newest edge. It makes sure that all edges in the mapping dictionary dict are in the resulting graph. """ for e in old_edges: old_edge = None if e in edge_mapping.keys(): old_edge = e elif not directed: reversed_edge = (e[1], e[0], *e[2:]) if reversed_edge in edge_mapping.keys(): old_edge = reversed_edge if old_edge is not None: if e in edge_mapping[new_edge]: edge_mapping[new_edge].remove(e) edge_mapping[new_edge].extend(edge_mapping[old_edge]) del edge_mapping[old_edge]
[docs]def two_degree_node_merge_for_directed_graphs(graph: GeoDiGraph, node_filter=no_filter): """Merge edges that connects two nodes with a unique third node. A potential node to merge `n` must have exactly two different neighbors `u` and `v` with one of the following set of edges: * `(u, n)` and `(n, v)` * `(u, n)`, `(n, u)`, `(n, v)` and `(v, n)` For the first case, a merging edge `(u, v)` is added. Under the latter, two edges `(u, v)` and `(v, u)` are added. The added edges will have a geometry corresponding to concatenation of the two replaced edges. If a replaced edge doesn't have a geometry, the added edge will not have a geometry as well. Edges geometries must be well ordered (first node must match with line's first extremity), otherwise lines concatenation may not be consistent (see ``order_well_lines``). Parameters ---------- graph : GeoDiGraph or GeoMultiDiGraph Given graph to modify node_filter : Evaluates to true if a given node can be merged. (Default value = no_filter) Returns ------- merged_edges: dict Dictionary indicating for each new edge the merged ones. """ def _get_merging_line(g: GeoDiGraph, e1: tuple, e2: tuple) -> Union[LineString, None]: first_edge_geometry = g.edges[e1].get(g.edges_geometry_key, None) second_edge_geometry = g.edges[e2].get(g.edges_geometry_key, None) if first_edge_geometry is not None and second_edge_geometry is not None: return LineString(list(first_edge_geometry.coords) + list(second_edge_geometry.coords)) else: return None merged_edges = dict() nodes = list(graph.nodes) for n in nodes: if not node_filter(n): continue in_degree = graph.in_degree(n) out_degree = graph.out_degree(n) merging_edges = [] if in_degree == out_degree == 1: predecessor = next(iter(graph.pred[n])) successor = next(iter(graph.succ[n])) if predecessor == successor: continue if graph.is_multigraph(): new_edge = (predecessor, successor, graph.new_edge_key(predecessor, successor)) edges = [(predecessor, n, next(iter(graph.adj[predecessor][n]))), (n, successor, next(iter(graph.adj[n][successor])))] else: new_edge = (predecessor, successor) edges = [(predecessor, n), (n, successor)] merged_line = _get_merging_line(graph, edges[0], edges[1]) merging_edges = [(new_edge, merged_line)] merged_edges[new_edge] = edges.copy() # copy for non destructive delete in clean merger function _clean_merge_mapping(merged_edges, new_edge, edges, True) if in_degree == out_degree == 2: successors = list(graph.succ[n]) if all(p in successors for p in graph.pred[n]): if successors[1] == successors[0]: continue if graph.is_multigraph(): back_new_edge = (successors[1], successors[0], graph.new_edge_key(successors[1], successors[0])) forth_new_edge = (successors[0], successors[1], graph.new_edge_key(successors[0], successors[1])) back_edges = [(successors[1], n, next(iter(graph.adj[successors[1]][n]))), (n, successors[0], next(iter(graph.adj[n][successors[0]])))] forth_edges = [(successors[0], n, next(iter(graph.adj[successors[0]][n]))), (n, successors[1], next(iter(graph.adj[n][successors[1]])))] else: back_new_edge = (successors[1], successors[0]) forth_new_edge = (successors[0], successors[1]) back_edges = [(successors[1], n), (n, successors[0])] forth_edges = [(successors[0], n), (n, successors[1])] back_merged_line = _get_merging_line(graph, back_edges[0], back_edges[1]) forth_merged_line = _get_merging_line(graph, forth_edges[0], forth_edges[1]) merging_edges = [(back_new_edge, back_merged_line), (forth_new_edge, forth_merged_line)] merged_edges[back_new_edge] = back_edges.copy() _clean_merge_mapping(merged_edges, back_new_edge, back_edges, True) merged_edges[forth_new_edge] = forth_edges.copy() _clean_merge_mapping(merged_edges, forth_new_edge, forth_edges, True) if merging_edges: # Remove node (and thus edges) graph.remove_node(n) # Add merging edges for new_edge, line in merging_edges: merging_edge_attributes = {} if line is not None: merging_edge_attributes[graph.edges_geometry_key] = line graph.add_edge(*new_edge, **merging_edge_attributes) return merged_edges
[docs]def two_degree_node_merge_for_undirected_graphs(graph: GeoGraph, node_filter=no_filter) -> dict: """Merge edges that connects two nodes with a unique third node for undirected graphs. Potential nodes to merge are nodes with two edges connecting two different nodes. If a replaced edge doesn't have a geometry, the added edge will not have a geometry as well. Parameters ---------- graph : GeoGraph or GeoMultiGraph Graph to modify node_filter : Evaluates to true if a given node can be merged. (Default value = no_filter) Returns ------- dict Dictionary indicating for each new edge the merged ones. """ if graph.is_multigraph(): keys_args = {"keys": True} else: keys_args = {} merged_edges = dict() two_degree_nodes = [n for n in graph.nodes() if graph.degree(n) == 2 and node_filter(n)] for n in two_degree_nodes: edges = list(graph.edges(n, **keys_args)) assert (len(edges) == 2) first_node = edges[0][1] if edges[0][1] != n else edges[0][0] second_node = edges[1][1] if edges[1][1] != n else edges[1][0] if graph.is_multigraph(): new_edge = (first_node, second_node, graph.new_edge_key(first_node, second_node)) else: new_edge = (first_node, second_node) if first_node == second_node: continue first_edge_geometry = graph.edges[edges[0]].get(graph.edges_geometry_key, None) second_edge_geometry = graph.edges[edges[1]].get(graph.edges_geometry_key, None) if first_edge_geometry is not None and second_edge_geometry is not None: merged_line = merge_two_lines_with_closest_extremities(first_edge_geometry, second_edge_geometry) else: merged_line = None merged_edges[new_edge] = edges.copy() _clean_merge_mapping(merged_edges, new_edge, edges, False) # Remove node (and thus edges) graph.remove_node(n) # Add merging edge merging_edge_attributes = {} if merged_line is not None: merging_edge_attributes[graph.edges_geometry_key] = merged_line graph.add_edge(*new_edge, **merging_edge_attributes) return merged_edges
[docs]def two_degree_node_merge(graph: GeoGraph, node_filter=no_filter) -> dict: """Merge edges that connects two nodes with a unique third node. Parameters ---------- graph : GeoGraph, GeoDiGraph, GeoMultiGraph or GeoMultiDiGraph Graph to modify node_filter : Evaluates to true if a given node can be merged. (Default value = no_filter) Returns ------- dict Dictionary indicating for each new edge the merged ones. See Also -------- two_degree_node_merge_for_directed_graphs, two_degree_node_merge_for_undirected_graphs """ if graph.is_directed(): return two_degree_node_merge_for_directed_graphs(graph, node_filter) else: return two_degree_node_merge_for_undirected_graphs(graph, node_filter)