Skip to content

Commit a4e34a7

Browse files
dmelnichukisapego
authored andcommitted
NBL-5: Affinity Awareness for Python Thin client
This closes #180
1 parent 01da2d0 commit a4e34a7

42 files changed

Lines changed: 2826 additions & 1054 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

docs/examples.rst

Lines changed: 6 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -445,27 +445,24 @@ When connection to the server is broken or timed out,
445445
(`OSError` or `SocketError`), but keeps its constructor's parameters intact
446446
and tries to reconnect transparently.
447447

448-
When there's no way for :class:`~pyignite.client.Client` to reconnect, it
449-
raises a special :class:`~pyignite.exceptions.ReconnectError` exception.
448+
When :class:`~pyignite.client.Client` detects that all nodes in the list are
449+
failed without the possibility of restoring connection, it raises a special
450+
:class:`~pyignite.exceptions.ReconnectError` exception.
450451

451-
The following example features a simple node list traversal failover mechanism.
452452
Gather 3 Ignite nodes on `localhost` into one cluster and run:
453453

454454
.. literalinclude:: ../examples/failover.py
455455
:language: python
456-
:lines: 16-49
456+
:lines: 16-51
457457

458458
Then try shutting down and restarting nodes, and see what happens.
459459

460460
.. literalinclude:: ../examples/failover.py
461461
:language: python
462-
:lines: 51-61
462+
:lines: 53-65
463463

464464
Client reconnection do not require an explicit user action, like calling
465-
a special method or resetting a parameter. Note, however, that reconnection
466-
is lazy: it happens only if (and when) it is needed. In this example,
467-
the automatic reconnection happens, when the script checks upon the last
468-
saved value:
465+
a special method or resetting a parameter.
469466

470467
.. literalinclude:: ../examples/failover.py
471468
:language: python
@@ -475,29 +472,6 @@ It means that instead of checking the connection status it is better for
475472
`pyignite` user to just try the supposed data operations and catch
476473
the resulting exception.
477474

478-
:py:meth:`~pyignite.connection.Connection.connect` method accepts any
479-
iterable, not just list. It means that you can implement any reconnection
480-
policy (round-robin, nodes prioritization, pause on reconnect or graceful
481-
backoff) with a generator.
482-
483-
`pyignite` comes with a sample
484-
:class:`~pyignite.connection.generators.RoundRobin` generator. In the above
485-
example try to replace
486-
487-
.. literalinclude:: ../examples/failover.py
488-
:language: python
489-
:lines: 29
490-
491-
with
492-
493-
.. code-block:: python3
494-
495-
client.connect(RoundRobin(nodes, max_reconnects=20))
496-
497-
The client will try to reconnect to node 1 after node 3 is crashed, then to
498-
node 2, et c. At least one node should be active for the
499-
:class:`~pyignite.connection.generators.RoundRobin` to work properly.
500-
501475
SSL/TLS
502476
-------
503477

examples/failover.py

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,35 +27,39 @@
2727

2828
client = Client(timeout=4.0)
2929
client.connect(nodes)
30-
print('Connected to {}'.format(client))
30+
print('Connected')
3131

3232
my_cache = client.get_or_create_cache({
3333
PROP_NAME: 'my_cache',
34-
PROP_CACHE_MODE: CacheMode.REPLICATED,
34+
PROP_CACHE_MODE: CacheMode.PARTITIONED,
35+
PROP_BACKUPS_NUMBER: 2,
3536
})
3637
my_cache.put('test_key', 0)
38+
test_value = 0
3739

3840
# abstract main loop
3941
while True:
4042
try:
4143
# do the work
42-
test_value = my_cache.get('test_key')
44+
test_value = my_cache.get('test_key') or 0
4345
my_cache.put('test_key', test_value + 1)
4446
except (OSError, SocketError) as e:
4547
# recover from error (repeat last command, check data
4648
# consistency or just continue − depends on the task)
4749
print('Error: {}'.format(e))
48-
print('Last value: {}'.format(my_cache.get('test_key')))
49-
print('Reconnected to {}'.format(client))
50+
print('Last value: {}'.format(test_value))
51+
print('Reconnecting')
5052

51-
# Connected to 127.0.0.1:10800
52-
# Error: [Errno 104] Connection reset by peer
53-
# Last value: 6999
54-
# Reconnected to 127.0.0.1:10801
55-
# Error: Socket connection broken.
56-
# Last value: 12302
57-
# Reconnected to 127.0.0.1:10802
58-
# Error: [Errno 111] Client refused
53+
# Connected
54+
# Error: Connection broken.
55+
# Last value: 2650
56+
# Reconnecting
57+
# Error: Connection broken.
58+
# Last value: 10204
59+
# Reconnecting
60+
# Error: Connection broken.
61+
# Last value: 18932
62+
# Reconnecting
5963
# Traceback (most recent call last):
60-
# ...
61-
# pyignite.exceptions.ReconnectError: Can not reconnect: out of nodes
64+
# ...
65+
# pyignite.exceptions.ReconnectError: Can not reconnect: out of nodes.

pyignite/api/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
stable end user API see :mod:`pyignite.client` module.
2424
"""
2525

26+
from .affinity import (
27+
cache_get_node_partitions,
28+
)
2629
from .cache_config import (
2730
cache_create,
2831
cache_get_names,
@@ -54,6 +57,7 @@
5457
cache_remove_keys,
5558
cache_remove_all,
5659
cache_get_size,
60+
cache_local_peek,
5761
)
5862
from .sql import (
5963
scan,

pyignite/api/affinity.py

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
#
2+
# Copyright 2019 GridGain Systems, Inc. and Contributors.
3+
#
4+
# Licensed under the GridGain Community Edition License (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
from typing import Iterable, Union
17+
18+
from pyignite.datatypes import Bool, Int, Long, UUIDObject
19+
from pyignite.datatypes.internal import StructArray
20+
from pyignite.queries import Query
21+
from pyignite.queries.op_codes import OP_CACHE_PARTITIONS
22+
from pyignite.utils import is_iterable
23+
from .result import APIResult
24+
25+
26+
cache_ids = StructArray([
27+
('cache_id', Int),
28+
])
29+
30+
cache_config = StructArray([
31+
('key_type_id', Int),
32+
('affinity_key_field_id', Int),
33+
])
34+
35+
node_partitions = StructArray([
36+
('partition_id', Int),
37+
])
38+
39+
node_mapping = StructArray([
40+
('node_uuid', UUIDObject),
41+
('node_partitions', node_partitions)
42+
])
43+
44+
cache_mapping = StructArray([
45+
('cache_id', Int),
46+
('cache_config', cache_config),
47+
])
48+
49+
partition_mapping = StructArray([
50+
('is_applicable', Bool),
51+
('cache_mapping', cache_mapping),
52+
('node_mapping', node_mapping),
53+
])
54+
55+
56+
def cache_get_node_partitions(
57+
conn: 'Connection', caches: Union[int, Iterable[int]],
58+
query_id: int = None,
59+
) -> APIResult:
60+
"""
61+
Gets partition mapping for an Ignite cache or a number of caches. See
62+
“IEP-23: Best Effort Affinity for thin clients”.
63+
64+
:param conn: connection to Ignite server,
65+
:param caches: cache ID(s) the mapping is provided for,
66+
:param query_id: (optional) a value generated by client and returned as-is
67+
in response.query_id. When the parameter is omitted, a random value
68+
is generated,
69+
:return: API result data object.
70+
"""
71+
query_struct = Query(
72+
OP_CACHE_PARTITIONS,
73+
[
74+
('cache_ids', cache_ids),
75+
],
76+
query_id=query_id
77+
)
78+
if not is_iterable(caches):
79+
caches = [caches]
80+
81+
result = query_struct.perform(
82+
conn,
83+
query_params={
84+
'cache_ids': [{'cache_id': cache} for cache in caches],
85+
},
86+
response_config=[
87+
('version_major', Long),
88+
('version_minor', Int),
89+
('partition_mapping', partition_mapping),
90+
],
91+
)
92+
if result.status == 0:
93+
# tidying up the result
94+
value = {
95+
'version': (
96+
result.value['version_major'],
97+
result.value['version_minor']
98+
),
99+
'partition_mapping': [],
100+
}
101+
for i, partition_map in enumerate(result.value['partition_mapping']):
102+
cache_id = partition_map['cache_mapping'][0]['cache_id']
103+
value['partition_mapping'].insert(
104+
i,
105+
{
106+
'cache_id': cache_id,
107+
'is_applicable': partition_map['is_applicable'],
108+
}
109+
)
110+
if partition_map['is_applicable']:
111+
value['partition_mapping'][i]['cache_config'] = {
112+
a['key_type_id']: a['affinity_key_field_id']
113+
for a in partition_map['cache_mapping'][0]['cache_config']
114+
}
115+
value['partition_mapping'][i]['node_mapping'] = {
116+
p['node_uuid']: [
117+
x['partition_id'] for x in p['node_partitions']
118+
]
119+
for p in partition_map['node_mapping']
120+
}
121+
result.value = value
122+
123+
return result

pyignite/api/binary.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
body_struct, enum_struct, schema_struct, binary_fields_struct,
2121
)
2222
from pyignite.datatypes import String, Int, Bool
23-
from pyignite.queries import Query, Response
23+
from pyignite.queries import Query, get_response_class
2424
from pyignite.queries.op_codes import *
2525
from pyignite.utils import int_overflow, entity_id
2626
from .result import APIResult
@@ -53,7 +53,7 @@ def get_binary_type(
5353
})
5454
connection.send(send_buffer)
5555

56-
response_head_struct = Response([
56+
response_head_struct = get_response_class(connection)([
5757
('type_exists', Bool),
5858
])
5959
response_head_type, recv_buffer = response_head_struct.parse(connection)

0 commit comments

Comments
 (0)