Skip to content
88 changes: 33 additions & 55 deletions src/collectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,23 @@ def block_height(self):
"""Returns latest block height."""
return self.interface.get_message_property_to_hex('number')

def finalized_block_height(self):
"""Runs a query to return finalized block height"""
payload = {
"jsonrpc": "2.0",
"method": "eth_getBlockByNumber",
"params": ["finalized", False],
"id": self.chain_id
}

finalized_block = self.interface.query(payload)
if finalized_block is None:
return None
block_number_hex = finalized_block.get('number')
if block_number_hex is None:
return None
return int(block_number_hex, 16)

def heads_received(self):
"""Returns amount of received messages from the subscription."""
return self.interface.heads_received
Expand All @@ -54,7 +71,6 @@ def client_version(self):
client_version = {"client_version": version}
return client_version


class ConfluxCollector():
"""A collector to fetch information about conflux RPC endpoints."""

Expand Down Expand Up @@ -394,60 +410,6 @@ def latency(self):
"""Returns connection latency."""
return self.interface.latest_query_latency

class TronCollector():
"""A collector to fetch information from Tron endpoints."""

def __init__(self, url, labels, chain_id, **client_parameters):

self.labels = labels
self.chain_id = chain_id
self.interface = HttpsInterface(url, client_parameters.get('open_timeout'),
client_parameters.get('ping_timeout'))

self._logger_metadata = {
'component': 'TronCollector',
'url': strip_url(url)
}
self.client_version_payload = {
'jsonrpc': '2.0',
'method': "web3_clientVersion",
'id': 1
}
self.block_height_payload = {
'jsonrpc': '2.0',
'method': "eth_blockNumber",
'id': 1
}

def alive(self):
"""Returns true if endpoint is alive, false if not."""
# Run cached query because we can also fetch client version from this
# later on. This will save us an RPC call per run.
return self.interface.cached_json_rpc_post(
self.client_version_payload) is not None

def block_height(self):
"""Cached query and returns blockheight after converting hex string value to an int"""
result = self.interface.cached_json_rpc_post(self.block_height_payload)

if result and isinstance(result, str) and result.startswith('0x'):
return int(result, 16)
raise ValueError(f"Invalid block height result: {result}")

def client_version(self):
"""Runs a cached query to return client version."""
version = self.interface.cached_json_rpc_post(
self.client_version_payload)
if version is None:
return None
client_version = {"client_version": version}
return client_version

def latency(self):
"""Returns connection latency."""
return self.interface.latest_query_latency


class EvmHttpCollector():
"""A collector to fetch information from EVM HTTPS endpoints."""

Expand All @@ -472,6 +434,12 @@ def __init__(self, url, labels, chain_id, **client_parameters):
'method': "eth_blockNumber",
'id': 1
}
self.finalized_block_height_payload = {
"jsonrpc": "2.0",
"method": "eth_getBlockByNumber",
"params": ["finalized", False],
"id": 1
}

def alive(self):
"""Returns true if endpoint is alive, false if not."""
Expand All @@ -488,6 +456,16 @@ def block_height(self):
return int(result, 16)
raise ValueError(f"Invalid block height result: {result}")

def finalized_block_height(self):
"""Returns finalized blockheight after converting hex string value to an int"""
finalized_block = self.interface.json_rpc_post(self.finalized_block_height_payload)
if finalized_block is None:
return None
block_number_hex = finalized_block.get('number')
if block_number_hex is None:
return None
return int(block_number_hex, 16)

def client_version(self):
"""Runs a cached query to return client version."""
version = self.interface.cached_json_rpc_post(
Expand Down
12 changes: 12 additions & 0 deletions src/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ def block_height_metric(self):
'Latest observed block_height.',
labels=self._labels)

@property
def finalized_block_height_metric(self):
"""Returns instantiated finalized block height metric"""
return GaugeMetricFamily(
'brpc_finalized_block_height',
'Latest finalized block height',
labels=self._labels)

@property
def client_version_metric(self):
"""Returns instantiated client version metric."""
Expand Down Expand Up @@ -126,6 +134,7 @@ def collect(self):
heads_received_metric = self._metrics_loader.heads_received_metric
disconnects_metric = self._metrics_loader.disconnects_metric
block_height_metric = self._metrics_loader.block_height_metric
finalized_block_height_metric = self._metrics_loader.finalized_block_height_metric
client_version_metric = self._metrics_loader.client_version_metric
total_difficulty_metric = self._metrics_loader.total_difficulty_metric
latency_metric = self._metrics_loader.latency_metric
Expand All @@ -142,6 +151,8 @@ def collect(self):
client_version_metric, 'client_version')
executor.submit(self._write_metric, collector,
block_height_metric, 'block_height')
executor.submit(self._write_metric, collector,
finalized_block_height_metric, 'finalized_block_height')
executor.submit(self._write_metric, collector,
heads_received_metric, 'heads_received')
executor.submit(self._write_metric, collector,
Expand All @@ -159,6 +170,7 @@ def collect(self):
yield heads_received_metric
yield disconnects_metric
yield block_height_metric
yield finalized_block_height_metric
yield client_version_metric
yield total_difficulty_metric
yield latency_metric
Expand Down
2 changes: 0 additions & 2 deletions src/registries.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ def get_collector_registry(self) -> list:
collector = collectors.StarknetCollector
case "aptos", "aptos":
collector = collectors.AptosCollector
case "tron", "tron":
collector = collectors.TronCollector
case "xrpl", "xrpl":
collector = collectors.XRPLCollector
case "evmhttp", other: # pylint: disable=unused-variable
Expand Down
81 changes: 61 additions & 20 deletions src/test_collectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,44 @@ def test_block_height(self):
self.mocked_websocket.return_value.get_message_property_to_hex.assert_called_once_with(
'number')

def test_finalized_block_height(self):
"""Tests that finalized_block_height uses correct call and args to get finalized block"""
# Mock with hex string, not integer
mock_block_response = {"number": "0x1a2b3c"}
self.mocked_websocket.return_value.query.return_value = mock_block_response

payload = {
"jsonrpc": "2.0",
"method": "eth_getBlockByNumber",
"params": ["finalized", False],
"id": self.chain_id
}
self.evm_collector.finalized_block_height()
self.mocked_websocket.return_value.query.assert_called_once_with(payload)

def test_finalized_block_height_return_none_when_query_none(self):
"""Tests that finalized_block_height returns None if the query returns None"""
self.mocked_websocket.return_value.query.return_value = None
result = self.evm_collector.finalized_block_height()
self.assertEqual(None, result)

def test_finalized_block_height_return_none_when_no_number_field(self):
"""Tests that finalized_block_height returns None if the response has no 'number' field"""
self.mocked_websocket.return_value.query.return_value = {"hash": "0x123"}
result = self.evm_collector.finalized_block_height()
self.assertEqual(None, result)

def test_finalized_block_height_return(self):
"""Tests that finalized_block_height converts hex block number to integer correctly"""
mock_block_response = {
"number": "0x1a2b3c", # Hex string as your code expects
"hash": "0x456def"
}
self.mocked_websocket.return_value.query.return_value = mock_block_response
result = self.evm_collector.finalized_block_height()
# 0x1a2b3c = 1715004 in decimal
self.assertEqual(1715004, result)

def test_client_version(self):
"""Tests the client_version function uses the correct call and args to get client version"""
payload = {
Expand Down Expand Up @@ -735,8 +773,8 @@ def test_latency(self):
self.mocked_connection.return_value.latest_query_latency = 0.123
self.assertEqual(0.123, self.aptos_collector.latency())

class TestTronCollector(TestCase):
"""Tests the Tron collector class"""
class TestEvmHttpCollector(TestCase):
"""Tests the EvmHttp collector class"""

def setUp(self):
self.url = "https://test.com"
Expand All @@ -747,70 +785,73 @@ def setUp(self):
self.client_params = {
"open_timeout": self.open_timeout, "ping_timeout": self.ping_timeout}
with mock.patch('collectors.HttpsInterface') as mocked_connection:
self.tron_collector = collectors.TronCollector(
self.evmhttp_collector = collectors.EvmHttpCollector(
self.url, self.labels, self.chain_id, **self.client_params)
self.mocked_connection = mocked_connection

def test_logger_metadata(self):
"""Validate logger metadata. Makes sure url is stripped by helpers.strip_url function."""
expected_metadata = {
'component': 'TronCollector', 'url': 'test.com'}
'component': 'EvmHttpCollector', 'url': 'test.com'}
self.assertEqual(expected_metadata,
self.tron_collector._logger_metadata)
self.evmhttp_collector._logger_metadata)

def test_https_interface_created(self):
"""Tests that the Tron collector calls the https interface with the correct args"""
"""Tests that the EvmHttp collector calls the https interface with the correct args"""
self.mocked_connection.assert_called_once_with(
self.url, self.open_timeout, self.ping_timeout)

def test_interface_attribute_exists(self):
"""Tests that the interface attribute exists."""
self.assertTrue(hasattr(self.tron_collector, 'interface'))
self.assertTrue(hasattr(self.evmhttp_collector, 'interface'))

def test_alive_call(self):
"""Tests the alive function uses the correct call"""
self.tron_collector.alive()
self.evmhttp_collector.alive()
self.mocked_connection.return_value.cached_json_rpc_post.assert_called_once_with(
self.tron_collector.client_version_payload)
self.evmhttp_collector.client_version_payload)

def test_alive_false(self):
"""Tests the alive function returns false when post returns None"""
self.mocked_connection.return_value.cached_json_rpc_post.return_value = None
result = self.tron_collector.alive()
result = self.evmhttp_collector.alive()
self.assertFalse(result)

def test_block_height(self):
"""Tests the block_height function uses the correct call to get block height"""
self.mocked_connection.return_value.cached_json_rpc_post.return_value = "0x1a2b3c"
result = self.tron_collector.block_height()
result = self.evmhttp_collector.block_height()
self.mocked_connection.return_value.cached_json_rpc_post.assert_called_once_with(
self.tron_collector.block_height_payload)
self.evmhttp_collector.block_height_payload)
self.assertEqual(result, 1715004)

def test_block_height_raises_value_error(self):
"""Tests that the block height raises ValueError if result is invalid"""
self.mocked_connection.return_value.cached_json_rpc_post.return_value = "invalid"
with self.assertRaises(ValueError):
self.tron_collector.block_height()
self.evmhttp_collector.block_height()

def test_client_version(self):
"""Tests the client_version function uses the correct call to get client version"""
self.mocked_connection.return_value.cached_json_rpc_post.return_value = "Tron/v1.0.0"
result = self.tron_collector.client_version()
"""Tests the client_version function uses the correct call and args to get client version"""
payload = {
"jsonrpc": "2.0",
"method": "web3_clientVersion",
"id": 1
}
self.evmhttp_collector.client_version()
self.mocked_connection.return_value.cached_json_rpc_post.assert_called_once_with(
self.tron_collector.client_version_payload)
self.assertEqual(result, {"client_version": "Tron/v1.0.0"})
payload)

def test_client_version_returns_none(self):
"""Tests that the client_version returns None if cached_json_rpc_post returns None"""
self.mocked_connection.return_value.cached_json_rpc_post.return_value = None
result = self.tron_collector.client_version()
result = self.evmhttp_collector.client_version()
self.assertIsNone(result)

def test_latency(self):
"""Tests that the latency is obtained from the interface based on latest_query_latency"""
self.mocked_connection.return_value.latest_query_latency = 0.123
self.assertEqual(0.123, self.tron_collector.latency())
self.assertEqual(0.123, self.evmhttp_collector.latency())

class TestXRPLCollector(TestCase):
"""Tests the XRPL collector class"""
Expand Down
9 changes: 5 additions & 4 deletions src/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class TestMetricsLoader(TestCase):
def setUp(self):
self.metrics_loader = MetricsLoader()
self.labels = [
'url', 'provider', 'blockchain', 'network_name', 'network_type',
'url', 'provider', 'blockchain', 'network_name', 'network_type',
'integration_maturity', 'canonical_name', 'chain_selector',
'evmChainID'
]
Expand Down Expand Up @@ -171,6 +171,7 @@ def test_collect_yields_correct_metrics(self):
self.mocked_loader.return_value.heads_received_metric,
self.mocked_loader.return_value.disconnects_metric,
self.mocked_loader.return_value.block_height_metric,
self.mocked_loader.return_value.finalized_block_height_metric,
self.mocked_loader.return_value.client_version_metric,
self.mocked_loader.return_value.total_difficulty_metric,
self.mocked_loader.return_value.latency_metric,
Expand All @@ -184,14 +185,14 @@ def test_collect_yields_correct_metrics(self):
def test_collect_number_of_yields(self):
"""Tests that the collect method yields the expected number of values"""
results = self.prom_collector.collect()
self.assertEqual(9, len(list(results)))
self.assertEqual(10, len(list(results)))

def test_get_thread_count(self):
"""Tests get thread count returns the expected number of threads
based on number of metrics and collectors"""
thread_count = self.prom_collector.get_thread_count()
# Total of 9 metrics times 2 items in our mocked pool should give 18
self.assertEqual(18, thread_count)
# Total of 10 metrics times 2 items in our mocked pool should give 20
self.assertEqual(20, thread_count)

def test_collect_thread_max_workers(self):
"""Tests the max workers is correct for the collect threads"""
Expand Down
18 changes: 4 additions & 14 deletions src/test_registries.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,13 @@ def test_get_collector_registry_for_aptos(self):
helper_test_collector_registry(self, collector)

@mock.patch.dict(os.environ, {
"CONFIG_FILE_PATH": "tests/fixtures/configuration_tron.yaml",
"CONFIG_FILE_PATH": "tests/fixtures/configuration_evmhttp.yaml",
"VALIDATION_FILE_PATH": "tests/fixtures/validation.yaml"
})
def test_get_collector_registry_for_tron(self):
"""Tests that the Tron collector is called with the correct args"""
def test_get_collector_registry_for_evmhttp(self):
"""Tests that the EvmHttp collector is called with the correct args"""
self.collector_registry = CollectorRegistry()
with mock.patch('collectors.TronCollector', new=mock.Mock()) as collector:
with mock.patch('collectors.EvmHttpCollector', new=mock.Mock()) as collector:
helper_test_collector_registry(self, collector)

@mock.patch.dict(os.environ, {
Expand All @@ -167,16 +167,6 @@ def test_get_collector_registry_for_xrpl(self):
with mock.patch('collectors.XRPLCollector', new=mock.Mock()) as collector:
helper_test_collector_registry(self, collector)

@mock.patch.dict(os.environ, {
"CONFIG_FILE_PATH": "tests/fixtures/configuration_evmhttp.yaml",
"VALIDATION_FILE_PATH": "tests/fixtures/validation.yaml"
})
def test_get_collector_registry_for_evmhttp(self):
"""Tests that the EVM HTTP collector is called with the correct args"""
self.collector_registry = CollectorRegistry()
with mock.patch('collectors.EvmHttpCollector', new=mock.Mock()) as collector:
helper_test_collector_registry(self, collector)

@mock.patch.dict(os.environ, {
"CONFIG_FILE_PATH": "tests/fixtures/configuration_evm.yaml",
"VALIDATION_FILE_PATH": "tests/fixtures/validation.yaml"
Expand Down