Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 24 additions & 14 deletions src/tools/fe/routine_load/job_lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,15 @@ impl RoutineLoadJobLister {
}

if job.lag.is_some() {
// Partitions Overview: show Top 40 (max lag) and Bottom 10 (min lag)
// Partitions Overview: show Top 30 (largest non-zero lag) and Bottom 20 (smallest non-zero lag)
let rows = self.build_partition_rows(job.progress.as_ref(), job.lag.as_ref());
let delayed = rows.iter().filter(|(_, _, lag_v)| *lag_v > 0).count();
let nonzero_count = rows.iter().filter(|(_, _, lag_v)| *lag_v > 0).count();
let zero_count = rows.len().saturating_sub(nonzero_count);
if !rows.is_empty() {
report.push_str("\nPartitions Overview (by lag):\n");
report.push_str(&self.format_partitions_overview_top_bottom(&rows, 40, 10));
report.push_str("\nPartitions Overview (non-zero lags only):\n");
report.push_str(&self.format_partitions_overview_nonzero_top_bottom(&rows, 30, 20));
report.push_str(&format!(
"Top 40 & Bottom 10 shown; delayed: {delayed}, total partitions: {}\n",
"non-zero-lag: {nonzero_count}, zero-lag: {zero_count}, total: {}\n",
rows.len()
));

Expand Down Expand Up @@ -256,22 +257,30 @@ impl RoutineLoadJobLister {
rows
}

fn format_partitions_overview_top_bottom(
fn format_partitions_overview_nonzero_top_bottom(
&self,
rows: &[(String, Option<String>, i64)],
top_n: usize,
bottom_n: usize,
) -> String {
let total = rows.len();
// filter non-zero lag rows
let mut nonzero: Vec<(String, Option<String>, i64)> = rows
.iter()
.filter(|(_, _, lag_v)| *lag_v > 0)
.cloned()
.collect();
let total = nonzero.len();
let mut out = String::new();

// Top section (largest lag first)
out.push_str("Top by lag:\n");
out.push_str("┌─────────────┬─────────────┬─────────────┐\n");
out.push_str("│ Partition │ Progress │ Lag │\n");
out.push_str("├─────────────┼─────────────┼─────────────┤\n");
// sort desc
nonzero.sort_by(|a, b| b.2.cmp(&a.2));
let mut printed = 0usize;
for (part, prog, lag_v) in rows.iter().take(top_n) {
for (part, prog, lag_v) in nonzero.iter().take(top_n) {
let prog_s = prog.as_deref().unwrap_or("N/A");
out.push_str(&format!("│ {part:>11} │ {prog_s:>11} │ {lag_v:>11} │\n"));
printed += 1;
Expand All @@ -286,12 +295,13 @@ impl RoutineLoadJobLister {
out.push_str("┌─────────────┬─────────────┬─────────────┐\n");
out.push_str("│ Partition │ Progress │ Lag │\n");
out.push_str("├─────────────┼─────────────┼─────────────┤\n");
if total > top_n {
let start = total.saturating_sub(bottom_n);
for (part, prog, lag_v) in rows.iter().skip(start) {
let prog_s = prog.as_deref().unwrap_or("N/A");
out.push_str(&format!("│ {part:>11} │ {prog_s:>11} │ {lag_v:>11} │\n"));
}
// sort asc
nonzero.sort_by(|a, b| a.2.cmp(&b.2));
let start = 0usize; // beginning for smallest
let end = bottom_n.min(total);
for (part, prog, lag_v) in nonzero.iter().skip(start).take(end) {
let prog_s = prog.as_deref().unwrap_or("N/A");
out.push_str(&format!("│ {part:>11} │ {prog_s:>11} │ {lag_v:>11} │\n"));
}
out.push_str("└─────────────┴─────────────┴─────────────┘\n");

Expand Down
Loading