aio_overpass.pt_ordered

Collect the routes of a RouteQuery with optimized geometry.

  1"""Collect the routes of a ``RouteQuery`` with optimized geometry."""
  2
  3import itertools
  4from collections.abc import Callable, Generator, Iterator
  5from dataclasses import dataclass, replace
  6from typing import Any, Final, TypeAlias, TypeGuard
  7
  8from aio_overpass._dist import fast_distance
  9from aio_overpass.element import Element, Node, Relation, Relationship, Way
 10
 11import networkx as nx
 12import shapely.ops
 13from networkx import MultiDiGraph
 14from shapely.geometry import (
 15    GeometryCollection,
 16    LinearRing,
 17    LineString,
 18    MultiLineString,
 19    MultiPoint,
 20    Point,
 21    Polygon,
 22)
 23
 24from .pt import _MAX_DISTANCE_TO_TRACK, Route, RouteQuery, Stop, collect_routes
 25from .spatial import GeoJsonDict, Spatial
 26
 27
 28__docformat__ = "google"
 29__all__ = (
 30    "collect_ordered_routes",
 31    "to_ordered_route",
 32    "to_ordered_routes",
 33    "OrderedRouteView",
 34    "OrderedRouteViewNode",
 35)
 36
 37
 38@dataclass(kw_only=True, slots=True)
 39class OrderedRouteViewNode:
 40    """
 41    A node on the path of a route.
 42
 43    Attributes:
 44        lon: Node longitude.
 45        lat: Node latitude.
 46        way_id: The ID of the way this node is located on.
 47                ``None`` if this node is a stop position and there was no
 48                path to this node.
 49        path_idx: Index in ``OrderedRouteView.paths`` this node is a part of.
 50        n_seen_stops: The number of visited stops so far.
 51                      Increases on every stop (``1..=nb_stops``).
 52        distance: The approximate distance travelled so far, in meters.
 53    """
 54
 55    lon: float
 56    lat: float
 57    way_id: int | None
 58    path_idx: int
 59    n_seen_stops: int
 60    distance: float
 61
 62
 63@dataclass(kw_only=True, slots=True)
 64class OrderedRouteView(Spatial):
 65    """
 66    A view of a public transportation route with simplified and directed path geometry.
 67
 68    Views of a route can start at any stop, and end at a later stop.
 69    They are continouous if between every stop, there is a path of contigious ways.
 70    This is not the case for route relations that have a gap in their track.
 71
 72    Attributes:
 73        route: The viewed route.
 74        ordering: The visited nodes while trying to walk the shortest paths between pairs of stops,
 75                  ordered from start to finish. This is an empty list if this is an empty view.
 76    """
 77
 78    route: Route
 79    ordering: list[OrderedRouteViewNode]
 80
 81    @property
 82    def is_continuous(self) -> bool:
 83        """
 84        True if there's a path between all stops on the route.
 85
 86        False for empty views.
 87        """
 88        if not self.ordering:
 89            return False
 90
 91        grouped = itertools.groupby(iterable=self.ordering, key=lambda node: node.path_idx)
 92
 93        sequence_actual = [n for n, _grp in grouped]
 94        sequence_continuous = list(range(min(sequence_actual), max(sequence_actual) + 1))
 95
 96        # True if 'path_idx' never skips any stop when iterating 'ordering'
 97        return sequence_actual == sequence_continuous
 98
 99    @property
100    def stops(self) -> list[Stop]:
101        """All stops of this view where a path either starts or ends."""
102        if not self.ordering:
103            return []
104
105        has_path_to_idx = {node.n_seen_stops for node in self.ordering}
106        has_path_from_idx = {node.n_seen_stops - 1 for node in self.ordering}
107        indexes = has_path_to_idx | has_path_from_idx
108        indexes.remove(len(self.route.stops))
109        return [self.route.stops[i] for i in sorted(indexes)]
110
111    def _split(
112        self,
113        predicate: Callable[[OrderedRouteViewNode, OrderedRouteViewNode], bool],
114    ) -> Generator["OrderedRouteView", None, None]:
115        if len(self.ordering) < 2:
116            msg = "cannot split"  # TODO: more specific
117            raise ValueError(msg)
118
119        ordering: list[OrderedRouteViewNode] = []
120
121        for a, b in zip(self.ordering, self.ordering[1:], strict=False):
122            if predicate(a, b):
123                yield replace(self, ordering=ordering)
124                ordering = [a]
125            else:
126                ordering.append(a)
127
128        ordering.append(b)
129
130        yield replace(self, ordering=ordering)
131
132    def _group_by(self, key: Callable[[OrderedRouteViewNode], Any]) -> list["OrderedRouteView"]:
133        return list(self._split(predicate=lambda a, b: key(a) != key(b)))
134
135    def gap_split(self) -> list["OrderedRouteView"]:
136        """
137        Split this view wherever there's a gap in between stops.
138
139        Raises:
140            TODO: doc
141        """
142        return list(self._split(predicate=lambda a, b: b.path_idx > a.path_idx + 1))
143
144    def stop_split(self) -> list["OrderedRouteView"]:
145        """
146        Split this view at every stop, returning views between every pair of stops.
147
148        Raises:
149            TODO: doc
150        """
151        return self._group_by(key=lambda node: node.path_idx)
152
153    def take(self, first_n: int) -> "OrderedRouteView":
154        """
155        Returns the continuous view that connects a maximum number of stops at the beginning.
156
157        Raises:
158            TODO: doc
159        """
160        if first_n < 2:
161            msg = "cannot take less than two stops"
162            raise ValueError(msg)  # TODO: add a specific subclass for our ValueErrors?
163
164        pre_gap, *_ = self.gap_split()
165
166        by_stop = itertools.groupby(pre_gap.ordering, key=lambda node: node.path_idx)
167        by_stop_truncated = itertools.islice(by_stop, first_n - 1)
168
169        ordering = [node for _, nodes in by_stop_truncated for node in nodes]
170
171        return replace(self, ordering=ordering)
172
173    def trim(self, distance: float) -> "OrderedRouteView":
174        """
175        Trim this view to some distance in meters.
176
177        Returns:
178            the continuous view that is not longer than a given distance,
179            starting from the first stop.
180
181        Raises:
182            TODO: doc
183        """
184        # TODO: distance ValueErrors
185
186        pre_gap, *_ = self.gap_split()
187
188        distance_start = pre_gap.ordering[0].distance
189
190        by_stop = itertools.groupby(pre_gap.ordering, key=lambda node: node.path_idx)
191
192        ordering: list[OrderedRouteViewNode] = []
193
194        for _, nodes_iter in by_stop:
195            nodes = list(nodes_iter)
196            distance_end = nodes[-1].distance
197
198            if (distance_end - distance_start) > distance:
199                break
200
201            ordering.extend(nodes)
202
203        return replace(self, ordering=ordering)
204
205    @property
206    def paths(self) -> list[LineString | None]:
207        """
208        The simple paths between every pair of stops.
209
210        These linestrings are the shortest paths, merged from contiguous ways in the route relation.
211        Whenever there is no path between two stops, a `None` element will be inserted into the
212        result list.
213        """
214        if not self.ordering:
215            return []
216
217        from_idx = self.ordering[0].path_idx
218        to_idx = self.ordering[-1].path_idx
219        lines = [None for _ in range(from_idx, to_idx + 1)]
220
221        assert len(lines) + 1 == len(self.stops)
222
223        grouped = itertools.groupby(iterable=self.ordering, key=lambda node: node.path_idx)
224
225        for path_idx, nodes_iter in grouped:
226            nodes = [(node.lat, node.lon) for node in nodes_iter]
227            lines[path_idx - from_idx] = LineString(nodes) if len(nodes) >= 2 else None
228
229        return lines
230
231    @property
232    def path(self) -> LineString | MultiLineString:
233        """
234        The geometry representing the path travelled on this view from the first to last stop.
235
236        This is the result of ordering (a subset of) ways inside a route relation by the order of
237        traversal, and concatenating them. The order is found by building shortest paths between
238        stops. Whenever stops cannot be reached with the ways that are included in the relation,
239        the geometry will be split up. This happens when there are "holes" in the relation.
240        """
241        paths = self.paths
242
243        if not any(ls for ls in paths):
244            return MultiLineString([])
245
246        # group consecutive stretches of lines or None values
247        stretches_grouped = itertools.groupby(iterable=paths, key=bool)
248
249        # select all sets of consecutive lines to merge them
250        stretches: Iterator[Iterator[LineString]] = (
251            line_strings for has_track, line_strings in stretches_grouped if has_track
252        )
253
254        merged_lines = []
255
256        for line_strings in stretches:
257            coords: list[list[float]] = []
258
259            for line in line_strings:
260                if not coords:
261                    coords.extend(line.coords)
262                else:
263                    # ignore first coord, it's equal to the previous one
264                    coords.extend(line.coords[1:])
265
266            merged_line = LineString(coords)
267            merged_lines.append(merged_line)
268
269        if len(merged_lines) == 1:
270            return merged_lines[0]
271
272        return MultiLineString(merged_lines)
273
274    @property
275    def geojson(self) -> GeoJsonDict:
276        """A mapping of this object, using the GeoJSON format."""
277        # TODO: OrderedRouteView geojson
278        raise NotImplementedError
279
280
281def collect_ordered_routes(
282    query: RouteQuery,
283    perimeter: Polygon | None = None,
284    n_jobs: int = 1,
285) -> list[OrderedRouteView]:
286    # TODO: the way 'perimeter' works might be confusing
287    """
288    Produce ``OrderedRouteViews`` objects from a result set.
289
290    Compared to ``collect_routes()``, this function tries to build the geometry representing the
291    path travelled on every route from the first to last stop. If there are no holes in a route's
292    relation, this can typically generate a single line string to represent the entire path of
293    a route. Note that routes tagged with the PTv1 scheme will be ignored (in an effort to keep
294    the implemenation as simple as possible).
295
296    Since we cannot guarantee that relation members are ordered, we have to convert the route into
297    a graph, and find the shortest paths between stops. The more complex the graph, and the more
298    stops in a relation, the more expensive it is to generate those paths. Parallelizing this is
299    highly recommended, even for a small amount of routes (see ``n_jobs``).
300
301    You should use this instead of ``collect_routes()`` if you are particularly interested in a
302    route's path. Here are some example use cases:
303     - rendering a route's path between any two stops
304     - measuring the route's travelled distance between any two stops
305     - validating the order of ways in the relation
306
307    Args:
308        query: The query that produced a result set of route relations.
309        perimeter: If set, ``stops`` and ``paths`` of resulting routes will be limited
310                   to the exterior of this polygon. Any relation members in ``members``
311                   will not be filtered.
312        n_jobs: The number of workers that generate route geometries in parallel.
313                Passing ``-1`` will start a worker for each CPU, ``-2`` will start one for
314                each CPU but one, and the default ``1`` will not run any parallel workers at all.
315
316    Raises:
317        ValueError: if the input query has no result set
318        ImportError: if ``n_jobs != 1`` and the ``joblib`` extra is missing
319
320    Returns:
321        all routes in the result set of the input query
322    """
323    routes = collect_routes(query, perimeter)
324    return to_ordered_routes(routes, n_jobs)
325
326
327def to_ordered_route(route: Route) -> OrderedRouteView:
328    """Like ``collect_ordered_routes()``, but for a single route."""
329    return next(iter(to_ordered_routes([route])))
330
331
332def to_ordered_routes(routes: list[Route], n_jobs: int = 1) -> list[OrderedRouteView]:
333    """Like ``collect_ordered_routes()``, but for a list of routes."""
334    if not routes:
335        return []
336
337    views = []
338
339    order_views = []
340    order_graphs = []
341    order_target_lists = []
342
343    for route in routes:
344        view = OrderedRouteView(route=route, ordering=[])
345        views.append(view)
346
347        has_geometry = route.scheme.version_number == 2 and len(route.stops) >= 2
348
349        if not has_geometry:
350            continue
351
352        # Idea: when converting the route's track to a directed graph with parallel edges
353        # and allowing edges to be traversed only once, the linestring we look for is the
354        # one that concatenates the shortest paths between every pair of stops.
355        track_graph = _route_graph(route.relation)
356
357        # For each stop, try to find a stop position on the route's graph (its track).
358        track_nodes = MultiPoint([Point(*node) for node in track_graph.nodes])
359        track_ways = MultiLineString([LineString([u, v]) for u, v, _key in track_graph.edges])
360        for stop in route.stops:
361            stop.stop_coords = _find_stop_coords(stop, track_graph, track_nodes, track_ways)
362
363        order_views.append(view)
364        order_graphs.append(track_graph)
365        order_target_lists.append([stop._stop_point for stop in route.stops])
366
367    # Try to find linestrings that connect all pairs of stops.
368    if n_jobs == 1 or len(order_target_lists) <= 1:  # single process
369        for view, targets, graph in zip(order_views, order_target_lists, order_graphs, strict=True):
370            view.ordering = _paths(graph, targets)
371        return views
372
373    # multiprocessing
374    import joblib  # noqa: PLC0415
375
376    # Note: keep in mind that these objects have to be serialized to use in a seperate process,
377    # which could take a while for large objects.
378    parallel_args = zip(order_graphs, order_target_lists, strict=True)
379
380    # TODO: think about using joblib.Parallel's "return_as"
381    #   => can produce a generator that yields the results as soon as they are available
382    with joblib.parallel_backend(backend="loky", n_jobs=n_jobs):
383        paths = joblib.Parallel()(joblib.delayed(_paths)(*args) for args in parallel_args)
384
385    for view, path in zip(order_views, paths, strict=True):
386        view.ordering = path
387
388    return views
389
390
391def _route_graph(rel: Relation) -> MultiDiGraph:
392    """
393    Build a directed graph of a route's track.
394
395    In this graph…
396     - …nodes will be a tuple of lat, lon
397     - …nodes are every node of every way (stop positions can lie anywhere on the track)
398     - …ways that are listed more than once in the relation have parallel edges
399     - …inverse edges are added for each way, unless it is tagged as a oneway
400     - …edges remain unweighted for now
401    """
402    graph = MultiDiGraph()
403
404    track = [relship for relship in rel.members if _is_track(relship)]
405
406    for relship in track:
407        way = relship.member
408        assert isinstance(way, Way)
409        assert way.geometry is not None
410
411        data = {_WAY_ID_KEY: way.id}
412
413        if way.tag("oneway") == "no":
414            is_oneway = False
415        else:
416            is_oneway = any((way.tag(k) in v for k, v in _PT_ONEWAY_TAGS.items()))
417
418        if is_oneway:
419            add_forward_edges = relship.role != "backward"
420            add_backward_edges = not add_forward_edges
421        else:
422            add_forward_edges = add_backward_edges = True
423
424        if isinstance(way.geometry, Polygon):
425            ext: LinearRing = way.geometry.exterior
426            nodes = list(ext.coords)
427        else:
428            nodes = list(way.geometry.coords)
429
430        for a, b in itertools.pairwise(nodes):
431            if add_forward_edges:
432                graph.add_edge(a, b, **data)
433
434            if add_backward_edges:
435                graph.add_edge(b, a, **data)
436
437    return graph
438
439
440_PT_ONEWAY_TAGS: Final[dict[str, set[str]]] = {
441    "oneway": {"yes"},
442    "highway": {"motorway", "motorway_link", "trunk_link", "primary_link"},
443    "junction": {"circular", "roundabout"},
444}
445"""
446Tag values that are commonly associated with oneways.
447
448References:
449    - https://wiki.openstreetmap.org/wiki/Key:oneway
450    - https://github.com/openstreetmap/iD/blob/develop/modules/osm/tags.js#L81
451    - https://wiki.openstreetmap.org/wiki/Talk:Key:oneway
452"""
453
454
455def _is_track(relship: Relationship) -> bool:
456    return (
457        relship.member.type == "way"
458        and (not relship.role or not relship.role.startswith("platform"))
459        and relship.member.base_geometry is not None
460    )
461
462
463def _find_stop_coords(
464    stop: Stop, track_graph: MultiDiGraph, track_nodes: MultiPoint, track_ways: MultiLineString
465) -> Node | Point | None:
466    """
467    Find a node on the track that closesly represents the stop position.
468
469    Args:
470        stop: the stop to locate on the graph
471        track_graph: the graph that represents the route's track
472        track_nodes: a point for every node in the graph
473        track_ways: a line string for every edge in the graph
474
475    Returns:
476     - None if no appropriate node found
477     - Some Node if we found a stop position in either relation or a stop relation
478     - Some Point if we found a close node on the track, that is *probably* close to the
479       actual stop position
480    """
481    # (a) check if the route relation has a stop_position for this stop
482    if stop.stop_position:
483        stop_node = stop.stop_position.member
484        assert isinstance(stop_node, Node)
485        assert stop_node.geometry is not None
486        if stop_node.geometry.coords[0] in track_graph:
487            return stop_node
488
489    # (b) check if a related stop_area has a stop_position for this stop
490    station_stop_positions = (
491        member
492        for stop_area in stop.stop_areas
493        for _, member in stop_area
494        if _is_tagged_stop_position(member)
495    )
496
497    stop_pos = next(
498        (el for el in station_stop_positions if el.geometry.coords[0] in track_graph),  # type: ignore[union-attr]
499        None,
500    )
501
502    if stop_pos:
503        return stop_pos
504
505    # (c) use a node on the graph, that is closest to one of the relation members
506    station_geom = GeometryCollection(
507        [relship.member.base_geometry for relship in (stop.stop_position, stop.platform) if relship]
508    )
509
510    if not track_nodes or not station_geom:
511        return None
512
513    # Calculate the distance of the stop geometry to the track geometry.
514    a, b = shapely.ops.nearest_points(track_ways, station_geom)  # euclidean nearest
515
516    distance_to_track = fast_distance(*a.coords[0], *b.coords[0])
517
518    # Idea: if the stop is too far away from the track, it cannot be representative.
519    if distance_to_track > _MAX_DISTANCE_TO_TRACK:
520        return None
521
522    # Find the node in the graph that is closest to the stop.
523    a, _ = shapely.ops.nearest_points(track_nodes, station_geom)  # euclidean nearest
524
525    # This node is *probably* representative of the actual stop position for this stop.
526    # It is possible though, that the node is actually close to more than one way
527    # on the route's track, and that we choose a node that could be far from the
528    # actual stop position.
529    return a
530
531
532def _is_tagged_stop_position(elem: Element) -> TypeGuard[Node]:
533    return elem.tag("public_transport") == "stop_position"
534
535
536def _paths(route_graph: MultiDiGraph, targets: list[Point | None]) -> list[OrderedRouteViewNode]:
537    """
538    Find shortest paths in the directed route graph between every target stop.
539
540    Edge weights are set to the metric distance between two nodes.
541
542    Not every two stops can be connected, f.e. when they have no representative
543    position on the route's track, or when that track has gaps.
544
545    Args:
546        route_graph: the unweighted, directed graph of the route's track
547        targets: the stop positions to connect - missing stop positions are
548                 represented by ``None``
549    """
550    # set edge weights to metric distance
551    for u, v in route_graph.edges():
552        if _WEIGHT_KEY in route_graph[u][v][0]:
553            continue
554
555        distance = fast_distance(*u, *v)  # meters
556
557        for k in route_graph[u][v]:
558            route_graph[u][v][k]["weight"] = distance
559
560        if u not in route_graph[v]:  # if no inverse exists
561            continue
562
563        for k in route_graph[v][u]:
564            route_graph[v][u][k]["weight"] = distance
565
566    traversal = _Traversal(
567        targets_left=targets[1:],
568        targets_visited=targets[:1],
569        ordering=[],
570        distance=0.0,
571        path_idx=0,
572    )
573
574    _traverse_graph(graph=route_graph, progress=traversal)
575
576    assert len(targets) == len(traversal.targets_visited)
577    assert traversal.path_idx + 1 == len(traversal.targets_visited)
578
579    return traversal.ordering
580
581
582_GraphNode: TypeAlias = tuple[float, float]
583
584
585@dataclass(kw_only=True, slots=True, repr=False, eq=False, match_args=False)
586class _Traversal:
587    """
588    Route graph traversal.
589
590    Attributes:
591        ordering: nodes that make up the traversed path
592        targets_visited: visited targets - missing stop positions are represented by ``None``
593        targets_left: targets left to visit - missing stop positions are represented by ``None``
594        distance: travelled distance so far
595        path_idx: see ``OrderedRouteViewNode.path_idx``
596    """
597
598    ordering: list[OrderedRouteViewNode]
599    targets_visited: list[Point | None]
600    targets_left: list[Point | None]
601    distance: float
602    path_idx: int
603
604
605def _traverse_graph(graph: MultiDiGraph, progress: _Traversal) -> None:
606    """Find shortest paths between targets, while discouraging edges to be traversed twice."""
607    if len(progress.targets_left) == 0:
608        return None
609
610    a = progress.targets_visited[-1]
611    b = progress.targets_left[0]
612
613    u = a.coords[0] if a else None
614    v = b.coords[0] if b else None
615
616    if u != v:
617        try:
618            path_nodes = nx.shortest_path(graph, source=u, target=v, weight=_WEIGHT_KEY)
619            _traverse_path(graph, progress, path_nodes)
620            return _traverse_graph(graph, progress)
621        except nx.NetworkXNoPath:
622            pass
623
624    if u is not None and progress.ordering:
625        progress.ordering.append(
626            OrderedRouteViewNode(
627                lon=u[1],
628                lat=u[0],
629                way_id=None,
630                path_idx=progress.path_idx,
631                n_seen_stops=len(progress.targets_visited),
632                distance=progress.distance,
633            )
634        )
635
636    progress.targets_visited.append(progress.targets_left.pop(0))
637    progress.path_idx += 1
638
639    return _traverse_graph(graph, progress)
640
641
642def _traverse_path(graph: MultiDiGraph, progress: _Traversal, path_nodes: list[_GraphNode]) -> None:
643    """
644    Walk the path to visit the next stop, and collect path nodes along the way.
645
646    Repeated traversal of the edge (u, v) is discouraged by setting a large, arbitrary weight.
647    Implicitly, this discourages repeated traversal of ways in a route relation. Since the path
648    members are supposed to be ordered, ways that are repeatedly traversed should appear more than
649    once in the relation. But since we cannot guarantee that, flat-out removal of these edges/ways
650    would be too drastic.
651    """
652    if not path_nodes:
653        msg = "expected non-empty list of nodes"
654        raise ValueError(msg)
655
656    edges = itertools.pairwise(path_nodes)
657    n_seen_stops = len(progress.targets_visited)
658
659    last_node: _GraphNode | None = None
660    last_way_id: int | None = None
661
662    skip_node: _GraphNode | None = None
663    if progress.ordering:
664        skip_node = (progress.ordering[-1].lat, progress.ordering[-1].lon)
665
666    for u_traversed, v_traversed in edges:
667        # The path does not specify exactly which edge was traversed, so we select
668        # the parallel edge of (u, v) that has the smallest weight, and increase it.
669        u, v, key, _ = min(
670            graph.edges([u_traversed, v_traversed], keys=True, data=True),
671            key=lambda t: t[3][_WEIGHT_KEY],
672        )
673
674        graph[u][v][key][_WEIGHT_KEY] += _WEIGHT_MULTIPLIER
675
676        # Also increase weight of the inverse edge
677        if graph.has_edge(v, u, key):
678            graph[v][u][key][_WEIGHT_KEY] += _WEIGHT_MULTIPLIER
679
680        way_id = graph[u][v][key][_WAY_ID_KEY]
681
682        last_node = v
683        last_way_id = way_id
684
685        # don't duplicate last visited stop position node
686        if skip_node == u:
687            skip_node = None
688            continue
689
690        progress.ordering.append(
691            OrderedRouteViewNode(
692                lon=u[1],
693                lat=u[0],
694                way_id=way_id,
695                path_idx=progress.path_idx,
696                n_seen_stops=n_seen_stops,
697                distance=progress.distance,
698            )
699        )
700
701        way_distance = graph[u][v][key][_WEIGHT_KEY] % _WEIGHT_MULTIPLIER
702        progress.distance += way_distance
703
704    assert last_node is not None
705    assert last_way_id is not None
706
707    progress.ordering.append(
708        OrderedRouteViewNode(
709            lon=last_node[1],
710            lat=last_node[0],
711            way_id=last_way_id,
712            path_idx=progress.path_idx,
713            n_seen_stops=n_seen_stops + 1,
714            distance=progress.distance,
715        )
716    )
717
718    progress.targets_visited.append(progress.targets_left.pop(0))
719    progress.path_idx += 1
720
721
722_WEIGHT_MULTIPLIER: Final[float] = 1e10
723_WEIGHT_KEY: Final[str] = "weight"
724_WAY_ID_KEY: Final[str] = "way"
def collect_ordered_routes( query: aio_overpass.pt.RouteQuery, perimeter: shapely.geometry.polygon.Polygon | None = None, n_jobs: int = 1) -> list[OrderedRouteView]:
282def collect_ordered_routes(
283    query: RouteQuery,
284    perimeter: Polygon | None = None,
285    n_jobs: int = 1,
286) -> list[OrderedRouteView]:
287    # TODO: the way 'perimeter' works might be confusing
288    """
289    Produce ``OrderedRouteViews`` objects from a result set.
290
291    Compared to ``collect_routes()``, this function tries to build the geometry representing the
292    path travelled on every route from the first to last stop. If there are no holes in a route's
293    relation, this can typically generate a single line string to represent the entire path of
294    a route. Note that routes tagged with the PTv1 scheme will be ignored (in an effort to keep
295    the implemenation as simple as possible).
296
297    Since we cannot guarantee that relation members are ordered, we have to convert the route into
298    a graph, and find the shortest paths between stops. The more complex the graph, and the more
299    stops in a relation, the more expensive it is to generate those paths. Parallelizing this is
300    highly recommended, even for a small amount of routes (see ``n_jobs``).
301
302    You should use this instead of ``collect_routes()`` if you are particularly interested in a
303    route's path. Here are some example use cases:
304     - rendering a route's path between any two stops
305     - measuring the route's travelled distance between any two stops
306     - validating the order of ways in the relation
307
308    Args:
309        query: The query that produced a result set of route relations.
310        perimeter: If set, ``stops`` and ``paths`` of resulting routes will be limited
311                   to the exterior of this polygon. Any relation members in ``members``
312                   will not be filtered.
313        n_jobs: The number of workers that generate route geometries in parallel.
314                Passing ``-1`` will start a worker for each CPU, ``-2`` will start one for
315                each CPU but one, and the default ``1`` will not run any parallel workers at all.
316
317    Raises:
318        ValueError: if the input query has no result set
319        ImportError: if ``n_jobs != 1`` and the ``joblib`` extra is missing
320
321    Returns:
322        all routes in the result set of the input query
323    """
324    routes = collect_routes(query, perimeter)
325    return to_ordered_routes(routes, n_jobs)

Produce OrderedRouteViews objects from a result set.

Compared to collect_routes(), this function tries to build the geometry representing the path travelled on every route from the first to last stop. If there are no holes in a route's relation, this can typically generate a single line string to represent the entire path of a route. Note that routes tagged with the PTv1 scheme will be ignored (in an effort to keep the implemenation as simple as possible).

Since we cannot guarantee that relation members are ordered, we have to convert the route into a graph, and find the shortest paths between stops. The more complex the graph, and the more stops in a relation, the more expensive it is to generate those paths. Parallelizing this is highly recommended, even for a small amount of routes (see n_jobs).

You should use this instead of collect_routes() if you are particularly interested in a route's path. Here are some example use cases:

  • rendering a route's path between any two stops
  • measuring the route's travelled distance between any two stops
  • validating the order of ways in the relation
Arguments:
  • query: The query that produced a result set of route relations.
  • perimeter: If set, stops and paths of resulting routes will be limited to the exterior of this polygon. Any relation members in members will not be filtered.
  • n_jobs: The number of workers that generate route geometries in parallel. Passing -1 will start a worker for each CPU, -2 will start one for each CPU but one, and the default 1 will not run any parallel workers at all.
Raises:
  • ValueError: if the input query has no result set
  • ImportError: if n_jobs != 1 and the joblib extra is missing
Returns:

all routes in the result set of the input query

def to_ordered_route(route: aio_overpass.pt.Route) -> OrderedRouteView:
328def to_ordered_route(route: Route) -> OrderedRouteView:
329    """Like ``collect_ordered_routes()``, but for a single route."""
330    return next(iter(to_ordered_routes([route])))

Like collect_ordered_routes(), but for a single route.

def to_ordered_routes( routes: list[aio_overpass.pt.Route], n_jobs: int = 1) -> list[OrderedRouteView]:
333def to_ordered_routes(routes: list[Route], n_jobs: int = 1) -> list[OrderedRouteView]:
334    """Like ``collect_ordered_routes()``, but for a list of routes."""
335    if not routes:
336        return []
337
338    views = []
339
340    order_views = []
341    order_graphs = []
342    order_target_lists = []
343
344    for route in routes:
345        view = OrderedRouteView(route=route, ordering=[])
346        views.append(view)
347
348        has_geometry = route.scheme.version_number == 2 and len(route.stops) >= 2
349
350        if not has_geometry:
351            continue
352
353        # Idea: when converting the route's track to a directed graph with parallel edges
354        # and allowing edges to be traversed only once, the linestring we look for is the
355        # one that concatenates the shortest paths between every pair of stops.
356        track_graph = _route_graph(route.relation)
357
358        # For each stop, try to find a stop position on the route's graph (its track).
359        track_nodes = MultiPoint([Point(*node) for node in track_graph.nodes])
360        track_ways = MultiLineString([LineString([u, v]) for u, v, _key in track_graph.edges])
361        for stop in route.stops:
362            stop.stop_coords = _find_stop_coords(stop, track_graph, track_nodes, track_ways)
363
364        order_views.append(view)
365        order_graphs.append(track_graph)
366        order_target_lists.append([stop._stop_point for stop in route.stops])
367
368    # Try to find linestrings that connect all pairs of stops.
369    if n_jobs == 1 or len(order_target_lists) <= 1:  # single process
370        for view, targets, graph in zip(order_views, order_target_lists, order_graphs, strict=True):
371            view.ordering = _paths(graph, targets)
372        return views
373
374    # multiprocessing
375    import joblib  # noqa: PLC0415
376
377    # Note: keep in mind that these objects have to be serialized to use in a seperate process,
378    # which could take a while for large objects.
379    parallel_args = zip(order_graphs, order_target_lists, strict=True)
380
381    # TODO: think about using joblib.Parallel's "return_as"
382    #   => can produce a generator that yields the results as soon as they are available
383    with joblib.parallel_backend(backend="loky", n_jobs=n_jobs):
384        paths = joblib.Parallel()(joblib.delayed(_paths)(*args) for args in parallel_args)
385
386    for view, path in zip(order_views, paths, strict=True):
387        view.ordering = path
388
389    return views

Like collect_ordered_routes(), but for a list of routes.

@dataclass(kw_only=True, slots=True)
class OrderedRouteView(aio_overpass.spatial.Spatial):
 64@dataclass(kw_only=True, slots=True)
 65class OrderedRouteView(Spatial):
 66    """
 67    A view of a public transportation route with simplified and directed path geometry.
 68
 69    Views of a route can start at any stop, and end at a later stop.
 70    They are continouous if between every stop, there is a path of contigious ways.
 71    This is not the case for route relations that have a gap in their track.
 72
 73    Attributes:
 74        route: The viewed route.
 75        ordering: The visited nodes while trying to walk the shortest paths between pairs of stops,
 76                  ordered from start to finish. This is an empty list if this is an empty view.
 77    """
 78
 79    route: Route
 80    ordering: list[OrderedRouteViewNode]
 81
 82    @property
 83    def is_continuous(self) -> bool:
 84        """
 85        True if there's a path between all stops on the route.
 86
 87        False for empty views.
 88        """
 89        if not self.ordering:
 90            return False
 91
 92        grouped = itertools.groupby(iterable=self.ordering, key=lambda node: node.path_idx)
 93
 94        sequence_actual = [n for n, _grp in grouped]
 95        sequence_continuous = list(range(min(sequence_actual), max(sequence_actual) + 1))
 96
 97        # True if 'path_idx' never skips any stop when iterating 'ordering'
 98        return sequence_actual == sequence_continuous
 99
100    @property
101    def stops(self) -> list[Stop]:
102        """All stops of this view where a path either starts or ends."""
103        if not self.ordering:
104            return []
105
106        has_path_to_idx = {node.n_seen_stops for node in self.ordering}
107        has_path_from_idx = {node.n_seen_stops - 1 for node in self.ordering}
108        indexes = has_path_to_idx | has_path_from_idx
109        indexes.remove(len(self.route.stops))
110        return [self.route.stops[i] for i in sorted(indexes)]
111
112    def _split(
113        self,
114        predicate: Callable[[OrderedRouteViewNode, OrderedRouteViewNode], bool],
115    ) -> Generator["OrderedRouteView", None, None]:
116        if len(self.ordering) < 2:
117            msg = "cannot split"  # TODO: more specific
118            raise ValueError(msg)
119
120        ordering: list[OrderedRouteViewNode] = []
121
122        for a, b in zip(self.ordering, self.ordering[1:], strict=False):
123            if predicate(a, b):
124                yield replace(self, ordering=ordering)
125                ordering = [a]
126            else:
127                ordering.append(a)
128
129        ordering.append(b)
130
131        yield replace(self, ordering=ordering)
132
133    def _group_by(self, key: Callable[[OrderedRouteViewNode], Any]) -> list["OrderedRouteView"]:
134        return list(self._split(predicate=lambda a, b: key(a) != key(b)))
135
136    def gap_split(self) -> list["OrderedRouteView"]:
137        """
138        Split this view wherever there's a gap in between stops.
139
140        Raises:
141            TODO: doc
142        """
143        return list(self._split(predicate=lambda a, b: b.path_idx > a.path_idx + 1))
144
145    def stop_split(self) -> list["OrderedRouteView"]:
146        """
147        Split this view at every stop, returning views between every pair of stops.
148
149        Raises:
150            TODO: doc
151        """
152        return self._group_by(key=lambda node: node.path_idx)
153
154    def take(self, first_n: int) -> "OrderedRouteView":
155        """
156        Returns the continuous view that connects a maximum number of stops at the beginning.
157
158        Raises:
159            TODO: doc
160        """
161        if first_n < 2:
162            msg = "cannot take less than two stops"
163            raise ValueError(msg)  # TODO: add a specific subclass for our ValueErrors?
164
165        pre_gap, *_ = self.gap_split()
166
167        by_stop = itertools.groupby(pre_gap.ordering, key=lambda node: node.path_idx)
168        by_stop_truncated = itertools.islice(by_stop, first_n - 1)
169
170        ordering = [node for _, nodes in by_stop_truncated for node in nodes]
171
172        return replace(self, ordering=ordering)
173
174    def trim(self, distance: float) -> "OrderedRouteView":
175        """
176        Trim this view to some distance in meters.
177
178        Returns:
179            the continuous view that is not longer than a given distance,
180            starting from the first stop.
181
182        Raises:
183            TODO: doc
184        """
185        # TODO: distance ValueErrors
186
187        pre_gap, *_ = self.gap_split()
188
189        distance_start = pre_gap.ordering[0].distance
190
191        by_stop = itertools.groupby(pre_gap.ordering, key=lambda node: node.path_idx)
192
193        ordering: list[OrderedRouteViewNode] = []
194
195        for _, nodes_iter in by_stop:
196            nodes = list(nodes_iter)
197            distance_end = nodes[-1].distance
198
199            if (distance_end - distance_start) > distance:
200                break
201
202            ordering.extend(nodes)
203
204        return replace(self, ordering=ordering)
205
206    @property
207    def paths(self) -> list[LineString | None]:
208        """
209        The simple paths between every pair of stops.
210
211        These linestrings are the shortest paths, merged from contiguous ways in the route relation.
212        Whenever there is no path between two stops, a `None` element will be inserted into the
213        result list.
214        """
215        if not self.ordering:
216            return []
217
218        from_idx = self.ordering[0].path_idx
219        to_idx = self.ordering[-1].path_idx
220        lines = [None for _ in range(from_idx, to_idx + 1)]
221
222        assert len(lines) + 1 == len(self.stops)
223
224        grouped = itertools.groupby(iterable=self.ordering, key=lambda node: node.path_idx)
225
226        for path_idx, nodes_iter in grouped:
227            nodes = [(node.lat, node.lon) for node in nodes_iter]
228            lines[path_idx - from_idx] = LineString(nodes) if len(nodes) >= 2 else None
229
230        return lines
231
232    @property
233    def path(self) -> LineString | MultiLineString:
234        """
235        The geometry representing the path travelled on this view from the first to last stop.
236
237        This is the result of ordering (a subset of) ways inside a route relation by the order of
238        traversal, and concatenating them. The order is found by building shortest paths between
239        stops. Whenever stops cannot be reached with the ways that are included in the relation,
240        the geometry will be split up. This happens when there are "holes" in the relation.
241        """
242        paths = self.paths
243
244        if not any(ls for ls in paths):
245            return MultiLineString([])
246
247        # group consecutive stretches of lines or None values
248        stretches_grouped = itertools.groupby(iterable=paths, key=bool)
249
250        # select all sets of consecutive lines to merge them
251        stretches: Iterator[Iterator[LineString]] = (
252            line_strings for has_track, line_strings in stretches_grouped if has_track
253        )
254
255        merged_lines = []
256
257        for line_strings in stretches:
258            coords: list[list[float]] = []
259
260            for line in line_strings:
261                if not coords:
262                    coords.extend(line.coords)
263                else:
264                    # ignore first coord, it's equal to the previous one
265                    coords.extend(line.coords[1:])
266
267            merged_line = LineString(coords)
268            merged_lines.append(merged_line)
269
270        if len(merged_lines) == 1:
271            return merged_lines[0]
272
273        return MultiLineString(merged_lines)
274
275    @property
276    def geojson(self) -> GeoJsonDict:
277        """A mapping of this object, using the GeoJSON format."""
278        # TODO: OrderedRouteView geojson
279        raise NotImplementedError

A view of a public transportation route with simplified and directed path geometry.

Views of a route can start at any stop, and end at a later stop. They are continouous if between every stop, there is a path of contigious ways. This is not the case for route relations that have a gap in their track.

Attributes:
  • route: The viewed route.
  • ordering: The visited nodes while trying to walk the shortest paths between pairs of stops, ordered from start to finish. This is an empty list if this is an empty view.
OrderedRouteView( *, route: aio_overpass.pt.Route, ordering: list[OrderedRouteViewNode])
ordering: list[OrderedRouteViewNode]
is_continuous: bool
82    @property
83    def is_continuous(self) -> bool:
84        """
85        True if there's a path between all stops on the route.
86
87        False for empty views.
88        """
89        if not self.ordering:
90            return False
91
92        grouped = itertools.groupby(iterable=self.ordering, key=lambda node: node.path_idx)
93
94        sequence_actual = [n for n, _grp in grouped]
95        sequence_continuous = list(range(min(sequence_actual), max(sequence_actual) + 1))
96
97        # True if 'path_idx' never skips any stop when iterating 'ordering'
98        return sequence_actual == sequence_continuous

True if there's a path between all stops on the route.

False for empty views.

stops: list[aio_overpass.pt.Stop]
100    @property
101    def stops(self) -> list[Stop]:
102        """All stops of this view where a path either starts or ends."""
103        if not self.ordering:
104            return []
105
106        has_path_to_idx = {node.n_seen_stops for node in self.ordering}
107        has_path_from_idx = {node.n_seen_stops - 1 for node in self.ordering}
108        indexes = has_path_to_idx | has_path_from_idx
109        indexes.remove(len(self.route.stops))
110        return [self.route.stops[i] for i in sorted(indexes)]

All stops of this view where a path either starts or ends.

def gap_split(self) -> list[OrderedRouteView]:
136    def gap_split(self) -> list["OrderedRouteView"]:
137        """
138        Split this view wherever there's a gap in between stops.
139
140        Raises:
141            TODO: doc
142        """
143        return list(self._split(predicate=lambda a, b: b.path_idx > a.path_idx + 1))

Split this view wherever there's a gap in between stops.

Raises:
  • TODO: doc
def stop_split(self) -> list[OrderedRouteView]:
145    def stop_split(self) -> list["OrderedRouteView"]:
146        """
147        Split this view at every stop, returning views between every pair of stops.
148
149        Raises:
150            TODO: doc
151        """
152        return self._group_by(key=lambda node: node.path_idx)

Split this view at every stop, returning views between every pair of stops.

Raises:
  • TODO: doc
def take(self, first_n: int) -> OrderedRouteView:
154    def take(self, first_n: int) -> "OrderedRouteView":
155        """
156        Returns the continuous view that connects a maximum number of stops at the beginning.
157
158        Raises:
159            TODO: doc
160        """
161        if first_n < 2:
162            msg = "cannot take less than two stops"
163            raise ValueError(msg)  # TODO: add a specific subclass for our ValueErrors?
164
165        pre_gap, *_ = self.gap_split()
166
167        by_stop = itertools.groupby(pre_gap.ordering, key=lambda node: node.path_idx)
168        by_stop_truncated = itertools.islice(by_stop, first_n - 1)
169
170        ordering = [node for _, nodes in by_stop_truncated for node in nodes]
171
172        return replace(self, ordering=ordering)

Returns the continuous view that connects a maximum number of stops at the beginning.

Raises:
  • TODO: doc
def trim(self, distance: float) -> OrderedRouteView:
174    def trim(self, distance: float) -> "OrderedRouteView":
175        """
176        Trim this view to some distance in meters.
177
178        Returns:
179            the continuous view that is not longer than a given distance,
180            starting from the first stop.
181
182        Raises:
183            TODO: doc
184        """
185        # TODO: distance ValueErrors
186
187        pre_gap, *_ = self.gap_split()
188
189        distance_start = pre_gap.ordering[0].distance
190
191        by_stop = itertools.groupby(pre_gap.ordering, key=lambda node: node.path_idx)
192
193        ordering: list[OrderedRouteViewNode] = []
194
195        for _, nodes_iter in by_stop:
196            nodes = list(nodes_iter)
197            distance_end = nodes[-1].distance
198
199            if (distance_end - distance_start) > distance:
200                break
201
202            ordering.extend(nodes)
203
204        return replace(self, ordering=ordering)

Trim this view to some distance in meters.

Returns:

the continuous view that is not longer than a given distance, starting from the first stop.

Raises:
  • TODO: doc
paths: list[shapely.geometry.linestring.LineString | None]
206    @property
207    def paths(self) -> list[LineString | None]:
208        """
209        The simple paths between every pair of stops.
210
211        These linestrings are the shortest paths, merged from contiguous ways in the route relation.
212        Whenever there is no path between two stops, a `None` element will be inserted into the
213        result list.
214        """
215        if not self.ordering:
216            return []
217
218        from_idx = self.ordering[0].path_idx
219        to_idx = self.ordering[-1].path_idx
220        lines = [None for _ in range(from_idx, to_idx + 1)]
221
222        assert len(lines) + 1 == len(self.stops)
223
224        grouped = itertools.groupby(iterable=self.ordering, key=lambda node: node.path_idx)
225
226        for path_idx, nodes_iter in grouped:
227            nodes = [(node.lat, node.lon) for node in nodes_iter]
228            lines[path_idx - from_idx] = LineString(nodes) if len(nodes) >= 2 else None
229
230        return lines

The simple paths between every pair of stops.

These linestrings are the shortest paths, merged from contiguous ways in the route relation. Whenever there is no path between two stops, a None element will be inserted into the result list.

path: shapely.geometry.linestring.LineString | shapely.geometry.multilinestring.MultiLineString
232    @property
233    def path(self) -> LineString | MultiLineString:
234        """
235        The geometry representing the path travelled on this view from the first to last stop.
236
237        This is the result of ordering (a subset of) ways inside a route relation by the order of
238        traversal, and concatenating them. The order is found by building shortest paths between
239        stops. Whenever stops cannot be reached with the ways that are included in the relation,
240        the geometry will be split up. This happens when there are "holes" in the relation.
241        """
242        paths = self.paths
243
244        if not any(ls for ls in paths):
245            return MultiLineString([])
246
247        # group consecutive stretches of lines or None values
248        stretches_grouped = itertools.groupby(iterable=paths, key=bool)
249
250        # select all sets of consecutive lines to merge them
251        stretches: Iterator[Iterator[LineString]] = (
252            line_strings for has_track, line_strings in stretches_grouped if has_track
253        )
254
255        merged_lines = []
256
257        for line_strings in stretches:
258            coords: list[list[float]] = []
259
260            for line in line_strings:
261                if not coords:
262                    coords.extend(line.coords)
263                else:
264                    # ignore first coord, it's equal to the previous one
265                    coords.extend(line.coords[1:])
266
267            merged_line = LineString(coords)
268            merged_lines.append(merged_line)
269
270        if len(merged_lines) == 1:
271            return merged_lines[0]
272
273        return MultiLineString(merged_lines)

The geometry representing the path travelled on this view from the first to last stop.

This is the result of ordering (a subset of) ways inside a route relation by the order of traversal, and concatenating them. The order is found by building shortest paths between stops. Whenever stops cannot be reached with the ways that are included in the relation, the geometry will be split up. This happens when there are "holes" in the relation.

geojson: dict[str, typing.Any]
275    @property
276    def geojson(self) -> GeoJsonDict:
277        """A mapping of this object, using the GeoJSON format."""
278        # TODO: OrderedRouteView geojson
279        raise NotImplementedError

A mapping of this object, using the GeoJSON format.

@dataclass(kw_only=True, slots=True)
class OrderedRouteViewNode:
39@dataclass(kw_only=True, slots=True)
40class OrderedRouteViewNode:
41    """
42    A node on the path of a route.
43
44    Attributes:
45        lon: Node longitude.
46        lat: Node latitude.
47        way_id: The ID of the way this node is located on.
48                ``None`` if this node is a stop position and there was no
49                path to this node.
50        path_idx: Index in ``OrderedRouteView.paths`` this node is a part of.
51        n_seen_stops: The number of visited stops so far.
52                      Increases on every stop (``1..=nb_stops``).
53        distance: The approximate distance travelled so far, in meters.
54    """
55
56    lon: float
57    lat: float
58    way_id: int | None
59    path_idx: int
60    n_seen_stops: int
61    distance: float

A node on the path of a route.

Attributes:
  • lon: Node longitude.
  • lat: Node latitude.
  • way_id: The ID of the way this node is located on. None if this node is a stop position and there was no path to this node.
  • path_idx: Index in OrderedRouteView.paths this node is a part of.
  • n_seen_stops: The number of visited stops so far. Increases on every stop (1..=nb_stops).
  • distance: The approximate distance travelled so far, in meters.
OrderedRouteViewNode( *, lon: float, lat: float, way_id: int | None, path_idx: int, n_seen_stops: int, distance: float)
lon: float
lat: float
way_id: int | None
path_idx: int
n_seen_stops: int
distance: float