Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,6 @@ fn handle_routine_load_loop(config: &Config, tools: &[Box<dyn Tool>]) -> Result<
}
}

/// 执行 Routine Load 工具的辅助函数
fn execute_routine_load_tool(
config: &Config,
tools: &[Box<dyn Tool>],
Expand Down
55 changes: 54 additions & 1 deletion src/tools/common/fs_utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::error::Result;
use std::fs;
use std::path::Path;
use std::path::{Path, PathBuf};

/// A generic utility to serialize a struct to a TOML file.
pub fn save_toml_to_file<T: serde::Serialize>(obj: &T, file_path: &Path) -> Result<()> {
Expand Down Expand Up @@ -42,3 +42,56 @@ pub fn read_file_content(path: &Path) -> Result<String> {
fs::read_to_string(path)
.map_err(|e| crate::error::CliError::ConfigError(format!("Failed to read file: {e}")))
}

pub fn collect_log_files(dir: &Path, log_prefix: &str) -> Result<Vec<PathBuf>> {
if !dir.exists() {
return Err(crate::error::CliError::ConfigError(format!(
"Log directory does not exist: {}",
dir.display()
)));
}

if !dir.is_dir() {
return Err(crate::error::CliError::ConfigError(format!(
"Path is not a directory: {}",
dir.display()
)));
}

let mut files: Vec<PathBuf> = fs::read_dir(dir)
.map_err(crate::error::CliError::IoError)?
.filter_map(|e| e.ok())
.map(|e| e.path())
.filter(|p| {
let name = p.file_name().and_then(|n| n.to_str()).unwrap_or("");
// Only accept log files with the specified prefix and exclude compressed archives
name.starts_with(log_prefix)
&& !name.ends_with(".gz")
&& !name.ends_with(".zip")
&& !name.ends_with(".tar")
&& !name.ends_with(".tar.gz")
})
.collect();

if files.is_empty() {
return Err(crate::error::CliError::ConfigError(format!(
"No {} files found in directory: {}",
log_prefix,
dir.display()
)));
}

// Sort by modification time (newest first)
files.sort_by_key(|p| fs::metadata(p).and_then(|m| m.modified()).ok());
files.reverse();

Ok(files)
}

pub fn collect_fe_logs(dir: &Path) -> Result<Vec<PathBuf>> {
collect_log_files(dir, "fe.log")
}

pub fn collect_be_logs(dir: &Path) -> Result<Vec<PathBuf>> {
collect_log_files(dir, "be.INFO")
}
43 changes: 1 addition & 42 deletions src/tools/fe/routine_load/log_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use chrono::NaiveDateTime;
use regex::Regex;
use std::fs;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::path::Path;

#[derive(Debug, Clone, Default)]
pub struct LogCommitEntry {
Expand Down Expand Up @@ -66,47 +66,6 @@ impl FeLogParser {
}
}

pub fn collect_fe_logs(dir: &Path) -> Result<Vec<PathBuf>> {
if !dir.exists() {
return Err(CliError::ConfigError(format!(
"Log directory does not exist: {}",
dir.display()
)));
}

if !dir.is_dir() {
return Err(CliError::ConfigError(format!(
"Path is not a directory: {}",
dir.display()
)));
}

let mut files: Vec<PathBuf> = fs::read_dir(dir)
.map_err(CliError::IoError)?
.filter_map(|e| e.ok())
.map(|e| e.path())
.filter(|p| {
p.file_name()
.and_then(|n| n.to_str())
.map(|s| s.starts_with("fe.log"))
.unwrap_or(false)
})
.collect();

if files.is_empty() {
return Err(CliError::ConfigError(format!(
"No fe.log files found in directory: {}",
dir.display()
)));
}

// Sort by modification time (newest first)
files.sort_by_key(|p| fs::metadata(p).and_then(|m| m.modified()).ok());
files.reverse();

Ok(files)
}

pub fn scan_file(
parser: &FeLogParser,
path: &Path,
Expand Down
10 changes: 6 additions & 4 deletions src/tools/fe/routine_load/performance_analyzer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use chrono::Duration;
use std::collections::HashMap;

use super::job_manager::RoutineLoadJobManager;
use super::log_parser::{FeLogParser, LogCommitEntry, collect_fe_logs, scan_file};
use super::log_parser::{FeLogParser, LogCommitEntry, scan_file};
use crate::config::Config;
use crate::error::{CliError, Result};
use crate::tools::common::fs_utils;
use crate::tools::fe::routine_load::messages as ErrMsg;
use crate::tools::{ExecutionResult, Tool};
use crate::ui;
use crate::ui::{FormatHelper, InputHelper};
use chrono::Duration;
use std::collections::HashMap;

pub struct RoutineLoadPerformanceAnalyzer;

Expand Down Expand Up @@ -65,7 +67,7 @@ impl RoutineLoadPerformanceAnalyzer {
log_dir: &std::path::Path,
job_id: &str,
) -> Result<Vec<LogCommitEntry>> {
let files = collect_fe_logs(log_dir)?;
let files = fs_utils::collect_fe_logs(log_dir)?;
let parser = FeLogParser::new();
let mut entries: Vec<LogCommitEntry> = Vec::new();

Expand Down
10 changes: 6 additions & 4 deletions src/tools/fe/routine_load/traffic_monitor.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use chrono::Duration;
use std::collections::BTreeMap;

use super::job_manager::RoutineLoadJobManager;
use super::log_parser::{FeLogParser, LogCommitEntry, collect_fe_logs, scan_file};
use super::log_parser::{FeLogParser, LogCommitEntry, scan_file};
use crate::config::Config;
use crate::error::{CliError, Result};
use crate::tools::common::fs_utils;
use crate::tools::fe::routine_load::messages as ErrMsg;
use crate::tools::{ExecutionResult, Tool};
use crate::ui;
use crate::ui::InputHelper;
use chrono::Duration;
use std::collections::BTreeMap;

pub struct RoutineLoadTrafficMonitor;

Expand Down Expand Up @@ -72,7 +74,7 @@ impl RoutineLoadTrafficMonitor {
log_dir: &std::path::Path,
job_id: &str,
) -> Result<Vec<LogCommitEntry>> {
let files = collect_fe_logs(log_dir)?;
let files = fs_utils::collect_fe_logs(log_dir)?;
let parser = FeLogParser::new();
let mut entries: Vec<LogCommitEntry> = Vec::new();

Expand Down
2 changes: 1 addition & 1 deletion src/ui/menu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ pub fn show_fe_tools_menu() -> Result<FeToolAction> {
MenuOption {
action: FeToolAction::RoutineLoad,
key: "[5]".to_string(),
name: "Routine Load".to_string(),
name: "routine-load".to_string(),
description: "Routine Load management tools".to_string(),
},
MenuOption {
Expand Down
Loading