Skip to content

Commit 7cbfe32

Browse files
ivandaschisapego
authored andcommitted
IGNITE-14444 Move affinity mapping storage and best node calculation to clients
This closes #26
1 parent 2fd7fda commit 7cbfe32

9 files changed

Lines changed: 476 additions & 353 deletions

File tree

pyignite/aio_cache.py

Lines changed: 35 additions & 148 deletions
Large diffs are not rendered by default.

pyignite/aio_client.py

Lines changed: 89 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,21 @@
1515
import asyncio
1616
import random
1717
from itertools import chain
18-
from typing import Iterable, Type, Union, Any
18+
from typing import Iterable, Type, Union, Any, Dict
1919

20+
from .api import cache_get_node_partitions_async
2021
from .api.binary import get_binary_type_async, put_binary_type_async
2122
from .api.cache_config import cache_get_names_async
23+
from .cache import BaseCache
2224
from .client import BaseClient
2325
from .cursors import AioSqlFieldsCursor
2426
from .aio_cache import AioCache, get_cache, create_cache, get_or_create_cache
2527
from .connection import AioConnection
26-
from .constants import IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT
28+
from .constants import AFFINITY_RETRIES, AFFINITY_DELAY
2729
from .datatypes import BinaryObject
2830
from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors
2931
from .stream import AioBinaryStream, READ_BACKWARD
30-
from .utils import cache_id, entity_id, status_to_exception, is_iterable, is_wrapped
32+
from .utils import cache_id, entity_id, status_to_exception, is_wrapped
3133

3234

3335
__all__ = ['AioClient']
@@ -72,6 +74,7 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = False, *
7274
"""
7375
super().__init__(compact_footer, partition_aware, **kwargs)
7476
self._registry_mux = asyncio.Lock()
77+
self._affinity_query_mux = asyncio.Lock()
7578

7679
def connect(self, *args):
7780
"""
@@ -271,6 +274,89 @@ async def unwrap_binary(self, value: Any) -> Any:
271274
return await BinaryObject.to_python_async(stream.read_ctype(data_class, direction=READ_BACKWARD), self)
272275
return value
273276

277+
@status_to_exception(CacheError)
278+
async def _get_affinity(self, conn: 'AioConnection', caches: Iterable[int]) -> Dict:
279+
"""
280+
Queries server for affinity mappings. Retries in case
281+
of an intermittent error (most probably “Getting affinity for topology
282+
version earlier than affinity is calculated”).
283+
284+
:param conn: connection to Igneite server,
285+
:param caches: Ids of caches,
286+
:return: OP_CACHE_PARTITIONS operation result value.
287+
"""
288+
for _ in range(AFFINITY_RETRIES or 1):
289+
result = await cache_get_node_partitions_async(conn, caches)
290+
if result.status == 0 and result.value['partition_mapping']:
291+
break
292+
await asyncio.sleep(AFFINITY_DELAY)
293+
294+
return result
295+
296+
async def get_best_node(
297+
self, cache: Union[int, str, 'BaseCache'], key: Any = None, key_hint: 'IgniteDataType' = None
298+
) -> 'AioConnection':
299+
"""
300+
Returns the node from the list of the nodes, opened by client, that
301+
most probably contains the needed key-value pair. See IEP-23.
302+
303+
This method is not a part of the public API. Unless you wish to
304+
extend the `pyignite` capabilities (with additional testing, logging,
305+
examining connections, et c.) you probably should not use it.
306+
307+
:param cache: Ignite cache, cache name or cache id,
308+
:param key: (optional) pythonic key,
309+
:param key_hint: (optional) Ignite data type, for which the given key
310+
should be converted,
311+
:return: Ignite connection object.
312+
"""
313+
conn = await self.random_node()
314+
315+
if self.partition_aware and key is not None:
316+
caches = self._caches_to_update_affinity()
317+
if caches:
318+
async with self._affinity_query_mux:
319+
while True:
320+
caches = self._caches_to_update_affinity()
321+
if not caches:
322+
break
323+
324+
try:
325+
full_affinity = await self._get_affinity(conn, caches)
326+
self._update_affinity(full_affinity)
327+
328+
asyncio.ensure_future(
329+
asyncio.gather(
330+
*[conn.reconnect() for conn in self._nodes if not conn.alive],
331+
return_exceptions=True
332+
)
333+
)
334+
335+
break
336+
except connection_errors:
337+
# retry if connection failed
338+
conn = await self.random_node()
339+
pass
340+
except CacheError:
341+
# server did not create mapping in time
342+
return conn
343+
344+
c_id = cache.cache_id if isinstance(cache, BaseCache) else cache_id(cache)
345+
parts = self._cache_partition_mapping(c_id).get('number_of_partitions')
346+
347+
if not parts:
348+
return conn
349+
350+
key, key_hint = self._get_affinity_key(c_id, key, key_hint)
351+
352+
hashcode = await key_hint.hashcode_async(key, self)
353+
354+
best_node = self._get_node_by_hashcode(c_id, hashcode, parts)
355+
if best_node:
356+
return best_node
357+
358+
return conn
359+
274360
async def create_cache(self, settings: Union[str, dict]) -> 'AioCache':
275361
"""
276362
Creates Ignite cache by name. Raises `CacheError` if such a cache is

0 commit comments

Comments
 (0)