diff --git a/NEWS.md b/NEWS.md index 18fb12b5..44e61df6 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,5 @@ +**05/15/2026:** The OGC `waterdata` getters (`get_daily`, `get_continuous`, `get_field_measurements`, and the rest of the multi-value-capable functions) now transparently chunk requests whose URLs would otherwise exceed the server's ~8 KB byte limit. A common chained-query pattern — pull a long site list from `get_monitoring_locations`, then feed it into `get_daily` — previously failed with HTTP 414 once the resulting URL grew past the limit; it now fans out across multiple sub-requests under the hood and returns one combined DataFrame. The chunker coordinates with the existing CQL `filter` chunker (long top-level-`OR` filters still split correctly when used alongside long multi-value lists), caps cartesian-product plans at 1000 sub-requests (the default USGS hourly quota), and aborts mid-call with a structured `QuotaExhausted` exception — carrying the partial result and a resume offset — if `x-ratelimit-remaining` drops below a safety floor. Mirrors R `dataRetrieval`'s [#870](https://github.com/DOI-USGS/dataRetrieval/pull/870), generalized to N dimensions. + **05/14/2026:** Fixed two latent bugs in the paginated `waterdata` request loop (`_walk_pages` and `get_stats_data`). Previously, when `requests.Session.request(...)` itself raised mid-pagination (network error, timeout), the except block called `_error_body()` on the *prior page's* response, so the logged "error" described the wrong request and could itself crash on non-JSON bodies. Separately, no status-code check was performed on subsequent paginated responses, so a 5xx body that didn't include `numberReturned` was silently treated as an empty page — pagination quietly stopped and the user got truncated data with no error logged. The loop now status-checks each page like the initial request and reports the actual exception. The "best-effort" behavior (return whatever pages were collected) is preserved. **05/07/2026:** Bumped the declared minimum Python version from **3.8** to **3.9** (`pyproject.toml`'s `requires-python` and the ruff target). This brings the manifest in line with what was already being tested — CI's matrix has long covered only 3.9, 3.13, and 3.14, the `waterdata` test module already skipped itself on Python < 3.10, and several modules already use 3.9-only stdlib (e.g. `zoneinfo`). Users on 3.8 will no longer be able to install the package; please upgrade. diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index ad268194..025aafcd 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -230,6 +230,21 @@ def get_daily( ... parameter_code="00060", ... last_modified="P7D", ... ) + + >>> # Chain queries: pull all stream sites in a state, then their + >>> # daily discharge for the last week. The site list can be hundreds + >>> # of values long — the request is transparently chunked across + >>> # multiple sub-requests so the URL stays under the server's byte + >>> # limit. Combined output looks like a single query. + >>> sites_df, _ = dataretrieval.waterdata.get_monitoring_locations( + ... state_name="Ohio", + ... site_type="Stream", + ... ) + >>> df, md = dataretrieval.waterdata.get_daily( + ... monitoring_location_id=sites_df["monitoring_location_id"].tolist(), + ... parameter_code="00060", + ... time="P7D", + ... 
) """ service = "daily" output_id = "daily_id" diff --git a/dataretrieval/waterdata/chunking.py b/dataretrieval/waterdata/chunking.py new file mode 100644 index 00000000..43690c9b --- /dev/null +++ b/dataretrieval/waterdata/chunking.py @@ -0,0 +1,421 @@ +"""Multi-value GET-parameter chunking for the Water Data OGC getters. + +PR 233 routes most services through GET with comma-separated values +(e.g. ``monitoring_location_id=USGS-A,USGS-B,...``). Long lists can blow +the server's ~8 KB URL byte limit. This module adds a decorator that +sits OUTSIDE ``filters.chunked`` and splits multi-value list params +across multiple sub-requests so each URL fits. See ``get_daily``'s +docstring for an end-to-end chained-query example. + +Design (orthogonal to filter chunking): + +- N-dimensional cartesian product: for each chunkable list param, the + values are partitioned into sub-lists; the planner emits the cartesian + product of those partitions. Sub-chunks of the same dim never overlap, + so frame concat needs no dedup across multi-value chunks. +- Greedy halving of the largest chunk in any dim until the worst-case + sub-request URL fits the limit. Minimises total request count. +- Date params, ``bbox``, and ``properties`` are not chunked: dates are + intervals not enumerable sets; bbox is a coord array; ``properties`` + determines output schema and chunking it would shard columns. + +Coordination with ``filters.chunked``: +The planner probes the URL with a synthetic clause sized to the inner +chunker's bail floor — ``len(longest_clause) * max(per-clause encoding +ratio)`` — when a chunkable filter is present. The inner chunker bails +(emits the full filter) when any single clause's URL-encoded length +exceeds its per-sub-request budget; mirroring +``filters._effective_filter_budget``, that floor already accounts for +the worst per-call encoding ratio, so a long alphanumeric clause +coexisting with a shorter heavily-encoded clause is sized correctly. +Without this coordination, a long OR-filter plus multi-value lists +would trigger a premature ``RequestTooLarge`` even though the combined +chunkers would have made things fit. +""" + +from __future__ import annotations + +import functools +import itertools +import math +from collections.abc import Callable +from typing import Any, TypeVar +from urllib.parse import quote_plus + +import pandas as pd +import requests + +from . import filters +from .filters import ( + _combine_chunk_frames, + _combine_chunk_responses, + _is_chunkable, + _split_top_level_or, +) + +# Params that look like lists but must NOT be chunked. ``properties`` is +# excluded because it defines the response schema; chunking it would +# return frames with different columns per sub-request. ``bbox`` is a +# fixed 4-element coord tuple. Date params are intervals not sets. The +# CQL ``filter`` (and its ``filter_lang``) is a string that has its own +# inner chunker (``filters.chunked``); if a caller passes ``filter`` as +# a list, treating it as a multi-value param would emit malformed CQL. +_NEVER_CHUNK = frozenset( + { + "properties", + "bbox", + "datetime", + "last_modified", + "begin", + "begin_utc", + "end", + "end_utc", + "time", + "filter", + "filter_lang", + } +) + +# Default cap on the number of sub-requests a single chunked call may +# emit. The USGS Water Data API rate-limits each HTTP request (including +# pagination), so the true budget is ``hourly_quota / avg_pages_per_chunk``. 
+# 1000 matches the default hourly quota and is a reasonable upper bound +# for single-page sub-requests; tune lower if your queries paginate. +# Override per-decorator via ``max_chunks=`` or by monkeypatching this +# module attribute (read lazily in the wrapper). +_DEFAULT_MAX_CHUNKS = 1000 + +# When ``x-ratelimit-remaining`` drops below this between sub-requests, +# the chunker bails with ``QuotaExhausted`` rather than risk a mid-call +# HTTP 429. Carries the partial result so callers can resume from a +# known offset instead of retrying the whole chunked call from scratch. +_DEFAULT_QUOTA_SAFETY_FLOOR = 50 + +# Sentinel returned by ``_read_remaining`` when the response has no +# parseable ``x-ratelimit-remaining`` header. Large enough to beat any +# plausible safety floor so a missing/malformed header doesn't trigger +# spurious ``QuotaExhausted`` aborts. +_QUOTA_UNKNOWN = 10**9 + + +class RequestTooLarge(ValueError): + """Raised when a chunked request cannot be issued. Two cases: + (1) URL exceeds the byte limit even with every multi-value param at + a singleton chunk and any chunkable filter reduced to its smallest + top-level OR-clause; (2) the cartesian-product plan would issue more + than ``max_chunks`` sub-requests.""" + + +class QuotaExhausted(RuntimeError): + """Raised mid-chunked-call when the API's reported remaining quota + (``x-ratelimit-remaining`` header) drops below the configured safety + floor. The chunker stops before issuing the next sub-request to + avoid a mid-call HTTP 429 that would silently truncate paginated + results (see PR #273 for the pagination side of that bug). + + The exception carries everything needed to resume: the combined + partial frame from completed sub-requests, the metadata for the + last successful sub-request, the number of chunks completed out of + the plan total, and the last-observed ``remaining`` value. + + Attributes + ---------- + partial_frame : pd.DataFrame + Concatenated, deduplicated result of every sub-request that + completed before the floor was crossed. + partial_response : requests.Response + Aggregated response (URL/headers of the last completed sub- + request, summed ``elapsed``). The last sub-request's headers + are preferred so callers inspecting ``x-ratelimit-remaining`` + see current quota state. Wrap in ``BaseMetadata`` to surface + to the caller alongside the partial frame. + completed_chunks : int + Number of sub-requests successfully completed. + total_chunks : int + Total sub-requests in the cartesian-product plan. + remaining : int + Last observed ``x-ratelimit-remaining`` value. + """ + + def __init__( + self, + *, + partial_frame: pd.DataFrame, + partial_response: requests.Response, + completed_chunks: int, + total_chunks: int, + remaining: int, + ) -> None: + super().__init__( + f"x-ratelimit-remaining dropped to {remaining} after " + f"{completed_chunks}/{total_chunks} chunks; aborting to avoid " + f"mid-call HTTP 429. Catch QuotaExhausted to access " + f".partial_frame and resume from chunk {completed_chunks}." 
+        )
+        self.partial_frame = partial_frame
+        self.partial_response = partial_response
+        self.completed_chunks = completed_chunks
+        self.total_chunks = total_chunks
+        self.remaining = remaining
+
+
+def _chunkable_params(args: dict[str, Any]) -> dict[str, list]:
+    """Return ``{name: list(values)}`` for every list/tuple kwarg with
+    >1 element that is allowed to chunk."""
+    return {
+        k: list(v)
+        for k, v in args.items()
+        if k not in _NEVER_CHUNK and isinstance(v, (list, tuple)) and len(v) > 1
+    }
+
+
+def _filter_aware_probe_args(args: dict[str, Any]) -> dict[str, Any]:
+    """Substitute the filter with a synthetic alphanumeric clause sized
+    to the inner chunker's bail floor if the filter is chunkable;
+    otherwise return ``args`` unchanged.
+
+    The inner ``filters.chunked`` decorator splits a filter into chunks,
+    each of whose URL-encoded length is ≤ the per-sub-request budget,
+    but bails (emits the full filter unchanged) when ANY single
+    OR-clause's URL-encoded length exceeds the budget. Mirroring
+    ``filters._effective_filter_budget``, the bail floor on the longest
+    clause is ``len(longest) * max(per_clause_encoding_ratio)``: even a
+    clause whose own ratio is low inherits the worst per-call ratio
+    because the budget is computed against the heaviest-encoding clause.
+
+    Substituting a synthetic clause of that exact length (alphanumeric
+    ASCII passes through ``quote_plus`` unchanged, so its encoding ratio
+    is exactly 1:1) makes the planner's URL probe and the inner
+    chunker's bail condition agree on worst-case size — the planner
+    won't approve a plan the inner chunker would then refuse to emit,
+    and won't prematurely raise when the inner chunker could have made
+    it fit.
+    """
+    filter_expr = args.get("filter")
+    filter_lang = args.get("filter_lang")
+    if not _is_chunkable(filter_expr, filter_lang):
+        return args
+    parts = _split_top_level_or(filter_expr)
+    if len(parts) < 2:
+        return args  # one-clause filter — inner chunker can't shrink it
+    encoding_ratio_max = max(len(quote_plus(p)) / len(p) for p in parts)
+    longest_raw = max(len(p) for p in parts)
+    probe_size = math.ceil(longest_raw * encoding_ratio_max)
+    return {**args, "filter": "x" * probe_size}
+
+
+def _chunk_bytes(chunk: list) -> int:
+    """URL-encoded byte length of ``chunk`` when comma-joined into a
+    URL parameter value.
+
+    Used both as the planner's biggest-chunk comparator (in
+    ``_worst_case_args`` and the halving loop) and indirectly as the
+    URL contribution estimate. Sizing with ``quote_plus`` rather than
+    raw ``,``-join length avoids mis-ranking chunks whose values
+    contain characters that expand under URL encoding (``%``, ``+``,
+    ``/``, ``&``, etc.) — for typical USGS multi-value workloads
+    (alphanumeric IDs and codes) the two are equal, but the encoded
+    form is always correct.
+    """
+    return len(quote_plus(",".join(map(str, chunk))))
+
+
+def _request_bytes(req: requests.PreparedRequest) -> int:
+    """Total bytes of a prepared request: URL + body.
+
+    GET routes have ``body=None`` and reduce to URL length. POST routes
+    (CQL2 JSON body) need body bytes — the URL stays short regardless of
+    payload, so URL-only sizing would underestimate the request and skip
+    chunking when it's needed.
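+
+    A minimal sketch (hypothetical URL, not a USGS endpoint)::
+
+        >>> import requests
+        >>> req = requests.Request(
+        ...     "GET", "https://example.org/q", params={"id": "a,b"}
+        ... ).prepare()
+        >>> _request_bytes(req) == len(req.url)  # GET: body is None
+        True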
+ """ + url_len = len(req.url) + body = req.body + if body is None: + return url_len + if isinstance(body, (bytes, bytearray)): + return url_len + len(body) + return url_len + len(body.encode("utf-8")) + + +def _worst_case_args( + probe_args: dict[str, Any], plan: dict[str, list[list]] +) -> dict[str, Any]: + """Args dict using the LARGEST chunk from each dim — represents the + most byte-heavy sub-request the plan will issue, with the filter + already reduced to its filter-chunker floor.""" + out = dict(probe_args) + for k, chunks in plan.items(): + out[k] = max(chunks, key=_chunk_bytes) + return out + + +def _plan_chunks( + args: dict[str, Any], + build_request: Callable[..., Any], + url_limit: int, + max_chunks: int | None = None, +) -> dict[str, list[list]] | None: + """Greedy halving until the worst-case sub-request URL fits. + + Returns ``None`` when no chunking is needed (request as-is fits or + no chunkable lists). Raises ``RequestTooLarge`` when: + - every multi-value param is already a singleton chunk AND the + filter (if any) is already at its smallest OR-clause and the URL + still exceeds ``url_limit`` (irreducible), or + - the cartesian-product plan exceeds ``max_chunks`` sub-requests + (the hourly API budget); checked after each split so we bail + promptly once the cap is unreachable. + + ``max_chunks`` defaults to ``_DEFAULT_MAX_CHUNKS`` resolved at call + time, so monkeypatching the module constant takes effect for + direct callers too. + """ + if max_chunks is None: + max_chunks = _DEFAULT_MAX_CHUNKS + chunkable = _chunkable_params(args) + if not chunkable: + return None + probe_args = _filter_aware_probe_args(args) + if _request_bytes(build_request(**probe_args)) <= url_limit: + return None + + plan: dict[str, list[list]] = {k: [v] for k, v in chunkable.items()} + + while True: + worst = _worst_case_args(probe_args, plan) + if _request_bytes(build_request(**worst)) <= url_limit: + return plan + + # Find the single biggest chunk across all dims and halve it. + best: tuple[str, int, int] | None = None # (dim, chunk_index, size) + for dim, dim_chunks in plan.items(): + for idx, chunk in enumerate(dim_chunks): + if len(chunk) <= 1: + continue + size = _chunk_bytes(chunk) + if best is None or size > best[2]: + best = (dim, idx, size) + + if best is None: + raise RequestTooLarge( + f"Request exceeds {url_limit} bytes (URL + body) even " + f"with every multi-value parameter at a singleton chunk " + f"and any chunkable filter reduced to one OR-clause. " + f"Reduce the number of values or split the call manually." + ) + dim, idx, _ = best + big = plan[dim][idx] + mid = len(big) // 2 + plan[dim] = plan[dim][:idx] + [big[:mid], big[mid:]] + plan[dim][idx + 1 :] + + # Each split only grows the cartesian product, so once we + # cross max_chunks we can never come back under. Bail now + # rather than keep splitting (the URL probe could still take + # many more iterations). + total = math.prod(len(chunks) for chunks in plan.values()) + if total > max_chunks: + raise RequestTooLarge( + f"Chunked plan would issue {total} sub-requests, exceeding " + f"max_chunks={max_chunks} (USGS API's default hourly rate " + f"limit per key). Reduce input list sizes, narrow the time " + f"window, or raise max_chunks if you have a higher quota." + ) + + +_FetchOnce = TypeVar( + "_FetchOnce", + bound=Callable[[dict[str, Any]], tuple[pd.DataFrame, requests.Response]], +) + + +def _read_remaining(response: requests.Response) -> int: + """Parse ``x-ratelimit-remaining`` from a response. 
Missing or + malformed header → return ``_QUOTA_UNKNOWN`` so the safety check + treats it as 'plenty of quota' (don't abort on header glitches).""" + raw = response.headers.get("x-ratelimit-remaining") + if raw is None: + return _QUOTA_UNKNOWN + try: + return int(raw) + except (TypeError, ValueError): + return _QUOTA_UNKNOWN + + +def multi_value_chunked( + *, + build_request: Callable[..., Any], + url_limit: int | None = None, + max_chunks: int | None = None, + quota_safety_floor: int | None = None, +) -> Callable[[_FetchOnce], _FetchOnce]: + """Decorator that splits multi-value list params across sub-requests so + each URL fits ``url_limit`` bytes (defaults to ``filters._WATERDATA_ + URL_BYTE_LIMIT``) and the cartesian-product plan stays ≤ ``max_chunks`` + sub-requests (defaults to ``_DEFAULT_MAX_CHUNKS``). All defaults are + resolved at call time so tests/users that patch the module constants + affect this decorator uniformly. + + Between sub-requests the wrapper reads ``x-ratelimit-remaining`` from + each response. If it drops below ``quota_safety_floor`` (default + ``_DEFAULT_QUOTA_SAFETY_FLOOR``), the wrapper raises ``QuotaExhausted`` + carrying the combined partial result and the chunk offset so callers + can resume after the hourly window resets, instead of crashing into + a mid-pagination HTTP 429 (which the upstream pagination loop in + ``_walk_pages`` historically truncated silently — see PR #273). + + Sits OUTSIDE ``@filters.chunked``: list-chunking is the outer loop, + filter-chunking is the inner loop. The wrapped function has the same + signature as ``filters.chunked`` expects — ``(args: dict) -> (frame, + response)`` — so the two decorators compose cleanly. The planner is + filter-aware so it doesn't raise prematurely when the inner filter + chunker would have shrunk the per-sub-request URL on its own. + """ + + def decorator(fetch_once: _FetchOnce) -> _FetchOnce: + @functools.wraps(fetch_once) + def wrapper( + args: dict[str, Any], + ) -> tuple[pd.DataFrame, requests.Response]: + limit = ( + url_limit + if url_limit is not None + else filters._WATERDATA_URL_BYTE_LIMIT + ) + floor = ( + quota_safety_floor + if quota_safety_floor is not None + else _DEFAULT_QUOTA_SAFETY_FLOOR + ) + plan = _plan_chunks(args, build_request, limit, max_chunks) + if plan is None: + return fetch_once(args) + + keys = list(plan) + total = math.prod(len(plan[k]) for k in keys) + frames: list[pd.DataFrame] = [] + responses: list[requests.Response] = [] + for i, combo in enumerate(itertools.product(*(plan[k] for k in keys))): + sub_args = {**args, **dict(zip(keys, combo))} + frame, response = fetch_once(sub_args) + frames.append(frame) + responses.append(response) + # Quota check happens BETWEEN sub-requests: skip on the + # last iteration because there's nothing left to abort. 
+ if i < total - 1: + remaining = _read_remaining(response) + if remaining < floor: + raise QuotaExhausted( + partial_frame=_combine_chunk_frames(frames), + partial_response=_combine_chunk_responses(responses), + completed_chunks=i + 1, + total_chunks=total, + remaining=remaining, + ) + + return ( + _combine_chunk_frames(frames), + _combine_chunk_responses(responses), + ) + + return wrapper # type: ignore[return-value] + + return decorator diff --git a/dataretrieval/waterdata/filters.py b/dataretrieval/waterdata/filters.py index 4c136b82..ade04b21 100644 --- a/dataretrieval/waterdata/filters.py +++ b/dataretrieval/waterdata/filters.py @@ -268,15 +268,20 @@ def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame: def _combine_chunk_responses( responses: list[requests.Response], ) -> requests.Response: - """Return one response: first chunk's URL/headers + summed ``elapsed``. + """Return one response: last chunk's URL/headers + summed ``elapsed``. - Mutates the first response in place (only ``elapsed``); downstream only + Returning the latest sub-response (rather than the first) preserves + current rate-limit headers (e.g. ``x-ratelimit-remaining``), which the + outer ``multi_value_chunked`` decorator inspects to honor its + ``QuotaExhausted`` safety floor between sub-requests. + + Mutates the last response in place (only ``elapsed``); downstream only reads ``elapsed`` (in ``BaseMetadata.query_time``), URL, and headers. """ - head = responses[0] + tail = responses[-1] if len(responses) > 1: - head.elapsed = sum((r.elapsed for r in responses[1:]), start=head.elapsed) - return head + tail.elapsed = sum((r.elapsed for r in responses[:-1]), start=tail.elapsed) + return tail _FetchOnce = TypeVar( @@ -295,8 +300,11 @@ def chunked(*, build_request: Callable[..., Any]) -> Callable[[_FetchOnce], _Fet - Chunkable cql-text filter: run the lexicographic-pitfall guard, split into URL-length-safe sub-expressions, call the wrapped function once per chunk, concatenate frames (drop empties, dedup by feature ``id``), - and return an aggregated response (first chunk's URL/headers, summed - ``elapsed``). + and return an aggregated response (last chunk's URL/headers, summed + ``elapsed``). The last chunk's headers are preferred so callers see + current rate-limit state (``x-ratelimit-remaining``) on which the + outer ``multi_value_chunked`` decorator's ``QuotaExhausted`` guard + depends. Either way the return shape matches the undecorated function's, so the caller wraps the response in ``BaseMetadata`` the same way in both paths. diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index 91228357..a55bb1bd 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -14,7 +14,7 @@ from dataretrieval import __version__ from dataretrieval.utils import BaseMetadata -from dataretrieval.waterdata import filters +from dataretrieval.waterdata import chunking, filters from dataretrieval.waterdata.types import ( PROFILE_LOOKUP, PROFILES, @@ -643,7 +643,10 @@ def _walk_pages( pd.DataFrame A DataFrame containing the aggregated results from all pages. requests.Response - The initial response object containing metadata about the first request. + The latest response from the pagination walk. Returning the most + recent response (not the first) lets downstream callers observe + current rate-limit headers (e.g. ``x-ratelimit-remaining``) on + which the multi-value chunker's ``QuotaExhausted`` guard relies. 
Raises ------ @@ -666,9 +669,6 @@ def _walk_pages( resp = client.send(req) _raise_for_non_200(resp) - # Store the initial response for metadata - initial_response = resp - # Grab some aspects of the original request: headers and the # request type (GET or POST) method = req.method.upper() @@ -697,7 +697,7 @@ def _walk_pages( curr_url = None # Concatenate all pages at once for efficiency - return pd.concat(dfs, ignore_index=True), initial_response + return pd.concat(dfs, ignore_index=True), resp finally: if close_client: client.close() @@ -923,17 +923,20 @@ def get_ogc_data( return return_list, BaseMetadata(response) +@chunking.multi_value_chunked(build_request=_construct_api_requests) @filters.chunked(build_request=_construct_api_requests) def _fetch_once( args: dict[str, Any], ) -> tuple[pd.DataFrame, requests.Response]: """Send one prepared-args OGC request; return the frame + response. - Filter chunking is added orthogonally by the ``@filters.chunked`` - decorator: with no filter (or an un-chunkable one) the decorator - passes ``args`` through to this body; with a chunkable filter it - fans out and calls this body once per sub-filter, then combines. - Either way the return shape is ``(frame, response)``. + Two orthogonal chunkers wrap this body. ``@chunking.multi_value_chunked`` + (outer) splits multi-value list params (e.g. ``monitoring_location_id``) + across sub-requests so each URL fits the server byte limit; the + cartesian product of per-dim chunks is iterated. ``@filters.chunked`` + (inner) splits long cql-text filters at top-level ``OR``. With no + chunkable inputs both pass through unchanged. Either way the return + shape is ``(frame, response)``. """ req = _construct_api_requests(**args) return _walk_pages(geopd=GEOPANDAS, req=req) @@ -1125,9 +1128,6 @@ def get_stats_data( resp = client.send(req) _raise_for_non_200(resp) - # Store the initial response for metadata - initial_response = resp - # Grab some aspects of the original request: headers and the # request type (GET or POST) method = req.method.upper() @@ -1170,7 +1170,7 @@ def get_stats_data( if expand_percentiles: dfs = _expand_percentiles(dfs) - return dfs, BaseMetadata(initial_response) + return dfs, BaseMetadata(resp) finally: if close_client: client.close() diff --git a/tests/waterdata_test.py b/tests/waterdata_test.py index 18e78594..97d8e134 100644 --- a/tests/waterdata_test.py +++ b/tests/waterdata_test.py @@ -28,6 +28,17 @@ get_stats_por, get_time_series_metadata, ) +from dataretrieval.waterdata.chunking import ( + _DEFAULT_MAX_CHUNKS, + _DEFAULT_QUOTA_SAFETY_FLOOR, + QuotaExhausted, + RequestTooLarge, + _chunkable_params, + _filter_aware_probe_args, + _plan_chunks, + _read_remaining, + multi_value_chunked, +) from dataretrieval.waterdata.utils import ( _check_monitoring_location_id, _check_profiles, @@ -207,6 +218,441 @@ def test_construct_api_requests_two_element_date_list_becomes_interval(): assert "time=2024-01-01%2F2024-01-31" in req.url +# ----- Multi-value GET-parameter chunker (chunking.py) ---------------------- +# +# These tests exercise the planner with a fake ``build_request`` whose URL +# byte length is a deterministic function of its inputs. Tests below model: +# - non-chunkable args contribute ``base_bytes``, +# - every multi-value list contributes ``len(",".join(map(str, v)))``, +# - the ``filter`` kwarg contributes ``len(filter)``. +# This isolates planner behaviour from the real HTTP request builder. 
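+# For example, ``_fake_build(sites=["ab", "cd"], filter="x='1'")`` below
+# models a URL of 200 + len("ab,cd") + len("x='1'") = 210 bytes.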
+
+
+class _FakeReq:
+    __slots__ = ("url", "body")
+
+    def __init__(self, url, body=None):
+        self.url = url
+        self.body = body
+
+
+def _fake_build(*, base=200, **kwargs):
+    """Fake build_request: URL length is a deterministic function of
+    its inputs.
+
+    Mirrors the GET-routed shape: payload goes in the URL, body is None.
+    The chunker's ``_request_bytes`` helper sums url + body, so this
+    stays equivalent to URL-only sizing for these tests.
+    """
+    bytes_ = base
+    for v in kwargs.values():
+        if isinstance(v, (list, tuple)):
+            bytes_ += len(",".join(map(str, v)))
+        elif isinstance(v, str):
+            bytes_ += len(v)
+    return _FakeReq("x" * bytes_)
+
+
+def test_filter_aware_probe_args_passes_through_when_not_chunkable():
+    """No filter, json-lang filter, single-clause filter — return unchanged."""
+    assert _filter_aware_probe_args({"a": 1}) == {"a": 1}
+    assert _filter_aware_probe_args({"filter": "a='1'", "filter_lang": "cql-json"}) == {
+        "filter": "a='1'",
+        "filter_lang": "cql-json",
+    }
+    args = {"filter": "a='single clause with no OR'"}
+    assert _filter_aware_probe_args(args) == args
+
+
+def test_filter_aware_probe_args_models_inner_chunker_bail_floor():
+    """Chunkable filter → return args with the filter replaced by a
+    synthetic alphanumeric clause whose URL byte count equals the inner
+    chunker's bail floor ``len(longest) * max(per_clause_encoding_ratio)``.
+    Mirrors ``filters._effective_filter_budget``'s worst-case model so the
+    planner doesn't approve plans the inner chunker would refuse."""
+    import math
+    from urllib.parse import quote_plus
+
+    args = {"filter": "a='1' OR a='22' OR a='333'", "x": 7}
+    probe = _filter_aware_probe_args(args)
+    parts = ["a='1'", "a='22'", "a='333'"]
+    expected = math.ceil(
+        max(len(p) for p in parts) * max(len(quote_plus(p)) / len(p) for p in parts)
+    )
+    assert len(probe["filter"]) == expected
+    assert probe["filter"].isascii() and probe["filter"].isalnum()
+    assert probe["x"] == 7
+    assert args["filter"] == "a='1' OR a='22' OR a='333'"  # input not mutated
+
+
+def test_plan_chunks_returns_none_when_request_fits():
+    """URL under limit → planner returns None, decorator passes through."""
+    args = {"monitoring_location_id": ["A", "B", "C"]}
+    plan = _plan_chunks(args, _fake_build, url_limit=8000)
+    assert plan is None
+
+
+def test_plan_chunks_returns_none_when_no_chunkable_lists():
+    """No multi-value lists → the planner can't help and returns None,
+    no matter how far over the limit the URL is (the decorator falls
+    through; the server may 414, but that's not the chunker's job)."""
+    args = {"monitoring_location_id": "scalar-only"}
+    plan = _plan_chunks(args, _fake_build, url_limit=10)
+    assert plan is None
+
+
+def test_plan_chunks_greedy_halving_targets_largest_dim():
+    """Two dims with one much larger — the heavy dim halves first."""
+    args = {
+        "monitoring_location_id": ["X" * 30, "Y" * 30, "Z" * 30, "W" * 30],
+        "parameter_code": ["00060", "00065"],
+    }
+    # full URL ≈ 200 + 123 + 11 = 334; force splitting the heavy dim only.
+    plan = _plan_chunks(args, _fake_build, url_limit=310)
+    assert len(plan["monitoring_location_id"]) > 1
+    assert len(plan["parameter_code"]) == 1  # heavy-dim split was enough
+
+
+def test_plan_chunks_raises_request_too_large_at_singleton_floor():
+    """Limit below the singleton-per-dim floor (with no chunkable filter
+    to fall back on) → RequestTooLarge with a clear message."""
+    args = {"monitoring_location_id": ["A", "B"]}
+    # base=200 alone exceeds the limit; no relief possible.
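+    # (Even singleton chunks model 200 + 1 = 201 bytes, still over 100.)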
+    with pytest.raises(RequestTooLarge, match="multi-value parameter"):
+        _plan_chunks(args, _fake_build, url_limit=100)
+
+
+def test_plan_chunks_coordinates_with_filter_chunker():
+    """COORDINATION REGRESSION TEST.
+
+    With the FULL filter in URL-length probes, the singleton-per-dim URL
+    still exceeds the limit and the planner would raise RequestTooLarge.
+    With filter-aware probing, the planner models the per-sub-request
+    URL as ``worst-dim-chunk + longest-clause`` (scaled by the worst
+    per-clause encoding ratio; the smallest filter the inner chunker can
+    actually emit), sees it fits, and returns a plan.
+
+    Sanity-check the *negative*: with filter-aware probing disabled, the
+    same inputs would raise.
+    """
+    clauses = [f"f='{i}'" for i in range(10)]
+    args = {
+        "monitoring_location_id": ["A" * 10, "B" * 10, "C" * 10, "D" * 10],
+        "filter": " OR ".join(clauses),
+    }
+    # singleton + full filter ≈ 200 + 10 + 86 = 296 (over limit 240) — would raise.
+    # longest-clause probe ≈ 200 + 10 + ceil(5 * 11/5) = 221 (under limit) —
+    # plan succeeds. (All clauses here are the same length, so min == max;
+    # the fix matters for filters with lopsided clauses where min < max.)
+    plan = _plan_chunks(args, _fake_build, url_limit=240)
+    assert plan is not None  # coordination prevented the premature raise
+    assert len(plan["monitoring_location_id"]) > 1  # planner did split
+
+    # Negative control: monkey-patch the probe helper to be a no-op
+    # (model "no filter awareness") and confirm the same inputs raise.
+    import dataretrieval.waterdata.chunking as ch
+
+    saved = ch._filter_aware_probe_args
+    try:
+        ch._filter_aware_probe_args = lambda a: a  # pretend no awareness
+        with pytest.raises(RequestTooLarge):
+            _plan_chunks(args, _fake_build, url_limit=240)
+    finally:
+        ch._filter_aware_probe_args = saved
+
+
+def test_plan_chunks_probes_with_max_clause_not_min():
+    """Regression: with lopsided OR-clauses (one short, one long), probing
+    with min(parts) lets the planner falsely declare a plan feasible that
+    the inner filter chunker can't actually deliver — it bails when any
+    single clause exceeds the per-sub-request budget. Probing with the
+    longest clause is the safe lower bound on per-sub-request filter
+    size, so the planner correctly raises when no plan can fit."""
+    args = {
+        "sites": ["A" * 10, "B" * 10],
+        "filter": "x='1' OR x='" + "a" * 28 + "'",  # 5-char and 33-char clauses
+    }
+    # A min-clause probe would model 200 + 10 + 5 = 215 (fits the 230 limit),
+    # yet the inner chunker can never emit less than the 33-char clause
+    # (73 bytes under the worst encoding ratio), so the real floor is
+    # 200 + 10 + 73 = 283, over the limit. The planner must raise early
+    # rather than let the server answer 414.
+    with pytest.raises(RequestTooLarge):
+        _plan_chunks(args, _fake_build, url_limit=230)
+
+
+def test_plan_chunks_still_raises_when_even_min_clause_doesnt_fit():
+    """If the limit is so tight that singleton + shortest clause STILL
+    exceeds it, the filter chunker can't save us either — raise."""
+    args = {
+        "monitoring_location_id": ["A" * 10, "B" * 10],
+        "filter": "x='12345' OR x='67890'",  # min clause is 9 chars
+    }
+    # Singleton + 9-char clause ≈ 200 + 10 + 9 = 219, already over the
+    # 210 limit (the encoded probe, 200 + 10 + 15 = 225, is further over)
+    # → unrecoverable.
+ with pytest.raises(RequestTooLarge): + _plan_chunks(args, _fake_build, url_limit=210) + + +def test_multi_value_chunked_passes_through_when_url_fits(): + """No planning needed → decorator calls underlying function exactly once + with the original args.""" + calls = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=8000) + def fetch(args): + calls.append(args) + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + fetch({"monitoring_location_id": ["A", "B"]}) + assert len(calls) == 1 + assert calls[0]["monitoring_location_id"] == ["A", "B"] + + +def test_multi_value_chunked_emits_cartesian_product(): + """Two chunkable dims, each split into 2 chunks → exactly 4 sub-calls, + each pairing one chunk from each dim.""" + calls = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + calls.append({k: v for k, v in args.items() if k in ("sites", "pcodes")}) + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + fetch( + { + "sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10], + "pcodes": ["P1" * 10, "P2" * 10, "P3" * 10, "P4" * 10], + } + ) + # Both heavy → planner should split both dims. Confirm a cartesian shape: + # every unique site-chunk pairs with every unique pcode-chunk. + sites_seen = {tuple(c["sites"]) for c in calls} + pcodes_seen = {tuple(c["pcodes"]) for c in calls} + assert len(calls) == len(sites_seen) * len(pcodes_seen) + assert len(sites_seen) > 1 + assert len(pcodes_seen) > 1 + + +def test_multi_value_chunked_lazy_url_limit(): + """``url_limit=None`` → resolve filters._WATERDATA_URL_BYTE_LIMIT at call + time, so tests that patch the constant affect this decorator too.""" + from dataretrieval.waterdata import filters as wd_filters + + calls = [] + + @multi_value_chunked(build_request=_fake_build) # url_limit defaults to None + def fetch(args): + calls.append(args) + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + saved = wd_filters._WATERDATA_URL_BYTE_LIMIT + try: + wd_filters._WATERDATA_URL_BYTE_LIMIT = 240 + # 4 sites of 10 chars → exceeds 240 → planner splits. + fetch({"sites": ["S" * 10 + str(i) for i in range(4)]}) + assert len(calls) > 1, "patched constant should drive chunking" + finally: + wd_filters._WATERDATA_URL_BYTE_LIMIT = saved + + +def test_default_max_chunks_matches_hourly_api_quota(): + """The default cap mirrors the USGS Water Data API's documented + per-API-key hourly limit. Locking this in so future changes have to + explicitly acknowledge the quota.""" + assert _DEFAULT_MAX_CHUNKS == 1000 + + +def test_plan_chunks_raises_when_plan_exceeds_max_chunks(): + """A converged plan with more sub-requests than ``max_chunks`` must + raise rather than silently issue them and burn the user's API quota.""" + # 2 dims with long values, each needing many singleton-ish chunks. + # Pick chunk sizes that converge to a plan exceeding a tight cap. + args = { + "dim_a": [f"long-string-value-{i}" for i in range(50)], + "dim_b": [f"another-long-value-{i}" for i in range(50)], + } + # url_limit forces splitting; max_chunks=10 forces the cap to fire. 
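+    # (Each value is ~19 bytes, forcing near-singleton chunks per dim, so
+    # the cartesian product blows past 10 long before the URL probe fits.)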
+ with pytest.raises(RequestTooLarge, match="exceeding max_chunks=10"): + _plan_chunks(args, _fake_build, url_limit=250, max_chunks=10) + + +def test_plan_chunks_respects_default_cap_without_explicit_arg(): + """Default kwarg path: ``max_chunks`` defaults to _DEFAULT_MAX_CHUNKS + when not specified, so direct callers (e.g., other library code) get + the same safety net as the decorator wrapper.""" + args = { + "dim_a": [f"v{i:03d}" for i in range(60)], + "dim_b": [f"v{i:03d}" for i in range(60)], + "dim_c": [f"v{i:03d}" for i in range(60)], + } + # Without explicit max_chunks: defaults to 1000. The plan for these + # inputs would emit > 1000 sub-requests at a tight limit, so should + # raise on default cap alone. + with pytest.raises(RequestTooLarge, match=r"max_chunks=1000"): + _plan_chunks(args, _fake_build, url_limit=220) + + +def test_multi_value_chunked_cap_override(): + """A decorator-time ``max_chunks`` override lets callers with higher + quotas raise the ceiling without monkeypatching the module constant.""" + + @multi_value_chunked(build_request=_fake_build, url_limit=220, max_chunks=10) + def fetch(args): + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + with pytest.raises(RequestTooLarge, match="exceeding max_chunks=10"): + fetch( + { + "dim_a": [f"longer-v{i}" for i in range(30)], + "dim_b": [f"longer-v{i}" for i in range(30)], + } + ) + + +def _quota_response(remaining: int | str | None) -> mock.Mock: + """A mock requests.Response-like object whose ``x-ratelimit-remaining`` + header reflects the given value (None → header absent).""" + resp = mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + resp.headers = ( + {} if remaining is None else {"x-ratelimit-remaining": str(remaining)} + ) + return resp + + +def test_read_remaining_parses_header(): + assert _read_remaining(_quota_response(42)) == 42 + + +def test_read_remaining_treats_missing_header_as_plenty(): + """Servers that don't echo a rate-limit header must not trigger + spurious QuotaExhausted aborts. Sentinel is a large integer so any + plausible safety floor compares cleanly.""" + assert _read_remaining(_quota_response(None)) >= 1_000_000 + + +def test_read_remaining_treats_malformed_header_as_plenty(): + """Defensive: non-integer header value → don't abort.""" + assert _read_remaining(_quota_response("not-a-number")) >= 1_000_000 + + +def test_default_quota_safety_floor(): + """Default floor lives at 50 — enough headroom for one final + chunked call's pagination spike without breaching the hourly cap.""" + assert _DEFAULT_QUOTA_SAFETY_FLOOR == 50 + + +def test_multi_value_chunked_aborts_when_quota_floor_breached(): + """Mid-call, when ``x-ratelimit-remaining`` drops below the floor, + the chunker must raise ``QuotaExhausted`` *before* issuing the next + sub-request — and the exception must carry the partial frame plus + the chunk offset so callers can resume.""" + # Build a fetch_once whose response 'remaining' header decrements + # through 200, 100, 40 (below floor=50), 10. + remaining_seq = iter([200, 100, 40, 10]) + page_idx = iter(range(10)) + + def fetch(args): + idx = next(page_idx) + return ( + pd.DataFrame( + {"site": list(args["sites"]), "page": [idx] * len(args["sites"])} + ), + _quota_response(next(remaining_seq)), + ) + + decorated = multi_value_chunked( + build_request=_fake_build, + url_limit=240, + quota_safety_floor=50, + )(fetch) + + # Plan forces 4 sub-requests (4 singleton site chunks). 
+ with pytest.raises(QuotaExhausted) as excinfo: + decorated({"sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10]}) + + err = excinfo.value + # Aborted after the 3rd sub-request (remaining=40 < floor=50). + assert err.completed_chunks == 3 + assert err.total_chunks == 4 + assert err.remaining == 40 + # Partial frame combines rows from the first three completed sub-requests. + assert err.partial_frame is not None + assert set(err.partial_frame["page"]) == {0, 1, 2} + + +def test_multi_value_chunked_does_not_abort_on_last_chunk(): + """Aborting on the final sub-request would be pointless — there's + no 'next' to protect. The check is skipped there. Earlier chunks + stay above the floor; only the last drops below, and we still + return cleanly because the check is skipped at i == total-1.""" + remaining_seq = iter([500, 5]) # only the LAST chunk dips below floor=50 + + def fetch(args): + return ( + pd.DataFrame({"site": list(args["sites"])}), + _quota_response(next(remaining_seq)), + ) + + decorated = multi_value_chunked( + build_request=_fake_build, + url_limit=240, + quota_safety_floor=50, + )(fetch) + + df, _ = decorated({"sites": ["S1" * 10, "S2" * 10]}) # forces 2 chunks + assert len(df) == 2 # no raise — both chunks ran + + +def test_multi_value_chunked_quota_check_disabled_with_zero_floor(): + """Setting the floor to 0 effectively disables the quota guard — + counter can go to 1 without aborting (since 1 > 0 = floor).""" + remaining_seq = iter([5, 1]) + + def fetch(args): + return ( + pd.DataFrame({"site": list(args["sites"])}), + _quota_response(next(remaining_seq)), + ) + + decorated = multi_value_chunked( + build_request=_fake_build, + url_limit=240, + quota_safety_floor=0, + )(fetch) + df, _ = decorated({"sites": ["S1" * 10, "S2" * 10]}) + assert len(df) == 2 # no raise + + +def test_quota_exhausted_message_includes_resume_offset(): + """The error message must point the user at the chunk offset to + resume from, otherwise the partial_frame attribute is a footgun + — the user has no way to know which chunks still need re-issuing.""" + e = QuotaExhausted( + partial_frame=pd.DataFrame(), + partial_response=mock.Mock(), + completed_chunks=7, + total_chunks=20, + remaining=12, + ) + msg = str(e) + assert "7/20" in msg + assert "12" in msg + assert "QuotaExhausted" in msg or "resume" in msg + + +def test_chunkable_params_skips_filter_passed_as_list(): + """Defensive guard: ``filter`` is documented as a string. If a caller + mistakenly passes it as a list, the chunker must NOT treat it as a + multi-value dim — comma-joining CQL clauses inside the URL would + produce a malformed filter expression. The inner ``filters.chunked`` + is the only place that may shrink ``filter``.""" + args = { + "monitoring_location_id": ["USGS-A", "USGS-B"], + "filter": ["a='1'", "a='2'"], # malformed input + "filter_lang": ["cql-text", "cql-json"], # ditto + } + chunkable = _chunkable_params(args) + assert "monitoring_location_id" in chunkable + assert "filter" not in chunkable + assert "filter_lang" not in chunkable + + def test_samples_results(): """Test results call for proper columns""" df, _ = get_samples(