Skip to content

Commit 1cd3532

Browse files
committed
PYCO-94: Support Server Side Async Requests
Changes ------- * Add start_query() to cluster & scope for both sync and async APIs.
1 parent 665ca23 commit 1cd3532

57 files changed

Lines changed: 2287 additions & 654 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/publish.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ env:
4242
CBCI_SUPPORTED_ARM64_PLATFORMS: "linux macos"
4343
CBCI_DEFAULT_LINUX_X86_64_PLATFORM: "ubuntu-22.04"
4444
CBCI_DEFAULT_LINUX_ARM64_PLATFORM: "ubuntu-22.04-arm"
45-
CBCI_DEFAULT_MACOS_X86_64_PLATFORM: "macos-13"
46-
CBCI_DEFAULT_MACOS_ARM64_PLATFORM: "macos-14"
45+
CBCI_DEFAULT_MACOS_X86_64_PLATFORM: "macos-15-intel"
46+
CBCI_DEFAULT_MACOS_ARM64_PLATFORM: "macos-15"
4747
CBCI_DEFAULT_WINDOWS_PLATFORM: "windows-2022"
4848
CBCI_DEFAULT_LINUX_CONTAINER: "slim-bookworm"
4949
CBCI_DEFAULT_ALPINE_CONTAINER: "alpine"

.github/workflows/tests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ env:
6464
CBCI_SUPPORTED_ARM64_PLATFORMS: "linux macos"
6565
CBCI_DEFAULT_LINUX_X86_64_PLATFORM: "ubuntu-22.04"
6666
CBCI_DEFAULT_LINUX_ARM64_PLATFORM: "ubuntu-22.04-arm"
67-
CBCI_DEFAULT_MACOS_X86_64_PLATFORM: "macos-13"
68-
CBCI_DEFAULT_MACOS_ARM64_PLATFORM: "macos-14"
67+
CBCI_DEFAULT_MACOS_X86_64_PLATFORM: "macos-15-intel"
68+
CBCI_DEFAULT_MACOS_ARM64_PLATFORM: "macos-15"
6969
CBCI_DEFAULT_WINDOWS_PLATFORM: "windows-2022"
7070
CBCI_DEFAULT_LINUX_CONTAINER: "slim-bookworm"
7171
CBCI_DEFAULT_ALPINE_CONTAINER: "alpine"

.github/workflows/verify_release.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ env:
5757
CBCI_SUPPORTED_ARM64_PLATFORMS: "linux macos"
5858
CBCI_DEFAULT_LINUX_X86_64_PLATFORM: "ubuntu-22.04"
5959
CBCI_DEFAULT_LINUX_ARM64_PLATFORM: "ubuntu-22.04-arm"
60-
CBCI_DEFAULT_MACOS_X86_64_PLATFORM: "macos-13"
61-
CBCI_DEFAULT_MACOS_ARM64_PLATFORM: "macos-14"
60+
CBCI_DEFAULT_MACOS_X86_64_PLATFORM: "macos-15-intel"
61+
CBCI_DEFAULT_MACOS_ARM64_PLATFORM: "macos-15"
6262
CBCI_DEFAULT_WINDOWS_PLATFORM: "windows-2022"
6363
CBCI_DEFAULT_LINUX_CONTAINER: "slim-bookworm"
6464
CBCI_DEFAULT_ALPINE_CONTAINER: "alpine"

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,5 +176,8 @@ gocaves*
176176
.pytest_cache/
177177
test_scripts/
178178

179-
# rff
179+
# ruff
180180
.ruff_cache/
181+
182+
# other
183+
.DS_Store

.pre-commit-config.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ repos:
4545
- pytest~=8.3.5
4646
- httpx~=0.28.1
4747
- aiohttp~=3.11.10
48+
- sniffio~=1.3.1
49+
- anyio~=4.9.0
4850
types:
4951
- python
5052
require_serial: true

acouchbase_analytics/cluster.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
from typing import TypeAlias
2626

2727
from acouchbase_analytics.database import AsyncDatabase
28-
from couchbase_analytics.result import AsyncQueryResult
28+
from acouchbase_analytics.query_handle import AsyncQueryHandle
29+
from acouchbase_analytics.result import AsyncQueryResult
2930

3031
if TYPE_CHECKING:
3132
from couchbase_analytics.credential import Credential
@@ -143,6 +144,22 @@ def execute_query(self, statement: str, *args: object, **kwargs: object) -> Awai
143144
""" # noqa: E501
144145
return self._impl.execute_query(statement, *args, **kwargs)
145146

147+
def start_query(self, statement: str, *args: object, **kwargs: object) -> Awaitable[AsyncQueryHandle]:
148+
"""Executes a query against an Analytics cluster in async mode.
149+
150+
.. seealso::
151+
:meth:`acouchbase_analytics.Scope.start_query`: For how to execute scope-level queries.
152+
153+
Args:
154+
statement: The SQL++ statement to execute.
155+
options (:class:`~acouchbase_analytics.options.StartQueryOptions`): Optional parameters for the query operation.
156+
**kwargs (Dict[str, Any]): keyword arguments that can be used in place or to override provided :class:`~acouchbase_analytics.options.StartQueryOptions`
157+
158+
Returns:
159+
:class:`~acouchbase_analytics.query_handle.AsyncQueryHandle`: An instance of a :class:`~acouchbase_analytics.query_handle.AsyncQueryHandle`
160+
""" # noqa: E501
161+
return self._impl.start_query(statement, *args, **kwargs)
162+
146163
async def shutdown(self) -> None:
147164
"""Shuts down this cluster instance. Cleaning up all resources associated with it.
148165

acouchbase_analytics/cluster.pyi

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,19 @@ if sys.version_info < (3, 11):
2121
else:
2222
from typing import Unpack
2323

24+
from acouchbase_analytics import JSONType
25+
from acouchbase_analytics.credential import Credential
2426
from acouchbase_analytics.database import AsyncDatabase
25-
from couchbase_analytics.credential import Credential
26-
from couchbase_analytics.options import ClusterOptions, ClusterOptionsKwargs, QueryOptions, QueryOptionsKwargs
27-
from couchbase_analytics.result import AsyncQueryResult
27+
from acouchbase_analytics.options import (
28+
ClusterOptions,
29+
ClusterOptionsKwargs,
30+
QueryOptions,
31+
QueryOptionsKwargs,
32+
StartQueryOptions,
33+
StartQueryOptionsKwargs,
34+
)
35+
from acouchbase_analytics.query_handle import AsyncQueryHandle
36+
from acouchbase_analytics.result import AsyncQueryResult
2837

2938
class AsyncCluster:
3039
@overload
@@ -54,14 +63,34 @@ class AsyncCluster:
5463
) -> Awaitable[AsyncQueryResult]: ...
5564
@overload
5665
def execute_query(
57-
self, statement: str, options: QueryOptions, *args: str, **kwargs: Unpack[QueryOptionsKwargs]
66+
self, statement: str, options: QueryOptions, *args: JSONType, **kwargs: Unpack[QueryOptionsKwargs]
5867
) -> Awaitable[AsyncQueryResult]: ...
5968
@overload
6069
def execute_query(
61-
self, statement: str, options: QueryOptions, *args: str, **kwargs: str
70+
self, statement: str, options: QueryOptions, *args: JSONType, **kwargs: str
6271
) -> Awaitable[AsyncQueryResult]: ...
6372
@overload
64-
def execute_query(self, statement: str, *args: str, **kwargs: str) -> Awaitable[AsyncQueryResult]: ...
73+
def execute_query(self, statement: str, *args: JSONType, **kwargs: str) -> Awaitable[AsyncQueryResult]: ...
74+
@overload
75+
def start_query(self, statement: str) -> AsyncQueryHandle: ...
76+
@overload
77+
def start_query(self, statement: str, options: StartQueryOptions) -> AsyncQueryHandle: ...
78+
@overload
79+
def start_query(self, statement: str, **kwargs: Unpack[StartQueryOptionsKwargs]) -> AsyncQueryHandle: ...
80+
@overload
81+
def start_query(
82+
self, statement: str, options: StartQueryOptions, **kwargs: Unpack[StartQueryOptionsKwargs]
83+
) -> AsyncQueryHandle: ...
84+
@overload
85+
def start_query(
86+
self, statement: str, options: StartQueryOptions, *args: JSONType, **kwargs: Unpack[StartQueryOptionsKwargs]
87+
) -> AsyncQueryHandle: ...
88+
@overload
89+
def start_query(
90+
self, statement: str, options: StartQueryOptions, *args: JSONType, **kwargs: str
91+
) -> AsyncQueryHandle: ...
92+
@overload
93+
def start_query(self, statement: str, *args: JSONType, **kwargs: str) -> AsyncQueryHandle: ...
6594
def shutdown(self) -> Awaitable[None]: ...
6695
@overload
6796
@classmethod

acouchbase_analytics/options.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@
1616

1717
from couchbase_analytics.common.options import ClusterOptions as ClusterOptions # noqa: F401
1818
from couchbase_analytics.common.options import ClusterOptionsKwargs as ClusterOptionsKwargs # noqa: F401
19+
from couchbase_analytics.common.options import FetchResultsOptions as FetchResultsOptions # noqa: F401
20+
from couchbase_analytics.common.options import FetchResultsOptionsKwargs as FetchResultsOptionsKwargs # noqa: F401
1921
from couchbase_analytics.common.options import QueryOptions as QueryOptions # noqa: F401
2022
from couchbase_analytics.common.options import QueryOptionsKwargs as QueryOptionsKwargs # noqa: F401
2123
from couchbase_analytics.common.options import SecurityOptions as SecurityOptions # noqa: F401
2224
from couchbase_analytics.common.options import SecurityOptionsKwargs as SecurityOptionsKwargs # noqa: F401
25+
from couchbase_analytics.common.options import StartQueryOptions as StartQueryOptions # noqa: F401
26+
from couchbase_analytics.common.options import StartQueryOptionsKwargs as StartQueryOptionsKwargs # noqa: F401
2327
from couchbase_analytics.common.options import TimeoutOptions as TimeoutOptions # noqa: F401
2428
from couchbase_analytics.common.options import TimeoutOptionsKwargs as TimeoutOptionsKwargs # noqa: F401

acouchbase_analytics/protocol/_core/anyio_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def current_async_library() -> Optional[AsyncBackend]:
6666
try:
6767
import sniffio
6868
except ImportError:
69-
async_lib = 'asyncio'
69+
return AsyncBackend('asyncio')
7070

7171
try:
7272
async_lib = sniffio.current_async_library()

acouchbase_analytics/protocol/_core/client_adapter.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,18 @@
1717
from __future__ import annotations
1818

1919
import logging
20-
from typing import TYPE_CHECKING, Optional, cast
20+
from typing import Optional, Union, cast
2121
from uuid import uuid4
2222

2323
from httpx import URL, AsyncClient, BasicAuth, Response
2424

2525
from couchbase_analytics.common.credential import Credential
2626
from couchbase_analytics.common.deserializer import Deserializer
2727
from couchbase_analytics.common.logging import LogLevel, log_message
28+
from couchbase_analytics.protocol._core.request import CancelRequest, HttpRequest, QueryRequest, StartQueryRequest
2829
from couchbase_analytics.protocol.connection import _ConnectionDetails
2930
from couchbase_analytics.protocol.options import OptionsBuilder
3031

31-
if TYPE_CHECKING:
32-
from couchbase_analytics.protocol._core.request import QueryRequest
33-
3432

3533
class _AsyncClientAdapter:
3634
"""
@@ -164,7 +162,9 @@ async def create_client(self) -> None:
164162
def log_message(self, message: str, log_level: LogLevel) -> None:
165163
log_message(logger, f'{self.log_prefix} {message}', log_level)
166164

167-
async def send_request(self, request: QueryRequest) -> Response:
165+
async def send_request(
166+
self, request: Union[CancelRequest, HttpRequest, QueryRequest, StartQueryRequest], stream: Optional[bool] = True
167+
) -> Response:
168168
"""
169169
**INTERNAL**
170170
"""
@@ -177,8 +177,19 @@ async def send_request(self, request: QueryRequest) -> Response:
177177
port=request.url.port,
178178
path=request.url.path,
179179
)
180-
req = self._client.build_request(request.method, url, json=request.body, extensions=request.extensions)
181-
return await self._client.send(req, stream=True)
180+
if isinstance(request, (QueryRequest, StartQueryRequest)):
181+
req = self._client.build_request(request.method, url, json=request.body, extensions=request.extensions)
182+
else:
183+
headers = request.headers if request.headers is not None else None
184+
data = request.data if isinstance(request, CancelRequest) else None
185+
req = self._client.build_request(
186+
request.method, url, data=data, headers=headers, extensions=request.extensions
187+
)
188+
189+
if stream is None:
190+
stream = True
191+
192+
return await self._client.send(req, stream=stream)
182193

183194
def reset_client(self) -> None:
184195
"""

0 commit comments

Comments
 (0)