Skip to content

Commit f4bea7b

Browse files
authored
Merge pull request #207 from splitio/task/fixes
added shutdown method
2 parents 79f65ad + 2b9434e commit f4bea7b

5 files changed

Lines changed: 100 additions & 27 deletions

File tree

splitio/client/factory.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,8 @@ def _wait_for_tasks_to_stop():
213213
wait_thread.start()
214214
else:
215215
self._sync_manager.stop(False)
216+
elif destroyed_event is not None:
217+
destroyed_event.set()
216218
finally:
217219
self._status = Status.DESTROYED
218220
with _INSTANTIATED_FACTORIES_LOCK:

splitio/sync/manager.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,7 @@ def stop(self, blocking):
7373
self._push_status_handler_active = False
7474
self._queue.put(self._CENTINEL_EVENT)
7575
self._push.stop()
76-
self._synchronizer.stop_periodic_fetching(True)
77-
self._synchronizer.stop_periodic_data_recording(blocking)
76+
self._synchronizer.shutdown(blocking)
7877

7978
def _streaming_feedback_handler(self):
8079
"""

splitio/sync/synchronizer.py

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,8 @@ def start_periodic_fetching(self):
170170
pass
171171

172172
@abc.abstractmethod
173-
def stop_periodic_fetching(self, shutdown=False):
174-
"""
175-
Stop fetchers for splits and segments.
176-
177-
:param shutdown: flag to indicates if should pause or stop tasks
178-
:type shutdown: bool
179-
"""
173+
def stop_periodic_fetching(self):
174+
"""Stop fetchers for splits and segments."""
180175
pass
181176

182177
@abc.abstractmethod
@@ -203,6 +198,16 @@ def kill_split(self, split_name, default_treatment, change_number):
203198
"""
204199
pass
205200

201+
@abc.abstractmethod
202+
def shutdown(self, blocking):
203+
"""
204+
Stop tasks
205+
206+
:param blocking:flag to wait until tasks are stopped
207+
:type blocking: bool
208+
"""
209+
pass
210+
206211

207212
class Synchronizer(BaseSynchronizer):
208213
"""Synchronizer."""
@@ -256,23 +261,28 @@ def sync_all(self):
256261
_LOGGER.error('Failed syncing splits')
257262
raise_from(APIException('Failed to sync splits'), exc)
258263

264+
def shutdown(self, blocking):
265+
"""
266+
Stop tasks
267+
268+
:param blocking:flag to wait until tasks are stopped
269+
:type blocking: bool
270+
"""
271+
_LOGGER.debug('Shutting down tasks.')
272+
self._split_synchronizers.segment_sync.shutdown()
273+
self.stop_periodic_fetching()
274+
self.stop_periodic_data_recording(blocking)
275+
259276
def start_periodic_fetching(self):
260277
"""Start fetchers for splits and segments."""
261278
_LOGGER.debug('Starting periodic data fetching')
262279
self._split_tasks.split_task.start()
263280
self._split_tasks.segment_task.start()
264281

265-
def stop_periodic_fetching(self, shutdown=False):
266-
"""
267-
Stop fetchers for splits and segments.
268-
269-
:param shutdown: flag to indicates if should pause or stop tasks
270-
:type shutdown: bool
271-
"""
282+
def stop_periodic_fetching(self):
283+
"""Stop fetchers for splits and segments."""
272284
_LOGGER.debug('Stopping periodic fetching')
273285
self._split_tasks.split_task.stop()
274-
if shutdown: # stops task and worker pool
275-
self._split_synchronizers.segment_sync.shutdown()
276286
self._split_tasks.segment_task.stop()
277287

278288
def start_periodic_data_recording(self):
@@ -352,7 +362,7 @@ def start_periodic_fetching(self):
352362
_LOGGER.debug('Starting periodic data fetching')
353363
self._split_tasks.split_task.start()
354364

355-
def stop_periodic_fetching(self, shutdown=False):
365+
def stop_periodic_fetching(self):
356366
"""Stop fetchers for splits and segments."""
357367
_LOGGER.debug('Stopping periodic fetching')
358368
self._split_tasks.split_task.stop()

tests/client/test_factory.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,35 @@ def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sse
330330
assert len(imp_count_async_task_mock.stop.mock_calls) == 1
331331
assert factory.destroyed is True
332332

333+
def test_destroy_with_event_redis(self, mocker):
334+
def _make_factory_with_apikey(apikey, *_, **__):
335+
return SplitFactory(apikey, {}, True, mocker.Mock(spec=ImpressionsManager), None)
336+
337+
factory_module_logger = mocker.Mock()
338+
build_redis = mocker.Mock()
339+
build_redis.side_effect = _make_factory_with_apikey
340+
mocker.patch('splitio.client.factory._LOGGER', new=factory_module_logger)
341+
mocker.patch('splitio.client.factory._build_redis_factory', new=build_redis)
342+
343+
config = {
344+
'redisDb': 0,
345+
'redisHost': 'localhost',
346+
'redisPosrt': 6379,
347+
}
348+
349+
factory = get_factory("none", config=config)
350+
event = threading.Event()
351+
factory.destroy(event)
352+
event.wait()
353+
assert factory.destroyed
354+
assert len(build_redis.mock_calls) == 1
355+
356+
factory = get_factory("none", config=config)
357+
factory.destroy(None)
358+
time.sleep(0.1)
359+
assert factory.destroyed
360+
assert len(build_redis.mock_calls) == 2
361+
333362
def test_multiple_factories(self, mocker):
334363
"""Test multiple factories instantiation and tracking."""
335364
sdk_ready_flag = threading.Event()

tests/sync/test_synchronizer.py

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -124,17 +124,11 @@ def test_stop_periodic_fetching(self, mocker):
124124
split_tasks = SplitTasks(split_task, segment_task, mocker.Mock(), mocker.Mock(),
125125
mocker.Mock(), mocker.Mock())
126126
synchronizer = Synchronizer(split_synchronizers, split_tasks)
127-
synchronizer.stop_periodic_fetching(True)
127+
synchronizer.stop_periodic_fetching()
128128

129129
assert len(split_task.stop.mock_calls) == 1
130130
assert len(segment_task.stop.mock_calls) == 1
131-
assert len(segment_sync.shutdown.mock_calls) == 1
132-
133-
synchronizer.stop_periodic_fetching(False)
134-
135-
assert len(split_task.stop.mock_calls) == 2
136-
assert len(segment_task.stop.mock_calls) == 2
137-
assert len(segment_sync.shutdown.mock_calls) == 1 # not called here
131+
assert len(segment_sync.shutdown.mock_calls) == 0
138132

139133
def test_start_periodic_data_recording(self, mocker):
140134
impression_task = mocker.Mock(spec=ImpressionsSyncTask)
@@ -177,3 +171,42 @@ def stop_mock_2():
177171
assert len(impression_count_task.stop.mock_calls) == 1
178172
assert len(event_task.stop.mock_calls) == 1
179173
assert len(telemetry_task.stop.mock_calls) == 1
174+
175+
def test_shutdown(self, mocker):
176+
177+
def stop_mock(event):
178+
event.set()
179+
return
180+
181+
def stop_mock_2():
182+
return
183+
184+
split_task = mocker.Mock(spec=SplitSynchronizationTask)
185+
split_task.stop.side_effect = stop_mock_2
186+
segment_task = mocker.Mock(spec=SegmentSynchronizationTask)
187+
segment_task.stop.side_effect = stop_mock_2
188+
impression_task = mocker.Mock(spec=ImpressionsSyncTask)
189+
impression_task.stop.side_effect = stop_mock
190+
impression_count_task = mocker.Mock(spec=ImpressionsCountSyncTask)
191+
impression_count_task.stop.side_effect = stop_mock
192+
event_task = mocker.Mock(spec=EventsSyncTask)
193+
event_task.stop.side_effect = stop_mock
194+
telemetry_task = mocker.Mock(spec=TelemetrySynchronizationTask)
195+
telemetry_task.stop.side_effect = stop_mock_2
196+
197+
segment_sync = mocker.Mock(spec=SegmentSynchronizer)
198+
199+
split_synchronizers = SplitSynchronizers(mocker.Mock(), segment_sync, mocker.Mock(),
200+
mocker.Mock(), mocker.Mock(), mocker.Mock())
201+
split_tasks = SplitTasks(split_task, segment_task, impression_task, event_task,
202+
telemetry_task, impression_count_task)
203+
synchronizer = Synchronizer(split_synchronizers, split_tasks)
204+
synchronizer.shutdown(True)
205+
206+
assert len(split_task.stop.mock_calls) == 1
207+
assert len(segment_task.stop.mock_calls) == 1
208+
assert len(segment_sync.shutdown.mock_calls) == 1
209+
assert len(impression_task.stop.mock_calls) == 1
210+
assert len(impression_count_task.stop.mock_calls) == 1
211+
assert len(event_task.stop.mock_calls) == 1
212+
assert len(telemetry_task.stop.mock_calls) == 1

0 commit comments

Comments
 (0)