33import json
44from asyncio import CancelledError , Task
55from types import TracebackType
6- from typing import TYPE_CHECKING , Any , Awaitable , Callable , Coroutine , Dict , List , Optional , Type , Union
6+ from typing import (TYPE_CHECKING ,
7+ Any ,
8+ Awaitable ,
9+ Callable ,
10+ Coroutine ,
11+ Dict ,
12+ List ,
13+ Optional ,
14+ Type ,
15+ Union )
716from uuid import uuid4
817
918import anyio
1019from httpx import Response as HttpCoreResponse
1120from httpx import TimeoutException
1221
13- from acouchbase_analytics .protocol ._core .anyio_utils import AsyncBackend , current_async_library , get_time
22+ from acouchbase_analytics .protocol ._core .anyio_utils import (AsyncBackend ,
23+ current_async_library ,
24+ get_time )
1425from acouchbase_analytics .protocol ._core .async_json_stream import AsyncJsonStream
1526from acouchbase_analytics .protocol ._core .net_utils import get_request_ip_async
16- from couchbase_analytics .common ._core import JsonStreamConfig , ParsedResult , ParsedResultType
27+ from couchbase_analytics .common ._core import (JsonStreamConfig ,
28+ ParsedResult ,
29+ ParsedResultType )
1730from couchbase_analytics .common ._core .error_context import ErrorContext
18- from couchbase_analytics .common .errors import AnalyticsError , InvalidCredentialError
31+ from couchbase_analytics .common .backoff_calculator import DefaultBackoffCalculator
32+ from couchbase_analytics .common .errors import AnalyticsError
1933from couchbase_analytics .common .streaming import StreamingState
2034from couchbase_analytics .protocol .connection import DEFAULT_TIMEOUTS
2135from couchbase_analytics .protocol .errors import ErrorMapper
@@ -37,6 +51,7 @@ def __init__(self,
3751 self ._client_adapter = client_adapter
3852 self ._request = request
3953 self ._backend = backend or current_async_library ()
54+ self ._backoff_calc = DefaultBackoffCalculator ()
4055 self ._error_ctx = ErrorContext (num_attempts = 0 ,
4156 method = request .method ,
4257 statement = request .get_request_statement ())
@@ -85,6 +100,10 @@ def request_error(self) -> Optional[Union[BaseException, Exception]]:
85100 def request_state (self ) -> StreamingState :
86101 return self ._request_state
87102
103+ @property
104+ def retry_limit_exceeded (self ) -> bool :
105+ return self .error_context .num_attempts > self ._request .max_retries
106+
88107 @property
89108 def results_or_errors_type (self ) -> ParsedResultType :
90109 return self ._json_stream .results_or_errors_type
@@ -137,10 +156,12 @@ def _maybe_set_request_error(self,
137156 self ._request_error = exc_val
138157
139158 async def _process_error (self ,
140- json_data : List [Dict [str , Any ]],
159+ json_data : Union [ str , List [Dict [str , Any ] ]],
141160 handle_context_shutdown : Optional [bool ]= False ) -> None :
142161 self ._request_state = StreamingState .Error
143- if not isinstance (json_data , list ):
162+ if isinstance (json_data , str ):
163+ self ._request_error = ErrorMapper .build_error_from_http_status_code (json_data , self ._error_ctx )
164+ elif not isinstance (json_data , list ):
144165 self ._request_error = AnalyticsError ('Cannot parse error response; expected JSON array' ,
145166 context = str (self ._error_ctx ))
146167 else :
@@ -154,7 +175,6 @@ def _reset_stream(self) -> None:
154175 if hasattr (self , '_json_stream' ):
155176 del self ._json_stream
156177 self ._request_state = StreamingState .ResetAndNotStarted
157- self ._request .previous_ips = set ()
158178 self ._stage_completed = None
159179 self ._cancel_scope_deadline_updated = False
160180
@@ -197,6 +217,9 @@ async def _wait_for_stage_to_complete(self) -> None:
197217 return
198218 await self ._stage_completed .wait ()
199219
220+ def calculate_backoff (self ) -> float :
221+ return self ._backoff_calc .calculate_backoff (self ._error_ctx .num_attempts ) / 1000
222+
200223 def cancel_request (self ,
201224 fn : Optional [Callable [..., Awaitable [Any ]]]= None ,
202225 * args : object ) -> None :
@@ -273,6 +296,9 @@ def okay_to_delay_and_retry(self, delay: float) -> bool:
273296 if will_time_out :
274297 self ._request_state = StreamingState .Timeout
275298 return False
299+ elif self .retry_limit_exceeded :
300+ self ._request_state = StreamingState .Error
301+ return False
276302 else :
277303 self ._reset_stream ()
278304 return True
@@ -296,10 +322,15 @@ async def process_response(self,
296322 # we have all the data, close the core response/stream
297323 await close_handler ()
298324
299- json_response = json .loads (raw_response .value )
300- if 'errors' in json_response :
301- await self ._process_error (json_response ['errors' ], handle_context_shutdown = handle_context_shutdown )
302- return json_response
325+ try :
326+ json_response = json .loads (raw_response .value )
327+ except json .JSONDecodeError :
328+ await self ._process_error (str (raw_response .value ),
329+ handle_context_shutdown = handle_context_shutdown )
330+ else :
331+ if 'errors' in json_response :
332+ await self ._process_error (json_response ['errors' ], handle_context_shutdown = handle_context_shutdown )
333+ return json_response
303334
304335 async def reraise_after_shutdown (self , err : Exception ) -> None :
305336 try :
@@ -309,24 +340,17 @@ async def reraise_after_shutdown(self, err: Exception) -> None:
309340 raise ex from None
310341
311342 async def send_request (self , enable_trace_handling : Optional [bool ]= False ) -> HttpCoreResponse :
312- ip = await get_request_ip_async (self ._request .url .host , self ._request .url .port , self ._request .previous_ips )
313- if ip is None :
314- attempted_ips = ', ' .join (self ._request .previous_ips or [])
315- raise AnalyticsError (message = f'Connect failure. Unable to connect to any resolved IPs: { attempted_ips } .' ,
316- context = str (self ._error_ctx ))
317-
343+ self ._error_ctx .update_num_attempts ()
344+ ip = await get_request_ip_async (self ._request .url .host , self ._request .url .port )
318345 if enable_trace_handling is True :
319346 (self ._request .update_url (ip , self ._client_adapter .analytics_path )
320- .add_trace_to_extensions (self ._trace_handler )
321- .update_previous_ips (ip ))
347+ .add_trace_to_extensions (self ._trace_handler ))
322348 else :
323- self ._request .update_url (ip , self ._client_adapter .analytics_path ). update_previous_ips ( ip )
349+ self ._request .update_url (ip , self ._client_adapter .analytics_path )
324350 # TODO: add logging; provide request details (to/from, deadlines, etc.)
325351 self ._error_ctx .update_request_context (self ._request )
326352 response = await self ._client_adapter .send_request (self ._request )
327353 self ._error_ctx .update_response_context (response )
328- if response .status_code == 401 :
329- raise InvalidCredentialError (context = str (self ._error_ctx ))
330354 return response
331355
332356 async def shutdown (self ,
0 commit comments