aio_overpass
Async client for the Overpass API.
Usage
There are three basic steps to fetch the spatial data you need:
Formulate a query
- Either write your own custom query, f.e. Query("node(5369192667); out;"),
- or use one of the Query subclasses, f.e. SingleRouteQuery(relation_id=1643324).
Call the Overpass API
- Prepare your client with client = Client(user_agent=...).
- Use await client.run_query(query) to fetch the result set.
Collect results
- Either access the raw result dictionaries with query.result_set,
- or use a collector, f.e. collect_elements(query) to get a list of typed Elements.
- Collectors are often specific to queries - collect_routes requires a RouteQuery, for instance.
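The three steps can be combined roughly as follows. This is a minimal sketch rather than a canonical example: the user agent string is a placeholder, the import path of collect_elements is the one used in the Hamburg example on this page, and actually running main() needs network access plus the shapely extra. The imports are deferred into the coroutine so that loading the file does not require the package.

```python
import asyncio


async def main() -> list:
    # Imported lazily so that merely loading this file does not require the package.
    from aio_overpass import Client, Query
    from aio_overpass.element import collect_elements  # needs the "shapely" extra

    # 1. Formulate a query.
    query = Query("node(5369192667); out;")

    # 2. Call the Overpass API. The user agent below is a placeholder.
    client = Client(user_agent="my-app/0.1 (contact: me@example.org)")
    try:
        await client.run_query(query)
    finally:
        # Close the underlying session even if the query failed.
        await client.close()

    # 3. Collect the results as typed elements.
    return collect_elements(query)
```

From a script, this would be started with asyncio.run(main()).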
Example: looking up a building in Hamburg
a) Results as Dictionaries
You may use the .result_set property to get a list of all query results without any extra processing:
from aio_overpass import Client, Query
query = Query('way["addr:housename"=Elbphilharmonie]; out geom;')
client = Client()
await client.run_query(query)
query.result_set
[
    {
        "type": "way",
        "id": 24981342,
        # ...
        "tags": {
            "addr:city": "Hamburg",
            "addr:country": "DE",
            "addr:housename": "Elbphilharmonie",
            # ...
        },
    }
]
b) Results as Objects
This will give you a user-friendly Python interface for nodes, ways, and relations. Here we use the .tags property:
from aio_overpass.element import collect_elements
elems = collect_elements(query)
elems[0].tags
{
    "addr:city": "Hamburg",
    "addr:country": "DE",
    "addr:housename": "Elbphilharmonie",
    # ...
}
c) Results as GeoJSON
The processed elements can also easily be converted to GeoJSON:
import json
json.dumps(elems[0].geojson, indent=4)
{
    "type": "Feature",
    "geometry": {
        "type": "Polygon",
        "coordinates": [
            [
                [
                    9.9832434,
                    53.5415472
                ],
                ...
            ]
        ]
    },
    "properties": {
        "id": 24981342,
        "type": "way",
        "tags": {
            "addr:city": "Hamburg",
            "addr:country": "DE",
            "addr:housename": "Elbphilharmonie",
            ...
        },
        ...
    },
    "bbox": [
        9.9832434,
        53.540877,
        9.9849674,
        53.5416212
    ]
}
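GeoJSON positions are written in lon/lat order, and a bbox lists [west, south, east, north]. As a rough illustration of how the bbox in the output above relates to the polygon's coordinates, the sketch below derives a bbox from a ring of positions. The ring is hypothetical (only its first position appears in the output above), and bbox_of_ring is an illustrative helper, not part of the library.

```python
def bbox_of_ring(ring: list[list[float]]) -> list[float]:
    """Return [west, south, east, north] for a ring of [lon, lat] positions."""
    lons = [position[0] for position in ring]
    lats = [position[1] for position in ring]
    return [min(lons), min(lats), max(lons), max(lats)]


# Hypothetical closed ring in lon/lat order, made up for illustration.
ring = [
    [9.9832434, 53.5415472],
    [9.9849674, 53.5416212],
    [9.9840000, 53.5408770],
    [9.9832434, 53.5415472],
]
bbox = bbox_of_ring(ring)
```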
Choosing Extras
This library can be installed with a number of optional extras.
- Install no extras, if you're fine with dict result sets.
- Install the shapely extra, if you would like the convenience of typed OSM elements. It is also useful if you are interested in elements' geometries, and either already use Shapely, or want a simple way to export GeoJSON.
  - This includes the pt module to make it easier to interact with public transportation routes. Something seemingly trivial like listing the stops of a route can have unexpected pitfalls, since stops can have multiple route members, and may have a range of different tags and roles. This submodule will clean up the relation data for you.
- Install the networkx extra to enable the pt_ordered module, if you want a route's path as a simple line from A to B. It is hard to do this consistently, mainly because ways are not always ordered, and stop positions might be missing. You can benefit from this submodule if you wish to
  - render a route's path between any two stops
  - measure the route's travelled distance between any two stops
  - validate the order of ways in the relation
  - check if the route relation has gaps
- Install the joblib extra to speed up pt_ordered.collect_ordered_routes(), which can benefit greatly from parallelization.
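To see which of these optional dependencies are present in an environment, something like the following could be used. It is only a sketch: it assumes each extra pulls in a top-level package of the same name (shapely, networkx, joblib), and available_extras is a hypothetical helper, not part of the library. With pip, extras are typically requested as pip install "aio-overpass[shapely]", going by the distribution name used in the package's own source.

```python
from importlib.util import find_spec

# Extra name -> importable package; assumed here to share the same name.
EXTRAS = ("shapely", "networkx", "joblib")


def available_extras() -> dict[str, bool]:
    """Report which optional dependencies can be imported in this environment."""
    return {name: find_spec(name) is not None for name in EXTRAS}


extras = available_extras()
```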
Coordinates
- Geographic point locations are expressed by latitude (lat) and longitude (lon) coordinates.
  - Latitude is given as an angle that ranges from –90° at the south pole to 90° at the north pole, with 0° at the Equator.
  - Longitude is given as an angle ranging from 0° at the Prime Meridian (the line that divides the globe into Eastern and Western hemispheres), to +180° eastward and −180° westward.
  - lat/lon values are floats that are exactly those degrees, just without the ° sign.
- This might help you remember which coordinate is which:
- If you think of a world map, usually it’s a rectangle.
- The long side (the largest side) is the longitude.
- Longitude is the x-axis, and latitude is the y-axis.
- Be wary of coordinate order:
- OpenStreetMap uses the WGS84 spatial reference system used by the Global Positioning System (GPS).
- OpenStreetMap node coordinates have seven decimal places, which gives them centimetric precision. However, the position accuracy of GPS data is only about 10m. A reasonable display accuracy could be five places, which is precise to 1.1 metres at the equator.
- Spatial features that cross the 180th meridian are problematic, since you go from longitude 180.0 to -180.0. Such features usually have their geometries split up, like the area of Russia.
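The precision figures and the 180th meridian issue above can be checked with a little arithmetic. The sketch below assumes roughly 111.32 km per degree of latitude (an approximation chosen for illustration) and uses a simple heuristic for spotting a wrap from 180.0 to -180.0. Both helpers are hypothetical and not part of the library.

```python
import math

METERS_PER_DEGREE_LAT = 111_320.0  # rough average; an assumption for illustration


def precision_meters(decimal_places: int, lat: float = 0.0) -> tuple[float, float]:
    """Approximate (north-south, east-west) size in metres of the last decimal place."""
    step_deg = 10.0 ** -decimal_places
    north_south = step_deg * METERS_PER_DEGREE_LAT
    # Meridians converge towards the poles, so east-west steps shrink with latitude.
    east_west = north_south * math.cos(math.radians(lat))
    return north_south, east_west


def crosses_antimeridian(lons: list[float]) -> bool:
    """Heuristic: consecutive longitudes more than 180 degrees apart suggest a wrap."""
    return any(abs(b - a) > 180.0 for a, b in zip(lons, lons[1:]))


five_places = precision_meters(5)   # about 1.1 m at the equator
seven_places = precision_meters(7)  # about 1.1 cm at the equator
```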
"""
Async client for the Overpass API.

[Release Notes](https://github.com/timwie/aio-overpass/blob/main/RELEASES.md)

[Examples](https://github.com/timwie/aio-overpass/tree/main/examples)
"""

import importlib.metadata
from pathlib import Path


__version__: str = importlib.metadata.version("aio-overpass")

# we add this to all modules for pdoc;
# see https://pdoc.dev/docs/pdoc.html#use-numpydoc-or-google-docstrings
__docformat__ = "google"

# we also use __all__ in all modules for pdoc; this lets us control the order
__all__ = (
    "__version__",
    "Client",
    "ClientError",
    "Query",
    "client",
    "element",  # pyright: ignore[reportUnsupportedDunderAll]
    "error",
    "pt",  # pyright: ignore[reportUnsupportedDunderAll]
    "pt_ordered",  # pyright: ignore[reportUnsupportedDunderAll]
    "ql",  # pyright: ignore[reportUnsupportedDunderAll]
    "query",
)

from .client import Client
from .error import ClientError
from .query import Query


# extend the module's docstring
for filename in ("usage.md", "extras.md", "coordinates.md"):
    __doc__ += "\n<br>\n"
    __doc__ += (Path(__file__).parent / "doc" / filename).read_text()
class Client:
    """
    A client for the Overpass API.

    Requests are rate-limited according to the configured number of slots per IP for the specified
    API server. By default, queries are retried whenever the server is too busy, or the rate limit
    was exceeded. Custom query runners can be used to implement your own retry strategy.

    Args:
        url: The url of an Overpass API instance. Defaults to the main Overpass API instance.
        user_agent: A string used for the User-Agent header. It is good practice to provide a string
                    that identifies your application, and includes a way to contact you (f.e. an
                    e-mail, or a link to a repository). This is important if you make too many
                    requests, or queries that require a lot of resources.
        concurrency: The maximum number of simultaneous connections. In practice the amount
                     of concurrent queries may be limited by the number of slots it provides for
                     each IP.
        status_timeout_secs: If set, status requests to the Overpass API will time out after
                             this duration in seconds. Defaults to no timeout.
        runner: You can provide another query runner if you want to implement your own retry
                strategy.

    References:
        - https://wiki.openstreetmap.org/wiki/Overpass_API#Public_Overpass_API_instances
    """

    __slots__ = (
        "_concurrency",
        "_maybe_session",
        "_runner",
        "_status_timeout_secs",
        "_url",
        "_user_agent",
    )

    def __init__(
        self,
        url: str = DEFAULT_INSTANCE,
        user_agent: str = DEFAULT_USER_AGENT,
        concurrency: int = 32,
        status_timeout_secs: float | None = None,
        runner: QueryRunner | None = None,
    ) -> None:
        if concurrency <= 0:
            msg = "'concurrency' must be > 0"
            raise ValueError(msg)
        if status_timeout_secs is not None and status_timeout_secs <= 0.0:
            msg = "'status_timeout_secs' must be > 0"
            raise ValueError(msg)

        self._url = url
        self._user_agent = user_agent
        self._concurrency = concurrency
        self._status_timeout_secs = status_timeout_secs
        self._runner = runner or DefaultQueryRunner()

        self._maybe_session: aiohttp.ClientSession | None = None

    def _session(self) -> aiohttp.ClientSession:
        """The session used for all requests of this client."""
        if not self._maybe_session or self._maybe_session.closed:
            headers = {"User-Agent": self._user_agent}
            connector = aiohttp.TCPConnector(limit=self._concurrency)
            self._maybe_session = aiohttp.ClientSession(headers=headers, connector=connector)

        return self._maybe_session

    async def close(self) -> None:
        """Cancel all running queries and close the underlying session."""
        if self._maybe_session and not self._maybe_session.closed:
            # do not care if this fails
            with suppress(CallError):
                _ = await self.cancel_queries()

            # is raised when there are still active queries. that's ok
            with suppress(aiohttp.ServerDisconnectedError):
                await self._maybe_session.close()

    async def _status(self, timeout: ClientTimeout | None = None) -> "Status":
        endpoint = urljoin(self._url, "status")
        timeout = timeout or aiohttp.ClientTimeout(total=self._status_timeout_secs)
        async with (
            _map_request_error(timeout),
            self._session().get(url=endpoint, timeout=timeout) as response,
        ):
            return await _parse_status(response)

    async def status(self) -> Status:
        """
        Check the current API status.

        The timeout of this request is configured with the ``status_timeout_secs`` argument.

        Raises:
            ClientError: if the status could not be looked up
        """
        return await self._status()

    async def cancel_queries(self, timeout_secs: float | None = None) -> int:
        """
        Cancel all running queries.

        This can be used to terminate runaway queries that prevent you from sending new ones.

        Returns:
            the number of terminated queries

        Raises:
            ClientError: if the request to cancel queries failed
        """
        timeout = aiohttp.ClientTimeout(total=timeout_secs) if timeout_secs else None
        headers = {"User-Agent": self._user_agent}
        endpoint = urljoin(self._url, "kill_my_queries")

        # use a new session here to get around our concurrency limit
        async with (
            aiohttp.ClientSession(headers=headers) as session,
            _map_request_error(timeout),
            session.get(endpoint, timeout=timeout) as response,
        ):
            body = await response.text()
            killed_pids = re.findall("\\(pid (\\d+)\\)", body)
            return len(set(killed_pids))

    async def run_query(self, query: Query, *, raise_on_failure: bool = True) -> None:
        """
        Send a query to the API, and await its completion.

        "Running" the query entails acquiring a connection from the pool, the query requests
        themselves (which may be retried), status requests when the server is busy,
        and cooldown periods.

        The query runner is invoked before every try, and once after the last try.

        To run multiple queries concurrently, wrap the returned coroutines in an ``asyncio`` task,
        f.e. with ``asyncio.create_task()`` and subsequent ``asyncio.gather()``.

        Args:
            query: the query to run on this API instance
            raise_on_failure: if ``True``, raises ``query.error`` if the query failed

        Raises:
            ClientError: when query or status requests fail. If the query was retried, the error
                         of the last try will be raised. The same exception is also captured in
                         ``query.error``. Raising can be prevented by setting ``raise_on_failure``
                         to ``False``.
            RunnerError: when a call to the query runner raises. This exception is raised
                         even if ``raise_on_failure`` is ``False``, since it is likely an error
                         that is not just specific to this query.
        """
        if query.done:
            return  # nothing to do

        if query.nb_tries > 0:
            query.reset()  # reset failed queries

        # query runner is invoked before every try, and once after the last try
        while True:
            await self._invoke_runner(query, raise_on_failure=raise_on_failure)
            if query.done:
                return
            await self._try_query_once(query)

    async def _invoke_runner(self, query: Query, *, raise_on_failure: bool) -> None:
        """
        Invoke the query runner.

        Raises:
            ClientError: if the runner raises ``query.error``
            ValueError: if the runner raises a different ``ClientError`` than ``query.error``
            RunnerError: if the runner raises any other exception (which it shouldn't)
        """
        try:
            await self._runner(query)
        except ClientError as err:
            if err is not query.error:
                msg = "query runner raised a ClientError other than 'query.error'"
                raise ValueError(msg) from err
            if raise_on_failure:
                raise
        except AssertionError:
            raise
        except BaseException as err:
            raise RunnerError(cause=err) from err

    async def _try_query_once(self, query: Query) -> None:
        """A single iteration of running a query."""
        query_mut = query._mutator()
        query_mut.begin_try()

        try:
            await self._cooldown(query)

            req_timeout = _next_query_req_timeout(query)

            if req_timeout.total and req_timeout.total <= 0.0:
                assert query.run_duration_secs
                raise GiveupError(kwargs=query.kwargs, after_secs=query.run_duration_secs)

            query_mut.begin_request()

            query.logger.info(f"call api for {query}")

            async with (
                _map_request_error(req_timeout),
                self._session().post(
                    url=urljoin(self._url, "interpreter"),
                    data=query._code(),
                    timeout=req_timeout,
                ) as response,
            ):
                query_mut.succeed_try(
                    response=await _result_or_raise(response, query.kwargs, query.logger),
                    response_bytes=response.content.total_bytes,
                )

        except CallTimeoutError as err:
            fail_with: ClientError = err
            if query.run_timeout_elapsed:
                assert query.run_duration_secs is not None
                fail_with = GiveupError(kwargs=query.kwargs, after_secs=query.run_duration_secs)
            query_mut.fail_try(fail_with)

        except ClientError as err:
            query_mut.fail_try(err)

        finally:
            query_mut.end_try()

    async def _cooldown(self, query: Query) -> None:
        """
        If the given query failed with ``TOO_MANY_QUERIES``, check for a cooldown period.

        Raises:
            ClientError: if the status request to find out the cooldown period fails
            GiveupError: if the cooldown is longer than the remaining run duration
        """
        logger = query.logger

        if not is_too_many_queries(query.error):
            return

        # If this client is running too many queries, we can check the status for a
        # cooldown period. This request failing is a bit of an edge case.
        # 'query.error' will be overwritten, which means we will not check for a
        # cooldown in the next iteration.
        status = await self._status(timeout=self._next_status_req_timeout(query))

        if not status.cooldown_secs:
            return

        run_duration = query.run_duration_secs
        assert run_duration

        if run_timeout := query.run_timeout_secs:
            remaining = run_timeout - run_duration

            if status.cooldown_secs > remaining:
                logger.error(f"give up on {query} due to {status.cooldown_secs:.1f}s cooldown")
                raise GiveupError(kwargs=query.kwargs, after_secs=run_duration)

        logger.info(f"{query} has cooldown for {status.cooldown_secs:.1f}s")
        await asyncio.sleep(status.cooldown_secs)

    def _next_status_req_timeout(self, query: Query) -> aiohttp.ClientTimeout:
        """Status request timeout; possibly limited by either the run or status timeout settings."""
        remaining = None

        run_duration = query.run_duration_secs
        assert run_duration

        if run_timeout := query.run_timeout_secs:
            remaining = run_timeout - run_duration

            if remaining <= 0.0:
                raise GiveupError(kwargs=query.kwargs, after_secs=run_duration)

            if self._status_timeout_secs:
                remaining = min(remaining, self._status_timeout_secs)  # cap timeout if configured

        return aiohttp.ClientTimeout(total=remaining)
A client for the Overpass API.
Requests are rate-limited according to the configured number of slots per IP for the specified API server. By default, queries are retried whenever the server is too busy, or the rate limit was exceeded. Custom query runners can be used to implement your own retry strategy.
Arguments:
- url: The url of an Overpass API instance. Defaults to the main Overpass API instance.
- user_agent: A string used for the User-Agent header. It is good practice to provide a string that identifies your application, and includes a way to contact you (f.e. an e-mail, or a link to a repository). This is important if you make too many requests, or queries that require a lot of resources.
- concurrency: The maximum number of simultaneous connections. In practice the amount of concurrent queries may be limited by the number of slots it provides for each IP.
- status_timeout_secs: If set, status requests to the Overpass API will time out after this duration in seconds. Defaults to no timeout.
- runner: You can provide another query runner if you want to implement your own retry strategy.
References:
- https://wiki.openstreetmap.org/wiki/Overpass_API#Public_Overpass_API_instances
def __init__(
    self,
    url: str = DEFAULT_INSTANCE,
    user_agent: str = DEFAULT_USER_AGENT,
    concurrency: int = 32,
    status_timeout_secs: float | None = None,
    runner: QueryRunner | None = None,
) -> None:
async def close(self) -> None:
Cancel all running queries and close the underlying session.
async def status(self) -> Status:
Check the current API status.
The timeout of this request is configured with the status_timeout_secs argument.
Raises:
- ClientError: if the status could not be looked up
async def cancel_queries(self, timeout_secs: float | None = None) -> int:
Cancel all running queries.
This can be used to terminate runaway queries that prevent you from sending new ones.
Returns:
the number of terminated queries
Raises:
- ClientError: if the request to cancel queries failed
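According to the method's source, the returned count is the number of distinct process ids matched by the pattern \(pid (\d+)\) in the server's reply. The snippet below illustrates that counting step in isolation. The response text is hypothetical, and the real wording of the server's reply may differ.

```python
import re

# Hypothetical response text, made up for illustration.
body = "Killing query (pid 4711)\nKilling query (pid 4712)\nKilling query (pid 4711)\n"

# Same pattern as in cancel_queries(), written as a raw string.
killed_pids = re.findall(r"\(pid (\d+)\)", body)

# A pid that is reported more than once is only counted once.
terminated = len(set(killed_pids))
```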
async def run_query(self, query: Query, *, raise_on_failure: bool = True) -> None:
Send a query to the API, and await its completion.
"Running" the query entails acquiring a connection from the pool, the query requests themselves (which may be retried), status requests when the server is busy, and cooldown periods.
The query runner is invoked before every try, and once after the last try.
To run multiple queries concurrently, wrap the returned coroutines in an asyncio task, f.e. with asyncio.create_task() and subsequent asyncio.gather().
Arguments:
- query: the query to run on this API instance
- raise_on_failure: if True, raises query.error if the query failed
Raises:
- ClientError: when query or status requests fail. If the query was retried, the error of the last try will be raised. The same exception is also captured in query.error. Raising can be prevented by setting raise_on_failure to False.
- RunnerError: when a call to the query runner raises. This exception is raised even if raise_on_failure is False, since it is likely an error that is not just specific to this query.
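The concurrency pattern described above can be sketched without network access. StubQuery and run_query_stub below are hypothetical stand-ins for Query and Client.run_query, and they only mimic the documented behaviour of raise_on_failure and query.error. With the real client, each task would wrap client.run_query(query, raise_on_failure=False) instead.

```python
import asyncio


class StubQuery:
    """Hypothetical stand-in for Query; only records an error or a result set."""

    def __init__(self, code: str) -> None:
        self.code = code
        self.error = None
        self.result_set = None


async def run_query_stub(query: StubQuery, *, raise_on_failure: bool = True) -> None:
    """Hypothetical stand-in for Client.run_query that performs no network I/O."""
    await asyncio.sleep(0)  # yield to the event loop, as a real request would
    if "fail" in query.code:
        query.error = RuntimeError(f"query failed: {query.code}")
        if raise_on_failure:
            raise query.error
        return
    query.result_set = [{"type": "node", "id": 1}]


async def run_all(queries: list[StubQuery]) -> None:
    # One task per query; raise_on_failure=False keeps one failure from aborting the rest.
    tasks = [asyncio.create_task(run_query_stub(q, raise_on_failure=False)) for q in queries]
    await asyncio.gather(*tasks)


queries = [StubQuery("node(1); out;"), StubQuery("fail"), StubQuery("node(2); out;")]
asyncio.run(run_all(queries))
failed = [q.code for q in queries if q.error is not None]
```

Failures are then inspected per query through its error attribute instead of through a raised exception.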
class ClientError(Exception):
    """Base exception for failed Overpass API requests and queries."""

    @property
    def should_retry(self) -> bool:
        """Returns ``True`` if it's worth retrying when encountering this error."""
        return False
Base exception for failed Overpass API requests and queries.
@property
def should_retry(self) -> bool:
Returns True if it's worth retrying when encountering this error.
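A retry strategy might consult this property along the following lines. This is a generic sketch with hypothetical stand-in classes (StubClientError, StubBusyError) and a synchronous helper. It is not the library's default query runner, whose internals are not shown on this page.

```python
class StubClientError(Exception):
    """Hypothetical stand-in mirroring ClientError.should_retry."""

    @property
    def should_retry(self) -> bool:
        return False


class StubBusyError(StubClientError):
    """Hypothetical subclass representing a transient failure."""

    @property
    def should_retry(self) -> bool:
        return True


def call_with_retries(operation, max_tries: int = 3):
    """Call operation() until it succeeds, the error is not retryable, or tries run out."""
    for attempt in range(1, max_tries + 1):
        try:
            return operation()
        except StubClientError as err:
            if not err.should_retry or attempt == max_tries:
                raise


attempts = []


def flaky():
    # Fails twice with a retryable error, then succeeds.
    attempts.append(1)
    if len(attempts) < 3:
        raise StubBusyError("server busy")
    return "ok"


result = call_with_retries(flaky)
```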
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
class Query:
    """
    State of a query that is either pending, running, successful, or failed.

    Args:
        input_code: The input Overpass QL code. Note that some settings might be changed
                    by query runners, notably the 'timeout' and 'maxsize' settings.
        logger: The logger to use for all logging output related to this query.
        **kwargs: Additional keyword arguments that can be used to identify queries.

    References:
        - https://wiki.openstreetmap.org/wiki/Overpass_API/Overpass_QL
    """

    __slots__ = (
        "_error",
        "_input_code",
        "_kwargs",
        "_last_timeout_secs_used",
        "_logger",
        "_max_timeout_secs_exceeded",
        "_nb_tries",
        "_request_timeout",
        "_response",
        "_response_bytes",
        "_run_timeout_secs",
        "_settings",
        "_time_end_try",
        "_time_start",
        "_time_start_req",
        "_time_start_try",
    )

    def __init__(self, input_code: str, logger: logging.Logger = _NULL_LOGGER, **kwargs) -> None:
        self._input_code = input_code
        """the original given overpass ql code"""

        self._logger = logger
        """logger to use for this query"""

        self._kwargs = kwargs
        """used to identify this query"""

        self._settings = dict(_SETTING_PATTERN.findall(input_code))
        """all overpass ql settings [k:v];"""

        self._settings["out"] = "json"

        if "maxsize" not in self._settings:
            self._settings["maxsize"] = DEFAULT_MAXSIZE_MIB * 1024 * 1024

        if "timeout" not in self._settings:
            self._settings["timeout"] = DEFAULT_TIMEOUT_SECS

        self._run_timeout_secs: float | None = None
        """total time limit for running this query"""

        self._request_timeout: RequestTimeout = RequestTimeout()
        """config for request timeouts"""

        self._error: ClientError | None = None
        """error of the last try, or None"""

        self._response: dict | None = None
        """response JSON as a dict, or None"""

        self._response_bytes = 0.0
        """number of bytes in a response, or zero"""

        self._nb_tries = 0
        """number of tries so far, starting at zero"""

        self._time_start: _Instant | None = None
        """time prior to executing the first try"""

        self._time_start_try: _Instant | None = None
        """time prior to executing the most recent try"""

        self._time_start_req: _Instant | None = None
        """time prior to executing the most recent try's query request"""

        self._time_end_try: _Instant | None = None
        """time the most recent try finished"""

        self._last_timeout_secs_used: int | None = None
        """the last used 'timeout' setting"""

        self._max_timeout_secs_exceeded: int | None = None
        """the largest 'timeout' setting that was exceeded in a try of this query"""

    def reset(self) -> None:
        """Reset the query to its initial state, ignoring previous tries."""
        Query.__init__(self, input_code=self._input_code, **self._kwargs)

    @property
    def input_code(self) -> str:
        """The original input Overpass QL source code."""
        return self._input_code

    @property
    def kwargs(self) -> dict:
        """
        Keyword arguments that can be used to identify queries.

        The default query runner will log these values when a query is run.
        """
        return self._kwargs

    @property
    def logger(self) -> logging.Logger:
        """The logger used for logging output related to this query."""
        return self._logger

    @property
    def nb_tries(self) -> int:
        """Current number of tries."""
        return self._nb_tries

    @property
    def error(self) -> ClientError | None:
        """
        Error of the most recent try.

        Returns:
            an error or ``None`` if the query wasn't tried or hasn't failed
        """
        return self._error

    @property
    def response(self) -> dict | None:
        """
        The entire JSON response of the query.

        Returns:
            the response, or ``None`` if the query has not successfully finished (yet)
        """
        return self._response

    @property
    def was_cached(self) -> bool | None:
        """
        Indicates whether the query result was cached.

        Returns:
            ``None`` if the query has not been run yet.
            ``True`` if the query has a result set with zero tries.
            ``False`` otherwise.
        """
        if self._response is None:
            return None
        return self._nb_tries == 0

    @property
    def result_set(self) -> list[dict] | None:
        """
        The result set of the query.

        This is open data, licensed under the Open Data Commons Open Database License (ODbL).
        You are free to copy, distribute, transmit and adapt this data, as long as you credit
        OpenStreetMap and its contributors. If you alter or build upon this data, you may
        distribute the result only under the same licence.

        Returns:
            the elements of the result set, or ``None`` if the query has not successfully
            finished (yet)

        References:
            - https://www.openstreetmap.org/copyright
            - https://opendatacommons.org/licenses/odbl/1-0/
        """
        if not self._response:
            return None
        return self._response["elements"]

    @property
    def response_size_mib(self) -> float | None:
        """
        The size of the response in mebibytes.

        Returns:
            the size, or ``None`` if the query has not successfully finished (yet)
        """
        if self._response is None:
            return None
        return self._response_bytes / 1024.0 / 1024.0

    @property
    def maxsize_mib(self) -> float:
        """
        The current value of the [maxsize:*] setting in mebibytes.

        This size indicates the maximum allowed memory for the query in bytes RAM on the server,
        as expected by the user. If the query needs more RAM than this value, the server may abort
        the query with a memory exhaustion. The higher this size, the more probably the server
        rejects the query before executing it.
        """
        return float(self._settings["maxsize"]) // 1024.0 // 1024.0

    @maxsize_mib.setter
    def maxsize_mib(self, value: float) -> None:
        if value <= 0.0:
            msg = "maxsize_mib must be > 0.0"
            raise ValueError(msg)
        self._settings["maxsize"] = int(value * 1024.0 * 1024.0)

    @property
    def timeout_secs(self) -> int:
        """
        The current value of the [timeout:*] setting in seconds.

        This duration is the maximum allowed runtime for the query in seconds, as expected by the
        user. If the query runs longer than this time, the server may abort the query. The higher
        this duration, the more probably the server rejects the query before executing it.
        """
        return int(self._settings["timeout"])

    @timeout_secs.setter
    def timeout_secs(self, value: int) -> None:
        if value < 1:
            msg = "timeout_secs must be >= 1"
            raise ValueError(msg)
        self._settings["timeout"] = value

    @property
    def run_timeout_secs(self) -> float | None:
        """
        A limit to ``run_duration_secs``, that cancels running the query when exceeded.

        Defaults to no timeout.

        The client will raise a ``GiveupError`` if the timeout is reached.

        Not to be confused with ``timeout_secs``, which is a setting for the Overpass API instance,
        that limits a single query execution time. Instead, this value can be used to limit the
        total client-side time spent on this query (see ``Client.run_query``).
        """
        return self._run_timeout_secs

    @run_timeout_secs.setter
    def run_timeout_secs(self, value: float | None) -> None:
        if value is not None and value <= 0.0:
            msg = "run_timeout_secs must be > 0"
            raise ValueError(msg)
        self._run_timeout_secs = value

    @property
    def run_timeout_elapsed(self) -> bool:
        """Returns ``True`` if ``run_timeout_secs`` is set and has elapsed."""
        return (
            self.run_timeout_secs is not None
            and self.run_duration_secs is not None
            and self.run_timeout_secs < self.run_duration_secs
        )

    @property
    def request_timeout(self) -> "RequestTimeout":
        """Request timeout settings for this query."""
        return self._request_timeout

    @request_timeout.setter
    def request_timeout(self, value: "RequestTimeout") -> None:
        self._request_timeout = value

    def _code(self) -> str:
        # TODO doc
        # TODO refactor? this function might do a bit too much
        # TODO needs tests
        settings_copy = self._settings.copy()

        max_timeout = settings_copy["timeout"]

        # if a run timeout is set, the remaining time is the max query timeout we will use
        if (time_max := self.run_timeout_secs) and (time_so_far := self.run_duration_secs):
            max_timeout = math.ceil(time_max - time_so_far)
            if max_timeout <= 0:
                raise GiveupError(kwargs=self.kwargs, after_secs=time_so_far)

        # if we already had a query that exceeded a timeout that is >= that max timeout,
        # we might as well give up already
        if (min_needed := self._max_timeout_secs_exceeded) and min_needed >= max_timeout:
            self._logger.error(f"give up on {self} since query will likely time out")
            raise GiveupError(kwargs=self.kwargs, after_secs=self.run_duration_secs or 0.0)

        # pick the timeout we will use for the next try
        next_timeout_secs_used = min(settings_copy["timeout"], max_timeout)

        # log if had to override the timeout setting with "max_timeout"
        if next_timeout_secs_used != settings_copy["timeout"]:
            settings_copy["timeout"] = next_timeout_secs_used
            self._logger.info(f"adjust timeout to {next_timeout_secs_used}s")

        # update the used timeout in state
        self._last_timeout_secs_used = next_timeout_secs_used

        # remove the original settings statement
        code = _SETTING_PATTERN.sub("", self._input_code)

        # put the adjusted settings in front
        settings = "".join((f"[{k}:{v}]" for k, v in settings_copy.items())) + ";"
        return f"{settings}\n{code}"

    @property
    def cache_key(self) -> str:
        """
        Hash QL code, and return its digest as hexadecimal string.

        The default query runner uses this as cache key.
        """
        # Remove the original settings statement
        code = _SETTING_PATTERN.sub("", self._input_code)
        hasher = hashlib.blake2b(digest_size=8)
        hasher.update(code.encode("utf-8"))
        return hasher.hexdigest()

    @property
    def done(self) -> bool:
        """Returns ``True`` if the result set was received."""
        return self._response is not None

    @property
    def request_duration_secs(self) -> float | None:
        """
        How long it took to fetch the result set in seconds.

        This is the duration starting with the API request, and ending once
        the result is written to this query object. Although it depends on how busy
        the API instance is, this can give some indication of how long a query takes.

        Returns:
            the duration or ``None`` if there is no result set yet, or when it was cached.
        """
        if self._response is None or self.was_cached:
            return None

        assert self._time_end_try is not None
        assert self._time_start_req is not None

        return self._time_end_try - self._time_start_req

    @property
    def run_duration_secs(self) -> float | None:
        """
        The total required time for this query in seconds (so far).

        Returns:
            the duration or ``None`` if there is no result set yet, or when it was cached.
        """
        if self._time_start is None:
            return None

        if self._time_end_try:
            return self._time_end_try - self._time_start

        return self._time_start.elapsed_secs_since

    @property
    def api_version(self) -> str | None:
        """
        The Overpass API version used by the queried instance.

        Returns:
            f.e. ``"Overpass API 0.7.56.8 7d656e78"``, or ``None`` if the query
            has not successfully finished (yet)

        References:
            - https://wiki.openstreetmap.org/wiki/Overpass_API/versions
        """
        if self._response is None:
            return None

        return self._response["generator"]

    @property
    def timestamp_osm(self) -> datetime | None:
        """
        All OSM edits that have been uploaded before this date are included.

        It can take a couple of minutes for changes to the database to show up in the
        Overpass API query results.

        Returns:
            the timestamp, or ``None`` if the query has not successfully finished (yet)
        """
        if self._response is None:
            return None

        date_str = self._response["osm3s"]["timestamp_osm_base"]
        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(timezone.utc)

    @property
    def timestamp_areas(self) -> datetime | None:
        """
        All area data edits that have been uploaded before this date are included.

        If the query involves area data processing, this is the date of the latest edit
        that has been considered in the most recent batch run of the area generation.

        Returns:
            the timestamp, or ``None`` if the query has not successfully finished (yet), or
            if it does not involve area data processing.
459 """ 460 if self._response is None: 461 return None 462 463 date_str = self._response["osm3s"].get("timestamp_areas_base") 464 if not date_str: 465 return None 466 467 return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(timezone.utc) 468 469 @property 470 def copyright(self) -> str: 471 """A copyright notice that comes with the result set.""" 472 if self._response is None: 473 return _COPYRIGHT 474 475 return self._response["osm3s"].get("copyright") or _COPYRIGHT 476 477 def __str__(self) -> str: 478 query = f"query{self.kwargs!r}" 479 480 size = self.response_size_mib 481 time_request = self.request_duration_secs 482 time_total = self.run_duration_secs 483 484 if self.nb_tries == 0: 485 details = "pending" 486 elif self.done: 487 if self.nb_tries == 1: 488 details = f"done - {size:.01f}mb in {time_request:.01f}s" 489 else: 490 details = f"done after {time_total:.01f}s - {size:.01f}mb in {time_request:.01f}s" 491 else: 492 t = "try" if self.nb_tries == 1 else "tries" 493 details = f"failing after {self.nb_tries} {t}, {time_total:.01f}s" 494 495 return f"{query} ({details})" 496 497 def __repr__(self) -> str: 498 cls_name = type(self).__name__ 499 500 details = { 501 "kwargs": self._kwargs, 502 "done": self.done, 503 } 504 505 if self.nb_tries == 0 or self.error: 506 details["tries"] = self.nb_tries 507 508 if self.error: 509 details["error"] = type(self.error).__name__ 510 511 if self.done: 512 details["response_size"] = f"{self.response_size_mib:.02f}mb" 513 514 if not self.was_cached: 515 details["request_duration"] = f"{self.request_duration_secs:.02f}s" 516 517 if self.nb_tries > 0: 518 details["run_duration"] = f"{self.run_duration_secs:.02f}s" 519 520 details_str = ", ".join((f"{k}={v!r}" for k, v in details.items())) 521 522 return f"{cls_name}({details_str})" 523 524 def _mutator(self) -> "_QueryMutator": 525 return _QueryMutator(self)
State of a query that is either pending, running, successful, or failed.
Arguments:
- input_code: The input Overpass QL code. Note that some settings might be changed by query runners, notably the 'timeout' and 'maxsize' settings.
- logger: The logger to use for all logging output related to this query.
- **kwargs: Additional keyword arguments that can be used to identify queries.
References:
    def __init__(self, input_code: str, logger: logging.Logger = _NULL_LOGGER, **kwargs) -> None:
        self._input_code = input_code
        """the original given overpass ql code"""

        self._logger = logger
        """logger to use for this query"""

        self._kwargs = kwargs
        """used to identify this query"""

        self._settings = dict(_SETTING_PATTERN.findall(input_code))
        """all overpass ql settings [k:v];"""

        self._settings["out"] = "json"

        if "maxsize" not in self._settings:
            self._settings["maxsize"] = DEFAULT_MAXSIZE_MIB * 1024 * 1024

        if "timeout" not in self._settings:
            self._settings["timeout"] = DEFAULT_TIMEOUT_SECS

        self._run_timeout_secs: float | None = None
        """total time limit for running this query"""

        self._request_timeout: RequestTimeout = RequestTimeout()
        """config for request timeouts"""

        self._error: ClientError | None = None
        """error of the last try, or None"""

        self._response: dict | None = None
        """response JSON as a dict, or None"""

        self._response_bytes = 0.0
        """number of bytes in a response, or zero"""

        self._nb_tries = 0
        """number of tries so far, starting at zero"""

        self._time_start: _Instant | None = None
        """time prior to executing the first try"""

        self._time_start_try: _Instant | None = None
        """time prior to executing the most recent try"""

        self._time_start_req: _Instant | None = None
        """time prior to executing the most recent try's query request"""

        self._time_end_try: _Instant | None = None
        """time the most recent try finished"""

        self._last_timeout_secs_used: int | None = None
        """the last used 'timeout' setting"""

        self._max_timeout_secs_exceeded: int | None = None
        """the largest 'timeout' setting that was exceeded in a try of this query"""
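As a rough illustration of how the constructor treats the settings statement, the sketch below extracts `[key:value]` pairs with a regular expression, forces JSON output, and fills in missing defaults. The pattern and the default values here are assumptions made for this example; the library's actual `_SETTING_PATTERN`, `DEFAULT_MAXSIZE_MIB` and `DEFAULT_TIMEOUT_SECS` are not shown in this section.

```python
import re

# Hypothetical stand-ins: the real pattern and defaults are defined by the library.
SETTING_PATTERN = re.compile(r"\[(\w+?):(.+?)\]\s*;?")
DEFAULT_MAXSIZE_MIB = 512
DEFAULT_TIMEOUT_SECS = 180


def parse_settings(input_code: str) -> dict:
    """Collect [k:v] settings, force JSON output, and add missing defaults."""
    settings = dict(SETTING_PATTERN.findall(input_code))
    settings["out"] = "json"  # any user-supplied output format is overridden
    settings.setdefault("maxsize", DEFAULT_MAXSIZE_MIB * 1024 * 1024)
    settings.setdefault("timeout", DEFAULT_TIMEOUT_SECS)
    return settings


settings = parse_settings("[timeout:25][out:xml];node(5369192667); out;")
print(settings)
```

Values taken from the input code stay strings, while defaults are numbers, which is presumably why the `timeout_secs` and `maxsize_mib` getters convert with `int(...)` and `float(...)`.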
    def reset(self) -> None:
        """Reset the query to its initial state, ignoring previous tries."""
        Query.__init__(self, input_code=self._input_code, **self._kwargs)
Reset the query to its initial state, ignoring previous tries.
    @property
    def input_code(self) -> str:
        """The original input Overpass QL source code."""
        return self._input_code
The original input Overpass QL source code.
    @property
    def kwargs(self) -> dict:
        """
        Keyword arguments that can be used to identify queries.

        The default query runner will log these values when a query is run.
        """
        return self._kwargs
Keyword arguments that can be used to identify queries.
The default query runner will log these values when a query is run.
    @property
    def logger(self) -> logging.Logger:
        """The logger used for logging output related to this query."""
        return self._logger
The logger used for logging output related to this query.
    @property
    def nb_tries(self) -> int:
        """Current number of tries."""
        return self._nb_tries
Current number of tries.
    @property
    def error(self) -> ClientError | None:
        """
        Error of the most recent try.

        Returns:
            an error or ``None`` if the query wasn't tried or hasn't failed
        """
        return self._error
Error of the most recent try.
Returns:
an error, or None if the query wasn't tried or hasn't failed
    @property
    def response(self) -> dict | None:
        """
        The entire JSON response of the query.

        Returns:
            the response, or ``None`` if the query has not successfully finished (yet)
        """
        return self._response
The entire JSON response of the query.
Returns:
the response, or None if the query has not successfully finished (yet)
    @property
    def was_cached(self) -> bool | None:
        """
        Indicates whether the query result was cached.

        Returns:
            ``None`` if the query has not been run yet.
            ``True`` if the query has a result set with zero tries.
            ``False`` otherwise.
        """
        if self._response is None:
            return None
        return self._nb_tries == 0
Indicates whether the query result was cached.
Returns:
None if the query has not been run yet.
True if the query has a result set with zero tries.
False otherwise.
    @property
    def result_set(self) -> list[dict] | None:
        """
        The result set of the query.

        This is open data, licensed under the Open Data Commons Open Database License (ODbL).
        You are free to copy, distribute, transmit and adapt this data, as long as you credit
        OpenStreetMap and its contributors. If you alter or build upon this data, you may
        distribute the result only under the same licence.

        Returns:
            the elements of the result set, or ``None`` if the query has not successfully
            finished (yet)

        References:
            - https://www.openstreetmap.org/copyright
            - https://opendatacommons.org/licenses/odbl/1-0/
        """
        if not self._response:
            return None
        return self._response["elements"]
The result set of the query.
This is open data, licensed under the Open Data Commons Open Database License (ODbL). You are free to copy, distribute, transmit and adapt this data, as long as you credit OpenStreetMap and its contributors. If you alter or build upon this data, you may distribute the result only under the same licence.
Returns:
the elements of the result set, or None if the query has not successfully finished (yet)
References:
- https://www.openstreetmap.org/copyright
- https://opendatacommons.org/licenses/odbl/1-0/
    @property
    def response_size_mib(self) -> float | None:
        """
        The size of the response in mebibytes.

        Returns:
            the size, or ``None`` if the query has not successfully finished (yet)
        """
        if self._response is None:
            return None
        return self._response_bytes / 1024.0 / 1024.0
The size of the response in mebibytes.
Returns:
the size, or None if the query has not successfully finished (yet)
    @property
    def maxsize_mib(self) -> float:
        """
        The current value of the [maxsize:*] setting in mebibytes.

        This size indicates the maximum allowed memory for the query in bytes RAM on the server,
        as expected by the user. If the query needs more RAM than this value, the server may abort
        the query with a memory exhaustion. The higher this size, the more probably the server
        rejects the query before executing it.
        """
        return float(self._settings["maxsize"]) // 1024.0 // 1024.0
The current value of the [maxsize:*] setting in mebibytes.
This size indicates the maximum memory, in bytes of RAM on the server, that the user expects the query to need. If the query needs more RAM than this value, the server may abort the query with a memory exhaustion error. The higher this size, the more likely the server is to reject the query before executing it.
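The conversion between the mebibyte value exposed by `maxsize_mib` and the byte count stored in the `[maxsize:*]` setting can be sketched as follows. The helper names are hypothetical; the arithmetic mirrors the getter and setter shown in this section, including the getter's floor division, which means a fractional value is truncated when it is read back.

```python
def mib_to_bytes(value_mib: float) -> int:
    """Convert mebibytes to the byte count stored in the [maxsize:*] setting."""
    if value_mib <= 0.0:
        raise ValueError("maxsize_mib must be > 0.0")
    return int(value_mib * 1024.0 * 1024.0)


def bytes_to_mib_floor(value_bytes: int) -> float:
    """Convert bytes back to mebibytes using floor division, like the getter."""
    return float(value_bytes) // 1024.0 // 1024.0


stored = mib_to_bytes(1.5)               # 1.5 MiB expressed in bytes
read_back = bytes_to_mib_floor(stored)   # fractional part is dropped
print(stored, read_back)
```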
    @property
    def timeout_secs(self) -> int:
        """
        The current value of the [timeout:*] setting in seconds.

        This duration is the maximum allowed runtime for the query in seconds, as expected by the
        user. If the query runs longer than this time, the server may abort the query. The higher
        this duration, the more probably the server rejects the query before executing it.
        """
        return int(self._settings["timeout"])
The current value of the [timeout:*] setting in seconds.
This duration is the maximum runtime, in seconds, that the user expects the query to need. If the query runs longer than this, the server may abort it. The higher this duration, the more likely the server is to reject the query before executing it.
    @property
    def run_timeout_secs(self) -> float | None:
        """
        A limit to ``run_duration_secs``, that cancels running the query when exceeded.

        Defaults to no timeout.

        The client will raise a ``GiveupError`` if the timeout is reached.

        Not to be confused with ``timeout_secs``, which is a setting for the Overpass API instance,
        that limits a single query execution time. Instead, this value can be used to limit the
        total client-side time spent on this query (see ``Client.run_query``).
        """
        return self._run_timeout_secs
A limit to run_duration_secs that cancels running the query when exceeded.
Defaults to no timeout.
The client will raise a GiveupError if the timeout is reached.
Not to be confused with timeout_secs, which is a setting for the Overpass API instance that limits a single query execution time. Instead, this value can be used to limit the total client-side time spent on this query (see Client.run_query).
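To make the interplay between the two timeouts concrete, here is a minimal sketch of how the remaining run time can cap the server-side timeout of the next try. It follows the logic of the private `_code` method in the class source, but it is a simplified, standalone illustration: `GiveUp` and `next_timeout_secs` are hypothetical names, and the real method also handles previously exceeded timeouts and logging.

```python
import math
from typing import Optional


class GiveUp(Exception):
    """Hypothetical stand-in for the library's GiveupError."""


def next_timeout_secs(
    timeout_secs: int,
    run_timeout_secs: Optional[float],
    run_duration_secs: Optional[float],
) -> int:
    """Pick the [timeout:*] value for the next try."""
    max_timeout = timeout_secs

    # With a run timeout set, the remaining time bounds the query timeout.
    if run_timeout_secs and run_duration_secs:
        max_timeout = math.ceil(run_timeout_secs - run_duration_secs)
        if max_timeout <= 0:
            raise GiveUp(f"run timeout reached after {run_duration_secs}s")

    return min(timeout_secs, max_timeout)


# 60s budget, 12.3s already spent: a 180s setting is reduced to the remainder.
capped = next_timeout_secs(180, 60.0, 12.3)
# A setting below the remaining time is left untouched.
unchanged = next_timeout_secs(25, 60.0, 12.3)
print(capped, unchanged)
```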
    @property
    def run_timeout_elapsed(self) -> bool:
        """Returns ``True`` if ``run_timeout_secs`` is set and has elapsed."""
        return (
            self.run_timeout_secs is not None
            and self.run_duration_secs is not None
            and self.run_timeout_secs < self.run_duration_secs
        )
Returns True if run_timeout_secs is set and has elapsed.
    @property
    def request_timeout(self) -> "RequestTimeout":
        """Request timeout settings for this query."""
        return self._request_timeout
Request timeout settings for this query.
    @property
    def cache_key(self) -> str:
        """
        Hash QL code, and return its digest as hexadecimal string.

        The default query runner uses this as cache key.
        """
        # Remove the original settings statement
        code = _SETTING_PATTERN.sub("", self._input_code)
        hasher = hashlib.blake2b(digest_size=8)
        hasher.update(code.encode("utf-8"))
        return hasher.hexdigest()
Hash QL code, and return its digest as hexadecimal string.
The default query runner uses this as cache key.
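Because the settings statement is stripped before hashing, two queries that differ only in their settings should map to the same cache key. The sketch below reproduces that idea with the standard library; the regular expression is an assumption standing in for the library's `_SETTING_PATTERN`, which is not shown here.

```python
import hashlib
import re

# Hypothetical stand-in for the library's settings pattern.
SETTING_PATTERN = re.compile(r"\[(\w+?):(.+?)\]\s*;?")


def cache_key(input_code: str) -> str:
    """Hash the QL code without its settings and return a hex digest."""
    code = SETTING_PATTERN.sub("", input_code)
    hasher = hashlib.blake2b(digest_size=8)  # 8 bytes -> 16 hex characters
    hasher.update(code.encode("utf-8"))
    return hasher.hexdigest()


key_a = cache_key("[timeout:25];node(5369192667); out;")
key_b = cache_key("[timeout:90];node(5369192667); out;")
print(key_a == key_b)
```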
    @property
    def done(self) -> bool:
        """Returns ``True`` if the result set was received."""
        return self._response is not None
Returns True if the result set was received.
    @property
    def request_duration_secs(self) -> float | None:
        """
        How long it took to fetch the result set in seconds.

        This is the duration starting with the API request, and ending once
        the result is written to this query object. Although it depends on how busy
        the API instance is, this can give some indication of how long a query takes.

        Returns:
            the duration or ``None`` if there is no result set yet, or when it was cached.
        """
        if self._response is None or self.was_cached:
            return None

        assert self._time_end_try is not None
        assert self._time_start_req is not None

        return self._time_end_try - self._time_start_req
How long it took to fetch the result set in seconds.
This is the duration starting with the API request, and ending once the result is written to this query object. Although it depends on how busy the API instance is, this can give some indication of how long a query takes.
Returns:
the duration, or None if there is no result set yet, or when it was cached.
    @property
    def run_duration_secs(self) -> float | None:
        """
        The total required time for this query in seconds (so far).

        Returns:
            the duration or ``None`` if there is no result set yet, or when it was cached.
        """
        if self._time_start is None:
            return None

        if self._time_end_try:
            return self._time_end_try - self._time_start

        return self._time_start.elapsed_secs_since
The total required time for this query in seconds (so far).
Returns:
the duration, or None if there is no result set yet, or when it was cached.
    @property
    def api_version(self) -> str | None:
        """
        The Overpass API version used by the queried instance.

        Returns:
            f.e. ``"Overpass API 0.7.56.8 7d656e78"``, or ``None`` if the query
            has not successfully finished (yet)

        References:
            - https://wiki.openstreetmap.org/wiki/Overpass_API/versions
        """
        if self._response is None:
            return None

        return self._response["generator"]
The Overpass API version used by the queried instance.
Returns:
f.e. "Overpass API 0.7.56.8 7d656e78", or None if the query has not successfully finished (yet)
References:
- https://wiki.openstreetmap.org/wiki/Overpass_API/versions
    @property
    def timestamp_osm(self) -> datetime | None:
        """
        All OSM edits that have been uploaded before this date are included.

        It can take a couple of minutes for changes to the database to show up in the
        Overpass API query results.

        Returns:
            the timestamp, or ``None`` if the query has not successfully finished (yet)
        """
        if self._response is None:
            return None

        date_str = self._response["osm3s"]["timestamp_osm_base"]
        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(timezone.utc)
All OSM edits that have been uploaded before this date are included.
It can take a couple of minutes for changes to the database to show up in the Overpass API query results.
Returns:
the timestamp, or None if the query has not successfully finished (yet)
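For readers who want to parse such a timestamp themselves from the raw response, the following standalone sketch turns a string in the `%Y-%m-%dT%H:%M:%SZ` format into a timezone-aware UTC datetime. The function name and the sample value are made up for illustration. Note that `strptime` treats the trailing `Z` as a literal and returns a naive datetime, and that `astimezone` on a naive datetime interprets it as local time; this sketch therefore attaches UTC with `replace`, which is a choice made for this example rather than a copy of the library's code.

```python
from datetime import datetime, timezone


def parse_osm_timestamp(date_str: str) -> datetime:
    """Parse an Overpass-style UTC timestamp into an aware datetime."""
    naive = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ")
    # The trailing "Z" denotes UTC, so attach that zone explicitly.
    return naive.replace(tzinfo=timezone.utc)


ts = parse_osm_timestamp("2024-01-02T03:04:05Z")  # made-up sample value
print(ts.isoformat())
```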
    @property
    def timestamp_areas(self) -> datetime | None:
        """
        All area data edits that have been uploaded before this date are included.

        If the query involves area data processing, this is the date of the latest edit
        that has been considered in the most recent batch run of the area generation.

        Returns:
            the timestamp, or ``None`` if the query has not successfully finished (yet), or
            if it does not involve area data processing.
        """
        if self._response is None:
            return None

        date_str = self._response["osm3s"].get("timestamp_areas_base")
        if not date_str:
            return None

        return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(timezone.utc)
All area data edits that have been uploaded before this date are included.
If the query involves area data processing, this is the date of the latest edit that has been considered in the most recent batch run of the area generation.
Returns:
the timestamp, or None if the query has not successfully finished (yet), or if it does not involve area data processing.