|
32 | 32 | from httpx import Response as HttpCoreResponse |
33 | 33 |
|
34 | 34 | # TODO: errors? |
35 | | -from couchbase_analytics.common.errors import AnalyticsError, InternalSDKError |
| 35 | +from couchbase_analytics.common.errors import AnalyticsError, InternalSDKError, TimeoutError |
36 | 36 | from couchbase_analytics.common.core import (JsonStreamConfig, |
37 | 37 | ParsedResult, |
38 | 38 | ParsedResultType) |
@@ -69,7 +69,7 @@ def wrapped_fn(self: HttpStreamingResponse) -> None: |
69 | 69 | if self._request_context.request_error is not None: |
70 | 70 | raise self._request_context.request_error from None |
71 | 71 | if self._request_context.timed_out: |
72 | | - raise TimeoutError() from None |
| 72 | + raise TimeoutError(message='Request timeout.') from None |
73 | 73 | if self._request_context.cancelled: |
74 | 74 | raise CancelledError('Request was cancelled.') from None |
75 | 75 | raise InternalSDKError(ex) from None |
@@ -112,6 +112,17 @@ def _finish_processing_stream(self) -> None: |
112 | 112 | while not self._json_stream.token_stream_exhausted: |
113 | 113 | self._json_stream.continue_parsing() |
114 | 114 |
|
| 115 | + def _handle_iteration_abort(self) -> None: |
| 116 | + self.close() |
| 117 | + if self._request_context.cancelled: |
| 118 | + print('Request was cancelled, closing stream.') |
| 119 | + raise StopIteration |
| 120 | + elif self._request_context.timed_out: |
| 121 | + print('Request timed out, closing stream.') |
| 122 | + raise TimeoutError(message='Request timeout.') |
| 123 | + else: |
| 124 | + raise StopIteration |
| 125 | + |
115 | 126 | def _maybe_continue_to_process_stream(self) -> None: |
116 | 127 | if not self._request_context.has_stage_completed: |
117 | 128 | return |
@@ -190,17 +201,17 @@ def get_next_row(self) -> Any: |
190 | 201 | if not (hasattr(self, '_core_response') |
191 | 202 | and self._core_response is not None |
192 | 203 | and self._request_context.okay_to_iterate): |
193 | | - self.close() |
194 | | - raise StopIteration |
| 204 | + self._handle_iteration_abort() |
195 | 205 |
|
196 | 206 | self._maybe_continue_to_process_stream() |
| 207 | + check_state = False |
197 | 208 | while True: |
198 | | - if self._request_context.cancelled: |
199 | | - self.close() |
200 | | - raise StopIteration |
201 | | - # TODO: handle timeout |
| 209 | + if check_state and not self._request_context.okay_to_iterate: |
| 210 | + self._handle_iteration_abort() |
| 211 | + |
202 | 212 | raw_response = self._json_stream.get_result(self._stream_config.queue_timeout) |
203 | 213 | if raw_response is None: |
| 214 | + check_state = True |
204 | 215 | continue |
205 | 216 | if raw_response.result_type == ParsedResultType.ROW: |
206 | 217 | if raw_response.value is None: |
|
0 commit comments