Skip to content

Commit dafa815

Browse files
isapegoivandasch
andauthored
GG-33923 IGNITE-15103 Implement logging of connections and queries (#53)
(cherry picked from commit 8fc14f8) Co-authored-by: Ivan Daschinsky <ivandasch@apache.org>
1 parent 894a64c commit dafa815

File tree

7 files changed

+208
-64
lines changed

7 files changed

+208
-64
lines changed

pygridgain/connection/aio_connection.py

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from tzlocal import get_localzone
2222

2323
from pygridgain.constants import PROTOCOLS, PROTOCOL_BYTE_ORDER
24-
from pygridgain.exceptions import HandshakeError, SocketError, connection_errors
24+
from pygridgain.exceptions import HandshakeError, SocketError, connection_errors, AuthenticationError
2525
from .bitmask_feature import BitmaskFeature
2626
from .connection import BaseConnection
2727

@@ -56,7 +56,7 @@ def data_received(self, data: bytes) -> None:
5656
hs_response = self.__parse_handshake(packet, self._conn.client)
5757
self._handshake_fut.set_result(hs_response)
5858
else:
59-
self._conn.on_message(packet)
59+
self._conn.process_message(packet)
6060
self._buffer = self._buffer[packet_sz:len(self._buffer)]
6161

6262
def __has_full_response(self):
@@ -72,7 +72,7 @@ def __process_connection_error(self, exc):
7272
connected = self._handshake_fut.done()
7373
if not connected:
7474
self._handshake_fut.set_exception(exc)
75-
self._conn.on_connection_lost(exc, connected)
75+
self._conn.process_connection_lost(exc, connected)
7676

7777
@staticmethod
7878
def __send_handshake(transport, conn):
@@ -165,38 +165,41 @@ async def _connect(self):
165165
self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported())
166166

167167
try:
168+
self._on_handshake_start()
168169
result = await self._connect_version()
169170
except HandshakeError as e:
170171
if e.expected_version in PROTOCOLS:
171172
self.client.protocol_context.version = e.expected_version
172173
result = await self._connect_version()
173174
else:
175+
self._on_handshake_fail(e)
174176
raise e
175-
except connection_errors:
177+
except AuthenticationError as e:
178+
self._on_handshake_fail(e)
179+
raise e
180+
except Exception as e:
176181
# restore undefined protocol version
177182
if detecting_protocol:
178183
self.client.protocol_context = None
179-
raise
184+
self._on_handshake_fail(e)
185+
raise e
180186

181-
# connection is ready for end user
182-
features = BitmaskFeature.from_array(result.get('features', None))
183-
self.client.protocol_context.features = features
184-
self.uuid = result.get('node_uuid', None) # version-specific (1.4+)
185-
self.failed = False
187+
self._on_handshake_success(result)
186188

187-
def on_connection_lost(self, error, reconnect=False):
189+
def process_connection_lost(self, err, reconnect=False):
188190
self.failed = True
189191
for _, fut in self._pending_reqs.items():
190-
fut.set_exception(error)
192+
fut.set_exception(err)
191193
self._pending_reqs.clear()
192194

193195
if self._transport_closed_fut and not self._transport_closed_fut.done():
194196
self._transport_closed_fut.set_result(None)
195197

196198
if reconnect and not self._closed:
199+
self._on_connection_lost(err)
197200
self._loop.create_task(self._reconnect())
198201

199-
def on_message(self, data):
202+
def process_message(self, data):
200203
req_id = int.from_bytes(data[4:12], byteorder=PROTOCOL_BYTE_ORDER, signed=True)
201204
if req_id in self._pending_reqs:
202205
self._pending_reqs[req_id].set_result(data)
@@ -217,7 +220,7 @@ async def _connect_version(self) -> Union[dict, OrderedDict]:
217220
hs_response = await handshake_fut
218221

219222
if hs_response.op_code == 0:
220-
await self._close_transport()
223+
await self.close()
221224
self._process_handshake_error(hs_response)
222225

223226
return hs_response
@@ -271,4 +274,5 @@ async def _close_transport(self):
271274
except asyncio.TimeoutError:
272275
pass
273276
finally:
277+
self._on_connection_lost(expected=True)
274278
self._transport_closed_fut = None

pygridgain/connection/connection.py

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
# limitations under the License.
1515
#
1616

17+
import logging
1718
from collections import OrderedDict
1819
import socket
1920
from typing import Union
@@ -30,6 +31,8 @@
3031

3132
CLIENT_STATUS_AUTH_FAILURE = 2000
3233

34+
logger = logging.getLogger('.'.join(__name__.split('.')[:-1]))
35+
3336

3437
class BaseConnection:
3538
def __init__(self, client, host: str = None, port: int = None, username: str = None, password: str = None,
@@ -82,21 +85,53 @@ def protocol_context(self):
8285
return self.client.protocol_context
8386

8487
def _process_handshake_error(self, response):
85-
error_text = f'Handshake error: {response.message}'
8688
# if handshake fails for any reason other than protocol mismatch
8789
# (i.e. authentication error), server version is 0.0.0
90+
if response.client_status == CLIENT_STATUS_AUTH_FAILURE:
91+
raise AuthenticationError(response.message)
92+
8893
protocol_version = self.client.protocol_context.version
8994
server_version = (response.version_major, response.version_minor, response.version_patch)
90-
95+
error_text = f'Handshake error: {response.message}'
9196
if any(server_version):
9297
error_text += f' Server expects binary protocol version ' \
9398
f'{server_version[0]}.{server_version[1]}.{server_version[2]}. ' \
9499
f'Client provides ' \
95100
f'{protocol_version[0]}.{protocol_version[1]}.{protocol_version[2]}.'
96-
elif response.client_status == CLIENT_STATUS_AUTH_FAILURE:
97-
raise AuthenticationError(error_text)
98101
raise HandshakeError(server_version, error_text)
99102

103+
def _on_handshake_start(self):
104+
if logger.isEnabledFor(logging.DEBUG):
105+
logger.debug("Connecting to node(address=%s, port=%d) with protocol context %s",
106+
self.host, self.port, self.client.protocol_context)
107+
108+
def _on_handshake_success(self, result):
109+
features = BitmaskFeature.from_array(result.get('features', None))
110+
self.client.protocol_context.features = features
111+
self.uuid = result.get('node_uuid', None) # version-specific (1.4+)
112+
self.failed = False
113+
114+
if logger.isEnabledFor(logging.DEBUG):
115+
logger.debug("Connected to node(address=%s, port=%d, node_uuid=%s) with protocol context %s",
116+
self.host, self.port, self.uuid, self.client.protocol_context)
117+
118+
def _on_handshake_fail(self, err):
119+
if isinstance(err, AuthenticationError):
120+
logger.error("Authentication failed while connecting to node(address=%s, port=%d): %s",
121+
self.host, self.port, err)
122+
else:
123+
logger.error("Failed to perform handshake, connection to node(address=%s, port=%d) "
124+
"with protocol context %s failed: %s",
125+
self.host, self.port, self.client.protocol_context, err, exc_info=True)
126+
127+
def _on_connection_lost(self, err=None, expected=False):
128+
if expected and logger.isEnabledFor(logging.DEBUG):
129+
logger.debug("Connection closed to node(address=%s, port=%d, node_uuid=%s)",
130+
self.host, self.port, self.uuid)
131+
else:
132+
logger.info("Connection lost to node(address=%s, port=%d, node_uuid=%s): %s",
133+
self.host, self.port, self.uuid, err)
134+
100135

101136
class Connection(BaseConnection):
102137
"""
@@ -173,24 +208,26 @@ def connect(self):
173208
self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported())
174209

175210
try:
211+
self._on_handshake_start()
176212
result = self._connect_version()
177213
except HandshakeError as e:
178214
if e.expected_version in PROTOCOLS:
179215
self.client.protocol_context.version = e.expected_version
180216
result = self._connect_version()
181217
else:
218+
self._on_handshake_fail(e)
182219
raise e
183-
except connection_errors:
220+
except AuthenticationError as e:
221+
self._on_handshake_fail(e)
222+
raise e
223+
except Exception as e:
184224
# restore undefined protocol version
185225
if detecting_protocol:
186226
self.client.protocol_context = None
187-
raise
227+
self._on_handshake_fail(e)
228+
raise e
188229

189-
# connection is ready for end user
190-
features = BitmaskFeature.from_array(result.get('features', None))
191-
self.client.protocol_context.features = features
192-
self.uuid = result.get('node_uuid', None) # version-specific (1.4+)
193-
self.failed = False
230+
self._on_handshake_success(result)
194231

195232
def _connect_version(self) -> Union[dict, OrderedDict]:
196233
"""
@@ -264,11 +301,12 @@ def send(self, data: Union[bytes, bytearray], flags=None, reconnect=True):
264301

265302
try:
266303
self._socket.sendall(data, **kwargs)
267-
except connection_errors:
304+
except connection_errors as e:
268305
self.failed = True
269306
if reconnect:
307+
self._on_connection_lost(e)
270308
self.reconnect()
271-
raise
309+
raise e
272310

273311
def recv(self, flags=None, reconnect=True) -> bytearray:
274312
"""
@@ -293,11 +331,12 @@ def recv(self, flags=None, reconnect=True) -> bytearray:
293331
if bytes_received == 0:
294332
raise SocketError('Connection broken.')
295333
bytes_total_received += bytes_received
296-
except connection_errors:
334+
except connection_errors as e:
297335
self.failed = True
298336
if reconnect:
337+
self._on_connection_lost(e)
299338
self.reconnect()
300-
raise
339+
raise e
301340

302341
if bytes_total_received < 4:
303342
continue
@@ -331,5 +370,5 @@ def close(self):
331370
self._socket.close()
332371
except connection_errors:
333372
pass
334-
373+
self._on_connection_lost(expected=True)
335374
self._socket = None

pygridgain/connection/protocol_context.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ def __eq__(self, other):
3838
self.version == other.version and \
3939
self.features == other.features
4040

41+
def __str__(self):
42+
return f'ProtocolContext(version={self._version}, features={self._features})'
43+
4144
def _ensure_consistency(self):
4245
if not self.is_feature_flags_supported():
4346
self._features = None

0 commit comments

Comments
 (0)