From bd8c09bdf9601746cc7f224ec3cb69fb0868898a Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Wed, 22 Apr 2026 12:39:47 -0500 Subject: [PATCH 01/14] Add CQL filter passthrough to OGC waterdata getters Every `get_*` function that targets an OGC collection (`continuous`, `daily`, `field_measurements`, `monitoring_locations`, `time_series_metadata`, `latest_continuous`, `latest_daily`, `channel`) now accepts `filter` and `filter_lang` kwargs that are forwarded as the OGC `filter` / `filter-lang` query parameters. This unlocks server-side expressions that aren't expressible via the other kwargs. The motivating use case is pulling one-shot windows of continuous data around many field-measurement timestamps in a single request via OR'd BETWEEN clauses, instead of N round-trips. Caveats documented in each docstring and NEWS.md: - The server currently accepts `cql-text` (default) and `cql-json`; `cql2-text` / `cql2-json` are not yet supported. - Long filters can exceed the URI length limit. A `UserWarning` is emitted above 5000 characters and the practical cap is around 75 OR-clauses before the server returns HTTP 414. Includes unit tests covering the filter / filter-lang URL construction for all OGC services and the long-filter warning. Co-Authored-By: Claude Opus 4.7 (1M context) --- NEWS.md | 2 + dataretrieval/waterdata/api.py | 112 +++++++++++++++++++++++++++++++ dataretrieval/waterdata/utils.py | 19 ++++++ tests/waterdata_utils_test.py | 106 ++++++++++++++++++++++++++++- 4 files changed, 238 insertions(+), 1 deletion(-) diff --git a/NEWS.md b/NEWS.md index a3aa8361..09a7e026 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,5 @@ +**04/22/2026:** The OGC `waterdata` getters (`get_continuous`, `get_daily`, `get_field_measurements`, and others) now accept `filter` and `filter_lang` kwargs that are passed through to the service's CQL filter parameter. 
This enables advanced server-side filtering that isn't expressible via the other kwargs — most commonly, OR'ing multiple time ranges into a single request. The server currently accepts `cql-text` (default) and `cql-json`; long filters can exceed the URI length limit, so `dataretrieval` warns above 5000 characters and the practical cap is around 75 OR-clauses before the server returns HTTP 414. + **12/04/2025:** The `get_continuous()` function was added to the `waterdata` module, which provides access to measurements collected via automated sensors at a high frequency (often 15 minute intervals) at a monitoring location. This is an early version of the continuous endpoint and should be used with caution as the API team improves its performance. In the future, we anticipate the addition of an endpoint(s) specifically for handling large data requests, so it may make sense for power users to hold off on heavy development using the new continuous endpoint. **11/24/2025:** `dataretrieval` is pleased to offer a new module, `waterdata`, which gives users access USGS's modernized [Water Data APIs](https://api.waterdata.usgs.gov/). The Water Data API endpoints include daily values, instantaneous values, field measurements (modernized groundwater levels service), time series metadata, and discrete water quality data from the Samples database. Though there will be a period of overlap, the functions within `waterdata` will eventually replace the `nwis` module, which currently provides access to the legacy [NWIS Water Services](https://waterservices.usgs.gov/). More example workflows and functions coming soon. Check `help(waterdata)` for more information. 
diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index b2310e7a..6ad9e2a3 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -51,6 +51,8 @@ def get_daily( time: str | list[str] | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: str | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data provide one data value to represent water conditions for the @@ -177,6 +179,18 @@ def get_daily( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (NA) will set the limit to the maximum allowable limit for the service. + filter : string, optional + A CQL text or JSON expression passed through to the OGC API + ``filter`` query parameter. Commonly used to OR several time + ranges into a single request. At the time of writing the server + accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / + ``cql2-json`` are not yet supported. Long filters can exceed the + server's URI length limit: a warning is emitted above 5000 + characters, and the practical cap is ~75 OR-clauses before the + server returns HTTP 414. + filter_lang : string, optional + Language of the ``filter`` expression, for example ``cql-text`` + (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -228,6 +242,8 @@ def get_continuous( last_modified: str | None = None, time: str | list[str] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: str | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """ @@ -348,6 +364,18 @@ def get_continuous( allowable limit is 10000. It may be beneficial to set this number lower if your internet connection is spotty. 
The default (NA) will set the limit to the maximum allowable limit for the service. + filter : string, optional + A CQL text or JSON expression passed through to the OGC API + ``filter`` query parameter. Commonly used to OR several time + ranges into a single request. At the time of writing the server + accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / + ``cql2-json`` are not yet supported. Long filters can exceed the + server's URI length limit: a warning is emitted above 5000 + characters, and the practical cap is ~75 OR-clauses before the + server returns HTTP 414. + filter_lang : string, optional + Language of the ``filter`` expression, for example ``cql-text`` + (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. convert_type : boolean, optional If True, the function will convert the data to dates and qualifier to string vector @@ -426,6 +454,8 @@ def get_monitoring_locations( time: str | list[str] | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: str | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Location information is basic information about the monitoring location @@ -635,6 +665,18 @@ def get_monitoring_locations( The returning object will be a data frame with no spatial information. Note that the USGS Water Data APIs use camelCase "skipGeometry" in CQL2 queries. + filter : string, optional + A CQL text or JSON expression passed through to the OGC API + ``filter`` query parameter. Commonly used to OR several time + ranges into a single request. At the time of writing the server + accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / + ``cql2-json`` are not yet supported. Long filters can exceed the + server's URI length limit: a warning is emitted above 5000 + characters, and the practical cap is ~75 OR-clauses before the + server returns HTTP 414. 
+ filter_lang : string, optional + Language of the ``filter`` expression, for example ``cql-text`` + (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -697,6 +739,8 @@ def get_time_series_metadata( time: str | list[str] | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: str | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data and continuous measurements are grouped into time series, @@ -851,6 +895,18 @@ def get_time_series_metadata( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (None) will set the limit to the maximum allowable limit for the service. + filter : string, optional + A CQL text or JSON expression passed through to the OGC API + ``filter`` query parameter. Commonly used to OR several time + ranges into a single request. At the time of writing the server + accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / + ``cql2-json`` are not yet supported. Long filters can exceed the + server's URI length limit: a warning is emitted above 5000 + characters, and the practical cap is ~75 OR-clauses before the + server returns HTTP 414. + filter_lang : string, optional + Language of the ``filter`` expression, for example ``cql-text`` + (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. convert_type : boolean, optional If True, converts columns to appropriate types. 
@@ -903,6 +959,8 @@ def get_latest_continuous( time: str | list[str] | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: str | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """This endpoint provides the most recent observation for each time series @@ -1026,6 +1084,18 @@ def get_latest_continuous( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (None) will set the limit to the maximum allowable limit for the service. + filter : string, optional + A CQL text or JSON expression passed through to the OGC API + ``filter`` query parameter. Commonly used to OR several time + ranges into a single request. At the time of writing the server + accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / + ``cql2-json`` are not yet supported. Long filters can exceed the + server's URI length limit: a warning is emitted above 5000 + characters, and the practical cap is ~75 OR-clauses before the + server returns HTTP 414. + filter_lang : string, optional + Language of the ``filter`` expression, for example ``cql-text`` + (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -1075,6 +1145,8 @@ def get_latest_daily( time: str | list[str] | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: str | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data provide one data value to represent water conditions for the @@ -1200,6 +1272,18 @@ def get_latest_daily( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (None) will set the limit to the maximum allowable limit for the service. 
+ filter : string, optional + A CQL text or JSON expression passed through to the OGC API + ``filter`` query parameter. Commonly used to OR several time + ranges into a single request. At the time of writing the server + accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / + ``cql2-json`` are not yet supported. Long filters can exceed the + server's URI length limit: a warning is emitted above 5000 + characters, and the practical cap is ~75 OR-clauses before the + server returns HTTP 414. + filter_lang : string, optional + Language of the ``filter`` expression, for example ``cql-text`` + (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -1251,6 +1335,8 @@ def get_field_measurements( time: str | list[str] | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: str | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Field measurements are physically measured values collected during a @@ -1366,6 +1452,18 @@ def get_field_measurements( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (None) will set the limit to the maximum allowable limit for the service. + filter : string, optional + A CQL text or JSON expression passed through to the OGC API + ``filter`` query parameter. Commonly used to OR several time + ranges into a single request. At the time of writing the server + accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / + ``cql2-json`` are not yet supported. Long filters can exceed the + server's URI length limit: a warning is emitted above 5000 + characters, and the practical cap is ~75 OR-clauses before the + server returns HTTP 414. + filter_lang : string, optional + Language of the ``filter`` expression, for example ``cql-text`` + (default) or ``cql-json``. 
Sent as ``filter-lang`` in the URL. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -2017,6 +2115,8 @@ def get_channel( skip_geometry: bool | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: str | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """ @@ -2123,6 +2223,18 @@ def get_channel( vertical_velocity_description, longitudinal_velocity_description, measurement_type, last_modified, channel_measurement_type. The default (NA) will return all columns of the data. + filter : string, optional + A CQL text or JSON expression passed through to the OGC API + ``filter`` query parameter. Commonly used to OR several time + ranges into a single request. At the time of writing the server + accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / + ``cql2-json`` are not yet supported. Long filters can exceed the + server's URI length limit: a warning is emitted above 5000 + characters, and the practical cap is ~75 OR-clauses before the + server returns HTTP 414. + filter_lang : string, optional + Language of the ``filter`` expression, for example ``cql-text`` + (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. convert_type : boolean, optional If True, the function will convert the data to dates and qualifier to string vector diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index c58148d5..ba9ec378 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -4,6 +4,7 @@ import logging import os import re +import warnings from datetime import datetime from typing import Any, get_args @@ -419,6 +420,24 @@ def _construct_api_requests( if properties: params["properties"] = ",".join(properties) + # Translate CQL filter Python names to the hyphenated URL parameter that + # the OGC API expects. 
The Python kwarg is `filter_lang` because hyphens + # aren't valid in Python identifiers. + if "filter_lang" in params: + params["filter-lang"] = params.pop("filter_lang") + # Emit a warning when a long CQL filter is at risk of exceeding the + # server's URI length limit (HTTP 414). Empirically, the waterdata + # continuous endpoint begins returning 414 around ~7 KB of filter text + # (~75 OR-clauses of typical interval form). The threshold here is + # conservative. + if isinstance(params.get("filter"), str) and len(params["filter"]) > 5000: + warnings.warn( + "CQL `filter` is longer than 5000 characters; the server may " + "return HTTP 414 (URI Too Long). Consider splitting into batched " + "requests.", + stacklevel=2, + ) + headers = _default_headers() if POST: diff --git a/tests/waterdata_utils_test.py b/tests/waterdata_utils_test.py index 772f75b7..1fcefe66 100644 --- a/tests/waterdata_utils_test.py +++ b/tests/waterdata_utils_test.py @@ -1,8 +1,19 @@ +import warnings from unittest import mock +from urllib.parse import parse_qs, urlsplit +import pytest import requests -from dataretrieval.waterdata.utils import _get_args, _walk_pages +from dataretrieval.waterdata.utils import ( + _construct_api_requests, + _get_args, + _walk_pages, +) + + +def _query_params(prepared_request): + return parse_qs(urlsplit(prepared_request.url).query) def test_get_args_basic(): @@ -77,3 +88,96 @@ def test_walk_pages_multiple_mocked(): assert mock_client.send.called assert mock_client.request.called assert mock_client.request.call_args[0][1] == "https://example.com/page2" + + +def test_construct_filter_passthrough(): + """`filter` is forwarded verbatim as a query parameter.""" + expr = ( + "(time >= '2023-01-06T16:00:00Z' AND time <= '2023-01-06T18:00:00Z') " + "OR (time >= '2023-01-10T18:00:00Z' AND time <= '2023-01-10T20:00:00Z')" + ) + req = _construct_api_requests( + service="continuous", + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter=expr, + ) + qs 
= _query_params(req) + assert qs["filter"] == [expr] + + +def test_construct_filter_lang_hyphenated(): + """The Python kwarg `filter_lang` is sent as URL key `filter-lang`.""" + req = _construct_api_requests( + service="continuous", + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter="time >= '2023-01-01T00:00:00Z'", + filter_lang="cql-text", + ) + qs = _query_params(req) + assert qs["filter-lang"] == ["cql-text"] + # The underscore form must NOT appear in the URL + assert "filter_lang" not in qs + + +def test_construct_long_filter_emits_warning(): + """CQL filter strings longer than 5000 characters warn about URI limits.""" + long_clause = "(time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-01T00:30:00Z')" + # Build a filter string that comfortably exceeds the threshold + big = " OR ".join([long_clause] * 100) + assert len(big) > 5000 + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + _construct_api_requests( + service="continuous", + monitoring_location_id="USGS-07374525", + filter=big, + ) + matching = [ + w + for w in caught + if issubclass(w.category, UserWarning) and "414" in str(w.message) + ] + seen = [str(w.message) for w in caught] + assert matching, f"expected a URI-length warning, got: {seen}" + + +def test_construct_short_filter_does_not_warn(): + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + _construct_api_requests( + service="continuous", + monitoring_location_id="USGS-07374525", + filter="time >= '2023-01-01T00:00:00Z'", + ) + assert not [ + w + for w in caught + if issubclass(w.category, UserWarning) and "414" in str(w.message) + ] + + +@pytest.mark.parametrize( + "service", + [ + "daily", + "continuous", + "monitoring-locations", + "time-series-metadata", + "latest-continuous", + "latest-daily", + "field-measurements", + "channel-measurements", + ], +) +def test_construct_filter_on_all_ogc_services(service): + """Filter passthrough works 
uniformly for every OGC collection endpoint.""" + req = _construct_api_requests( + service=service, + filter="value > 0", + filter_lang="cql-text", + ) + qs = _query_params(req) + assert qs["filter"] == ["value > 0"] + assert qs["filter-lang"] == ["cql-text"] From aebc1445402d8dd87d77c5821e6c29c594e23597 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Wed, 22 Apr 2026 12:59:39 -0500 Subject: [PATCH 02/14] Auto-chunk long CQL filters across multiple sub-requests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A CQL `filter` made up of a top-level `OR` chain can exceed the server's URI length limit. Rather than asking the caller to handle that themselves, split the expression along its top-level OR boundaries into chunks that each fit under a conservative budget (`_CQL_FILTER_CHUNK_LEN`), issue one request per chunk, and concatenate the results (deduplicated by the service's output id). Splitting is paren- and quote-aware so `OR` inside sub-expressions or string literals is preserved. When the expression has no top-level OR — or any single clause already exceeds the budget — the filter is sent as-is (the server decides) rather than being mangled. Drops the 5000-character `UserWarning` added in the previous commit: chunking handles the common case transparently, and the docstring caveat about `HTTP 414` / `~75 OR-clauses` is no longer needed. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- NEWS.md | 2 +- dataretrieval/waterdata/api.py | 64 ++++++------ dataretrieval/waterdata/utils.py | 142 ++++++++++++++++++++++---- tests/waterdata_utils_test.py | 170 ++++++++++++++++++++++++------- 4 files changed, 293 insertions(+), 85 deletions(-) diff --git a/NEWS.md b/NEWS.md index 09a7e026..2e7c2f2d 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,4 @@ -**04/22/2026:** The OGC `waterdata` getters (`get_continuous`, `get_daily`, `get_field_measurements`, and others) now accept `filter` and `filter_lang` kwargs that are passed through to the service's CQL filter parameter. This enables advanced server-side filtering that isn't expressible via the other kwargs — most commonly, OR'ing multiple time ranges into a single request. The server currently accepts `cql-text` (default) and `cql-json`; long filters can exceed the URI length limit, so `dataretrieval` warns above 5000 characters and the practical cap is around 75 OR-clauses before the server returns HTTP 414. +**04/22/2026:** The OGC `waterdata` getters (`get_continuous`, `get_daily`, `get_field_measurements`, and others) now accept `filter` and `filter_lang` kwargs that are passed through to the service's CQL filter parameter. This enables advanced server-side filtering that isn't expressible via the other kwargs — most commonly, OR'ing multiple time ranges into a single request. A long expression made up of a top-level `OR` chain is transparently split into multiple requests that each fit under the server's URI length limit, and the results are concatenated. **12/04/2025:** The `get_continuous()` function was added to the `waterdata` module, which provides access to measurements collected via automated sensors at a high frequency (often 15 minute intervals) at a monitoring location. This is an early version of the continuous endpoint and should be used with caution as the API team improves its performance. 
In the future, we anticipate the addition of an endpoint(s) specifically for handling large data requests, so it may make sense for power users to hold off on heavy development using the new continuous endpoint. diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index 6ad9e2a3..88ab1d3e 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -184,10 +184,10 @@ def get_daily( ``filter`` query parameter. Commonly used to OR several time ranges into a single request. At the time of writing the server accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / - ``cql2-json`` are not yet supported. Long filters can exceed the - server's URI length limit: a warning is emitted above 5000 - characters, and the practical cap is ~75 OR-clauses before the - server returns HTTP 414. + ``cql2-json`` are not yet supported. A long expression made up + of a top-level ``OR`` chain is automatically split into + multiple requests that each fit under the server's URI length + limit; the results are concatenated. filter_lang : string, optional Language of the ``filter`` expression, for example ``cql-text`` (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. @@ -369,10 +369,10 @@ def get_continuous( ``filter`` query parameter. Commonly used to OR several time ranges into a single request. At the time of writing the server accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / - ``cql2-json`` are not yet supported. Long filters can exceed the - server's URI length limit: a warning is emitted above 5000 - characters, and the practical cap is ~75 OR-clauses before the - server returns HTTP 414. + ``cql2-json`` are not yet supported. A long expression made up + of a top-level ``OR`` chain is automatically split into + multiple requests that each fit under the server's URI length + limit; the results are concatenated. 
filter_lang : string, optional Language of the ``filter`` expression, for example ``cql-text`` (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. @@ -670,10 +670,10 @@ def get_monitoring_locations( ``filter`` query parameter. Commonly used to OR several time ranges into a single request. At the time of writing the server accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / - ``cql2-json`` are not yet supported. Long filters can exceed the - server's URI length limit: a warning is emitted above 5000 - characters, and the practical cap is ~75 OR-clauses before the - server returns HTTP 414. + ``cql2-json`` are not yet supported. A long expression made up + of a top-level ``OR`` chain is automatically split into + multiple requests that each fit under the server's URI length + limit; the results are concatenated. filter_lang : string, optional Language of the ``filter`` expression, for example ``cql-text`` (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. @@ -900,10 +900,10 @@ def get_time_series_metadata( ``filter`` query parameter. Commonly used to OR several time ranges into a single request. At the time of writing the server accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / - ``cql2-json`` are not yet supported. Long filters can exceed the - server's URI length limit: a warning is emitted above 5000 - characters, and the practical cap is ~75 OR-clauses before the - server returns HTTP 414. + ``cql2-json`` are not yet supported. A long expression made up + of a top-level ``OR`` chain is automatically split into + multiple requests that each fit under the server's URI length + limit; the results are concatenated. filter_lang : string, optional Language of the ``filter`` expression, for example ``cql-text`` (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. @@ -1089,10 +1089,10 @@ def get_latest_continuous( ``filter`` query parameter. Commonly used to OR several time ranges into a single request. 
At the time of writing the server accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / - ``cql2-json`` are not yet supported. Long filters can exceed the - server's URI length limit: a warning is emitted above 5000 - characters, and the practical cap is ~75 OR-clauses before the - server returns HTTP 414. + ``cql2-json`` are not yet supported. A long expression made up + of a top-level ``OR`` chain is automatically split into + multiple requests that each fit under the server's URI length + limit; the results are concatenated. filter_lang : string, optional Language of the ``filter`` expression, for example ``cql-text`` (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. @@ -1277,10 +1277,10 @@ def get_latest_daily( ``filter`` query parameter. Commonly used to OR several time ranges into a single request. At the time of writing the server accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / - ``cql2-json`` are not yet supported. Long filters can exceed the - server's URI length limit: a warning is emitted above 5000 - characters, and the practical cap is ~75 OR-clauses before the - server returns HTTP 414. + ``cql2-json`` are not yet supported. A long expression made up + of a top-level ``OR`` chain is automatically split into + multiple requests that each fit under the server's URI length + limit; the results are concatenated. filter_lang : string, optional Language of the ``filter`` expression, for example ``cql-text`` (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. @@ -1457,10 +1457,10 @@ def get_field_measurements( ``filter`` query parameter. Commonly used to OR several time ranges into a single request. At the time of writing the server accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / - ``cql2-json`` are not yet supported. 
Long filters can exceed the - server's URI length limit: a warning is emitted above 5000 - characters, and the practical cap is ~75 OR-clauses before the - server returns HTTP 414. + ``cql2-json`` are not yet supported. A long expression made up + of a top-level ``OR`` chain is automatically split into + multiple requests that each fit under the server's URI length + limit; the results are concatenated. filter_lang : string, optional Language of the ``filter`` expression, for example ``cql-text`` (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. @@ -2228,10 +2228,10 @@ def get_channel( ``filter`` query parameter. Commonly used to OR several time ranges into a single request. At the time of writing the server accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / - ``cql2-json`` are not yet supported. Long filters can exceed the - server's URI length limit: a warning is emitted above 5000 - characters, and the practical cap is ~75 OR-clauses before the - server returns HTTP 414. + ``cql2-json`` are not yet supported. A long expression made up + of a top-level ``OR`` chain is automatically split into + multiple requests that each fit under the server's URI length + limit; the results are concatenated. filter_lang : string, optional Language of the ``filter`` expression, for example ``cql-text`` (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index ba9ec378..d902987c 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -4,7 +4,6 @@ import logging import os import re -import warnings from datetime import datetime from typing import Any, get_args @@ -225,6 +224,103 @@ def _format_api_dates( raise ValueError("datetime_input should only include 1-2 values") +# Conservative budget (characters) for a single CQL `filter` query +# parameter before the URL risks exceeding the server's URI length limit. 
+# The continuous endpoint has been observed to return HTTP 414 around ~7 KB +# of filter text; 5000 leaves headroom for URL encoding and the other +# query parameters. +_CQL_FILTER_CHUNK_LEN = 5000 + + +def _split_top_level_or(expr: str) -> list[str]: + """Split a CQL expression at each top-level ``OR`` separator. + + Respects parentheses and single/double-quoted string literals so that + ``OR`` tokens inside ``(A OR B)`` or ``'word OR word'`` are left alone. + Matching is case-insensitive. Whitespace around each emitted part is + stripped; empty parts are dropped. + """ + parts = [] + depth = 0 + in_quote = None + last = 0 + i = 0 + n = len(expr) + while i < n: + ch = expr[i] + if in_quote is not None: + if ch == in_quote: + in_quote = None + i += 1 + continue + if ch in ("'", '"'): + in_quote = ch + i += 1 + continue + if ch == "(": + depth += 1 + i += 1 + continue + if ch == ")": + depth -= 1 + i += 1 + continue + # Match whitespace + OR + whitespace at depth 0, case-insensitive. + # The preceding char (or start-of-string) must also be whitespace. + if depth == 0 and ch.isspace(): + j = i + 1 + while j < n and expr[j].isspace(): + j += 1 + if j + 2 <= n and expr[j : j + 2].lower() == "or": + k = j + 2 + if k < n and expr[k].isspace(): + parts.append(expr[last:i].strip()) + # advance past the trailing whitespace too + m = k + 1 + while m < n and expr[m].isspace(): + m += 1 + last = m + i = m + continue + i += 1 + parts.append(expr[last:].strip()) + return [p for p in parts if p] + + +def _chunk_cql_or(expr: str, max_len: int = _CQL_FILTER_CHUNK_LEN) -> list[str]: + """Split a CQL expression into OR-chunks that each fit under ``max_len``. + + The splitter only understands top-level ``OR`` chains, since that is + the only shape that can be recombined losslessly as a disjunction of + independent sub-queries. 
Returns ``[expr]`` unchanged when the whole + expression already fits, when it contains no top-level ``OR``, or when + any single clause is larger than ``max_len`` on its own (we would + rather send a too-long request and surface the server's 414 than + silently drop data). + """ + if len(expr) <= max_len: + return [expr] + parts = _split_top_level_or(expr) + if len(parts) < 2 or any(len(p) > max_len for p in parts): + return [expr] + + chunks = [] + current = [] + current_len = 0 + for part in parts: + join_cost = len(" OR ") if current else 0 + if current and current_len + join_cost + len(part) > max_len: + chunks.append(" OR ".join(current)) + current = [part] + current_len = len(part) + else: + current.append(part) + current_len += join_cost + len(part) + if current: + chunks.append(" OR ".join(current)) + return chunks + + def _cql2_param(args: dict[str, Any]) -> str: """ Convert query parameters to CQL2 JSON format for POST requests. @@ -425,18 +521,6 @@ def _construct_api_requests( # aren't valid in Python identifiers. if "filter_lang" in params: params["filter-lang"] = params.pop("filter_lang") - # Emit a warning when a long CQL filter is at risk of exceeding the - # server's URI length limit (HTTP 414). Empirically, the waterdata - # continuous endpoint begins returning 414 around ~7 KB of filter text - # (~75 OR-clauses of typical interval form). The threshold here is - # conservative. - if isinstance(params.get("filter"), str) and len(params["filter"]) > 5000: - warnings.warn( - "CQL `filter` is longer than 5000 characters; the server may " - "return HTTP 414 (URI Too Long). 
Consider splitting into batched " - "requests.", - stacklevel=2, - ) headers = _default_headers() @@ -848,10 +932,34 @@ def get_ogc_data( convert_type = args.pop("convert_type", False) # Create fresh dictionary of args without any None values args = {k: v for k, v in args.items() if v is not None} - # Build API request - req = _construct_api_requests(**args) - # Run API request and iterate through pages if needed - return_list, response = _walk_pages(geopd=GEOPANDAS, req=req) + # If a long CQL `filter` can be split along a top-level OR chain, fan + # the request out into chunks that each fit under the server's URI + # length limit. Disjoint OR-clauses combine losslessly on the client + # side; overlapping clauses are deduplicated by output_id below. + filter_expr = args.get("filter") + if isinstance(filter_expr, str): + filter_chunks = _chunk_cql_or(filter_expr) + else: + filter_chunks = [None] + + frames = [] + response = None + for chunk in filter_chunks: + chunk_args = dict(args) + if chunk is not None: + chunk_args["filter"] = chunk + req = _construct_api_requests(**chunk_args) + chunk_df, response = _walk_pages(geopd=GEOPANDAS, req=req) + frames.append(chunk_df) + + if len(frames) > 1: + return_list = pd.concat(frames, ignore_index=True) + if output_id in return_list.columns: + return_list = return_list.drop_duplicates( + subset=output_id, ignore_index=True + ) + else: + return_list = frames[0] # Manage some aspects of the returned dataset return_list = _deal_with_empty(return_list, properties, service) diff --git a/tests/waterdata_utils_test.py b/tests/waterdata_utils_test.py index 1fcefe66..74173616 100644 --- a/tests/waterdata_utils_test.py +++ b/tests/waterdata_utils_test.py @@ -1,4 +1,4 @@ -import warnings +import sys from unittest import mock from urllib.parse import parse_qs, urlsplit @@ -6,8 +6,11 @@ import requests from dataretrieval.waterdata.utils import ( + _CQL_FILTER_CHUNK_LEN, + _chunk_cql_or, _construct_api_requests, _get_args, + 
_split_top_level_or, _walk_pages, ) @@ -121,43 +124,73 @@ def test_construct_filter_lang_hyphenated(): assert "filter_lang" not in qs -def test_construct_long_filter_emits_warning(): - """CQL filter strings longer than 5000 characters warn about URI limits.""" - long_clause = "(time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-01T00:30:00Z')" - # Build a filter string that comfortably exceeds the threshold - big = " OR ".join([long_clause] * 100) - assert len(big) > 5000 - with warnings.catch_warnings(record=True) as caught: - warnings.simplefilter("always") - _construct_api_requests( - service="continuous", - monitoring_location_id="USGS-07374525", - filter=big, - ) - matching = [ - w - for w in caught - if issubclass(w.category, UserWarning) and "414" in str(w.message) - ] - seen = [str(w.message) for w in caught] - assert matching, f"expected a URI-length warning, got: {seen}" - - -def test_construct_short_filter_does_not_warn(): - with warnings.catch_warnings(record=True) as caught: - warnings.simplefilter("always") - _construct_api_requests( - service="continuous", - monitoring_location_id="USGS-07374525", - filter="time >= '2023-01-01T00:00:00Z'", - ) - assert not [ - w - for w in caught - if issubclass(w.category, UserWarning) and "414" in str(w.message) +def test_split_top_level_or_simple(): + parts = _split_top_level_or("A OR B OR C") + assert parts == ["A", "B", "C"] + + +def test_split_top_level_or_case_insensitive(): + assert _split_top_level_or("A or B Or C") == ["A", "B", "C"] + + +def test_split_top_level_or_respects_parens(): + # Inner OR inside parens must not be split + expr = "(A OR B) OR (C OR D)" + assert _split_top_level_or(expr) == ["(A OR B)", "(C OR D)"] + + +def test_split_top_level_or_respects_quotes(): + # Literal OR inside a quoted string must not be treated as a separator + expr = "name = 'foo OR bar' OR id = 1" + assert _split_top_level_or(expr) == ["name = 'foo OR bar'", "id = 1"] + + +def test_split_top_level_or_single_clause(): + 
assert _split_top_level_or("time >= '2023-01-01T00:00:00Z'") == [ + "time >= '2023-01-01T00:00:00Z'" ] +def test_chunk_cql_or_short_passthrough(): + expr = "time >= '2023-01-01T00:00:00Z'" + assert _chunk_cql_or(expr, max_len=1000) == [expr] + + +def test_chunk_cql_or_splits_into_multiple(): + clause = "(time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-01T00:30:00Z')" + expr = " OR ".join([clause] * 200) + chunks = _chunk_cql_or(expr, max_len=1000) + # each chunk must be under the budget + assert all(len(c) <= 1000 for c in chunks) + # rejoined chunks must cover every clause + rejoined_clauses = sum(len(c.split(" OR ")) for c in chunks) + assert rejoined_clauses == 200 + # and must be a valid OR chain (each chunk is itself a top-level OR of clauses) + assert len(chunks) > 1 + + +def test_chunk_cql_or_unsplittable_returns_input(): + # A long expression with no top-level OR stays intact — the caller + # either accepts the risk of a 414 or restructures the query. + big = "value > 0 AND " + ("A " * 4000) + result = _chunk_cql_or(big, max_len=1000) + assert result == [big] + + +def test_chunk_cql_or_single_clause_over_budget_returns_input(): + # If any single top-level clause already exceeds the budget, there's + # no safe split; fall back to the original. + huge_clause = "(value > " + "9" * 6000 + ")" + expr = f"{huge_clause} OR (value > 0)" + assert _chunk_cql_or(expr, max_len=1000) == [expr] + + +def test_default_chunk_budget_is_conservative(): + # Guard against someone nudging the constant upward unintentionally — + # the observed server limit is around 7 KB of filter text. 
+ assert _CQL_FILTER_CHUNK_LEN <= 5500 + + @pytest.mark.parametrize( "service", [ @@ -181,3 +214,70 @@ def test_construct_filter_on_all_ogc_services(service): qs = _query_params(req) assert qs["filter"] == ["value > 0"] assert qs["filter-lang"] == ["cql-text"] + + +@pytest.mark.skipif( + sys.version_info < (3, 10), + reason="get_continuous requires py>=3.10 (see tests/waterdata_test.py)", +) +def test_long_filter_fans_out_into_multiple_requests(requests_mock): + """An oversized top-level OR filter triggers multiple HTTP requests, + one per chunk, whose results are concatenated.""" + from dataretrieval.waterdata import get_continuous + + # 300 OR-clauses × ~70 chars each = ~21 KB; comfortably above + # _CQL_FILTER_CHUNK_LEN so at least several chunks are expected. + clause_template = ( + "(time >= '2023-01-{day:02d}T00:00:00Z' " + "AND time <= '2023-01-{day:02d}T00:30:00Z')" + ) + clauses = [clause_template.format(day=(i % 28) + 1) for i in range(300)] + expr = " OR ".join(clauses) + assert len(expr) > _CQL_FILTER_CHUNK_LEN + + # Each mocked response carries a single feature; the final DataFrame + # should have one row per chunk the client issued. + call_count = {"n": 0} + + def respond(request, context): + context.status_code = 200 + call_count["n"] += 1 + return { + "type": "FeatureCollection", + "numberReturned": 1, + "features": [ + { + "type": "Feature", + "id": f"chunk-{call_count['n']}", + "geometry": None, + "properties": { + "continuous_id": f"chunk-{call_count['n']}", + "value": call_count["n"], + }, + } + ], + "links": [], + } + + requests_mock.get( + "https://api.waterdata.usgs.gov/ogcapi/v0/collections/continuous/items", + json=respond, + ) + + df, _ = get_continuous( + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter=expr, + filter_lang="cql-text", + ) + + # Expected chunk count: mirror the library's splitter so the test + # exercises the real chunking behavior without hard-coding a number. 
+ expected_chunks = _chunk_cql_or(expr) + assert len(expected_chunks) > 1 + assert call_count["n"] == len(expected_chunks) + assert len(df) == len(expected_chunks) + # Each sub-request's URL must stay under the chunk budget. + for req in requests_mock.request_history: + filter_qs = parse_qs(urlsplit(req.url).query).get("filter", [""])[0] + assert len(filter_qs) <= _CQL_FILTER_CHUNK_LEN From 5999274162fc7db1cd222b2c4a929556a59942d9 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Wed, 22 Apr 2026 13:24:02 -0500 Subject: [PATCH 03/14] Tighten filter/chunking diff per review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Introduce a `FILTER_LANG = Literal["cql-text", "cql-json"]` type alias alongside the existing `SERVICES` / `PROFILES` Literals in `waterdata/types.py`, export it from the package, and use it in all eight OGC getter signatures. - Simplify the fan-out dispatch in `get_ogc_data`: one ternary picks the chunk list, and the single-chunk fast path is expressed as the early branch of an `if len(frames) == 1`. - Drop the tautological `test_default_chunk_budget_is_conservative` — the integration test already asserts each sub-request URL stays under the budget. - Extract `OGC_CONTINUOUS_URL` in the test file and strip a handful of WHAT-narrating comments in both the implementation and tests. The `filter_lang`/`filter-lang` mapping comment stays because the WHY (hyphens invalid in Python identifiers) isn't obvious. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/__init__.py | 2 ++ dataretrieval/waterdata/api.py | 17 ++++++----- dataretrieval/waterdata/types.py | 2 ++ dataretrieval/waterdata/utils.py | 24 +++++---------- tests/waterdata_utils_test.py | 45 ++++++++--------------------- 5 files changed, 33 insertions(+), 57 deletions(-) diff --git a/dataretrieval/waterdata/__init__.py b/dataretrieval/waterdata/__init__.py index 2110de83..9bd80c91 100644 --- a/dataretrieval/waterdata/__init__.py +++ b/dataretrieval/waterdata/__init__.py @@ -27,6 +27,7 @@ ) from .types import ( CODE_SERVICES, + FILTER_LANG, PROFILE_LOOKUP, PROFILES, SERVICES, @@ -34,6 +35,7 @@ __all__ = [ "CODE_SERVICES", + "FILTER_LANG", "PROFILES", "PROFILE_LOOKUP", "SERVICES", diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index 88ab1d3e..54e9f531 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -18,6 +18,7 @@ from dataretrieval.utils import BaseMetadata, to_str from dataretrieval.waterdata.types import ( CODE_SERVICES, + FILTER_LANG, METADATA_COLLECTIONS, PROFILES, SERVICES, @@ -52,7 +53,7 @@ def get_daily( bbox: list[float] | None = None, limit: int | None = None, filter: str | None = None, - filter_lang: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data provide one data value to represent water conditions for the @@ -243,7 +244,7 @@ def get_continuous( time: str | list[str] | None = None, limit: int | None = None, filter: str | None = None, - filter_lang: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """ @@ -455,7 +456,7 @@ def get_monitoring_locations( bbox: list[float] | None = None, limit: int | None = None, filter: str | None = None, - filter_lang: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> 
tuple[pd.DataFrame, BaseMetadata]: """Location information is basic information about the monitoring location @@ -740,7 +741,7 @@ def get_time_series_metadata( bbox: list[float] | None = None, limit: int | None = None, filter: str | None = None, - filter_lang: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data and continuous measurements are grouped into time series, @@ -960,7 +961,7 @@ def get_latest_continuous( bbox: list[float] | None = None, limit: int | None = None, filter: str | None = None, - filter_lang: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """This endpoint provides the most recent observation for each time series @@ -1146,7 +1147,7 @@ def get_latest_daily( bbox: list[float] | None = None, limit: int | None = None, filter: str | None = None, - filter_lang: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data provide one data value to represent water conditions for the @@ -1336,7 +1337,7 @@ def get_field_measurements( bbox: list[float] | None = None, limit: int | None = None, filter: str | None = None, - filter_lang: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Field measurements are physically measured values collected during a @@ -2116,7 +2117,7 @@ def get_channel( bbox: list[float] | None = None, limit: int | None = None, filter: str | None = None, - filter_lang: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """ diff --git a/dataretrieval/waterdata/types.py b/dataretrieval/waterdata/types.py index f5e1496b..c9655719 100644 --- a/dataretrieval/waterdata/types.py +++ b/dataretrieval/waterdata/types.py @@ -40,6 +40,8 @@ 
"results", ] +FILTER_LANG = Literal["cql-text", "cql-json"] + PROFILES = Literal[ "actgroup", "actmetric", diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index d902987c..cec2feed 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -266,7 +266,6 @@ def _split_top_level_or(expr: str) -> list[str]: i += 1 continue # Match whitespace + OR + whitespace at depth 0, case-insensitive. - # The preceding char (or start-of-string) must also be whitespace. if depth == 0 and ch.isspace(): j = i + 1 while j < n and expr[j].isspace(): @@ -275,7 +274,6 @@ def _split_top_level_or(expr: str) -> list[str]: k = j + 2 if k < n and expr[k].isspace(): parts.append(expr[last:i].strip()) - # advance past the trailing whitespace too m = k + 1 while m < n and expr[m].isspace(): m += 1 @@ -932,34 +930,28 @@ def get_ogc_data( convert_type = args.pop("convert_type", False) # Create fresh dictionary of args without any None values args = {k: v for k, v in args.items() if v is not None} - # If a long CQL `filter` can be split along a top-level OR chain, fan - # the request out into chunks that each fit under the server's URI - # length limit. Disjoint OR-clauses combine losslessly on the client - # side; overlapping clauses are deduplicated by output_id below. + # Overlapping user OR-clauses are deduplicated by output_id further below. 
filter_expr = args.get("filter") - if isinstance(filter_expr, str): - filter_chunks = _chunk_cql_or(filter_expr) - else: - filter_chunks = [None] + filter_chunks = ( + _chunk_cql_or(filter_expr) if isinstance(filter_expr, str) else [None] + ) frames = [] response = None for chunk in filter_chunks: - chunk_args = dict(args) - if chunk is not None: - chunk_args["filter"] = chunk + chunk_args = args if chunk is None else {**args, "filter": chunk} req = _construct_api_requests(**chunk_args) chunk_df, response = _walk_pages(geopd=GEOPANDAS, req=req) frames.append(chunk_df) - if len(frames) > 1: + if len(frames) == 1: + return_list = frames[0] + else: return_list = pd.concat(frames, ignore_index=True) if output_id in return_list.columns: return_list = return_list.drop_duplicates( subset=output_id, ignore_index=True ) - else: - return_list = frames[0] # Manage some aspects of the returned dataset return_list = _deal_with_empty(return_list, properties, service) diff --git a/tests/waterdata_utils_test.py b/tests/waterdata_utils_test.py index 74173616..8b3ce16b 100644 --- a/tests/waterdata_utils_test.py +++ b/tests/waterdata_utils_test.py @@ -14,6 +14,10 @@ _walk_pages, ) +OGC_CONTINUOUS_URL = ( + "https://api.waterdata.usgs.gov/ogcapi/v0/collections/continuous/items" +) + def _query_params(prepared_request): return parse_qs(urlsplit(prepared_request.url).query) @@ -120,7 +124,6 @@ def test_construct_filter_lang_hyphenated(): ) qs = _query_params(req) assert qs["filter-lang"] == ["cql-text"] - # The underscore form must NOT appear in the URL assert "filter_lang" not in qs @@ -134,13 +137,10 @@ def test_split_top_level_or_case_insensitive(): def test_split_top_level_or_respects_parens(): - # Inner OR inside parens must not be split - expr = "(A OR B) OR (C OR D)" - assert _split_top_level_or(expr) == ["(A OR B)", "(C OR D)"] + assert _split_top_level_or("(A OR B) OR (C OR D)") == ["(A OR B)", "(C OR D)"] def test_split_top_level_or_respects_quotes(): - # Literal OR inside a 
quoted string must not be treated as a separator expr = "name = 'foo OR bar' OR id = 1" assert _split_top_level_or(expr) == ["name = 'foo OR bar'", "id = 1"] @@ -170,27 +170,16 @@ def test_chunk_cql_or_splits_into_multiple(): def test_chunk_cql_or_unsplittable_returns_input(): - # A long expression with no top-level OR stays intact — the caller - # either accepts the risk of a 414 or restructures the query. big = "value > 0 AND " + ("A " * 4000) - result = _chunk_cql_or(big, max_len=1000) - assert result == [big] + assert _chunk_cql_or(big, max_len=1000) == [big] def test_chunk_cql_or_single_clause_over_budget_returns_input(): - # If any single top-level clause already exceeds the budget, there's - # no safe split; fall back to the original. huge_clause = "(value > " + "9" * 6000 + ")" expr = f"{huge_clause} OR (value > 0)" assert _chunk_cql_or(expr, max_len=1000) == [expr] -def test_default_chunk_budget_is_conservative(): - # Guard against someone nudging the constant upward unintentionally — - # the observed server limit is around 7 KB of filter text. - assert _CQL_FILTER_CHUNK_LEN <= 5500 - - @pytest.mark.parametrize( "service", [ @@ -221,22 +210,17 @@ def test_construct_filter_on_all_ogc_services(service): reason="get_continuous requires py>=3.10 (see tests/waterdata_test.py)", ) def test_long_filter_fans_out_into_multiple_requests(requests_mock): - """An oversized top-level OR filter triggers multiple HTTP requests, - one per chunk, whose results are concatenated.""" + """An oversized top-level OR filter triggers multiple HTTP requests + whose results are concatenated.""" from dataretrieval.waterdata import get_continuous - # 300 OR-clauses × ~70 chars each = ~21 KB; comfortably above - # _CQL_FILTER_CHUNK_LEN so at least several chunks are expected. 
- clause_template = ( + clause = ( "(time >= '2023-01-{day:02d}T00:00:00Z' " "AND time <= '2023-01-{day:02d}T00:30:00Z')" ) - clauses = [clause_template.format(day=(i % 28) + 1) for i in range(300)] - expr = " OR ".join(clauses) + expr = " OR ".join(clause.format(day=(i % 28) + 1) for i in range(300)) assert len(expr) > _CQL_FILTER_CHUNK_LEN - # Each mocked response carries a single feature; the final DataFrame - # should have one row per chunk the client issued. call_count = {"n": 0} def respond(request, context): @@ -259,10 +243,7 @@ def respond(request, context): "links": [], } - requests_mock.get( - "https://api.waterdata.usgs.gov/ogcapi/v0/collections/continuous/items", - json=respond, - ) + requests_mock.get(OGC_CONTINUOUS_URL, json=respond) df, _ = get_continuous( monitoring_location_id="USGS-07374525", @@ -271,13 +252,11 @@ def respond(request, context): filter_lang="cql-text", ) - # Expected chunk count: mirror the library's splitter so the test - # exercises the real chunking behavior without hard-coding a number. + # Mirror the library's splitter so the test doesn't hardcode a chunk count. expected_chunks = _chunk_cql_or(expr) assert len(expected_chunks) > 1 assert call_count["n"] == len(expected_chunks) assert len(df) == len(expected_chunks) - # Each sub-request's URL must stay under the chunk budget. 
for req in requests_mock.request_history: filter_qs = parse_qs(urlsplit(req.url).query).get("filter", [""])[0] assert len(filter_qs) <= _CQL_FILTER_CHUNK_LEN From d4774ac1fa873675a6bd10d246b2a9aa331e9298 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Wed, 22 Apr 2026 13:24:08 -0500 Subject: [PATCH 04/14] NEWS.md: roll up highlights since v1.1.0 Replace the single-paragraph filter announcement with a broader round-up covering the post-release additions: `get_channel`, `get_stats_por` / `get_stats_date_range`, `get_reference_table` and its query-parameter passthrough, the `py.typed` marker, `pandas` 3.x support, and the removal of the `waterwatch` module and several defunct NWIS stubs. Co-Authored-By: Claude Opus 4.7 (1M context) --- NEWS.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/NEWS.md b/NEWS.md index 2e7c2f2d..48c91ea4 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,10 @@ -**04/22/2026:** The OGC `waterdata` getters (`get_continuous`, `get_daily`, `get_field_measurements`, and others) now accept `filter` and `filter_lang` kwargs that are passed through to the service's CQL filter parameter. This enables advanced server-side filtering that isn't expressible via the other kwargs — most commonly, OR'ing multiple time ranges into a single request. A long expression made up of a top-level `OR` chain is transparently split into multiple requests that each fit under the server's URI length limit, and the results are concatenated. +**04/22/2026:** Highlights since the `v1.1.0` release (2025-11-26), which shipped the `waterdata` module: + +- Added `get_channel` for channel-measurement data (#218) and `get_stats_por` / `get_stats_date_range` for period-of-record and daily statistics (#207). +- Added `get_reference_table` (and made it considerably simpler and faster in #209), then extended it to accept arbitrary collections-API query parameters (#214). 
+- Removed the deprecated `waterwatch` module (#228) and several defunct NWIS stubs (#222, #225), and added `py.typed` so `dataretrieval` ships type information to downstream users (#186). +- Now supports `pandas` 3.x (#221). +- The OGC `waterdata` getters (`get_continuous`, `get_daily`, `get_field_measurements`, and the five others built on the same OGC collections) now accept `filter` and `filter_lang` kwargs that are passed through to the service's CQL filter parameter. This enables advanced server-side filtering that isn't expressible via the other kwargs — most commonly, OR'ing multiple time ranges into a single request. A long expression made up of a top-level `OR` chain is transparently split into multiple requests that each fit under the server's URI length limit, and the results are concatenated. **12/04/2025:** The `get_continuous()` function was added to the `waterdata` module, which provides access to measurements collected via automated sensors at a high frequency (often 15 minute intervals) at a monitoring location. This is an early version of the continuous endpoint and should be used with caution as the API team improves its performance. In the future, we anticipate the addition of an endpoint(s) specifically for handling large data requests, so it may make sense for power users to hold off on heavy development using the new continuous endpoint. From 850f14dc0b7e94c5738a18bdfbc60d7aa6e7ab81 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Wed, 22 Apr 2026 14:14:37 -0500 Subject: [PATCH 05/14] Address Copilot review on chunked CQL filter - Dedupe on pre-rename feature `id` (always present at that stage) instead of `output_id`, which is the post-rename name and may not be on every OGC service's response. - Aggregate elapsed time across chunk responses so the returned metadata's query_time reflects the whole operation rather than just the last chunk. 
- Drop the redundant `continuous_id` from the fan-out test's mock properties so the assertion exercises the real `id`-based dedup path, and add a separate test that forces cross-chunk duplicate feature ids to prove they collapse to a single row. Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/utils.py | 29 +++++++++++----- tests/waterdata_utils_test.py | 57 +++++++++++++++++++++++++++++--- 2 files changed, 73 insertions(+), 13 deletions(-) diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index cec2feed..b9c5fe72 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -930,28 +930,35 @@ def get_ogc_data( convert_type = args.pop("convert_type", False) # Create fresh dictionary of args without any None values args = {k: v for k, v in args.items() if v is not None} - # Overlapping user OR-clauses are deduplicated by output_id further below. + # Overlapping user OR-clauses are deduplicated by feature id further below. 
filter_expr = args.get("filter") filter_chunks = ( _chunk_cql_or(filter_expr) if isinstance(filter_expr, str) else [None] ) frames = [] - response = None + first_response = None + total_elapsed = None for chunk in filter_chunks: chunk_args = args if chunk is None else {**args, "filter": chunk} req = _construct_api_requests(**chunk_args) - chunk_df, response = _walk_pages(geopd=GEOPANDAS, req=req) + chunk_df, chunk_response = _walk_pages(geopd=GEOPANDAS, req=req) frames.append(chunk_df) + if first_response is None: + first_response = chunk_response + total_elapsed = chunk_response.elapsed + else: + total_elapsed = total_elapsed + chunk_response.elapsed if len(frames) == 1: return_list = frames[0] else: return_list = pd.concat(frames, ignore_index=True) - if output_id in return_list.columns: - return_list = return_list.drop_duplicates( - subset=output_id, ignore_index=True - ) + # The top-level feature "id" is always present at this stage (the + # rename to output_id happens later in _arrange_cols), so dedup on + # it directly to catch overlapping OR-clauses across chunks. + if "id" in return_list.columns: + return_list = return_list.drop_duplicates(subset="id", ignore_index=True) # Manage some aspects of the returned dataset return_list = _deal_with_empty(return_list, properties, service) @@ -961,8 +968,12 @@ def get_ogc_data( return_list = _arrange_cols(return_list, properties, output_id) return_list = _sort_rows(return_list) - # Create metadata object from response - metadata = BaseMetadata(response) + # Create metadata object from the first response. When the filter was + # chunked into multiple sub-requests, query_time reflects the total + # elapsed time across all chunks rather than just the first. 
+ if len(frames) > 1: + first_response.elapsed = total_elapsed + metadata = BaseMetadata(first_response) return return_list, metadata diff --git a/tests/waterdata_utils_test.py b/tests/waterdata_utils_test.py index 8b3ce16b..5fd81b87 100644 --- a/tests/waterdata_utils_test.py +++ b/tests/waterdata_utils_test.py @@ -234,10 +234,7 @@ def respond(request, context): "type": "Feature", "id": f"chunk-{call_count['n']}", "geometry": None, - "properties": { - "continuous_id": f"chunk-{call_count['n']}", - "value": call_count["n"], - }, + "properties": {"value": call_count["n"]}, } ], "links": [], @@ -260,3 +257,55 @@ def respond(request, context): for req in requests_mock.request_history: filter_qs = parse_qs(urlsplit(req.url).query).get("filter", [""])[0] assert len(filter_qs) <= _CQL_FILTER_CHUNK_LEN + + +@pytest.mark.skipif( + sys.version_info < (3, 10), + reason="get_continuous requires py>=3.10 (see tests/waterdata_test.py)", +) +def test_long_filter_deduplicates_cross_chunk_overlap(requests_mock): + """Features returned by multiple chunks (same feature `id`) are + deduplicated in the concatenated result.""" + from dataretrieval.waterdata import get_continuous + + clause = ( + "(time >= '2023-01-{day:02d}T00:00:00Z' " + "AND time <= '2023-01-{day:02d}T00:30:00Z')" + ) + expr = " OR ".join(clause.format(day=(i % 28) + 1) for i in range(300)) + + call_count = {"n": 0} + + def respond(request, context): + context.status_code = 200 + call_count["n"] += 1 + # Every chunk returns the same feature id so dedup should collapse + # the concatenated frame down to a single row. 
+ return { + "type": "FeatureCollection", + "numberReturned": 1, + "features": [ + { + "type": "Feature", + "id": "shared-feature", + "geometry": None, + "properties": {"value": 1}, + } + ], + "links": [], + } + + requests_mock.get(OGC_CONTINUOUS_URL, json=respond) + + df, _ = get_continuous( + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter=expr, + filter_lang="cql-text", + ) + + expected_chunks = _chunk_cql_or(expr) + assert len(expected_chunks) > 1 + assert call_count["n"] == len(expected_chunks) + # Even though each chunk returned a feature, dedup by id collapses them. + assert len(df) == 1 From a65e94d9aca5e29aacc3157298ee4ce5b31ee73b Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Wed, 22 Apr 2026 14:30:35 -0500 Subject: [PATCH 06/14] Skip chunking for non-cql-text filters; simplify tests - `_chunk_cql_or` splits on the literal substring " OR " and is only quote-aware for single quotes (CQL-text). Applying it to CQL-JSON would corrupt JSON string values or produce nonsense sub-requests. Gate chunking to `filter_lang in {None, "cql-text"}` and pass other languages through as a single request. - Replace the `requests_mock`-based fan-out/dedup tests with lighter `mock.patch` stubs of `_construct_api_requests` / `_walk_pages`, which also removes the py<3.10 skip (the tests no longer touch any HTTP or py3.10-only paths). Strengthen the fan-out assertion to `sent_filters == expected_chunks`. - Add `test_cql_json_filter_is_not_chunked` to pin the new guard. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/utils.py | 11 +- tests/waterdata_utils_test.py | 167 ++++++++++++++++++------------- 2 files changed, 103 insertions(+), 75 deletions(-) diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index b9c5fe72..a8e60981 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -930,11 +930,16 @@ def get_ogc_data( convert_type = args.pop("convert_type", False) # Create fresh dictionary of args without any None values args = {k: v for k, v in args.items() if v is not None} + # Only cql-text filters can be safely chunked by splitting top-level OR + # chains. For cql-json (or unknown languages), pass through unchanged. # Overlapping user OR-clauses are deduplicated by feature id further below. filter_expr = args.get("filter") - filter_chunks = ( - _chunk_cql_or(filter_expr) if isinstance(filter_expr, str) else [None] - ) + filter_lang = args.get("filter_lang") + should_chunk_filter = isinstance(filter_expr, str) and filter_lang in { + None, + "cql-text", + } + filter_chunks = _chunk_cql_or(filter_expr) if should_chunk_filter else [None] frames = [] first_response = None diff --git a/tests/waterdata_utils_test.py b/tests/waterdata_utils_test.py index 5fd81b87..e2af16a6 100644 --- a/tests/waterdata_utils_test.py +++ b/tests/waterdata_utils_test.py @@ -1,7 +1,9 @@ -import sys +from datetime import timedelta +from types import SimpleNamespace from unittest import mock from urllib.parse import parse_qs, urlsplit +import pandas as pd import pytest import requests @@ -14,10 +16,6 @@ _walk_pages, ) -OGC_CONTINUOUS_URL = ( - "https://api.waterdata.usgs.gov/ogcapi/v0/collections/continuous/items" -) - def _query_params(prepared_request): return parse_qs(urlsplit(prepared_request.url).query) @@ -205,11 +203,7 @@ def test_construct_filter_on_all_ogc_services(service): assert qs["filter-lang"] == ["cql-text"] -@pytest.mark.skipif( - sys.version_info < (3, 
10), - reason="get_continuous requires py>=3.10 (see tests/waterdata_test.py)", -) -def test_long_filter_fans_out_into_multiple_requests(requests_mock): +def test_long_filter_fans_out_into_multiple_requests(): """An oversized top-level OR filter triggers multiple HTTP requests whose results are concatenated.""" from dataretrieval.waterdata import get_continuous @@ -221,49 +215,45 @@ def test_long_filter_fans_out_into_multiple_requests(requests_mock): expr = " OR ".join(clause.format(day=(i % 28) + 1) for i in range(300)) assert len(expr) > _CQL_FILTER_CHUNK_LEN - call_count = {"n": 0} - - def respond(request, context): - context.status_code = 200 - call_count["n"] += 1 - return { - "type": "FeatureCollection", - "numberReturned": 1, - "features": [ - { - "type": "Feature", - "id": f"chunk-{call_count['n']}", - "geometry": None, - "properties": {"value": call_count["n"]}, - } - ], - "links": [], - } - - requests_mock.get(OGC_CONTINUOUS_URL, json=respond) - - df, _ = get_continuous( - monitoring_location_id="USGS-07374525", - parameter_code="72255", - filter=expr, - filter_lang="cql-text", - ) + sent_filters = [] + + def fake_construct_api_requests(**kwargs): + sent_filters.append(kwargs.get("filter")) + return SimpleNamespace(url="https://example.test", method="GET", headers={}) + + def fake_walk_pages(*_args, **_kwargs): + idx = len(sent_filters) + frame = pd.DataFrame({"id": [f"chunk-{idx}"], "value": [idx]}) + resp = SimpleNamespace( + url="https://example.test", + elapsed=timedelta(milliseconds=1), + headers={}, + ) + return frame, resp + + with mock.patch( + "dataretrieval.waterdata.utils._construct_api_requests", + side_effect=fake_construct_api_requests, + ), mock.patch( + "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages + ): + df, _ = get_continuous( + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter=expr, + filter_lang="cql-text", + ) # Mirror the library's splitter so the test doesn't hardcode a chunk 
count. expected_chunks = _chunk_cql_or(expr) assert len(expected_chunks) > 1 - assert call_count["n"] == len(expected_chunks) + assert len(sent_filters) == len(expected_chunks) + assert sent_filters == expected_chunks assert len(df) == len(expected_chunks) - for req in requests_mock.request_history: - filter_qs = parse_qs(urlsplit(req.url).query).get("filter", [""])[0] - assert len(filter_qs) <= _CQL_FILTER_CHUNK_LEN + assert all(len(chunk) <= _CQL_FILTER_CHUNK_LEN for chunk in sent_filters) -@pytest.mark.skipif( - sys.version_info < (3, 10), - reason="get_continuous requires py>=3.10 (see tests/waterdata_test.py)", -) -def test_long_filter_deduplicates_cross_chunk_overlap(requests_mock): +def test_long_filter_deduplicates_cross_chunk_overlap(): """Features returned by multiple chunks (same feature `id`) are deduplicated in the concatenated result.""" from dataretrieval.waterdata import get_continuous @@ -276,36 +266,69 @@ def test_long_filter_deduplicates_cross_chunk_overlap(requests_mock): call_count = {"n": 0} - def respond(request, context): - context.status_code = 200 + def fake_walk_pages(*_args, **_kwargs): call_count["n"] += 1 - # Every chunk returns the same feature id so dedup should collapse - # the concatenated frame down to a single row. 
- return { - "type": "FeatureCollection", - "numberReturned": 1, - "features": [ - { - "type": "Feature", - "id": "shared-feature", - "geometry": None, - "properties": {"value": 1}, - } - ], - "links": [], - } - - requests_mock.get(OGC_CONTINUOUS_URL, json=respond) - - df, _ = get_continuous( - monitoring_location_id="USGS-07374525", - parameter_code="72255", - filter=expr, - filter_lang="cql-text", - ) + frame = pd.DataFrame({"id": ["shared-feature"], "value": [1]}) + resp = SimpleNamespace( + url="https://example.test", + elapsed=timedelta(milliseconds=1), + headers={}, + ) + return frame, resp + + with mock.patch( + "dataretrieval.waterdata.utils._construct_api_requests", + return_value=SimpleNamespace( + url="https://example.test", method="GET", headers={} + ), + ), mock.patch( + "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages + ): + df, _ = get_continuous( + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter=expr, + filter_lang="cql-text", + ) expected_chunks = _chunk_cql_or(expr) assert len(expected_chunks) > 1 assert call_count["n"] == len(expected_chunks) # Even though each chunk returned a feature, dedup by id collapses them. 
assert len(df) == 1 + + +def test_cql_json_filter_is_not_chunked(): + """Chunking applies only to cql-text; cql-json is passed through unchanged.""" + from dataretrieval.waterdata import get_continuous + + clause = "(time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-01T00:30:00Z')" + expr = " OR ".join([clause] * 300) + sent_filters = [] + + def fake_construct_api_requests(**kwargs): + sent_filters.append(kwargs.get("filter")) + return SimpleNamespace(url="https://example.test", method="GET", headers={}) + + with mock.patch( + "dataretrieval.waterdata.utils._construct_api_requests", + side_effect=fake_construct_api_requests, + ), mock.patch( + "dataretrieval.waterdata.utils._walk_pages", + return_value=( + pd.DataFrame({"id": ["row-1"], "value": [1]}), + SimpleNamespace( + url="https://example.test", + elapsed=timedelta(milliseconds=1), + headers={}, + ), + ), + ): + get_continuous( + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter=expr, + filter_lang="cql-json", + ) + + assert sent_filters == [expr] From 5709cd5d02a5b945200620013a6b119808d21808 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Wed, 22 Apr 2026 15:07:21 -0500 Subject: [PATCH 07/14] Document CQL filter usage on get_continuous Follow the existing ``time`` date-range example with two CQL-text ``filter`` examples: a two-interval OR expression (the common "pull several disjoint windows in one call" case), and a longer programmatically-built chain that shows the pattern used when pairing many discrete-measurement timestamps with surrounding instantaneous data (which is what the client's transparent chunking is there to support). Both examples were verified against the live Water Data OGC API on USGS-02238500 (00060). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/api.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index 54e9f531..f9a66b31 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -399,6 +399,37 @@ def get_continuous( ... parameter_code="00065", ... time="2021-01-01T00:00:00Z/2022-01-01T00:00:00Z", ... ) + + >>> # The ``time`` parameter accepts a single instant or a single + >>> # interval. To pull several disjoint windows in one call, pass a + >>> # CQL-text ``filter`` expression instead: + >>> df, md = dataretrieval.waterdata.get_continuous( + ... monitoring_location_id="USGS-02238500", + ... parameter_code="00060", + ... filter=( + ... "(time >= '2023-06-01T12:00:00Z' " + ... "AND time <= '2023-06-01T13:00:00Z') " + ... "OR (time >= '2023-06-15T12:00:00Z' " + ... "AND time <= '2023-06-15T13:00:00Z')" + ... ), + ... filter_lang="cql-text", + ... ) + + >>> # Long top-level ``OR`` chains (e.g. one window per discrete + >>> # measurement timestamp) are built up the same way. If the + >>> # resulting URL would exceed the server's length limit, the + >>> # client transparently splits it into multiple sub-requests and + >>> # returns the concatenated, deduplicated result. + >>> windows = [ + ... f"(time >= '2023-{m:02d}-15T00:00:00Z' " + ... f"AND time <= '2023-{m:02d}-15T00:30:00Z')" + ... for m in range(1, 13) + ... ] + >>> df, md = dataretrieval.waterdata.get_continuous( + ... monitoring_location_id="USGS-02238500", + ... parameter_code="00060", + ... filter=" OR ".join(windows), + ... 
) """ service = "continuous" output_id = "continuous_id" From 8b9d7e96d2cb62128c24d123f476038e373e9bd4 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Wed, 22 Apr 2026 15:21:39 -0500 Subject: [PATCH 08/14] Split _split_top_level_or into generator + consumer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pull the state machine out into ``_iter_or_boundaries``, a generator that yields ``(start, end)`` spans of each top-level ``OR`` separator, and reduce ``_split_top_level_or`` to a short slice loop over those spans. Behaviour is unchanged (all 26 existing tests pass); the win is readability — each function now has one job instead of three, and the producer/consumer split mirrors how ``re.finditer`` / ``tokenize`` are structured elsewhere in the stdlib. Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/utils.py | 34 +++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index a8e60981..03b296d7 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -4,6 +4,7 @@ import logging import os import re +from collections.abc import Iterator from datetime import datetime from typing import Any, get_args @@ -232,18 +233,16 @@ def _format_api_dates( _CQL_FILTER_CHUNK_LEN = 5000 -def _split_top_level_or(expr: str) -> list[str]: - """Split a CQL expression at each top-level ``OR`` separator. +def _iter_or_boundaries(expr: str) -> Iterator[tuple[int, int]]: + """Yield ``(start, end)`` spans of each top-level ``OR`` separator. - Respects parentheses and single/double-quoted string literals so that - ``OR`` tokens inside ``(A OR B)`` or ``'word OR word'`` are left alone. - Matching is case-insensitive. Whitespace around each emitted part is - stripped; empty parts are dropped. 
+ Tracks single/double-quoted string literals and parenthesized + sub-expressions so that ``OR`` tokens inside them are skipped. + Matching is case-insensitive and the yielded span covers the + surrounding whitespace on both sides. """ - parts = [] depth = 0 in_quote = None - last = 0 i = 0 n = len(expr) while i < n: @@ -265,7 +264,6 @@ def _split_top_level_or(expr: str) -> list[str]: depth -= 1 i += 1 continue - # Match whitespace + OR + whitespace at depth 0, case-insensitive. if depth == 0 and ch.isspace(): j = i + 1 while j < n and expr[j].isspace(): @@ -273,14 +271,28 @@ def _split_top_level_or(expr: str) -> list[str]: if j + 2 <= n and expr[j : j + 2].lower() == "or": k = j + 2 if k < n and expr[k].isspace(): - parts.append(expr[last:i].strip()) m = k + 1 while m < n and expr[m].isspace(): m += 1 - last = m + yield i, m i = m continue i += 1 + + +def _split_top_level_or(expr: str) -> list[str]: + """Split a CQL expression at each top-level ``OR`` separator. + + Respects parentheses and single/double-quoted string literals so that + ``OR`` tokens inside ``(A OR B)`` or ``'word OR word'`` are left alone. + Matching is case-insensitive. Whitespace around each emitted part is + stripped; empty parts are dropped. + """ + parts = [] + last = 0 + for start, end in _iter_or_boundaries(expr): + parts.append(expr[last:start].strip()) + last = end parts.append(expr[last:].strip()) return [p for p in parts if p] From 686824784a12dd4742c621e051acc8e725cf6d5c Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Wed, 22 Apr 2026 19:34:00 -0500 Subject: [PATCH 09/14] Budget CQL chunks against the URL byte limit, not raw filter length The previous 5 KB raw-filter budget was a static approximation. Empirically the Water Data API returns HTTP 414 at ~8,200 bytes of total URL, matching nginx's default 8 KB large_client_header_buffers. 
The raw-filter budget leaves unknown headroom that varies with: - URL encoding (a uniform time-interval filter inflates ~1.4x; heavy special-char content inflates more) - the URL space consumed by other query params Expose ``_WATERDATA_URL_BYTE_LIMIT = 8000`` with a comment describing what the limit represents, and add ``_effective_filter_budget`` which probes each request's non-filter URL cost and converts the remaining URL budget back to raw CQL bytes via the filter's own encoding ratio. ``get_ogc_data`` now uses that per-request budget instead of the fixed constant. Verified live: a 34 KB OR-chain that previously split into 8 chunks now packs into 7, with every produced URL staying at ~7.9 KB (well under the 8 KB limit and below the 8.2 KB observed 414 cliff). Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/utils.py | 55 ++++++++++++++++++---- tests/waterdata_utils_test.py | 78 ++++++++++++++++++++++++++++---- 2 files changed, 114 insertions(+), 19 deletions(-) diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index 03b296d7..3115feb0 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -7,6 +7,7 @@ from collections.abc import Iterator from datetime import datetime from typing import Any, get_args +from urllib.parse import quote_plus import pandas as pd import requests @@ -225,13 +226,20 @@ def _format_api_dates( raise ValueError("datetime_input should only include 1-2 values") -# Conservative budget (characters) for a single CQL `filter` query -# parameter before the URL risks exceeding the server's URI length limit. -# The continuous endpoint has been observed to return HTTP 414 around ~7 KB -# of filter text; 5000 leaves headroom for URL encoding and the other -# query parameters. +# Conservative fallback budget (characters) for a single CQL ``filter`` +# query parameter, used when the caller invokes ``_chunk_cql_or`` without +# a ``max_len``. 
``get_ogc_data`` computes a tighter per-request budget +# from ``_WATERDATA_URL_BYTE_LIMIT`` below. _CQL_FILTER_CHUNK_LEN = 5000 +# Total URL byte limit the Water Data API will accept before replying +# HTTP 414 (Request-URI Too Large). Empirically the cliff sits at +# ~8,200 bytes of full URL, which lines up with nginx's default +# ``large_client_header_buffers`` of 8 KB (8192). 8000 leaves ~200 bytes +# of headroom for request-line framing ("GET ... HTTP/1.1\r\n") and any +# intermediate proxy variance. +_WATERDATA_URL_BYTE_LIMIT = 8000 + def _iter_or_boundaries(expr: str) -> Iterator[tuple[int, int]]: """Yield ``(start, end)`` spans of each top-level ``OR`` separator. @@ -331,6 +339,28 @@ def _chunk_cql_or(expr: str, max_len: int = _CQL_FILTER_CHUNK_LEN) -> list[str]: return chunks +def _effective_filter_budget(args: dict[str, Any], filter_expr: str) -> int: + """Compute the raw CQL byte budget for ``filter_expr`` in this request. + + The server limits total URL length (see ``_WATERDATA_URL_BYTE_LIMIT``), + not raw CQL length. To derive a raw-byte budget we can hand to + ``_chunk_cql_or``: + + 1. Probe the URL space consumed by the other query params by building + the request with a 1-byte placeholder filter. + 2. Subtract from the URL limit to get the bytes available for the + encoded filter value. + 3. Convert back to raw CQL bytes using the filter's own URL-encoding + ratio (e.g. uniform time-interval clauses inflate ~1.4x; heavy + special-char clauses can inflate more). + """ + probe = _construct_api_requests(**{**args, "filter": "x"}) + non_filter_url_bytes = len(probe.url) - 1 + available_url_bytes = _WATERDATA_URL_BYTE_LIMIT - non_filter_url_bytes + encoding_ratio = len(quote_plus(filter_expr)) / len(filter_expr) + return max(100, int(available_url_bytes / encoding_ratio)) + + def _cql2_param(args: dict[str, Any]) -> str: """ Convert query parameters to CQL2 JSON format for POST requests. 
@@ -947,11 +977,16 @@ def get_ogc_data( # Overlapping user OR-clauses are deduplicated by feature id further below. filter_expr = args.get("filter") filter_lang = args.get("filter_lang") - should_chunk_filter = isinstance(filter_expr, str) and filter_lang in { - None, - "cql-text", - } - filter_chunks = _chunk_cql_or(filter_expr) if should_chunk_filter else [None] + should_chunk_filter = ( + isinstance(filter_expr, str) + and filter_expr + and filter_lang in {None, "cql-text"} + ) + if should_chunk_filter: + raw_budget = _effective_filter_budget(args, filter_expr) + filter_chunks = _chunk_cql_or(filter_expr, max_len=raw_budget) + else: + filter_chunks = [None] frames = [] first_response = None diff --git a/tests/waterdata_utils_test.py b/tests/waterdata_utils_test.py index e2af16a6..3374ccf2 100644 --- a/tests/waterdata_utils_test.py +++ b/tests/waterdata_utils_test.py @@ -9,8 +9,10 @@ from dataretrieval.waterdata.utils import ( _CQL_FILTER_CHUNK_LEN, + _WATERDATA_URL_BYTE_LIMIT, _chunk_cql_or, _construct_api_requests, + _effective_filter_budget, _get_args, _split_top_level_or, _walk_pages, @@ -236,6 +238,9 @@ def fake_walk_pages(*_args, **_kwargs): side_effect=fake_construct_api_requests, ), mock.patch( "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages + ), mock.patch( + "dataretrieval.waterdata.utils._effective_filter_budget", + return_value=_CQL_FILTER_CHUNK_LEN, ): df, _ = get_continuous( monitoring_location_id="USGS-07374525", @@ -244,12 +249,18 @@ def fake_walk_pages(*_args, **_kwargs): filter_lang="cql-text", ) - # Mirror the library's splitter so the test doesn't hardcode a chunk count. - expected_chunks = _chunk_cql_or(expr) - assert len(expected_chunks) > 1 - assert len(sent_filters) == len(expected_chunks) - assert sent_filters == expected_chunks - assert len(df) == len(expected_chunks) + # Mocking _effective_filter_budget bypasses the URL-length probe, so + # sent_filters contains only real chunk requests. 
Assert invariants: + # chunking happened, every original clause is preserved exactly once + # in order, each chunk stays under the budget, and the mock's + # one-row-per-chunk responses concatenate to a row per chunk. + expected_parts = _split_top_level_or(expr) + assert len(sent_filters) > 1 + rejoined_parts = [] + for chunk in sent_filters: + rejoined_parts.extend(_split_top_level_or(chunk)) + assert rejoined_parts == expected_parts + assert len(df) == len(sent_filters) assert all(len(chunk) <= _CQL_FILTER_CHUNK_LEN for chunk in sent_filters) @@ -283,6 +294,9 @@ def fake_walk_pages(*_args, **_kwargs): ), ), mock.patch( "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages + ), mock.patch( + "dataretrieval.waterdata.utils._effective_filter_budget", + return_value=_CQL_FILTER_CHUNK_LEN, ): df, _ = get_continuous( monitoring_location_id="USGS-07374525", @@ -291,13 +305,59 @@ def fake_walk_pages(*_args, **_kwargs): filter_lang="cql-text", ) - expected_chunks = _chunk_cql_or(expr) - assert len(expected_chunks) > 1 - assert call_count["n"] == len(expected_chunks) + # Chunking must have happened (otherwise dedup wouldn't be exercised). + assert call_count["n"] > 1 # Even though each chunk returned a feature, dedup by id collapses them. assert len(df) == 1 +def test_effective_filter_budget_respects_url_limit(): + """The computed budget, once encoded, fits within the URL byte limit + alongside the other query params.""" + from urllib.parse import quote_plus + + filter_expr = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')" + args = { + "service": "continuous", + "monitoring_location_id": "USGS-02238500", + "parameter_code": "00060", + "filter": filter_expr, + "filter_lang": "cql-text", + } + raw_budget = _effective_filter_budget(args, filter_expr) + + # Build a chunk exactly at the raw budget (padded with the clause repeated) + # and confirm the full URL it produces stays under the URL byte limit. 
+ padded = (" OR ".join([filter_expr] * 200))[:raw_budget] + req = _construct_api_requests(**{**args, "filter": padded}) + assert len(req.url) <= _WATERDATA_URL_BYTE_LIMIT + # And the budget scales inversely with encoding ratio (sanity). + assert raw_budget < _WATERDATA_URL_BYTE_LIMIT + # Quick sanity on the encoding math itself. + assert len(quote_plus(padded)) <= _WATERDATA_URL_BYTE_LIMIT + + +def test_effective_filter_budget_shrinks_with_more_url_params(): + """Adding more scalar query params consumes URL bytes and should + shrink the raw filter budget accordingly.""" + clause = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')" + sparse_args = { + "service": "continuous", + "monitoring_location_id": "USGS-02238500", + "filter": clause, + "filter_lang": "cql-text", + } + dense_args = { + **sparse_args, + "parameter_code": "00060", + "statistic_id": "00003", + "last_modified": "2023-01-01T00:00:00Z/2023-12-31T23:59:59Z", + } + sparse_budget = _effective_filter_budget(sparse_args, clause) + dense_budget = _effective_filter_budget(dense_args, clause) + assert dense_budget < sparse_budget + + def test_cql_json_filter_is_not_chunked(): """Chunking applies only to cql-text; cql-json is passed through unchanged.""" from dataretrieval.waterdata import get_continuous From 08c36fe234b14d1e1609ea8aaa4451c98fdae065 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Wed, 22 Apr 2026 21:54:18 -0500 Subject: [PATCH 10/14] Budget against max per-clause encoding ratio MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The whole-filter ratio is an average; a chunk that happens to contain only the heavier-encoding clauses (e.g. heavy clauses clustered at one end of the filter) can exceed the average ratio and push the full URL a few bytes past _WATERDATA_URL_BYTE_LIMIT. 
The overflow was invisible in practice — the 8,000 declared budget vs 8,200 observed 414 cliff gave enough headroom — but the computed budget was technically being violated, and a more adversarial clause mix could grow the overflow. Compute the encoding ratio from the heaviest-encoding clause instead of the whole filter. Adds one extra chunk on adversarial inputs (8 instead of 7 for 100 heavy + 400 light) in exchange for every chunk provably staying under the declared URL limit. Verified live: the adversarial clustered-heavy filter now produces 8 chunks with max URL 7806 bytes, all returning 200 OK. Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/utils.py | 11 +++++---- tests/waterdata_utils_test.py | 39 ++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index 3115feb0..a055144f 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -350,14 +350,17 @@ def _effective_filter_budget(args: dict[str, Any], filter_expr: str) -> int: the request with a 1-byte placeholder filter. 2. Subtract from the URL limit to get the bytes available for the encoded filter value. - 3. Convert back to raw CQL bytes using the filter's own URL-encoding - ratio (e.g. uniform time-interval clauses inflate ~1.4x; heavy - special-char clauses can inflate more). + 3. Convert back to raw CQL bytes using the *maximum* per-clause + encoding ratio, not the whole-filter average. A chunk can end up + containing only the heavier-encoding clauses (e.g. heavy ones + clustered at one end of the filter), so budgeting against the + average lets such a chunk overflow the URL limit by a few bytes. 
""" probe = _construct_api_requests(**{**args, "filter": "x"}) non_filter_url_bytes = len(probe.url) - 1 available_url_bytes = _WATERDATA_URL_BYTE_LIMIT - non_filter_url_bytes - encoding_ratio = len(quote_plus(filter_expr)) / len(filter_expr) + parts = _split_top_level_or(filter_expr) or [filter_expr] + encoding_ratio = max(len(quote_plus(p)) / len(p) for p in parts if p) return max(100, int(available_url_bytes / encoding_ratio)) diff --git a/tests/waterdata_utils_test.py b/tests/waterdata_utils_test.py index 3374ccf2..39bb4f6a 100644 --- a/tests/waterdata_utils_test.py +++ b/tests/waterdata_utils_test.py @@ -337,6 +337,45 @@ def test_effective_filter_budget_respects_url_limit(): assert len(quote_plus(padded)) <= _WATERDATA_URL_BYTE_LIMIT +def test_effective_filter_budget_uses_max_clause_ratio(): + """Heavy clauses clustered in one part of the filter must not be able + to push any chunk over the URL limit. The budget is computed against + the max per-clause encoding ratio, not the whole-filter average, so + a chunk of only-heaviest-clauses still fits.""" + from urllib.parse import quote_plus + + heavy = ( + "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z' " + "AND approval_status IN ('Approved','Provisional','Revised'))" + ) + light = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')" + # Heavy ratio < light ratio for these shapes; cluster them at opposite + # ends so the chunker must produce at least one light-only chunk. + clauses = [heavy] * 100 + [light] * 400 + expr = " OR ".join(clauses) + args = { + "service": "continuous", + "monitoring_location_id": "USGS-02238500", + "filter": expr, + "filter_lang": "cql-text", + } + budget = _effective_filter_budget(args, expr) + chunks = _chunk_cql_or(expr, max_len=budget) + assert len(chunks) > 1 + + # Every chunk, once built into a full request, fits under the URL byte + # limit — even the all-light chunks that have a higher-than-average ratio. 
+ for chunk in chunks: + req = _construct_api_requests(**{**args, "filter": chunk}) + assert len(req.url) <= _WATERDATA_URL_BYTE_LIMIT, ( + f"chunk url {len(req.url)} exceeds {_WATERDATA_URL_BYTE_LIMIT}" + ) + + # Budget should be tight enough that a chunk of only-light clauses + # (the heavier-encoding shape here) still fits. + assert len(quote_plus(light)) * (budget // len(light)) < _WATERDATA_URL_BYTE_LIMIT + + def test_effective_filter_budget_shrinks_with_more_url_params(): """Adding more scalar query params consumes URL bytes and should shrink the raw filter budget accordingly.""" From 99c74e0343145c94d0a03f8e7a80d4198f5b5622 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Thu, 23 Apr 2026 08:47:57 -0500 Subject: [PATCH 11/14] Filter empty frames before concatenating chunk results MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``_get_resp_data`` returns a plain ``pd.DataFrame()`` when a response contains no features, regardless of whether geopandas is enabled. If that empty frame lands first in ``pd.concat([empty, geodf, ...])``, concat can downgrade the result back to a plain DataFrame — silently dropping geometry and CRS when later chunks would have provided them. Drop the empties before concatenation. They contribute no rows either way, so discarding them is safe and keeps the GeoDataFrame type intact whenever any chunk returned one. When every chunk is empty, fall through with a plain ``pd.DataFrame()`` — same behavior as today. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/utils.py | 13 +++++-- tests/waterdata_utils_test.py | 61 ++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 3 deletions(-) diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index a055144f..01e99d35 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -1005,10 +1005,17 @@ def get_ogc_data( else: total_elapsed = total_elapsed + chunk_response.elapsed - if len(frames) == 1: - return_list = frames[0] + # Drop empty frames before concat — `_get_resp_data` returns a plain + # ``pd.DataFrame()`` on empty responses, which can downgrade a concat + # of real GeoDataFrames back to a plain DataFrame (losing geometry/ + # CRS). Empty frames contribute no rows, so discarding them is safe. + non_empty = [f for f in frames if not f.empty] + if not non_empty: + return_list = pd.DataFrame() + elif len(non_empty) == 1: + return_list = non_empty[0] else: - return_list = pd.concat(frames, ignore_index=True) + return_list = pd.concat(non_empty, ignore_index=True) # The top-level feature "id" is always present at this stage (the # rename to output_id happens later in _arrange_cols), so dedup on # it directly to catch overlapping OR-clauses across chunks. diff --git a/tests/waterdata_utils_test.py b/tests/waterdata_utils_test.py index 39bb4f6a..cc58f080 100644 --- a/tests/waterdata_utils_test.py +++ b/tests/waterdata_utils_test.py @@ -311,6 +311,67 @@ def fake_walk_pages(*_args, **_kwargs): assert len(df) == 1 +def test_empty_chunks_do_not_downgrade_geodataframe(): + """A mix of empty and non-empty chunk responses must not downgrade a + GeoDataFrame-typed result to a plain DataFrame. 
``_get_resp_data`` + returns ``pd.DataFrame()`` on empty responses, which would otherwise + strip geometry/CRS from the concatenated output.""" + pytest.importorskip("geopandas") + import geopandas as gpd + from shapely.geometry import Point + + from dataretrieval.waterdata import get_continuous + + clause = ( + "(time >= '2023-01-{day:02d}T00:00:00Z' " + "AND time <= '2023-01-{day:02d}T00:30:00Z')" + ) + expr = " OR ".join(clause.format(day=(i % 28) + 1) for i in range(300)) + + call_count = {"n": 0} + + def fake_walk_pages(*_args, **_kwargs): + call_count["n"] += 1 + # Chunk 2 returns empty; chunks 1 and 3 return GeoDataFrames. + if call_count["n"] == 2: + frame = pd.DataFrame() + else: + frame = gpd.GeoDataFrame( + {"id": [f"feat-{call_count['n']}"], "value": [call_count["n"]]}, + geometry=[Point(call_count["n"], call_count["n"])], + crs="EPSG:4326", + ) + resp = SimpleNamespace( + url="https://example.test", + elapsed=timedelta(milliseconds=1), + headers={}, + ) + return frame, resp + + with mock.patch( + "dataretrieval.waterdata.utils._construct_api_requests", + return_value=SimpleNamespace( + url="https://example.test", method="GET", headers={} + ), + ), mock.patch( + "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages + ), mock.patch( + "dataretrieval.waterdata.utils._effective_filter_budget", + return_value=_CQL_FILTER_CHUNK_LEN, + ): + df, _ = get_continuous( + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter=expr, + filter_lang="cql-text", + ) + + # The empty chunk must not have stripped the GeoDataFrame type. 
+ assert isinstance(df, gpd.GeoDataFrame) + assert "geometry" in df.columns + assert df.crs is not None + + def test_effective_filter_budget_respects_url_limit(): """The computed budget, once encoded, fits within the URL byte limit alongside the other query params.""" From 9ca75f0aa62704d06ea4aad8a963a463b150aa02 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Thu, 23 Apr 2026 08:56:08 -0500 Subject: [PATCH 12/14] Pass through filter when non-filter URL already exceeds limit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If ``_effective_filter_budget`` probes a request whose non-filter URL is already at or past ``_WATERDATA_URL_BYTE_LIMIT``, no chunk we could produce would fit — yet the previous ``max(100, int(available/ratio))`` floored the budget to 100 raw bytes, which ``_chunk_cql_or`` happily used to pack single-clause chunks. For a filter with N clauses that meant N guaranteed-414 sub-requests instead of one clear failure. Detect ``available_url_bytes <= 0`` and return a budget larger than the filter itself; ``_chunk_cql_or``'s first short-circuit then passes the expression through unchanged. The server returns one 414, which surfaces the problem directly to the caller. Also add a regression test for the CQL doubled-quote escape (``''``): the scanner's naive toggle-on-quote logic already handles this case correctly — the two quotes are adjacent so there's no content between them to misclassify — but lock the behavior in so a refactor can't regress it. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/utils.py | 6 +++++ tests/waterdata_utils_test.py | 41 ++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index 01e99d35..82dd505b 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -359,6 +359,12 @@ def _effective_filter_budget(args: dict[str, Any], filter_expr: str) -> int: probe = _construct_api_requests(**{**args, "filter": "x"}) non_filter_url_bytes = len(probe.url) - 1 available_url_bytes = _WATERDATA_URL_BYTE_LIMIT - non_filter_url_bytes + if available_url_bytes <= 0: + # The non-filter URL already exceeds the byte limit, so no chunk + # we could produce would fit. Return a budget larger than the + # filter so _chunk_cql_or passes it through unchanged — one 414 + # from the server is clearer than a burst of N failing sub-requests. + return len(filter_expr) + 1 parts = _split_top_level_or(filter_expr) or [filter_expr] encoding_ratio = max(len(quote_plus(p)) / len(p) for p in parts if p) return max(100, int(available_url_bytes / encoding_ratio)) diff --git a/tests/waterdata_utils_test.py b/tests/waterdata_utils_test.py index cc58f080..c3a66cd0 100644 --- a/tests/waterdata_utils_test.py +++ b/tests/waterdata_utils_test.py @@ -145,6 +145,25 @@ def test_split_top_level_or_respects_quotes(): assert _split_top_level_or(expr) == ["name = 'foo OR bar'", "id = 1"] +def test_split_top_level_or_handles_doubled_quote_escape(): + """CQL text escapes a single quote inside a literal as ``''``. The + two quotes are adjacent, so the scanner's naive toggle-on-quote logic + happens to land back in the correct state with nothing between the + toggles to misclassify. 
Lock that behavior in so a future refactor + can't regress it.""" + cases = [ + ("name = 'O''Reilly OR Co' OR id = 1", ["name = 'O''Reilly OR Co'", "id = 1"]), + ("name = 'It''s' OR id = 1", ["name = 'It''s'", "id = 1"]), + ( + "name = 'alpha ''or'' beta' OR id = 1", + ["name = 'alpha ''or'' beta'", "id = 1"], + ), + ("'x'' OR ''y' OR id = 1", ["'x'' OR ''y'", "id = 1"]), + ] + for expr, expected in cases: + assert _split_top_level_or(expr) == expected, expr + + def test_split_top_level_or_single_clause(): assert _split_top_level_or("time >= '2023-01-01T00:00:00Z'") == [ "time >= '2023-01-01T00:00:00Z'" @@ -437,6 +456,28 @@ def test_effective_filter_budget_uses_max_clause_ratio(): assert len(quote_plus(light)) * (budget // len(light)) < _WATERDATA_URL_BYTE_LIMIT +def test_effective_filter_budget_passes_through_when_no_url_space(): + """If the non-filter URL already exceeds the byte limit, chunking + cannot make the request succeed. The budget helper should signal + pass-through (return a budget larger than the filter) so + ``_chunk_cql_or`` emits one chunk — one 414 from the server is + clearer than a burst of N guaranteed-414 sub-requests.""" + expr = " OR ".join( + ["(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')"] * 50 + ) + with mock.patch( + "dataretrieval.waterdata.utils._construct_api_requests", + return_value=SimpleNamespace( + url="https://example.test/" + "A" * 9000, method="GET", headers={} + ), + ): + budget = _effective_filter_budget({"filter": expr}, expr) + # Budget is large enough that _chunk_cql_or returns the expression + # unchanged (passthrough) rather than producing many small chunks. 
+ assert budget > len(expr) + assert _chunk_cql_or(expr, max_len=budget) == [expr] + + def test_effective_filter_budget_shrinks_with_more_url_params(): """Adding more scalar query params consumes URL bytes and should shrink the raw filter budget accordingly.""" From 443c8754a34841307ca7609542e3dafe664e4029 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Thu, 23 Apr 2026 14:59:44 -0500 Subject: [PATCH 13/14] Simplify chunking loop and short-circuit trivial-fit budget probe Three small cleanups from a /simplify pass; no behaviour change on the chunked path: - ``_effective_filter_budget`` now has a fast path for filters whose encoded length already fits under the URL limit with a 1 KB headroom for everything-but-the-filter. Skips the throwaway ``_construct_api_requests`` probe + the splitter + the encoding- ratio loop on every short-filter call, which is the common case. - ``get_ogc_data`` now collects chunk responses into a single ``responses`` list instead of carrying ``first_response``, ``total_elapsed``, and a branching accumulator through the loop. Elapsed-time aggregation moves to one line after the loop. - ``tests/waterdata_utils_test.py`` factors the repeated ``SimpleNamespace(url=..., elapsed=..., headers={})`` mocks into ``_fake_prepared_request()`` / ``_fake_response()`` helpers; 5 copy-paste sites collapse to one-line calls. Bumped the budget- shrinks-with-URL-params test to a filter large enough to go past the new short-circuit. Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/utils.py | 40 ++++++++++++------- tests/waterdata_utils_test.py | 67 ++++++++++++++------------------ 2 files changed, 56 insertions(+), 51 deletions(-) diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index 82dd505b..bf71d7a5 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -240,6 +240,12 @@ def _format_api_dates( # intermediate proxy variance. 
_WATERDATA_URL_BYTE_LIMIT = 8000 +# Assumed allowance (not verified per request) for the URL bytes consumed +# by everything *except* the filter value — the base URL, other query +# params, and the ``&filter=`` / ``&filter-lang=...`` keys. Used only to +# decide whether a filter is small enough to skip the budget probe. +_NON_FILTER_URL_HEADROOM = 1000 + def _iter_or_boundaries(expr: str) -> Iterator[tuple[int, int]]: """Yield ``(start, end)`` spans of each top-level ``OR`` separator. @@ -356,6 +362,15 @@ def _effective_filter_budget(args: dict[str, Any], filter_expr: str) -> int: clustered at one end of the filter), so budgeting against the average lets such a chunk overflow the URL limit by a few bytes. """ + # Fast path: if the whole encoded filter already fits with room for + # any plausible non-filter URL overhead, skip the probe and the + # splitter entirely. Signals pass-through via a budget larger than + # the filter. Saves a PreparedRequest build + a full splitter scan + # on every short-filter call.
+ encoded_len = len(quote_plus(filter_expr)) + if encoded_len + _NON_FILTER_URL_HEADROOM <= _WATERDATA_URL_BYTE_LIMIT: + return len(filter_expr) + 1 + probe = _construct_api_requests(**{**args, "filter": "x"}) non_filter_url_bytes = len(probe.url) - 1 available_url_bytes = _WATERDATA_URL_BYTE_LIMIT - non_filter_url_bytes @@ -998,18 +1013,13 @@ def get_ogc_data( filter_chunks = [None] frames = [] - first_response = None - total_elapsed = None + responses = [] for chunk in filter_chunks: chunk_args = args if chunk is None else {**args, "filter": chunk} req = _construct_api_requests(**chunk_args) chunk_df, chunk_response = _walk_pages(geopd=GEOPANDAS, req=req) frames.append(chunk_df) - if first_response is None: - first_response = chunk_response - total_elapsed = chunk_response.elapsed - else: - total_elapsed = total_elapsed + chunk_response.elapsed + responses.append(chunk_response) # Drop empty frames before concat — `_get_resp_data` returns a plain # ``pd.DataFrame()`` on empty responses, which can downgrade a concat @@ -1036,12 +1046,16 @@ def get_ogc_data( return_list = _arrange_cols(return_list, properties, output_id) return_list = _sort_rows(return_list) - # Create metadata object from the first response. When the filter was - # chunked into multiple sub-requests, query_time reflects the total - # elapsed time across all chunks rather than just the first. - if len(frames) > 1: - first_response.elapsed = total_elapsed - metadata = BaseMetadata(first_response) + # Use the first response for URL/headers; when the filter was chunked, + # aggregate elapsed time across all chunks so ``query_time`` reflects + # the full operation rather than just the first sub-request. 
+ metadata_response = responses[0] + if len(responses) > 1: + metadata_response.elapsed = sum( + (r.elapsed for r in responses[1:]), + start=metadata_response.elapsed, + ) + metadata = BaseMetadata(metadata_response) return return_list, metadata diff --git a/tests/waterdata_utils_test.py b/tests/waterdata_utils_test.py index c3a66cd0..ebfc685e 100644 --- a/tests/waterdata_utils_test.py +++ b/tests/waterdata_utils_test.py @@ -23,6 +23,20 @@ def _query_params(prepared_request): return parse_qs(urlsplit(prepared_request.url).query) +def _fake_prepared_request(url="https://example.test"): + """Stand-in for the object ``_construct_api_requests`` returns.""" + return SimpleNamespace(url=url, method="GET", headers={}) + + +def _fake_response(url="https://example.test", elapsed_ms=1): + """Stand-in for the response object ``_walk_pages`` returns.""" + return SimpleNamespace( + url=url, + elapsed=timedelta(milliseconds=elapsed_ms), + headers={}, + ) + + def test_get_args_basic(): local_vars = { "monitoring_location_id": "123", @@ -240,17 +254,12 @@ def test_long_filter_fans_out_into_multiple_requests(): def fake_construct_api_requests(**kwargs): sent_filters.append(kwargs.get("filter")) - return SimpleNamespace(url="https://example.test", method="GET", headers={}) + return _fake_prepared_request() def fake_walk_pages(*_args, **_kwargs): idx = len(sent_filters) frame = pd.DataFrame({"id": [f"chunk-{idx}"], "value": [idx]}) - resp = SimpleNamespace( - url="https://example.test", - elapsed=timedelta(milliseconds=1), - headers={}, - ) - return frame, resp + return frame, _fake_response() with mock.patch( "dataretrieval.waterdata.utils._construct_api_requests", @@ -299,18 +308,11 @@ def test_long_filter_deduplicates_cross_chunk_overlap(): def fake_walk_pages(*_args, **_kwargs): call_count["n"] += 1 frame = pd.DataFrame({"id": ["shared-feature"], "value": [1]}) - resp = SimpleNamespace( - url="https://example.test", - elapsed=timedelta(milliseconds=1), - headers={}, - ) - return 
frame, resp + return frame, _fake_response() with mock.patch( "dataretrieval.waterdata.utils._construct_api_requests", - return_value=SimpleNamespace( - url="https://example.test", method="GET", headers={} - ), + return_value=_fake_prepared_request(), ), mock.patch( "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages ), mock.patch( @@ -360,18 +362,11 @@ def fake_walk_pages(*_args, **_kwargs): geometry=[Point(call_count["n"], call_count["n"])], crs="EPSG:4326", ) - resp = SimpleNamespace( - url="https://example.test", - elapsed=timedelta(milliseconds=1), - headers={}, - ) - return frame, resp + return frame, _fake_response() with mock.patch( "dataretrieval.waterdata.utils._construct_api_requests", - return_value=SimpleNamespace( - url="https://example.test", method="GET", headers={} - ), + return_value=_fake_prepared_request(), ), mock.patch( "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages ), mock.patch( @@ -467,9 +462,7 @@ def test_effective_filter_budget_passes_through_when_no_url_space(): ) with mock.patch( "dataretrieval.waterdata.utils._construct_api_requests", - return_value=SimpleNamespace( - url="https://example.test/" + "A" * 9000, method="GET", headers={} - ), + return_value=_fake_prepared_request(url="https://example.test/" + "A" * 9000), ): budget = _effective_filter_budget({"filter": expr}, expr) # Budget is large enough that _chunk_cql_or returns the expression @@ -480,12 +473,14 @@ def test_effective_filter_budget_passes_through_when_no_url_space(): def test_effective_filter_budget_shrinks_with_more_url_params(): """Adding more scalar query params consumes URL bytes and should - shrink the raw filter budget accordingly.""" + shrink the raw filter budget accordingly. 
Use a filter large enough + to skip the short-circuit fast path so the probe actually runs.""" clause = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')" + expr = " OR ".join([clause] * 100) sparse_args = { "service": "continuous", "monitoring_location_id": "USGS-02238500", - "filter": clause, + "filter": expr, "filter_lang": "cql-text", } dense_args = { @@ -494,8 +489,8 @@ def test_effective_filter_budget_shrinks_with_more_url_params(): "statistic_id": "00003", "last_modified": "2023-01-01T00:00:00Z/2023-12-31T23:59:59Z", } - sparse_budget = _effective_filter_budget(sparse_args, clause) - dense_budget = _effective_filter_budget(dense_args, clause) + sparse_budget = _effective_filter_budget(sparse_args, expr) + dense_budget = _effective_filter_budget(dense_args, expr) assert dense_budget < sparse_budget @@ -509,7 +504,7 @@ def test_cql_json_filter_is_not_chunked(): def fake_construct_api_requests(**kwargs): sent_filters.append(kwargs.get("filter")) - return SimpleNamespace(url="https://example.test", method="GET", headers={}) + return _fake_prepared_request() with mock.patch( "dataretrieval.waterdata.utils._construct_api_requests", @@ -518,11 +513,7 @@ def fake_construct_api_requests(**kwargs): "dataretrieval.waterdata.utils._walk_pages", return_value=( pd.DataFrame({"id": ["row-1"], "value": [1]}), - SimpleNamespace( - url="https://example.test", - elapsed=timedelta(milliseconds=1), - headers={}, - ), + _fake_response(), ), ): get_continuous( From 0d74e74df05319abd53133f2fc7d795fbe603a85 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Thu, 23 Apr 2026 15:09:51 -0500 Subject: [PATCH 14/14] Split get_ogc_data into phase helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pull the chunked fan-out, frame combining, and metadata aggregation out of ``get_ogc_data`` into four private helpers so the top-level function reads as a short recipe rather than a 70-line procedure. 
Behaviour is unchanged (all 32 PR-related tests still pass); each helper docstring captures the non-obvious *why* of its phase: - ``_plan_filter_chunks`` decide how to fan out - ``_fetch_chunks`` one request per chunk, pure I/O loop - ``_combine_chunk_frames`` concat, drop empties to preserve GeoDataFrame type, dedup by feature id - ``_aggregate_response_metadata`` first response + summed elapsed The top-of-``get_ogc_data`` arg normalization stays inline — it's short and has a subtle ordering requirement (capture ``properties`` before ``_switch_properties_id`` rewrites it) that extraction would hide. Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/utils.py | 124 +++++++++++++++++++------------ 1 file changed, 76 insertions(+), 48 deletions(-) diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index bf71d7a5..ec837744 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -984,79 +984,107 @@ def get_ogc_data( - Applies column cleanup and reordering based on service and properties. """ args = args.copy() - # Add service as an argument args["service"] = service - # Switch the input id to "id" if needed args = _switch_arg_id(args, id_name=output_id, service=service) + # Capture `properties` before `_switch_properties_id` rewrites it so + # post-processing sees the user-facing names, not the wire-format ones. properties = args.get("properties") - # Switch properties id to "id" if needed args["properties"] = _switch_properties_id( properties, id_name=output_id, service=service ) convert_type = args.pop("convert_type", False) - # Create fresh dictionary of args without any None values args = {k: v for k, v in args.items() if v is not None} - # Only cql-text filters can be safely chunked by splitting top-level OR - # chains. For cql-json (or unknown languages), pass through unchanged. - # Overlapping user OR-clauses are deduplicated by feature id further below.
+ + chunks = _plan_filter_chunks(args) + frames, responses = _fetch_chunks(args, chunks) + + return_list = _combine_chunk_frames(frames) + return_list = _deal_with_empty(return_list, properties, service) + if convert_type: + return_list = _type_cols(return_list) + return_list = _arrange_cols(return_list, properties, output_id) + return_list = _sort_rows(return_list) + + return return_list, _aggregate_response_metadata(responses) + + +def _plan_filter_chunks(args: dict[str, Any]) -> list[str | None]: + """Decide how to fan ``args["filter"]`` out across HTTP calls. + + Returns one entry per request to send. A ``None`` entry means "send + ``args`` as-is" — either there is no filter, or the filter language + is not one we can safely split (only cql-text top-level ``OR`` + chains are chunkable). Otherwise each string entry is a chunked + cql-text expression that replaces ``args["filter"]`` for its + sub-request. Overlapping user OR-clauses are deduplicated by feature + id later in ``_combine_chunk_frames``. 
+ """ filter_expr = args.get("filter") filter_lang = args.get("filter_lang") - should_chunk_filter = ( + chunkable = ( isinstance(filter_expr, str) and filter_expr and filter_lang in {None, "cql-text"} ) - if should_chunk_filter: - raw_budget = _effective_filter_budget(args, filter_expr) - filter_chunks = _chunk_cql_or(filter_expr, max_len=raw_budget) - else: - filter_chunks = [None] - - frames = [] - responses = [] - for chunk in filter_chunks: + if not chunkable: + return [None] + raw_budget = _effective_filter_budget(args, filter_expr) + return _chunk_cql_or(filter_expr, max_len=raw_budget) + + +def _fetch_chunks( + args: dict[str, Any], chunks: list[str | None] +) -> tuple[list[pd.DataFrame], list[requests.Response]]: + """Send one request per chunk; return the per-chunk frames and responses.""" + frames: list[pd.DataFrame] = [] + responses: list[requests.Response] = [] + for chunk in chunks: chunk_args = args if chunk is None else {**args, "filter": chunk} req = _construct_api_requests(**chunk_args) - chunk_df, chunk_response = _walk_pages(geopd=GEOPANDAS, req=req) - frames.append(chunk_df) - responses.append(chunk_response) - - # Drop empty frames before concat — `_get_resp_data` returns a plain - # ``pd.DataFrame()`` on empty responses, which can downgrade a concat - # of real GeoDataFrames back to a plain DataFrame (losing geometry/ - # CRS). Empty frames contribute no rows, so discarding them is safe. - non_empty = [f for f in frames if not f.empty] - if not non_empty: - return_list = pd.DataFrame() - elif len(non_empty) == 1: - return_list = non_empty[0] - else: - return_list = pd.concat(non_empty, ignore_index=True) - # The top-level feature "id" is always present at this stage (the - # rename to output_id happens later in _arrange_cols), so dedup on - # it directly to catch overlapping OR-clauses across chunks. 
- if "id" in return_list.columns: - return_list = return_list.drop_duplicates(subset="id", ignore_index=True) - # Manage some aspects of the returned dataset - return_list = _deal_with_empty(return_list, properties, service) + frame, response = _walk_pages(geopd=GEOPANDAS, req=req) + frames.append(frame) + responses.append(response) + return frames, responses - if convert_type: - return_list = _type_cols(return_list) - return_list = _arrange_cols(return_list, properties, output_id) +def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame: + """Concatenate per-chunk frames, handling the edge cases. - return_list = _sort_rows(return_list) - # Use the first response for URL/headers; when the filter was chunked, - # aggregate elapsed time across all chunks so ``query_time`` reflects - # the full operation rather than just the first sub-request. + Drops empty frames before concat — ``_get_resp_data`` returns a + plain ``pd.DataFrame()`` on empty responses, which would downgrade + a concat of real GeoDataFrames back to a plain DataFrame and strip + geometry/CRS. Also dedups on the pre-rename feature ``id`` so + overlapping user-supplied OR-clauses don't produce duplicate rows + across chunks. + """ + non_empty = [f for f in frames if not f.empty] + if not non_empty: + return pd.DataFrame() + if len(non_empty) == 1: + return non_empty[0] + combined = pd.concat(non_empty, ignore_index=True) + if "id" in combined.columns: + combined = combined.drop_duplicates(subset="id", ignore_index=True) + return combined + + +def _aggregate_response_metadata( + responses: list[requests.Response], +) -> BaseMetadata: + """Build metadata from the first response, summing elapsed across chunks. + + The first response's URL and headers are the representative ones to + return. When the filter was fanned across multiple chunks, replace + its elapsed with the sum so ``query_time`` reflects the full + operation rather than just the first sub-request. 
+ """ metadata_response = responses[0] if len(responses) > 1: metadata_response.elapsed = sum( (r.elapsed for r in responses[1:]), start=metadata_response.elapsed, ) - metadata = BaseMetadata(metadata_response) - return return_list, metadata + return BaseMetadata(metadata_response) def _handle_stats_nesting(