Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions crates/tower-cmd/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ use tower_package::{Package, PackageSpec};
use tower_runtime::{local::LocalApp, App, AppLauncher, OutputReceiver, Status};
use tower_telemetry::{debug, Context};

use std::sync::Arc;
use tokio::sync::{
mpsc::{unbounded_channel, Receiver as MpscReceiver},
oneshot::{self, Receiver as OneshotReceiver},
Mutex,
};
use tokio::time::{sleep, timeout, Duration};

Expand Down Expand Up @@ -166,12 +168,19 @@ where
.await?;

// Monitor app output and status concurrently
let app = launcher.app.unwrap();
let status_task = tokio::spawn(monitor_local_status(app));

// Wait for both tasks to complete
let app = Arc::new(Mutex::new(launcher.app.unwrap()));
let status_task = tokio::spawn(monitor_local_status(Arc::clone(&app)));

// Wait for app to complete or SIGTERM
let status_result = tokio::select! {
status = status_task => status.unwrap(),
_ = tokio::signal::ctrl_c(), if !output::get_output_mode().is_mcp() => {
output::write("\nReceived Ctrl+C, stopping local run...\n");
app.lock().await.terminate().await.ok();
return Ok(output_task.await.unwrap());
}
};
let final_result = output_task.await.unwrap();
let status_result = status_task.await.unwrap();

// And if we crashed, err out
match status_result {
Expand Down Expand Up @@ -548,7 +557,7 @@ async fn monitor_output(mut output: OutputReceiver) {

/// monitor_local_status is a helper function that will monitor the status of a given app and waits for
/// it to progress to a terminal state.
async fn monitor_local_status(app: LocalApp) -> Status {
async fn monitor_local_status(app: Arc<Mutex<LocalApp>>) -> Status {
debug!("Starting status monitoring for LocalApp");
let mut check_count = 0;
let mut err_count = 0;
Expand All @@ -561,7 +570,7 @@ async fn monitor_local_status(app: LocalApp) -> Status {
check_count
);

match app.status().await {
match app.lock().await.status().await {
Ok(status) => {
// We reset the error count to indicate that we can intermittently get statuses.
err_count = 0;
Expand Down
Loading