# aio_overpass.query — Query state and runner.
"""Query state and runner."""

import asyncio
import hashlib
import json
import logging
import math
import os
import re
import sys
import tempfile
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path

from aio_overpass.error import (
    ClientError,
    GiveupError,
    QueryRejectCause,
    is_exceeding_timeout,
    is_rejection,
    is_server_error,
)


__docformat__ = "google"
__all__ = (
    "Query",
    "QueryRunner",
    "DefaultQueryRunner",
    "RequestTimeout",
    "DEFAULT_MAXSIZE_MIB",
    "DEFAULT_TIMEOUT_SECS",
)


DEFAULT_MAXSIZE_MIB = 512
"""Default ``maxsize`` setting in mebibytes."""

DEFAULT_TIMEOUT_SECS = 180
"""Default ``timeout`` setting in seconds."""

_COPYRIGHT = (
    "The data included in this document is from www.openstreetmap.org."
    " The data is made available under ODbL."
)
"""This is the same copyright notice included in result sets"""

_SETTING_PATTERN = re.compile(r"\[(\w+?):(.+?)]\s*;?")
"""A pattern to match setting declarations (not the entire settings statement)."""

# NOTE(review): this grabs the *root* logger and attaches a NullHandler to it.
# Other handlers registered on the root logger would still receive query log
# output — confirm whether a dedicated, non-propagating logger was intended.
_NULL_LOGGER = logging.getLogger()
_NULL_LOGGER.addHandler(logging.NullHandler())


class Query:
    """
    State of a query that is either pending, running, successful, or failed.

    Args:
        input_code: The input Overpass QL code. Note that some settings might be changed
            by query runners, notably the 'timeout' and 'maxsize' settings.
        logger: The logger to use for all logging output related to this query.
        **kwargs: Additional keyword arguments that can be used to identify queries.

    References:
        - https://wiki.openstreetmap.org/wiki/Overpass_API/Overpass_QL
    """

    __slots__ = (
        "_error",
        "_input_code",
        "_kwargs",
        "_last_timeout_secs_used",
        "_logger",
        "_max_timeout_secs_exceeded",
        "_nb_tries",
        "_request_timeout",
        "_response",
        "_response_bytes",
        "_run_timeout_secs",
        "_settings",
        "_time_end_try",
        "_time_start",
        "_time_start_req",
        "_time_start_try",
    )

    def __init__(self, input_code: str, logger: logging.Logger = _NULL_LOGGER, **kwargs) -> None:
        self._input_code = input_code
        """the original given overpass ql code"""

        self._logger = logger
        """logger to use for this query"""

        self._kwargs = kwargs
        """used to identify this query"""

        self._settings = dict(_SETTING_PATTERN.findall(input_code))
        """all overpass ql settings [k:v];"""

        self._settings["out"] = "json"

        if "maxsize" not in self._settings:
            self._settings["maxsize"] = DEFAULT_MAXSIZE_MIB * 1024 * 1024

        if "timeout" not in self._settings:
            self._settings["timeout"] = DEFAULT_TIMEOUT_SECS
        else:
            # Fix: values parsed from the input code are strings; coerce 'timeout'
            # to int so that later comparisons in _code() (min(), >=) do not mix
            # str and int, which would raise TypeError. A non-integer value in the
            # input raises ValueError here, at construction time.
            self._settings["timeout"] = int(self._settings["timeout"])

        self._run_timeout_secs: float | None = None
        """total time limit for running this query"""

        self._request_timeout: RequestTimeout = RequestTimeout()
        """config for request timeouts"""

        self._error: ClientError | None = None
        """error of the last try, or None"""

        self._response: dict | None = None
        """response JSON as a dict, or None"""

        self._response_bytes = 0.0
        """number of bytes in a response, or zero"""

        self._nb_tries = 0
        """number of tries so far, starting at zero"""

        self._time_start: _Instant | None = None
        """time prior to executing the first try"""

        self._time_start_try: _Instant | None = None
        """time prior to executing the most recent try"""

        self._time_start_req: _Instant | None = None
        """time prior to executing the most recent try's query request"""

        self._time_end_try: _Instant | None = None
        """time the most recent try finished"""

        self._last_timeout_secs_used: int | None = None
        """the last used 'timeout' setting"""

        self._max_timeout_secs_exceeded: int | None = None
        """the largest 'timeout' setting that was exceeded in a try of this query"""

    def reset(self) -> None:
        """Reset the query to its initial state, ignoring previous tries."""
        # Fix: pass the logger through, so a reset does not silently revert
        # the query to the null logger.
        Query.__init__(
            self,
            input_code=self._input_code,
            logger=self._logger,
            **self._kwargs,
        )

    @property
    def input_code(self) -> str:
        """The original input Overpass QL source code."""
        return self._input_code

    @property
    def kwargs(self) -> dict:
        """
        Keyword arguments that can be used to identify queries.

        The default query runner will log these values when a query is run.
        """
        return self._kwargs

    @property
    def logger(self) -> logging.Logger:
        """The logger used for logging output related to this query."""
        return self._logger

    @property
    def nb_tries(self) -> int:
        """Current number of tries."""
        return self._nb_tries

    @property
    def error(self) -> ClientError | None:
        """
        Error of the most recent try.

        Returns:
            an error or ``None`` if the query wasn't tried or hasn't failed
        """
        return self._error

    @property
    def response(self) -> dict | None:
        """
        The entire JSON response of the query.

        Returns:
            the response, or ``None`` if the query has not successfully finished (yet)
        """
        return self._response

    @property
    def was_cached(self) -> bool | None:
        """
        Indicates whether the query result was cached.

        Returns:
            ``None`` if the query has not been run yet.
            ``True`` if the query has a result set with zero tries.
            ``False`` otherwise.
        """
        if self._response is None:
            return None
        return self._nb_tries == 0

    @property
    def result_set(self) -> list[dict] | None:
        """
        The result set of the query.

        This is open data, licensed under the Open Data Commons Open Database License (ODbL).
        You are free to copy, distribute, transmit and adapt this data, as long as you credit
        OpenStreetMap and its contributors. If you alter or build upon this data, you may
        distribute the result only under the same licence.

        Returns:
            the elements of the result set, or ``None`` if the query has not successfully
            finished (yet)

        References:
            - https://www.openstreetmap.org/copyright
            - https://opendatacommons.org/licenses/odbl/1-0/
        """
        if not self._response:
            return None
        return self._response["elements"]

    @property
    def response_size_mib(self) -> float | None:
        """
        The size of the response in mebibytes.

        Returns:
            the size, or ``None`` if the query has not successfully finished (yet)
        """
        if self._response is None:
            return None
        return self._response_bytes / 1024.0 / 1024.0

    @property
    def maxsize_mib(self) -> float:
        """
        The current value of the [maxsize:*] setting in mebibytes.

        This size indicates the maximum allowed memory for the query in bytes RAM on the server,
        as expected by the user. If the query needs more RAM than this value, the server may abort
        the query with a memory exhaustion. The higher this size, the more probably the server
        rejects the query before executing it.
        """
        # Fix: true division instead of floor division ("//"), so that fractional
        # mebibyte values round-trip through the setter, consistent with
        # response_size_mib. With floor division, maxsize_mib = 0.5 would read
        # back as 0.0 and could never be doubled by the default runner.
        return float(self._settings["maxsize"]) / 1024.0 / 1024.0

    @maxsize_mib.setter
    def maxsize_mib(self, value: float) -> None:
        if value <= 0.0:
            msg = "maxsize_mib must be > 0.0"
            raise ValueError(msg)
        self._settings["maxsize"] = int(value * 1024.0 * 1024.0)

    @property
    def timeout_secs(self) -> int:
        """
        The current value of the [timeout:*] setting in seconds.

        This duration is the maximum allowed runtime for the query in seconds, as expected by the
        user. If the query runs longer than this time, the server may abort the query. The higher
        this duration, the more probably the server rejects the query before executing it.
        """
        return int(self._settings["timeout"])

    @timeout_secs.setter
    def timeout_secs(self, value: int) -> None:
        if value < 1:
            msg = "timeout_secs must be >= 1"
            raise ValueError(msg)
        self._settings["timeout"] = value

    @property
    def run_timeout_secs(self) -> float | None:
        """
        A limit to ``run_duration_secs``, that cancels running the query when exceeded.

        Defaults to no timeout.

        The client will raise a ``GiveupError`` if the timeout is reached.

        Not to be confused with ``timeout_secs``, which is a setting for the Overpass API instance,
        that limits a single query execution time. Instead, this value can be used to limit the
        total client-side time spent on this query (see ``Client.run_query``).
        """
        return self._run_timeout_secs

    @run_timeout_secs.setter
    def run_timeout_secs(self, value: float | None) -> None:
        if value is not None and value <= 0.0:
            msg = "run_timeout_secs must be > 0"
            raise ValueError(msg)
        self._run_timeout_secs = value

    @property
    def run_timeout_elapsed(self) -> bool:
        """Returns ``True`` if ``run_timeout_secs`` is set and has elapsed."""
        return (
            self.run_timeout_secs is not None
            and self.run_duration_secs is not None
            and self.run_timeout_secs < self.run_duration_secs
        )

    @property
    def request_timeout(self) -> "RequestTimeout":
        """Request timeout settings for this query."""
        return self._request_timeout

    @request_timeout.setter
    def request_timeout(self, value: "RequestTimeout") -> None:
        self._request_timeout = value

    def _code(self) -> str:
        """
        Return the query's QL code with an adjusted settings statement prepended.

        The timeout setting is capped to the remaining run time (if a run timeout
        is set), and the next try is given up early if a previous try already
        exceeded a timeout at least that large.

        Raises:
            GiveupError: if there is no time left to run the query, or if it
                would most likely time out again.
        """
        # TODO refactor? this function might do a bit too much
        settings_copy = self._settings.copy()

        max_timeout = settings_copy["timeout"]

        # if a run timeout is set, the remaining time is the max query timeout we will use
        if (time_max := self.run_timeout_secs) and (time_so_far := self.run_duration_secs):
            max_timeout = math.ceil(time_max - time_so_far)
            if max_timeout <= 0:
                raise GiveupError(kwargs=self.kwargs, after_secs=time_so_far)

        # if we already had a query that exceeded a timeout that is >= that max timeout,
        # we might as well give up already
        if (min_needed := self._max_timeout_secs_exceeded) and min_needed >= max_timeout:
            self._logger.error(f"give up on {self} since query will likely time out")
            raise GiveupError(kwargs=self.kwargs, after_secs=self.run_duration_secs or 0.0)

        # pick the timeout we will use for the next try
        next_timeout_secs_used = min(settings_copy["timeout"], max_timeout)

        # log if had to override the timeout setting with "max_timeout"
        if next_timeout_secs_used != settings_copy["timeout"]:
            settings_copy["timeout"] = next_timeout_secs_used
            self._logger.info(f"adjust timeout to {next_timeout_secs_used}s")

        # update the used timeout in state
        self._last_timeout_secs_used = next_timeout_secs_used

        # remove the original settings statement
        code = _SETTING_PATTERN.sub("", self._input_code)

        # put the adjusted settings in front
        settings = "".join(f"[{k}:{v}]" for k, v in settings_copy.items()) + ";"
        return f"{settings}\n{code}"

    @property
    def cache_key(self) -> str:
        """
        Hash QL code, and return its digest as hexadecimal string.

        The default query runner uses this as cache key.
        """
        # Remove the original settings statement
        code = _SETTING_PATTERN.sub("", self._input_code)
        hasher = hashlib.blake2b(digest_size=8)
        hasher.update(code.encode("utf-8"))
        return hasher.hexdigest()

    @property
    def done(self) -> bool:
        """Returns ``True`` if the result set was received."""
        return self._response is not None

    @property
    def request_duration_secs(self) -> float | None:
        """
        How long it took to fetch the result set in seconds.

        This is the duration starting with the API request, and ending once
        the result is written to this query object. Although it depends on how busy
        the API instance is, this can give some indication of how long a query takes.

        Returns:
            the duration or ``None`` if there is no result set yet, or when it was cached.
        """
        if self._response is None or self.was_cached:
            return None

        assert self._time_end_try is not None
        assert self._time_start_req is not None

        return self._time_end_try - self._time_start_req

    @property
    def run_duration_secs(self) -> float | None:
        """
        The total required time for this query in seconds (so far).

        Returns:
            the duration or ``None`` if there is no result set yet, or when it was cached.
        """
        if self._time_start is None:
            return None

        if self._time_end_try:
            return self._time_end_try - self._time_start

        return self._time_start.elapsed_secs_since

    @property
    def api_version(self) -> str | None:
        """
        The Overpass API version used by the queried instance.

        Returns:
            f.e. ``"Overpass API 0.7.56.8 7d656e78"``, or ``None`` if the query
            has not successfully finished (yet)

        References:
            - https://wiki.openstreetmap.org/wiki/Overpass_API/versions
        """
        if self._response is None:
            return None

        return self._response["generator"]

    @property
    def timestamp_osm(self) -> datetime | None:
        """
        All OSM edits that have been uploaded before this date are included.

        It can take a couple of minutes for changes to the database to show up in the
        Overpass API query results.

        Returns:
            the timestamp, or ``None`` if the query has not successfully finished (yet)
        """
        if self._response is None:
            return None

        date_str = self._response["osm3s"]["timestamp_osm_base"]
        # Fix: the "Z" suffix marks this timestamp as UTC. ``astimezone()`` on a
        # naive datetime would interpret it as *local* time and shift it; attach
        # the UTC zone with ``replace()`` instead.
        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)

    @property
    def timestamp_areas(self) -> datetime | None:
        """
        All area data edits that have been uploaded before this date are included.

        If the query involves area data processing, this is the date of the latest edit
        that has been considered in the most recent batch run of the area generation.

        Returns:
            the timestamp, or ``None`` if the query has not successfully finished (yet), or
            if it does not involve area data processing.
        """
        if self._response is None:
            return None

        date_str = self._response["osm3s"].get("timestamp_areas_base")
        if not date_str:
            return None

        # Fix: same as ``timestamp_osm`` — the value is UTC; do not interpret it
        # as local time.
        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)

    @property
    def copyright(self) -> str:
        """A copyright notice that comes with the result set."""
        if self._response is None:
            return _COPYRIGHT

        return self._response["osm3s"].get("copyright") or _COPYRIGHT

    def __str__(self) -> str:
        query = f"query{self.kwargs!r}"

        size = self.response_size_mib
        time_request = self.request_duration_secs
        time_total = self.run_duration_secs

        if self.nb_tries == 0:
            details = "pending"
        elif self.done:
            if self.nb_tries == 1:
                details = f"done - {size:.01f}mb in {time_request:.01f}s"
            else:
                details = f"done after {time_total:.01f}s - {size:.01f}mb in {time_request:.01f}s"
        else:
            t = "try" if self.nb_tries == 1 else "tries"
            details = f"failing after {self.nb_tries} {t}, {time_total:.01f}s"

        return f"{query} ({details})"

    def __repr__(self) -> str:
        cls_name = type(self).__name__

        details = {
            "kwargs": self._kwargs,
            "done": self.done,
        }

        if self.nb_tries == 0 or self.error:
            details["tries"] = self.nb_tries

        if self.error:
            details["error"] = type(self.error).__name__

        if self.done:
            details["response_size"] = f"{self.response_size_mib:.02f}mb"

            if not self.was_cached:
                details["request_duration"] = f"{self.request_duration_secs:.02f}s"

        if self.nb_tries > 0:
            details["run_duration"] = f"{self.run_duration_secs:.02f}s"

        details_str = ", ".join(f"{k}={v!r}" for k, v in details.items())

        return f"{cls_name}({details_str})"

    def _mutator(self) -> "_QueryMutator":
        return _QueryMutator(self)


class _QueryMutator:
    """Mutates a query's private state on behalf of the client during a try."""

    __slots__ = ("_query",)

    def __init__(self, query: Query) -> None:
        self._query = query

    def begin_try(self) -> None:
        # the first try also starts the overall run clock
        if self._query._time_start is None:
            self._query._time_start = _Instant.now()

        self._query._time_start_try = _Instant.now()
        self._query._time_start_req = None
        self._query._time_end_try = None

    def begin_request(self) -> None:
        self._query._time_start_req = _Instant.now()

    def succeed_try(self, response: dict, response_bytes: int) -> None:
        self._query._time_end_try = _Instant.now()
        self._query._response = response
        self._query._response_bytes = response_bytes
        self._query._error = None

    def fail_try(self, err: ClientError) -> None:
        self._query._error = err

        # remember the largest timeout that was exceeded, so _code() can give up
        # early on tries that would most likely time out again
        if is_exceeding_timeout(err):
            assert self._query._last_timeout_secs_used
            self._query._max_timeout_secs_exceeded = self._query._last_timeout_secs_used

    def end_try(self) -> None:
        self._query._nb_tries += 1


@dataclass(kw_only=True, slots=True, frozen=True, repr=False, order=True)
class _Instant:
    """
    Measurement of a monotonic clock.

    Attributes:
        when: the current time, according to the event loop's internal monotonic clock
            (details are unspecified and may differ per event loop).
    """

    when: float

    @classmethod
    def now(cls) -> "_Instant":
        return cls(when=asyncio.get_event_loop().time())

    @property
    def ceil(self) -> int:
        return math.ceil(self.when)

    @property
    def elapsed_secs_since(self) -> float:
        return asyncio.get_event_loop().time() - self.when

    def __sub__(self, earlier: "_Instant") -> float:
        if self.when < earlier.when:
            msg = f"{self} is earlier than {earlier}"
            raise ValueError(msg)
        return self.when - earlier.when

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self.when:.02f})"


@dataclass(kw_only=True, slots=True)
class RequestTimeout:
    """
    Request timeout settings.

    Attributes:
        total_without_query_secs: If set, the sum of this duration and the query's ``[timeout:*]``
                                  setting is used as timeout duration of the entire request,
                                  including connection establishment, request sending and response
                                  reading (``aiohttp.ClientTimeout.total``).
                                  Defaults to 20 seconds.
        sock_connect_secs: The maximum number of seconds allowed for pure socket connection
                           establishment (same as ``aiohttp.ClientTimeout.sock_connect``).
        each_sock_read_secs: The maximum number of seconds allowed for the period between reading
                             a new chunk of data (same as ``aiohttp.ClientTimeout.sock_read``).
    """

    total_without_query_secs: float | None = 20.0
    sock_connect_secs: float | None = None
    each_sock_read_secs: float | None = None

    def __post_init__(self) -> None:
        if self.total_without_query_secs is not None and self.total_without_query_secs <= 0.0:
            msg = "'total_without_query_secs' has to be > 0"
            raise ValueError(msg)

        if self.sock_connect_secs is not None and self.sock_connect_secs <= 0.0:
            msg = "'sock_connect_secs' has to be > 0"
            raise ValueError(msg)

        if self.each_sock_read_secs is not None and self.each_sock_read_secs <= 0.0:
            msg = "'each_sock_read_secs' has to be > 0"
            raise ValueError(msg)


class QueryRunner(ABC):
    """
    A query runner is an async function that is called before a client makes an API request.

    Query runners can be used to…
    - …retry queries when they fail
    - …modify queries, f.e. to lower/increase their maxsize/timeout
    - …log query results & errors
    - …implement caching

    The absolute minimum a query runner function has to do is to simply return to (re)try
    a query, or to raise ``query.error`` to stop trying.
    """

    __slots__ = ()

    @abstractmethod
    async def __call__(self, query: Query) -> None:
        """Called with the current query state before the client makes an API request."""


class DefaultQueryRunner(QueryRunner):
    """
    The default query runner.

    This runner…
    - …retries with an increasing back-off period in between tries if the server is too busy
    - …retries and doubles timeout and maxsize settings if they were exceeded
    - …limits the number of tries
    - …optionally caches query results in temp files

    This runner does *not* lower timeout and maxsize settings if the server rejected a query.

    Args:
        max_tries: The maximum number of times a query is tried. (5 by default)
        cache_ttl_secs: Amount of seconds a query's result set is cached for.
                        Set to zero to disable caching. (zero by default)
    """

    __slots__ = (
        "_max_tries",
        "_cache_ttl_secs",
    )

    def __init__(self, max_tries: int = 5, cache_ttl_secs: int = 0) -> None:
        if max_tries < 1:
            msg = "max_tries must be >= 1"
            raise ValueError(msg)

        if cache_ttl_secs < 0:
            msg = "cache_ttl_secs must be >= 0"
            raise ValueError(msg)

        self._max_tries = max_tries
        self._cache_ttl_secs = cache_ttl_secs

    def _is_caching(self, query: Query) -> bool:
        """Return ``True`` if this runner should read/write the cache for this query."""
        if self._cache_ttl_secs and _FORCE_DISABLE_CACHE:
            query.logger.debug("caching is forced disabled")
            return False
        return self._cache_ttl_secs > 0

    def _cache_read(self, query: Query) -> None:
        """Read a cached response for this query from a temp file, if one is fresh enough."""
        logger = query.logger

        now = int(time.time())

        file_name = f"{query.cache_key}.json"
        file_path = Path(tempfile.gettempdir()) / file_name

        if not file_path.exists():
            logger.info("result was not cached")
            logger.debug(f"checked for cache at {file_path}")
            return

        try:
            with file_path.open(mode="r", encoding="utf-8") as file:
                response = json.load(file)
        except (OSError, json.JSONDecodeError):
            logger.exception(f"failed to read cached {query}")
            return

        if response.get(_EXPIRATION_KEY, 0) <= now:
            logger.info(f"{query} cache expired")
            return

        query._response = response
        # Fix: also record the response size, so response_size_mib does not
        # report zero for cached results (the cache file size is a close
        # approximation of the original response size).
        query._response_bytes = file_path.stat().st_size
        logger.info(f"{query} was cached")

    def _cache_write(self, query: Query) -> None:
        """Write this query's response to a temp file, stamped with an expiration time."""
        logger = query.logger

        now = int(time.time())

        assert query._response is not None
        query._response[_EXPIRATION_KEY] = now + self._cache_ttl_secs

        file_name = f"{query.cache_key}.json"
        file_path = Path(tempfile.gettempdir()) / file_name

        logger.debug(f"caching at {file_path}…")

        try:
            with file_path.open(mode="w", encoding="utf-8") as file:
                json.dump(query._response, file)
        except OSError:
            logger.exception(f"failed to cache {query}")

    async def __call__(self, query: Query) -> None:
        """Called with the current query state before the client makes an API request."""
        logger = query.logger

        # Check cache ahead of first try
        if query.nb_tries == 0 and self._is_caching(query):
            await asyncio.to_thread(self._cache_read, query)

        # Success or cached
        if query.done:
            logger.info(f"{query}")
            if not query.was_cached and self._is_caching(query):
                await asyncio.to_thread(self._cache_write, query)
            return

        err = query.error

        if err:
            logger.info(f"try for query{query.kwargs!r} failed: {err}")

            if is_server_error(err):
                logger.error(f"unexpected response body:\n{err.body}")

        # Do not retry if we exhausted all tries, when a retry would not change the result,
        # or when the timeout was reached.
        if err and (query.nb_tries == self._max_tries or not err.should_retry):
            logger.error(f"give up on {query}", exc_info=err)
            raise err

        if is_rejection(err):
            # Wait if the server is too busy.
            if err.cause == QueryRejectCause.TOO_BUSY:
                backoff = _fibo_backoff_secs(query.nb_tries)
                logger.info(f"retry {query} in {backoff:.1f}s")
                await asyncio.sleep(backoff)

            # Wait until a slot opens if the rate limit was exceeded.
            elif err.cause == QueryRejectCause.TOO_MANY_QUERIES:
                pass  # let client enforce cooldown

            # Double timeout if exceeded.
            elif err.cause == QueryRejectCause.EXCEEDED_TIMEOUT:
                old = f"{query.timeout_secs:.1f}s"
                query.timeout_secs *= 2
                new = f"{query.timeout_secs:.1f}s"
                logger.info(f"increased [timeout:*] for {query} from {old} to {new}")

            # Double maxsize if exceeded.
            elif err.cause == QueryRejectCause.EXCEEDED_MAXSIZE:
                old = f"{query.maxsize_mib:.1f}mib"
                query.maxsize_mib *= 2
                new = f"{query.maxsize_mib:.1f}mib"
                logger.info(f"increased [maxsize:*] for {query} from {old} to {new}")


def _fibo_backoff_secs(tries: int) -> float:
    """Fibonacci sequence without zero: 1, 1, 2, 3, 5, 8, etc."""
    a, b = 1.0, 1.0

    for _ in range(tries):
        a, b = b, a + b

    return a


def __cache_delete(query: Query) -> None:
    """Clear a response cached by the default runner (only to be used in tests)."""
    file_name = f"{query.cache_key}.json"
    file_path = Path(tempfile.gettempdir()) / file_name
    file_path.unlink(missing_ok=True)


def __cache_expire(query: Query) -> None:
    """Clear a response cached by the default runner (only to be used in tests)."""
    file_name = f"{query.cache_key}.json"
    file_path = Path(tempfile.gettempdir()) / file_name

    with file_path.open(mode="r", encoding="utf-8") as file:
        response = json.load(file)

    response[_EXPIRATION_KEY] = 0

    with file_path.open(mode="w", encoding="utf-8") as file:
        json.dump(response, file)


_EXPIRATION_KEY = "__expiration__"
_IS_CI = os.getenv("GITHUB_ACTIONS") == "true"
_IS_UNIT_TEST = "pytest" in sys.modules
# Never write cache files on CI runs, unless we are inside a pytest session.
_FORCE_DISABLE_CACHE = not _IS_UNIT_TEST and _IS_CI
59class Query: 60 """ 61 State of a query that is either pending, running, successful, or failed. 62 63 Args: 64 input_code: The input Overpass QL code. Note that some settings might be changed 65 by query runners, notably the 'timeout' and 'maxsize' settings. 66 logger: The logger to use for all logging output related to this query. 67 **kwargs: Additional keyword arguments that can be used to identify queries. 68 69 References: 70 - https://wiki.openstreetmap.org/wiki/Overpass_API/Overpass_QL 71 """ 72 73 __slots__ = ( 74 "_error", 75 "_input_code", 76 "_kwargs", 77 "_last_timeout_secs_used", 78 "_logger", 79 "_max_timeout_secs_exceeded", 80 "_nb_tries", 81 "_request_timeout", 82 "_response", 83 "_response_bytes", 84 "_run_timeout_secs", 85 "_settings", 86 "_time_end_try", 87 "_time_start", 88 "_time_start_req", 89 "_time_start_try", 90 ) 91 92 def __init__(self, input_code: str, logger: logging.Logger = _NULL_LOGGER, **kwargs) -> None: 93 self._input_code = input_code 94 """the original given overpass ql code""" 95 96 self._logger = logger 97 """logger to use for this query""" 98 99 self._kwargs = kwargs 100 """used to identify this query""" 101 102 self._settings = dict(_SETTING_PATTERN.findall(input_code)) 103 """all overpass ql settings [k:v];""" 104 105 self._settings["out"] = "json" 106 107 if "maxsize" not in self._settings: 108 self._settings["maxsize"] = DEFAULT_MAXSIZE_MIB * 1024 * 1024 109 110 if "timeout" not in self._settings: 111 self._settings["timeout"] = DEFAULT_TIMEOUT_SECS 112 113 self._run_timeout_secs: float | None = None 114 """total time limit for running this query""" 115 116 self._request_timeout: RequestTimeout = RequestTimeout() 117 """config for request timeouts""" 118 119 self._error: ClientError | None = None 120 """error of the last try, or None""" 121 122 self._response: dict | None = None 123 """response JSON as a dict, or None""" 124 125 self._response_bytes = 0.0 126 """number of bytes in a response, or zero""" 127 128 
self._nb_tries = 0 129 """number of tries so far, starting at zero""" 130 131 self._time_start: _Instant | None = None 132 """time prior to executing the first try""" 133 134 self._time_start_try: _Instant | None = None 135 """time prior to executing the most recent try""" 136 137 self._time_start_req: _Instant | None = None 138 """time prior to executing the most recent try's query request""" 139 140 self._time_end_try: _Instant | None = None 141 """time the most recent try finished""" 142 143 self._last_timeout_secs_used: int | None = None 144 """the last used 'timeout' setting""" 145 146 self._max_timeout_secs_exceeded: int | None = None 147 """the largest 'timeout' setting that was exceeded in a try of this query""" 148 149 def reset(self) -> None: 150 """Reset the query to its initial state, ignoring previous tries.""" 151 Query.__init__(self, input_code=self._input_code, **self._kwargs) 152 153 @property 154 def input_code(self) -> str: 155 """The original input Overpass QL source code.""" 156 return self._input_code 157 158 @property 159 def kwargs(self) -> dict: 160 """ 161 Keyword arguments that can be used to identify queries. 162 163 The default query runner will log these values when a query is run. 164 """ 165 return self._kwargs 166 167 @property 168 def logger(self) -> logging.Logger: 169 """The logger used for logging output related to this query.""" 170 return self._logger 171 172 @property 173 def nb_tries(self) -> int: 174 """Current number of tries.""" 175 return self._nb_tries 176 177 @property 178 def error(self) -> ClientError | None: 179 """ 180 Error of the most recent try. 181 182 Returns: 183 an error or ``None`` if the query wasn't tried or hasn't failed 184 """ 185 return self._error 186 187 @property 188 def response(self) -> dict | None: 189 """ 190 The entire JSON response of the query. 
191 192 Returns: 193 the response, or ``None`` if the query has not successfully finished (yet) 194 """ 195 return self._response 196 197 @property 198 def was_cached(self) -> bool | None: 199 """ 200 Indicates whether the query result was cached. 201 202 Returns: 203 ``None`` if the query has not been run yet. 204 ``True`` if the query has a result set with zero tries. 205 ``False`` otherwise. 206 """ 207 if self._response is None: 208 return None 209 return self._nb_tries == 0 210 211 @property 212 def result_set(self) -> list[dict] | None: 213 """ 214 The result set of the query. 215 216 This is open data, licensed under the Open Data Commons Open Database License (ODbL). 217 You are free to copy, distribute, transmit and adapt this data, as long as you credit 218 OpenStreetMap and its contributors. If you alter or build upon this data, you may 219 distribute the result only under the same licence. 220 221 Returns: 222 the elements of the result set, or ``None`` if the query has not successfully 223 finished (yet) 224 225 References: 226 - https://www.openstreetmap.org/copyright 227 - https://opendatacommons.org/licenses/odbl/1-0/ 228 """ 229 if not self._response: 230 return None 231 return self._response["elements"] 232 233 @property 234 def response_size_mib(self) -> float | None: 235 """ 236 The size of the response in mebibytes. 237 238 Returns: 239 the size, or ``None`` if the query has not successfully finished (yet) 240 """ 241 if self._response is None: 242 return None 243 return self._response_bytes / 1024.0 / 1024.0 244 245 @property 246 def maxsize_mib(self) -> float: 247 """ 248 The current value of the [maxsize:*] setting in mebibytes. 249 250 This size indicates the maximum allowed memory for the query in bytes RAM on the server, 251 as expected by the user. If the query needs more RAM than this value, the server may abort 252 the query with a memory exhaustion. 
The higher this size, the more probably the server 253 rejects the query before executing it. 254 """ 255 return float(self._settings["maxsize"]) // 1024.0 // 1024.0 256 257 @maxsize_mib.setter 258 def maxsize_mib(self, value: float) -> None: 259 if value <= 0.0: 260 msg = "maxsize_mib must be > 0.0" 261 raise ValueError(msg) 262 self._settings["maxsize"] = int(value * 1024.0 * 1024.0) 263 264 @property 265 def timeout_secs(self) -> int: 266 """ 267 The current value of the [timeout:*] setting in seconds. 268 269 This duration is the maximum allowed runtime for the query in seconds, as expected by the 270 user. If the query runs longer than this time, the server may abort the query. The higher 271 this duration, the more probably the server rejects the query before executing it. 272 """ 273 return int(self._settings["timeout"]) 274 275 @timeout_secs.setter 276 def timeout_secs(self, value: int) -> None: 277 if value < 1: 278 msg = "timeout_secs must be >= 1" 279 raise ValueError(msg) 280 self._settings["timeout"] = value 281 282 @property 283 def run_timeout_secs(self) -> float | None: 284 """ 285 A limit to ``run_duration_secs``, that cancels running the query when exceeded. 286 287 Defaults to no timeout. 288 289 The client will raise a ``GiveupError`` if the timeout is reached. 290 291 Not to be confused with ``timeout_secs``, which is a setting for the Overpass API instance, 292 that limits a single query execution time. Instead, this value can be used to limit the 293 total client-side time spent on this query (see ``Client.run_query``). 
294 """ 295 return self._run_timeout_secs 296 297 @run_timeout_secs.setter 298 def run_timeout_secs(self, value: float | None) -> None: 299 if value is not None and value <= 0.0: 300 msg = "run_timeout_secs must be > 0" 301 raise ValueError(msg) 302 self._run_timeout_secs = value 303 304 @property 305 def run_timeout_elapsed(self) -> bool: 306 """Returns ``True`` if ``run_timeout_secs`` is set and has elapsed.""" 307 return ( 308 self.run_timeout_secs is not None 309 and self.run_duration_secs is not None 310 and self.run_timeout_secs < self.run_duration_secs 311 ) 312 313 @property 314 def request_timeout(self) -> "RequestTimeout": 315 """Request timeout settings for this query.""" 316 return self._request_timeout 317 318 @request_timeout.setter 319 def request_timeout(self, value: "RequestTimeout") -> None: 320 self._request_timeout = value 321 322 def _code(self) -> str: 323 # TODO doc 324 # TODO refactor? this function might do a bit too much 325 # TODO needs tests 326 settings_copy = self._settings.copy() 327 328 max_timeout = settings_copy["timeout"] 329 330 # if a run timeout is set, the remaining time is the max query timeout we will use 331 if (time_max := self.run_timeout_secs) and (time_so_far := self.run_duration_secs): 332 max_timeout = math.ceil(time_max - time_so_far) 333 if max_timeout <= 0: 334 raise GiveupError(kwargs=self.kwargs, after_secs=time_so_far) 335 336 # if we already had a query that exceeded a timeout that is >= that max timeout, 337 # we might as well give up already 338 if (min_needed := self._max_timeout_secs_exceeded) and min_needed >= max_timeout: 339 self._logger.error(f"give up on {self} since query will likely time out") 340 raise GiveupError(kwargs=self.kwargs, after_secs=self.run_duration_secs or 0.0) 341 342 # pick the timeout we will use for the next try 343 next_timeout_secs_used = min(settings_copy["timeout"], max_timeout) 344 345 # log if had to override the timeout setting with "max_timeout" 346 if 
next_timeout_secs_used != settings_copy["timeout"]: 347 settings_copy["timeout"] = next_timeout_secs_used 348 self._logger.info(f"adjust timeout to {next_timeout_secs_used}s") 349 350 # update the used timeout in state 351 self._last_timeout_secs_used = next_timeout_secs_used 352 353 # remove the original settings statement 354 code = _SETTING_PATTERN.sub("", self._input_code) 355 356 # put the adjusted settings in front 357 settings = "".join((f"[{k}:{v}]" for k, v in settings_copy.items())) + ";" 358 return f"{settings}\n{code}" 359 360 @property 361 def cache_key(self) -> str: 362 """ 363 Hash QL code, and return its digest as hexadecimal string. 364 365 The default query runner uses this as cache key. 366 """ 367 # Remove the original settings statement 368 code = _SETTING_PATTERN.sub("", self._input_code) 369 hasher = hashlib.blake2b(digest_size=8) 370 hasher.update(code.encode("utf-8")) 371 return hasher.hexdigest() 372 373 @property 374 def done(self) -> bool: 375 """Returns ``True`` if the result set was received.""" 376 return self._response is not None 377 378 @property 379 def request_duration_secs(self) -> float | None: 380 """ 381 How long it took to fetch the result set in seconds. 382 383 This is the duration starting with the API request, and ending once 384 the result is written to this query object. Although it depends on how busy 385 the API instance is, this can give some indication of how long a query takes. 386 387 Returns: 388 the duration or ``None`` if there is no result set yet, or when it was cached. 389 """ 390 if self._response is None or self.was_cached: 391 return None 392 393 assert self._time_end_try is not None 394 assert self._time_start_req is not None 395 396 return self._time_end_try - self._time_start_req 397 398 @property 399 def run_duration_secs(self) -> float | None: 400 """ 401 The total required time for this query in seconds (so far). 
402 403 Returns: 404 the duration or ``None`` if there is no result set yet, or when it was cached. 405 """ 406 if self._time_start is None: 407 return None 408 409 if self._time_end_try: 410 return self._time_end_try - self._time_start 411 412 return self._time_start.elapsed_secs_since 413 414 @property 415 def api_version(self) -> str | None: 416 """ 417 The Overpass API version used by the queried instance. 418 419 Returns: 420 f.e. ``"Overpass API 0.7.56.8 7d656e78"``, or ``None`` if the query 421 has not successfully finished (yet) 422 423 References: 424 - https://wiki.openstreetmap.org/wiki/Overpass_API/versions 425 """ 426 if self._response is None: 427 return None 428 429 return self._response["generator"] 430 431 @property 432 def timestamp_osm(self) -> datetime | None: 433 """ 434 All OSM edits that have been uploaded before this date are included. 435 436 It can take a couple of minutes for changes to the database to show up in the 437 Overpass API query results. 438 439 Returns: 440 the timestamp, or ``None`` if the query has not successfully finished (yet) 441 """ 442 if self._response is None: 443 return None 444 445 date_str = self._response["osm3s"]["timestamp_osm_base"] 446 return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(timezone.utc) 447 448 @property 449 def timestamp_areas(self) -> datetime | None: 450 """ 451 All area data edits that have been uploaded before this date are included. 452 453 If the query involves area data processing, this is the date of the latest edit 454 that has been considered in the most recent batch run of the area generation. 455 456 Returns: 457 the timestamp, or ``None`` if the query has not successfully finished (yet), or 458 if it does not involve area data processing. 
459 """ 460 if self._response is None: 461 return None 462 463 date_str = self._response["osm3s"].get("timestamp_areas_base") 464 if not date_str: 465 return None 466 467 return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(timezone.utc) 468 469 @property 470 def copyright(self) -> str: 471 """A copyright notice that comes with the result set.""" 472 if self._response is None: 473 return _COPYRIGHT 474 475 return self._response["osm3s"].get("copyright") or _COPYRIGHT 476 477 def __str__(self) -> str: 478 query = f"query{self.kwargs!r}" 479 480 size = self.response_size_mib 481 time_request = self.request_duration_secs 482 time_total = self.run_duration_secs 483 484 if self.nb_tries == 0: 485 details = "pending" 486 elif self.done: 487 if self.nb_tries == 1: 488 details = f"done - {size:.01f}mb in {time_request:.01f}s" 489 else: 490 details = f"done after {time_total:.01f}s - {size:.01f}mb in {time_request:.01f}s" 491 else: 492 t = "try" if self.nb_tries == 1 else "tries" 493 details = f"failing after {self.nb_tries} {t}, {time_total:.01f}s" 494 495 return f"{query} ({details})" 496 497 def __repr__(self) -> str: 498 cls_name = type(self).__name__ 499 500 details = { 501 "kwargs": self._kwargs, 502 "done": self.done, 503 } 504 505 if self.nb_tries == 0 or self.error: 506 details["tries"] = self.nb_tries 507 508 if self.error: 509 details["error"] = type(self.error).__name__ 510 511 if self.done: 512 details["response_size"] = f"{self.response_size_mib:.02f}mb" 513 514 if not self.was_cached: 515 details["request_duration"] = f"{self.request_duration_secs:.02f}s" 516 517 if self.nb_tries > 0: 518 details["run_duration"] = f"{self.run_duration_secs:.02f}s" 519 520 details_str = ", ".join((f"{k}={v!r}" for k, v in details.items())) 521 522 return f"{cls_name}({details_str})" 523 524 def _mutator(self) -> "_QueryMutator": 525 return _QueryMutator(self)
State of a query that is either pending, running, successful, or failed.
Arguments:
- input_code: The input Overpass QL code. Note that some settings might be changed by query runners, notably the 'timeout' and 'maxsize' settings.
- logger: The logger to use for all logging output related to this query.
- **kwargs: Additional keyword arguments that can be used to identify queries.
References:
def __init__(self, input_code: str, logger: logging.Logger = _NULL_LOGGER, **kwargs) -> None:
    """Set up a fresh query in its pending state."""
    self._input_code = input_code  # the Overpass QL code as provided by the caller
    self._logger = logger  # sink for all log output related to this query
    self._kwargs = kwargs  # free-form identifiers for this query

    # Collect every "[k:v]" setting declared in the input code.
    self._settings = dict(_SETTING_PATTERN.findall(input_code))

    # This client only supports JSON responses.
    self._settings["out"] = "json"

    # Fall back to defaults for settings the caller did not declare.
    self._settings.setdefault("maxsize", DEFAULT_MAXSIZE_MIB * 1024 * 1024)
    self._settings.setdefault("timeout", DEFAULT_TIMEOUT_SECS)

    self._run_timeout_secs: float | None = None  # total client-side time limit
    self._request_timeout: RequestTimeout = RequestTimeout()  # per-request timeout config
    self._error: ClientError | None = None  # error of the last try, or None
    self._response: dict | None = None  # response JSON as a dict, or None
    self._response_bytes = 0.0  # number of bytes in a response, or zero
    self._nb_tries = 0  # number of tries so far, starting at zero
    self._time_start: _Instant | None = None  # before the first try
    self._time_start_try: _Instant | None = None  # before the most recent try
    self._time_start_req: _Instant | None = None  # before the most recent try's request
    self._time_end_try: _Instant | None = None  # when the most recent try finished
    self._last_timeout_secs_used: int | None = None  # the last used 'timeout' setting
    self._max_timeout_secs_exceeded: int | None = None  # largest 'timeout' exceeded so far
def reset(self) -> None:
    """
    Reset the query to its initial state, ignoring previous tries.

    Re-runs ``__init__`` with the original input code and identifying kwargs,
    clearing all settings adjustments, timing state, errors, and responses.
    """
    # Pass the logger through as well: re-initializing without it would
    # silently revert a custom logger back to the default null logger.
    Query.__init__(self, input_code=self._input_code, logger=self._logger, **self._kwargs)
Reset the query to its initial state, ignoring previous tries.
@property
def input_code(self) -> str:
    """The Overpass QL source code exactly as it was passed in."""
    return self._input_code
The original input Overpass QL source code.
@property
def kwargs(self) -> dict:
    """
    Free-form keyword arguments that identify this query.

    The default query runner logs these values whenever the query is run.
    """
    return self._kwargs
Keyword arguments that can be used to identify queries.
The default query runner will log these values when a query is run.
@property
def logger(self) -> logging.Logger:
    """The logger that receives all output related to this query."""
    return self._logger
The logger used for logging output related to this query.
@property
def nb_tries(self) -> int:
    """The number of tries so far."""
    return self._nb_tries
Current number of tries.
@property
def error(self) -> ClientError | None:
    """
    The error of the most recent try.

    Returns:
        an error, or ``None`` if the query wasn't tried or hasn't failed
    """
    return self._error
Error of the most recent try.

Returns: an error, or ``None`` if the query wasn't tried or hasn't failed.
187 @property 188 def response(self) -> dict | None: 189 """ 190 The entire JSON response of the query. 191 192 Returns: 193 the response, or ``None`` if the query has not successfully finished (yet) 194 """ 195 return self._response
The entire JSON response of the query.

Returns: the response, or ``None`` if the query has not successfully finished (yet).
197 @property 198 def was_cached(self) -> bool | None: 199 """ 200 Indicates whether the query result was cached. 201 202 Returns: 203 ``None`` if the query has not been run yet. 204 ``True`` if the query has a result set with zero tries. 205 ``False`` otherwise. 206 """ 207 if self._response is None: 208 return None 209 return self._nb_tries == 0
Indicates whether the query result was cached.

Returns: ``None`` if the query has not been run yet, ``True`` if the query has a result set with zero tries, and ``False`` otherwise.
211 @property 212 def result_set(self) -> list[dict] | None: 213 """ 214 The result set of the query. 215 216 This is open data, licensed under the Open Data Commons Open Database License (ODbL). 217 You are free to copy, distribute, transmit and adapt this data, as long as you credit 218 OpenStreetMap and its contributors. If you alter or build upon this data, you may 219 distribute the result only under the same licence. 220 221 Returns: 222 the elements of the result set, or ``None`` if the query has not successfully 223 finished (yet) 224 225 References: 226 - https://www.openstreetmap.org/copyright 227 - https://opendatacommons.org/licenses/odbl/1-0/ 228 """ 229 if not self._response: 230 return None 231 return self._response["elements"]
The result set of the query.
This is open data, licensed under the Open Data Commons Open Database License (ODbL). You are free to copy, distribute, transmit and adapt this data, as long as you credit OpenStreetMap and its contributors. If you alter or build upon this data, you may distribute the result only under the same licence.
Returns:
the elements of the result set, or
None
if the query has not successfully finished (yet)
References:
233 @property 234 def response_size_mib(self) -> float | None: 235 """ 236 The size of the response in mebibytes. 237 238 Returns: 239 the size, or ``None`` if the query has not successfully finished (yet) 240 """ 241 if self._response is None: 242 return None 243 return self._response_bytes / 1024.0 / 1024.0
The size of the response in mebibytes.
Returns:
the size, or
None
if the query has not successfully finished (yet)
@property
def maxsize_mib(self) -> float:
    """
    The current value of the [maxsize:*] setting in mebibytes.

    This size indicates the maximum allowed memory for the query in bytes RAM
    on the server, as expected by the user. If the query needs more RAM than
    this value, the server may abort the query with a memory exhaustion.
    The higher this size, the more probably the server rejects the query
    before executing it.
    """
    # Use true division: floor division ("//") truncated fractional mebibyte
    # values, so e.g. a setting of 1.5 MiB read back as 1.0. The sibling
    # response_size_mib property already uses true division.
    return float(self._settings["maxsize"]) / 1024.0 / 1024.0
The current value of the [maxsize:*] setting in mebibytes.
This size indicates the maximum allowed memory for the query in bytes RAM on the server, as expected by the user. If the query needs more RAM than this value, the server may abort the query with a memory exhaustion. The higher this size, the more probably the server rejects the query before executing it.
@property
def timeout_secs(self) -> int:
    """
    The current value of the [timeout:*] setting in seconds.

    This is the maximum runtime the user grants the query on the server; a
    query running longer than this may be aborted. The higher this duration,
    the more probably the server rejects the query before executing it.
    """
    return int(self._settings["timeout"])
The current value of the [timeout:*] setting in seconds.
This duration is the maximum allowed runtime for the query in seconds, as expected by the user. If the query runs longer than this time, the server may abort the query. The higher this duration, the more probably the server rejects the query before executing it.
282 @property 283 def run_timeout_secs(self) -> float | None: 284 """ 285 A limit to ``run_duration_secs``, that cancels running the query when exceeded. 286 287 Defaults to no timeout. 288 289 The client will raise a ``GiveupError`` if the timeout is reached. 290 291 Not to be confused with ``timeout_secs``, which is a setting for the Overpass API instance, 292 that limits a single query execution time. Instead, this value can be used to limit the 293 total client-side time spent on this query (see ``Client.run_query``). 294 """ 295 return self._run_timeout_secs
A limit to ``run_duration_secs``, that cancels running the query when exceeded.

Defaults to no timeout.

The client will raise a ``GiveupError`` if the timeout is reached.

Not to be confused with ``timeout_secs``, which is a setting for the Overpass API instance that limits a single query execution time. Instead, this value can be used to limit the total client-side time spent on this query (see ``Client.run_query``).
@property
def run_timeout_elapsed(self) -> bool:
    """Returns ``True`` if ``run_timeout_secs`` is set and has elapsed."""
    limit = self.run_timeout_secs
    elapsed = self.run_duration_secs
    return limit is not None and elapsed is not None and elapsed > limit
Returns ``True`` if ``run_timeout_secs`` is set and has elapsed.
@property
def request_timeout(self) -> "RequestTimeout":
    """Timeout settings applied to the API requests of this query."""
    return self._request_timeout
Request timeout settings for this query.
@property
def cache_key(self) -> str:
    """
    Hash QL code, and return its digest as hexadecimal string.

    The default query runner uses this as cache key.
    """
    # The settings statement is stripped before hashing, so only the
    # remaining QL code determines the key.
    stripped = _SETTING_PATTERN.sub("", self._input_code)
    return hashlib.blake2b(stripped.encode("utf-8"), digest_size=8).hexdigest()
Hash QL code, and return its digest as hexadecimal string.
The default query runner uses this as cache key.
@property
def done(self) -> bool:
    """``True`` once the result set has been received."""
    return self._response is not None
Returns ``True`` if the result set was received.
378 @property 379 def request_duration_secs(self) -> float | None: 380 """ 381 How long it took to fetch the result set in seconds. 382 383 This is the duration starting with the API request, and ending once 384 the result is written to this query object. Although it depends on how busy 385 the API instance is, this can give some indication of how long a query takes. 386 387 Returns: 388 the duration or ``None`` if there is no result set yet, or when it was cached. 389 """ 390 if self._response is None or self.was_cached: 391 return None 392 393 assert self._time_end_try is not None 394 assert self._time_start_req is not None 395 396 return self._time_end_try - self._time_start_req
How long it took to fetch the result set in seconds.
This is the duration starting with the API request, and ending once the result is written to this query object. Although it depends on how busy the API instance is, this can give some indication of how long a query takes.
Returns:
the duration or
None
if there is no result set yet, or when it was cached.
398 @property 399 def run_duration_secs(self) -> float | None: 400 """ 401 The total required time for this query in seconds (so far). 402 403 Returns: 404 the duration or ``None`` if there is no result set yet, or when it was cached. 405 """ 406 if self._time_start is None: 407 return None 408 409 if self._time_end_try: 410 return self._time_end_try - self._time_start 411 412 return self._time_start.elapsed_secs_since
The total required time for this query in seconds (so far).
Returns:
the duration or
None
if there is no result set yet, or when it was cached.
414 @property 415 def api_version(self) -> str | None: 416 """ 417 The Overpass API version used by the queried instance. 418 419 Returns: 420 f.e. ``"Overpass API 0.7.56.8 7d656e78"``, or ``None`` if the query 421 has not successfully finished (yet) 422 423 References: 424 - https://wiki.openstreetmap.org/wiki/Overpass_API/versions 425 """ 426 if self._response is None: 427 return None 428 429 return self._response["generator"]
The Overpass API version used by the queried instance.
Returns:
f.e. ``"Overpass API 0.7.56.8 7d656e78"``, or ``None`` if the query has not successfully finished (yet)
References:
431 @property 432 def timestamp_osm(self) -> datetime | None: 433 """ 434 All OSM edits that have been uploaded before this date are included. 435 436 It can take a couple of minutes for changes to the database to show up in the 437 Overpass API query results. 438 439 Returns: 440 the timestamp, or ``None`` if the query has not successfully finished (yet) 441 """ 442 if self._response is None: 443 return None 444 445 date_str = self._response["osm3s"]["timestamp_osm_base"] 446 return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(timezone.utc)
All OSM edits that have been uploaded before this date are included.
It can take a couple of minutes for changes to the database to show up in the Overpass API query results.
Returns:
the timestamp, or
None
if the query has not successfully finished (yet)
448 @property 449 def timestamp_areas(self) -> datetime | None: 450 """ 451 All area data edits that have been uploaded before this date are included. 452 453 If the query involves area data processing, this is the date of the latest edit 454 that has been considered in the most recent batch run of the area generation. 455 456 Returns: 457 the timestamp, or ``None`` if the query has not successfully finished (yet), or 458 if it does not involve area data processing. 459 """ 460 if self._response is None: 461 return None 462 463 date_str = self._response["osm3s"].get("timestamp_areas_base") 464 if not date_str: 465 return None 466 467 return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(timezone.utc)
All area data edits that have been uploaded before this date are included.
If the query involves area data processing, this is the date of the latest edit that has been considered in the most recent batch run of the area generation.
Returns:
the timestamp, or
None
if the query has not successfully finished (yet), or if it does not involve area data processing.
class QueryRunner(ABC):
    """
    A query runner is an async function that is called before a client makes an API request.

    Query runners can be used to…
    - …retry queries when they fail
    - …modify queries, f.e. to lower/increase their maxsize/timeout
    - …log query results & errors
    - …implement caching

    At a minimum, a query runner simply returns to let the client (re)try the
    query, or raises ``query.error`` to stop trying.
    """

    __slots__ = ()

    @abstractmethod
    async def __call__(self, query: Query) -> None:
        """Called with the current query state before the client makes an API request."""
A query runner is an async function that is called before a client makes an API request.
Query runners can be used to…
- …retry queries when they fail
- …modify queries, f.e. to lower/increase their maxsize/timeout
- …log query results & errors
- …implement caching
The absolute minimum a query runner function has to do is to simply return to (re)try a query, or to raise ``query.error`` to stop trying.
653class DefaultQueryRunner(QueryRunner): 654 """ 655 The default query runner. 656 657 This runner… 658 - …retries with an increasing back-off period in between tries if the server is too busy 659 - …retries and doubles timeout and maxsize settings if they were exceeded 660 - …limits the number of tries 661 - …optionally caches query results in temp files 662 663 This runner does *not* lower timeout and maxsize settings if the server rejected a query. 664 665 Args: 666 max_tries: The maximum number of times a query is tried. (5 by default) 667 cache_ttl_secs: Amount of seconds a query's result set is cached for. 668 Set to zero to disable caching. (zero by default) 669 """ 670 671 __slots__ = ( 672 "_max_tries", 673 "_cache_ttl_secs", 674 ) 675 676 def __init__(self, max_tries: int = 5, cache_ttl_secs: int = 0) -> None: 677 if max_tries < 1: 678 msg = "max_tries must be >= 1" 679 raise ValueError(msg) 680 681 if cache_ttl_secs < 0: 682 msg = "cache_ttl_secs must be >= 0" 683 raise ValueError(msg) 684 685 self._max_tries = max_tries 686 self._cache_ttl_secs = cache_ttl_secs 687 688 def _is_caching(self, query: Query) -> bool: 689 if self._cache_ttl_secs and _FORCE_DISABLE_CACHE: 690 query.logger.debug("caching is forced disabled") 691 return False 692 return self._cache_ttl_secs > 0 693 694 def _cache_read(self, query: Query) -> None: 695 logger = query.logger 696 697 now = int(time.time()) 698 699 file_name = f"{query.cache_key}.json" 700 file_path = Path(tempfile.gettempdir()) / file_name 701 702 if not file_path.exists(): 703 logger.info("result was not cached") 704 logger.debug(f"checked for cache at {file_path}") 705 return 706 707 try: 708 with Path(file_path).open(mode="r", encoding="utf-8") as file: 709 response = json.load(file) 710 except (OSError, json.JSONDecodeError): 711 logger.exception(f"failed to read cached {query}") 712 return 713 714 if response.get(_EXPIRATION_KEY, 0) <= now: 715 logger.info(f"{query} cache expired") 716 return 717 718 
query._response = response 719 logger.info(f"{query} was cached") 720 721 def _cache_write(self, query: Query) -> None: 722 logger = query.logger 723 724 now = int(time.time()) 725 726 assert query._response is not None 727 query._response[_EXPIRATION_KEY] = now + self._cache_ttl_secs 728 729 file_name = f"{query.cache_key}.json" 730 file_path = Path(tempfile.gettempdir()) / file_name 731 732 logger.debug(f"caching at {file_path}…") 733 734 try: 735 with Path(file_path).open(mode="w", encoding="utf-8") as file: 736 json.dump(query._response, file) 737 except OSError: 738 logger.exception(f"failed to cache {query}") 739 740 async def __call__(self, query: Query) -> None: 741 """Called with the current query state before the client makes an API request.""" 742 logger = query.logger 743 744 # Check cache ahead of first try 745 if query.nb_tries == 0 and self._is_caching(query): 746 await asyncio.to_thread(self._cache_read, query) 747 748 # Success or cached 749 if query.done: 750 logger.info(f"{query}") 751 if not query.was_cached and self._is_caching(query): 752 await asyncio.to_thread(self._cache_write, query) 753 return 754 755 err = query.error 756 757 if err: 758 logger.info(f"try for query{query.kwargs!r} failed: {err}") 759 760 if is_server_error(err): 761 logger.error(f"unexpected response body:\n{err.body}") 762 763 # Do not retry if we exhausted all tries, when a retry would not change the result, 764 # or when the timeout was reached. 765 if err and (query.nb_tries == self._max_tries or not err.should_retry): 766 logger.error(f"give up on {query}", exc_info=err) 767 raise err 768 769 if is_rejection(err): 770 # Wait if the server is too busy. 771 if err.cause == QueryRejectCause.TOO_BUSY: 772 backoff = _fibo_backoff_secs(query.nb_tries) 773 logger.info(f"retry {query} in {backoff:.1f}s") 774 await asyncio.sleep(backoff) 775 776 # Wait until a slot opens if the rate limit was exceeded. 
777 elif err.cause == QueryRejectCause.TOO_MANY_QUERIES: 778 pass # let client enforce cooldown 779 780 # Double timeout if exceeded. 781 elif err.cause == QueryRejectCause.EXCEEDED_TIMEOUT: 782 old = f"{query.timeout_secs:.1f}s" 783 query.timeout_secs *= 2 784 new = f"{query.timeout_secs:.1f}s" 785 logger.info(f"increased [timeout:*] for {query} from {old} to {new}") 786 787 # Double maxsize if exceeded. 788 elif err.cause == QueryRejectCause.EXCEEDED_MAXSIZE: 789 old = f"{query.maxsize_mib:.1f}mib" 790 query.maxsize_mib *= 2 791 new = f"{query.maxsize_mib:.1f}mib" 792 logger.info(f"increased [maxsize:*] for {query} from {old} to {new}")
The default query runner.
This runner…
- …retries with an increasing back-off period in between tries if the server is too busy
- …retries and doubles timeout and maxsize settings if they were exceeded
- …limits the number of tries
- …optionally caches query results in temp files
This runner does not lower timeout and maxsize settings if the server rejected a query.
Arguments:
- max_tries: The maximum number of times a query is tried. (5 by default)
- cache_ttl_secs: Amount of seconds a query's result set is cached for. Set to zero to disable caching. (zero by default)
def __init__(self, max_tries: int = 5, cache_ttl_secs: int = 0) -> None:
    """Validate and store the retry and caching configuration."""
    # reject nonsensical configuration up front
    if max_tries < 1:
        raise ValueError("max_tries must be >= 1")
    if cache_ttl_secs < 0:
        raise ValueError("cache_ttl_secs must be >= 0")

    self._max_tries = max_tries
    self._cache_ttl_secs = cache_ttl_secs
596@dataclass(kw_only=True, slots=True) 597class RequestTimeout: 598 """ 599 Request timeout settings. 600 601 Attributes: 602 total_without_query_secs: If set, the sum of this duration and the query's ``[timeout:*]`` 603 setting is used as timeout duration of the entire request, 604 including connection establishment, request sending and response 605 reading (``aiohttp.ClientTimeout.total``). 606 Defaults to 20 seconds. 607 sock_connect_secs: The maximum number of seconds allowed for pure socket connection 608 establishment (same as ``aiohttp.ClientTimeout.sock_connect``). 609 each_sock_read_secs: The maximum number of seconds allowed for the period between reading 610 a new chunk of data (same as ``aiohttp.ClientTimeout.sock_read``). 611 """ 612 613 total_without_query_secs: float | None = 20.0 614 sock_connect_secs: float | None = None 615 each_sock_read_secs: float | None = None 616 617 def __post_init__(self) -> None: 618 if self.total_without_query_secs is not None and self.total_without_query_secs <= 0.0: 619 msg = "'total_without_query_secs' has to be > 0" 620 raise ValueError(msg) 621 622 if self.sock_connect_secs is not None and self.sock_connect_secs <= 0.0: 623 msg = "'sock_connect_secs' has to be > 0" 624 raise ValueError(msg) 625 626 if self.each_sock_read_secs is not None and self.each_sock_read_secs <= 0.0: 627 msg = "'each_sock_read_secs' has to be > 0" 628 raise ValueError(msg)
Request timeout settings.
Attributes:
- total_without_query_secs: If set, the sum of this duration and the query's ``[timeout:*]`` setting is used as timeout duration of the entire request, including connection establishment, request sending and response reading (``aiohttp.ClientTimeout.total``). Defaults to 20 seconds.
- sock_connect_secs: The maximum number of seconds allowed for pure socket connection establishment (same as ``aiohttp.ClientTimeout.sock_connect``).
- each_sock_read_secs: The maximum number of seconds allowed for the period between reading a new chunk of data (same as ``aiohttp.ClientTimeout.sock_read``).
Default ``maxsize`` setting in mebibytes.

Default ``timeout`` setting in seconds.