Skip to content

Commit 0d2c08f

Browse files
committed
PYCO-53: Analytics SDK - Logging
Changes ======= * Add loggers to `acouchbase_analytics` and `couchbase_analytics` * Remove print/todos related to logging and replace with log messaging * Allow logging to be configured via `PYCBAC_LOG_LEVEL` environment variable * General clean-up
1 parent 294ef5f commit 0d2c08f

30 files changed

Lines changed: 532 additions & 286 deletions

acouchbase_analytics/__init__.py

Lines changed: 3 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -13,77 +13,8 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
import asyncio
17-
import selectors
18-
from asyncio import AbstractEventLoop
19-
from typing import Optional
2016

17+
from acouchbase_analytics.protocol import get_event_loop as get_event_loop # noqa: F401
18+
from couchbase_analytics.common import LOG_DATE_FORMAT as LOG_DATE_FORMAT # noqa: F401
19+
from couchbase_analytics.common import LOG_FORMAT as LOG_FORMAT # noqa: F401
2120
from couchbase_analytics.common import JSONType as JSONType # noqa: F401
22-
23-
24-
class _LoopValidator:
25-
"""
26-
**INTERNAL**
27-
"""
28-
29-
REQUIRED_METHODS = {'add_reader', 'remove_reader', 'add_writer', 'remove_writer'}
30-
31-
@staticmethod
32-
def _get_working_loop() -> AbstractEventLoop:
33-
"""
34-
**INTERNAL**
35-
"""
36-
evloop = asyncio.get_event_loop()
37-
gen_new_loop = not _LoopValidator._is_valid_loop(evloop)
38-
if gen_new_loop:
39-
evloop.close()
40-
selector = selectors.SelectSelector()
41-
new_loop = asyncio.SelectorEventLoop(selector)
42-
asyncio.set_event_loop(new_loop)
43-
return new_loop
44-
45-
return evloop
46-
47-
@staticmethod
48-
def _is_valid_loop(evloop: Optional[AbstractEventLoop] = None) -> bool:
49-
"""
50-
**INTERNAL**
51-
"""
52-
if not evloop:
53-
return False
54-
for meth in _LoopValidator.REQUIRED_METHODS:
55-
abs_meth, actual_meth = (getattr(asyncio.AbstractEventLoop, meth), getattr(evloop.__class__, meth))
56-
if abs_meth == actual_meth:
57-
return False
58-
return True
59-
60-
@staticmethod
61-
def get_event_loop(evloop: Optional[AbstractEventLoop] = None) -> AbstractEventLoop:
62-
"""
63-
**INTERNAL**
64-
"""
65-
if evloop and _LoopValidator._is_valid_loop(evloop):
66-
return evloop
67-
return _LoopValidator._get_working_loop()
68-
69-
@staticmethod
70-
def close_loop() -> None:
71-
"""
72-
**INTERNAL**
73-
"""
74-
evloop = asyncio.get_event_loop()
75-
evloop.close()
76-
77-
78-
def get_event_loop(evloop: Optional[AbstractEventLoop] = None) -> AbstractEventLoop:
79-
"""
80-
Get an event loop compatible with acouchbase_analytics.
81-
Some Event loops, such as ProactorEventLoop (the default asyncio event
82-
loop for Python 3.8 on Windows) are not compatible with acouchbase_analytics as
83-
they don't implement all members in the abstract base class.
84-
85-
:param evloop: preferred event loop
86-
:return: The preferred event loop, if compatible, otherwise, a compatible
87-
alternative event loop.
88-
"""
89-
return _LoopValidator.get_event_loop(evloop)

acouchbase_analytics/protocol/__init__.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,75 @@
1212
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
15+
16+
import asyncio
17+
import selectors
18+
from typing import Optional
19+
20+
21+
class _LoopValidator:
22+
"""
23+
**INTERNAL**
24+
"""
25+
26+
REQUIRED_METHODS = {'add_reader', 'remove_reader', 'add_writer', 'remove_writer'}
27+
28+
@staticmethod
29+
def _get_working_loop() -> asyncio.AbstractEventLoop:
30+
"""
31+
**INTERNAL**
32+
"""
33+
evloop = asyncio.get_event_loop()
34+
gen_new_loop = not _LoopValidator._is_valid_loop(evloop)
35+
if gen_new_loop:
36+
evloop.close()
37+
selector = selectors.SelectSelector()
38+
new_loop = asyncio.SelectorEventLoop(selector)
39+
asyncio.set_event_loop(new_loop)
40+
return new_loop
41+
42+
return evloop
43+
44+
@staticmethod
45+
def _is_valid_loop(evloop: Optional[asyncio.AbstractEventLoop] = None) -> bool:
46+
"""
47+
**INTERNAL**
48+
"""
49+
if not evloop:
50+
return False
51+
for meth in _LoopValidator.REQUIRED_METHODS:
52+
abs_meth, actual_meth = (getattr(asyncio.AbstractEventLoop, meth), getattr(evloop.__class__, meth))
53+
if abs_meth == actual_meth:
54+
return False
55+
return True
56+
57+
@staticmethod
58+
def get_event_loop(evloop: Optional[asyncio.AbstractEventLoop] = None) -> asyncio.AbstractEventLoop:
59+
"""
60+
**INTERNAL**
61+
"""
62+
if evloop and _LoopValidator._is_valid_loop(evloop):
63+
return evloop
64+
return _LoopValidator._get_working_loop()
65+
66+
@staticmethod
67+
def close_loop() -> None:
68+
"""
69+
**INTERNAL**
70+
"""
71+
evloop = asyncio.get_event_loop()
72+
evloop.close()
73+
74+
75+
def get_event_loop(evloop: Optional[asyncio.AbstractEventLoop] = None) -> asyncio.AbstractEventLoop:
76+
"""
77+
Get an event loop compatible with acouchbase_analytics.
78+
Some Event loops, such as ProactorEventLoop (the default asyncio event
79+
loop for Python 3.8 on Windows) are not compatible with acouchbase_analytics as
80+
they don't implement all members in the abstract base class.
81+
82+
:param evloop: preferred event loop
83+
:return: The preferred event loop, if compatible, otherwise, a compatible
84+
alternative event loop.
85+
"""
86+
return _LoopValidator.get_event_loop(evloop)

acouchbase_analytics/protocol/_core/async_json_stream.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,16 @@
1515

1616
from __future__ import annotations
1717

18-
from typing import AsyncIterator, Optional
18+
from typing import AsyncIterator, Callable, Optional
1919

2020
import ijson
2121
from anyio import EndOfStream, Event, create_memory_object_stream
2222

2323
from acouchbase_analytics.protocol._core.async_json_token_parser import AsyncJsonTokenParser
2424
from couchbase_analytics.common._core.json_parsing import JsonStreamConfig, ParsedResult, ParsedResultType
25+
from couchbase_analytics.common._core.json_token_parser_base import JsonTokenParsingError
2526
from couchbase_analytics.common.errors import AnalyticsError
27+
from couchbase_analytics.common.logging import LogLevel
2628

2729

2830
class AsyncJsonStream:
@@ -31,6 +33,7 @@ def __init__(
3133
http_stream_iter: AsyncIterator[bytes],
3234
*,
3335
stream_config: Optional[JsonStreamConfig] = None,
36+
logger_handler: Optional[Callable[[str, LogLevel], None]] = None,
3437
) -> None:
3538
# HTTP stream handling
3639
if stream_config is None:
@@ -40,10 +43,13 @@ def __init__(
4043
self._http_response_buffer = bytearray()
4144
self._http_stream_exhausted = False
4245

46+
# logging
47+
self._log_handler = logger_handler
48+
4349
# results handling
4450
self._send_stream, self._receive_stream = create_memory_object_stream[ParsedResult](
4551
max_buffer_size=stream_config.buffered_row_max
46-
) # noqa: E501
52+
)
4753
self._json_stream_parser = None
4854
self._buffer_entire_result = stream_config.buffer_entire_result
4955
handler = None if self._buffer_entire_result is True else self._handle_json_result
@@ -87,6 +93,10 @@ def _continue_processing(self) -> bool:
8793
return False
8894
return True
8995

96+
def _log_message(self, message: str, level: LogLevel) -> None:
97+
if self._log_handler is not None:
98+
self._log_handler(message, level)
99+
90100
async def _send_to_stream(self, result: ParsedResult, close: Optional[bool] = False) -> None:
91101
"""
92102
**INTERNAL**
@@ -130,10 +140,18 @@ async def _process_token_stream(self) -> None:
130140
await self._json_token_parser.parse_token(event, value)
131141
except StopAsyncIteration:
132142
self._token_stream_exhausted = True
143+
except JsonTokenParsingError as ex:
144+
ex_str = str(ex)
145+
self._log_message(f'JSON token parsing error encountered: {ex_str}', LogLevel.ERROR)
146+
self._token_stream_exhausted = True
147+
await self._send_to_stream(ParsedResult(ex_str.encode('utf-8'), ParsedResultType.ERROR), close=True)
148+
self._handle_notification(ParsedResultType.ERROR)
149+
return
133150
except ijson.common.IncompleteJSONError as ex:
134-
# TODO: log this error
151+
ex_str = str(ex)
152+
self._log_message(f'Incomplete JSON error encountered: {ex_str}', LogLevel.ERROR)
135153
self._token_stream_exhausted = True
136-
await self._send_to_stream(ParsedResult(str(ex).encode('utf-8'), ParsedResultType.ERROR), close=True)
154+
await self._send_to_stream(ParsedResult(ex_str.encode('utf-8'), ParsedResultType.ERROR), close=True)
137155
self._handle_notification(ParsedResultType.ERROR)
138156
return
139157

@@ -176,10 +194,9 @@ async def get_result(self) -> ParsedResult:
176194

177195
async def start_parsing(self) -> None:
178196
if self._json_stream_parser is not None:
179-
# TODO: logging; I don't think this is an error...
197+
self._log_message('JSON stream parser already exists', LogLevel.WARNING)
180198
return
181199
await self._process_token_stream()
182200

183201
async def continue_parsing(self) -> None:
184-
# TODO: error is _json_stream_parser is None?
185202
await self._process_token_stream()

acouchbase_analytics/protocol/_core/async_json_token_parser.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
START_EVENTS,
2323
VALUE_TOKENS,
2424
JsonTokenParserBase,
25+
JsonTokenParsingError,
2526
ParsingState,
2627
TokenType,
2728
)
@@ -49,7 +50,9 @@ async def _handle_pop_event(self, token_type: TokenType) -> None:
4950
next_token = self._pop()
5051
if next_token.type == matching_token.type:
5152
should_emit = self._handle_pop_transition(next_token.state)
52-
# I think obj_pairs.reverse() is O(n); while reversed is O(1)
53+
# NOTE: obj_pairs.reverse() vs. reversed(obj_pairs) are essentially the same _because_ we convert
54+
# the obj_pairs to a string (e.g. ",".join(...)); using reversed() in this case is slightly
55+
# more convenient as it returns an iterator
5356
if matching_token.type == TokenType.START_ARRAY:
5457
obj = f'[{",".join(reversed(obj_pairs))}]'
5558
else:
@@ -88,5 +91,4 @@ async def parse_token(self, token: str, value: str) -> None:
8891
elif token_type in POP_EVENTS:
8992
await self._handle_pop_event(token_type)
9093
else:
91-
# TODO: custom exception
92-
raise ValueError(f'Invalid token type: {token_type}; {value=}')
94+
raise JsonTokenParsingError(f'Invalid token type: {token_type}; {value=}')

acouchbase_analytics/protocol/_core/client_adapter.py

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@
1515

1616
from __future__ import annotations
1717

18-
import socket
19-
from typing import TYPE_CHECKING, Optional
18+
import logging
19+
from typing import TYPE_CHECKING, Optional, cast
2020
from uuid import uuid4
2121

2222
from httpx import URL, AsyncClient, BasicAuth, Response
2323

2424
from couchbase_analytics.common.credential import Credential
2525
from couchbase_analytics.common.deserializer import Deserializer
26+
from couchbase_analytics.common.logging import LogLevel, log_message
2627
from couchbase_analytics.protocol.connection import _ConnectionDetails
2728
from couchbase_analytics.protocol.options import OptionsBuilder
2829

@@ -35,13 +36,17 @@ class _AsyncClientAdapter:
3536
**INTERNAL**
3637
"""
3738

38-
_ANALYTICS_PATH = '/api/v1/request'
39+
ANALYTICS_PATH = '/api/v1/request'
40+
LOGGER_NAME = 'acouchbase_analytics'
3941

4042
def __init__(
4143
self, http_endpoint: str, credential: Credential, options: Optional[object] = None, **kwargs: object
4244
) -> None:
4345
self._client_id = str(uuid4())
46+
self._prefix = ''
47+
self._cluster_id = cast(str, kwargs.pop('cluster_id', ''))
4448
self._opts_builder = OptionsBuilder()
49+
kwargs['logger_name'] = self.logger_name
4550
self._conn_details = _ConnectionDetails.create(self._opts_builder, http_endpoint, credential, options, **kwargs)
4651
# TODO: do we want to support custom HTTP transports for the async client?
4752
self._http_transport_cls = None
@@ -51,7 +56,7 @@ def analytics_path(self) -> str:
5156
"""
5257
**INTERNAL**
5358
"""
54-
return self._ANALYTICS_PATH
59+
return self.ANALYTICS_PATH
5560

5661
@property
5762
def client(self) -> AsyncClient:
@@ -88,6 +93,30 @@ def has_client(self) -> bool:
8893
"""
8994
return hasattr(self, '_client')
9095

96+
@property
97+
def log_prefix(self) -> str:
98+
"""
99+
**INTERNAL**
100+
"""
101+
if self._prefix:
102+
return self._prefix
103+
self._prefix = f'[{self._cluster_id}'
104+
if self.has_client:
105+
self._prefix += f'/{self._client_id}'
106+
if self.connection_details.is_secure():
107+
self._prefix += '/https]'
108+
else:
109+
self._prefix += '/http]'
110+
111+
return self._prefix
112+
113+
@property
114+
def logger_name(self) -> str:
115+
"""
116+
**INTERNAL**
117+
"""
118+
return self.LOGGER_NAME
119+
91120
@property
92121
def options_builder(self) -> OptionsBuilder:
93122
"""
@@ -101,6 +130,7 @@ async def close_client(self) -> None:
101130
"""
102131
if hasattr(self, '_client'):
103132
await self._client.aclose()
133+
self.log_message('Cluster HTTP client closed', LogLevel.INFO)
104134

105135
async def create_client(self) -> None:
106136
"""
@@ -123,7 +153,15 @@ async def create_client(self) -> None:
123153
if self._http_transport_cls is not None:
124154
transport = self._http_transport_cls()
125155
self._client = AsyncClient(auth=BasicAuth(*self._conn_details.credential), transport=transport)
126-
# TODO: log message
156+
self.log_message(
157+
(f'Cluster HTTP client created: connection_details={self._conn_details.get_init_details()}'),
158+
LogLevel.INFO,
159+
)
160+
else:
161+
self.log_message('Cluster HTTP client already exists, skipping creation.', LogLevel.INFO)
162+
163+
def log_message(self, message: str, log_level: LogLevel) -> None:
164+
log_message(logger, f'{self.log_prefix} {message}', log_level)
127165

128166
async def send_request(self, request: QueryRequest) -> Response:
129167
"""
@@ -139,15 +177,14 @@ async def send_request(self, request: QueryRequest) -> Response:
139177
path=request.url.path,
140178
)
141179
req = self._client.build_request(request.method, url, json=request.body, extensions=request.extensions)
142-
try:
143-
return await self._client.send(req, stream=True)
144-
except socket.gaierror as err:
145-
req_url = self._conn_details.url.get_formatted_url()
146-
raise RuntimeError(f'Unable to connect to {req_url}') from err
180+
return await self._client.send(req, stream=True)
147181

148182
def reset_client(self) -> None:
149183
"""
150184
**INTERNAL**
151185
"""
152186
if hasattr(self, '_client'):
153187
del self._client
188+
189+
190+
logger = logging.getLogger(_AsyncClientAdapter.LOGGER_NAME)

0 commit comments

Comments
 (0)