Skip to content

Commit 82f29e2

Browse files
committed
IGNITE-15102 Implement event handling and monitoring for python thin client - Fixes #46.
1 parent 8fc14f8 commit 82f29e2

20 files changed

Lines changed: 1047 additions & 173 deletions

docs/modules.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,4 @@ of `pyignite`, intended for end users.
3131
datatypes/parsers
3232
datatypes/cache_props
3333
Exceptions <source/pyignite.exceptions>
34+
Monitoring and handling events <source/pyignite.monitoring>
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
.. Licensed to the Apache Software Foundation (ASF) under one or more
2+
contributor license agreements. See the NOTICE file distributed with
3+
this work for additional information regarding copyright ownership.
4+
The ASF licenses this file to You under the Apache License, Version 2.0
5+
(the "License"); you may not use this file except in compliance with
6+
the License. You may obtain a copy of the License at
7+
8+
.. http://www.apache.org/licenses/LICENSE-2.0
9+
10+
.. Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
16+
pyignite.connection.protocol_context package
17+
===========================
18+
19+
.. automodule:: pyignite.connection.protocol_context
20+
:members:

docs/source/pyignite.connection.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,9 @@ pyignite.connection package
2020
:members:
2121
:undoc-members:
2222
:show-inheritance:
23+
24+
Submodules
25+
----------
26+
27+
.. toctree::
28+
pyignite.connection.protocol_context
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
.. Licensed to the Apache Software Foundation (ASF) under one or more
2+
contributor license agreements. See the NOTICE file distributed with
3+
this work for additional information regarding copyright ownership.
4+
The ASF licenses this file to You under the Apache License, Version 2.0
5+
(the "License"); you may not use this file except in compliance with
6+
the License. You may obtain a copy of the License at
7+
8+
.. http://www.apache.org/licenses/LICENSE-2.0
9+
10+
.. Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
16+
pyignite.monitoring module
17+
===========================
18+
19+
.. automodule:: pyignite.monitoring
20+
:members:
21+
:member-order: bysource

docs/source/pyignite.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,5 @@ Submodules
4444
pyignite.transaction
4545
pyignite.cursors
4646
pyignite.exceptions
47+
pyignite.monitoring
4748

pyignite/aio_client.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import random
1717
import sys
1818
from itertools import chain
19-
from typing import Iterable, Type, Union, Any, Dict, Optional
19+
from typing import Iterable, Type, Union, Any, Dict, Optional, Sequence
2020

2121
from .aio_cluster import AioCluster
2222
from .api import cache_get_node_partitions_async
@@ -60,7 +60,8 @@ class AioClient(BaseClient):
6060
Asynchronous Client implementation.
6161
"""
6262

63-
def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **kwargs):
63+
def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
64+
event_listeners: Optional[Sequence] = None, **kwargs):
6465
"""
6566
Initialize client.
6667
@@ -71,9 +72,10 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **
7172
https://ignite.apache.org/docs/latest/binary-client-protocol/data-format#schema
7273
:param partition_aware: (optional) try to calculate the exact data
7374
placement from the key before to issue the key operation to the
74-
server node, `True` by default.
75+
server node, `True` by default,
76+
:param event_listeners: (optional) event listeners.
7577
"""
76-
super().__init__(compact_footer, partition_aware, **kwargs)
78+
super().__init__(compact_footer, partition_aware, event_listeners, **kwargs)
7779
self._registry_mux = asyncio.Lock()
7880
self._affinity_query_mux = asyncio.Lock()
7981

@@ -99,9 +101,8 @@ async def _connect(self, nodes):
99101

100102
# do not try to open more nodes
101103
self._current_node = i
102-
103104
except connection_errors:
104-
conn.failed = True
105+
pass
105106

106107
self._nodes.append(conn)
107108

@@ -301,7 +302,7 @@ async def _get_affinity(self, conn: 'AioConnection', caches: Iterable[int]) -> D
301302
"""
302303
for _ in range(AFFINITY_RETRIES or 1):
303304
result = await cache_get_node_partitions_async(conn, caches)
304-
if result.status == 0 and result.value['partition_mapping']:
305+
if result.status == 0:
305306
break
306307
await asyncio.sleep(AFFINITY_DELAY)
307308

@@ -341,7 +342,7 @@ async def get_best_node(
341342

342343
asyncio.ensure_future(
343344
asyncio.gather(
344-
*[conn.reconnect() for conn in self._nodes if not conn.alive],
345+
*[node.reconnect() for node in self._nodes if not node.alive],
345346
return_exceptions=True
346347
)
347348
)

pyignite/client.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
import random
4545
import re
4646
from itertools import chain
47-
from typing import Iterable, Type, Union, Any, Dict, Optional
47+
from typing import Iterable, Type, Union, Any, Dict, Optional, Sequence
4848

4949
from .api import cache_get_node_partitions
5050
from .api.binary import get_binary_type, put_binary_type
@@ -66,6 +66,7 @@
6666
get_field_by_id, unsigned
6767
)
6868
from .binary import GenericObjectMeta
69+
from .monitoring import _EventListeners
6970

7071

7172
__all__ = ['Client']
@@ -76,7 +77,8 @@ class BaseClient:
7677
_identifier = re.compile(r'[^0-9a-zA-Z_.+$]', re.UNICODE)
7778
_ident_start = re.compile(r'^[^a-zA-Z_]+', re.UNICODE)
7879

79-
def __init__(self, compact_footer: bool = None, partition_aware: bool = False, **kwargs):
80+
def __init__(self, compact_footer: bool = None, partition_aware: bool = False,
81+
event_listeners: Optional[Sequence] = None, **kwargs):
8082
self._compact_footer = compact_footer
8183
self._partition_aware = partition_aware
8284
self._connection_args = kwargs
@@ -87,6 +89,7 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = False, *
8789
self.affinity_version = (0, 0)
8890
self._affinity = {'version': self.affinity_version, 'partition_mapping': defaultdict(dict)}
8991
self._protocol_context = None
92+
self._event_listeners = _EventListeners(event_listeners)
9093

9194
@property
9295
def protocol_context(self):
@@ -338,7 +341,8 @@ class Client(BaseClient):
338341
Synchronous Client implementation.
339342
"""
340343

341-
def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **kwargs):
344+
def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
345+
event_listeners: Optional[Sequence] = None, **kwargs):
342346
"""
343347
Initialize client.
344348
@@ -349,9 +353,10 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **
349353
https://ignite.apache.org/docs/latest/binary-client-protocol/data-format#schema
350354
:param partition_aware: (optional) try to calculate the exact data
351355
placement from the key before to issue the key operation to the
352-
server node, `True` by default.
356+
server node, `True` by default,
357+
:param event_listeners: (optional) event listeners.
353358
"""
354-
super().__init__(compact_footer, partition_aware, **kwargs)
359+
super().__init__(compact_footer, partition_aware, event_listeners, **kwargs)
355360

356361
def connect(self, *args):
357362
"""
@@ -382,7 +387,6 @@ def _connect(self, nodes):
382387
self._current_node = i
383388

384389
except connection_errors:
385-
conn.failed = True
386390
if self.partition_aware:
387391
# schedule the reconnection
388392
conn.reconnect()
@@ -565,7 +569,7 @@ def _get_affinity(self, conn: 'Connection', caches: Iterable[int]) -> Dict:
565569
"""
566570
for _ in range(AFFINITY_RETRIES or 1):
567571
result = cache_get_node_partitions(conn, caches)
568-
if result.status == 0 and result.value['partition_mapping']:
572+
if result.status == 0:
569573
break
570574
time.sleep(AFFINITY_DELAY)
571575

@@ -608,9 +612,9 @@ def get_best_node(
608612

609613
self._update_affinity(full_affinity)
610614

611-
for conn in self._nodes:
612-
if not conn.alive:
613-
conn.reconnect()
615+
for node in self._nodes:
616+
if not node.alive:
617+
node.reconnect()
614618

615619
c_id = cache.cache_id if isinstance(cache, BaseCache) else cache_id(cache)
616620
parts = self._cache_partition_mapping(c_id).get('number_of_partitions')

pyignite/connection/aio_connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,10 +190,10 @@ async def _connect(self):
190190
self._on_handshake_fail(e)
191191
raise e
192192
except Exception as e:
193+
self._on_handshake_fail(e)
193194
# restore undefined protocol version
194195
if detecting_protocol:
195196
self.client.protocol_context = None
196-
self._on_handshake_fail(e)
197197
raise e
198198

199199
self._on_handshake_success(result)

pyignite/connection/connection.py

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,9 @@ def _process_handshake_error(self, response):
9999
def _on_handshake_start(self):
100100
if logger.isEnabledFor(logging.DEBUG):
101101
logger.debug("Connecting to node(address=%s, port=%d) with protocol context %s",
102-
self.host, self.port, self.client.protocol_context)
102+
self.host, self.port, self.protocol_context)
103+
if self._enabled_connection_listener:
104+
self._connection_listener.publish_handshake_start(self.host, self.port, self.protocol_context)
103105

104106
def _on_handshake_success(self, result):
105107
features = BitmaskFeature.from_array(result.get('features', None))
@@ -109,24 +111,45 @@ def _on_handshake_success(self, result):
109111

110112
if logger.isEnabledFor(logging.DEBUG):
111113
logger.debug("Connected to node(address=%s, port=%d, node_uuid=%s) with protocol context %s",
112-
self.host, self.port, self.uuid, self.client.protocol_context)
114+
self.host, self.port, self.uuid, self.protocol_context)
115+
if self._enabled_connection_listener:
116+
self._connection_listener.publish_handshake_success(self.host, self.port, self.protocol_context, self.uuid)
113117

114118
def _on_handshake_fail(self, err):
119+
self.failed = True
120+
115121
if isinstance(err, AuthenticationError):
116122
logger.error("Authentication failed while connecting to node(address=%s, port=%d): %s",
117123
self.host, self.port, err)
124+
if self._enabled_connection_listener:
125+
self._connection_listener.publish_authentication_fail(self.host, self.port, self.protocol_context, err)
118126
else:
119127
logger.error("Failed to perform handshake, connection to node(address=%s, port=%d) "
120128
"with protocol context %s failed: %s",
121-
self.host, self.port, self.client.protocol_context, err, exc_info=True)
129+
self.host, self.port, self.protocol_context, err, exc_info=True)
130+
if self._enabled_connection_listener:
131+
self._connection_listener.publish_handshake_fail(self.host, self.port, self.protocol_context, err)
122132

123133
def _on_connection_lost(self, err=None, expected=False):
124-
if expected and logger.isEnabledFor(logging.DEBUG):
125-
logger.debug("Connection closed to node(address=%s, port=%d, node_uuid=%s)",
126-
self.host, self.port, self.uuid)
134+
if expected:
135+
if logger.isEnabledFor(logging.DEBUG):
136+
logger.debug("Connection closed to node(address=%s, port=%d, node_uuid=%s)",
137+
self.host, self.port, self.uuid)
138+
if self._enabled_connection_listener:
139+
self._connection_listener.publish_connection_closed(self.host, self.port, self.uuid)
127140
else:
128141
logger.info("Connection lost to node(address=%s, port=%d, node_uuid=%s): %s",
129142
self.host, self.port, self.uuid, err)
143+
if self._enabled_connection_listener:
144+
self._connection_listener.publish_connection_lost(self.host, self.port, self.uuid, err)
145+
146+
@property
147+
def _enabled_connection_listener(self):
148+
return self.client._event_listeners and self.client._event_listeners.enabled_connection_listener
149+
150+
@property
151+
def _connection_listener(self):
152+
return self.client._event_listeners
130153

131154

132155
class Connection(BaseConnection):
@@ -216,10 +239,10 @@ def connect(self):
216239
self._on_handshake_fail(e)
217240
raise e
218241
except Exception as e:
242+
self._on_handshake_fail(e)
219243
# restore undefined protocol version
220244
if detecting_protocol:
221245
self.client.protocol_context = None
222-
self._on_handshake_fail(e)
223246
raise e
224247

225248
self._on_handshake_success(result)
@@ -260,7 +283,7 @@ def reconnect(self):
260283
if self.alive:
261284
return
262285

263-
self.close()
286+
self.close(on_reconnect=True)
264287

265288
# connect and silence the connection errors
266289
try:
@@ -352,7 +375,7 @@ def recv(self, flags=None, reconnect=True) -> bytearray:
352375

353376
return data
354377

355-
def close(self):
378+
def close(self, on_reconnect=False):
356379
"""
357380
Try to mark socket closed, then unlink it. This is recommended but
358381
not required, since sockets are automatically closed when
@@ -364,5 +387,6 @@ def close(self):
364387
self._socket.close()
365388
except connection_errors:
366389
pass
367-
self._on_connection_lost(expected=True)
390+
if not on_reconnect and not self.failed:
391+
self._on_connection_lost(expected=True)
368392
self._socket = None

pyignite/connection/protocol_context.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ def _ensure_consistency(self):
4444
if not self.is_feature_flags_supported():
4545
self._features = None
4646

47+
def copy(self):
48+
return ProtocolContext(self.version, self.features)
49+
4750
@property
4851
def version(self):
4952
return getattr(self, '_version', None)

0 commit comments

Comments
 (0)