Skip to content

Commit df4ee1a

Browse files
committed
Updates to move baseline SDK to internal preview ready.
Changes ======= * Introduce request_context to handle bulk of logic for a request (state, retries, etc.) * Increase test coverage * Apply formatting/lint via ruff * Apply static type checking via mypy * Cleanup and reorganize code structure
1 parent 976dc10 commit df4ee1a

97 files changed

Lines changed: 3018 additions & 1480 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

MANIFEST.in

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ include couchbase-sdk-analytics-python-black-duck-manifest.yaml
33
include couchbase_analytics/common/core/_nonprod_certificates/*.pem
44
include couchbase_analytics/common/core/_capella_certificates/*.pem
55
recursive-include couchbase_analytics *.py
6+
exclude couchbase_analytics/tests/*.py
67
recursive-include acouchbase_analytics *.py
8+
exclude acouchbase_analytics/tests/*.py
79
global-exclude *.py[cod] *.DS_Store
810
exclude .git .gitignore .gitmodules gocaves* *.jar .clang* .cmake* .pre* .flake* MANIFEST.in

acouchbase_analytics/cluster.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from __future__ import annotations
1717

1818
import sys
19-
from typing import Awaitable, TYPE_CHECKING, Optional
19+
from typing import TYPE_CHECKING, Awaitable, Optional
2020

2121
if sys.version_info < (3, 10):
2222
from typing_extensions import TypeAlias

acouchbase_analytics/cluster.pyi

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,7 @@ else:
2323

2424
from acouchbase_analytics.database import AsyncDatabase
2525
from couchbase_analytics.credential import Credential
26-
from couchbase_analytics.options import (ClusterOptions,
27-
ClusterOptionsKwargs,
28-
QueryOptions,
29-
QueryOptionsKwargs)
26+
from couchbase_analytics.options import ClusterOptions, ClusterOptionsKwargs, QueryOptions, QueryOptionsKwargs
3027
from couchbase_analytics.result import AsyncQueryResult
3128

3229
class AsyncCluster:
File renamed without changes.

acouchbase_analytics/protocol/core/_anyio_utils.py renamed to acouchbase_analytics/protocol/_core/anyio_utils.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,16 @@
55

66
import anyio
77

8+
89
def get_time() -> float:
910
"""
1011
Get the current time in seconds since the epoch.
1112
"""
1213
return anyio.current_time()
1314

15+
async def sleep(delay: float) -> None:
16+
await anyio.sleep(delay)
17+
1418
class AsyncBackend:
1519
def __init__(self, backend_lib: str) -> None:
1620
"""

couchbase_analytics/common/core/async_json_stream.py renamed to acouchbase_analytics/protocol/_core/async_json_stream.py

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,21 @@
1515

1616
from __future__ import annotations
1717

18-
from typing import (AsyncIterator,
19-
Optional)
18+
from typing import AsyncIterator, Optional
2019

2120
import ijson
22-
from anyio import (create_memory_object_stream,
23-
Event,
24-
EndOfStream)
25-
26-
27-
from couchbase_analytics.common.core.async_json_token_parser import AsyncJsonTokenParser
28-
from couchbase_analytics.common.core.json_parsing import (JsonParsingError,
29-
JsonStreamConfig,
30-
ParsedResult,
31-
ParsedResultType)
21+
from anyio import EndOfStream, Event, create_memory_object_stream
22+
23+
from acouchbase_analytics.protocol._core.async_json_token_parser import AsyncJsonTokenParser
24+
from couchbase_analytics.common._core.json_parsing import (
25+
JsonParsingError,
26+
JsonStreamConfig,
27+
ParsedResult,
28+
ParsedResultType,
29+
)
3230
from couchbase_analytics.common.errors import AnalyticsError
3331

32+
3433
class AsyncJsonStream:
3534
def __init__(self,
3635
http_stream_iter: AsyncIterator[bytes],
@@ -46,14 +45,14 @@ def __init__(self,
4645
self._http_stream_exhausted = False
4746

4847
# results handling
49-
self._send_stream, self._receive_stream = create_memory_object_stream[ParsedResult](max_buffer_size=stream_config.buffered_row_max)
48+
self._send_stream, self._receive_stream = create_memory_object_stream[ParsedResult](max_buffer_size=stream_config.buffered_row_max) # noqa: E501
5049
self._json_stream_parser = None
5150
self._buffer_entire_result = stream_config.buffer_entire_result
5251
handler = None if self._buffer_entire_result is True else self._handle_json_result
5352
self._json_token_parser = AsyncJsonTokenParser(handler)
5453
self._token_stream_exhausted = False
5554
self._has_results_or_errors_evt = Event()
56-
self._has_results_or_errors_type = ParsedResultType.UNKNOWN
55+
self._results_or_errors_type = ParsedResultType.UNKNOWN
5756

5857
@property
5958
def has_results_or_errors(self) -> Event:
@@ -63,11 +62,11 @@ def has_results_or_errors(self) -> Event:
6362
return self._has_results_or_errors_evt
6463

6564
@property
66-
def has_results_or_errors_type(self) -> ParsedResultType:
65+
def results_or_errors_type(self) -> ParsedResultType:
6766
"""
6867
**INTERNAL**
6968
"""
70-
return self._has_results_or_errors_type
69+
return self._results_or_errors_type
7170

7271
@property
7372
def token_stream_exhausted(self) -> bool:
@@ -111,11 +110,11 @@ def _handle_notification(self, result_type: Optional[ParsedResultType]=None) ->
111110
return
112111

113112
if result_type is None:
114-
self._has_results_or_errors_type = ParsedResultType.END
113+
self._results_or_errors_type = ParsedResultType.END
115114
self._has_results_or_errors_evt.set()
116115
return
117116

118-
self._has_results_or_errors_type = result_type
117+
self._results_or_errors_type = result_type
119118
self._has_results_or_errors_evt.set()
120119

121120
async def _process_token_stream(self) -> None:
@@ -131,7 +130,7 @@ async def _process_token_stream(self) -> None:
131130
# this is a hack b/c the ijson.parse_async iterator does not yield to the event loop
132131
# TODO: create PYCO to either build custom JSON parsing, or dig into ijson root cause
133132
await self._json_token_parser.parse_token(event, value)
134-
except StopAsyncIteration as ex:
133+
except StopAsyncIteration:
135134
self._token_stream_exhausted = True
136135
except ijson.common.IncompleteJSONError as ex:
137136
raise JsonParsingError(cause=ex) from None

couchbase_analytics/common/core/async_json_token_parser.py renamed to acouchbase_analytics/protocol/_core/async_json_token_parser.py

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,17 @@
1515

1616
from __future__ import annotations
1717

18-
from typing import (Any,
19-
Callable,
20-
Coroutine,
21-
List,
22-
Optional)
18+
from typing import Any, Callable, Coroutine, List, Optional
19+
20+
from couchbase_analytics.common._core.json_token_parser_base import (
21+
POP_EVENTS,
22+
START_EVENTS,
23+
VALUE_TOKENS,
24+
JsonTokenParserBase,
25+
ParsingState,
26+
TokenType,
27+
)
2328

24-
from couchbase_analytics.common.core.json_token_parser_base import (JsonTokenParserBase,
25-
ParsingState,
26-
TokenType,
27-
POP_EVENTS,
28-
START_EVENTS,
29-
VALUE_TOKENS,)
3029

3130
class AsyncJsonTokenParser(JsonTokenParserBase):
3231
def __init__(self,
@@ -37,7 +36,7 @@ def __init__(self,
3736
async def _handle_obj_emit(self, obj: str) -> bool:
3837
if (self._emit_results_enabled
3938
and self._results_handler is not None
40-
and self._state == ParsingState.PROCESSING_RESULTS):
39+
and ParsingState.okay_to_emit(self._state, self._previous_state)):
4140
await self._results_handler(bytes(obj, 'utf-8'))
4241
return True
4342
return False
@@ -54,13 +53,16 @@ async def _handle_pop_event(self, token_type: TokenType) -> None:
5453
obj = f'[{",".join(reversed(obj_pairs))}]'
5554
else:
5655
obj = f'{{{",".join(reversed(obj_pairs))}}}'
57-
object_emitted = await self._handle_obj_emit(obj)
58-
if should_emit and object_emitted:
59-
break # this means we emiited the result/error, so stop processing the stack
56+
57+
if should_emit:
58+
object_emitted = await self._handle_obj_emit(obj)
59+
if object_emitted:
60+
break # this means we emiited the result/error, so stop processing the stack
6061

6162
if len(self._stack) > 0 and self._stack[-1].type == TokenType.MAP_KEY:
6263
map_key = self._pop()
63-
# If we are emitting rows and/or errors, we don't keep them in the stack and therefore don't need to return the results
64+
# If we are emitting rows and/or errors,
65+
# we don't keep them in the stack and therefore don't need to return the results
6466
if self._should_push_pair(next_token):
6567
self._push(TokenType.PAIR, f'{map_key.value}:{obj}')
6668
else:
@@ -75,7 +77,9 @@ def get_result(self) -> Optional[bytes]:
7577
async def parse_token(self, token: str, value: str) -> None:
7678
token_type = TokenType.from_str(token)
7779
if token_type in VALUE_TOKENS:
78-
self._handle_value_token(token_type, value)
80+
val = self._handle_value_token(token_type, value)
81+
if val is not None:
82+
await self._handle_obj_emit(val)
7983
elif token_type == TokenType.MAP_KEY:
8084
self._handle_map_key_token(value)
8185
elif token_type in START_EVENTS:

acouchbase_analytics/protocol/core/client_adapter.py renamed to acouchbase_analytics/protocol/_core/client_adapter.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,18 @@
1616
from __future__ import annotations
1717

1818
import socket
19-
20-
from typing import Optional, TYPE_CHECKING
19+
from typing import TYPE_CHECKING, Optional
2120
from uuid import uuid4
2221

23-
from httpx import BasicAuth, AsyncClient, Response
22+
from httpx import URL, AsyncClient, BasicAuth, Response
2423

2524
from couchbase_analytics.common.credential import Credential
2625
from couchbase_analytics.common.deserializer import Deserializer
2726
from couchbase_analytics.protocol.connection import _ConnectionDetails
2827
from couchbase_analytics.protocol.options import OptionsBuilder
2928

3029
if TYPE_CHECKING:
31-
from couchbase_analytics.protocol.core.request import QueryRequest
30+
from couchbase_analytics.protocol._core.request import QueryRequest
3231

3332

3433
class _AsyncClientAdapter:
@@ -139,17 +138,22 @@ async def send_request(self, request: QueryRequest) -> Response:
139138
if not hasattr(self, '_client'):
140139
raise RuntimeError('Client not created yet')
141140

142-
if request.url is None:
143-
raise ValueError('Request URL cannot be None')
141+
# if request.url is None:
142+
# raise ValueError('Request URL cannot be None')
144143

144+
url = URL(scheme=request.url.scheme,
145+
host=request.url.host,
146+
port=request.url.port,
147+
path=request.url.path,)
145148
req = self._client.build_request(request.method,
146-
request.url,
149+
url,
147150
json=request.body,
148151
extensions=request.extensions)
149152
try:
150153
return await self._client.send(req, stream=True)
151154
except socket.gaierror as err:
152-
raise RuntimeError(f'Unable to connect to {self._conn_details.get_scheme_host_and_port()}') from err
155+
req_url = self._conn_details.url.get_formatted_url()
156+
raise RuntimeError(f'Unable to connect to {req_url}') from err
153157

154158
def reset_client(self) -> None:
155159
"""
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# Copyright 2016-2024. Couchbase, Inc.
2+
# All Rights Reserved.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from __future__ import annotations
17+
18+
import socket
19+
from ipaddress import IPv4Address, IPv6Address, ip_address
20+
from random import choice
21+
from typing import Optional, Set, Union
22+
23+
import anyio
24+
25+
from acouchbase_analytics.protocol.errors import ErrorMapper
26+
27+
28+
@ErrorMapper.handle_socket_error_async
29+
async def get_request_ip_async(host: str,
30+
port: int,
31+
previous_ips: Optional[Set[str]]=None) -> Optional[str]:
32+
# Lets not call getaddrinfo, if the host is already an IP address
33+
try:
34+
ip: Optional[Union[IPv4Address, IPv6Address, str]] = ip_address(host)
35+
except ValueError:
36+
ip = None
37+
38+
# if we have localhost, httpx does not seem to be able to resolve IPv6 localhost (::1) properly
39+
# TODO: IPv6 support for localhost??
40+
if host == 'localhost':
41+
ip = '127.0.0.1'
42+
43+
if previous_ips is None:
44+
previous_ips = set()
45+
46+
if not ip:
47+
result = await anyio.getaddrinfo(host, port, type=socket.SOCK_STREAM, family=socket.AF_UNSPEC)
48+
try:
49+
res_ip = choice([addr[4][0] for addr in result if addr[4][0] not in previous_ips])
50+
ip = str(res_ip)
51+
except IndexError:
52+
ip = None
53+
else:
54+
ip_str = str(ip) if not isinstance(ip, str) else ip
55+
ip = None if ip_str in previous_ips else ip_str
56+
57+
return ip

0 commit comments

Comments
 (0)