Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions crates/ingress-rpc/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,16 @@ pub struct Metrics {

#[metric(describe = "Total raw transactions forwarded to additional endpoint")]
pub raw_tx_forwards_total: Counter,

#[metric(describe = "Total bulk bundle requests received")]
pub bulk_bundle_requests_received: Counter,

#[metric(describe = "Total bulk bundles processed successfully")]
pub bulk_bundles_processed_successfully: Counter,

#[metric(describe = "Total bulk bundles failed to process")]
pub bulk_bundles_failed: Counter,

#[metric(describe = "Duration of bulk bundle processing")]
pub send_bulk_bundles_duration: Histogram,
}
98 changes: 98 additions & 0 deletions crates/ingress-rpc/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ pub trait IngressApi {
user_operation: VersionedUserOperation,
entry_point: Address,
) -> RpcResult<FixedBytes<32>>;

/// Bulk bundle operations
#[method(name = "sendBulkBundles")]
async fn send_bulk_bundles(&self, bundles: Vec<Bundle>) -> RpcResult<Vec<BundleHash>>;

#[method(name = "cancelBulkBundles")]
async fn cancel_bulk_bundles(&self, requests: Vec<CancelBundle>) -> RpcResult<Vec<bool>>;
}

pub struct IngressService<Q: MessageQueue, M: Mempool> {
Expand Down Expand Up @@ -424,6 +431,95 @@ impl<Q: MessageQueue + 'static, M: Mempool + 'static> IngressApiServer for Ingre

Ok(user_op_hash)
}

async fn send_bulk_bundles(&self, bundles: Vec<Bundle>) -> RpcResult<Vec<BundleHash>> {
if bundles.is_empty() {
return Err(EthApiError::InvalidParams("No bundles provided".into()).into_rpc_err());
}

if bundles.len() > 10 {
return Err(EthApiError::InvalidParams("Maximum 10 bundles per bulk request".into()).into_rpc_err());
}

let start = Instant::now();
let mut results = Vec::with_capacity(bundles.len());
let mut successful_count = 0;
let mut failed_count = 0;

self.metrics.bulk_bundle_requests_received.increment(1);

// Process bundles sequentially to maintain order and proper validation
for bundle in bundles {
match self.validate_parse_and_meter_bundle(&bundle, true).await {
Ok((accepted_bundle, bundle_hash)) => {
// Get meter_bundle_response for builder broadcast
let meter_bundle_response = accepted_bundle.meter_bundle_response.clone();

// Send to builder
if let Err(e) = self.builder_tx.send(meter_bundle_response) {
warn!(message = "Failed to send bulk bundle to builder", bundle_hash = %bundle_hash, error = %e);
failed_count += 1;
continue;
}

// Publish to queue
if let Err(e) = self.bundle_queue_publisher.publish(&accepted_bundle, &bundle_hash).await {
warn!(message = "Failed to publish bulk bundle to queue", bundle_hash = %bundle_hash, error = %e);
failed_count += 1;
continue;
}

info!(message = "queued bulk bundle", bundle_hash = %bundle_hash);
self.send_audit_event(&accepted_bundle, bundle_hash);
results.push(BundleHash { bundle_hash });
successful_count += 1;
}
Err(e) => {
warn!(message = "Failed to process bulk bundle", error = %e);
failed_count += 1;
// Continue processing other bundles instead of failing the whole batch
}
}
}

self.metrics.bulk_bundles_processed_successfully.increment(successful_count);
self.metrics.bulk_bundles_failed.increment(failed_count);

self.metrics
.send_bulk_bundles_duration
.record(start.elapsed().as_secs_f64());

if results.is_empty() {
return Err(EthApiError::InvalidParams("All bundles failed to process".into()).into_rpc_err());
}

Ok(results)
}

async fn cancel_bulk_bundles(&self, requests: Vec<CancelBundle>) -> RpcResult<Vec<bool>> {
if requests.is_empty() {
return Err(EthApiError::InvalidParams("No cancellation requests provided".into()).into_rpc_err());
}

if requests.len() > 15 {
return Err(EthApiError::InvalidParams("Maximum 15 cancellation requests per bulk operation".into()).into_rpc_err());
}

let mut results = Vec::with_capacity(requests.len());

// Process cancellations sequentially
for request in requests {
// TODO: Implement actual bundle cancellation logic
// For now, just return success for all requests
warn!(
message = "Bulk bundle cancellation requested",
replacement_uuid = %request.replacement_uuid
);
results.push(true);
}

Ok(results)
}
}

impl<Q: MessageQueue, M: Mempool> IngressService<Q, M> {
Expand Down Expand Up @@ -835,6 +931,8 @@ mod tests {
user_operation: VersionedUserOperation,
entry_point: Address,
) -> RpcResult<FixedBytes<32>>;
async fn send_bulk_bundles(&self, bundles: Vec<Bundle>) -> RpcResult<Vec<BundleHash>>;
async fn cancel_bulk_bundles(&self, requests: Vec<CancelBundle>) -> RpcResult<Vec<bool>>;
}
}
#[tokio::test]
Expand Down