-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathcluster.py
More file actions
126 lines (105 loc) · 4.7 KB
/
cluster.py
File metadata and controls
126 lines (105 loc) · 4.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# Copyright 2016-2025. Couchbase, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import sys
from typing import TYPE_CHECKING, Awaitable, Optional
from uuid import uuid4
if sys.version_info < (3, 10):
from typing_extensions import TypeAlias
else:
from typing import TypeAlias
from acouchbase_analytics.protocol._core.anyio_utils import current_async_library
from acouchbase_analytics.protocol._core.client_adapter import _AsyncClientAdapter
from acouchbase_analytics.protocol._core.request_context import AsyncRequestContext
from acouchbase_analytics.protocol.streaming import AsyncHttpStreamingResponse
from couchbase_analytics.common.logging import LogLevel
from couchbase_analytics.common.result import AsyncQueryResult
from couchbase_analytics.protocol._core.request import _RequestBuilder
if TYPE_CHECKING:
from couchbase_analytics.common.credential import Credential
from couchbase_analytics.options import ClusterOptions
class AsyncCluster:
def __init__(
self, endpoint: str, credential: Credential, options: Optional[ClusterOptions] = None, **kwargs: object
) -> None:
self._cluster_id = str(uuid4())
kwargs['cluster_id'] = self._cluster_id
self._client_adapter = _AsyncClientAdapter(endpoint, credential, options, **kwargs)
self._request_builder = _RequestBuilder(self._client_adapter)
self._backend = current_async_library()
@property
def client_adapter(self) -> _AsyncClientAdapter:
"""
**INTERNAL**
"""
return self._client_adapter
@property
def cluster_id(self) -> str:
"""
**INTERNAL**
"""
return self._cluster_id
@property
def has_client(self) -> bool:
"""
bool: Indicator on if the cluster HTTP client has been created or not.
"""
return self._client_adapter.has_client
async def _shutdown(self) -> None:
"""
**INTERNAL**
"""
await self._client_adapter.close_client()
self._client_adapter.reset_client()
async def _create_client(self) -> None:
"""
**INTERNAL**
"""
await self._client_adapter.create_client()
async def shutdown(self) -> None:
"""Shuts down this cluster instance. Cleaning up all resources associated with it.
.. warning::
Use of this method is almost *always* unnecessary. Cluster resources should be cleaned
up once the cluster instance falls out of scope. However, in some applications tuning resources
is necessary and in those types of applications, this method might be beneficial.
"""
if self.has_client:
await self._shutdown()
else:
self.client_adapter.log_message('Cluster does not have a connection. Ignoring shutdown.', LogLevel.WARNING)
async def _execute_query(self, http_resp: AsyncHttpStreamingResponse) -> AsyncQueryResult:
if not self.has_client:
self.client_adapter.log_message(
'Cluster does not have a connection. Creating the client.', LogLevel.WARNING
)
await self._create_client()
await http_resp.send_request()
return AsyncQueryResult(http_resp)
def execute_query(self, statement: str, *args: object, **kwargs: object) -> Awaitable[AsyncQueryResult]:
base_req = self._request_builder.build_base_query_request(statement, *args, is_async=True, **kwargs)
stream_config = base_req.options.pop('stream_config', None)
request_context = AsyncRequestContext(
client_adapter=self.client_adapter, request=base_req, stream_config=stream_config, backend=self._backend
)
resp = AsyncHttpStreamingResponse(request_context)
if self._backend.backend_lib == 'asyncio':
return request_context.create_response_task(self._execute_query, resp)
return self._execute_query(resp)
@classmethod
def create_instance(
cls, endpoint: str, credential: Credential, options: Optional[ClusterOptions] = None, **kwargs: object
) -> AsyncCluster:
return cls(endpoint, credential, options, **kwargs)
Cluster: TypeAlias = AsyncCluster