diff --git a/NEWS.md b/NEWS.md index a3aa8361..48c91ea4 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,11 @@ +**04/22/2026:** Highlights since the `v1.1.0` release (2025-11-26), which shipped the `waterdata` module: + +- Added `get_channel` for channel-measurement data (#218) and `get_stats_por` / `get_stats_date_range` for period-of-record and daily statistics (#207). +- Added `get_reference_table` (and made it considerably simpler and faster in #209), then extended it to accept arbitrary collections-API query parameters (#214). +- Removed the deprecated `waterwatch` module (#228) and several defunct NWIS stubs (#222, #225), and added `py.typed` so `dataretrieval` ships type information to downstream users (#186). +- Now supports `pandas` 3.x (#221). +- The OGC `waterdata` getters (`get_continuous`, `get_daily`, `get_field_measurements`, and the six others built on the same OGC collections) now accept `filter` and `filter_lang` kwargs that are passed through to the service's CQL filter parameter. This enables advanced server-side filtering that isn't expressible via the other kwargs — most commonly, OR'ing multiple time ranges into a single request. A long expression made up of a top-level `OR` chain is transparently split into multiple requests that each fit under the server's URI length limit, and the results are concatenated. + **12/04/2025:** The `get_continuous()` function was added to the `waterdata` module, which provides access to measurements collected via automated sensors at a high frequency (often 15 minute intervals) at a monitoring location. This is an early version of the continuous endpoint and should be used with caution as the API team improves its performance. In the future, we anticipate the addition of an endpoint(s) specifically for handling large data requests, so it may make sense for power users to hold off on heavy development using the new continuous endpoint. 
**11/24/2025:** `dataretrieval` is pleased to offer a new module, `waterdata`, which gives users access USGS's modernized [Water Data APIs](https://api.waterdata.usgs.gov/). The Water Data API endpoints include daily values, instantaneous values, field measurements (modernized groundwater levels service), time series metadata, and discrete water quality data from the Samples database. Though there will be a period of overlap, the functions within `waterdata` will eventually replace the `nwis` module, which currently provides access to the legacy [NWIS Water Services](https://waterservices.usgs.gov/). More example workflows and functions coming soon. Check `help(waterdata)` for more information. diff --git a/dataretrieval/waterdata/__init__.py b/dataretrieval/waterdata/__init__.py index 2110de83..9bd80c91 100644 --- a/dataretrieval/waterdata/__init__.py +++ b/dataretrieval/waterdata/__init__.py @@ -27,6 +27,7 @@ ) from .types import ( CODE_SERVICES, + FILTER_LANG, PROFILE_LOOKUP, PROFILES, SERVICES, @@ -34,6 +35,7 @@ __all__ = [ "CODE_SERVICES", + "FILTER_LANG", "PROFILES", "PROFILE_LOOKUP", "SERVICES", diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index b2310e7a..f9a66b31 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -18,6 +18,7 @@ from dataretrieval.utils import BaseMetadata, to_str from dataretrieval.waterdata.types import ( CODE_SERVICES, + FILTER_LANG, METADATA_COLLECTIONS, PROFILES, SERVICES, @@ -51,6 +52,8 @@ def get_daily( time: str | list[str] | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data provide one data value to represent water conditions for the @@ -177,6 +180,18 @@ def get_daily( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. 
The default (NA) will set the limit to the maximum allowable limit for the service. + filter : string, optional + A CQL text or JSON expression passed through to the OGC API + ``filter`` query parameter. Commonly used to OR several time + ranges into a single request. At the time of writing the server + accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / + ``cql2-json`` are not yet supported. A long expression made up + of a top-level ``OR`` chain is automatically split into + multiple requests that each fit under the server's URI length + limit; the results are concatenated. + filter_lang : string, optional + Language of the ``filter`` expression, for example ``cql-text`` + (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -228,6 +243,8 @@ def get_continuous( last_modified: str | None = None, time: str | list[str] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """ @@ -348,6 +365,18 @@ def get_continuous( allowable limit is 10000. It may be beneficial to set this number lower if your internet connection is spotty. The default (NA) will set the limit to the maximum allowable limit for the service. + filter : string, optional + A CQL text or JSON expression passed through to the OGC API + ``filter`` query parameter. Commonly used to OR several time + ranges into a single request. At the time of writing the server + accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / + ``cql2-json`` are not yet supported. A long expression made up + of a top-level ``OR`` chain is automatically split into + multiple requests that each fit under the server's URI length + limit; the results are concatenated. + filter_lang : string, optional + Language of the ``filter`` expression, for example ``cql-text`` + (default) or ``cql-json``. 
Sent as ``filter-lang`` in the URL. convert_type : boolean, optional If True, the function will convert the data to dates and qualifier to string vector @@ -370,6 +399,37 @@ def get_continuous( ... parameter_code="00065", ... time="2021-01-01T00:00:00Z/2022-01-01T00:00:00Z", ... ) + + >>> # The ``time`` parameter accepts a single instant or a single + >>> # interval. To pull several disjoint windows in one call, pass a + >>> # CQL-text ``filter`` expression instead: + >>> df, md = dataretrieval.waterdata.get_continuous( + ... monitoring_location_id="USGS-02238500", + ... parameter_code="00060", + ... filter=( + ... "(time >= '2023-06-01T12:00:00Z' " + ... "AND time <= '2023-06-01T13:00:00Z') " + ... "OR (time >= '2023-06-15T12:00:00Z' " + ... "AND time <= '2023-06-15T13:00:00Z')" + ... ), + ... filter_lang="cql-text", + ... ) + + >>> # Long top-level ``OR`` chains (e.g. one window per discrete + >>> # measurement timestamp) are built up the same way. If the + >>> # resulting URL would exceed the server's length limit, the + >>> # client transparently splits it into multiple sub-requests and + >>> # returns the concatenated, deduplicated result. + >>> windows = [ + ... f"(time >= '2023-{m:02d}-15T00:00:00Z' " + ... f"AND time <= '2023-{m:02d}-15T00:30:00Z')" + ... for m in range(1, 13) + ... ] + >>> df, md = dataretrieval.waterdata.get_continuous( + ... monitoring_location_id="USGS-02238500", + ... parameter_code="00060", + ... filter=" OR ".join(windows), + ... 
) """ service = "continuous" output_id = "continuous_id" @@ -426,6 +486,8 @@ def get_monitoring_locations( time: str | list[str] | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Location information is basic information about the monitoring location @@ -635,6 +697,18 @@ def get_monitoring_locations( The returning object will be a data frame with no spatial information. Note that the USGS Water Data APIs use camelCase "skipGeometry" in CQL2 queries. + filter : string, optional + A CQL text or JSON expression passed through to the OGC API + ``filter`` query parameter. Commonly used to OR several time + ranges into a single request. At the time of writing the server + accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / + ``cql2-json`` are not yet supported. A long expression made up + of a top-level ``OR`` chain is automatically split into + multiple requests that each fit under the server's URI length + limit; the results are concatenated. + filter_lang : string, optional + Language of the ``filter`` expression, for example ``cql-text`` + (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -697,6 +771,8 @@ def get_time_series_metadata( time: str | list[str] | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data and continuous measurements are grouped into time series, @@ -851,6 +927,18 @@ def get_time_series_metadata( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (None) will set the limit to the maximum allowable limit for the service. 
+ filter : string, optional + A CQL text or JSON expression passed through to the OGC API + ``filter`` query parameter. Commonly used to OR several time + ranges into a single request. At the time of writing the server + accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / + ``cql2-json`` are not yet supported. A long expression made up + of a top-level ``OR`` chain is automatically split into + multiple requests that each fit under the server's URI length + limit; the results are concatenated. + filter_lang : string, optional + Language of the ``filter`` expression, for example ``cql-text`` + (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -903,6 +991,8 @@ def get_latest_continuous( time: str | list[str] | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """This endpoint provides the most recent observation for each time series @@ -1026,6 +1116,18 @@ def get_latest_continuous( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (None) will set the limit to the maximum allowable limit for the service. + filter : string, optional + A CQL text or JSON expression passed through to the OGC API + ``filter`` query parameter. Commonly used to OR several time + ranges into a single request. At the time of writing the server + accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / + ``cql2-json`` are not yet supported. A long expression made up + of a top-level ``OR`` chain is automatically split into + multiple requests that each fit under the server's URI length + limit; the results are concatenated. + filter_lang : string, optional + Language of the ``filter`` expression, for example ``cql-text`` + (default) or ``cql-json``. 
Sent as ``filter-lang`` in the URL. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -1075,6 +1177,8 @@ def get_latest_daily( time: str | list[str] | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data provide one data value to represent water conditions for the @@ -1200,6 +1304,18 @@ def get_latest_daily( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (None) will set the limit to the maximum allowable limit for the service. + filter : string, optional + A CQL text or JSON expression passed through to the OGC API + ``filter`` query parameter. Commonly used to OR several time + ranges into a single request. At the time of writing the server + accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / + ``cql2-json`` are not yet supported. A long expression made up + of a top-level ``OR`` chain is automatically split into + multiple requests that each fit under the server's URI length + limit; the results are concatenated. + filter_lang : string, optional + Language of the ``filter`` expression, for example ``cql-text`` + (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -1251,6 +1367,8 @@ def get_field_measurements( time: str | list[str] | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Field measurements are physically measured values collected during a @@ -1366,6 +1484,18 @@ def get_field_measurements( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. 
The default (None) will set the limit to the maximum allowable limit for the service. + filter : string, optional + A CQL text or JSON expression passed through to the OGC API + ``filter`` query parameter. Commonly used to OR several time + ranges into a single request. At the time of writing the server + accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / + ``cql2-json`` are not yet supported. A long expression made up + of a top-level ``OR`` chain is automatically split into + multiple requests that each fit under the server's URI length + limit; the results are concatenated. + filter_lang : string, optional + Language of the ``filter`` expression, for example ``cql-text`` + (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -2017,6 +2147,8 @@ def get_channel( skip_geometry: bool | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """ @@ -2123,6 +2255,18 @@ def get_channel( vertical_velocity_description, longitudinal_velocity_description, measurement_type, last_modified, channel_measurement_type. The default (NA) will return all columns of the data. + filter : string, optional + A CQL text or JSON expression passed through to the OGC API + ``filter`` query parameter. Commonly used to OR several time + ranges into a single request. At the time of writing the server + accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / + ``cql2-json`` are not yet supported. A long expression made up + of a top-level ``OR`` chain is automatically split into + multiple requests that each fit under the server's URI length + limit; the results are concatenated. + filter_lang : string, optional + Language of the ``filter`` expression, for example ``cql-text`` + (default) or ``cql-json``. 
Sent as ``filter-lang`` in the URL. convert_type : boolean, optional If True, the function will convert the data to dates and qualifier to string vector diff --git a/dataretrieval/waterdata/types.py b/dataretrieval/waterdata/types.py index f5e1496b..c9655719 100644 --- a/dataretrieval/waterdata/types.py +++ b/dataretrieval/waterdata/types.py @@ -40,6 +40,8 @@ "results", ] +FILTER_LANG = Literal["cql-text", "cql-json"] + PROFILES = Literal[ "actgroup", "actmetric", diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index c58148d5..ec837744 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -4,8 +4,10 @@ import logging import os import re +from collections.abc import Iterator from datetime import datetime from typing import Any, get_args +from urllib.parse import quote_plus import pandas as pd import requests @@ -224,6 +226,165 @@ def _format_api_dates( raise ValueError("datetime_input should only include 1-2 values") +# Conservative fallback budget (characters) for a single CQL ``filter`` +# query parameter, used when the caller invokes ``_chunk_cql_or`` without +# a ``max_len``. ``get_ogc_data`` computes a tighter per-request budget +# from ``_WATERDATA_URL_BYTE_LIMIT`` below. +_CQL_FILTER_CHUNK_LEN = 5000 + +# Total URL byte limit the Water Data API will accept before replying +# HTTP 414 (Request-URI Too Large). Empirically the cliff sits at +# ~8,200 bytes of full URL, which lines up with nginx's default +# ``large_client_header_buffers`` of 8 KB (8192). 8000 leaves ~200 bytes +# of headroom for request-line framing ("GET ... HTTP/1.1\r\n") and any +# intermediate proxy variance. +_WATERDATA_URL_BYTE_LIMIT = 8000 + +# Conservative over-estimate of the URL bytes consumed by everything +# *except* the filter value — the base URL, other query params, and the +# ``&filter=`` / ``&filter-lang=...`` keys. 
Used only to decide whether a +# filter is small enough that the expensive budget probe can be skipped. +_NON_FILTER_URL_HEADROOM = 1000 + + +def _iter_or_boundaries(expr: str) -> Iterator[tuple[int, int]]: + """Yield ``(start, end)`` spans of each top-level ``OR`` separator. + + Tracks single/double-quoted string literals and parenthesized + sub-expressions so that ``OR`` tokens inside them are skipped. + Matching is case-insensitive and the yielded span covers the + surrounding whitespace on both sides. + """ + depth = 0 + in_quote = None + i = 0 + n = len(expr) + while i < n: + ch = expr[i] + if in_quote is not None: + if ch == in_quote: + in_quote = None + i += 1 + continue + if ch in ("'", '"'): + in_quote = ch + i += 1 + continue + if ch == "(": + depth += 1 + i += 1 + continue + if ch == ")": + depth -= 1 + i += 1 + continue + if depth == 0 and ch.isspace(): + j = i + 1 + while j < n and expr[j].isspace(): + j += 1 + if j + 2 <= n and expr[j : j + 2].lower() == "or": + k = j + 2 + if k < n and expr[k].isspace(): + m = k + 1 + while m < n and expr[m].isspace(): + m += 1 + yield i, m + i = m + continue + i += 1 + + +def _split_top_level_or(expr: str) -> list[str]: + """Split a CQL expression at each top-level ``OR`` separator. + + Respects parentheses and single/double-quoted string literals so that + ``OR`` tokens inside ``(A OR B)`` or ``'word OR word'`` are left alone. + Matching is case-insensitive. Whitespace around each emitted part is + stripped; empty parts are dropped. + """ + parts = [] + last = 0 + for start, end in _iter_or_boundaries(expr): + parts.append(expr[last:start].strip()) + last = end + parts.append(expr[last:].strip()) + return [p for p in parts if p] + + +def _chunk_cql_or(expr: str, max_len: int = _CQL_FILTER_CHUNK_LEN) -> list[str]: + """Split a CQL expression into OR-chunks that each fit under ``max_len``. 
+ + The splitter only understands top-level ``OR`` chains, since that is + the only shape that can be recombined losslessly as a disjunction of + independent sub-queries. Returns ``[expr]`` unchanged when the whole + expression already fits, when it contains no top-level ``OR``, or when + any single clause is larger than ``max_len`` on its own (we would + rather send a too-long request and surface the server's 414 than + silently drop data). + """ + if len(expr) <= max_len: + return [expr] + parts = _split_top_level_or(expr) + if len(parts) < 2 or any(len(p) > max_len for p in parts): + return [expr] + + chunks = [] + current = [] + current_len = 0 + for part in parts: + join_cost = len(" OR ") if current else 0 + if current and current_len + join_cost + len(part) > max_len: + chunks.append(" OR ".join(current)) + current = [part] + current_len = len(part) + else: + current.append(part) + current_len += join_cost + len(part) + if current: + chunks.append(" OR ".join(current)) + return chunks + + +def _effective_filter_budget(args: dict[str, Any], filter_expr: str) -> int: + """Compute the raw CQL character budget for ``filter_expr`` in this request. + + The server limits total URL length (see ``_WATERDATA_URL_BYTE_LIMIT``), + not raw CQL length. To derive a raw-character budget we can hand to + ``_chunk_cql_or`` (which compares ``len()`` of the unencoded text): + + 1. Probe the URL space consumed by the other query params by building + the request with a 1-byte placeholder filter. + 2. Subtract from the URL limit to get the bytes available for the + encoded filter value. + 3. Convert back to raw CQL characters using the *maximum* per-clause + encoding ratio, not the whole-filter average. A chunk can end up + containing only the heavier-encoding clauses (e.g. heavy ones + clustered at one end of the filter), so budgeting against the + average lets such a chunk overflow the URL limit by a few bytes. 
+ """ + # Fast path: if the whole encoded filter already fits with room for + # any plausible non-filter URL overhead, skip the probe and the + # splitter entirely. Signals pass-through via a budget larger than + # the filter. Saves a PreparedRequest build + a full splitter scan + # on every short-filter call. + encoded_len = len(quote_plus(filter_expr)) + if encoded_len + _NON_FILTER_URL_HEADROOM <= _WATERDATA_URL_BYTE_LIMIT: + return len(filter_expr) + 1 + + probe = _construct_api_requests(**{**args, "filter": "x"}) + non_filter_url_bytes = len(probe.url) - 1 + available_url_bytes = _WATERDATA_URL_BYTE_LIMIT - non_filter_url_bytes + if available_url_bytes <= 0: + # The non-filter URL already exceeds the byte limit, so no chunk + # we could produce would fit. Return a budget larger than the + # filter so _chunk_cql_or passes it through unchanged — one 414 + # from the server is clearer than a burst of N failing sub-requests. + return len(filter_expr) + 1 + parts = _split_top_level_or(filter_expr) or [filter_expr] + encoding_ratio = max(len(quote_plus(p)) / len(p) for p in parts if p) + return max(100, int(available_url_bytes / encoding_ratio)) + + def _cql2_param(args: dict[str, Any]) -> str: """ Convert query parameters to CQL2 JSON format for POST requests. @@ -419,6 +580,12 @@ def _construct_api_requests( if properties: params["properties"] = ",".join(properties) + # Translate CQL filter Python names to the hyphenated URL parameter that + # the OGC API expects. The Python kwarg is `filter_lang` because hyphens + # aren't valid in Python identifiers. + if "filter_lang" in params: + params["filter-lang"] = params.pop("filter_lang") + headers = _default_headers() if POST: @@ -817,34 +984,107 @@ def get_ogc_data( - Applies column cleanup and reordering based on service and properties. 
""" args = args.copy() - # Add service as an argument args["service"] = service - # Switch the input id to "id" if needed args = _switch_arg_id(args, id_name=output_id, service=service) + # Capture `properties` before the id-switch so post-processing sees + # the user-facing names, not the wire-format ones. properties = args.get("properties") - # Switch properties id to "id" if needed args["properties"] = _switch_properties_id( properties, id_name=output_id, service=service ) convert_type = args.pop("convert_type", False) - # Create fresh dictionary of args without any None values args = {k: v for k, v in args.items() if v is not None} - # Build API request - req = _construct_api_requests(**args) - # Run API request and iterate through pages if needed - return_list, response = _walk_pages(geopd=GEOPANDAS, req=req) - # Manage some aspects of the returned dataset - return_list = _deal_with_empty(return_list, properties, service) + chunks = _plan_filter_chunks(args) + frames, responses = _fetch_chunks(args, chunks) + + return_list = _combine_chunk_frames(frames) + return_list = _deal_with_empty(return_list, properties, service) if convert_type: return_list = _type_cols(return_list) - return_list = _arrange_cols(return_list, properties, output_id) - return_list = _sort_rows(return_list) - # Create metadata object from response - metadata = BaseMetadata(response) - return return_list, metadata + + return return_list, _aggregate_response_metadata(responses) + + +def _plan_filter_chunks(args: dict[str, Any]) -> list[str | None]: + """Decide how to fan ``args["filter"]`` out across HTTP calls. + + Returns one entry per request to send. A ``None`` entry means "send + ``args`` as-is" — either there is no filter, or the filter language + is not one we can safely split (only cql-text top-level ``OR`` + chains are chunkable). Otherwise each string entry is a chunked + cql-text expression that replaces ``args["filter"]`` for its + sub-request. 
Overlapping user OR-clauses are deduplicated by feature + id later in ``_combine_chunk_frames``. + """ + filter_expr = args.get("filter") + filter_lang = args.get("filter_lang") + chunkable = ( + isinstance(filter_expr, str) + and filter_expr + and filter_lang in {None, "cql-text"} + ) + if not chunkable: + return [None] + raw_budget = _effective_filter_budget(args, filter_expr) + return _chunk_cql_or(filter_expr, max_len=raw_budget) + + +def _fetch_chunks( + args: dict[str, Any], chunks: list[str | None] +) -> tuple[list[pd.DataFrame], list[requests.Response]]: + """Send one request per chunk; return the per-chunk frames and responses.""" + frames: list[pd.DataFrame] = [] + responses: list[requests.Response] = [] + for chunk in chunks: + chunk_args = args if chunk is None else {**args, "filter": chunk} + req = _construct_api_requests(**chunk_args) + frame, response = _walk_pages(geopd=GEOPANDAS, req=req) + frames.append(frame) + responses.append(response) + return frames, responses + + +def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame: + """Concatenate per-chunk frames, handling the edge cases. + + Drops empty frames before concat — ``_get_resp_data`` returns a + plain ``pd.DataFrame()`` on empty responses, which would downgrade + a concat of real GeoDataFrames back to a plain DataFrame and strip + geometry/CRS. Also dedups on the pre-rename feature ``id`` so + overlapping user-supplied OR-clauses don't produce duplicate rows + across chunks. + """ + non_empty = [f for f in frames if not f.empty] + if not non_empty: + return pd.DataFrame() + if len(non_empty) == 1: + return non_empty[0] + combined = pd.concat(non_empty, ignore_index=True) + if "id" in combined.columns: + combined = combined.drop_duplicates(subset="id", ignore_index=True) + return combined + + +def _aggregate_response_metadata( + responses: list[requests.Response], +) -> BaseMetadata: + """Build metadata from the first response, summing elapsed across chunks. 
+ + The first response's URL and headers are the representative ones to + return. When the filter was fanned across multiple chunks, replace + its elapsed with the sum so ``query_time`` reflects the full + operation rather than just the first sub-request. + """ + metadata_response = responses[0] + if len(responses) > 1: + metadata_response.elapsed = sum( + (r.elapsed for r in responses[1:]), + start=metadata_response.elapsed, + ) + return BaseMetadata(metadata_response) def _handle_stats_nesting( diff --git a/tests/waterdata_utils_test.py b/tests/waterdata_utils_test.py index 772f75b7..ebfc685e 100644 --- a/tests/waterdata_utils_test.py +++ b/tests/waterdata_utils_test.py @@ -1,8 +1,40 @@ +from datetime import timedelta +from types import SimpleNamespace from unittest import mock +from urllib.parse import parse_qs, urlsplit +import pandas as pd +import pytest import requests -from dataretrieval.waterdata.utils import _get_args, _walk_pages +from dataretrieval.waterdata.utils import ( + _CQL_FILTER_CHUNK_LEN, + _WATERDATA_URL_BYTE_LIMIT, + _chunk_cql_or, + _construct_api_requests, + _effective_filter_budget, + _get_args, + _split_top_level_or, + _walk_pages, +) + + +def _query_params(prepared_request): + return parse_qs(urlsplit(prepared_request.url).query) + + +def _fake_prepared_request(url="https://example.test"): + """Stand-in for the object ``_construct_api_requests`` returns.""" + return SimpleNamespace(url=url, method="GET", headers={}) + + +def _fake_response(url="https://example.test", elapsed_ms=1): + """Stand-in for the response object ``_walk_pages`` returns.""" + return SimpleNamespace( + url=url, + elapsed=timedelta(milliseconds=elapsed_ms), + headers={}, + ) def test_get_args_basic(): @@ -77,3 +109,418 @@ def test_walk_pages_multiple_mocked(): assert mock_client.send.called assert mock_client.request.called assert mock_client.request.call_args[0][1] == "https://example.com/page2" + + +def test_construct_filter_passthrough(): + """`filter` is forwarded 
verbatim as a query parameter."""
+    expr = (
+        "(time >= '2023-01-06T16:00:00Z' AND time <= '2023-01-06T18:00:00Z') "
+        "OR (time >= '2023-01-10T18:00:00Z' AND time <= '2023-01-10T20:00:00Z')"
+    )
+    req = _construct_api_requests(
+        service="continuous",
+        monitoring_location_id="USGS-07374525",
+        parameter_code="72255",
+        filter=expr,
+    )
+    qs = _query_params(req)
+    assert qs["filter"] == [expr]
+
+
+def test_construct_filter_lang_hyphenated():
+    """The Python kwarg `filter_lang` is sent as URL key `filter-lang`."""
+    req = _construct_api_requests(
+        service="continuous",
+        monitoring_location_id="USGS-07374525",
+        parameter_code="72255",
+        filter="time >= '2023-01-01T00:00:00Z'",
+        filter_lang="cql-text",
+    )
+    qs = _query_params(req)
+    assert qs["filter-lang"] == ["cql-text"]
+    assert "filter_lang" not in qs
+
+
+def test_split_top_level_or_simple():
+    """A flat ``A OR B OR C`` chain splits into its three operands."""
+    parts = _split_top_level_or("A OR B OR C")
+    assert parts == ["A", "B", "C"]
+
+
+def test_split_top_level_or_case_insensitive():
+    """The ``OR`` keyword is recognized regardless of letter case."""
+    assert _split_top_level_or("A or B Or C") == ["A", "B", "C"]
+
+
+def test_split_top_level_or_respects_parens():
+    """An ``OR`` nested inside parentheses is not a split point."""
+    assert _split_top_level_or("(A OR B) OR (C OR D)") == ["(A OR B)", "(C OR D)"]
+
+
+def test_split_top_level_or_respects_quotes():
+    """An ``OR`` inside a quoted string literal is not a split point."""
+    expr = "name = 'foo OR bar' OR id = 1"
+    assert _split_top_level_or(expr) == ["name = 'foo OR bar'", "id = 1"]
+
+
+@pytest.mark.parametrize(
+    ("expr", "expected"),
+    [
+        ("name = 'O''Reilly OR Co' OR id = 1", ["name = 'O''Reilly OR Co'", "id = 1"]),
+        ("name = 'It''s' OR id = 1", ["name = 'It''s'", "id = 1"]),
+        (
+            "name = 'alpha ''or'' beta' OR id = 1",
+            ["name = 'alpha ''or'' beta'", "id = 1"],
+        ),
+        ("'x'' OR ''y' OR id = 1", ["'x'' OR ''y'", "id = 1"]),
+    ],
+)
+def test_split_top_level_or_handles_doubled_quote_escape(expr, expected):
+    """CQL text escapes a single quote inside a literal as ``''``. The
+    two quotes are adjacent, so the scanner's naive toggle-on-quote logic
+    happens to land back in the correct state with nothing between the
+    toggles to misclassify. Lock that behavior in so a future refactor
+    can't regress it.
+
+    Parametrized so every escape shape is reported as its own test case
+    instead of the first failure hiding the rest."""
+    assert _split_top_level_or(expr) == expected
+
+
+def test_split_top_level_or_single_clause():
+    """An expression with no ``OR`` comes back as a one-element list."""
+    assert _split_top_level_or("time >= '2023-01-01T00:00:00Z'") == [
+        "time >= '2023-01-01T00:00:00Z'"
+    ]
+
+
+def test_chunk_cql_or_short_passthrough():
+    """A filter already within ``max_len`` is returned as a single chunk."""
+    expr = "time >= '2023-01-01T00:00:00Z'"
+    assert _chunk_cql_or(expr, max_len=1000) == [expr]
+
+
+def test_chunk_cql_or_splits_into_multiple():
+    """An over-budget OR chain is split into several in-budget chunks that
+    together carry every original clause intact."""
+    clause = "(time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-01T00:30:00Z')"
+    expr = " OR ".join([clause] * 200)
+    chunks = _chunk_cql_or(expr, max_len=1000)
+    # chunking actually happened
+    assert len(chunks) > 1
+    # each chunk must be under the budget
+    assert all(len(c) <= 1000 for c in chunks)
+    # rejoined chunks must cover every clause
+    rejoined_clauses = sum(len(c.split(" OR ")) for c in chunks)
+    assert rejoined_clauses == 200
+    # and each chunk must itself be a top-level OR chain of whole clauses,
+    # i.e. no clause was cut in half at a chunk boundary
+    for chunk in chunks:
+        assert set(_split_top_level_or(chunk)) == {clause}
+
+
+def test_chunk_cql_or_unsplittable_returns_input():
+    """An over-budget filter with no top-level ``OR`` is returned unchanged."""
+    big = "value > 0 AND " + ("A " * 4000)
+    assert _chunk_cql_or(big, max_len=1000) == [big]
+
+
+def test_chunk_cql_or_single_clause_over_budget_returns_input():
+    """If one clause alone exceeds ``max_len`` the whole filter is returned
+    unchanged rather than partially chunked."""
+    huge_clause = "(value > " + "9" * 6000 + ")"
+    expr = f"{huge_clause} OR (value > 0)"
+    assert _chunk_cql_or(expr, max_len=1000) == [expr]
+
+
+@pytest.mark.parametrize(
+    "service",
+    [
+        "daily",
+        "continuous",
+        "monitoring-locations",
+        "time-series-metadata",
+        "latest-continuous",
+        "latest-daily",
+        "field-measurements",
+        "channel-measurements",
+    ],
+)
+def test_construct_filter_on_all_ogc_services(service):
+    """Filter passthrough works uniformly for every OGC collection endpoint."""
+    req = _construct_api_requests(
+        service=service,
+        filter="value > 0",
+        filter_lang="cql-text",
+    )
+    qs = _query_params(req)
+    assert qs["filter"] == ["value > 0"]
+    assert qs["filter-lang"] == ["cql-text"]
+
+
+def _oversized_or_filter(n_clauses=300):
+    """Build a top-level OR chain of ``n_clauses`` half-hour time windows.
+
+    The day of month cycles through 1-28 of January 2023, so the chain
+    contains repeated clauses. Shared by the fan-out tests below, which
+    rely on the default size being too long to send as a single request."""
+    clause = (
+        "(time >= '2023-01-{day:02d}T00:00:00Z' "
+        "AND time <= '2023-01-{day:02d}T00:30:00Z')"
+    )
+    return " OR ".join(clause.format(day=(i % 28) + 1) for i in range(n_clauses))
+
+
+def test_long_filter_fans_out_into_multiple_requests():
+    """An oversized top-level OR filter triggers multiple HTTP requests
+    whose results are concatenated."""
+    from dataretrieval.waterdata import get_continuous
+
+    expr = _oversized_or_filter()
+    assert len(expr) > _CQL_FILTER_CHUNK_LEN
+
+    sent_filters = []
+
+    def fake_construct_api_requests(**kwargs):
+        sent_filters.append(kwargs.get("filter"))
+        return _fake_prepared_request()
+
+    def fake_walk_pages(*_args, **_kwargs):
+        # Label each frame with the number of requests built so far, so the
+        # rows returned for different chunks carry different ids.
+        idx = len(sent_filters)
+        frame = pd.DataFrame({"id": [f"chunk-{idx}"], "value": [idx]})
+        return frame, _fake_response()
+
+    with mock.patch(
+        "dataretrieval.waterdata.utils._construct_api_requests",
+        side_effect=fake_construct_api_requests,
+    ), mock.patch(
+        "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages
+    ), mock.patch(
+        "dataretrieval.waterdata.utils._effective_filter_budget",
+        return_value=_CQL_FILTER_CHUNK_LEN,
+    ):
+        df, _ = get_continuous(
+            monitoring_location_id="USGS-07374525",
+            parameter_code="72255",
+            filter=expr,
+            filter_lang="cql-text",
+        )
+
+    # Mocking _effective_filter_budget bypasses the URL-length probe, so
+    # sent_filters contains only real chunk requests. Assert invariants:
+    # chunking happened, every original clause is preserved exactly once
+    # in order, each chunk stays under the budget, and the mock's
+    # one-row-per-chunk responses concatenate to a row per chunk.
+    expected_parts = _split_top_level_or(expr)
+    assert len(sent_filters) > 1
+    rejoined_parts = []
+    for chunk in sent_filters:
+        rejoined_parts.extend(_split_top_level_or(chunk))
+    assert rejoined_parts == expected_parts
+    assert len(df) == len(sent_filters)
+    assert all(len(chunk) <= _CQL_FILTER_CHUNK_LEN for chunk in sent_filters)
+
+
+def test_long_filter_deduplicates_cross_chunk_overlap():
+    """Features returned by multiple chunks (same feature `id`) are
+    deduplicated in the concatenated result."""
+    from dataretrieval.waterdata import get_continuous
+
+    expr = _oversized_or_filter()
+
+    call_count = {"n": 0}
+
+    def fake_walk_pages(*_args, **_kwargs):
+        call_count["n"] += 1
+        frame = pd.DataFrame({"id": ["shared-feature"], "value": [1]})
+        return frame, _fake_response()
+
+    with mock.patch(
+        "dataretrieval.waterdata.utils._construct_api_requests",
+        return_value=_fake_prepared_request(),
+    ), mock.patch(
+        "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages
+    ), mock.patch(
+        "dataretrieval.waterdata.utils._effective_filter_budget",
+        return_value=_CQL_FILTER_CHUNK_LEN,
+    ):
+        df, _ = get_continuous(
+            monitoring_location_id="USGS-07374525",
+            parameter_code="72255",
+            filter=expr,
+            filter_lang="cql-text",
+        )
+
+    # Chunking must have happened (otherwise dedup wouldn't be exercised).
+    assert call_count["n"] > 1
+    # Even though each chunk returned a feature, dedup by id collapses them.
+    assert len(df) == 1
+
+
+def test_empty_chunks_do_not_downgrade_geodataframe():
+    """A mix of empty and non-empty chunk responses must not downgrade a
+    GeoDataFrame-typed result to a plain DataFrame. ``_get_resp_data``
+    returns ``pd.DataFrame()`` on empty responses, which would otherwise
+    strip geometry/CRS from the concatenated output."""
+    pytest.importorskip("geopandas")
+    import geopandas as gpd
+    from shapely.geometry import Point
+
+    from dataretrieval.waterdata import get_continuous
+
+    expr = _oversized_or_filter()
+
+    call_count = {"n": 0}
+
+    def fake_walk_pages(*_args, **_kwargs):
+        call_count["n"] += 1
+        # The second chunk returns empty; every other chunk returns a
+        # GeoDataFrame.
+        if call_count["n"] == 2:
+            frame = pd.DataFrame()
+        else:
+            frame = gpd.GeoDataFrame(
+                {"id": [f"feat-{call_count['n']}"], "value": [call_count["n"]]},
+                geometry=[Point(call_count["n"], call_count["n"])],
+                crs="EPSG:4326",
+            )
+        return frame, _fake_response()
+
+    with mock.patch(
+        "dataretrieval.waterdata.utils._construct_api_requests",
+        return_value=_fake_prepared_request(),
+    ), mock.patch(
+        "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages
+    ), mock.patch(
+        "dataretrieval.waterdata.utils._effective_filter_budget",
+        return_value=_CQL_FILTER_CHUNK_LEN,
+    ):
+        df, _ = get_continuous(
+            monitoring_location_id="USGS-07374525",
+            parameter_code="72255",
+            filter=expr,
+            filter_lang="cql-text",
+        )
+
+    # The empty frame is only produced on the second call; without at least
+    # two chunk requests this test would pass without exercising it.
+    assert call_count["n"] >= 2
+    # The empty chunk must not have stripped the GeoDataFrame type.
+    assert isinstance(df, gpd.GeoDataFrame)
+    assert "geometry" in df.columns
+    assert df.crs is not None
+
+
+def test_effective_filter_budget_respects_url_limit():
+    """The computed budget, once encoded, fits within the URL byte limit
+    alongside the other query params."""
+    from urllib.parse import quote_plus
+
+    filter_expr = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')"
+    args = {
+        "service": "continuous",
+        "monitoring_location_id": "USGS-02238500",
+        "parameter_code": "00060",
+        "filter": filter_expr,
+        "filter_lang": "cql-text",
+    }
+    raw_budget = _effective_filter_budget(args, filter_expr)
+
+    # Build a chunk exactly at the raw budget (padded with the clause repeated)
+    # and confirm the full URL it produces stays under the URL byte limit.
+    padded = (" OR ".join([filter_expr] * 200))[:raw_budget]
+    req = _construct_api_requests(**{**args, "filter": padded})
+    assert len(req.url) <= _WATERDATA_URL_BYTE_LIMIT
+    # The raw (pre-encoding) budget must itself sit below the byte limit:
+    # percent-encoding never makes a string shorter.
+    assert raw_budget < _WATERDATA_URL_BYTE_LIMIT
+    # Quick sanity on the encoding math itself.
+    assert len(quote_plus(padded)) <= _WATERDATA_URL_BYTE_LIMIT
+
+
+def test_effective_filter_budget_uses_max_clause_ratio():
+    """Heavy clauses clustered in one part of the filter must not be able
+    to push any chunk over the URL limit. The budget is computed against
+    the max per-clause encoding ratio, not the whole-filter average, so
+    a chunk of only-heaviest-clauses still fits."""
+    from urllib.parse import quote_plus
+
+    heavy = (
+        "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z' "
+        "AND approval_status IN ('Approved','Provisional','Revised'))"
+    )
+    light = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')"
+    # Heavy ratio < light ratio for these shapes; cluster them at opposite
+    # ends so the chunker must produce at least one light-only chunk.
+    clauses = [heavy] * 100 + [light] * 400
+    expr = " OR ".join(clauses)
+    args = {
+        "service": "continuous",
+        "monitoring_location_id": "USGS-02238500",
+        "filter": expr,
+        "filter_lang": "cql-text",
+    }
+    budget = _effective_filter_budget(args, expr)
+    chunks = _chunk_cql_or(expr, max_len=budget)
+    assert len(chunks) > 1
+
+    # Every chunk, once built into a full request, fits under the URL byte
+    # limit — even the all-light chunks that have a higher-than-average ratio.
+    for chunk in chunks:
+        req = _construct_api_requests(**{**args, "filter": chunk})
+        assert len(req.url) <= _WATERDATA_URL_BYTE_LIMIT, (
+            f"chunk url {len(req.url)} exceeds {_WATERDATA_URL_BYTE_LIMIT}"
+        )
+
+    # Budget should be tight enough that a chunk of only-light clauses
+    # (the heavier-encoding shape here) still fits.
+    assert len(quote_plus(light)) * (budget // len(light)) < _WATERDATA_URL_BYTE_LIMIT
+
+
+def test_effective_filter_budget_passes_through_when_no_url_space():
+    """If the non-filter URL already exceeds the byte limit, chunking
+    cannot make the request succeed. The budget helper should signal
+    pass-through (return a budget larger than the filter) so
+    ``_chunk_cql_or`` emits one chunk — one 414 from the server is
+    clearer than a burst of N guaranteed-414 sub-requests."""
+    expr = " OR ".join(
+        ["(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')"] * 50
+    )
+    with mock.patch(
+        "dataretrieval.waterdata.utils._construct_api_requests",
+        return_value=_fake_prepared_request(url="https://example.test/" + "A" * 9000),
+    ):
+        budget = _effective_filter_budget({"filter": expr}, expr)
+    # Budget is large enough that _chunk_cql_or returns the expression
+    # unchanged (passthrough) rather than producing many small chunks.
+    assert budget > len(expr)
+    assert _chunk_cql_or(expr, max_len=budget) == [expr]
+
+
+def test_effective_filter_budget_shrinks_with_more_url_params():
+    """Adding more scalar query params consumes URL bytes and should
+    shrink the raw filter budget accordingly. Use a filter large enough
+    to skip the short-circuit fast path so the probe actually runs."""
+    clause = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')"
+    expr = " OR ".join([clause] * 100)
+    sparse_args = {
+        "service": "continuous",
+        "monitoring_location_id": "USGS-02238500",
+        "filter": expr,
+        "filter_lang": "cql-text",
+    }
+    dense_args = {
+        **sparse_args,
+        "parameter_code": "00060",
+        "statistic_id": "00003",
+        "last_modified": "2023-01-01T00:00:00Z/2023-12-31T23:59:59Z",
+    }
+    sparse_budget = _effective_filter_budget(sparse_args, expr)
+    dense_budget = _effective_filter_budget(dense_args, expr)
+    assert dense_budget < sparse_budget
+
+
+def test_cql_json_filter_is_not_chunked():
+    """Chunking applies only to cql-text; cql-json is passed through unchanged."""
+    from dataretrieval.waterdata import get_continuous
+
+    clause = "(time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-01T00:30:00Z')"
+    expr = " OR ".join([clause] * 300)
+    sent_filters = []
+
+    def fake_construct_api_requests(**kwargs):
+        sent_filters.append(kwargs.get("filter"))
+        return _fake_prepared_request()
+
+    with mock.patch(
+        "dataretrieval.waterdata.utils._construct_api_requests",
+        side_effect=fake_construct_api_requests,
+    ), mock.patch(
+        "dataretrieval.waterdata.utils._walk_pages",
+        return_value=(
+            pd.DataFrame({"id": ["row-1"], "value": [1]}),
+            _fake_response(),
+        ),
+    ):
+        get_continuous(
+            monitoring_location_id="USGS-07374525",
+            parameter_code="72255",
+            filter=expr,
+            filter_lang="cql-json",
+        )
+
+    # Exactly one request was built, and it carried the untouched filter.
+    assert sent_filters == [expr]