Skip to content

Commit 32f6528

Browse files
ESS-2234-2: Improvements to fetching aggregate data (#122)
* Added option to use period without number
* Adding progress bar
* Adding tqdm to pyproject.toml
* Increased the default max_page_size
* Reformatting, every time
* Fixed docs
1 parent 4261440 commit 32f6528

5 files changed

Lines changed: 37 additions & 16 deletions

File tree

datareservoirio/_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def as_binary_csv(self):
6767

6868
# Translation of user input parameters of the samples/aggregate method for more convenient use (matching pandas)
6969

70-
function_translation = {"std": "Stdev", "mean": "Avg"}
70+
function_translation = {"std": "Stdev", "mean": "Avg", "min": "Min", "max": "Max"}
7171

7272
period_translation = {
7373
"hours": "h",

datareservoirio/client.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
wait_chain,
2020
wait_fixed,
2121
)
22+
from tqdm.auto import tqdm
2223

2324
from ._logging import log_decorator
2425
from ._utils import function_translation, period_translation
@@ -39,6 +40,8 @@
3940

4041
_TIMEOUT_DEAULT = 120
4142

43+
_DEFAULT_MAX_PAGE_SIZE = 30000
44+
4245

4346
class Client:
4447
"""
@@ -424,7 +427,7 @@ def get_samples_aggregate(
424427
end=None,
425428
aggregation_period=None,
426429
aggregation_function=None,
427-
max_page_size=None,
430+
max_page_size=_DEFAULT_MAX_PAGE_SIZE,
428431
):
429432
"""
430433
Retrieve a series from DataReservoir.io using the samples/aggregate endpoint.
@@ -440,7 +443,7 @@ def get_samples_aggregate(
440443
Stop time (exclusive) of the aggregated series given as anything
441444
pandas.to_datetime is able to parse. Date must be within the past 90 days.
442445
aggregation_function : str
443-
One of "Avg", "Min", "Max", "Stdev".
446+
One of "mean", "min", "max", "std".
444447
aggregation_period : str
445448
Used in combination with aggregation function to specify the period for aggregation.
446449
Aggregation period is maximum 24 hours. Values can be in units of h, m, s, ms,
@@ -482,6 +485,9 @@ def get_samples_aggregate(
482485
if aggregation_function in function_translation:
483486
aggregation_function = function_translation[aggregation_function]
484487

488+
if not aggregation_period[0].isnumeric():
489+
aggregation_period = "1" + aggregation_period
490+
485491
for period_unit in period_translation:
486492
if (
487493
aggregation_period.endswith(period_unit)
@@ -493,18 +499,19 @@ def get_samples_aggregate(
493499
)
494500
break
495501

496-
start = pd.to_datetime(start, dayfirst=True, unit="ns", utc=True).isoformat()
497-
end = pd.to_datetime(end, dayfirst=True, unit="ns", utc=True).isoformat()
502+
start = pd.to_datetime(start, dayfirst=True, unit="ns", utc=True)
503+
end = pd.to_datetime(end, dayfirst=True, unit="ns", utc=True)
498504

499-
params = {}
505+
if start.value >= end.value:
506+
raise ValueError("Start must be before end.")
500507

501-
if max_page_size:
502-
params["maxPageSize"] = max_page_size
508+
params = {}
503509

510+
params["maxPageSize"] = max_page_size
504511
params["aggregationPeriod"] = aggregation_period
505512
params["aggregationFunction"] = aggregation_function
506-
params["start"] = start
507-
params["end"] = end
513+
params["start"] = start.isoformat()
514+
params["end"] = end.isoformat()
508515

509516
next_page_link = f"{environment.api_base_url}reservoir/timeseries/{series_id}/samples/aggregate?{urlencode(params)}"
510517

@@ -535,23 +542,32 @@ def get_samples_aggregate_page(url):
535542
timeout=_TIMEOUT_DEAULT,
536543
)
537544

545+
progress_bar = tqdm(unit=" pages", desc="Downloading aggregate data")
538546
while next_page_link:
539547
response = get_samples_aggregate_page(next_page_link)
540548
response.raise_for_status()
541549
response_json = response.json()
542550
next_page_link = response_json.get("@odata.nextLink", None)
543551

544552
content = [
545-
(pd.to_datetime(sample["Timestamp"], utc=True), sample["Value"])
553+
(
554+
pd.to_datetime(sample["Timestamp"], unit="ns", utc=True),
555+
sample["Value"],
556+
)
546557
for sample in response_json["value"]
547558
]
548559

560+
# update the progress bar
561+
if content:
562+
progress_bar.update(1)
563+
549564
new_df = pd.DataFrame(
550565
content, columns=("index", "values"), copy=False
551566
).astype({"values": "float64"}, errors="ignore")
552567

553568
df = pd.concat([df, new_df])
554569

570+
progress_bar.close()
555571
series = df.set_index("index").squeeze("columns").copy(deep=True)
556572

557573
return series

docs/user_guide/manage_series.rst

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,9 @@ You can also access any data you have ``TimeSeriesId`` (and authorization) for w
132132
.. code-block:: python
133133
134134
# Get entire timeseries
135-
timeseries = client.get_samples_aggregate(series_id, start='2018-01-01',
136-
end='2018-01-02', aggregation_period='15m',
137-
aggregation_function='Avg')
135+
timeseries = client.get_samples_aggregate(series_id, start='2024-01-01',
136+
end='2024-01-02', aggregation_period='15m',
137+
aggregation_function='mean')
138138
139139
.. note::
140140

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ dependencies = [
3030
"importlib_resources",
3131
"opencensus-ext-azure",
3232
"tenacity",
33-
"urllib3 > 2"
33+
"urllib3 > 2",
34+
"tqdm"
3435
]
3536

3637
[project.urls]

tests/test_client.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -890,7 +890,8 @@ def test_tries_error_does_not_throw_retry(self, client_with_invalid_json_error):
890890

891891
@pytest.mark.response_irrelevant
892892
@pytest.mark.parametrize(
893-
"aggregation_function, expected", [("mean", "Avg"), ("std", "Stdev")]
893+
"aggregation_function, expected",
894+
[("mean", "Avg"), ("std", "Stdev"), ("min", "Min"), ("max", "Max")],
894895
)
895896
def test_aggregation_function_gets_translated(
896897
self, client, mock_requests, aggregation_function, expected, response_cases
@@ -912,6 +913,9 @@ def test_aggregation_function_gets_translated(
912913
@pytest.mark.parametrize(
913914
"aggregation_period, expected",
914915
[
916+
("min", "1m"),
917+
("tick", "1tick"),
918+
("s", "1s"),
915919
("15minutes", "15m"),
916920
("15minute", "15m"),
917921
("15min", "15m"),

0 commit comments

Comments (0)