Skip to content

Commit 33f7ead

Browse files
isapego, ivandasch, ptupitsyn, alamar
authored
GG-32660: Port review changes from several donations (#13)
Co-authored-by: Ivan Dashchinskiy <ivandasch@gmail.com>
Co-authored-by: Pavel Tupitsyn <ptupitsyn@apache.org>
Co-authored-by: Ilya Kasnacheev <ilya.kasnacheev@gmail.com>
1 parent 8d643c8 commit 33f7ead

28 files changed

Lines changed: 345 additions & 677 deletions

pygridgain/api/key_value.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -977,10 +977,7 @@ def cache_get_size(
977977
otherwise.
978978
"""
979979
if not isinstance(peek_modes, (list, tuple)):
980-
if peek_modes == 0:
981-
peek_modes = []
982-
else:
983-
peek_modes = [peek_modes]
980+
peek_modes = [peek_modes] if peek_modes else []
984981

985982
query_struct = Query(
986983
OP_CACHE_GET_SIZE,
@@ -1035,10 +1032,7 @@ def cache_local_peek(
10351032
(null if not found).
10361033
"""
10371034
if not isinstance(peek_modes, (list, tuple)):
1038-
if peek_modes == 0:
1039-
peek_modes = []
1040-
else:
1041-
peek_modes = [peek_modes]
1035+
peek_modes = [peek_modes] if peek_modes else []
10421036

10431037
query_struct = Query(
10441038
OP_CACHE_LOCAL_PEEK,

pygridgain/api/sql.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -416,10 +416,7 @@ def sql_fields_cursor_get_page(
416416
'cursor': cursor,
417417
},
418418
response_config=[
419-
('data', StructArray([
420-
('field_{}'.format(i), AnyDataObject)
421-
for i in range(field_count)
422-
])),
419+
('data', StructArray([(f'field_{i}', AnyDataObject) for i in range(field_count)])),
423420
('more', Bool),
424421
]
425422
)

pygridgain/binary.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,10 @@ def schema_id(self) -> int:
6969
def __new__(cls, *args, **kwargs) -> Any:
7070
# allow all items in Binary Object schema to be populated as optional
7171
# arguments to `__init__()` with sensible defaults.
72-
attributes = {
73-
k: attr.ib(
74-
type=getattr(v, 'pythonic', type(None)),
75-
default=getattr(v, 'default', None),
76-
) for k, v in cls.schema.items()
77-
}
72+
attributes = {}
73+
for k, v in cls.schema.items():
74+
attributes[k] = attr.ib(type=getattr(v, 'pythonic', type(None)), default=getattr(v, 'default', None))
75+
7876
attributes.update({'version': attr.ib(type=int, default=1)})
7977
cls = attr.s(cls, these=attributes)
8078
# skip parameters
@@ -151,10 +149,7 @@ def _build(self, client: 'Client' = None) -> int:
151149
header.flags |= BinaryObject.OFFSET_ONE_BYTE
152150
elif max(offsets) < 65535:
153151
header.flags |= BinaryObject.OFFSET_TWO_BYTES
154-
schema_class = (
155-
BinaryObject.schema_type(header.flags)
156-
* len(offsets)
157-
)
152+
schema_class = BinaryObject.schema_type(header.flags) * len(offsets)
158153
schema = schema_class()
159154
if compact_footer:
160155
for i, offset in enumerate(offsets):

pygridgain/cache.py

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
connection_errors,
2626
)
2727
from .utils import (
28-
cache_id, get_field_by_id, is_wrapped, select_version,
28+
cache_id, get_field_by_id, is_wrapped,
2929
status_to_exception, unsigned, unwrap_binary,
3030
)
3131
from .api.cache_config import (
@@ -234,7 +234,6 @@ def _get_affinity(self, conn: 'Connection') -> Dict:
234234

235235
return result
236236

237-
@select_version
238237
def get_best_node(
239238
self, key: Any = None, key_hint: 'GridGainDataType' = None,
240239
) -> 'Connection':
@@ -257,8 +256,6 @@ def get_best_node(
257256
if key_hint is None:
258257
key_hint = AnyDataObject.map_python_type(key)
259258

260-
parts = -1
261-
262259
if self.affinity['version'] < self._client.affinity_version:
263260
# update partition mapping
264261
while True:
@@ -280,9 +277,10 @@ def get_best_node(
280277
del self.affinity['partition_mapping']
281278

282279
# calculate the number of partitions
283-
parts = sum(
284-
[len(p) for _, p in self.affinity['node_mapping'].items()]
285-
) if 'node_mapping' in self.affinity else 0
280+
parts = 0
281+
if 'node_mapping' in self.affinity:
282+
for p in self.affinity['node_mapping'].values():
283+
parts += len(p)
286284

287285
self.affinity['number_of_partitions'] = parts
288286
else:
@@ -314,26 +312,24 @@ def get_best_node(
314312

315313
# search for connection
316314
try:
317-
node_uuid = next(
318-
u for u, p
319-
in self.affinity['node_mapping'].items()
320-
if part in p
321-
)
322-
best_conn = next(
323-
n for n in conn.client._nodes if n.uuid == node_uuid
324-
)
325-
if best_conn.alive:
326-
conn = best_conn
327-
except (StopIteration, KeyError):
315+
node_uuid, best_conn = None, None
316+
for u, p in self.affinity['node_mapping'].items():
317+
if part in p:
318+
node_uuid = u
319+
break
320+
321+
if node_uuid:
322+
for n in conn.client._nodes:
323+
if n.uuid == node_uuid:
324+
best_conn = n
325+
break
326+
if best_conn and best_conn.alive:
327+
conn = best_conn
328+
except KeyError:
328329
pass
329330

330331
return conn
331332

332-
def get_best_node_130(self, *args, **kwargs):
333-
return self.client.random_node
334-
335-
get_best_node_120 = get_best_node_130
336-
337333
@status_to_exception(CacheError)
338334
def get(self, key, key_hint: object = None) -> Any:
339335
"""
@@ -404,9 +400,7 @@ def put_all(self, pairs: dict):
404400
to save. Each key or value can be an item of representable
405401
Python type or a tuple of (item, hint),
406402
"""
407-
return cache_put_all(
408-
self.get_best_node(), self._cache_id, pairs
409-
)
403+
return cache_put_all(self.get_best_node(), self._cache_id, pairs)
410404

411405
@status_to_exception(CacheError)
412406
def replace(

pygridgain/client.py

Lines changed: 49 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from collections import defaultdict, OrderedDict
4444
import random
4545
import re
46+
from itertools import chain
4647
from typing import Dict, Iterable, List, Optional, Tuple, Type, Union
4748

4849
from .api.binary import get_binary_type, put_binary_type
@@ -57,7 +58,7 @@
5758
BinaryTypeError, CacheError, ReconnectError, SQLError, connection_errors,
5859
)
5960
from .utils import (
60-
capitalize, entity_id, schema_id, process_delimiter, select_version,
61+
capitalize, entity_id, schema_id, process_delimiter,
6162
status_to_exception, is_iterable,
6263
)
6364
from .binary import GenericObjectMeta
@@ -131,7 +132,14 @@ def get_protocol_version(self) -> Optional[Tuple]:
131132

132133
@property
133134
def partition_aware(self):
134-
return self._partition_aware
135+
return self._partition_aware and self.partition_awareness_supported_by_protocol
136+
137+
@property
138+
def partition_awareness_supported_by_protocol(self):
139+
# TODO: Need to re-factor this. I believe, we need separate class or
140+
# set of functions to work with protocol versions without manually
141+
# comparing versions with just some random tuples
142+
return self.protocol_version is not None and self.protocol_version >= (1, 4, 0)
135143

136144
def connect(self, *args):
137145
"""
@@ -158,23 +166,20 @@ def connect(self, *args):
158166
# the following code is quite twisted, because the protocol version
159167
# is initially unknown
160168

161-
# TODO: open first node in foregroung, others − in background
169+
# TODO: open first node in foreground, others − in background
162170
for i, node in enumerate(nodes):
163171
host, port = node
164172
conn = Connection(self, **self._connection_args)
165173
conn.host = host
166174
conn.port = port
167175

168176
try:
169-
if (
170-
self.protocol_version is None
171-
or self.protocol_version >= (1, 4, 0)
172-
):
177+
if self.protocol_version is None or self.partition_aware:
173178
# open connection before adding to the pool
174179
conn.connect(host, port)
175180

176181
# now we have the protocol version
177-
if self.protocol_version < (1, 4, 0):
182+
if not self.partition_aware:
178183
# do not try to open more nodes
179184
self._current_node = i
180185
else:
@@ -186,10 +191,7 @@ def connect(self, *args):
186191

187192
except connection_errors:
188193
conn._fail()
189-
if (
190-
self.protocol_version
191-
and self.protocol_version >= (1, 4, 0)
192-
):
194+
if self.partition_aware:
193195
# schedule the reconnection
194196
conn.reconnect()
195197

@@ -204,7 +206,6 @@ def close(self):
204206
self._nodes.clear()
205207

206208
@property
207-
@select_version
208209
def random_node(self) -> Connection:
209210
"""
210211
Returns random usable node.
@@ -213,46 +214,44 @@ def random_node(self) -> Connection:
213214
extend the `pygridgain` capabilities (with additional testing,
214215
logging, examining connections, et c.) you probably should not use it.
215216
"""
216-
try:
217-
return random.choice(
218-
list(n for n in self._nodes if n.alive)
219-
)
220-
except IndexError:
221-
# cannot choose from an empty sequence
222-
raise ReconnectError('Can not reconnect: out of nodes.') from None
223-
224-
def random_node_130(self):
225-
# it actually returns the next usable node, but the name stands for
226-
# the code unification reason
227-
node = self._nodes[self._current_node]
228-
if node.alive:
229-
return node
230-
231-
# close current (supposedly failed) node
232-
self._nodes[self._current_node].close()
233-
234-
# advance the node index
235-
self._current_node += 1
236-
if self._current_node >= len(self._nodes):
237-
self._current_node = 0
238-
239-
# prepare the list of node indexes to try to connect to
240-
seq = list(range(len(self._nodes)))
241-
seq = seq[self._current_node:] + seq[:self._current_node]
242-
243-
for i in seq:
244-
node = self._nodes[i]
217+
if self.partition_aware:
218+
# if partition awareness is used just pick a random connected node
245219
try:
246-
node.connect(node.host, node.port)
247-
except connection_errors:
248-
pass
249-
else:
220+
return random.choice(
221+
list(n for n in self._nodes if n.alive)
222+
)
223+
except IndexError:
224+
# cannot choose from an empty sequence
225+
raise ReconnectError('Can not reconnect: out of nodes.') from None
226+
else:
227+
# if partition awareness is not used then just return the current
228+
# node if it's alive or the next usable node if connection with the
229+
# current is broken
230+
node = self._nodes[self._current_node]
231+
if node.alive:
250232
return node
251233

252-
# no nodes left
253-
raise ReconnectError('Can not reconnect: out of nodes.')
254-
255-
random_node_120 = random_node_130
234+
# close current (supposedly failed) node
235+
self._nodes[self._current_node].close()
236+
237+
# advance the node index
238+
self._current_node += 1
239+
if self._current_node >= len(self._nodes):
240+
self._current_node = 0
241+
242+
# prepare the list of node indexes to try to connect to
243+
num_nodes = len(self._nodes)
244+
for i in chain(range(self._current_node, num_nodes), range(self._current_node)):
245+
node = self._nodes[i]
246+
try:
247+
node.connect(node.host, node.port)
248+
except connection_errors:
249+
pass
250+
else:
251+
return node
252+
253+
# no nodes left
254+
raise ReconnectError('Can not reconnect: out of nodes.')
256255

257256
@status_to_exception(BinaryTypeError)
258257
def get_binary_type(self, binary_type: Union[str, int]) -> dict:

0 commit comments

Comments (0)