Skip to content

Commit bc0d57f

Browse files
committed
PYCO-62: Async API connect timeout results in CancelledError
Changes ======= * Allow httpx to handle timeouts instead of using anyio `CancelScope` deadline * Add tests to confirm functionality * Add logging helpful logging messages
1 parent 9b99326 commit bc0d57f

10 files changed

Lines changed: 200 additions & 25 deletions

File tree

.gitignore

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -161,12 +161,7 @@ cython_debug/
161161

162162
# Distribution / packaging
163163
build/
164-
couchbase_columnar/_version.py
165-
couchbase_columnar/*.so
166-
couchbase_columnar/*.dylib*.*
167-
couchbase_columnar/*.dll
168-
couchbase_columnar/*.pyd
169-
deps/couchbase-cxx-cache/
164+
couchbase_analytics/_version.py
170165

171166
# Sphinx
172167
docs/_build/

acouchbase_analytics/protocol/_core/request_context.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,7 @@ def _check_timed_out(self) -> None:
112112
return
113113

114114
current_time = get_time()
115-
if self._cancel_scope_deadline_updated is False:
116-
timed_out = current_time >= self._connect_deadline
117-
else:
118-
timed_out = current_time >= self._request_deadline
119-
115+
timed_out = current_time >= self._request_deadline
120116
if timed_out:
121117
message_data = {'current_time': f'{current_time}', 'request_deadline': f'{self._request_deadline}'}
122118
self.log_message('Request has timed out', LogLevel.DEBUG, message_data=message_data)
@@ -262,11 +258,11 @@ async def get_result_from_stream(self) -> ParsedResult:
262258

263259
async def initialize(self) -> None:
264260
if self._request_state == RequestState.ResetAndNotStarted:
265-
self._update_cancel_scope_deadline(self._connect_deadline, is_absolute=True)
261+
current_time = get_time()
266262
self.log_message(
267263
'Request is a retry, skipping initialization',
268264
LogLevel.DEBUG,
269-
message_data={'request_deadline': f'{self._request_deadline}'},
265+
message_data={'current_time': f'{current_time}', 'request_deadline': f'{self._request_deadline}'},
270266
)
271267
return
272268
await self.__aenter__()
@@ -276,7 +272,6 @@ async def initialize(self) -> None:
276272
timeouts = self._request.get_request_timeouts() or {}
277273
current_time = get_time()
278274
self._request_deadline = current_time + (timeouts.get('read', None) or DEFAULT_TIMEOUTS['query_timeout'])
279-
self._update_cancel_scope_deadline(self._connect_deadline, is_absolute=True)
280275
message_data = {'current_time': f'{current_time}', 'request_deadline': f'{self._request_deadline}'}
281276
self.log_message('Request context initialized', LogLevel.DEBUG, message_data=message_data)
282277

@@ -437,6 +432,8 @@ async def wait_for_results_or_errors(self) -> None:
437432

438433
async def __aenter__(self) -> AsyncRequestContext:
439434
self._taskgroup = anyio.create_task_group()
435+
message_data = {'cancel_scope': f'{id(self._taskgroup.cancel_scope):x}'}
436+
self.log_message('Task group created', LogLevel.DEBUG, message_data=message_data)
440437
await self._taskgroup.__aenter__()
441438
return self
442439

acouchbase_analytics/protocol/streaming.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ async def send_request(self) -> None:
159159

160160
# start cancel scope
161161
await self._request_context.initialize()
162-
self._core_response = await self._request_context.send_request(enable_trace_handling=True)
162+
self._core_response = await self._request_context.send_request()
163163
self._request_context.start_stream(self._core_response)
164164
# block until we either know we have rows or we have an error
165165
await self._request_context.wait_for_results_or_errors()
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# Copyright 2016-2024. Couchbase, Inc.
2+
# All Rights Reserved.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from __future__ import annotations
17+
18+
from datetime import timedelta
19+
from typing import TYPE_CHECKING
20+
21+
import pytest
22+
23+
from acouchbase_analytics.cluster import AsyncCluster
24+
from acouchbase_analytics.credential import Credential
25+
from acouchbase_analytics.errors import AnalyticsError, TimeoutError
26+
from acouchbase_analytics.options import QueryOptions
27+
from tests import AsyncYieldFixture
28+
29+
if TYPE_CHECKING:
30+
from tests.environments.base_environment import AsyncTestEnvironment
31+
32+
33+
class ConnectTestSuite:
34+
TEST_MANIFEST = [
35+
'test_connect_timeout_max_retry_limit',
36+
'test_connect_timeout_query_timeout',
37+
]
38+
39+
async def test_connect_timeout_max_retry_limit(self, test_env: AsyncTestEnvironment) -> None:
40+
statement = 'SELECT sleep("some value", 10000) AS some_field;'
41+
42+
username, pw = test_env.config.get_username_and_pw()
43+
cred = Credential.from_username_and_password(username, pw)
44+
# ignoring the port enables the failure
45+
connstr = test_env.config.get_connection_string(ignore_port=True)
46+
cluster = AsyncCluster.create_instance(connstr, cred)
47+
48+
allowed_retries = 5
49+
q_opts = QueryOptions(max_retries=allowed_retries, timeout=timedelta(seconds=10))
50+
with pytest.raises(AnalyticsError) as ex:
51+
await cluster.execute_query(statement, q_opts)
52+
53+
assert ex.value._message is not None
54+
assert 'Retry limit exceeded' in ex.value._message
55+
test_env.assert_error_context_num_attempts(allowed_retries + 1, ex.value._context)
56+
57+
async def test_connect_timeout_query_timeout(self, test_env: AsyncTestEnvironment) -> None:
58+
statement = 'SELECT sleep("some value", 10000) AS some_field;'
59+
60+
username, pw = test_env.config.get_username_and_pw()
61+
cred = Credential.from_username_and_password(username, pw)
62+
# ignoring the port enables the failure
63+
connstr = test_env.config.get_connection_string(ignore_port=True)
64+
cluster = AsyncCluster.create_instance(connstr, cred)
65+
66+
q_opts = QueryOptions(timeout=timedelta(seconds=3))
67+
with pytest.raises(TimeoutError) as ex:
68+
await cluster.execute_query(statement, q_opts)
69+
70+
assert ex.value._message is not None
71+
assert 'Request timed out during retry delay' in ex.value._message
72+
test_env.assert_error_context_num_attempts(2, ex.value._context, exact=False)
73+
74+
75+
class ConnectTests(ConnectTestSuite):
76+
@pytest.fixture(scope='class', autouse=True)
77+
def validate_test_manifest(self) -> None:
78+
def valid_test_method(meth: str) -> bool:
79+
attr = getattr(ConnectTests, meth)
80+
return callable(attr) and not meth.startswith('__') and meth.startswith('test')
81+
82+
method_list = [meth for meth in dir(ConnectTests) if valid_test_method(meth)]
83+
test_list = set(ConnectTestSuite.TEST_MANIFEST).symmetric_difference(method_list)
84+
if test_list:
85+
pytest.fail(f'Test manifest invalid. Missing/extra tests: {test_list}.')
86+
87+
@pytest.fixture(scope='class', name='test_env')
88+
async def couchbase_test_environment(
89+
self, async_test_env: AsyncTestEnvironment
90+
) -> AsyncYieldFixture[AsyncTestEnvironment]:
91+
await async_test_env.setup()
92+
yield async_test_env
93+
await async_test_env.teardown()

acouchbase_analytics/tests/test_server_t.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@ async def test_error_retriable_response_retries_exceeded(self, test_env: AsyncTe
121121
with pytest.raises(QueryError) as ex:
122122
await test_env.cluster_or_scope.execute_query(statement, q_opts)
123123

124-
print(ex.value)
125124
test_env.assert_error_context_num_attempts(allowed_retries + 1, ex.value._context)
126125
test_env.assert_error_context_contains_last_dispatch(ex.value._context)
127126

conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@
4242
]
4343

4444
_INTEGRATRION_TESTS = [
45+
'acouchbase_analytics/tests/connect_integration_t.py::ConnectTests',
4546
'acouchbase_analytics/tests/query_integration_t.py::ClusterQueryTests',
4647
'acouchbase_analytics/tests/query_integration_t.py::ScopeQueryTests',
48+
'couchbase_analytics/tests/connect_integration_t.py::ConnectTests',
4749
'couchbase_analytics/tests/query_integration_t.py::ClusterQueryTests',
4850
'couchbase_analytics/tests/query_integration_t.py::ScopeQueryTests',
4951
]

couchbase_analytics/_version.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# This file automatically generated by
2-
# /Users/jaredcasey/GIT/couchbase/clients/python/analytics-python-client/./couchbase_analytics_version.py
2+
# /Users/jaredcasey/GIT/couchbase/clients/python/analytics-python-client/couchbase_analytics_version.py
33
# at
4-
# 2025-07-21 18:36:10.973732
4+
# 2025-07-24 17:08:38.315069
55
__version__ = '0.0.1'

couchbase_analytics/protocol/_core/request.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,6 @@ def build_base_query_request( # noqa: C901
167167
extensions = deepcopy(self._extensions)
168168
if timeout is not None and timeout != self._default_query_timeout:
169169
extensions['timeout']['read'] = timeout
170-
# in the async world we have our own cancel scope that handles the connect timeout
171-
if is_async:
172-
del extensions['timeout']['pool']
173-
del extensions['timeout']['connect']
174170
# we add 5 seconds to the server timeout to ensure we always trigger a client side timeout
175171
timeout_ms = (timeout + 5) * 1e3 # convert to milliseconds
176172
body['timeout'] = f'{timeout_ms}ms'
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# Copyright 2016-2024. Couchbase, Inc.
2+
# All Rights Reserved.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from __future__ import annotations
17+
18+
from datetime import timedelta
19+
from typing import TYPE_CHECKING
20+
21+
import pytest
22+
23+
from couchbase_analytics.cluster import Cluster
24+
from couchbase_analytics.credential import Credential
25+
from couchbase_analytics.errors import AnalyticsError, TimeoutError
26+
from couchbase_analytics.options import QueryOptions
27+
from tests import YieldFixture
28+
29+
if TYPE_CHECKING:
30+
from tests.environments.base_environment import BlockingTestEnvironment
31+
32+
33+
class ConnectTestSuite:
34+
TEST_MANIFEST = [
35+
'test_connect_timeout_max_retry_limit',
36+
'test_connect_timeout_query_timeout',
37+
]
38+
39+
def test_connect_timeout_max_retry_limit(self, test_env: BlockingTestEnvironment) -> None:
40+
statement = 'SELECT sleep("some value", 10000) AS some_field;'
41+
42+
username, pw = test_env.config.get_username_and_pw()
43+
cred = Credential.from_username_and_password(username, pw)
44+
# ignoring the port enables the failure
45+
connstr = test_env.config.get_connection_string(ignore_port=True)
46+
cluster = Cluster.create_instance(connstr, cred)
47+
48+
allowed_retries = 5
49+
q_opts = QueryOptions(max_retries=allowed_retries, timeout=timedelta(seconds=10))
50+
with pytest.raises(AnalyticsError) as ex:
51+
cluster.execute_query(statement, q_opts)
52+
53+
assert ex.value._message is not None
54+
assert 'Retry limit exceeded' in ex.value._message
55+
test_env.assert_error_context_num_attempts(allowed_retries + 1, ex.value._context)
56+
57+
def test_connect_timeout_query_timeout(self, test_env: BlockingTestEnvironment) -> None:
58+
statement = 'SELECT sleep("some value", 10000) AS some_field;'
59+
60+
username, pw = test_env.config.get_username_and_pw()
61+
cred = Credential.from_username_and_password(username, pw)
62+
# ignoring the port enables the failure
63+
connstr = test_env.config.get_connection_string(ignore_port=True)
64+
cluster = Cluster.create_instance(connstr, cred)
65+
66+
q_opts = QueryOptions(timeout=timedelta(seconds=3))
67+
with pytest.raises(TimeoutError) as ex:
68+
cluster.execute_query(statement, q_opts)
69+
70+
assert ex.value._message is not None
71+
assert 'Request timed out during retry delay' in ex.value._message
72+
test_env.assert_error_context_num_attempts(2, ex.value._context, exact=False)
73+
74+
75+
class ConnectTests(ConnectTestSuite):
76+
@pytest.fixture(scope='class', autouse=True)
77+
def validate_test_manifest(self) -> None:
78+
def valid_test_method(meth: str) -> bool:
79+
attr = getattr(ConnectTests, meth)
80+
return callable(attr) and not meth.startswith('__') and meth.startswith('test')
81+
82+
method_list = [meth for meth in dir(ConnectTests) if valid_test_method(meth)]
83+
test_list = set(ConnectTestSuite.TEST_MANIFEST).symmetric_difference(method_list)
84+
if test_list:
85+
pytest.fail(f'Test manifest invalid. Missing/extra tests: {test_list}.')
86+
87+
@pytest.fixture(scope='class', name='test_env')
88+
def couchbase_test_environment(
89+
self, sync_test_env: BlockingTestEnvironment
90+
) -> YieldFixture[BlockingTestEnvironment]:
91+
sync_test_env.setup()
92+
yield sync_test_env
93+
sync_test_env.teardown()

tests/analytics_config.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import os
1919
import pathlib
2020
from configparser import ConfigParser
21-
from typing import Tuple
21+
from typing import Optional, Tuple
2222
from uuid import uuid4
2323

2424
import pytest
@@ -72,8 +72,8 @@ def disable_server_certificate_verification(self) -> bool:
7272
def scope_name(self) -> str:
7373
return self._scope_name
7474

75-
def get_connection_string(self) -> str:
76-
if self._port is not None:
75+
def get_connection_string(self, ignore_port: Optional[bool] = False) -> str:
76+
if ignore_port is None or ignore_port is False and self._port is not None:
7777
return f'{self._scheme}://{self._host}:{self._port}'
7878
return f'{self._scheme}://{self._host}'
7979

0 commit comments

Comments
 (0)