From 2883f3c7bc6d43339b9188c52d4d751f7f53097c Mon Sep 17 00:00:00 2001
From: QuakeWang <1677980708@qq.com>
Date: Wed, 13 Aug 2025 22:02:05 +0800
Subject: [PATCH 1/8] feat: impl routine load sop

---
 src/lib.rs                                    | 377 ++++++++++++++----
 src/tools/fe/mod.rs                           |   2 +
 src/tools/fe/routine_load/error_checker.rs    |  92 +++++
 src/tools/fe/routine_load/job_lister.rs       | 266 ++++++++++++
 src/tools/fe/routine_load/job_manager.rs      | 205 ++++++++++
 src/tools/fe/routine_load/log_parser.rs       | 129 ++++++
 src/tools/fe/routine_load/mod.rs              |  43 ++
 src/tools/fe/routine_load/models.rs           |  79 ++++
 .../fe/routine_load/performance_analyzer.rs   | 281 +++++++++++++
 src/tools/fe/routine_load/traffic_monitor.rs  | 150 +++++++
 src/tools/mod.rs                              |   4 +
 src/tools/mysql/mod.rs                        |   2 +-
 src/ui/menu.rs                                | 107 +++++
 13 files changed, 1651 insertions(+), 86 deletions(-)
 create mode 100644 src/tools/fe/routine_load/error_checker.rs
 create mode 100644 src/tools/fe/routine_load/job_lister.rs
 create mode 100644 src/tools/fe/routine_load/job_manager.rs
 create mode 100644 src/tools/fe/routine_load/log_parser.rs
 create mode 100644 src/tools/fe/routine_load/mod.rs
 create mode 100644 src/tools/fe/routine_load/models.rs
 create mode 100644 src/tools/fe/routine_load/performance_analyzer.rs
 create mode 100644 src/tools/fe/routine_load/traffic_monitor.rs

diff --git a/src/lib.rs b/src/lib.rs
index 0493c22..a6952c3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -210,10 +210,166 @@ fn collect_cluster_info_background(doris_config: &crate::config_loader::DorisCon
 /// Generic loop for handling a service type (FE or BE).
 fn handle_service_loop(config: &Config, service_name: &str, tools: &[Box<dyn Tool>]) -> Result<()> {
+    if service_name == "FE" {
+        handle_fe_service_loop(config, tools)
+    } else {
+        handle_be_service_loop(config, tools)
+    }
+}
+
+/// Handle FE service loop with nested menu structure
+fn handle_fe_service_loop(config: &Config, tools: &[Box<dyn Tool>]) -> Result<()> {
+    loop {
+        match ui::show_fe_tools_menu()? {
+            ui::FeToolAction::JmapDump => {
+                let tool = &*tools[0]; // jmap-dump
+                if let Err(e) = execute_tool_enhanced(config, tool, "FE") {
+                    match e {
+                        error::CliError::GracefulExit => { /* Do nothing, just loop again */ }
+                        _ => print_error(&format!("Tool execution failed: {e}")),
+                    }
+                }
+                match ui::show_post_execution_menu(tool.name())? {
+                    ui::PostExecutionAction::Continue => continue,
+                    ui::PostExecutionAction::BackToMain => return Ok(()),
+                    ui::PostExecutionAction::Exit => {
+                        ui::print_goodbye();
+                        std::process::exit(0);
+                    }
+                }
+            }
+            ui::FeToolAction::JmapHisto => {
+                let tool = &*tools[1]; // jmap-histo
+                if let Err(e) = execute_tool_enhanced(config, tool, "FE") {
+                    match e {
+                        error::CliError::GracefulExit => { /* Do nothing, just loop again */ }
+                        _ => print_error(&format!("Tool execution failed: {e}")),
+                    }
+                }
+                match ui::show_post_execution_menu(tool.name())? {
+                    ui::PostExecutionAction::Continue => continue,
+                    ui::PostExecutionAction::BackToMain => return Ok(()),
+                    ui::PostExecutionAction::Exit => {
+                        ui::print_goodbye();
+                        std::process::exit(0);
+                    }
+                }
+            }
+            ui::FeToolAction::Jstack => {
+                let tool = &*tools[2]; // jstack
+                if let Err(e) = execute_tool_enhanced(config, tool, "FE") {
+                    match e {
+                        error::CliError::GracefulExit => { /* Do nothing, just loop again */ }
+                        _ => print_error(&format!("Tool execution failed: {e}")),
+                    }
+                }
+                match ui::show_post_execution_menu(tool.name())? {
+                    ui::PostExecutionAction::Continue => continue,
+                    ui::PostExecutionAction::BackToMain => return Ok(()),
+                    ui::PostExecutionAction::Exit => {
+                        ui::print_goodbye();
+                        std::process::exit(0);
+                    }
+                }
+            }
+            ui::FeToolAction::FeProfiler => {
+                let tool = &*tools[3]; // fe-profiler
+                if let Err(e) = execute_tool_enhanced(config, tool, "FE") {
+                    match e {
+                        error::CliError::GracefulExit => { /* Do nothing, just loop again */ }
+                        _ => print_error(&format!("Tool execution failed: {e}")),
+                    }
+                }
+                match ui::show_post_execution_menu(tool.name())? {
+                    ui::PostExecutionAction::Continue => continue,
+                    ui::PostExecutionAction::BackToMain => return Ok(()),
+                    ui::PostExecutionAction::Exit => {
+                        ui::print_goodbye();
+                        std::process::exit(0);
+                    }
+                }
+            }
+            ui::FeToolAction::RoutineLoad => {
+                if let Err(e) = handle_routine_load_loop(config, tools) {
+                    match e {
+                        error::CliError::GracefulExit => { /* Do nothing, just loop again */ }
+                        _ => print_error(&format!("Routine Load error: {e}")),
+                    }
+                }
+            }
+            ui::FeToolAction::Back => return Ok(()),
+        }
+    }
+}
+
+/// Handle Routine Load sub-menu loop
+fn handle_routine_load_loop(config: &Config, tools: &[Box<dyn Tool>]) -> Result<()> {
     loop {
-        match show_tool_selection_menu(2, &format!("Select {service_name} tool"), tools)? {
+        match ui::show_routine_load_menu()? {
+            ui::RoutineLoadAction::GetJobId => execute_routine_load_tool(
+                config,
+                tools,
+                crate::tools::fe::routine_load::RoutineLoadToolIndex::JobLister,
+            )?,
+            ui::RoutineLoadAction::ErrorCheck => execute_routine_load_tool(
+                config,
+                tools,
+                crate::tools::fe::routine_load::RoutineLoadToolIndex::ErrorChecker,
+            )?,
+            ui::RoutineLoadAction::Performance => execute_routine_load_tool(
+                config,
+                tools,
+                crate::tools::fe::routine_load::RoutineLoadToolIndex::PerformanceAnalyzer,
+            )?,
+            ui::RoutineLoadAction::Traffic => execute_routine_load_tool(
+                config,
+                tools,
+                crate::tools::fe::routine_load::RoutineLoadToolIndex::TrafficMonitor,
+            )?,
+            ui::RoutineLoadAction::Back => return Ok(()),
+        }
+    }
+}
+
+/// Helper function for executing a Routine Load tool
+fn execute_routine_load_tool(
+    config: &Config,
+    tools: &[Box<dyn Tool>],
+    tool_index: crate::tools::fe::routine_load::RoutineLoadToolIndex,
+) -> Result<()> {
+    let tool = tool_index.get_tool(tools).ok_or_else(|| {
+        error::CliError::ToolExecutionFailed(format!(
+            "Tool not found at index {}",
+            tool_index as usize
+        ))
+    })?;
+
+    if let Err(e) = execute_tool_enhanced(config, tool, "FE") {
+        match e {
+            error::CliError::GracefulExit => { /* Do nothing, just loop again */ }
+            _ => print_error(&format!("Tool execution failed: {e}")),
+        }
+        // For failed Routine Load tools, continue loop to show menu again
+        return Ok(());
+    }
+
+    // Only show post execution menu for successful execution
+    match ui::show_post_execution_menu(tool.name())? {
+        ui::PostExecutionAction::Continue => Ok(()),
+        ui::PostExecutionAction::BackToMain => Err(error::CliError::GracefulExit),
+        ui::PostExecutionAction::Exit => {
+            ui::print_goodbye();
+            std::process::exit(0);
+        }
+    }
+}
+
+/// Handle BE service loop (original logic)
+fn handle_be_service_loop(config: &Config, tools: &[Box<dyn Tool>]) -> Result<()> {
+    loop {
+        match show_tool_selection_menu(2, "Select BE tool", tools)?
{ Some(tool) => { - if let Err(e) = execute_tool_enhanced(config, tool, service_name) { + if let Err(e) = execute_tool_enhanced(config, tool, "BE") { match e { error::CliError::GracefulExit => { /* Do nothing, just loop again */ } _ => print_error(&format!("Tool execution failed: {e}")), @@ -235,7 +391,7 @@ fn handle_service_loop(config: &Config, service_name: &str, tools: &[Box Result<()> { +fn execute_tool_enhanced(config: &Config, tool: &dyn Tool, service_name: &str) -> Result<()> { let pid = if tool.requires_pid() { // Try to get PID from configuration first match config_loader::get_current_pid() { @@ -274,10 +430,10 @@ fn execute_tool_enhanced(config: &Config, tool: &dyn Tool, _service_name: &str) Err(error::CliError::GracefulExit) => Ok(()), // Simply return to the menu Err(e) => { // Handle the error and get the potentially updated config - match handle_tool_execution_error(config, &e)? { + match handle_tool_execution_error(config, &e, service_name, tool.name())? { Some(updated_config) => { // Try executing the tool again with the updated config - execute_tool_enhanced(&updated_config, tool, _service_name) + execute_tool_enhanced(&updated_config, tool, service_name) } None => Ok(()), } @@ -285,99 +441,150 @@ fn execute_tool_enhanced(config: &Config, tool: &dyn Tool, _service_name: &str) } } -fn handle_tool_execution_error(config: &Config, error: &error::CliError) -> Result> { +fn handle_tool_execution_error( + config: &Config, + error: &error::CliError, + service_name: &str, + tool_name: &str, +) -> Result> { println!(); - print_warning("Tool execution failed due to configuration issues."); - print_error(&format!("Error: {error}")); - println!(); - print_info("Would you like to:"); - - let options = vec![ - "Fix JDK path and retry".to_string(), - "Fix output directory and retry".to_string(), - "Cancel and return to menu".to_string(), - ]; - - let selection = dialoguer::Select::with_theme(&dialoguer::theme::ColorfulTheme::default()) - .with_prompt("Choose an option") - .items(&options) - .default(0) - .interact() - .map_err(|e| error::CliError::InvalidInput(format!("Error fix selection failed: {e}")))?; - - match selection { - 0 => { - // Fix JDK path - let new_path: String = - dialoguer::Input::with_theme(&dialoguer::theme::ColorfulTheme::default()) - .with_prompt("Enter the correct JDK path") - .with_initial_text(config.jdk_path.to_string_lossy().to_string()) - .interact_text() - .map_err(|e| { - error::CliError::InvalidInput(format!("JDK path input failed: {e}")) - })?; - - let new_path = std::path::PathBuf::from(new_path); - - // Validate the new path - if !new_path.exists() { - let path_display = new_path.display(); - print_error(&format!("Path does not exist: {path_display}")); - return Ok(None); + // Special handling for Routine Load tools when Job ID is missing + if service_name == "FE" + && tool_name.contains("routine_load") + && error.to_string().contains("No Job ID in memory") + { + print_warning("Routine Load tool execution failed: No Job ID selected."); + print_error(&format!("Error: {error}")); + + println!(); + print_info("Would you like to:"); + + let options = vec![ + "Go to Get Job ID".to_string(), + "Return to Routine Load menu".to_string(), + "Cancel and return to menu".to_string(), + ]; + + let selection = dialoguer::Select::with_theme(&dialoguer::theme::ColorfulTheme::default()) + .with_prompt("Choose an option") + .items(&options) + .default(0) + .interact() + .map_err(|e| { + error::CliError::InvalidInput(format!("Error fix selection failed: {e}")) + })?; + + 
match selection { + 0 => { + // Signal to go to Get Job ID - this will be handled by the calling loop + Err(error::CliError::GracefulExit) + } + 1 => { + // Signal to return to Routine Load menu + Err(error::CliError::GracefulExit) } + 2 => Ok(None), + _ => Err(error::CliError::InvalidInput( + "Invalid selection".to_string(), + )), + } + } else { + // Original generic error handling for other tools + print_warning("Tool execution failed due to configuration issues."); + print_error(&format!("Error: {error}")); + + println!(); + print_info("Would you like to:"); + + let options = vec![ + "Fix JDK path and retry".to_string(), + "Fix output directory and retry".to_string(), + "Cancel and return to menu".to_string(), + ]; + + let selection = dialoguer::Select::with_theme(&dialoguer::theme::ColorfulTheme::default()) + .with_prompt("Choose an option") + .items(&options) + .default(0) + .interact() + .map_err(|e| { + error::CliError::InvalidInput(format!("Error fix selection failed: {e}")) + })?; + + match selection { + 0 => { + // Fix JDK path + let new_path: String = + dialoguer::Input::with_theme(&dialoguer::theme::ColorfulTheme::default()) + .with_prompt("Enter the correct JDK path") + .with_initial_text(config.jdk_path.to_string_lossy().to_string()) + .interact_text() + .map_err(|e| { + error::CliError::InvalidInput(format!("JDK path input failed: {e}")) + })?; + + let new_path = std::path::PathBuf::from(new_path); + + // Validate the new path + if !new_path.exists() { + let path_display = new_path.display(); + print_error(&format!("Path does not exist: {path_display}")); + return Ok(None); + } - let jmap_path = new_path.join("bin/jmap"); - let jstack_path = new_path.join("bin/jstack"); + let jmap_path = new_path.join("bin/jmap"); + let jstack_path = new_path.join("bin/jstack"); - if !jmap_path.exists() || !jstack_path.exists() { - print_error("Required JDK tools (jmap/jstack) not found in the specified path"); - return Ok(None); - } + if !jmap_path.exists() || !jstack_path.exists() { + print_error("Required JDK tools (jmap/jstack) not found in the specified path"); + return Ok(None); + } - let fixed_config = config.clone().with_jdk_path(new_path); + let fixed_config = config.clone().with_jdk_path(new_path); - // Persist the updated configuration - if let Err(e) = persist_updated_config(&fixed_config) { - print_warning(&format!("Failed to persist configuration: {e}")); - } + // Persist the updated configuration + if let Err(e) = persist_updated_config(&fixed_config) { + print_warning(&format!("Failed to persist configuration: {e}")); + } - print_success("JDK path updated successfully!"); - Ok(Some(fixed_config)) - } - 1 => { - // Fix output directory - let new_path: String = - dialoguer::Input::with_theme(&dialoguer::theme::ColorfulTheme::default()) - .with_prompt("Enter the output directory path") - .with_initial_text(config.output_dir.to_string_lossy().to_string()) - .interact_text() - .map_err(|e| { - error::CliError::InvalidInput(format!("Output dir input failed: {e}")) - })?; - - let new_path = std::path::PathBuf::from(new_path); - - // Test creating the directory - if let Err(e) = std::fs::create_dir_all(&new_path) { - print_error(&format!("Cannot create directory: {e}")); - return Ok(None); + print_success("JDK path updated successfully!"); + Ok(Some(fixed_config)) } + 1 => { + // Fix output directory + let new_path: String = + dialoguer::Input::with_theme(&dialoguer::theme::ColorfulTheme::default()) + .with_prompt("Enter the output directory path") + 
.with_initial_text(config.output_dir.to_string_lossy().to_string()) + .interact_text() + .map_err(|e| { + error::CliError::InvalidInput(format!("Output dir input failed: {e}")) + })?; + + let new_path = std::path::PathBuf::from(new_path); + + // Test creating the directory + if let Err(e) = std::fs::create_dir_all(&new_path) { + print_error(&format!("Cannot create directory: {e}")); + return Ok(None); + } - let fixed_config = config.clone().with_output_dir(new_path); + let fixed_config = config.clone().with_output_dir(new_path); - // Persist the updated configuration - if let Err(e) = persist_updated_config(&fixed_config) { - print_warning(&format!("Failed to persist configuration: {e}")); - } + // Persist the updated configuration + if let Err(e) = persist_updated_config(&fixed_config) { + print_warning(&format!("Failed to persist configuration: {e}")); + } - print_success("Output directory updated successfully!"); - Ok(Some(fixed_config)) + print_success("Output directory updated successfully!"); + Ok(Some(fixed_config)) + } + 2 => Ok(None), + _ => Err(error::CliError::InvalidInput( + "Invalid selection".to_string(), + )), } - 2 => Ok(None), - _ => Err(error::CliError::InvalidInput( - "Invalid selection".to_string(), - )), } } diff --git a/src/tools/fe/mod.rs b/src/tools/fe/mod.rs index 37a0306..35a88ae 100644 --- a/src/tools/fe/mod.rs +++ b/src/tools/fe/mod.rs @@ -1,7 +1,9 @@ mod jmap; mod jstack; mod profiler; +pub mod routine_load; pub use jmap::{JmapDumpTool, JmapHistoTool}; pub use jstack::JstackTool; pub use profiler::FeProfilerTool; +pub use routine_load::{RoutineLoadJobLister, get_routine_load_tools}; diff --git a/src/tools/fe/routine_load/error_checker.rs b/src/tools/fe/routine_load/error_checker.rs new file mode 100644 index 0000000..ec525a0 --- /dev/null +++ b/src/tools/fe/routine_load/error_checker.rs @@ -0,0 +1,92 @@ +use super::job_manager::RoutineLoadJobManager; +use crate::config::Config; +use crate::config_loader; +use crate::error::{CliError, Result}; +use crate::tools::mysql::MySQLTool; +use crate::tools::{ExecutionResult, Tool}; +use crate::ui; + +pub struct RoutineLoadErrorChecker; + +impl Tool for RoutineLoadErrorChecker { + fn name(&self) -> &str { + "routine_load_error_checker" + } + fn description(&self) -> &str { + "Check for errors in Routine Load job" + } + fn requires_pid(&self) -> bool { + false + } + + fn execute(&self, _config: &Config, _pid: u32) -> Result { + let job_manager = RoutineLoadJobManager; + let job_id = job_manager.get_current_job_id().ok_or_else(|| { + CliError::InvalidInput("No Job ID in memory. 
Run 'Get Job ID' first.".into()) + })?; + let database = job_manager + .get_last_database() + .ok_or_else(|| CliError::InvalidInput("Unknown database for current Job ID".into()))?; + + ui::print_info(&format!( + "Checking Routine Load errors for job {}...", + job_id + )); + + let doris_config = config_loader::load_config()?; + let sql = format!("USE `{}`; SHOW ROUTINE LOAD \\G", database); + let output = MySQLTool::query_sql_with_config(&doris_config, &sql)?; + + let jobs = job_manager.parse_routine_load_output(&output)?; + let job = jobs.into_iter().find(|j| j.id == job_id).ok_or_else(|| { + CliError::InvalidInput(format!("Job {} not found in database {}", job_id, database)) + })?; + + let mut findings: Vec = Vec::new(); + + if job.state != "RUNNING" && job.state != "NEED_SCHEDULE" { + findings.push(format!("State is {}", job.state)); + } + + if let Some(stat) = &job.statistic { + if stat.error_rows > 0 { + findings.push(format!("Error rows: {}", stat.error_rows)); + } + if stat.unselected_rows > 0 { + findings.push(format!("Unselected rows: {}", stat.unselected_rows)); + } + } + + if let Some(urls) = &job.error_log_urls { + if !urls.trim().is_empty() && urls.trim() != "NULL" { + findings.push(format!("Error log URLs: {}", urls.trim())); + } + } + + println!("\nRoutine Load Error Check Report\n================================\n"); + println!("Job ID: {}", job.id); + println!("Name : {}", job.name); + println!("State : {}", job.state); + println!("DB : {}", job.db_name); + println!("Table : {}", job.table_name); + + if findings.is_empty() { + println!("\nNo obvious errors detected."); + } else { + println!("\nFindings:"); + for f in &findings { + println!(" - {}", f); + } + } + + println!("\nHints:"); + println!(" - Check FE logs if state is PAUSED/STOPPED/CANCELLED"); + println!(" - Review ErrorLogUrls if present"); + println!(" - Verify source offsets and Lag for Kafka"); + + Ok(ExecutionResult { + output_path: std::path::PathBuf::from("console_output"), + message: format!("Error check completed for Job ID: {}", job_id), + }) + } +} diff --git a/src/tools/fe/routine_load/job_lister.rs b/src/tools/fe/routine_load/job_lister.rs new file mode 100644 index 0000000..61a85c6 --- /dev/null +++ b/src/tools/fe/routine_load/job_lister.rs @@ -0,0 +1,266 @@ +use super::job_manager::RoutineLoadJobManager; +use super::models::RoutineLoadJob; +use crate::config::Config; +use crate::config_loader; +use crate::error::{CliError, Result}; +use crate::tools::mysql::MySQLTool; +use crate::tools::{ExecutionResult, Tool}; +use crate::ui; +use chrono::Utc; +use console::{Key, Term, style}; +use dialoguer::Input; + +/// Routine Load Job Lister +pub struct RoutineLoadJobLister; + +impl Tool for RoutineLoadJobLister { + fn name(&self) -> &str { + "routine_load_job_lister" + } + + fn description(&self) -> &str { + "List and select Routine Load jobs" + } + + fn requires_pid(&self) -> bool { + false + } + + fn execute(&self, _config: &Config, _pid: u32) -> Result { + let database = self.prompt_database_name()?; + let jobs = self.query_routine_load_jobs(&database)?; + self.display_jobs(&jobs)?; + let selected_job = self.prompt_job_selection(&jobs)?; + self.save_selected_job(selected_job, &database)?; + // 直接在终端输出关键信息,不再保存文件 + let report = self.generate_selection_report(selected_job)?; + println!("\n{}", report); + Ok(ExecutionResult { + output_path: std::path::PathBuf::from("console_output"), + message: format!("Job ID '{}' selected and saved in memory", selected_job.id), + }) + } +} + +impl RoutineLoadJobLister { + 
fn prompt_database_name(&self) -> Result { + ui::print_info("Please enter the database name:"); + + let database: String = Input::new() + .with_prompt("Database name") + .allow_empty(false) + .interact()?; + + let database = database.trim().to_string(); + + if database.is_empty() { + return Err(CliError::InvalidInput( + "Database name cannot be empty".into(), + )); + } + + Ok(database) + } + + fn query_routine_load_jobs(&self, database: &str) -> Result> { + let doris_config = config_loader::load_config()?; + + let sql = format!("USE `{}`; SHOW ALL ROUTINE LOAD \\G", database); + let output = MySQLTool::query_sql_with_config(&doris_config, &sql)?; + + let job_manager = RoutineLoadJobManager; + let jobs = job_manager.parse_routine_load_output(&output)?; + + if jobs.is_empty() { + ui::print_warning(&format!( + "No Routine Load jobs found in database '{}'", + database + )); + ui::print_info("This could mean:"); + ui::print_info(" - The database name is incorrect"); + ui::print_info(" - No Routine Load jobs have been created"); + ui::print_info(" - All jobs have been stopped or deleted"); + return Err(CliError::ToolExecutionFailed(format!( + "No Routine Load jobs found in database '{}'", + database + ))); + } + + Ok(jobs) + } + + fn display_jobs(&self, jobs: &[RoutineLoadJob]) -> Result<()> { + println!("\nRoutine Load Jobs in Database:"); + println!("{}", "=".repeat(100)); + + println!( + "{:<4} {:<20} {:<32} {:<12} {:<15}", + "No.", "Job ID", "Name", "State", "Table" + ); + println!("{}", "-".repeat(100)); + + for (index, job) in jobs.iter().enumerate() { + let number = index + 1; + let name = self.truncate_string(&job.name, 32); + let table = self.truncate_string(&job.table_name, 13); + + println!( + "{:<4} {:<20} {:<32} {:<12} {:<15}", + number, job.id, name, job.state, table + ); + } + + println!("{}", "=".repeat(100)); + + let running_count = jobs.iter().filter(|j| j.state == "RUNNING").count(); + let paused_count = jobs.iter().filter(|j| j.state == "PAUSED").count(); + let stopped_count = jobs.iter().filter(|j| j.state == "STOPPED").count(); + + println!( + "Summary: {} total jobs ({} running, {} paused, {} stopped)", + jobs.len(), + running_count, + paused_count, + stopped_count + ); + + Ok(()) + } + + fn prompt_job_selection<'a>(&self, jobs: &'a [RoutineLoadJob]) -> Result<&'a RoutineLoadJob> { + let term = Term::stdout(); + let mut selection: usize = 0; + + println!("\nUse ↑/↓ or press number, then Enter to select:"); + term.hide_cursor() + .map_err(|e| CliError::InvalidInput(e.to_string()))?; + + self.render_selection_list(&term, jobs, selection)?; + + loop { + match term + .read_key() + .map_err(|e| CliError::InvalidInput(e.to_string()))? 
+ { + Key::Enter => { + term.show_cursor() + .map_err(|e| CliError::InvalidInput(e.to_string()))?; + term.clear_last_lines(jobs.len()).ok(); + break; + } + Key::ArrowUp => { + selection = if selection == 0 { + jobs.len() - 1 + } else { + selection - 1 + }; + } + Key::ArrowDown => { + selection = if selection + 1 >= jobs.len() { + 0 + } else { + selection + 1 + }; + } + Key::Char(c) => { + if let Some(d) = c.to_digit(10) { + let idx = d.saturating_sub(1) as usize; + if idx < jobs.len() { + selection = idx; + } + } + } + _ => {} + } + + term.move_cursor_up(jobs.len()).ok(); + self.render_selection_list(&term, jobs, selection)?; + } + + Ok(&jobs[selection]) + } + + fn render_selection_list( + &self, + term: &Term, + jobs: &[RoutineLoadJob], + selection: usize, + ) -> Result<()> { + for (i, job) in jobs.iter().enumerate() { + let arrow = if i == selection { + style(">").cyan().bold().to_string() + } else { + " ".to_string() + }; + let name = self.truncate_string(&job.name, 32); + let line = format!("{arrow} {}. {} - {} ({})", i + 1, job.id, name, job.state); + term.write_line(&line) + .map_err(|e| CliError::InvalidInput(e.to_string()))?; + } + Ok(()) + } + + fn save_selected_job(&self, job: &RoutineLoadJob, database: &str) -> Result<()> { + let job_manager = RoutineLoadJobManager; + + job_manager.save_job_id(job.id.clone(), job.name.clone(), database.to_string())?; + + job_manager.update_job_cache(vec![job.clone()])?; + + ui::print_success(&format!("Job ID '{}' saved in memory", job.id)); + + Ok(()) + } + + fn generate_selection_report(&self, job: &RoutineLoadJob) -> Result { + let mut report = String::new(); + report.push_str("Routine Load Job Selection Report\n"); + report.push_str("=================================\n\n"); + report.push_str(&format!("Selected Job ID: {}\n", job.id)); + report.push_str(&format!("Job Name: {}\n", job.name)); + report.push_str(&format!("State: {}\n", job.state)); + report.push_str(&format!("Database: {}\n", job.db_name)); + report.push_str(&format!("Table: {}\n", job.table_name)); + report.push_str(&format!("Create Time: {}\n", job.create_time)); + + if let Some(ref pause_time) = job.pause_time { + report.push_str(&format!("Pause Time: {}\n", pause_time)); + } + + if let Some(ref stat) = job.statistic { + report.push_str("\nStatistics:\n"); + report.push_str(&format!(" Loaded Rows: {}\n", stat.loaded_rows)); + report.push_str(&format!(" Error Rows: {}\n", stat.error_rows)); + report.push_str(&format!(" Received Bytes: {}\n", stat.received_bytes)); + } + + if let Some(ref progress) = job.progress { + report.push_str("\nProgress:\n"); + for (partition, offset) in progress { + report.push_str(&format!(" Partition {partition}: {offset}\n")); + } + } + + if let Some(ref lag) = job.lag { + report.push_str("\nLag:\n"); + for (partition, lag_value) in lag { + report.push_str(&format!(" Partition {partition}: {lag_value}\n")); + } + } + + report.push_str(&format!( + "\nSelection Time: {}\n", + Utc::now().format("%Y-%m-%d %H:%M:%S") + )); + + Ok(report) + } + + fn truncate_string(&self, s: &str, max_len: usize) -> String { + if s.len() <= max_len { + s.to_string() + } else { + format!("{}...", &s[..max_len - 3]) + } + } +} diff --git a/src/tools/fe/routine_load/job_manager.rs b/src/tools/fe/routine_load/job_manager.rs new file mode 100644 index 0000000..c9876dc --- /dev/null +++ b/src/tools/fe/routine_load/job_manager.rs @@ -0,0 +1,205 @@ +use super::models::{JobStatistic, RoutineLoadJob, RoutineLoadState}; +use crate::error::{CliError, Result}; +use 
crate::tools::mysql::parser::{parse_key_value_pairs, split_into_blocks}; +use once_cell::sync::Lazy; +use serde_json; +use std::collections::HashMap; +use std::sync::Mutex; + +/// Global Routine Load state manager +static ROUTINE_LOAD_STATE: Lazy> = + Lazy::new(|| Mutex::new(RoutineLoadState::new())); + +/// Routine Load Job ID manager +pub struct RoutineLoadJobManager; + +impl RoutineLoadJobManager { + /// Helper function: safely acquire state lock and execute operation + fn with_state(&self, f: F) -> Result + where + F: FnOnce(&mut RoutineLoadState) -> Result, + { + let mut state = ROUTINE_LOAD_STATE + .lock() + .map_err(|_| CliError::ToolExecutionFailed("Failed to acquire state lock".into()))?; + f(&mut state) + } + + /// Helper function: read-only access to state + fn with_state_readonly(&self, f: F) -> Result + where + F: FnOnce(&RoutineLoadState) -> Result, + { + let state = ROUTINE_LOAD_STATE + .lock() + .map_err(|_| CliError::ToolExecutionFailed("Failed to acquire state lock".into()))?; + f(&state) + } + + /// Save Job ID to memory + pub fn save_job_id(&self, job_id: String, job_name: String, database: String) -> Result<()> { + self.with_state(|state| { + state.current_job_id = Some(job_id.clone()); + state.current_job_name = Some(job_name); + state.last_database = Some(database); + Ok(()) + }) + } + + /// Get current Job ID from memory + pub fn get_current_job_id(&self) -> Option { + self.with_state_readonly(|state| Ok(state.current_job_id.clone())) + .unwrap_or(None) + } + + pub fn get_current_job_name(&self) -> Option { + self.with_state_readonly(|state| Ok(state.current_job_name.clone())) + .unwrap_or(None) + } + + pub fn get_last_database(&self) -> Option { + self.with_state_readonly(|state| Ok(state.last_database.clone())) + .unwrap_or(None) + } + + pub fn validate_job_id(&self, job_id: &str) -> Result { + if !job_id.chars().all(|c| c.is_ascii_digit()) { + return Ok(false); + } + + self.with_state_readonly(|state| Ok(state.job_cache.contains_key(job_id))) + } + + pub fn clear_state(&self) -> Result<()> { + self.with_state(|state| { + state.clear(); + Ok(()) + }) + } + + pub fn update_job_cache(&self, jobs: Vec) -> Result<()> { + self.with_state(|state| { + state.job_cache.clear(); + for job in jobs { + state.job_cache.insert(job.id.clone(), job); + } + Ok(()) + }) + } + + /// Get job cache + pub fn get_job_cache(&self) -> Result> { + self.with_state_readonly(|state| Ok(state.job_cache.clone())) + } + + /// Parse Routine Load output + pub fn parse_routine_load_output(&self, output: &str) -> Result> { + let blocks = split_into_blocks(output); + let mut jobs = Vec::new(); + + for block in blocks { + if let Some(job) = self.parse_job_block(&block)? { + jobs.push(job); + } + } + + Ok(jobs) + } + + /// Parse single job block + fn parse_job_block(&self, block: &str) -> Result> { + let fields = parse_key_value_pairs(block); + + if !fields.contains_key("Id") || !fields.contains_key("Name") { + return Ok(None); + } + + let statistic = if let Some(stat_str) = fields.get("Statistic") { + if stat_str != "NULL" { + Some(self.parse_statistic(stat_str)?) + } else { + None + } + } else { + None + }; + + let progress = if let Some(prog_str) = fields.get("Progress") { + if prog_str != "NULL" { + Some(self.parse_progress(prog_str)?) + } else { + None + } + } else { + None + }; + + let lag = if let Some(lag_str) = fields.get("Lag") { + if lag_str != "NULL" { + Some(self.parse_lag(lag_str)?) 
+ } else { + None + } + } else { + None + }; + + let job = RoutineLoadJob { + id: fields.get("Id").unwrap().clone(), + name: fields.get("Name").unwrap().clone(), + state: fields + .get("State") + .unwrap_or(&"UNKNOWN".to_string()) + .clone(), + db_name: fields.get("DbName").unwrap_or(&"".to_string()).clone(), + table_name: fields.get("TableName").unwrap_or(&"".to_string()).clone(), + create_time: fields.get("CreateTime").unwrap_or(&"".to_string()).clone(), + pause_time: fields.get("PauseTime").filter(|&s| s != "NULL").cloned(), + end_time: fields.get("EndTime").filter(|&s| s != "NULL").cloned(), + current_task_num: fields.get("CurrentTaskNum").cloned(), + data_source_type: fields.get("DataSourceType").cloned(), + statistic, + progress, + lag, + error_log_urls: fields.get("ErrorLogUrls").cloned(), + other_msg: fields.get("OtherMsg").cloned(), + }; + + Ok(Some(job)) + } + + /// Parse Statistic JSON field + fn parse_statistic(&self, stat_str: &str) -> Result { + let stat: serde_json::Value = serde_json::from_str(stat_str).map_err(|e| { + CliError::ToolExecutionFailed(format!("Failed to parse statistic: {}", e)) + })?; + + Ok(JobStatistic { + received_bytes: stat["receivedBytes"].as_u64().unwrap_or(0), + loaded_rows: stat["loadedRows"].as_u64().unwrap_or(0), + error_rows: stat["errorRows"].as_u64().unwrap_or(0), + committed_task_num: stat["committedTaskNum"].as_u64().unwrap_or(0), + load_rows_rate: stat["loadRowsRate"].as_u64().unwrap_or(0), + aborted_task_num: stat["abortedTaskNum"].as_u64().unwrap_or(0), + total_rows: stat["totalRows"].as_u64().unwrap_or(0), + unselected_rows: stat["unselectedRows"].as_u64().unwrap_or(0), + received_bytes_rate: stat["receivedBytesRate"].as_u64().unwrap_or(0), + task_execute_time_ms: stat["taskExecuteTimeMs"].as_u64().unwrap_or(0), + }) + } + + /// Parse Progress JSON field + fn parse_progress(&self, prog_str: &str) -> Result> { + let prog: HashMap = serde_json::from_str(prog_str).map_err(|e| { + CliError::ToolExecutionFailed(format!("Failed to parse progress: {}", e)) + })?; + Ok(prog) + } + + /// Parse Lag JSON field + fn parse_lag(&self, lag_str: &str) -> Result> { + let lag: HashMap = serde_json::from_str(lag_str) + .map_err(|e| CliError::ToolExecutionFailed(format!("Failed to parse lag: {e}")))?; + Ok(lag) + } +} diff --git a/src/tools/fe/routine_load/log_parser.rs b/src/tools/fe/routine_load/log_parser.rs new file mode 100644 index 0000000..b59f4cf --- /dev/null +++ b/src/tools/fe/routine_load/log_parser.rs @@ -0,0 +1,129 @@ +use crate::error::{CliError, Result}; +use chrono::NaiveDateTime; +use regex::Regex; +use std::fs; +use std::io::{BufRead, BufReader}; +use std::path::{Path, PathBuf}; + +#[derive(Debug, Clone, Default)] +pub struct LogCommitEntry { + pub timestamp: NaiveDateTime, + pub loaded_rows: Option, + pub received_bytes: Option, + pub task_execution_ms: Option, + pub transaction_id: Option, +} + +pub struct FeLogParser { + re_ts: Regex, + re_fields: Regex, + re_txn: Regex, +} + +impl FeLogParser { + pub fn new() -> Self { + Self { + re_ts: Regex::new(r"^(?P\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d{3}").unwrap(), + re_fields: Regex::new(r"(loadedRows|receivedBytes|taskExecutionTimeMs)=([0-9]+)") + .unwrap(), + re_txn: Regex::new(r"transactionId:([0-9A-Za-z_-]+)").unwrap(), + } + } + + pub fn parse_line(&self, line: &str, job_id: &str) -> Option { + if !line.contains(job_id) { + return None; + } + // Only parse lines containing key fragments + if !(line.contains("commitTxn") || line.contains("RLTaskTxnCommitAttachment")) { + return None; + } + 
+ let ts = self.re_ts.captures(line)?.name("ts")?.as_str(); + let timestamp = NaiveDateTime::parse_from_str(ts, "%Y-%m-%d %H:%M:%S").ok()?; + + let mut entry = LogCommitEntry { + timestamp, + ..Default::default() + }; + + for cap in self.re_fields.captures_iter(line) { + let k = &cap[1]; + let v: u64 = cap[2].parse().ok()?; + match k { + "loadedRows" => entry.loaded_rows = Some(v), + "receivedBytes" => entry.received_bytes = Some(v), + "taskExecutionTimeMs" => entry.task_execution_ms = Some(v), + _ => {} + } + } + + if let Some(c) = self.re_txn.captures(line) { + entry.transaction_id = Some(c[1].to_string()); + } + + Some(entry) + } +} + +pub fn collect_fe_logs(dir: &Path) -> Result> { + if !dir.exists() { + return Err(CliError::ConfigError(format!( + "Log directory does not exist: {}", + dir.display() + ))); + } + + if !dir.is_dir() { + return Err(CliError::ConfigError(format!( + "Path is not a directory: {}", + dir.display() + ))); + } + + let mut files: Vec = fs::read_dir(dir) + .map_err(CliError::IoError)? + .filter_map(|e| e.ok()) + .map(|e| e.path()) + .filter(|p| { + p.file_name() + .and_then(|n| n.to_str()) + .map(|s| s.starts_with("fe.log")) + .unwrap_or(false) + }) + .collect(); + + if files.is_empty() { + return Err(CliError::ConfigError(format!( + "No fe.log files found in directory: {}", + dir.display() + ))); + } + + // Sort by modification time (newest first) + files.sort_by_key(|p| fs::metadata(p).and_then(|m| m.modified()).ok()); + files.reverse(); + + Ok(files) +} + +pub fn scan_file( + parser: &FeLogParser, + path: &Path, + job_id: &str, + out: &mut Vec, +) -> Result<()> { + let f = fs::File::open(path).map_err(CliError::IoError)?; + + let reader = BufReader::new(f); + + for line_result in reader.lines() { + let line = line_result.map_err(CliError::IoError)?; + + if let Some(entry) = parser.parse_line(&line, job_id) { + out.push(entry); + } + } + + Ok(()) +} diff --git a/src/tools/fe/routine_load/mod.rs b/src/tools/fe/routine_load/mod.rs new file mode 100644 index 0000000..5675fc7 --- /dev/null +++ b/src/tools/fe/routine_load/mod.rs @@ -0,0 +1,43 @@ +mod error_checker; +mod job_lister; +mod job_manager; +mod log_parser; +mod models; +mod performance_analyzer; +mod traffic_monitor; + +pub use error_checker::RoutineLoadErrorChecker; +pub use job_lister::RoutineLoadJobLister; +pub use job_manager::RoutineLoadJobManager; +pub use models::*; +pub use performance_analyzer::RoutineLoadPerformanceAnalyzer; +pub use traffic_monitor::RoutineLoadTrafficMonitor; + +/// Routine Load tool index enum to avoid hardcoded indices +#[derive(Debug, Clone, Copy)] +pub enum RoutineLoadToolIndex { + JobLister = 4, + ErrorChecker = 5, + PerformanceAnalyzer = 6, + TrafficMonitor = 7, +} + +impl RoutineLoadToolIndex { + /// Get tool instance + pub fn get_tool( + self, + tools: &[Box], + ) -> Option<&dyn crate::tools::Tool> { + tools.get(self as usize).map(|t| &**t) + } +} + +// Re-export all tools for use in ToolRegistry +pub fn get_routine_load_tools() -> Vec> { + vec![ + Box::new(RoutineLoadJobLister), + Box::new(RoutineLoadErrorChecker), + Box::new(RoutineLoadPerformanceAnalyzer), + Box::new(RoutineLoadTrafficMonitor), + ] +} diff --git a/src/tools/fe/routine_load/models.rs b/src/tools/fe/routine_load/models.rs new file mode 100644 index 0000000..e24c38d --- /dev/null +++ b/src/tools/fe/routine_load/models.rs @@ -0,0 +1,79 @@ +use std::collections::HashMap; + +/// Routine Load job information +#[derive(Debug, Clone)] +pub struct RoutineLoadJob { + pub id: String, + pub name: String, + pub 
state: String, + pub db_name: String, + pub table_name: String, + pub create_time: String, + pub pause_time: Option, + pub end_time: Option, + pub current_task_num: Option, + pub data_source_type: Option, + pub statistic: Option, + pub progress: Option>, + pub lag: Option>, + pub error_log_urls: Option, + pub other_msg: Option, +} + +/// Job statistics information +#[derive(Debug, Clone)] +pub struct JobStatistic { + pub received_bytes: u64, + pub loaded_rows: u64, + pub error_rows: u64, + pub committed_task_num: u64, + pub load_rows_rate: u64, + pub aborted_task_num: u64, + pub total_rows: u64, + pub unselected_rows: u64, + pub received_bytes_rate: u64, + pub task_execute_time_ms: u64, +} + +/// In-memory state management +#[derive(Debug, Clone)] +pub struct RoutineLoadState { + pub current_job_id: Option, + pub current_job_name: Option, + pub last_database: Option, + pub job_cache: HashMap, +} + +impl RoutineLoadState { + pub fn new() -> Self { + Self { + current_job_id: None, + current_job_name: None, + last_database: None, + job_cache: HashMap::new(), + } + } + + pub fn clear(&mut self) { + self.current_job_id = None; + self.current_job_name = None; + self.last_database = None; + self.job_cache.clear(); + } +} + +impl Default for RoutineLoadState { + fn default() -> Self { + Self::new() + } +} + +/// Log commit entry for parsed log data +#[derive(Debug, Clone, Default)] +pub struct LogCommitEntry { + pub timestamp: chrono::NaiveDateTime, + pub loaded_rows: Option, + pub received_bytes: Option, + pub task_execution_ms: Option, + pub transaction_id: Option, +} diff --git a/src/tools/fe/routine_load/performance_analyzer.rs b/src/tools/fe/routine_load/performance_analyzer.rs new file mode 100644 index 0000000..1dc7471 --- /dev/null +++ b/src/tools/fe/routine_load/performance_analyzer.rs @@ -0,0 +1,281 @@ +use super::job_manager::RoutineLoadJobManager; +use super::log_parser::{FeLogParser, LogCommitEntry, collect_fe_logs, scan_file}; +use crate::config::Config; +use crate::error::{CliError, Result}; +use crate::tools::{ExecutionResult, Tool}; +use crate::ui; +use chrono::Duration; +use std::collections::HashSet; + +pub struct RoutineLoadPerformanceAnalyzer; + +impl Tool for RoutineLoadPerformanceAnalyzer { + fn name(&self) -> &str { + "routine_load_performance_analyzer" + } + fn description(&self) -> &str { + "Analyze per-commit rows/bytes/time from FE logs" + } + fn requires_pid(&self) -> bool { + false + } + + fn execute(&self, _config: &Config, _pid: u32) -> Result { + let job_manager = RoutineLoadJobManager; + let job_id = job_manager.get_current_job_id().ok_or_else(|| { + CliError::InvalidInput("No Job ID in memory. 
Run 'Get Job ID' first.".into()) + })?; + + let doris = crate::config_loader::load_config()?; + let log_dir = doris.log_dir; + + let minutes = self.prompt_time_window()?; + + ui::print_info(&format!( + "Analyzing FE logs in {} for job {} (last {} min)...", + log_dir.display(), + job_id, + minutes + )); + + let entries = self.collect_and_parse_logs(&log_dir, &job_id)?; + + let filtered_entries = self.filter_entries_by_time_window(entries, minutes)?; + + let deduplicated_entries = self.deduplicate_entries(filtered_entries)?; + + self.display_performance_results(&deduplicated_entries)?; + + Ok(ExecutionResult { + output_path: std::path::PathBuf::from("console_output"), + message: "Performance analysis completed".into(), + }) + } +} + +impl RoutineLoadPerformanceAnalyzer { + fn prompt_time_window(&self) -> Result { + let minutes_str: String = dialoguer::Input::new() + .with_prompt("Analyze recent minutes") + .default("30".to_string()) + .interact_text() + .map_err(|e| CliError::InvalidInput(e.to_string()))?; + + let minutes: i64 = minutes_str.trim().parse().unwrap_or(30).max(1); + Ok(minutes) + } + + fn collect_and_parse_logs( + &self, + log_dir: &std::path::Path, + job_id: &str, + ) -> Result> { + let files = collect_fe_logs(log_dir)?; + let parser = FeLogParser::new(); + let mut entries: Vec = Vec::new(); + + for path in files { + scan_file(&parser, &path, job_id, &mut entries)?; + } + + if entries.is_empty() { + return Err(CliError::ToolExecutionFailed( + "No matching commit entries found in FE logs".into(), + )); + } + + Ok(entries) + } + + fn filter_entries_by_time_window( + &self, + mut entries: Vec, + minutes: i64, + ) -> Result> { + // Use latest timestamp from logs as reference to avoid timezone/clock inconsistencies + let latest_ts = entries.iter().map(|e| e.timestamp).max().unwrap(); + let window_start = latest_ts - Duration::minutes(minutes); + entries.retain(|e| e.timestamp >= window_start); + + if entries.is_empty() { + return Err(CliError::ToolExecutionFailed( + "No matching commit entries found in FE logs".into(), + )); + } + + Ok(entries) + } + + /// Deduplicate entries + fn deduplicate_entries(&self, mut entries: Vec) -> Result> { + let mut seen: HashSet = HashSet::new(); + entries.retain(|e| { + let key = if let Some(txn) = &e.transaction_id { + format!("txn:{}", txn) + } else { + format!( + "ts:{}|r:{}|b:{}|ms:{}", + e.timestamp, + e.loaded_rows.unwrap_or(0), + e.received_bytes.unwrap_or(0), + e.task_execution_ms.unwrap_or(0) + ) + }; + seen.insert(key) + }); + + if entries.is_empty() { + return Err(CliError::ToolExecutionFailed( + "No matching commit entries found in FE logs".into(), + )); + } + + Ok(entries) + } + + /// Display performance analysis results + fn display_performance_results(&self, entries: &[LogCommitEntry]) -> Result<()> { + println!("\nPer-commit stats (time | ms | loadedRows | receivedBytes | txnId)"); + println!("{}", "-".repeat(90)); + + let mut stats = PerformanceStats::new(); + + // Sort by time in ascending order + let mut sorted_entries = entries.to_vec(); + sorted_entries.sort_by_key(|e| e.timestamp); + + for entry in &sorted_entries { + self.display_single_entry(entry); + stats.update(entry); + } + + println!("{}", "-".repeat(90)); + stats.display_summary(); + + Ok(()) + } + + fn display_single_entry(&self, entry: &LogCommitEntry) { + let time_str = entry.timestamp.format("%H:%M:%S").to_string(); + let ms = entry.task_execution_ms.unwrap_or(0); + let rows = entry.loaded_rows.unwrap_or(0); + let bytes = entry.received_bytes.unwrap_or(0); + + 
println!( + "{} | {:>6} | {:>13} | {:>16} | {}", + time_str, + ms, + fmt_int(rows), + fmt_int(bytes), + entry.transaction_id.clone().unwrap_or_else(|| "-".into()) + ); + } +} + +/// Performance statistics information +struct PerformanceStats { + count: u64, + sum_ms: u128, + min_ms: u64, + max_ms: u64, + sum_rows: u128, + min_rows: u64, + max_rows: u64, + sum_bytes: u128, + min_bytes: u64, + max_bytes: u64, +} + +impl PerformanceStats { + fn new() -> Self { + Self { + count: 0, + sum_ms: 0, + min_ms: u64::MAX, + max_ms: 0, + sum_rows: 0, + min_rows: u64::MAX, + max_rows: 0, + sum_bytes: 0, + min_bytes: u64::MAX, + max_bytes: 0, + } + } + + fn update(&mut self, entry: &LogCommitEntry) { + let ms = entry.task_execution_ms.unwrap_or(0); + let rows = entry.loaded_rows.unwrap_or(0); + let bytes = entry.received_bytes.unwrap_or(0); + + self.count += 1; + self.sum_ms += ms as u128; + self.min_ms = self.min_ms.min(ms); + self.max_ms = self.max_ms.max(ms); + self.sum_rows += rows as u128; + self.min_rows = self.min_rows.min(rows); + self.max_rows = self.max_rows.max(rows); + self.sum_bytes += bytes as u128; + self.min_bytes = self.min_bytes.min(bytes); + self.max_bytes = self.max_bytes.max(bytes); + } + + fn display_summary(&self) { + if self.count > 0 { + println!( + "count={} avg_ms={} min_ms={} max_ms={}", + self.count, + self.sum_ms / self.count as u128, + if self.min_ms == u64::MAX { + 0 + } else { + self.min_ms + }, + self.max_ms + ); + println!( + " avg_rows={} min_rows={} max_rows={}", + fmt_int_u128(self.sum_rows / self.count as u128), + fmt_int(if self.min_rows == u64::MAX { + 0 + } else { + self.min_rows + }), + fmt_int(self.max_rows) + ); + println!( + " avg_bytes={} min_bytes={} max_bytes={}", + fmt_int_u128(self.sum_bytes / self.count as u128), + fmt_int(if self.min_bytes == u64::MAX { + 0 + } else { + self.min_bytes + }), + fmt_int(self.max_bytes) + ); + } + } +} + +fn fmt_int(v: u64) -> String { + let s = v.to_string(); + group_digits(&s) +} + +fn fmt_int_u128(v: u128) -> String { + let s = v.to_string(); + group_digits(&s) +} + +fn group_digits(s: &str) -> String { + let bytes = s.as_bytes(); + let mut out = String::with_capacity(s.len() + s.len() / 3); + let mut count = 0; + for i in (0..bytes.len()).rev() { + out.push(bytes[i] as char); + count += 1; + if count % 3 == 0 && i != 0 { + out.push(','); + } + } + out.chars().rev().collect() +} diff --git a/src/tools/fe/routine_load/traffic_monitor.rs b/src/tools/fe/routine_load/traffic_monitor.rs new file mode 100644 index 0000000..37f1e34 --- /dev/null +++ b/src/tools/fe/routine_load/traffic_monitor.rs @@ -0,0 +1,150 @@ +use super::job_manager::RoutineLoadJobManager; +use super::log_parser::{FeLogParser, LogCommitEntry, collect_fe_logs, scan_file}; +use crate::config::Config; +use crate::error::{CliError, Result}; +use crate::tools::{ExecutionResult, Tool}; +use crate::ui; +use chrono::Duration; +use dialoguer::Input; +use std::collections::BTreeMap; + +pub struct RoutineLoadTrafficMonitor; + +impl Tool for RoutineLoadTrafficMonitor { + fn name(&self) -> &str { + "routine_load_traffic_monitor" + } + fn description(&self) -> &str { + "Aggregate per-minute loadedRows from FE logs" + } + fn requires_pid(&self) -> bool { + false + } + + fn execute(&self, _config: &Config, _pid: u32) -> Result { + let job_id = self.get_job_id()?; + let log_dir = self.get_log_directory()?; + + let minutes = self.prompt_time_window()?; + + ui::print_info(&format!( + "Analyzing traffic in {} for job {} (last {} min)...", + log_dir.display(), + job_id, + 
minutes + )); + + let entries = self.collect_and_parse_logs(&log_dir, &job_id)?; + + let filtered_entries = self.filter_entries_by_time_window(entries, minutes)?; + + let per_minute_data = self.aggregate_per_minute(filtered_entries); + + self.display_traffic_results(&per_minute_data)?; + + Ok(ExecutionResult { + output_path: std::path::PathBuf::from("console_output"), + message: "Traffic monitor completed".into(), + }) + } +} + +impl RoutineLoadTrafficMonitor { + fn get_job_id(&self) -> Result { + let job_manager = RoutineLoadJobManager; + job_manager.get_current_job_id().ok_or_else(|| { + CliError::InvalidInput("No Job ID in memory. Run 'Get Job ID' first.".into()) + }) + } + + fn get_log_directory(&self) -> Result { + let doris = crate::config_loader::load_config()?; + Ok(doris.log_dir) + } + + fn prompt_time_window(&self) -> Result { + let minutes_str: String = Input::new() + .with_prompt("Analyze recent minutes") + .default("60".to_string()) + .interact_text() + .map_err(|e| CliError::InvalidInput(e.to_string()))?; + + let minutes: i64 = minutes_str.trim().parse().unwrap_or(60).max(1); + Ok(minutes) + } + + fn collect_and_parse_logs( + &self, + log_dir: &std::path::Path, + job_id: &str, + ) -> Result> { + let files = collect_fe_logs(log_dir)?; + let parser = FeLogParser::new(); + let mut entries: Vec = Vec::new(); + + for path in files { + scan_file(&parser, &path, job_id, &mut entries)?; + } + + if entries.is_empty() { + return Err(CliError::ToolExecutionFailed( + "No matching entries found".into(), + )); + } + + Ok(entries) + } + + fn filter_entries_by_time_window( + &self, + mut entries: Vec, + minutes: i64, + ) -> Result> { + let latest_ts = entries.iter().map(|e| e.timestamp).max().unwrap(); + let window_start = latest_ts - Duration::minutes(minutes); + entries.retain(|e| e.timestamp >= window_start); + + if entries.is_empty() { + return Err(CliError::ToolExecutionFailed( + "No entries in selected window".into(), + )); + } + + Ok(entries) + } + + fn aggregate_per_minute(&self, entries: Vec) -> BTreeMap { + let mut per_minute: BTreeMap = BTreeMap::new(); + + for entry in entries { + let rows = entry.loaded_rows.unwrap_or(0) as u128; + let key = entry.timestamp.format("%H:%M").to_string(); + *per_minute.entry(key).or_insert(0) += rows; + } + + per_minute + } + + fn display_traffic_results(&self, per_minute_data: &BTreeMap) -> Result<()> { + println!("\nPer-minute loadedRows (ascending time)"); + println!("{}", "-".repeat(40)); + + for (minute, rows) in per_minute_data.iter() { + println!("{minute} loadedRows={rows}"); + } + + let total_rows: u128 = per_minute_data.values().sum(); + let avg_rows = if !per_minute_data.is_empty() { + total_rows / per_minute_data.len() as u128 + } else { + 0 + }; + + println!("{}", "-".repeat(40)); + println!("Total minutes: {}", per_minute_data.len()); + println!("Total loadedRows: {total_rows}"); + println!("Average per minute: {avg_rows}"); + + Ok(()) + } +} diff --git a/src/tools/mod.rs b/src/tools/mod.rs index 458772a..80f0280 100644 --- a/src/tools/mod.rs +++ b/src/tools/mod.rs @@ -50,6 +50,7 @@ impl ToolRegistry { BeVarsTool, MemzGlobalTool, MemzTool, PipelineTasksTool, PstackTool, }; use crate::tools::be::{JmapDumpTool as BeJmapDumpTool, JmapHistoTool as BeJmapHistoTool}; + use crate::tools::fe::routine_load::get_routine_load_tools; use crate::tools::fe::{FeProfilerTool, JmapDumpTool, JmapHistoTool, JstackTool}; let mut registry = Self { @@ -63,6 +64,9 @@ impl ToolRegistry { registry.fe_tools.push(Box::new(JstackTool)); 
registry.fe_tools.push(Box::new(FeProfilerTool)); + // Register Routine Load tools + registry.fe_tools.extend(get_routine_load_tools()); + // Register BE tools registry.be_tools.push(Box::new(PstackTool)); registry.be_tools.push(Box::new(BeVarsTool)); diff --git a/src/tools/mysql/mod.rs b/src/tools/mysql/mod.rs index 6c74c7d..5ad6f13 100644 --- a/src/tools/mysql/mod.rs +++ b/src/tools/mysql/mod.rs @@ -1,7 +1,7 @@ mod client; mod cluster; mod credentials; -mod parser; +pub mod parser; pub use client::MySQLTool; pub use cluster::{Backend, ClusterInfo, Frontend}; diff --git a/src/ui/menu.rs b/src/ui/menu.rs index 1bfb352..7f35fed 100644 --- a/src/ui/menu.rs +++ b/src/ui/menu.rs @@ -112,6 +112,25 @@ pub enum MainMenuAction { Exit, } +#[derive(Debug, Clone, Copy)] +pub enum FeToolAction { + JmapDump, + JmapHisto, + Jstack, + FeProfiler, + RoutineLoad, + Back, +} + +#[derive(Debug, Clone, Copy)] +pub enum RoutineLoadAction { + GetJobId, + ErrorCheck, + Performance, + Traffic, + Back, +} + pub fn show_main_menu() -> Result { let menu = Menu { step: 1, @@ -140,6 +159,94 @@ pub fn show_main_menu() -> Result { menu.show() } +pub fn show_fe_tools_menu() -> Result { + let menu = Menu { + step: 2, + title: "Select FE tool".to_string(), + options: vec![ + MenuOption { + action: FeToolAction::JmapDump, + key: "[1]".to_string(), + name: "jmap-dump".to_string(), + description: "Generate heap dump (.hprof)".to_string(), + }, + MenuOption { + action: FeToolAction::JmapHisto, + key: "[2]".to_string(), + name: "jmap-histo".to_string(), + description: "Generate histogram (.log)".to_string(), + }, + MenuOption { + action: FeToolAction::Jstack, + key: "[3]".to_string(), + name: "jstack".to_string(), + description: "Generate thread stack trace (.log)".to_string(), + }, + MenuOption { + action: FeToolAction::FeProfiler, + key: "[4]".to_string(), + name: "fe-profiler".to_string(), + description: + "Generate flame graph for FE performance analysis using async-profiler" + .to_string(), + }, + MenuOption { + action: FeToolAction::RoutineLoad, + key: "[5]".to_string(), + name: "Routine Load".to_string(), + description: "Routine Load management tools".to_string(), + }, + MenuOption { + action: FeToolAction::Back, + key: "[6]".to_string(), + name: "← Back".to_string(), + description: "Return to main menu".to_string(), + }, + ], + }; + menu.show() +} + +pub fn show_routine_load_menu() -> Result { + let menu = Menu { + step: 3, + title: "Routine Load Tools".to_string(), + options: vec![ + MenuOption { + action: RoutineLoadAction::GetJobId, + key: "[1]".to_string(), + name: "Get Job ID".to_string(), + description: "List and select Routine Load jobs".to_string(), + }, + MenuOption { + action: RoutineLoadAction::ErrorCheck, + key: "[2]".to_string(), + name: "Error Check".to_string(), + description: "Check for errors in selected Routine Load job".to_string(), + }, + MenuOption { + action: RoutineLoadAction::Performance, + key: "[3]".to_string(), + name: "Performance Analysis".to_string(), + description: "Analyze per-commit rows/bytes/time from FE logs".to_string(), + }, + MenuOption { + action: RoutineLoadAction::Traffic, + key: "[4]".to_string(), + name: "Traffic Monitor".to_string(), + description: "Aggregate per-minute loadedRows from FE logs".to_string(), + }, + MenuOption { + action: RoutineLoadAction::Back, + key: "[5]".to_string(), + name: "← Back to FE Tools".to_string(), + description: "Return to FE tools menu".to_string(), + }, + ], + }; + menu.show() +} + pub fn show_tool_selection_menu<'a>( step: u8, title: &str, 
From 0fa41888995c0ff174179885e11eecd4c7a50a20 Mon Sep 17 00:00:00 2001
From: QuakeWang <1677980708@qq.com>
Date: Thu, 21 Aug 2025 17:29:57 +0800
Subject: [PATCH 2/8] refactor: remove RoutineLoadErrorChecker and streamline
 error handling in routine load tools

- Removed the RoutineLoadErrorChecker tool to simplify the routine load process.
- Updated the job lister and performance analyzer to handle error checks directly.
- Enhanced user prompts and error messages for better clarity and interaction.
- Refactored MySQL query execution to support both standard and raw output modes.
- Introduced new utility functions for input handling and formatting.
---
 src/lib.rs                                    |  12 +-
 src/tools/fe/routine_load/error_checker.rs    |  92 -------
 src/tools/fe/routine_load/job_lister.rs       | 218 +++++++----------
 src/tools/fe/routine_load/job_manager.rs      |  12 +-
 src/tools/fe/routine_load/log_parser.rs       |   8 +-
 src/tools/fe/routine_load/mod.rs              |  13 +-
 src/tools/fe/routine_load/models.rs           |   2 +-
 .../fe/routine_load/performance_analyzer.rs   | 226 +++++++++++-------
 src/tools/fe/routine_load/traffic_monitor.rs  |  45 ++--
 src/tools/mysql/client.rs                     | 101 +++++++-
 src/tools/mysql/mod.rs                        |   3 +
 src/ui/dialogs.rs                             |  56 +++++
 src/ui/menu.rs                                |  17 +-
 src/ui/mod.rs                                 |   6 +
 src/ui/selector.rs                            | 172 +++++++++++++
 src/ui/utils.rs                               |  65 +++++
 16 files changed, 652 insertions(+), 396 deletions(-)
 delete mode 100644 src/tools/fe/routine_load/error_checker.rs
 create mode 100644 src/ui/dialogs.rs
 create mode 100644 src/ui/selector.rs
 create mode 100644 src/ui/utils.rs

diff --git a/src/lib.rs b/src/lib.rs
index a6952c3..487d81b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -311,11 +311,7 @@ fn handle_routine_load_loop(config: &Config, tools: &[Box<dyn Tool>]) -> Result<
                 tools,
                 crate::tools::fe::routine_load::RoutineLoadToolIndex::JobLister,
             )?,
-            ui::RoutineLoadAction::ErrorCheck => execute_routine_load_tool(
-                config,
-                tools,
-                crate::tools::fe::routine_load::RoutineLoadToolIndex::ErrorChecker,
-            )?,
+
             ui::RoutineLoadAction::Performance => execute_routine_load_tool(
                 config,
                 tools,
                 crate::tools::fe::routine_load::RoutineLoadToolIndex::PerformanceAnalyzer,
@@ -447,7 +443,7 @@ fn handle_tool_execution_error(
     service_name: &str,
     tool_name: &str,
 ) -> Result<Option<Config>> {
-    println!();
+    print_info("");
 
     // Special handling for Routine Load tools when Job ID is missing
     if service_name == "FE"
@@ -457,7 +453,7 @@ fn handle_tool_execution_error(
         print_warning("Routine Load tool execution failed: No Job ID selected.");
         print_error(&format!("Error: {error}"));
 
-        println!();
+        print_info("");
         print_info("Would you like to:");
 
         let options = vec![
@@ -494,7 +490,7 @@ fn handle_tool_execution_error(
         print_warning("Tool execution failed due to configuration issues.");
         print_error(&format!("Error: {error}"));
 
-        println!();
+        print_info("");
         print_info("Would you like to:");
 
         let options = vec![
diff --git a/src/tools/fe/routine_load/error_checker.rs b/src/tools/fe/routine_load/error_checker.rs
deleted file mode 100644
index ec525a0..0000000
--- a/src/tools/fe/routine_load/error_checker.rs
+++ /dev/null
@@ -1,92 +0,0 @@
-use super::job_manager::RoutineLoadJobManager;
-use crate::config::Config;
-use crate::config_loader;
-use crate::error::{CliError, Result};
-use crate::tools::mysql::MySQLTool;
-use crate::tools::{ExecutionResult, Tool};
-use crate::ui;
-
-pub struct RoutineLoadErrorChecker;
-
-impl Tool for RoutineLoadErrorChecker {
-    fn name(&self) -> &str {
-        "routine_load_error_checker"
-    }
-    fn description(&self) -> &str {
-        "Check for errors in Routine Load job"
-    }
-    fn requires_pid(&self) -> bool {
-        false
-    }
-
-    fn execute(&self, _config: &Config,
_pid: u32) -> Result { - let job_manager = RoutineLoadJobManager; - let job_id = job_manager.get_current_job_id().ok_or_else(|| { - CliError::InvalidInput("No Job ID in memory. Run 'Get Job ID' first.".into()) - })?; - let database = job_manager - .get_last_database() - .ok_or_else(|| CliError::InvalidInput("Unknown database for current Job ID".into()))?; - - ui::print_info(&format!( - "Checking Routine Load errors for job {}...", - job_id - )); - - let doris_config = config_loader::load_config()?; - let sql = format!("USE `{}`; SHOW ROUTINE LOAD \\G", database); - let output = MySQLTool::query_sql_with_config(&doris_config, &sql)?; - - let jobs = job_manager.parse_routine_load_output(&output)?; - let job = jobs.into_iter().find(|j| j.id == job_id).ok_or_else(|| { - CliError::InvalidInput(format!("Job {} not found in database {}", job_id, database)) - })?; - - let mut findings: Vec = Vec::new(); - - if job.state != "RUNNING" && job.state != "NEED_SCHEDULE" { - findings.push(format!("State is {}", job.state)); - } - - if let Some(stat) = &job.statistic { - if stat.error_rows > 0 { - findings.push(format!("Error rows: {}", stat.error_rows)); - } - if stat.unselected_rows > 0 { - findings.push(format!("Unselected rows: {}", stat.unselected_rows)); - } - } - - if let Some(urls) = &job.error_log_urls { - if !urls.trim().is_empty() && urls.trim() != "NULL" { - findings.push(format!("Error log URLs: {}", urls.trim())); - } - } - - println!("\nRoutine Load Error Check Report\n================================\n"); - println!("Job ID: {}", job.id); - println!("Name : {}", job.name); - println!("State : {}", job.state); - println!("DB : {}", job.db_name); - println!("Table : {}", job.table_name); - - if findings.is_empty() { - println!("\nNo obvious errors detected."); - } else { - println!("\nFindings:"); - for f in &findings { - println!(" - {}", f); - } - } - - println!("\nHints:"); - println!(" - Check FE logs if state is PAUSED/STOPPED/CANCELLED"); - println!(" - Review ErrorLogUrls if present"); - println!(" - Verify source offsets and Lag for Kafka"); - - Ok(ExecutionResult { - output_path: std::path::PathBuf::from("console_output"), - message: format!("Error check completed for Job ID: {}", job_id), - }) - } -} diff --git a/src/tools/fe/routine_load/job_lister.rs b/src/tools/fe/routine_load/job_lister.rs index 61a85c6..4288ac4 100644 --- a/src/tools/fe/routine_load/job_lister.rs +++ b/src/tools/fe/routine_load/job_lister.rs @@ -6,9 +6,9 @@ use crate::error::{CliError, Result}; use crate::tools::mysql::MySQLTool; use crate::tools::{ExecutionResult, Tool}; use crate::ui; +use crate::ui::{InputHelper, InteractiveSelector}; +use crate::ui::{NoJobsNextAction, show_no_jobs_recovery_menu, show_unknown_db_recovery_menu}; use chrono::Utc; -use console::{Key, Term, style}; -use dialoguer::Input; /// Routine Load Job Lister pub struct RoutineLoadJobLister; @@ -27,39 +27,72 @@ impl Tool for RoutineLoadJobLister { } fn execute(&self, _config: &Config, _pid: u32) -> Result { - let database = self.prompt_database_name()?; - let jobs = self.query_routine_load_jobs(&database)?; - self.display_jobs(&jobs)?; - let selected_job = self.prompt_job_selection(&jobs)?; - self.save_selected_job(selected_job, &database)?; - // Print the key information directly to the terminal instead of saving it to a file - let report = self.generate_selection_report(selected_job)?; - println!("\n{}", report); - Ok(ExecutionResult { - output_path: std::path::PathBuf::from("console_output"), - message: format!("Job ID '{}' selected and saved in memory", selected_job.id), - }) + // Retry loop: 
allow reselecting database if no jobs found + let mut database = self.prompt_database_name()?; + loop { + match self.query_routine_load_jobs(&database) { + Ok(jobs) => { + self.display_jobs(&jobs)?; + let selected_job = self.prompt_job_selection(&jobs)?; + self.save_selected_job(selected_job, &database)?; + let report = self.generate_selection_report(selected_job)?; + ui::print_info(&format!("\n{report}")); + return Ok(ExecutionResult { + output_path: std::path::PathBuf::from("console_output"), + message: format!( + "Job ID '{}' selected and saved in memory", + selected_job.id + ), + }); + } + Err(CliError::ToolExecutionFailed(msg)) + if msg.contains("No Routine Load jobs found in database") => + { + match show_no_jobs_recovery_menu(&database)? { + NoJobsNextAction::ChooseAnotherDatabase => { + database = self.prompt_database_name()?; + } + NoJobsNextAction::BackToMenu => return Err(CliError::GracefulExit), + } + } + Err(CliError::ToolExecutionFailed(msg)) if msg.contains("Unknown database") => { + match show_unknown_db_recovery_menu(&database)? { + NoJobsNextAction::ChooseAnotherDatabase => { + database = self.prompt_database_name()?; + } + NoJobsNextAction::BackToMenu => return Err(CliError::GracefulExit), + } + } + Err(e) => return Err(e), + } + } } } impl RoutineLoadJobLister { fn prompt_database_name(&self) -> Result { - ui::print_info("Please enter the database name:"); - - let database: String = Input::new() - .with_prompt("Database name") - .allow_empty(false) - .interact()?; - - let database = database.trim().to_string(); - - if database.is_empty() { - return Err(CliError::InvalidInput( - "Database name cannot be empty".into(), - )); + let doris_config = config_loader::load_config()?; + match MySQLTool::list_databases(&doris_config) { + Ok(output) => { + let dbs = output; + + if !dbs.is_empty() { + ui::print_info("Select a database:"); + let selector = + InteractiveSelector::new(dbs.clone(), "Available databases:".to_string()) + .with_page_size(30); + if let Ok(selected) = selector.select() { + return Ok(selected.clone()); + } + } + } + Err(_) => { + // Fallback to manual input + } } - Ok(database) + ui::print_info("Please enter the database name:"); + InputHelper::prompt_non_empty("Database name") } fn query_routine_load_jobs(&self, database: &str) -> Result> { @@ -72,17 +105,8 @@ impl RoutineLoadJobLister { let jobs = job_manager.parse_routine_load_output(&output)?; if jobs.is_empty() { - ui::print_warning(&format!( - "No Routine Load jobs found in database '{}'", - database - )); - ui::print_info("This could mean:"); - ui::print_info(" - The database name is incorrect"); - ui::print_info(" - No Routine Load jobs have been created"); - ui::print_info(" - All jobs have been stopped or deleted"); return Err(CliError::ToolExecutionFailed(format!( - "No Routine Load jobs found in database '{}'", - database + "No Routine Load jobs found in database '{database}'" ))); } @@ -90,114 +114,40 @@ impl RoutineLoadJobLister { } fn display_jobs(&self, jobs: &[RoutineLoadJob]) -> Result<()> { - println!("\nRoutine Load Jobs in Database:"); - println!("{}", "=".repeat(100)); + ui::print_info("\nRoutine Load Jobs in Database:"); + ui::print_info(&"=".repeat(100)); - println!( - "{:<4} {:<20} {:<32} {:<12} {:<15}", - "No.", "Job ID", "Name", "State", "Table" - ); - println!("{}", "-".repeat(100)); - - for (index, job) in jobs.iter().enumerate() { - let number = index + 1; - let name = self.truncate_string(&job.name, 32); - let table = self.truncate_string(&job.table_name, 13); - - println!( - 
"{:<4} {:<20} {:<32} {:<12} {:<15}", - number, job.id, name, job.state, table - ); + for job in jobs.iter() { + ui::print_info(&format!( + "ID: {} | Name: {} | State: {} | CreateTime: {}", + job.id, job.name, job.state, job.create_time + )); } - println!("{}", "=".repeat(100)); + ui::print_info(&"-".repeat(100)); + ui::print_info(&format!("Total jobs found: {count}", count = jobs.len())); + ui::print_info(&"=".repeat(100)); let running_count = jobs.iter().filter(|j| j.state == "RUNNING").count(); let paused_count = jobs.iter().filter(|j| j.state == "PAUSED").count(); let stopped_count = jobs.iter().filter(|j| j.state == "STOPPED").count(); println!( - "Summary: {} total jobs ({} running, {} paused, {} stopped)", - jobs.len(), - running_count, - paused_count, - stopped_count + "Summary: {} total jobs ({running_count} running, {paused_count} paused, {stopped_count} stopped)", + jobs.len() ); Ok(()) } fn prompt_job_selection<'a>(&self, jobs: &'a [RoutineLoadJob]) -> Result<&'a RoutineLoadJob> { - let term = Term::stdout(); - let mut selection: usize = 0; + let selector = + InteractiveSelector::new(jobs.to_vec(), "Select a Routine Load job:".to_string()); + let selected_job = selector.select()?; - println!("\nUse ↑/↓ or press number, then Enter to select:"); - term.hide_cursor() - .map_err(|e| CliError::InvalidInput(e.to_string()))?; - - self.render_selection_list(&term, jobs, selection)?; - - loop { - match term - .read_key() - .map_err(|e| CliError::InvalidInput(e.to_string()))? - { - Key::Enter => { - term.show_cursor() - .map_err(|e| CliError::InvalidInput(e.to_string()))?; - term.clear_last_lines(jobs.len()).ok(); - break; - } - Key::ArrowUp => { - selection = if selection == 0 { - jobs.len() - 1 - } else { - selection - 1 - }; - } - Key::ArrowDown => { - selection = if selection + 1 >= jobs.len() { - 0 - } else { - selection + 1 - }; - } - Key::Char(c) => { - if let Some(d) = c.to_digit(10) { - let idx = d.saturating_sub(1) as usize; - if idx < jobs.len() { - selection = idx; - } - } - } - _ => {} - } - - term.move_cursor_up(jobs.len()).ok(); - self.render_selection_list(&term, jobs, selection)?; - } - - Ok(&jobs[selection]) - } - - fn render_selection_list( - &self, - term: &Term, - jobs: &[RoutineLoadJob], - selection: usize, - ) -> Result<()> { - for (i, job) in jobs.iter().enumerate() { - let arrow = if i == selection { - style(">").cyan().bold().to_string() - } else { - " ".to_string() - }; - let name = self.truncate_string(&job.name, 32); - let line = format!("{arrow} {}. 
{} - {} ({})", i + 1, job.id, name, job.state); - term.write_line(&line) - .map_err(|e| CliError::InvalidInput(e.to_string()))?; - } - Ok(()) + jobs.iter() + .find(|j| j.id == selected_job.id) + .ok_or_else(|| CliError::InvalidInput("Selected job not found in original list".into())) } fn save_selected_job(&self, job: &RoutineLoadJob, database: &str) -> Result<()> { @@ -255,12 +205,4 @@ impl RoutineLoadJobLister { Ok(report) } - - fn truncate_string(&self, s: &str, max_len: usize) -> String { - if s.len() <= max_len { - s.to_string() - } else { - format!("{}...", &s[..max_len - 3]) - } - } } diff --git a/src/tools/fe/routine_load/job_manager.rs b/src/tools/fe/routine_load/job_manager.rs index c9876dc..07beaa6 100644 --- a/src/tools/fe/routine_load/job_manager.rs +++ b/src/tools/fe/routine_load/job_manager.rs @@ -14,7 +14,6 @@ static ROUTINE_LOAD_STATE: Lazy> = pub struct RoutineLoadJobManager; impl RoutineLoadJobManager { - /// Helper function: safely acquire state lock and execute operation fn with_state(&self, f: F) -> Result where F: FnOnce(&mut RoutineLoadState) -> Result, @@ -25,7 +24,6 @@ impl RoutineLoadJobManager { f(&mut state) } - /// Helper function: read-only access to state fn with_state_readonly(&self, f: F) -> Result where F: FnOnce(&RoutineLoadState) -> Result, @@ -87,12 +85,10 @@ impl RoutineLoadJobManager { }) } - /// Get job cache pub fn get_job_cache(&self) -> Result> { self.with_state_readonly(|state| Ok(state.job_cache.clone())) } - /// Parse Routine Load output pub fn parse_routine_load_output(&self, output: &str) -> Result> { let blocks = split_into_blocks(output); let mut jobs = Vec::new(); @@ -106,7 +102,6 @@ impl RoutineLoadJobManager { Ok(jobs) } - /// Parse single job block fn parse_job_block(&self, block: &str) -> Result> { let fields = parse_key_value_pairs(block); @@ -168,7 +163,6 @@ impl RoutineLoadJobManager { Ok(Some(job)) } - /// Parse Statistic JSON field fn parse_statistic(&self, stat_str: &str) -> Result { let stat: serde_json::Value = serde_json::from_str(stat_str).map_err(|e| { CliError::ToolExecutionFailed(format!("Failed to parse statistic: {}", e)) @@ -188,7 +182,6 @@ impl RoutineLoadJobManager { }) } - /// Parse Progress JSON field fn parse_progress(&self, prog_str: &str) -> Result> { let prog: HashMap = serde_json::from_str(prog_str).map_err(|e| { CliError::ToolExecutionFailed(format!("Failed to parse progress: {}", e)) @@ -196,9 +189,8 @@ impl RoutineLoadJobManager { Ok(prog) } - /// Parse Lag JSON field - fn parse_lag(&self, lag_str: &str) -> Result> { - let lag: HashMap = serde_json::from_str(lag_str) + fn parse_lag(&self, lag_str: &str) -> Result> { + let lag: HashMap = serde_json::from_str(lag_str) .map_err(|e| CliError::ToolExecutionFailed(format!("Failed to parse lag: {e}")))?; Ok(lag) } diff --git a/src/tools/fe/routine_load/log_parser.rs b/src/tools/fe/routine_load/log_parser.rs index b59f4cf..6bf30d3 100644 --- a/src/tools/fe/routine_load/log_parser.rs +++ b/src/tools/fe/routine_load/log_parser.rs @@ -114,16 +114,16 @@ pub fn scan_file( out: &mut Vec, ) -> Result<()> { let f = fs::File::open(path).map_err(CliError::IoError)?; - + let reader = BufReader::new(f); - + for line_result in reader.lines() { let line = line_result.map_err(CliError::IoError)?; - + if let Some(entry) = parser.parse_line(&line, job_id) { out.push(entry); } } - + Ok(()) } diff --git a/src/tools/fe/routine_load/mod.rs b/src/tools/fe/routine_load/mod.rs index 5675fc7..b64862a 100644 --- a/src/tools/fe/routine_load/mod.rs +++ b/src/tools/fe/routine_load/mod.rs @@ 
-1,4 +1,3 @@ -mod error_checker; mod job_lister; mod job_manager; mod log_parser; @@ -6,7 +5,10 @@ mod models; mod performance_analyzer; mod traffic_monitor; -pub use error_checker::RoutineLoadErrorChecker; +pub mod messages { + pub const NO_JOB_ID: &str = "No Job ID in memory. Run 'Get Job ID' first."; +} + pub use job_lister::RoutineLoadJobLister; pub use job_manager::RoutineLoadJobManager; pub use models::*; @@ -17,13 +19,11 @@ pub use traffic_monitor::RoutineLoadTrafficMonitor; #[derive(Debug, Clone, Copy)] pub enum RoutineLoadToolIndex { JobLister = 4, - ErrorChecker = 5, - PerformanceAnalyzer = 6, - TrafficMonitor = 7, + PerformanceAnalyzer = 5, + TrafficMonitor = 6, } impl RoutineLoadToolIndex { - /// Get tool instance pub fn get_tool( self, tools: &[Box], @@ -36,7 +36,6 @@ impl RoutineLoadToolIndex { pub fn get_routine_load_tools() -> Vec> { vec![ Box::new(RoutineLoadJobLister), - Box::new(RoutineLoadErrorChecker), Box::new(RoutineLoadPerformanceAnalyzer), Box::new(RoutineLoadTrafficMonitor), ] diff --git a/src/tools/fe/routine_load/models.rs b/src/tools/fe/routine_load/models.rs index e24c38d..ba56264 100644 --- a/src/tools/fe/routine_load/models.rs +++ b/src/tools/fe/routine_load/models.rs @@ -15,7 +15,7 @@ pub struct RoutineLoadJob { pub data_source_type: Option, pub statistic: Option, pub progress: Option>, - pub lag: Option>, + pub lag: Option>, pub error_log_urls: Option, pub other_msg: Option, } diff --git a/src/tools/fe/routine_load/performance_analyzer.rs b/src/tools/fe/routine_load/performance_analyzer.rs index 1dc7471..a0c95be 100644 --- a/src/tools/fe/routine_load/performance_analyzer.rs +++ b/src/tools/fe/routine_load/performance_analyzer.rs @@ -2,10 +2,12 @@ use super::job_manager::RoutineLoadJobManager; use super::log_parser::{FeLogParser, LogCommitEntry, collect_fe_logs, scan_file}; use crate::config::Config; use crate::error::{CliError, Result}; +use crate::tools::fe::routine_load::messages as ErrMsg; use crate::tools::{ExecutionResult, Tool}; use crate::ui; +use crate::ui::{FormatHelper, InputHelper}; use chrono::Duration; -use std::collections::HashSet; +use std::collections::HashMap; pub struct RoutineLoadPerformanceAnalyzer; @@ -22,9 +24,9 @@ impl Tool for RoutineLoadPerformanceAnalyzer { fn execute(&self, _config: &Config, _pid: u32) -> Result { let job_manager = RoutineLoadJobManager; - let job_id = job_manager.get_current_job_id().ok_or_else(|| { - CliError::InvalidInput("No Job ID in memory. 
Run 'Get Job ID' first.".into()) - })?; + let job_id = job_manager + .get_current_job_id() + .ok_or_else(|| CliError::InvalidInput(ErrMsg::NO_JOB_ID.into()))?; let doris = crate::config_loader::load_config()?; let log_dir = doris.log_dir; @@ -55,14 +57,7 @@ impl Tool for RoutineLoadPerformanceAnalyzer { impl RoutineLoadPerformanceAnalyzer { fn prompt_time_window(&self) -> Result { - let minutes_str: String = dialoguer::Input::new() - .with_prompt("Analyze recent minutes") - .default("30".to_string()) - .interact_text() - .map_err(|e| CliError::InvalidInput(e.to_string()))?; - - let minutes: i64 = minutes_str.trim().parse().unwrap_or(30).max(1); - Ok(minutes) + InputHelper::prompt_number_with_default("Analyze recent minutes", 30, 1) } fn collect_and_parse_logs( @@ -106,69 +101,142 @@ impl RoutineLoadPerformanceAnalyzer { Ok(entries) } - /// Deduplicate entries - fn deduplicate_entries(&self, mut entries: Vec) -> Result> { - let mut seen: HashSet = HashSet::new(); - entries.retain(|e| { - let key = if let Some(txn) = &e.transaction_id { - format!("txn:{}", txn) - } else { - format!( - "ts:{}|r:{}|b:{}|ms:{}", - e.timestamp, - e.loaded_rows.unwrap_or(0), - e.received_bytes.unwrap_or(0), - e.task_execution_ms.unwrap_or(0) - ) - }; - seen.insert(key) - }); + fn deduplicate_entries(&self, entries: Vec) -> Result> { + let mut map: HashMap = HashMap::new(); - if entries.is_empty() { + for e in entries.into_iter() { + let key = format!( + "ts:{}|r:{}|b:{}|ms:{}", + e.timestamp, + e.loaded_rows.unwrap_or(0), + e.received_bytes.unwrap_or(0), + e.task_execution_ms.unwrap_or(0) + ); + + match map.get_mut(&key) { + Some(existing) => { + if existing.transaction_id.is_none() && e.transaction_id.is_some() { + *existing = e; + } + } + None => { + map.insert(key, e); + } + } + } + + let deduped: Vec = map.into_values().collect(); + if deduped.is_empty() { return Err(CliError::ToolExecutionFailed( "No matching commit entries found in FE logs".into(), )); } - - Ok(entries) + Ok(deduped) } - /// Display performance analysis results fn display_performance_results(&self, entries: &[LogCommitEntry]) -> Result<()> { - println!("\nPer-commit stats (time | ms | loadedRows | receivedBytes | txnId)"); - println!("{}", "-".repeat(90)); - - let mut stats = PerformanceStats::new(); - - // Sort by time in ascending order + // Collect rows let mut sorted_entries = entries.to_vec(); sorted_entries.sort_by_key(|e| e.timestamp); + let headers = ["Time", "ms", "loadedRows", "receivedBytes", "txnId"]; + + let mut rows: Vec<[String; 5]> = Vec::with_capacity(sorted_entries.len()); + let mut stats = PerformanceStats::new(); + for entry in &sorted_entries { - self.display_single_entry(entry); + let time_str = entry.timestamp.format("%H:%M:%S").to_string(); + let ms = entry.task_execution_ms.unwrap_or(0); + let rows_val = entry.loaded_rows.unwrap_or(0); + let bytes_val = entry.received_bytes.unwrap_or(0); + let txn = entry.transaction_id.clone().unwrap_or_else(|| "-".into()); + + rows.push([ + time_str, + ms.to_string(), + FormatHelper::fmt_int(rows_val), + FormatHelper::fmt_int(bytes_val), + txn, + ]); stats.update(entry); } - println!("{}", "-".repeat(90)); - stats.display_summary(); + // Compute column widths + let mut widths = [0usize; 5]; + for i in 0..5 { + widths[i] = headers[i].len(); + } + for row in &rows { + for i in 0..5 { + widths[i] = widths[i].max(row[i].len()); + } + } + // Render table + ui::print_info("\nPer-commit stats"); + self.print_table(&headers, &rows, &widths)?; + + // Summary + stats.display_summary(); 
Ok(()) } - fn display_single_entry(&self, entry: &LogCommitEntry) { - let time_str = entry.timestamp.format("%H:%M:%S").to_string(); - let ms = entry.task_execution_ms.unwrap_or(0); - let rows = entry.loaded_rows.unwrap_or(0); - let bytes = entry.received_bytes.unwrap_or(0); - - println!( - "{} | {:>6} | {:>13} | {:>16} | {}", - time_str, - ms, - fmt_int(rows), - fmt_int(bytes), - entry.transaction_id.clone().unwrap_or_else(|| "-".into()) + fn print_table( + &self, + headers: &[&str; 5], + rows: &[[String; 5]], + widths: &[usize; 5], + ) -> Result<()> { + // Separator line + let sep = { + let mut s = String::new(); + for (idx, w) in widths.iter().enumerate() { + if idx > 0 { + s.push('+'); + } + s.push_str(&"-".repeat(*w + 2)); + } + s + }; + + // Header + ui::print_info(&sep); + let header_line = format!( + " {:w1$} | {:>w2$} | {:>w3$} | {:w1$} | {:>w2$} | {:>w3$} | {: 0 { - println!( + ui::print_info(&format!( "count={} avg_ms={} min_ms={} max_ms={}", self.count, self.sum_ms / self.count as u128, @@ -231,51 +299,27 @@ impl PerformanceStats { self.min_ms }, self.max_ms - ); - println!( + )); + ui::print_info(&format!( " avg_rows={} min_rows={} max_rows={}", - fmt_int_u128(self.sum_rows / self.count as u128), - fmt_int(if self.min_rows == u64::MAX { + FormatHelper::fmt_int_u128(self.sum_rows / self.count as u128), + FormatHelper::fmt_int(if self.min_rows == u64::MAX { 0 } else { self.min_rows }), - fmt_int(self.max_rows) - ); - println!( + FormatHelper::fmt_int(self.max_rows) + )); + ui::print_info(&format!( " avg_bytes={} min_bytes={} max_bytes={}", - fmt_int_u128(self.sum_bytes / self.count as u128), - fmt_int(if self.min_bytes == u64::MAX { + FormatHelper::fmt_int_u128(self.sum_bytes / self.count as u128), + FormatHelper::fmt_int(if self.min_bytes == u64::MAX { 0 } else { self.min_bytes }), - fmt_int(self.max_bytes) - ); - } - } -} - -fn fmt_int(v: u64) -> String { - let s = v.to_string(); - group_digits(&s) -} - -fn fmt_int_u128(v: u128) -> String { - let s = v.to_string(); - group_digits(&s) -} - -fn group_digits(s: &str) -> String { - let bytes = s.as_bytes(); - let mut out = String::with_capacity(s.len() + s.len() / 3); - let mut count = 0; - for i in (0..bytes.len()).rev() { - out.push(bytes[i] as char); - count += 1; - if count % 3 == 0 && i != 0 { - out.push(','); + FormatHelper::fmt_int(self.max_bytes) + )); } } - out.chars().rev().collect() } diff --git a/src/tools/fe/routine_load/traffic_monitor.rs b/src/tools/fe/routine_load/traffic_monitor.rs index 37f1e34..bfd7bf8 100644 --- a/src/tools/fe/routine_load/traffic_monitor.rs +++ b/src/tools/fe/routine_load/traffic_monitor.rs @@ -2,10 +2,11 @@ use super::job_manager::RoutineLoadJobManager; use super::log_parser::{FeLogParser, LogCommitEntry, collect_fe_logs, scan_file}; use crate::config::Config; use crate::error::{CliError, Result}; +use crate::tools::fe::routine_load::messages as ErrMsg; use crate::tools::{ExecutionResult, Tool}; use crate::ui; +use crate::ui::InputHelper; use chrono::Duration; -use dialoguer::Input; use std::collections::BTreeMap; pub struct RoutineLoadTrafficMonitor; @@ -28,10 +29,10 @@ impl Tool for RoutineLoadTrafficMonitor { let minutes = self.prompt_time_window()?; ui::print_info(&format!( - "Analyzing traffic in {} for job {} (last {} min)...", - log_dir.display(), - job_id, - minutes + "Analyzing traffic in {log_dir} for job {job_id} (last {minutes} min)...", + log_dir = log_dir.display(), + job_id = job_id, + minutes = minutes )); let entries = self.collect_and_parse_logs(&log_dir, &job_id)?; @@ -52,9 
+53,9 @@ impl Tool for RoutineLoadTrafficMonitor { impl RoutineLoadTrafficMonitor { fn get_job_id(&self) -> Result { let job_manager = RoutineLoadJobManager; - job_manager.get_current_job_id().ok_or_else(|| { - CliError::InvalidInput("No Job ID in memory. Run 'Get Job ID' first.".into()) - }) + job_manager + .get_current_job_id() + .ok_or_else(|| CliError::InvalidInput(ErrMsg::NO_JOB_ID.into())) } fn get_log_directory(&self) -> Result { @@ -63,14 +64,7 @@ impl RoutineLoadTrafficMonitor { } fn prompt_time_window(&self) -> Result { - let minutes_str: String = Input::new() - .with_prompt("Analyze recent minutes") - .default("60".to_string()) - .interact_text() - .map_err(|e| CliError::InvalidInput(e.to_string()))?; - - let minutes: i64 = minutes_str.trim().parse().unwrap_or(60).max(1); - Ok(minutes) + InputHelper::prompt_number_with_default("Analyze recent minutes", 60, 1) } fn collect_and_parse_logs( @@ -126,24 +120,27 @@ impl RoutineLoadTrafficMonitor { } fn display_traffic_results(&self, per_minute_data: &BTreeMap) -> Result<()> { - println!("\nPer-minute loadedRows (ascending time)"); - println!("{}", "-".repeat(40)); + ui::print_info("\nPer-minute loadedRows (ascending time)"); + ui::print_info(&"-".repeat(40)); for (minute, rows) in per_minute_data.iter() { - println!("{minute} loadedRows={rows}"); + ui::print_info(&format!("{minute} loadedRows={rows}")); } let total_rows: u128 = per_minute_data.values().sum(); + ui::print_info(&"-".repeat(40)); + ui::print_info(&format!( + "Total minutes: {count}", + count = per_minute_data.len() + )); + ui::print_info(&format!("Total loadedRows: {total_rows}")); + let avg_rows = if !per_minute_data.is_empty() { total_rows / per_minute_data.len() as u128 } else { 0 }; - - println!("{}", "-".repeat(40)); - println!("Total minutes: {}", per_minute_data.len()); - println!("Total loadedRows: {total_rows}"); - println!("Average per minute: {avg_rows}"); + ui::print_info(&format!("Average per minute: {avg_rows}")); Ok(()) } diff --git a/src/tools/mysql/client.rs b/src/tools/mysql/client.rs index 7a84596..51b26fd 100644 --- a/src/tools/mysql/client.rs +++ b/src/tools/mysql/client.rs @@ -5,6 +5,15 @@ use std::process::Command; pub struct MySQLTool; +/// Output mode for mysql CLI +#[derive(Copy, Clone)] +enum OutputMode { + /// Normal formatted output (suitable for \G and table output) + Standard, + /// Raw, no headers, batch, no pretty formatting (-N -B -r -A) + Raw, +} + impl MySQLTool { pub fn detect_fe_process() -> Result { process_detector::get_pid_by_env(Environment::FE) @@ -33,10 +42,27 @@ impl MySQLTool { }) } - /// Executes a MySQL query using credentials from the configuration. + /// Executes a MySQL query using credentials from the configuration (standard output mode). 
pub fn query_sql_with_config( config: &crate::config_loader::DorisConfig, query: &str, + ) -> Result { + Self::execute_query_with_config(config, query, OutputMode::Standard) + } + + /// Executes a MySQL query and returns raw output without headers or formatting (-N -B -r -A) + pub fn query_sql_raw_with_config( + config: &crate::config_loader::DorisConfig, + query: &str, + ) -> Result { + Self::execute_query_with_config(config, query, OutputMode::Raw) + } + + /// Shared implementation for executing a query with selected output mode + fn execute_query_with_config( + config: &crate::config_loader::DorisConfig, + query: &str, + mode: OutputMode, ) -> Result { let mysql_cfg = config.mysql.as_ref().ok_or_else(|| { CliError::ConfigError("MySQL credentials not found in config".to_string()) @@ -47,27 +73,42 @@ impl MySQLTool { let password = cred_mgr.decrypt_password(&mysql_cfg.password)?; let (host, port) = Self::get_connection_params()?; - let output = Self::run_mysql_command_with_credentials(&host, port, user, &password, query)?; + let output = Self::run_mysql_command(&host, port, user, &password, query, mode)?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); if stderr.contains("Access denied for user") || stderr.contains("ERROR 1045") { - return Err(CliError::MySQLAccessDenied(stderr.to_string())); + Err(CliError::MySQLAccessDenied( + "Access denied. Please update MySQL credentials.".into(), + )) + } else if stderr.contains("Unknown database") || stderr.contains("ERROR 1049") { + Err(CliError::ToolExecutionFailed( + "Unknown database. Please verify the database name.".into(), + )) + } else if stderr.contains("Can't connect") + || stderr.contains("Connection refused") + || stderr.contains("ERROR 2003") + { + Err(CliError::ToolExecutionFailed(format!( + "Cannot connect to MySQL at {host}:{port}. Check host/port and service status." + ))) } else { - return Err(CliError::ToolExecutionFailed(format!( - "mysql query failed: {stderr}" - ))); + Err(CliError::ToolExecutionFailed( + "MySQL query failed. Please try again.".into(), + )) } + } else { + Ok(String::from_utf8_lossy(&output.stdout).to_string()) } - Ok(String::from_utf8_lossy(&output.stdout).to_string()) } - /// Runs a MySQL command with explicit credentials. 
- fn run_mysql_command_with_credentials( + /// Runs a MySQL command with credentials in the specified output mode + fn run_mysql_command( host: &str, port: u16, user: &str, password: &str, query: &str, + mode: OutputMode, ) -> Result { let mut command = Command::new("mysql"); command.arg("-h").arg(host); @@ -78,7 +119,18 @@ impl MySQLTool { command.arg(format!("-p{password}")); } - command.arg("-A"); + match mode { + OutputMode::Standard => { + command.arg("-A"); + } + OutputMode::Raw => { + command.arg("-N"); + command.arg("-B"); + command.arg("-r"); + command.arg("-A"); + } + } + command.arg("-e").arg(query); // Prevent mysql from prompting for a password interactively @@ -89,6 +141,35 @@ impl MySQLTool { .map_err(|e| CliError::ToolExecutionFailed(format!("Failed to execute mysql: {e}"))) } + /// Lists databases (excluding system databases) using raw mysql output + pub fn list_databases(config: &crate::config_loader::DorisConfig) -> Result> { + let output = Self::query_sql_raw_with_config(config, "SHOW DATABASES;")?; + let mut dbs: Vec = output + .lines() + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .filter(|s| !crate::tools::mysql::SYSTEM_DATABASES.contains(&s.as_str())) + .collect(); + dbs.sort(); + Ok(dbs) + } + + /// Lists tables for a given database using raw mysql output + pub fn list_tables( + config: &crate::config_loader::DorisConfig, + database: &str, + ) -> Result> { + let sql = format!("USE `{}`; SHOW TABLES;", database); + let output = Self::query_sql_raw_with_config(config, &sql)?; + let mut tables: Vec = output + .lines() + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + tables.sort(); + Ok(tables) + } + /// Gets the connection parameters for MySQL, with a clear priority: pub fn get_connection_params() -> Result<(String, u16)> { if let (Ok(host), Ok(port_str)) = (std::env::var("MYSQL_HOST"), std::env::var("MYSQL_PORT")) diff --git a/src/tools/mysql/mod.rs b/src/tools/mysql/mod.rs index 5ad6f13..fd44b9b 100644 --- a/src/tools/mysql/mod.rs +++ b/src/tools/mysql/mod.rs @@ -7,3 +7,6 @@ pub use client::MySQLTool; pub use cluster::{Backend, ClusterInfo, Frontend}; pub use credentials::CredentialManager; pub use parser::{parse_backends, parse_frontends}; + +/// System databases to hide from selection +pub const SYSTEM_DATABASES: &[&str] = &["__internal_schema", "mysql", "information_schema"]; diff --git a/src/ui/dialogs.rs b/src/ui/dialogs.rs new file mode 100644 index 0000000..c25ac71 --- /dev/null +++ b/src/ui/dialogs.rs @@ -0,0 +1,56 @@ +use dialoguer::Select; + +use crate::error::{CliError, Result}; +use crate::ui; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum NoJobsNextAction { + ChooseAnotherDatabase, + BackToMenu, +} + +pub fn show_no_jobs_recovery_menu(database: &str) -> Result { + ui::print_warning(&format!( + "\n[!] 
No Routine Load jobs found in database '{database}'" + )); + ui::print_info("This could mean:"); + ui::print_info(" - The database name is incorrect"); + ui::print_info(" - No Routine Load jobs have been created"); + + let options = vec!["Choose another database", "Back to Routine Load menu"]; + + let selection = Select::new() + .with_prompt("What would you like to do?") + .items(&options) + .default(0) + .interact() + .map_err(|e| CliError::InvalidInput(e.to_string()))?; + + let action = match selection { + 0 => NoJobsNextAction::ChooseAnotherDatabase, + _ => NoJobsNextAction::BackToMenu, + }; + + Ok(action) +} + +pub fn show_unknown_db_recovery_menu(database: &str) -> Result { + ui::print_warning(&format!("\n[!] Unknown database '{database}'")); + ui::print_info("Please verify the database name or choose another one."); + + let options = vec!["Choose another database", "Back to Routine Load menu"]; + + let selection = Select::new() + .with_prompt("What would you like to do?") + .items(&options) + .default(0) + .interact() + .map_err(|e| CliError::InvalidInput(e.to_string()))?; + + let action = match selection { + 0 => NoJobsNextAction::ChooseAnotherDatabase, + _ => NoJobsNextAction::BackToMenu, + }; + + Ok(action) +} diff --git a/src/ui/menu.rs b/src/ui/menu.rs index 7f35fed..fb649e7 100644 --- a/src/ui/menu.rs +++ b/src/ui/menu.rs @@ -39,7 +39,9 @@ fn show_interactive_menu(step: u8, title: &str, items: &[String]) -> Result 0 { print_step(step, title); } else if !title.is_empty() { - println!("{}", style(title).bold()); + crate::ui::print_info(&format!("{title}", title = style(title).bold())); + } else { + ui::print_info(""); } term.hide_cursor()?; @@ -125,7 +127,6 @@ pub enum FeToolAction { #[derive(Debug, Clone, Copy)] pub enum RoutineLoadAction { GetJobId, - ErrorCheck, Performance, Traffic, Back, @@ -218,27 +219,21 @@ pub fn show_routine_load_menu() -> Result { name: "Get Job ID".to_string(), description: "List and select Routine Load jobs".to_string(), }, - MenuOption { - action: RoutineLoadAction::ErrorCheck, - key: "[2]".to_string(), - name: "Error Check".to_string(), - description: "Check for errors in selected Routine Load job".to_string(), - }, MenuOption { action: RoutineLoadAction::Performance, - key: "[3]".to_string(), + key: "[2]".to_string(), name: "Performance Analysis".to_string(), description: "Analyze per-commit rows/bytes/time from FE logs".to_string(), }, MenuOption { action: RoutineLoadAction::Traffic, - key: "[4]".to_string(), + key: "[3]".to_string(), name: "Traffic Monitor".to_string(), description: "Aggregate per-minute loadedRows from FE logs".to_string(), }, MenuOption { action: RoutineLoadAction::Back, - key: "[5]".to_string(), + key: "[4]".to_string(), name: "← Back to FE Tools".to_string(), description: "Return to FE tools menu".to_string(), }, diff --git a/src/ui/mod.rs b/src/ui/mod.rs index 64f69f0..ac69268 100644 --- a/src/ui/mod.rs +++ b/src/ui/mod.rs @@ -1,8 +1,14 @@ use console::{Term, style}; +pub mod dialogs; pub mod menu; +pub mod selector; +pub mod utils; +pub use dialogs::*; pub use menu::*; +pub use selector::*; +pub use utils::*; pub static SUCCESS: &str = "[+] "; pub static ERROR: &str = "[!] 
"; diff --git a/src/ui/selector.rs b/src/ui/selector.rs new file mode 100644 index 0000000..6096860 --- /dev/null +++ b/src/ui/selector.rs @@ -0,0 +1,172 @@ +use console::{Key, Term, style}; + +use crate::error::{CliError, Result}; + +pub trait ItemFormatter { + fn format_item(&self, item: &T) -> String; +} + +pub struct InteractiveSelector { + items: Vec, + title: String, + page_size: usize, +} + +impl InteractiveSelector { + pub fn new(items: Vec, title: String) -> Self { + Self { + items, + title, + page_size: 30, + } + } + + pub fn with_page_size(mut self, page_size: usize) -> Self { + self.page_size = page_size.max(1); + self + } + + pub fn select(&self) -> Result<&T> + where + Self: ItemFormatter, + { + if self.items.is_empty() { + return Err(CliError::InvalidInput("No items to select from".into())); + } + + let term = Term::stdout(); + let mut selection: usize = 0; + let mut last_drawn_lines: usize; + + crate::ui::print_info(&format!("\n{title}", title = self.title)); + crate::ui::print_info("Use ↑/↓ move, ←/→ page, 1-9 jump, Enter to select:"); + + term.hide_cursor() + .map_err(|e| CliError::InvalidInput(e.to_string()))?; + + last_drawn_lines = self.render_selection_list(&term, selection)?; + + loop { + match term + .read_key() + .map_err(|e| CliError::InvalidInput(e.to_string()))? + { + Key::Enter => { + term.show_cursor() + .map_err(|e| CliError::InvalidInput(e.to_string()))?; + term.clear_last_lines(last_drawn_lines).ok(); + break; + } + Key::ArrowUp => { + selection = if selection == 0 { + self.items.len() - 1 + } else { + selection - 1 + }; + } + Key::ArrowDown => { + selection = if selection + 1 >= self.items.len() { + 0 + } else { + selection + 1 + }; + } + Key::ArrowLeft => { + if !self.items.is_empty() { + let page_size = self.page_size.min(self.items.len()).max(1); + let current_page = selection / page_size; + if current_page > 0 { + selection = (current_page - 1) * page_size; + } + } + } + Key::ArrowRight => { + if !self.items.is_empty() { + let page_size = self.page_size.min(self.items.len()).max(1); + let total_pages = self.items.len().div_ceil(page_size); + let current_page = selection / page_size; + if current_page + 1 < total_pages { + selection = (current_page + 1) * page_size; + if selection >= self.items.len() { + selection = self.items.len() - 1; + } + } + } + } + Key::Char(c) => { + if let Some(d) = c.to_digit(10) { + let page_size = self.page_size.min(self.items.len()).max(1); + let current_page = selection / page_size; + let page_start = current_page * page_size; + let idx_in_page = d.saturating_sub(1) as usize; + let target = page_start + idx_in_page; + if target < self.items.len() { + selection = target; + } + } + } + _ => {} + } + + // Clear the previously drawn list block to avoid leftover lines when page size shrinks + term.clear_last_lines(last_drawn_lines).ok(); + last_drawn_lines = self.render_selection_list(&term, selection)?; + } + + Ok(&self.items[selection]) + } + + fn render_selection_list(&self, term: &Term, selection: usize) -> Result + where + Self: ItemFormatter, + { + let total = self.items.len(); + let page_size = self.page_size.min(total).max(1); + let total_pages = total.div_ceil(page_size); + let current_page = selection / page_size; + let start = current_page * page_size; + let end = (start + page_size).min(total); + + let mut lines_drawn = 0usize; + let page_title = format!( + "Page {}/{} ({} items)", + current_page + 1, + total_pages, + total + ); + term.clear_line()?; + term.write_line(&page_title) + .map_err(|e| 
CliError::InvalidInput(e.to_string()))?; + lines_drawn += 1; + + for (i, item) in self.items[start..end].iter().enumerate() { + let global_index = start + i; + term.clear_line()?; + let arrow = if global_index == selection { + style(">").cyan().bold().to_string() + } else { + " ".to_string() + }; + let line = format!("{arrow} {}. {}", global_index + 1, self.format_item(item)); + term.write_line(&line) + .map_err(|e| CliError::InvalidInput(e.to_string()))?; + lines_drawn += 1; + } + Ok(lines_drawn) + } +} + +impl ItemFormatter for InteractiveSelector { + fn format_item(&self, item: &String) -> String { + item.clone() + } +} + +impl ItemFormatter + for InteractiveSelector +{ + fn format_item(&self, job: &crate::tools::fe::routine_load::RoutineLoadJob) -> String { + let name = crate::ui::FormatHelper::truncate_string(&job.name, 32); + format!("{} - {} ({})", job.id, name, job.state) + } +} diff --git a/src/ui/utils.rs b/src/ui/utils.rs new file mode 100644 index 0000000..0d8c8f0 --- /dev/null +++ b/src/ui/utils.rs @@ -0,0 +1,65 @@ +use dialoguer::Input; + +use crate::error::{CliError, Result}; + +pub struct InputHelper; + +impl InputHelper { + pub fn prompt_non_empty(prompt: &str) -> Result { + let input: String = Input::new() + .with_prompt(prompt) + .allow_empty(false) + .interact() + .map_err(|e| CliError::InvalidInput(e.to_string()))?; + + let input = input.trim().to_string(); + if input.is_empty() { + return Err(CliError::InvalidInput("Input cannot be empty".into())); + } + Ok(input) + } + + pub fn prompt_number_with_default(prompt: &str, default: i64, min: i64) -> Result { + let input_str: String = Input::new() + .with_prompt(prompt) + .default(default.to_string()) + .interact_text() + .map_err(|e| CliError::InvalidInput(e.to_string()))?; + + let value: i64 = input_str.trim().parse().unwrap_or(default).max(min); + Ok(value) + } +} + +pub struct FormatHelper; + +impl FormatHelper { + pub fn fmt_int(v: u64) -> String { + Self::group_digits(&v.to_string()) + } + pub fn fmt_int_u128(v: u128) -> String { + Self::group_digits(&v.to_string()) + } + + fn group_digits(s: &str) -> String { + let bytes = s.as_bytes(); + let mut out = String::with_capacity(s.len() + s.len() / 3); + let mut count = 0; + for i in (0..bytes.len()).rev() { + out.push(bytes[i] as char); + count += 1; + if count % 3 == 0 && i != 0 { + out.push(','); + } + } + out.chars().rev().collect() + } + + pub fn truncate_string(s: &str, max_len: usize) -> String { + if s.len() <= max_len { + s.to_string() + } else { + format!("{}...", &s[..max_len - 3]) + } + } +} From 9b1e432290379901647e3cea146af8c2411f63b8 Mon Sep 17 00:00:00 2001 From: QuakeWang <1677980708@qq.com> Date: Thu, 21 Aug 2025 17:51:49 +0800 Subject: [PATCH 3/8] ci: fix cargo clippy check --- src/config.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/config.rs b/src/config.rs index b15e6fa..4b3645c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -48,8 +48,8 @@ impl Config { self.output_dir = PathBuf::from(output_dir); } - if let Ok(timeout) = env::var(ENV_TIMEOUT) { - if let Ok(timeout) = timeout.parse::() { + if let Ok(timeout_str) = env::var(ENV_TIMEOUT) { + if let Ok(timeout) = timeout_str.parse::() { self.timeout_seconds = timeout; } } From a283f7b3a562dd4aac1fd63eb9dddb2b8af4b10b Mon Sep 17 00:00:00 2001 From: QuakeWang <1677980708@qq.com> Date: Fri, 22 Aug 2025 15:04:34 +0800 Subject: [PATCH 4/8] ci: fix cargo clippy check --- src/config.rs | 9 +++--- src/config_loader/config_parser.rs | 17 +++++------ 
src/config_loader/config_persister.rs | 8 ++--- src/config_loader/process_detector.rs | 17 +++++++---- src/lib.rs | 17 ++++++----- src/tools/be/memz.rs | 43 ++++++++++++++------------- src/tools/mysql/client.rs | 9 +++--- 7 files changed, 66 insertions(+), 54 deletions(-) diff --git a/src/config.rs b/src/config.rs index 4b3645c..d603b27 100644 --- a/src/config.rs +++ b/src/config.rs @@ -48,10 +48,11 @@ impl Config { self.output_dir = PathBuf::from(output_dir); } - if let Ok(timeout_str) = env::var(ENV_TIMEOUT) { - if let Ok(timeout) = timeout_str.parse::() { - self.timeout_seconds = timeout; - } + if let Some(timeout) = env::var(ENV_TIMEOUT) + .ok() + .and_then(|s| s.parse::().ok()) + { + self.timeout_seconds = timeout; } self.no_progress_animation = env::var(ENV_NO_PROGRESS) diff --git a/src/config_loader/config_parser.rs b/src/config_loader/config_parser.rs index 7e0da72..420c0b5 100644 --- a/src/config_loader/config_parser.rs +++ b/src/config_loader/config_parser.rs @@ -184,16 +184,15 @@ fn parse_config_content( if line.starts_with(LOG_DIR_KEY) { if let Some(log_dir) = extract_value(line) { - if let Some(install) = install_dir { - if log_dir.contains("${DORIS_HOME}") { + match install_dir { + Some(install) if log_dir.contains("${DORIS_HOME}") => { let replaced = log_dir.replace("${DORIS_HOME}", install.to_str().unwrap_or("")); config.log_dir = PathBuf::from(replaced); - } else { + } + _ => { config.log_dir = PathBuf::from(log_dir); } - } else { - config.log_dir = PathBuf::from(log_dir); } } } @@ -260,10 +259,10 @@ fn parse_path_key_value(line: &str, key: &str, value: &mut Option) -> R /// Generic key-value parser fn parse_key_value(line: &str, key: &str, value: &mut Option) -> Result<()> { - if let Some(val_str) = regex_utils::extract_key_value(line, key) { - if let Ok(parsed_val) = val_str.parse::() { - *value = Some(parsed_val); - } + if let Some(parsed_val) = + regex_utils::extract_key_value(line, key).and_then(|s| s.parse::().ok()) + { + *value = Some(parsed_val); } Ok(()) } diff --git a/src/config_loader/config_persister.rs b/src/config_loader/config_persister.rs index ddcc265..b2b0cf0 100644 --- a/src/config_loader/config_persister.rs +++ b/src/config_loader/config_persister.rs @@ -434,10 +434,10 @@ pub fn load_persisted_config() -> Result { return Ok(from_persistent_config(persistent_config)); } Err(e) => { - if e.to_string().contains("missing field `process`") { - if let Some(config) = migrate_legacy_config(&content, &config_path) { - return Ok(config); - } + if e.to_string().contains("missing field `process`") + && migrate_legacy_config(&content, &config_path).is_some() + { + return Ok(migrate_legacy_config(&content, &config_path).unwrap()); } last_error = Some(CliError::ConfigError(format!( diff --git a/src/config_loader/process_detector.rs b/src/config_loader/process_detector.rs index e92c2f5..195ffab 100644 --- a/src/config_loader/process_detector.rs +++ b/src/config_loader/process_detector.rs @@ -195,12 +195,17 @@ pub fn get_paths(env: Environment) -> Result<(PathBuf, PathBuf)> { /// Get paths by PID for the specified environment fn get_paths_by_pid(pid: u32) -> (PathBuf, PathBuf) { let grep_pattern = "DORIS_HOME|JAVA_HOME"; - if let Ok(environ) = read_proc_environ_by_pid(pid, grep_pattern) { - if let Some(doris_home) = regex_utils::extract_env_var(&environ, "DORIS_HOME") { - let java_home = regex_utils::extract_env_var(&environ, "JAVA_HOME") - .unwrap_or_else(|| "/opt/jdk".to_string()); - return (PathBuf::from(doris_home), PathBuf::from(java_home)); - } + if let 
Some((doris_home, java_home)) = read_proc_environ_by_pid(pid, grep_pattern) + .ok() + .and_then(|envs| { + regex_utils::extract_env_var(&envs, "DORIS_HOME").map(|home| { + let java_home = regex_utils::extract_env_var(&envs, "JAVA_HOME") + .unwrap_or_else(|| "/opt/jdk".to_string()); + (home, java_home) + }) + }) + { + return (PathBuf::from(doris_home), PathBuf::from(java_home)); } // Default paths if environment variables are not available diff --git a/src/lib.rs b/src/lib.rs index 487d81b..da9d553 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -413,13 +413,16 @@ fn execute_tool_enhanced(config: &Config, tool: &dyn Tool, service_name: &str) - match tool.execute(config, pid) { Ok(result) => { print_success(&result.message); - if let Some(path) = result.output_path.to_str() { - if !path.is_empty() && path != "console_output" { - print_info(&format!( - "Output saved to: {}", - result.output_path.display() - )); - } + if result + .output_path + .to_str() + .filter(|p| !p.is_empty() && *p != "console_output") + .is_some() + { + print_info(&format!( + "Output saved to: {}", + result.output_path.display() + )); } Ok(()) } diff --git a/src/tools/be/memz.rs b/src/tools/be/memz.rs index b7ab8b7..411bcb6 100644 --- a/src/tools/be/memz.rs +++ b/src/tools/be/memz.rs @@ -134,31 +134,34 @@ fn extract_memory_metrics(html_content: &str) -> (String, String) { let mut thread_cache = "Unknown".to_string(); let mut dirty_pages = "Unknown".to_string(); - if let Some(caps) = re.captures(html_content) { - if caps.len() > 6 { - if let Some(bytes) = caps.get(1).and_then(|m| m.as_str().parse::().ok()) { - allocated = format_bytes(bytes); - } + if re + .captures(html_content) + .map(|caps| caps.len() > 6) + .unwrap_or(false) + { + let caps = re.captures(html_content).unwrap(); + if let Some(bytes) = caps.get(1).and_then(|m| m.as_str().parse::().ok()) { + allocated = format_bytes(bytes); + } - if let Some(bytes) = caps.get(2).and_then(|m| m.as_str().parse::().ok()) { - active = format_bytes(bytes); - } + if let Some(bytes) = caps.get(2).and_then(|m| m.as_str().parse::().ok()) { + active = format_bytes(bytes); + } - if let Some(bytes) = caps.get(3).and_then(|m| m.as_str().parse::().ok()) { - metadata = format_bytes(bytes); - } + if let Some(bytes) = caps.get(3).and_then(|m| m.as_str().parse::().ok()) { + metadata = format_bytes(bytes); + } - if let Some(bytes) = caps.get(4).and_then(|m| m.as_str().parse::().ok()) { - resident = format_bytes(bytes); - } + if let Some(bytes) = caps.get(4).and_then(|m| m.as_str().parse::().ok()) { + resident = format_bytes(bytes); + } - if let Some(bytes) = caps.get(5).and_then(|m| m.as_str().parse::().ok()) { - mapped = format_bytes(bytes); - } + if let Some(bytes) = caps.get(5).and_then(|m| m.as_str().parse::().ok()) { + mapped = format_bytes(bytes); + } - if let Some(bytes) = caps.get(6).and_then(|m| m.as_str().parse::().ok()) { - retained = format_bytes(bytes); - } + if let Some(bytes) = caps.get(6).and_then(|m| m.as_str().parse::().ok()) { + retained = format_bytes(bytes); } } diff --git a/src/tools/mysql/client.rs b/src/tools/mysql/client.rs index 51b26fd..3978fee 100644 --- a/src/tools/mysql/client.rs +++ b/src/tools/mysql/client.rs @@ -172,11 +172,12 @@ impl MySQLTool { /// Gets the connection parameters for MySQL, with a clear priority: pub fn get_connection_params() -> Result<(String, u16)> { - if let (Ok(host), Ok(port_str)) = (std::env::var("MYSQL_HOST"), std::env::var("MYSQL_PORT")) + if let Some((host, port)) = std::env::var("MYSQL_HOST") + .ok() + .and_then(|h| 
std::env::var("MYSQL_PORT").ok().map(|p| (h, p))) + .and_then(|(h, p_str)| p_str.parse::().ok().map(|p| (h, p))) { - if let Ok(port) = port_str.parse::() { - return Ok((host, port)); - } + return Ok((host, port)); } let config = crate::config_loader::load_config()?; From 5e18977ae85da9b8cbac23809734c3683e8506b8 Mon Sep 17 00:00:00 2001 From: QuakeWang <1677980708@qq.com> Date: Fri, 22 Aug 2025 15:19:43 +0800 Subject: [PATCH 5/8] chore: add rust-toolchain configuration for stable channel with components --- rust-toolchain.toml | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 rust-toolchain.toml diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..f6c7e20 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "stable" +components = ["rustfmt", "clippy", "rust-analyzer"] \ No newline at end of file From 37a1eccdacaaeeb688c581e5ca24677df53ce670 Mon Sep 17 00:00:00 2001 From: QuakeWang <1677980708@qq.com> Date: Fri, 22 Aug 2025 15:35:04 +0800 Subject: [PATCH 6/8] ci: add Rust toolchain stable version --- .github/workflows/ci.yml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 58c8936..15eb377 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,6 +18,11 @@ jobs: steps: - uses: actions/checkout@v4 + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt, clippy + - name: Format run: cargo fmt --all -- --check @@ -33,6 +38,10 @@ jobs: - macos-latest steps: - uses: actions/checkout@v4 + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + with: + components: clippy - name: Test run: cargo test --all-targets --workspace env: From c083bd5ec7a51432fb2623a4f9b20c22d8e91f48 Mon Sep 17 00:00:00 2001 From: QuakeWang <1677980708@qq.com> Date: Fri, 22 Aug 2025 16:01:02 +0800 Subject: [PATCH 7/8] refactor: simplify conditional checks in config parsing and process detection --- src/config_loader/config_parser.rs | 22 +++++----- src/config_loader/process_detector.rs | 60 +++++++++++++-------------- src/tools/common/fs_utils.rs | 12 +++--- 3 files changed, 46 insertions(+), 48 deletions(-) diff --git a/src/config_loader/config_parser.rs b/src/config_loader/config_parser.rs index 420c0b5..873158a 100644 --- a/src/config_loader/config_parser.rs +++ b/src/config_loader/config_parser.rs @@ -182,17 +182,17 @@ fn parse_config_content( continue; } - if line.starts_with(LOG_DIR_KEY) { - if let Some(log_dir) = extract_value(line) { - match install_dir { - Some(install) if log_dir.contains("${DORIS_HOME}") => { - let replaced = - log_dir.replace("${DORIS_HOME}", install.to_str().unwrap_or("")); - config.log_dir = PathBuf::from(replaced); - } - _ => { - config.log_dir = PathBuf::from(log_dir); - } + if line.starts_with(LOG_DIR_KEY) + && let Some(log_dir) = extract_value(line) + { + match install_dir { + Some(install) if log_dir.contains("${DORIS_HOME}") => { + let replaced = + log_dir.replace("${DORIS_HOME}", install.to_str().unwrap_or("")); + config.log_dir = PathBuf::from(replaced); + } + _ => { + config.log_dir = PathBuf::from(log_dir); } } } diff --git a/src/config_loader/process_detector.rs b/src/config_loader/process_detector.rs index 195ffab..1eaa2a2 100644 --- a/src/config_loader/process_detector.rs +++ b/src/config_loader/process_detector.rs @@ -89,12 +89,12 @@ fn detect_process_detailed(env: Environment) -> Result { pub fn get_process_command(pid: u32) -> Result { // Try /proc/PID/cmdline 
on Linux (most direct and reliable when available) let proc_cmdline = Path::new("/proc").join(pid.to_string()).join("cmdline"); - if proc_cmdline.exists() { - if let Ok(content) = std::fs::read_to_string(&proc_cmdline) { - let command = content.replace('\0', " ").trim().to_string(); - if !command.is_empty() { - return Ok(command); - } + if proc_cmdline.exists() + && let Ok(content) = std::fs::read_to_string(&proc_cmdline) + { + let command = content.replace('\0', " ").trim().to_string(); + if !command.is_empty() { + return Ok(command); } } @@ -104,14 +104,12 @@ pub fn get_process_command(pid: u32) -> Result { if let Ok(output) = Command::new("ps") .args(["-p", &pid.to_string(), "-o", format]) .output() + && output.status.success() + && let Ok(s) = String::from_utf8(output.stdout) { - if output.status.success() { - if let Ok(s) = String::from_utf8(output.stdout) { - let cmd = s.trim().to_string(); - if !cmd.is_empty() { - return Ok(cmd); - } - } + let cmd = s.trim().to_string(); + if !cmd.is_empty() { + return Ok(cmd); } } } @@ -246,18 +244,18 @@ pub fn detect_mixed_deployment(config: &mut crate::config_loader::DorisConfig) - config.fe_process_command = Some(fe_process.command.clone()); config.fe_install_dir = Some(fe_process.doris_home.clone()); - if config.environment != crate::config_loader::Environment::FE { - if let Ok(fe_config) = crate::config_loader::config_parser::parse_config_from_path( + if config.environment != crate::config_loader::Environment::FE + && let Ok(fe_config) = crate::config_loader::config_parser::parse_config_from_path( crate::config_loader::Environment::FE, &fe_process.doris_home, - ) { - config.http_port = fe_config.http_port; - config.rpc_port = fe_config.rpc_port; - config.query_port = fe_config.query_port; - config.edit_log_port = fe_config.edit_log_port; - config.cloud_http_port = fe_config.cloud_http_port; - config.meta_dir = fe_config.meta_dir; - } + ) + { + config.http_port = fe_config.http_port; + config.rpc_port = fe_config.rpc_port; + config.query_port = fe_config.query_port; + config.edit_log_port = fe_config.edit_log_port; + config.cloud_http_port = fe_config.cloud_http_port; + config.meta_dir = fe_config.meta_dir; } } @@ -266,16 +264,16 @@ pub fn detect_mixed_deployment(config: &mut crate::config_loader::DorisConfig) - config.be_process_command = Some(be_process.command.clone()); config.be_install_dir = Some(be_process.doris_home.clone()); - if config.environment != crate::config_loader::Environment::BE { - if let Ok(be_config) = crate::config_loader::config_parser::parse_config_from_path( + if config.environment != crate::config_loader::Environment::BE + && let Ok(be_config) = crate::config_loader::config_parser::parse_config_from_path( crate::config_loader::Environment::BE, &be_process.doris_home, - ) { - config.be_port = be_config.be_port; - config.brpc_port = be_config.brpc_port; - config.webserver_port = be_config.webserver_port; - config.heartbeat_service_port = be_config.heartbeat_service_port; - } + ) + { + config.be_port = be_config.be_port; + config.brpc_port = be_config.brpc_port; + config.webserver_port = be_config.webserver_port; + config.heartbeat_service_port = be_config.heartbeat_service_port; } } } diff --git a/src/tools/common/fs_utils.rs b/src/tools/common/fs_utils.rs index 1a414db..5af975f 100644 --- a/src/tools/common/fs_utils.rs +++ b/src/tools/common/fs_utils.rs @@ -16,12 +16,12 @@ pub fn save_toml_to_file(obj: &T, file_path: &Path) -> Resu /// Ensures that the directory for a given path exists, creating it if necessary. 
pub fn ensure_dir_exists(path: &Path) -> Result<()> { - if let Some(parent) = path.parent() { - if !parent.exists() { - fs::create_dir_all(parent).map_err(|e| { - crate::error::CliError::ConfigError(format!("Failed to create directory: {e}")) - })?; - } + if let Some(parent) = path.parent() + && !parent.exists() + { + fs::create_dir_all(parent).map_err(|e| { + crate::error::CliError::ConfigError(format!("Failed to create directory: {e}")) + })?; } Ok(()) } From aac4064a1b3005e1d2c6fe6e495c1038215b51c1 Mon Sep 17 00:00:00 2001 From: QuakeWang <1677980708@qq.com> Date: Fri, 22 Aug 2025 16:02:23 +0800 Subject: [PATCH 8/8] ci: fix cargo fmt --- src/config_loader/config_parser.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/config_loader/config_parser.rs b/src/config_loader/config_parser.rs index 873158a..8da2e4c 100644 --- a/src/config_loader/config_parser.rs +++ b/src/config_loader/config_parser.rs @@ -187,8 +187,7 @@ fn parse_config_content( { match install_dir { Some(install) if log_dir.contains("${DORIS_HOME}") => { - let replaced = - log_dir.replace("${DORIS_HOME}", install.to_str().unwrap_or("")); + let replaced = log_dir.replace("${DORIS_HOME}", install.to_str().unwrap_or("")); config.log_dir = PathBuf::from(replaced); } _ => {