Source code for snkit.network

"""Network representation and utilities
"""
import logging
import os
from typing import Optional
import warnings

import geopandas
import numpy as np
import pandas
import shapely.errors

try:
    import networkx as nx

    USE_NX = True
except ImportError:
    USE_NX = False

from geopandas import GeoDataFrame
from shapely.geometry import (
    Point,
    MultiPoint,
    LineString,
    GeometryCollection,
    shape,
    mapping,
)
from shapely.ops import split, linemerge, unary_union

from collections import Counter

# optional progress bars
if "SNKIT_PROGRESS" in os.environ and os.environ["SNKIT_PROGRESS"] in ("1", "TRUE"):
    try:
        from tqdm import tqdm
    except ImportError:
        from snkit.utils import tqdm_standin as tqdm
else:
    from snkit.utils import tqdm_standin as tqdm

# optional parallel processing
if "SNKIT_PROCESSES" in os.environ:
    processes_env_var = os.environ["SNKIT_PROCESSES"]
    try:
        requested_processes = int(processes_env_var)
    except TypeError:
        raise RuntimeError(
            "SNKIT_PROCESSES env var must be a non-negative integer. "
            "Use 0 or unset for serial operation."
        )
    PARALLEL_PROCESS_COUNT = min([os.cpu_count(), requested_processes])
    import multiprocessing
    logging.info(f"SNKIT_PROCESSES={processes_env_var}, using {PARALLEL_PROCESS_COUNT} processes")
else:
    PARALLEL_PROCESS_COUNT = 0


class Network:
    """A Network is composed of nodes (points in space) and edges (lines)

    Parameters
    ----------
    nodes : geopandas.geodataframe.GeoDataFrame, optional
    edges : geopandas.geodataframe.GeoDataFrame, optional

    Attributes
    ----------
    nodes : geopandas.geodataframe.GeoDataFrame
    edges : geopandas.geodataframe.GeoDataFrame
    """

    def __init__(self, nodes=None, edges=None):
        if nodes is None:
            nodes = GeoDataFrame(geometry=[])
        self.nodes = nodes

        if edges is None:
            edges = GeoDataFrame(geometry=[])
        self.edges = edges

    def set_crs(self, crs=None, epsg=None, allow_override=False):
        """Set the coordinate reference system (CRS) of the network nodes and edges.

        Parameters
        ----------
        crs : pyproj.CRS, optional if epsg is specified
            The value can be anything accepted by pyproj.CRS.from_user_input(),
            such as an authority string (e.g. "EPSG:4326") or a WKT string.
        epsg : int, optional if crs is specified
            EPSG code specifying output projection.
        allow_override : bool, default False
            If the nodes or edges GeoDataFrame already has a CRS, allow
            replacing the existing CRS, even when the two are not equal.
        """
        inplace = True
        self.edges.set_crs(crs, epsg, inplace, allow_override)
        self.nodes.set_crs(crs, epsg, inplace, allow_override)

    def to_crs(self, crs=None, epsg=None):
        """Transform network nodes and edges geometries to a new coordinate
        reference system (CRS).

        Parameters
        ----------
        crs : pyproj.CRS, optional if epsg is specified
            The value can be anything accepted by pyproj.CRS.from_user_input(),
            such as an authority string (e.g. "EPSG:4326") or a WKT string.
        epsg : int, optional if crs is specified
            EPSG code specifying output projection.
        """
        inplace = True
        self.edges.to_crs(crs, epsg, inplace)
        self.nodes.to_crs(crs, epsg, inplace)

    def to_file(self, filename, nodes_layer="nodes", edges_layer="edges", **kwargs):
        """Write nodes and edges to a geographic data file with layers.

        Any additional keyword arguments are passed through to
        `geopandas.GeoDataFrame.to_file`.

        Parameters
        ----------
        filename : str
            Path to geographic data file with layers
        nodes_layer : str, optional, default 'nodes'
            Layer name for nodes.
        edges_layer : str, optional, default 'edges'
            Layer name for edges.
        """
        self.nodes.to_file(filename, layer=nodes_layer, **kwargs)
        self.edges.to_file(filename, layer=edges_layer, **kwargs)

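# Example (illustrative sketch, not part of snkit's API): build a Network from
# two GeoDataFrames of point and line geometries and write it out as a
# two-layer GeoPackage. The helper name and the output path "example.gpkg" are
# hypothetical.
def _example_write_network(nodes: GeoDataFrame, edges: GeoDataFrame) -> None:
    network = Network(nodes=nodes, edges=edges)
    network.to_file("example.gpkg", driver="GPKG")
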
def read_file(filename, nodes_layer="nodes", edges_layer="edges", **kwargs):
    """Read a geographic data file with layers containing nodes and edges.

    Any additional keyword arguments are passed through to `geopandas.read_file`.

    Parameters
    ----------
    filename : str
        Path to geographic data file with layers
    nodes_layer : str, optional, default 'nodes'
        Layer name for nodes, or None if nodes should not be read.
    edges_layer : str, optional, default 'edges'
        Layer name for edges, or None if edges should not be read.
    """
    if nodes_layer is not None:
        nodes = geopandas.read_file(filename, layer=nodes_layer, **kwargs)
    else:
        nodes = None

    if edges_layer is not None:
        edges = geopandas.read_file(filename, layer=edges_layer, **kwargs)
    else:
        edges = None

    return Network(nodes, edges)


def add_ids(network, id_col="id", edge_prefix="edge", node_prefix="node"):
    """Add or replace an id column with ascending ids"""
    nodes = network.nodes.copy()
    if not nodes.empty:
        nodes = nodes.reset_index(drop=True)
    edges = network.edges.copy()
    if not edges.empty:
        edges = edges.reset_index(drop=True)

    nodes[id_col] = ["{}_{}".format(node_prefix, i) for i in range(len(nodes))]
    edges[id_col] = ["{}_{}".format(edge_prefix, i) for i in range(len(edges))]

    return Network(nodes=nodes, edges=edges)


def add_topology(network, id_col="id"):
    """Add or replace from_id, to_id to edges"""
    from_ids = []
    to_ids = []
    for edge in tqdm(
        network.edges.itertuples(), desc="topology", total=len(network.edges)
    ):
        start, end = line_endpoints(edge.geometry)

        start_node = nearest_node(start, network.nodes)
        from_ids.append(start_node[id_col])

        end_node = nearest_node(end, network.nodes)
        to_ids.append(end_node[id_col])

    edges = network.edges.copy()
    edges["from_id"] = from_ids
    edges["to_id"] = to_ids

    return Network(nodes=network.nodes, edges=edges)


def get_endpoints(network):
    """Get nodes for each edge endpoint"""
    endpoints = []
    for edge in tqdm(
        network.edges.itertuples(), desc="endpoints", total=len(network.edges)
    ):
        if edge.geometry is None:
            continue
        if edge.geometry.geom_type == "MultiLineString":
            for line in edge.geometry.geoms:
                start, end = line_endpoints(line)
                endpoints.append(start)
                endpoints.append(end)
        else:
            start, end = line_endpoints(edge.geometry)
            endpoints.append(start)
            endpoints.append(end)

    # create dataframe to match the nodes geometry column name
    return matching_gdf_from_geoms(network.nodes, endpoints)


def add_endpoints(network):
    """Add nodes at line endpoints"""
    endpoints = get_endpoints(network)
    nodes = concat_dedup([network.nodes, endpoints])

    return Network(nodes=nodes, edges=network.edges)


def round_geometries(network, precision=3):
    """Round coordinates of all node points and vertices of edge linestrings
    to some precision"""

    def _set_precision(geom):
        return set_precision(geom, precision)

    network.nodes.geometry = network.nodes.geometry.apply(_set_precision)
    network.edges.geometry = network.edges.geometry.apply(_set_precision)
    return network


def split_multilinestrings(network):
    """Create multiple edges from any MultiLineString edge

    Ensures that edge geometries are all LineStrings, duplicates attributes
    over any created multi-edges.
    """
    edges = network.edges
    geom_col: str = geometry_column_name(edges)

    split_edges = edges.explode(column=geom_col, ignore_index=True)

    geo_types = set(split_edges.geom_type)
    if geo_types != {"LineString"}:
        raise ValueError(
            f"exploded edges are of type(s) {geo_types} but should only be LineString"
        )

    return Network(nodes=network.nodes, edges=split_edges)

def merge_multilinestring(geom):
    """Merge a MultiLineString to LineString"""
    try:
        if geom.geom_type == "MultiLineString":
            geom_inb = linemerge(geom)
            if geom_inb.is_ring:
                return geom
            # In case of linestring merge issues, we could add this to the script again
            # from centerline.main import Centerline
            # if geom_inb.geom_type == 'MultiLineString':
            #     return linemerge(Centerline(geom.buffer(0.5)))
            else:
                return geom_inb
        else:
            return geom
    except Exception:
        return GeometryCollection()

def snap_nodes(network, threshold=None):
    """Move nodes (within threshold) to edges"""

    def snap_node(geom):
        snap = nearest_point_on_edges(geom, network.edges)
        distance = snap.distance(geom)
        if threshold is not None and distance > threshold:
            snap = geom
        return snap

    geom_col = geometry_column_name(network.nodes)
    snapped_geoms = network.nodes[geom_col].apply(snap_node)
    nodes = GeoDataFrame(
        pandas.concat(
            [
                network.nodes.drop(geom_col, axis=1),
                GeoDataFrame(snapped_geoms, columns=[geom_col]),
            ],
            axis=1,
        ),
        crs=network.nodes.crs,
    )
    return Network(nodes=nodes, edges=network.edges)

def _split_edges_at_nodes(
    edges: GeoDataFrame, nodes: GeoDataFrame, tolerance: float
) -> list:
    """Split edges at nodes for a network chunk"""
    split_edges = []
    for edge in edges.itertuples(index=False):
        hits = nodes_intersecting(edge.geometry, nodes, tolerance)
        split_points = MultiPoint([hit.geometry for hit in hits.itertuples()])

        # potentially split to multiple edges
        new_edges = split_edge_at_points(edge, split_points, tolerance)
        split_edges.append(new_edges)

    return split_edges

def split_edges_at_nodes(
    network: Network, tolerance: float = 1e-9, chunk_size: Optional[int] = None
):
    """Split network edges where they intersect node geometries.

    N.B. Can operate in parallel if SNKIT_PROCESSES is in the environment and
    a positive integer.

    Args:
        network: Network object to split edges for.
        tolerance: Proximity within which nodes are said to intersect an edge.
        chunk_size: When splitting in parallel, set the number of edges per
            unit of work.

    Returns:
        Network with edges split at nodes (within proximity tolerance).
    """
    split_edges = []
    n = len(network.edges)

    if PARALLEL_PROCESS_COUNT > 1:
        if chunk_size is None:
            chunk_size = max([1, int(n / PARALLEL_PROCESS_COUNT)])

        args = [
            (network.edges.iloc[i : i + chunk_size, :], network.nodes, tolerance)
            for i in range(0, n, chunk_size)
        ]
        with multiprocessing.Pool(PARALLEL_PROCESS_COUNT) as pool:
            results = pool.starmap(_split_edges_at_nodes, args)

        # flatten return list
        split_edges: list = [df for chunk in results for df in chunk]
    else:
        split_edges = _split_edges_at_nodes(network.edges, network.nodes, tolerance)

    # combine dfs
    edges = pandas.concat(split_edges, axis=0)
    edges = edges.reset_index().drop("index", axis=1)

    return Network(nodes=network.nodes, edges=edges)

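# Example (illustrative sketch, not part of snkit's API): a typical cleaning
# pipeline composed from the functions defined above, assuming `raw_edges` is
# a GeoDataFrame of LineString or MultiLineString geometries. The helper name
# is hypothetical.
def _example_clean_network(raw_edges: GeoDataFrame) -> Network:
    network = Network(edges=raw_edges)
    network = split_multilinestrings(network)  # LineString-only edge geometries
    network = add_endpoints(network)  # add nodes at line start/end points
    network = split_edges_at_nodes(network)  # break edges where nodes touch them
    network = add_ids(network)  # label nodes node_0..n and edges edge_0..m
    network = add_topology(network)  # add from_id/to_id to each edge
    return network
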
def split_edges_at_intersections(network, tolerance=1e-9):
    """Split network edges where they intersect line geometries"""
    split_edges = []
    split_points = []
    for edge in tqdm(
        network.edges.itertuples(index=False), desc="split", total=len(network.edges)
    ):
        # note: the symmetry of intersection is not exploited here.
        # (If A intersects B, then B intersects A)
        # since edges are not modified within the loop, this has just
        # potential performance consequences.
        hits_points = edges_intersecting_points(edge.geometry, network.edges, tolerance)

        # store the split edges and intersection points
        split_points.extend(hits_points)
        hits_points = MultiPoint(hits_points)

        edges = split_edge_at_points(edge, hits_points, tolerance)
        split_edges.append(edges)

    # add the (potentially) split edges
    edges = pandas.concat(split_edges, axis=0)
    edges = edges.reset_index().drop("index", axis=1)

    # combine the original nodes with the new intersection nodes,
    # dropping the duplicates.
    # note: there are at least duplicates from above since intersections
    # are checked twice
    # note: intersection nodes are appended, and if any duplicates, the
    # original counterparts are kept.
    nodes = GeoDataFrame(geometry=split_points)
    nodes = pandas.concat([network.nodes, nodes], axis=0).drop_duplicates()
    nodes = nodes.reset_index().drop("index", axis=1)

    return Network(nodes=nodes, edges=edges)

def merge_edges(network, id_col="id", by=None):
    """Merge edges that share a node with a connectivity degree of 2

    Parameters
    ----------
    network : snkit.network.Network
    id_col : string
    by : List[string], optional
        list of columns to use when merging an edge path - will not merge if
        edges have different values.
    """
    if "degree" not in network.nodes.columns:
        network.nodes["degree"] = network.nodes[id_col].apply(
            lambda x: node_connectivity_degree(x, network)
        )

    degree2 = list(network.nodes[id_col].loc[network.nodes.degree == 2])
    d2_set = set(degree2)
    edge_paths = []

    while d2_set:
        if len(d2_set) % 1000 == 0:
            print(len(d2_set))
        popped_node = d2_set.pop()
        node_path = set([popped_node])
        candidates = set([popped_node])
        while candidates:
            popped_cand = candidates.pop()
            matches = set(
                np.unique(
                    network.edges[["from_id", "to_id"]]
                    .loc[
                        (network.edges.from_id == popped_cand)
                        | (network.edges.to_id == popped_cand)
                    ]
                    .values
                )
            )
            matches.remove(popped_cand)
            matches = matches - node_path
            for match in matches:
                if match in degree2:
                    candidates.add(match)
                    node_path.add(match)
                    d2_set.remove(match)
                else:
                    node_path.add(match)
        if len(node_path) > 2:
            edge_paths.append(
                network.edges.loc[
                    (network.edges.from_id.isin(node_path))
                    & (network.edges.to_id.isin(node_path))
                ]
            )

    concat_edge_paths = []
    unique_edge_ids = set()
    new_node_ids = set(network.nodes[id_col]) - set(degree2)

    for edge_path in tqdm(edge_paths, desc="merge_edge_paths"):
        unique_edge_ids.update(list(edge_path[id_col]))
        edge_path = edge_path.dissolve(by=by)
        edge_path_dicts = []
        for edge in edge_path.itertuples(index=False):
            if edge.geometry.geom_type == "MultiLineString":
                edge_geom = linemerge(edge.geometry)
                if edge_geom.geom_type == "MultiLineString":
                    # list parts via .geoms (direct iteration was removed in Shapely 2.0)
                    edge_geoms = list(edge_geom.geoms)
                else:
                    edge_geoms = [edge_geom]
            else:
                edge_geoms = [edge.geometry]
            for geom in edge_geoms:
                start, end = line_endpoints(geom)
                start = nearest_node(start, network.nodes)
                end = nearest_node(end, network.nodes)
                edge_path_dict = {
                    "from_id": start[id_col],
                    "to_id": end[id_col],
                    "geometry": geom,
                }
                for i, col in enumerate(edge_path.columns):
                    if col not in ("from_id", "to_id", "geometry"):
                        edge_path_dict[col] = edge[i]
                edge_path_dicts.append(edge_path_dict)

        concat_edge_paths.append(geopandas.GeoDataFrame(edge_path_dicts))
        new_node_ids.update(list(edge_path.from_id) + list(edge_path.to_id))

    edges_new = network.edges.copy()
    edges_new = edges_new.loc[~(edges_new.id.isin(list(unique_edge_ids)))]
    edges_new.geometry = edges_new.geometry.apply(merge_multilinestring)
    edges = pandas.concat(
        [edges_new, pandas.concat(concat_edge_paths).reset_index()], sort=False
    )

    nodes = (
        network.nodes.set_index(id_col).loc[list(new_node_ids)].copy().reset_index()
    )

    return Network(nodes=nodes, edges=edges)

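# Example (illustrative sketch): collapse chains of degree-2 nodes with
# merge_edges, only merging edges that share the same value in an assumed
# "highway" attribute column. Assumes add_ids and add_topology have been run;
# the column name and helper name are hypothetical.
def _example_merge_by_attribute(network: Network) -> Network:
    return merge_edges(network, by=["highway"])
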
def geometry_column_name(gdf):
    """Get geometry column name, fall back to 'geometry'"""
    try:
        geom_col = gdf.geometry.name
    except AttributeError:
        geom_col = "geometry"
    return geom_col


def matching_gdf_from_geoms(gdf, geoms):
    """Create a geometry-only GeoDataFrame with column name to match an
    existing GeoDataFrame"""
    geom_col = geometry_column_name(gdf)
    geom_arr = geoms_to_array(geoms)
    matching_gdf = GeoDataFrame(geometry=geom_arr, crs=gdf.crs)
    matching_gdf.geometry.name = geom_col
    return matching_gdf

def geoms_to_array(geoms):
    """Convert a list of geometries to a numpy object array"""
    geom_arr = np.empty(len(geoms), dtype="object")
    geom_arr[:] = geoms
    return geom_arr

def concat_dedup(dfs):
    """Concatenate a list of GeoDataFrames, dropping duplicate geometries

    - note: repeatedly drops indexes for deduplication to work
    """
    cat = pandas.concat(dfs, axis=0, sort=False)
    cat.reset_index(drop=True, inplace=True)
    cat_dedup = drop_duplicate_geometries(cat)
    cat_dedup.reset_index(drop=True, inplace=True)
    return cat_dedup

def node_connectivity_degree(node, network):
    """Count the edges connected to a node"""
    return len(
        network.edges[(network.edges.from_id == node) | (network.edges.to_id == node)]
    )

def drop_duplicate_geometries(gdf, keep="first"):
    """Drop duplicate geometries from a dataframe"""
    # as of geopandas ~0.6 this should work without explicit conversion to wkb
    # discussed in https://github.com/geopandas/geopandas/issues/521
    return gdf.drop_duplicates([gdf.geometry.name], keep=keep)

def nearest_point_on_edges(point, edges):
    """Find nearest point on edges to a point"""
    edge = nearest_edge(point, edges)
    snap = nearest_point_on_line(point, edge.geometry)
    return snap


def nearest_node(point, nodes):
    """Find nearest node to a point"""
    return nearest(point, nodes)


def nearest_edge(point, edges):
    """Find nearest edge to a point"""
    return nearest(point, edges)

def nearest(geom, gdf):
    """Find the element of a GeoDataFrame nearest a shapely geometry"""
    try:
        # sindex.nearest returns positional indices, so look up with iloc
        match_idx = gdf.sindex.nearest(geom, return_all=False)[1][0]
        return gdf.iloc[match_idx]
    except TypeError:
        warnings.warn("Falling back to RTree index method for nearest element")
        matches_idx = gdf.sindex.nearest(geom.bounds)
        nearest_geom = min(
            [gdf.iloc[match_idx] for match_idx in matches_idx],
            key=lambda match: geom.distance(match.geometry),
        )
        return nearest_geom

def edges_within(point, edges, distance):
    """Find edges within a distance of point"""
    return d_within(point, edges, distance)

def edges_intersecting_points(line, edges, tolerance=1e-9):
    """Return intersection points of intersecting edges"""
    hits = edges_intersecting(line, edges, tolerance)

    hits_points = []
    for hit in hits.geometry:
        # first extract the actual intersections from the hits
        # (these are newly constructed geometries, so they are not in the sindex)
        intersection = line.intersection(hit)
        # if the line is not simple, there is a self-crossing point
        # (note that it will always intersect with itself)
        # note that __eq__ is used on purpose instead of equals()
        # this is stricter: for geometries constructed in the same way
        # it makes sense since the sindex is used here
        if line == hit and not line.is_simple:
            # there is no built-in way to find self-crossing points,
            # but duplicated points after unary_union are the intersections
            intersection = unary_union(line)
            segments_coordinates = []
            for seg in intersection.geoms:
                segments_coordinates.extend(list(seg.coords))
            intersection = [
                Point(p) for p, c in Counter(segments_coordinates).items() if c > 1
            ]
            intersection = MultiPoint(intersection)

        # then extract the intersection points
        hits_points = intersection_endpoints(intersection, hits_points)

    return hits_points

def edges_intersecting(line, edges, tolerance=1e-9):
    """Find edges intersecting line"""
    return intersects(line, edges, tolerance)


def nodes_intersecting(line, nodes, tolerance=1e-9):
    """Find nodes intersecting line"""
    return intersects(line, nodes, tolerance)


def intersects(geom, gdf, tolerance=1e-9):
    """Find the subset of a GeoDataFrame intersecting with a shapely geometry"""
    return _intersects(geom, gdf, tolerance)


def d_within(geom, gdf, distance):
    """Find the subset of a GeoDataFrame within some distance of a shapely geometry"""
    return _intersects(geom, gdf, distance)

def _intersects(geom, gdf, tolerance=1e-9):
    if geom.is_empty:
        return geopandas.GeoDataFrame()
    buf = geom.buffer(tolerance)
    if buf.is_empty:
        # can have an empty buffer with too small a tolerance, fall back to original geom
        buf = geom
    try:
        return _intersects_gdf(buf, gdf)
    except shapely.errors.TopologicalError:
        # can exceptionally buffer to an invalid geometry, so try re-buffering
        buf = buf.buffer(0)
        return _intersects_gdf(buf, gdf)


def _intersects_gdf(geom, gdf):
    candidate_idxs = list(gdf.sindex.intersection(geom.bounds))
    candidates = gdf.iloc[candidate_idxs]
    return candidates[candidates.intersects(geom)]

def line_endpoints(line):
    """Return points at first and last vertex of a line"""
    try:
        coords = np.array(line.coords)
        start = Point(coords[0])
        end = Point(coords[-1])
    except NotImplementedError as e:
        print(line)
        raise e
    return start, end


def intersection_endpoints(geom, output=None):
    """Return the points from an intersection geometry

    It extracts the starting and ending points of intersection geometries
    recursively and appends them to `output`.

    This doesn't handle polygons or collections of polygons.
    """
    if output is None:
        output = []

    geom_type = geom.geom_type
    if geom.is_empty:
        pass
    elif geom_type == "Point":
        output.append(geom)
    elif geom_type == "LineString":
        start = Point(geom.coords[0])
        end = Point(geom.coords[-1])
        output.append(start)
        output.append(end)
    # recurse for collections of geometries
    # note that there is no shared inheritance relationship
    elif geom_type in ("MultiPoint", "MultiLineString", "GeometryCollection"):
        for geom_ in geom.geoms:
            output = intersection_endpoints(geom_, output)

    return output


def split_edge_at_points(edge, points, tolerance=1e-9):
    """Split edge at point/multipoint"""
    try:
        segments = split_line(edge.geometry, points, tolerance)
    except (ValueError, shapely.errors.GeometryTypeError):
        # GeometryTypeError derives from ShapelyError, not TypeError or
        # ValueError, since Shapely 2.0. May be raised if splitting fails, e.g.
        # because points is an empty GeometryCollection
        segments = [edge.geometry]
    edges = GeoDataFrame([edge] * len(segments))
    edges.geometry = segments
    return edges


def split_line(line, points, tolerance=1e-9):
    """Split line at point or multipoint, within some tolerance"""
    to_split = snap_line(line, points, tolerance)
    # when the splitter is a self-intersection point, shapely splits in
    # two parts only in a semi-arbitrary way, see the related question:
    # https://gis.stackexchange.com/questions/435879/python-shapely-split-a-complex-line-at-self-intersections?noredirect=1#comment711214_435879
    # checking only that the line is complex might not be enough,
    # but the difference operation is useless in the worst case
    if not to_split.is_simple:
        to_split = to_split.difference(points)
    return list(split(to_split, points).geoms)


def snap_line(line, points, tolerance=1e-9):
    """Snap a line to points within tolerance, inserting vertices as necessary"""
    if points.geom_type == "Point":
        if points.distance(line) < tolerance:
            line = add_vertex(line, points)
    elif points.geom_type == "MultiPoint":
        points = [point for point in points.geoms if point.distance(line) < tolerance]
        for point in points:
            line = add_vertex(line, point)
    return line


def add_vertex(line, point):
    """Add a vertex to a line at a point"""
    v_idx = nearest_vertex_idx_on_line(point, line)
    point_coords = np.array(point.coords)[0]
    line_coords = np.array(line.coords)
    if (point_coords == line_coords[v_idx]).all():
        # nearest vertex could be identical to point, so return unchanged
        return line

    insert_before_idx = None
    if v_idx == 0:
        # nearest vertex could be start, so insert just after (or could extend)
        insert_before_idx = 1
    elif v_idx == len(line_coords) - 1:
        # nearest vertex could be end, so insert just before (or could extend)
        insert_before_idx = v_idx
    else:
        # otherwise insert in between vertices of nearest segment
        segment_before = LineString([line_coords[v_idx], line_coords[v_idx - 1]])
        segment_after = LineString([line_coords[v_idx], line_coords[v_idx + 1]])
        if point.distance(segment_before) < point.distance(segment_after):
            insert_before_idx = v_idx
        else:
            insert_before_idx = v_idx + 1
    # insert point coords before index, return new linestring
    new_coords = list(line_coords)
    new_coords.insert(insert_before_idx, point_coords)
    return LineString(new_coords)

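# Example (illustrative sketch, for exposition only): add_vertex then split_line
# on a simple horizontal line. The helper name is hypothetical.
def _example_vertex_and_split() -> list:
    line = LineString([(0, 0), (2, 0)])
    point = Point(1, 0)
    # insert a vertex at (1, 0): LINESTRING (0 0, 1 0, 2 0)
    line_with_vertex = add_vertex(line, point)
    # split at that point: [LINESTRING (0 0, 1 0), LINESTRING (1 0, 2 0)]
    return split_line(line_with_vertex, point)
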
def nearest_vertex_idx_on_line(point, line):
    """Return the index of nearest vertex to a point on a line"""
    # distance to all points is calculated here - and this is called once per
    # splitting point. Any way to avoid this m x n behaviour?
    # idea: put line vertices in an STRTree and query it repeatedly for nearest
    # (with each splitting point)
    line_coords = np.array(line.coords)
    nearest_idx, _ = min(
        [
            (idx, point.distance(Point(coords)))
            for idx, coords in enumerate(line_coords)
        ],
        key=lambda item: item[1],
    )
    return nearest_idx


def nearest_point_on_line(point, line):
    """Return the nearest point on a line"""
    return line.interpolate(line.project(point))


def set_precision(geom, precision):
    """Set geometry precision"""
    geom_mapping = mapping(geom)
    geom_mapping["coordinates"] = np.round(
        np.array(geom_mapping["coordinates"]), precision
    )
    return shape(geom_mapping)


def to_networkx(network, directed=False, weight_col=None):
    """Return a networkx graph"""
    if not USE_NX:
        raise ImportError("No module named networkx")

    # init graph
    if not directed:
        G = nx.Graph()
    else:
        G = nx.MultiDiGraph()

    # get nodes from network data
    G.add_nodes_from(network.nodes.id.to_list())

    # add nodal positions from geom
    for node_id, x, y in zip(
        network.nodes.id, network.nodes.geometry.x, network.nodes.geometry.y
    ):
        G.nodes[node_id]["pos"] = (x, y)

    # get edges from network data
    if weight_col is None:
        # default to geometry length
        edges_as_list = list(
            zip(
                network.edges.from_id,
                network.edges.to_id,
                network.edges.geometry.length,
            )
        )
    else:
        edges_as_list = list(
            zip(
                network.edges.from_id,
                network.edges.to_id,
                network.edges[weight_col],
            )
        )

    # add edges to graph
    G.add_weighted_edges_from(edges_as_list)

    return G

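# Example (illustrative sketch, assumes networkx is installed and that add_ids
# and add_topology have been run): convert a Network to a graph and find a
# shortest path weighted by geometry length. The helper name and the
# "node_0"/"node_7" ids are hypothetical.
def _example_shortest_path(network: Network, source="node_0", target="node_7"):
    G = to_networkx(network)  # undirected graph, edge "weight" = geometry length
    return nx.shortest_path(G, source, target, weight="weight")
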
def get_connected_components(network):
    """Get connected components of the network, sorted by size (largest first)"""
    if not USE_NX:
        raise ImportError("No module named networkx")

    G = to_networkx(network)
    return sorted(nx.connected_components(G), key=len, reverse=True)

def add_component_ids(network: Network, id_col: str = "component_id") -> Network:
    """Add column of connected component IDs to network edges and nodes"""
    # get connected components in descending size order
    connected_parts: list[set] = get_connected_components(network)

    # init id_col
    network.edges[id_col] = 0
    network.nodes[id_col] = 0

    # add unique id for each graph component
    for count, part in enumerate(connected_parts):
        # edges
        edge_mask = network.edges.from_id.isin(part).values
        network.edges.loc[edge_mask, id_col] = count + 1

        # nodes
        node_mask = network.nodes.id.isin(part).values
        network.nodes.loc[node_mask, id_col] = count + 1

    return network

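# Example (illustrative sketch): keep only the largest connected component.
# add_component_ids labels components in descending size order starting at 1,
# so component_id == 1 selects the largest. The helper name is hypothetical.
def _example_largest_component(network: Network) -> Network:
    network = add_component_ids(network)
    nodes = network.nodes[network.nodes.component_id == 1].copy()
    edges = network.edges[network.edges.component_id == 1].copy()
    return Network(nodes=nodes, edges=edges)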