Skip to content

Commit 3e71ed3

Browse files
committed
fix(core, graph): use named streams in the stream aggregator
1 parent 5a8688c commit 3e71ed3

File tree

3 files changed

+64
-38
lines changed

3 files changed

+64
-38
lines changed

core/src/amp_subgraph/runner/data_stream.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ where
7171

7272
for (j, table) in data_source.transformer.tables.iter().enumerate() {
7373
let query = table.query.build_with_block_range(block_range);
74+
let stream_name = format!("{}.{}", data_source.name, table.name);
7475

75-
query_streams.push(cx.client.query(&cx.logger, query, None));
76+
query_streams.push((stream_name, cx.client.query(&cx.logger, query, None)));
7677
query_streams_table_ptr.push((i, j));
7778
}
7879
}

graph/src/amp/stream_aggregator/error.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use thiserror::Error;
24

35
use crate::amp::error::IsDeterministic;
@@ -7,29 +9,29 @@ pub enum Error {
79
#[error("failed to aggregate record batches: {0:#}")]
810
Aggregation(#[source] anyhow::Error),
911

10-
#[error("failed to buffer record batches from stream {stream_index}: {source:#}")]
12+
#[error("failed to buffer record batches from stream '{stream_name}': {source:#}")]
1113
Buffer {
12-
stream_index: usize,
14+
stream_name: Arc<str>,
1315
source: anyhow::Error,
1416
},
1517

16-
#[error("failed to read record batch from stream {stream_index}: {source:#}")]
18+
#[error("failed to read record batch from stream '{stream_name}': {source:#}")]
1719
Stream {
18-
stream_index: usize,
20+
stream_name: Arc<str>,
1921
source: anyhow::Error,
2022
is_deterministic: bool,
2123
},
2224
}
2325

2426
impl Error {
25-
pub(super) fn stream<E>(stream_index: usize, e: E) -> Self
27+
pub(super) fn stream<E>(stream_name: Arc<str>, e: E) -> Self
2628
where
2729
E: std::error::Error + IsDeterministic + Send + Sync + 'static,
2830
{
2931
let is_deterministic = e.is_deterministic();
3032

3133
Self::Stream {
32-
stream_index,
34+
stream_name,
3335
source: anyhow::Error::from(e),
3436
is_deterministic,
3537
}

graph/src/amp/stream_aggregator/mod.rs

Lines changed: 54 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod record_batch;
33

44
use std::{
55
pin::Pin,
6+
sync::Arc,
67
task::{self, Poll},
78
};
89

@@ -12,7 +13,10 @@ use futures03::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
1213
use slog::{debug, info, Logger};
1314

1415
use self::record_batch::Buffer;
15-
use crate::amp::{client::ResponseBatch, error::IsDeterministic, log::Logger as _};
16+
use crate::{
17+
amp::{client::ResponseBatch, error::IsDeterministic, log::Logger as _},
18+
cheap_clone::CheapClone,
19+
};
1620

1721
pub use self::{
1822
error::Error,
@@ -38,54 +42,72 @@ pub use self::{
3842
/// To ensure data consistency and ordered output, the aggregator waits for slower streams
3943
/// to catch up with faster streams. The output stream speed matches the slowest input stream.
4044
pub struct StreamAggregator {
41-
streams: Vec<BoxStream<'static, Result<RecordBatch, Error>>>,
45+
named_streams: Vec<(Arc<str>, BoxStream<'static, Result<RecordBatch, Error>>)>,
4246
buffer: Buffer,
4347
logger: Logger,
48+
49+
/// Indicates whether all streams are fully consumed.
4450
is_finalized: bool,
51+
52+
/// Indicates whether any stream has produced an error.
53+
///
54+
/// When `true`, the stream aggregator stops polling all other streams.
4555
is_failed: bool,
4656
}
4757

4858
impl StreamAggregator {
4959
/// Creates a new stream aggregator from the `streams` with a bounded buffer.
5060
pub fn new<E>(
5161
logger: &Logger,
52-
streams: impl IntoIterator<Item = BoxStream<'static, Result<ResponseBatch, E>>>,
62+
named_streams: impl IntoIterator<Item = (String, BoxStream<'static, Result<ResponseBatch, E>>)>,
5363
max_buffer_size: usize,
5464
) -> Self
5565
where
5666
E: std::error::Error + IsDeterministic + Send + Sync + 'static,
5767
{
5868
let logger = logger.component("AmpStreamAggregator");
5969

60-
let streams = streams
70+
let named_streams = named_streams
6171
.into_iter()
62-
.enumerate()
63-
.map(|(stream_index, stream)| {
64-
stream
65-
.map_err(move |e| Error::stream(stream_index, e))
66-
.try_filter_map(move |response_batch| async move {
67-
match response_batch {
68-
ResponseBatch::Batch { data } => Ok(Some(data)),
69-
ResponseBatch::Reorg(_) => Err(Error::Stream {
70-
stream_index,
71-
source: anyhow!("chain reorg"),
72-
is_deterministic: false,
73-
}),
74-
}
75-
})
76-
.boxed()
72+
.map(|(stream_name, stream)| {
73+
let stream_name: Arc<str> = stream_name.into();
74+
(
75+
stream_name.cheap_clone(),
76+
stream
77+
.map_err({
78+
let stream_name = stream_name.cheap_clone();
79+
move |e| Error::stream(stream_name.cheap_clone(), e)
80+
})
81+
.try_filter_map({
82+
let stream_name = stream_name.cheap_clone();
83+
move |response_batch| {
84+
let stream_name = stream_name.cheap_clone();
85+
async move {
86+
match response_batch {
87+
ResponseBatch::Batch { data } => Ok(Some(data)),
88+
ResponseBatch::Reorg(_) => Err(Error::Stream {
89+
stream_name: stream_name.cheap_clone(),
90+
source: anyhow!("chain reorg"),
91+
is_deterministic: false,
92+
}),
93+
}
94+
}
95+
}
96+
})
97+
.boxed(),
98+
)
7799
})
78100
.collect::<Vec<_>>();
79101

80-
let num_streams = streams.len();
102+
let num_streams = named_streams.len();
81103

82104
info!(logger, "Initializing stream aggregator";
83105
"num_streams" => num_streams,
84106
"max_buffer_size" => max_buffer_size
85107
);
86108

87109
Self {
88-
streams,
110+
named_streams,
89111
buffer: Buffer::new(num_streams, max_buffer_size),
90112
logger,
91113
is_finalized: false,
@@ -99,7 +121,12 @@ impl StreamAggregator {
99121
) -> Poll<Option<Result<RecordBatchGroups, Error>>> {
100122
let mut made_progress = false;
101123

102-
for (stream_index, stream) in self.streams.iter_mut().enumerate() {
124+
for (stream_index, (stream_name, stream)) in self.named_streams.iter_mut().enumerate() {
125+
let logger = self.logger.new(slog::o!(
126+
"stream_index" => stream_index,
127+
"stream_name" => stream_name.cheap_clone()
128+
));
129+
103130
if self.buffer.is_finalized(stream_index) {
104131
continue;
105132
}
@@ -108,7 +135,7 @@ impl StreamAggregator {
108135
self.is_failed = true;
109136

110137
return Poll::Ready(Some(Err(Error::Buffer {
111-
stream_index,
138+
stream_name: stream_name.cheap_clone(),
112139
source: anyhow!("buffer is blocked"),
113140
})));
114141
}
@@ -123,16 +150,15 @@ impl StreamAggregator {
123150
self.buffer
124151
.extend(stream_index, record_batch)
125152
.map_err(|e| Error::Buffer {
126-
stream_index,
153+
stream_name: stream_name.cheap_clone(),
127154
source: e,
128155
});
129156

130157
match buffer_result {
131158
Ok(()) => {
132159
made_progress = true;
133160

134-
debug!(self.logger, "Buffered record batch";
135-
"stream_index" => stream_index,
161+
debug!(logger, "Buffered record batch";
136162
"buffer_size" => self.buffer.size(stream_index),
137163
"has_capacity" => self.buffer.has_capacity(stream_index)
138164
);
@@ -145,9 +171,7 @@ impl StreamAggregator {
145171
}
146172
}
147173
Poll::Ready(Some(Ok(_empty_record_batch))) => {
148-
debug!(self.logger, "Received an empty record batch";
149-
"stream_index" => stream_index
150-
);
174+
debug!(logger, "Received an empty record batch");
151175
}
152176
Poll::Ready(Some(Err(e))) => {
153177
self.is_failed = true;
@@ -163,8 +187,7 @@ impl StreamAggregator {
163187

164188
made_progress = true;
165189

166-
info!(self.logger, "Stream completed";
167-
"stream_index" => stream_index,
190+
info!(logger, "Stream completed";
168191
"buffer_size" => self.buffer.size(stream_index)
169192
);
170193
}

0 commit comments

Comments
 (0)