Skip to content

Commit 057eba6

Browse files
committed
Implemented Impressions strategy classes for Impression Manager
1 parent c5c84e4 commit 057eba6

2 files changed

Lines changed: 183 additions & 39 deletions

File tree

splitio/engine/impressions.py

Lines changed: 140 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Split evaluator module."""
2+
import abc
23
import threading
34
from collections import defaultdict, namedtuple
45
from enum import Enum
@@ -9,6 +10,8 @@
910
from splitio.client.listener import ImpressionListenerException
1011
from splitio import util
1112

13+
import logging
14+
_LOGGER = logging.getLogger(__name__)
1215

1316
_TIME_INTERVAL_MS = 3600 * 1000 # one hour
1417
_IMPRESSION_OBSERVER_CACHE_SIZE = 500000
@@ -150,6 +153,130 @@ def pop_all(self):
150153
return [Counter.CountPerFeature(k.feature, k.timeframe, v)
151154
for (k, v) in old.items()]
152155

156+
class BaseStrategy(object, metaclass=abc.ABCMeta):
157+
"""Strategy interface."""
158+
159+
@abc.abstractmethod
160+
def process_impressions(self):
161+
"""
162+
Return a list(impressions) object
163+
164+
"""
165+
pass
166+
167+
@abc.abstractmethod
168+
def truncate_impressions_time(self):
169+
"""
170+
Return list(impressions) object
171+
172+
"""
173+
pass
174+
175+
def get_counts(self):
176+
"""
177+
Return A list of counter objects.
178+
179+
"""
180+
pass
181+
182+
class StrategyOptimizedMode(BaseStrategy):
183+
"""Optimized mode strategy."""
184+
185+
def __init__(self, standalone=True):
186+
"""
187+
Construct a strategy instance for optimized mode.
188+
189+
"""
190+
self._standalone = standalone
191+
self._counter = Counter() if self._standalone else None
192+
self._observer = Observer(_IMPRESSION_OBSERVER_CACHE_SIZE) if self._standalone else None
193+
194+
def process_impressions(self, impressions):
195+
"""
196+
Process impressions.
197+
198+
Impressions are analyzed to see if they've been seen before and counted.
199+
200+
:param impressions: List of impression objects with attributes
201+
:type impressions: list[tuple[splitio.models.impression.Impression, dict]]
202+
203+
:returns: Observed list of impressions
204+
:rtype: list[tuple[splitio.models.impression.Impression, dict]]
205+
"""
206+
imps = [(self._observer.test_and_set(imp), attrs) for imp, attrs in impressions] \
207+
if self._observer else impressions
208+
if self._counter is not None:
209+
self._counter.track([imp for imp, _ in imps])
210+
return imps
211+
212+
def truncate_impressions_time(self, imps):
213+
"""
214+
Process impressions.
215+
216+
Impressions are truncated based on time
217+
218+
:param impressions: List of impression objects with attributes
219+
:type impressions: list[tuple[splitio.models.impression.Impression, dict]]
220+
221+
:returns: truncated list of impressions
222+
:rtype: list[splitio.models.impression.Impression]
223+
"""
224+
this_hour = truncate_time(util.utctime_ms())
225+
return [imp for imp, _ in imps] if self._counter is None \
226+
else [i for i, _ in imps if i.previous_time is None or i.previous_time < this_hour]
227+
228+
def get_counts(self):
229+
"""
230+
Return counts of impressions per features.
231+
232+
:returns: A list of counter objects.
233+
:rtype: list[Counter.CountPerFeature]
234+
"""
235+
return self._counter.pop_all() if self._counter is not None else []
236+
237+
class StrategyDebugMode(BaseStrategy):
238+
"""Debug mode strategy."""
239+
240+
def __init__(self, standalone=True):
241+
"""
242+
Construct a strategy instance for debug mode.
243+
244+
"""
245+
self._standalone = standalone
246+
self._observer = Observer(_IMPRESSION_OBSERVER_CACHE_SIZE) if self._standalone else None
247+
248+
def process_impressions(self, impressions):
249+
"""
250+
Process impressions.
251+
252+
Impressions are analyzed to see if they've been seen before.
253+
254+
:param impressions: List of impression objects with attributes
255+
:type impressions: list[tuple[splitio.models.impression.Impression, dict]]
256+
257+
:returns: Observed list of impressions
258+
:rtype: list[tuple[splitio.models.impression.Impression, dict]]
259+
"""
260+
imps = [(self._observer.test_and_set(imp), attrs) for imp, attrs in impressions] if self._observer is not None else impressions
261+
return imps
262+
263+
def truncate_impressions_time(self, imps):
264+
"""
265+
No counter implemented, return same impresisons passed.
266+
267+
:returns: list of impressions
268+
:rtype: list[splitio.models.impression.Impression]
269+
"""
270+
return [imp for imp, _ in imps]
271+
272+
def get_counts(self):
273+
"""
274+
No counter implemented, return empty array
275+
276+
:returns: empty list
277+
:rtype: list[]
278+
"""
279+
return []
153280

154281
class Manager(object): # pylint:disable=too-few-public-methods
155282
"""Impression manager."""
@@ -167,10 +294,18 @@ def __init__(self, mode=ImpressionsMode.OPTIMIZED, standalone=True, listener=Non
167294
:param listener: Optional impressions listener that will capture all seen impressions.
168295
:type listener: splitio.client.listener.ImpressionListenerWrapper
169296
"""
170-
self._observer = Observer(_IMPRESSION_OBSERVER_CACHE_SIZE) if standalone else None
171-
self._counter = Counter() if standalone and mode == ImpressionsMode.OPTIMIZED else None
297+
self._strategy = self.get_strategy(mode, standalone)
172298
self._listener = listener
173299

300+
def get_strategy(self, mode, standalone):
301+
"""
302+
Return a strategy object based on mode value.
303+
304+
:returns: A strategy object
305+
:rtype: (BaseStrategy)
306+
"""
307+
return StrategyOptimizedMode(standalone) if mode == ImpressionsMode.OPTIMIZED else StrategyDebugMode(standalone)
308+
174309
def process_impressions(self, impressions):
175310
"""
176311
Process impressions.
@@ -180,17 +315,9 @@ def process_impressions(self, impressions):
180315
:param impressions: List of impression objects with attributes
181316
:type impressions: list[tuple[splitio.models.impression.Impression, dict]]
182317
"""
183-
imps = [(self._observer.test_and_set(imp), attrs) for imp, attrs in impressions] \
184-
if self._observer else impressions
185-
186-
if self._counter:
187-
self._counter.track([imp for imp, _ in imps])
188-
318+
imps = self._strategy.process_impressions(impressions)
189319
self._send_impressions_to_listener(imps)
190-
191-
this_hour = truncate_time(util.utctime_ms())
192-
return [imp for imp, _ in imps] if self._counter is None \
193-
else [i for i, _ in imps if i.previous_time is None or i.previous_time < this_hour]
320+
return self._strategy.truncate_impressions_time(imps)
194321

195322
def get_counts(self):
196323
"""
@@ -199,7 +326,7 @@ def get_counts(self):
199326
:returns: A list of counter objects.
200327
:rtype: list[Counter.CountPerFeature]
201328
"""
202-
return self._counter.pop_all() if self._counter is not None else []
329+
return self._strategy.get_counts()
203330

204331
def _send_impressions_to_listener(self, impressions):
205332
"""

tests/engine/test_impressions.py

Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
"""Impression manager, observer & hasher tests."""
22
from datetime import datetime
33
from splitio.engine.impressions import Hasher, Observer, Counter, Manager, \
4-
ImpressionsMode, truncate_time
4+
ImpressionsMode, truncate_time, StrategyDebugMode, StrategyOptimizedMode
55
from splitio.models.impressions import Impression
66
from splitio.client.listener import ImpressionListenerWrapper
77

8-
98
def utctime_ms_reimplement():
109
"""Re-implementation of utctime_ms to avoid conflicts with mock/patching."""
1110
return int((datetime.utcnow() - datetime(1970, 1, 1)).total_seconds() * 1000)
@@ -99,18 +98,25 @@ def test_standalone_optimized(self, mocker):
9998
mocker.patch('splitio.util.utctime_ms', new=utc_time_mock)
10099

101100
manager = Manager() # no listener
102-
assert manager._counter is not None
103-
assert manager._observer is not None
101+
assert manager._strategy._counter is not None
102+
assert manager._strategy._observer is not None
104103
assert manager._listener is None
104+
assert isinstance(manager._strategy, StrategyOptimizedMode)
105105

106106
# An impression that hasn't happened in the last hour (pt = None) should be tracked
107107
imps = manager.process_impressions([
108108
(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3), None),
109109
(Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3), None)
110110
])
111+
111112
assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3),
112113
Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)]
113114

115+
assert [Counter.CountPerFeature(k.feature, k.timeframe, v)
116+
for (k, v) in manager._strategy._counter._data.items()] == [
117+
Counter.CountPerFeature('f1', truncate_time(utc_now-3), 1),
118+
Counter.CountPerFeature('f2', truncate_time(utc_now-3), 1)]
119+
114120
# Tracking the same impression a ms later should be empty
115121
imps = manager.process_impressions([
116122
(Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-2), None)
@@ -136,10 +142,10 @@ def test_standalone_optimized(self, mocker):
136142
assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1, old_utc-3),
137143
Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2, old_utc-1)]
138144

139-
assert len(manager._observer._cache._data) == 3 # distinct impressions seen
140-
assert len(manager._counter._data) == 3 # 2 distinct features. 1 seen in 2 different timeframes
145+
assert len(manager._strategy._observer._cache._data) == 3 # distinct impressions seen
146+
assert len(manager._strategy._counter._data) == 3 # 2 distinct features. 1 seen in 2 different timeframes
141147

142-
assert set(manager._counter.pop_all()) == set([
148+
assert set(manager._strategy._counter.pop_all()) == set([
143149
Counter.CountPerFeature('f1', truncate_time(old_utc), 3),
144150
Counter.CountPerFeature('f2', truncate_time(old_utc), 1),
145151
Counter.CountPerFeature('f1', truncate_time(utc_now), 2)
@@ -155,9 +161,10 @@ def test_standalone_debug(self, mocker):
155161
mocker.patch('splitio.util.utctime_ms', new=utc_time_mock)
156162

157163
manager = Manager(ImpressionsMode.DEBUG) # no listener
158-
assert manager._counter is None
159-
assert manager._observer is not None
164+
assert manager._strategy.get_counts() == []
165+
assert manager._strategy._observer is not None
160166
assert manager._listener is None
167+
assert isinstance(manager._strategy, StrategyDebugMode)
161168

162169
# An impression that hasn't happened in the last hour (pt = None) should be tracked
163170
imps = manager.process_impressions([
@@ -192,7 +199,7 @@ def test_standalone_debug(self, mocker):
192199
assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1, old_utc-3),
193200
Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2, old_utc-1)]
194201

195-
assert len(manager._observer._cache._data) == 3 # distinct impressions seen
202+
assert len(manager._strategy._observer._cache._data) == 3 # distinct impressions seen
196203

197204
def test_non_standalone_optimized(self, mocker):
198205
"""Test impressions manager in optimized mode with sdk in standalone mode."""
@@ -204,9 +211,10 @@ def test_non_standalone_optimized(self, mocker):
204211
mocker.patch('splitio.util.utctime_ms', new=utc_time_mock)
205212

206213
manager = Manager(ImpressionsMode.OPTIMIZED, False) # no listener
207-
assert manager._counter is None
208-
assert manager._observer is None
214+
assert manager._strategy._counter is None
215+
assert manager._strategy._observer is None
209216
assert manager._listener is None
217+
assert isinstance(manager._strategy, StrategyOptimizedMode)
210218

211219
# An impression that hasn't happened in the last hour (pt = None) should be tracked
212220
imps = manager.process_impressions([
@@ -250,9 +258,10 @@ def test_non_standalone_debug(self, mocker):
250258
mocker.patch('splitio.util.utctime_ms', new=utc_time_mock)
251259

252260
manager = Manager(ImpressionsMode.DEBUG, False) # no listener
253-
assert manager._counter is None
254-
assert manager._observer is None
261+
assert manager._strategy.get_counts() == []
262+
assert manager._strategy._observer is None
255263
assert manager._listener is None
264+
assert isinstance(manager._strategy, StrategyDebugMode)
256265

257266
# An impression that hasn't happened in the last hour (pt = None) should be tracked
258267
imps = manager.process_impressions([
@@ -298,9 +307,10 @@ def test_standalone_optimized_listener(self, mocker):
298307
listener = mocker.Mock(spec=ImpressionListenerWrapper)
299308

300309
manager = Manager(listener=listener) # no listener
301-
assert manager._counter is not None
302-
assert manager._observer is not None
310+
assert manager._strategy._counter is not None
311+
assert manager._strategy._observer is not None
303312
assert manager._listener is not None
313+
assert isinstance(manager._strategy, StrategyOptimizedMode)
304314

305315
# An impression that hasn't happened in the last hour (pt = None) should be tracked
306316
imps = manager.process_impressions([
@@ -309,6 +319,10 @@ def test_standalone_optimized_listener(self, mocker):
309319
])
310320
assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-3),
311321
Impression('k1', 'f2', 'on', 'l1', 123, None, utc_now-3)]
322+
assert [Counter.CountPerFeature(k.feature, k.timeframe, v)
323+
for (k, v) in manager._strategy._counter._data.items()] == [
324+
Counter.CountPerFeature('f1', truncate_time(utc_now-3), 1),
325+
Counter.CountPerFeature('f2', truncate_time(utc_now-3), 1)]
312326

313327
# Tracking the same impression a ms later should return empty
314328
imps = manager.process_impressions([
@@ -335,10 +349,10 @@ def test_standalone_optimized_listener(self, mocker):
335349
assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1, old_utc-3),
336350
Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2, old_utc-1)]
337351

338-
assert len(manager._observer._cache._data) == 3 # distinct impressions seen
339-
assert len(manager._counter._data) == 3 # 2 distinct features. 1 seen in 2 different timeframes
352+
assert len(manager._strategy._observer._cache._data) == 3 # distinct impressions seen
353+
assert len(manager._strategy._counter._data) == 3 # 2 distinct features. 1 seen in 2 different timeframes
340354

341-
assert set(manager._counter.pop_all()) == set([
355+
assert set(manager._strategy._counter.pop_all()) == set([
342356
Counter.CountPerFeature('f1', truncate_time(old_utc), 3),
343357
Counter.CountPerFeature('f2', truncate_time(old_utc), 1),
344358
Counter.CountPerFeature('f1', truncate_time(utc_now), 2)
@@ -365,9 +379,10 @@ def test_standalone_debug_listener(self, mocker):
365379
imps = []
366380
listener = mocker.Mock(spec=ImpressionListenerWrapper)
367381
manager = Manager(ImpressionsMode.DEBUG, listener=listener)
368-
assert manager._counter is None
369-
assert manager._observer is not None
382+
assert manager._strategy.get_counts() == []
383+
assert manager._strategy._observer is not None
370384
assert manager._listener is not None
385+
assert isinstance(manager._strategy, StrategyDebugMode)
371386

372387
# An impression that hasn't happened in the last hour (pt = None) should be tracked
373388
imps = manager.process_impressions([
@@ -402,7 +417,7 @@ def test_standalone_debug_listener(self, mocker):
402417
assert imps == [Impression('k1', 'f1', 'on', 'l1', 123, None, utc_now-1, old_utc-3),
403418
Impression('k2', 'f1', 'on', 'l1', 123, None, utc_now-2, old_utc-1)]
404419

405-
assert len(manager._observer._cache._data) == 3 # distinct impressions seen
420+
assert len(manager._strategy._observer._cache._data) == 3 # distinct impressions seen
406421

407422
assert listener.log_impression.mock_calls == [
408423
mocker.call(Impression('k1', 'f1', 'on', 'l1', 123, None, old_utc-3), None),
@@ -425,9 +440,10 @@ def test_non_standalone_optimized_listener(self, mocker):
425440
imps = []
426441
listener = mocker.Mock(spec=ImpressionListenerWrapper)
427442
manager = Manager(ImpressionsMode.OPTIMIZED, False, listener) # no listener
428-
assert manager._counter is None
429-
assert manager._observer is None
443+
assert manager._strategy._counter is None
444+
assert manager._strategy._observer is None
430445
assert manager._listener is not None
446+
assert isinstance(manager._strategy, StrategyOptimizedMode)
431447

432448
# An impression that hasn't happened in the last hour (pt = None) should be tracked
433449
imps = manager.process_impressions([
@@ -482,9 +498,10 @@ def test_non_standalone_debug_listener(self, mocker):
482498

483499
listener = mocker.Mock(spec=ImpressionListenerWrapper)
484500
manager = Manager(ImpressionsMode.DEBUG, False, listener) # no listener
485-
assert manager._counter is None
486-
assert manager._observer is None
501+
assert manager._strategy.get_counts() == []
502+
assert manager._strategy._observer is None
487503
assert manager._listener is not None
504+
assert isinstance(manager._strategy, StrategyDebugMode)
488505

489506
# An impression that hasn't happened in the last hour (pt = None) should be tracked
490507
imps = manager.process_impressions([

0 commit comments

Comments
 (0)