@@ -19,15 +19,27 @@ use graph::futures03::future::try_join_all;
1919use graph:: futures03:: {
2020 self , compat:: Future01CompatExt , FutureExt , StreamExt , TryFutureExt , TryStreamExt ,
2121} ;
22- use graph:: prelude:: alloy;
23- use graph:: prelude:: alloy:: primitives:: B256 ;
24- use graph:: prelude:: alloy:: rpc:: types:: { TransactionInput , TransactionRequest } ;
25- use graph:: prelude:: alloy:: transports:: RpcError ;
26-
27- use graph:: prelude:: alloy_transaction_receipt_to_web3_transaction_receipt;
28- use graph:: prelude:: h256_to_b256;
29- use graph:: prelude:: tokio:: try_join;
30- use graph:: prelude:: { alloy_log_to_web3_log, b256_to_h256} ;
22+ use graph:: prelude:: {
23+ alloy:: {
24+ self ,
25+ primitives:: B256 ,
26+ providers:: {
27+ ext:: TraceApi ,
28+ fillers:: {
29+ BlobGasFiller , ChainIdFiller , FillProvider , GasFiller , JoinFill , NonceFiller ,
30+ } ,
31+ Identity , Provider , RootProvider ,
32+ } ,
33+ rpc:: types:: {
34+ trace:: { filter:: TraceFilter as AlloyTraceFilter , parity:: LocalizedTransactionTrace } ,
35+ TransactionInput , TransactionRequest ,
36+ } ,
37+ transports:: { RpcError , TransportErrorKind } ,
38+ } ,
39+ alloy_log_to_web3_log, alloy_transaction_receipt_to_web3_transaction_receipt, b256_to_h256,
40+ h160_to_alloy_address, h256_to_b256,
41+ tokio:: try_join,
42+ } ;
3143use graph:: slog:: o;
3244use graph:: tokio:: sync:: RwLock ;
3345use graph:: tokio:: time:: timeout;
@@ -41,11 +53,7 @@ use graph::{
4153 TimeoutError ,
4254 } ,
4355} ;
44- use graph:: {
45- components:: ethereum:: * ,
46- prelude:: web3:: api:: Web3 ,
47- prelude:: web3:: types:: { Trace , TraceFilter , TraceFilterBuilder , H160 } ,
48- } ;
56+ use graph:: { components:: ethereum:: * , prelude:: web3:: api:: Web3 , prelude:: web3:: types:: H160 } ;
4957use itertools:: Itertools ;
5058use std:: collections:: { BTreeMap , BTreeSet , HashMap , HashSet } ;
5159use std:: convert:: TryFrom ;
@@ -73,12 +81,20 @@ use crate::{
7381 ENV_VARS ,
7482} ;
7583
84+ type AlloyProvider = FillProvider <
85+ JoinFill <
86+ Identity ,
87+ JoinFill < GasFiller , JoinFill < BlobGasFiller , JoinFill < NonceFiller , ChainIdFiller > > > ,
88+ > ,
89+ RootProvider ,
90+ > ;
91+
7692#[ derive( Clone ) ]
7793pub struct EthereumAdapter {
7894 logger : Logger ,
7995 provider : String ,
8096 web3 : Arc < Web3 < Transport > > ,
81- alloy : Arc < dyn alloy :: providers :: Provider > ,
97+ alloy : Arc < AlloyProvider > ,
8298 metrics : Arc < ProviderEthRpcMetrics > ,
8399 supports_eip_1898 : bool ,
84100 call_only : bool ,
@@ -164,79 +180,26 @@ impl EthereumAdapter {
164180 subgraph_metrics : Arc < SubgraphEthRpcMetrics > ,
165181 from : BlockNumber ,
166182 to : BlockNumber ,
167- addresses : Vec < H160 > ,
168- ) -> Result < Vec < Trace > , Error > {
183+ addresses : Vec < alloy :: primitives :: Address > ,
184+ ) -> Result < Vec < LocalizedTransactionTrace > , Error > {
169185 assert ! ( !self . call_only) ;
170186
171- let eth = self . clone ( ) ;
172187 let retry_log_message =
173188 format ! ( "trace_filter RPC call for block range: [{}..{}]" , from, to) ;
189+ let eth = self . clone ( ) ;
190+
174191 retry ( retry_log_message, & logger)
175192 . redact_log_urls ( true )
176193 . limit ( ENV_VARS . request_retries )
177194 . timeout_secs ( ENV_VARS . json_rpc_timeout . as_secs ( ) )
178195 . run ( move || {
179- let trace_filter: TraceFilter = match addresses. len ( ) {
180- 0 => TraceFilterBuilder :: default ( )
181- . from_block ( from. into ( ) )
182- . to_block ( to. into ( ) )
183- . build ( ) ,
184- _ => TraceFilterBuilder :: default ( )
185- . from_block ( from. into ( ) )
186- . to_block ( to. into ( ) )
187- . to_address ( addresses. clone ( ) )
188- . build ( ) ,
189- } ;
190-
191- let eth = eth. cheap_clone ( ) ;
192- let logger_for_triggers = logger. clone ( ) ;
193- let logger_for_error = logger. clone ( ) ;
194- let start = Instant :: now ( ) ;
196+ let eth = eth. clone ( ) ;
197+ let logger = logger. clone ( ) ;
195198 let subgraph_metrics = subgraph_metrics. clone ( ) ;
196- let provider_metrics = eth. metrics . clone ( ) ;
197- let provider = self . provider . clone ( ) ;
198-
199+ let addresses = addresses. clone ( ) ;
199200 async move {
200- let result = eth
201- . web3
202- . trace ( )
203- . filter ( trace_filter)
201+ eth. execute_trace_filter_request ( logger, subgraph_metrics, from, to, addresses)
204202 . await
205- . map ( move |traces| {
206- if !traces. is_empty ( ) {
207- if to == from {
208- debug ! (
209- logger_for_triggers,
210- "Received {} traces for block {}" ,
211- traces. len( ) ,
212- to
213- ) ;
214- } else {
215- debug ! (
216- logger_for_triggers,
217- "Received {} traces for blocks [{}, {}]" ,
218- traces. len( ) ,
219- from,
220- to
221- ) ;
222- }
223- }
224- traces
225- } )
226- . map_err ( Error :: from) ;
227-
228- let elapsed = start. elapsed ( ) . as_secs_f64 ( ) ;
229- provider_metrics. observe_request ( elapsed, "trace_filter" , & provider) ;
230- subgraph_metrics. observe_request ( elapsed, "trace_filter" , & provider) ;
231- if let Err ( e) = & result {
232- provider_metrics. add_error ( "trace_filter" , & provider) ;
233- subgraph_metrics. add_error ( "trace_filter" , & provider) ;
234- debug ! (
235- logger_for_error,
236- "Error querying traces error = {:#} from = {} to = {}" , e, from, to
237- ) ;
238- }
239- result
240203 }
241204 } )
242205 . map_err ( move |e| {
@@ -252,6 +215,93 @@ impl EthereumAdapter {
252215 . await
253216 }
254217
218+ async fn execute_trace_filter_request (
219+ & self ,
220+ logger : Logger ,
221+ subgraph_metrics : Arc < SubgraphEthRpcMetrics > ,
222+ from : BlockNumber ,
223+ to : BlockNumber ,
224+ addresses : Vec < alloy:: primitives:: Address > ,
225+ ) -> Result < Vec < LocalizedTransactionTrace > , Error > {
226+ let alloy_trace_filter = Self :: build_trace_filter ( from, to, & addresses) ;
227+ let start = Instant :: now ( ) ;
228+
229+ let result = self . alloy . trace_filter ( & alloy_trace_filter) . await ;
230+
231+ if let Ok ( traces) = & result {
232+ self . log_trace_results ( & logger, from, to, traces. len ( ) ) ;
233+ }
234+
235+ self . record_trace_metrics (
236+ & subgraph_metrics,
237+ start. elapsed ( ) . as_secs_f64 ( ) ,
238+ & result,
239+ from,
240+ to,
241+ & logger,
242+ ) ;
243+
244+ result. map_err ( Error :: from)
245+ }
246+
247+ fn build_trace_filter (
248+ from : BlockNumber ,
249+ to : BlockNumber ,
250+ addresses : & [ alloy:: primitives:: Address ] ,
251+ ) -> AlloyTraceFilter {
252+ let filter = AlloyTraceFilter :: default ( )
253+ . from_block ( from as u64 )
254+ . to_block ( to as u64 ) ;
255+
256+ if !addresses. is_empty ( ) {
257+ filter. to_address ( addresses. to_vec ( ) )
258+ } else {
259+ filter
260+ }
261+ }
262+
263+ fn log_trace_results (
264+ & self ,
265+ logger : & Logger ,
266+ from : BlockNumber ,
267+ to : BlockNumber ,
268+ trace_len : usize ,
269+ ) {
270+ if trace_len > 0 {
271+ if to == from {
272+ debug ! ( logger, "Received {} traces for block {}" , trace_len, to) ;
273+ } else {
274+ debug ! (
275+ logger,
276+ "Received {} traces for blocks [{}, {}]" , trace_len, from, to
277+ ) ;
278+ }
279+ }
280+ }
281+
282+ fn record_trace_metrics (
283+ & self ,
284+ subgraph_metrics : & Arc < SubgraphEthRpcMetrics > ,
285+ elapsed : f64 ,
286+ result : & Result < Vec < LocalizedTransactionTrace > , RpcError < TransportErrorKind > > ,
287+ from : BlockNumber ,
288+ to : BlockNumber ,
289+ logger : & Logger ,
290+ ) {
291+ self . metrics
292+ . observe_request ( elapsed, "trace_filter" , & self . provider ) ;
293+ subgraph_metrics. observe_request ( elapsed, "trace_filter" , & self . provider ) ;
294+
295+ if let Err ( e) = result {
296+ self . metrics . add_error ( "trace_filter" , & self . provider ) ;
297+ subgraph_metrics. add_error ( "trace_filter" , & self . provider ) ;
298+ debug ! (
299+ logger,
300+ "Error querying traces error = {:#} from = {} to = {}" , e, from, to
301+ ) ;
302+ }
303+ }
304+
255305 // This is a lazy check for block receipt support. It is only called once and then the result is
256306 // cached. The result is not used for anything critical, so it is fine to be lazy.
257307 async fn check_block_receipt_support_and_update_cache (
@@ -368,8 +418,8 @@ impl EthereumAdapter {
368418 subgraph_metrics : Arc < SubgraphEthRpcMetrics > ,
369419 from : BlockNumber ,
370420 to : BlockNumber ,
371- addresses : Vec < H160 > ,
372- ) -> impl futures03:: Stream < Item = Result < Trace , Error > > + Send {
421+ addresses : Vec < alloy :: primitives :: Address > ,
422+ ) -> impl futures03:: Stream < Item = Result < LocalizedTransactionTrace , Error > > + Send {
373423 if from > to {
374424 panic ! (
375425 "Can not produce a call stream on a backwards block range: from = {}, to = {}" ,
@@ -900,6 +950,11 @@ impl EthereumAdapter {
900950 addresses = vec ! [ ] ;
901951 }
902952
953+ let addresses = addresses
954+ . iter ( )
955+ . map ( |addr| h160_to_alloy_address ( * addr) )
956+ . collect ( ) ;
957+
903958 Box :: new (
904959 eth. trace_stream ( logger, subgraph_metrics, from, to, addresses)
905960 . try_filter_map ( move |trace| {
@@ -987,11 +1042,11 @@ impl EthereumAdapter {
9871042 logger : & Logger ,
9881043 subgraph_metrics : Arc < SubgraphEthRpcMetrics > ,
9891044 block_number : BlockNumber ,
990- block_hash : H256 ,
1045+ block_hash : alloy :: primitives :: B256 ,
9911046 ) -> Result < Vec < EthereumCall > , Error > {
9921047 let eth = self . clone ( ) ;
9931048 let addresses = Vec :: new ( ) ;
994- let traces: Vec < Trace > = eth
1049+ let traces: Vec < LocalizedTransactionTrace > = eth
9951050 . trace_stream (
9961051 logger,
9971052 subgraph_metrics. clone ( ) ,
@@ -1017,7 +1072,7 @@ impl EthereumAdapter {
10171072 // all the traces for the block, we need to ensure that the
10181073 // block hash for the traces is equal to the desired block hash.
10191074 // Assume all traces are for the same block.
1020- if traces. iter ( ) . nth ( 0 ) . unwrap ( ) . block_hash != block_hash {
1075+ if traces. iter ( ) . nth ( 0 ) . unwrap ( ) . block_hash != Some ( block_hash) {
10211076 return Err ( anyhow ! (
10221077 "Trace stream returned traces for an unexpected block: \
10231078 number = `{}`, hash = `{}`",
@@ -1835,7 +1890,7 @@ pub(crate) async fn get_calls(
18351890 subgraph_metrics. clone ( ) ,
18361891 BlockNumber :: try_from ( ethereum_block. block . number . unwrap ( ) . as_u64 ( ) )
18371892 . unwrap ( ) ,
1838- ethereum_block. block . hash . unwrap ( ) ,
1893+ h256_to_b256 ( ethereum_block. block . hash . unwrap ( ) ) ,
18391894 )
18401895 . await ?
18411896 } ;
0 commit comments