-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathcluster.py
More file actions
244 lines (174 loc) · 10.3 KB
/
cluster.py
File metadata and controls
244 lines (174 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# Copyright 2016-2025. Couchbase, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import sys
from typing import TYPE_CHECKING, Awaitable, Optional
if sys.version_info < (3, 10):
from typing_extensions import TypeAlias
else:
from typing import TypeAlias
from acouchbase_analytics.database import AsyncDatabase
from acouchbase_analytics.query_handle import AsyncQueryHandle
from acouchbase_analytics.result import AsyncQueryResult
if TYPE_CHECKING:
from couchbase_analytics.credential import Credential
from couchbase_analytics.options import ClusterOptions
class AsyncCluster:
"""Create an AsyncCluster instance.
The cluster instance exposes the operations which are available to be performed against a Analytics cluster.
.. important::
Use the static :meth:`.AsyncCluster.create_instance` method to create an AsyncCluster.
Args:
endpoint:
The endpoint to use for sending HTTP requests to the Analytics server.
The format of the endpoint string is the **scheme** (``http`` or ``https`` is *required*, use ``https`` for TLS enabled connections), followed a hostname and optional port.
credential: User credentials.
options: Global options to set for the cluster.
Some operations allow the global options to be overriden by passing in options to the operation.
**kwargs: keyword arguments that can be used in place or to overrride provided :class:`~acouchbase_analytics.options.ClusterOptions`
Raises:
ValueError: If incorrect endpoint is provided.
ValueError: If incorrect options are provided.
""" # noqa: E501
def __init__(
self, endpoint: str, credential: Credential, options: Optional[ClusterOptions] = None, **kwargs: object
) -> None:
from acouchbase_analytics.protocol.cluster import AsyncCluster as _AsyncCluster
self._impl = _AsyncCluster(endpoint, credential, options, **kwargs)
def database(self, name: str) -> AsyncDatabase:
"""Creates a database instance.
.. seealso::
:class:`~acouchbase_analytics.database.AsyncDatabase`
Args:
name: Name of the database
Returns:
An AsyncDatabase instance.
"""
return AsyncDatabase(self._impl, name)
def execute_query(self, statement: str, *args: object, **kwargs: object) -> Awaitable[AsyncQueryResult]:
"""Executes a query against an Analytics cluster.
.. note::
A departure from the operational SDK, the query is *NOT* executed lazily.
.. seealso::
:meth:`acouchbase_analytics.AsyncScope.execute_query`: For how to execute scope-level queries.
Args:
statement: The SQL++ statement to execute.
options (:class:`~acouchbase_analytics.options.QueryOptions`): Optional parameters for the query operation.
**kwargs (Dict[str, Any]): keyword arguments that can be used in place or to override provided :class:`~couchbase_analytics.options.QueryOptions`
Returns:
Future[:class:`~couchbase_analytics.result.AsyncQueryResult`]: A :class:`~asyncio.Future` is returned.
Once the :class:`~asyncio.Future` completes, an instance of a :class:`~acouchbase_analytics.result.AsyncQueryResult`
is available to provide access to iterate over the query results and access metadata and metrics about the query.
Examples:
Simple query::
q_str = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country LIKE 'United%' LIMIT 2;'
q_res = cluster.execute_query(q_str)
async for row in q_res.rows():
print(f'Found row: {row}')
Simple query with positional parameters::
from acouchbase_analytics.options import QueryOptions
# ... other code ...
q_str = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country LIKE $1 LIMIT $2;'
q_res = cluster.execute_query(q_str, QueryOptions(positional_parameters=['United%', 5]))
async for row in q_res.rows():
print(f'Found row: {row}')
Simple query with named parameters::
from acouchbase_analytics.options import QueryOptions
# ... other code ...
q_str = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country LIKE $country LIMIT $lim;'
q_res = cluster.execute_query(q_str, QueryOptions(named_parameters={'country': 'United%', 'lim':2}))
async for row in q_res.rows():
print(f'Found row: {row}')
Retrieve metadata and/or metrics from query::
from acouchbase_analytics.options import QueryOptions
# ... other code ...
q_str = 'SELECT * FROM `travel-sample` WHERE country LIKE $country LIMIT $lim;'
q_res = cluster.execute_query(q_str, QueryOptions(named_parameters={'country': 'United%', 'lim':2}))
async for row in q_res.rows():
print(f'Found row: {row}')
print(f'Query metadata: {q_res.metadata()}')
print(f'Query metrics: {q_res.metadata().metrics()}')
""" # noqa: E501
return self._impl.execute_query(statement, *args, **kwargs)
def start_query(self, statement: str, *args: object, **kwargs: object) -> Awaitable[AsyncQueryHandle]:
"""Executes a query against an Analytics cluster in async mode.
.. seealso::
:meth:`acouchbase_analytics.Scope.start_query`: For how to execute scope-level queries.
Args:
statement: The SQL++ statement to execute.
options (:class:`~acouchbase_analytics.options.StartQueryOptions`): Optional parameters for the query operation.
**kwargs (Dict[str, Any]): keyword arguments that can be used in place or to override provided :class:`~acouchbase_analytics.options.StartQueryOptions`
Returns:
:class:`~acouchbase_analytics.query_handle.AsyncQueryHandle`: An instance of a :class:`~acouchbase_analytics.query_handle.AsyncQueryHandle`
""" # noqa: E501
return self._impl.start_query(statement, *args, **kwargs)
async def shutdown(self) -> None:
"""Shuts down this cluster instance. Cleaning up all resources associated with it.
.. warning::
Use of this method is almost *always* unnecessary. Cluster resources should be cleaned
up once the cluster instance falls out of scope. However, in some applications tuning resources
is necessary and in those types of applications, this method might be beneficial.
"""
return await self._impl.shutdown()
@classmethod
def create_instance(
cls, endpoint: str, credential: Credential, options: Optional[ClusterOptions] = None, **kwargs: object
) -> AsyncCluster:
"""Create an AsyncCluster instance
.. important::
The appropriate port needs to be specified. The SDK's default ports are 80 (http) and 443 (https).
If attempting to connect to Capella, the correct ports are most likely to be 8095 (http) and 18095 (https).
Capella example: https://cb.2xg3vwszqgqcrsix.cloud.couchbase.com:18095
Args:
endpoint:
The endpoint to use for sending HTTP requests to the Analytics server.
The format of the endpoint string is the **scheme** (``http`` or ``https`` is *required*, use ``https`` for TLS enabled connections), followed a hostname and optional port.
credential: User credentials.
options: Global options to set for the cluster.
Some operations allow the global options to be overriden by passing in options to the operation.
**kwargs: Keyword arguments that can be used in place or to overrride provided :class:`~acouchbase_analytics.options.ClusterOptions`
Returns:
An Analytics Cluster instance.
Raises:
ValueError: If incorrect endpoint is provided.
ValueError: If incorrect options are provided.
Examples:
Initialize cluster using default options::
import asyncio
from acouchbase_analytics.cluster import AsyncCluster
from acouchbase_analytics.credential import Credential
async def main() -> None:
cred = Credential.from_username_and_password('username', 'password')
cluster = AsyncCluster.create_instance('https://hostname', cred)
# ... other async code ...
if __name__ == '__main__':
asyncio.run(main())
Initialize cluster using with global timeout options::
import asyncio
from datetime import timedelta
from acouchbase_analytics import get_event_loop
from acouchbase_analytics.cluster import AsyncCluster
from acouchbase_analytics.credential import Credential
from acouchbase_analytics.options import ClusterOptions, ClusterTimeoutOptions
async def main() -> None:
cred = Credential.from_username_and_password('username', 'password')
opts = ClusterOptions(timeout_options=ClusterTimeoutOptions(query_timeout=timedelta(seconds=120)))
cluster = AsyncCluster.create_instance('https://hostname', cred, opts)
# ... other async code ...
if __name__ == '__main__':
asyncio.run(main())
""" # noqa: E501
return cls(endpoint, credential, options, **kwargs)
Cluster: TypeAlias = AsyncCluster