diff --git a/Cargo.lock b/Cargo.lock index adcf97d7..ea0a215c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1579,6 +1579,7 @@ dependencies = [ "reth-cli-util", "reth-optimism-cli", "reth-optimism-node", + "reth-optimism-payload-builder", ] [[package]] @@ -1603,13 +1604,16 @@ dependencies = [ "eyre", "futures-util", "httpmock", + "indexmap 2.12.1", "jsonrpsee", "jsonrpsee-types", "metrics", "metrics-derive", "op-alloy-consensus", + "op-alloy-flz", "op-alloy-network", "op-alloy-rpc-types", + "parking_lot", "rand 0.9.2", "reth", "reth-db", @@ -1619,6 +1623,7 @@ dependencies = [ "reth-optimism-chainspec", "reth-optimism-evm", "reth-optimism-node", + "reth-optimism-payload-builder", "reth-optimism-primitives", "reth-primitives-traits", "reth-provider", @@ -1648,6 +1653,8 @@ dependencies = [ name = "base-reth-runner" version = "0.2.1" dependencies = [ + "alloy-primitives", + "base-flashtypes", "base-reth-flashblocks", "base-reth-rpc", "base-tracex", @@ -1655,11 +1662,14 @@ dependencies = [ "eyre", "futures-util", "once_cell", + "parking_lot", "reth", "reth-db", "reth-exex", "reth-optimism-chainspec", "reth-optimism-node", + "reth-optimism-payload-builder", + "tokio", "tracing", "url", ] diff --git a/Cargo.toml b/Cargo.toml index 7c74d179..64eeef96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,6 +89,7 @@ reth-transaction-pool = { git = "https://github.com/paradigmxyz/reth", tag = "v1 reth-rpc-eth-types = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-primitives-traits = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-optimism-chainspec = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } +reth-optimism-payload-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-optimism-primitives = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-db = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3", features = [ "op", @@ -122,6 +123,7 @@ op-alloy-rpc-types = "0.22.0" op-alloy-consensus = "0.22.0" op-alloy-rpc-jsonrpsee = "0.22.0" op-alloy-rpc-types-engine = "0.22.0" +op-alloy-flz = "0.13.1" # tokio tokio = "1.48.0" @@ -162,3 +164,6 @@ derive_more = "2.1.0" serde_json = "1.0.145" metrics-derive = "0.1.0" tracing-subscriber = "0.3.22" +parking_lot = "0.12.3" +indexmap = "2.7.0" +rdkafka = { version = "0.37.0", default-features = false, features = ["tokio", "ssl-vendored", "libz-static"] } diff --git a/bin/node/Cargo.toml b/bin/node/Cargo.toml index 2782995f..bc760def 100644 --- a/bin/node/Cargo.toml +++ b/bin/node/Cargo.toml @@ -21,6 +21,7 @@ base-reth-runner.workspace = true reth-optimism-node.workspace = true reth-optimism-cli.workspace = true reth-cli-util.workspace = true +reth-optimism-payload-builder.workspace = true # misc clap.workspace = true diff --git a/bin/node/src/cli.rs b/bin/node/src/cli.rs index c0e09e70..2db1bf4a 100644 --- a/bin/node/src/cli.rs +++ b/bin/node/src/cli.rs @@ -2,12 +2,16 @@ use std::sync::Arc; -use base_reth_runner::{BaseNodeConfig, FlashblocksCell, FlashblocksConfig, TracingConfig}; +use base_reth_runner::{ + BaseNodeConfig, FlashblocksCell, FlashblocksConfig, MeteringConfig, ResourceLimitsConfig, + TracingConfig, +}; use once_cell::sync::OnceCell; use reth_optimism_node::args::RollupArgs; +use reth_optimism_payload_builder::config::OpDAConfig; /// CLI Arguments -#[derive(Debug, Clone, PartialEq, Eq, clap::Args)] +#[derive(Debug, Clone, PartialEq, clap::Args)] #[command(next_help_heading = "Rollup")] pub struct Args { /// Rollup arguments @@ -40,6 +44,36 @@ pub struct Args { /// Enable metering RPC for transaction bundle simulation #[arg(long = "enable-metering", value_name = "ENABLE_METERING")] pub enable_metering: bool, + + // --- Priority fee estimation args --- + /// Gas limit per flashblock for priority fee estimation + #[arg(long = "metering-gas-limit", default_value = "30000000")] + pub metering_gas_limit: u64, + + /// Execution time budget in microseconds per flashblock + #[arg(long = "metering-execution-time-us", default_value = "50000")] + pub metering_execution_time_us: u64, + + /// State root time budget in microseconds (optional, disabled by default) + #[arg(long = "metering-state-root-time-us")] + pub metering_state_root_time_us: Option, + + /// Data availability bytes limit per flashblock (default). + /// This value is used when `miner_setMaxDASize` has not been called. + #[arg(long = "metering-da-bytes", default_value = "120000")] + pub metering_da_bytes: u64, + + /// Percentile for recommended priority fee (0.0-1.0) + #[arg(long = "metering-priority-fee-percentile", default_value = "0.5")] + pub metering_priority_fee_percentile: f64, + + /// Default priority fee when resource is not congested (in wei) + #[arg(long = "metering-uncongested-priority-fee", default_value = "1")] + pub metering_uncongested_priority_fee: u128, + + /// Number of recent blocks to retain in metering cache + #[arg(long = "metering-cache-size", default_value = "12")] + pub metering_cache_size: usize, } impl Args { @@ -58,6 +92,23 @@ impl From for BaseNodeConfig { max_pending_blocks_depth: args.max_pending_blocks_depth, }); + let metering = MeteringConfig { + enabled: args.enable_metering, + resource_limits: ResourceLimitsConfig { + gas_limit: args.metering_gas_limit, + execution_time_us: args.metering_execution_time_us, + state_root_time_us: args.metering_state_root_time_us, + da_bytes: args.metering_da_bytes, + }, + priority_fee_percentile: args.metering_priority_fee_percentile, + uncongested_priority_fee: args.metering_uncongested_priority_fee, + cache_size: args.metering_cache_size, + }; + + // Create shared DA config. This is shared between the payload builder and the + // priority fee estimator, allowing miner_setMaxDASize to affect both. + let da_config = OpDAConfig::default(); + Self { rollup_args: args.rollup_args, flashblocks, @@ -66,7 +117,9 @@ impl From for BaseNodeConfig { logs_enabled: args.enable_transaction_tracing_logs, }, metering_enabled: args.enable_metering, + metering, flashblocks_cell, + da_config, } } } diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index a44ac84c..184c4477 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -54,6 +54,14 @@ eyre.workspace = true serde.workspace = true metrics.workspace = true metrics-derive.workspace = true +parking_lot.workspace = true +indexmap.workspace = true + +# priority fee estimation +reth-optimism-payload-builder.workspace = true + +# DA calculation +op-alloy-flz.workspace = true [dev-dependencies] base-flashtypes.workspace = true diff --git a/crates/rpc/src/base/annotator.rs b/crates/rpc/src/base/annotator.rs new file mode 100644 index 00000000..bd123a83 --- /dev/null +++ b/crates/rpc/src/base/annotator.rs @@ -0,0 +1,392 @@ +//! Resource annotator that correlates metering data with flashblock inclusions. + +use std::{fmt, sync::Arc}; + +use alloy_primitives::TxHash; +use parking_lot::RwLock; +use tokio::sync::mpsc::UnboundedReceiver; +use tracing::{debug, info, warn}; + +use crate::{MeteredTransaction, MeteringCache}; + +/// Commands that can be sent to the annotator. +#[derive(Debug, Clone)] +pub enum AnnotatorCommand { + /// Clear all pending transactions. + ClearPending, +} + +/// Message received from the flashblocks websocket feed indicating which +/// transactions were included in a specific flashblock. +#[derive(Debug)] +pub struct FlashblockInclusion { + /// Block number. + pub block_number: u64, + /// Flashblock index within the block. + pub flashblock_index: u64, + /// Tx hashes included in this flashblock. + pub ordered_tx_hashes: Vec, +} + +/// Maximum number of pending transactions before oldest entries are evicted. +const MAX_PENDING_TRANSACTIONS: usize = 10_000; + +/// Annotates flashblock transactions with their resource usage. +/// +/// The flow is: +/// 1. RPC receives `MeteredTransaction` via `base_setMeteringInfo` and sends it here +/// 2. These are stored in a pending lookup table +/// 3. Websocket sends `FlashblockInclusion` with actual (block, flashblock) location +/// 4. We look up pending transactions and insert them into the cache at the real location +pub struct ResourceAnnotator { + cache: Arc>, + tx_updates_rx: UnboundedReceiver, + flashblock_rx: UnboundedReceiver, + command_rx: UnboundedReceiver, + /// Pending metering data awaiting flashblock inclusion confirmation. + /// Uses IndexMap to maintain insertion order for FIFO eviction. + pending_transactions: indexmap::IndexMap, +} + +impl fmt::Debug for ResourceAnnotator { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ResourceAnnotator") + .field("pending_transactions", &self.pending_transactions.len()) + .finish_non_exhaustive() + } +} + +impl ResourceAnnotator { + /// Creates a new resource annotator. + pub fn new( + cache: Arc>, + tx_updates_rx: UnboundedReceiver, + flashblock_rx: UnboundedReceiver, + command_rx: UnboundedReceiver, + ) -> Self { + Self { + cache, + tx_updates_rx, + flashblock_rx, + command_rx, + pending_transactions: indexmap::IndexMap::new(), + } + } + + /// Runs the annotator until all channels are closed. + pub async fn run(mut self) { + info!(target: "metering::annotator", "Starting ResourceAnnotator"); + loop { + tokio::select! { + Some(tx_event) = self.tx_updates_rx.recv() => { + self.handle_tx_event(tx_event); + } + Some(flashblock_event) = self.flashblock_rx.recv() => { + self.handle_flashblock_event(flashblock_event); + } + Some(command) = self.command_rx.recv() => { + self.handle_command(command); + } + else => { + info!(target: "metering::annotator", "ResourceAnnotator terminating"); + break; + } + } + } + } + + fn handle_command(&mut self, command: AnnotatorCommand) { + match command { + AnnotatorCommand::ClearPending => { + let count = self.pending_transactions.len(); + self.pending_transactions.clear(); + info!( + target: "metering::annotator", + cleared = count, + "Cleared pending transactions" + ); + metrics::gauge!("metering.pending.size").set(0.0); + } + } + } + + fn handle_tx_event(&mut self, tx: MeteredTransaction) { + debug!( + tx_hash = %tx.tx_hash, + gas_used = tx.gas_used, + "Storing metered transaction in pending map" + ); + self.pending_transactions.insert(tx.tx_hash, tx); + + // Evict oldest entries if we exceed the limit. + while self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS { + if let Some((evicted_hash, _)) = self.pending_transactions.shift_remove_index(0) { + info!( + tx_hash = %evicted_hash, + "Evicting old transaction from pending map (limit exceeded)" + ); + metrics::counter!("metering.pending.evicted").increment(1); + } + } + + metrics::gauge!("metering.pending.size").set(self.pending_transactions.len() as f64); + } + + fn handle_flashblock_event(&mut self, event: FlashblockInclusion) { + // Reorg detection: flashblock_index=0 for existing block indicates reorg + if event.flashblock_index == 0 && self.cache.read().contains_block(event.block_number) { + let cleared = self.cache.write().clear_blocks_from(event.block_number); + + warn!( + target: "metering::annotator", + block_number = event.block_number, + blocks_cleared = cleared, + "Reorg detected: cleared cache from block" + ); + metrics::counter!("metering.cache.reorgs_detected").increment(1); + } + + let mut matched = 0usize; + let mut missed = 0usize; + + { + let mut cache = self.cache.write(); + for tx_hash in &event.ordered_tx_hashes { + if let Some(tx) = self.pending_transactions.shift_remove(tx_hash) { + cache.push_transaction(event.block_number, event.flashblock_index, tx); + matched += 1; + } else { + missed += 1; + } + } + } + + if matched > 0 { + debug!( + block_number = event.block_number, + flashblock_index = event.flashblock_index, + matched, + "Inserted transactions into cache from flashblock" + ); + } + + // All transactions should come through as bundles. Any misses indicate + // the Kafka event hasn't arrived yet or was lost. + if missed > 0 { + warn!( + block_number = event.block_number, + flashblock_index = event.flashblock_index, + matched, + missed, + "Flashblock contained transactions not found in pending map" + ); + metrics::counter!("metering.streams.tx_misses_total").increment(missed as u64); + } + + metrics::gauge!("metering.pending.size").set(self.pending_transactions.len() as f64); + } +} + +#[cfg(test)] +mod tests { + use alloy_primitives::{B256, U256}; + use tokio::sync::mpsc; + + use super::*; + + fn test_tx(hash: u64, priority: u64) -> MeteredTransaction { + let mut hash_bytes = [0u8; 32]; + hash_bytes[24..].copy_from_slice(&hash.to_be_bytes()); + MeteredTransaction { + tx_hash: B256::new(hash_bytes), + priority_fee_per_gas: U256::from(priority), + gas_used: 10, + execution_time_us: 5, + state_root_time_us: 7, + data_availability_bytes: 20, + } + } + + fn test_flashblock( + block_number: u64, + flashblock_index: u64, + hashes: Vec, + ) -> FlashblockInclusion { + FlashblockInclusion { + block_number, + flashblock_index, + ordered_tx_hashes: hashes + .into_iter() + .map(|h| { + let mut hash_bytes = [0u8; 32]; + hash_bytes[24..].copy_from_slice(&h.to_be_bytes()); + B256::new(hash_bytes) + }) + .collect(), + } + } + + #[tokio::test] + async fn reorg_clears_affected_blocks() { + let cache = Arc::new(RwLock::new(MeteringCache::new(10))); + let (tx_sender, tx_rx) = mpsc::unbounded_channel(); + let (fb_sender, fb_rx) = mpsc::unbounded_channel(); + let (cmd_sender, cmd_rx) = mpsc::unbounded_channel(); + + let mut annotator = ResourceAnnotator::new(cache.clone(), tx_rx, fb_rx, cmd_rx); + + // Pre-populate cache with blocks 100, 101, 102 + { + let mut c = cache.write(); + c.push_transaction(100, 0, test_tx(1, 10)); + c.push_transaction(101, 0, test_tx(2, 20)); + c.push_transaction(102, 0, test_tx(3, 30)); + } + + assert!(cache.read().contains_block(100)); + assert!(cache.read().contains_block(101)); + assert!(cache.read().contains_block(102)); + + // Send flashblock_index=0 for existing block 101 (simulates reorg) + let event = test_flashblock(101, 0, vec![]); + annotator.handle_flashblock_event(event); + + // Blocks 101 and 102 should be cleared, block 100 should remain + assert!(cache.read().contains_block(100)); + assert!(!cache.read().contains_block(101)); + assert!(!cache.read().contains_block(102)); + + drop(tx_sender); + drop(fb_sender); + drop(cmd_sender); + } + + #[tokio::test] + async fn non_zero_flashblock_does_not_trigger_reorg() { + let cache = Arc::new(RwLock::new(MeteringCache::new(10))); + let (tx_sender, tx_rx) = mpsc::unbounded_channel(); + let (fb_sender, fb_rx) = mpsc::unbounded_channel(); + let (cmd_sender, cmd_rx) = mpsc::unbounded_channel(); + + let mut annotator = ResourceAnnotator::new(cache.clone(), tx_rx, fb_rx, cmd_rx); + + // Pre-populate cache with block 100 + { + let mut c = cache.write(); + c.push_transaction(100, 0, test_tx(1, 10)); + } + + assert!(cache.read().contains_block(100)); + + // Send flashblock_index=1 for existing block 100 (not a reorg signal) + let event = test_flashblock(100, 1, vec![]); + annotator.handle_flashblock_event(event); + + // Block 100 should still exist + assert!(cache.read().contains_block(100)); + + drop(tx_sender); + drop(fb_sender); + drop(cmd_sender); + } + + #[tokio::test] + async fn flashblock_zero_for_new_block_does_not_trigger_reorg() { + let cache = Arc::new(RwLock::new(MeteringCache::new(10))); + let (tx_sender, tx_rx) = mpsc::unbounded_channel(); + let (fb_sender, fb_rx) = mpsc::unbounded_channel(); + let (cmd_sender, cmd_rx) = mpsc::unbounded_channel(); + + let mut annotator = ResourceAnnotator::new(cache.clone(), tx_rx, fb_rx, cmd_rx); + + // Pre-populate cache with block 100 + { + let mut c = cache.write(); + c.push_transaction(100, 0, test_tx(1, 10)); + } + + assert!(cache.read().contains_block(100)); + assert!(!cache.read().contains_block(101)); + + // Send flashblock_index=0 for NEW block 101 (not a reorg, just a new block) + let event = test_flashblock(101, 0, vec![]); + annotator.handle_flashblock_event(event); + + // Block 100 should still exist (no reorg happened) + assert!(cache.read().contains_block(100)); + + drop(tx_sender); + drop(fb_sender); + drop(cmd_sender); + } + + #[tokio::test] + async fn clear_pending_command_clears_all_pending_transactions() { + let cache = Arc::new(RwLock::new(MeteringCache::new(10))); + let (_tx_sender, tx_rx) = mpsc::unbounded_channel(); + let (_fb_sender, fb_rx) = mpsc::unbounded_channel(); + let (_cmd_sender, cmd_rx) = mpsc::unbounded_channel(); + + let mut annotator = ResourceAnnotator::new(cache, tx_rx, fb_rx, cmd_rx); + + // Add some pending transactions via handle_tx_event + annotator.handle_tx_event(test_tx(1, 10)); + annotator.handle_tx_event(test_tx(2, 20)); + annotator.handle_tx_event(test_tx(3, 30)); + + // Verify transactions are pending + assert_eq!(annotator.pending_transactions.len(), 3); + + // Send clear command + annotator.handle_command(AnnotatorCommand::ClearPending); + + // Verify pending transactions are cleared + assert_eq!(annotator.pending_transactions.len(), 0); + } + + #[tokio::test] + async fn tx_event_stores_transaction_in_pending_map() { + let cache = Arc::new(RwLock::new(MeteringCache::new(10))); + let (_tx_sender, tx_rx) = mpsc::unbounded_channel(); + let (_fb_sender, fb_rx) = mpsc::unbounded_channel(); + let (_cmd_sender, cmd_rx) = mpsc::unbounded_channel(); + + let mut annotator = ResourceAnnotator::new(cache, tx_rx, fb_rx, cmd_rx); + + // Initially empty + assert_eq!(annotator.pending_transactions.len(), 0); + + // Add a transaction + let tx = test_tx(42, 100); + let tx_hash = tx.tx_hash; + annotator.handle_tx_event(tx); + + // Verify it's stored + assert_eq!(annotator.pending_transactions.len(), 1); + assert!(annotator.pending_transactions.contains_key(&tx_hash)); + } + + #[tokio::test] + async fn flashblock_event_moves_pending_to_cache() { + let cache = Arc::new(RwLock::new(MeteringCache::new(10))); + let (_tx_sender, tx_rx) = mpsc::unbounded_channel(); + let (_fb_sender, fb_rx) = mpsc::unbounded_channel(); + let (_cmd_sender, cmd_rx) = mpsc::unbounded_channel(); + + let mut annotator = ResourceAnnotator::new(cache.clone(), tx_rx, fb_rx, cmd_rx); + + // Add pending transactions + annotator.handle_tx_event(test_tx(1, 10)); + annotator.handle_tx_event(test_tx(2, 20)); + + assert_eq!(annotator.pending_transactions.len(), 2); + + // Simulate flashblock inclusion for tx 1 + let event = test_flashblock(100, 0, vec![1]); + annotator.handle_flashblock_event(event); + + // tx 1 should be moved to cache, tx 2 still pending + assert_eq!(annotator.pending_transactions.len(), 1); + assert!(cache.read().contains_block(100)); + } +} diff --git a/crates/rpc/src/base/cache.rs b/crates/rpc/src/base/cache.rs new file mode 100644 index 00000000..59aae123 --- /dev/null +++ b/crates/rpc/src/base/cache.rs @@ -0,0 +1,403 @@ +//! In-memory cache for metering data used by the priority fee estimator. +//! +//! Transactions are stored in sequencer order (highest priority fee first) as received +//! from flashblock events. + +use std::collections::{BTreeMap, HashMap, VecDeque}; + +use alloy_primitives::{B256, U256}; + +/// A metered transaction with resource consumption data. +#[derive(Debug, Clone)] +pub struct MeteredTransaction { + /// Transaction hash. + pub tx_hash: B256, + /// Priority fee per gas for ordering. + pub priority_fee_per_gas: U256, + /// Gas consumed. + pub gas_used: u64, + /// Execution time in microseconds. + pub execution_time_us: u128, + /// State root computation time in microseconds. + pub state_root_time_us: u128, + /// Data availability bytes. + pub data_availability_bytes: u64, +} + +impl MeteredTransaction { + /// Creates a zeroed transaction (placeholder with no resource usage). + pub const fn zeroed(tx_hash: B256) -> Self { + Self { + tx_hash, + priority_fee_per_gas: U256::ZERO, + gas_used: 0, + execution_time_us: 0, + state_root_time_us: 0, + data_availability_bytes: 0, + } + } +} + +/// Aggregated resource totals. +#[derive(Debug, Clone, Copy, Default)] +pub struct ResourceTotals { + /// Total gas used. + pub gas_used: u64, + /// Total execution time in microseconds. + pub execution_time_us: u128, + /// Total state root time in microseconds. + pub state_root_time_us: u128, + /// Total data availability bytes. + pub data_availability_bytes: u64, +} + +impl ResourceTotals { + const fn accumulate(&mut self, tx: &MeteredTransaction) { + self.gas_used = self.gas_used.saturating_add(tx.gas_used); + self.execution_time_us = self.execution_time_us.saturating_add(tx.execution_time_us); + self.state_root_time_us = self.state_root_time_us.saturating_add(tx.state_root_time_us); + self.data_availability_bytes = + self.data_availability_bytes.saturating_add(tx.data_availability_bytes); + } +} + +/// Metrics for a single flashblock within a block. +/// +/// Transactions are stored in sequencer order (highest priority fee first). +#[derive(Debug)] +pub struct FlashblockMetrics { + /// Block number. + pub block_number: u64, + /// Flashblock index within the block. + pub flashblock_index: u64, + /// Transactions in sequencer order. + transactions: Vec, + totals: ResourceTotals, +} + +impl FlashblockMetrics { + /// Creates a new flashblock metrics container. + pub fn new(block_number: u64, flashblock_index: u64) -> Self { + Self { + block_number, + flashblock_index, + transactions: Vec::new(), + totals: ResourceTotals::default(), + } + } + + /// Appends a transaction, preserving sequencer order. + pub fn push_transaction(&mut self, tx: MeteredTransaction) { + self.totals.accumulate(&tx); + self.transactions.push(tx); + } + + /// Returns the resource totals for this flashblock. + pub const fn totals(&self) -> ResourceTotals { + self.totals + } + + /// Returns transactions in sequencer order. + pub fn transactions(&self) -> &[MeteredTransaction] { + &self.transactions + } + + /// Returns the number of transactions. + pub const fn len(&self) -> usize { + self.transactions.len() + } + + /// Returns true if empty. + pub const fn is_empty(&self) -> bool { + self.transactions.is_empty() + } +} + +/// Aggregated metrics for a block, including per-flashblock breakdown. +#[derive(Debug)] +pub struct BlockMetrics { + /// Block number. + pub block_number: u64, + flashblocks: BTreeMap, + totals: ResourceTotals, +} + +impl BlockMetrics { + /// Creates a new block metrics container. + pub fn new(block_number: u64) -> Self { + Self { block_number, flashblocks: BTreeMap::new(), totals: ResourceTotals::default() } + } + + /// Returns the number of flashblocks. + pub fn flashblock_count(&self) -> usize { + self.flashblocks.len() + } + + /// Iterates over all flashblocks. + pub fn flashblocks(&self) -> impl Iterator { + self.flashblocks.values() + } + + /// Returns a mutable reference to the flashblock, creating it if necessary. + /// Returns `(flashblock, is_new)`. + pub fn flashblock_mut(&mut self, flashblock_index: u64) -> (&mut FlashblockMetrics, bool) { + let is_new = !self.flashblocks.contains_key(&flashblock_index); + let entry = self + .flashblocks + .entry(flashblock_index) + .or_insert_with(|| FlashblockMetrics::new(self.block_number, flashblock_index)); + (entry, is_new) + } + + /// Returns the resource totals for this block. + pub const fn totals(&self) -> ResourceTotals { + self.totals + } + + fn recompute_totals(&mut self) { + self.totals = ResourceTotals::default(); + for flashblock in self.flashblocks.values() { + let totals = flashblock.totals(); + self.totals.gas_used = self.totals.gas_used.saturating_add(totals.gas_used); + self.totals.execution_time_us = + self.totals.execution_time_us.saturating_add(totals.execution_time_us); + self.totals.state_root_time_us = + self.totals.state_root_time_us.saturating_add(totals.state_root_time_us); + self.totals.data_availability_bytes = + self.totals.data_availability_bytes.saturating_add(totals.data_availability_bytes); + } + } +} + +/// In-memory cache maintaining metering data for the most recent blocks. +#[derive(Debug)] +pub struct MeteringCache { + max_blocks: usize, + blocks: VecDeque, + block_index: HashMap, +} + +impl MeteringCache { + /// Creates a new cache retaining at most `max_blocks` recent blocks. + pub fn new(max_blocks: usize) -> Self { + Self { max_blocks, blocks: VecDeque::new(), block_index: HashMap::new() } + } + + /// Returns the maximum number of blocks retained. + pub const fn max_blocks(&self) -> usize { + self.max_blocks + } + + /// Returns the block metrics for the given block number. + pub fn block(&self, block_number: u64) -> Option<&BlockMetrics> { + self.block_index.get(&block_number).and_then(|&idx| self.blocks.get(idx)) + } + + /// Returns a mutable reference to the block, creating it if necessary. + pub fn block_mut(&mut self, block_number: u64) -> &mut BlockMetrics { + if let Some(&idx) = self.block_index.get(&block_number) { + return self.blocks.get_mut(idx).expect("block index out of bounds"); + } + + let block = BlockMetrics::new(block_number); + self.blocks.push_back(block); + let idx = self.blocks.len() - 1; + self.block_index.insert(block_number, idx); + + self.evict_if_needed(); + self.blocks.get_mut(*self.block_index.get(&block_number).unwrap()).unwrap() + } + + /// Appends a transaction to the cache, preserving sequencer order. + pub fn push_transaction( + &mut self, + block_number: u64, + flashblock_index: u64, + tx: MeteredTransaction, + ) { + let block = self.block_mut(block_number); + let (flashblock, _) = block.flashblock_mut(flashblock_index); + flashblock.push_transaction(tx); + block.recompute_totals(); + } + + /// Returns the number of cached blocks. + pub fn len(&self) -> usize { + self.blocks.len() + } + + /// Returns true if the cache is empty. + pub fn is_empty(&self) -> bool { + self.blocks.is_empty() + } + + /// Iterates over blocks in descending order (most recent first). + pub fn blocks_desc(&self) -> impl Iterator { + self.blocks.iter().rev() + } + + /// Returns true if the specified block_number exists in the cache. + pub fn contains_block(&self, block_number: u64) -> bool { + self.block_index.contains_key(&block_number) + } + + /// Clears all blocks with block_number >= the specified value. + /// Returns the number of blocks cleared. + pub fn clear_blocks_from(&mut self, block_number: u64) -> usize { + let mut cleared = 0; + + // Remove from back to front (blocks stored oldest first) + while let Some(block) = self.blocks.back() { + if block.block_number >= block_number { + let removed = self.blocks.pop_back().unwrap(); + self.block_index.remove(&removed.block_number); + cleared += 1; + } else { + break; + } + } + + cleared + } + + fn evict_if_needed(&mut self) { + let mut evicted = false; + while self.blocks.len() > self.max_blocks { + if let Some(oldest) = self.blocks.pop_front() { + self.block_index.remove(&oldest.block_number); + evicted = true; + } + } + // Rebuild index once after all evictions to maintain correctness. + if evicted { + self.rebuild_index(); + } + } + + fn rebuild_index(&mut self) { + self.block_index.clear(); + for (idx, block) in self.blocks.iter().enumerate() { + self.block_index.insert(block.block_number, idx); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_tx(hash: u64, priority: u64) -> MeteredTransaction { + let mut hash_bytes = [0u8; 32]; + hash_bytes[24..].copy_from_slice(&hash.to_be_bytes()); + MeteredTransaction { + tx_hash: B256::new(hash_bytes), + priority_fee_per_gas: U256::from(priority), + gas_used: 10, + execution_time_us: 5, + state_root_time_us: 7, + data_availability_bytes: 20, + } + } + + #[test] + fn insert_and_retrieve_transactions() { + let mut cache = MeteringCache::new(12); + let tx1 = test_tx(1, 2); + cache.push_transaction(100, 0, tx1.clone()); + + let block = cache.block(100).unwrap(); + let flashblock = block.flashblocks().next().unwrap(); + assert_eq!(flashblock.len(), 1); + assert_eq!(flashblock.transactions()[0].tx_hash, tx1.tx_hash); + } + + #[test] + fn transactions_preserve_sequencer_order() { + let mut cache = MeteringCache::new(12); + // Insert in sequencer order (highest priority first) + cache.push_transaction(100, 0, test_tx(1, 30)); + cache.push_transaction(100, 0, test_tx(2, 20)); + cache.push_transaction(100, 0, test_tx(3, 10)); + + let block = cache.block(100).unwrap(); + let flashblock = block.flashblocks().next().unwrap(); + let fees: Vec<_> = + flashblock.transactions().iter().map(|tx| tx.priority_fee_per_gas).collect(); + // Order should be preserved as inserted + assert_eq!(fees, vec![U256::from(30u64), U256::from(20u64), U256::from(10u64)]); + } + + #[test] + fn evicts_old_blocks() { + let mut cache = MeteringCache::new(2); + for block_number in 0..3u64 { + cache.push_transaction(block_number, 0, test_tx(block_number, block_number)); + } + assert!(cache.block(0).is_none()); + assert!(cache.block(1).is_some()); + assert!(cache.block(2).is_some()); + } + + #[test] + fn contains_block_returns_correct_values() { + let mut cache = MeteringCache::new(10); + cache.push_transaction(100, 0, test_tx(1, 10)); + cache.push_transaction(101, 0, test_tx(2, 20)); + + assert!(cache.contains_block(100)); + assert!(cache.contains_block(101)); + assert!(!cache.contains_block(99)); + assert!(!cache.contains_block(102)); + } + + #[test] + fn clear_blocks_from_clears_subsequent_blocks() { + let mut cache = MeteringCache::new(10); + cache.push_transaction(100, 0, test_tx(1, 10)); + cache.push_transaction(101, 0, test_tx(2, 20)); + cache.push_transaction(102, 0, test_tx(3, 30)); + + let cleared = cache.clear_blocks_from(101); + + assert_eq!(cleared, 2); + assert!(cache.contains_block(100)); + assert!(!cache.contains_block(101)); + assert!(!cache.contains_block(102)); + assert_eq!(cache.len(), 1); + } + + #[test] + fn clear_blocks_from_returns_zero_when_no_match() { + let mut cache = MeteringCache::new(10); + cache.push_transaction(100, 0, test_tx(1, 10)); + cache.push_transaction(101, 0, test_tx(2, 20)); + + let cleared = cache.clear_blocks_from(200); + + assert_eq!(cleared, 0); + assert_eq!(cache.len(), 2); + } + + #[test] + fn clear_blocks_from_clears_all_blocks() { + let mut cache = MeteringCache::new(10); + cache.push_transaction(100, 0, test_tx(1, 10)); + cache.push_transaction(101, 0, test_tx(2, 20)); + cache.push_transaction(102, 0, test_tx(3, 30)); + + let cleared = cache.clear_blocks_from(100); + + assert_eq!(cleared, 3); + assert!(cache.is_empty()); + } + + #[test] + fn clear_blocks_from_handles_empty_cache() { + let mut cache = MeteringCache::new(10); + + let cleared = cache.clear_blocks_from(100); + + assert_eq!(cleared, 0); + assert!(cache.is_empty()); + } +} diff --git a/crates/rpc/src/base/estimator.rs b/crates/rpc/src/base/estimator.rs new file mode 100644 index 00000000..e036c7c0 --- /dev/null +++ b/crates/rpc/src/base/estimator.rs @@ -0,0 +1,900 @@ +//! Priority fee estimation based on resource consumption in flashblocks. + +use std::sync::Arc; + +use alloy_primitives::U256; +use parking_lot::RwLock; +use reth_optimism_payload_builder::config::OpDAConfig; + +use crate::base::cache::{MeteredTransaction, MeteringCache}; + +/// Errors that can occur during priority fee estimation. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum EstimateError { + /// The bundle's resource demand exceeds the configured capacity limit. + DemandExceedsCapacity { + /// The resource that exceeded capacity. + resource: ResourceKind, + /// The requested demand. + demand: u128, + /// The configured limit. + limit: u128, + }, +} + +impl std::fmt::Display for EstimateError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::DemandExceedsCapacity { resource, demand, limit } => { + write!( + f, + "bundle {} demand ({}) exceeds capacity limit ({})", + resource.as_name(), + demand, + limit + ) + } + } + } +} + +impl std::error::Error for EstimateError {} + +/// Configured capacity limits for each resource type. +/// +/// These values define the maximum capacity available per flashblock (or per block +/// for "use-it-or-lose-it" resources). The estimator uses these limits to determine +/// when resources are congested. +#[derive(Debug, Clone, Copy, Default)] +pub struct ResourceLimits { + /// Gas limit per flashblock. + pub gas_used: Option, + /// Execution time budget in microseconds. + pub execution_time_us: Option, + /// State root computation time budget in microseconds. + pub state_root_time_us: Option, + /// Data availability bytes limit per flashblock. + pub data_availability_bytes: Option, +} + +impl ResourceLimits { + /// Returns the limit for the given resource kind. + fn limit_for(&self, resource: ResourceKind) -> Option { + match resource { + ResourceKind::GasUsed => self.gas_used.map(|v| v as u128), + ResourceKind::ExecutionTime => self.execution_time_us, + ResourceKind::StateRootTime => self.state_root_time_us, + ResourceKind::DataAvailability => self.data_availability_bytes.map(|v| v as u128), + } + } +} + +/// Resources that influence flashblock inclusion ordering. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum ResourceKind { + /// Gas consumption. + GasUsed, + /// Execution time. + ExecutionTime, + /// State root computation time. + StateRootTime, + /// Data availability bytes. + DataAvailability, +} + +impl ResourceKind { + /// Returns all resource kinds in a fixed order. + pub const fn all() -> [Self; 4] { + [Self::GasUsed, Self::ExecutionTime, Self::StateRootTime, Self::DataAvailability] + } + + /// Returns `true` if this resource is "use-it-or-lose-it", meaning capacity + /// that isn't consumed in one flashblock cannot be reclaimed in later ones. + /// + /// Execution time is the canonical example: the block builder has a fixed + /// time budget per block, and unused time in flashblock 0 doesn't roll over + /// to flashblock 1. For these resources, the estimator aggregates usage + /// across all flashblocks rather than evaluating each flashblock in isolation. + /// + /// Other resources like gas and DA bytes are bounded per-block but are + /// evaluated per-flashblock since their limits apply independently. + const fn use_it_or_lose_it(self) -> bool { + matches!(self, Self::ExecutionTime) + } + + /// Returns a human-readable name for the resource kind. + pub const fn as_name(&self) -> &'static str { + match self { + Self::GasUsed => "gas", + Self::ExecutionTime => "execution time", + Self::StateRootTime => "state root time", + Self::DataAvailability => "data availability", + } + } + + /// Returns a camelCase name for JSON serialization. + pub const fn as_camel_case(&self) -> &'static str { + match self { + Self::GasUsed => "gasUsed", + Self::ExecutionTime => "executionTime", + Self::StateRootTime => "stateRootTime", + Self::DataAvailability => "dataAvailability", + } + } +} + +/// Amount of resources required by the bundle being priced. +#[derive(Debug, Clone, Copy, Default)] +pub struct ResourceDemand { + /// Gas demand. + pub gas_used: Option, + /// Execution time demand in microseconds. + pub execution_time_us: Option, + /// State root time demand in microseconds. + pub state_root_time_us: Option, + /// Data availability bytes demand. + pub data_availability_bytes: Option, +} + +impl ResourceDemand { + fn demand_for(&self, resource: ResourceKind) -> Option { + match resource { + ResourceKind::GasUsed => self.gas_used.map(|v| v as u128), + ResourceKind::ExecutionTime => self.execution_time_us, + ResourceKind::StateRootTime => self.state_root_time_us, + ResourceKind::DataAvailability => self.data_availability_bytes.map(|v| v as u128), + } + } +} + +/// Fee estimate for a single resource type. +/// +/// The estimation algorithm answers: "What priority fee would my bundle need to pay +/// to displace enough lower-paying transactions to free up the resources I need?" +#[derive(Debug, Clone)] +pub struct ResourceEstimate { + /// Minimum fee to displace enough capacity for the bundle's resource demand. + pub threshold_priority_fee: U256, + /// Recommended fee based on a percentile of transactions above the threshold. + /// Provides a safety margin over the bare minimum. + pub recommended_priority_fee: U256, + /// Total resource usage of transactions at or above the threshold fee. + pub cumulative_usage: u128, + /// Number of transactions at or above `threshold_priority_fee`. These higher-paying + /// transactions remain included alongside the bundle; lower-paying ones are displaced. + pub threshold_tx_count: usize, + /// Total transactions considered in the estimate. + pub total_transactions: usize, +} + +/// Per-resource fee estimates. +/// +/// Each field corresponds to a resource type. `None` indicates the resource +/// was not requested or could not be estimated (e.g., demand exceeds capacity). +#[derive(Debug, Clone, Default)] +pub struct ResourceEstimates { + /// Gas usage estimate. + pub gas_used: Option, + /// Execution time estimate. + pub execution_time: Option, + /// State root time estimate. + pub state_root_time: Option, + /// Data availability estimate. + pub data_availability: Option, +} + +impl ResourceEstimates { + /// Returns the estimate for the given resource kind. + pub const fn get(&self, kind: ResourceKind) -> Option<&ResourceEstimate> { + match kind { + ResourceKind::GasUsed => self.gas_used.as_ref(), + ResourceKind::ExecutionTime => self.execution_time.as_ref(), + ResourceKind::StateRootTime => self.state_root_time.as_ref(), + ResourceKind::DataAvailability => self.data_availability.as_ref(), + } + } + + /// Sets the estimate for the given resource kind. + pub const fn set(&mut self, kind: ResourceKind, estimate: ResourceEstimate) { + match kind { + ResourceKind::GasUsed => self.gas_used = Some(estimate), + ResourceKind::ExecutionTime => self.execution_time = Some(estimate), + ResourceKind::StateRootTime => self.state_root_time = Some(estimate), + ResourceKind::DataAvailability => self.data_availability = Some(estimate), + } + } + + /// Iterates over all present estimates with their resource kind. + pub fn iter(&self) -> impl Iterator { + [ + (ResourceKind::GasUsed, &self.gas_used), + (ResourceKind::ExecutionTime, &self.execution_time), + (ResourceKind::StateRootTime, &self.state_root_time), + (ResourceKind::DataAvailability, &self.data_availability), + ] + .into_iter() + .filter_map(|(kind, opt)| opt.as_ref().map(|est| (kind, est))) + } + + /// Returns true if no estimates are present. + pub fn is_empty(&self) -> bool { + self.iter().next().is_none() + } +} + +/// Estimates for a specific flashblock index. +#[derive(Debug, Clone)] +pub struct FlashblockResourceEstimates { + /// Flashblock index. + pub flashblock_index: u64, + /// Per-resource estimates. + pub estimates: ResourceEstimates, +} + +/// Aggregated estimates for a block. +#[derive(Debug, Clone)] +pub struct BlockPriorityEstimates { + /// Block number. + pub block_number: u64, + /// Per-flashblock estimates. + pub flashblocks: Vec, + /// Minimum recommended fee across all flashblocks (easiest inclusion). + pub min_across_flashblocks: ResourceEstimates, + /// Maximum recommended fee across all flashblocks (most competitive). + pub max_across_flashblocks: ResourceEstimates, +} + +/// Priority fee estimate aggregated across multiple recent blocks. +#[derive(Debug, Clone)] +pub struct RollingPriorityEstimate { + /// Number of blocks that contributed to this estimate. + pub blocks_sampled: usize, + /// Per-resource estimates (median across sampled blocks). + pub estimates: ResourceEstimates, + /// Recommended priority fee: maximum across all resources. + pub priority_fee: U256, +} + +/// Computes resource fee estimates based on cached flashblock metering data. +#[derive(Debug)] +pub struct PriorityFeeEstimator { + cache: Arc>, + percentile: f64, + limits: ResourceLimits, + default_priority_fee: U256, + /// Optional shared DA config from the miner RPC. When set, the estimator uses + /// `max_da_block_size` from this config instead of `limits.data_availability_bytes`. + /// This allows dynamic updates via `miner_setMaxDASize`. + da_config: Option, +} + +impl PriorityFeeEstimator { + /// Creates a new estimator referencing the shared metering cache. + /// + /// # Parameters + /// - `cache`: Shared cache containing recent flashblock metering data. + /// - `percentile`: Point in the fee distribution (among transactions above threshold) + /// to use for the recommended fee. + /// - `limits`: Configured resource capacity limits. + /// - `default_priority_fee`: Fee to return when a resource is not congested. + /// - `da_config`: Optional shared DA config for dynamic DA limit updates. + pub const fn new( + cache: Arc>, + percentile: f64, + limits: ResourceLimits, + default_priority_fee: U256, + da_config: Option, + ) -> Self { + Self { cache, percentile, limits, default_priority_fee, da_config } + } + + /// Returns the current DA block size limit, preferring the dynamic `OpDAConfig` value + /// if available, otherwise falling back to the static limit. + pub fn max_da_block_size(&self) -> Option { + self.da_config + .as_ref() + .and_then(|c| c.max_da_block_size()) + .or(self.limits.data_availability_bytes) + } + + /// Returns the limit for the given resource kind, using dynamic config where available. + fn limit_for(&self, resource: ResourceKind) -> Option { + match resource { + ResourceKind::DataAvailability => self.max_da_block_size().map(|v| v as u128), + _ => self.limits.limit_for(resource), + } + } + + /// Returns fee estimates for the provided block. If `block_number` is `None` + /// the most recent block in the cache is used. + /// + /// Returns `Ok(None)` if the cache is empty, the requested block is not cached, + /// or no transactions exist in the cached flashblocks. + /// + /// Returns `Err` if the bundle's demand exceeds any resource's capacity limit. + pub fn estimate_for_block( + &self, + block_number: Option, + demand: ResourceDemand, + ) -> Result, EstimateError> { + let cache_guard = self.cache.read(); + let block_metrics = block_number + .map_or_else(|| cache_guard.blocks_desc().next(), |target| cache_guard.block(target)); + let Some(block_metrics) = block_metrics else { + return Ok(None); + }; + + let block_number = block_metrics.block_number; + + // Clone transactions per flashblock so we can drop the lock. + // Transactions are pre-sorted descending by priority fee in the cache. + let mut flashblock_transactions = Vec::new(); + let mut total_tx_count = 0usize; + for flashblock in block_metrics.flashblocks() { + let txs: Vec = flashblock.transactions().to_vec(); + if txs.is_empty() { + continue; + } + total_tx_count += txs.len(); + flashblock_transactions.push((flashblock.flashblock_index, txs)); + } + drop(cache_guard); + + if flashblock_transactions.is_empty() { + return Ok(None); + } + + // Build the aggregate list for use-it-or-lose-it resources. + // Need to sort since we're combining multiple pre-sorted flashblocks. + let mut aggregate_refs: Vec<&MeteredTransaction> = Vec::with_capacity(total_tx_count); + for (_, txs) in &flashblock_transactions { + aggregate_refs.extend(txs.iter()); + } + aggregate_refs.sort_by(|a, b| b.priority_fee_per_gas.cmp(&a.priority_fee_per_gas)); + + let mut flashblock_estimates = Vec::new(); + + for (flashblock_index, txs) in &flashblock_transactions { + // Build a reference slice for this flashblock's transactions. + let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect(); + + let mut estimates = ResourceEstimates::default(); + for resource in ResourceKind::all() { + let Some(demand_value) = demand.demand_for(resource) else { + continue; + }; + let Some(limit_value) = self.limit_for(resource) else { + continue; + }; + + let transactions: &[&MeteredTransaction] = + if resource.use_it_or_lose_it() { &aggregate_refs } else { &txs_refs }; + let estimate = compute_estimate( + resource, + transactions, + demand_value, + limit_value, + usage_extractor(resource), + self.percentile, + self.default_priority_fee, + )?; + + estimates.set(resource, estimate); + } + + flashblock_estimates.push(FlashblockResourceEstimates { + flashblock_index: *flashblock_index, + estimates, + }); + } + + let (min_across_flashblocks, max_across_flashblocks) = + compute_min_max_estimates(&flashblock_estimates); + + Ok(Some(BlockPriorityEstimates { + block_number, + flashblocks: flashblock_estimates, + min_across_flashblocks, + max_across_flashblocks, + })) + } + + /// Returns rolling fee estimates aggregated across the most recent blocks in the cache. + /// + /// For each resource, computes estimates per-block and takes the median recommended fee. + /// The final `recommended_priority_fee` is the maximum across all resources. + /// + /// Returns `Ok(None)` if the cache is empty or no blocks contain transaction data. + /// + /// Returns `Err` if the bundle's demand exceeds any resource's capacity limit. + pub fn estimate_rolling( + &self, + demand: ResourceDemand, + ) -> Result, EstimateError> { + let cache_guard = self.cache.read(); + let block_numbers: Vec = cache_guard.blocks_desc().map(|b| b.block_number).collect(); + drop(cache_guard); + + if block_numbers.is_empty() { + return Ok(None); + } + + // Collect per-block max estimates. Propagate any errors. + let mut block_estimates = Vec::new(); + for &n in &block_numbers { + if let Some(est) = self.estimate_for_block(Some(n), demand)? { + block_estimates.push(est.max_across_flashblocks); + } + } + + if block_estimates.is_empty() { + return Ok(None); + } + + // Compute median fee for each resource across blocks. + let mut estimates = ResourceEstimates::default(); + let mut max_fee = U256::ZERO; + + for resource in ResourceKind::all() { + let mut fees: Vec = block_estimates + .iter() + .filter_map(|e| e.get(resource)) + .map(|e| e.recommended_priority_fee) + .collect(); + + if fees.is_empty() { + continue; + } + + fees.sort(); + let median_fee = fees[fees.len() / 2]; + max_fee = max_fee.max(median_fee); + + estimates.set( + resource, + ResourceEstimate { + threshold_priority_fee: median_fee, + recommended_priority_fee: median_fee, + cumulative_usage: 0, + threshold_tx_count: 0, + total_transactions: 0, + }, + ); + } + + if estimates.is_empty() { + return Ok(None); + } + + Ok(Some(RollingPriorityEstimate { + blocks_sampled: block_numbers.len(), + estimates, + priority_fee: max_fee, + })) + } +} + +/// Core estimation algorithm (top-down approach). +/// +/// Given a sorted list of transactions and a resource limit, determines the minimum priority +/// fee needed to be included alongside enough high-paying transactions while still +/// leaving room for the bundle's demand. +/// +/// # Arguments +/// +/// * `transactions` - Must be sorted by priority fee descending (highest first) +/// +/// # Algorithm +/// +/// 1. Walk from highest-paying transactions, subtracting each transaction's usage from +/// remaining capacity. +/// 2. Stop when including another transaction would leave less capacity than the bundle needs. +/// 3. The threshold fee is the fee of the last included transaction (the minimum fee +/// among transactions that would be included alongside the bundle). +/// 4. If we include all transactions and still have capacity >= demand, the resource is +/// not congested, so return the configured default fee. +/// +/// Returns `Err` if the bundle's demand exceeds the resource limit. +fn compute_estimate( + resource: ResourceKind, + transactions: &[&MeteredTransaction], + demand: u128, + limit: u128, + usage_fn: fn(&MeteredTransaction) -> u128, + percentile: f64, + default_fee: U256, +) -> Result { + // Bundle demand exceeds the resource limit entirely. + if demand > limit { + return Err(EstimateError::DemandExceedsCapacity { resource, demand, limit }); + } + + // No transactions or zero demand means no competition for this resource. + if transactions.is_empty() || demand == 0 { + return Ok(ResourceEstimate { + threshold_priority_fee: default_fee, + recommended_priority_fee: default_fee, + cumulative_usage: 0, + threshold_tx_count: 0, + total_transactions: 0, + }); + } + + // Walk from highest-paying transactions, subtracting usage from remaining capacity. + // Stop when we can no longer fit another transaction while leaving room for demand. + let mut remaining = limit; + let mut included_usage = 0u128; + let mut last_included_idx: Option = None; + + for (idx, tx) in transactions.iter().enumerate() { + let usage = usage_fn(tx); + + // Check if we can include this transaction and still have room for the bundle. + if remaining >= usage && remaining.saturating_sub(usage) >= demand { + remaining = remaining.saturating_sub(usage); + included_usage = included_usage.saturating_add(usage); + last_included_idx = Some(idx); + } else { + // Can't include this transaction without crowding out the bundle. + break; + } + } + + // If we included all transactions and still have room, resource is not congested. + let is_uncongested = last_included_idx == Some(transactions.len() - 1) && remaining >= demand; + + if is_uncongested { + return Ok(ResourceEstimate { + threshold_priority_fee: default_fee, + recommended_priority_fee: default_fee, + cumulative_usage: included_usage, + threshold_tx_count: transactions.len(), + total_transactions: transactions.len(), + }); + } + + let (supporting_count, threshold_fee, recommended_fee) = last_included_idx.map_or_else( + || { + // No transactions fit - even the first transaction would crowd out + // the bundle. The bundle must beat the highest fee to be included. + // Report 0 supporting transactions since none were actually included. + let threshold_fee = transactions[0].priority_fee_per_gas; + (0, threshold_fee, threshold_fee) + }, + |idx| { + // At least one transaction fits alongside the bundle. + // The threshold is the fee of the last included transaction. + let threshold_fee = transactions[idx].priority_fee_per_gas; + + // For recommended fee, look at included transactions (those above threshold) + // and pick one at the specified percentile for a safety margin. + let included = &transactions[..=idx]; + let percentile = percentile.clamp(0.0, 1.0); + let recommended_fee = if included.len() <= 1 { + threshold_fee + } else { + // Pick from the higher end of included transactions for safety. + let pos = ((included.len() - 1) as f64 * (1.0 - percentile)).round() as usize; + included[pos.min(included.len() - 1)].priority_fee_per_gas + }; + + (idx + 1, threshold_fee, recommended_fee) + }, + ); + + Ok(ResourceEstimate { + threshold_priority_fee: threshold_fee, + recommended_priority_fee: recommended_fee, + cumulative_usage: included_usage, + threshold_tx_count: supporting_count, + total_transactions: transactions.len(), + }) +} + +/// Returns a function that extracts the relevant resource usage from a transaction. +fn usage_extractor(resource: ResourceKind) -> fn(&MeteredTransaction) -> u128 { + match resource { + ResourceKind::GasUsed => |tx: &MeteredTransaction| tx.gas_used as u128, + ResourceKind::ExecutionTime => |tx: &MeteredTransaction| tx.execution_time_us, + ResourceKind::StateRootTime => |tx: &MeteredTransaction| tx.state_root_time_us, + ResourceKind::DataAvailability => { + |tx: &MeteredTransaction| tx.data_availability_bytes as u128 + } + } +} + +/// Computes the minimum and maximum recommended fees across all flashblocks. +/// +/// Returns two `ResourceEstimates`: +/// - First: For each resource, the estimate with the lowest recommended fee (easiest inclusion). +/// - Second: For each resource, the estimate with the highest recommended fee (most competitive). +fn compute_min_max_estimates( + flashblocks: &[FlashblockResourceEstimates], +) -> (ResourceEstimates, ResourceEstimates) { + let mut min_estimates = ResourceEstimates::default(); + let mut max_estimates = ResourceEstimates::default(); + + for flashblock in flashblocks { + for (resource, estimate) in flashblock.estimates.iter() { + // Update min. + let current_min = min_estimates.get(resource); + if current_min.is_none() + || estimate.recommended_priority_fee < current_min.unwrap().recommended_priority_fee + { + min_estimates.set(resource, estimate.clone()); + } + + // Update max. + let current_max = max_estimates.get(resource); + if current_max.is_none() + || estimate.recommended_priority_fee > current_max.unwrap().recommended_priority_fee + { + max_estimates.set(resource, estimate.clone()); + } + } + } + + (min_estimates, max_estimates) +} + +#[cfg(test)] +mod tests { + use alloy_primitives::B256; + + use super::*; + + const DEFAULT_FEE: U256 = U256::from_limbs([1, 0, 0, 0]); // 1 wei + + fn tx(priority: u64, usage: u64) -> MeteredTransaction { + let mut hash_bytes = [0u8; 32]; + hash_bytes[24..].copy_from_slice(&priority.to_be_bytes()); + MeteredTransaction { + tx_hash: B256::new(hash_bytes), + priority_fee_per_gas: U256::from(priority), + gas_used: usage, + execution_time_us: usage as u128, + state_root_time_us: usage as u128, + data_availability_bytes: usage, + } + } + + #[test] + fn compute_estimate_congested_resource() { + // Limit: 30, Demand: 15 + // Transactions: priority=10 (10 gas), priority=5 (10 gas), priority=2 (10 gas) + // Walking from top (highest fee): + // - Include tx priority=10: remaining = 30-10 = 20 >= 15 ok + // - Include tx priority=5: remaining = 20-10 = 10 < 15 stop + // Threshold = 10 (the last included tx's fee) + let txs = [tx(10, 10), tx(5, 10), tx(2, 10)]; + let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect(); + let quote = compute_estimate( + ResourceKind::GasUsed, + &txs_refs, + 15, + 30, // limit + usage_extractor(ResourceKind::GasUsed), + 0.5, + DEFAULT_FEE, + ) + .expect("no error"); + assert_eq!(quote.threshold_priority_fee, U256::from(10)); + assert_eq!(quote.cumulative_usage, 10); // Only the first tx was included + assert_eq!(quote.threshold_tx_count, 1); + assert_eq!(quote.total_transactions, 3); + } + + #[test] + fn compute_estimate_uncongested_resource() { + // Limit: 100, Demand: 15 + // All transactions fit with room to spare -> return default fee + let txs = [tx(10, 10), tx(5, 10), tx(2, 10)]; + let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect(); + let quote = compute_estimate( + ResourceKind::GasUsed, + &txs_refs, + 15, + 100, // limit is much larger than total usage + usage_extractor(ResourceKind::GasUsed), + 0.5, + DEFAULT_FEE, + ) + .expect("no error"); + assert_eq!(quote.threshold_priority_fee, DEFAULT_FEE); + assert_eq!(quote.recommended_priority_fee, DEFAULT_FEE); + assert_eq!(quote.cumulative_usage, 30); // All txs included + assert_eq!(quote.threshold_tx_count, 3); + } + + #[test] + fn compute_estimate_demand_exceeds_limit() { + // Demand > Limit -> Error + let txs = [tx(10, 10), tx(5, 10)]; + let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect(); + let result = compute_estimate( + ResourceKind::GasUsed, + &txs_refs, + 50, // demand + 30, // limit + usage_extractor(ResourceKind::GasUsed), + 0.5, + DEFAULT_FEE, + ); + assert!(matches!( + result, + Err(EstimateError::DemandExceedsCapacity { + resource: ResourceKind::GasUsed, + demand: 50, + limit: 30, + }) + )); + } + + #[test] + fn compute_estimate_exact_fit() { + // Limit: 30, Demand: 20 + // Transactions: priority=10 (10 gas), priority=5 (10 gas) + // After including tx priority=10: remaining = 20 >= 20 ok + // After including tx priority=5: remaining = 10 < 20 stop + let txs = [tx(10, 10), tx(5, 10)]; + let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect(); + let quote = compute_estimate( + ResourceKind::GasUsed, + &txs_refs, + 20, + 30, + usage_extractor(ResourceKind::GasUsed), + 0.5, + DEFAULT_FEE, + ) + .expect("no error"); + assert_eq!(quote.threshold_priority_fee, U256::from(10)); + assert_eq!(quote.cumulative_usage, 10); + assert_eq!(quote.threshold_tx_count, 1); + } + + #[test] + fn compute_estimate_single_transaction() { + // Single tx that fits + let txs = [tx(10, 10)]; + let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect(); + let quote = compute_estimate( + ResourceKind::GasUsed, + &txs_refs, + 15, + 30, + usage_extractor(ResourceKind::GasUsed), + 0.5, + DEFAULT_FEE, + ) + .expect("no error"); + // After including the tx: remaining = 20 >= 15 ok + // But we only have 1 tx, so it's uncongested + assert_eq!(quote.threshold_priority_fee, DEFAULT_FEE); + assert_eq!(quote.recommended_priority_fee, DEFAULT_FEE); + } + + #[test] + fn compute_estimate_no_room_for_any_tx() { + // Limit: 25, Demand: 20 + // First tx uses 10, remaining = 15 < 20 -> can't even include first tx + let txs = [tx(10, 10), tx(5, 10)]; + let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect(); + let quote = compute_estimate( + ResourceKind::GasUsed, + &txs_refs, + 20, + 25, + usage_extractor(ResourceKind::GasUsed), + 0.5, + DEFAULT_FEE, + ) + .expect("no error"); + // No transactions can be included, threshold is the highest fee + assert_eq!(quote.threshold_priority_fee, U256::from(10)); + assert_eq!(quote.threshold_tx_count, 0); + assert_eq!(quote.cumulative_usage, 0); + } + + #[test] + fn compute_estimate_empty_transactions() { + // No transactions = uncongested, return default fee + let txs_refs: Vec<&MeteredTransaction> = vec![]; + let quote = compute_estimate( + ResourceKind::GasUsed, + &txs_refs, + 15, + 30, + usage_extractor(ResourceKind::GasUsed), + 0.5, + DEFAULT_FEE, + ) + .expect("no error"); + assert_eq!(quote.threshold_priority_fee, DEFAULT_FEE); + assert_eq!(quote.recommended_priority_fee, DEFAULT_FEE); + } + + const DEFAULT_LIMITS: ResourceLimits = ResourceLimits { + gas_used: Some(25), + execution_time_us: Some(100), + state_root_time_us: None, + data_availability_bytes: Some(100), + }; + + fn setup_estimator( + limits: ResourceLimits, + ) -> (Arc>, PriorityFeeEstimator) { + let cache = Arc::new(RwLock::new(MeteringCache::new(4))); + let estimator = PriorityFeeEstimator::new(cache.clone(), 0.5, limits, DEFAULT_FEE, None); + (cache, estimator) + } + + #[test] + fn estimate_for_block_respects_limits() { + let (cache, estimator) = setup_estimator(DEFAULT_LIMITS); + { + let mut guard = cache.write(); + guard.push_transaction(1, 0, tx(10, 10)); + guard.push_transaction(1, 0, tx(5, 10)); + } + let mut demand = ResourceDemand::default(); + demand.gas_used = Some(15); + + let estimates = + estimator.estimate_for_block(Some(1), demand).expect("no error").expect("cached block"); + + assert_eq!(estimates.block_number, 1); + let gas_estimate = estimates.max_across_flashblocks.gas_used.expect("gas estimate present"); + assert_eq!(gas_estimate.threshold_priority_fee, U256::from(10)); + } + + #[test] + fn estimate_for_block_propagates_limit_errors() { + let mut limits = DEFAULT_LIMITS; + limits.gas_used = Some(10); + let (cache, estimator) = setup_estimator(limits); + { + let mut guard = cache.write(); + guard.push_transaction(1, 0, tx(10, 10)); + guard.push_transaction(1, 0, tx(5, 10)); + } + let demand = ResourceDemand { gas_used: Some(15), ..Default::default() }; + + let err = estimator + .estimate_for_block(Some(1), demand) + .expect_err("demand should exceed capacity"); + assert!(matches!( + err, + EstimateError::DemandExceedsCapacity { + resource: ResourceKind::GasUsed, + demand: 15, + limit: 10 + } + )); + } + + #[test] + fn estimate_rolling_aggregates_across_blocks() { + let (cache, estimator) = setup_estimator(DEFAULT_LIMITS); + { + let mut guard = cache.write(); + // Block 1 → threshold 10 + guard.push_transaction(1, 0, tx(10, 10)); + guard.push_transaction(1, 0, tx(5, 10)); + // Block 2 → threshold 30 + guard.push_transaction(2, 0, tx(30, 10)); + guard.push_transaction(2, 0, tx(25, 10)); + } + + let demand = ResourceDemand { gas_used: Some(15), ..Default::default() }; + + let rolling = + estimator.estimate_rolling(demand).expect("no error").expect("estimates available"); + + assert_eq!(rolling.blocks_sampled, 2); + let gas_estimate = rolling.estimates.gas_used.expect("gas estimate present"); + // Median across [10, 30] = 30 (upper median for even count) + assert_eq!(gas_estimate.recommended_priority_fee, U256::from(30)); + assert_eq!(rolling.priority_fee, U256::from(30)); + } +} diff --git a/crates/rpc/src/base/meter_rpc.rs b/crates/rpc/src/base/meter_rpc.rs index 631d287f..555eb50d 100644 --- a/crates/rpc/src/base/meter_rpc.rs +++ b/crates/rpc/src/base/meter_rpc.rs @@ -1,22 +1,55 @@ +//! Implementation of the metering RPC API. + +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; + use alloy_consensus::Header; -use alloy_eips::BlockNumberOrTag; -use alloy_primitives::{B256, U256}; -use jsonrpsee::core::{RpcResult, async_trait}; +use alloy_eips::{BlockNumberOrTag, Encodable2718}; +use alloy_primitives::{B256, TxHash, U256}; +use jsonrpsee::{ + core::{RpcResult, async_trait}, + types::{ErrorCode, ErrorObjectOwned}, +}; +use op_alloy_flz::tx_estimated_size_fjord_bytes; use reth::providers::BlockReaderIdExt; use reth_optimism_chainspec::OpChainSpec; use reth_optimism_primitives::OpBlock; use reth_provider::{BlockReader, ChainSpecProvider, HeaderProvider, StateProviderFactory}; use tips_core::types::{Bundle, MeterBundleResponse, ParsedBundle}; -use tracing::{error, info}; +use tokio::sync::mpsc; +use tracing::{debug, error, info, warn}; use super::{ - block::meter_block, meter::meter_bundle, traits::MeteringApiServer, types::MeterBlockResponse, + block::meter_block, + meter::meter_bundle, + traits::MeteringApiServer, + types::{MeterBlockResponse, MeteredPriorityFeeResponse, ResourceFeeEstimateResponse}, +}; +use crate::{ + AnnotatorCommand, MeteredTransaction, PriorityFeeEstimator, ResourceDemand, ResourceEstimates, + RollingPriorityEstimate, }; /// Implementation of the metering RPC API -#[derive(Debug)] pub struct MeteringApiImpl { provider: Provider, + priority_fee_estimator: Option>, + /// Channel to send metered transactions to the annotator. + tx_sender: Option>, + /// Channel to send commands to the annotator. + command_sender: Option>, + /// Whether metering data collection is enabled. + metering_enabled: Arc, +} + +impl std::fmt::Debug for MeteringApiImpl { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MeteringApiImpl") + .field("metering_enabled", &self.metering_enabled.load(Ordering::Relaxed)) + .finish_non_exhaustive() + } } impl MeteringApiImpl @@ -28,86 +61,100 @@ where + HeaderProvider
+ Clone, { - /// Creates a new instance of MeteringApi - pub const fn new(provider: Provider) -> Self { - Self { provider } + /// Creates a new instance of MeteringApi without priority fee estimation. + pub fn new(provider: Provider) -> Self { + Self { + provider, + priority_fee_estimator: None, + tx_sender: None, + command_sender: None, + metering_enabled: Arc::new(AtomicBool::new(true)), + } } -} -#[async_trait] -impl MeteringApiServer for MeteringApiImpl -where - Provider: StateProviderFactory - + ChainSpecProvider - + BlockReaderIdExt
- + BlockReader - + HeaderProvider
- + Clone - + Send - + Sync - + 'static, -{ - async fn meter_bundle(&self, bundle: Bundle) -> RpcResult { + /// Creates a new instance of MeteringApi with priority fee estimation enabled. + pub fn with_estimator( + provider: Provider, + priority_fee_estimator: Arc, + tx_sender: mpsc::UnboundedSender, + command_sender: mpsc::UnboundedSender, + ) -> Self { + Self { + provider, + priority_fee_estimator: Some(priority_fee_estimator), + tx_sender: Some(tx_sender), + command_sender: Some(command_sender), + metering_enabled: Arc::new(AtomicBool::new(true)), + } + } + + fn run_metering( + &self, + bundle: Bundle, + ) -> Result<(MeterBundleResponse, ResourceDemand), ErrorObjectOwned> { info!( num_transactions = &bundle.txs.len(), block_number = &bundle.block_number, "Starting bundle metering" ); - // Get the latest header let header = self .provider .sealed_header_by_number_or_tag(BlockNumberOrTag::Latest) .map_err(|e| { - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::ErrorCode::InternalError.code(), - format!("Failed to get latest header: {}", e), + ErrorObjectOwned::owned( + ErrorCode::InternalError.code(), + format!("Failed to get latest header: {e}"), None::<()>, ) })? .ok_or_else(|| { - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::ErrorCode::InternalError.code(), + ErrorObjectOwned::owned( + ErrorCode::InternalError.code(), "Latest block not found".to_string(), None::<()>, ) })?; let parsed_bundle = ParsedBundle::try_from(bundle).map_err(|e| { - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::ErrorCode::InvalidParams.code(), - format!("Failed to parse bundle: {}", e), + ErrorObjectOwned::owned( + ErrorCode::InvalidParams.code(), + format!("Failed to parse bundle: {e}"), None::<()>, ) })?; - // Get state provider for the block + let da_usage: u64 = parsed_bundle + .txs + .iter() + .map(|tx| tx_estimated_size_fjord_bytes(&tx.encoded_2718())) + .sum(); + let state_provider = self.provider.state_by_block_hash(header.hash()).map_err(|e| { error!(error = %e, "Failed to get state provider"); - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::ErrorCode::InternalError.code(), - format!("Failed to get state provider: {}", e), + ErrorObjectOwned::owned( + ErrorCode::InternalError.code(), + format!("Failed to get state provider: {e}"), None::<()>, ) })?; - // Meter bundle using utility function + let chain_spec = self.provider.chain_spec(); + let (results, total_gas_used, total_gas_fees, bundle_hash, total_execution_time) = - meter_bundle(state_provider, self.provider.chain_spec(), parsed_bundle, &header) - .map_err(|e| { - error!(error = %e, "Bundle metering failed"); - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::ErrorCode::InternalError.code(), - format!("Bundle metering failed: {}", e), - None::<()>, - ) - })?; - - // Calculate average gas price + meter_bundle(state_provider, chain_spec, parsed_bundle, &header).map_err(|e| { + error!(error = %e, "Bundle metering failed"); + ErrorObjectOwned::owned( + ErrorCode::InternalError.code(), + format!("Bundle metering failed: {e}"), + None::<()>, + ) + })?; + let bundle_gas_price = if total_gas_used > 0 { total_gas_fees / U256::from(total_gas_used) } else { - U256::from(0) + U256::ZERO }; info!( @@ -118,18 +165,79 @@ where "Bundle metering completed successfully" ); - Ok(MeterBundleResponse { + let response = MeterBundleResponse { bundle_gas_price, bundle_hash, coinbase_diff: total_gas_fees, - eth_sent_to_coinbase: U256::from(0), + eth_sent_to_coinbase: U256::ZERO, gas_fees: total_gas_fees, results, state_block_number: header.number, state_flashblock_index: None, total_gas_used, total_execution_time_us: total_execution_time, - }) + }; + + let resource_demand = ResourceDemand { + gas_used: Some(total_gas_used), + execution_time_us: Some(total_execution_time), + state_root_time_us: None, // Populated when state-root metrics become available. + data_availability_bytes: Some(da_usage), + }; + + Ok((response, resource_demand)) + } +} + +#[async_trait] +impl MeteringApiServer for MeteringApiImpl +where + Provider: StateProviderFactory + + ChainSpecProvider + + BlockReaderIdExt
+ + BlockReader + + HeaderProvider
+ + Clone + + Send + + Sync + + 'static, +{ + async fn meter_bundle(&self, bundle: Bundle) -> RpcResult { + let (response, _) = self.run_metering(bundle)?; + Ok(response) + } + + async fn metered_priority_fee_per_gas( + &self, + bundle: Bundle, + ) -> RpcResult { + let (meter_bundle, resource_demand) = self.run_metering(bundle)?; + + let estimator = self.priority_fee_estimator.as_ref().ok_or_else(|| { + ErrorObjectOwned::owned( + ErrorCode::InternalError.code(), + "Priority fee estimation not enabled".to_string(), + None::<()>, + ) + })?; + + debug!(?resource_demand, "Computing priority fee estimates"); + + let estimates = estimator + .estimate_rolling(resource_demand) + .map_err(|e| { + ErrorObjectOwned::owned(ErrorCode::InvalidParams.code(), e.to_string(), None::<()>) + })? + .ok_or_else(|| { + ErrorObjectOwned::owned( + ErrorCode::InternalError.code(), + "Priority fee data unavailable".to_string(), + None::<()>, + ) + })?; + + let response = build_priority_fee_response(meter_bundle, estimates); + Ok(response) } async fn meter_block_by_hash(&self, hash: B256) -> RpcResult { @@ -207,6 +315,79 @@ where Ok(response) } + + async fn set_metering_info( + &self, + tx_hash: TxHash, + meter: MeterBundleResponse, + ) -> RpcResult<()> { + if !self.metering_enabled.load(Ordering::Relaxed) { + debug!(tx_hash = %tx_hash, "Metering disabled, ignoring set_metering_info"); + return Ok(()); + } + + let tx_sender = self.tx_sender.as_ref().ok_or_else(|| { + ErrorObjectOwned::owned( + ErrorCode::InternalError.code(), + "Metering pipeline not configured".to_string(), + None::<()>, + ) + })?; + + // Extract data from the first transaction result (single tx metering) + let result = meter.results.first().ok_or_else(|| { + ErrorObjectOwned::owned( + ErrorCode::InvalidParams.code(), + "MeterBundleResponse must contain at least one result".to_string(), + None::<()>, + ) + })?; + + let metered_tx = MeteredTransaction { + tx_hash, + priority_fee_per_gas: result.gas_price, + gas_used: result.gas_used, + execution_time_us: result.execution_time_us, + state_root_time_us: 0, // Not available in MeterBundleResponse + data_availability_bytes: 0, // Not available in MeterBundleResponse, will be set by annotator if needed + }; + + debug!( + tx_hash = %tx_hash, + gas_used = result.gas_used, + execution_time_us = result.execution_time_us, + "Received metering info via RPC" + ); + + if tx_sender.send(metered_tx).is_err() { + warn!(tx_hash = %tx_hash, "Failed to send metered transaction to annotator"); + } + + Ok(()) + } + + async fn set_metering_enabled(&self, enabled: bool) -> RpcResult<()> { + self.metering_enabled.store(enabled, Ordering::Relaxed); + info!(enabled, "Metering data collection enabled state changed"); + Ok(()) + } + + async fn clear_metering_info(&self) -> RpcResult<()> { + let command_sender = self.command_sender.as_ref().ok_or_else(|| { + ErrorObjectOwned::owned( + ErrorCode::InternalError.code(), + "Metering pipeline not configured".to_string(), + None::<()>, + ) + })?; + + if command_sender.send(AnnotatorCommand::ClearPending).is_err() { + warn!("Failed to send clear command to annotator"); + } + + info!("Cleared pending metering information"); + Ok(()) + } } impl MeteringApiImpl @@ -233,3 +414,34 @@ where }) } } + +/// Converts a rolling estimate to the response format. +fn build_priority_fee_response( + meter_bundle: MeterBundleResponse, + estimate: RollingPriorityEstimate, +) -> MeteredPriorityFeeResponse { + let resource_estimates = build_resource_estimate_responses(&estimate.estimates); + + MeteredPriorityFeeResponse { + meter_bundle, + priority_fee: estimate.priority_fee, + blocks_sampled: estimate.blocks_sampled as u64, + resource_estimates, + } +} + +fn build_resource_estimate_responses( + estimates: &ResourceEstimates, +) -> Vec { + estimates + .iter() + .map(|(kind, est)| ResourceFeeEstimateResponse { + resource: kind.as_camel_case().to_string(), + threshold_priority_fee: est.threshold_priority_fee, + recommended_priority_fee: est.recommended_priority_fee, + cumulative_usage: U256::from(est.cumulative_usage), + threshold_tx_count: est.threshold_tx_count.try_into().unwrap_or(u64::MAX), + total_transactions: est.total_transactions.try_into().unwrap_or(u64::MAX), + }) + .collect() +} diff --git a/crates/rpc/src/base/mod.rs b/crates/rpc/src/base/mod.rs index 05aea14f..d68c0979 100644 --- a/crates/rpc/src/base/mod.rs +++ b/crates/rpc/src/base/mod.rs @@ -1,4 +1,7 @@ +pub(crate) mod annotator; pub(crate) mod block; +pub(crate) mod cache; +pub(crate) mod estimator; pub(crate) mod meter; pub(crate) mod meter_rpc; pub(crate) mod pubsub; diff --git a/crates/rpc/src/base/traits.rs b/crates/rpc/src/base/traits.rs index 00ca4c1a..95871426 100644 --- a/crates/rpc/src/base/traits.rs +++ b/crates/rpc/src/base/traits.rs @@ -4,7 +4,10 @@ use alloy_eips::BlockNumberOrTag; use alloy_primitives::{B256, TxHash}; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; -use crate::{Bundle, MeterBlockResponse, MeterBundleResponse, TransactionStatusResponse}; +use crate::{ + Bundle, MeterBlockResponse, MeterBundleResponse, MeteredPriorityFeeResponse, + TransactionStatusResponse, +}; /// RPC API for transaction metering #[rpc(server, namespace = "base")] @@ -41,6 +44,28 @@ pub trait MeteringApi { &self, number: BlockNumberOrTag, ) -> RpcResult; + + /// Estimates the priority fee necessary for a bundle to be included in recently observed + /// flashblocks, considering multiple resource constraints. + #[method(name = "meteredPriorityFeePerGas")] + async fn metered_priority_fee_per_gas( + &self, + bundle: Bundle, + ) -> RpcResult; + + /// Sets metering information for a transaction. Called by tips-ingress to push + /// transaction resource usage data for priority fee estimation. + #[method(name = "setMeteringInfo")] + async fn set_metering_info(&self, tx_hash: TxHash, meter: MeterBundleResponse) + -> RpcResult<()>; + + /// Enables or disables metering data collection. + #[method(name = "setMeteringEnabled")] + async fn set_metering_enabled(&self, enabled: bool) -> RpcResult<()>; + + /// Clears all pending metering information. + #[method(name = "clearMeteringInfo")] + async fn clear_metering_info(&self) -> RpcResult<()>; } /// RPC API for transaction status diff --git a/crates/rpc/src/base/transaction_rpc.rs b/crates/rpc/src/base/transaction_rpc.rs index 6924fadf..602e738f 100644 --- a/crates/rpc/src/base/transaction_rpc.rs +++ b/crates/rpc/src/base/transaction_rpc.rs @@ -95,7 +95,7 @@ mod tests { assert_eq!(Status::Unknown, result); let tx = MockTransaction::eip1559(); - let hash = tx.hash().clone(); + let hash = *tx.hash(); let before = rpc .transaction_status(hash) diff --git a/crates/rpc/src/base/types.rs b/crates/rpc/src/base/types.rs index 13b443a6..5c60fb2e 100644 --- a/crates/rpc/src/base/types.rs +++ b/crates/rpc/src/base/types.rs @@ -1,8 +1,9 @@ -//! Types for the transaction status rpc +//! Types for the Base RPC extensions. -use alloy_primitives::B256; +use alloy_primitives::{B256, U256}; use alloy_rpc_types_eth::pubsub::SubscriptionKind; use serde::{Deserialize, Serialize}; +use tips_core::types::MeterBundleResponse; /// The status of a transaction. #[derive(Clone, Serialize, Deserialize, PartialEq, Debug)] @@ -134,3 +135,38 @@ pub struct MeterBlockTransactions { /// Execution time in microseconds pub execution_time_us: u128, } + +// --- Metered priority fee types --- + +/// Human-friendly representation of a resource fee quote. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ResourceFeeEstimateResponse { + /// Resource name (gasUsed, executionTime, etc). + pub resource: String, + /// Minimum fee to displace enough capacity. + pub threshold_priority_fee: U256, + /// Recommended fee with safety margin. + pub recommended_priority_fee: U256, + /// Cumulative resource usage above threshold. + pub cumulative_usage: U256, + /// Number of transactions above threshold. + pub threshold_tx_count: u64, + /// Total transactions considered. + pub total_transactions: u64, +} + +/// Response payload for `base_meteredPriorityFeePerGas`. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct MeteredPriorityFeeResponse { + /// Bundled metering results. + #[serde(flatten)] + pub meter_bundle: MeterBundleResponse, + /// Recommended priority fee (max across all resources and median across recent blocks). + pub priority_fee: U256, + /// Number of recent blocks used to compute the rolling estimate. + pub blocks_sampled: u64, + /// Per-resource estimates (median across sampled blocks). + pub resource_estimates: Vec, +} diff --git a/crates/rpc/src/lib.rs b/crates/rpc/src/lib.rs index 6f839e5d..4d063ce7 100644 --- a/crates/rpc/src/lib.rs +++ b/crates/rpc/src/lib.rs @@ -8,7 +8,14 @@ pub use tips_core::types::{Bundle, MeterBundleResponse, TransactionResult}; mod base; pub use base::{ + annotator::{AnnotatorCommand, FlashblockInclusion, ResourceAnnotator}, block::meter_block, + cache::{BlockMetrics, FlashblockMetrics, MeteredTransaction, MeteringCache, ResourceTotals}, + estimator::{ + BlockPriorityEstimates, EstimateError, FlashblockResourceEstimates, PriorityFeeEstimator, + ResourceDemand, ResourceEstimate, ResourceEstimates, ResourceKind, ResourceLimits, + RollingPriorityEstimate, + }, meter::meter_bundle, meter_rpc::MeteringApiImpl, pubsub::{EthPubSub, EthPubSubApiServer}, @@ -16,7 +23,7 @@ pub use base::{ transaction_rpc::TransactionStatusApiImpl, types::{ BaseSubscriptionKind, ExtendedSubscriptionKind, MeterBlockResponse, MeterBlockTransactions, - Status, TransactionStatusResponse, + MeteredPriorityFeeResponse, ResourceFeeEstimateResponse, Status, TransactionStatusResponse, }, }; diff --git a/crates/rpc/tests/flashblocks_rpc.rs b/crates/rpc/tests/flashblocks_rpc.rs index 22ec2218..6514e42e 100644 --- a/crates/rpc/tests/flashblocks_rpc.rs +++ b/crates/rpc/tests/flashblocks_rpc.rs @@ -181,7 +181,7 @@ impl TestSetup { let (counter_deployment_tx, counter_address, _) = deployer .create_deployment_tx(DoubleCounter::BYTECODE.clone(), 0) .expect("should be able to sign DoubleCounter deployment txn"); - let counter = DoubleCounterInstance::new(counter_address.clone(), provider); + let counter = DoubleCounterInstance::new(counter_address, provider); let (increment1_tx, _) = deployer .sign_txn_request(counter.increment().into_transaction_request().nonce(1)) .expect("should be able to sign increment() txn"); @@ -198,8 +198,7 @@ impl TestSetup { .gas_limit(100_000) .nonce(0) .to(bob.address) - .value(U256::from_str("999999999000000000000000").unwrap()) - .into(), + .value(U256::from_str("999999999000000000000000").unwrap()), ) .expect("should be able to sign eth transfer txn"); @@ -227,8 +226,7 @@ impl TestSetup { .transaction_type(TransactionType::Eip1559.into()) .gas_limit(100_000) .nonce(5) - .to(log_emitter_a_address) - .into(), + .to(log_emitter_a_address), ) .expect("should be able to sign log trigger txn"); @@ -241,8 +239,7 @@ impl TestSetup { .gas_limit(21_000) .nonce(1) .to(TEST_ADDRESS) - .value(U256::from(PENDING_BALANCE)) - .into(), + .value(U256::from(PENDING_BALANCE)), ) .expect("should be able to sign balance transfer txn"); @@ -486,7 +483,7 @@ async fn test_get_transaction_receipt_pending() -> Result<()> { let provider = setup.harness.provider(); let receipt = provider.get_transaction_receipt(DEPOSIT_TX_HASH).await?; - assert_eq!(receipt.is_none(), true); + assert!(receipt.is_none()); setup.send_test_payloads().await?; diff --git a/crates/rpc/tests/meter.rs b/crates/rpc/tests/meter.rs index 23235233..72025024 100644 --- a/crates/rpc/tests/meter.rs +++ b/crates/rpc/tests/meter.rs @@ -94,7 +94,7 @@ fn setup_harness() -> eyre::Result { reth_db_common::init::init_genesis(&factory).context("initializing genesis state")?; - let provider = BlockchainProvider::new(factory.clone()).context("creating provider")?; + let provider = BlockchainProvider::new(factory).context("creating provider")?; let header = provider .sealed_header(0) .context("fetching genesis header")? @@ -104,7 +104,7 @@ fn setup_harness() -> eyre::Result { } fn envelope_from_signed(tx: &OpTransactionSigned) -> eyre::Result { - Ok(tx.clone().into()) + Ok(tx.clone()) } fn create_parsed_bundle(envelopes: Vec) -> eyre::Result { @@ -176,7 +176,7 @@ fn meter_bundle_single_transaction() -> eyre::Result<()> { .state_by_block_hash(harness.header.hash()) .context("getting state provider")?; - let parsed_bundle = create_parsed_bundle(vec![envelope.clone()])?; + let parsed_bundle = create_parsed_bundle(vec![envelope])?; let (results, total_gas_used, total_gas_fees, bundle_hash, total_execution_time) = meter_bundle(state_provider, harness.chain_spec.clone(), parsed_bundle, &harness.header)?; @@ -253,7 +253,7 @@ fn meter_bundle_multiple_transactions() -> eyre::Result<()> { .state_by_block_hash(harness.header.hash()) .context("getting state provider")?; - let parsed_bundle = create_parsed_bundle(vec![envelope_1.clone(), envelope_2.clone()])?; + let parsed_bundle = create_parsed_bundle(vec![envelope_1, envelope_2])?; let (results, total_gas_used, total_gas_fees, bundle_hash, total_execution_time) = meter_bundle(state_provider, harness.chain_spec.clone(), parsed_bundle, &harness.header)?; diff --git a/crates/rpc/tests/meter_block.rs b/crates/rpc/tests/meter_block.rs index fc3b54b2..d9d37430 100644 --- a/crates/rpc/tests/meter_block.rs +++ b/crates/rpc/tests/meter_block.rs @@ -79,7 +79,7 @@ fn setup_harness() -> eyre::Result { reth_db_common::init::init_genesis(&factory).context("initializing genesis state")?; - let provider = BlockchainProvider::new(factory.clone()).context("creating provider")?; + let provider = BlockchainProvider::new(factory).context("creating provider")?; let header = provider .sealed_header(0) .context("fetching genesis header")? @@ -122,7 +122,7 @@ fn meter_block_empty_transactions() -> eyre::Result<()> { let block = create_block_with_transactions(&harness, vec![]); - let response = meter_block(harness.provider.clone(), harness.chain_spec.clone(), &block)?; + let response = meter_block(harness.provider.clone(), harness.chain_spec, &block)?; assert_eq!(response.block_hash, block.header().hash_slow()); assert_eq!(response.block_number, block.header().number()); @@ -160,7 +160,7 @@ fn meter_block_single_transaction() -> eyre::Result<()> { let block = create_block_with_transactions(&harness, vec![tx]); - let response = meter_block(harness.provider.clone(), harness.chain_spec.clone(), &block)?; + let response = meter_block(harness.provider.clone(), harness.chain_spec, &block)?; assert_eq!(response.block_hash, block.header().hash_slow()); assert_eq!(response.block_number, block.header().number()); @@ -225,7 +225,7 @@ fn meter_block_multiple_transactions() -> eyre::Result<()> { let block = create_block_with_transactions(&harness, vec![tx_1, tx_2]); - let response = meter_block(harness.provider.clone(), harness.chain_spec.clone(), &block)?; + let response = meter_block(harness.provider.clone(), harness.chain_spec, &block)?; assert_eq!(response.block_hash, block.header().hash_slow()); assert_eq!(response.block_number, block.header().number()); @@ -283,7 +283,7 @@ fn meter_block_timing_consistency() -> eyre::Result<()> { let block = create_block_with_transactions(&harness, vec![tx]); - let response = meter_block(harness.provider.clone(), harness.chain_spec.clone(), &block)?; + let response = meter_block(harness.provider.clone(), harness.chain_spec, &block)?; // Verify timing invariants assert!(response.signer_recovery_time_us > 0, "signer recovery time must be positive"); @@ -322,7 +322,7 @@ fn meter_block_parent_header_not_found() -> eyre::Result<()> { let body = OpBlockBody { transactions: vec![], ommers: vec![], withdrawals: None }; let block = OpBlock::new(header, body); - let result = meter_block(harness.provider.clone(), harness.chain_spec.clone(), &block); + let result = meter_block(harness.provider.clone(), harness.chain_spec, &block); assert!(result.is_err(), "should fail when parent header is not found"); let err = result.unwrap_err(); @@ -365,7 +365,7 @@ fn meter_block_invalid_transaction_signature() -> eyre::Result<()> { let block = create_block_with_transactions(&harness, vec![op_tx]); - let result = meter_block(harness.provider.clone(), harness.chain_spec.clone(), &block); + let result = meter_block(harness.provider.clone(), harness.chain_spec, &block); assert!(result.is_err(), "should fail when transaction has invalid signature"); let err = result.unwrap_err(); diff --git a/crates/rpc/tests/meter_rpc.rs b/crates/rpc/tests/meter_rpc.rs index c7efd4c2..715d08a3 100644 --- a/crates/rpc/tests/meter_rpc.rs +++ b/crates/rpc/tests/meter_rpc.rs @@ -3,11 +3,15 @@ use std::{any::Any, net::SocketAddr, sync::Arc}; use alloy_eips::Encodable2718; -use alloy_primitives::{Bytes, U256, address, b256, bytes}; +use alloy_primitives::{Address, B256, Bytes, U256, address, b256, bytes}; use alloy_rpc_client::RpcClient; -use base_reth_rpc::{MeterBundleResponse, MeteringApiImpl, MeteringApiServer}; +use base_reth_rpc::{ + AnnotatorCommand, MeterBundleResponse, MeteredTransaction, MeteringApiImpl, MeteringApiServer, + MeteringCache, PriorityFeeEstimator, ResourceLimits, +}; use base_reth_test_utils::{init_silenced_tracing, load_genesis}; use op_alloy_consensus::OpTxEnvelope; +use parking_lot::RwLock; use reth::{ args::{DiscoveryArgs, NetworkArgs, RpcServerArgs}, builder::{Node, NodeBuilder, NodeConfig, NodeHandle}, @@ -20,7 +24,8 @@ use reth_optimism_node::{OpNode, args::RollupArgs}; use reth_optimism_primitives::OpTransactionSigned; use reth_provider::providers::BlockchainProvider; use reth_transaction_pool::test_utils::TransactionBuilder; -use tips_core::types::Bundle; +use tips_core::types::{Bundle, TransactionResult}; +use tokio::sync::mpsc; struct NodeContext { http_api_addr: SocketAddr, @@ -29,7 +34,7 @@ struct NodeContext { } // Helper function to create a Bundle with default fields -fn create_bundle(txs: Vec, block_number: u64, min_timestamp: Option) -> Bundle { +const fn create_bundle(txs: Vec, block_number: u64, min_timestamp: Option) -> Bundle { Bundle { txs, block_number, @@ -142,7 +147,7 @@ async fn test_meter_bundle_single_transaction() -> eyre::Result<()> { let signed_tx = OpTransactionSigned::Eip1559(tx.as_eip1559().expect("eip1559 transaction").clone()); - let envelope: OpTxEnvelope = signed_tx.into(); + let envelope: OpTxEnvelope = signed_tx; // Encode transaction let tx_bytes = Bytes::from(envelope.encoded_2718()); @@ -189,7 +194,7 @@ async fn test_meter_bundle_multiple_transactions() -> eyre::Result<()> { let tx1_signed = OpTransactionSigned::Eip1559(tx1_inner.as_eip1559().expect("eip1559 transaction").clone()); - let tx1_envelope: OpTxEnvelope = tx1_signed.into(); + let tx1_envelope: OpTxEnvelope = tx1_signed; let tx1_bytes = Bytes::from(tx1_envelope.encoded_2718()); // Second transaction from second account @@ -209,7 +214,7 @@ async fn test_meter_bundle_multiple_transactions() -> eyre::Result<()> { let tx2_signed = OpTransactionSigned::Eip1559(tx2_inner.as_eip1559().expect("eip1559 transaction").clone()); - let tx2_envelope: OpTxEnvelope = tx2_signed.into(); + let tx2_envelope: OpTxEnvelope = tx2_signed; let tx2_bytes = Bytes::from(tx2_envelope.encoded_2718()); let bundle = create_bundle(vec![tx1_bytes, tx2_bytes], 0, None); @@ -359,7 +364,7 @@ async fn test_meter_bundle_gas_calculations() -> eyre::Result<()> { let signed_tx1 = OpTransactionSigned::Eip1559(tx1_inner.as_eip1559().expect("eip1559 transaction").clone()); - let envelope1: OpTxEnvelope = signed_tx1.into(); + let envelope1: OpTxEnvelope = signed_tx1; let tx1_bytes = Bytes::from(envelope1.encoded_2718()); // Second transaction with 7 gwei gas price @@ -376,7 +381,7 @@ async fn test_meter_bundle_gas_calculations() -> eyre::Result<()> { let signed_tx2 = OpTransactionSigned::Eip1559(tx2_inner.as_eip1559().expect("eip1559 transaction").clone()); - let envelope2: OpTxEnvelope = signed_tx2.into(); + let envelope2: OpTxEnvelope = signed_tx2; let tx2_bytes = Bytes::from(envelope2.encoded_2718()); let bundle = create_bundle(vec![tx1_bytes, tx2_bytes], 0, None); @@ -410,3 +415,204 @@ async fn test_meter_bundle_gas_calculations() -> eyre::Result<()> { Ok(()) } + +/// Context for a node with metering pipeline enabled +struct MeteringNodeContext { + http_api_addr: SocketAddr, + tx_receiver: mpsc::UnboundedReceiver, + cmd_receiver: mpsc::UnboundedReceiver, + _node_exit_future: NodeExitFuture, + _node: Box, +} + +impl MeteringNodeContext { + async fn rpc_client(&self) -> eyre::Result { + let url = format!("http://{}", self.http_api_addr); + let client = RpcClient::new_http(url.parse()?); + Ok(client) + } +} + +async fn setup_node_with_metering() -> eyre::Result { + init_silenced_tracing(); + let tasks = TaskManager::current(); + let exec = tasks.executor(); + const BASE_SEPOLIA_CHAIN_ID: u64 = 84532; + + let genesis = load_genesis(); + let chain_spec = Arc::new( + OpChainSpecBuilder::base_mainnet() + .genesis(genesis) + .ecotone_activated() + .chain(Chain::from(BASE_SEPOLIA_CHAIN_ID)) + .build(), + ); + + let network_config = NetworkArgs { + discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() }, + ..NetworkArgs::default() + }; + + let node_config = NodeConfig::new(chain_spec.clone()) + .with_network(network_config.clone()) + .with_rpc(RpcServerArgs::default().with_unused_ports().with_http()) + .with_unused_ports(); + + let node = OpNode::new(RollupArgs::default()); + + // Create the metering channels that we'll capture for testing + let (tx_sender, tx_receiver) = mpsc::unbounded_channel::(); + let (cmd_sender, cmd_receiver) = mpsc::unbounded_channel::(); + + let NodeHandle { node, node_exit_future } = NodeBuilder::new(node_config.clone()) + .testing_node(exec.clone()) + .with_types_and_provider::>() + .with_components(node.components_builder()) + .with_add_ons(node.add_ons()) + .extend_rpc_modules(move |ctx| { + // Create a minimal estimator for testing + let cache = Arc::new(RwLock::new(MeteringCache::new(10))); + let limits = ResourceLimits { + gas_used: Some(30_000_000), + execution_time_us: Some(50_000), + state_root_time_us: None, + data_availability_bytes: Some(120_000), + }; + let estimator = + Arc::new(PriorityFeeEstimator::new(cache, 0.5, limits, U256::from(1), None)); + + let metering_api = MeteringApiImpl::with_estimator( + ctx.provider().clone(), + estimator, + tx_sender.clone(), + cmd_sender, + ); + ctx.modules.merge_configured(metering_api.into_rpc())?; + Ok(()) + }) + .launch() + .await?; + + let http_api_addr = node + .rpc_server_handle() + .http_local_addr() + .ok_or_else(|| eyre::eyre!("Failed to get http api address"))?; + + Ok(MeteringNodeContext { + http_api_addr, + tx_receiver, + cmd_receiver, + _node_exit_future: node_exit_future, + _node: Box::new(node), + }) +} + +fn test_meter_bundle_response() -> MeterBundleResponse { + MeterBundleResponse { + bundle_gas_price: U256::from(1000), + bundle_hash: B256::ZERO, + coinbase_diff: U256::from(21000), + eth_sent_to_coinbase: U256::ZERO, + gas_fees: U256::from(21000), + results: vec![TransactionResult { + coinbase_diff: U256::from(21000), + eth_sent_to_coinbase: U256::ZERO, + from_address: Address::ZERO, + gas_fees: U256::from(21000), + gas_price: U256::from(1000000000), // 1 gwei + gas_used: 21000, + to_address: Some(Address::ZERO), + tx_hash: B256::ZERO, + value: U256::ZERO, + execution_time_us: 500, + }], + state_block_number: 100, + state_flashblock_index: None, + total_gas_used: 21000, + total_execution_time_us: 500, + } +} + +#[tokio::test] +async fn test_set_metering_info_sends_to_annotator() -> eyre::Result<()> { + let mut ctx = setup_node_with_metering().await?; + let client = ctx.rpc_client().await?; + + let tx_hash = B256::random(); + let meter = test_meter_bundle_response(); + + // Call the RPC method + let _: () = client.request("base_setMeteringInfo", (tx_hash, meter)).await?; + + // Verify the transaction was sent to the channel + let received = ctx.tx_receiver.try_recv()?; + assert_eq!(received.tx_hash, tx_hash); + assert_eq!(received.gas_used, 21000); + assert_eq!(received.execution_time_us, 500); + assert_eq!(received.priority_fee_per_gas, U256::from(1000000000)); + + Ok(()) +} + +#[tokio::test] +async fn test_set_metering_info_with_empty_results_returns_error() -> eyre::Result<()> { + let ctx = setup_node_with_metering().await?; + let client = ctx.rpc_client().await?; + + let tx_hash = B256::random(); + let meter = MeterBundleResponse { + results: vec![], // Empty results should fail + ..test_meter_bundle_response() + }; + + // Call should fail due to empty results + let result: Result<(), _> = client.request("base_setMeteringInfo", (tx_hash, meter)).await; + assert!(result.is_err()); + + Ok(()) +} + +#[tokio::test] +async fn test_set_metering_enabled_toggles_collection() -> eyre::Result<()> { + let mut ctx = setup_node_with_metering().await?; + let client = ctx.rpc_client().await?; + + // Disable metering + let _: () = client.request("base_setMeteringEnabled", (false,)).await?; + + // Try to send metering info - should be silently ignored + let tx_hash = B256::random(); + let meter = test_meter_bundle_response(); + let _: () = client.request("base_setMeteringInfo", (tx_hash, meter.clone())).await?; + + // Verify nothing was sent (channel should be empty) + assert!(ctx.tx_receiver.try_recv().is_err()); + + // Re-enable metering + let _: () = client.request("base_setMeteringEnabled", (true,)).await?; + + // Now send should work + let tx_hash2 = B256::random(); + let _: () = client.request("base_setMeteringInfo", (tx_hash2, meter)).await?; + + // Verify it was sent + let received = ctx.tx_receiver.try_recv()?; + assert_eq!(received.tx_hash, tx_hash2); + + Ok(()) +} + +#[tokio::test] +async fn test_clear_metering_info_sends_command() -> eyre::Result<()> { + let mut ctx = setup_node_with_metering().await?; + let client = ctx.rpc_client().await?; + + // Call clear + let _: () = client.request("base_clearMeteringInfo", ()).await?; + + // Verify the command was sent + let received = ctx.cmd_receiver.try_recv()?; + assert!(matches!(received, AnnotatorCommand::ClearPending)); + + Ok(()) +} diff --git a/crates/runner/Cargo.toml b/crates/runner/Cargo.toml index 8f48a83e..2937f032 100644 --- a/crates/runner/Cargo.toml +++ b/crates/runner/Cargo.toml @@ -23,6 +23,13 @@ reth-db.workspace = true reth-exex.workspace = true reth-optimism-node.workspace = true reth-optimism-chainspec.workspace = true +reth-optimism-payload-builder.workspace = true + +# alloy +alloy-primitives.workspace = true + +# flashblocks +base-flashtypes.workspace = true # misc eyre.workspace = true @@ -30,4 +37,6 @@ futures-util.workspace = true once_cell.workspace = true tracing.workspace = true url.workspace = true +parking_lot.workspace = true derive_more = { workspace = true, features = ["debug"] } +tokio.workspace = true diff --git a/crates/runner/src/config.rs b/crates/runner/src/config.rs index 58c60a56..ac301381 100644 --- a/crates/runner/src/config.rs +++ b/crates/runner/src/config.rs @@ -1,6 +1,7 @@ //! Contains the Base node configuration structures. use reth_optimism_node::args::RollupArgs; +use reth_optimism_payload_builder::config::OpDAConfig; use crate::extensions::FlashblocksCell; @@ -15,8 +16,12 @@ pub struct BaseNodeConfig { pub tracing: TracingConfig, /// Indicates whether the metering RPC surface should be installed. pub metering_enabled: bool, + /// Configuration for priority fee estimation. + pub metering: MeteringConfig, /// Shared Flashblocks state cache. pub flashblocks_cell: FlashblocksCell, + /// Shared DA config for dynamic updates via `miner_setMaxDASize`. + pub da_config: OpDAConfig, } impl BaseNodeConfig { @@ -43,3 +48,31 @@ pub struct TracingConfig { /// Emits `info`-level logs for the tracing ExEx when enabled. pub logs_enabled: bool, } + +/// Configuration for priority fee estimation. +#[derive(Debug, Clone)] +pub struct MeteringConfig { + /// Whether metering is enabled. + pub enabled: bool, + /// Resource limits for fee estimation. + pub resource_limits: ResourceLimitsConfig, + /// Percentile for recommended priority fee (0.0-1.0). + pub priority_fee_percentile: f64, + /// Default priority fee when resource is not congested (in wei). + pub uncongested_priority_fee: u128, + /// Number of recent blocks to retain in metering cache. + pub cache_size: usize, +} + +/// Resource limits for priority fee estimation. +#[derive(Debug, Clone, Copy)] +pub struct ResourceLimitsConfig { + /// Gas limit per flashblock. + pub gas_limit: u64, + /// Execution time budget in microseconds. + pub execution_time_us: u64, + /// State root time budget in microseconds (optional). + pub state_root_time_us: Option, + /// Data availability bytes limit. + pub da_bytes: u64, +} diff --git a/crates/runner/src/extensions/rpc.rs b/crates/runner/src/extensions/rpc.rs index 164a3af5..2e6b56d7 100644 --- a/crates/runner/src/extensions/rpc.rs +++ b/crates/runner/src/extensions/rpc.rs @@ -2,19 +2,96 @@ use std::sync::Arc; -use base_reth_flashblocks::{FlashblocksState, FlashblocksSubscriber}; +use alloy_primitives::{B256, U256, keccak256}; +use base_flashtypes::Flashblock; +use base_reth_flashblocks::{FlashblocksReceiver, FlashblocksState, FlashblocksSubscriber}; use base_reth_rpc::{ - EthApiExt, EthApiOverrideServer, EthPubSub, EthPubSubApiServer, MeteringApiImpl, - MeteringApiServer, TransactionStatusApiImpl, TransactionStatusApiServer, + AnnotatorCommand, EthApiExt, EthApiOverrideServer, EthPubSub, EthPubSubApiServer, + FlashblockInclusion, MeteredTransaction, MeteringApiImpl, MeteringApiServer, MeteringCache, + PriorityFeeEstimator, ResourceAnnotator, ResourceLimits, TransactionStatusApiImpl, + TransactionStatusApiServer, }; -use tracing::info; +use parking_lot::RwLock; +use reth_optimism_payload_builder::config::OpDAConfig; +use tokio::sync::mpsc; +use tracing::{info, warn}; use url::Url; use crate::{ - BaseNodeConfig, FlashblocksConfig, + BaseNodeConfig, FlashblocksConfig, MeteringConfig, extensions::{BaseNodeExtension, ConfigurableBaseNodeExtension, FlashblocksCell, OpBuilder}, }; +/// Runtime state for the metering pipeline. +#[derive(Clone)] +struct MeteringRuntime { + /// Priority fee estimator. + estimator: Arc, + /// Sender for metered transactions from RPC. + tx_sender: mpsc::UnboundedSender, + /// Sender for flashblock inclusions. + flashblock_sender: mpsc::UnboundedSender, + /// Sender for annotator commands. + command_sender: mpsc::UnboundedSender, +} + +/// Composite receiver that forwards flashblocks to both FlashblocksState and the metering pipeline. +struct CompositeFlashblocksReceiver { + state: Arc>, + /// Optional channel for the metering pipeline; flashblocks RPC still needs the stream even + /// when metering is disabled, so we only forward inclusions if a sender is provided. + metering_sender: Option>, +} + +impl CompositeFlashblocksReceiver { + const fn new( + state: Arc>, + metering_sender: Option>, + ) -> Self { + Self { state, metering_sender } + } +} + +impl FlashblocksReceiver for CompositeFlashblocksReceiver +where + FlashblocksState: FlashblocksReceiver, +{ + fn on_flashblock_received(&self, flashblock: Flashblock) { + // Forward to the state first + self.state.on_flashblock_received(flashblock.clone()); + + // Then forward to metering if enabled + let Some(sender) = &self.metering_sender else { + return; + }; + let Some(inclusion) = flashblock_inclusion_from_flashblock(&flashblock) else { + return; + }; + + if sender.send(inclusion).is_err() { + warn!( + target: "metering::flashblocks", + "Failed to forward flashblock inclusion to metering" + ); + } + } +} + +/// Converts a flashblock to a FlashblockInclusion for the metering pipeline. +fn flashblock_inclusion_from_flashblock(flashblock: &Flashblock) -> Option { + if flashblock.diff.transactions.is_empty() { + return None; + } + + let ordered_tx_hashes: Vec = flashblock.diff.transactions.iter().map(keccak256).collect(); + + Some(FlashblockInclusion { + block_number: flashblock.metadata.block_number, + flashblock_index: flashblock.index, + ordered_tx_hashes, + }) +} + /// Helper struct that wires the custom RPC modules into the node builder. #[derive(Debug, Clone)] pub struct BaseRpcExtension { @@ -22,10 +99,12 @@ pub struct BaseRpcExtension { pub flashblocks_cell: FlashblocksCell, /// Optional Flashblocks configuration. pub flashblocks: Option, - /// Indicates whether the metering RPC surface should be installed. - pub metering_enabled: bool, + /// Full metering configuration. + pub metering: MeteringConfig, /// Sequencer RPC endpoint for transaction status proxying. pub sequencer_rpc: Option, + /// Shared DA config for dynamic updates via `miner_setMaxDASize`. + pub da_config: OpDAConfig, } impl BaseRpcExtension { @@ -34,8 +113,9 @@ impl BaseRpcExtension { Self { flashblocks_cell: config.flashblocks_cell.clone(), flashblocks: config.flashblocks.clone(), - metering_enabled: config.metering_enabled, + metering: config.metering.clone(), sequencer_rpc: config.rollup_args.sequencer.clone(), + da_config: config.da_config.clone(), } } } @@ -45,13 +125,71 @@ impl BaseNodeExtension for BaseRpcExtension { fn apply(&self, builder: OpBuilder) -> OpBuilder { let flashblocks_cell = self.flashblocks_cell.clone(); let flashblocks = self.flashblocks.clone(); - let metering_enabled = self.metering_enabled; + let metering = self.metering.clone(); let sequencer_rpc = self.sequencer_rpc.clone(); + let da_config = self.da_config.clone(); builder.extend_rpc_modules(move |ctx| { - if metering_enabled { - info!(message = "Starting Metering RPC"); - let metering_api = MeteringApiImpl::new(ctx.provider().clone()); + // Set up metering runtime if enabled + let metering_runtime = if metering.enabled { + info!(message = "Starting Metering RPC with priority fee estimation"); + + let cache = Arc::new(RwLock::new(MeteringCache::new(metering.cache_size))); + let limits = ResourceLimits { + gas_used: Some(metering.resource_limits.gas_limit), + execution_time_us: Some(metering.resource_limits.execution_time_us as u128), + state_root_time_us: metering + .resource_limits + .state_root_time_us + .map(|v| v as u128), + data_availability_bytes: Some(metering.resource_limits.da_bytes), + }; + let default_fee = U256::from(metering.uncongested_priority_fee); + let estimator = Arc::new(PriorityFeeEstimator::new( + cache.clone(), + metering.priority_fee_percentile, + limits, + default_fee, + Some(da_config.clone()), + )); + + // Create channels for the annotator + let (tx_sender, tx_receiver) = mpsc::unbounded_channel::(); + let (flashblock_sender, flashblock_receiver) = + mpsc::unbounded_channel::(); + let (command_sender, command_receiver) = + mpsc::unbounded_channel::(); + + // Spawn the resource annotator + tokio::spawn(async move { + ResourceAnnotator::new( + cache, + tx_receiver, + flashblock_receiver, + command_receiver, + ) + .run() + .await; + }); + + Some(MeteringRuntime { estimator, tx_sender, flashblock_sender, command_sender }) + } else { + None + }; + + // Register metering RPC + if metering.enabled { + let metering_api = metering_runtime.as_ref().map_or_else( + || MeteringApiImpl::new(ctx.provider().clone()), + |rt| { + MeteringApiImpl::with_estimator( + ctx.provider().clone(), + rt.estimator.clone(), + rt.tx_sender.clone(), + rt.command_sender.clone(), + ) + }, + ); ctx.modules.merge_configured(metering_api.into_rpc())?; } @@ -74,7 +212,13 @@ impl BaseNodeExtension for BaseRpcExtension { .clone(); fb.start(); - let mut flashblocks_client = FlashblocksSubscriber::new(fb.clone(), ws_url); + // Create composite receiver that forwards to both flashblocks state and metering + let metering_sender = + metering_runtime.as_ref().map(|rt| rt.flashblock_sender.clone()); + let receiver = + Arc::new(CompositeFlashblocksReceiver::new(fb.clone(), metering_sender)); + + let mut flashblocks_client = FlashblocksSubscriber::new(receiver, ws_url); flashblocks_client.start(); let api_ext = EthApiExt::new( @@ -85,8 +229,9 @@ impl BaseNodeExtension for BaseRpcExtension { ctx.modules.replace_configured(api_ext.into_rpc())?; // Register the eth_subscribe subscription endpoint for flashblocks - // Uses replace_configured since eth_subscribe already exists from reth's standard module - // Pass eth_api to enable proxying standard subscription types to reth's implementation + // Uses replace_configured since eth_subscribe already exists from reth's standard + // module Pass eth_api to enable proxying standard subscription types to + // reth's implementation let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb); ctx.modules.replace_configured(eth_pubsub.into_rpc())?; } else { diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs index c96fadf1..300b7fa7 100644 --- a/crates/runner/src/lib.rs +++ b/crates/runner/src/lib.rs @@ -13,7 +13,9 @@ mod runner; pub use runner::BaseNodeRunner; mod config; -pub use config::{BaseNodeConfig, FlashblocksConfig, TracingConfig}; +pub use config::{ + BaseNodeConfig, FlashblocksConfig, MeteringConfig, ResourceLimitsConfig, TracingConfig, +}; mod extensions; pub use extensions::{ diff --git a/crates/runner/src/runner.rs b/crates/runner/src/runner.rs index e33f2ec4..9f0dcaca 100644 --- a/crates/runner/src/runner.rs +++ b/crates/runner/src/runner.rs @@ -56,7 +56,8 @@ impl BaseNodeRunner { ) -> Result> { info!(target: "base-runner", "starting custom Base node"); - let op_node = OpNode::new(config.rollup_args.clone()); + let op_node = + OpNode::new(config.rollup_args.clone()).with_da_config(config.da_config.clone()); let builder = builder .with_types_and_provider::>()