2 changes: 1 addition & 1 deletion .github/workflows/docker.yml
@@ -7,7 +7,7 @@ on:
workflow_dispatch: {}

env:
REGISTRY_IMAGE: ghcr.io/base/node-reth-dev
REGISTRY_IMAGE: ghcr.io/${{ github.repository_owner }}/node-reth-dev

permissions:
contents: read
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -140,6 +140,7 @@ lru = "0.16.2"
rand = "0.9.2"
uuid = "1.19.0"
time = "0.3.44"
rayon = "1.11"
clap = "4.5.53"
eyre = "0.6.12"
bytes = "1.11.0"
7 changes: 7 additions & 0 deletions crates/flashblocks/Cargo.toml
@@ -54,6 +54,7 @@ tracing.workspace = true
metrics.workspace = true
arc-swap.workspace = true
metrics-derive.workspace = true
rayon.workspace = true

[dev-dependencies]
rand.workspace = true
@@ -69,8 +70,14 @@ reth-e2e-test-utils.workspace = true
base-reth-test-utils.workspace = true
reth-primitives-traits.workspace = true
reth-optimism-primitives.workspace = true
reth-transaction-pool.workspace = true
serde_json.workspace = true
criterion = { version = "0.5", features = ["async_tokio"] }

[[bench]]
name = "pending_state"
harness = false

[[bench]]
name = "sender_recovery"
harness = false
108,952 changes: 108,952 additions & 0 deletions crates/flashblocks/benches/fixtures/base_mainnet_blocks.json

Large diffs are not rendered by default.

137 changes: 137 additions & 0 deletions crates/flashblocks/benches/sender_recovery.rs
@@ -0,0 +1,137 @@
#![allow(missing_docs)]

//! Benchmark for sender recovery performance.
//!
//! Compares sequential vs parallel ECDSA sender recovery
//! as requested in https://github.com/base/node-reth/issues/282

use alloy_consensus::transaction::SignerRecoverable;
use alloy_primitives::{Address, B256};
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use rayon::prelude::*;
use reth_optimism_primitives::OpTransactionSigned;
use reth_transaction_pool::test_utils::TransactionBuilder;
use serde_json::Value;

/// Generate signed transactions from multiple unique signers.
fn generate_transactions(count: usize) -> Vec<OpTransactionSigned> {
(0..count)
.map(|i| {
// Create a unique signer for each transaction to simulate varying senders
let mut signer_bytes = [0u8; 32];
signer_bytes[0] = ((i + 1) % 256) as u8;
signer_bytes[1] = ((i + 1) / 256) as u8;
signer_bytes[31] = 1; // Ensure non-zero
let signer = B256::from(signer_bytes);

let txn = TransactionBuilder::default()
.signer(signer)
.chain_id(8453) // Base mainnet
.to(Address::random())
.nonce(0)
.value(1_000_000_000u128)
.gas_limit(21_000)
.max_fee_per_gas(1_000_000_000)
.max_priority_fee_per_gas(1_000_000_000)
.into_eip1559()
.as_eip1559()
.expect("should convert to eip1559")
.clone();

OpTransactionSigned::Eip1559(txn)
})
.collect()
}

/// Sequential sender recovery (baseline)
fn recover_senders_sequential(txs: &[OpTransactionSigned]) -> Vec<Address> {
txs.iter().map(|tx| tx.recover_signer().expect("valid signature")).collect()
}

/// Parallel sender recovery using rayon
fn recover_senders_parallel(txs: &[OpTransactionSigned]) -> Vec<Address> {
txs.par_iter().map(|tx| tx.recover_signer().expect("valid signature")).collect()
}

/// Load real transactions from Base mainnet blocks fixture
fn load_real_transactions() -> Vec<OpTransactionSigned> {
let fixture_data = include_str!("./fixtures/base_mainnet_blocks.json");
let json: Value = serde_json::from_str(fixture_data).expect("valid JSON");

let mut transactions = Vec::new();

// Extract all transactions from all blocks
if let Some(blocks) = json["blocks"].as_array() {
for block in blocks {
if let Some(txs) = block["transactions"].as_array() {
for tx_value in txs {
// Skip any entries that fail to deserialize into a signed transaction
if let Ok(signed_tx) =
serde_json::from_value::<OpTransactionSigned>(tx_value.clone())
{
transactions.push(signed_tx);
}
}
}
}
}

transactions
}

fn sender_recovery_benches(c: &mut Criterion) {
let mut group = c.benchmark_group("sender_recovery_synthetic");

// Test with 30, 40, and 100 transactions as requested in issue #282
for tx_count in [30, 40, 100] {
let transactions = generate_transactions(tx_count);

group.bench_with_input(
BenchmarkId::new("sequential", tx_count),
&transactions,
|b, txs| {
b.iter(|| recover_senders_sequential(txs));
},
);

group.bench_with_input(BenchmarkId::new("parallel", tx_count), &transactions, |b, txs| {
b.iter(|| recover_senders_parallel(txs));
});
}

group.finish();
}

fn sender_recovery_real_txs_benches(c: &mut Criterion) {
let mut group = c.benchmark_group("sender_recovery_real_txs");

// Load real Base mainnet transactions
let all_transactions = load_real_transactions();
println!("Loaded {} real transactions from Base mainnet", all_transactions.len());

// Benchmark with different batch sizes
for tx_count in [30, 40, 100, 500, 1000] {
if all_transactions.len() < tx_count {
continue;
}

let transactions: Vec<_> = all_transactions.iter().take(tx_count).cloned().collect();

group.bench_with_input(
BenchmarkId::new("sequential", tx_count),
&transactions,
|b, txs| {
b.iter(|| recover_senders_sequential(txs));
},
);

group.bench_with_input(BenchmarkId::new("parallel", tx_count), &transactions, |b, txs| {
b.iter(|| recover_senders_parallel(txs));
});
}

group.finish();
}

criterion_group!(benches, sender_recovery_benches, sender_recovery_real_txs_benches);
criterion_main!(benches);
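
A useful companion to these benchmarks is a check that the two paths agree. The sketch below is not part of this diff; assert_recovery_matches is a hypothetical helper using the same dependencies as the bench. It asserts that parallel recovery returns the same senders, in the same order, as the sequential baseline, since rayon preserves element order when collecting an indexed parallel iterator:

use alloy_consensus::transaction::SignerRecoverable;
use alloy_primitives::Address;
use rayon::prelude::*;
use reth_optimism_primitives::OpTransactionSigned;

/// Hypothetical sanity check (not in this PR): the parallel path must be a
/// drop-in replacement for the sequential baseline.
fn assert_recovery_matches(txs: &[OpTransactionSigned]) {
    let sequential: Vec<Address> =
        txs.iter().map(|tx| tx.recover_signer().expect("valid signature")).collect();
    let parallel: Vec<Address> =
        txs.par_iter().map(|tx| tx.recover_signer().expect("valid signature")).collect();
    assert_eq!(sequential, parallel, "parallel recovery diverged from sequential baseline");
}
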
4 changes: 4 additions & 0 deletions crates/flashblocks/src/metrics.rs
@@ -23,6 +23,10 @@ pub struct Metrics {
#[metric(describe = "Time taken to process a message")]
pub block_processing_duration: Histogram,

/// Time spent on parallel sender recovery (ECDSA operations).
#[metric(describe = "Time spent on parallel sender recovery")]
pub sender_recovery_duration: Histogram,

/// Number of Flashblocks that arrive in an unexpected order.
#[metric(describe = "Number of Flashblocks that arrive in an unexpected order")]
pub unexpected_block_order: Counter,
33 changes: 23 additions & 10 deletions crates/flashblocks/src/processor.rs
@@ -11,7 +11,7 @@ use alloy_consensus::{
transaction::{Recovered, SignerRecoverable, TransactionMeta},
};
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::{B256, BlockNumber, Bytes, Sealable, map::foldhash::HashMap};
use alloy_primitives::{Address, B256, BlockNumber, Bytes, Sealable, map::foldhash::HashMap};
use alloy_rpc_types::{TransactionTrait, Withdrawal};
use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3};
use alloy_rpc_types_eth::state::StateOverride;
@@ -21,6 +21,7 @@ use eyre::eyre;
use op_alloy_consensus::OpTxEnvelope;
use op_alloy_network::TransactionResponse;
use op_alloy_rpc_types::Transaction;
use rayon::prelude::*;
use reth::{
chainspec::{ChainSpecProvider, EthChainSpec},
providers::{BlockReaderIdExt, StateProviderFactory},
@@ -393,17 +394,29 @@
let mut gas_used = 0;
let mut next_log_index = 0;

for (idx, transaction) in block.body.transactions.iter().enumerate() {
// Parallel sender recovery - batch all ECDSA operations upfront
let recovery_start = Instant::now();
let txs_with_senders: Vec<(&OpTxEnvelope, Address)> = block
.body
.transactions
.par_iter()
.map(|tx| -> eyre::Result<(&OpTxEnvelope, Address)> {
let tx_hash = tx.tx_hash();
let sender = match prev_pending_blocks
.as_ref()
.and_then(|p| p.get_transaction_sender(&tx_hash))
{
Some(cached) => cached,
None => tx.recover_signer()?,
};
Ok((tx, sender))
})
.collect::<eyre::Result<_>>()?;
self.metrics.sender_recovery_duration.record(recovery_start.elapsed());

for (idx, (transaction, sender)) in txs_with_senders.into_iter().enumerate() {
let tx_hash = transaction.tx_hash();

let sender = match prev_pending_blocks
.as_ref()
.and_then(|p| p.get_transaction_sender(&tx_hash))
{
Some(sender) => sender,
None => transaction.recover_signer()?,
};

pending_blocks_builder.with_transaction_sender(tx_hash, sender);
pending_blocks_builder.increment_nonce(sender);

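
One detail worth calling out in the processor change above: rayon can collect an iterator of Results directly into a single Result, so a failed signature recovery aborts the whole batch rather than producing a partial sender list. A self-contained sketch of that pattern, assuming only the rayon and eyre crates (the parsing example is illustrative, not taken from this PR):

use rayon::prelude::*;

// Fallible parallel collect: rayon implements FromParallelIterator for
// Result<C, E>, so the collect below yields Err if any element fails
// (which error is returned is not deterministic). This mirrors the
// collect::<eyre::Result<_>>()? call in the processor diff above.
fn parse_all(inputs: &[&str]) -> eyre::Result<Vec<u64>> {
    inputs
        .par_iter()
        .map(|s| -> eyre::Result<u64> { Ok(s.parse::<u64>()?) })
        .collect::<eyre::Result<_>>()
}

fn main() -> eyre::Result<()> {
    assert_eq!(parse_all(&["1", "2", "3"])?, vec![1, 2, 3]);
    assert!(parse_all(&["1", "oops", "3"]).is_err());
    Ok(())
}
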