Skip to content

Commit 0eccc7a

Browse files
committed
Merge branch 'feature/streaming' of github.com:splitio/python-client into task/fixes
2 parents 1a59dc4 + 36bf9b8 commit 0eccc7a

10 files changed

Lines changed: 263 additions & 109 deletions

File tree

splitio/push/manager.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,7 @@ def _handle_control(self, event):
9292
_LOGGER.debug('handling control event: %s', str(event))
9393
feedback = self._status_tracker.handle_control_message(event)
9494
if feedback is not None:
95-
# Send this event back to sync manager
96-
pass
95+
self._feedback_loop.put(feedback)
9796

9897
def _handle_occupancy(self, event):
9998
"""
@@ -105,8 +104,19 @@ def _handle_occupancy(self, event):
105104
_LOGGER.debug('handling occupancy event: %s', str(event))
106105
feedback = self._status_tracker.handle_occupancy(event)
107106
if feedback is not None:
108-
# Send this event back to sync manager
109-
pass
107+
self._feedback_loop.put(feedback)
108+
109+
def _handle_connection_end(self, shutdown_requested):
110+
"""
111+
Handle a connection ending.
112+
113+
If the connection shutdown was not requested, trigger a restart.
114+
115+
:param shutdown_requested: whether the shutdown was requested or unexpected.
116+
:type shutdown_requested: True
117+
"""
118+
if not shutdown_requested:
119+
self._feedback_loop.put(Status.PUSH_RETRYABLE_ERROR)
110120

111121
def _handle_error(self, event):
112122
"""
@@ -118,8 +128,7 @@ def _handle_error(self, event):
118128
_LOGGER.debug('handling ably error event: %s', str(event))
119129
feedback = self._status_tracker.handle_ably_error(event)
120130
if feedback is not None:
121-
# Send this event back to sync manager
122-
pass
131+
self._feedback_loop.put(feedback)
123132

124133
def _event_handler(self, event):
125134
"""
@@ -189,6 +198,7 @@ def _setup_next_token_refresh(self, token):
189198
self._next_refresh.cancel()
190199
self._next_refresh = Timer((token.exp - token.iat) - _TOKEN_REFRESH_GRACE_PERIOD,
191200
self._token_refresh)
201+
self._next_refresh.setName('TokenRefresh')
192202
self._next_refresh.start()
193203

194204
def update_workers_status(self, enabled):

splitio/push/splitsse.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ def connect(url):
109109
try:
110110
self._client.start(url, timeout=self.KEEPALIVE_TIMEOUT)
111111
finally:
112-
self._sse_connection_closed.set()
113112
self._status = SplitSSEClient._Status.IDLE
113+
self._sse_connection_closed.set()
114114

115115
url = self._build_url(token)
116116
task = threading.Thread(target=connect, name='SSEConnection', args=(url,))
@@ -122,7 +122,9 @@ def connect(url):
122122
def stop(self, blocking=False, timeout=None):
123123
"""Abort the ongoing connection."""
124124
if self._status == SplitSSEClient._Status.IDLE:
125-
raise Exception('SseClient not running')
125+
_LOGGER.warn('sse already closed. ignoring')
126+
return
127+
126128
self._client.shutdown()
127129
if blocking:
128130
self._sse_connection_closed.wait(timeout)

splitio/push/sse.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ def build(self):
8080
class SSEClient(object):
8181
"""SSE Client implementation."""
8282

83-
_DEFAULT_HEADERS = {'Accept': 'text/event-stream'}
83+
_DEFAULT_HEADERS = {'accept': 'text/event-stream'}
8484
_EVENT_SEPARATORS = set([b'\n', b'\r\n'])
8585

8686
def __init__(self, callback):

tests/helpers/mockserver.py

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
"""SSE mock server."""
2+
import json
3+
from collections import namedtuple
4+
import queue
5+
import threading
6+
7+
from http.server import HTTPServer, BaseHTTPRequestHandler
8+
9+
10+
Request = namedtuple('Request', ['method', 'path', 'headers', 'body'])
11+
12+
13+
class SSEMockServer(object):
14+
"""SSE server for testing purposes."""
15+
16+
protocol_version = 'HTTP/1.1'
17+
18+
GRACEFUL_REQUEST_END = 'REQ-END'
19+
VIOLENT_REQUEST_END = 'REQ-KILL'
20+
21+
def __init__(self, req_queue=None):
22+
"""Consruct a mock server."""
23+
self._queue = queue.Queue()
24+
self._server = HTTPServer(('localhost', 0),
25+
lambda *xs: SSEHandler(self._queue, *xs, req_queue=req_queue))
26+
self._server_thread = threading.Thread(target=self._blocking_run)
27+
self._server_thread.setDaemon(True)
28+
self._done_event = threading.Event()
29+
30+
def _blocking_run(self):
31+
"""Execute."""
32+
self._server.serve_forever()
33+
self._done_event.set()
34+
35+
def port(self):
36+
"""Return the assigned port."""
37+
return self._server.server_port
38+
39+
def publish(self, event):
40+
"""Publish an event."""
41+
self._queue.put(event, block=False)
42+
43+
def start(self):
44+
"""Start the server asyncrhonously."""
45+
self._server_thread.start()
46+
47+
def wait(self, timeout=None):
48+
"""Wait for the server to shutdown."""
49+
return self._done_event.wait(timeout)
50+
51+
def stop(self):
52+
"""Stop the server."""
53+
self._server.shutdown()
54+
55+
56+
class SSEHandler(BaseHTTPRequestHandler):
57+
"""Handler."""
58+
59+
def __init__(self, event_queue, *args, **kwargs):
60+
"""Construct a handler."""
61+
self._queue = event_queue
62+
self._req_queue = kwargs.get('req_queue')
63+
BaseHTTPRequestHandler.__init__(self, *args)
64+
65+
def do_GET(self): #pylint:disable=invalid-name
66+
"""Respond to a GET request."""
67+
self.send_response(200)
68+
self.send_header("Content-type", "text/event-stream")
69+
self.send_header("Transfer-Encoding", "chunked")
70+
self.send_header("Connection", "keep-alive")
71+
self.end_headers()
72+
73+
if self._req_queue is not None:
74+
headers = dict(zip(self.headers.keys(), self.headers.values()))
75+
self._req_queue.put(Request('GET', self.path, headers, None))
76+
77+
def write_chunk(chunk):
78+
"""Write an event/chunk."""
79+
tosend = '%X\r\n%s\r\n'%(len(chunk), chunk)
80+
self.wfile.write(tosend.encode('utf-8'))
81+
82+
while True:
83+
event = self._queue.get()
84+
if event == SSEMockServer.GRACEFUL_REQUEST_END:
85+
break
86+
elif event == SSEMockServer.VIOLENT_REQUEST_END:
87+
raise Exception('exploding')
88+
89+
chunk = ''
90+
chunk += 'id: % s\n' % event['id'] if 'id' in event else ''
91+
chunk += 'event: % s\n' % event['event'] if 'event' in event else ''
92+
chunk += 'retry: % s\n' % event['retry'] if 'retry' in event else ''
93+
chunk += 'data: % s\n' % event['data'] if 'data' in event else ''
94+
if chunk != '':
95+
write_chunk(chunk + '\r\n')
96+
97+
self.wfile.write('0\r\n\r\n'.encode('utf-8'))
98+
99+
100+
class SplitMockServer(object):
101+
"""SDK server mock for testing purposes."""
102+
103+
protocol_version = 'HTTP/1.1'
104+
105+
def __init__(self, split_changes=None, segment_changes=None, req_queue=None):
106+
"""
107+
Consruct a mock server.
108+
109+
:param changes: mapping of changeNumbers to splitChanges responses
110+
:type changes: dict
111+
"""
112+
split_changes = split_changes if split_changes is not None else {}
113+
segment_changes = segment_changes if segment_changes is not None else {}
114+
self._server = HTTPServer(('localhost', 0),
115+
lambda *xs: SDKHandler(split_changes, segment_changes, *xs,
116+
req_queue=req_queue)) # pylint:disable=line-too-long
117+
self._server_thread = threading.Thread(target=self._blocking_run, name="SplitMockServer")
118+
self._server_thread.setDaemon(True)
119+
self._done_event = threading.Event()
120+
121+
def _blocking_run(self):
122+
"""Execute."""
123+
self._server.serve_forever()
124+
self._done_event.set()
125+
126+
def port(self):
127+
"""Return the assigned port."""
128+
return self._server.server_port
129+
130+
def start(self):
131+
"""Start the server asyncrhonously."""
132+
self._server_thread.start()
133+
134+
def wait(self, timeout=None):
135+
"""Wait for the server to shutdown."""
136+
return self._done_event.wait(timeout)
137+
138+
def stop(self):
139+
"""Stop the server."""
140+
self._server.shutdown()
141+
142+
143+
class SDKHandler(BaseHTTPRequestHandler):
144+
"""Handler."""
145+
146+
def __init__(self, split_changes, segment_changes, *args, **kwargs):
147+
"""Construct a handler."""
148+
self._req_queue = kwargs.get('req_queue')
149+
self._split_changes = split_changes
150+
self._segment_changes = segment_changes
151+
BaseHTTPRequestHandler.__init__(self, *args)
152+
153+
def _parse_qs(self):
154+
raw_query = self.path.split('?')[1] if '?' in self.path else ''
155+
return dict([item.split('=') for item in raw_query.split('&')])
156+
157+
def _handle_segment_changes(self):
158+
qstring = self._parse_qs()
159+
since = int(qstring.get('since', -1))
160+
name = qstring.get('name')
161+
if name is None:
162+
self.send_response(400)
163+
self.send_header("Content-type", "application/json")
164+
self.end_headers()
165+
self.wfile.write('{}'.encode('utf-8'))
166+
return
167+
168+
to_send = self._segment_changes.get((name, since,))
169+
if to_send is None:
170+
self.send_response(404)
171+
self.send_header("Content-type", "application/json")
172+
self.end_headers()
173+
self.wfile.write('{}'.encode('utf-8'))
174+
return
175+
176+
self.send_response(200)
177+
self.send_header("Content-type", "application/json")
178+
self.end_headers()
179+
self.wfile.write(json.dumps(to_send).encode('utf-8'))
180+
181+
def _handle_split_changes(self):
182+
qstring = self._parse_qs()
183+
since = int(qstring.get('since', -1))
184+
to_send = self._split_changes.get(since)
185+
if to_send is None:
186+
self.send_response(404)
187+
self.send_header("Content-type", "application/json")
188+
self.end_headers()
189+
self.wfile.write('{}'.encode('utf-8'))
190+
return
191+
192+
self.send_response(200)
193+
self.send_header("Content-type", "application/json")
194+
self.end_headers()
195+
self.wfile.write(json.dumps(to_send).encode('utf-8'))
196+
197+
def do_GET(self): #pylint:disable=invalid-name
198+
"""Respond to a GET request."""
199+
if self._req_queue is not None:
200+
headers = dict(zip(self.headers.keys(), self.headers.values()))
201+
self._req_queue.put(Request('GET', self.path, headers, None))
202+
203+
if self.path.startswith('/api/splitChanges'):
204+
self._handle_split_changes()
205+
elif self.path.startswith('/api/segmentChanges'):
206+
self._handle_segment_changes()
207+
else:
208+
self.send_response(404)
209+
self.send_header("Content-type", "application/json")
210+
self.end_headers()
211+
212+
def do_POST(self): #pylint:disable=invalid-name
213+
"""Respond to a GET request."""
214+
if self._req_queue is not None:
215+
length = int(self.headers.getheader('content-length'))
216+
body = self.rfile.read(length) if length else None
217+
headers = dict(zip(self.headers.keys(), self.headers.values()))
218+
self._req_queue.put(Request('GET', self.path, headers, body))
219+
220+
if self.path in set(['/api/testImpressions/bulk', '/testImpressions/count',
221+
'/api/events/bulk', '/metrics/times', '/metrics/count',
222+
'/metrics/gauge']):
223+
224+
self.send_response(200)
225+
self.send_header("Content-type", "application/json")
226+
self.end_headers()
227+
else:
228+
self.send_response(404)
229+
self.send_header("Content-type", "application/json")
230+
self.end_headers()

tests/integration/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)