@@ -5,8 +5,8 @@ use std::process::ExitStatus;
55
66use anyhow:: Result ;
77use process_wrap:: tokio:: { TokioChildWrapper , TokioCommandWrap } ;
8- use tokio:: io:: { AsyncBufReadExt , BufReader } ;
9- use tokio:: process:: { ChildStderr , ChildStdout } ;
8+ use tokio:: io:: { AsyncBufReadExt , AsyncWriteExt , BufReader } ;
9+ use tokio:: process:: { ChildStderr , ChildStdin , ChildStdout } ;
1010use tokio:: sync:: { mpsc, oneshot} ;
1111
1212use crate :: agent:: event:: AgentCommandStatus ;
@@ -27,22 +27,25 @@ pub struct ProcessRegistry {
2727struct ProcessData {
2828 command : String ,
2929 output : String ,
30- exit_status : Option < ExitStatus > ,
30+ exit_status : Option < i32 > ,
3131 receiver : mpsc:: UnboundedReceiver < ProcessOutput > ,
3232 terminate_sender : Option < oneshot:: Sender < ( ) > > ,
33+ input_sender : Option < mpsc:: UnboundedSender < Vec < u8 > > > ,
3334}
3435
3536enum ProcessOutput {
36- Exited ( ExitStatus ) ,
37+ Exited ( Option < ExitStatus > ) ,
3738 Output ( String ) ,
3839 Error ( String ) ,
3940}
4041
4142struct ProcessRuntime {
4243 _process : Box < dyn TokioChildWrapper > ,
4344 stdout : ChildStdout ,
45+ stdin : ChildStdin ,
4446 stderr : ChildStderr ,
4547 sender : mpsc:: UnboundedSender < ProcessOutput > ,
48+ input_signal : mpsc:: UnboundedReceiver < Vec < u8 > > ,
4649 terminate_signal : oneshot:: Receiver < ( ) > ,
4750}
4851
@@ -52,29 +55,37 @@ impl ProcessRuntime {
5255
5356 let stdout = Self :: handle_stdout ( self . stdout , self . sender . clone ( ) ) ;
5457 let stderr = Self :: handle_stderr ( self . stderr , self . sender . clone ( ) ) ;
58+ let stdin = Self :: handle_stdin ( self . stdin , self . input_signal ) ;
59+
5560 let status = Box :: into_pin ( self . _process . wait ( ) ) ;
5661 pin ! ( stdout) ;
5762 pin ! ( stderr) ;
58- let mut exit_status = ExitStatus :: default ( ) ;
63+ pin ! ( stdin) ;
64+
65+ let mut exit_status = None ;
5966 tokio:: select! {
6067 result = & mut stdout => {
6168 tracing:: trace!( "Stdout handler completed: {:?}" , result) ;
69+ exit_status = Some ( ExitStatus :: default ( ) ) ;
6270 }
6371 result = & mut stderr => {
6472 tracing:: trace!( "Stderr handler completed: {:?}" , result) ;
6573 }
6674 // capture the status so we don't need to wait for a timeout
6775 result = status => {
6876 if let Ok ( result) = result {
69- exit_status = result;
77+ exit_status = Some ( result) ;
7078 }
7179 tracing:: trace!( "Process exited with status: {:?}" , result) ;
7280 }
81+ result = & mut stdin => {
82+ tracing:: trace!( "Stdin handler completed: {:?}" , result) ;
83+ }
7384 _ = self . terminate_signal => {
7485 tracing:: debug!( "Receive terminal_signal" ) ;
7586 if self . _process. start_kill( ) . is_ok( ) {
7687 if let Ok ( status) = Box :: into_pin( self . _process. wait( ) ) . await {
77- exit_status = status;
88+ exit_status = Some ( status) ;
7889 }
7990 }
8091 }
@@ -123,17 +134,36 @@ impl ProcessRuntime {
123134 }
124135 }
125136 }
137+
138+ async fn handle_stdin ( mut stdin : ChildStdin , mut receiver : mpsc:: UnboundedReceiver < Vec < u8 > > ) {
139+ while let Some ( data) = receiver. recv ( ) . await {
140+ tracing:: trace!( "Writing data to stdin: {:?}" , data) ;
141+ if let Err ( e) = stdin. write_all ( data. as_slice ( ) ) . await {
142+ tracing:: error!( error = ?e, "Error writing data to child process" ) ;
143+ break ;
144+ }
145+ if let Err ( e) = stdin. flush ( ) . await {
146+ tracing:: error!( error = ?e, "Error flushing data to child process" ) ;
147+ break ;
148+ }
149+ }
150+ }
126151}
127152
128153impl ProcessRegistry {
129154 async fn spawn_process (
130155 & self ,
131156 command : & str ,
132157 cwd : & str ,
133- ) -> Result < ( Box < dyn TokioChildWrapper > , ChildStdout , ChildStderr ) > {
158+ ) -> Result < (
159+ Box < dyn TokioChildWrapper > ,
160+ ChildStdout ,
161+ ChildStderr ,
162+ ChildStdin ,
163+ ) > {
134164 let mut child = TokioCommandWrap :: with_new ( SHELL , |cmd| {
135165 cmd. current_dir ( cwd)
136- . stdin ( std:: process:: Stdio :: null ( ) )
166+ . stdin ( std:: process:: Stdio :: piped ( ) )
137167 . stdout ( std:: process:: Stdio :: piped ( ) )
138168 . stderr ( std:: process:: Stdio :: piped ( ) ) ;
139169
@@ -164,20 +194,28 @@ impl ProcessRegistry {
164194 . take ( )
165195 . ok_or_else ( || anyhow:: anyhow!( "Failed to get stdout" ) ) ?;
166196
167- Ok ( ( process, stdout, stderr) )
197+ let stdin = process
198+ . stdin ( )
199+ . take ( )
200+ . ok_or_else ( || anyhow:: anyhow!( "Failed to get stdin" ) ) ?;
201+
202+ Ok ( ( process, stdout, stderr, stdin) )
168203 }
169204
170205 pub async fn execute_command ( & mut self , command : & str , cwd : & str ) -> Result < usize > {
171206 self . counter = self . counter . saturating_add ( 1 ) ;
172- let ( process, stdout, stderr) = self . spawn_process ( command, cwd) . await ?;
207+ let ( process, stdout, stderr, stdin ) = self . spawn_process ( command, cwd) . await ?;
173208 let ( tx, rx) = mpsc:: unbounded_channel ( ) ;
174209 let ( t_tx, t_rx) = tokio:: sync:: oneshot:: channel ( ) ;
210+ let ( in_tx, in_rx) = mpsc:: unbounded_channel ( ) ;
175211
176212 let runtime = ProcessRuntime {
177213 _process : process,
178214 stdout,
179215 stderr,
216+ stdin,
180217 sender : tx,
218+ input_signal : in_rx,
181219 terminate_signal : t_rx,
182220 } ;
183221
@@ -191,6 +229,7 @@ impl ProcessRegistry {
191229 exit_status : None ,
192230 receiver : rx,
193231 terminate_sender : Some ( t_tx) ,
232+ input_sender : Some ( in_tx) ,
194233 } ,
195234 ) ;
196235 Ok ( self . counter )
@@ -214,7 +253,11 @@ impl ProcessRegistry {
214253 while let Ok ( output) = process. receiver . try_recv ( ) {
215254 match output {
216255 ProcessOutput :: Exited ( exit_status) => {
217- process. exit_status = Some ( exit_status)
256+ process. exit_status = Some (
257+ exit_status
258+ . map ( |s| s. code ( ) . unwrap_or_default ( ) )
259+ . unwrap_or ( 1 ) ,
260+ )
218261 }
219262 ProcessOutput :: Output ( str) => process. output += & str,
220263 ProcessOutput :: Error ( str) => process. output += & str,
@@ -231,12 +274,12 @@ impl ProcessRegistry {
231274 modified_terminal_states
232275 }
233276
234- pub fn get_process ( & self , id : usize ) -> Option < ( Option < ExitStatus > , & String ) > {
277+ pub fn get_process ( & self , id : usize ) -> Option < ( Option < i32 > , & String ) > {
235278 let process = self . processes . get ( & id) ?;
236279 Some ( ( process. exit_status , & process. output ) )
237280 }
238281
239- pub fn processes ( & self ) -> impl Iterator < Item = ( usize , Option < ExitStatus > , & String ) > {
282+ pub fn processes ( & self ) -> impl Iterator < Item = ( usize , Option < i32 > , & String ) > {
240283 self . processes
241284 . iter ( )
242285 . map ( |( key, value) | ( * key, value. exit_status , & value. command ) )
@@ -253,4 +296,12 @@ impl ProcessRegistry {
253296 }
254297 Ok ( ( ) )
255298 }
299+
300+ pub fn send_data ( & self , idx : usize , data : Vec < u8 > ) {
301+ if let Some ( process) = self . processes . get ( & idx) {
302+ if let Some ( sender) = process. input_sender . as_ref ( ) {
303+ sender. send ( data) . ok ( ) ;
304+ }
305+ }
306+ }
256307}
0 commit comments