Skip to content

Commit e4a9927

Browse files
committed
NBL-31: Python thin now does not fails if one node is not avail
This closes #255
1 parent df2129c commit e4a9927

5 files changed

Lines changed: 107 additions & 76 deletions

File tree

pygridgain/cache.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,9 @@ def get_best_node(
313313
in self.affinity['node_mapping'].items()
314314
if part in p
315315
)
316-
best_conn = conn.client._nodes[node_uuid]
316+
best_conn = next(
317+
n for n in conn.client._nodes if n.uuid == node_uuid
318+
)
317319
if best_conn.alive:
318320
conn = best_conn
319321
except (StopIteration, KeyError):

pygridgain/client.py

Lines changed: 41 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242

4343
from collections import defaultdict, OrderedDict
4444
import random
45-
from typing import Dict, Iterable, Optional, Tuple, Type, Union
45+
from typing import Dict, Iterable, List, Optional, Tuple, Type, Union
4646

4747
from .api.binary import get_binary_type, put_binary_type
4848
from .api.cache_config import cache_get_names
@@ -80,10 +80,10 @@ class Client:
8080
_compact_footer: bool = None
8181
_connection_args: Dict = None
8282
_current_node: int = None
83-
_nodes: Union[Iterable[Connection], Dict['UUID', Connection]] = None
83+
_nodes: List[Connection] = None
8484

85-
affinity_version: Tuple = None
86-
protocol_version = None
85+
affinity_version: Optional[Tuple] = None
86+
protocol_version: Optional[Tuple] = None
8787

8888
def __init__(
8989
self, compact_footer: bool = None, affinity_aware: bool = False,
@@ -106,6 +106,7 @@ def __init__(
106106
"""
107107
self._compact_footer = compact_footer
108108
self._connection_args = kwargs
109+
self._nodes = []
109110
self._current_node = 0
110111
self._affinity_aware = affinity_aware
111112
self.affinity_version = (0, 0)
@@ -126,43 +127,6 @@ def get_protocol_version(self) -> Optional[Tuple]:
126127
def affinity_aware(self):
127128
return self._affinity_aware
128129

129-
@select_version
130-
def _add_node(
131-
self, host: str = None, port: int = None,
132-
conn: Connection = None, node_uuid: 'UUID' = None,
133-
) -> 'UUID':
134-
"""
135-
Opens a connection to GridGain server and adds it to the nodes'
136-
collection (connection pool).
137-
"""
138-
if self._nodes is None:
139-
self._nodes = {}
140-
141-
if conn is None:
142-
conn = Connection(self, **self._connection_args)
143-
hs_response = conn.connect(host, port)
144-
node_uuid = hs_response['node_uuid']
145-
146-
if node_uuid:
147-
self._nodes[node_uuid] = conn
148-
return node_uuid
149-
150-
def _add_node_130(
151-
self, host: str = None, port: int = None, conn: Connection = None,
152-
*args, **kwargs,
153-
):
154-
if self._nodes is None:
155-
self._nodes = []
156-
157-
if conn is None:
158-
conn = Connection(self, **self._connection_args)
159-
conn.host = host
160-
conn.port = port
161-
162-
self._nodes.append(conn)
163-
164-
_add_node_120 = _add_node_130
165-
166130
def connect(self, *args):
167131
"""
168132
Connect to GridGain cluster node(s).
@@ -185,36 +149,51 @@ def connect(self, *args):
185149
else:
186150
raise ConnectionError('Connection parameters are not valid.')
187151

188-
nodes = iter(nodes)
152+
# the following code is quite twisted, because the protocol version
153+
# is initially unknown
154+
189155
# TODO: open first node in foregroung, others − in background
156+
for i, node in enumerate(nodes):
157+
host, port = node
158+
conn = Connection(self, **self._connection_args)
159+
conn.host = host
160+
conn.port = port
190161

191-
first_node = Connection(self, **self._connection_args)
162+
try:
163+
if (
164+
self.protocol_version is None
165+
or self.protocol_version >= (1, 4, 0)
166+
):
167+
# open connection before adding to the pool
168+
conn.connect(host, port)
169+
170+
# now we have the protocol version
171+
if self.protocol_version < (1, 4, 0):
172+
# do not try to open more nodes
173+
self._current_node = i
174+
else:
175+
# take a chance to schedule the reconnection
176+
# for all the failed connections, that was probed
177+
# before this
178+
for failed_node in self._nodes[:i]:
179+
failed_node.reconnect()
192180

193-
# now we know protocol version
194-
self._add_node(
195-
conn=first_node,
196-
node_uuid=first_node.connect(*next(nodes)).get('node_uuid', None),
197-
)
181+
except connection_errors:
182+
conn._fail()
183+
if (
184+
self.protocol_version
185+
and self.protocol_version >= (1, 4, 0)
186+
):
187+
# schedule the reconnection
188+
conn.reconnect()
198189

199-
for host, port in nodes:
200-
self._add_node(host, port)
190+
self._nodes.append(conn)
201191

202-
@select_version
203192
def close(self):
204-
"""
205-
Close all connections to the server and clean the connection pool.
206-
"""
207-
for conn in self._nodes.values():
208-
conn.close()
209-
self._nodes.clear()
210-
211-
def close_130(self):
212193
for conn in self._nodes:
213194
conn.close()
214195
self._nodes.clear()
215196

216-
close_120 = close_130
217-
218197
@property
219198
@select_version
220199
def random_node(self) -> Connection:
@@ -227,7 +206,7 @@ def random_node(self) -> Connection:
227206
"""
228207
try:
229208
return random.choice(
230-
list(n for n in self._nodes.values() if n.alive)
209+
list(n for n in self._nodes if n.alive)
231210
)
232211
except IndexError:
233212
# cannot choose from an empty sequence

pygridgain/connection/__init__.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
from collections import OrderedDict
2323
import socket
24-
from threading import Lock, Timer
24+
from threading import Lock
2525
from typing import Union
2626

2727
from pygridgain.constants import *
@@ -30,7 +30,7 @@
3030
)
3131
from pygridgain.datatypes import Byte, Int, Short, String, UUIDObject
3232
from pygridgain.datatypes.internal import Struct
33-
from pygridgain.utils import select_version
33+
from pygridgain.utils import DaemonicTimer, select_version
3434

3535
from .handshake import HandshakeRequest
3636
from .ssl import wrap
@@ -62,6 +62,7 @@ class Connection:
6262
username = None
6363
password = None
6464
ssl_params = {}
65+
uuid = None
6566

6667
@staticmethod
6768
def _check_ssl_params(params):
@@ -260,6 +261,8 @@ def connect(
260261
raise e
261262

262263
# connection is ready for end user
264+
self.uuid = result.get('node_uuid', None) # version-specific (1.4+)
265+
263266
self._failed = False
264267
return result
265268

@@ -334,7 +337,7 @@ def reconnect(self, seq_no=0):
334337
self._reconnect()
335338

336339
if self.failed:
337-
Timer(
340+
DaemonicTimer(
338341
RECONNECT_BACKOFF_SEQUENCE[seq_no],
339342
self.reconnect,
340343
kwargs={'seq_no': seq_no + 1},
@@ -477,7 +480,10 @@ def close(self, release=True):
477480
garbage-collected.
478481
"""
479482
if release:
480-
self._in_use.release()
483+
try:
484+
self._in_use.release()
485+
except RuntimeError:
486+
pass
481487

482488
if self._socket:
483489
try:

pygridgain/utils.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import ctypes
1717
import decimal
1818
from functools import wraps
19+
from threading import Event, Thread
1920
from typing import Any, Callable, Optional, Type, Tuple, Union
2021

2122
from pygridgain.datatypes.base import GridGainDataType
@@ -279,3 +280,27 @@ def get_field_by_id(
279280
def unsigned(value: int, c_type: ctypes._SimpleCData = ctypes.c_uint) -> int:
280281
""" Convert signed integer value to unsigned. """
281282
return c_type(value).value
283+
284+
285+
class DaemonicTimer(Thread):
286+
"""
287+
Same as normal `threading.Timer`, but do not delay the program exit.
288+
"""
289+
290+
def __init__(self, interval, function, args=None, kwargs=None):
291+
Thread.__init__(self, daemon=True)
292+
self.interval = interval
293+
self.function = function
294+
self.args = args if args is not None else []
295+
self.kwargs = kwargs if kwargs is not None else {}
296+
self.finished = Event()
297+
298+
def cancel(self):
299+
"""Stop the timer if it hasn't finished yet."""
300+
self.finished.set()
301+
302+
def run(self):
303+
self.finished.wait(self.interval)
304+
if not self.finished.is_set():
305+
self.function(*self.args, **self.kwargs)
306+
self.finished.set()

tests/test_affinity.py

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
#
1616
from datetime import datetime, timedelta
1717
import decimal
18-
import time
1918
from uuid import UUID, uuid4
2019

2120
import pytest
@@ -30,6 +29,13 @@
3029

3130
def test_get_node_partitions(client):
3231

32+
if client.protocol_version < (1, 4, 0):
33+
pytest.skip(
34+
'Best effort affinity is not supported by the protocol {}.'.format(
35+
client.protocol_version
36+
)
37+
)
38+
3339
conn = client.random_node
3440

3541
cache_1 = client.get_or_create_cache('test_cache_1')
@@ -46,8 +52,6 @@ def test_get_node_partitions(client):
4652
cache_4 = client.get_or_create_cache('test_cache_4')
4753
cache_5 = client.get_or_create_cache('test_cache_5')
4854

49-
time.sleep(0.1)
50-
5155
result = cache_get_node_partitions(
5256
conn,
5357
[cache_1.cache_id, cache_2.cache_id]
@@ -120,7 +124,12 @@ def test_get_node_partitions(client):
120124
)
121125
def test_affinity(client, key, key_hint):
122126

123-
time.sleep(0.1)
127+
if client.protocol_version < (1, 4, 0):
128+
pytest.skip(
129+
'Best effort affinity is not supported by the protocol {}.'.format(
130+
client.protocol_version
131+
)
132+
)
124133

125134
cache_1 = client.get_or_create_cache({
126135
PROP_NAME: 'test_cache_1',
@@ -131,7 +140,7 @@ def test_affinity(client, key, key_hint):
131140

132141
best_node = cache_1.get_best_node(key, key_hint=key_hint)
133142

134-
for node in client._nodes.values():
143+
for node in filter(lambda n: n.alive, client._nodes):
135144
result = cache_local_peek(
136145
node, cache_1.cache_id, key, key_hint=key_hint,
137146
)
@@ -149,7 +158,12 @@ def test_affinity(client, key, key_hint):
149158

150159
def test_affinity_for_generic_object(client):
151160

152-
time.sleep(0.1)
161+
if client.protocol_version < (1, 4, 0):
162+
pytest.skip(
163+
'Best effort affinity is not supported by the protocol {}.'.format(
164+
client.protocol_version
165+
)
166+
)
153167

154168
cache_1 = client.get_or_create_cache({
155169
PROP_NAME: 'test_cache_1',
@@ -173,7 +187,7 @@ class KeyClass(
173187

174188
best_node = cache_1.get_best_node(key, key_hint=BinaryObject)
175189

176-
for node in client._nodes.values():
190+
for node in filter(lambda n: n.alive, client._nodes):
177191
result = cache_local_peek(
178192
node, cache_1.cache_id, key, key_hint=BinaryObject,
179193
)
@@ -191,7 +205,12 @@ class KeyClass(
191205

192206
def test_affinity_for_generic_object_without_type_hints(client):
193207

194-
time.sleep(0.1)
208+
if client.protocol_version < (1, 4, 0):
209+
pytest.skip(
210+
'Best effort affinity is not supported by the protocol {}.'.format(
211+
client.protocol_version
212+
)
213+
)
195214

196215
cache_1 = client.get_or_create_cache({
197216
PROP_NAME: 'test_cache_1',
@@ -215,7 +234,7 @@ class KeyClass(
215234

216235
best_node = cache_1.get_best_node(key)
217236

218-
for node in client._nodes.values():
237+
for node in filter(lambda n: n.alive, client._nodes):
219238
result = cache_local_peek(
220239
node, cache_1.cache_id, key
221240
)

0 commit comments

Comments
 (0)