Skip to content

Commit b89e5a3

Browse files
committed
Merge branch 'feature/streaming' of github.com:splitio/python-client into tests/sdkmockserver
2 parents 8bcf95d + f4bea7b commit b89e5a3

18 files changed

Lines changed: 121 additions & 64 deletions

splitio/client/factory.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,8 @@ def _wait_for_tasks_to_stop():
213213
wait_thread.start()
214214
else:
215215
self._sync_manager.stop(False)
216+
elif destroyed_event is not None:
217+
destroyed_event.set()
216218
finally:
217219
self._status = Status.DESTROYED
218220
with _INSTANTIATED_FACTORIES_LOCK:
@@ -300,7 +302,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
300302
),
301303
SegmentSynchronizationTask(
302304
synchronizers.segment_sync.synchronize_segments,
303-
synchronizers.segment_sync.worker_pool,
304305
cfg['segmentsRefreshRate'],
305306
),
306307
ImpressionsSyncTask(

splitio/sync/manager.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,7 @@ def stop(self, blocking):
7373
self._push_status_handler_active = False
7474
self._queue.put(self._CENTINEL_EVENT)
7575
self._push.stop()
76-
self._synchronizer.stop_periodic_fetching(True)
77-
self._synchronizer.stop_periodic_data_recording(blocking)
76+
self._synchronizer.shutdown(blocking)
7877

7978
def _streaming_feedback_handler(self):
8079
"""

splitio/sync/segment.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,12 @@ def __init__(self, segment_api, split_storage, segment_storage):
3030
self._worker_pool = workerpool.WorkerPool(10, self.synchronize_segment)
3131
self._worker_pool.start()
3232

33-
@property
34-
def worker_pool(self):
33+
def shutdown(self):
3534
"""
36-
Return worker_pool
37-
38-
:return: workerpool
39-
:rtype: splitio.tasks.util.WorkerPool
35+
Shutdown worker_pool
4036
4137
"""
42-
return self._worker_pool
38+
self._worker_pool.stop()
4339

4440
def synchronize_segment(self, segment_name, till=None):
4541
"""
@@ -78,8 +74,7 @@ def synchronize_segment(self, segment_name, till=None):
7874
segment_changes['till']
7975
)
8076

81-
if segment_changes['till'] == segment_changes['since'] \
82-
or (till is not None and segment_changes['till'] >= till):
77+
if segment_changes['till'] == segment_changes['since']:
8378
return
8479

8580
def synchronize_segments(self):

splitio/sync/split.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def synchronize_splits(self, till=None):
6060

6161
self._split_storage.set_change_number(split_changes['till'])
6262
if split_changes['till'] == split_changes['since'] \
63-
or (till is not None and split_changes['till'] >= till):
63+
and (till is None or split_changes['till'] >= till):
6464
return
6565

6666
def kill_split(self, split_name, default_treatment, change_number):

splitio/sync/synchronizer.py

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,8 @@ def start_periodic_fetching(self):
170170
pass
171171

172172
@abc.abstractmethod
173-
def stop_periodic_fetching(self, shutdown=False):
174-
"""
175-
Stop fetchers for splits and segments.
176-
177-
:param shutdown: flag to indicates if should pause or stop tasks
178-
:type shutdown: bool
179-
"""
173+
def stop_periodic_fetching(self):
174+
"""Stop fetchers for splits and segments."""
180175
pass
181176

182177
@abc.abstractmethod
@@ -203,6 +198,16 @@ def kill_split(self, split_name, default_treatment, change_number):
203198
"""
204199
pass
205200

201+
@abc.abstractmethod
202+
def shutdown(self, blocking):
203+
"""
204+
Stop tasks
205+
206+
:param blocking:flag to wait until tasks are stopped
207+
:type blocking: bool
208+
"""
209+
pass
210+
206211

207212
class Synchronizer(BaseSynchronizer):
208213
"""Synchronizer."""
@@ -256,25 +261,29 @@ def sync_all(self):
256261
_LOGGER.error('Failed syncing splits')
257262
raise_from(APIException('Failed to sync splits'), exc)
258263

264+
def shutdown(self, blocking):
265+
"""
266+
Stop tasks
267+
268+
:param blocking:flag to wait until tasks are stopped
269+
:type blocking: bool
270+
"""
271+
_LOGGER.debug('Shutting down tasks.')
272+
self._split_synchronizers.segment_sync.shutdown()
273+
self.stop_periodic_fetching()
274+
self.stop_periodic_data_recording(blocking)
275+
259276
def start_periodic_fetching(self):
260277
"""Start fetchers for splits and segments."""
261278
_LOGGER.debug('Starting periodic data fetching')
262279
self._split_tasks.split_task.start()
263280
self._split_tasks.segment_task.start()
264281

265-
def stop_periodic_fetching(self, shutdown=False):
266-
"""
267-
Stop fetchers for splits and segments.
268-
269-
:param shutdown: flag to indicates if should pause or stop tasks
270-
:type shutdown: bool
271-
"""
282+
def stop_periodic_fetching(self):
283+
"""Stop fetchers for splits and segments."""
272284
_LOGGER.debug('Stopping periodic fetching')
273285
self._split_tasks.split_task.stop()
274-
if shutdown: # stops task and worker pool
275-
self._split_tasks.segment_task.stop()
276-
else: # pauses task not worker pool
277-
self._split_tasks.segment_task.pause()
286+
self._split_tasks.segment_task.stop()
278287

279288
def start_periodic_data_recording(self):
280289
"""Start recorders."""
@@ -353,7 +362,7 @@ def start_periodic_fetching(self):
353362
_LOGGER.debug('Starting periodic data fetching')
354363
self._split_tasks.split_task.start()
355364

356-
def stop_periodic_fetching(self, shutdown=False):
365+
def stop_periodic_fetching(self):
357366
"""Stop fetchers for splits and segments."""
358367
_LOGGER.debug('Stopping periodic fetching')
359368
self._split_tasks.split_task.stop()

splitio/tasks/segment_sync.py

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import logging
44
from splitio.api import APIException
55
from splitio.tasks import BaseSynchronizationTask
6-
from splitio.tasks.util import asynctask, workerpool
6+
from splitio.tasks.util import asynctask
77

88

99
_LOGGER = logging.getLogger(__name__)
@@ -12,33 +12,23 @@
1212
class SegmentSynchronizationTask(BaseSynchronizationTask):
1313
"""Segment Syncrhonization class."""
1414

15-
def __init__(self, synchronize_segments, worker_pool, period):
15+
def __init__(self, synchronize_segments, period):
1616
"""
1717
Clas constructor.
1818
1919
:param synchronize_segments: handler for syncing segments
2020
:type synchronize_segments: func
2121
22-
:param worker_pool: worker created by sync to be able to stop worker
23-
:type worker_pool: splitio.tasks.util.WorkerPool
24-
2522
"""
26-
self._worker_pool = worker_pool
2723
self._task = asynctask.AsyncTask(synchronize_segments, period, on_init=None)
2824

2925
def start(self):
3026
"""Start segment synchronization."""
3127
self._task.start()
3228

33-
def pause(self):
34-
"""Pause segment synchronization."""
35-
self._task.stop()
36-
3729
def stop(self, event=None):
3830
"""Stop segment synchronization."""
39-
self._task.stop()
40-
if self._worker_pool is not None:
41-
self._worker_pool.stop(event)
31+
self._task.stop(event)
4232

4333
def is_running(self):
4434
"""

splitio/tasks/split_sync.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def __init__(self, synchronize_splits, period):
2020
:type period: int
2121
"""
2222
self._period = period
23-
self._task = AsyncTask(synchronize_splits, period, on_init=synchronize_splits)
23+
self._task = AsyncTask(synchronize_splits, period, on_init=None)
2424

2525
def start(self):
2626
"""Start the task."""

tests/client/test_factory.py

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from splitio.client.config import DEFAULT_CONFIG
99
from splitio.storage import redis, inmemmory, uwsgi
1010
from splitio.tasks import events_sync, impressions_sync, split_sync, segment_sync, telemetry_sync
11-
from splitio.tasks.util import asynctask, workerpool
11+
from splitio.tasks.util import asynctask
1212
from splitio.api.splits import SplitsAPI
1313
from splitio.api.segments import SegmentsAPI
1414
from splitio.api.impressions import ImpressionsAPI
@@ -161,9 +161,8 @@ def _split_task_init_mock(self, synchronize_splits, period):
161161
segment_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask)
162162
segment_async_task_mock.stop.side_effect = stop_mock
163163

164-
def _segment_task_init_mock(self, synchronize_segments, worker_pool, period):
164+
def _segment_task_init_mock(self, synchronize_segments, period):
165165
self._task = segment_async_task_mock
166-
self._worker_pool = mocker.Mock()
167166
self._period = period
168167
mocker.patch('splitio.client.factory.SegmentSynchronizationTask.__init__',
169168
new=_segment_task_init_mock)
@@ -256,9 +255,8 @@ def _split_task_init_mock(self, synchronize_splits, period):
256255
segment_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask)
257256
segment_async_task_mock.stop.side_effect = stop_mock_2
258257

259-
def _segment_task_init_mock(self, synchronize_segments, worker_pool, period):
258+
def _segment_task_init_mock(self, synchronize_segments, period):
260259
self._task = segment_async_task_mock
261-
self._worker_pool = mocker.Mock()
262260
self._period = period
263261
mocker.patch('splitio.client.factory.SegmentSynchronizationTask.__init__',
264262
new=_segment_task_init_mock)
@@ -332,6 +330,35 @@ def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sse
332330
assert len(imp_count_async_task_mock.stop.mock_calls) == 1
333331
assert factory.destroyed is True
334332

333+
def test_destroy_with_event_redis(self, mocker):
334+
def _make_factory_with_apikey(apikey, *_, **__):
335+
return SplitFactory(apikey, {}, True, mocker.Mock(spec=ImpressionsManager), None)
336+
337+
factory_module_logger = mocker.Mock()
338+
build_redis = mocker.Mock()
339+
build_redis.side_effect = _make_factory_with_apikey
340+
mocker.patch('splitio.client.factory._LOGGER', new=factory_module_logger)
341+
mocker.patch('splitio.client.factory._build_redis_factory', new=build_redis)
342+
343+
config = {
344+
'redisDb': 0,
345+
'redisHost': 'localhost',
346+
'redisPosrt': 6379,
347+
}
348+
349+
factory = get_factory("none", config=config)
350+
event = threading.Event()
351+
factory.destroy(event)
352+
event.wait()
353+
assert factory.destroyed
354+
assert len(build_redis.mock_calls) == 1
355+
356+
factory = get_factory("none", config=config)
357+
factory.destroy(None)
358+
time.sleep(0.1)
359+
assert factory.destroyed
360+
assert len(build_redis.mock_calls) == 2
361+
335362
def test_multiple_factories(self, mocker):
336363
"""Test multiple factories instantiation and tracking."""
337364
sdk_ready_flag = threading.Event()
File renamed without changes.

tests/syncrhonizers/test_impressions_count_synchronizer.py renamed to tests/sync/test_impressions_count_synchronizer.py

File renamed without changes.

0 commit comments

Comments
 (0)