2 changes: 2 additions & 0 deletions NEWS.md
@@ -1,3 +1,5 @@
**05/14/2026:** Fixed two latent bugs in the paginated `waterdata` request loop (`_walk_pages` and `get_stats_data`). Previously, when `requests.Session.request(...)` itself raised mid-pagination (network error, timeout), the except block called `_error_body()` on the *prior page's* response, so the logged "error" described the wrong request and could itself crash on non-JSON bodies. Separately, no status-code check was performed on subsequent paginated responses, so a 5xx body that didn't include `numberReturned` was silently treated as an empty page — pagination quietly stopped and the user got truncated data with no error logged. The loop now status-checks each page like the initial request and reports the actual exception. The "best-effort" behavior (return whatever pages were collected) is preserved.
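
For readers skimming the changelog, a minimal sketch of the corrected loop shape follows — simplified names, not the actual `_walk_pages` implementation (see the `utils.py` diff below). The point is that every page is status-checked, the exception that actually occurred is logged, and the pages collected so far are still returned.

```python
import logging

import requests

logger = logging.getLogger(__name__)


def fetch_all_pages(session: requests.Session, first_url: str) -> list:
    """Best-effort pagination: keep whatever pages succeeded before a failure."""
    pages = []
    url = first_url
    try:
        while url:
            resp = session.request("GET", url)
            if resp.status_code != 200:
                # Surface the failing page instead of treating it as an empty one.
                raise RuntimeError(f"HTTP {resp.status_code}: {resp.text[:200]}")
            body = resp.json()
            pages.append(body)
            url = body.get("next")  # a falsy value ends the loop
    except Exception as e:  # includes network errors raised by session.request
        # Log the exception that actually occurred, not a stale response body.
        logger.exception("Request incomplete: %s", e)
        logger.warning("Request failed for URL: %s. Data download interrupted.", url)
    return pages
```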

**05/07/2026:** Bumped the declared minimum Python version from **3.8** to **3.9** (`pyproject.toml`'s `requires-python` and the ruff target). This brings the manifest in line with what was already being tested — CI's matrix has long covered only 3.9, 3.13, and 3.14, the `waterdata` test module already skipped itself on Python < 3.10, and several modules already use 3.9-only stdlib (e.g. `zoneinfo`). Users on 3.8 will no longer be able to install the package; please upgrade.

**05/07/2026:** `waterdata.get_samples()` and `wqp.get_results()` now append a derived `<prefix>DateTime` UTC column for every Date/Time/TimeZone triplet in the response (e.g. `Activity_StartDate` + `Activity_StartTime` + `Activity_StartTimeZone` → `Activity_StartDateTime`). Both the WQX3 (`<X>Date`/`<X>Time`/`<X>TimeZone`) and legacy WQP (`<X>Date`/`<X>Time/Time`/`<X>Time/TimeZoneCode`) shapes are recognized; abbreviations like EST/EDT/CST/PST resolve to a UTC `Timestamp`, unknown codes resolve to `NaT`, and the original triplet columns are preserved. Returned rows are also now sorted by `Activity_StartDateTime` (or the legacy `ActivityStartDateTime`) — the underlying APIs return rows in an unstable order. Mirrors R's `create_dateTime` and end-of-pipeline sort. Closes #266.
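
A rough illustration of the derived column's semantics (the column names and the abbreviation-to-offset table below are assumptions for the example; the library's own parsing may differ):

```python
import pandas as pd

# Hypothetical rows shaped like a WQX3 response.
df = pd.DataFrame(
    {
        "Activity_StartDate": ["2024-06-01", "2024-06-02"],
        "Activity_StartTime": ["13:30:00", "08:15:00"],
        "Activity_StartTimeZone": ["EDT", "XYZ"],  # "XYZ" is unknown -> NaT
    }
)

# Illustrative fixed offsets for a few US zone abbreviations.
UTC_OFFSETS = {"EST": "-05:00", "EDT": "-04:00", "CST": "-06:00", "PST": "-08:00"}

stamps = [
    f"{d}T{t}{UTC_OFFSETS[z]}" if z in UTC_OFFSETS else None
    for d, t, z in zip(
        df["Activity_StartDate"], df["Activity_StartTime"], df["Activity_StartTimeZone"]
    )
]
# Offsets are normalized to UTC; unknown zone codes become NaT.
df["Activity_StartDateTime"] = pd.to_datetime(stamps, utc=True)
print(df[["Activity_StartTimeZone", "Activity_StartDateTime"]])
```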
35 changes: 24 additions & 11 deletions dataretrieval/waterdata/utils.py
@@ -406,6 +406,18 @@ def _error_body(resp: requests.Response):
)


def _raise_for_non_200(resp: requests.Response) -> None:
"""Raise ``RuntimeError(_error_body(resp))`` if ``resp`` is not 200.

Routes through ``_error_body`` (USGS-API-aware: handles 429/403
specially, extracts ``code``/``description`` from JSON error bodies)
rather than ``Response.raise_for_status``, which raises
``HTTPError`` with a generic message.
"""
if resp.status_code != 200:
raise RuntimeError(_error_body(resp))


def _construct_api_requests(
service: str,
properties: list[str] | None = None,
@@ -645,8 +657,7 @@ def _walk_pages(
client = client or requests.Session()
try:
resp = client.send(req)
if resp.status_code != 200:
raise RuntimeError(_error_body(resp))
_raise_for_non_200(resp)

# Store the initial response for metadata
initial_response = resp
@@ -668,11 +679,11 @@
headers=headers,
data=content if method == "POST" else None,
)
_raise_for_non_200(resp)
dfs.append(_get_resp_data(resp, geopd=geopd))
curr_url = _next_req_url(resp)
except Exception: # noqa: BLE001
error_text = _error_body(resp)
logger.error("Request incomplete. %s", error_text)
except Exception as e: # noqa: BLE001
logger.exception("Request incomplete: %s", e)
logger.warning(
"Request failed for URL: %s. Data download interrupted.", curr_url
)
@@ -1105,8 +1116,7 @@ def get_stats_data(

try:
resp = client.send(req)
if resp.status_code != 200:
raise RuntimeError(_error_body(resp))
_raise_for_non_200(resp)

# Store the initial response for metadata
initial_response = resp
@@ -1132,14 +1142,17 @@
params=args,
headers=headers,
)
_raise_for_non_200(resp)
body = resp.json()
all_dfs.append(_handle_stats_nesting(body, geopd=GEOPANDAS))
next_token = body["next"]
except Exception: # noqa: BLE001
error_text = _error_body(resp)
logger.error("Request incomplete. %s", error_text)
except Exception as e: # noqa: BLE001
logger.exception("Request incomplete: %s", e)
logger.warning(
"Request failed for URL: %s. Data download interrupted.", resp.url
"Request failed for URL: %s (next_token=%s). "
"Data download interrupted.",
url,
next_token,
)
next_token = None

169 changes: 169 additions & 0 deletions tests/waterdata_utils_test.py
@@ -1,8 +1,10 @@
import logging
from unittest import mock

import pandas as pd
import requests

import dataretrieval.waterdata.utils as _utils_module
from dataretrieval.waterdata.utils import (
_arrange_cols,
_error_body,
@@ -12,6 +14,8 @@
_walk_pages,
)

_LOGGER_NAME = _utils_module.__name__


def test_get_args_basic():
local_vars = {
@@ -87,6 +91,171 @@ def test_walk_pages_multiple_mocked():
assert mock_client.request.call_args[0][1] == "https://example.com/page2"


def _resp_ok(features):
"""Build a 200-OK mock response carrying the given features list."""
links = [{"rel": "next", "href": "https://example.com/page2"}] if features else []
resp = mock.MagicMock()
resp.json.return_value = {
"numberReturned": len(features),
"features": features,
"links": links,
}
resp.headers = {}
resp.status_code = 200
resp.url = "https://example.com/page1"
return resp


def _error_log_messages(caplog):
"""Pull ERROR-and-above message strings out of caplog. The four
pagination-failure tests below all assert against the same shape."""
return [r.getMessage() for r in caplog.records if r.levelno >= logging.ERROR]


def _walk_pages_with_failure(failure_resp_or_exc):
"""Run _walk_pages where page 1 succeeds and page 2 fails as given."""
resp1 = _resp_ok([{"id": "1", "properties": {"val": "a"}}])

mock_client = mock.MagicMock(spec=requests.Session)
mock_client.send.return_value = resp1
if isinstance(failure_resp_or_exc, BaseException):
mock_client.request.side_effect = failure_resp_or_exc
else:
mock_client.request.return_value = failure_resp_or_exc

mock_req = mock.MagicMock(spec=requests.PreparedRequest)
mock_req.method = "GET"
mock_req.headers = {}
mock_req.url = "https://example.com/page1"

return _walk_pages(geopd=False, req=mock_req, client=mock_client)


def test_walk_pages_logs_actual_exception_when_request_raises(caplog):
"""Exception from client.request() must be logged with its actual message."""
caplog.set_level(logging.ERROR, logger=_LOGGER_NAME)

df, _ = _walk_pages_with_failure(requests.ConnectionError("boom"))

# First page's data is preserved (best-effort behavior).
assert list(df["val"]) == ["a"]
# Logged error mentions the actual ConnectionError, not a stale page body.
messages = _error_log_messages(caplog)
assert any("boom" in m for m in messages), messages


def test_walk_pages_surfaces_5xx_mid_pagination(caplog):
"""A non-200 mid-pagination response must be logged, not silently swallowed."""
caplog.set_level(logging.ERROR, logger=_LOGGER_NAME)

page2_503 = mock.MagicMock()
page2_503.status_code = 503
page2_503.json.return_value = {
"code": "ServiceUnavailable",
"description": "upstream timeout",
}
page2_503.url = "https://example.com/page2"

df, _ = _walk_pages_with_failure(page2_503)

assert list(df["val"]) == ["a"]
messages = _error_log_messages(caplog)
assert any("503" in m or "ServiceUnavailable" in m for m in messages), messages


def _stats_initial_ok():
"""A 200-OK initial stats response: empty data list, signals one more page."""
resp = mock.MagicMock()
resp.status_code = 200
resp.json.return_value = {
"next": "tok2",
"features": [],
}
resp.headers = {}
resp.url = "https://example.com/stats?service=foo"
return resp


def _run_get_stats_data_with_failure(failure_resp_or_exc, monkeypatch):
"""Exercise get_stats_data where the initial response succeeds and the
paginated follow-up fails as given. Mirrors _walk_pages_with_failure.
`monkeypatch` stubs ``_handle_stats_nesting`` so the synthetic minimal
response body doesn't need to parse — these tests only assert on the
pagination loop's error surfacing."""
from dataretrieval.waterdata.utils import get_stats_data

monkeypatch.setattr(
_utils_module,
"_handle_stats_nesting",
mock.MagicMock(return_value=pd.DataFrame()),
)

mock_client = mock.MagicMock(spec=requests.Session)
mock_client.send.return_value = _stats_initial_ok()
if isinstance(failure_resp_or_exc, BaseException):
mock_client.request.side_effect = failure_resp_or_exc
else:
mock_client.request.return_value = failure_resp_or_exc

return get_stats_data(
args={"monitoring_location_id": "USGS-1"},
service="observationNormals",
expand_percentiles=False,
client=mock_client,
)


def test_get_stats_data_logs_actual_exception_when_request_raises(caplog, monkeypatch):
"""get_stats_data variant of the connection-error scenario."""
caplog.set_level(logging.ERROR, logger=_LOGGER_NAME)

_run_get_stats_data_with_failure(
requests.ConnectionError("stats-boom"),
monkeypatch,
)

messages = _error_log_messages(caplog)
assert any("stats-boom" in m for m in messages), messages


def test_get_stats_data_surfaces_5xx_mid_pagination(caplog, monkeypatch):
"""get_stats_data variant of the mid-pagination 5xx scenario."""
caplog.set_level(logging.ERROR, logger=_LOGGER_NAME)

page2_503 = mock.MagicMock()
page2_503.status_code = 503
page2_503.json.return_value = {
"code": "ServiceUnavailable",
"description": "upstream timeout",
}
page2_503.url = "https://example.com/stats?service=foo&next_token=tok2"

_run_get_stats_data_with_failure(page2_503, monkeypatch)

messages = _error_log_messages(caplog)
assert any("503" in m or "ServiceUnavailable" in m for m in messages), messages


def test_get_stats_data_warning_includes_next_token(caplog, monkeypatch):
"""The pagination-failure warning includes the next_token so operators
can identify which page in the sequence failed. (Addresses Copilot's
PR #273 review note: the base URL alone drops cursor context.)"""
caplog.set_level(logging.WARNING, logger=_LOGGER_NAME)

page2_503 = mock.MagicMock()
page2_503.status_code = 503
page2_503.json.return_value = {
"code": "ServiceUnavailable",
"description": "upstream timeout",
}

_run_get_stats_data_with_failure(page2_503, monkeypatch)

warnings_ = [r.getMessage() for r in caplog.records if r.levelno == logging.WARNING]
# The initial response from _stats_initial_ok carries next=tok2.
assert any("tok2" in m for m in warnings_), warnings_


def test_handle_stats_nesting_tolerates_missing_drop_columns():
"""If the upstream stats response shape ever changes such that one of
the columns we try to drop ("type", "properties.data") is absent, the