From 7fe353ba4094301e8916aa01c6ff1ebadd10bfa1 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Mon, 13 Apr 2026 19:09:20 -0500 Subject: [PATCH 01/12] Use GET with comma-separated values for multi-value waterdata queries The OGC API now supports comma-separated values for fields like monitoring_location_id, parameter_code, and statistic_id, making POST+CQL2 unnecessary for most services. Update _construct_api_requests to join list params with commas and use GET for daily, continuous, latest-daily, latest-continuous, field-measurements, time-series-metadata, and channel-measurements. The monitoring-locations endpoint does not yet support comma-separated GET parameters (returns 400); it retains the POST+CQL2 path. Closes #210. Co-Authored-By: Claude Sonnet 4.6 --- dataretrieval/waterdata/utils.py | 63 ++++++++++++++++++-------------- tests/waterdata_test.py | 23 ++++++++++++ 2 files changed, 58 insertions(+), 28 deletions(-) diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index 018e1c85..ef578447 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -417,9 +417,11 @@ def _construct_api_requests( """ Constructs an HTTP request object for the specified water data API service. - Depending on the input parameters (whether there's lists of multiple - argument values), the function determines whether to use a GET or POST - request, formats parameters appropriately, and sets required headers. + For most services, list parameters are comma-joined and sent as a single + GET request (e.g. ``parameter_code=["00060","00010"]`` becomes + ``parameter_code=00060,00010`` in the URL). For services that do not + support comma-separated values (currently only ``monitoring-locations``), + a POST request with CQL2 JSON is used instead. Parameters ---------- @@ -445,36 +447,41 @@ def _construct_api_requests( Notes ----- - Date/time parameters are automatically formatted to ISO8601. - - If multiple values are provided for non-single parameters, a POST request - is constructed. - - The function sets appropriate headers for GET and POST requests. """ service_url = f"{OGC_API_URL}/collections/{service}/items" - # Identify which parameters should be included in the POST content body - post_params = { - k: v - for k, v in kwargs.items() - if k not in _DATE_RANGE_PARAMS and isinstance(v, (list, tuple)) and len(v) > 1 - } - - # Everything else goes into the params dictionary for the URL - params = {k: v for k, v in kwargs.items() if k not in post_params} - # Set skipGeometry parameter (API expects camelCase) - params["skipGeometry"] = skip_geometry + # The monitoring-locations endpoint does not support comma-separated values + # for multi-value GET parameters; CQL2 POST is required for that service. + _cql2_required_services = {"monitoring-locations"} - # If limit is none or greater than 50000, then set limit to max results. Otherwise, - # use the limit - params["limit"] = 50000 if limit is None or limit > 50000 else limit + # Format date/time parameters to ISO8601 first — both routing paths need it. 
+ for key in _DATE_RANGE_PARAMS: + if key in kwargs: + kwargs[key] = _format_api_dates( + kwargs[key], + date=(service == "daily" and key != "last_modified"), + ) - # Indicate if function needs to perform POST conversion - POST = bool(post_params) + if service in _cql2_required_services: + # Legacy path: POST with CQL2 for multi-value params + post_params = { + k: v + for k, v in kwargs.items() + if k not in _DATE_RANGE_PARAMS + and isinstance(v, (list, tuple)) + and len(v) > 1 + } + params = {k: v for k, v in kwargs.items() if k not in post_params} + else: + # Join list/tuple values with commas for multi-value GET parameters. + post_params = {} + params = { + k: ",".join(str(x) for x in v) if isinstance(v, (list, tuple)) else v + for k, v in kwargs.items() + } - # Convert dates to ISO08601 format - for i in _DATE_RANGE_PARAMS: - if i in params: - dates = service == "daily" and i != "last_modified" - params[i] = _format_api_dates(params[i], date=dates) + params["skipGeometry"] = skip_geometry + params["limit"] = 50000 if limit is None or limit > 50000 else limit # `len()` instead of truthiness: a numpy ndarray would raise on `if bbox:`. if bbox is not None and len(bbox) > 0: @@ -490,7 +497,7 @@ def _construct_api_requests( headers = _default_headers() - if POST: + if post_params: headers["Content-Type"] = "application/query-cql-json" request = requests.Request( method="POST", diff --git a/tests/waterdata_test.py b/tests/waterdata_test.py index e2ba4da8..bb61c94f 100644 --- a/tests/waterdata_test.py +++ b/tests/waterdata_test.py @@ -30,6 +30,7 @@ from dataretrieval.waterdata.utils import ( _check_monitoring_location_id, _check_profiles, + _construct_api_requests, _normalize_str_iterable, ) @@ -111,6 +112,28 @@ def test_check_profiles(): _check_profiles(service="results", profile="foo") +def test_construct_api_requests_multivalue_get(): + """Multi-value params use GET with comma-separated values for daily service.""" + req = _construct_api_requests( + "daily", + monitoring_location_id=["USGS-05427718", "USGS-05427719"], + parameter_code=["00060", "00065"], + ) + assert req.method == "GET" + assert "monitoring_location_id=USGS-05427718%2CUSGS-05427719" in req.url + assert "parameter_code=00060%2C00065" in req.url + + +def test_construct_api_requests_monitoring_locations_post(): + """monitoring-locations uses POST+CQL2 for multi-value params (API limitation).""" + req = _construct_api_requests( + "monitoring-locations", + hydrologic_unit_code=["010802050102", "010802050103"], + ) + assert req.method == "POST" + assert req.body is not None + + def test_samples_results(): """Test results call for proper columns""" df, _ = get_samples( From fa7886934e86b86e813fe72d2020c4c19b15c912 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Thu, 14 May 2026 08:43:14 -0500 Subject: [PATCH 02/12] Polish PR 233: module-level constant, fix misleading comment, add 3 unit tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Style alignment with the rest of `waterdata/utils.py`: - Hoist `_cql2_required_services` from a function-local lowercase `set` to a module-level `_CQL2_REQUIRED_SERVICES = frozenset(...)` to match the convention of `_DATE_RANGE_PARAMS`, `_NO_NORMALIZE_PARAMS`, `_MONITORING_LOCATION_ID_RE`, etc. - Drop the "Legacy path:" prefix in the inline comment. POST/CQL2 is still the current and required path for monitoring-locations — the API team hasn't promised to add comma-GET there. 
Rephrased the two branches symmetrically ("POST with CQL2 JSON" / "GET with comma- separated values") so neither reads as deprecated. New unit tests: - `test_construct_api_requests_single_value_stays_get` — confirms a scalar `monitoring_location_id="USGS-..."` still produces a clean GET with no `%2C`, i.e. existing single-site callers see no change. - `test_construct_api_requests_numeric_list_joins_with_str` — pins down that `water_year=[2020, 2021]` reaches the URL as `water_year=2020%2C2021`, exercising the `str(x) for x in v` generator that exists specifically to handle non-string list params (without it, `",".join` on a list of ints would TypeError). - `test_construct_api_requests_two_element_date_list_becomes_interval` — pins down the contract that a two-element date list (`time=["2024-01-01", "2024-01-31"]`) is interpreted as start/end of an OGC datetime interval (joined with `/`), NOT as two discrete dates. The OGC `datetime` parameter doesn't support "these N specific dates" — that would require a CQL filter. Test exists so this semantic choice can't be silently changed. --- dataretrieval/waterdata/utils.py | 14 +++++------ tests/waterdata_test.py | 41 ++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 7 deletions(-) diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index ef578447..af5906e0 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -153,6 +153,10 @@ def _switch_properties_id(properties: list[str] | None, id_name: str, service: s {"datetime", "last_modified", "begin", "begin_utc", "end", "end_utc", "time"} ) +# Services that don't support comma-separated values for multi-value GET +# parameters and require POST with CQL2 JSON instead. +_CQL2_REQUIRED_SERVICES = frozenset({"monitoring-locations"}) + def _parse_datetime(value: str) -> datetime | None: """Parse a single datetime string against the supported formats. @@ -450,10 +454,6 @@ def _construct_api_requests( """ service_url = f"{OGC_API_URL}/collections/{service}/items" - # The monitoring-locations endpoint does not support comma-separated values - # for multi-value GET parameters; CQL2 POST is required for that service. - _cql2_required_services = {"monitoring-locations"} - # Format date/time parameters to ISO8601 first — both routing paths need it. for key in _DATE_RANGE_PARAMS: if key in kwargs: @@ -462,8 +462,8 @@ def _construct_api_requests( date=(service == "daily" and key != "last_modified"), ) - if service in _cql2_required_services: - # Legacy path: POST with CQL2 for multi-value params + if service in _CQL2_REQUIRED_SERVICES: + # POST with CQL2 JSON: multi-value params go in the request body. post_params = { k: v for k, v in kwargs.items() @@ -473,7 +473,7 @@ def _construct_api_requests( } params = {k: v for k, v in kwargs.items() if k not in post_params} else: - # Join list/tuple values with commas for multi-value GET parameters. + # GET with comma-separated values: join list/tuple values into one string. 
         post_params = {}
         params = {
             k: ",".join(str(x) for x in v) if isinstance(v, (list, tuple)) else v
             for k, v in kwargs.items()
diff --git a/tests/waterdata_test.py b/tests/waterdata_test.py
index bb61c94f..40ea0615 100644
--- a/tests/waterdata_test.py
+++ b/tests/waterdata_test.py
@@ -134,6 +134,47 @@ def test_construct_api_requests_monitoring_locations_post():
     assert req.body is not None
 
 
+def test_construct_api_requests_single_value_stays_get():
+    """A length-1 list (or scalar) reaches the URL as a plain value, not a
+    comma-separated form, so existing single-site callers see no change."""
+    req = _construct_api_requests(
+        "daily",
+        monitoring_location_id="USGS-05427718",
+        parameter_code="00060",
+    )
+    assert req.method == "GET"
+    assert "monitoring_location_id=USGS-05427718" in req.url
+    assert "%2C" not in req.url  # no comma-encoded multi-value
+
+
+def test_construct_api_requests_numeric_list_joins_with_str():
+    """Numeric-list params (e.g. ``water_year=[2020, 2021]`` on get_peaks)
+    must reach the URL as a comma-joined string, not crash on ``",".join``
+    of ints. The generator-of-``str(x)`` exists exactly for this case."""
+    req = _construct_api_requests(
+        "peaks",
+        monitoring_location_id="USGS-05427718",
+        water_year=[2020, 2021],
+    )
+    assert req.method == "GET"
+    assert "water_year=2020%2C2021" in req.url
+
+
+def test_construct_api_requests_two_element_date_list_becomes_interval():
+    """A two-element date list is interpreted as start/end of an OGC datetime
+    interval (joined with '/'), NOT as two discrete dates. The OGC `datetime`
+    parameter does not support "these N specific dates" — that would require
+    a CQL filter. This test locks that contract in so the semantic choice
+    can't be silently changed."""
+    req = _construct_api_requests(
+        "daily",
+        monitoring_location_id="USGS-05427718",
+        time=["2024-01-01", "2024-01-31"],
+    )
+    assert req.method == "GET"
+    # `/` URL-encodes to %2F. Confirms _format_api_dates ran before the join.
+    assert "time=2024-01-01%2F2024-01-31" in req.url
+
+
 def test_samples_results():
     """Test results call for proper columns"""
     df, _ = get_samples(

From 22a09c7233686ffedb0f335393396645e4d34889 Mon Sep 17 00:00:00 2001
From: thodson-usgs
Date: Thu, 14 May 2026 21:09:23 -0500
Subject: [PATCH 03/12] Add multi-value GET-parameter chunker for waterdata
 OGC API
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Wraps _fetch_once with a cartesian-product chunker that sits OUTSIDE
@filters.chunked. Splits multi-value list params (monitoring_location_id,
parameter_code, statistic_id, etc.) across sub-requests so each URL fits
the server's ~8 KB byte limit.

Coordination with @filters.chunked: the planner's URL probe substitutes
the filter with its shortest top-level OR-clause via
_filter_aware_probe_args, modeling the per-sub-request URL the inner
filter chunker will actually emit. Without this coordination, a long
OR-filter plus multi-value lists triggered premature RequestTooLarge
even when the combined chunkers would have made things fit.

Two safety guards:

- max_chunks=1000 cap on cartesian-product size (matches USGS API hourly
  quota; raises RequestTooLarge with the actual count when exceeded).
- QuotaExhausted abort: between sub-requests, reads x-ratelimit-remaining;
  if below quota_safety_floor (default 50), raises with the partial frame
  and chunk offset so callers can resume instead of crashing into a
  mid-call HTTP 429.

30 unit tests cover the planner, filter-aware coordination, the cap, and
the quota-aware abort. 
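Caller-side sketch of the resume pattern QuotaExhausted enables (assumes
the exception propagates through the public getters unchanged;
`save_partial` and `sites` are hypothetical placeholders for the
caller's own persistence and site list):

    from dataretrieval.waterdata.chunking import QuotaExhausted

    try:
        df, md = dataretrieval.waterdata.get_daily(
            monitoring_location_id=sites,  # a very long site list
            parameter_code="00060",
        )
    except QuotaExhausted as e:
        save_partial(e.partial_frame)  # keep the completed chunks
        # e.completed_chunks / e.total_chunks give the resume offset once
        # the hourly quota window resets.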
Live tests in /tmp verify a 3-dim equivalence case (chunked == unchunked, 16 sub-requests, all axes split), 6 edge-case stress scenarios, and 3 mv/filter composition regimes. Depends on #273 (paginated silent-truncation fix) — this PR multiplies the frequency at which the silent-truncation bug class would have surfaced. Merge order: #273 -> #233 -> this PR. Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/chunking.py | 354 ++++++++++++++++++++++++ dataretrieval/waterdata/utils.py | 15 +- tests/waterdata_test.py | 407 ++++++++++++++++++++++++++++ 3 files changed, 770 insertions(+), 6 deletions(-) create mode 100644 dataretrieval/waterdata/chunking.py diff --git a/dataretrieval/waterdata/chunking.py b/dataretrieval/waterdata/chunking.py new file mode 100644 index 00000000..1d0a1ff5 --- /dev/null +++ b/dataretrieval/waterdata/chunking.py @@ -0,0 +1,354 @@ +"""Multi-value GET-parameter chunking for the Water Data OGC getters. + +PR 233 routes most services through GET with comma-separated values +(e.g. ``monitoring_location_id=USGS-A,USGS-B,...``). Long lists can blow +the server's ~8 KB URL byte limit. This module adds a decorator that +sits OUTSIDE ``filters.chunked`` and splits multi-value list params +across multiple sub-requests so each URL fits. + +Design (orthogonal to filter chunking): + +- N-dimensional cartesian product: for each chunkable list param, the + values are partitioned into sub-lists; the planner emits the cartesian + product of those partitions. Sub-chunks of the same dim never overlap, + so frame concat needs no dedup across multi-value chunks. +- Greedy halving of the largest chunk in any dim until the worst-case + sub-request URL fits the limit. Minimises total request count. +- Date params, ``bbox``, and ``properties`` are not chunked: dates are + intervals not enumerable sets; bbox is a coord array; ``properties`` + determines output schema and chunking it would shard columns. + +Coordination with ``filters.chunked``: +The planner probes URL length using the SHORTEST top-level OR-clause +when a chunkable filter is present, not the full filter. ``filters. +chunked`` (inner) will split the filter per sub-request, so probing +with the smallest clause models the per-sub-request URL the stack will +actually produce. Without this, a long OR-filter plus multi-value +lists would trigger a premature ``RequestTooLarge`` even though the +combined chunkers would have made things fit. +""" + +from __future__ import annotations + +import functools +import itertools +from collections.abc import Callable +from typing import Any, TypeVar + +import pandas as pd +import requests + +from . import filters +from .filters import ( + _combine_chunk_frames, + _combine_chunk_responses, + _is_chunkable, + _split_top_level_or, +) + +# Params that look like lists but must NOT be chunked. ``properties`` is +# excluded because it defines the response schema; chunking it would +# return frames with different columns per sub-request. ``bbox`` is a +# fixed 4-element coord tuple. Date params are intervals not sets. The +# CQL ``filter`` (and its ``filter_lang``) is a string that has its own +# inner chunker (``filters.chunked``); if a caller passes ``filter`` as +# a list, treating it as a multi-value param would emit malformed CQL. 
+_NEVER_CHUNK = frozenset( + { + "properties", + "bbox", + "datetime", + "last_modified", + "begin", + "begin_utc", + "end", + "end_utc", + "time", + "filter", + "filter_lang", + } +) + +# Default cap on the number of sub-requests a single chunked call may +# emit. The USGS Water Data API rate-limits each HTTP request (including +# pagination), so the true budget is ``hourly_quota / avg_pages_per_chunk``. +# 1000 matches the default hourly quota and is a reasonable upper bound +# for single-page sub-requests; tune lower if your queries paginate. +# Override per-decorator via ``max_chunks=`` or by monkeypatching this +# module attribute (read lazily in the wrapper). +_DEFAULT_MAX_CHUNKS = 1000 + +# When ``x-ratelimit-remaining`` drops below this between sub-requests, +# the chunker bails with ``QuotaExhausted`` rather than risk a mid-call +# HTTP 429. Carries the partial result so callers can resume from a +# known offset instead of retrying the whole chunked call from scratch. +_DEFAULT_QUOTA_SAFETY_FLOOR = 50 + + +class RequestTooLarge(ValueError): + """Raised when a chunked request cannot be issued. Two cases: + (1) URL exceeds the byte limit even with every multi-value param at + a singleton chunk and any chunkable filter reduced to its smallest + top-level OR-clause; (2) the cartesian-product plan would issue more + than ``max_chunks`` sub-requests.""" + + +class QuotaExhausted(RuntimeError): + """Raised mid-chunked-call when the API's reported remaining quota + (``x-ratelimit-remaining`` header) drops below the configured safety + floor. The chunker stops before issuing the next sub-request to + avoid a mid-call HTTP 429 that would silently truncate paginated + results (see PR #273 for the pagination side of that bug). + + The exception carries everything needed to resume: the combined + partial frame from completed sub-requests, the metadata for the + last successful sub-request, the number of chunks completed out of + the plan total, and the last-observed ``remaining`` value. + + Attributes + ---------- + partial_frame : pd.DataFrame + Concatenated, deduplicated result of every sub-request that + completed before the floor was crossed. + partial_response : requests.Response + Aggregated response (URL/headers of the first sub-request, + summed ``elapsed``). Wrap in ``BaseMetadata`` to surface to + the caller alongside the partial frame. + completed_chunks : int + Number of sub-requests successfully completed. + total_chunks : int + Total sub-requests in the cartesian-product plan. + remaining : int + Last observed ``x-ratelimit-remaining`` value. + """ + + def __init__( + self, + *, + partial_frame: pd.DataFrame, + partial_response: requests.Response, + completed_chunks: int, + total_chunks: int, + remaining: int, + ) -> None: + super().__init__( + f"x-ratelimit-remaining dropped to {remaining} after " + f"{completed_chunks}/{total_chunks} chunks; aborting to avoid " + f"mid-call HTTP 429. Catch QuotaExhausted to access " + f".partial_frame and resume from chunk {completed_chunks}." 
+ ) + self.partial_frame = partial_frame + self.partial_response = partial_response + self.completed_chunks = completed_chunks + self.total_chunks = total_chunks + self.remaining = remaining + + +def _chunkable_params(args: dict[str, Any]) -> dict[str, list]: + """Return ``{name: list(values)}`` for every list/tuple kwarg with + >1 element that is allowed to chunk.""" + return { + k: list(v) + for k, v in args.items() + if k not in _NEVER_CHUNK and isinstance(v, (list, tuple)) and len(v) > 1 + } + + +def _filter_aware_probe_args(args: dict[str, Any]) -> dict[str, Any]: + """Substitute the filter with its shortest top-level OR-clause if the + filter is chunkable, otherwise return ``args`` unchanged. + + The inner ``filters.chunked`` decorator will reduce the filter per + sub-request to at most one OR-clause (its hard floor — see + ``_chunk_cql_or``). Probing with that minimum models the per-sub- + request URL the decorator stack will actually emit, so we don't + plan around bytes the filter chunker has already promised to remove. + """ + filter_expr = args.get("filter") + filter_lang = args.get("filter_lang") + if not _is_chunkable(filter_expr, filter_lang): + return args + parts = _split_top_level_or(filter_expr) + if len(parts) < 2: + return args # one-clause filter — filter chunker can't shrink it + return {**args, "filter": min(parts, key=len)} + + +def _worst_case_args( + probe_args: dict[str, Any], plan: dict[str, list[list]] +) -> dict[str, Any]: + """Args dict using the LARGEST chunk from each dim — represents the + most byte-heavy sub-request the plan will issue, with the filter + already reduced to its filter-chunker floor.""" + out = dict(probe_args) + for k, chunks in plan.items(): + out[k] = max(chunks, key=lambda c: len(",".join(map(str, c)))) + return out + + +def _plan_chunks( + args: dict[str, Any], + build_request: Callable[..., Any], + url_limit: int, + max_chunks: int = _DEFAULT_MAX_CHUNKS, +) -> dict[str, list[list]] | None: + """Greedy halving until the worst-case sub-request URL fits. + + Returns ``None`` when no chunking is needed (request as-is fits or + no chunkable lists). Raises ``RequestTooLarge`` when: + - every multi-value param is already a singleton chunk AND the + filter (if any) is already at its smallest OR-clause and the URL + still exceeds ``url_limit`` (irreducible), or + - the converged cartesian-product plan would issue more than + ``max_chunks`` sub-requests (hourly API budget). + """ + chunkable = _chunkable_params(args) + if not chunkable: + return None + probe_args = _filter_aware_probe_args(args) + if len(build_request(**probe_args).url) <= url_limit: + return None + + plan: dict[str, list[list]] = {k: [v] for k, v in chunkable.items()} + + while True: + worst = _worst_case_args(probe_args, plan) + if len(build_request(**worst).url) <= url_limit: + break + + # Find the single biggest chunk across all dims and halve it. + best: tuple[str, int, int] | None = None # (dim, chunk_index, size) + for dim, dim_chunks in plan.items(): + for idx, chunk in enumerate(dim_chunks): + if len(chunk) <= 1: + continue + size = len(",".join(map(str, chunk))) + if best is None or size > best[2]: + best = (dim, idx, size) + + if best is None: + raise RequestTooLarge( + f"Request URL exceeds {url_limit} bytes even with every " + f"multi-value parameter at a singleton chunk and any " + f"chunkable filter reduced to one OR-clause. Reduce the " + f"number of values or split the call manually." 
+ ) + dim, idx, _ = best + big = plan[dim][idx] + mid = len(big) // 2 + plan[dim] = plan[dim][:idx] + [big[:mid], big[mid:]] + plan[dim][idx + 1 :] + + total = 1 + for chunks in plan.values(): + total *= len(chunks) + if total > max_chunks: + raise RequestTooLarge( + f"Chunked plan would issue {total} sub-requests, exceeding " + f"max_chunks={max_chunks} (USGS API's default hourly rate " + f"limit per key). Reduce input list sizes, narrow the time " + f"window, or raise max_chunks if you have a higher quota." + ) + return plan + + +_FetchOnce = TypeVar( + "_FetchOnce", + bound=Callable[[dict[str, Any]], tuple[pd.DataFrame, requests.Response]], +) + + +def _read_remaining(response: requests.Response) -> int: + """Parse ``x-ratelimit-remaining`` from a response. Missing or + malformed header → return a large sentinel so the safety check + treats it as 'plenty of quota' (don't abort on header glitches).""" + raw = response.headers.get("x-ratelimit-remaining") + if raw is None: + return 10**9 + try: + return int(raw) + except (TypeError, ValueError): + return 10**9 + + +def multi_value_chunked( + *, + build_request: Callable[..., Any], + url_limit: int | None = None, + max_chunks: int | None = None, + quota_safety_floor: int | None = None, +) -> Callable[[_FetchOnce], _FetchOnce]: + """Decorator that splits multi-value list params across sub-requests so + each URL fits ``url_limit`` bytes (defaults to ``filters._WATERDATA_ + URL_BYTE_LIMIT``) and the cartesian-product plan stays ≤ ``max_chunks`` + sub-requests (defaults to ``_DEFAULT_MAX_CHUNKS``). All defaults are + resolved at call time so tests/users that patch the module constants + affect this decorator uniformly. + + Between sub-requests the wrapper reads ``x-ratelimit-remaining`` from + each response. If it drops below ``quota_safety_floor`` (default + ``_DEFAULT_QUOTA_SAFETY_FLOOR``), the wrapper raises ``QuotaExhausted`` + carrying the combined partial result and the chunk offset so callers + can resume after the hourly window resets, instead of crashing into + a mid-pagination HTTP 429 (which the upstream pagination loop in + ``_walk_pages`` historically truncated silently — see PR #273). + + Sits OUTSIDE ``@filters.chunked``: list-chunking is the outer loop, + filter-chunking is the inner loop. The wrapped function has the same + signature as ``filters.chunked`` expects — ``(args: dict) -> (frame, + response)`` — so the two decorators compose cleanly. The planner is + filter-aware so it doesn't raise prematurely when the inner filter + chunker would have shrunk the per-sub-request URL on its own. 
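+
+    Composition sketch (mirrors the real wiring of ``_fetch_once`` in
+    ``utils.py``; repeated here for orientation, not an additional API)::
+
+        @multi_value_chunked(build_request=_construct_api_requests)
+        @filters.chunked(build_request=_construct_api_requests)
+        def _fetch_once(args):
+            req = _construct_api_requests(**args)
+            return _walk_pages(geopd=GEOPANDAS, req=req)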
+ """ + + def decorator(fetch_once: _FetchOnce) -> _FetchOnce: + @functools.wraps(fetch_once) + def wrapper( + args: dict[str, Any], + ) -> tuple[pd.DataFrame, requests.Response]: + limit = ( + url_limit + if url_limit is not None + else filters._WATERDATA_URL_BYTE_LIMIT + ) + cap = max_chunks if max_chunks is not None else _DEFAULT_MAX_CHUNKS + floor = ( + quota_safety_floor + if quota_safety_floor is not None + else _DEFAULT_QUOTA_SAFETY_FLOOR + ) + plan = _plan_chunks(args, build_request, limit, cap) + if plan is None: + return fetch_once(args) + + keys = list(plan) + total = 1 + for k in keys: + total *= len(plan[k]) + frames: list[pd.DataFrame] = [] + responses: list[requests.Response] = [] + for i, combo in enumerate(itertools.product(*(plan[k] for k in keys))): + sub_args = {**args, **dict(zip(keys, combo))} + frame, response = fetch_once(sub_args) + frames.append(frame) + responses.append(response) + # Quota check happens BETWEEN sub-requests: skip on the + # last iteration because there's nothing left to abort. + if i < total - 1: + remaining = _read_remaining(response) + if remaining < floor: + raise QuotaExhausted( + partial_frame=_combine_chunk_frames(frames), + partial_response=_combine_chunk_responses(responses), + completed_chunks=i + 1, + total_chunks=total, + remaining=remaining, + ) + + return ( + _combine_chunk_frames(frames), + _combine_chunk_responses(responses), + ) + + return wrapper # type: ignore[return-value] + + return decorator diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index af5906e0..dd71a8f4 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -14,7 +14,7 @@ from dataretrieval import __version__ from dataretrieval.utils import BaseMetadata -from dataretrieval.waterdata import filters +from dataretrieval.waterdata import chunking, filters from dataretrieval.waterdata.types import ( PROFILE_LOOKUP, PROFILES, @@ -912,17 +912,20 @@ def get_ogc_data( return return_list, BaseMetadata(response) +@chunking.multi_value_chunked(build_request=_construct_api_requests) @filters.chunked(build_request=_construct_api_requests) def _fetch_once( args: dict[str, Any], ) -> tuple[pd.DataFrame, requests.Response]: """Send one prepared-args OGC request; return the frame + response. - Filter chunking is added orthogonally by the ``@filters.chunked`` - decorator: with no filter (or an un-chunkable one) the decorator - passes ``args`` through to this body; with a chunkable filter it - fans out and calls this body once per sub-filter, then combines. - Either way the return shape is ``(frame, response)``. + Two orthogonal chunkers wrap this body. ``@chunking.multi_value_chunked`` + (outer) splits multi-value list params (e.g. ``monitoring_location_id``) + across sub-requests so each URL fits the server byte limit; the + cartesian product of per-dim chunks is iterated. ``@filters.chunked`` + (inner) splits long cql-text filters at top-level ``OR``. With no + chunkable inputs both pass through unchanged. Either way the return + shape is ``(frame, response)``. 
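+
+    Fan-out sketch (counts hypothetical): ``args`` carrying 500 site IDs
+    plus a long OR-filter might fan out as 4 site-chunks × 3
+    filter-chunks = 12 leaf calls of this body, each of which still
+    paginates via ``_walk_pages``.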
""" req = _construct_api_requests(**args) return _walk_pages(geopd=GEOPANDAS, req=req) diff --git a/tests/waterdata_test.py b/tests/waterdata_test.py index 40ea0615..78f3a3ba 100644 --- a/tests/waterdata_test.py +++ b/tests/waterdata_test.py @@ -27,6 +27,17 @@ get_stats_por, get_time_series_metadata, ) +from dataretrieval.waterdata.chunking import ( + _DEFAULT_MAX_CHUNKS, + _DEFAULT_QUOTA_SAFETY_FLOOR, + QuotaExhausted, + RequestTooLarge, + _chunkable_params, + _filter_aware_probe_args, + _plan_chunks, + _read_remaining, + multi_value_chunked, +) from dataretrieval.waterdata.utils import ( _check_monitoring_location_id, _check_profiles, @@ -175,6 +186,402 @@ def test_construct_api_requests_two_element_date_list_becomes_interval(): assert "time=2024-01-01%2F2024-01-31" in req.url +# ----- Multi-value GET-parameter chunker (chunking.py) ---------------------- +# +# These tests exercise the planner with a fake ``build_request`` whose URL +# byte length is a deterministic function of its inputs. Tests below model: +# - non-chunkable args contribute ``base_bytes``, +# - every multi-value list contributes ``len(",".join(map(str, v)))``, +# - the ``filter`` kwarg contributes ``len(filter)``. +# This isolates planner behaviour from the real HTTP request builder. + + +class _FakeReq: + __slots__ = ("url",) + + def __init__(self, url): + self.url = url + + +def _fake_build(*, base=200, **kwargs): + """Fake build_request: URL length deterministic in its inputs.""" + bytes_ = base + for v in kwargs.values(): + if isinstance(v, (list, tuple)): + bytes_ += len(",".join(map(str, v))) + elif isinstance(v, str): + bytes_ += len(v) + return _FakeReq("x" * bytes_) + + +def test_filter_aware_probe_args_passes_through_when_not_chunkable(): + """No filter, json-lang filter, single-clause filter — return unchanged.""" + assert _filter_aware_probe_args({"a": 1}) == {"a": 1} + assert _filter_aware_probe_args({"filter": "a='1'", "filter_lang": "cql-json"}) == { + "filter": "a='1'", + "filter_lang": "cql-json", + } + args = {"filter": "a='single clause with no OR'"} + assert _filter_aware_probe_args(args) == args + + +def test_filter_aware_probe_args_substitutes_shortest_or_clause(): + """Chunkable filter → return args with filter replaced by shortest clause.""" + args = {"filter": "a='1' OR a='22' OR a='333'", "x": 7} + probe = _filter_aware_probe_args(args) + assert probe["filter"] == "a='1'" + assert probe["x"] == 7 + assert args["filter"] == "a='1' OR a='22' OR a='333'" # input not mutated + + +def test_plan_chunks_returns_none_when_request_fits(): + """URL under limit → planner returns None, decorator passes through.""" + args = {"monitoring_location_id": ["A", "B", "C"]} + plan = _plan_chunks(args, _fake_build, url_limit=8000) + assert plan is None + + +def test_plan_chunks_returns_none_when_no_chunkable_lists(): + """No multi-value lists, however over-limit → planner can't help, returns None + (decorator falls through; server may 414 but that's not chunker's job).""" + args = {"monitoring_location_id": "scalar-only"} + plan = _plan_chunks(args, _fake_build, url_limit=10) + assert plan is None + + +def test_plan_chunks_greedy_halving_targets_largest_dim(): + """Two dims with one much larger — the heavy dim halves first.""" + args = { + "monitoring_location_id": ["X" * 30, "Y" * 30, "Z" * 30, "W" * 30], + "parameter_code": ["00060", "00065"], + } + # full URL ≈ 200 + 123 + 12 = 335; force splitting heavy dim only. 
+ plan = _plan_chunks(args, _fake_build, url_limit=310) + assert len(plan["monitoring_location_id"]) > 1 + assert len(plan["parameter_code"]) == 1 # heavy-dim split was enough + + +def test_plan_chunks_raises_request_too_large_at_singleton_floor(): + """Limit below singleton-per-dim floor (with no chunkable filter to + fall back on) → RequestTooLarge with a clear message.""" + args = {"monitoring_location_id": ["A", "B"]} + # base=200 alone exceeds limit; no relief possible. + with pytest.raises(RequestTooLarge, match="multi-value parameter"): + _plan_chunks(args, _fake_build, url_limit=100) + + +def test_plan_chunks_coordinates_with_filter_chunker(): + """COORDINATION REGRESSION TEST. + + With the FULL filter in URL-length probes, singleton-per-dim URL still + exceeds the limit and the planner would raise RequestTooLarge. With + filter-aware probing, the planner models the per-sub-request URL as + ``worst-dim-chunk + shortest-clause`` (what the inner filter chunker + will actually emit), sees it fits, and returns a plan. + + Sanity-check the *negative*: with filter-aware probing disabled, the + same inputs would raise. + """ + clauses = [f"f='{i}'" for i in range(10)] + args = { + "monitoring_location_id": ["A" * 10, "B" * 10, "C" * 10, "D" * 10], + "filter": " OR ".join(clauses), + } + # singleton+full-filter ≈ 200 + 10 + 86 = 296 (over limit 240) — would raise. + # min-clause probe model ≈ 200 + 10 + 5 = 215 (under limit) — plan succeeds. + plan = _plan_chunks(args, _fake_build, url_limit=240) + assert plan is not None # coordination prevented the premature raise + assert len(plan["monitoring_location_id"]) > 1 # planner did split + + # Negative control: monkey-patch the probe helper to be a no-op + # (model "no filter awareness") and confirm the same inputs raise. + import dataretrieval.waterdata.chunking as ch + + saved = ch._filter_aware_probe_args + try: + ch._filter_aware_probe_args = lambda a: a # pretend no awareness + with pytest.raises(RequestTooLarge): + _plan_chunks(args, _fake_build, url_limit=240) + finally: + ch._filter_aware_probe_args = saved + + +def test_plan_chunks_still_raises_when_even_min_clause_doesnt_fit(): + """If the limit is so tight that singleton + shortest-clause STILL + exceeds it, filter chunker can't save us either — raise.""" + args = { + "monitoring_location_id": ["A" * 10, "B" * 10], + "filter": "x='12345' OR x='67890'", # min clause is 9 chars + } + # Singleton + min-clause ≈ 200 + 10 + 9 = 219; limit below that → unrecoverable. 
+ with pytest.raises(RequestTooLarge): + _plan_chunks(args, _fake_build, url_limit=210) + + +def test_multi_value_chunked_passes_through_when_url_fits(): + """No planning needed → decorator calls underlying function exactly once + with the original args.""" + calls = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=8000) + def fetch(args): + calls.append(args) + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + fetch({"monitoring_location_id": ["A", "B"]}) + assert len(calls) == 1 + assert calls[0]["monitoring_location_id"] == ["A", "B"] + + +def test_multi_value_chunked_emits_cartesian_product(): + """Two chunkable dims, each split into 2 chunks → exactly 4 sub-calls, + each pairing one chunk from each dim.""" + calls = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + calls.append({k: v for k, v in args.items() if k in ("sites", "pcodes")}) + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + fetch( + { + "sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10], + "pcodes": ["P1" * 10, "P2" * 10, "P3" * 10, "P4" * 10], + } + ) + # Both heavy → planner should split both dims. Confirm a cartesian shape: + # every unique site-chunk pairs with every unique pcode-chunk. + sites_seen = {tuple(c["sites"]) for c in calls} + pcodes_seen = {tuple(c["pcodes"]) for c in calls} + assert len(calls) == len(sites_seen) * len(pcodes_seen) + assert len(sites_seen) > 1 + assert len(pcodes_seen) > 1 + + +def test_multi_value_chunked_lazy_url_limit(): + """``url_limit=None`` → resolve filters._WATERDATA_URL_BYTE_LIMIT at call + time, so tests that patch the constant affect this decorator too.""" + from dataretrieval.waterdata import filters as wd_filters + + calls = [] + + @multi_value_chunked(build_request=_fake_build) # url_limit defaults to None + def fetch(args): + calls.append(args) + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + saved = wd_filters._WATERDATA_URL_BYTE_LIMIT + try: + wd_filters._WATERDATA_URL_BYTE_LIMIT = 240 + # 4 sites of 10 chars → exceeds 240 → planner splits. + fetch({"sites": ["S" * 10 + str(i) for i in range(4)]}) + assert len(calls) > 1, "patched constant should drive chunking" + finally: + wd_filters._WATERDATA_URL_BYTE_LIMIT = saved + + +def test_default_max_chunks_matches_hourly_api_quota(): + """The default cap mirrors the USGS Water Data API's documented + per-API-key hourly limit. Locking this in so future changes have to + explicitly acknowledge the quota.""" + assert _DEFAULT_MAX_CHUNKS == 1000 + + +def test_plan_chunks_raises_when_plan_exceeds_max_chunks(): + """A converged plan with more sub-requests than ``max_chunks`` must + raise rather than silently issue them and burn the user's API quota.""" + # 2 dims with long values, each needing many singleton-ish chunks. + # Pick chunk sizes that converge to a plan exceeding a tight cap. + args = { + "dim_a": [f"long-string-value-{i}" for i in range(50)], + "dim_b": [f"another-long-value-{i}" for i in range(50)], + } + # url_limit forces splitting; max_chunks=10 forces the cap to fire. 
+ with pytest.raises(RequestTooLarge, match="exceeding max_chunks=10"): + _plan_chunks(args, _fake_build, url_limit=250, max_chunks=10) + + +def test_plan_chunks_respects_default_cap_without_explicit_arg(): + """Default kwarg path: ``max_chunks`` defaults to _DEFAULT_MAX_CHUNKS + when not specified, so direct callers (e.g., other library code) get + the same safety net as the decorator wrapper.""" + args = { + "dim_a": [f"v{i:03d}" for i in range(60)], + "dim_b": [f"v{i:03d}" for i in range(60)], + "dim_c": [f"v{i:03d}" for i in range(60)], + } + # Without explicit max_chunks: defaults to 1000. The plan for these + # inputs would emit > 1000 sub-requests at a tight limit, so should + # raise on default cap alone. + with pytest.raises(RequestTooLarge, match=r"max_chunks=1000"): + _plan_chunks(args, _fake_build, url_limit=220) + + +def test_multi_value_chunked_cap_override(): + """A decorator-time ``max_chunks`` override lets callers with higher + quotas raise the ceiling without monkeypatching the module constant.""" + + @multi_value_chunked(build_request=_fake_build, url_limit=220, max_chunks=10) + def fetch(args): + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + with pytest.raises(RequestTooLarge, match="exceeding max_chunks=10"): + fetch( + { + "dim_a": [f"longer-v{i}" for i in range(30)], + "dim_b": [f"longer-v{i}" for i in range(30)], + } + ) + + +def _quota_response(remaining: int | str | None) -> mock.Mock: + """A mock requests.Response-like object whose ``x-ratelimit-remaining`` + header reflects the given value (None → header absent).""" + resp = mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + resp.headers = ( + {} if remaining is None else {"x-ratelimit-remaining": str(remaining)} + ) + return resp + + +def test_read_remaining_parses_header(): + assert _read_remaining(_quota_response(42)) == 42 + + +def test_read_remaining_treats_missing_header_as_plenty(): + """Servers that don't echo a rate-limit header must not trigger + spurious QuotaExhausted aborts. Sentinel is a large integer so any + plausible safety floor compares cleanly.""" + assert _read_remaining(_quota_response(None)) >= 1_000_000 + + +def test_read_remaining_treats_malformed_header_as_plenty(): + """Defensive: non-integer header value → don't abort.""" + assert _read_remaining(_quota_response("not-a-number")) >= 1_000_000 + + +def test_default_quota_safety_floor(): + """Default floor lives at 50 — enough headroom for one final + chunked call's pagination spike without breaching the hourly cap.""" + assert _DEFAULT_QUOTA_SAFETY_FLOOR == 50 + + +def test_multi_value_chunked_aborts_when_quota_floor_breached(): + """Mid-call, when ``x-ratelimit-remaining`` drops below the floor, + the chunker must raise ``QuotaExhausted`` *before* issuing the next + sub-request — and the exception must carry the partial frame plus + the chunk offset so callers can resume.""" + # Build a fetch_once whose response 'remaining' header decrements + # through 200, 100, 40 (below floor=50), 10. + remaining_seq = iter([200, 100, 40, 10]) + page_idx = iter(range(10)) + + def fetch(args): + idx = next(page_idx) + return ( + pd.DataFrame( + {"site": list(args["sites"]), "page": [idx] * len(args["sites"])} + ), + _quota_response(next(remaining_seq)), + ) + + decorated = multi_value_chunked( + build_request=_fake_build, + url_limit=240, + quota_safety_floor=50, + )(fetch) + + # Plan forces 4 sub-requests (4 singleton site chunks). 
+ with pytest.raises(QuotaExhausted) as excinfo: + decorated({"sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10]}) + + err = excinfo.value + # Aborted after the 3rd sub-request (remaining=40 < floor=50). + assert err.completed_chunks == 3 + assert err.total_chunks == 4 + assert err.remaining == 40 + # Partial frame combines rows from the first three completed sub-requests. + assert err.partial_frame is not None + assert set(err.partial_frame["page"]) == {0, 1, 2} + + +def test_multi_value_chunked_does_not_abort_on_last_chunk(): + """Aborting on the final sub-request would be pointless — there's + no 'next' to protect. The check is skipped there. Earlier chunks + stay above the floor; only the last drops below, and we still + return cleanly because the check is skipped at i == total-1.""" + remaining_seq = iter([500, 5]) # only the LAST chunk dips below floor=50 + + def fetch(args): + return ( + pd.DataFrame({"site": list(args["sites"])}), + _quota_response(next(remaining_seq)), + ) + + decorated = multi_value_chunked( + build_request=_fake_build, + url_limit=240, + quota_safety_floor=50, + )(fetch) + + df, _ = decorated({"sites": ["S1" * 10, "S2" * 10]}) # forces 2 chunks + assert len(df) == 2 # no raise — both chunks ran + + +def test_multi_value_chunked_quota_check_disabled_with_zero_floor(): + """Setting the floor to 0 effectively disables the quota guard — + counter can go to 1 without aborting (since 1 > 0 = floor).""" + remaining_seq = iter([5, 1]) + + def fetch(args): + return ( + pd.DataFrame({"site": list(args["sites"])}), + _quota_response(next(remaining_seq)), + ) + + decorated = multi_value_chunked( + build_request=_fake_build, + url_limit=240, + quota_safety_floor=0, + )(fetch) + df, _ = decorated({"sites": ["S1" * 10, "S2" * 10]}) + assert len(df) == 2 # no raise + + +def test_quota_exhausted_message_includes_resume_offset(): + """The error message must point the user at the chunk offset to + resume from, otherwise the partial_frame attribute is a footgun + — the user has no way to know which chunks still need re-issuing.""" + e = QuotaExhausted( + partial_frame=pd.DataFrame(), + partial_response=mock.Mock(), + completed_chunks=7, + total_chunks=20, + remaining=12, + ) + msg = str(e) + assert "7/20" in msg + assert "12" in msg + assert "QuotaExhausted" in msg or "resume" in msg + + +def test_chunkable_params_skips_filter_passed_as_list(): + """Defensive guard: ``filter`` is documented as a string. If a caller + mistakenly passes it as a list, the chunker must NOT treat it as a + multi-value dim — comma-joining CQL clauses inside the URL would + produce a malformed filter expression. 
The inner ``filters.chunked`` + is the only place that may shrink ``filter``.""" + args = { + "monitoring_location_id": ["USGS-A", "USGS-B"], + "filter": ["a='1'", "a='2'"], # malformed input + "filter_lang": ["cql-text", "cql-json"], # ditto + } + chunkable = _chunkable_params(args) + assert "monitoring_location_id" in chunkable + assert "filter" not in chunkable + assert "filter_lang" not in chunkable + + def test_samples_results(): """Test results call for proper columns""" df, _ = get_samples( From c7ef181bd04771c21432512c5448511982f4b16b Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Thu, 14 May 2026 21:25:00 -0500 Subject: [PATCH 04/12] Probe with longest OR-clause, not shortest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The filter chunker (`filters.chunked`) splits a filter into chunks each ≤ the per-sub-request budget, but bails (returns the full filter unchanged) when ANY single OR-clause exceeds the budget. So the smallest filter size the inner chunker can guarantee to emit per sub-request is bounded below by the LARGEST single clause, not the smallest. The original implementation used `min(parts)` to model the chunker's output floor. For filters with uniform clause sizes (all my prior tests), min == max and the bug was hidden. For filters with lopsided clauses — e.g. `id='1' OR id='abcdef…long-string'` — using `min` would let the planner falsely declare a plan feasible. The inner chunker would then bail on the large clause, the real per-sub-request URL would carry the full filter, and the request would 414 server-side. Switch to `max(parts, key=len)`. If singleton+max-clause fits the URL limit, the inner chunker's budget is ≥ max(parts), so all clauses fit individually and chunking succeeds. If singleton+max-clause doesn't fit, the planner correctly raises `RequestTooLarge` instead of producing an unservable plan. Regression test: `test_plan_chunks_probes_with_max_clause_not_min`. Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/chunking.py | 33 +++++++++++++++++------------ tests/waterdata_test.py | 32 ++++++++++++++++++++++++---- 2 files changed, 47 insertions(+), 18 deletions(-) diff --git a/dataretrieval/waterdata/chunking.py b/dataretrieval/waterdata/chunking.py index 1d0a1ff5..f3ef8af6 100644 --- a/dataretrieval/waterdata/chunking.py +++ b/dataretrieval/waterdata/chunking.py @@ -19,13 +19,14 @@ determines output schema and chunking it would shard columns. Coordination with ``filters.chunked``: -The planner probes URL length using the SHORTEST top-level OR-clause +The planner probes URL length using the LONGEST top-level OR-clause when a chunkable filter is present, not the full filter. ``filters. -chunked`` (inner) will split the filter per sub-request, so probing -with the smallest clause models the per-sub-request URL the stack will -actually produce. Without this, a long OR-filter plus multi-value -lists would trigger a premature ``RequestTooLarge`` even though the -combined chunkers would have made things fit. +chunked`` (inner) will split the filter per sub-request but bails if +any single clause exceeds its budget, so the longest clause is the +smallest filter size the stack is guaranteed to emit. Without this +coordination, a long OR-filter plus multi-value lists would trigger a +premature ``RequestTooLarge`` even though the combined chunkers would +have made things fit. 
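+
+A worked micro-example (sizes hypothetical): clauses of 6 and 40 bytes
+with a 30-byte per-sub-request filter budget. The inner chunker refuses
+to split inside a clause, so it bails and ships the full 50-byte filter
+(6 + " OR " + 40) with every sub-request. Probing with the 6-byte
+clause would approve plans the stack cannot deliver; probing with the
+40-byte clause correctly raises ``RequestTooLarge`` up front.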
""" from __future__ import annotations @@ -155,14 +156,18 @@ def _chunkable_params(args: dict[str, Any]) -> dict[str, list]: def _filter_aware_probe_args(args: dict[str, Any]) -> dict[str, Any]: - """Substitute the filter with its shortest top-level OR-clause if the + """Substitute the filter with its LONGEST top-level OR-clause if the filter is chunkable, otherwise return ``args`` unchanged. - The inner ``filters.chunked`` decorator will reduce the filter per - sub-request to at most one OR-clause (its hard floor — see - ``_chunk_cql_or``). Probing with that minimum models the per-sub- - request URL the decorator stack will actually emit, so we don't - plan around bytes the filter chunker has already promised to remove. + The inner ``filters.chunked`` decorator splits a filter into chunks + each ≤ the per-sub-request byte budget, but bails (returns the full + filter unchanged) when ANY single OR-clause exceeds the budget. So + the smallest filter the inner chunker is guaranteed to emit per + sub-request is bounded below by the largest single clause — not the + smallest. Probing with ``max(parts, key=len)`` models the worst + achievable per-sub-request URL the decorator stack can produce; if + that fits, we know the inner chunker won't bail and the actual URL + will fit too. """ filter_expr = args.get("filter") filter_lang = args.get("filter_lang") @@ -170,8 +175,8 @@ def _filter_aware_probe_args(args: dict[str, Any]) -> dict[str, Any]: return args parts = _split_top_level_or(filter_expr) if len(parts) < 2: - return args # one-clause filter — filter chunker can't shrink it - return {**args, "filter": min(parts, key=len)} + return args # one-clause filter — inner chunker can't shrink it + return {**args, "filter": max(parts, key=len)} def _worst_case_args( diff --git a/tests/waterdata_test.py b/tests/waterdata_test.py index 78f3a3ba..0d18a2a1 100644 --- a/tests/waterdata_test.py +++ b/tests/waterdata_test.py @@ -225,11 +225,14 @@ def test_filter_aware_probe_args_passes_through_when_not_chunkable(): assert _filter_aware_probe_args(args) == args -def test_filter_aware_probe_args_substitutes_shortest_or_clause(): - """Chunkable filter → return args with filter replaced by shortest clause.""" +def test_filter_aware_probe_args_substitutes_longest_or_clause(): + """Chunkable filter → return args with filter replaced by the LONGEST + clause. The inner filter chunker bails if any single clause exceeds + the per-sub-request budget, so the longest clause is the smallest + filter size the chunker can guarantee to emit per sub-request.""" args = {"filter": "a='1' OR a='22' OR a='333'", "x": 7} probe = _filter_aware_probe_args(args) - assert probe["filter"] == "a='1'" + assert probe["filter"] == "a='333'" assert probe["x"] == 7 assert args["filter"] == "a='1' OR a='22' OR a='333'" # input not mutated @@ -288,7 +291,9 @@ def test_plan_chunks_coordinates_with_filter_chunker(): "filter": " OR ".join(clauses), } # singleton+full-filter ≈ 200 + 10 + 86 = 296 (over limit 240) — would raise. - # min-clause probe model ≈ 200 + 10 + 5 = 215 (under limit) — plan succeeds. + # max-clause probe model ≈ 200 + 10 + 5 = 215 (under limit) — plan succeeds. + # (Here all clauses are the same length, so min==max; the fix matters for + # filters with lopsided clauses where min < max.) 
     plan = _plan_chunks(args, _fake_build, url_limit=240)
     assert plan is not None  # coordination prevented the premature raise
     assert len(plan["monitoring_location_id"]) > 1  # planner did split
 
 
+def test_plan_chunks_probes_with_max_clause_not_min():
+    """Regression: with lopsided OR-clauses (one short, one long), probing
+    with min(parts) lets the planner falsely declare a plan feasible that
+    the inner filter chunker can't actually deliver — it bails when any
+    single clause exceeds the per-sub-request budget. Probing with the
+    longest clause is the safe lower bound on per-sub-request filter
+    size, so the planner correctly raises when no plan can fit."""
+    args = {
+        "sites": ["A" * 10, "B" * 10],
+        "filter": "x='1' OR x='" + "a" * 28 + "'",  # 5-char and 32-char clauses
+    }
+    # base 200 + singleton sites 10 + min-clause 5 = 215 (limit 230 -> fits)
+    # base 200 + singleton sites 10 + max-clause 32 = 242 (limit 230 -> exceeds)
+    # With min: planner succeeds, but real URL with full filter (41) exceeds
+    # 230 -> server 414. With max: planner raises early, as it should.
+    with pytest.raises(RequestTooLarge):
+        _plan_chunks(args, _fake_build, url_limit=230)
+
+
 def test_plan_chunks_still_raises_when_even_min_clause_doesnt_fit():
     """If the limit is so tight that singleton + shortest-clause STILL
     exceeds it, filter chunker can't save us either — raise."""

From 29413287261b4e7acf924de8ef1b74b1d482d12d Mon Sep 17 00:00:00 2001
From: thodson-usgs
Date: Thu, 14 May 2026 21:31:05 -0500
Subject: [PATCH 05/12] Document the chained-query use case in chunker,
 get_daily, NEWS
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The motivating user story for this PR is the same one R `dataRetrieval`
covers in #870: pull a long monitoring-location list from one getter,
feed it to another. Before chunking, this failed with HTTP 414 once the
URL grew past the server's ~8 KB limit; after, it transparently fans
out.

- chunking.py: prepend a docstring example showing the Ohio-stream-sites
  → daily-discharge chained call, so readers landing on the module file
  see the motivating scenario immediately.
- api.py get_daily: add the same chained example to the Examples block
  (where similar single-site and multi-site examples already live), so
  the most-used getter's docstring shows what just became possible.
- NEWS.md: user-visible entry framing the change in terms of "this now
  works" — chained queries, transparent chunking, max_chunks cap, and
  QuotaExhausted resume. References R PR #870 as the analogous change.

No code changes; pure docs.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 NEWS.md                             |  2 ++
 dataretrieval/waterdata/api.py      | 15 +++++++++++++++
 dataretrieval/waterdata/chunking.py | 15 +++++++++++++++
 3 files changed, 32 insertions(+)

diff --git a/NEWS.md b/NEWS.md
index 246ede15..cf808afb 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -1,3 +1,5 @@
+**05/15/2026:** The OGC `waterdata` getters (`get_daily`, `get_continuous`, `get_field_measurements`, and the rest of the multi-value-capable functions) now transparently chunk requests whose URLs would otherwise exceed the server's ~8 KB byte limit. 
A common chained-query pattern — pull a long site list from `get_monitoring_locations`, then feed it into `get_daily` — previously failed with HTTP 414 once the resulting URL grew past the limit; it now fans out across multiple sub-requests under the hood and returns one combined DataFrame. The chunker coordinates with the existing CQL `filter` chunker (long top-level-`OR` filters still split correctly when used alongside long multi-value lists), caps cartesian-product plans at 1000 sub-requests (the default USGS hourly quota), and aborts mid-call with a structured `QuotaExhausted` exception — carrying the partial result and a resume offset — if `x-ratelimit-remaining` drops below a safety floor. Mirrors R `dataRetrieval`'s [#870](https://github.com/DOI-USGS/dataRetrieval/pull/870), generalized to N dimensions. + **05/07/2026:** Bumped the declared minimum Python version from **3.8** to **3.9** (`pyproject.toml`'s `requires-python` and the ruff target). This brings the manifest in line with what was already being tested — CI's matrix has long covered only 3.9, 3.13, and 3.14, the `waterdata` test module already skipped itself on Python < 3.10, and several modules already use 3.9-only stdlib (e.g. `zoneinfo`). Users on 3.8 will no longer be able to install the package; please upgrade. **05/07/2026:** `waterdata.get_samples()` and `wqp.get_results()` now append a derived `DateTime` UTC column for every Date/Time/TimeZone triplet in the response (e.g. `Activity_StartDate` + `Activity_StartTime` + `Activity_StartTimeZone` → `Activity_StartDateTime`). Both the WQX3 (`Date`/`Time`/`TimeZone`) and legacy WQP (`Date`/`Time/Time`/`Time/TimeZoneCode`) shapes are recognized; abbreviations like EST/EDT/CST/PST resolve to a UTC `Timestamp`, unknown codes resolve to `NaT`, and the original triplet columns are preserved. Returned rows are also now sorted by `Activity_StartDateTime` (or the legacy `ActivityStartDateTime`) — the underlying APIs return rows in an unstable order. Mirrors R's `create_dateTime` and end-of-pipeline sort. Closes #266. diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index ad268194..025aafcd 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -230,6 +230,21 @@ def get_daily( ... parameter_code="00060", ... last_modified="P7D", ... ) + + >>> # Chain queries: pull all stream sites in a state, then their + >>> # daily discharge for the last week. The site list can be hundreds + >>> # of values long — the request is transparently chunked across + >>> # multiple sub-requests so the URL stays under the server's byte + >>> # limit. Combined output looks like a single query. + >>> sites_df, _ = dataretrieval.waterdata.get_monitoring_locations( + ... state_name="Ohio", + ... site_type="Stream", + ... ) + >>> df, md = dataretrieval.waterdata.get_daily( + ... monitoring_location_id=sites_df["monitoring_location_id"].tolist(), + ... parameter_code="00060", + ... time="P7D", + ... ) """ service = "daily" output_id = "daily_id" diff --git a/dataretrieval/waterdata/chunking.py b/dataretrieval/waterdata/chunking.py index f3ef8af6..3a8da944 100644 --- a/dataretrieval/waterdata/chunking.py +++ b/dataretrieval/waterdata/chunking.py @@ -6,6 +6,21 @@ sits OUTSIDE ``filters.chunked`` and splits multi-value list params across multiple sub-requests so each URL fits. +Motivating use case: chained queries where one getter feeds the next: + + >>> # All stream sites in Ohio, then their daily discharge. 
+ >>> # Without chunking the second call's URL would exceed the + >>> # server's byte limit for any state with > ~500 stations. + >>> sites_df, _ = waterdata.get_monitoring_locations( + ... state_name="Ohio", + ... site_type="Stream", + ... ) + >>> df, _ = waterdata.get_daily( + ... monitoring_location_id=sites_df["monitoring_location_id"].tolist(), + ... parameter_code="00060", + ... time="P7D", + ... ) + Design (orthogonal to filter chunking): - N-dimensional cartesian product: for each chunkable list param, the From 07682459d57804a7b0fe4d263f5d5430b986ee8e Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Fri, 15 May 2026 14:58:13 -0500 Subject: [PATCH 06/12] Tidy chunking.py: extract _chunk_bytes, name quota sentinel, use math.prod Three small simplifications, no behavior change: - Extract _chunk_bytes(chunk) helper for len(",".join(map(str, chunk))). Used in both _worst_case_args and _plan_chunks; the helper documents the cost model the planner compares chunks under. - Name the magic sentinel 10**9 as _QUOTA_UNKNOWN. _read_remaining returns it on missing/malformed x-ratelimit-remaining headers; having one definition prevents the value from drifting between branches. - Use math.prod for the cartesian-product cardinality calculation in _plan_chunks (max_chunks check) and the wrapper (quota-floor loop bound). Replaces an open-coded multiply-loop in two places. All 25 chunker tests and 88 filter tests still pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/chunking.py | 35 ++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/dataretrieval/waterdata/chunking.py b/dataretrieval/waterdata/chunking.py index 3a8da944..49422921 100644 --- a/dataretrieval/waterdata/chunking.py +++ b/dataretrieval/waterdata/chunking.py @@ -48,6 +48,7 @@ import functools import itertools +import math from collections.abc import Callable from typing import Any, TypeVar @@ -100,6 +101,12 @@ # known offset instead of retrying the whole chunked call from scratch. _DEFAULT_QUOTA_SAFETY_FLOOR = 50 +# Sentinel returned by ``_read_remaining`` when the response has no +# parseable ``x-ratelimit-remaining`` header. Large enough to beat any +# plausible safety floor so a missing/malformed header doesn't trigger +# spurious ``QuotaExhausted`` aborts. +_QUOTA_UNKNOWN = 10**9 + class RequestTooLarge(ValueError): """Raised when a chunked request cannot be issued. Two cases: @@ -194,6 +201,16 @@ def _filter_aware_probe_args(args: dict[str, Any]) -> dict[str, Any]: return {**args, "filter": max(parts, key=len)} +def _chunk_bytes(chunk: list) -> int: + """Byte length of ``chunk`` when comma-joined into a URL param value. + + This is the cost the planner uses to compare chunks across dims; the + real URL builder also URL-encodes the comma, but the byte counts come + out the same modulo a constant per-chunk overhead. 
+ """ + return len(",".join(map(str, chunk))) + + def _worst_case_args( probe_args: dict[str, Any], plan: dict[str, list[list]] ) -> dict[str, Any]: @@ -202,7 +219,7 @@ def _worst_case_args( already reduced to its filter-chunker floor.""" out = dict(probe_args) for k, chunks in plan.items(): - out[k] = max(chunks, key=lambda c: len(",".join(map(str, c)))) + out[k] = max(chunks, key=_chunk_bytes) return out @@ -242,7 +259,7 @@ def _plan_chunks( for idx, chunk in enumerate(dim_chunks): if len(chunk) <= 1: continue - size = len(",".join(map(str, chunk))) + size = _chunk_bytes(chunk) if best is None or size > best[2]: best = (dim, idx, size) @@ -258,9 +275,7 @@ def _plan_chunks( mid = len(big) // 2 plan[dim] = plan[dim][:idx] + [big[:mid], big[mid:]] + plan[dim][idx + 1 :] - total = 1 - for chunks in plan.values(): - total *= len(chunks) + total = math.prod(len(chunks) for chunks in plan.values()) if total > max_chunks: raise RequestTooLarge( f"Chunked plan would issue {total} sub-requests, exceeding " @@ -279,15 +294,15 @@ def _plan_chunks( def _read_remaining(response: requests.Response) -> int: """Parse ``x-ratelimit-remaining`` from a response. Missing or - malformed header → return a large sentinel so the safety check + malformed header → return ``_QUOTA_UNKNOWN`` so the safety check treats it as 'plenty of quota' (don't abort on header glitches).""" raw = response.headers.get("x-ratelimit-remaining") if raw is None: - return 10**9 + return _QUOTA_UNKNOWN try: return int(raw) except (TypeError, ValueError): - return 10**9 + return _QUOTA_UNKNOWN def multi_value_chunked( @@ -341,9 +356,7 @@ def wrapper( return fetch_once(args) keys = list(plan) - total = 1 - for k in keys: - total *= len(plan[k]) + total = math.prod(len(plan[k]) for k in keys) frames: list[pd.DataFrame] = [] responses: list[requests.Response] = [] for i, combo in enumerate(itertools.product(*(plan[k] for k in keys))): From 29413287261b4e7acf924de8ef1b74b1d482d12d Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Fri, 15 May 2026 15:10:08 -0500 Subject: [PATCH 07/12] Probe URL + body bytes (not just URL) to chunk POST-routed services MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR 233 routes monitoring-locations through POST with the multi-value list embedded in a CQL2 JSON body — the URL stays ~200 bytes regardless of how many sites are passed. The chunker was probing url length only, so it concluded "no need to split" for any number of monitoring_location_id values and let the request go out unchunked. The server then rejected it with HTTP 403 ("Query request denied. Possible reasons include query exceeding server limits") once the CQL2 body grew past its own server-side limit. Empirical: get_monitoring_locations(monitoring_location_id=[671 CAMELS gauges], properties=[...]) failed; bisection on 100 / 250 / 500 / 671 sites showed the boundary between 100 (PASS) and 250 (FAIL). Add a small _request_bytes() helper that sums URL and body lengths, and route both planner probes (the initial "fits?" check and the greedy-halving loop) through it. For GET routes (body is None) this reduces to the previous URL-only sizing — no behavior change. For POST routes, the body bytes now drive the chunking decision. The test _FakeReq fixture grows a body slot defaulting to None to keep its GET-shape contract while satisfying the new probe. Verified against live API: the same four monitoring-locations calls now succeed (100 / 250 / 500 / 671 sites). 
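
For illustration, the failure mode reduces to the following sketch
(hypothetical endpoint and a simplified CQL2 body; byte counts are
approximate, not a real capture):

    import json
    import requests

    sites = [f"USGS-{n:08d}" for n in range(671)]
    cql2 = json.dumps(
        {"op": "in", "args": [{"property": "monitoring_location_id"}, sites]}
    )
    req = requests.Request(
        "POST", "https://api.example.gov/collections/items", data=cql2
    ).prepare()
    len(req.url)                  # ~40 bytes: passes a URL-only probe
    len(req.url) + len(req.body)  # ~12 KB: what the server actually sees

The second number is the quantity ``_request_bytes`` now reports, so the
planner chunks the POST route the same way it chunks a long GET URL.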
Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/chunking.py | 33 +++++++++++++++++++++++------ tests/waterdata_test.py | 12 ++++++++--- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/dataretrieval/waterdata/chunking.py b/dataretrieval/waterdata/chunking.py index 49422921..c0b271f7 100644 --- a/dataretrieval/waterdata/chunking.py +++ b/dataretrieval/waterdata/chunking.py @@ -211,6 +211,27 @@ def _chunk_bytes(chunk: list) -> int: return len(",".join(map(str, chunk))) +def _request_bytes(req: Any) -> int: + """Total bytes of a prepared request: URL + body. + + For the GET-routed services PR 233 introduced, the multi-value list + lives in the URL and ``req.body`` is ``None`` — this reduces to URL + length. For POST-routed services (currently only ``monitoring- + locations``, which the upstream API still rejects comma-separated + values for and routes through CQL2 JSON), the multi-value list lives + in the body and the URL stays ~200 bytes regardless of payload; + counting body bytes is the only way the planner can recognize that + a POST request needs to chunk. + """ + url_len = len(req.url) + body = req.body + if body is None: + return url_len + if isinstance(body, (bytes, bytearray)): + return url_len + len(body) + return url_len + len(str(body).encode("utf-8")) + + def _worst_case_args( probe_args: dict[str, Any], plan: dict[str, list[list]] ) -> dict[str, Any]: @@ -243,14 +264,14 @@ def _plan_chunks( if not chunkable: return None probe_args = _filter_aware_probe_args(args) - if len(build_request(**probe_args).url) <= url_limit: + if _request_bytes(build_request(**probe_args)) <= url_limit: return None plan: dict[str, list[list]] = {k: [v] for k, v in chunkable.items()} while True: worst = _worst_case_args(probe_args, plan) - if len(build_request(**worst).url) <= url_limit: + if _request_bytes(build_request(**worst)) <= url_limit: break # Find the single biggest chunk across all dims and halve it. @@ -265,10 +286,10 @@ def _plan_chunks( if best is None: raise RequestTooLarge( - f"Request URL exceeds {url_limit} bytes even with every " - f"multi-value parameter at a singleton chunk and any " - f"chunkable filter reduced to one OR-clause. Reduce the " - f"number of values or split the call manually." + f"Request exceeds {url_limit} bytes (URL + body) even " + f"with every multi-value parameter at a singleton chunk " + f"and any chunkable filter reduced to one OR-clause. " + f"Reduce the number of values or split the call manually." ) dim, idx, _ = best big = plan[dim][idx] diff --git a/tests/waterdata_test.py b/tests/waterdata_test.py index 4c4ad129..14d15a1f 100644 --- a/tests/waterdata_test.py +++ b/tests/waterdata_test.py @@ -229,14 +229,20 @@ def test_construct_api_requests_two_element_date_list_becomes_interval(): class _FakeReq: - __slots__ = ("url",) + __slots__ = ("url", "body") - def __init__(self, url): + def __init__(self, url, body=None): self.url = url + self.body = body def _fake_build(*, base=200, **kwargs): - """Fake build_request: URL length deterministic in its inputs.""" + """Fake build_request: URL length deterministic in its inputs. + + Mirrors the GET-routed shape: payload goes in the URL, body is None. + The chunker's ``_request_bytes`` helper sums url + body, so this + stays equivalent to URL-only sizing for these tests. 
+ """ bytes_ = base for v in kwargs.values(): if isinstance(v, (list, tuple)): From a9cf2d7366b8dac80edcae21d2d57a0d5aec8d44 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Fri, 15 May 2026 15:19:22 -0500 Subject: [PATCH 08/12] Tighten _request_bytes: type the param and drop redundant str() wrap - Annotate ``req`` as ``requests.PreparedRequest`` (the only caller flow is ``build_request(...).prepare()``; ``requests`` is already imported). - ``_cql2_param`` returns ``str``, which ``requests.Request(data=...)`` carries through to ``req.body``. The hot path on POST routes was ``str(body).encode("utf-8")``; ``str()`` is a no-op, so drop it and let ``body.encode("utf-8")`` allocate once. - Trim docstring: replaces the rotting "PR 233" / "currently only monitoring-locations" anchors with a behavioral description that doesn't rely on which routes happen to be POST today. No behavior change. 30 chunker unit tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/chunking.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/dataretrieval/waterdata/chunking.py b/dataretrieval/waterdata/chunking.py index c0b271f7..dddae65e 100644 --- a/dataretrieval/waterdata/chunking.py +++ b/dataretrieval/waterdata/chunking.py @@ -211,17 +211,13 @@ def _chunk_bytes(chunk: list) -> int: return len(",".join(map(str, chunk))) -def _request_bytes(req: Any) -> int: +def _request_bytes(req: requests.PreparedRequest) -> int: """Total bytes of a prepared request: URL + body. - For the GET-routed services PR 233 introduced, the multi-value list - lives in the URL and ``req.body`` is ``None`` — this reduces to URL - length. For POST-routed services (currently only ``monitoring- - locations``, which the upstream API still rejects comma-separated - values for and routes through CQL2 JSON), the multi-value list lives - in the body and the URL stays ~200 bytes regardless of payload; - counting body bytes is the only way the planner can recognize that - a POST request needs to chunk. + GET routes have ``body=None`` and reduce to URL length. POST routes + (CQL2 JSON body) need body bytes — the URL stays short regardless of + payload, so URL-only sizing would underestimate the request and skip + chunking when it's needed. """ url_len = len(req.url) body = req.body @@ -229,7 +225,7 @@ def _request_bytes(req: Any) -> int: return url_len if isinstance(body, (bytes, bytearray)): return url_len + len(body) - return url_len + len(str(body).encode("utf-8")) + return url_len + len(body.encode("utf-8")) def _worst_case_args( From 7aeacd51d674589a574a3fb198fdfb7f099edbf3 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Fri, 15 May 2026 16:47:43 -0500 Subject: [PATCH 09/12] Return latest paginated/chunked response so QuotaExhausted floor sees current quota MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The ``multi_value_chunked`` decorator reads ``x-ratelimit-remaining`` from the response returned by ``fetch_once(sub_args)`` to honor its documented ``QuotaExhausted`` safety floor. That response was two layers stale: 1. ``_walk_pages`` captured ``initial_response = resp`` before pagination and returned it, so any sub-request with N > 1 pages bubbled up only the first page's headers — the loop already kept overwriting ``resp`` each iteration; we just weren't returning the latest. 2. 
``_combine_chunk_responses`` returned ``responses[0]`` with summed ``elapsed``, so when ``filters.chunked`` fanned out a long OR-filter into N sub-chunks the outer wrapper only saw the first sub-chunk's headers. Composed, the staleness gap per outer chunk was ``inner_chunks × pages_per_inner_chunk − 1`` HTTP requests of quota consumption the chunker was blind to. For the canonical workload (chained query, long site list, paginated filter) that gap easily exceeds the default floor of 50, so the guard never tripped — users hit ``RuntimeError("429: Too many requests...")`` from ``_raise_for_non_200`` instead of the structured ``QuotaExhausted`` with ``partial_frame``/``completed_chunks`` they were promised. Fix both layers: ``_walk_pages`` returns the latest ``resp`` (which the loop was already maintaining), and ``_combine_chunk_responses`` returns ``responses[-1]`` (with ``elapsed`` summed onto it instead of onto ``responses[0]``). Both changes match ``QuotaExhausted.partial_response``'s docstring ("metadata for the last successful sub-request"). Same fix applied to the parallel pagination loop in the stats helper for consistency. No behavior change for single-page mocked tests (initial == latest). 209 waterdata unit tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/filters.py | 15 ++++++++++----- dataretrieval/waterdata/utils.py | 15 ++++++--------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/dataretrieval/waterdata/filters.py b/dataretrieval/waterdata/filters.py index 4c136b82..8fd13125 100644 --- a/dataretrieval/waterdata/filters.py +++ b/dataretrieval/waterdata/filters.py @@ -268,15 +268,20 @@ def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame: def _combine_chunk_responses( responses: list[requests.Response], ) -> requests.Response: - """Return one response: first chunk's URL/headers + summed ``elapsed``. + """Return one response: last chunk's URL/headers + summed ``elapsed``. - Mutates the first response in place (only ``elapsed``); downstream only + Returning the latest sub-response (rather than the first) preserves + current rate-limit headers (e.g. ``x-ratelimit-remaining``), which the + outer ``multi_value_chunked`` decorator inspects to honor its + ``QuotaExhausted`` safety floor between sub-requests. + + Mutates the last response in place (only ``elapsed``); downstream only reads ``elapsed`` (in ``BaseMetadata.query_time``), URL, and headers. """ - head = responses[0] + tail = responses[-1] if len(responses) > 1: - head.elapsed = sum((r.elapsed for r in responses[1:]), start=head.elapsed) - return head + tail.elapsed = sum((r.elapsed for r in responses[:-1]), start=tail.elapsed) + return tail _FetchOnce = TypeVar( diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index 342df231..a55bb1bd 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -643,7 +643,10 @@ def _walk_pages( pd.DataFrame A DataFrame containing the aggregated results from all pages. requests.Response - The initial response object containing metadata about the first request. + The latest response from the pagination walk. Returning the most + recent response (not the first) lets downstream callers observe + current rate-limit headers (e.g. ``x-ratelimit-remaining``) on + which the multi-value chunker's ``QuotaExhausted`` guard relies. 
Raises ------ @@ -666,9 +669,6 @@ def _walk_pages( resp = client.send(req) _raise_for_non_200(resp) - # Store the initial response for metadata - initial_response = resp - # Grab some aspects of the original request: headers and the # request type (GET or POST) method = req.method.upper() @@ -697,7 +697,7 @@ def _walk_pages( curr_url = None # Concatenate all pages at once for efficiency - return pd.concat(dfs, ignore_index=True), initial_response + return pd.concat(dfs, ignore_index=True), resp finally: if close_client: client.close() @@ -1128,9 +1128,6 @@ def get_stats_data( resp = client.send(req) _raise_for_non_200(resp) - # Store the initial response for metadata - initial_response = resp - # Grab some aspects of the original request: headers and the # request type (GET or POST) method = req.method.upper() @@ -1173,7 +1170,7 @@ def get_stats_data( if expand_percentiles: dfs = _expand_percentiles(dfs) - return dfs, BaseMetadata(initial_response) + return dfs, BaseMetadata(resp) finally: if close_client: client.close() From 13d5e0c86b6b0218fb1fddac7563c8d579510645 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Fri, 15 May 2026 17:14:55 -0500 Subject: [PATCH 10/12] Probe filter URL with encoding-ratio-weighted size, not just raw length MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``_filter_aware_probe_args`` substituted the LONGEST raw OR-clause into the URL probe, but the inner ``filters._effective_filter_budget`` computes its bail floor as ``len(longest) * max(per_clause_encoding_ ratio)`` — the worst per-call ratio across all clauses, not the ratio of the longest one. Under lopsided encoding (e.g. a long alphanumeric clause alongside short clauses heavy in ``%27`` / ``%2C`` / non-ASCII), ``encoding_ratio_max`` exceeds ``ratio_of_longest`` and the planner could approve a plan the inner chunker then refuses to emit, leaving the actual URL over the limit. Mirror the inner chunker's model: synthesize an ASCII probe clause of length ``ceil(len(longest) * encoding_ratio_max)``. ASCII has 1:1 URL encoding, so the URL builder sees exactly the bail-floor byte count and the planner's check coincides with the inner chunker's bail condition. Dormant in practice for typical USGS CQL filters (``field='value'`` encoding ratios all cluster between 1.16 and 1.67), but the docstring claimed a categorical guarantee that was technically false. This restores that guarantee. Test ``test_filter_aware_probe_args_substitutes_longest_or_clause`` was renamed and rewritten to verify the new contract: the probe filter is a synthetic ASCII string whose length matches the bail floor model. Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/chunking.py | 52 ++++++++++++++++++----------- tests/waterdata_test.py | 21 ++++++++---- 2 files changed, 48 insertions(+), 25 deletions(-) diff --git a/dataretrieval/waterdata/chunking.py b/dataretrieval/waterdata/chunking.py index dddae65e..de5187bd 100644 --- a/dataretrieval/waterdata/chunking.py +++ b/dataretrieval/waterdata/chunking.py @@ -34,14 +34,17 @@ determines output schema and chunking it would shard columns. Coordination with ``filters.chunked``: -The planner probes URL length using the LONGEST top-level OR-clause -when a chunkable filter is present, not the full filter. ``filters. -chunked`` (inner) will split the filter per sub-request but bails if -any single clause exceeds its budget, so the longest clause is the -smallest filter size the stack is guaranteed to emit. 
Without this
-coordination, a long OR-filter plus multi-value lists would trigger a
-premature ``RequestTooLarge`` even though the combined chunkers would
-have made things fit.
+The planner probes the URL with a synthetic clause sized to the inner
+chunker's bail floor — ``len(longest_clause) * max(per_clause_encoding
+_ratio)`` — when a chunkable filter is present. The inner chunker
+bails (emits the full filter) when any single clause's URL-encoded
+length exceeds its per-sub-request budget; mirroring ``filters._
+effective_filter_budget``, that floor already accounts for the worst
+per-call encoding ratio, so a long alphanumeric clause coexisting
+with a shorter heavily-encoded clause is sized correctly. Without
+this coordination, a long OR-filter plus multi-value lists would
+trigger a premature ``RequestTooLarge`` even though the combined
+chunkers would have made things fit.
 """
 
 from __future__ import annotations
@@ -51,6 +54,7 @@
 import math
 from collections.abc import Callable
 from typing import Any, TypeVar
+from urllib.parse import quote_plus
 
 import pandas as pd
 import requests
@@ -178,18 +182,25 @@
 
 
 def _filter_aware_probe_args(args: dict[str, Any]) -> dict[str, Any]:
-    """Substitute the filter with its LONGEST top-level OR-clause if the
-    filter is chunkable, otherwise return ``args`` unchanged.
+    """Substitute the filter with a synthetic ASCII clause sized to the
+    inner chunker's bail floor if the filter is chunkable, otherwise
+    return ``args`` unchanged.
 
     The inner ``filters.chunked`` decorator splits a filter into chunks
-    each ≤ the per-sub-request byte budget, but bails (returns the full
-    filter unchanged) when ANY single OR-clause exceeds the budget. So
-    the smallest filter the inner chunker is guaranteed to emit per
-    sub-request is bounded below by the largest single clause — not the
-    smallest. Probing with ``max(parts, key=len)`` models the worst
-    achievable per-sub-request URL the decorator stack can produce; if
-    that fits, we know the inner chunker won't bail and the actual URL
-    will fit too.
+    each whose URL-encoded length is ≤ the per-sub-request budget, but
+    bails (emits the full filter unchanged) when ANY single OR-clause's
+    URL-encoded length exceeds the budget. Mirroring
+    ``filters._effective_filter_budget``, the bail floor on the longest
+    clause is ``len(longest) * max(per_clause_encoding_ratio)``: even a
+    clause whose own ratio is low inherits the worst per-call ratio
+    because the budget is computed against the heaviest-encoding clause.
+
+    Substituting a synthetic alphanumeric-ASCII clause of that exact
+    length (``quote_plus`` leaves alphanumerics untouched, so its
+    encoding ratio is exactly 1) makes the planner's URL probe and the
+    inner chunker's bail condition agree on worst-case size — the
+    planner won't approve a plan the inner chunker would then refuse
+    to emit, and won't prematurely raise when the inner chunker could
+    have made it fit.
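+
+    A toy sizing walk-through (hypothetical clauses; under
+    ``quote_plus``, ``=`` and ``'`` each expand to three bytes):
+
+    >>> parts = ["a='1'", "a='333'"]
+    >>> max(len(quote_plus(p)) / len(p) for p in parts)  # 11/5
+    2.2
+    >>> math.ceil(max(len(p) for p in parts) * 2.2)  # probe length
+    16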
""" filter_expr = args.get("filter") filter_lang = args.get("filter_lang") @@ -198,7 +209,10 @@ def _filter_aware_probe_args(args: dict[str, Any]) -> dict[str, Any]: parts = _split_top_level_or(filter_expr) if len(parts) < 2: return args # one-clause filter — inner chunker can't shrink it - return {**args, "filter": max(parts, key=len)} + encoding_ratio_max = max(len(quote_plus(p)) / len(p) for p in parts) + longest_raw = max(len(p) for p in parts) + probe_size = math.ceil(longest_raw * encoding_ratio_max) + return {**args, "filter": "x" * probe_size} def _chunk_bytes(chunk: list) -> int: diff --git a/tests/waterdata_test.py b/tests/waterdata_test.py index 14d15a1f..97d8e134 100644 --- a/tests/waterdata_test.py +++ b/tests/waterdata_test.py @@ -263,14 +263,23 @@ def test_filter_aware_probe_args_passes_through_when_not_chunkable(): assert _filter_aware_probe_args(args) == args -def test_filter_aware_probe_args_substitutes_longest_or_clause(): - """Chunkable filter → return args with filter replaced by the LONGEST - clause. The inner filter chunker bails if any single clause exceeds - the per-sub-request budget, so the longest clause is the smallest - filter size the chunker can guarantee to emit per sub-request.""" +def test_filter_aware_probe_args_models_inner_chunker_bail_floor(): + """Chunkable filter → return args with filter replaced by a synthetic + ASCII clause whose URL byte count equals the inner chunker's bail + floor ``len(longest) * max(per_clause_encoding_ratio)``. Mirrors + ``filters._effective_filter_budget``'s worst-case model so the + planner doesn't approve plans the inner chunker would refuse.""" + import math + from urllib.parse import quote_plus + args = {"filter": "a='1' OR a='22' OR a='333'", "x": 7} probe = _filter_aware_probe_args(args) - assert probe["filter"] == "a='333'" + parts = ["a='1'", "a='22'", "a='333'"] + expected = math.ceil( + max(len(p) for p in parts) * max(len(quote_plus(p)) / len(p) for p in parts) + ) + assert len(probe["filter"]) == expected + assert probe["filter"].isascii() and probe["filter"].isalnum() assert probe["x"] == 7 assert args["filter"] == "a='1' OR a='22' OR a='333'" # input not mutated From df25e0dc2c12c0ea2e2dbcbd34b26cf58997e7aa Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Fri, 15 May 2026 17:27:09 -0500 Subject: [PATCH 11/12] Address PR review: lazy max_chunks default, early-exit cap check, doc fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five fixes from the PR review: - ``_plan_chunks`` checks ``total > max_chunks`` inside the halving loop now: each split only grows the cartesian product, so once the cap is crossed it can never come back under. Continuing to halve the URL just wastes work. - ``_plan_chunks``'s ``max_chunks`` default becomes ``int | None = None`` and resolves to ``_DEFAULT_MAX_CHUNKS`` at call time. The previous ``max_chunks: int = _DEFAULT_MAX_CHUNKS`` bound the constant at module-import time, defeating the documented monkeypatch path for direct callers (the wrapper already resolved lazily, but ``_plan_chunks`` direct calls saw the import-time value). - ``_chunk_bytes`` docstring no longer claims the URL-encoded comma overhead is "constant per chunk" — it scales with ``2 * (len - 1)``. The function still uses raw ``,`` length because the planner only needs a monotone comparator across dims, but the wording was wrong. 
- ``QuotaExhausted.partial_response`` docstring now says "last completed sub-request" to match the bug_001 fix in ``_combine_chunk_responses``. - Module-level docstring drops the chained-query example (duplicated from ``get_daily``'s docstring) and points readers there. No behavior change for existing callers. 209 waterdata tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/chunking.py | 73 ++++++++++++++--------------- 1 file changed, 36 insertions(+), 37 deletions(-) diff --git a/dataretrieval/waterdata/chunking.py b/dataretrieval/waterdata/chunking.py index de5187bd..1a31f244 100644 --- a/dataretrieval/waterdata/chunking.py +++ b/dataretrieval/waterdata/chunking.py @@ -4,22 +4,8 @@ (e.g. ``monitoring_location_id=USGS-A,USGS-B,...``). Long lists can blow the server's ~8 KB URL byte limit. This module adds a decorator that sits OUTSIDE ``filters.chunked`` and splits multi-value list params -across multiple sub-requests so each URL fits. - -Motivating use case: chained queries where one getter feeds the next: - - >>> # All stream sites in Ohio, then their daily discharge. - >>> # Without chunking the second call's URL would exceed the - >>> # server's byte limit for any state with > ~500 stations. - >>> sites_df, _ = waterdata.get_monitoring_locations( - ... state_name="Ohio", - ... site_type="Stream", - ... ) - >>> df, _ = waterdata.get_daily( - ... monitoring_location_id=sites_df["monitoring_location_id"].tolist(), - ... parameter_code="00060", - ... time="P7D", - ... ) +across multiple sub-requests so each URL fits. See ``get_daily``'s +docstring for an end-to-end chained-query example. Design (orthogonal to filter chunking): @@ -138,9 +124,11 @@ class QuotaExhausted(RuntimeError): Concatenated, deduplicated result of every sub-request that completed before the floor was crossed. partial_response : requests.Response - Aggregated response (URL/headers of the first sub-request, - summed ``elapsed``). Wrap in ``BaseMetadata`` to surface to - the caller alongside the partial frame. + Aggregated response (URL/headers of the last completed sub- + request, summed ``elapsed``). The last sub-request's headers + are preferred so callers inspecting ``x-ratelimit-remaining`` + see current quota state. Wrap in ``BaseMetadata`` to surface + to the caller alongside the partial frame. completed_chunks : int Number of sub-requests successfully completed. total_chunks : int @@ -218,9 +206,11 @@ def _filter_aware_probe_args(args: dict[str, Any]) -> dict[str, Any]: def _chunk_bytes(chunk: list) -> int: """Byte length of ``chunk`` when comma-joined into a URL param value. - This is the cost the planner uses to compare chunks across dims; the - real URL builder also URL-encodes the comma, but the byte counts come - out the same modulo a constant per-chunk overhead. + The planner uses this only to compare chunks across dims, so the + raw `",".join` length is sufficient: the real URL builder + URL-encodes each comma to ``%2C`` (adding ``2 * (len(chunk) - 1)`` + bytes), but that overhead is monotone in chunk size and doesn't + change the relative ordering this function exists to support. """ return len(",".join(map(str, chunk))) @@ -258,7 +248,7 @@ def _plan_chunks( args: dict[str, Any], build_request: Callable[..., Any], url_limit: int, - max_chunks: int = _DEFAULT_MAX_CHUNKS, + max_chunks: int | None = None, ) -> dict[str, list[list]] | None: """Greedy halving until the worst-case sub-request URL fits. 
@@ -267,9 +257,16 @@ def _plan_chunks( - every multi-value param is already a singleton chunk AND the filter (if any) is already at its smallest OR-clause and the URL still exceeds ``url_limit`` (irreducible), or - - the converged cartesian-product plan would issue more than - ``max_chunks`` sub-requests (hourly API budget). + - the cartesian-product plan exceeds ``max_chunks`` sub-requests + (the hourly API budget); checked after each split so we bail + promptly once the cap is unreachable. + + ``max_chunks`` defaults to ``_DEFAULT_MAX_CHUNKS`` resolved at call + time, so monkeypatching the module constant takes effect for + direct callers too. """ + if max_chunks is None: + max_chunks = _DEFAULT_MAX_CHUNKS chunkable = _chunkable_params(args) if not chunkable: return None @@ -282,7 +279,7 @@ def _plan_chunks( while True: worst = _worst_case_args(probe_args, plan) if _request_bytes(build_request(**worst)) <= url_limit: - break + return plan # Find the single biggest chunk across all dims and halve it. best: tuple[str, int, int] | None = None # (dim, chunk_index, size) @@ -306,15 +303,18 @@ def _plan_chunks( mid = len(big) // 2 plan[dim] = plan[dim][:idx] + [big[:mid], big[mid:]] + plan[dim][idx + 1 :] - total = math.prod(len(chunks) for chunks in plan.values()) - if total > max_chunks: - raise RequestTooLarge( - f"Chunked plan would issue {total} sub-requests, exceeding " - f"max_chunks={max_chunks} (USGS API's default hourly rate " - f"limit per key). Reduce input list sizes, narrow the time " - f"window, or raise max_chunks if you have a higher quota." - ) - return plan + # Each split only grows the cartesian product, so once we + # cross max_chunks we can never come back under. Bail now + # rather than keep splitting (the URL probe could still take + # many more iterations). + total = math.prod(len(chunks) for chunks in plan.values()) + if total > max_chunks: + raise RequestTooLarge( + f"Chunked plan would issue {total} sub-requests, exceeding " + f"max_chunks={max_chunks} (USGS API's default hourly rate " + f"limit per key). Reduce input list sizes, narrow the time " + f"window, or raise max_chunks if you have a higher quota." + ) _FetchOnce = TypeVar( @@ -376,13 +376,12 @@ def wrapper( if url_limit is not None else filters._WATERDATA_URL_BYTE_LIMIT ) - cap = max_chunks if max_chunks is not None else _DEFAULT_MAX_CHUNKS floor = ( quota_safety_floor if quota_safety_floor is not None else _DEFAULT_QUOTA_SAFETY_FLOOR ) - plan = _plan_chunks(args, build_request, limit, cap) + plan = _plan_chunks(args, build_request, limit, max_chunks) if plan is None: return fetch_once(args) From 16f02fdd172a7b16963fb7da2fbd0e7d74ae9efb Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Fri, 15 May 2026 17:53:30 -0500 Subject: [PATCH 12/12] Address PR review round 2: URL-encoded chunk sizing, doc fixes - ``_chunk_bytes`` now returns URL-encoded length via ``quote_plus`` instead of raw ``,``-join length. The function is the planner's biggest-chunk comparator and indirect URL contribution estimate; values containing URL-special chars (``%``, ``+``, ``/``, ``&``, etc.) expand under encoding and could mis-rank chunks under the raw-length form. For typical USGS multi-value workloads (alphanumeric IDs and codes) the two are equal, but the encoded form is always correct and matches what ``_request_bytes`` sees. - ``filters.chunked``'s docstring now says "last chunk's URL/headers" to match what ``_combine_chunk_responses`` returns after the bug_001 fix, with a note about why (rate-limit state). 
- Module docstring rewrapped so identifiers (``filters._effective_ filter_budget``, ``per-clause encoding ratio``) don't break across line endings. Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/chunking.py | 38 ++++++++++++++++------------- dataretrieval/waterdata/filters.py | 7 ++++-- 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/dataretrieval/waterdata/chunking.py b/dataretrieval/waterdata/chunking.py index 1a31f244..43690c9b 100644 --- a/dataretrieval/waterdata/chunking.py +++ b/dataretrieval/waterdata/chunking.py @@ -21,15 +21,15 @@ Coordination with ``filters.chunked``: The planner probes the URL with a synthetic clause sized to the inner -chunker's bail floor — ``len(longest_clause) * max(per_clause_encoding -_ratio)`` — when a chunkable filter is present. The inner chunker -bails (emits the full filter) when any single clause's URL-encoded -length exceeds its per-sub-request budget; mirroring ``filters._ -effective_filter_budget``, that floor already accounts for the worst -per-call encoding ratio, so a long alphanumeric clause coexisting -with a shorter heavily-encoded clause is sized correctly. Without -this coordination, a long OR-filter plus multi-value lists would -trigger a premature ``RequestTooLarge`` even though the combined +chunker's bail floor — ``len(longest_clause) * max(per-clause encoding +ratio)`` — when a chunkable filter is present. The inner chunker bails +(emits the full filter) when any single clause's URL-encoded length +exceeds its per-sub-request budget; mirroring +``filters._effective_filter_budget``, that floor already accounts for +the worst per-call encoding ratio, so a long alphanumeric clause +coexisting with a shorter heavily-encoded clause is sized correctly. +Without this coordination, a long OR-filter plus multi-value lists +would trigger a premature ``RequestTooLarge`` even though the combined chunkers would have made things fit. """ @@ -204,15 +204,19 @@ def _filter_aware_probe_args(args: dict[str, Any]) -> dict[str, Any]: def _chunk_bytes(chunk: list) -> int: - """Byte length of ``chunk`` when comma-joined into a URL param value. - - The planner uses this only to compare chunks across dims, so the - raw `",".join` length is sufficient: the real URL builder - URL-encodes each comma to ``%2C`` (adding ``2 * (len(chunk) - 1)`` - bytes), but that overhead is monotone in chunk size and doesn't - change the relative ordering this function exists to support. + """URL-encoded byte length of ``chunk`` when comma-joined into a + URL parameter value. + + Used both as the planner's biggest-chunk comparator (in + ``_worst_case_args`` and the halving loop) and indirectly as the + URL contribution estimate. Sizing with ``quote_plus`` rather than + raw ``,``-join length avoids mis-ranking chunks whose values + contain characters that expand under URL encoding (``%``, ``+``, + ``/``, ``&``, etc.) — for typical USGS multi-value workloads + (alphanumeric IDs and codes) the two are equal, but the encoded + form is always correct. 
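+
+    A quick check with made-up IDs (alphanumerics and ``-`` pass
+    through ``quote_plus`` unchanged; each comma becomes ``%2C``):
+
+    >>> _chunk_bytes(["USGS-01", "USGS-02"])  # 7 + 3 + 7 bytes
+    17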
""" - return len(",".join(map(str, chunk))) + return len(quote_plus(",".join(map(str, chunk)))) def _request_bytes(req: requests.PreparedRequest) -> int: diff --git a/dataretrieval/waterdata/filters.py b/dataretrieval/waterdata/filters.py index 8fd13125..ade04b21 100644 --- a/dataretrieval/waterdata/filters.py +++ b/dataretrieval/waterdata/filters.py @@ -300,8 +300,11 @@ def chunked(*, build_request: Callable[..., Any]) -> Callable[[_FetchOnce], _Fet - Chunkable cql-text filter: run the lexicographic-pitfall guard, split into URL-length-safe sub-expressions, call the wrapped function once per chunk, concatenate frames (drop empties, dedup by feature ``id``), - and return an aggregated response (first chunk's URL/headers, summed - ``elapsed``). + and return an aggregated response (last chunk's URL/headers, summed + ``elapsed``). The last chunk's headers are preferred so callers see + current rate-limit state (``x-ratelimit-remaining``) on which the + outer ``multi_value_chunked`` decorator's ``QuotaExhausted`` guard + depends. Either way the return shape matches the undecorated function's, so the caller wraps the response in ``BaseMetadata`` the same way in both paths.