diff --git a/src/tools/fe/routine_load/job_lister.rs b/src/tools/fe/routine_load/job_lister.rs
index ff23572..b284be9 100644
--- a/src/tools/fe/routine_load/job_lister.rs
+++ b/src/tools/fe/routine_load/job_lister.rs
@@ -3,12 +3,16 @@ use super::models::RoutineLoadJob;
 use crate::config::Config;
 use crate::config_loader;
 use crate::error::{CliError, Result};
+use crate::tools::common::fs_utils::ensure_dir_exists;
 use crate::tools::mysql::MySQLTool;
 use crate::tools::{ExecutionResult, Tool};
 use crate::ui;
 use crate::ui::{InputHelper, InteractiveSelector};
 use crate::ui::{NoJobsNextAction, show_no_jobs_recovery_menu, show_unknown_db_recovery_menu};
 use chrono::Utc;
+use std::collections::HashMap;
+use std::fs;
+use std::path::PathBuf;
 
 /// Routine Load Job Lister
 pub struct RoutineLoadJobLister;
@@ -26,7 +30,7 @@ impl Tool for RoutineLoadJobLister {
         false
     }
 
-    fn execute(&self, _config: &Config, _pid: u32) -> Result<ExecutionResult> {
+    fn execute(&self, config: &Config, _pid: u32) -> Result<ExecutionResult> {
         // Retry loop: allow reselecting database if no jobs found
         let mut database = self.prompt_database_name()?;
         loop {
@@ -35,11 +39,12 @@ impl Tool for RoutineLoadJobLister {
             self.display_jobs(&jobs)?;
             let selected_job = self.prompt_job_selection(&jobs)?;
             self.save_selected_job(selected_job, &database)?;
-            let report = self.generate_selection_report(selected_job)?;
+            let report =
+                self.generate_selection_report(selected_job, &config.output_dir)?;
             ui::print_info("");
             ui::print_info(&report);
             return Ok(ExecutionResult {
-                output_path: std::path::PathBuf::from("console_output"),
+                output_path: config.output_dir.clone(),
                 message: format!(
                     "Job ID '{}' selected and saved in memory",
                     selected_job.id
@@ -164,7 +169,11 @@ impl RoutineLoadJobLister {
         Ok(())
     }
 
-    fn generate_selection_report(&self, job: &RoutineLoadJob) -> Result<String> {
+    fn generate_selection_report(
+        &self,
+        job: &RoutineLoadJob,
+        output_dir: &std::path::Path,
+    ) -> Result<String> {
         let mut report = String::new();
         report.push_str("Routine Load Job Selection Report\n");
         report.push_str("=================================\n\n");
@@ -186,17 +195,26 @@ impl RoutineLoadJobLister {
             report.push_str(&format!(" Received Bytes: {}\n", stat.received_bytes));
         }
 
-        if let Some(ref progress) = job.progress {
-            report.push_str("\nProgress:\n");
-            for (partition, offset) in progress {
-                report.push_str(&format!(" Partition {partition}: {offset}\n"));
-            }
-        }
-
-        if let Some(ref lag) = job.lag {
-            report.push_str("\nLag:\n");
-            for (partition, lag_value) in lag {
-                report.push_str(&format!(" Partition {partition}: {lag_value}\n"));
+        if job.lag.is_some() {
+            // Partitions Overview: show Top 40 (max lag) and Bottom 10 (min lag)
+            let rows = self.build_partition_rows(job.progress.as_ref(), job.lag.as_ref());
+            let delayed = rows.iter().filter(|(_, _, lag_v)| *lag_v > 0).count();
+            if !rows.is_empty() {
+                report.push_str("\nPartitions Overview (by lag):\n");
+                report.push_str(&self.format_partitions_overview_top_bottom(&rows, 40, 10));
+                report.push_str(&format!(
+                    "Top 40 & Bottom 10 shown; delayed: {delayed}, total partitions: {}\n",
+                    rows.len()
+                ));
+
+                match self.write_full_partitions_file(&rows, &job.id, output_dir) {
+                    Ok(path) => {
+                        report.push_str(&format!("Full partitions saved to: {}\n", path.display()));
+                    }
+                    Err(e) => {
+                        report.push_str(&format!("Failed to save full partitions file: {}\n", e));
+                    }
+                }
             }
         }
 
@@ -207,4 +225,96 @@ impl RoutineLoadJobLister {
 
         Ok(report)
     }
+
+    fn build_partition_rows(
+        &self,
+        progress: Option<&HashMap<String, String>>,
+        lag: Option<&HashMap<String, i64>>,
+    ) -> Vec<(String, Option<String>, i64)> {
+        let mut rows: Vec<(String, Option<String>, i64)> = Vec::new();
+
+        // Union of partitions
+        let mut keys: Vec<String> = Vec::new();
+        if let Some(p) = progress {
+            keys.extend(p.keys().cloned());
+        }
+        if let Some(l) = lag {
+            for k in l.keys() {
+                if !keys.contains(k) {
+                    keys.push(k.clone());
+                }
+            }
+        }
+
+        for part in keys {
+            let prog = progress.and_then(|p| p.get(&part).cloned());
+            let lag_v = lag.and_then(|l| l.get(&part).copied()).unwrap_or(0);
+            rows.push((part, prog, lag_v));
+        }
+
+        rows.sort_by(|a, b| b.2.cmp(&a.2));
+        rows
+    }
+
+    fn format_partitions_overview_top_bottom(
+        &self,
+        rows: &[(String, Option<String>, i64)],
+        top_n: usize,
+        bottom_n: usize,
+    ) -> String {
+        let total = rows.len();
+        let mut out = String::new();
+
+        // Top section (largest lag first)
+        out.push_str("Top by lag:\n");
+        out.push_str("┌─────────────┬─────────────┬─────────────┐\n");
+        out.push_str("│   Partition │    Progress │         Lag │\n");
+        out.push_str("├─────────────┼─────────────┼─────────────┤\n");
+        let mut printed = 0usize;
+        for (part, prog, lag_v) in rows.iter().take(top_n) {
+            let prog_s = prog.as_deref().unwrap_or("N/A");
+            out.push_str(&format!("│ {part:>11} │ {prog_s:>11} │ {lag_v:>11} │\n"));
+            printed += 1;
+        }
+        if printed == 0 {
+            out.push_str("│ (no data) │\n");
+        }
+        out.push_str("└─────────────┴─────────────┴─────────────┘\n");
+
+        // Bottom section (smallest lag last); avoid overlap with top
+        out.push_str("Bottom by lag:\n");
+        out.push_str("┌─────────────┬─────────────┬─────────────┐\n");
+        out.push_str("│   Partition │    Progress │         Lag │\n");
+        out.push_str("├─────────────┼─────────────┼─────────────┤\n");
+        if total > top_n {
+            let start = total.saturating_sub(bottom_n);
+            for (part, prog, lag_v) in rows.iter().skip(start) {
+                let prog_s = prog.as_deref().unwrap_or("N/A");
+                out.push_str(&format!("│ {part:>11} │ {prog_s:>11} │ {lag_v:>11} │\n"));
+            }
+        }
+        out.push_str("└─────────────┴─────────────┴─────────────┘\n");
+
+        out
+    }
+
+    fn write_full_partitions_file(
+        &self,
+        rows: &[(String, Option<String>, i64)],
+        job_id: &str,
+        base_dir: &std::path::Path,
+    ) -> Result<PathBuf> {
+        let file_path = base_dir.join(format!("routine_load_partitions_{job_id}.txt"));
+        ensure_dir_exists(&file_path)?;
+
+        let mut content = String::from("Partition\tProgress\tLag\n");
+        for (part, prog, lag_v) in rows {
+            let prog_s = prog.as_deref().unwrap_or("N/A");
+            content.push_str(&format!("{part}\t{prog_s}\t{lag_v}\n"));
+        }
+        fs::write(&file_path, content)
+            .map_err(|e| CliError::ToolExecutionFailed(format!("Write failed: {e}")))?;
+
+        Ok(file_path)
+    }
 }