aio_overpass.query

Query state and runner.

  1"""Query state and runner."""
  2
  3import asyncio
  4import hashlib
  5import json
  6import logging
  7import math
  8import os
  9import re
 10import sys
 11import tempfile
 12import time
 13from abc import ABC, abstractmethod
 14from dataclasses import dataclass
 15from datetime import datetime, timezone
 16from pathlib import Path
 17
 18from aio_overpass.error import (
 19    ClientError,
 20    GiveupError,
 21    QueryRejectCause,
 22    is_exceeding_timeout,
 23    is_rejection,
 24    is_server_error,
 25)
 26
 27
# Docstring convention declared for this module.
__docformat__ = "google"
# Public names of this module; everything else is considered internal.
__all__ = (
    "Query",
    "QueryRunner",
    "DefaultQueryRunner",
    "RequestTimeout",
    "DEFAULT_MAXSIZE_MIB",
    "DEFAULT_TIMEOUT_SECS",
)
 37
 38
 39DEFAULT_MAXSIZE_MIB = 512
 40"""Default ``maxsize`` setting in mebibytes."""
 41
 42DEFAULT_TIMEOUT_SECS = 180
 43"""Default ``timeout`` setting in seconds."""
 44
 45_COPYRIGHT = (
 46    "The data included in this document is from www.openstreetmap.org."
 47    " The data is made available under ODbL."
 48)
 49"""This is the same copyright notice included in result sets"""
 50
 51_SETTING_PATTERN = re.compile(r"\[(\w+?):(.+?)]\s*;?")
 52"""A pattern to match setting declarations (not the entire settings statement)."""
 53
 54_NULL_LOGGER = logging.getLogger()
 55_NULL_LOGGER.addHandler(logging.NullHandler())
 56
 57
 58class Query:
 59    """
 60    State of a query that is either pending, running, successful, or failed.
 61
 62    Args:
 63        input_code: The input Overpass QL code. Note that some settings might be changed
 64                    by query runners, notably the 'timeout' and 'maxsize' settings.
 65        logger: The logger to use for all logging output related to this query.
 66        **kwargs: Additional keyword arguments that can be used to identify queries.
 67
 68    References:
 69        - https://wiki.openstreetmap.org/wiki/Overpass_API/Overpass_QL
 70    """
 71
 72    __slots__ = (
 73        "_error",
 74        "_input_code",
 75        "_kwargs",
 76        "_last_timeout_secs_used",
 77        "_logger",
 78        "_max_timeout_secs_exceeded",
 79        "_nb_tries",
 80        "_request_timeout",
 81        "_response",
 82        "_response_bytes",
 83        "_run_timeout_secs",
 84        "_settings",
 85        "_time_end_try",
 86        "_time_start",
 87        "_time_start_req",
 88        "_time_start_try",
 89    )
 90
 91    def __init__(self, input_code: str, logger: logging.Logger = _NULL_LOGGER, **kwargs) -> None:
 92        self._input_code = input_code
 93        """the original given overpass ql code"""
 94
 95        self._logger = logger
 96        """logger to use for this query"""
 97
 98        self._kwargs = kwargs
 99        """used to identify this query"""
100
101        self._settings = dict(_SETTING_PATTERN.findall(input_code))
102        """all overpass ql settings [k:v];"""
103
104        self._settings["out"] = "json"
105
106        if "maxsize" not in self._settings:
107            self._settings["maxsize"] = DEFAULT_MAXSIZE_MIB * 1024 * 1024
108
109        if "timeout" not in self._settings:
110            self._settings["timeout"] = DEFAULT_TIMEOUT_SECS
111
112        self._run_timeout_secs: float | None = None
113        """total time limit for running this query"""
114
115        self._request_timeout: RequestTimeout = RequestTimeout()
116        """config for request timeouts"""
117
118        self._error: ClientError | None = None
119        """error of the last try, or None"""
120
121        self._response: dict | None = None
122        """response JSON as a dict, or None"""
123
124        self._response_bytes = 0.0
125        """number of bytes in a response, or zero"""
126
127        self._nb_tries = 0
128        """number of tries so far, starting at zero"""
129
130        self._time_start: _Instant | None = None
131        """time prior to executing the first try"""
132
133        self._time_start_try: _Instant | None = None
134        """time prior to executing the most recent try"""
135
136        self._time_start_req: _Instant | None = None
137        """time prior to executing the most recent try's query request"""
138
139        self._time_end_try: _Instant | None = None
140        """time the most recent try finished"""
141
142        self._last_timeout_secs_used: int | None = None
143        """the last used 'timeout' setting"""
144
145        self._max_timeout_secs_exceeded: int | None = None
146        """the largest 'timeout' setting that was exceeded in a try of this query"""
147
148    def reset(self) -> None:
149        """Reset the query to its initial state, ignoring previous tries."""
150        Query.__init__(self, input_code=self._input_code, **self._kwargs)
151
152    @property
153    def input_code(self) -> str:
154        """The original input Overpass QL source code."""
155        return self._input_code
156
157    @property
158    def kwargs(self) -> dict:
159        """
160        Keyword arguments that can be used to identify queries.
161
162        The default query runner will log these values when a query is run.
163        """
164        return self._kwargs
165
166    @property
167    def logger(self) -> logging.Logger:
168        """The logger used for logging output related to this query."""
169        return self._logger
170
171    @property
172    def nb_tries(self) -> int:
173        """Current number of tries."""
174        return self._nb_tries
175
176    @property
177    def error(self) -> ClientError | None:
178        """
179        Error of the most recent try.
180
181        Returns:
182            an error or ``None`` if the query wasn't tried or hasn't failed
183        """
184        return self._error
185
186    @property
187    def response(self) -> dict | None:
188        """
189        The entire JSON response of the query.
190
191        Returns:
192            the response, or ``None`` if the query has not successfully finished (yet)
193        """
194        return self._response
195
196    @property
197    def was_cached(self) -> bool | None:
198        """
199        Indicates whether the query result was cached.
200
201        Returns:
202            ``None`` if the query has not been run yet.
203            ``True`` if the query has a result set with zero tries.
204            ``False`` otherwise.
205        """
206        if self._response is None:
207            return None
208        return self._nb_tries == 0
209
210    @property
211    def result_set(self) -> list[dict] | None:
212        """
213        The result set of the query.
214
215        This is open data, licensed under the Open Data Commons Open Database License (ODbL).
216        You are free to copy, distribute, transmit and adapt this data, as long as you credit
217        OpenStreetMap and its contributors. If you alter or build upon this data, you may
218        distribute the result only under the same licence.
219
220        Returns:
221            the elements of the result set, or ``None`` if the query has not successfully
222            finished (yet)
223
224        References:
225            - https://www.openstreetmap.org/copyright
226            - https://opendatacommons.org/licenses/odbl/1-0/
227        """
228        if not self._response:
229            return None
230        return self._response["elements"]
231
232    @property
233    def response_size_mib(self) -> float | None:
234        """
235        The size of the response in mebibytes.
236
237        Returns:
238            the size, or ``None`` if the query has not successfully finished (yet)
239        """
240        if self._response is None:
241            return None
242        return self._response_bytes / 1024.0 / 1024.0
243
244    @property
245    def maxsize_mib(self) -> float:
246        """
247        The current value of the [maxsize:*] setting in mebibytes.
248
249        This size indicates the maximum allowed memory for the query in bytes RAM on the server,
250        as expected by the user. If the query needs more RAM than this value, the server may abort
251        the query with a memory exhaustion. The higher this size, the more probably the server
252        rejects the query before executing it.
253        """
254        return float(self._settings["maxsize"]) // 1024.0 // 1024.0
255
256    @maxsize_mib.setter
257    def maxsize_mib(self, value: float) -> None:
258        if value <= 0.0:
259            msg = "maxsize_mib must be > 0.0"
260            raise ValueError(msg)
261        self._settings["maxsize"] = int(value * 1024.0 * 1024.0)
262
263    @property
264    def timeout_secs(self) -> int:
265        """
266        The current value of the [timeout:*] setting in seconds.
267
268        This duration is the maximum allowed runtime for the query in seconds, as expected by the
269        user. If the query runs longer than this time, the server may abort the query. The higher
270        this duration, the more probably the server rejects the query before executing it.
271        """
272        return int(self._settings["timeout"])
273
274    @timeout_secs.setter
275    def timeout_secs(self, value: int) -> None:
276        if value < 1:
277            msg = "timeout_secs must be >= 1"
278            raise ValueError(msg)
279        self._settings["timeout"] = value
280
281    @property
282    def run_timeout_secs(self) -> float | None:
283        """
284        A limit to ``run_duration_secs``, that cancels running the query when exceeded.
285
286        Defaults to no timeout.
287
288        The client will raise a ``GiveupError`` if the timeout is reached.
289
290        Not to be confused with ``timeout_secs``, which is a setting for the Overpass API instance,
291        that limits a single query execution time. Instead, this value can be used to limit the
292        total client-side time spent on this query (see ``Client.run_query``).
293        """
294        return self._run_timeout_secs
295
296    @run_timeout_secs.setter
297    def run_timeout_secs(self, value: float | None) -> None:
298        if value is not None and value <= 0.0:
299            msg = "run_timeout_secs must be > 0"
300            raise ValueError(msg)
301        self._run_timeout_secs = value
302
303    @property
304    def run_timeout_elapsed(self) -> bool:
305        """Returns ``True`` if ``run_timeout_secs`` is set and has elapsed."""
306        return (
307            self.run_timeout_secs is not None
308            and self.run_duration_secs is not None
309            and self.run_timeout_secs < self.run_duration_secs
310        )
311
312    @property
313    def request_timeout(self) -> "RequestTimeout":
314        """Request timeout settings for this query."""
315        return self._request_timeout
316
317    @request_timeout.setter
318    def request_timeout(self, value: "RequestTimeout") -> None:
319        self._request_timeout = value
320
321    def _code(self) -> str:
322        # TODO doc
323        # TODO refactor? this function might do a bit too much
324        # TODO needs tests
325        settings_copy = self._settings.copy()
326
327        max_timeout = settings_copy["timeout"]
328
329        # if a run timeout is set, the remaining time is the max query timeout we will use
330        if (time_max := self.run_timeout_secs) and (time_so_far := self.run_duration_secs):
331            max_timeout = math.ceil(time_max - time_so_far)
332            if max_timeout <= 0:
333                raise GiveupError(kwargs=self.kwargs, after_secs=time_so_far)
334
335        # if we already had a query that exceeded a timeout that is >= that max timeout,
336        # we might as well give up already
337        if (min_needed := self._max_timeout_secs_exceeded) and min_needed >= max_timeout:
338            self._logger.error(f"give up on {self} since query will likely time out")
339            raise GiveupError(kwargs=self.kwargs, after_secs=self.run_duration_secs or 0.0)
340
341        # pick the timeout we will use for the next try
342        next_timeout_secs_used = min(settings_copy["timeout"], max_timeout)
343
344        # log if had to override the timeout setting with "max_timeout"
345        if next_timeout_secs_used != settings_copy["timeout"]:
346            settings_copy["timeout"] = next_timeout_secs_used
347            self._logger.info(f"adjust timeout to {next_timeout_secs_used}s")
348
349        # update the used timeout in state
350        self._last_timeout_secs_used = next_timeout_secs_used
351
352        # remove the original settings statement
353        code = _SETTING_PATTERN.sub("", self._input_code)
354
355        # put the adjusted settings in front
356        settings = "".join((f"[{k}:{v}]" for k, v in settings_copy.items())) + ";"
357        return f"{settings}\n{code}"
358
359    @property
360    def cache_key(self) -> str:
361        """
362        Hash QL code, and return its digest as hexadecimal string.
363
364        The default query runner uses this as cache key.
365        """
366        # Remove the original settings statement
367        code = _SETTING_PATTERN.sub("", self._input_code)
368        hasher = hashlib.blake2b(digest_size=8)
369        hasher.update(code.encode("utf-8"))
370        return hasher.hexdigest()
371
372    @property
373    def done(self) -> bool:
374        """Returns ``True`` if the result set was received."""
375        return self._response is not None
376
377    @property
378    def request_duration_secs(self) -> float | None:
379        """
380        How long it took to fetch the result set in seconds.
381
382        This is the duration starting with the API request, and ending once
383        the result is written to this query object. Although it depends on how busy
384        the API instance is, this can give some indication of how long a query takes.
385
386        Returns:
387            the duration or ``None`` if there is no result set yet, or when it was cached.
388        """
389        if self._response is None or self.was_cached:
390            return None
391
392        assert self._time_end_try is not None
393        assert self._time_start_req is not None
394
395        return self._time_end_try - self._time_start_req
396
397    @property
398    def run_duration_secs(self) -> float | None:
399        """
400        The total required time for this query in seconds (so far).
401
402        Returns:
403            the duration or ``None`` if there is no result set yet, or when it was cached.
404        """
405        if self._time_start is None:
406            return None
407
408        if self._time_end_try:
409            return self._time_end_try - self._time_start
410
411        return self._time_start.elapsed_secs_since
412
413    @property
414    def api_version(self) -> str | None:
415        """
416        The Overpass API version used by the queried instance.
417
418        Returns:
419            f.e. ``"Overpass API 0.7.56.8 7d656e78"``, or ``None`` if the query
420            has not successfully finished (yet)
421
422        References:
423            - https://wiki.openstreetmap.org/wiki/Overpass_API/versions
424        """
425        if self._response is None:
426            return None
427
428        return self._response["generator"]
429
430    @property
431    def timestamp_osm(self) -> datetime | None:
432        """
433        All OSM edits that have been uploaded before this date are included.
434
435        It can take a couple of minutes for changes to the database to show up in the
436        Overpass API query results.
437
438        Returns:
439            the timestamp, or ``None`` if the query has not successfully finished (yet)
440        """
441        if self._response is None:
442            return None
443
444        date_str = self._response["osm3s"]["timestamp_osm_base"]
445        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(timezone.utc)
446
447    @property
448    def timestamp_areas(self) -> datetime | None:
449        """
450        All area data edits that have been uploaded before this date are included.
451
452        If the query involves area data processing, this is the date of the latest edit
453        that has been considered in the most recent batch run of the area generation.
454
455        Returns:
456            the timestamp, or ``None`` if the query has not successfully finished (yet), or
457            if it does not involve area data processing.
458        """
459        if self._response is None:
460            return None
461
462        date_str = self._response["osm3s"].get("timestamp_areas_base")
463        if not date_str:
464            return None
465
466        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(timezone.utc)
467
468    @property
469    def copyright(self) -> str:
470        """A copyright notice that comes with the result set."""
471        if self._response is None:
472            return _COPYRIGHT
473
474        return self._response["osm3s"].get("copyright") or _COPYRIGHT
475
476    def __str__(self) -> str:
477        query = f"query{self.kwargs!r}"
478
479        size = self.response_size_mib
480        time_request = self.request_duration_secs
481        time_total = self.run_duration_secs
482
483        if self.nb_tries == 0:
484            details = "pending"
485        elif self.done:
486            if self.nb_tries == 1:
487                details = f"done - {size:.01f}mb in {time_request:.01f}s"
488            else:
489                details = f"done after {time_total:.01f}s - {size:.01f}mb in {time_request:.01f}s"
490        else:
491            t = "try" if self.nb_tries == 1 else "tries"
492            details = f"failing after {self.nb_tries} {t}, {time_total:.01f}s"
493
494        return f"{query} ({details})"
495
496    def __repr__(self) -> str:
497        cls_name = type(self).__name__
498
499        details = {
500            "kwargs": self._kwargs,
501            "done": self.done,
502        }
503
504        if self.nb_tries == 0 or self.error:
505            details["tries"] = self.nb_tries
506
507        if self.error:
508            details["error"] = type(self.error).__name__
509
510        if self.done:
511            details["response_size"] = f"{self.response_size_mib:.02f}mb"
512
513            if not self.was_cached:
514                details["request_duration"] = f"{self.request_duration_secs:.02f}s"
515
516        if self.nb_tries > 0:
517            details["run_duration"] = f"{self.run_duration_secs:.02f}s"
518
519        details_str = ", ".join((f"{k}={v!r}" for k, v in details.items()))
520
521        return f"{cls_name}({details_str})"
522
523    def _mutator(self) -> "_QueryMutator":
524        return _QueryMutator(self)
525
526
class _QueryMutator:
    """Internal helper that records the progress of tries on a wrapped ``Query``."""

    __slots__ = ("_query",)

    def __init__(self, query: Query) -> None:
        self._query = query

    def begin_try(self) -> None:
        """Mark the start of a try; the very first call also starts the overall clock."""
        target = self._query

        if target._time_start is None:
            target._time_start = _Instant.now()

        target._time_start_try = _Instant.now()
        # request start and try end belong to the previous try, so forget them
        target._time_start_req = None
        target._time_end_try = None

    def begin_request(self) -> None:
        """Mark the start of the current try's query request."""
        self._query._time_start_req = _Instant.now()

    def succeed_try(self, response: dict, response_bytes: int) -> None:
        """Store the response and its size, and clear the error of earlier tries."""
        target = self._query
        target._time_end_try = _Instant.now()
        target._response = response
        target._response_bytes = response_bytes
        target._error = None

    def fail_try(self, err: ClientError) -> None:
        """Store the error of the current try, and note the timeout if it was exceeded."""
        target = self._query
        target._error = err

        if not is_exceeding_timeout(err):
            return

        assert target._last_timeout_secs_used
        target._max_timeout_secs_exceeded = target._last_timeout_secs_used

    def end_try(self) -> None:
        """Count the try that just ended."""
        self._query._nb_tries += 1
559
560
561@dataclass(kw_only=True, slots=True, frozen=True, repr=False, order=True)
562class _Instant:
563    """
564    Measurement of a monotonic clock.
565
566    Attributes:
567        when: the current time, according to the event loop's internal monotonic clock
568              (details are unspecified and may differ per event loop).
569    """
570
571    when: float
572
573    @classmethod
574    def now(cls) -> "_Instant":
575        return cls(when=asyncio.get_event_loop().time())
576
577    @property
578    def ceil(self) -> int:
579        return math.ceil(self.when)
580
581    @property
582    def elapsed_secs_since(self) -> float:
583        return asyncio.get_event_loop().time() - self.when
584
585    def __sub__(self, earlier: "_Instant") -> float:
586        if self.when < earlier.when:
587            msg = f"{self} is earlier than {earlier}"
588            raise ValueError(msg)
589        return self.when - earlier.when
590
591    def __repr__(self) -> str:
592        return f"{type(self).__name__}({self.when:.02f})"
593
594
595@dataclass(kw_only=True, slots=True)
596class RequestTimeout:
597    """
598    Request timeout settings.
599
600    Attributes:
601        total_without_query_secs: If set, the sum of this duration and the query's ``[timeout:*]``
602                                  setting is used as timeout duration of the entire request,
603                                  including connection establishment, request sending and response
604                                  reading (``aiohttp.ClientTimeout.total``).
605                                  Defaults to 20 seconds.
606        sock_connect_secs: The maximum number of seconds allowed for pure socket connection
607                           establishment (same as ``aiohttp.ClientTimeout.sock_connect``).
608        each_sock_read_secs: The maximum number of seconds allowed for the period between reading
609                             a new chunk of data (same as ``aiohttp.ClientTimeout.sock_read``).
610    """
611
612    total_without_query_secs: float | None = 20.0
613    sock_connect_secs: float | None = None
614    each_sock_read_secs: float | None = None
615
616    def __post_init__(self) -> None:
617        if self.total_without_query_secs is not None and self.total_without_query_secs <= 0.0:
618            msg = "'total_without_query_secs' has to be > 0"
619            raise ValueError(msg)
620
621        if self.sock_connect_secs is not None and self.sock_connect_secs <= 0.0:
622            msg = "'sock_connect_secs' has to be > 0"
623            raise ValueError(msg)
624
625        if self.each_sock_read_secs is not None and self.each_sock_read_secs <= 0.0:
626            msg = "'each_sock_read_secs' has to be > 0"
627            raise ValueError(msg)
628
629
class QueryRunner(ABC):
    """
    Base class of query runners.

    A query runner is an async callable that is called before a client makes an API request.

    Query runners can be used to…
     - …retry queries when they fail
     - …modify queries, f.e. to lower/increase their maxsize/timeout
     - …log query results & errors
     - …implement caching

    At the very least, a query runner has to either simply return to (re)try a query,
    or raise ``query.error`` to stop trying.
    """

    __slots__ = ()

    @abstractmethod
    async def __call__(self, query: Query) -> None:
        """Called with the current query state before the client makes an API request."""
650
651
class DefaultQueryRunner(QueryRunner):
    """
    The default query runner.

    This runner…
     - …retries with an increasing back-off period in between tries if the server is too busy
     - …retries and doubles timeout and maxsize settings if they were exceeded
     - …limits the number of tries
     - …optionally caches query results in temp files

    This runner does *not* lower timeout and maxsize settings if the server rejected a query.

    Args:
        max_tries: The maximum number of times a query is tried. (5 by default)
        cache_ttl_secs: Amount of seconds a query's result set is cached for.
                        Set to zero to disable caching. (zero by default)

    Raises:
        ValueError: if ``max_tries`` is less than one, or ``cache_ttl_secs`` is negative
    """

    __slots__ = (
        "_max_tries",
        "_cache_ttl_secs",
    )

    def __init__(self, max_tries: int = 5, cache_ttl_secs: int = 0) -> None:
        if max_tries < 1:
            msg = "max_tries must be >= 1"
            raise ValueError(msg)

        if cache_ttl_secs < 0:
            msg = "cache_ttl_secs must be >= 0"
            raise ValueError(msg)

        self._max_tries = max_tries
        self._cache_ttl_secs = cache_ttl_secs

    def _is_caching(self, query: Query) -> bool:
        """Return whether results are cached, which can be forced off by the environment."""
        if self._cache_ttl_secs and _FORCE_DISABLE_CACHE:
            query.logger.debug("caching is forced disabled")
            return False
        return self._cache_ttl_secs > 0

    def _cache_read(self, query: Query) -> None:
        """
        Load a cached response into ``query``, if there is one that has not expired.

        Missing, unreadable, malformed and expired cache files leave the query untouched.
        """
        logger = query.logger

        now = int(time.time())

        file_name = f"{query.cache_key}.json"
        file_path = Path(tempfile.gettempdir()) / file_name

        if not file_path.exists():
            logger.info("result was not cached")
            logger.debug(f"checked for cache at {file_path}")
            return

        try:
            with Path(file_path).open(mode="r", encoding="utf-8") as file:
                response = json.load(file)
        except (OSError, json.JSONDecodeError):
            logger.exception(f"failed to read cached {query}")
            return

        # the file lives in a shared temp directory: valid JSON that is not an object
        # (f.e. a list) must be ignored like any other unreadable cache, not crash below
        if not isinstance(response, dict):
            logger.error(f"failed to read cached {query}: unexpected content")
            return

        if response.get(_EXPIRATION_KEY, 0) <= now:
            logger.info(f"{query} cache expired")
            return

        query._response = response
        logger.info(f"{query} was cached")

    def _cache_write(self, query: Query) -> None:
        """
        Write the response of ``query`` to its cache file, along with an expiration time.

        Failing to write the file is logged, but not treated as an error.
        """
        logger = query.logger

        now = int(time.time())

        assert query._response is not None
        # the expiration time is stored inside of the response itself
        query._response[_EXPIRATION_KEY] = now + self._cache_ttl_secs

        file_name = f"{query.cache_key}.json"
        file_path = Path(tempfile.gettempdir()) / file_name

        logger.debug(f"caching at {file_path}…")

        try:
            with Path(file_path).open(mode="w", encoding="utf-8") as file:
                json.dump(query._response, file)
        except OSError:
            logger.exception(f"failed to cache {query}")

    async def __call__(self, query: Query) -> None:
        """
        Called with the current query state before the client makes an API request.

        Returns normally to (re)try the query, or when the query is done.

        Raises:
            ClientError: the error of the most recent try, if all tries are used up,
                         or if the error says that the query should not be retried
        """
        logger = query.logger

        # Check cache ahead of first try
        if query.nb_tries == 0 and self._is_caching(query):
            await asyncio.to_thread(self._cache_read, query)

        # Success or cached
        if query.done:
            logger.info(f"{query}")
            if not query.was_cached and self._is_caching(query):
                await asyncio.to_thread(self._cache_write, query)
            return

        err = query.error

        if err:
            logger.info(f"try for query{query.kwargs!r} failed: {err}")

        if is_server_error(err):
            logger.error(f"unexpected response body:\n{err.body}")

        # Do not retry if we exhausted all tries, when a retry would not change the result,
        # or when the timeout was reached.
        # Compare with ">=" instead of "==": a query that already has more tries than this
        # runner allows (f.e. because it was tried with another runner) must not be
        # retried indefinitely.
        if err and (query.nb_tries >= self._max_tries or not err.should_retry):
            logger.error(f"give up on {query}", exc_info=err)
            raise err

        if is_rejection(err):
            # Wait if the server is too busy.
            if err.cause == QueryRejectCause.TOO_BUSY:
                backoff = _fibo_backoff_secs(query.nb_tries)
                logger.info(f"retry {query} in {backoff:.1f}s")
                await asyncio.sleep(backoff)

            # Wait until a slot opens if the rate limit was exceeded.
            elif err.cause == QueryRejectCause.TOO_MANY_QUERIES:
                pass  # let client enforce cooldown

            # Double timeout if exceeded.
            elif err.cause == QueryRejectCause.EXCEEDED_TIMEOUT:
                old = f"{query.timeout_secs:.1f}s"
                query.timeout_secs *= 2
                new = f"{query.timeout_secs:.1f}s"
                logger.info(f"increased [timeout:*] for {query} from {old} to {new}")

            # Double maxsize if exceeded.
            elif err.cause == QueryRejectCause.EXCEEDED_MAXSIZE:
                old = f"{query.maxsize_mib:.1f}mib"
                query.maxsize_mib *= 2
                new = f"{query.maxsize_mib:.1f}mib"
                logger.info(f"increased [maxsize:*] for {query} from {old} to {new}")
792
793
794def _fibo_backoff_secs(tries: int) -> float:
795    """Fibonacci sequence without zero: 1, 1, 2, 3, 5, 8, etc."""
796    a, b = 1.0, 1.0
797
798    for _ in range(tries):
799        a, b = b, a + b
800
801    return a
802
803
def __cache_delete(query: Query) -> None:
    """Delete the cache file of a query, if there is one (only to be used in tests)."""
    cache_file = Path(tempfile.gettempdir(), f"{query.cache_key}.json")
    cache_file.unlink(missing_ok=True)
809
810
def __cache_expire(query: Query) -> None:
    """Rewrite the cache file of a query with its expiration set to zero (only to be used in tests)."""
    cache_file = Path(tempfile.gettempdir(), f"{query.cache_key}.json")

    with cache_file.open(mode="r", encoding="utf-8") as src:
        cached = json.load(src)

    # zero is in the past for any reader, so the cached response counts as expired
    cached[_EXPIRATION_KEY] = 0

    with cache_file.open(mode="w", encoding="utf-8") as dst:
        json.dump(cached, dst)
823
824
825_EXPIRATION_KEY = "__expiration__"
826_IS_CI = os.getenv("GITHUB_ACTIONS") == "true"
827_IS_UNIT_TEST = "pytest" in sys.modules
828_FORCE_DISABLE_CACHE = _IS_CI and not _IS_UNIT_TEST
class Query:
 59class Query:
 60    """
 61    State of a query that is either pending, running, successful, or failed.
 62
 63    Args:
 64        input_code: The input Overpass QL code. Note that some settings might be changed
 65                    by query runners, notably the 'timeout' and 'maxsize' settings.
 66        logger: The logger to use for all logging output related to this query.
 67        **kwargs: Additional keyword arguments that can be used to identify queries.
 68
 69    References:
 70        - https://wiki.openstreetmap.org/wiki/Overpass_API/Overpass_QL
 71    """
 72
 73    __slots__ = (
 74        "_error",
 75        "_input_code",
 76        "_kwargs",
 77        "_last_timeout_secs_used",
 78        "_logger",
 79        "_max_timeout_secs_exceeded",
 80        "_nb_tries",
 81        "_request_timeout",
 82        "_response",
 83        "_response_bytes",
 84        "_run_timeout_secs",
 85        "_settings",
 86        "_time_end_try",
 87        "_time_start",
 88        "_time_start_req",
 89        "_time_start_try",
 90    )
 91
 92    def __init__(self, input_code: str, logger: logging.Logger = _NULL_LOGGER, **kwargs) -> None:
 93        self._input_code = input_code
 94        """the original given overpass ql code"""
 95
 96        self._logger = logger
 97        """logger to use for this query"""
 98
 99        self._kwargs = kwargs
100        """used to identify this query"""
101
102        self._settings = dict(_SETTING_PATTERN.findall(input_code))
103        """all overpass ql settings [k:v];"""
104
105        self._settings["out"] = "json"
106
107        if "maxsize" not in self._settings:
108            self._settings["maxsize"] = DEFAULT_MAXSIZE_MIB * 1024 * 1024
109
110        if "timeout" not in self._settings:
111            self._settings["timeout"] = DEFAULT_TIMEOUT_SECS
112
113        self._run_timeout_secs: float | None = None
114        """total time limit for running this query"""
115
116        self._request_timeout: RequestTimeout = RequestTimeout()
117        """config for request timeouts"""
118
119        self._error: ClientError | None = None
120        """error of the last try, or None"""
121
122        self._response: dict | None = None
123        """response JSON as a dict, or None"""
124
125        self._response_bytes = 0.0
126        """number of bytes in a response, or zero"""
127
128        self._nb_tries = 0
129        """number of tries so far, starting at zero"""
130
131        self._time_start: _Instant | None = None
132        """time prior to executing the first try"""
133
134        self._time_start_try: _Instant | None = None
135        """time prior to executing the most recent try"""
136
137        self._time_start_req: _Instant | None = None
138        """time prior to executing the most recent try's query request"""
139
140        self._time_end_try: _Instant | None = None
141        """time the most recent try finished"""
142
143        self._last_timeout_secs_used: int | None = None
144        """the last used 'timeout' setting"""
145
146        self._max_timeout_secs_exceeded: int | None = None
147        """the largest 'timeout' setting that was exceeded in a try of this query"""
148
149    def reset(self) -> None:
150        """Reset the query to its initial state, ignoring previous tries."""
151        Query.__init__(self, input_code=self._input_code, **self._kwargs)
152
153    @property
154    def input_code(self) -> str:
155        """The original input Overpass QL source code."""
156        return self._input_code
157
158    @property
159    def kwargs(self) -> dict:
160        """
161        Keyword arguments that can be used to identify queries.
162
163        The default query runner will log these values when a query is run.
164        """
165        return self._kwargs
166
167    @property
168    def logger(self) -> logging.Logger:
169        """The logger used for logging output related to this query."""
170        return self._logger
171
172    @property
173    def nb_tries(self) -> int:
174        """Current number of tries."""
175        return self._nb_tries
176
177    @property
178    def error(self) -> ClientError | None:
179        """
180        Error of the most recent try.
181
182        Returns:
183            an error or ``None`` if the query wasn't tried or hasn't failed
184        """
185        return self._error
186
187    @property
188    def response(self) -> dict | None:
189        """
190        The entire JSON response of the query.
191
192        Returns:
193            the response, or ``None`` if the query has not successfully finished (yet)
194        """
195        return self._response
196
197    @property
198    def was_cached(self) -> bool | None:
199        """
200        Indicates whether the query result was cached.
201
202        Returns:
203            ``None`` if the query has not been run yet.
204            ``True`` if the query has a result set with zero tries.
205            ``False`` otherwise.
206        """
207        if self._response is None:
208            return None
209        return self._nb_tries == 0
210
211    @property
212    def result_set(self) -> list[dict] | None:
213        """
214        The result set of the query.
215
216        This is open data, licensed under the Open Data Commons Open Database License (ODbL).
217        You are free to copy, distribute, transmit and adapt this data, as long as you credit
218        OpenStreetMap and its contributors. If you alter or build upon this data, you may
219        distribute the result only under the same licence.
220
221        Returns:
222            the elements of the result set, or ``None`` if the query has not successfully
223            finished (yet)
224
225        References:
226            - https://www.openstreetmap.org/copyright
227            - https://opendatacommons.org/licenses/odbl/1-0/
228        """
229        if not self._response:
230            return None
231        return self._response["elements"]
232
233    @property
234    def response_size_mib(self) -> float | None:
235        """
236        The size of the response in mebibytes.
237
238        Returns:
239            the size, or ``None`` if the query has not successfully finished (yet)
240        """
241        if self._response is None:
242            return None
243        return self._response_bytes / 1024.0 / 1024.0
244
245    @property
246    def maxsize_mib(self) -> float:
247        """
248        The current value of the [maxsize:*] setting in mebibytes.
249
250        This size indicates the maximum allowed memory for the query in bytes RAM on the server,
251        as expected by the user. If the query needs more RAM than this value, the server may abort
252        the query with a memory exhaustion. The higher this size, the more probably the server
253        rejects the query before executing it.
254        """
255        return float(self._settings["maxsize"]) // 1024.0 // 1024.0
256
257    @maxsize_mib.setter
258    def maxsize_mib(self, value: float) -> None:
259        if value <= 0.0:
260            msg = "maxsize_mib must be > 0.0"
261            raise ValueError(msg)
262        self._settings["maxsize"] = int(value * 1024.0 * 1024.0)
263
264    @property
265    def timeout_secs(self) -> int:
266        """
267        The current value of the [timeout:*] setting in seconds.
268
269        This duration is the maximum allowed runtime for the query in seconds, as expected by the
270        user. If the query runs longer than this time, the server may abort the query. The higher
271        this duration, the more probably the server rejects the query before executing it.
272        """
273        return int(self._settings["timeout"])
274
275    @timeout_secs.setter
276    def timeout_secs(self, value: int) -> None:
277        if value < 1:
278            msg = "timeout_secs must be >= 1"
279            raise ValueError(msg)
280        self._settings["timeout"] = value
281
282    @property
283    def run_timeout_secs(self) -> float | None:
284        """
285        A limit to ``run_duration_secs``, that cancels running the query when exceeded.
286
287        Defaults to no timeout.
288
289        The client will raise a ``GiveupError`` if the timeout is reached.
290
291        Not to be confused with ``timeout_secs``, which is a setting for the Overpass API instance,
292        that limits a single query execution time. Instead, this value can be used to limit the
293        total client-side time spent on this query (see ``Client.run_query``).
294        """
295        return self._run_timeout_secs
296
297    @run_timeout_secs.setter
298    def run_timeout_secs(self, value: float | None) -> None:
299        if value is not None and value <= 0.0:
300            msg = "run_timeout_secs must be > 0"
301            raise ValueError(msg)
302        self._run_timeout_secs = value
303
304    @property
305    def run_timeout_elapsed(self) -> bool:
306        """Returns ``True`` if ``run_timeout_secs`` is set and has elapsed."""
307        return (
308            self.run_timeout_secs is not None
309            and self.run_duration_secs is not None
310            and self.run_timeout_secs < self.run_duration_secs
311        )
312
313    @property
314    def request_timeout(self) -> "RequestTimeout":
315        """Request timeout settings for this query."""
316        return self._request_timeout
317
318    @request_timeout.setter
319    def request_timeout(self, value: "RequestTimeout") -> None:
320        self._request_timeout = value
321
322    def _code(self) -> str:
323        # TODO doc
324        # TODO refactor? this function might do a bit too much
325        # TODO needs tests
326        settings_copy = self._settings.copy()
327
328        max_timeout = settings_copy["timeout"]
329
330        # if a run timeout is set, the remaining time is the max query timeout we will use
331        if (time_max := self.run_timeout_secs) and (time_so_far := self.run_duration_secs):
332            max_timeout = math.ceil(time_max - time_so_far)
333            if max_timeout <= 0:
334                raise GiveupError(kwargs=self.kwargs, after_secs=time_so_far)
335
336        # if we already had a query that exceeded a timeout that is >= that max timeout,
337        # we might as well give up already
338        if (min_needed := self._max_timeout_secs_exceeded) and min_needed >= max_timeout:
339            self._logger.error(f"give up on {self} since query will likely time out")
340            raise GiveupError(kwargs=self.kwargs, after_secs=self.run_duration_secs or 0.0)
341
342        # pick the timeout we will use for the next try
343        next_timeout_secs_used = min(settings_copy["timeout"], max_timeout)
344
345        # log if had to override the timeout setting with "max_timeout"
346        if next_timeout_secs_used != settings_copy["timeout"]:
347            settings_copy["timeout"] = next_timeout_secs_used
348            self._logger.info(f"adjust timeout to {next_timeout_secs_used}s")
349
350        # update the used timeout in state
351        self._last_timeout_secs_used = next_timeout_secs_used
352
353        # remove the original settings statement
354        code = _SETTING_PATTERN.sub("", self._input_code)
355
356        # put the adjusted settings in front
357        settings = "".join((f"[{k}:{v}]" for k, v in settings_copy.items())) + ";"
358        return f"{settings}\n{code}"
359
360    @property
361    def cache_key(self) -> str:
362        """
363        Hash QL code, and return its digest as hexadecimal string.
364
365        The default query runner uses this as cache key.
366        """
367        # Remove the original settings statement
368        code = _SETTING_PATTERN.sub("", self._input_code)
369        hasher = hashlib.blake2b(digest_size=8)
370        hasher.update(code.encode("utf-8"))
371        return hasher.hexdigest()
372
373    @property
374    def done(self) -> bool:
375        """Returns ``True`` if the result set was received."""
376        return self._response is not None
377
378    @property
379    def request_duration_secs(self) -> float | None:
380        """
381        How long it took to fetch the result set in seconds.
382
383        This is the duration starting with the API request, and ending once
384        the result is written to this query object. Although it depends on how busy
385        the API instance is, this can give some indication of how long a query takes.
386
387        Returns:
388            the duration or ``None`` if there is no result set yet, or when it was cached.
389        """
390        if self._response is None or self.was_cached:
391            return None
392
393        assert self._time_end_try is not None
394        assert self._time_start_req is not None
395
396        return self._time_end_try - self._time_start_req
397
398    @property
399    def run_duration_secs(self) -> float | None:
400        """
401        The total required time for this query in seconds (so far).
402
403        Returns:
404            the duration or ``None`` if there is no result set yet, or when it was cached.
405        """
406        if self._time_start is None:
407            return None
408
409        if self._time_end_try:
410            return self._time_end_try - self._time_start
411
412        return self._time_start.elapsed_secs_since
413
414    @property
415    def api_version(self) -> str | None:
416        """
417        The Overpass API version used by the queried instance.
418
419        Returns:
420            f.e. ``"Overpass API 0.7.56.8 7d656e78"``, or ``None`` if the query
421            has not successfully finished (yet)
422
423        References:
424            - https://wiki.openstreetmap.org/wiki/Overpass_API/versions
425        """
426        if self._response is None:
427            return None
428
429        return self._response["generator"]
430
431    @property
432    def timestamp_osm(self) -> datetime | None:
433        """
434        All OSM edits that have been uploaded before this date are included.
435
436        It can take a couple of minutes for changes to the database to show up in the
437        Overpass API query results.
438
439        Returns:
440            the timestamp, or ``None`` if the query has not successfully finished (yet)
441        """
442        if self._response is None:
443            return None
444
445        date_str = self._response["osm3s"]["timestamp_osm_base"]
446        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(timezone.utc)
447
448    @property
449    def timestamp_areas(self) -> datetime | None:
450        """
451        All area data edits that have been uploaded before this date are included.
452
453        If the query involves area data processing, this is the date of the latest edit
454        that has been considered in the most recent batch run of the area generation.
455
456        Returns:
457            the timestamp, or ``None`` if the query has not successfully finished (yet), or
458            if it does not involve area data processing.
459        """
460        if self._response is None:
461            return None
462
463        date_str = self._response["osm3s"].get("timestamp_areas_base")
464        if not date_str:
465            return None
466
467        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(timezone.utc)
468
469    @property
470    def copyright(self) -> str:
471        """A copyright notice that comes with the result set."""
472        if self._response is None:
473            return _COPYRIGHT
474
475        return self._response["osm3s"].get("copyright") or _COPYRIGHT
476
477    def __str__(self) -> str:
478        query = f"query{self.kwargs!r}"
479
480        size = self.response_size_mib
481        time_request = self.request_duration_secs
482        time_total = self.run_duration_secs
483
484        if self.nb_tries == 0:
485            details = "pending"
486        elif self.done:
487            if self.nb_tries == 1:
488                details = f"done - {size:.01f}mb in {time_request:.01f}s"
489            else:
490                details = f"done after {time_total:.01f}s - {size:.01f}mb in {time_request:.01f}s"
491        else:
492            t = "try" if self.nb_tries == 1 else "tries"
493            details = f"failing after {self.nb_tries} {t}, {time_total:.01f}s"
494
495        return f"{query} ({details})"
496
497    def __repr__(self) -> str:
498        cls_name = type(self).__name__
499
500        details = {
501            "kwargs": self._kwargs,
502            "done": self.done,
503        }
504
505        if self.nb_tries == 0 or self.error:
506            details["tries"] = self.nb_tries
507
508        if self.error:
509            details["error"] = type(self.error).__name__
510
511        if self.done:
512            details["response_size"] = f"{self.response_size_mib:.02f}mb"
513
514            if not self.was_cached:
515                details["request_duration"] = f"{self.request_duration_secs:.02f}s"
516
517        if self.nb_tries > 0:
518            details["run_duration"] = f"{self.run_duration_secs:.02f}s"
519
520        details_str = ", ".join((f"{k}={v!r}" for k, v in details.items()))
521
522        return f"{cls_name}({details_str})"
523
524    def _mutator(self) -> "_QueryMutator":
525        return _QueryMutator(self)

State of a query that is either pending, running, successful, or failed.

Arguments:
  • input_code: The input Overpass QL code. Note that some settings might be changed by query runners, notably the 'timeout' and 'maxsize' settings.
  • logger: The logger to use for all logging output related to this query.
  • **kwargs: Additional keyword arguments that can be used to identify queries.
References:
  • https://wiki.openstreetmap.org/wiki/Overpass_API/Overpass_QL
Query( input_code: str, logger: logging.Logger = <RootLogger root (WARNING)>, **kwargs)
 92    def __init__(self, input_code: str, logger: logging.Logger = _NULL_LOGGER, **kwargs) -> None:
 93        self._input_code = input_code
 94        """the original given overpass ql code"""
 95
 96        self._logger = logger
 97        """logger to use for this query"""
 98
 99        self._kwargs = kwargs
100        """used to identify this query"""
101
102        self._settings = dict(_SETTING_PATTERN.findall(input_code))
103        """all overpass ql settings [k:v];"""
104
105        self._settings["out"] = "json"
106
107        if "maxsize" not in self._settings:
108            self._settings["maxsize"] = DEFAULT_MAXSIZE_MIB * 1024 * 1024
109
110        if "timeout" not in self._settings:
111            self._settings["timeout"] = DEFAULT_TIMEOUT_SECS
112
113        self._run_timeout_secs: float | None = None
114        """total time limit for running this query"""
115
116        self._request_timeout: RequestTimeout = RequestTimeout()
117        """config for request timeouts"""
118
119        self._error: ClientError | None = None
120        """error of the last try, or None"""
121
122        self._response: dict | None = None
123        """response JSON as a dict, or None"""
124
125        self._response_bytes = 0.0
126        """number of bytes in a response, or zero"""
127
128        self._nb_tries = 0
129        """number of tries so far, starting at zero"""
130
131        self._time_start: _Instant | None = None
132        """time prior to executing the first try"""
133
134        self._time_start_try: _Instant | None = None
135        """time prior to executing the most recent try"""
136
137        self._time_start_req: _Instant | None = None
138        """time prior to executing the most recent try's query request"""
139
140        self._time_end_try: _Instant | None = None
141        """time the most recent try finished"""
142
143        self._last_timeout_secs_used: int | None = None
144        """the last used 'timeout' setting"""
145
146        self._max_timeout_secs_exceeded: int | None = None
147        """the largest 'timeout' setting that was exceeded in a try of this query"""
def reset(self) -> None:
149    def reset(self) -> None:
150        """Reset the query to its initial state, ignoring previous tries."""
151        Query.__init__(self, input_code=self._input_code, **self._kwargs)

Reset the query to its initial state, ignoring previous tries.

input_code: str
153    @property
154    def input_code(self) -> str:
155        """The original input Overpass QL source code."""
156        return self._input_code

The original input Overpass QL source code.

kwargs: dict
158    @property
159    def kwargs(self) -> dict:
160        """
161        Keyword arguments that can be used to identify queries.
162
163        The default query runner will log these values when a query is run.
164        """
165        return self._kwargs

Keyword arguments that can be used to identify queries.

The default query runner will log these values when a query is run.

logger: logging.Logger
167    @property
168    def logger(self) -> logging.Logger:
169        """The logger used for logging output related to this query."""
170        return self._logger

The logger used for logging output related to this query.

nb_tries: int
172    @property
173    def nb_tries(self) -> int:
174        """Current number of tries."""
175        return self._nb_tries

Current number of tries.

error: aio_overpass.error.ClientError | None
177    @property
178    def error(self) -> ClientError | None:
179        """
180        Error of the most recent try.
181
182        Returns:
183            an error or ``None`` if the query wasn't tried or hasn't failed
184        """
185        return self._error

Error of the most recent try.

Returns:

an error or None if the query wasn't tried or hasn't failed

response: dict | None
187    @property
188    def response(self) -> dict | None:
189        """
190        The entire JSON response of the query.
191
192        Returns:
193            the response, or ``None`` if the query has not successfully finished (yet)
194        """
195        return self._response

The entire JSON response of the query.

Returns:

the response, or None if the query has not successfully finished (yet)

was_cached: bool | None
197    @property
198    def was_cached(self) -> bool | None:
199        """
200        Indicates whether the query result was cached.
201
202        Returns:
203            ``None`` if the query has not been run yet.
204            ``True`` if the query has a result set with zero tries.
205            ``False`` otherwise.
206        """
207        if self._response is None:
208            return None
209        return self._nb_tries == 0

Indicates whether the query result was cached.

Returns:

None if the query has not been run yet. True if the query has a result set with zero tries. False otherwise.

result_set: list[dict] | None
211    @property
212    def result_set(self) -> list[dict] | None:
213        """
214        The result set of the query.
215
216        This is open data, licensed under the Open Data Commons Open Database License (ODbL).
217        You are free to copy, distribute, transmit and adapt this data, as long as you credit
218        OpenStreetMap and its contributors. If you alter or build upon this data, you may
219        distribute the result only under the same licence.
220
221        Returns:
222            the elements of the result set, or ``None`` if the query has not successfully
223            finished (yet)
224
225        References:
226            - https://www.openstreetmap.org/copyright
227            - https://opendatacommons.org/licenses/odbl/1-0/
228        """
229        if not self._response:
230            return None
231        return self._response["elements"]

The result set of the query.

This is open data, licensed under the Open Data Commons Open Database License (ODbL). You are free to copy, distribute, transmit and adapt this data, as long as you credit OpenStreetMap and its contributors. If you alter or build upon this data, you may distribute the result only under the same licence.

Returns:

the elements of the result set, or None if the query has not successfully finished (yet)

References:
  • https://www.openstreetmap.org/copyright
  • https://opendatacommons.org/licenses/odbl/1-0/
response_size_mib: float | None
233    @property
234    def response_size_mib(self) -> float | None:
235        """
236        The size of the response in mebibytes.
237
238        Returns:
239            the size, or ``None`` if the query has not successfully finished (yet)
240        """
241        if self._response is None:
242            return None
243        return self._response_bytes / 1024.0 / 1024.0

The size of the response in mebibytes.

Returns:

the size, or None if the query has not successfully finished (yet)

maxsize_mib: float
245    @property
246    def maxsize_mib(self) -> float:
247        """
248        The current value of the [maxsize:*] setting in mebibytes.
249
250        This size indicates the maximum allowed memory for the query in bytes RAM on the server,
251        as expected by the user. If the query needs more RAM than this value, the server may abort
252        the query with a memory exhaustion. The higher this size, the more probably the server
253        rejects the query before executing it.
254        """
255        return float(self._settings["maxsize"]) // 1024.0 // 1024.0

The current value of the [maxsize:*] setting in mebibytes.

This size indicates the maximum allowed memory for the query in bytes RAM on the server, as expected by the user. If the query needs more RAM than this value, the server may abort the query with a memory exhaustion. The higher this size, the more probably the server rejects the query before executing it.

timeout_secs: int
264    @property
265    def timeout_secs(self) -> int:
266        """
267        The current value of the [timeout:*] setting in seconds.
268
269        This duration is the maximum allowed runtime for the query in seconds, as expected by the
270        user. If the query runs longer than this time, the server may abort the query. The higher
271        this duration, the more probably the server rejects the query before executing it.
272        """
273        return int(self._settings["timeout"])

The current value of the [timeout:*] setting in seconds.

This duration is the maximum allowed runtime for the query in seconds, as expected by the user. If the query runs longer than this time, the server may abort the query. The higher this duration, the more probably the server rejects the query before executing it.

run_timeout_secs: float | None
282    @property
283    def run_timeout_secs(self) -> float | None:
284        """
285        A limit to ``run_duration_secs``, that cancels running the query when exceeded.
286
287        Defaults to no timeout.
288
289        The client will raise a ``GiveupError`` if the timeout is reached.
290
291        Not to be confused with ``timeout_secs``, which is a setting for the Overpass API instance,
292        that limits a single query execution time. Instead, this value can be used to limit the
293        total client-side time spent on this query (see ``Client.run_query``).
294        """
295        return self._run_timeout_secs

A limit to run_duration_secs, that cancels running the query when exceeded.

Defaults to no timeout.

The client will raise a GiveupError if the timeout is reached.

Not to be confused with timeout_secs, which is a setting for the Overpass API instance, that limits a single query execution time. Instead, this value can be used to limit the total client-side time spent on this query (see Client.run_query).

run_timeout_elapsed: bool
304    @property
305    def run_timeout_elapsed(self) -> bool:
306        """Returns ``True`` if ``run_timeout_secs`` is set and has elapsed."""
307        return (
308            self.run_timeout_secs is not None
309            and self.run_duration_secs is not None
310            and self.run_timeout_secs < self.run_duration_secs
311        )

Returns True if run_timeout_secs is set and has elapsed.

request_timeout: RequestTimeout
313    @property
314    def request_timeout(self) -> "RequestTimeout":
315        """Request timeout settings for this query."""
316        return self._request_timeout

Request timeout settings for this query.

cache_key: str
360    @property
361    def cache_key(self) -> str:
362        """
363        Hash QL code, and return its digest as hexadecimal string.
364
365        The default query runner uses this as cache key.
366        """
367        # Remove the original settings statement
368        code = _SETTING_PATTERN.sub("", self._input_code)
369        hasher = hashlib.blake2b(digest_size=8)
370        hasher.update(code.encode("utf-8"))
371        return hasher.hexdigest()

Hash QL code, and return its digest as hexadecimal string.

The default query runner uses this as cache key.

done: bool
373    @property
374    def done(self) -> bool:
375        """Returns ``True`` if the result set was received."""
376        return self._response is not None

Returns True if the result set was received.

request_duration_secs: float | None
378    @property
379    def request_duration_secs(self) -> float | None:
380        """
381        How long it took to fetch the result set in seconds.
382
383        This is the duration starting with the API request, and ending once
384        the result is written to this query object. Although it depends on how busy
385        the API instance is, this can give some indication of how long a query takes.
386
387        Returns:
388            the duration or ``None`` if there is no result set yet, or when it was cached.
389        """
390        if self._response is None or self.was_cached:
391            return None
392
393        assert self._time_end_try is not None
394        assert self._time_start_req is not None
395
396        return self._time_end_try - self._time_start_req

How long it took to fetch the result set in seconds.

This is the duration starting with the API request, and ending once the result is written to this query object. Although it depends on how busy the API instance is, this can give some indication of how long a query takes.

Returns:

the duration or None if there is no result set yet, or when it was cached.

run_duration_secs: float | None
398    @property
399    def run_duration_secs(self) -> float | None:
400        """
401        The total required time for this query in seconds (so far).
402
403        Returns:
404            the duration or ``None`` if there is no result set yet, or when it was cached.
405        """
406        if self._time_start is None:
407            return None
408
409        if self._time_end_try:
410            return self._time_end_try - self._time_start
411
412        return self._time_start.elapsed_secs_since

The total required time for this query in seconds (so far).

Returns:

the duration, or None if no start time has been recorded for this query yet.

api_version: str | None
414    @property
415    def api_version(self) -> str | None:
416        """
417        The Overpass API version used by the queried instance.
418
419        Returns:
420            f.e. ``"Overpass API 0.7.56.8 7d656e78"``, or ``None`` if the query
421            has not successfully finished (yet)
422
423        References:
424            - https://wiki.openstreetmap.org/wiki/Overpass_API/versions
425        """
426        if self._response is None:
427            return None
428
429        return self._response["generator"]

The Overpass API version used by the queried instance.

Returns:

f.e. "Overpass API 0.7.56.8 7d656e78", or None if the query has not successfully finished (yet)

References:
  • https://wiki.openstreetmap.org/wiki/Overpass_API/versions
timestamp_osm: datetime.datetime | None
431    @property
432    def timestamp_osm(self) -> datetime | None:
433        """
434        All OSM edits that have been uploaded before this date are included.
435
436        It can take a couple of minutes for changes to the database to show up in the
437        Overpass API query results.
438
439        Returns:
440            the timestamp, or ``None`` if the query has not successfully finished (yet)
441        """
442        if self._response is None:
443            return None
444
445        date_str = self._response["osm3s"]["timestamp_osm_base"]
446        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(timezone.utc)

All OSM edits that have been uploaded before this date are included.

It can take a couple of minutes for changes to the database to show up in the Overpass API query results.

Returns:

the timestamp, or None if the query has not successfully finished (yet)

timestamp_areas: datetime.datetime | None
448    @property
449    def timestamp_areas(self) -> datetime | None:
450        """
451        All area data edits that have been uploaded before this date are included.
452
453        If the query involves area data processing, this is the date of the latest edit
454        that has been considered in the most recent batch run of the area generation.
455
456        Returns:
457            the timestamp, or ``None`` if the query has not successfully finished (yet), or
458            if it does not involve area data processing.
459        """
460        if self._response is None:
461            return None
462
463        date_str = self._response["osm3s"].get("timestamp_areas_base")
464        if not date_str:
465            return None
466
467        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(timezone.utc)

All area data edits that have been uploaded before this date are included.

If the query involves area data processing, this is the date of the latest edit that has been considered in the most recent batch run of the area generation.

Returns:

the timestamp, or None if the query has not successfully finished (yet), or if it does not involve area data processing.

copyright: str
469    @property
470    def copyright(self) -> str:
471        """A copyright notice that comes with the result set."""
472        if self._response is None:
473            return _COPYRIGHT
474
475        return self._response["osm3s"].get("copyright") or _COPYRIGHT

A copyright notice that comes with the result set.

class QueryRunner(abc.ABC):
class QueryRunner(ABC):
    """
    Abstract base for query runners.

    A query runner is an async callable that gets invoked before a client makes
    an API request. Typical uses are to…
     - …retry queries when they fail
     - …modify queries, e.g. to lower/increase their maxsize/timeout
     - …log query results & errors
     - …implement caching

    At the very least, a runner either simply returns, which (re)tries the query,
    or raises ``query.error``, which stops trying.
    """

    __slots__ = ()

    @abstractmethod
    async def __call__(self, query: Query) -> None:
        """Called with the current query state before the client makes an API request."""

A query runner is an async function that is called before a client makes an API request.

Query runners can be used to…

  • …retry queries when they fail
  • …modify queries, e.g. to lower/increase their maxsize/timeout
  • …log query results & errors
  • …implement caching

The absolute minimum a query runner function has to do is to simply return to (re)try a query, or to raise query.error to stop trying.

class DefaultQueryRunner(QueryRunner):
class DefaultQueryRunner(QueryRunner):
    """
    The default query runner.

    This runner…
     - …retries with an increasing back-off period in between tries if the server is too busy
     - …retries and doubles timeout and maxsize settings if they were exceeded
     - …limits the number of tries
     - …optionally caches query results in temp files

    This runner does *not* lower timeout and maxsize settings if the server rejected a query.

    Args:
        max_tries: The maximum number of times a query is tried. (5 by default)
        cache_ttl_secs: Number of seconds a query's result set is cached for.
                        Set to zero to disable caching. (zero by default)

    Raises:
        ValueError: if ``max_tries`` is less than one, or if ``cache_ttl_secs`` is negative.
    """

    __slots__ = (
        "_max_tries",
        "_cache_ttl_secs",
    )

    def __init__(self, max_tries: int = 5, cache_ttl_secs: int = 0) -> None:
        if max_tries < 1:
            msg = "max_tries must be >= 1"
            raise ValueError(msg)

        if cache_ttl_secs < 0:
            msg = "cache_ttl_secs must be >= 0"
            raise ValueError(msg)

        self._max_tries = max_tries
        self._cache_ttl_secs = cache_ttl_secs

    def _is_caching(self, query: Query) -> bool:
        """Return whether a TTL is set and ``_FORCE_DISABLE_CACHE`` does not override it."""
        if self._cache_ttl_secs and _FORCE_DISABLE_CACHE:
            query.logger.debug("caching is forced disabled")
            return False
        return self._cache_ttl_secs > 0

    def _cache_read(self, query: Query) -> None:
        """
        Set the query's response from its cache file, if one exists and has not expired.

        Any failure to read or decode the file is logged and treated as a cache miss.
        """
        logger = query.logger

        now = int(time.time())

        file_name = f"{query.cache_key}.json"
        file_path = Path(tempfile.gettempdir()) / file_name

        if not file_path.exists():
            logger.info("result was not cached")
            logger.debug(f"checked for cache at {file_path}")
            return

        try:
            with file_path.open(mode="r", encoding="utf-8") as file:
                response = json.load(file)
        except (OSError, json.JSONDecodeError):
            logger.exception(f"failed to read cached {query}")
            return

        # The temp directory is shared: a file that decodes to something other than
        # a JSON object cannot be a cached response, so treat it as a miss rather than
        # failing on the ``.get()`` below.
        if not isinstance(response, dict):
            logger.error(f"failed to read cached {query}: not a JSON object")
            return

        # A missing expiration entry counts as expired.
        if response.get(_EXPIRATION_KEY, 0) <= now:
            logger.info(f"{query} cache expired")
            return

        query._response = response
        logger.info(f"{query} was cached")

    def _cache_write(self, query: Query) -> None:
        """
        Write the query's response to its cache file, stamped with an expiration time.

        The expiration entry is added to the response in place. A failure to write the
        file is logged, but not raised.
        """
        logger = query.logger

        now = int(time.time())

        assert query._response is not None
        query._response[_EXPIRATION_KEY] = now + self._cache_ttl_secs

        file_name = f"{query.cache_key}.json"
        file_path = Path(tempfile.gettempdir()) / file_name

        logger.debug(f"caching at {file_path}…")

        try:
            with file_path.open(mode="w", encoding="utf-8") as file:
                json.dump(query._response, file)
        except OSError:
            logger.exception(f"failed to cache {query}")

    async def __call__(self, query: Query) -> None:
        """
        Called with the current query state before the client makes an API request.

        Returns normally to let the query be (re)tried, or when it is already done.

        Raises:
            the query's error, if all tries are used up or the error should not be retried.
        """
        logger = query.logger

        # Check cache ahead of first try
        if query.nb_tries == 0 and self._is_caching(query):
            await asyncio.to_thread(self._cache_read, query)

        # Success or cached
        if query.done:
            logger.info(f"{query}")
            if not query.was_cached and self._is_caching(query):
                await asyncio.to_thread(self._cache_write, query)
            return

        err = query.error

        if err:
            logger.info(f"try for query{query.kwargs!r} failed: {err}")

        if is_server_error(err):
            logger.error(f"unexpected response body:\n{err.body}")

        # Do not retry if we exhausted all tries, or when the error says that a retry
        # is pointless. ``>=`` instead of ``==`` makes sure that we also give up if the
        # try count ever ends up beyond the limit, instead of retrying forever.
        if err and (query.nb_tries >= self._max_tries or not err.should_retry):
            logger.error(f"give up on {query}", exc_info=err)
            raise err

        if is_rejection(err):
            # Wait if the server is too busy.
            if err.cause == QueryRejectCause.TOO_BUSY:
                backoff = _fibo_backoff_secs(query.nb_tries)
                logger.info(f"retry {query} in {backoff:.1f}s")
                await asyncio.sleep(backoff)

            # Wait until a slot opens if the rate limit was exceeded.
            elif err.cause == QueryRejectCause.TOO_MANY_QUERIES:
                pass  # let client enforce cooldown

            # Double timeout if exceeded.
            elif err.cause == QueryRejectCause.EXCEEDED_TIMEOUT:
                old = f"{query.timeout_secs:.1f}s"
                query.timeout_secs *= 2
                new = f"{query.timeout_secs:.1f}s"
                logger.info(f"increased [timeout:*] for {query} from {old} to {new}")

            # Double maxsize if exceeded.
            elif err.cause == QueryRejectCause.EXCEEDED_MAXSIZE:
                old = f"{query.maxsize_mib:.1f}mib"
                query.maxsize_mib *= 2
                new = f"{query.maxsize_mib:.1f}mib"
                logger.info(f"increased [maxsize:*] for {query} from {old} to {new}")

The default query runner.

This runner…

  • …retries with an increasing back-off period in between tries if the server is too busy
  • …retries and doubles timeout and maxsize settings if they were exceeded
  • …limits the number of tries
  • …optionally caches query results in temp files

This runner does not lower timeout and maxsize settings if the server rejected a query.

Arguments:
  • max_tries: The maximum number of times a query is tried. (5 by default)
  • cache_ttl_secs: Number of seconds a query's result set is cached for. Set to zero to disable caching. (zero by default)
DefaultQueryRunner(max_tries: int = 5, cache_ttl_secs: int = 0)
676    def __init__(self, max_tries: int = 5, cache_ttl_secs: int = 0) -> None:
677        if max_tries < 1:
678            msg = "max_tries must be >= 1"
679            raise ValueError(msg)
680
681        if cache_ttl_secs < 0:
682            msg = "cache_ttl_secs must be >= 0"
683            raise ValueError(msg)
684
685        self._max_tries = max_tries
686        self._cache_ttl_secs = cache_ttl_secs
@dataclass(kw_only=True, slots=True)
class RequestTimeout:
596@dataclass(kw_only=True, slots=True)
597class RequestTimeout:
598    """
599    Request timeout settings.
600
601    Attributes:
602        total_without_query_secs: If set, the sum of this duration and the query's ``[timeout:*]``
603                                  setting is used as timeout duration of the entire request,
604                                  including connection establishment, request sending and response
605                                  reading (``aiohttp.ClientTimeout.total``).
606                                  Defaults to 20 seconds.
607        sock_connect_secs: The maximum number of seconds allowed for pure socket connection
608                           establishment (same as ``aiohttp.ClientTimeout.sock_connect``).
609        each_sock_read_secs: The maximum number of seconds allowed for the period between reading
610                             a new chunk of data (same as ``aiohttp.ClientTimeout.sock_read``).
611    """
612
613    total_without_query_secs: float | None = 20.0
614    sock_connect_secs: float | None = None
615    each_sock_read_secs: float | None = None
616
617    def __post_init__(self) -> None:
618        if self.total_without_query_secs is not None and self.total_without_query_secs <= 0.0:
619            msg = "'total_without_query_secs' has to be > 0"
620            raise ValueError(msg)
621
622        if self.sock_connect_secs is not None and self.sock_connect_secs <= 0.0:
623            msg = "'sock_connect_secs' has to be > 0"
624            raise ValueError(msg)
625
626        if self.each_sock_read_secs is not None and self.each_sock_read_secs <= 0.0:
627            msg = "'each_sock_read_secs' has to be > 0"
628            raise ValueError(msg)

Request timeout settings.

Attributes:
  • total_without_query_secs: If set, the sum of this duration and the query's [timeout:*] setting is used as timeout duration of the entire request, including connection establishment, request sending and response reading (aiohttp.ClientTimeout.total). Defaults to 20 seconds.
  • sock_connect_secs: The maximum number of seconds allowed for pure socket connection establishment (same as aiohttp.ClientTimeout.sock_connect).
  • each_sock_read_secs: The maximum number of seconds allowed for the period between reading a new chunk of data (same as aiohttp.ClientTimeout.sock_read).
RequestTimeout( *, total_without_query_secs: float | None = 20.0, sock_connect_secs: float | None = None, each_sock_read_secs: float | None = None)
total_without_query_secs: float | None
sock_connect_secs: float | None
each_sock_read_secs: float | None
DEFAULT_MAXSIZE_MIB = 512

Default maxsize setting in mebibytes.

DEFAULT_TIMEOUT_SECS = 180

Default timeout setting in seconds.