From 55d4d94a9917d2d852c71795e2db39c92c8f0149 Mon Sep 17 00:00:00 2001 From: QuakeWang <1677980708@qq.com> Date: Thu, 28 Aug 2025 16:08:19 +0800 Subject: [PATCH] feat: implement table information collection --- src/tools/be/memz.rs | 34 +-- src/tools/common/format_utils.rs | 33 +++ src/tools/common/mod.rs | 1 + src/tools/fe/mod.rs | 2 + src/tools/fe/table_info/browser.rs | 341 +++++++++++++++++++++++++++++ src/tools/fe/table_info/mod.rs | 324 +++++++++++++++++++++++++++ src/tools/fe/table_info/ops.rs | 221 +++++++++++++++++++ src/tools/fe/table_info/sql.rs | 55 +++++ src/ui/menu.rs | 11 +- src/ui/service_handlers.rs | 5 + 10 files changed, 1000 insertions(+), 27 deletions(-) create mode 100644 src/tools/common/format_utils.rs create mode 100644 src/tools/fe/table_info/browser.rs create mode 100644 src/tools/fe/table_info/mod.rs create mode 100644 src/tools/fe/table_info/ops.rs create mode 100644 src/tools/fe/table_info/sql.rs diff --git a/src/tools/be/memz.rs b/src/tools/be/memz.rs index 411bcb6..086db9c 100644 --- a/src/tools/be/memz.rs +++ b/src/tools/be/memz.rs @@ -1,6 +1,7 @@ use super::be_http_client; use crate::config::Config; use crate::error::Result; +use crate::tools::common::format_utils; use crate::tools::{ExecutionResult, Tool}; use crate::ui; use chrono::Utc; @@ -102,23 +103,6 @@ impl Tool for MemzGlobalTool { } } -/// Format bytes to a human-readable string -fn format_bytes(bytes: u64) -> String { - const KB: u64 = 1024; - const MB: u64 = KB * 1024; - const GB: u64 = MB * 1024; - - if bytes >= GB { - format!("{:.2} GB ({bytes} bytes)", bytes as f64 / GB as f64) - } else if bytes >= MB { - format!("{:.2} MB ({bytes} bytes)", bytes as f64 / MB as f64) - } else if bytes >= KB { - format!("{:.2} KB ({bytes} bytes)", bytes as f64 / KB as f64) - } else { - format!("{bytes} bytes") - } -} - /// Extract memory metrics from the HTML response fn extract_memory_metrics(html_content: &str) -> (String, String) { let re = Regex::new(r"Allocated: (\d+), active: (\d+), metadata: (\d+).*?, resident: (\d+), mapped: (\d+), retained: (\d+)").unwrap(); @@ -141,27 +125,27 @@ fn extract_memory_metrics(html_content: &str) -> (String, String) { { let caps = re.captures(html_content).unwrap(); if let Some(bytes) = caps.get(1).and_then(|m| m.as_str().parse::().ok()) { - allocated = format_bytes(bytes); + allocated = format_utils::format_bytes(bytes, 2, true); } if let Some(bytes) = caps.get(2).and_then(|m| m.as_str().parse::().ok()) { - active = format_bytes(bytes); + active = format_utils::format_bytes(bytes, 2, true); } if let Some(bytes) = caps.get(3).and_then(|m| m.as_str().parse::().ok()) { - metadata = format_bytes(bytes); + metadata = format_utils::format_bytes(bytes, 2, true); } if let Some(bytes) = caps.get(4).and_then(|m| m.as_str().parse::().ok()) { - resident = format_bytes(bytes); + resident = format_utils::format_bytes(bytes, 2, true); } if let Some(bytes) = caps.get(5).and_then(|m| m.as_str().parse::().ok()) { - mapped = format_bytes(bytes); + mapped = format_utils::format_bytes(bytes, 2, true); } if let Some(bytes) = caps.get(6).and_then(|m| m.as_str().parse::().ok()) { - retained = format_bytes(bytes); + retained = format_utils::format_bytes(bytes, 2, true); } } @@ -170,7 +154,7 @@ fn extract_memory_metrics(html_content: &str) -> (String, String) { .and_then(|caps| caps.get(1)) .and_then(|m| m.as_str().parse::().ok()) { - thread_cache = format_bytes(bytes); + thread_cache = format_utils::format_bytes(bytes, 2, true); } if let Some(bytes) = dirty_pages_re @@ -178,7 +162,7 @@ fn extract_memory_metrics(html_content: &str) -> (String, String) { .and_then(|caps| caps.get(1)) .and_then(|m| m.as_str().parse::().ok()) { - dirty_pages = format_bytes(bytes); + dirty_pages = format_utils::format_bytes(bytes, 2, true); } let table = format!( diff --git a/src/tools/common/format_utils.rs b/src/tools/common/format_utils.rs new file mode 100644 index 0000000..059a657 --- /dev/null +++ b/src/tools/common/format_utils.rs @@ -0,0 +1,33 @@ +/// Format bytes to a human-readable string with customizable precision and format +pub fn format_bytes(bytes: u64, precision: usize, show_original: bool) -> String { + const KB: f64 = 1024.0; + const MB: f64 = KB * 1024.0; + const GB: f64 = MB * 1024.0; + + if bytes >= GB as u64 { + let formatted = format!("{:.precision$} GB", bytes as f64 / GB); + if show_original { + format!("{} ({bytes} bytes)", formatted) + } else { + formatted + } + } else if bytes >= MB as u64 { + let formatted = format!("{:.precision$} MB", bytes as f64 / MB); + if show_original { + format!("{} ({bytes} bytes)", formatted) + } else { + formatted + } + } else if bytes >= KB as u64 { + let formatted = format!("{:.precision$} KB", bytes as f64 / KB); + if show_original { + format!("{} ({bytes} bytes)", formatted) + } else { + formatted + } + } else if show_original { + format!("{bytes} bytes") + } else { + format!("{} B", bytes) + } +} diff --git a/src/tools/common/mod.rs b/src/tools/common/mod.rs index b5b6769..e5846ab 100644 --- a/src/tools/common/mod.rs +++ b/src/tools/common/mod.rs @@ -1,2 +1,3 @@ +pub mod format_utils; pub mod fs_utils; pub mod jmap; diff --git a/src/tools/fe/mod.rs b/src/tools/fe/mod.rs index 35a88ae..022ee3a 100644 --- a/src/tools/fe/mod.rs +++ b/src/tools/fe/mod.rs @@ -2,8 +2,10 @@ mod jmap; mod jstack; mod profiler; pub mod routine_load; +pub mod table_info; pub use jmap::{JmapDumpTool, JmapHistoTool}; pub use jstack::JstackTool; pub use profiler::FeProfilerTool; pub use routine_load::{RoutineLoadJobLister, get_routine_load_tools}; +pub use table_info::{FeTableInfoTool, TableIdentity, TableInfoReport}; diff --git a/src/tools/fe/table_info/browser.rs b/src/tools/fe/table_info/browser.rs new file mode 100644 index 0000000..2e4c131 --- /dev/null +++ b/src/tools/fe/table_info/browser.rs @@ -0,0 +1,341 @@ +use anyhow::Result; + +use crate::ui::{InteractiveSelector, print_error, print_info}; + +use super::{FeTableInfoTool, TableIdentity}; +use std::fs; +use std::path::PathBuf; + +pub fn run_interactive(config: &crate::config::Config) -> Result<()> { + loop { + match select_database_or_bulk(config)? { + DatabaseSelection::Single(db) => match select_table_or_bulk(config, &db)? { + TableSelection::Single(ident) => { + let report = FeTableInfoTool::collect_one(config, &ident)?; + render_brief(&report); + } + TableSelection::AllInDb(db_name) => { + let total = FeTableInfoTool::list_tables(config, Some(&db_name))?.len(); + let conc = FeTableInfoTool::suggest_concurrency(total); + let reports = FeTableInfoTool::collect_all_in_db(config, &db_name, conc)?; + if let Ok(files) = save_reports_txt(config, &reports, false) { + for f in files { + print_info(&format!("Saved: {}", f.display())); + } + } + render_batch_summary(&db_name, reports.len()); + } + }, + DatabaseSelection::AllDbs => { + print_info("Scanning all databases and tables..."); + let all_tables = FeTableInfoTool::list_tables(config, None)?; + let conc = if all_tables.is_empty() { + 16 + } else { + FeTableInfoTool::suggest_concurrency(all_tables.len()) + }; + print_info(&format!("Found {} tables, starting...", all_tables.len())); + let reports = FeTableInfoTool::collect_many(config, &all_tables, conc)?; + if let Ok(files) = save_reports_txt(config, &reports, true) { + print_info(&format!("Saved: {}", files[0].display())); + } + render_batch_summary("", reports.len()); + } + } + + match prompt_next_action()? { + NextAction::AnalyzeAnother => continue, + NextAction::BackToFeMenu => return Ok(()), + NextAction::ExitApp => { + crate::ui::print_goodbye(); + std::process::exit(0); + } + } + } +} + +pub fn select_database(config: &crate::config::Config) -> Result { + let dbs = FeTableInfoTool::list_databases(config)?; + match create_string_selector(dbs, "Select a database".to_string(), false, "")? { + SelectionResult::Single(db) => Ok(db), + SelectionResult::All => unreachable!(), + } +} + +enum SelectionResult { + Single(T), + All, +} + +fn create_string_selector( + items: Vec, + title: String, + add_all_option: bool, + all_option_text: &str, +) -> Result> { + if items.is_empty() { + print_error("No items found."); + anyhow::bail!("no items") + } + + let mut options = items; + if add_all_option { + options.push(all_option_text.to_string()); + } + + let selector = InteractiveSelector::new(options.clone(), title).with_page_size(30); + let selected = selector.select()?.clone(); + + if add_all_option && selected == all_option_text { + Ok(SelectionResult::All) + } else { + Ok(SelectionResult::Single(selected)) + } +} + +enum DatabaseSelection { + Single(String), + AllDbs, +} + +fn select_database_or_bulk(config: &crate::config::Config) -> Result { + let dbs = FeTableInfoTool::list_databases(config)?; + match create_string_selector( + dbs, + "Select a database".to_string(), + true, + "[All Databases]", + )? { + SelectionResult::Single(db) => Ok(DatabaseSelection::Single(db)), + SelectionResult::All => Ok(DatabaseSelection::AllDbs), + } +} + +enum TableSelection { + Single(TableIdentity), + AllInDb(String), +} + +fn select_table_or_bulk(config: &crate::config::Config, database: &str) -> Result { + let tables = FeTableInfoTool::list_tables(config, Some(database))?; + let names: Vec = tables + .into_iter() + .filter(|t| t.schema == database) + .map(|t| t.name) + .collect(); + + match create_string_selector( + names, + format!("Select a table in {}", database), + true, + "[All tables in this DB]", + )? { + SelectionResult::Single(name) => Ok(TableSelection::Single(TableIdentity { + schema: database.to_string(), + name, + })), + SelectionResult::All => Ok(TableSelection::AllInDb(database.to_string())), + } +} + +fn render_brief(report: &super::TableInfoReport) { + let content = generate_report_content(report); + for line in content.lines() { + print_info(line); + } +} + +fn generate_report_content(report: &super::TableInfoReport) -> String { + let mut out = String::new(); + out.push('\n'); + out.push_str(&"=".repeat(80)); + out.push('\n'); + out.push_str(&format!( + "Table Info: {}.{}\n", + report.ident.schema, report.ident.name + )); + out.push_str(&"-".repeat(80)); + out.push('\n'); + + let model = format!("{:?}", report.model); + let keys = if report.key_columns.is_empty() { + "-".to_string() + } else { + report.key_columns.join(", ") + }; + let bucket_str = match report.bucket { + super::BucketCount::Fixed(n) => n.to_string(), + super::BucketCount::Auto => "AUTO".to_string(), + }; + let bucket_key = report + .bucketing_key + .as_ref() + .map(|v| v.join(", ")) + .unwrap_or_else(|| "-".to_string()); + let mow = report + .merge_on_write + .map(|v| if v { "Yes" } else { "No" }) + .unwrap_or("-"); + + out.push_str(&format!(" {:<18} {}\n", "Table Type:", model)); + out.push_str(&format!(" {:<18} {}\n", "Key Columns:", keys)); + out.push_str(&format!(" {:<18} {}\n", "Bucketing Key:", bucket_key)); + out.push_str(&format!(" {:<18} {}\n", "Bucket Count:", bucket_str)); + out.push_str(&format!(" {:<18} {}\n", "Merge-on-Write:", mow)); + + let indexes_line = if report.indexes.is_empty() { + "None".to_string() + } else { + report + .indexes + .iter() + .map(|i| format!("{}({})", i.name, i.index_type)) + .collect::>() + .join(", ") + }; + out.push_str(&format!( + " {:<18} {}\n", + "Indexes:", + truncate(&indexes_line, 50) + )); + + out.push('\n'); + out.push_str("Partitions:\n"); + out.push_str(&build_partitions_table(&report.partitions)); + out.push_str(&format!("Total partitions: {}\n", report.partitions.len())); + out.push_str(&"=".repeat(80)); + out +} + +fn render_batch_summary(scope: &str, total: usize) { + print_info(""); + print_info(&"=".repeat(80)); + print_info(&format!("Batch collection completed for {}", scope)); + print_info(&format!("Collected tables: {}", total)); + print_info(&"=".repeat(80)); +} + +fn build_partitions_table(parts: &[super::PartitionStat]) -> String { + let w_part = 18usize; + let w_size = 10usize; + let w_rows = 12usize; + let w_buck = 8usize; + + let mut s = String::new(); + let top = format!( + "┌{}┬{}┬{}┬{}┐\n", + "─".repeat(w_part + 2), + "─".repeat(w_size + 2), + "─".repeat(w_rows + 2), + "─".repeat(w_buck + 2) + ); + let mid = format!( + "├{}┼{}┼{}┼{}┤\n", + "─".repeat(w_part + 2), + "─".repeat(w_size + 2), + "─".repeat(w_rows + 2), + "─".repeat(w_buck + 2) + ); + let bot = format!( + "└{}┴{}┴{}┴{}┘\n", + "─".repeat(w_part + 2), + "─".repeat(w_size + 2), + "─".repeat(w_rows + 2), + "─".repeat(w_buck + 2) + ); + + s.push_str(&top); + s.push_str(&format!( + "│ {:w_size$} │ {:>w_rows$} │ {:>w_buck$} │\n", + "Partition", + "Size", + "Rows", + "Buckets", + w_part = w_part, + w_size = w_size, + w_rows = w_rows, + w_buck = w_buck + )); + s.push_str(&mid); + for p in parts.iter() { + let size = crate::tools::common::format_utils::format_bytes(p.size_bytes, 3, false); + s.push_str(&format!( + "│ {:w_size$} │ {:>w_rows$} │ {:>w_buck$} │\n", + truncate(&p.name, w_part), + size, + p.rows, + p.buckets, + w_part = w_part, + w_size = w_size, + w_rows = w_rows, + w_buck = w_buck + )); + } + s.push_str(&bot); + s +} + +enum NextAction { + AnalyzeAnother, + BackToFeMenu, + ExitApp, +} + +fn prompt_next_action() -> Result { + let items = vec![ + "Analyze another table/database".to_string(), + "Back to FE menu".to_string(), + "Exit".to_string(), + ]; + let selector = + InteractiveSelector::new(items.clone(), "What would you like to do next?".to_string()) + .with_page_size(30); + let sel = selector.select()?; + match sel.as_str() { + "Analyze another table/database" => Ok(NextAction::AnalyzeAnother), + "Back to FE menu" => Ok(NextAction::BackToFeMenu), + _ => Ok(NextAction::ExitApp), + } +} + +fn save_reports_txt( + config: &crate::config::Config, + reports: &[super::TableInfoReport], + single_file: bool, +) -> anyhow::Result> { + let base_dir: PathBuf = config.output_dir.join("table-info"); + config.ensure_output_dir()?; + + if single_file { + let file_path = base_dir.join("all_databases_table_info.txt"); + crate::tools::common::fs_utils::ensure_dir_exists(&file_path)?; + let mut content = String::new(); + for r in reports { + content.push_str(&generate_report_content(r)); + content.push('\n'); + content.push_str(&"-".repeat(80)); + content.push('\n'); + } + fs::write(&file_path, content)?; + Ok(vec![file_path]) + } else { + let mut files: Vec = Vec::with_capacity(reports.len()); + for r in reports { + let dir = base_dir.join(&r.ident.schema); + let file_path = dir.join(format!("{}.txt", &r.ident.name)); + crate::tools::common::fs_utils::ensure_dir_exists(&file_path)?; + let content = generate_report_content(r); + fs::write(&file_path, content)?; + files.push(file_path); + } + Ok(files) + } +} + +fn truncate(s: &str, max: usize) -> String { + if s.len() <= max { + s.to_string() + } else { + format!("{}…", &s[..max.saturating_sub(1)]) + } +} diff --git a/src/tools/fe/table_info/mod.rs b/src/tools/fe/table_info/mod.rs new file mode 100644 index 0000000..29d6d79 --- /dev/null +++ b/src/tools/fe/table_info/mod.rs @@ -0,0 +1,324 @@ +use anyhow::Result; +use serde::{Deserialize, Serialize}; + +use std::sync::{ + Arc, Mutex, + atomic::{AtomicUsize, Ordering}, +}; +use std::thread; + +pub mod browser; +mod ops; +pub mod sql; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TableIdentity { + pub schema: String, + pub name: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum TableModel { + UniqueKey, + DuplicateKey, + AggregateKey, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum BucketCount { + Fixed(u32), + Auto, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PartitionStat { + pub name: String, + pub size_bytes: u64, + pub rows: u64, + pub buckets: u32, + pub avg_bucket_size_bytes: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TableStatsFromPartitions { + pub partitions: Vec, + pub total_buckets: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ColumnDef { + pub name: String, + pub data_type: String, + pub nullable: bool, + pub is_key: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct IndexInfo { + pub name: String, + pub columns: Vec, + pub index_type: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum BucketingSpec { + Hash { + columns: Vec, + buckets: BucketCount, + }, + Random { + buckets: BucketCount, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateTableParsed { + pub model: TableModel, + pub key_columns: Vec, + pub bucketing: BucketingSpec, + pub merge_on_write: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TableInfoReport { + pub ident: TableIdentity, + pub model: TableModel, + pub key_columns: Vec, + pub bucketing_key: Option>, + pub bucket: BucketCount, + pub merge_on_write: Option, + pub indexes: Vec, + pub columns: Vec, + pub partitions: Vec, +} + +pub struct FeTableInfoTool; + +impl FeTableInfoTool { + fn create_client(cfg: &crate::config::Config) -> Result { + let doris_cfg = crate::config_loader::load_config()?.with_app_config(cfg); + Ok(sql::MySqlExecutor::from_config(doris_cfg)) + } + + pub fn list_tables( + cfg: &crate::config::Config, + schema: Option<&str>, + ) -> Result> { + // Load doris config to pass mysql credentials + let client = Self::create_client(cfg)?; + let rs = sql::query_table_list(&client, schema)?; + + // Map raw lines "schema\ttable" into identities (since raw mode -N -B -r -A) + let mut out = Vec::new(); + for line in rs.0.lines() { + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + let mut parts = trimmed.split('\t'); + if let (Some(s), Some(t)) = (parts.next(), parts.next()) { + out.push(TableIdentity { + schema: s.to_string(), + name: t.to_string(), + }); + } + } + Ok(out) + } + + pub fn list_databases(cfg: &crate::config::Config) -> anyhow::Result> { + let client = Self::create_client(cfg)?; + let rs = sql::query_database_list(&client)?; + let mut out = Vec::new(); + for line in rs.0.lines() { + let db = line.trim(); + if db.is_empty() { + continue; + } + if ["information_schema", "mysql", "__internal_schema"].contains(&db) { + continue; + } + out.push(db.to_string()); + } + out.sort(); + Ok(out) + } + + pub fn collect_one( + cfg: &crate::config::Config, + ident: &TableIdentity, + ) -> Result { + let client = Self::create_client(cfg)?; + let (create, parts, cols, idxs) = ops::fetch_and_parse_all(&client, ident)?; + let report = assemble_report(ident, &create, &parts, &cols, &idxs); + Ok(report) + } + + fn collect_many( + cfg: &crate::config::Config, + idents: &[TableIdentity], + concurrency: usize, + ) -> Result> { + if idents.is_empty() { + return Ok(Vec::new()); + } + + let doris_cfg = crate::config_loader::load_config()?.with_app_config(cfg); + let worker_count = concurrency + .max(1) + .min(Self::suggest_concurrency(idents.len())); + + let total = idents.len(); + let shared_idents: Arc> = Arc::new(idents.to_vec()); + let results: Arc>>> = + Arc::new(Mutex::new(vec![None; total])); + let next_index = Arc::new(AtomicUsize::new(0)); + let progress = Arc::new(AtomicUsize::new(0)); + + let mut handles = Vec::with_capacity(worker_count); + for _ in 0..worker_count { + let doris_cfg_cloned = doris_cfg.clone(); + let shared_idents_cloned = Arc::clone(&shared_idents); + let results_cloned = Arc::clone(&results); + let next_index_cloned = Arc::clone(&next_index); + let progress_cloned = Arc::clone(&progress); + + let handle = thread::spawn(move || { + let client = sql::MySqlExecutor::from_config(doris_cfg_cloned); + loop { + let idx = next_index_cloned.fetch_add(1, Ordering::SeqCst); + if idx >= shared_idents_cloned.len() { + break; + } + let ident = &shared_idents_cloned[idx]; + let res = ops::fetch_and_parse_all(&client, ident).map( + |(create, parts, cols, idxs)| { + assemble_report(ident, &create, &parts, &cols, &idxs) + }, + ); + match res { + Ok(rep) => { + if let Ok(mut guard) = results_cloned.lock() { + guard[idx] = Some(rep); + } + } + Err(e) => { + crate::ui::print_error(&format!( + "Collect failed for {}.{}: {}", + ident.schema, ident.name, e + )); + } + } + let done = progress_cloned.fetch_add(1, Ordering::SeqCst) + 1; + crate::ui::print_info(&format!( + "Process: {}/{} {}.{}", + done, total, ident.schema, ident.name + )); + } + }); + handles.push(handle); + } + + for h in handles { + let _ = h.join(); + } + + let reports: Vec = results + .lock() + .unwrap() + .clone() + .into_iter() + .flatten() + .collect(); + Ok(reports) + } + + pub fn collect_all_in_db( + cfg: &crate::config::Config, + db: &str, + concurrency: usize, + ) -> Result> { + let tables = Self::list_tables(cfg, Some(db))?; + let idents: Vec = tables.into_iter().filter(|t| t.schema == db).collect(); + Self::collect_many(cfg, &idents, concurrency) + } + + pub fn collect_all_in_all_dbs( + cfg: &crate::config::Config, + concurrency: usize, + ) -> Result> { + // One shot: list all tables across all databases to avoid double scanning + let idents: Vec = Self::list_tables(cfg, None)?; + Self::collect_many(cfg, &idents, concurrency) + } + + pub fn suggest_concurrency(total_tables: usize) -> usize { + if total_tables <= 1 { + return 1; + } + let hard_cap = 32usize; + let mut c = 2usize; + while c < total_tables && c < hard_cap { + c = c.saturating_mul(2); + } + c = c.min(total_tables).min(hard_cap); + c.max(1) + } +} + +fn assemble_report( + ident: &TableIdentity, + create: &CreateTableParsed, + parts: &TableStatsFromPartitions, + cols: &[ColumnDef], + idxs: &[IndexInfo], +) -> TableInfoReport { + let (final_bucket, bucketing_key) = match &create.bucketing { + BucketingSpec::Hash { columns, buckets } => (buckets.clone(), Some(columns.clone())), + BucketingSpec::Random { buckets } => (buckets.clone(), None), + }; + + let merge_on_write = match create.model { + TableModel::UniqueKey => create.merge_on_write, + _ => None, + }; + + TableInfoReport { + ident: ident.clone(), + model: create.model.clone(), + key_columns: create.key_columns.clone(), + bucketing_key, + bucket: final_bucket, + merge_on_write, + indexes: idxs.to_vec(), + columns: cols.to_vec(), + partitions: parts.partitions.clone(), + } +} + +pub(crate) fn parse_size(input: &str) -> u64 { + let s = input.trim(); + if s.is_empty() { + return 0; + } + + let parts: Vec<&str> = s.split_whitespace().collect(); + if parts.is_empty() { + return 0; + } + + let num = parts[0].parse::().unwrap_or(0.0); + let unit = parts + .get(1) + .map(|u| u.to_ascii_lowercase()) + .unwrap_or_else(|| "b".to_string()); + + let factor = match unit.as_str() { + "kb" => 1024.0, + "mb" => 1024.0 * 1024.0, + "gb" => 1024.0 * 1024.0 * 1024.0, + _ => 1.0, + }; + + (num * factor) as u64 +} diff --git a/src/tools/fe/table_info/ops.rs b/src/tools/fe/table_info/ops.rs new file mode 100644 index 0000000..a159785 --- /dev/null +++ b/src/tools/fe/table_info/ops.rs @@ -0,0 +1,221 @@ +use anyhow::Result; +use regex::Regex; + +use super::{ColumnDef, CreateTableParsed, IndexInfo, TableIdentity, TableStatsFromPartitions}; + +const V2_MIN_COLS: usize = 15; // up to DataSize index (14) +const V3_MIN_COLS: usize = 22; + +fn parse_column_list(input: &str) -> Vec { + input + .split(',') + .map(|s| s.trim().trim_matches('`').to_string()) + .filter(|s| !s.is_empty()) + .collect() +} + +fn parse_bucket_count(buckets: &str) -> super::BucketCount { + if buckets.eq_ignore_ascii_case("AUTO") { + super::BucketCount::Auto + } else { + buckets + .parse::() + .map(super::BucketCount::Fixed) + .unwrap_or(super::BucketCount::Auto) + } +} + +pub fn fetch_and_parse_all( + exec: &super::sql::MySqlExecutor, + ident: &TableIdentity, +) -> Result<( + CreateTableParsed, + TableStatsFromPartitions, + Vec, + Vec, +)> { + let create_rs = super::sql::query_show_create(exec, ident)?; + let parts_rs = super::sql::query_partitions(exec, ident)?; + + let create = parse_create_table(create_rs.0.as_str())?; + let parts = parse_partitions(&parts_rs)?; + let cols: Vec = Vec::new(); + let idxs = parse_indexes_from_create(create_rs.0.as_str()); + + Ok((create, parts, cols, idxs)) +} + +pub fn parse_create_table(raw_sql: &str) -> Result { + let model = if raw_sql.contains("UNIQUE KEY(") || raw_sql.contains("UNIQUE KEY (`") { + super::TableModel::UniqueKey + } else if raw_sql.contains("AGGREGATE KEY(") { + super::TableModel::AggregateKey + } else { + super::TableModel::DuplicateKey + }; + + let key_cols = Regex::new(r"(?i)(UNIQUE|DUPLICATE|AGGREGATE)\s+KEY\((?P[^\)]*)\)")? + .captures(raw_sql) + .and_then(|c| c.name("cols").map(|m| m.as_str().to_string())) + .unwrap_or_default(); + let key_columns = parse_column_list(&key_cols); + + let re_hash = Regex::new( + r"DISTRIBUTED\s+BY\s+HASH\((?P[^\)]*)\)\s+BUCKETS\s+(?PAUTO|\d+)", + )?; + let re_random = Regex::new(r"DISTRIBUTED\s+BY\s+RANDOM\s+BUCKETS\s+(?PAUTO|\d+)")?; + + let bucketing = if let Some(c) = re_hash.captures(raw_sql) { + let cols = c.name("cols").map(|m| m.as_str()).unwrap_or(""); + let columns = parse_column_list(cols); + let buckets = c.name("buckets").map(|m| m.as_str()).unwrap_or("AUTO"); + let bucket_count = parse_bucket_count(buckets); + super::BucketingSpec::Hash { + columns, + buckets: bucket_count, + } + } else if let Some(c) = re_random.captures(raw_sql) { + let buckets = c.name("buckets").map(|m| m.as_str()).unwrap_or("AUTO"); + let bucket_count = parse_bucket_count(buckets); + super::BucketingSpec::Random { + buckets: bucket_count, + } + } else { + super::BucketingSpec::Hash { + columns: vec![], + buckets: super::BucketCount::Auto, + } + }; + + let mow = if matches!(model, super::TableModel::UniqueKey) { + let lower = raw_sql.to_ascii_lowercase(); + if lower.contains("merge-on-write\" = \"true\"") + || lower.contains("enable_unique_key_merge_on_write\" = \"true\"") + || lower.contains("merge-on-write: yes") + { + Some(true) + } else if lower.contains("merge-on-write\" = \"false\"") { + Some(false) + } else { + None + } + } else { + None + }; + + Ok(CreateTableParsed { + model, + key_columns, + bucketing, + merge_on_write: mow, + }) +} + +pub fn parse_partitions(rows: &super::sql::ResultSet) -> Result { + let mut partitions = Vec::new(); + let mut first_bucket: Option = None; + let mut all_equal: bool = true; + + for line in rows.0.lines() { + let trimmed = line.trim_end(); + if trimmed.is_empty() { + continue; + } + let cols: Vec<&str> = trimmed.split('\t').collect(); + if cols.len() < V2_MIN_COLS { + continue; + } + + // Decide layout by column count + let (name_idx, buckets_idx, size_idx, rowcount_idx_opt): ( + usize, + usize, + usize, + Option, + ) = if cols.len() >= V3_MIN_COLS { + // Doris 3.x (has RowCount at the end) + (1, 8, 14, Some(cols.len() - 1)) + } else if cols.len() >= V2_MIN_COLS { + // Doris 2.x (no RowCount) + (1, 8, 14, None) + } else { + continue; + }; + + let name = cols + .get(name_idx) + .map(|s| s.trim().to_string()) + .unwrap_or_default(); + let buckets = cols + .get(buckets_idx) + .and_then(|s| s.trim().parse::().ok()) + .unwrap_or(0); + let data_size = cols.get(size_idx).map(|s| s.trim()).unwrap_or(""); + let row_count = rowcount_idx_opt + .and_then(|i| cols.get(i)) + .and_then(|s| s.trim().parse::().ok()) + .unwrap_or(0); + + let size_bytes = super::parse_size(data_size); + let avg_bucket_sz = if buckets > 0 { + Some(size_bytes / buckets as u64) + } else { + None + }; + + partitions.push(super::PartitionStat { + name, + size_bytes, + rows: row_count, + buckets, + avg_bucket_size_bytes: avg_bucket_sz, + }); + + if buckets > 0 { + if let Some(fb) = first_bucket { + if fb != buckets { + all_equal = false; + } + } else { + first_bucket = Some(buckets); + } + } + } + + let total_buckets = if all_equal { first_bucket } else { None }; + Ok(TableStatsFromPartitions { + partitions, + total_buckets, + }) +} + +pub fn parse_indexes_from_create(ddl: &str) -> Vec { + let mut result = Vec::new(); + // Match lines like: INDEX idx_comment (`comment`) USING INVERTED ... + let re = Regex::new( + r"(?m)^\s*INDEX\s+`?(?P\w+)`?\s*\((?P[^\)]*)\)\s*USING\s+(?P\w+)", + ) + .ok(); + if let Some(re) = re { + for cap in re.captures_iter(ddl) { + let name = cap + .name("name") + .map(|m| m.as_str()) + .unwrap_or("") + .to_string(); + let cols_raw = cap.name("cols").map(|m| m.as_str()).unwrap_or(""); + let columns = parse_column_list(cols_raw); + let itype = cap + .name("itype") + .map(|m| m.as_str()) + .unwrap_or("INDEX") + .to_uppercase(); + result.push(IndexInfo { + name, + columns, + index_type: itype, + }); + } + } + result +} diff --git a/src/tools/fe/table_info/sql.rs b/src/tools/fe/table_info/sql.rs new file mode 100644 index 0000000..cd99c0d --- /dev/null +++ b/src/tools/fe/table_info/sql.rs @@ -0,0 +1,55 @@ +use anyhow::Result; + +// Minimal ResultSet abstraction wrapping raw mysql output (-N -B -r -A) +#[derive(Debug, Clone)] +pub struct ResultSet(pub String); + +pub struct MySqlExecutor { + doris: crate::config_loader::DorisConfig, +} + +impl MySqlExecutor { + pub fn from_config(doris: crate::config_loader::DorisConfig) -> Self { + Self { doris } + } + + pub fn query(&self, sql: &str) -> Result { + let output = crate::tools::mysql::MySQLTool::query_sql_raw_with_config(&self.doris, sql)?; + Ok(ResultSet(output)) + } +} + +pub fn query_table_list(exec: &MySqlExecutor, schema: Option<&str>) -> Result { + let mut sql = String::from( + "SELECT table_schema, table_name FROM information_schema.tables \ + WHERE TABLE_TYPE = 'BASE TABLE' AND ENGINE = 'Doris' \ + AND TABLE_SCHEMA NOT IN ('__internal_schema', 'information_schema', 'mysql')", + ); + if let Some(db) = schema { + sql.push_str(&format!(" AND table_schema = '{}'", db.replace("'", "''"))); + } + sql.push_str(" ORDER BY table_schema, table_name;"); + exec.query(&sql) +} + +pub fn query_database_list(exec: &MySqlExecutor) -> Result { + exec.query("SHOW DATABASES;") +} + +pub fn query_show_create(exec: &MySqlExecutor, ident: &super::TableIdentity) -> Result { + let sql = format!( + "SHOW CREATE TABLE `{}`.`{}`;", + ident.schema.replace("`", "``"), + ident.name.replace("`", "``") + ); + exec.query(&sql) +} + +pub fn query_partitions(exec: &MySqlExecutor, ident: &super::TableIdentity) -> Result { + let sql = format!( + "SHOW PARTITIONS FROM `{}`.`{}`;", + ident.schema.replace("`", "``"), + ident.name.replace("`", "``") + ); + exec.query(&sql) +} diff --git a/src/ui/menu.rs b/src/ui/menu.rs index 56f5aca..391448d 100644 --- a/src/ui/menu.rs +++ b/src/ui/menu.rs @@ -120,6 +120,7 @@ pub enum FeToolAction { JmapHisto, Jstack, FeProfiler, + TableInfo, RoutineLoad, Back, } @@ -192,14 +193,20 @@ pub fn show_fe_tools_menu() -> Result { .to_string(), }, MenuOption { - action: FeToolAction::RoutineLoad, + action: FeToolAction::TableInfo, key: "[5]".to_string(), + name: "table-info".to_string(), + description: "Collect table info for a selected table".to_string(), + }, + MenuOption { + action: FeToolAction::RoutineLoad, + key: "[6]".to_string(), name: "routine-load".to_string(), description: "Routine Load management tools".to_string(), }, MenuOption { action: FeToolAction::Back, - key: "[6]".to_string(), + key: "[7]".to_string(), name: "← Back".to_string(), description: "Return to main menu".to_string(), }, diff --git a/src/ui/service_handlers.rs b/src/ui/service_handlers.rs index 57b5854..0f70f46 100644 --- a/src/ui/service_handlers.rs +++ b/src/ui/service_handlers.rs @@ -88,6 +88,11 @@ pub fn handle_fe_service_loop(config: &Config, tools: &[Box]) -> Resul } } } + crate::ui::FeToolAction::TableInfo => { + if let Err(e) = crate::tools::fe::table_info::browser::run_interactive(config) { + print_error(&format!("Table info browse failed: {e}")); + } + } crate::ui::FeToolAction::RoutineLoad => { if let Err(e) = handle_routine_load_loop(config, tools) { match e {