Skip to content

Commit e4a00c8

Browse files
committed
PYCO-55: Add GHA linting and tests
Changes ======= * Add test.yml workflow * Update error handling to take into account timeout scenarios after `send_request` is called again after retry * Update error handling to take into account httpx ReadTimeout, WriteError and WriteTimeout * Update JSON streaming/parsing to handle when ijson uses Python as the backend and thus can raise different errors * Update ijson from 3.3.0 to 3.4.0 to get Python 3.13 wheel
1 parent bc0d57f commit e4a00c8

16 files changed

Lines changed: 969 additions & 235 deletions

File tree

.github/workflows/tests.yml

Lines changed: 564 additions & 0 deletions
Large diffs are not rendered by default.

acouchbase_analytics/protocol/_core/async_json_stream.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,20 @@ async def _process_token_stream(self) -> None:
154154
await self._send_to_stream(ParsedResult(ex_str.encode('utf-8'), ParsedResultType.ERROR), close=True)
155155
self._handle_notification(ParsedResultType.ERROR)
156156
return
157+
except ijson.common.JSONError as ex:
158+
ex_str = str(ex)
159+
self._log_message(f'JSON error encountered: {ex_str}', LogLevel.ERROR)
160+
self._token_stream_exhausted = True
161+
await self._send_to_stream(ParsedResult(ex_str.encode('utf-8'), ParsedResultType.ERROR), close=True)
162+
self._handle_notification(ParsedResultType.ERROR)
163+
return
164+
except ijson.backends.python.UnexpectedSymbol as ex:
165+
ex_str = str(ex)
166+
self._log_message(f'Unexpected symbol encountered: {ex_str}', LogLevel.ERROR)
167+
self._token_stream_exhausted = True
168+
await self._send_to_stream(ParsedResult(ex_str.encode('utf-8'), ParsedResultType.ERROR), close=True)
169+
self._handle_notification(ParsedResultType.ERROR)
170+
return
157171

158172
if self._token_stream_exhausted:
159173
result_type = ParsedResultType.ERROR if self._json_token_parser.has_errors else ParsedResultType.END

acouchbase_analytics/protocol/_core/retries.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from functools import wraps
2020
from typing import TYPE_CHECKING, Any, Callable, Coroutine, Optional, Union
2121

22-
from httpx import ConnectError, ConnectTimeout
22+
from httpx import ConnectError, ConnectTimeout, ReadTimeout, WriteError, WriteTimeout
2323

2424
from acouchbase_analytics.protocol._core.anyio_utils import sleep
2525
from couchbase_analytics.common.errors import AnalyticsError, InternalSDKError, TimeoutError
@@ -39,7 +39,7 @@ class AsyncRetryHandler:
3939

4040
@staticmethod
4141
async def handle_httpx_retry(
42-
ex: Union[ConnectError, ConnectTimeout], ctx: AsyncRequestContext
42+
ex: Union[ConnectError, ConnectTimeout, WriteError, WriteTimeout], ctx: AsyncRequestContext
4343
) -> Optional[Exception]:
4444
err_str = str(ex)
4545
if 'SSL:' in err_str:
@@ -107,17 +107,30 @@ async def wrapped_fn(self: AsyncHttpStreamingResponse) -> None: # noqa: C901
107107
continue
108108
await self._request_context.shutdown(type(ex), ex, ex.__traceback__)
109109
raise err from None
110-
except (ConnectError, ConnectTimeout) as ex:
110+
except (ConnectError, ConnectTimeout, WriteError, WriteTimeout) as ex:
111111
err = await AsyncRetryHandler.handle_httpx_retry(ex, self._request_context)
112112
if err is None:
113113
continue
114114
await self._request_context.shutdown(type(ex), ex, ex.__traceback__)
115115
raise err from None
116+
except ReadTimeout as ex:
117+
# we set the read timeout to the query timeout, so if we get a ReadTimeout,
118+
# it means the request timed out from the httpx client
119+
await self._request_context.shutdown(type(ex), ex, ex.__traceback__)
120+
raise TimeoutError(
121+
message='Request timed out.', context=str(self._request_context.error_context)
122+
) from None
116123
except AnalyticsError:
117124
# if an AnalyticsError is raised, we have already shut down the request context
118125
raise
119126
except RuntimeError as ex:
120127
await self._request_context.shutdown(type(ex), ex, ex.__traceback__)
128+
if self._request_context.timed_out:
129+
raise TimeoutError(
130+
message='Request timeout.', context=str(self._request_context.error_context)
131+
) from None
132+
if self._request_context.cancelled:
133+
raise CancelledError('Request was cancelled.') from None
121134
raise ex
122135
except BaseException as ex:
123136
await self._request_context.shutdown(type(ex), ex, ex.__traceback__)

acouchbase_analytics/tests/json_parsing_t.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,8 @@ async def test_invalid_empty(self) -> None:
312312
res = await parser.get_result()
313313
assert res.result_type == ParsedResultType.ERROR
314314
assert res.value is not None
315-
assert 'parse error' in str(res.value.decode('utf-8'))
315+
decoded_value = res.value.decode('utf-8')
316+
assert ('parse error' in decoded_value or 'Incomplete JSON content' in decoded_value) is True
316317

317318
@pytest.mark.anyio
318319
async def test_invalid_garbage_between_objects(self) -> None:
@@ -324,7 +325,8 @@ async def test_invalid_garbage_between_objects(self) -> None:
324325
res = await parser.get_result()
325326
assert res.result_type == ParsedResultType.ERROR
326327
assert res.value is not None
327-
assert 'lexical error' in str(res.value.decode('utf-8'))
328+
decoded_value = res.value.decode('utf-8')
329+
assert ('lexical error' in decoded_value or 'Unexpected symbol' in decoded_value) is True
328330

329331
@pytest.mark.anyio
330332
async def test_invalid_leading_garbage(self) -> None:
@@ -336,7 +338,8 @@ async def test_invalid_leading_garbage(self) -> None:
336338
res = await parser.get_result()
337339
assert res.result_type == ParsedResultType.ERROR
338340
assert res.value is not None
339-
assert 'lexical error' in str(res.value.decode('utf-8'))
341+
decoded_value = res.value.decode('utf-8')
342+
assert ('lexical error' in decoded_value or 'Unexpected symbol' in decoded_value) is True
340343

341344
@pytest.mark.anyio
342345
async def test_invalid_trailing_garbage(self) -> None:
@@ -348,7 +351,8 @@ async def test_invalid_trailing_garbage(self) -> None:
348351
res = await parser.get_result()
349352
assert res.result_type == ParsedResultType.ERROR
350353
assert res.value is not None
351-
assert 'parse error' in str(res.value.decode('utf-8'))
354+
decoded_value = res.value.decode('utf-8')
355+
assert ('parse error' in decoded_value or 'Additional data found' in decoded_value) is True
352356

353357
@pytest.mark.anyio
354358
async def test_invalid_whitespace_only(self) -> None:
@@ -360,7 +364,8 @@ async def test_invalid_whitespace_only(self) -> None:
360364
res = await parser.get_result()
361365
assert res.result_type == ParsedResultType.ERROR
362366
assert res.value is not None
363-
assert 'parse error' in str(res.value.decode('utf-8'))
367+
decoded_value = res.value.decode('utf-8')
368+
assert ('parse error' in decoded_value or 'Incomplete JSON content' in decoded_value) is True
364369

365370
@pytest.mark.anyio
366371
async def test_value_bool(self) -> None:

acouchbase_analytics/tests/test_server_t.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ async def test_error_timeout(self, test_env: AsyncTestEnvironment, server_side:
148148
if server_side:
149149
req_json = {'error_type': ErrorType.Timeout.value, 'timeout': 1, 'server_side': True}
150150
else:
151-
req_json = {'error_type': ErrorType.Timeout.value, 'timeout': 2}
151+
req_json = {'error_type': ErrorType.Timeout.value, 'timeout': 3}
152152

153153
test_env.update_request_json(req_json)
154154
statement = 'SELECT "Hello, data!" AS greeting'

couchbase_analytics/protocol/_core/json_stream.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,20 @@ def _process_token_stream(self, request_context: Optional[RequestContext] = None
151151
self._put(ParsedResult(ex_str.encode('utf-8'), ParsedResultType.ERROR))
152152
self._handle_notification(ParsedResultType.ERROR)
153153
return
154+
except ijson.common.JSONError as ex:
155+
ex_str = str(ex)
156+
self._log_message(f'JSON error encountered: {ex_str}', LogLevel.ERROR)
157+
self._token_stream_exhausted = True
158+
self._put(ParsedResult(ex_str.encode('utf-8'), ParsedResultType.ERROR))
159+
self._handle_notification(ParsedResultType.ERROR)
160+
return
161+
except ijson.backends.python.UnexpectedSymbol as ex:
162+
ex_str = str(ex)
163+
self._log_message(f'Unexpected symbol encountered: {ex_str}', LogLevel.ERROR)
164+
self._token_stream_exhausted = True
165+
self._put(ParsedResult(ex_str.encode('utf-8'), ParsedResultType.ERROR))
166+
self._handle_notification(ParsedResultType.ERROR)
167+
return
154168

155169
if self._token_stream_exhausted:
156170
result_type = ParsedResultType.ERROR if self._json_token_parser.has_errors else ParsedResultType.END

couchbase_analytics/protocol/_core/request_context.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ def _reset_stream(self) -> None:
212212
del self._json_stream
213213
self._request_state = RequestState.ResetAndNotStarted
214214
self._stage_notification_ft = None
215+
self.log_message('Request state has been reset', LogLevel.DEBUG)
215216

216217
def _start_next_stage(
217218
self,
@@ -454,5 +455,8 @@ def wait_for_stage_notification(self) -> None:
454455
raise TimeoutError(message='Request timed out waiting for stage notification', context=str(self._error_ctx))
455456
result_type = self._stage_notification_ft.result(timeout=deadline)
456457
if result_type == ParsedResultType.ROW:
458+
self.log_message('Received row, setting status to streaming', LogLevel.DEBUG)
457459
# we move to iterating rows
458460
self._request_state = RequestState.StreamingResults
461+
else:
462+
self.log_message(f'Received result type {result_type.name}', LogLevel.DEBUG)

couchbase_analytics/protocol/_core/retries.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from time import sleep
2121
from typing import TYPE_CHECKING, Callable, Optional, Union
2222

23-
from httpx import ConnectError, ConnectTimeout
23+
from httpx import ConnectError, ConnectTimeout, ReadTimeout, WriteError, WriteTimeout
2424

2525
from couchbase_analytics.common.errors import AnalyticsError, InternalSDKError, TimeoutError
2626
from couchbase_analytics.common.logging import LogLevel
@@ -38,7 +38,9 @@ class RetryHandler:
3838
"""
3939

4040
@staticmethod
41-
def handle_httpx_retry(ex: Union[ConnectError, ConnectTimeout], ctx: RequestContext) -> Optional[Exception]:
41+
def handle_httpx_retry(
42+
ex: Union[ConnectError, ConnectTimeout, WriteError, WriteTimeout], ctx: RequestContext
43+
) -> Optional[Exception]:
4244
err_str = str(ex)
4345
if 'SSL:' in err_str:
4446
message = 'TLS connection error occurred.'
@@ -103,17 +105,30 @@ def wrapped_fn(self: HttpStreamingResponse) -> None: # noqa: C901
103105
continue
104106
self._request_context.shutdown(ex)
105107
raise err from None
106-
except (ConnectError, ConnectTimeout) as ex:
108+
except (ConnectError, ConnectTimeout, WriteError, WriteTimeout) as ex:
107109
err = RetryHandler.handle_httpx_retry(ex, self._request_context)
108110
if err is None:
109111
continue
110112
self._request_context.shutdown(ex)
111113
raise err from None
114+
except ReadTimeout as ex:
115+
# we set the read timeout to the query timeout, so if we get a ReadTimeout,
116+
# it means the request timed out from the httpx client
117+
self._request_context.shutdown(ex)
118+
raise TimeoutError(
119+
message='Request timed out.', context=str(self._request_context.error_context)
120+
) from None
112121
except AnalyticsError:
113122
# if an AnalyticsError is raised, we have already shut down the request context
114123
raise
115124
except RuntimeError as ex:
116125
self._request_context.shutdown(ex)
126+
if self._request_context.timed_out:
127+
raise TimeoutError(
128+
message='Request timeout.', context=str(self._request_context.error_context)
129+
) from None
130+
if self._request_context.cancelled:
131+
raise CancelledError('Request was cancelled.') from None
117132
raise ex
118133
except BaseException as ex:
119134
self._request_context.shutdown(ex)

couchbase_analytics/tests/json_parsing_t.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,8 @@ def test_invalid_empty(self) -> None:
256256
assert isinstance(res, ParsedResult)
257257
assert res.result_type == ParsedResultType.ERROR
258258
assert res.value is not None
259-
assert 'parse error' in str(res.value.decode('utf-8'))
259+
decoded_value = res.value.decode('utf-8')
260+
assert ('parse error' in decoded_value or 'Incomplete JSON content' in decoded_value) is True
260261

261262
def test_invalid_garbage_between_objects(self) -> None:
262263
data = '[{"id":1,"name":"Alice"},garbage,{"id":2,"name":"Bob"}]'
@@ -268,7 +269,8 @@ def test_invalid_garbage_between_objects(self) -> None:
268269
assert isinstance(res, ParsedResult)
269270
assert res.result_type == ParsedResultType.ERROR
270271
assert res.value is not None
271-
assert 'lexical error' in str(res.value.decode('utf-8'))
272+
decoded_value = res.value.decode('utf-8')
273+
assert ('lexical error' in decoded_value or 'Unexpected symbol' in decoded_value) is True
272274

273275
def test_invalid_leading_garbage(self) -> None:
274276
data = 'garbage{"key":"value"}'
@@ -280,7 +282,8 @@ def test_invalid_leading_garbage(self) -> None:
280282
assert isinstance(res, ParsedResult)
281283
assert res.result_type == ParsedResultType.ERROR
282284
assert res.value is not None
283-
assert 'lexical error' in str(res.value.decode('utf-8'))
285+
decoded_value = res.value.decode('utf-8')
286+
assert ('lexical error' in decoded_value or 'Unexpected symbol' in decoded_value) is True
284287

285288
def test_invalid_trailing_garbage(self) -> None:
286289
data = '{"key":"value"}garbage'
@@ -292,7 +295,8 @@ def test_invalid_trailing_garbage(self) -> None:
292295
assert isinstance(res, ParsedResult)
293296
assert res.result_type == ParsedResultType.ERROR
294297
assert res.value is not None
295-
assert 'parse error' in str(res.value.decode('utf-8'))
298+
decoded_value = res.value.decode('utf-8')
299+
assert ('parse error' in decoded_value or 'Additional data found' in decoded_value) is True
296300

297301
def test_invalid_whitespace_only(self) -> None:
298302
data = ' \n\t '
@@ -304,7 +308,8 @@ def test_invalid_whitespace_only(self) -> None:
304308
assert isinstance(res, ParsedResult)
305309
assert res.result_type == ParsedResultType.ERROR
306310
assert res.value is not None
307-
assert 'parse error' in str(res.value.decode('utf-8'))
311+
decoded_value = res.value.decode('utf-8')
312+
assert ('parse error' in decoded_value or 'Incomplete JSON content' in decoded_value) is True
308313

309314
def test_object(self) -> None:
310315
data = '{"name":"John","age":30,"city":"New York"}'

couchbase_analytics/tests/test_server_t.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ def test_error_timeout(self, test_env: BlockingTestEnvironment, server_side: boo
150150
if server_side:
151151
req_json = {'error_type': ErrorType.Timeout.value, 'timeout': 1, 'server_side': True}
152152
else:
153-
req_json = {'error_type': ErrorType.Timeout.value, 'timeout': 2}
153+
req_json = {'error_type': ErrorType.Timeout.value, 'timeout': 3}
154154

155155
test_env.update_request_json(req_json)
156156
statement = 'SELECT "Hello, data!" AS greeting'

0 commit comments

Comments
 (0)