Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions src/collectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,3 +548,59 @@ def client_version(self):
def latency(self):
"""Returns connection latency."""
return self.interface.latest_query_latency

class TonCollector():
"""A collector to fetch information about Ton endpoints."""

def __init__(self, url, labels, chain_id, **client_parameters):

self.labels = labels
self.chain_id = chain_id
self.interface = HttpsInterface(url.rstrip("/") + "/jsonRPC",
client_parameters.get('open_timeout'),
client_parameters.get('ping_timeout'))
self._logger_metadata = {
'component': 'TonCollector',
'url': strip_url(url)
}
self.block_height_payload = {
'jsonrpc': '2.0',
'method': "getMasterchainInfo",
'id': 1
}
self.consensus_block_height_payload = {
'jsonrpc': '2.0',
'method': "getConsensusBlock",
'id': 1
}

def alive(self):
"""Returns true if endpoint is alive, false if not."""
# Run cached query because we can also fetch block height from this
# later on. This will save us an RPC call per run.
return self.interface.cached_json_rpc_post(
self.block_height_payload) is not None

def block_height(self):
"""Returns latest block height."""
result = self.interface.cached_json_rpc_post(self.block_height_payload)
if result is None:
raise ValueError("No response received from TON endpoint")
block_height = result.get('last', {}).get('seqno', None)
if block_height is not None:
return block_height
raise ValueError(f"Invalid block height result: {result}")

def finalized_block_height(self):
"""Runs a query to return consensus block height"""
result = self.interface.cached_json_rpc_post(self.consensus_block_height_payload)
if result is None:
raise ValueError("No response received from TON endpoint")
consensus_block = result.get('consensus_block', None)
if consensus_block is not None:
return consensus_block
raise ValueError(f"Invalid consensus block height result: {result}")

def latency(self):
"""Returns connection latency."""
return self.interface.latest_query_latency
2 changes: 1 addition & 1 deletion src/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def endpoints(self):
def _load_configuration(self):
supported_collectors = ('evm', 'evmhttp', 'cardano', 'conflux', 'solana',
'bitcoin', 'doge', 'filecoin', 'starknet', 'aptos',
'tron', 'xrpl')
'tron', 'xrpl', 'ton')

configuration_schema = Schema({
'blockchain':
Expand Down
2 changes: 2 additions & 0 deletions src/registries.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ def get_collector_registry(self) -> list:
collector = collectors.AptosCollector
case "xrpl", "xrpl":
collector = collectors.XRPLCollector
case "ton", "ton":
collector = collectors.TonCollector
case "evmhttp", other: # pylint: disable=unused-variable
collector = collectors.EvmHttpCollector
case "evm", other: # pylint: disable=unused-variable
Expand Down
210 changes: 210 additions & 0 deletions src/test_collectors_evm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
# pylint: disable=protected-access, too-many-instance-attributes, duplicate-code
"""Module for testing collectors"""
from unittest import TestCase, mock

import collectors


class TestEvmCollector(TestCase):
"""Tests the evm collector class"""

def setUp(self):
self.url = "wss://test.com"
self.labels = ["dummy", "labels"]
self.chain_id = 123
self.client_params = {"param1": "dummy", "param2": "data"}
self.sub_payload = {
"method": 'eth_subscribe',
"jsonrpc": "2.0",
"id": self.chain_id,
"params": ["newHeads"]
}
with mock.patch('collectors.WebsocketInterface') as mocked_websocket:
self.evm_collector = collectors.EvmCollector(
self.url, self.labels, self.chain_id, **self.client_params)
self.mocked_websocket = mocked_websocket

def test_websocket_interface_created(self):
"""Tests that the evm collector calls the websocket interface with the correct args"""
self.mocked_websocket.assert_called_once_with(
self.url, self.sub_payload, **self.client_params)

def test_interface_attribute_exists(self):
"""Tests that the interface attribute exists.
May be used by external calls to access objects such as the interface cache"""
self.assertTrue(hasattr(self.evm_collector, 'interface'))

def test_websocket_attr_daemon_is_bool(self):
"""Tests that the daemon attribute is of type bool"""
self.assertEqual(bool, type(self.mocked_websocket.return_value.daemon))

def test_websocket_daemon_true(self):
"""Tests that the websocket object has daemon set to true"""
self.assertTrue(self.mocked_websocket.return_value.daemon)

def test_websocket_start_called(self):
"""Tests that the websocket object start function is called"""
self.mocked_websocket.return_value.start.assert_called_once_with()

def test_alive_is_true(self):
"""Tests the alive function returns true when websocket.healthy is true"""
self.mocked_websocket.return_value.healthy = True
self.assertTrue(self.evm_collector.alive())

def test_alive_is_false(self):
"""Tests the alive function returns false when websocket.healthy is false"""
self.mocked_websocket.return_value.healthy = False
self.assertFalse(self.evm_collector.alive())

def test_block_height(self):
"""Tests the block_height function uses the correct call and args to get block height"""
self.evm_collector.block_height()
self.mocked_websocket.return_value.get_message_property_to_hex.assert_called_once_with(
'number')

def test_finalized_block_height(self):
"""Tests that finalized_block_height uses correct call and args to get finalized block"""
# Mock with hex string, not integer
mock_block_response = {"number": "0x1a2b3c"}
self.mocked_websocket.return_value.query.return_value = mock_block_response

payload = {
"jsonrpc": "2.0",
"method": "eth_getBlockByNumber",
"params": ["finalized", False],
"id": self.chain_id
}
self.evm_collector.finalized_block_height()
self.mocked_websocket.return_value.query.assert_called_once_with(payload)

def test_finalized_block_height_return_none_when_query_none(self):
"""Tests that finalized_block_height returns None if the query returns None"""
self.mocked_websocket.return_value.query.return_value = None
result = self.evm_collector.finalized_block_height()
self.assertEqual(None, result)

def test_finalized_block_height_return_none_when_no_number_field(self):
"""Tests that finalized_block_height returns None if the response has no 'number' field"""
self.mocked_websocket.return_value.query.return_value = {"hash": "0x123"}
result = self.evm_collector.finalized_block_height()
self.assertEqual(None, result)

def test_finalized_block_height_return(self):
"""Tests that finalized_block_height converts hex block number to integer correctly"""
mock_block_response = {
"number": "0x1a2b3c", # Hex string as your code expects
"hash": "0x456def"
}
self.mocked_websocket.return_value.query.return_value = mock_block_response
result = self.evm_collector.finalized_block_height()
# 0x1a2b3c = 1715004 in decimal
self.assertEqual(1715004, result)

def test_client_version(self):
"""Tests the client_version function uses the correct call and args to get client version"""
payload = {
"jsonrpc": "2.0",
"method": "web3_clientVersion",
"params": [],
"id": self.chain_id
}
self.evm_collector.client_version()
self.mocked_websocket.return_value.cached_query.assert_called_once_with(
payload)

def test_client_version_return_none(self):
"""Tests that the client_version returns None if the query returns no version"""
self.mocked_websocket.return_value.cached_query.return_value = None
result = self.evm_collector.client_version()
self.assertEqual(None, result)

def test_client_version_return(self):
"""Tests that the client_version is returned in the correct format"""
self.mocked_websocket.return_value.cached_query.return_value = "test/v1.23"
result = self.evm_collector.client_version()
self.assertEqual({"client_version": "test/v1.23"}, result)

def test_latency(self):
"""Tests that the latency is obtained from the interface based on subscription ping"""
self.mocked_websocket.return_value.subscription_ping_latency = 0.123
self.assertEqual(0.123, self.evm_collector.latency())

class TestEvmHttpCollector(TestCase):
"""Tests the EvmHttp collector class"""

def setUp(self):
self.url = "https://test.com"
self.labels = ["dummy", "labels"]
self.chain_id = 123
self.open_timeout = 8
self.ping_timeout = 9
self.client_params = {
"open_timeout": self.open_timeout, "ping_timeout": self.ping_timeout}
with mock.patch('collectors.HttpsInterface') as mocked_connection:
self.evmhttp_collector = collectors.EvmHttpCollector(
self.url, self.labels, self.chain_id, **self.client_params)
self.mocked_connection = mocked_connection

def test_logger_metadata(self):
"""Validate logger metadata. Makes sure url is stripped by helpers.strip_url function."""
expected_metadata = {
'component': 'EvmHttpCollector', 'url': 'test.com'}
self.assertEqual(expected_metadata,
self.evmhttp_collector._logger_metadata)

def test_https_interface_created(self):
"""Tests that the EvmHttp collector calls the https interface with the correct args"""
self.mocked_connection.assert_called_once_with(
self.url, self.open_timeout, self.ping_timeout)

def test_interface_attribute_exists(self):
"""Tests that the interface attribute exists."""
self.assertTrue(hasattr(self.evmhttp_collector, 'interface'))

def test_alive_call(self):
"""Tests the alive function uses the correct call"""
self.evmhttp_collector.alive()
self.mocked_connection.return_value.cached_json_rpc_post.assert_called_once_with(
self.evmhttp_collector.client_version_payload)

def test_alive_false(self):
"""Tests the alive function returns false when post returns None"""
self.mocked_connection.return_value.cached_json_rpc_post.return_value = None
result = self.evmhttp_collector.alive()
self.assertFalse(result)

def test_block_height(self):
"""Tests the block_height function uses the correct call to get block height"""
self.mocked_connection.return_value.cached_json_rpc_post.return_value = "0x1a2b3c"
result = self.evmhttp_collector.block_height()
self.mocked_connection.return_value.cached_json_rpc_post.assert_called_once_with(
self.evmhttp_collector.block_height_payload)
self.assertEqual(result, 1715004)

def test_block_height_raises_value_error(self):
"""Tests that the block height raises ValueError if result is invalid"""
self.mocked_connection.return_value.cached_json_rpc_post.return_value = "invalid"
with self.assertRaises(ValueError):
self.evmhttp_collector.block_height()

def test_client_version(self):
"""Tests the client_version function uses the correct call and args to get client version"""
payload = {
"jsonrpc": "2.0",
"method": "web3_clientVersion",
"id": 1
}
self.evmhttp_collector.client_version()
self.mocked_connection.return_value.cached_json_rpc_post.assert_called_once_with(
payload)

def test_client_version_returns_none(self):
"""Tests that the client_version returns None if cached_json_rpc_post returns None"""
self.mocked_connection.return_value.cached_json_rpc_post.return_value = None
result = self.evmhttp_collector.client_version()
self.assertIsNone(result)

def test_latency(self):
"""Tests that the latency is obtained from the interface based on latest_query_latency"""
self.mocked_connection.return_value.latest_query_latency = 0.123
self.assertEqual(0.123, self.evmhttp_collector.latency())
Loading