Skip to content

Commit b3e9b53

Browse files
committed
fix(core, graph): simplify working with identifiers
1 parent dbc4810 commit b3e9b53

File tree

11 files changed

+109
-300
lines changed

11 files changed

+109
-300
lines changed

core/src/amp_subgraph/runner/latest_blocks.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,12 @@ use alloy::primitives::BlockNumber;
22
use anyhow::anyhow;
33
use arrow::array::RecordBatch;
44
use futures::{future::try_join_all, stream::BoxStream, StreamExt, TryFutureExt};
5-
use graph::{
6-
amp::{
7-
client::ResponseBatch,
8-
codec::{utils::block_number_decoder, Decoder},
9-
common::Ident,
10-
error::IsDeterministic,
11-
manifest::DataSource,
12-
Client,
13-
},
14-
cheap_clone::CheapClone,
5+
use graph::amp::{
6+
client::ResponseBatch,
7+
codec::{utils::block_number_decoder, Decoder},
8+
error::IsDeterministic,
9+
manifest::DataSource,
10+
Client,
1511
};
1612
use itertools::Itertools;
1713
use slog::debug;
@@ -43,7 +39,7 @@ impl LatestBlocks {
4339
.map(move |(j, table)| ((i, j), &data_source.source.dataset, table))
4440
})
4541
.flatten()
46-
.unique_by(|(_, dataset, table)| (dataset.cheap_clone(), table.cheap_clone()))
42+
.unique_by(|(_, dataset, table)| (dataset.to_string(), table.to_string()))
4743
.map(|(table_ptr, dataset, table)| {
4844
latest_block(&cx, dataset, table)
4945
.map_ok(move |latest_block| (table_ptr, latest_block))
@@ -127,8 +123,8 @@ fn indexing_completed(data_source: &DataSource, latest_synced_block: &Option<Blo
127123

128124
async fn latest_block<AC>(
129125
cx: &Context<AC>,
130-
dataset: &Ident,
131-
table: &Ident,
126+
dataset: &str,
127+
table: &str,
132128
) -> Result<BlockNumber, Error>
133129
where
134130
AC: Client,
@@ -148,8 +144,8 @@ where
148144

149145
async fn latest_block_changed<AC>(
150146
cx: &Context<AC>,
151-
dataset: &Ident,
152-
table: &Ident,
147+
dataset: &str,
148+
table: &str,
153149
latest_block: BlockNumber,
154150
) -> Result<(), Error>
155151
where

core/src/amp_subgraph/runner/reorg_handler.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use futures::{future::try_join_all, StreamExt, TryFutureExt};
44
use graph::{
55
amp::{
66
client::{LatestBlockBeforeReorg, RequestMetadata, ResponseBatch, ResumeStreamingQuery},
7-
common::Ident,
87
Client,
98
},
109
blockchain::block_stream::FirehoseCursor,
@@ -124,8 +123,8 @@ where
124123
async fn detect_reorg<AC>(
125124
cx: &Context<AC>,
126125
network: &str,
127-
dataset: &Ident,
128-
table: &Ident,
126+
dataset: &str,
127+
table: &str,
129128
latest_synced_block_number: BlockNumber,
130129
latest_synced_block_hash: BlockHash,
131130
) -> Result<Option<LatestBlockBeforeReorg>, Error>

graph/src/amp/codec/mod.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@ mod value_decoder;
77

88
pub mod utils;
99

10-
use std::collections::{BTreeMap, HashMap};
10+
use std::{
11+
collections::{BTreeMap, HashMap},
12+
sync::Arc,
13+
};
1114

1215
use anyhow::{anyhow, bail, Context, Result};
1316
use arrow::array::{Array, RecordBatch};
1417

1518
use self::{list_decoder::ListDecoder, mapping_decoder::MappingDecoder, name_cache::NameCache};
1619
use crate::{
17-
amp::common::Ident,
1820
data::{
1921
graphql::TypeExt,
2022
store::{Id, IdType, Value},
@@ -163,7 +165,7 @@ impl Codec {
163165
.fields()
164166
.into_iter()
165167
.zip(record_batch.columns())
166-
.map(|(field, array)| Ok((self.ident(field.name())?, array.as_ref())))
168+
.map(|(field, array)| Ok((self.ident(field.name()), array.as_ref())))
167169
.collect::<Result<HashMap<_, _>>>()?;
168170

169171
let mut value_decoders = BTreeMap::new();
@@ -194,7 +196,7 @@ impl Codec {
194196
fn value_decoder<'a>(
195197
&mut self,
196198
field: &'a Field,
197-
columns: &HashMap<Ident, &'a dyn Array>,
199+
columns: &HashMap<Arc<str>, &'a dyn Array>,
198200
) -> Result<Option<Box<dyn Decoder<Value> + 'a>>> {
199201
// VIDs are auto-generated
200202
if field.name.eq_ignore_ascii_case("vid") {
@@ -206,7 +208,7 @@ impl Codec {
206208
return Ok(None);
207209
}
208210

209-
let normalized_name = self.ident(&field.name)?;
211+
let normalized_name = self.ident(&field.name);
210212
let array = match columns.get(&normalized_name) {
211213
Some(&array) => array,
212214
None => {
@@ -230,7 +232,7 @@ impl Codec {
230232
Ok(Some(decoder))
231233
}
232234

233-
fn ident(&mut self, name: impl AsRef<str>) -> Result<Ident> {
235+
fn ident(&mut self, name: impl AsRef<str>) -> Arc<str> {
234236
self.name_cache.ident(name.as_ref())
235237
}
236238
}

graph/src/amp/codec/name_cache.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
use std::collections::HashMap;
1+
use std::{collections::HashMap, sync::Arc};
22

3-
use anyhow::Result;
3+
use inflector::Inflector;
44

5-
use crate::{amp::common::Ident, cheap_clone::CheapClone};
5+
use crate::cheap_clone::CheapClone;
66

7-
/// Caches identifiers that are used to match Arrow columns and subgraph entity fields.
7+
/// Normalizes and caches identifiers that are used to match Arrow columns and subgraph entity fields.
88
pub(super) struct NameCache {
9-
cache: HashMap<Box<str>, Ident>,
9+
cache: HashMap<Box<str>, Arc<str>>,
1010
}
1111

1212
impl NameCache {
@@ -17,18 +17,18 @@ impl NameCache {
1717
}
1818
}
1919

20-
/// Returns the identifier for the given name.
20+
/// Normalizes and returns the identifier for the given name.
2121
///
2222
/// If the identifier exists in the cache, returns the cached version.
23-
/// Otherwise, creates a new identifier, caches it, and returns it.
24-
pub(super) fn ident(&mut self, name: &str) -> Result<Ident> {
23+
/// Otherwise, creates a new normalized identifier, caches it, and returns it.
24+
pub(super) fn ident(&mut self, name: &str) -> Arc<str> {
2525
if let Some(ident) = self.cache.get(name) {
26-
return Ok(ident.cheap_clone());
26+
return ident.cheap_clone();
2727
}
2828

29-
let ident = Ident::new(name)?;
29+
let ident: Arc<str> = name.to_camel_case().into();
3030
self.cache.insert(name.into(), ident.cheap_clone());
3131

32-
Ok(ident)
32+
ident
3333
}
3434
}

graph/src/amp/common/ident.rs

Lines changed: 0 additions & 174 deletions
This file was deleted.

graph/src/amp/common/mod.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
mod ident;
2-
3-
pub use self::ident::Ident;
4-
51
pub(super) mod column_aliases {
62
pub(in crate::amp) static BLOCK_NUMBER: &[&str] = &[
73
"_block_num", // Meta column present in all tables

0 commit comments

Comments
 (0)