diff --git a/src/tools/fe/routine_load/job_lister.rs b/src/tools/fe/routine_load/job_lister.rs index b284be9..173a8e1 100644 --- a/src/tools/fe/routine_load/job_lister.rs +++ b/src/tools/fe/routine_load/job_lister.rs @@ -196,14 +196,15 @@ impl RoutineLoadJobLister { } if job.lag.is_some() { - // Partitions Overview: show Top 40 (max lag) and Bottom 10 (min lag) + // Partitions Overview: show Top 30 (largest non-zero lag) and Bottom 20 (smallest non-zero lag) let rows = self.build_partition_rows(job.progress.as_ref(), job.lag.as_ref()); - let delayed = rows.iter().filter(|(_, _, lag_v)| *lag_v > 0).count(); + let nonzero_count = rows.iter().filter(|(_, _, lag_v)| *lag_v > 0).count(); + let zero_count = rows.len().saturating_sub(nonzero_count); if !rows.is_empty() { - report.push_str("\nPartitions Overview (by lag):\n"); - report.push_str(&self.format_partitions_overview_top_bottom(&rows, 40, 10)); + report.push_str("\nPartitions Overview (non-zero lags only):\n"); + report.push_str(&self.format_partitions_overview_nonzero_top_bottom(&rows, 30, 20)); report.push_str(&format!( - "Top 40 & Bottom 10 shown; delayed: {delayed}, total partitions: {}\n", + "non-zero-lag: {nonzero_count}, zero-lag: {zero_count}, total: {}\n", rows.len() )); @@ -256,13 +257,19 @@ impl RoutineLoadJobLister { rows } - fn format_partitions_overview_top_bottom( + fn format_partitions_overview_nonzero_top_bottom( &self, rows: &[(String, Option, i64)], top_n: usize, bottom_n: usize, ) -> String { - let total = rows.len(); + // filter non-zero lag rows + let mut nonzero: Vec<(String, Option, i64)> = rows + .iter() + .filter(|(_, _, lag_v)| *lag_v > 0) + .cloned() + .collect(); + let total = nonzero.len(); let mut out = String::new(); // Top section (largest lag first) @@ -270,8 +277,10 @@ impl RoutineLoadJobLister { out.push_str("┌─────────────┬─────────────┬─────────────┐\n"); out.push_str("│ Partition │ Progress │ Lag │\n"); out.push_str("├─────────────┼─────────────┼─────────────┤\n"); + // sort desc + nonzero.sort_by(|a, b| b.2.cmp(&a.2)); let mut printed = 0usize; - for (part, prog, lag_v) in rows.iter().take(top_n) { + for (part, prog, lag_v) in nonzero.iter().take(top_n) { let prog_s = prog.as_deref().unwrap_or("N/A"); out.push_str(&format!("│ {part:>11} │ {prog_s:>11} │ {lag_v:>11} │\n")); printed += 1; @@ -286,12 +295,13 @@ impl RoutineLoadJobLister { out.push_str("┌─────────────┬─────────────┬─────────────┐\n"); out.push_str("│ Partition │ Progress │ Lag │\n"); out.push_str("├─────────────┼─────────────┼─────────────┤\n"); - if total > top_n { - let start = total.saturating_sub(bottom_n); - for (part, prog, lag_v) in rows.iter().skip(start) { - let prog_s = prog.as_deref().unwrap_or("N/A"); - out.push_str(&format!("│ {part:>11} │ {prog_s:>11} │ {lag_v:>11} │\n")); - } + // sort asc + nonzero.sort_by(|a, b| a.2.cmp(&b.2)); + let start = 0usize; // beginning for smallest + let end = bottom_n.min(total); + for (part, prog, lag_v) in nonzero.iter().skip(start).take(end) { + let prog_s = prog.as_deref().unwrap_or("N/A"); + out.push_str(&format!("│ {part:>11} │ {prog_s:>11} │ {lag_v:>11} │\n")); } out.push_str("└─────────────┴─────────────┴─────────────┘\n");