|
1 | 1 | """Split evaluator module.""" |
2 | | -import abc |
3 | | -import threading |
4 | | -from collections import defaultdict, namedtuple |
5 | 2 | from enum import Enum |
6 | 3 |
|
7 | | -from splitio.models.impressions import Impression |
8 | | -from splitio.engine.hashfns import murmur_128 |
9 | | -from splitio.engine.cache.lru import SimpleLruCache |
10 | 4 | from splitio.client.listener import ImpressionListenerException |
11 | | -from splitio import util |
| 5 | +from splitio.engine.strategies.strategy_debug_mode import StrategyDebugMode |
| 6 | +from splitio.engine.strategies.strategy_optimized_mode import StrategyOptimizedMode |
12 | 7 |
|
13 | 8 | import logging |
14 | 9 | _LOGGER = logging.getLogger(__name__) |
15 | 10 |
|
# Length of one counting window in milliseconds (one hour); used by truncate_time.
_TIME_INTERVAL_MS = 60 * 60 * 1000
# Size passed to Observer (and from there to its SimpleLruCache) by the strategies.
_IMPRESSION_OBSERVER_CACHE_SIZE = 500 * 1000
18 | | - |
19 | | - |
class ImpressionsMode(Enum):
    """
    Enumeration of the supported impressions tracking modes.

    Each member's value is the upper-case name of the mode as a string.
    """

    OPTIMIZED = "OPTIMIZED"
    DEBUG = "DEBUG"
25 | 16 |
|
26 | | - |
def truncate_time(timestamp_ms):
    """
    Round a millisecond timestamp down to the start of its time interval.

    The interval length is the module constant ``_TIME_INTERVAL_MS`` (one hour).

    :param timestamp_ms: timestamp generated in the impression.
    :type timestamp_ms: int

    :returns: the timestamp minus its remainder modulo ``_TIME_INTERVAL_MS``.
    :rtype: int
    """
    elapsed_in_interval = timestamp_ms % _TIME_INTERVAL_MS
    return timestamp_ms - elapsed_in_interval
38 | | - |
39 | | - |
class Hasher(object):  # pylint:disable=too-few-public-methods
    """Hash an impression based on a string built from its fields."""

    # matching_key:feature_name:treatment:label:change_number
    _PATTERN = "%s:%s:%s:%s:%d"

    def __init__(self, hash_fn=murmur_128, seed=0):
        """
        Build a hasher.

        :param hash_fn: Hash function to apply (str, int) -> int
        :type hash_fn: callable

        :param seed: seed handed to ``hash_fn`` on every call
        :type seed: int
        """
        self._seed = seed
        self._hash_fn = hash_fn

    def _stringify(self, impression):
        """
        Render an impression as a string using ``_PATTERN``.

        Falsy text fields are replaced by ``'UNKNOWN'`` and a falsy change
        number by ``0``.

        :param impression: Impression to stringify
        :type impression: splitio.models.impressions.Impression

        :returns: a string representation of the impression
        :rtype: str
        """
        fields = (
            impression.matching_key or 'UNKNOWN',
            impression.feature_name or 'UNKNOWN',
            impression.treatment or 'UNKNOWN',
            impression.label or 'UNKNOWN',
            impression.change_number or 0,
        )
        return self._PATTERN % fields

    def process(self, impression):
        """
        Compute the hash of an impression.

        :param impression: Impression to hash.
        :type impression: splitio.models.impressions.Impression

        :returns: result of applying the hash function to the stringified
                  impression and the configured seed.
        :rtype: int
        """
        text = self._stringify(impression)
        return self._hash_fn(text, self._seed)
85 | | - |
86 | | - |
class Observer(object):  # pylint:disable=too-few-public-methods
    """Track impressions in a cache and attach the previous time to each one."""

    def __init__(self, size):
        """
        Build an observer.

        :param size: size passed to the underlying SimpleLruCache
        """
        self._hasher = Hasher()
        self._cache = SimpleLruCache(size)

    def test_and_set(self, impression):
        """
        Record an impression's time in the cache and return a copy carrying the previous one.

        The cache key is the hash of the impression; the value stored is the
        impression's ``time``. Whatever the cache's ``test_and_set`` returns is
        used as the new impression's previous time.

        :param impression: Impression to track
        :type impression: splitio.models.impressions.Impression

        :returns: Impression with populated previous time
        :rtype: splitio.models.impressions.Impression
        """
        fingerprint = self._hasher.process(impression)
        previous_time = self._cache.test_and_set(fingerprint, impression.time)
        return Impression(
            impression.matching_key,
            impression.feature_name,
            impression.treatment,
            impression.label,
            impression.change_number,
            impression.bucketing_key,
            impression.time,
            previous_time,
        )
114 | | - |
115 | | - |
class Counter(object):
    """Tally impressions per (feature, timeframe); updates are guarded by a lock."""

    CounterKey = namedtuple('Count', ['feature', 'timeframe'])
    CountPerFeature = namedtuple('CountPerFeature', ['feature', 'timeframe', 'count'])

    def __init__(self):
        """Create an empty counter."""
        self._lock = threading.Lock()
        self._data = defaultdict(int)

    def track(self, impressions, inc=1):
        """
        Add ``inc`` to the tally of every impression's feature/timeframe bucket.

        The timeframe of an impression is its ``time`` passed through
        ``truncate_time``.

        :param impressions: generated impressions
        :type impressions: list[splitio.models.impressions.Impression]

        :param inc: amount to increment (defaults to 1)
        :type inc: int
        """
        # Bucket keys are built before the lock is taken.
        buckets = [
            Counter.CounterKey(imp.feature_name, truncate_time(imp.time))
            for imp in impressions
        ]
        with self._lock:
            for bucket in buckets:
                self._data[bucket] += inc

    def pop_all(self):
        """
        Return every stored count and reset the counter to empty.

        :returns: List of count per feature/timeframe objects
        :rtype: list[Counter.CountPerFeature]
        """
        # Swap in a fresh mapping under the lock; the result list is built afterwards.
        with self._lock:
            drained, self._data = self._data, defaultdict(int)

        return [
            Counter.CountPerFeature(key.feature, key.timeframe, count)
            for key, count in drained.items()
        ]
155 | | - |
class BaseStrategy(object, metaclass=abc.ABCMeta):
    """
    Strategy interface.

    Concrete strategies must implement ``process_impressions`` and
    ``truncate_impressions_time``; ``get_counts`` has a default that reports
    no counts.
    """

    @abc.abstractmethod
    def process_impressions(self, impressions):
        """
        Process a batch of impressions.

        :param impressions: List of impression objects with attributes
        :type impressions: list[tuple[splitio.models.impression.Impression, dict]]

        :returns: processed list of impressions
        :rtype: list[tuple[splitio.models.impression.Impression, dict]]
        """

    @abc.abstractmethod
    def truncate_impressions_time(self, imps):
        """
        Reduce processed impressions to a plain list of impressions.

        :param imps: List of impression objects with attributes
        :type imps: list[tuple[splitio.models.impression.Impression, dict]]

        :returns: list of impressions
        :rtype: list[splitio.models.impression.Impression]
        """

    def get_counts(self):
        """
        Return a list of counter objects.

        The default implementation keeps no counters, so it returns an empty
        list (rather than None) to honour the documented list return type.

        :returns: empty list
        :rtype: list
        """
        return []
181 | | - |
class StrategyOptimizedMode(BaseStrategy):
    """Optimized mode strategy: observes impressions and counts them per feature."""

    def __init__(self, standalone=True):
        """
        Construct a strategy instance for optimized mode.

        :param standalone: when true a Counter and an Observer are created;
                           otherwise both are left as None.
        :type standalone: bool
        """
        self._standalone = standalone
        if self._standalone:
            self._counter = Counter()
            self._observer = Observer(_IMPRESSION_OBSERVER_CACHE_SIZE)
        else:
            self._counter = None
            self._observer = None

    def process_impressions(self, impressions):
        """
        Process impressions.

        When an observer is present each impression is passed through it; when
        a counter is present the resulting impressions are tracked in it.

        :param impressions: List of impression objects with attributes
        :type impressions: list[tuple[splitio.models.impression.Impression, dict]]

        :returns: Observed list of impressions
        :rtype: list[tuple[splitio.models.impression.Impression, dict]]
        """
        if self._observer:
            observed = [
                (self._observer.test_and_set(imp), attrs)
                for imp, attrs in impressions
            ]
        else:
            observed = impressions

        if self._counter is not None:
            self._counter.track([imp for imp, _ in observed])
        return observed

    def truncate_impressions_time(self, imps):
        """
        Filter impressions based on their previous time.

        Without a counter every impression is returned. With a counter, only
        impressions whose ``previous_time`` is None or earlier than the start
        of the current hour are kept.

        :param imps: List of impression objects with attributes
        :type imps: list[tuple[splitio.models.impression.Impression, dict]]

        :returns: truncated list of impressions
        :rtype: list[splitio.models.impression.Impression]
        """
        hour_start = truncate_time(util.utctime_ms())
        if self._counter is None:
            return [imp for imp, _ in imps]
        return [
            imp for imp, _ in imps
            if imp.previous_time is None or imp.previous_time < hour_start
        ]

    def get_counts(self):
        """
        Return counts of impressions per feature, clearing the counter.

        :returns: A list of counter objects (empty when there is no counter).
        :rtype: list[Counter.CountPerFeature]
        """
        if self._counter is None:
            return []
        return self._counter.pop_all()
236 | | - |
class StrategyDebugMode(BaseStrategy):
    """Debug mode strategy: observes impressions but keeps no counters."""

    def __init__(self, standalone=True):
        """
        Construct a strategy instance for debug mode.

        :param standalone: when true an Observer is created; otherwise it is None.
        :type standalone: bool
        """
        self._standalone = standalone
        if self._standalone:
            self._observer = Observer(_IMPRESSION_OBSERVER_CACHE_SIZE)
        else:
            self._observer = None

    def process_impressions(self, impressions):
        """
        Process impressions.

        When an observer is present each impression is passed through it;
        otherwise the input is returned unchanged.

        :param impressions: List of impression objects with attributes
        :type impressions: list[tuple[splitio.models.impression.Impression, dict]]

        :returns: Observed list of impressions
        :rtype: list[tuple[splitio.models.impression.Impression, dict]]
        """
        if self._observer is None:
            return impressions
        return [
            (self._observer.test_and_set(imp), attrs)
            for imp, attrs in impressions
        ]

    def truncate_impressions_time(self, imps):
        """
        Return every impression passed in, without its attributes.

        No counter is implemented, so nothing is filtered out.

        :param imps: List of impression objects with attributes
        :type imps: list[tuple[splitio.models.impression.Impression, dict]]

        :returns: list of impressions
        :rtype: list[splitio.models.impression.Impression]
        """
        return [imp for imp, _ in imps]

    def get_counts(self):
        """
        No counter implemented, return an empty list.

        :returns: empty list
        :rtype: list
        """
        return []
280 | | - |
281 | 17 | class Manager(object): # pylint:disable=too-few-public-methods |
282 | 18 | """Impression manager.""" |
283 | 19 |
|
|
0 commit comments