-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathtest_events_task.py
More file actions
74 lines (54 loc) · 2.35 KB
/
test_events_task.py
File metadata and controls
74 lines (54 loc) · 2.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""EventsManager test module."""
import pytest
import queue
import time
from splitio.models.events import SdkInternalEvent
from splitio.models.notification import SdkInternalEventNotification
from splitio.events.events_metadata import EventsMetadata
from splitio.events.events_metadata import SdkEventType
from splitio.events.events_task import EventsTask
class EventsTaskTests(object):
"""Tests for EventsManager."""
internal_event = None
metadata = None
def test_firing_events(self):
events_queue = queue.Queue()
events_task = EventsTask(self._event_callback, events_queue)
events_task.start()
assert events_task.is_running()
metadata = EventsMetadata(SdkEventType.FLAG_UPDATE, { "feature1" })
events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_READY, metadata))
time.sleep(.5)
assert self.internal_event == SdkInternalEvent.SDK_READY
self._verify_metadata(metadata)
self._reset_flags()
events_queue.put(SdkInternalEventNotification(SdkInternalEvent.RB_SEGMENTS_UPDATED, metadata))
time.sleep(.5)
assert self.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED
self._verify_metadata(metadata)
events_task.stop()
time.sleep(.5)
assert not events_task.is_running()
def test_on_error(self):
events_queue = queue.Queue()
def handler_sync(internal_event, metadata):
raise Exception('some')
events_task = EventsTask(handler_sync, events_queue)
events_task.start()
assert events_task.is_running()
events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_READY, None))
with pytest.raises(Exception):
events_task._handler()
assert events_task.is_running()
events_task.stop()
time.sleep(1)
assert not events_task.is_running()
def _reset_flags(self):
self.internal_event = None
self.metadata = None
def _event_callback(self, internal_event, metadata):
self.internal_event = internal_event
self.metadata = metadata
def _verify_metadata(self, metadata):
assert metadata.get_type() == self.metadata.get_type()
assert metadata.get_names() == self.metadata.get_names()