Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 125 additions & 15 deletions src/tools/fe/routine_load/job_lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ use super::models::RoutineLoadJob;
use crate::config::Config;
use crate::config_loader;
use crate::error::{CliError, Result};
use crate::tools::common::fs_utils::ensure_dir_exists;
use crate::tools::mysql::MySQLTool;
use crate::tools::{ExecutionResult, Tool};
use crate::ui;
use crate::ui::{InputHelper, InteractiveSelector};
use crate::ui::{NoJobsNextAction, show_no_jobs_recovery_menu, show_unknown_db_recovery_menu};
use chrono::Utc;
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;

/// Routine Load Job Lister
pub struct RoutineLoadJobLister;
Expand All @@ -26,7 +30,7 @@ impl Tool for RoutineLoadJobLister {
false
}

fn execute(&self, _config: &Config, _pid: u32) -> Result<ExecutionResult> {
fn execute(&self, config: &Config, _pid: u32) -> Result<ExecutionResult> {
// Retry loop: allow reselecting database if no jobs found
let mut database = self.prompt_database_name()?;
loop {
Expand All @@ -35,11 +39,12 @@ impl Tool for RoutineLoadJobLister {
self.display_jobs(&jobs)?;
let selected_job = self.prompt_job_selection(&jobs)?;
self.save_selected_job(selected_job, &database)?;
let report = self.generate_selection_report(selected_job)?;
let report =
self.generate_selection_report(selected_job, &config.output_dir)?;
ui::print_info("");
ui::print_info(&report);
return Ok(ExecutionResult {
output_path: std::path::PathBuf::from("console_output"),
output_path: config.output_dir.clone(),
message: format!(
"Job ID '{}' selected and saved in memory",
selected_job.id
Expand Down Expand Up @@ -164,7 +169,11 @@ impl RoutineLoadJobLister {
Ok(())
}

fn generate_selection_report(&self, job: &RoutineLoadJob) -> Result<String> {
fn generate_selection_report(
&self,
job: &RoutineLoadJob,
output_dir: &std::path::Path,
) -> Result<String> {
let mut report = String::new();
report.push_str("Routine Load Job Selection Report\n");
report.push_str("=================================\n\n");
Expand All @@ -186,17 +195,26 @@ impl RoutineLoadJobLister {
report.push_str(&format!(" Received Bytes: {}\n", stat.received_bytes));
}

if let Some(ref progress) = job.progress {
report.push_str("\nProgress:\n");
for (partition, offset) in progress {
report.push_str(&format!(" Partition {partition}: {offset}\n"));
}
}

if let Some(ref lag) = job.lag {
report.push_str("\nLag:\n");
for (partition, lag_value) in lag {
report.push_str(&format!(" Partition {partition}: {lag_value}\n"));
if job.lag.is_some() {
// Partitions Overview: show Top 40 (max lag) and Bottom 10 (min lag)
let rows = self.build_partition_rows(job.progress.as_ref(), job.lag.as_ref());
let delayed = rows.iter().filter(|(_, _, lag_v)| *lag_v > 0).count();
if !rows.is_empty() {
report.push_str("\nPartitions Overview (by lag):\n");
report.push_str(&self.format_partitions_overview_top_bottom(&rows, 40, 10));
report.push_str(&format!(
"Top 40 & Bottom 10 shown; delayed: {delayed}, total partitions: {}\n",
rows.len()
));

match self.write_full_partitions_file(&rows, &job.id, output_dir) {
Ok(path) => {
report.push_str(&format!("Full partitions saved to: {}\n", path.display()));
}
Err(e) => {
report.push_str(&format!("Failed to save full partitions file: {}\n", e));
}
}
}
}

Expand All @@ -207,4 +225,96 @@ impl RoutineLoadJobLister {

Ok(report)
}

fn build_partition_rows(
&self,
progress: Option<&HashMap<String, String>>,
lag: Option<&HashMap<String, i64>>,
) -> Vec<(String, Option<String>, i64)> {
let mut rows: Vec<(String, Option<String>, i64)> = Vec::new();

// Union of partitions
let mut keys: Vec<String> = Vec::new();
if let Some(p) = progress {
keys.extend(p.keys().cloned());
}
if let Some(l) = lag {
for k in l.keys() {
if !keys.contains(k) {
keys.push(k.clone());
}
}
}

for part in keys {
let prog = progress.and_then(|p| p.get(&part).cloned());
let lag_v = lag.and_then(|l| l.get(&part).copied()).unwrap_or(0);
rows.push((part, prog, lag_v));
}

rows.sort_by(|a, b| b.2.cmp(&a.2));
rows
}

fn format_partitions_overview_top_bottom(
&self,
rows: &[(String, Option<String>, i64)],
top_n: usize,
bottom_n: usize,
) -> String {
let total = rows.len();
let mut out = String::new();

// Top section (largest lag first)
out.push_str("Top by lag:\n");
out.push_str("┌─────────────┬─────────────┬─────────────┐\n");
out.push_str("│ Partition │ Progress │ Lag │\n");
out.push_str("├─────────────┼─────────────┼─────────────┤\n");
let mut printed = 0usize;
for (part, prog, lag_v) in rows.iter().take(top_n) {
let prog_s = prog.as_deref().unwrap_or("N/A");
out.push_str(&format!("│ {part:>11} │ {prog_s:>11} │ {lag_v:>11} │\n"));
printed += 1;
}
if printed == 0 {
out.push_str("│ (no data) │\n");
}
out.push_str("└─────────────┴─────────────┴─────────────┘\n");

// Bottom section (smallest lag last); avoid overlap with top
out.push_str("Bottom by lag:\n");
out.push_str("┌─────────────┬─────────────┬─────────────┐\n");
out.push_str("│ Partition │ Progress │ Lag │\n");
out.push_str("├─────────────┼─────────────┼─────────────┤\n");
if total > top_n {
let start = total.saturating_sub(bottom_n);
for (part, prog, lag_v) in rows.iter().skip(start) {
let prog_s = prog.as_deref().unwrap_or("N/A");
out.push_str(&format!("│ {part:>11} │ {prog_s:>11} │ {lag_v:>11} │\n"));
}
}
out.push_str("└─────────────┴─────────────┴─────────────┘\n");

out
}

fn write_full_partitions_file(
&self,
rows: &[(String, Option<String>, i64)],
job_id: &str,
base_dir: &std::path::Path,
) -> Result<PathBuf> {
let file_path = base_dir.join(format!("routine_load_partitions_{job_id}.txt"));
ensure_dir_exists(&file_path)?;

let mut content = String::from("Partition\tProgress\tLag\n");
for (part, prog, lag_v) in rows {
let prog_s = prog.as_deref().unwrap_or("N/A");
content.push_str(&format!("{part}\t{prog_s}\t{lag_v}\n"));
}
fs::write(&file_path, content)
.map_err(|e| CliError::ToolExecutionFailed(format!("Write failed: {e}")))?;

Ok(file_path)
}
}
Loading