Skip to content

Commit e03ae58

Browse files
committed
PYCO-94: Support Server Side Async Requests
Changes ------- * Add start_query() to cluster & scope for both sync and async APIs.
1 parent 665ca23 commit e03ae58

51 files changed

Lines changed: 2274 additions & 646 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

acouchbase_analytics/cluster.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
from typing import TypeAlias
2626

2727
from acouchbase_analytics.database import AsyncDatabase
28-
from couchbase_analytics.result import AsyncQueryResult
28+
from acouchbase_analytics.query_handle import AsyncQueryHandle
29+
from acouchbase_analytics.result import AsyncQueryResult
2930

3031
if TYPE_CHECKING:
3132
from couchbase_analytics.credential import Credential
@@ -143,6 +144,22 @@ def execute_query(self, statement: str, *args: object, **kwargs: object) -> Awai
143144
""" # noqa: E501
144145
return self._impl.execute_query(statement, *args, **kwargs)
145146

147+
def start_query(self, statement: str, *args: object, **kwargs: object) -> Awaitable[AsyncQueryHandle]:
148+
"""Executes a query against an Analytics cluster in async mode.
149+
150+
.. seealso::
151+
:meth:`acouchbase_analytics.Scope.start_query`: For how to execute scope-level queries.
152+
153+
Args:
154+
statement: The SQL++ statement to execute.
155+
options (:class:`~acouchbase_analytics.options.StartQueryOptions`): Optional parameters for the query operation.
156+
**kwargs (Dict[str, Any]): keyword arguments that can be used in place or to override provided :class:`~acouchbase_analytics.options.StartQueryOptions`
157+
158+
Returns:
159+
:class:`~acouchbase_analytics.query_handle.AsyncQueryHandle`: An instance of a :class:`~acouchbase_analytics.query_handle.AsyncQueryHandle`
160+
""" # noqa: E501
161+
return self._impl.start_query(statement, *args, **kwargs)
162+
146163
async def shutdown(self) -> None:
147164
"""Shuts down this cluster instance. Cleaning up all resources associated with it.
148165

acouchbase_analytics/cluster.pyi

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,19 @@ if sys.version_info < (3, 11):
2121
else:
2222
from typing import Unpack
2323

24+
from acouchbase_analytics import JSONType
25+
from acouchbase_analytics.credential import Credential
2426
from acouchbase_analytics.database import AsyncDatabase
25-
from couchbase_analytics.credential import Credential
26-
from couchbase_analytics.options import ClusterOptions, ClusterOptionsKwargs, QueryOptions, QueryOptionsKwargs
27-
from couchbase_analytics.result import AsyncQueryResult
27+
from acouchbase_analytics.options import (
28+
ClusterOptions,
29+
ClusterOptionsKwargs,
30+
QueryOptions,
31+
QueryOptionsKwargs,
32+
StartQueryOptions,
33+
StartQueryOptionsKwargs,
34+
)
35+
from acouchbase_analytics.query_handle import AsyncQueryHandle
36+
from acouchbase_analytics.result import AsyncQueryResult
2837

2938
class AsyncCluster:
3039
@overload
@@ -54,14 +63,34 @@ class AsyncCluster:
5463
) -> Awaitable[AsyncQueryResult]: ...
5564
@overload
5665
def execute_query(
57-
self, statement: str, options: QueryOptions, *args: str, **kwargs: Unpack[QueryOptionsKwargs]
66+
self, statement: str, options: QueryOptions, *args: JSONType, **kwargs: Unpack[QueryOptionsKwargs]
5867
) -> Awaitable[AsyncQueryResult]: ...
5968
@overload
6069
def execute_query(
61-
self, statement: str, options: QueryOptions, *args: str, **kwargs: str
70+
self, statement: str, options: QueryOptions, *args: JSONType, **kwargs: str
6271
) -> Awaitable[AsyncQueryResult]: ...
6372
@overload
64-
def execute_query(self, statement: str, *args: str, **kwargs: str) -> Awaitable[AsyncQueryResult]: ...
73+
def execute_query(self, statement: str, *args: JSONType, **kwargs: str) -> Awaitable[AsyncQueryResult]: ...
74+
@overload
75+
def start_query(self, statement: str) -> AsyncQueryHandle: ...
76+
@overload
77+
def start_query(self, statement: str, options: StartQueryOptions) -> AsyncQueryHandle: ...
78+
@overload
79+
def start_query(self, statement: str, **kwargs: Unpack[StartQueryOptionsKwargs]) -> AsyncQueryHandle: ...
80+
@overload
81+
def start_query(
82+
self, statement: str, options: StartQueryOptions, **kwargs: Unpack[StartQueryOptionsKwargs]
83+
) -> AsyncQueryHandle: ...
84+
@overload
85+
def start_query(
86+
self, statement: str, options: StartQueryOptions, *args: JSONType, **kwargs: Unpack[StartQueryOptionsKwargs]
87+
) -> AsyncQueryHandle: ...
88+
@overload
89+
def start_query(
90+
self, statement: str, options: StartQueryOptions, *args: JSONType, **kwargs: str
91+
) -> AsyncQueryHandle: ...
92+
@overload
93+
def start_query(self, statement: str, *args: JSONType, **kwargs: str) -> AsyncQueryHandle: ...
6594
def shutdown(self) -> Awaitable[None]: ...
6695
@overload
6796
@classmethod

acouchbase_analytics/options.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@
1616

1717
from couchbase_analytics.common.options import ClusterOptions as ClusterOptions # noqa: F401
1818
from couchbase_analytics.common.options import ClusterOptionsKwargs as ClusterOptionsKwargs # noqa: F401
19+
from couchbase_analytics.common.options import FetchResultsOptions as FetchResultsOptions # noqa: F401
20+
from couchbase_analytics.common.options import FetchResultsOptionsKwargs as FetchResultsOptionsKwargs # noqa: F401
1921
from couchbase_analytics.common.options import QueryOptions as QueryOptions # noqa: F401
2022
from couchbase_analytics.common.options import QueryOptionsKwargs as QueryOptionsKwargs # noqa: F401
2123
from couchbase_analytics.common.options import SecurityOptions as SecurityOptions # noqa: F401
2224
from couchbase_analytics.common.options import SecurityOptionsKwargs as SecurityOptionsKwargs # noqa: F401
25+
from couchbase_analytics.common.options import StartQueryOptions as StartQueryOptions # noqa: F401
26+
from couchbase_analytics.common.options import StartQueryOptionsKwargs as StartQueryOptionsKwargs # noqa: F401
2327
from couchbase_analytics.common.options import TimeoutOptions as TimeoutOptions # noqa: F401
2428
from couchbase_analytics.common.options import TimeoutOptionsKwargs as TimeoutOptionsKwargs # noqa: F401

acouchbase_analytics/protocol/_core/client_adapter.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,18 @@
1717
from __future__ import annotations
1818

1919
import logging
20-
from typing import TYPE_CHECKING, Optional, cast
20+
from typing import Optional, Union, cast
2121
from uuid import uuid4
2222

2323
from httpx import URL, AsyncClient, BasicAuth, Response
2424

2525
from couchbase_analytics.common.credential import Credential
2626
from couchbase_analytics.common.deserializer import Deserializer
2727
from couchbase_analytics.common.logging import LogLevel, log_message
28+
from couchbase_analytics.protocol._core.request import CancelRequest, HttpRequest, QueryRequest, StartQueryRequest
2829
from couchbase_analytics.protocol.connection import _ConnectionDetails
2930
from couchbase_analytics.protocol.options import OptionsBuilder
3031

31-
if TYPE_CHECKING:
32-
from couchbase_analytics.protocol._core.request import QueryRequest
33-
3432

3533
class _AsyncClientAdapter:
3634
"""
@@ -164,7 +162,9 @@ async def create_client(self) -> None:
164162
def log_message(self, message: str, log_level: LogLevel) -> None:
165163
log_message(logger, f'{self.log_prefix} {message}', log_level)
166164

167-
async def send_request(self, request: QueryRequest) -> Response:
165+
async def send_request(
166+
self, request: Union[CancelRequest, HttpRequest, QueryRequest, StartQueryRequest], stream: Optional[bool] = True
167+
) -> Response:
168168
"""
169169
**INTERNAL**
170170
"""
@@ -177,8 +177,19 @@ async def send_request(self, request: QueryRequest) -> Response:
177177
port=request.url.port,
178178
path=request.url.path,
179179
)
180-
req = self._client.build_request(request.method, url, json=request.body, extensions=request.extensions)
181-
return await self._client.send(req, stream=True)
180+
if isinstance(request, (QueryRequest, StartQueryRequest)):
181+
req = self._client.build_request(request.method, url, json=request.body, extensions=request.extensions)
182+
else:
183+
headers = request.headers if request.headers is not None else None
184+
data = request.data if isinstance(request, CancelRequest) else None
185+
req = self._client.build_request(
186+
request.method, url, data=data, headers=headers, extensions=request.extensions
187+
)
188+
189+
if stream is None:
190+
stream = True
191+
192+
return await self._client.send(req, stream=stream)
182193

183194
def reset_client(self) -> None:
184195
"""

0 commit comments

Comments
 (0)