From 5f8165c84710eb5bb101c9827dfba90c4640b4b5 Mon Sep 17 00:00:00 2001
From: Andreas Richter <708186+richtera@users.noreply.github.com>
Date: Mon, 6 May 2024 08:55:27 -0400
Subject: [PATCH 1/3] fix: Repair

---
 src/lib.rs   | 16 +++++++++++++++-
 src/query.rs | 22 +++++++++++++++++++++-
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index d29eead..e896524 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -150,7 +150,21 @@ impl HypersyncClient {
         query: Query,
         config: StreamConfig,
     ) -> napi::Result {
-        let query = query.try_convert().context("parse query")?;
+        let mut query = query.try_convert().context("parse query")?;
+
+        query.logs = query
+            .logs
+            .into_iter()
+            .map(|log| {
+                let mut log = log;
+                let address = log.address.clone();
+                if address.into_iter().all(|v| v.as_ref() == [0u8; 20]) {
+                    log.address = vec![];
+                }
+                log
+            })
+            .collect();
+
         let config = config.try_convert().context("parse stream config")?;
 
         let inner = self
diff --git a/src/query.rs b/src/query.rs
index 780e9be..3b8d66d 100644
--- a/src/query.rs
+++ b/src/query.rs
@@ -129,7 +129,27 @@ pub struct Query {
 impl Query {
     pub fn try_convert(&self) -> Result {
         let json = serde_json::to_vec(self).context("serialize to json")?;
-        serde_json::from_slice(&json).context("parse json")
+        let query_result: Result =
+            serde_json::from_slice(&json).context("parse json");
+        match query_result {
+            Ok(query) => {
+                let mut query = query.clone();
+                query.logs = query
+                    .logs
+                    .into_iter()
+                    .map(|log| {
+                        let mut log = log;
+                        let address = log.address.clone();
+                        if address.into_iter().all(|v| v.as_ref() == [0u8; 20]) {
+                            log.address = vec![];
+                        }
+                        log
+                    })
+                    .collect();
+                Ok(query)
+            }
+            Err(e) => Err(e),
+        }
     }
 }

From dab8353783222500ec45cd8188a8b1a9b9d05050 Mon Sep 17 00:00:00 2001
From: Andreas Richter <708186+richtera@users.noreply.github.com>
Date: Mon, 6 May 2024 18:33:09 -0400
Subject: [PATCH 2/3] fix: More work to make wildcard addresses work.
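For readers following along, here is a minimal standalone sketch of the normalization PATCH 1/3 above applies in both `stream` and `Query::try_convert`: when a log selection's address filter contains only the 20-byte zero address, the filter is cleared so the selection behaves as a wildcard. The `LogSelection` type and `normalize_wildcard` helper below are stand-ins for illustration, not the crate's actual types or API.

```rust
/// Stand-in for the crate's log selection; only the address filter matters here.
struct LogSelection {
    address: Vec<[u8; 20]>,
}

/// If every entry in a non-empty address filter is the 20-byte zero address,
/// clear the filter so the selection matches logs from any contract.
fn normalize_wildcard(mut logs: Vec<LogSelection>) -> Vec<LogSelection> {
    for log in &mut logs {
        if !log.address.is_empty() && log.address.iter().all(|a| a == &[0u8; 20]) {
            log.address.clear();
        }
    }
    logs
}

fn main() {
    let logs = vec![
        LogSelection { address: vec![[0u8; 20]] },    // becomes a wildcard
        LogSelection { address: vec![[0xaau8; 20]] }, // left untouched
    ];
    let logs = normalize_wildcard(logs);
    assert!(logs[0].address.is_empty());
    assert_eq!(logs[1].address.len(), 1);
}
```

The actual patch performs this clearing in place on the converted query; the sketch only illustrates the all-zero check and the resulting empty filter.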
Includes a patch to skar-client --- Cargo.toml | 6 + patch/skar-client-0.16.3/.cargo-ok | 1 + patch/skar-client-0.16.3/.cargo_vcs_info.json | 6 + patch/skar-client-0.16.3/Cargo.toml | 133 +++ patch/skar-client-0.16.3/Cargo.toml.orig | 58 ++ .../skar-client-0.16.3/src/column_mapping.rs | 206 +++++ patch/skar-client-0.16.3/src/config.rs | 18 + patch/skar-client-0.16.3/src/decode.rs | 159 ++++ patch/skar-client-0.16.3/src/lib.rs | 465 +++++++++++ patch/skar-client-0.16.3/src/parquet_out.rs | 783 ++++++++++++++++++ patch/skar-client-0.16.3/src/rayon_async.rs | 16 + .../src/transport_format.rs | 39 + patch/skar-client-0.16.3/src/types.rs | 104 +++ .../test-data/ens_token_abi.json | 1 + .../test-data/erc20.abi.json | 222 +++++ .../test-data/nameless.abi.json | 1 + patch/skar-client-0.16.3/tests/api_test.rs | 394 +++++++++ src/lib.rs | 2 +- 18 files changed, 2613 insertions(+), 1 deletion(-) create mode 100644 patch/skar-client-0.16.3/.cargo-ok create mode 100644 patch/skar-client-0.16.3/.cargo_vcs_info.json create mode 100644 patch/skar-client-0.16.3/Cargo.toml create mode 100644 patch/skar-client-0.16.3/Cargo.toml.orig create mode 100644 patch/skar-client-0.16.3/src/column_mapping.rs create mode 100644 patch/skar-client-0.16.3/src/config.rs create mode 100644 patch/skar-client-0.16.3/src/decode.rs create mode 100644 patch/skar-client-0.16.3/src/lib.rs create mode 100644 patch/skar-client-0.16.3/src/parquet_out.rs create mode 100644 patch/skar-client-0.16.3/src/rayon_async.rs create mode 100644 patch/skar-client-0.16.3/src/transport_format.rs create mode 100644 patch/skar-client-0.16.3/src/types.rs create mode 100644 patch/skar-client-0.16.3/test-data/ens_token_abi.json create mode 100644 patch/skar-client-0.16.3/test-data/erc20.abi.json create mode 100644 patch/skar-client-0.16.3/test-data/nameless.abi.json create mode 100644 patch/skar-client-0.16.3/tests/api_test.rs diff --git a/Cargo.toml b/Cargo.toml index d9116a9..4f40f82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,12 @@ version = "0.0.0" [lib] crate-type = ["cdylib"] +[package.metadata.patch] +crates = ["skar-client"] + +[patch.crates-io] +skar-client = { path="./patch/skar-client-0.16.3" } + [dependencies] # Default enable napi4 feature, see https://nodejs.org/api/n-api.html#node-api-version-matrix napi = { version = "2", default-features = false, features = [ diff --git a/patch/skar-client-0.16.3/.cargo-ok b/patch/skar-client-0.16.3/.cargo-ok new file mode 100644 index 0000000..5f8b795 --- /dev/null +++ b/patch/skar-client-0.16.3/.cargo-ok @@ -0,0 +1 @@ +{"v":1} \ No newline at end of file diff --git a/patch/skar-client-0.16.3/.cargo_vcs_info.json b/patch/skar-client-0.16.3/.cargo_vcs_info.json new file mode 100644 index 0000000..d2a9589 --- /dev/null +++ b/patch/skar-client-0.16.3/.cargo_vcs_info.json @@ -0,0 +1,6 @@ +{ + "git": { + "sha1": "598b49bbf4fe44008a0728847b147f21a35ff1e5" + }, + "path_in_vcs": "skar-client" +} \ No newline at end of file diff --git a/patch/skar-client-0.16.3/Cargo.toml b/patch/skar-client-0.16.3/Cargo.toml new file mode 100644 index 0000000..0f62fc2 --- /dev/null +++ b/patch/skar-client-0.16.3/Cargo.toml @@ -0,0 +1,133 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. 
+# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +edition = "2021" +name = "skar-client" +version = "0.16.3" +description = "client library for skar" +license = "MIT" + +[dependencies.alloy-dyn-abi] +version = "0.6" + +[dependencies.alloy-json-abi] +version = "0.6" + +[dependencies.anyhow] +version = "1" + +[dependencies.arrayvec] +version = "0.7" +features = ["serde"] + +[dependencies.arrow2] +version = "0.18" +features = [ + "io_json", + "io_ipc", + "io_ipc_compression", + "io_parquet_zstd", + "io_parquet_lz4", + "io_parquet", + "compute_cast", +] + +[dependencies.capnp] +version = "0.19" + +[dependencies.faster-hex] +version = "0.9" + +[dependencies.fastrange-rs] +version = "0.1" + +[dependencies.futures] +version = "0.3" + +[dependencies.log] +version = "0.4" + +[dependencies.num_cpus] +version = "1" + +[dependencies.parquet2] +version = "0.17" +features = ["async"] + +[dependencies.rand] +version = "0.8" + +[dependencies.rayon] +version = "1" + +[dependencies.reqwest] +version = "0.11" +features = [ + "json", + "rustls-tls", +] +default-features = false + +[dependencies.ruint] +version = "1" + +[dependencies.serde] +version = "1" +features = ["derive"] + +[dependencies.serde_json] +version = "1" + +[dependencies.skar-format] +version = "0.2.0" + +[dependencies.skar-net-types] +version = "0.2.0" + +[dependencies.skar-schema] +version = "0.1.0" + +[dependencies.tokio] +version = "1" +features = [ + "rt-multi-thread", + "fs", + "test-util", + "rt", + "macros", +] +default-features = false + +[dependencies.tokio-util] +version = "0.7.10" +features = ["compat"] + +[dependencies.url] +version = "2" +features = ["serde"] + +[dependencies.xxhash-rust] +version = "0.8" +features = ["xxh3"] + +[dev-dependencies.env_logger] +version = "0.11" + +[dev-dependencies.hex-literal] +version = "0.4" + +[dev-dependencies.maplit] +version = "1" + +[dev-dependencies.uuid] +version = "1" +features = ["v4"] diff --git a/patch/skar-client-0.16.3/Cargo.toml.orig b/patch/skar-client-0.16.3/Cargo.toml.orig new file mode 100644 index 0000000..372cf27 --- /dev/null +++ b/patch/skar-client-0.16.3/Cargo.toml.orig @@ -0,0 +1,58 @@ +[package] +name = "skar-client" +version = "0.16.3" +edition = "2021" +description = "client library for skar" +license = "MIT" + +[dependencies] +anyhow = "1" +url = { version = "2", features = ["serde"] } +arrow2 = { version = "0.18", features = [ + "io_json", + "io_ipc", + "io_ipc_compression", + "io_parquet_zstd", + "io_parquet_lz4", + "io_parquet", + "compute_cast", +] } +parquet2 = { version = "0.17", features = ["async"] } +serde_json = "1" +capnp = "0.19" +serde = { version = "1", features = ["derive"] } +futures = "0.3" +arrayvec = { version = "0.7", features = ["serde"] } +tokio = { version = "1", default-features = false, features = [ + "rt-multi-thread", + "fs", + "test-util", + "rt", + "macros", +] } +log = "0.4" +fastrange-rs = "0.1" +rand = "0.8" +tokio-util = { version = "0.7.10", features = ["compat"] } +alloy-dyn-abi = "0.6" +alloy-json-abi = "0.6" +xxhash-rust = { version = "0.8", features = ["xxh3"] } +num_cpus = "1" +rayon = "1" +faster-hex = "0.9" +ruint = "1" + +skar-net-types = { path = "../net-types", version = "0.2.0" } +skar-format = { path = "../format", version = "0.2.0" } +skar-schema = { path = "../schema", version = "0.1.0" } + +[dependencies.reqwest] +version = "0.11" +default-features = false +features = 
["json", "rustls-tls"] + +[dev-dependencies] +maplit = "1" +hex-literal = "0.4" +uuid = { version = "1", features = ["v4"] } +env_logger = "0.11" diff --git a/patch/skar-client-0.16.3/src/column_mapping.rs b/patch/skar-client-0.16.3/src/column_mapping.rs new file mode 100644 index 0000000..d12da0f --- /dev/null +++ b/patch/skar-client-0.16.3/src/column_mapping.rs @@ -0,0 +1,206 @@ +use std::collections::BTreeMap; + +use anyhow::{anyhow, Context, Result}; +use arrow2::array::{ + Array, BinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, MutablePrimitiveArray, + PrimitiveArray, UInt32Array, UInt64Array, +}; +use arrow2::compute::cast; +use arrow2::datatypes::{DataType as ArrowDataType, Field, Schema}; +use arrow2::types::NativeType; +use rayon::prelude::*; +use ruint::aliases::U256; +use serde::{Deserialize, Serialize}; +use skar_schema::ArrowChunk; + +#[derive(Default, Debug, Clone, Serialize, Deserialize)] +pub struct ColumnMapping { + #[serde(default)] + pub block: BTreeMap, + #[serde(default)] + pub transaction: BTreeMap, + #[serde(default)] + pub log: BTreeMap, + #[serde(default)] + pub trace: BTreeMap, + #[serde(default)] + pub decoded_log: BTreeMap, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum DataType { + Float64, + Float32, + UInt64, + UInt32, + Int64, + Int32, +} + +impl From for ArrowDataType { + fn from(value: DataType) -> Self { + match value { + DataType::Float64 => Self::Float64, + DataType::Float32 => Self::Float32, + DataType::UInt64 => Self::UInt64, + DataType::UInt32 => Self::UInt32, + DataType::Int64 => Self::Int64, + DataType::Int32 => Self::Int32, + } + } +} + +pub fn apply_to_chunk( + chunk: &ArrowChunk, + field_names: &[&str], + mapping: &BTreeMap, +) -> Result { + if mapping.is_empty() { + return Ok(chunk.clone()); + } + + let columns = chunk + .columns() + .par_iter() + .zip(field_names.par_iter()) + .map(|(col, &field_name)| { + let col = match mapping.get(field_name) { + Some(&dt) => map_column(&**col, dt) + .context(format!("apply cast to colum '{}'", field_name))?, + None => col.clone(), + }; + + Ok(col) + }) + .collect::>>()?; + + Ok(ArrowChunk::new(columns)) +} + +/// Warning: This function does not validate the mapping types! +/// So same mapping might fail if applied to actual data even if this function maps the schema normally. 
+pub fn apply_to_schema(schema: &Schema, mapping: &BTreeMap) -> Result { + let fields = schema + .fields + .iter() + .map(|field| match mapping.get(&field.name) { + Some(&dt) => Field::new(&field.name, dt.into(), field.is_nullable), + None => field.clone(), + }) + .collect::>(); + + Ok(Schema::from(fields)) +} + +pub fn map_column(col: &dyn Array, target_data_type: DataType) -> Result> { + fn to_box(arr: T) -> Box { + Box::new(arr) + } + + match target_data_type { + DataType::Float64 => map_to_f64(col).map(to_box), + DataType::Float32 => map_to_f32(col).map(to_box), + DataType::UInt64 => map_to_uint64(col).map(to_box), + DataType::UInt32 => map_to_uint32(col).map(to_box), + DataType::Int64 => map_to_int64(col).map(to_box), + DataType::Int32 => map_to_int32(col).map(to_box), + } +} + +fn map_to_f64(col: &dyn Array) -> Result { + match col.data_type() { + &ArrowDataType::Binary => { + binary_to_target_array(col.as_any().downcast_ref::>().unwrap()) + } + &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive( + col.as_any().downcast_ref::().unwrap(), + &ArrowDataType::Float64, + )), + dt => Err(anyhow!("Can't convert {:?} to f64", dt)), + } +} + +fn map_to_f32(col: &dyn Array) -> Result { + match col.data_type() { + &ArrowDataType::Binary => { + binary_to_target_array(col.as_any().downcast_ref::>().unwrap()) + } + &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive( + col.as_any().downcast_ref::().unwrap(), + &ArrowDataType::Float32, + )), + dt => Err(anyhow!("Can't convert {:?} to f32", dt)), + } +} + +fn map_to_uint64(col: &dyn Array) -> Result { + match col.data_type() { + &ArrowDataType::Binary => { + binary_to_target_array(col.as_any().downcast_ref::>().unwrap()) + } + &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive( + col.as_any().downcast_ref::().unwrap(), + &ArrowDataType::UInt64, + )), + dt => Err(anyhow!("Can't convert {:?} to uint64", dt)), + } +} + +fn map_to_uint32(col: &dyn Array) -> Result { + match col.data_type() { + &ArrowDataType::Binary => { + binary_to_target_array(col.as_any().downcast_ref::>().unwrap()) + } + &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive( + col.as_any().downcast_ref::().unwrap(), + &ArrowDataType::UInt32, + )), + dt => Err(anyhow!("Can't convert {:?} to uint32", dt)), + } +} + +fn map_to_int64(col: &dyn Array) -> Result { + match col.data_type() { + &ArrowDataType::Binary => { + binary_to_target_array(col.as_any().downcast_ref::>().unwrap()) + } + &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive( + col.as_any().downcast_ref::().unwrap(), + &ArrowDataType::Int64, + )), + dt => Err(anyhow!("Can't convert {:?} to int64", dt)), + } +} + +fn map_to_int32(col: &dyn Array) -> Result { + match col.data_type() { + &ArrowDataType::Binary => { + binary_to_target_array(col.as_any().downcast_ref::>().unwrap()) + } + &ArrowDataType::UInt64 => Ok(cast::primitive_as_primitive( + col.as_any().downcast_ref::().unwrap(), + &ArrowDataType::Int32, + )), + dt => Err(anyhow!("Can't convert {:?} to int32", dt)), + } +} + +fn binary_to_target_array>( + src: &BinaryArray, +) -> Result> { + let mut out = MutablePrimitiveArray::with_capacity(src.len()); + + for val in src.iter() { + out.push(val.map(binary_to_target).transpose()?); + } + + Ok(out.into()) +} + +fn binary_to_target>(src: &[u8]) -> Result { + let big_num = U256::from_be_slice(src); + big_num + .try_into() + .map_err(|_e| anyhow!("failed to cast number to requested type")) +} diff --git a/patch/skar-client-0.16.3/src/config.rs b/patch/skar-client-0.16.3/src/config.rs new file 
mode 100644 index 0000000..b79c00e --- /dev/null +++ b/patch/skar-client-0.16.3/src/config.rs @@ -0,0 +1,18 @@ +use serde::{Deserialize, Serialize}; +use std::num::NonZeroU64; +use url::Url; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct Config { + /// Url of the source hypersync instance + pub url: Url, + /// Optional bearer_token to put into http requests made to source hypersync instance + pub bearer_token: Option, + /// Timout treshold for a single http request in milliseconds, default is 30 seconds (30_000ms) + #[serde(default = "default_http_req_timeout_millis")] + pub http_req_timeout_millis: NonZeroU64, +} + +pub fn default_http_req_timeout_millis() -> NonZeroU64 { + NonZeroU64::new(30000).unwrap() +} diff --git a/patch/skar-client-0.16.3/src/decode.rs b/patch/skar-client-0.16.3/src/decode.rs new file mode 100644 index 0000000..58f6b1c --- /dev/null +++ b/patch/skar-client-0.16.3/src/decode.rs @@ -0,0 +1,159 @@ +use std::collections::HashMap as StdHashMap; + +use alloy_dyn_abi::{DecodedEvent, DynSolEvent, DynSolType, ResolveSolEvent}; +use alloy_json_abi::JsonAbi; +use anyhow::{Context, Result}; +use arrow2::array::BinaryArray; +use xxhash_rust::xxh3::Xxh3Builder; + +use crate::ArrowBatch; + +pub type FastMap = StdHashMap; + +pub struct Decoder { + contracts: FastMap, FastMap, DynSolEvent>>, +} + +impl Decoder { + pub fn new(json_abis: &[(skar_format::Address, JsonAbi)]) -> Result { + let mut contracts = FastMap::default(); + + for (addr, abi) in json_abis.iter() { + let mut event_map = FastMap::default(); + + for (_, events) in abi.events.iter() { + for event in events { + event_map.insert( + event.selector().to_vec(), + event.resolve().context("resolve event")?, + ); + } + } + + contracts.insert(addr.to_vec(), event_map.clone()); + } + + Ok(Self { contracts }) + } + + #[inline] + pub fn decode( + &self, + address: &[u8], + topic0: &[u8], + topics: &[Option<&[u8]>], + data: &[u8], + ) -> Result> { + let event = match self.contracts.get(address) { + Some(contract) => match contract.get(topic0) { + Some(event) => event, + None => { + if let Some(event) = self.contracts.iter().find_map(|(address, contract)| { + if (*address).as_slice()[0..19] == [0u8; 19] { + contract.get(topic0) + } else { + None + } + }) { + event + } else { + return Ok(None); + } + } + }, + None => { + if let Some(event) = self.contracts.iter().find_map(|(address, contract)| { + if (*address).as_slice()[0..19] == [0u8; 19] { + contract.get(topic0) + } else { + None + } + }) { + event + } else { + return Ok(None); + } + } + }; + + let topics = topics + .iter() + .filter_map(|&t| t.map(|t| t.try_into().unwrap())); + + // Check if we are decoding a single u256 and the body is empty + // + // This case can happen when decoding zero value erc20 transfers + let decoded = if data.is_empty() && event.body() == [DynSolType::Uint(256)] { + event + .decode_log_parts(topics, [0; 32].as_slice(), false) + .context("decode log parts")? + } else { + event + .decode_log_parts(topics, data, false) + .context("decode log parts")? 
+ }; + + Ok(Some(decoded)) + } + + pub fn decode_logs(&self, logs: &[ArrowBatch]) -> Result>>> { + let mut events = Vec::new(); + + for batch in logs { + let address = match batch.column::>("address") { + Ok(address) => address, + Err(_) => return Ok(None), + }; + let data = match batch.column::>("data") { + Ok(data) => data, + Err(_) => return Ok(None), + }; + let topic0 = match batch.column::>("topic0") { + Ok(topic0) => topic0, + Err(_) => return Ok(None), + }; + let topic1 = match batch.column::>("topic1") { + Ok(topic1) => topic1, + Err(_) => return Ok(None), + }; + let topic2 = match batch.column::>("topic2") { + Ok(topic2) => topic2, + Err(_) => return Ok(None), + }; + let topic3 = match batch.column::>("topic3") { + Ok(topic3) => topic3, + Err(_) => return Ok(None), + }; + + for (((((address, data), topic0), topic1), topic2), topic3) in address + .values_iter() + .zip(data.values_iter()) + .zip(topic0.iter()) + .zip(topic1.iter()) + .zip(topic2.iter()) + .zip(topic3.iter()) + { + let topic0 = match topic0 { + Some(topic0) => topic0, + None => { + events.push(None); + continue; + } + }; + + let decoded = self + .decode( + address, + topic0, + &[Some(topic0), topic1, topic2, topic3], + data, + ) + .context("decode event")?; + + events.push(decoded); + } + } + + Ok(Some(events)) + } +} diff --git a/patch/skar-client-0.16.3/src/lib.rs b/patch/skar-client-0.16.3/src/lib.rs new file mode 100644 index 0000000..9f18abb --- /dev/null +++ b/patch/skar-client-0.16.3/src/lib.rs @@ -0,0 +1,465 @@ +use std::{cmp, collections::BTreeSet, error::Error, time::Duration}; + +use anyhow::{anyhow, Context, Result}; +use arrayvec::ArrayVec; +use arrow2::{array::Array, chunk::Chunk}; +use format::{Address, LogArgument}; +use futures::StreamExt; +use reqwest::Method; +use skar_net_types::{ + skar_net_types_capnp, ArchiveHeight, FieldSelection, LogSelection, Query, RollbackGuard, + TransactionSelection, +}; + +mod column_mapping; +pub mod config; +mod decode; +mod parquet_out; +mod rayon_async; +mod transport_format; +mod types; + +pub use column_mapping::{ColumnMapping, DataType}; +pub use config::Config; +pub use decode::Decoder; +pub use skar_format as format; +use tokio::sync::mpsc; +pub use transport_format::{ArrowIpc, TransportFormat}; +pub use types::{ArrowBatch, ParquetConfig, QueryResponse, QueryResponseData, StreamConfig}; + +pub type ArrowChunk = Chunk>; + +#[derive(Clone)] +pub struct Client { + http_client: reqwest::Client, + cfg: Config, +} + +impl Client { + /// Create a new client with given config + pub fn new(cfg: Config) -> Result { + let http_client = reqwest::Client::builder() + .no_gzip() + .http1_only() + .timeout(Duration::from_millis(cfg.http_req_timeout_millis.get())) + .tcp_keepalive(Duration::from_secs(7200)) + .connect_timeout(Duration::from_millis(cfg.http_req_timeout_millis.get())) + .build() + .unwrap(); + + Ok(Self { http_client, cfg }) + } + + /// Create a parquet file by executing a query. + /// + /// Path should point to a folder that will contain the parquet files in the end. 
+ pub async fn create_parquet_folder(&self, query: Query, config: ParquetConfig) -> Result<()> { + parquet_out::create_parquet_folder(self, query, config).await + } + + /// Get the height of the source hypersync instance + pub async fn get_height(&self) -> Result { + let mut url = self.cfg.url.clone(); + let mut segments = url.path_segments_mut().ok().context("get path segments")?; + segments.push("height"); + std::mem::drop(segments); + let mut req = self.http_client.request(Method::GET, url); + + if let Some(bearer_token) = &self.cfg.bearer_token { + req = req.bearer_auth(bearer_token); + } + + let res = req.send().await.context("execute http req")?; + + let status = res.status(); + if !status.is_success() { + return Err(anyhow!("http response status code {}", status)); + } + + let height: ArchiveHeight = res.json().await.context("read response body json")?; + + Ok(height.height.unwrap_or(0)) + } + + /// Get the height of the source hypersync instance + /// Internally calls get_height. + /// On an error from the source hypersync instance, sleeps for + /// 1 second (increasing by 1 each failure up to max of 5 seconds) + /// and retries query until success. + pub async fn get_height_with_retry(&self) -> Result { + let mut base = 1; + + loop { + match self.get_height().await { + Ok(res) => return Ok(res), + Err(e) => { + log::error!("failed to send request to skar server: {:?}", e); + } + } + + let secs = Duration::from_secs(base); + let millis = Duration::from_millis(fastrange_rs::fastrange_64(rand::random(), 1000)); + + tokio::time::sleep(secs + millis).await; + + base = std::cmp::min(base + 1, 5); + } + } + + pub async fn stream( + &self, + query: Query, + config: StreamConfig, + ) -> Result>> { + let (tx, rx) = mpsc::channel(config.concurrency); + + let to_block = match query.to_block { + Some(to_block) => to_block, + None => { + if config.retry { + self.get_height_with_retry().await.context("get height")? + } else { + self.get_height().await.context("get height")? + } + } + }; + + let client = self.clone(); + let step = usize::try_from(config.batch_size).unwrap(); + tokio::spawn(async move { + let futs = (query.from_block..to_block) + .step_by(step) + .map(move |start| { + let end = cmp::min(start + config.batch_size, to_block); + let mut query = query.clone(); + query.from_block = start; + query.to_block = Some(end); + + Self::run_query_to_end(client.clone(), query, config.retry) + }); + + let mut stream = futures::stream::iter(futs).buffered(config.concurrency); + + while let Some(resps) = stream.next().await { + let resps = match resps { + Ok(resps) => resps, + Err(e) => { + tx.send(Err(e)).await.ok(); + return; + } + }; + + for resp in resps { + if tx.send(Ok(resp)).await.is_err() { + return; + } + } + } + }); + + Ok(rx) + } + + async fn run_query_to_end(self, query: Query, retry: bool) -> Result> { + let mut resps = Vec::new(); + + let to_block = query.to_block.unwrap(); + + let mut query = query; + + loop { + let resp = if retry { + self.send_with_retry::(&query) + .await + .context("send query")? + } else { + self.send::(&query) + .await + .context("send query")? + }; + + let next_block = resp.next_block; + + resps.push(resp); + + if next_block >= to_block { + break; + } else { + query.from_block = next_block; + } + } + + Ok(resps) + } + + /// Send a query request to the source hypersync instance. + /// + /// Returns a query response which contains block, tx and log data. + /// Format can be ArrowIpc or Parquet. 
+ pub async fn send(&self, query: &Query) -> Result { + let mut url = self.cfg.url.clone(); + let mut segments = url.path_segments_mut().ok().context("get path segments")?; + segments.push("query"); + segments.push(Format::path()); + std::mem::drop(segments); + let mut req = self.http_client.request(Method::POST, url); + + if let Some(bearer_token) = &self.cfg.bearer_token { + req = req.bearer_auth(bearer_token); + } + + let res = req.json(&query).send().await.context("execute http req")?; + + let status = res.status(); + if !status.is_success() { + let text = res.text().await.context("read text to see error")?; + + return Err(anyhow!( + "http response status code {}, err body: {}", + status, + text + )); + } + + let bytes = res.bytes().await.context("read response body bytes")?; + + let res = tokio::task::block_in_place(|| { + Self::parse_query_response::(&bytes).context("parse query response") + })?; + + Ok(res) + } + + /// Send a query request to the source hypersync instance. + /// Internally calls send. + /// On an error from the source hypersync instance, sleeps for + /// 1 second (increasing by 1 each failure up to max of 5 seconds) + /// and retries query until success. + /// + /// Returns a query response which contains block, tx and log data. + /// Format can be ArrowIpc or Parquet. + pub async fn send_with_retry( + &self, + query: &Query, + ) -> Result { + let mut base = 1; + + loop { + match self.send::(query).await { + Ok(res) => return Ok(res), + Err(e) => { + log::error!("failed to send request to skar server: {:?}", e); + } + } + + let secs = Duration::from_secs(base); + let millis = Duration::from_millis(fastrange_rs::fastrange_64(rand::random(), 1000)); + + tokio::time::sleep(secs + millis).await; + + base = std::cmp::min(base + 1, 5); + } + } + + fn parse_query_response(bytes: &[u8]) -> Result { + let mut opts = capnp::message::ReaderOptions::new(); + opts.nesting_limit(i32::MAX).traversal_limit_in_words(None); + let message_reader = + capnp::serialize_packed::read_message(bytes, opts).context("create message reader")?; + + let query_response = message_reader + .get_root::() + .context("get root")?; + + let archive_height = match query_response.get_archive_height() { + -1 => None, + h => Some( + h.try_into() + .context("invalid archive height returned from server")?, + ), + }; + + let rollback_guard = if query_response.has_rollback_guard() { + let rg = query_response + .get_rollback_guard() + .context("get rollback guard")?; + + Some(RollbackGuard { + block_number: rg.get_block_number(), + timestamp: rg.get_timestamp(), + hash: rg + .get_hash() + .context("get rollback guard hash")? + .try_into() + .context("hash size")?, + first_block_number: rg.get_first_block_number(), + first_parent_hash: rg + .get_first_parent_hash() + .context("get rollback guard first parent hash")? + .try_into() + .context("hash size")?, + }) + } else { + None + }; + + let data = query_response.get_data().context("read data")?; + + let blocks = Format::read_chunks(data.get_blocks().context("get data")?) + .context("parse block data")?; + let transactions = Format::read_chunks(data.get_transactions().context("get data")?) + .context("parse tx data")?; + let logs = + Format::read_chunks(data.get_logs().context("get data")?).context("parse log data")?; + let traces = if data.has_traces() { + Format::read_chunks(data.get_traces().context("get data")?) + .context("parse traces data")? 
+ } else { + Vec::new() + }; + + Ok(QueryResponse { + archive_height, + next_block: query_response.get_next_block(), + total_execution_time: query_response.get_total_execution_time(), + data: QueryResponseData { + blocks, + transactions, + logs, + traces, + }, + rollback_guard, + }) + } + + /// Returns a query for all Blocks and Transactions within the block range (from_block, to_block] + /// If to_block is None then query runs to the head of the chain. + pub fn preset_query_blocks_and_transactions(from_block: u64, to_block: Option) -> Query { + let all_block_fields: BTreeSet = skar_schema::block_header() + .fields + .iter() + .map(|x| x.name.clone()) + .collect(); + + let all_tx_fields: BTreeSet = skar_schema::transaction() + .fields + .iter() + .map(|x| x.name.clone()) + .collect(); + + Query { + from_block, + to_block, + transactions: vec![TransactionSelection::default()], + field_selection: FieldSelection { + block: all_block_fields, + transaction: all_tx_fields, + ..Default::default() + }, + ..Default::default() + } + } + + /// Returns a query object for all Blocks and hashes of the Transactions within the block range + /// (from_block, to_block]. Also returns the block_hash and block_number fields on each Transaction + /// so it can be mapped to a block. If to_block is None then query runs to the head of the chain. + pub fn preset_query_blocks_and_transaction_hashes( + from_block: u64, + to_block: Option, + ) -> Query { + let mut tx_field_selection = BTreeSet::new(); + tx_field_selection.insert("block_hash".to_owned()); + tx_field_selection.insert("block_number".to_owned()); + tx_field_selection.insert("hash".to_owned()); + + let all_block_fields: BTreeSet = skar_schema::block_header() + .fields + .iter() + .map(|x| x.name.clone()) + .collect(); + + Query { + from_block, + to_block, + transactions: vec![TransactionSelection::default()], + field_selection: FieldSelection { + block: all_block_fields, + transaction: tx_field_selection, + ..Default::default() + }, + ..Default::default() + } + } + + /// Returns a query object for all Logs within the block range from the given address. + /// If to_block is None then query runs to the head of the chain. + pub fn preset_query_logs(from_block: u64, to_block: Option, address: A) -> Result + where + A: TryInto
, + >::Error: Error + Send + Sync + 'static, + { + let address = address.try_into().context("convert Address type")?; + + let all_log_fields: BTreeSet = skar_schema::log() + .fields + .iter() + .map(|x| x.name.clone()) + .collect(); + + Ok(Query { + from_block, + to_block, + logs: vec![LogSelection { + address: vec![address], + ..Default::default() + }], + field_selection: FieldSelection { + log: all_log_fields, + ..Default::default() + }, + ..Default::default() + }) + } + + /// Returns a query for all Logs within the block range from the given address with a + /// matching topic0 event signature. Topic0 is the keccak256 hash of the event signature. + /// If to_block is None then query runs to the head of the chain. + pub fn preset_query_logs_of_event( + from_block: u64, + to_block: Option, + topic0: T, + address: A, + ) -> Result + where + A: TryInto
, + >::Error: Error + Send + Sync + 'static, + T: TryInto, + >::Error: Error + Send + Sync + 'static, + { + let topic0 = topic0.try_into().context("convert Topic0 type")?; + let mut topics = ArrayVec::, 4>::new(); + topics.insert(0, vec![topic0]); + + let address = address.try_into().context("convert Address type")?; + + let all_log_fields: BTreeSet = skar_schema::log() + .fields + .iter() + .map(|x| x.name.clone()) + .collect(); + + Ok(Query { + from_block, + to_block, + logs: vec![LogSelection { + address: vec![address], + topics, + }], + field_selection: FieldSelection { + log: all_log_fields, + ..Default::default() + }, + ..Default::default() + }) + } +} diff --git a/patch/skar-client-0.16.3/src/parquet_out.rs b/patch/skar-client-0.16.3/src/parquet_out.rs new file mode 100644 index 0000000..5dddac1 --- /dev/null +++ b/patch/skar-client-0.16.3/src/parquet_out.rs @@ -0,0 +1,783 @@ +use std::{ + collections::{BTreeMap, BTreeSet, VecDeque}, + path::PathBuf, + sync::Arc, + time::Instant, +}; + +use alloy_dyn_abi::{DynSolType, DynSolValue, ResolveSolEvent}; +use anyhow::{anyhow, Context, Result}; +use arrow2::{ + array::{ + Array, BinaryArray, MutableArray, MutableBinaryArray, MutableBooleanArray, + MutableUtf8Array, Utf8Array, + }, + datatypes::{DataType, Field, Schema}, + io::parquet::{ + read::ParquetError, + write::{ + array_to_columns, to_parquet_schema, to_parquet_type, transverse, CompressedPage, + DynIter, DynStreamingIterator, Encoding, FallibleStreamingIterator, RowGroupIter, + WriteOptions, + }, + }, +}; +use parquet2::write::FileStreamer; +use rayon::prelude::*; +use skar_net_types::Query; +use skar_schema::{concat_chunks, empty_chunk, project_schema}; +use tokio::{sync::mpsc, task::JoinHandle}; +use tokio_util::compat::TokioAsyncReadCompatExt; + +use crate::{ + column_mapping, rayon_async, types::StreamConfig, ArrowBatch, ArrowChunk, Client, ParquetConfig, +}; + +pub async fn create_parquet_folder( + client: &Client, + query: Query, + config: ParquetConfig, +) -> Result<()> { + let path = PathBuf::from(config.path); + + tokio::fs::create_dir_all(&path) + .await + .context("create parquet dir")?; + + let mut blocks_path = path.clone(); + blocks_path.push("blocks.parquet"); + let (mut blocks_sender, blocks_join) = spawn_writer( + blocks_path, + &skar_schema::block_header(), + Some(&query.field_selection.block), + &config.column_mapping.block, + config.hex_output, + )?; + + let mut transactions_path = path.clone(); + transactions_path.push("transactions.parquet"); + let (mut transactions_sender, transactions_join) = spawn_writer( + transactions_path, + &skar_schema::transaction(), + Some(&query.field_selection.transaction), + &config.column_mapping.transaction, + config.hex_output, + )?; + + let mut logs_path = path.clone(); + logs_path.push("logs.parquet"); + let (mut logs_sender, logs_join) = spawn_writer( + logs_path, + &skar_schema::log(), + Some(&query.field_selection.log), + &config.column_mapping.log, + config.hex_output, + )?; + + let mut traces_path = path.clone(); + traces_path.push("traces.parquet"); + let (mut traces_sender, traces_join) = spawn_writer( + traces_path, + &skar_schema::trace(), + Some(&query.field_selection.trace), + &config.column_mapping.trace, + config.hex_output, + )?; + + let event_signature = match &config.event_signature { + Some(sig) => Some(alloy_json_abi::Event::parse(sig).context("parse event signature")?), + None => None, + }; + + let mut decoded_logs_path = path.clone(); + decoded_logs_path.push("decoded_logs.parquet"); + let (mut 
decoded_logs_sender, decoded_logs_join) = spawn_writer( + decoded_logs_path, + &schema_from_event_signature(&event_signature) + .context("arrow schema from event signature")?, + None, + &config.column_mapping.decoded_log, + config.hex_output, + )?; + + let mut rx = client + .stream::( + query, + StreamConfig { + concurrency: config.concurrency, + batch_size: config.batch_size, + retry: config.retry, + }, + ) + .await + .context("start stream")?; + + while let Some(resp) = rx.recv().await { + let resp = resp.context("get query response")?; + + log::trace!("got data up to block {}", resp.next_block); + + let blocks_fut = async move { + for batch in resp.data.blocks { + blocks_sender + .send(batch) + .await + .context("write blocks chunk to parquet")?; + } + + Ok::<_, anyhow::Error>(blocks_sender) + }; + + let txs_fut = async move { + for batch in resp.data.transactions { + transactions_sender + .send(batch) + .await + .context("write transactions chunk to parquet")?; + } + + Ok::<_, anyhow::Error>(transactions_sender) + }; + + let logs_fut = { + let data = resp.data.logs.clone(); + async move { + for batch in data { + logs_sender + .send(batch) + .await + .context("write logs chunk to parquet")?; + } + + Ok::<_, anyhow::Error>(logs_sender) + } + }; + + let traces_fut = async move { + for batch in resp.data.traces { + traces_sender + .send(batch) + .await + .context("write traces chunk to parquet")?; + } + + Ok::<_, anyhow::Error>(traces_sender) + }; + + let sig = Arc::new(event_signature.clone()); + let decoded_logs_fut = async move { + for batch in resp.data.logs { + let sig = sig.clone(); + let batch = rayon_async::spawn(move || decode_logs_batch(&sig, batch)) + .await + .context("join decode logs task")? + .context("decode logs")?; + + decoded_logs_sender + .send(batch) + .await + .context("write decoded_logs chunk to parquet")?; + } + + Ok::<_, anyhow::Error>(decoded_logs_sender) + }; + + let start = Instant::now(); + + ( + blocks_sender, + transactions_sender, + logs_sender, + traces_sender, + decoded_logs_sender, + ) = futures::future::try_join5(blocks_fut, txs_fut, logs_fut, traces_fut, decoded_logs_fut) + .await + .context("write to parquet")?; + + log::trace!("wrote to parquet in {} ms", start.elapsed().as_millis()); + } + + std::mem::drop(blocks_sender); + std::mem::drop(transactions_sender); + std::mem::drop(logs_sender); + std::mem::drop(traces_sender); + std::mem::drop(decoded_logs_sender); + + blocks_join + .await + .context("join blocks task")? + .context("finish blocks file")?; + transactions_join + .await + .context("join transactions task")? + .context("finish transactions file")?; + logs_join + .await + .context("join logs task")? + .context("finish logs file")?; + traces_join + .await + .context("join traces task")? + .context("finish traces file")?; + decoded_logs_join + .await + .context("join decoded_logs task")? 
+ .context("finish decoded_logs file")?; + + Ok(()) +} + +fn hex_encode_chunk(chunk: &ArrowChunk) -> anyhow::Result { + let cols = chunk + .columns() + .par_iter() + .map(|col| { + let col = match col.data_type() { + DataType::Binary => Box::new(hex_encode(col.as_any().downcast_ref().unwrap())), + _ => col.clone(), + }; + + Ok::<_, anyhow::Error>(col) + }) + .collect::>>()?; + + Ok(ArrowChunk::new(cols)) +} + +fn hex_encode(input: &BinaryArray) -> Utf8Array { + let mut arr = MutableUtf8Array::new(); + + for buf in input.iter() { + arr.push(buf.map(faster_hex::hex_string)); + } + + arr.into() +} + +fn spawn_writer( + path: PathBuf, + schema: &Schema, + field_selection: Option<&BTreeSet>, + mapping: &BTreeMap, + hex_output: bool, +) -> Result<(mpsc::Sender, JoinHandle>)> { + let schema = if let Some(field_selection) = field_selection { + project_schema(schema, field_selection).context("project schema")? + } else { + schema.clone() + }; + let schema = column_mapping::apply_to_schema(&schema, mapping) + .context("apply column mapping to schema")?; + let schema = if hex_output { + Schema::from( + schema + .fields + .iter() + .map(|field| match field.data_type() { + DataType::Binary => { + Field::new(field.name.clone(), DataType::Utf8, field.is_nullable) + } + _ => field.clone(), + }) + .collect::>(), + ) + } else { + schema + }; + + let (tx, rx) = mpsc::channel(64); + + let mapping = Arc::new(mapping.clone()); + + let handle = tokio::task::spawn(async move { + match run_writer(rx, path, schema, mapping, hex_output).await { + Ok(v) => Ok(v), + Err(e) => { + log::error!("failed to run parquet writer: {:?}", e); + Err(e) + } + } + }); + + Ok((tx, handle)) +} + +async fn run_writer( + mut rx: mpsc::Receiver, + path: PathBuf, + schema: Schema, + mapping: Arc>, + hex_output: bool, +) -> Result<()> { + let write_options = parquet2::write::WriteOptions { + write_statistics: true, + version: parquet2::write::Version::V2, + }; + + let file = tokio::fs::File::create(&path) + .await + .context("create parquet file")? + .compat(); + + let parquet_schema = to_parquet_schema(&schema).context("to parquet schema")?; + + let schema = Arc::new(schema); + + let mut writer = FileStreamer::new(file, parquet_schema, write_options, None); + + let num_cpus = num_cpus::get(); + let mut encode_jobs = VecDeque::::with_capacity(num_cpus); + + let mut data = Vec::new(); + let mut total_rows = 0; + loop { + let mut stop = false; + if let Some(batch) = rx.recv().await { + total_rows += batch.chunk.len(); + data.push(batch); + } else { + stop = true; + } + + if !data.is_empty() && (stop || total_rows >= ROW_GROUP_MAX_ROWS) { + if encode_jobs.len() >= num_cpus { + let fut = encode_jobs.pop_front().unwrap(); + let rg = fut + .await + .context("join prepare task")? + .context("prepare row group")?; + writer + .write(rg) + .await + .context("write encoded row group to file")?; + } + + let batches = std::mem::take(&mut data); + total_rows = 0; + let chunks = batches + .into_iter() + .map(|b| Arc::new(b.chunk)) + .collect::>(); + + let chunk = concat_chunks(chunks.as_slice()).context("concat chunks")?; + + let mapping = mapping.clone(); + let schema = schema.clone(); + let fut = rayon_async::spawn(move || { + let field_names = schema + .fields + .iter() + .map(|f| f.name.as_str()) + .collect::>(); + let chunk = column_mapping::apply_to_chunk(&chunk, &field_names, &mapping) + .context("apply column mapping to batch")?; + + let chunk = if hex_output { + hex_encode_chunk(&chunk).context("hex encode batch")? 
+ } else { + chunk + }; + + encode_row_group( + ArrowBatch { chunk, schema }, + WriteOptions { + write_statistics: true, + version: arrow2::io::parquet::write::Version::V2, + compression: arrow2::io::parquet::write::CompressionOptions::Lz4Raw, + data_pagesize_limit: None, + }, + ) + }); + + encode_jobs.push_back(fut); + } + + if stop { + break; + } + } + + while let Some(fut) = encode_jobs.pop_front() { + let rg = fut + .await + .context("join prepare task")? + .context("prepare row group")?; + writer + .write(rg) + .await + .context("write encoded row group to file")?; + } + + let _size = writer.end(None).await.context("write footer")?; + + Ok(()) +} + +type EncodeFut = tokio::sync::oneshot::Receiver< + Result< + DynIter< + 'static, + std::result::Result< + DynStreamingIterator<'static, CompressedPage, arrow2::error::Error>, + arrow2::error::Error, + >, + >, + >, +>; + +fn encode_row_group( + batch: ArrowBatch, + write_options: WriteOptions, +) -> Result> { + let fields = batch + .schema + .fields + .iter() + .map(|field| to_parquet_type(field).context("map to parquet field")) + .collect::>>()?; + let encodings = batch + .schema + .fields + .iter() + .map(|f| transverse(&f.data_type, |_| Encoding::Plain)) + .collect::>(); + + let data = batch + .chunk + .into_arrays() + .into_iter() + .zip(fields) + .zip(encodings) + .flat_map(move |((array, type_), encoding)| { + let encoded_columns = array_to_columns(array, type_, write_options, &encoding).unwrap(); + encoded_columns + .into_iter() + .map(|encoded_pages| { + let pages = encoded_pages; + + let pages = DynIter::new( + pages + .into_iter() + .map(|x| x.map_err(|e| ParquetError::OutOfSpec(e.to_string()))), + ); + + let compressed_pages = pages + .map(|page| { + let page = page?; + arrow2::io::parquet::write::compress( + page, + vec![], + write_options.compression, + ) + .map_err(arrow2::error::Error::from) + }) + .collect::>(); + + Ok(DynStreamingIterator::new(CompressedPageIter { + data: compressed_pages.into_iter(), + current: None, + })) + }) + .collect::>() + }) + .collect::>(); + Ok(DynIter::new(data.into_iter())) +} + +struct CompressedPageIter { + data: std::vec::IntoIter>, + current: Option, +} + +impl FallibleStreamingIterator for CompressedPageIter { + type Item = CompressedPage; + type Error = arrow2::error::Error; + + fn get(&self) -> Option<&Self::Item> { + self.current.as_ref() + } + + fn advance(&mut self) -> std::result::Result<(), Self::Error> { + self.current = match self.data.next() { + Some(page) => Some(page?), + None => None, + }; + Ok(()) + } +} + +const ROW_GROUP_MAX_ROWS: usize = 10_000; + +fn decode_logs_batch(sig: &Option, batch: ArrowBatch) -> Result { + let schema = + schema_from_event_signature(sig).context("build arrow schema from event signature")?; + + if batch.chunk.is_empty() { + return Ok(ArrowBatch { + chunk: empty_chunk(&schema), + schema: Arc::new(schema), + }); + } + + let sig = match sig { + Some(sig) => sig, + None => { + return Ok(ArrowBatch { + chunk: empty_chunk(&schema), + schema: Arc::new(schema), + }) + } + }; + + let event = sig.resolve().context("resolve signature into event")?; + + let topic_cols = event + .indexed() + .iter() + .zip(["topic1", "topic2", "topic3"].iter()) + .map(|(decoder, topic_name)| { + let col = batch + .column::>(topic_name) + .context("get column")?; + let col = decode_col(col, decoder).context("decode column")?; + Ok::<_, anyhow::Error>(col) + }) + .collect::>>()?; + + let body_cols = if event.body() == [DynSolType::Uint(256)] { + let data = batch + 
.column::>("data") + .context("get column")?; + vec![decode_erc20_amount(data, &DynSolType::Uint(256)).context("decode amount column")?] + } else if !event.body().is_empty() { + let data = batch + .column::>("data") + .context("get column")?; + + let tuple_decoder = DynSolType::Tuple(event.body().to_vec()); + + let mut decoded_tuples = Vec::with_capacity(data.len()); + for val in data.values_iter() { + let tuple = tuple_decoder + .abi_decode(val) + .context("decode body tuple")? + .as_tuple() + .context("expected tuple after decoding")? + .to_vec(); + + if tuple.len() != event.body().len() { + return Err(anyhow!( + "expected tuple of length {} after decoding", + event.body().len() + )); + } + + decoded_tuples.push(tuple); + } + + let mut decoded_cols = Vec::with_capacity(event.body().len()); + + for (i, ty) in event.body().iter().enumerate() { + decoded_cols.push( + decode_body_col(decoded_tuples.iter().map(|t| t.get(i).unwrap()), ty) + .context("decode body column")?, + ); + } + + decoded_cols + } else { + Vec::new() + }; + + let mut cols = topic_cols; + cols.extend_from_slice(&body_cols); + + let chunk = ArrowChunk::try_new(cols).context("create arrow chunk")?; + + Ok(ArrowBatch { + chunk, + schema: Arc::new(schema), + }) +} + +fn decode_body_col<'a, I: ExactSizeIterator>( + vals: I, + ty: &DynSolType, +) -> Result> { + match ty { + DynSolType::Bool => { + let mut builder = MutableBooleanArray::with_capacity(vals.len()); + + for val in vals { + match val { + DynSolValue::Bool(b) => builder.push(Some(*b)), + v => { + return Err(anyhow!( + "unexpected output type from decode: {:?}", + v.as_type() + )) + } + } + } + + Ok(builder.as_box()) + } + _ => { + let mut builder = MutableBinaryArray::::new(); + + for val in vals { + match val { + DynSolValue::Int(v, _) => builder.push(Some(v.to_be_bytes::<32>())), + DynSolValue::Uint(v, _) => builder.push(Some(v.to_be_bytes::<32>())), + DynSolValue::FixedBytes(v, _) => builder.push(Some(v)), + DynSolValue::Address(v) => builder.push(Some(v)), + DynSolValue::Bytes(v) => builder.push(Some(v)), + DynSolValue::String(v) => builder.push(Some(v)), + v => { + return Err(anyhow!( + "unexpected output type from decode: {:?}", + v.as_type() + )) + } + } + } + + Ok(builder.as_box()) + } + } +} + +fn decode_erc20_amount(data: &BinaryArray, decoder: &DynSolType) -> Result> { + let mut builder = MutableBinaryArray::::new(); + + for val in data.values_iter() { + // Check if we are decoding a single u256 and the body is empty + // + // This case can happen when decoding zero value erc20 transfers + let v = if val.is_empty() { + [0; 32].as_slice() + } else { + val + }; + + match decoder.abi_decode(v).context("decode val")? { + DynSolValue::Uint(v, _) => builder.push(Some(v.to_be_bytes::<32>())), + v => { + return Err(anyhow!( + "unexpected output type from decode: {:?}", + v.as_type() + )) + } + } + } + + Ok(builder.as_box()) +} + +fn decode_col(col: &BinaryArray, decoder: &DynSolType) -> Result> { + match decoder { + DynSolType::Bool => { + let mut builder = MutableBooleanArray::with_capacity(col.len()); + + for val in col.iter() { + let val = val.context("found null value")?; + match decoder.abi_decode(val).context("decode sol value")? 
{ + DynSolValue::Bool(b) => builder.push(Some(b)), + v => { + return Err(anyhow!( + "unexpected output type from decode: {:?}", + v.as_type() + )) + } + } + } + + Ok(builder.as_box()) + } + _ => { + let mut builder = MutableBinaryArray::::new(); + + for val in col.iter() { + let val = val.context("found null value")?; + + match decoder.abi_decode(val).context("decode sol value")? { + DynSolValue::Int(v, _) => builder.push(Some(v.to_be_bytes::<32>())), + DynSolValue::Uint(v, _) => builder.push(Some(v.to_be_bytes::<32>())), + DynSolValue::FixedBytes(v, _) => builder.push(Some(v)), + DynSolValue::Address(v) => builder.push(Some(v)), + DynSolValue::Bytes(v) => builder.push(Some(v)), + DynSolValue::String(v) => builder.push(Some(v)), + v => { + return Err(anyhow!( + "unexpected output type from decode: {:?}", + v.as_type() + )) + } + } + } + + Ok(builder.as_box()) + } + } +} + +fn schema_from_event_signature(sig: &Option) -> Result { + let sig = match sig { + Some(sig) => sig, + None => { + return Ok(Schema::from(vec![Field::new( + "dummy", + DataType::Boolean, + true, + )])); + } + }; + + let event = sig.resolve().context("resolve signature into event")?; + + let mut fields: Vec = Vec::with_capacity(sig.inputs.len()); + + for (input, resolved_type) in sig + .inputs + .iter() + .zip(event.indexed().iter().chain(event.body().iter())) + { + if input.name.is_empty() { + return Err(anyhow!("empty param names are not supported")); + } + + if fields + .iter() + .any(|f| f.name.as_str() == input.name.as_str()) + { + return Err(anyhow!("duplicate param name: {}", input.name)); + } + + let ty = DynSolType::parse(&input.ty).context("parse solidity type")?; + + if &ty != resolved_type { + return Err(anyhow!("Internal error: Parsed type doesn't match resolved type. This should never happen.")); + } + + let dt = simple_type_to_data_type(&ty).context("convert simple type to arrow datatype")?; + fields.push(Field::new(input.name.clone(), dt, false)); + } + + Ok(Schema::from(fields)) +} + +fn simple_type_to_data_type(ty: &DynSolType) -> Result { + match ty { + DynSolType::Bool => Ok(DataType::Boolean), + DynSolType::Int(_) => Ok(DataType::Binary), + DynSolType::Uint(_) => Ok(DataType::Binary), + DynSolType::FixedBytes(_) => Ok(DataType::Binary), + DynSolType::Address => Ok(DataType::Binary), + DynSolType::Bytes => Ok(DataType::Binary), + DynSolType::String => Ok(DataType::Binary), + ty => Err(anyhow!( + "Complex types are not supported. 
Unexpected type: {}", + ty + )), + } +} diff --git a/patch/skar-client-0.16.3/src/rayon_async.rs b/patch/skar-client-0.16.3/src/rayon_async.rs new file mode 100644 index 0000000..cecd585 --- /dev/null +++ b/patch/skar-client-0.16.3/src/rayon_async.rs @@ -0,0 +1,16 @@ +use tokio::sync::oneshot; + +pub fn spawn(func: F) -> oneshot::Receiver +where + F: 'static + FnOnce() -> T + Send, + T: 'static + Send + Sync, +{ + let (tx, rx) = oneshot::channel(); + + rayon::spawn(move || { + let res = func(); + tx.send(res).ok(); + }); + + rx +} diff --git a/patch/skar-client-0.16.3/src/transport_format.rs b/patch/skar-client-0.16.3/src/transport_format.rs new file mode 100644 index 0000000..efd165f --- /dev/null +++ b/patch/skar-client-0.16.3/src/transport_format.rs @@ -0,0 +1,39 @@ +use std::{io::Cursor, sync::Arc}; + +use crate::ArrowBatch; +use anyhow::{Context, Result}; +use arrow2::io::ipc; + +pub trait TransportFormat { + fn read_chunks(bytes: &[u8]) -> Result>; + fn path() -> &'static str; +} + +pub struct ArrowIpc; + +impl TransportFormat for ArrowIpc { + fn read_chunks(bytes: &[u8]) -> Result> { + let mut reader = Cursor::new(bytes); + + let metadata = ipc::read::read_file_metadata(&mut reader).context("read metadata")?; + + let schema = Arc::new(metadata.schema.clone()); + + let reader = ipc::read::FileReader::new(reader, metadata, None, None); + + let chunks = reader + .map(|chunk| { + chunk.context("read chunk").map(|chunk| ArrowBatch { + chunk, + schema: schema.clone(), + }) + }) + .collect::>>()?; + + Ok(chunks) + } + + fn path() -> &'static str { + "arrow-ipc" + } +} diff --git a/patch/skar-client-0.16.3/src/types.rs b/patch/skar-client-0.16.3/src/types.rs new file mode 100644 index 0000000..ceaaa6e --- /dev/null +++ b/patch/skar-client-0.16.3/src/types.rs @@ -0,0 +1,104 @@ +use crate::{column_mapping::ColumnMapping, ArrowChunk}; +use anyhow::{anyhow, Context, Result}; +use arrow2::datatypes::SchemaRef; +use serde::{Deserialize, Serialize}; +use skar_net_types::RollbackGuard; + +#[derive(Debug, Clone)] +pub struct QueryResponseData { + pub blocks: Vec, + pub transactions: Vec, + pub logs: Vec, + pub traces: Vec, +} + +#[derive(Debug, Clone)] +pub struct QueryResponse { + /// Current height of the source hypersync instance + pub archive_height: Option, + /// Next block to query for, the responses are paginated so + /// the caller should continue the query from this block if they + /// didn't get responses up to the to_block they specified in the Query. + pub next_block: u64, + /// Total time it took the hypersync instance to execute the query. + pub total_execution_time: u64, + /// Response data + pub data: QueryResponseData, + /// Rollback guard + pub rollback_guard: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StreamConfig { + #[serde(default = "default_batch_size")] + /// Block range size to use when making individual requests. + pub batch_size: u64, + #[serde(default = "default_concurrency")] + /// Controls the number of concurrent requests made to hypersync server. + pub concurrency: usize, + /// Requests are retried forever internally if this param is set to true. + #[serde(default)] + pub retry: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ParquetConfig { + /// Path to write parquet files to + pub path: String, + /// Define type mapping for output columns + #[serde(default)] + pub column_mapping: ColumnMapping, + /// Event signature to parse the logs with. 
example: Transfer(address indexed from, address indexed to, uint256 amount) + pub event_signature: Option, + /// Convert binary output columns to hex + #[serde(default)] + pub hex_output: bool, + #[serde(default = "default_batch_size")] + /// Block range size to use when making individual requests. + pub batch_size: u64, + #[serde(default = "default_concurrency")] + /// Controls the number of concurrent requests made to hypersync server. + pub concurrency: usize, + /// Requests are retried forever internally if this param is set to true. + #[serde(default)] + pub retry: bool, +} + +fn default_batch_size() -> u64 { + 400 +} + +fn default_concurrency() -> usize { + 10 +} + +#[derive(Debug, Clone)] +pub struct ArrowBatch { + pub chunk: ArrowChunk, + pub schema: SchemaRef, +} + +impl ArrowBatch { + pub fn column(&self, name: &str) -> Result<&T> { + match self + .schema + .fields + .iter() + .enumerate() + .find(|(_, f)| f.name == name) + { + Some((idx, _)) => { + let col = self + .chunk + .columns() + .get(idx) + .context("get column")? + .as_any() + .downcast_ref::() + .context("cast column type")?; + Ok(col) + } + None => Err(anyhow!("field {} not found in schema", name)), + } + } +} diff --git a/patch/skar-client-0.16.3/test-data/ens_token_abi.json b/patch/skar-client-0.16.3/test-data/ens_token_abi.json new file mode 100644 index 0000000..1eef4e3 --- /dev/null +++ b/patch/skar-client-0.16.3/test-data/ens_token_abi.json @@ -0,0 +1 @@ +[{"inputs":[{"internalType":"uint256","name":"freeSupply","type":"uint256"},{"internalType":"uint256","name":"airdropSupply","type":"uint256"},{"internalType":"uint256","name":"_claimPeriodEnds","type":"uint256"}],"stateMutability":"nonpayable","type":"constructor"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"owner","type":"address"},{"indexed":true,"internalType":"address","name":"spender","type":"address"},{"indexed":false,"internalType":"uint256","name":"value","type":"uint256"}],"name":"Approval","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"claimant","type":"address"},{"indexed":false,"internalType":"uint256","name":"amount","type":"uint256"}],"name":"Claim","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"delegator","type":"address"},{"indexed":true,"internalType":"address","name":"fromDelegate","type":"address"},{"indexed":true,"internalType":"address","name":"toDelegate","type":"address"}],"name":"DelegateChanged","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"delegate","type":"address"},{"indexed":false,"internalType":"uint256","name":"previousBalance","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"newBalance","type":"uint256"}],"name":"DelegateVotesChanged","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"internalType":"bytes32","name":"merkleRoot","type":"bytes32"}],"name":"MerkleRootChanged","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"previousOwner","type":"address"},{"indexed":true,"internalType":"address","name":"newOwner","type":"address"}],"name":"OwnershipTransferred","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"from","type":"address"},{"indexed":true,"internalType":"address","name":"to","type":"address"},{"indexed":false,"internalType":"uint256","name":"value","type":"uint256"}],"name":"Transfer","type":"event"},{"inputs":[],"name":"DOMAIN_SE
PARATOR","outputs":[{"internalType":"bytes32","name":"","type":"bytes32"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"owner","type":"address"},{"internalType":"address","name":"spender","type":"address"}],"name":"allowance","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"spender","type":"address"},{"internalType":"uint256","name":"amount","type":"uint256"}],"name":"approve","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"account","type":"address"}],"name":"balanceOf","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"account","type":"address"},{"internalType":"uint32","name":"pos","type":"uint32"}],"name":"checkpoints","outputs":[{"components":[{"internalType":"uint32","name":"fromBlock","type":"uint32"},{"internalType":"uint224","name":"votes","type":"uint224"}],"internalType":"struct ERC20Votes.Checkpoint","name":"","type":"tuple"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"claimPeriodEnds","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint256","name":"amount","type":"uint256"},{"internalType":"address","name":"delegate","type":"address"},{"internalType":"bytes32[]","name":"merkleProof","type":"bytes32[]"}],"name":"claimTokens","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[],"name":"decimals","outputs":[{"internalType":"uint8","name":"","type":"uint8"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"spender","type":"address"},{"internalType":"uint256","name":"subtractedValue","type":"uint256"}],"name":"decreaseAllowance","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"delegatee","type":"address"}],"name":"delegate","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"delegatee","type":"address"},{"internalType":"uint256","name":"nonce","type":"uint256"},{"internalType":"uint256","name":"expiry","type":"uint256"},{"internalType":"uint8","name":"v","type":"uint8"},{"internalType":"bytes32","name":"r","type":"bytes32"},{"internalType":"bytes32","name":"s","type":"bytes32"}],"name":"delegateBySig","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"account","type":"address"}],"name":"delegates","outputs":[{"internalType":"address","name":"","type":"address"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"uint256","name":"blockNumber","type":"uint256"}],"name":"getPastTotalSupply","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"account","type":"address"},{"internalType":"uint256","name":"blockNumber","type":"uint256"}],"name":"getPastVotes","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"account","type":"address"}],"name":"getVotes","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMu
tability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"spender","type":"address"},{"internalType":"uint256","name":"addedValue","type":"uint256"}],"name":"increaseAllowance","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"uint256","name":"index","type":"uint256"}],"name":"isClaimed","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"merkleRoot","outputs":[{"internalType":"bytes32","name":"","type":"bytes32"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"minimumMintInterval","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"dest","type":"address"},{"internalType":"uint256","name":"amount","type":"uint256"}],"name":"mint","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[],"name":"mintCap","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"name","outputs":[{"internalType":"string","name":"","type":"string"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"nextMint","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"owner","type":"address"}],"name":"nonces","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"account","type":"address"}],"name":"numCheckpoints","outputs":[{"internalType":"uint32","name":"","type":"uint32"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"owner","outputs":[{"internalType":"address","name":"","type":"address"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"owner","type":"address"},{"internalType":"address","name":"spender","type":"address"},{"internalType":"uint256","name":"value","type":"uint256"},{"internalType":"uint256","name":"deadline","type":"uint256"},{"internalType":"uint8","name":"v","type":"uint8"},{"internalType":"bytes32","name":"r","type":"bytes32"},{"internalType":"bytes32","name":"s","type":"bytes32"}],"name":"permit","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[],"name":"renounceOwnership","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"bytes32","name":"_merkleRoot","type":"bytes32"}],"name":"setMerkleRoot","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"dest","type":"address"}],"name":"sweep","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[],"name":"symbol","outputs":[{"internalType":"string","name":"","type":"string"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"totalSupply","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"recipient","type":"address"},{"internalType":"uint256","name":"amount","type":"uint256"}],"name":"transfer","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"sender","type":"address"},{"internalType":"address","name":"recipient","type":"address"},{"intern
alType":"uint256","name":"amount","type":"uint256"}],"name":"transferFrom","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"newOwner","type":"address"}],"name":"transferOwnership","outputs":[],"stateMutability":"nonpayable","type":"function"}] diff --git a/patch/skar-client-0.16.3/test-data/erc20.abi.json b/patch/skar-client-0.16.3/test-data/erc20.abi.json new file mode 100644 index 0000000..06b572d --- /dev/null +++ b/patch/skar-client-0.16.3/test-data/erc20.abi.json @@ -0,0 +1,222 @@ +[ + { + "constant": true, + "inputs": [], + "name": "name", + "outputs": [ + { + "name": "", + "type": "string" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": false, + "inputs": [ + { + "name": "_spender", + "type": "address" + }, + { + "name": "_value", + "type": "uint256" + } + ], + "name": "approve", + "outputs": [ + { + "name": "", + "type": "bool" + } + ], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "totalSupply", + "outputs": [ + { + "name": "", + "type": "uint256" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": false, + "inputs": [ + { + "name": "_from", + "type": "address" + }, + { + "name": "_to", + "type": "address" + }, + { + "name": "_value", + "type": "uint256" + } + ], + "name": "transferFrom", + "outputs": [ + { + "name": "", + "type": "bool" + } + ], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "decimals", + "outputs": [ + { + "name": "", + "type": "uint8" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": true, + "inputs": [ + { + "name": "_owner", + "type": "address" + } + ], + "name": "balanceOf", + "outputs": [ + { + "name": "balance", + "type": "uint256" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "symbol", + "outputs": [ + { + "name": "", + "type": "string" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": false, + "inputs": [ + { + "name": "_to", + "type": "address" + }, + { + "name": "_value", + "type": "uint256" + } + ], + "name": "transfer", + "outputs": [ + { + "name": "", + "type": "bool" + } + ], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, + { + "constant": true, + "inputs": [ + { + "name": "_owner", + "type": "address" + }, + { + "name": "_spender", + "type": "address" + } + ], + "name": "allowance", + "outputs": [ + { + "name": "", + "type": "uint256" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "payable": true, + "stateMutability": "payable", + "type": "fallback" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "name": "owner", + "type": "address" + }, + { + "indexed": true, + "name": "spender", + "type": "address" + }, + { + "indexed": false, + "name": "value", + "type": "uint256" + } + ], + "name": "Approval", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "name": "from", + "type": "address" + }, + { + "indexed": true, + "name": "to", + "type": "address" + }, + { + "indexed": false, + "name": "value", + "type": "uint256" + } + ], + "name": "Transfer", + "type": 
"event" + } +] diff --git a/patch/skar-client-0.16.3/test-data/nameless.abi.json b/patch/skar-client-0.16.3/test-data/nameless.abi.json new file mode 100644 index 0000000..2d9ce2f --- /dev/null +++ b/patch/skar-client-0.16.3/test-data/nameless.abi.json @@ -0,0 +1 @@ +[{"type":"event","name":"Approval","inputs":[{"name":"owner","type":"address","indexed":true},{"name":"approved","type":"address","indexed":true},{"name":"tokenId","type":"uint256","indexed":true}],"anonymous":false},{"type":"event","name":"ApprovalForAll","inputs":[{"name":"owner","type":"address","indexed":true},{"name":"operator","type":"address","indexed":true},{"name":"approved","type":"bool","indexed":false}],"anonymous":false},{"type":"event","name":"CancelLockupStream","inputs":[{"name":"streamId","type":"uint256","indexed":false},{"name":"sender","type":"address","indexed":true},{"name":"recipient","type":"address","indexed":true},{"name":"asset","type":"address","indexed":true},{"name":"senderAmount","type":"uint128","indexed":false},{"name":"recipientAmount","type":"uint128","indexed":false}],"anonymous":false},{"type":"event","name":"CreateLockupDynamicStream","inputs":[{"name":"streamId","type":"uint256","indexed":false},{"name":"funder","type":"address","indexed":false},{"name":"sender","type":"address","indexed":true},{"name":"recipient","type":"address","indexed":true},{"name":"amounts","type":"tuple","indexed":false,"components":[{"type":"uint128"},{"type":"uint128"},{"type":"uint128"}]},{"name":"asset","type":"address","indexed":true},{"name":"cancelable","type":"bool","indexed":false},{"name":"transferable","type":"bool","indexed":false},{"name":"segments","type":"tuple[]","indexed":false,"components":[{"type":"uint128"},{"type":"uint64"},{"type":"uint40"}]},{"name":"range","type":"tuple","indexed":false,"components":[{"type":"uint40"},{"type":"uint40"}]},{"name":"broker","type":"address","indexed":false}],"anonymous":false},{"type":"event","name":"CreateLockupLinearStream","inputs":[{"name":"streamId","type":"uint256","indexed":false},{"name":"funder","type":"address","indexed":false},{"name":"sender","type":"address","indexed":true},{"name":"recipient","type":"address","indexed":true},{"name":"amounts","type":"tuple","indexed":false,"components":[{"type":"uint128"},{"type":"uint128"},{"type":"uint128"}]},{"name":"asset","type":"address","indexed":true},{"name":"cancelable","type":"bool","indexed":false},{"name":"transferable","type":"bool","indexed":false},{"name":"range","type":"tuple","indexed":false,"components":[{"type":"uint40"},{"type":"uint40"},{"type":"uint40"}]},{"name":"broker","type":"address","indexed":false}],"anonymous":false},{"type":"event","name":"RenounceLockupStream","inputs":[{"name":"streamId","type":"uint256","indexed":true}],"anonymous":false},{"type":"event","name":"Transfer","inputs":[{"name":"from","type":"address","indexed":true},{"name":"to","type":"address","indexed":true},{"name":"tokenId","type":"uint256","indexed":true}],"anonymous":false},{"type":"event","name":"TransferAdmin","inputs":[{"name":"oldAdmin","type":"address","indexed":true},{"name":"newAdmin","type":"address","indexed":true}],"anonymous":false},{"type":"event","name":"WithdrawFromLockupStream","inputs":[{"name":"streamId","type":"uint256","indexed":true},{"name":"to","type":"address","indexed":true},{"name":"asset","type":"address","indexed":true},{"name":"amount","type":"uint128","indexed":false}],"anonymous":false}] \ No newline at end of file diff --git a/patch/skar-client-0.16.3/tests/api_test.rs 
b/patch/skar-client-0.16.3/tests/api_test.rs new file mode 100644 index 0000000..31b5114 --- /dev/null +++ b/patch/skar-client-0.16.3/tests/api_test.rs @@ -0,0 +1,394 @@ +use std::{collections::BTreeSet, env::temp_dir}; + +use alloy_dyn_abi::DynSolValue; +use alloy_json_abi::JsonAbi; +use arrow2::array::UInt64Array; +use skar_client::{ArrowIpc, Client, ColumnMapping, Config, Decoder, ParquetConfig}; +use skar_format::{Address, Hex, LogArgument}; +use skar_net_types::{FieldSelection, Query}; + +#[tokio::test(flavor = "multi_thread")] +#[ignore] +async fn test_api_arrow_ipc() { + let client = Client::new(Config { + url: URL.parse().unwrap(), + bearer_token: None, + http_req_timeout_millis: 20000.try_into().unwrap(), + }) + .unwrap(); + + let mut block_field_selection = BTreeSet::new(); + block_field_selection.insert("number".to_owned()); + block_field_selection.insert("timestamp".to_owned()); + block_field_selection.insert("hash".to_owned()); + + let res = client + .send::(&Query { + from_block: 14000000, + to_block: None, + logs: Vec::new(), + transactions: Vec::new(), + include_all_blocks: true, + field_selection: FieldSelection { + block: block_field_selection, + log: Default::default(), + transaction: Default::default(), + trace: Default::default(), + }, + ..Default::default() + }) + .await + .unwrap(); + + dbg!(res.next_block); +} + +#[tokio::test(flavor = "multi_thread")] +#[ignore] +async fn test_api_arrow_ipc_ordering() { + let client = Client::new(Config { + url: URL.parse().unwrap(), + bearer_token: None, + http_req_timeout_millis: 20000.try_into().unwrap(), + }) + .unwrap(); + + let mut block_field_selection = BTreeSet::new(); + block_field_selection.insert("number".to_owned()); + + let query: Query = serde_json::from_value(serde_json::json!({ + "from_block": 13171881, + "to_block": 18270333, + "logs": [ + { + "address": [ + "0x15b7c0c907e4C6b9AdaAaabC300C08991D6CEA05" + ], + "topics": [ + [ + "0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925", + "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" + ] + ] + } + ], + "field_selection": { + "block": [ + "number" + ], + "log": [ + "log_index", + "block_number" + ] + } + })) + .unwrap(); + + let res = client.send::(&query).await.unwrap(); + + assert!(res.next_block > 13223105); + + let mut last = (0, 0); + for batch in res.data.logs { + let block_number = batch.column::("block_number").unwrap(); + let log_index = batch.column::("log_index").unwrap(); + + for (&block_number, &log_index) in block_number.values_iter().zip(log_index.values_iter()) { + let number = (block_number, log_index); + assert!(last < number, "last: {:?};number: {:?};", last, number); + last = number; + } + } +} + +fn get_file_path(name: &str) -> String { + format!("{}/test-data/{name}", env!("CARGO_MANIFEST_DIR")) +} + +#[tokio::test(flavor = "multi_thread")] +#[ignore] +async fn test_api_decode_logs() { + const ADDR: &str = "0xc18360217d8f7ab5e7c516566761ea12ce7f9d72"; + let address = Address::decode_hex(ADDR).unwrap(); + + let client = Client::new(Config { + url: URL.parse().unwrap(), + bearer_token: None, + http_req_timeout_millis: 20000.try_into().unwrap(), + }) + .unwrap(); + + let query: Query = serde_json::from_value(serde_json::json!({ + "from_block": 18680952, + "to_block": 18680953, + "logs": [ + { + "address": [ + ADDR + ] + } + ], + "field_selection": { + "log": [ + "address", + "data", + "topic0", + "topic1", + "topic2", + "topic3" + ] + } + })) + .unwrap(); + + let res = client.send::(&query).await.unwrap(); + + let 
path = get_file_path("ens_token_abi.json"); + let abi = tokio::fs::read_to_string(path).await.unwrap(); + let abi: JsonAbi = serde_json::from_str(&abi).unwrap(); + + let decoder = Decoder::new(&[(address, abi)]).unwrap(); + + let decoded_logs = decoder.decode_logs(&res.data.logs).unwrap().unwrap(); + + assert_eq!(decoded_logs.len(), 1); + + println!("{:?}", decoded_logs[0]); +} + +const URL: &str = "https://eth.hypersync.xyz"; + +#[test] +fn decode_zero_erc20_transfer() { + const ADDR: &str = "0xc18360217d8f7ab5e7c516566761ea12ce7f9d72"; + let address = Address::decode_hex(ADDR).unwrap(); + + let path = get_file_path("erc20.abi.json"); + let abi = std::fs::read_to_string(path).unwrap(); + let abi: JsonAbi = serde_json::from_str(&abi).unwrap(); + + let decoder = Decoder::new(&[(address.clone(), abi)]).unwrap(); + + let topics = [ + Some( + LogArgument::decode_hex( + "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", + ) + .unwrap(), + ), + Some( + LogArgument::decode_hex( + "0x000000000000000000000000327339b55b16345a4b206bfb09c3fa27ab4689ec", + ) + .unwrap(), + ), + Some( + LogArgument::decode_hex( + "0x0000000000000000000000001e037f97d730cc881e77f01e409d828b0bb14de0", + ) + .unwrap(), + ), + None, + ]; + + let topics = topics + .iter() + .map(|t| t.as_ref().map(|t| t.as_slice())) + .collect::>(); + + let event = decoder + .decode( + address.as_slice(), + topics[0].unwrap(), + topics.as_slice(), + &[], + ) + .unwrap() + .unwrap(); + + assert_eq!(event.body[0], DynSolValue::Uint(0.try_into().unwrap(), 256)); +} + +#[test] +fn parse_nameless_abi() { + let path = get_file_path("nameless.abi.json"); + let abi = std::fs::read_to_string(path).unwrap(); + let _abi: JsonAbi = serde_json::from_str(&abi).unwrap(); +} + +#[tokio::test(flavor = "multi_thread")] +#[ignore] +async fn test_parquet_out() { + env_logger::try_init().ok(); + + let client = Client::new(Config { + url: URL.parse().unwrap(), + bearer_token: None, + http_req_timeout_millis: 20000.try_into().unwrap(), + }) + .unwrap(); + + let path = format!("{}/{}", temp_dir().to_string_lossy(), uuid::Uuid::new_v4()); + + let query: Query = serde_json::from_value(serde_json::json!({ + "from_block": 19277345, + "to_block": 19277346, + "logs": [{ + "address": ["0xdAC17F958D2ee523a2206206994597C13D831ec7"], + "topics": [["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"]], + }], + "transactions": [{}], + "include_all_blocks": true, + "field_selection": { + "log": ["block_number"], + } + })) + .unwrap(); + + client + .create_parquet_folder( + query, + ParquetConfig { + path, + hex_output: true, + batch_size: 100, + concurrency: 10, + retry: false, + column_mapping: ColumnMapping { + block: maplit::btreemap! { + "number".to_owned() => skar_client::DataType::Float32, + }, + transaction: maplit::btreemap! { + "value".to_owned() => skar_client::DataType::Float64, + }, + log: Default::default(), + trace: Default::default(), + decoded_log: maplit::btreemap! 
{ + "value".to_owned() => skar_client::DataType::Float64, + }, + }, + event_signature: None, + }, + ) + .await + .unwrap(); +} + +#[tokio::test(flavor = "multi_thread")] +#[ignore] +async fn test_api_preset_query_blocks_and_transactions() { + let client = Client::new(Config { + url: URL.parse().unwrap(), + bearer_token: None, + http_req_timeout_millis: 20000.try_into().unwrap(), + }) + .unwrap(); + let query = Client::preset_query_blocks_and_transactions(18_000_000, Some(18_000_100)); + let res = client.send::(&query).await.unwrap(); + + let num_blocks: usize = res + .data + .blocks + .into_iter() + .map(|batch| batch.chunk.len()) + .sum(); + let num_txs: usize = res + .data + .transactions + .into_iter() + .map(|batch| batch.chunk.len()) + .sum(); + + assert!(res.next_block == 18_000_100); + assert!(num_blocks == 100); + assert!(num_txs > 1); +} + +#[tokio::test(flavor = "multi_thread")] +#[ignore] +async fn test_api_preset_query_blocks_and_transaction_hashes() { + let client = Client::new(Config { + url: URL.parse().unwrap(), + bearer_token: None, + http_req_timeout_millis: 20000.try_into().unwrap(), + }) + .unwrap(); + let query = Client::preset_query_blocks_and_transaction_hashes(18_000_000, Some(18_000_100)); + let res = client.send::(&query).await.unwrap(); + + let num_blocks: usize = res + .data + .blocks + .into_iter() + .map(|batch| batch.chunk.len()) + .sum(); + let num_txs: usize = res + .data + .transactions + .into_iter() + .map(|batch| batch.chunk.len()) + .sum(); + + assert!(res.next_block == 18_000_100); + assert!(num_blocks == 100); + assert!(num_txs > 1); +} + +#[tokio::test(flavor = "multi_thread")] +#[ignore] +async fn test_api_preset_query_logs() { + let client = Client::new(Config { + url: URL.parse().unwrap(), + bearer_token: None, + http_req_timeout_millis: 20000.try_into().unwrap(), + }) + .unwrap(); + + let usdt_addr = hex_literal::hex!("dAC17F958D2ee523a2206206994597C13D831ec7"); + let query = Client::preset_query_logs(18_000_000, Some(18_001_000), usdt_addr).unwrap(); + let res = client.send::(&query).await.unwrap(); + + let num_logs: usize = res + .data + .logs + .into_iter() + .map(|batch| batch.chunk.len()) + .sum(); + + assert!(res.next_block == 18_001_000); + assert!(num_logs > 1); +} + +#[tokio::test(flavor = "multi_thread")] +#[ignore] +async fn test_api_preset_query_logs_of_event() { + let client = Client::new(Config { + url: URL.parse().unwrap(), + bearer_token: None, + http_req_timeout_millis: 20000.try_into().unwrap(), + }) + .unwrap(); + + let usdt_addr = hex_literal::hex!("dAC17F958D2ee523a2206206994597C13D831ec7"); + let transfer_topic0 = + hex_literal::hex!("ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"); + let query = Client::preset_query_logs_of_event( + 18_000_000, + Some(18_001_000), + transfer_topic0, + usdt_addr, + ) + .unwrap(); + + let res = client.send::(&query).await.unwrap(); + + let num_logs: usize = res + .data + .logs + .into_iter() + .map(|batch| batch.chunk.len()) + .sum(); + + assert!(res.next_block == 18_001_000); + assert!(num_logs > 1); +} diff --git a/src/lib.rs b/src/lib.rs index e896524..a535bf0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -158,7 +158,7 @@ impl HypersyncClient { .map(|log| { let mut log = log; let address = log.address.clone(); - if address.into_iter().all(|v| v.as_ref() == [0u8; 20]) { + if address.into_iter().all(|v| v.as_ref()[0..19] == [0u8; 19]) { log.address = vec![]; } log From 3fc3b2cff0363689f9a4d6e298478d7e95e78059 Mon Sep 17 00:00:00 2001 From: Andreas Richter 
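
The src/lib.rs hunk just above loosens the wildcard-address test: instead of requiring every address in a log selection to equal the 20-byte zero address exactly, it now only checks that the first 19 bytes are zero before clearing the address list, and an empty address list is what makes the query match logs from any contract. A standalone sketch of that check on plain byte arrays follows; it assumes nothing beyond the standard library, and the helper names are invented for illustration rather than taken from the patch.

// Any address whose first 19 bytes are zero counts as the wildcard marker,
// regardless of its final byte (this mirrors the [0..19] comparison above).
fn is_wildcard_marker(addr: &[u8; 20]) -> bool {
    addr[0..19] == [0u8; 19]
}

// If every address in a selection is the marker, drop the filter entirely;
// otherwise keep the concrete addresses unchanged.
fn normalize_addresses(addresses: Vec<[u8; 20]>) -> Vec<[u8; 20]> {
    if addresses.iter().all(|a| is_wildcard_marker(a)) {
        Vec::new()
    } else {
        addresses
    }
}

fn main() {
    // The canonical zero address is cleared away.
    let zero = [0u8; 20];
    assert!(normalize_addresses(vec![zero]).is_empty());

    // Only the last byte differs from zero, yet the 19-byte prefix check
    // still classifies it as the wildcard marker.
    let mut almost_zero = [0u8; 20];
    almost_zero[19] = 0x01;
    assert!(is_wildcard_marker(&almost_zero));

    // A genuinely non-zero address is kept as-is.
    let mut concrete = [0u8; 20];
    concrete[0] = 0xda;
    assert_eq!(normalize_addresses(vec![concrete]), vec![concrete]);
}

Note that Iterator::all is vacuously true over an empty iterator, so a log selection that already has no address filter is left empty by this normalization as well.
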
<708186+richtera@users.noreply.github.com> Date: Tue, 14 May 2024 16:20:21 -0400 Subject: [PATCH 3/3] fix: Remove logging --- patch/skar-client-0.16.3/tests/api_test.rs | 2 -- src/lib.rs | 16 +---------- src/query.rs | 31 +++++++++++++++++++--- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/patch/skar-client-0.16.3/tests/api_test.rs b/patch/skar-client-0.16.3/tests/api_test.rs index 31b5114..9a5a917 100644 --- a/patch/skar-client-0.16.3/tests/api_test.rs +++ b/patch/skar-client-0.16.3/tests/api_test.rs @@ -152,8 +152,6 @@ async fn test_api_decode_logs() { let decoded_logs = decoder.decode_logs(&res.data.logs).unwrap().unwrap(); assert_eq!(decoded_logs.len(), 1); - - println!("{:?}", decoded_logs[0]); } const URL: &str = "https://eth.hypersync.xyz"; diff --git a/src/lib.rs b/src/lib.rs index a535bf0..d29eead 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -150,21 +150,7 @@ impl HypersyncClient { query: Query, config: StreamConfig, ) -> napi::Result { - let mut query = query.try_convert().context("parse query")?; - - query.logs = query - .logs - .into_iter() - .map(|log| { - let mut log = log; - let address = log.address.clone(); - if address.into_iter().all(|v| v.as_ref()[0..19] == [0u8; 19]) { - log.address = vec![]; - } - log - }) - .collect(); - + let query = query.try_convert().context("parse query")?; let config = config.try_convert().context("parse stream config")?; let inner = self diff --git a/src/query.rs b/src/query.rs index 3b8d66d..e32cd59 100644 --- a/src/query.rs +++ b/src/query.rs @@ -134,18 +134,43 @@ impl Query { match query_result { Ok(query) => { let mut query = query.clone(); + let mut transactions = Vec::::new(); + + // 0x0000... is a special address that means no address query.logs = query .logs - .into_iter() + .iter() .map(|log| { - let mut log = log; + let mut log = log.clone(); let address = log.address.clone(); - if address.into_iter().all(|v| v.as_ref() == [0u8; 20]) { + if address.into_iter().all(|v| v.as_ref()[0..19] == [0u8; 19]) { log.address = vec![]; + } else { + let topics = log.topics.clone(); + let sighash: Option>> = topics.first().cloned(); + if let Some(sighash) = sighash { + let sighash = sighash + .iter() + .map(|v| { + let mut data = [0u8; 4]; + data.copy_from_slice(&v.as_slice()[0..4]); + FixedSizeData::from(data) + }) + .collect(); + transactions.push(TransactionSelection { + to: Some(log.address.clone()), + contract_address: Some(log.address.clone()), + from: None, + sighash, + status: None, + kind: None, + }); + }; } log }) .collect(); + query.transactions = transactions; Ok(query) } Err(e) => Err(e),
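
The src/query.rs changes in the third patch move the wildcard-address normalization into Query::try_convert and add a second step: a log selection that keeps concrete addresses and carries a topic0 filter also contributes a TransactionSelection, reusing the log's addresses for both `to` and `contract_address` and keeping only the first four bytes of each topic0 value as the `sighash`; the derived list then replaces whatever `query.transactions` held before. A standalone sketch of the sighash derivation on plain byte arrays follows; the helper name is invented for illustration, while the real code operates on the skar_net_types types.

// Each 32-byte event topic is truncated to its first four bytes, which is
// the length of a function selector (sighash).
fn sighashes_from_topic0(topic0: &[[u8; 32]]) -> Vec<[u8; 4]> {
    topic0
        .iter()
        .map(|topic| {
            let mut sighash = [0u8; 4];
            sighash.copy_from_slice(&topic[0..4]);
            sighash
        })
        .collect()
}

fn main() {
    // Stand-in for an event topic such as the ERC-20 Transfer topic0 used in
    // the tests above (0xddf252ad...): only the leading four bytes survive.
    let mut transfer_like = [0u8; 32];
    transfer_like[0..4].copy_from_slice(&[0xdd, 0xf2, 0x52, 0xad]);

    let sighashes = sighashes_from_topic0(&[transfer_like]);
    assert_eq!(sighashes, vec![[0xddu8, 0xf2, 0x52, 0xad]]);
}

The net effect is that a caller can pass the zero address to mean "any contract", while selections that name concrete contracts also get a matching transaction selection derived for them automatically.
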