Skip to content

Commit 8fdc81c

Browse files
isapego and ivandasch authored
GG-33936 IGNITE-15102 Event handling and monitoring (#54)
(cherry picked from commit 82f29e2) Co-authored-by: Ivan Daschinsky <ivandasch@apache.org>
1 parent dafa815 commit 8fdc81c

20 files changed

+1048
-176
lines changed

docs/modules.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,4 @@ of `pygridgain`, intended for end users.
3030
datatypes/parsers
3131
datatypes/cache_props
3232
Exceptions <source/pygridgain.exceptions>
33+
Monitoring and handling events <source/pygridgain.monitoring>
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
.. Copyright 2021 GridGain Systems, Inc. and Contributors.
2+
3+
.. Licensed under the GridGain Community Edition License (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
.. https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
8+
9+
.. Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
15+
pygridgain.connection.protocol_context module
16+
=============================================
17+
18+
.. automodule:: pygridgain.connection.protocol_context
19+
:members:

docs/source/pygridgain.connection.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,9 @@ pygridgain.connection package
1919
:members:
2020
:undoc-members:
2121
:show-inheritance:
22+
23+
Submodules
24+
----------
25+
26+
.. toctree::
27+
pygridgain.connection.protocol_context
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
.. Copyright 2021 GridGain Systems, Inc. and Contributors.
2+
3+
.. Licensed under the GridGain Community Edition License (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
.. https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
8+
9+
.. Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
15+
pygridgain.monitoring module
16+
============================
17+
18+
.. automodule:: pygridgain.monitoring
19+
:members:
20+
:member-order: bysource

docs/source/pygridgain.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,5 @@ Submodules
4343
pygridgain.transaction
4444
pygridgain.cursors
4545
pygridgain.exceptions
46+
pygridgain.monitoring
4647

pygridgain/aio_client.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import random
1818
import sys
1919
from itertools import chain
20-
from typing import Iterable, Type, Union, Any, Dict, Optional
20+
from typing import Iterable, Type, Union, Any, Dict, Optional, Sequence
2121

2222
from .aio_cluster import AioCluster
2323
from .api import cache_get_node_partitions_async
@@ -61,7 +61,8 @@ class AioClient(BaseClient):
6161
Asynchronous Client implementation.
6262
"""
6363

64-
def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **kwargs):
64+
def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
65+
event_listeners: Optional[Sequence] = None, **kwargs):
6566
"""
6667
Initialize client.
6768
@@ -72,9 +73,10 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **
7273
https://ignite.apache.org/docs/latest/binary-client-protocol/data-format#schema
7374
:param partition_aware: (optional) try to calculate the exact data
7475
placement from the key before issuing the key operation to the
75-
server node, `True` by default.
76+
server node, `True` by default.
77+
:param event_listeners: (optional) event listeners.
7678
"""
77-
super().__init__(compact_footer, partition_aware, **kwargs)
79+
super().__init__(compact_footer, partition_aware, event_listeners, **kwargs)
7880
self._registry_mux = asyncio.Lock()
7981
self._affinity_query_mux = asyncio.Lock()
8082

@@ -100,9 +102,8 @@ async def _connect(self, nodes):
100102

101103
# do not try to open more nodes
102104
self._current_node = i
103-
104105
except connection_errors:
105-
conn.failed = True
106+
pass
106107

107108
self._nodes.append(conn)
108109

@@ -302,7 +303,7 @@ async def _get_affinity(self, conn: 'AioConnection', caches: Iterable[int]) -> D
302303
"""
303304
for _ in range(AFFINITY_RETRIES or 1):
304305
result = await cache_get_node_partitions_async(conn, caches)
305-
if result.status == 0 and result.value['partition_mapping']:
306+
if result.status == 0:
306307
break
307308
await asyncio.sleep(AFFINITY_DELAY)
308309

@@ -342,7 +343,7 @@ async def get_best_node(
342343

343344
asyncio.ensure_future(
344345
asyncio.gather(
345-
*[conn.reconnect() for conn in self._nodes if not conn.alive],
346+
*[node.reconnect() for node in self._nodes if not node.alive],
346347
return_exceptions=True
347348
)
348349
)

pygridgain/client.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
import random
4545
import re
4646
from itertools import chain
47-
from typing import Iterable, Type, Union, Any, Dict, Optional
47+
from typing import Iterable, Type, Union, Any, Dict, Optional, Sequence
4848

4949
from .api import cache_get_node_partitions
5050
from .api.binary import get_binary_type, put_binary_type
@@ -66,6 +66,7 @@
6666
get_field_by_id, unsigned
6767
)
6868
from .binary import GenericObjectMeta
69+
from .monitoring import _EventListeners
6970

7071

7172
__all__ = ['Client']
@@ -76,7 +77,8 @@ class BaseClient:
7677
_identifier = re.compile(r'[^0-9a-zA-Z_.+$]', re.UNICODE)
7778
_ident_start = re.compile(r'^[^a-zA-Z_]+', re.UNICODE)
7879

79-
def __init__(self, compact_footer: bool = None, partition_aware: bool = False, **kwargs):
80+
def __init__(self, compact_footer: bool = None, partition_aware: bool = False,
81+
event_listeners: Optional[Sequence] = None, **kwargs):
8082
self._compact_footer = compact_footer
8183
self._partition_aware = partition_aware
8284
self._connection_args = kwargs
@@ -87,6 +89,7 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = False, *
8789
self.affinity_version = (0, 0)
8890
self._affinity = {'version': self.affinity_version, 'partition_mapping': defaultdict(dict)}
8991
self._protocol_context = None
92+
self._event_listeners = _EventListeners(event_listeners)
9093

9194
@property
9295
def protocol_context(self):
@@ -338,7 +341,8 @@ class Client(BaseClient):
338341
Synchronous Client implementation.
339342
"""
340343

341-
def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **kwargs):
344+
def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
345+
event_listeners: Optional[Sequence] = None, **kwargs):
342346
"""
343347
Initialize client.
344348
@@ -349,9 +353,10 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True, **
349353
https://ignite.apache.org/docs/latest/binary-client-protocol/data-format#schema
350354
:param partition_aware: (optional) try to calculate the exact data
351355
placement from the key before issuing the key operation to the
352-
server node, `True` by default.
356+
server node, `True` by default.
357+
:param event_listeners: (optional) event listeners.
353358
"""
354-
super().__init__(compact_footer, partition_aware, **kwargs)
359+
super().__init__(compact_footer, partition_aware, event_listeners, **kwargs)
355360

356361
def connect(self, *args):
357362
"""
@@ -382,7 +387,6 @@ def _connect(self, nodes):
382387
self._current_node = i
383388

384389
except connection_errors:
385-
conn.failed = True
386390
if self.partition_aware:
387391
# schedule the reconnection
388392
conn.reconnect()
@@ -566,7 +570,7 @@ def _get_affinity(self, conn: 'Connection', caches: Iterable[int]) -> Dict:
566570
"""
567571
for _ in range(AFFINITY_RETRIES or 1):
568572
result = cache_get_node_partitions(conn, caches)
569-
if result.status == 0 and result.value['partition_mapping']:
573+
if result.status == 0:
570574
break
571575
time.sleep(AFFINITY_DELAY)
572576

@@ -609,9 +613,9 @@ def get_best_node(
609613

610614
self._update_affinity(full_affinity)
611615

612-
for conn in self._nodes:
613-
if not conn.alive:
614-
conn.reconnect()
616+
for node in self._nodes:
617+
if not node.alive:
618+
node.reconnect()
615619

616620
c_id = cache.cache_id if isinstance(cache, BaseCache) else cache_id(cache)
617621
parts = self._cache_partition_mapping(c_id).get('number_of_partitions')

pygridgain/connection/aio_connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,10 +178,10 @@ async def _connect(self):
178178
self._on_handshake_fail(e)
179179
raise e
180180
except Exception as e:
181+
self._on_handshake_fail(e)
181182
# restore undefined protocol version
182183
if detecting_protocol:
183184
self.client.protocol_context = None
184-
self._on_handshake_fail(e)
185185
raise e
186186

187187
self._on_handshake_success(result)

pygridgain/connection/connection.py

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@ def _process_handshake_error(self, response):
103103
def _on_handshake_start(self):
104104
if logger.isEnabledFor(logging.DEBUG):
105105
logger.debug("Connecting to node(address=%s, port=%d) with protocol context %s",
106-
self.host, self.port, self.client.protocol_context)
106+
self.host, self.port, self.protocol_context)
107+
if self._enabled_connection_listener:
108+
self._connection_listener.publish_handshake_start(self.host, self.port, self.protocol_context)
107109

108110
def _on_handshake_success(self, result):
109111
features = BitmaskFeature.from_array(result.get('features', None))
@@ -113,24 +115,45 @@ def _on_handshake_success(self, result):
113115

114116
if logger.isEnabledFor(logging.DEBUG):
115117
logger.debug("Connected to node(address=%s, port=%d, node_uuid=%s) with protocol context %s",
116-
self.host, self.port, self.uuid, self.client.protocol_context)
118+
self.host, self.port, self.uuid, self.protocol_context)
119+
if self._enabled_connection_listener:
120+
self._connection_listener.publish_handshake_success(self.host, self.port, self.protocol_context, self.uuid)
117121

118122
def _on_handshake_fail(self, err):
123+
self.failed = True
124+
119125
if isinstance(err, AuthenticationError):
120126
logger.error("Authentication failed while connecting to node(address=%s, port=%d): %s",
121127
self.host, self.port, err)
128+
if self._enabled_connection_listener:
129+
self._connection_listener.publish_authentication_fail(self.host, self.port, self.protocol_context, err)
122130
else:
123131
logger.error("Failed to perform handshake, connection to node(address=%s, port=%d) "
124132
"with protocol context %s failed: %s",
125-
self.host, self.port, self.client.protocol_context, err, exc_info=True)
133+
self.host, self.port, self.protocol_context, err, exc_info=True)
134+
if self._enabled_connection_listener:
135+
self._connection_listener.publish_handshake_fail(self.host, self.port, self.protocol_context, err)
126136

127137
def _on_connection_lost(self, err=None, expected=False):
128-
if expected and logger.isEnabledFor(logging.DEBUG):
129-
logger.debug("Connection closed to node(address=%s, port=%d, node_uuid=%s)",
130-
self.host, self.port, self.uuid)
138+
if expected:
139+
if logger.isEnabledFor(logging.DEBUG):
140+
logger.debug("Connection closed to node(address=%s, port=%d, node_uuid=%s)",
141+
self.host, self.port, self.uuid)
142+
if self._enabled_connection_listener:
143+
self._connection_listener.publish_connection_closed(self.host, self.port, self.uuid)
131144
else:
132145
logger.info("Connection lost to node(address=%s, port=%d, node_uuid=%s): %s",
133146
self.host, self.port, self.uuid, err)
147+
if self._enabled_connection_listener:
148+
self._connection_listener.publish_connection_lost(self.host, self.port, self.uuid, err)
149+
150+
@property
151+
def _enabled_connection_listener(self):
152+
return self.client._event_listeners and self.client._event_listeners.enabled_connection_listener
153+
154+
@property
155+
def _connection_listener(self):
156+
return self.client._event_listeners
134157

135158

136159
class Connection(BaseConnection):
@@ -221,10 +244,10 @@ def connect(self):
221244
self._on_handshake_fail(e)
222245
raise e
223246
except Exception as e:
247+
self._on_handshake_fail(e)
224248
# restore undefined protocol version
225249
if detecting_protocol:
226250
self.client.protocol_context = None
227-
self._on_handshake_fail(e)
228251
raise e
229252

230253
self._on_handshake_success(result)
@@ -266,7 +289,7 @@ def reconnect(self):
266289
if self.alive:
267290
return
268291

269-
self.close()
292+
self.close(on_reconnect=True)
270293

271294
# connect and silence the connection errors
272295
try:
@@ -358,7 +381,7 @@ def recv(self, flags=None, reconnect=True) -> bytearray:
358381

359382
return data
360383

361-
def close(self):
384+
def close(self, on_reconnect=False):
362385
"""
363386
Try to mark socket closed, then unlink it. This is recommended but
364387
not required, since sockets are automatically closed when
@@ -370,5 +393,6 @@ def close(self):
370393
self._socket.close()
371394
except connection_errors:
372395
pass
373-
self._on_connection_lost(expected=True)
396+
if not on_reconnect and not self.failed:
397+
self._on_connection_lost(expected=True)
374398
self._socket = None

pygridgain/connection/protocol_context.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ def _ensure_consistency(self):
4545
if not self.is_feature_flags_supported():
4646
self._features = None
4747

48+
def copy(self):
49+
return ProtocolContext(self.version, self.features)
50+
4851
@property
4952
def version(self):
5053
return getattr(self, '_version', None)

0 commit comments

Comments
 (0)