• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 //! shell
16 #![allow(missing_docs)]
17 
18 #[allow(unused_imports)]
19 use crate::daemon_lib::task_manager;
20 use crate::utils::hdc_log::*;
21 use crate::common::base::Base;
22 use crate::config::TaskMessage;
23 use crate::config::{HdcCommand, MessageLevel, SHELL_PROG};
24 use crate::transfer;
25 
26 use std::collections::HashMap;
27 use std::io::{self, Error, ErrorKind};
28 use std::mem::MaybeUninit;
29 use std::os::fd::AsRawFd;
30 use std::process::Stdio;
31 use std::sync::{Arc, Once};
32 use std::env;
33 
34 use ylong_runtime::process::pty_process::{Pty, PtyCommand};
35 use ylong_runtime::process::{Child, Command, ChildStdin, ChildStdout, ChildStderr};
36 use ylong_runtime::io::{AsyncReadExt, AsyncWriteExt, AsyncBufReader};
37 use ylong_runtime::sync::{mpsc, Mutex};
38 use ylong_runtime::sync::error::TryRecvError::Closed;
39 
40 
41 // -----inner common functions-----
42 #[derive(Debug)]
43 struct ShellTaskID {
44     session_id: u32,
45     channel_id: u32,
46 }
47 
trim_quotation_for_cmd(cmd_input: String) -> String48 fn trim_quotation_for_cmd(cmd_input: String) -> String {
49     let mut cmd = cmd_input.trim().to_string();
50     if cmd.starts_with('"') && cmd.ends_with('"') {
51         cmd = match cmd.strip_prefix('"') {
52             Some(cmd_res) => cmd_res.to_string(),
53             None => cmd,
54         };
55         cmd = match cmd.strip_suffix('"') {
56             Some(cmd_res) => cmd_res.to_string(),
57             None => cmd,
58         };
59     }
60     cmd
61 }
62 
shell_channel_close(channel_id: u32, session_id: u32)63 async fn shell_channel_close(channel_id: u32, session_id: u32){
64     let message = TaskMessage {
65         channel_id,
66         command: HdcCommand::KernelChannelClose,
67         payload: [1].to_vec(),
68     };
69     transfer::put(session_id, message).await;
70 }
71 
stop_task(session_id: u32)72 pub async fn stop_task(session_id: u32) {
73     PtyMap::stop_task(session_id).await;
74     ShellExecuteMap::stop_task(session_id).await;
75 }
76 
dump_task() -> String77 pub async fn dump_task() -> String {
78     PtyMap::dump_task().await
79 }
80 
81 // -----interactive shell inplementation-----
82 pub struct PtyTask {
83     pub handle: ylong_runtime::task::JoinHandle<()>,
84     pub tx: mpsc::BoundedSender<Vec<u8>>,
85     pub session_id: u32,
86     pub channel_id: u32,
87     pub cmd: Option<String>,
88 }
89 
90 struct PtyProcess {
91     pub pty: Pty,
92     pub child: Arc<Mutex<Child>>,
93 }
94 
95 impl PtyProcess {
new(pty: Pty, child: Arc<Mutex<Child>>) -> Self96     fn new(pty: Pty, child: Arc<Mutex<Child>>) -> Self {
97         Self {
98             pty,
99             child,
100         }
101     }
102 }
103 
104 // hdc shell "/system/bin/uitest start-daemon /data/app/el2/100/base/com.ohos.devicetest/cache/shmf &"
105 // hdc shell "nohup test.sh &"
106 // async cmd will ignor stdout and stderr, if you want the output, cmd format is:
107 // hdc shell "cmd_xxx >/data/local/tmp/log 2>&1 &"
108 // example:
109 // hdc shell "nohup /data/local/tmp/test.sh >/data/local/tmp/log 2>&1 &"
110 // hdc shell "/data/local/tmp/test.sh >/data/local/tmp/log 2>&1 &"
init_pty_process(cmd: Option<String>, _channel_id: u32) -> io::Result<PtyProcess>111 fn init_pty_process(cmd: Option<String>, _channel_id: u32) -> io::Result<PtyProcess> {
112     let pty = match Pty::new() {
113         Ok(pty) => pty,
114         Err(e) => {
115             let msg = format!("pty create error: {}", e);
116             crate::error!("{msg}");
117             return Err(io::Error::new(io::ErrorKind::Other, msg));
118         }
119     };
120 
121     let pts = match pty.pts() {
122         Ok(pts) => pts,
123         Err(e) => {
124             let msg = format!("pty pts error: {}", e);
125             crate::error!("{msg}");
126             return Err(io::Error::new(io::ErrorKind::Other, msg));
127         }
128     };
129     let child = match cmd {
130         None => {
131             crate::debug!("input cmd is None. channel_id {_channel_id}");
132             let mut command = PtyCommand::new(SHELL_PROG);
133 
134         unsafe {
135             command.pre_exec(|| {
136                 Base::de_init_process();
137 
138                 let home_dir = match env::var("HOME") {
139                     Ok(dir) => dir,
140                     Err(_) => String::from("/")
141                 };
142                 let _ = std::env::set_current_dir(home_dir);
143 
144                 Ok(())
145             });
146         }
147 
148             command.spawn(&pts)?
149         }
150         Some(mut cmd) => {
151             crate::debug!("input cmd [{}]", cmd);
152             cmd = trim_quotation_for_cmd(cmd);
153             let params = ["-c", cmd.as_str()].to_vec();
154             let mut proc = PtyCommand::new(SHELL_PROG);
155             let command = proc.args(params);
156             command.spawn(&pts)?
157         }
158     };
159     Ok(PtyProcess::new(
160         pty,
161         Arc::new(Mutex::new(child)),
162     ))
163 }
164 
subprocess_task( cmd: Option<String>, session_id: u32, channel_id: u32, ret_command: HdcCommand, mut rx: mpsc::BoundedReceiver<Vec<u8>>, )165 async fn subprocess_task(
166     cmd: Option<String>,
167     session_id: u32,
168     channel_id: u32,
169     ret_command: HdcCommand,
170     mut rx: mpsc::BoundedReceiver<Vec<u8>>,
171 ) {
172     let mut pty_process = match init_pty_process(cmd.clone(), channel_id) {
173         Err(e) => {
174             let msg = format!("execute cmd [{cmd:?}] fail: {e:?}");
175             crate::error!("{}", msg);
176             crate::common::hdctransfer::echo_client(
177                 session_id,
178                 channel_id,
179                 &msg,
180                 MessageLevel::Fail,
181             )
182             .await;
183             shell_channel_close(channel_id, session_id).await;
184             return;
185         }
186         Ok(pty) => pty,
187     };
188     PtyChildProcessMap::put(session_id, channel_id, pty_process.child.clone()).await;
189     let mut buf = [0_u8; 30720];
190     loop {
191         ylong_runtime::select! {
192             read_res = pty_process.pty.read(&mut buf) => {
193                 match read_res {
194                     Ok(bytes) => {
195                         let message = TaskMessage {
196                             channel_id,
197                             command: ret_command,
198                             payload: buf[..bytes].to_vec(),
199                         };
200                         transfer::put(session_id, message).await;
201                     }
202                     Err(e) => {
203                         crate::warn!("pty read failed: {e:?}");
204                         break;
205                     }
206                 }
207             },
208             recv_res = rx.recv() => {
209                 match recv_res {
210                     Ok(val) => {
211                         if val[..].contains(&0x4_u8) {
212                             // ctrl-D: end pty
213                             crate::info!("ctrl-D: end pty");
214                             break;
215                         } else if val[..].contains(&0x3_u8) {
216                             // ctrl-C: end process
217                             crate::info!("ctrl-C: end process");
218                             unsafe {
219                                 let tpgid = libc::tcgetpgrp(pty_process.pty.as_raw_fd());
220                                 if tpgid > 1 {
221                                     libc::kill(tpgid,libc::SIGINT);
222                                 }
223                             }
224                             continue;
225                         } else if val[..].contains(&0x11_u8) {
226                             // ctrl-Q: dump process
227                             crate::info!("ctrl-Q: dump process");
228                             let dump_message = task_manager::dump_running_task_info().await;
229                             crate::debug!("dump_message: {}", dump_message);
230                             #[cfg(feature = "hdc_debug")]
231                             let message = TaskMessage {
232                                 channel_id,
233                                 command: ret_command,
234                                 payload: dump_message.as_bytes().to_vec(),
235                             };
236                             #[cfg(feature = "hdc_debug")]
237                             transfer::put(session_id, message).await;
238                         }
239                         if let Err(e) = pty_process.pty.write_all(&val).await {
240                             crate::warn!(
241                                 "session_id: {} channel_id: {}, pty write failed: {e:?}",
242                                 session_id, channel_id
243                             );
244                             break;
245                         }
246                     }
247                     Err(e) => {
248                         crate::debug!("rx recv failed: {e:?}");
249                     }
250                 }
251             }
252         }
253 
254         {
255             let mut child_lock = pty_process.child.lock().await;
256             let status = child_lock.try_wait();
257             match status {
258                 Ok(Some(exit_status)) => {
259                     crate::debug!("interactive shell finish a process {:?}", exit_status);
260                 }
261                 Ok(None) => {}
262                 Err(e) => {
263                     crate::error!("interactive shell wait failed: {e:?}");
264                     break;
265                 }
266             }
267         }
268     }
269 
270     let mut child_lock = pty_process.child.lock().await;
271 
272     let kill_result = child_lock.kill().await;
273     crate::debug!("subprocess_task kill child(session_id {session_id}, channel_id {channel_id}), result:{:?}", kill_result);
274     match child_lock.wait().await {
275         Ok(exit_status) => {
276             PtyMap::del(session_id, channel_id).await;
277             crate::debug!(
278                 "subprocess_task waiting child exit success, status:{:?}.",
279                 exit_status
280             );
281         }
282         Err(e) => {
283             let kill_result = child_lock.kill().await;
284             crate::debug!(
285                 "subprocess_task child exit status {:?}, kill child, result:{:?}",
286                 e,
287                 kill_result
288             );
289         }
290     }
291 
292     match child_lock.wait().await {
293         Ok(exit_status) => {
294             PtyMap::del(session_id, channel_id).await;
295             crate::debug!(
296                 "subprocess_task waiting child exit success, status:{:?}.",
297                 exit_status
298             );
299         }
300         Err(e) => {
301             crate::debug!("subprocess_task waiting child exit fail, error:{:?}.", e);
302         }
303     }
304 
305     let message = TaskMessage {
306         channel_id,
307         command: HdcCommand::KernelChannelClose,
308         payload: vec![1],
309     };
310     transfer::put(session_id, message).await;
311 }
312 
313 impl PtyTask {
new( session_id: u32, channel_id: u32, option_cmd: Option<String>, ret_command: HdcCommand, ) -> Self314     pub fn new(
315         session_id: u32,
316         channel_id: u32,
317         option_cmd: Option<String>,
318         ret_command: HdcCommand,
319     ) -> Self {
320         let (tx, rx) = ylong_runtime::sync::mpsc::bounded_channel::<Vec<u8>>(16);
321         let cmd = option_cmd.clone();
322         crate::debug!("PtyTask new session_id {session_id}, channel_id {channel_id}");
323         let handle = ylong_runtime::spawn(subprocess_task(
324             option_cmd,
325             session_id,
326             channel_id,
327             ret_command,
328             rx,
329         ));
330         Self {
331             handle,
332             tx,
333             session_id,
334             channel_id,
335             cmd,
336         }
337     }
338 }
339 
340 impl Drop for PtyTask {
drop(&mut self)341     fn drop(&mut self) {
342         crate::info!(
343             "PtyTask Drop session_id {}, channel_id {}",
344             self.session_id,
345             self.channel_id
346         );
347     }
348 }
349 
350 type Child_ = Arc<Mutex<Child>>;
351 type PtyChildProcessMap_ = Arc<Mutex<HashMap<(u32, u32), Child_>>>;
352 pub struct PtyChildProcessMap {}
353 impl PtyChildProcessMap {
get_instance() -> PtyChildProcessMap_354     fn get_instance() -> PtyChildProcessMap_ {
355         static mut PTY_CHILD_MAP: Option<PtyChildProcessMap_> = None;
356         unsafe {
357             PTY_CHILD_MAP
358                 .get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
359                 .clone()
360         }
361     }
362 
get(session_id: u32, channel_id: u32) -> Option<Child_>363     pub async fn get(session_id: u32, channel_id: u32) -> Option<Child_> {
364         let pty_child_map = Self::get_instance();
365         let map = pty_child_map.lock().await;
366         if let Some(pty_child) = map.get(&(session_id, channel_id)) {
367             return Some(pty_child.clone());
368         }
369         None
370     }
371 
372     #[allow(unused)]
put(session_id: u32, channel_id: u32, pty_child: Child_)373     pub async fn put(session_id: u32, channel_id: u32, pty_child: Child_) {
374         let pty_child_map = Self::get_instance();
375         let mut map = pty_child_map.lock().await;
376         map.insert((session_id, channel_id), pty_child);
377     }
378 
del(session_id: u32, channel_id: u32)379     pub async fn del(session_id: u32, channel_id: u32) {
380         let pty_child_map = Self::get_instance();
381         let mut map = pty_child_map.lock().await;
382         map.remove(&(session_id, channel_id));
383     }
384 }
385 
386 type PtyMap_ = Arc<Mutex<HashMap<(u32, u32), Arc<PtyTask>>>>;
387 pub struct PtyMap {}
388 impl PtyMap {
get_instance() -> PtyMap_389     fn get_instance() -> PtyMap_ {
390         static mut PTY_MAP: Option<PtyMap_> = None;
391         unsafe {
392             PTY_MAP
393                 .get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
394                 .clone()
395         }
396     }
397 
get(session_id: u32, channel_id: u32) -> Option<Arc<PtyTask>>398     pub async fn get(session_id: u32, channel_id: u32) -> Option<Arc<PtyTask>> {
399         let pty_map = Self::get_instance();
400         let map = pty_map.lock().await;
401         if let Some(pty_task) = map.get(&(session_id, channel_id)) {
402             return Some(pty_task.clone());
403         }
404         None
405     }
406 
put(session_id: u32, channel_id: u32, pty_task: PtyTask)407     pub async fn put(session_id: u32, channel_id: u32, pty_task: PtyTask) {
408         let pty_map = Self::get_instance();
409         let mut map = pty_map.lock().await;
410         let arc_pty_task = Arc::new(pty_task);
411         map.insert((session_id, channel_id), arc_pty_task);
412     }
413 
del(session_id: u32, channel_id: u32)414     pub async fn del(session_id: u32, channel_id: u32) {
415         let pty_map = Self::get_instance();
416         let mut map = pty_map.lock().await;
417         map.remove(&(session_id, channel_id));
418         PtyChildProcessMap::del(session_id, channel_id).await;
419     }
420 
stop_task(session_id: u32)421     pub async fn stop_task(session_id: u32) {
422         let pty_map = Self::get_instance();
423         {
424             let map = pty_map.lock().await;
425             crate::info!("hdc shell stop_task, session_id:{}, task_size: {}", session_id, map.len());
426             for _iter in map.iter() {
427                 if _iter.0 .0 != session_id {
428                     continue;
429                 }
430                 if let Some(pty_child) = PtyChildProcessMap::get(session_id, _iter.0 .1).await {
431                     let mut child = pty_child.lock().await;
432                     let kill_result = child.kill().await;
433                     crate::debug!(
434                         "do map clear kill child, result:{:?}, session_id {}, channel_id {}",
435                         kill_result,
436                         session_id,
437                         _iter.0 .1
438                     );
439                     match child.wait().await {
440                         Ok(exit_status) => {
441                             crate::debug!(
442                                 "waiting child exit success, status:{:?}, session_id {}, channel_id {}",
443                                 exit_status,
444                                 session_id,
445                                 _iter.0.1
446                             );
447                         }
448                         Err(e) => {
449                             crate::error!(
450                                 "waiting child exit fail, error:{:?}, session_id {}, channel_id {}",
451                                 e,
452                                 session_id,
453                                 _iter.0 .1
454                             );
455                         }
456                     }
457                     PtyChildProcessMap::del(session_id, _iter.0 .1).await;
458                 }
459                 crate::debug!(
460                     "Clear tty task, session_id: {}, channel_id:{}",
461                     _iter.0 .0,
462                     session_id
463                 );
464             }
465         }
466     }
467 
dump_task() -> String468     pub async fn dump_task() -> String {
469         let arc = Self::get_instance();
470         let map = arc.lock().await;
471         let mut result = String::new();
472         for _iter in map.iter() {
473             let command = match &_iter.1.cmd {
474                 Some(b) => b,
475                 _ => "",
476             };
477             result.push_str(&format!(
478                 "session_id:{},\tchannel_id:{},\tcommand:{}\n",
479                 _iter.1.session_id, _iter.1.channel_id, command
480             ));
481         }
482         result
483     }
484 }
485 
486 // -----noninteractive shell implementation-----
487 
488 type ShellExecuteMap_ = Mutex<HashMap<(u32, u32), Arc<ShellExecuteTask>>>;
489 pub struct ShellExecuteMap {}
490 impl ShellExecuteMap {
get_instance() -> &'static ShellExecuteMap_491     fn get_instance() -> &'static ShellExecuteMap_ {
492         static mut SHELLEXECUTE_MAP: MaybeUninit<ShellExecuteMap_> = MaybeUninit::uninit();
493         static ONCE: Once = Once::new();
494 
495         unsafe {
496             ONCE.call_once(|| {
497                     SHELLEXECUTE_MAP = MaybeUninit::new(Mutex::new(HashMap::new()));
498                 }
499             );
500             &*SHELLEXECUTE_MAP.as_ptr()
501         }
502     }
503 
put(session_id: u32, channel_id: u32, shell_execute_task: ShellExecuteTask)504     pub async fn put(session_id: u32, channel_id: u32, shell_execute_task: ShellExecuteTask) {
505         let shell_execute_map = Self::get_instance();
506         let mut map = shell_execute_map.lock().await;
507         let arc_shell_execute_task = Arc::new(shell_execute_task);
508         map.insert((session_id, channel_id), arc_shell_execute_task);
509     }
510 
del(session_id: u32, channel_id: u32)511     pub async fn del(session_id: u32, channel_id: u32) {
512         let shell_execute_map = Self::get_instance();
513         let mut map = shell_execute_map.lock().await;
514         map.remove(&(session_id, channel_id));
515     }
516 
stop_task(session_id: u32)517     pub async fn stop_task(session_id: u32) {
518         let shell_execute_map = Self::get_instance();
519         {
520             let mut map = shell_execute_map.lock().await;
521             let mut channel_vec = vec![];
522             for iter in map.iter() {
523                 if iter.0 .0 != session_id {
524                     continue;
525                 }
526                 iter.1.handle.cancel();
527                 channel_vec.push(iter.0 .1);
528                 crate::debug!(
529                     "Clear shell_execute_map task, session_id: {}, channel_id:{}, task_size: {}",
530                     session_id,
531                     iter.0 .1,
532                     map.len(),
533                 );
534             }
535             for channel_id in channel_vec{
536                 map.remove(&(session_id, channel_id));
537             }
538         }
539     }
540 
541 }
542 
543 pub struct ShellExecuteTask {
544     pub handle: ylong_runtime::task::JoinHandle<()>,
545     pub tx: mpsc::BoundedSender<Vec<u8>>,
546     pub session_id: u32,
547     pub channel_id: u32,
548     pub cmd: String,
549 }
550 
551 
watch_pipe_states(rx: &mut mpsc::BoundedReceiver<Vec<u8>>, child_in: &mut ChildStdin) -> io::Result<()>552 async fn watch_pipe_states(rx: &mut mpsc::BoundedReceiver<Vec<u8>>, child_in: &mut ChildStdin) -> io::Result<()> {
553     match rx.try_recv() {
554         Err(e) => {
555             if e == Closed {
556                 return Err(Error::new(ErrorKind::Other, "pipe closed"));
557             }
558             // 执行top指令时,存在短暂无返回值场景,此时返回值为Err(Empty),需要返回Empty
559             Ok(())
560         },
561         Ok(val) => {
562             crate::debug!("pipe recv {:?}", val);
563             let _ = child_in.write_all(&val).await;
564             Ok(())
565         }
566     }
567 }
568 
569 
read_buf_from_stdout_stderr(child_out_reader: &mut AsyncBufReader<ChildStdout>, child_err_reader: &mut AsyncBufReader<ChildStderr>, shell_task_id: &ShellTaskID, ret_command: HdcCommand)570 async fn read_buf_from_stdout_stderr(child_out_reader: &mut AsyncBufReader<ChildStdout>, child_err_reader: &mut AsyncBufReader<ChildStderr>, shell_task_id: &ShellTaskID,  ret_command: HdcCommand) {
571     let mut buffer = Vec::new();
572     if let Ok(n) = child_out_reader.read_to_end(&mut buffer).await {
573         crate::debug!("read {n} bytes child_out after child exit");
574         if n > 0 {
575             let message = TaskMessage {
576                 channel_id: shell_task_id.channel_id,
577                 command: ret_command,
578                 payload: buffer,
579             };
580             transfer::put(shell_task_id.session_id, message).await;
581         }
582     }
583 
584     let mut buffer = Vec::new();
585     if let Ok(n) = child_err_reader.read_to_end(&mut buffer).await {
586         crate::debug!("read {n} bytes child_err  child exit");
587         if n > 0 {
588             let message = TaskMessage {
589                 channel_id: shell_task_id.channel_id,
590                 command: ret_command,
591                 payload: buffer,
592             };
593             transfer::put(shell_task_id.session_id, message).await;
594         }
595     }
596 }
597 
task_for_shell_execute( cmd_param: String, shell_task_id: ShellTaskID, ret_command: HdcCommand, mut rx: mpsc::BoundedReceiver<Vec<u8>>, )598 async fn task_for_shell_execute(
599     cmd_param: String,
600     shell_task_id: ShellTaskID,
601     ret_command: HdcCommand,
602     mut rx: mpsc::BoundedReceiver<Vec<u8>>,
603 ) {
604     crate::info!(
605         "Execute cmd:[{:?}], session_id: {}, channel_id: {}",
606         cmd_param,
607         shell_task_id.session_id,
608         shell_task_id.channel_id,
609     );
610     let cmd = trim_quotation_for_cmd(cmd_param);
611     let mut shell_cmd = Command::new(SHELL_PROG);
612     shell_cmd.args(["-c", &cmd])
613         .stdout(Stdio::piped())
614         .stdin(Stdio::piped())
615         .stderr(Stdio::piped())
616         .kill_on_drop(true);
617 
618     unsafe {
619         shell_cmd.pre_exec(|| {
620             Base::de_init_process();
621             libc::setsid();
622             let pid = libc::getpid();
623             libc::setpgid(pid, pid);
624             Ok(())
625         });
626     }
627 
628     if let Ok(mut child) = shell_cmd.spawn() {
629 
630         let mut child_in = match child.take_stdin() {
631             Some(child_in_inner) => {
632                 child_in_inner
633             },
634             None => {
635                 crate::error!("take_stdin failed");
636                 shell_channel_close(shell_task_id.channel_id, shell_task_id.session_id).await;
637                 return;
638             },
639         };
640 
641         let child_out = match child.take_stdout() {
642             Some(child_out_inner) => {
643                 child_out_inner
644             },
645             None => {
646                 crate::error!("take_stdin failed");
647                 shell_channel_close(shell_task_id.channel_id, shell_task_id.session_id).await;
648                 return;
649             },
650         };
651 
652         let child_err = match child.take_stderr() {
653             Some(child_err_inner) => {
654                 child_err_inner
655             },
656             None => {
657                 crate::error!("take_stdin failed");
658                 shell_channel_close(shell_task_id.channel_id, shell_task_id.session_id).await;
659                 return;
660             },
661         };
662 
663         let mut child_out_reader = ylong_runtime::io::AsyncBufReader::new(child_out);
664         let mut child_err_reader = ylong_runtime::io::AsyncBufReader::new(child_err);
665         let mut buf_out = [0u8; 30720];
666         let mut buf_err = [0u8; 30720];
667 
668         loop {
669             ylong_runtime::select! {
670                 read_res = child_out_reader.read(&mut buf_out) => {
671                     match read_res {
672                         Ok(bytes) => {
673                             let message = TaskMessage {
674                                 channel_id: shell_task_id.channel_id,
675                                 command: ret_command,
676                                 payload: buf_out[..bytes].to_vec(),
677                             };
678                             transfer::put(shell_task_id.session_id, message).await;
679                         }
680                         Err(e) => {
681                             crate::warn!("pty read failed: {e:?}");
682                             break;
683                         }
684                     }
685                 },
686 
687                 read_res = child_err_reader.read(&mut buf_err) => {
688                     match read_res {
689                         Ok(bytes) => {
690                             let message = TaskMessage {
691                                 channel_id: shell_task_id.channel_id,
692                                 command: ret_command,
693                                 payload: buf_err[..bytes].to_vec(),
694                             };
695                             transfer::put(shell_task_id.session_id, message).await;
696                         }
697                         Err(e) => {
698                             crate::warn!("pty read failed: {e:?}");
699                             break;
700                         }
701                     }
702                 }
703             }
704 
705             if (watch_pipe_states(&mut rx, &mut child_in).await).is_err() {
706                 crate::warn!("pipe closed shell_task_id:{:?}", shell_task_id);
707                 break;
708             }
709 
710             match child.try_wait() {
711                 Ok(Some(status)) => {
712                     crate::debug!("child exited with:{status} shell_task_id:{:?}", shell_task_id);
713                     read_buf_from_stdout_stderr(&mut child_out_reader, &mut child_err_reader, &shell_task_id, ret_command).await;
714                     break;
715                 },
716                 Ok(None) => {},
717                 Err(e) => {
718                     crate::warn!("child exited with: {:?} shell_task_id:{:?}", e, shell_task_id);
719                     break;
720                 }
721             }
722         }
723 
724         let _ = child.kill().await;
725         crate::debug!("child kill shell_task_id:{:?}", shell_task_id);
726         let _ = child.wait().await;
727         crate::debug!("shell execute finish shell_task_id:{:?}", shell_task_id);
728     } else {
729         crate::debug!("shell spawn failed shell_task_id:{:?}", shell_task_id);
730     }
731 
732     ShellExecuteMap::del(shell_task_id.session_id, shell_task_id.channel_id).await;
733     shell_channel_close(shell_task_id.channel_id, shell_task_id.session_id).await;
734 }
735 
736 
737 
738 impl ShellExecuteTask {
new( session_id: u32, channel_id: u32, cmd_param: String, ret_command: HdcCommand, ) -> Self739     pub fn new(
740         session_id: u32,
741         channel_id: u32,
742         cmd_param: String,
743         ret_command: HdcCommand,
744     ) -> Self {
745         let (tx, rx) = ylong_runtime::sync::mpsc::bounded_channel::<Vec<u8>>(16);
746         let cmd = cmd_param.clone();
747         crate::debug!("ShellExecuteTask new session_id {session_id}, channel_id {channel_id}");
748         let shell_task_id = ShellTaskID {session_id, channel_id};
749         let handle = ylong_runtime::spawn(task_for_shell_execute(
750             cmd_param,
751             shell_task_id,
752             ret_command,
753             rx,
754         ));
755         Self {
756             handle,
757             tx,
758             session_id,
759             channel_id,
760             cmd,
761         }
762     }
763 }
764