
Commit cf2ee4a

ENH: Add wrapper for new endpoint for fetching aggregated timeseries (#120)
* ESS-2234
1 parent 44dc639 commit cf2ee4a

10 files changed

Lines changed: 424 additions & 9 deletions


datareservoirio/_utils.py

Lines changed: 30 additions & 0 deletions
@@ -63,3 +63,33 @@ def as_binary_csv(self):
         df.to_csv(fp, lineterminator="\n", **kwargs)
         csv = fp.getvalue()
         return csv
+
+
+# Translation of user input parameters of the samples/aggregate method for more convenient use (matching pandas)
+
+function_translation = {"std": "Stdev", "mean": "Avg"}
+
+period_translation = {
+    "hours": "h",
+    "hour": "h",
+    "hr": "h",
+    "h": "h",
+    "minutes": "m",
+    "minute": "m",
+    "min": "m",
+    "m": "m",
+    "seconds": "s",
+    "second": "s",
+    "sec": "s",
+    "s": "s",
+    "milliseconds": "ms",
+    "millisecond": "ms",
+    "millis": "ms",
+    "milli": "ms",
+    "ms": "ms",
+    "microseconds": "microsecond",
+    "microsecond": "microsecond",
+    "micros": "microsecond",
+    "micro": "microsecond",
+    "tick": "tick",
+}
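
To make the intent concrete, here is a minimal sketch (not part of the commit; `normalize` is a hypothetical helper) of how these tables are applied, mirroring the translation loop added to `client.py` below:

    # Hypothetical helper illustrating how the translation tables are used
    def normalize(aggregation_function, aggregation_period):
        # "mean"/"std" map to the API's "Avg"/"Stdev"; other values pass through
        aggregation_function = function_translation.get(
            aggregation_function, aggregation_function
        )
        # Replace a trailing unit alias with the API unit, but only when it is
        # preceded by a digit, so e.g. "100millis" is not caught by the "s" alias
        for unit, api_unit in period_translation.items():
            if (
                aggregation_period.endswith(unit)
                and aggregation_period[-len(unit) - 1].isnumeric()
            ):
                aggregation_period = aggregation_period[: -len(unit)] + api_unit
                break
        return aggregation_function, aggregation_period

    assert normalize("mean", "15min") == ("Avg", "15m")
    assert normalize("max", "100millis") == ("max", "100ms")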

datareservoirio/client.py

Lines changed: 146 additions & 1 deletion
@@ -3,8 +3,10 @@
 import warnings
 from collections import defaultdict
 from concurrent.futures import ThreadPoolExecutor
+from datetime import datetime
 from functools import wraps
 from operator import itemgetter
+from urllib.parse import urlencode
 from uuid import uuid4

 import pandas as pd
@@ -19,6 +21,7 @@
 )

 from ._logging import log_decorator
+from ._utils import function_translation, period_translation
 from .globalsettings import environment
 from .storage import Storage

@@ -285,7 +288,7 @@ def delete(self, series_id):
         )

     def _timer(func):
-        """Decorator used to log latency of the ``get`` method"""
+        """Decorator used to log latency of the ``get`` and ``get_samples_aggregate`` methods"""

         @wraps(func)
         def wrapper(self, series_id, start=None, end=None, **kwargs):
@@ -411,6 +414,148 @@ def get(

         return series

+    @log_decorator("exception")
+    @_timer
+    @log_decorator("warning")
+    def get_samples_aggregate(
+        self,
+        series_id,
+        start=None,
+        end=None,
+        aggregation_period=None,
+        aggregation_function=None,
+        max_page_size=None,
+    ):
+        """
+        Retrieve a series from DataReservoir.io using the samples/aggregate endpoint.
+
+        Parameters
+        ----------
+        series_id : str
+            Identifier of the series to download.
+        start : required
+            Start time (inclusive) of the aggregated series, given as anything
+            pandas.to_datetime is able to parse. Date must be within the past 90 days.
+        end : required
+            Stop time (exclusive) of the aggregated series, given as anything
+            pandas.to_datetime is able to parse. Date must be within the past 90 days.
+        aggregation_function : str
+            One of "Avg", "Min", "Max", "Stdev".
+        aggregation_period : str
+            Used in combination with the aggregation function to specify the period
+            for aggregation. The aggregation period is maximum 24 hours. Values can
+            be in units of h, m, s, ms, microsecond or tick. Use 100ms instead of
+            0.1s for 10 Hz.
+        max_page_size : optional
+            Maximum number of samples to return per page. The method automatically
+            follows links to next pages and returns the entire series. For advanced
+            usage.
+
+        Returns
+        -------
+        pandas.Series
+            Series data
+        """
+        if not start:
+            # Required parameter
+            raise ValueError(
+                "You must specify the start date in ISO 8601 format, for example 2023-12-01."
+            )
+
+        if not end:
+            # Required parameter
+            raise ValueError(
+                "You must specify the end date in ISO 8601 format, for example 2023-12-31."
+            )
+
+        if not aggregation_period:
+            # Required parameter
+            raise ValueError(
+                "Aggregation period must be specified using integers and one of these "
+                "units: h, m, s, ms, microsecond or tick, or their pandas equivalents."
+            )
+
+        if not aggregation_function:
+            # Required parameter
+            raise ValueError(
+                "Aggregation function must be one of: Avg (mean), Min, Max, Stdev (std)."
+            )
+
+        # Translate some pandas terms to API terms.
+        # Note: the API is case insensitive, so both "min" and "Min" will work.
+        if aggregation_function in function_translation:
+            aggregation_function = function_translation[aggregation_function]
+
+        for period_unit in period_translation:
+            if (
+                aggregation_period.endswith(period_unit)
+                and aggregation_period[-len(period_unit) - 1].isnumeric()
+            ):
+                aggregation_period = (
+                    aggregation_period[: -len(period_unit)]
+                    + period_translation[period_unit]
+                )
+                break
+
+        start = pd.to_datetime(start, dayfirst=True, unit="ns", utc=True).isoformat()
+        end = pd.to_datetime(end, dayfirst=True, unit="ns", utc=True).isoformat()
+
+        params = {}
+
+        if max_page_size:
+            params["maxPageSize"] = max_page_size
+
+        params["aggregationPeriod"] = aggregation_period
+        params["aggregationFunction"] = aggregation_function
+        params["start"] = start
+        params["end"] = end
+
+        next_page_link = f"{environment.api_base_url}reservoir/timeseries/{series_id}/samples/aggregate?{urlencode(params)}"
+
+        df = (
+            pd.DataFrame(columns=("index", "values"))
+            .astype({"index": "int64"})
+            .astype({"values": "float64"}, errors="ignore")
+        )
+
+        @retry(
+            stop=stop_after_attempt(4),  # attempts, not retries: attempt 2 is 1 retry
+            retry=retry_if_exception_type(
+                (
+                    ConnectionError,
+                    requests.exceptions.ChunkedEncodingError,
+                    requests.ReadTimeout,
+                    ConnectionRefusedError,
+                    requests.ConnectionError,
+                )
+            ),
+            wait=wait_chain(*[wait_fixed(0.1), wait_fixed(0.5), wait_fixed(30)]),
+        )
+        def get_samples_aggregate_page(url):
+            return self._auth_session.get(
+                url,
+                timeout=_TIMEOUT_DEAULT,
+            )
+
+        while next_page_link:
+            response = get_samples_aggregate_page(next_page_link)
+            response.raise_for_status()
+            response_json = response.json()
+            next_page_link = response_json.get("@odata.nextLink", None)
+
+            content = [
+                (pd.to_datetime(sample["Timestamp"], utc=True), sample["Value"])
+                for sample in response_json["value"]
+            ]
+
+            new_df = pd.DataFrame(
+                content, columns=("index", "values"), copy=False
+            ).astype({"values": "float64"}, errors="ignore")
+
+            df = pd.concat([df, new_df])
+
+        series = df.set_index("index").squeeze("columns").copy(deep=True)
+
+        return series
+
     def set_metadata(
         self,
         series_id,
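
For context, a minimal usage sketch of the new wrapper (not from the commit itself; the client setup mirrors the integration tests below, and the credentials and series ID are placeholders). It shows that pandas-style aliases are accepted thanks to the translation tables in `_utils.py`:

    import datareservoirio as drio

    # Placeholder credentials and series ID
    auth_session = drio.authenticate.ClientAuthenticator("<client-id>", "<client-secret>")
    client = drio.Client(auth_session)

    # "mean" is translated to "Avg" and "15min" to "15m" before the request is sent
    series = client.get_samples_aggregate(
        "<series-id>",
        start="2023-12-01",
        end="2023-12-02",
        aggregation_period="15min",
        aggregation_function="mean",
    )
    print(series.head())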

docs/user_guide/advanced_config.rst

Lines changed: 8 additions & 1 deletion
@@ -141,4 +141,11 @@ Instrumentation

 For monitoring purposes, the external logger can be enabled to report errors and performance metrics to 4insight Team.

-To enable logging, environmental variable ``DRIO_PYTHON_APPINSIGHTS`` needs to be set to ``true``.
+To enable logging, the environment variable ``DRIO_PYTHON_APPINSIGHTS`` needs to be set to ``true``.
+
+Using the ``max_page_size`` parameter in the :py:meth:`get_samples_aggregate` method
+------------------------------------------------------------------------------------
+
+The :py:meth:`Client.get_samples_aggregate` method uses an endpoint that supports paging of responses. This means that instead of making one big request, it may make a series of smaller requests, following the links to next pages returned in each partial response.
+
+Normally this is nothing you need to think about. If you do want to change the maximum number of results returned in one page, use the ``max_page_size`` parameter.
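+
+A minimal sketch (illustrative values; ``series_id`` is a placeholder):
+
+.. code-block:: python
+
+    # Request at most 1000 samples per page; the client follows the
+    # @odata.nextLink links and still returns the complete aggregated series
+    timeseries = client.get_samples_aggregate(
+        series_id,
+        start='2023-12-01',
+        end='2023-12-02',
+        aggregation_period='1s',
+        aggregation_function='Avg',
+        max_page_size=1000,
+    )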

docs/user_guide/manage_series.rst

Lines changed: 22 additions & 0 deletions
@@ -124,6 +124,26 @@ You can access any data you have ``TimeSeriesId`` (and authorization) for:
 :py:meth:`Client.get` returns :py:class:`pandas.Series`.


+Access existing data with aggregation
+-------------------------------------
+
+You can also access any data you have ``TimeSeriesId`` (and authorization) for, with aggregation applied, using:
+
+.. code-block:: python
+
+    # Get the aggregated timeseries
+    timeseries = client.get_samples_aggregate(series_id, start='2018-01-01',
+                                              end='2018-01-02', aggregation_period='15m',
+                                              aggregation_function='Avg')
+
+.. note::
+
+    :py:meth:`Client.get_samples_aggregate` also returns :py:class:`pandas.Series`. The ``start``, ``end``, ``aggregation_period`` and ``aggregation_function`` parameters are required.
+
+.. important::
+
+    Retrieving aggregated data is available only for the last 90 days.
+
 Delete data
 -----------

@@ -140,3 +160,5 @@ is removed from the `DataReservoir.io`_ inventory:
 .. _Pandas: https://pandas.pydata.org/


+
+
Lines changed: 115 additions & 0 deletions
@@ -0,0 +1,115 @@
+import os
+import time
+from datetime import datetime, timedelta
+
+import numpy as np
+import pandas as pd
+
+import datareservoirio as drio
+
+
+def wait_for_data_to_appear(func, delay=3):
+    count = 0
+    while count == 0:
+        series = func()
+        count = len(series)
+        if count == 0:
+            time.sleep(delay)
+        else:
+            return series
+
+
+def test_non_paged(cleanup_series):
+    """
+    Integration test for the samples/aggregate endpoint.
+
+    Tests the following:
+    * Creates a non-empty timeseries in DataReservoir.io.
+    * Fetches it using the samples/aggregate endpoint.
+    * Checks that the data matches what was created.
+    """
+
+    # Initialize client
+    auth_session = drio.authenticate.ClientAuthenticator(
+        os.getenv("DRIO_CLIENT_ID"), os.getenv("DRIO_CLIENT_SECRET")
+    )
+    client = drio.Client(auth_session, cache=False)
+
+    # Create some dummy data.
+    # Relative to today, because of the 3-month limitation.
+    end = datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
+    start = end - timedelta(hours=1)
+    freq = pd.to_timedelta(1, "s")
+    index = pd.date_range(
+        start, end, freq=freq, tz="utc", inclusive="left", name="index"
+    )
+    series = pd.Series(data=np.random.random(len(index)), index=index, name="values")
+    response_create = client.create(series=series, wait_on_verification=True)
+    series_id = response_create["TimeSeriesId"]
+
+    # For cleaning up after test runs
+    cleanup_series.add(series_id)
+
+    # Get data from DataReservoir.io using the samples/aggregate endpoint
+    get_timeseries = lambda: client.get_samples_aggregate(
+        series_id,
+        start=start,
+        end=end,
+        aggregation_function="Avg",
+        aggregation_period="1s",
+    )
+
+    # Wait for data to be available
+    series_fetched = wait_for_data_to_appear(get_timeseries)
+
+    # Check downloaded data
+    pd.testing.assert_series_equal(series, series_fetched, check_freq=False)
+
+
+def test_paged(cleanup_series):
+    """
+    Integration test for the samples/aggregate endpoint.
+
+    Tests the following:
+    * Creates a non-empty timeseries in DataReservoir.io.
+    * Fetches it using the samples/aggregate endpoint and checks that the data matches what was created.
+    * Uses a maxPageSize of 1000, which means the response will be paged (into 4 pages).
+    """
+
+    # Initialize client
+    auth_session = drio.authenticate.ClientAuthenticator(
+        os.getenv("DRIO_CLIENT_ID"), os.getenv("DRIO_CLIENT_SECRET")
+    )
+    client = drio.Client(auth_session, cache=False)
+
+    # Create some dummy data.
+    # Relative to today, because of the 3-month limitation.
+    end = datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
+    start = end - timedelta(hours=1)
+    freq = pd.to_timedelta(1, "s")
+
+    index = pd.date_range(
+        start, end, freq=freq, tz="utc", inclusive="left", name="index"
+    )
+    series = pd.Series(data=np.random.random(len(index)), index=index, name="values")
+    response_create = client.create(series=series, wait_on_verification=True)
+    series_id = response_create["TimeSeriesId"]
+
+    # For cleaning up after test runs
+    cleanup_series.add(series_id)
+
+    # Get data from DataReservoir.io using the samples/aggregate endpoint
+    check_func = lambda: client.get_samples_aggregate(
+        series_id,
+        start=start,
+        end=end,
+        aggregation_function="Avg",
+        aggregation_period="1s",
+        max_page_size=1000,
+    )
+
+    # Wait for data to be available
+    series_fetched = wait_for_data_to_appear(check_func)
+
+    # Check downloaded data
+    pd.testing.assert_series_equal(series, series_fetched, check_freq=False)

pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -48,6 +48,7 @@ version = {attr = "datareservoirio.__version__"}

 [tool.pytest.ini_options]
 pythonpath = [".", "src"]
+markers = ["response_irrelevant"]

 [tool.tox]
 legacy_tox_ini = """
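
Registering the marker keeps pytest from warning about an unknown marker (and from failing under --strict-markers). A minimal sketch of how such a marker is applied and selected (the test name is hypothetical, not from this commit):

    import pytest

    @pytest.mark.response_irrelevant  # registered above, so pytest raises no warning
    def test_request_parameters_only():  # hypothetical test
        ...

Running pytest -m "not response_irrelevant" then deselects tests carrying the marker.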
