Skip to content

Commit 0dde192

Browse files
committed
rename StreamingState to RequestState
1 parent 3f34bb0 commit 0dde192

8 files changed

Lines changed: 130 additions & 129 deletions

File tree

acouchbase_analytics/protocol/_core/request_context.py

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from couchbase_analytics.common._core.error_context import ErrorContext
1818
from couchbase_analytics.common.backoff_calculator import DefaultBackoffCalculator
1919
from couchbase_analytics.common.errors import AnalyticsError
20-
from couchbase_analytics.common.streaming import StreamingState
20+
from couchbase_analytics.common.request import RequestState
2121
from couchbase_analytics.protocol.connection import DEFAULT_TIMEOUTS
2222
from couchbase_analytics.protocol.errors import ErrorMapper
2323

@@ -43,7 +43,7 @@ def __init__(
4343
self._backend = backend or current_async_library()
4444
self._backoff_calc = DefaultBackoffCalculator()
4545
self._error_ctx = ErrorContext(num_attempts=0, method=request.method, statement=request.get_request_statement())
46-
self._request_state = StreamingState.NotStarted
46+
self._request_state = RequestState.NotStarted
4747
self._stream_config = stream_config or JsonStreamConfig()
4848
self._json_stream: AsyncJsonStream
4949
self._stage_completed: Optional[anyio.Event] = None
@@ -56,7 +56,7 @@ def __init__(
5656
@property
5757
def cancelled(self) -> bool:
5858
self._check_cancelled_or_timed_out()
59-
return self._request_state in [StreamingState.Cancelled, StreamingState.AsyncCancelledPriorToTimeout]
59+
return self._request_state in [RequestState.Cancelled, RequestState.AsyncCancelledPriorToTimeout]
6060

6161
@property
6262
def error_context(self) -> ErrorContext:
@@ -73,19 +73,19 @@ def is_shutdown(self) -> bool:
7373
@property
7474
def okay_to_iterate(self) -> bool:
7575
self._check_cancelled_or_timed_out()
76-
return StreamingState.okay_to_iterate(self._request_state)
76+
return RequestState.okay_to_iterate(self._request_state)
7777

7878
@property
7979
def okay_to_stream(self) -> bool:
8080
self._check_cancelled_or_timed_out()
81-
return StreamingState.okay_to_stream(self._request_state)
81+
return RequestState.okay_to_stream(self._request_state)
8282

8383
@property
8484
def request_error(self) -> Optional[Union[BaseException, Exception]]:
8585
return self._request_error
8686

8787
@property
88-
def request_state(self) -> StreamingState:
88+
def request_state(self) -> RequestState:
8989
return self._request_state
9090

9191
@property
@@ -99,10 +99,10 @@ def results_or_errors_type(self) -> ParsedResultType:
9999
@property
100100
def timed_out(self) -> bool:
101101
self._check_cancelled_or_timed_out()
102-
return self._request_state == StreamingState.Timeout
102+
return self._request_state == RequestState.Timeout
103103

104104
def _check_cancelled_or_timed_out(self) -> None:
105-
if self._request_state in [StreamingState.Timeout, StreamingState.Cancelled, StreamingState.Error]:
105+
if self._request_state in [RequestState.Timeout, RequestState.Cancelled, RequestState.Error]:
106106
return
107107

108108
if hasattr(self, '_request_deadline') is False:
@@ -115,10 +115,10 @@ def _check_cancelled_or_timed_out(self) -> None:
115115
timed_out = current_time >= self._request_deadline
116116

117117
if timed_out:
118-
if self._request_state == StreamingState.Cancelled:
119-
self._request_state = StreamingState.AsyncCancelledPriorToTimeout
118+
if self._request_state == RequestState.Cancelled:
119+
self._request_state = RequestState.AsyncCancelledPriorToTimeout
120120
else:
121-
self._request_state = StreamingState.Timeout
121+
self._request_state = RequestState.Timeout
122122

123123
async def _execute(self, fn: Callable[..., Awaitable[Any]], *args: object) -> None:
124124
await fn(*args)
@@ -131,22 +131,22 @@ def _maybe_set_request_error(
131131
self._check_cancelled_or_timed_out()
132132
if exc_val is None:
133133
return
134-
if not StreamingState.is_timeout_or_cancelled(self._request_state):
134+
if not RequestState.is_timeout_or_cancelled(self._request_state):
135135
# This handles httpx timeouts
136136
if exc_type is not None and issubclass(exc_type, TimeoutException):
137-
self._request_state = StreamingState.Timeout
137+
self._request_state = RequestState.Timeout
138138
elif issubclass(type(exc_val), TimeoutException):
139-
self._request_state = StreamingState.Timeout
139+
self._request_state = RequestState.Timeout
140140
elif isinstance(exc_val, CancelledError):
141-
self._request_state = StreamingState.Cancelled
141+
self._request_state = RequestState.Cancelled
142142
else:
143-
self._request_state = StreamingState.Error
143+
self._request_state = RequestState.Error
144144
self._request_error = exc_val
145145

146146
async def _process_error(
147147
self, json_data: Union[str, List[Dict[str, Any]]], handle_context_shutdown: Optional[bool] = False
148148
) -> None:
149-
self._request_state = StreamingState.Error
149+
self._request_state = RequestState.Error
150150
if isinstance(json_data, str):
151151
self._request_error = ErrorMapper.build_error_from_http_status_code(json_data, self._error_ctx)
152152
elif not isinstance(json_data, list):
@@ -163,7 +163,7 @@ async def _process_error(
163163
def _reset_stream(self) -> None:
164164
if hasattr(self, '_json_stream'):
165165
del self._json_stream
166-
self._request_state = StreamingState.ResetAndNotStarted
166+
self._request_state = RequestState.ResetAndNotStarted
167167
self._stage_completed = None
168168
self._cancel_scope_deadline_updated = False
169169

@@ -211,10 +211,10 @@ def calculate_backoff(self) -> float:
211211
def cancel_request(self, fn: Optional[Callable[..., Awaitable[Any]]] = None, *args: object) -> None:
212212
if fn is not None:
213213
self._taskgroup.start_soon(fn, *args)
214-
if self._request_state == StreamingState.Timeout:
214+
if self._request_state == RequestState.Timeout:
215215
return
216216
self._taskgroup.cancel_scope.cancel()
217-
self._request_state = StreamingState.Cancelled
217+
self._request_state = RequestState.Cancelled
218218

219219
def create_response_task(self, fn: Callable[..., Coroutine[Any, Any, Any]], *args: object) -> Task[Any]:
220220
if self._backend is None or self._backend.backend_lib != 'asyncio':
@@ -246,12 +246,12 @@ async def get_result_from_stream(self) -> ParsedResult:
246246

247247
async def initialize(self) -> None:
248248
# TODO: Add useful logging messages
249-
if self._request_state == StreamingState.ResetAndNotStarted:
249+
if self._request_state == RequestState.ResetAndNotStarted:
250250
self._update_cancel_scope_deadline(self._connect_deadline, is_absolute=True)
251251
# print('Skipping initialization as request is a retry')
252252
return
253253
await self.__aenter__()
254-
self._request_state = StreamingState.Started
254+
self._request_state = RequestState.Started
255255
# we set the request timeout once the context is initialized in order to create the deadline
256256
# closer to when the upstream logic will begin to use the request context
257257
timeouts = self._request.get_request_timeouts() or {}
@@ -272,18 +272,18 @@ def maybe_continue_to_process_stream(self) -> None:
272272
def okay_to_delay_and_retry(self, delay: float) -> bool:
273273
# TODO: Add useful logging messages
274274
self._check_cancelled_or_timed_out()
275-
if self._request_state in [StreamingState.Timeout, StreamingState.Cancelled]:
275+
if self._request_state in [RequestState.Timeout, RequestState.Cancelled]:
276276
return False
277277

278278
current_time = get_time()
279279
delay_time = current_time + delay
280280
will_time_out = self._request_deadline < delay_time
281281
# print(f'{current_time=}; {delay_time=}; req_deadline={self._request_deadline}; {will_time_out=}')
282282
if will_time_out:
283-
self._request_state = StreamingState.Timeout
283+
self._request_state = RequestState.Timeout
284284
return False
285285
elif self.retry_limit_exceeded:
286-
self._request_state = StreamingState.Error
286+
self._request_state = RequestState.Error
287287
return False
288288
else:
289289
self._reset_stream()
@@ -358,8 +358,8 @@ async def shutdown(
358358
else:
359359
self._maybe_set_request_error(exc_type, exc_val)
360360

361-
if StreamingState.is_okay(self._request_state):
362-
self._request_state = StreamingState.Completed
361+
if RequestState.is_okay(self._request_state):
362+
self._request_state = RequestState.Completed
363363
self._shutdown = True
364364

365365
def start_stream(self, core_response: HttpCoreResponse) -> None:
@@ -374,7 +374,7 @@ async def wait_for_results_or_errors(self) -> None:
374374
await self._json_stream.has_results_or_errors.wait()
375375
if self._json_stream.results_or_errors_type == ParsedResultType.ROW:
376376
# we move to iterating rows
377-
self._request_state = StreamingState.StreamingResults
377+
self._request_state = RequestState.StreamingResults
378378

379379
async def __aenter__(self) -> AsyncRequestContext:
380380
self._taskgroup = anyio.create_task_group()

acouchbase_analytics/protocol/_core/retries.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
from acouchbase_analytics.protocol._core.anyio_utils import sleep
2525
from couchbase_analytics.common.errors import AnalyticsError, InternalSDKError, TimeoutError
26-
from couchbase_analytics.common.streaming import StreamingState
26+
from couchbase_analytics.common.request import RequestState
2727
from couchbase_analytics.protocol.errors import WrappedError
2828

2929
if TYPE_CHECKING:
@@ -122,7 +122,7 @@ async def wrapped_fn(self: AsyncHttpStreamingResponse) -> None: # noqa: C901
122122
cause=ex, message=str(ex), context=str(self._request_context.error_context)
123123
) from None
124124
finally:
125-
if not StreamingState.is_okay(self._request_context.request_state):
125+
if not RequestState.is_okay(self._request_context.request_state):
126126
await self.close()
127127

128128
return wrapped_fn

acouchbase_analytics/tests/query_integration_t.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
from acouchbase_analytics.errors import QueryError, TimeoutError
2727
from acouchbase_analytics.options import QueryOptions
2828
from acouchbase_analytics.result import AsyncQueryResult
29-
from couchbase_analytics.common.streaming import StreamingState
29+
from couchbase_analytics.common.request import RequestState
3030
from tests import AsyncYieldFixture
3131

3232
if TYPE_CHECKING:
@@ -99,7 +99,7 @@ async def test_query_cancel_async_while_iterating(
9999
assert isinstance(qtask, Task)
100100
res = await qtask
101101
assert isinstance(res, AsyncQueryResult)
102-
expected_state = StreamingState.StreamingResults
102+
expected_state = RequestState.StreamingResults
103103
assert res._http_response._request_context.request_state == expected_state
104104
rows = []
105105
count = 0
@@ -111,7 +111,7 @@ async def test_query_cancel_async_while_iterating(
111111
count += 1
112112

113113
assert len(rows) == count
114-
expected_state = StreamingState.Cancelled
114+
expected_state = RequestState.Cancelled
115115
assert res._http_response._request_context.request_state == expected_state
116116
with pytest.raises(RuntimeError):
117117
res.metadata()
@@ -124,7 +124,7 @@ async def test_query_cancel_while_iterating(
124124
assert isinstance(qtask, Task)
125125
res = await qtask
126126
assert isinstance(res, AsyncQueryResult)
127-
expected_state = StreamingState.StreamingResults
127+
expected_state = RequestState.StreamingResults
128128
assert res._http_response._request_context.request_state == expected_state
129129
rows = []
130130
count = 0
@@ -136,7 +136,7 @@ async def test_query_cancel_while_iterating(
136136
count += 1
137137

138138
assert len(rows) == count
139-
expected_state = StreamingState.Cancelled
139+
expected_state = RequestState.Cancelled
140140
assert res._http_response._request_context.request_state == expected_state
141141
with pytest.raises(RuntimeError):
142142
res.metadata()

couchbase_analytics/common/request.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,60 @@
1616
from __future__ import annotations
1717

1818
from dataclasses import dataclass
19+
from enum import IntEnum
1920
from typing import Dict, Optional
2021

2122

23+
class RequestState(IntEnum):
24+
"""
25+
**INTERNAL
26+
"""
27+
28+
NotStarted = 0
29+
ResetAndNotStarted = 1
30+
Started = 2
31+
Cancelled = 3
32+
Completed = 4
33+
StreamingResults = 5
34+
Error = 6
35+
Timeout = 7
36+
AsyncCancelledPriorToTimeout = 8
37+
SyncCancelledPriorToTimeout = 9
38+
39+
@staticmethod
40+
def okay_to_stream(state: RequestState) -> bool:
41+
"""
42+
**INTERNAL
43+
"""
44+
return state in [RequestState.NotStarted, RequestState.ResetAndNotStarted]
45+
46+
@staticmethod
47+
def okay_to_iterate(state: RequestState) -> bool:
48+
"""
49+
**INTERNAL
50+
"""
51+
return state == RequestState.StreamingResults
52+
53+
@staticmethod
54+
def is_okay(state: RequestState) -> bool:
55+
"""
56+
**INTERNAL
57+
"""
58+
return state not in [RequestState.Cancelled, RequestState.Error, RequestState.Timeout]
59+
60+
@staticmethod
61+
def is_timeout_or_cancelled(state: RequestState) -> bool:
62+
"""
63+
**INTERNAL
64+
"""
65+
return state in [
66+
RequestState.Cancelled,
67+
RequestState.Timeout,
68+
RequestState.AsyncCancelledPriorToTimeout,
69+
RequestState.SyncCancelledPriorToTimeout,
70+
]
71+
72+
2273
@dataclass
2374
class RequestURL:
2475
scheme: str

couchbase_analytics/common/streaming.py

Lines changed: 0 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -27,56 +27,6 @@
2727
from couchbase_analytics.protocol.streaming import HttpStreamingResponse
2828

2929

30-
class StreamingState(IntEnum):
31-
"""
32-
**INTERNAL
33-
"""
34-
35-
NotStarted = 0
36-
ResetAndNotStarted = 1
37-
Started = 2
38-
Cancelled = 3
39-
Completed = 4
40-
StreamingResults = 5
41-
Error = 6
42-
Timeout = 7
43-
AsyncCancelledPriorToTimeout = 8
44-
SyncCancelledPriorToTimeout = 9
45-
46-
@staticmethod
47-
def okay_to_stream(state: StreamingState) -> bool:
48-
"""
49-
**INTERNAL
50-
"""
51-
return state in [StreamingState.NotStarted, StreamingState.ResetAndNotStarted]
52-
53-
@staticmethod
54-
def okay_to_iterate(state: StreamingState) -> bool:
55-
"""
56-
**INTERNAL
57-
"""
58-
return state == StreamingState.StreamingResults
59-
60-
@staticmethod
61-
def is_okay(state: StreamingState) -> bool:
62-
"""
63-
**INTERNAL
64-
"""
65-
return state not in [StreamingState.Cancelled, StreamingState.Error, StreamingState.Timeout]
66-
67-
@staticmethod
68-
def is_timeout_or_cancelled(state: StreamingState) -> bool:
69-
"""
70-
**INTERNAL
71-
"""
72-
return state in [
73-
StreamingState.Cancelled,
74-
StreamingState.Timeout,
75-
StreamingState.AsyncCancelledPriorToTimeout,
76-
StreamingState.SyncCancelledPriorToTimeout,
77-
]
78-
79-
8030
class BlockingIterator(Iterator[Any]):
8131
"""
8232
**INTERNAL

0 commit comments

Comments
 (0)