|
16 | 16 | import asyncio |
17 | 17 | import random |
18 | 18 | from itertools import chain |
19 | | -from typing import Iterable, Type, Union, Any |
| 19 | +from typing import Iterable, Type, Union, Any, Dict |
20 | 20 |
|
| 21 | +from .api import cache_get_node_partitions_async |
21 | 22 | from .api.binary import get_binary_type_async, put_binary_type_async |
22 | 23 | from .api.cache_config import cache_get_names_async |
| 24 | +from .cache import BaseCache |
23 | 25 | from .client import BaseClient |
24 | 26 | from .cursors import AioSqlFieldsCursor |
25 | 27 | from .aio_cache import AioCache, get_cache, create_cache, get_or_create_cache |
26 | 28 | from .connection import AioConnection |
| 29 | +from .constants import AFFINITY_RETRIES, AFFINITY_DELAY |
27 | 30 | from .datatypes import BinaryObject |
28 | 31 | from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors |
29 | 32 | from .stream import AioBinaryStream, READ_BACKWARD |
30 | | -from .utils import cache_id, entity_id, status_to_exception, is_iterable, is_wrapped |
| 33 | +from .utils import cache_id, entity_id, status_to_exception, is_wrapped |
31 | 34 |
|
32 | 35 |
|
33 | 36 | __all__ = ['AioClient'] |
@@ -72,6 +75,7 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = False, * |
72 | 75 | """ |
73 | 76 | super().__init__(compact_footer, partition_aware, **kwargs) |
74 | 77 | self._registry_mux = asyncio.Lock() |
| 78 | + self._affinity_query_mux = asyncio.Lock() |
75 | 79 |
|
76 | 80 | def connect(self, *args): |
77 | 81 | """ |
@@ -271,6 +275,89 @@ async def unwrap_binary(self, value: Any) -> Any: |
271 | 275 | return await BinaryObject.to_python_async(stream.read_ctype(data_class, direction=READ_BACKWARD), self) |
272 | 276 | return value |
273 | 277 |
|
| 278 | + @status_to_exception(CacheError) |
| 279 | + async def _get_affinity(self, conn: 'AioConnection', caches: Iterable[int]) -> Dict: |
| 280 | + """ |
| 281 | + Queries server for affinity mappings. Retries in case |
| 282 | + of an intermittent error (most probably “Getting affinity for topology |
| 283 | + version earlier than affinity is calculated”). |
| 284 | +
|
| 285 | + :param conn: connection to Igneite server, |
| 286 | + :param caches: Ids of caches, |
| 287 | + :return: OP_CACHE_PARTITIONS operation result value. |
| 288 | + """ |
| 289 | + for _ in range(AFFINITY_RETRIES or 1): |
| 290 | + result = await cache_get_node_partitions_async(conn, caches) |
| 291 | + if result.status == 0 and result.value['partition_mapping']: |
| 292 | + break |
| 293 | + await asyncio.sleep(AFFINITY_DELAY) |
| 294 | + |
| 295 | + return result |
| 296 | + |
| 297 | + async def get_best_node( |
| 298 | + self, cache: Union[int, str, 'BaseCache'], key: Any = None, key_hint: 'GridGainDataType' = None |
| 299 | + ) -> 'AioConnection': |
| 300 | + """ |
| 301 | + Returns the node from the list of the nodes, opened by client, that |
| 302 | + most probably contains the needed key-value pair. See IEP-23. |
| 303 | +
|
| 304 | + This method is not a part of the public API. Unless you wish to |
| 305 | + extend the `pyignite` capabilities (with additional testing, logging, |
| 306 | + examining connections, et c.) you probably should not use it. |
| 307 | +
|
| 308 | + :param cache: Ignite cache, cache name or cache id, |
| 309 | + :param key: (optional) pythonic key, |
| 310 | + :param key_hint: (optional) Ignite data type, for which the given key |
| 311 | + should be converted, |
| 312 | + :return: Ignite connection object. |
| 313 | + """ |
| 314 | + conn = await self.random_node() |
| 315 | + |
| 316 | + if self.partition_aware and key is not None: |
| 317 | + caches = self._caches_to_update_affinity() |
| 318 | + if caches: |
| 319 | + async with self._affinity_query_mux: |
| 320 | + while True: |
| 321 | + caches = self._caches_to_update_affinity() |
| 322 | + if not caches: |
| 323 | + break |
| 324 | + |
| 325 | + try: |
| 326 | + full_affinity = await self._get_affinity(conn, caches) |
| 327 | + self._update_affinity(full_affinity) |
| 328 | + |
| 329 | + asyncio.ensure_future( |
| 330 | + asyncio.gather( |
| 331 | + *[conn.reconnect() for conn in self._nodes if not conn.alive], |
| 332 | + return_exceptions=True |
| 333 | + ) |
| 334 | + ) |
| 335 | + |
| 336 | + break |
| 337 | + except connection_errors: |
| 338 | + # retry if connection failed |
| 339 | + conn = await self.random_node() |
| 340 | + pass |
| 341 | + except CacheError: |
| 342 | + # server did not create mapping in time |
| 343 | + return conn |
| 344 | + |
| 345 | + c_id = cache.cache_id if isinstance(cache, BaseCache) else cache_id(cache) |
| 346 | + parts = self._cache_partition_mapping(c_id).get('number_of_partitions') |
| 347 | + |
| 348 | + if not parts: |
| 349 | + return conn |
| 350 | + |
| 351 | + key, key_hint = self._get_affinity_key(c_id, key, key_hint) |
| 352 | + |
| 353 | + hashcode = await key_hint.hashcode_async(key, self) |
| 354 | + |
| 355 | + best_node = self._get_node_by_hashcode(c_id, hashcode, parts) |
| 356 | + if best_node: |
| 357 | + return best_node |
| 358 | + |
| 359 | + return conn |
| 360 | + |
274 | 361 | async def create_cache(self, settings: Union[str, dict]) -> 'AioCache': |
275 | 362 | """ |
276 | 363 | Creates Ignite cache by name. Raises `CacheError` if such a cache is |
|
0 commit comments