From 87dd0755287d3a6a296edf40db5d68b969af6977 Mon Sep 17 00:00:00 2001 From: QuakeWang <1677980708@qq.com> Date: Tue, 30 Dec 2025 15:18:44 +0800 Subject: [PATCH] feat: Add FE Audit TopSQL tool for analyzing audit logs and generating reports --- README.md | 70 +++- src/lib.rs | 6 - src/tools/common/fs_utils.rs | 4 + src/tools/fe/audit_topsql/aggregate.rs | 144 +++++++ src/tools/fe/audit_topsql/mod.rs | 7 + src/tools/fe/audit_topsql/normalize.rs | 74 ++++ src/tools/fe/audit_topsql/parser.rs | 152 +++++++ src/tools/fe/audit_topsql/report.rs | 522 +++++++++++++++++++++++++ src/tools/fe/audit_topsql/tool.rs | 150 +++++++ src/tools/fe/mod.rs | 2 + src/tools/mod.rs | 3 +- src/ui/menu.rs | 9 +- src/ui/service_handlers.rs | 10 +- 13 files changed, 1131 insertions(+), 22 deletions(-) create mode 100644 src/tools/fe/audit_topsql/aggregate.rs create mode 100644 src/tools/fe/audit_topsql/mod.rs create mode 100644 src/tools/fe/audit_topsql/normalize.rs create mode 100644 src/tools/fe/audit_topsql/parser.rs create mode 100644 src/tools/fe/audit_topsql/report.rs create mode 100644 src/tools/fe/audit_topsql/tool.rs diff --git a/README.md b/README.md index 06ce291..92bebaf 100644 --- a/README.md +++ b/README.md @@ -1,31 +1,77 @@ # cloud-cli -`cloud-cli` is a command-line tool designed to simplify the management and diagnosis of server-side applications. It provides an interactive menu to access various diagnostic tools, making it easier for developers and system administrators to troubleshoot processes. +`cloud-cli` is an interactive command-line tool for on-call troubleshooting of Apache Doris / SelectDB clusters. It provides a TUI menu that groups common FE/BE diagnostic workflows and writes collected artifacts into an output directory for easy archiving and sharing. ## Features -The tool is organized into two main categories: +The tool is organized into two categories: `FE` and `BE`. -### FE (Frontend/Java Applications) +### FE -- **`jstack`**: Prints Java thread stack traces for a given Java process, helping to diagnose hangs and deadlocks. -- **`jmap`**: Generates heap dumps and provides memory statistics for a Java process, useful for analyzing memory leaks. +- `fe-list`: List and select the FE target host (IP) for the current session based on `clusters.toml`. +- `jmap` (`jmap-dump`, `jmap-histo`): Java heap dump / histogram. +- `jstack`: Java thread dump. +- `fe-profiler`: Generate a flame graph using Doris `bin/profile_fe.sh` (requires async-profiler). +- `table-info`: Interactive database/table browser that collects and summarizes schema, indexes, partitions, and bucket details (supports exporting `.txt` reports). +- `routine-load`: Routine Load helper tools: + - `Get Job ID`: List and select a Routine Load job (cached locally for later analysis). + - `Performance Analysis`: Analyze per-commit rows/bytes/time from FE logs. + - `Traffic Monitor`: Aggregate per-minute `loadedRows` from FE logs. +- `fe-audit-topsql`: Parse `fe.audit.log`, normalize SQL into templates, and aggregate by CPU/time to generate a TopSQL report. Default filter is `count > 10`; if nothing matches, it falls back to showing results without the count filter. -### BE (Backend/General Processes) +### BE -- **`pstack`**: Displays the stack trace for any running process, offering insights into its execution state. -- **`get_be_vars`**: Retrieves and displays the environment variables of a running process. +- `be-list`: List and select the BE target host (IP) for the current session based on `clusters.toml` (defaults to `127.0.0.1`). 
+- `pstack`: Use `gdb` to dump process thread stacks (writes a `.txt` file). +- `jmap` (`jmap-dump`, `jmap-histo`): Java heap dump / histogram (only meaningful for JVM processes). +- `be-config`: + - `get-vars` (`get-be-vars`): Query BE configuration variables via HTTP `/varz`. + - `update-config` (`set-be-config`): Update configuration via HTTP `/api/update_config` (supports persist). +- `pipeline-tasks`: Fetch running pipeline tasks via HTTP `/api/running_pipeline_tasks` (auto-saves when the response is large). +- `memz`: + - `current` (`memz`): Fetch the current Jemalloc memory view via HTTP `/memz` (saves HTML and prints a summary). + - `global` (`memz-global`): Fetch the global memory view via HTTP `/memz?type=global`. ## Usage -To run the application, execute the binary. An interactive menu will appear, allowing you to select the desired diagnostic tool. +### Build and run ```sh -./cloud-cli +cargo build --release +./target/release/cloud-cli ``` +### Configuration + +- Persistent config: `~/.config/cloud-cli/config.toml` +- MySQL key file: `~/.config/cloud-cli/key` +- Cluster topology cache: `~/.config/cloud-cli/clusters.toml` +- First run: if an FE process is detected and MySQL credentials are missing, it will prompt you to configure and test the connection. On success, it writes `config.toml` and fetches cluster topology into `clusters.toml` (used by `fe-list`/`be-list`). +- Common environment variables (override persistent config): + - `JDK_PATH`: Set the JDK path (used by `jmap`/`jstack`). + - `OUTPUT_DIR`: Set the output directory (default: `/tmp/doris/collection`). + - `CLOUD_CLI_TIMEOUT`: External command timeout in seconds (default: `60`). + - `CLOUD_CLI_NO_PROGRESS`: Disable progress animation (`1`/`true`). + - `MYSQL_HOST` / `MYSQL_PORT`: Set Doris MySQL endpoint (defaults are derived from config; otherwise `127.0.0.1:9030`). + - `PROFILE_SECONDS`: Override `fe-profiler` collection duration in seconds. + - `CLOUD_CLI_FE_AUDIT_TOPSQL_INPUT`: Provide an explicit `fe.audit.log` path for `fe-audit-topsql` (skips interactive selection). + +### Output + +- Most tools write artifacts into the output directory (default: `/tmp/doris/collection`) and print `Output saved to: ...`. +- A few tools primarily print to stdout (e.g., `get-be-vars`), but still follow consistent prompts and error messages. + +### Runtime dependencies + +The CLI invokes some system commands at runtime. Ensure these are installed and available in `PATH`: + +- `mysql` (for cluster info, Routine Load, Table Info, etc.) +- `curl` (for BE HTTP tools) +- `gdb` (for `pstack`) +- `bash` + ## Releases -This project uses GitHub Actions to automatically build and release binaries for Linux (`x86_64` and `aarch64`). When a new version is tagged (e.g., `v1.0.0`), a new release is created. +This project uses GitHub Actions to build and publish Linux (`x86_64` / `aarch64`) binaries. When you create a tag (e.g., `v1.0.0`), a corresponding GitHub Release is produced. -You can download the latest pre-compiled binaries from the [GitHub Releases](https://github.com/QuakeWang/cloud-cli/releases) page. 
+You can download the latest prebuilt binaries from GitHub Releases: https://github.com/QuakeWang/cloud-cli/releases
diff --git a/src/lib.rs b/src/lib.rs
index 114042a..ef5c245 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -7,11 +7,9 @@ pub mod process;
 pub mod tools;
 pub mod ui;
 
-use config::Config;
 use config_loader::persist_configuration;
 use dialoguer::Confirm;
 use error::Result;
-use tools::Tool;
 use tools::mysql::CredentialManager;
 use ui::*;
 
@@ -98,7 +96,3 @@ pub fn run_cli() -> Result<()> {
     ui::print_goodbye();
     Ok(())
 }
-
-fn execute_tool_enhanced(config: &Config, tool: &dyn Tool, service_name: &str) -> Result<()> {
-    ui::tool_executor::execute_tool_enhanced(config, tool, service_name)
-}
diff --git a/src/tools/common/fs_utils.rs b/src/tools/common/fs_utils.rs
index 85f3c0d..e4c3629 100644
--- a/src/tools/common/fs_utils.rs
+++ b/src/tools/common/fs_utils.rs
@@ -92,6 +92,10 @@ pub fn collect_fe_logs(dir: &Path) -> Result<Vec<PathBuf>> {
     collect_log_files(dir, "fe.log")
 }
 
+pub fn collect_fe_audit_logs(dir: &Path) -> Result<Vec<PathBuf>> {
+    collect_log_files(dir, "fe.audit.log")
+}
+
 pub fn collect_be_logs(dir: &Path) -> Result<Vec<PathBuf>> {
     collect_log_files(dir, "be.INFO")
 }
diff --git a/src/tools/fe/audit_topsql/aggregate.rs b/src/tools/fe/audit_topsql/aggregate.rs
new file mode 100644
index 0000000..0eb3945
--- /dev/null
+++ b/src/tools/fe/audit_topsql/aggregate.rs
@@ -0,0 +1,144 @@
+use std::collections::HashMap;
+
+#[derive(Debug, Clone)]
+pub struct TemplateStats {
+    pub sql_template: String,
+    pub table: Option<String>,
+    pub slowest_stmt: String,
+    pub slowest_query_id: Option<String>,
+    pub slowest_time_ms: u64,
+    pub count: u64,
+    pub total_time_ms: u64,
+    pub total_cpu_ms: u64,
+    pub max_time_ms: u64,
+    pub min_time_ms: u64,
+}
+
+impl TemplateStats {
+    pub fn avg_time_ms(&self) -> f64 {
+        if self.count == 0 {
+            return 0.0;
+        }
+        self.total_time_ms as f64 / self.count as f64
+    }
+
+    pub fn avg_cpu_ms(&self) -> f64 {
+        if self.count == 0 {
+            return 0.0;
+        }
+        self.total_cpu_ms as f64 / self.count as f64
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct AnalysisResult {
+    pub items: Vec<TemplateStats>,
+    pub total_templates: u64,
+    pub total_executions: u64,
+    pub total_time_ms: u64,
+    pub total_cpu_ms: u64,
+    pub used_fallback: bool,
+}
+
+#[derive(Debug, Default)]
+struct TemplateAgg {
+    table: Option<String>,
+    slowest_stmt: String,
+    slowest_query_id: Option<String>,
+    slowest_time_ms: u64,
+    count: u64,
+    total_time_ms: u64,
+    total_cpu_ms: u64,
+    max_time_ms: u64,
+    min_time_ms: u64,
+}
+
+pub struct TemplateAggregator {
+    map: HashMap<String, TemplateAgg>,
+}
+
+impl TemplateAggregator {
+    pub fn new() -> Self {
+        Self {
+            map: HashMap::new(),
+        }
+    }
+
+    pub fn push(
+        &mut self,
+        template: String,
+        table: Option<String>,
+        time_ms: u64,
+        cpu_ms: u64,
+        stmt: String,
+        query_id: Option<String>,
+    ) {
+        let entry = self.map.entry(template).or_insert_with(|| TemplateAgg {
+            min_time_ms: u64::MAX,
+            ..Default::default()
+        });
+
+        if entry.table.is_none() {
+            entry.table = table;
+        }
+
+        entry.count += 1;
+        entry.total_time_ms += time_ms;
+        entry.total_cpu_ms += cpu_ms;
+        entry.max_time_ms = entry.max_time_ms.max(time_ms);
+        entry.min_time_ms = entry.min_time_ms.min(time_ms);
+
+        if time_ms > entry.slowest_time_ms {
+            entry.slowest_time_ms = time_ms;
+            entry.slowest_stmt = stmt;
+            entry.slowest_query_id = query_id;
+        }
+    }
+
+    pub fn finish(self, min_count_exclusive: u64) -> AnalysisResult {
+        let has_threshold_matches = self.map.values().any(|x| x.count > min_count_exclusive);
+
+        let mut items: Vec<TemplateStats> = self
+            .map
+            .into_iter()
+            .filter(|(_, x)| !has_threshold_matches || x.count > min_count_exclusive)
+            .map(|(sql_template, x)| TemplateStats {
+                sql_template,
+                table: x.table,
+                slowest_stmt: x.slowest_stmt,
+                slowest_query_id: x.slowest_query_id,
+                slowest_time_ms: x.slowest_time_ms,
+                count: x.count,
+                total_time_ms: x.total_time_ms,
+                total_cpu_ms: x.total_cpu_ms,
+                max_time_ms: x.max_time_ms,
+                min_time_ms: if x.min_time_ms == u64::MAX {
+                    0
+                } else {
+                    x.min_time_ms
+                },
+            })
+            .collect();
+
+        items.sort_by_key(|x| std::cmp::Reverse(x.total_cpu_ms));
+
+        let total_templates = items.len() as u64;
+        let (total_executions, total_time_ms, total_cpu_ms) =
+            items.iter().fold((0u64, 0u64, 0u64), |acc, it| {
+                (
+                    acc.0 + it.count,
+                    acc.1 + it.total_time_ms,
+                    acc.2 + it.total_cpu_ms,
+                )
+            });
+
+        AnalysisResult {
+            used_fallback: !has_threshold_matches && !items.is_empty(),
+            items,
+            total_templates,
+            total_executions,
+            total_time_ms,
+            total_cpu_ms,
+        }
+    }
+}
diff --git a/src/tools/fe/audit_topsql/mod.rs b/src/tools/fe/audit_topsql/mod.rs
new file mode 100644
index 0000000..4ca14a6
--- /dev/null
+++ b/src/tools/fe/audit_topsql/mod.rs
@@ -0,0 +1,7 @@
+mod aggregate;
+mod normalize;
+mod parser;
+mod report;
+mod tool;
+
+pub use tool::FeAuditTopSqlTool;
diff --git a/src/tools/fe/audit_topsql/normalize.rs b/src/tools/fe/audit_topsql/normalize.rs
new file mode 100644
index 0000000..4c340a1
--- /dev/null
+++ b/src/tools/fe/audit_topsql/normalize.rs
@@ -0,0 +1,74 @@
+use once_cell::sync::Lazy;
+use regex::Regex;
+use std::collections::HashSet;
+
+static RE_LITERALS: Lazy<Regex> = Lazy::new(|| Regex::new(r"'[^']*'|\b\d+(?:\.\d+)?\b").unwrap());
+static RE_IN_LIST: Lazy<Regex> =
+    Lazy::new(|| Regex::new(r"(?i)\bin\s*\(\s*\?(?:\s*,\s*\?)*\s*\)").unwrap());
+static RE_NOT_IN_LIST: Lazy<Regex> =
+    Lazy::new(|| Regex::new(r"(?i)\bnot\s+in\s*\(\s*\?(?:\s*,\s*\?)*\s*\)").unwrap());
+static RE_MULTI_SPACE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\s+").unwrap());
+static RE_OPERATOR_SPACE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\s*([=(),])\s*").unwrap());
+static RE_LINE_COMMENT: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)--[^\n]*").unwrap());
+static RE_BLOCK_COMMENT: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?s)/\*.*?\*/").unwrap());
+static RE_CTE: Lazy<Regex> =
+    Lazy::new(|| Regex::new(r"(?i)(?:^|\bwith\b|,)\s*([a-z0-9_]+)\s+as\s*\(").unwrap());
+static RE_FROM: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?i)\bfrom\s+([a-z0-9_.`]+)").unwrap());
+
+pub fn normalize_sql(sql: &str) -> String {
+    if sql.is_empty() {
+        return String::new();
+    }
+
+    let mut out = RE_BLOCK_COMMENT.replace_all(sql, " ").to_string();
+    if out.contains('\n') {
+        out = RE_LINE_COMMENT.replace_all(&out, " ").to_string();
+    }
+    out = out.replace(['\r', '\n', '\t'], " ");
+
+    out = RE_LITERALS.replace_all(&out, "?").to_string();
+    out = RE_NOT_IN_LIST.replace_all(&out, "not in (?)").to_string();
+    out = RE_IN_LIST.replace_all(&out, "in (?)").to_string();
+    out = RE_OPERATOR_SPACE.replace_all(&out, "$1").to_string();
+    out.make_ascii_lowercase();
+    out = RE_MULTI_SPACE.replace_all(&out, " ").to_string();
+    out.trim().to_string()
+}
+
+pub fn guess_table(normalized_sql: &str) -> Option<String> {
+    if normalized_sql.is_empty() {
+        return None;
+    }
+
+    let mut ctes: HashSet<String> = HashSet::new();
+    for cap in RE_CTE.captures_iter(normalized_sql) {
+        if let Some(name) = cap.get(1) {
+            ctes.insert(name.as_str().to_ascii_lowercase());
+        }
+    }
+
+    for cap in RE_FROM.captures_iter(normalized_sql) {
+        let Some(m) = cap.get(1) else { continue };
+        let table = m.as_str().replace('`', "");
+        let table_lc = table.to_ascii_lowercase();
+        if ctes.contains(&table_lc) {
+            continue;
+        }
+        if matches!(
+            table_lc.as_str(),
+            "a" | "b"
+                | "c"
+                | "d"
+                | "t"
+                | "t_index"
+                | "params"
+                | "current_data"
+                | "last_period_data"
+        ) {
+            continue;
+        }
+        return Some(table);
+    }
+
+    None
+}
diff --git a/src/tools/fe/audit_topsql/parser.rs b/src/tools/fe/audit_topsql/parser.rs
new file mode 100644
index 0000000..1740564
--- /dev/null
+++ b/src/tools/fe/audit_topsql/parser.rs
@@ -0,0 +1,152 @@
+use crate::error::{CliError, Result};
+use once_cell::sync::Lazy;
+use regex::Regex;
+use std::io::BufRead;
+
+static RE_RECORD_START: Lazy<Regex> =
+    Lazy::new(|| Regex::new(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} \[").unwrap());
+static RE_TIME_MS: Lazy<Regex> = Lazy::new(|| Regex::new(r"\|Time\(ms\)=(\d+)").unwrap());
+static RE_CPU_MS: Lazy<Regex> = Lazy::new(|| Regex::new(r"\|CpuTimeMS=(\d+)").unwrap());
+static RE_QUERY_ID: Lazy<Regex> = Lazy::new(|| Regex::new(r"\|QueryId=([^|\n]+)").unwrap());
+
+#[derive(Debug, Clone)]
+pub struct AuditRecord {
+    pub time_ms: u64,
+    pub cpu_ms: u64,
+    pub query_id: Option<String>,
+    pub stmt: String,
+}
+
+#[derive(Debug, Default, Clone)]
+pub struct ParseStats {
+    pub lines: u64,
+    pub records_total: u64,
+    pub records_ok: u64,
+    pub records_bad: u64,
+}
+
+pub fn parse_records_streaming<R: BufRead, F: FnMut(AuditRecord)>(
+    reader: R,
+    mut on_record: F,
+) -> Result<ParseStats> {
+    let mut stats = ParseStats::default();
+    let mut buf = String::new();
+
+    let flush = |record: &str, stats: &mut ParseStats, on_record: &mut F| {
+        if record.trim().is_empty() {
+            return;
+        }
+        stats.records_total += 1;
+
+        let time_ms = RE_TIME_MS
+            .captures(record)
+            .and_then(|c| c.get(1))
+            .and_then(|m| m.as_str().parse::<u64>().ok())
+            .unwrap_or(0);
+
+        let cpu_ms = RE_CPU_MS
+            .captures(record)
+            .and_then(|c| c.get(1))
+            .and_then(|m| m.as_str().parse::<u64>().ok())
+            .unwrap_or(0);
+
+        let stmt = extract_stmt(record);
+        if stmt.trim().is_empty() {
+            stats.records_bad += 1;
+            return;
+        }
+
+        let query_id = RE_QUERY_ID
+            .captures(record)
+            .and_then(|c| c.get(1))
+            .map(|m| m.as_str().trim().to_string())
+            .filter(|s| !s.is_empty());
+
+        stats.records_ok += 1;
+        on_record(AuditRecord {
+            time_ms,
+            cpu_ms,
+            query_id,
+            stmt,
+        });
+    };
+
+    for line in reader.lines() {
+        let mut line = line.map_err(CliError::IoError)?;
+        stats.lines += 1;
+        if line.ends_with('\r') {
+            line.pop();
+        }
+
+        if RE_RECORD_START.is_match(&line) {
+            if !buf.is_empty() {
+                flush(&buf, &mut stats, &mut on_record);
+                buf.clear();
+            }
+            buf.push_str(&line);
+        } else if !buf.is_empty() {
+            buf.push('\n');
+            buf.push_str(&line);
+        } else {
+            continue;
+        }
+    }
+
+    if !buf.is_empty() {
+        flush(&buf, &mut stats, &mut on_record);
+    }
+
+    Ok(stats)
+}
+
+fn extract_stmt(record: &str) -> String {
+    const STMT_KEY: &str = "|Stmt=";
+
+    let Some(stmt_pos) = record.find(STMT_KEY) else {
+        return String::new();
+    };
+
+    let stmt_start = stmt_pos + STMT_KEY.len();
+    let stmt_end = find_stmt_end(record, stmt_start);
+
+    let stmt = record[stmt_start..stmt_end].trim();
+    stmt.strip_suffix('|').unwrap_or(stmt).trim().to_string()
+}
+
+fn find_stmt_end(record: &str, stmt_start: usize) -> usize {
+    let mut search = stmt_start;
+    while let Some(rel) = record[search..].find('|') {
+        let idx = search + rel;
+        if idx > stmt_start && is_kv_boundary(&record[idx..]) {
+            return idx;
+        }
+        search = idx + 1;
+    }
+    record.len()
+}
+
+fn is_kv_boundary(s: &str) -> bool {
+    let Some(rest) = s.strip_prefix('|') else {
+        return false;
+    };
+    let mut chars = rest.chars();
+    let Some(first) = chars.next() else {
+        return false;
+    };
+    if !first.is_ascii_uppercase() {
+        return false;
+    }
+
+    let mut key_len = 1usize;
+    for ch in chars {
+        if ch == '=' {
+            return key_len >= 2;
+        }
+        if ch.is_ascii_alphanumeric() || matches!(ch, '_' | '(' | ')') {
+            key_len += 1;
+            continue;
+        }
+        return false;
+    }
+    false
+}
diff --git a/src/tools/fe/audit_topsql/report.rs b/src/tools/fe/audit_topsql/report.rs
new file mode 100644
index 0000000..1e0ad0e
--- /dev/null
+++ b/src/tools/fe/audit_topsql/report.rs
@@ -0,0 +1,522 @@
+use super::aggregate::{AnalysisResult, TemplateStats};
+use crate::ui::FormatHelper;
+use console::style;
+use std::collections::HashMap;
+
+const REPORT_WIDTH: usize = 96;
+const CPU_SHARE_HOT: f64 = 0.25;
+const CPU_SHARE_WARN: f64 = 0.15;
+const CONSOLE_TEMPLATE_SNIPPET_LEN: usize = 80;
+const CONSOLE_DIFF_CONTEXT_CHARS: usize = 32;
+const CONSOLE_SAMPLE_STMT_MAX_LEN: usize = 400;
+#[derive(Copy, Clone)]
+enum Align {
+    Left,
+    Right,
+}
+
+struct TableColumn<'a> {
+    header: &'a str,
+    width: usize,
+    align: Align,
+}
+
+impl<'a> TableColumn<'a> {
+    const fn new(header: &'a str, width: usize, align: Align) -> Self {
+        Self {
+            header,
+            width,
+            align,
+        }
+    }
+
+    fn format_cell(&self, value: &str) -> String {
+        let truncated = truncate_to_width(value, self.width);
+        match self.align {
+            Align::Left => format!("{:<width$}", truncated, width = self.width),
+            Align::Right => format!("{:>width$}", truncated, width = self.width),
+        }
+    }
+}
+
+pub struct ReportOptions {
+    pub input_path: String,
+    pub min_count_exclusive: u64,
+    pub top_n: usize,
+    pub samples: usize,
+    pub max_stmt_len: usize,
+}
+
+pub fn render_report(
+    result: &AnalysisResult,
+    opts: &ReportOptions,
+    parse_lines: u64,
+    parsed: u64,
+    bad: u64,
+) -> String {
+    let mut out = String::new();
+
+    let rule = "=".repeat(REPORT_WIDTH);
+    out.push_str(&format!("{rule}\nFE Audit TopSQL Report\n{rule}\n"));
+
+    render_summary(&mut out, result, opts, parse_lines, parsed, bad);
+    out.push('\n');
+
+    let top_n = opts.top_n.min(result.items.len());
+    let by_total_cpu = &result.items;
+
+    render_template_metrics(&mut out, by_total_cpu, top_n, result.total_cpu_ms);
+    render_samples(&mut out, by_total_cpu, opts, result.total_cpu_ms);
+
+    out
+}
+
+pub fn render_console_visualization(
+    result: &AnalysisResult,
+    opts: &ReportOptions,
+    _parse_lines: u64,
+    _parsed: u64,
+    _bad: u64,
+) -> String {
+    let mut out = String::new();
+    render_console_insights(result, opts, &mut out);
+    render_console_top_templates(result, &mut out);
+    render_console_template_details(result, &mut out);
+    out
+}
+
+fn render_console_insights(result: &AnalysisResult, opts: &ReportOptions, out: &mut String) {
+    if result.items.is_empty() {
+        out.push_str("Insights:\n No SQL templates found in audit log.\n\n");
+        return;
+    }
+
+    let mut table_map: HashMap<&str, (u64, usize)> = HashMap::new();
+    for tpl in &result.items {
+        if let Some(table) = tpl.table.as_deref() {
+            let entry = table_map.entry(table).or_insert((0, 0));
+            entry.0 += tpl.total_cpu_ms;
+            entry.1 += 1;
+        }
+    }
+    out.push_str("Insights:\n");
+    if result.used_fallback {
+        out.push_str(&format!(
+            " • No SQL patterns matched count > {}; showing results without count filter\n",
+            opts.min_count_exclusive
+        ));
+    }
+    if table_map.is_empty() {
+        out.push_str(" • Table information unavailable in audit log\n");
+    } else {
+        let mut table_entries: Vec<(&str, (u64, usize))> = table_map.into_iter().collect();
+        table_entries.sort_unstable_by_key(|(_, (cpu, _))| std::cmp::Reverse(*cpu));
+        for (table, (cpu_ms, patterns)) in table_entries.into_iter().take(3) {
+            let share = percent(result.total_cpu_ms, cpu_ms);
+            let severity = if share >= CPU_SHARE_HOT {
style("⚠").red().bold() + } else if share >= CPU_SHARE_WARN { + style("▲").yellow() + } else { + style("•").green() + }; + out.push_str(&format!( + " {sev} Table {table} uses {pct} CPU ({patterns} patterns)\n", + sev = severity, + table = table, + pct = fmt_pct(share), + patterns = patterns + )); + } + } + + out.push('\n'); +} + +fn render_console_top_templates(result: &AnalysisResult, out: &mut String) { + if result.items.is_empty() { + return; + } + out.push_str(&format!( + "Top SQL Patterns Overview:\n{}\n", + "-".repeat(REPORT_WIDTH) + )); + let mut prev_tpl: Option<&str> = None; + for (idx, tpl) in result.items.iter().take(5).enumerate() { + let share = percent(result.total_cpu_ms, tpl.total_cpu_ms); + let colored_bar = colorize(render_bar(share, 30), share); + let colored_pct = colorize(fmt_pct(share), share); + let query_id = tpl.slowest_query_id.as_deref().unwrap_or("-"); + let header = format!( + "#{rank:<2} {bar} {pct:<8} cpu={cpu}ms count={count} avg={avg:.2}ms min={min_time}ms max={max_time}ms slowest={slow}ms", + rank = idx + 1, + bar = colored_bar, + pct = colored_pct, + cpu = FormatHelper::fmt_int(tpl.total_cpu_ms), + count = FormatHelper::fmt_int(tpl.count), + avg = tpl.avg_time_ms(), + min_time = FormatHelper::fmt_int(tpl.min_time_ms), + max_time = FormatHelper::fmt_int(tpl.max_time_ms), + slow = FormatHelper::fmt_int(tpl.slowest_time_ms), + ); + out.push_str(&format!("{header}\n")); + out.push_str(&format!( + " query_id={query_id} table={} sql={}\n", + tpl.table.as_deref().unwrap_or("-"), + truncate_one_line(&tpl.sql_template, CONSOLE_TEMPLATE_SNIPPET_LEN) + )); + + if let Some(prev_tpl) = prev_tpl + && let Some(diff_pos) = first_diff_char_pos(prev_tpl, &tpl.sql_template) + { + let head_len = (CONSOLE_TEMPLATE_SNIPPET_LEN.saturating_sub(3)) / 2; + if diff_pos >= head_len { + let prev_ctx = diff_context(prev_tpl, diff_pos, CONSOLE_DIFF_CONTEXT_CHARS); + let cur_ctx = diff_context(&tpl.sql_template, diff_pos, CONSOLE_DIFF_CONTEXT_CHARS); + out.push_str(&format!( + " diff@{diff_pos} prev=\"{prev_ctx}\" -> curr=\"{cur_ctx}\"\n" + )); + } + } + prev_tpl = Some(&tpl.sql_template); + } + out.push('\n'); +} + +fn render_console_template_details(result: &AnalysisResult, out: &mut String) { + if result.items.is_empty() { + return; + } + out.push_str(&format!("Slowest Samples:\n{}\n", "-".repeat(REPORT_WIDTH))); + for (idx, tpl) in result.items.iter().take(3).enumerate() { + let query_id = tpl.slowest_query_id.as_deref().unwrap_or("-"); + let header = format!( + "[#{rank}] query_id={query_id} slowest_time={slow}ms avg_time={avg:.2}ms count={count} cpu_total={cpu}ms", + rank = idx + 1, + slow = FormatHelper::fmt_int(tpl.slowest_time_ms), + avg = tpl.avg_time_ms(), + count = FormatHelper::fmt_int(tpl.count), + cpu = FormatHelper::fmt_int(tpl.total_cpu_ms) + ); + out.push_str(&format!("{}\n", style(header).bold())); + let stmt = truncate_stmt(&tpl.slowest_stmt, CONSOLE_SAMPLE_STMT_MAX_LEN); + out.push_str(&stmt); + out.push_str(if stmt.ends_with('\n') { "\n" } else { "\n\n" }); + } +} +fn render_summary( + out: &mut String, + result: &AnalysisResult, + opts: &ReportOptions, + parse_lines: u64, + parsed: u64, + bad: u64, +) { + out.push_str(§ion_header("Summary")); + let filter_value = if result.used_fallback { + format!( + "count > {} (no matches, showing all)", + opts.min_count_exclusive + ) + } else { + format!("count > {}", opts.min_count_exclusive) + }; + let columns = vec![ + TableColumn::new("Field", 24, Align::Left), + TableColumn::new("Value", 68, Align::Left), + ]; + let rows 
= vec![ + vec!["Input".to_string(), opts.input_path.clone()], + vec!["Lines".to_string(), FormatHelper::fmt_int(parse_lines)], + vec!["Parsed records".to_string(), FormatHelper::fmt_int(parsed)], + vec!["Bad records".to_string(), FormatHelper::fmt_int(bad)], + vec!["Filter".to_string(), filter_value], + vec![ + "SQL templates".to_string(), + FormatHelper::fmt_int(result.total_templates), + ], + vec![ + "Executions".to_string(), + FormatHelper::fmt_int(result.total_executions), + ], + vec![ + "Total CPU".to_string(), + format!("{} ms", FormatHelper::fmt_int(result.total_cpu_ms)), + ], + vec![ + "Total Time".to_string(), + format!("{} ms", FormatHelper::fmt_int(result.total_time_ms)), + ], + ]; + render_table(out, &columns, &rows); +} + +fn render_template_metrics( + out: &mut String, + items: &[TemplateStats], + top_n: usize, + total_cpu: u64, +) { + if top_n == 0 { + out.push_str("No SQL templates available for analysis.\n"); + return; + } + + out.push_str(§ion_header(&format!( + "Top {top_n} SQL Patterns (by total_cpu_ms)" + ))); + let columns = vec![ + TableColumn::new("rank", 4, Align::Right), + TableColumn::new("cpu_ms", 12, Align::Right), + TableColumn::new("cpu%", 7, Align::Right), + TableColumn::new("count", 9, Align::Right), + TableColumn::new("avg_time", 10, Align::Right), + TableColumn::new("min_time", 10, Align::Right), + TableColumn::new("max_time", 10, Align::Right), + TableColumn::new("slowest", 10, Align::Right), + TableColumn::new("table", 12, Align::Left), + TableColumn::new("template", 40, Align::Left), + ]; + let mut rows = Vec::new(); + for (idx, tpl) in items.iter().take(top_n).enumerate() { + let share = percent(total_cpu, tpl.total_cpu_ms); + rows.push(vec![ + (idx + 1).to_string(), + FormatHelper::fmt_int(tpl.total_cpu_ms), + fmt_pct(share), + FormatHelper::fmt_int(tpl.count), + format!("{:.2}", tpl.avg_time_ms()), + FormatHelper::fmt_int(tpl.min_time_ms), + FormatHelper::fmt_int(tpl.max_time_ms), + FormatHelper::fmt_int(tpl.slowest_time_ms), + tpl.table.clone().unwrap_or_else(|| "-".to_string()), + truncate_one_line(&tpl.sql_template, 160), + ]); + } + render_table(out, &columns, &rows); +} + +fn render_samples( + out: &mut String, + items_by_total_cpu: &[TemplateStats], + opts: &ReportOptions, + total_cpu: u64, +) { + let sample_n = opts.samples.min(items_by_total_cpu.len()); + if sample_n == 0 { + return; + } + + out.push_str(§ion_header(&format!( + "Slowest Samples (top {sample_n} by total_cpu_ms)" + ))); + let columns = vec![ + TableColumn::new("#", 3, Align::Right), + TableColumn::new("count", 10, Align::Right), + TableColumn::new("total_cpu_ms", 14, Align::Right), + TableColumn::new("avg_cpu", 10, Align::Right), + TableColumn::new("max_time_ms", 12, Align::Right), + TableColumn::new("cpu_share", 9, Align::Right), + TableColumn::new("template", 36, Align::Left), + ]; + let mut rows = Vec::new(); + for (idx, it) in items_by_total_cpu.iter().take(sample_n).enumerate() { + let share = percent(total_cpu, it.total_cpu_ms); + rows.push(vec![ + (idx + 1).to_string(), + FormatHelper::fmt_int(it.count), + FormatHelper::fmt_int(it.total_cpu_ms), + format!("{:.2}", it.avg_cpu_ms()), + format!("{:.2}", it.slowest_time_ms as f64), + fmt_pct(share), + truncate_one_line(&it.sql_template, 120), + ]); + } + render_table(out, &columns, &rows); + + for (idx, it) in items_by_total_cpu.iter().take(sample_n).enumerate() { + let query_id = it.slowest_query_id.as_deref().unwrap_or("-"); + out.push_str(&format!( + "Sample #{} (query_id={query_id})\n{}\n", + idx + 1, + "-".repeat(32) + 
        ));
+        let stmt = truncate_stmt(&it.slowest_stmt, opts.max_stmt_len);
+        out.push_str(&stmt);
+        out.push_str(if stmt.ends_with('\n') { "\n" } else { "\n\n" });
+    }
+}
+
+fn truncate_stmt(stmt: &str, max_len: usize) -> String {
+    let stmt = stmt.trim_end();
+    let total_chars = stmt.chars().count();
+    if total_chars <= max_len {
+        return stmt.to_string();
+    }
+
+    if max_len < 80 {
+        let truncated = total_chars.saturating_sub(max_len);
+        return format!(
+            "{}\n-- [truncated {} chars]\n",
+            stmt.chars().take(max_len).collect::<String>().trim_end(),
+            truncated
+        );
+    }
+
+    let head_len = max_len / 2;
+    let tail_len = max_len.saturating_sub(head_len);
+    let truncated = total_chars.saturating_sub(head_len + tail_len);
+
+    let head = stmt
+        .chars()
+        .take(head_len)
+        .collect::<String>()
+        .trim_end()
+        .to_string();
+    let tail = stmt
+        .chars()
+        .skip(total_chars.saturating_sub(tail_len))
+        .collect::<String>()
+        .trim_start()
+        .to_string();
+
+    format!("{head}\n-- [truncated {truncated} chars, showing head and tail]\n{tail}")
+}
+
+fn truncate_one_line(s: &str, max_len: usize) -> String {
+    let s = normalize_whitespace(&s.replace(['\r', '\n', '\t'], " "));
+    let total_chars = s.chars().count();
+    if total_chars <= max_len {
+        return s;
+    }
+
+    if max_len < 20 {
+        return format!("{}...", s.chars().take(max_len).collect::<String>());
+    }
+
+    let head_len = (max_len.saturating_sub(3)) / 2;
+    let tail_len = max_len.saturating_sub(3).saturating_sub(head_len);
+    let head = s.chars().take(head_len).collect::<String>();
+    let tail = s
+        .chars()
+        .skip(total_chars.saturating_sub(tail_len))
+        .collect::<String>();
+    format!("{head}...{tail}")
+}
+
+fn normalize_whitespace(s: &str) -> String {
+    s.split_whitespace().collect::<Vec<_>>().join(" ")
+}
+
+fn first_diff_char_pos(a: &str, b: &str) -> Option<usize> {
+    a.chars()
+        .zip(b.chars())
+        .position(|(ca, cb)| ca != cb)
+        .or_else(|| {
+            let common = a.chars().zip(b.chars()).count();
+            (a.chars().count() != b.chars().count()).then_some(common)
+        })
+}
+
+fn diff_context(s: &str, diff_pos: usize, context: usize) -> String {
+    let start = diff_pos.saturating_sub(context);
+    let len = context.saturating_mul(2).max(1);
+    let snippet: String = s.chars().skip(start).take(len).collect();
+    normalize_whitespace(&snippet.replace(['\r', '\n', '\t'], " "))
+}
+
+fn render_table(out: &mut String, columns: &[TableColumn<'_>], rows: &[Vec<String>]) {
+    if columns.is_empty() {
+        return;
+    }
+    let border = table_rule(columns, '-');
+    out.push_str(&border);
+    out.push_str(&table_header(columns));
+    out.push_str(&table_rule(columns, '='));
+    for row in rows {
+        out.push('|');
+        for (idx, col) in columns.iter().enumerate() {
+            let value = row.get(idx).map(|s| s.as_str()).unwrap_or("");
+            out.push(' ');
+            out.push_str(&col.format_cell(value));
+            out.push(' ');
+            out.push('|');
+        }
+        out.push('\n');
+    }
+    out.push_str(&border);
+    out.push('\n');
+}
+
+fn table_rule(columns: &[TableColumn<'_>], ch: char) -> String {
+    let mut line = String::new();
+    line.push('+');
+    for col in columns {
+        line.extend(std::iter::repeat_n(ch, col.width + 2));
+        line.push('+');
+    }
+    line.push('\n');
+    line
+}
+
+fn table_header(columns: &[TableColumn<'_>]) -> String {
+    let mut line = String::new();
+    line.push('|');
+    for col in columns {
+        let header = truncate_to_width(col.header, col.width);
+        line.push(' ');
+        line.push_str(&format!("{:^width$}", header, width = col.width));
+        line.push(' ');
+        line.push('|');
+    }
+    line.push('\n');
+    line
+}
+
+fn truncate_to_width(value: &str, width: usize) -> String {
+    if width == 0 {
+        return String::new();
+    }
let len = value.chars().count(); + if len <= width { + return value.to_string(); + } + if width <= 3 { + return value.chars().take(width).collect(); + } + let mut s: String = value.chars().take(width - 3).collect(); + s.push_str("..."); + s +} + +fn section_header(title: &str) -> String { + let rule = "-".repeat(REPORT_WIDTH); + format!("{rule}\n{title}\n{rule}\n") +} + +fn fmt_pct(x: f64) -> String { + format!("{:.2}%", x * 100.0) +} + +fn render_bar(share: f64, width: usize) -> String { + let filled = (share.clamp(0.0, 1.0) * width as f64).round() as usize; + "#".repeat(filled) + &".".repeat(width.saturating_sub(filled)) +} + +fn percent(total: u64, part: u64) -> f64 { + if total == 0 { + 0.0 + } else { + part as f64 / total as f64 + } +} + +fn colorize(value: impl std::fmt::Display, share: f64) -> String { + if share >= CPU_SHARE_HOT { + style(value).red().bold().to_string() + } else if share >= CPU_SHARE_WARN { + style(value).yellow().to_string() + } else { + style(value).green().to_string() + } +} diff --git a/src/tools/fe/audit_topsql/tool.rs b/src/tools/fe/audit_topsql/tool.rs new file mode 100644 index 0000000..1263ef5 --- /dev/null +++ b/src/tools/fe/audit_topsql/tool.rs @@ -0,0 +1,150 @@ +use super::aggregate::TemplateAggregator; +use super::normalize::{guess_table, normalize_sql}; +use super::parser::parse_records_streaming; +use super::report::{ReportOptions, render_console_visualization, render_report}; +use crate::config::Config; +use crate::error::{CliError, Result}; +use crate::tools::common::fs_utils; +use crate::tools::{ExecutionResult, Tool}; +use crate::ui; +use dialoguer::{Input, Select}; +use std::fs::File; +use std::io::BufReader; +use std::path::{Path, PathBuf}; + +const ENV_AUDIT_TOPSQL_INPUT: &str = "CLOUD_CLI_FE_AUDIT_TOPSQL_INPUT"; +const MIN_COUNT_EXCLUSIVE: u64 = 10; +const TOP_N: usize = 50; +const SAMPLES: usize = 10; +const MAX_STMT_LEN: usize = 4000; + +pub struct FeAuditTopSqlTool; + +impl Tool for FeAuditTopSqlTool { + fn name(&self) -> &str { + "fe-audit-topsql" + } + + fn description(&self) -> &str { + "Analyze fe.audit.log and generate a TopSQL template report" + } + + fn requires_pid(&self) -> bool { + false + } + + fn execute(&self, config: &Config, _pid: u32) -> Result { + config.ensure_output_dir()?; + + let doris_config = crate::config_loader::load_config()?; + let input_path = select_audit_log_path(&doris_config.log_dir)?; + + ui::print_info("Parsing audit log..."); + let file = File::open(&input_path).map_err(CliError::IoError)?; + let reader = BufReader::new(file); + let mut aggregator = TemplateAggregator::new(); + let stats = parse_records_streaming(reader, |r| { + let tpl = normalize_sql(&r.stmt); + if tpl.is_empty() { + return; + } + let table = guess_table(&tpl); + aggregator.push(tpl, table, r.time_ms, r.cpu_ms, r.stmt, r.query_id); + })?; + + ui::print_info("Finalizing report..."); + let analysis = aggregator.finish(MIN_COUNT_EXCLUSIVE); + + let report_opts = ReportOptions { + input_path: input_path.display().to_string(), + min_count_exclusive: MIN_COUNT_EXCLUSIVE, + top_n: TOP_N, + samples: SAMPLES, + max_stmt_len: MAX_STMT_LEN, + }; + let report = render_report( + &analysis, + &report_opts, + stats.lines, + stats.records_ok, + stats.records_bad, + ); + + let out_path = build_output_path(&config.output_dir, &input_path); + std::fs::write(&out_path, report).map_err(CliError::IoError)?; + + let console_view = render_console_visualization( + &analysis, + &report_opts, + stats.lines, + stats.records_ok, + stats.records_bad, + ); + 
ui::print_info(&console_view); + + Ok(ExecutionResult { + output_path: out_path, + message: "TopSQL report generated successfully.".to_string(), + }) + } +} + +fn select_audit_log_path(log_dir: &Path) -> Result { + if let Ok(input) = std::env::var(ENV_AUDIT_TOPSQL_INPUT) { + let path = PathBuf::from(input.trim()); + if !path.exists() { + return Err(CliError::ConfigError(format!( + "Audit log does not exist: {} (from {})", + path.display(), + ENV_AUDIT_TOPSQL_INPUT + ))); + } + return Ok(path); + } + + let candidates = fs_utils::collect_fe_audit_logs(log_dir).ok(); + + if let Some(files) = candidates { + let mut items: Vec = files.iter().map(|p| p.display().to_string()).collect(); + items.push("Enter path manually".to_string()); + + let selection = Select::with_theme(&dialoguer::theme::ColorfulTheme::default()) + .with_prompt("Select FE audit log file") + .items(&items) + .default(0) + .interact() + .map_err(|e| CliError::InvalidInput(format!("Audit log selection failed: {e}")))?; + + if selection < files.len() { + return Ok(files[selection].clone()); + } + } else { + ui::print_warning(&format!( + "Failed to discover audit logs under: {}", + log_dir.display() + )); + } + + let input: String = Input::with_theme(&dialoguer::theme::ColorfulTheme::default()) + .with_prompt("Enter FE audit log path") + .interact_text() + .map_err(|e| CliError::InvalidInput(format!("Path input failed: {e}")))?; + + let path = PathBuf::from(input.trim()); + if !path.exists() { + return Err(CliError::ConfigError(format!( + "Audit log does not exist: {}", + path.display() + ))); + } + Ok(path) +} + +fn build_output_path(output_dir: &Path, input_path: &Path) -> PathBuf { + let ts = chrono::Utc::now().format("%Y%m%d-%H%M%S").to_string(); + let base = input_path + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("fe.audit.log"); + output_dir.join(format!("fe-audit-topsql-{base}-{ts}.txt")) +} diff --git a/src/tools/fe/mod.rs b/src/tools/fe/mod.rs index c388d8c..8755a1d 100644 --- a/src/tools/fe/mod.rs +++ b/src/tools/fe/mod.rs @@ -1,3 +1,4 @@ +mod audit_topsql; mod jmap; mod jstack; mod list; @@ -5,6 +6,7 @@ mod profiler; pub mod routine_load; pub mod table_info; +pub use audit_topsql::FeAuditTopSqlTool; pub use jmap::{JmapDumpTool, JmapHistoTool}; pub use jstack::JstackTool; pub use list::FeListTool; diff --git a/src/tools/mod.rs b/src/tools/mod.rs index abae93c..a6db35c 100644 --- a/src/tools/mod.rs +++ b/src/tools/mod.rs @@ -50,7 +50,7 @@ impl ToolRegistry { use crate::tools::be::{JmapDumpTool as BeJmapDumpTool, JmapHistoTool as BeJmapHistoTool}; use crate::tools::fe::routine_load::get_routine_load_tools; use crate::tools::fe::{ - FeListTool, FeProfilerTool, JmapDumpTool, JmapHistoTool, JstackTool, + FeAuditTopSqlTool, FeListTool, FeProfilerTool, JmapDumpTool, JmapHistoTool, JstackTool, }; let mut registry = Self { @@ -64,6 +64,7 @@ impl ToolRegistry { registry.fe_tools.push(Box::new(JmapHistoTool)); registry.fe_tools.push(Box::new(JstackTool)); registry.fe_tools.push(Box::new(FeProfilerTool)); + registry.fe_tools.push(Box::new(FeAuditTopSqlTool)); // Register Routine Load tools registry.fe_tools.extend(get_routine_load_tools()); diff --git a/src/ui/menu.rs b/src/ui/menu.rs index 6f509f7..3373805 100644 --- a/src/ui/menu.rs +++ b/src/ui/menu.rs @@ -123,6 +123,7 @@ pub enum FeToolAction { FeProfiler, TableInfo, RoutineLoad, + FeAuditTopSql, Back, } @@ -206,8 +207,14 @@ pub fn show_fe_tools_menu() -> Result { description: "Routine Load management tools".to_string(), }, MenuOption { - action: FeToolAction::Back, + 
action: FeToolAction::FeAuditTopSql, key: "[7]".to_string(), + name: "fe-audit-topsql".to_string(), + description: "Analyze fe.audit.log and generate TopSQL report".to_string(), + }, + MenuOption { + action: FeToolAction::Back, + key: "[8]".to_string(), name: "← Back".to_string(), description: "Return to main menu".to_string(), }, diff --git a/src/ui/service_handlers.rs b/src/ui/service_handlers.rs index 7d5e5b4..aa6454e 100644 --- a/src/ui/service_handlers.rs +++ b/src/ui/service_handlers.rs @@ -14,7 +14,7 @@ fn run_tool_with_post( service: &str, ) -> Result> { let tool = &*tools[index]; - if let Err(e) = crate::execute_tool_enhanced(config, tool, service) { + if let Err(e) = crate::ui::tool_executor::execute_tool_enhanced(config, tool, service) { match e { error::CliError::GracefulExit => {} _ => print_error(&format!("Tool execution failed: {e}")), @@ -122,6 +122,12 @@ pub fn handle_fe_service_loop(config: &Config, tools: &[Box]) -> Resul } } } + crate::ui::FeToolAction::FeAuditTopSql => { + match run_tool_by_name(config, tools, "fe-audit-topsql", "FE") { + Err(error::CliError::GracefulExit) => return Ok(()), + _ => continue, + } + } crate::ui::FeToolAction::Back => return Ok(()), } } @@ -164,7 +170,7 @@ fn execute_routine_load_tool( )) })?; - if let Err(e) = crate::execute_tool_enhanced(config, tool, "FE") { + if let Err(e) = crate::ui::tool_executor::execute_tool_enhanced(config, tool, "FE") { match e { error::CliError::GracefulExit => { /* Do nothing, just loop again */ } _ => print_error(&format!("Tool execution failed: {e}")),
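
Not part of the patch: below is a sketch of a unit test that could sit in `src/tools/fe/audit_topsql/normalize.rs` to pin down the normalization behavior the README describes (literals collapse to `?`, operators lose surrounding spaces, output is lower-cased, and the first real `FROM` target is reported). The expected strings are hand-derived from the regex pipeline above, so re-check them against the actual implementation before adopting.

```rust
// Illustrative sketch only -- not included in this patch.
#[cfg(test)]
mod tests {
    use super::{guess_table, normalize_sql};

    #[test]
    fn normalizes_literals_and_guesses_table() {
        let sql = "SELECT name FROM db.users WHERE id = 42 AND city = 'NY'";
        let tpl = normalize_sql(sql);
        // Literals become `?`, `=` loses surrounding whitespace, text is lower-cased.
        assert_eq!(tpl, "select name from db.users where id=? and city=?");
        // The first FROM target that is neither a CTE nor a known alias is returned.
        assert_eq!(guess_table(&tpl), Some("db.users".to_string()));
    }
}
```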