1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 //! shell
16 #![allow(missing_docs)]
17
18 #[allow(unused_imports)]
19 use crate::daemon_lib::task_manager;
20 use crate::utils::hdc_log::*;
21 use crate::common::base::Base;
22 use crate::config::TaskMessage;
23 use crate::config::{HdcCommand, MessageLevel, SHELL_PROG};
24 use crate::transfer;
25
26 use std::collections::HashMap;
27 use std::io::{self, Error, ErrorKind};
28 use std::mem::MaybeUninit;
29 use std::os::fd::AsRawFd;
30 use std::process::Stdio;
31 use std::sync::{Arc, Once};
32 use std::env;
33
34 use ylong_runtime::process::pty_process::{Pty, PtyCommand};
35 use ylong_runtime::process::{Child, Command, ChildStdin, ChildStdout, ChildStderr};
36 use ylong_runtime::io::{AsyncReadExt, AsyncWriteExt, AsyncBufReader};
37 use ylong_runtime::sync::{mpsc, Mutex};
38 use ylong_runtime::sync::error::TryRecvError::Closed;
39
40
41 // -----inner common functions-----
/// Identifies one shell task by the session and channel it belongs to.
#[derive(Debug)]
struct ShellTaskID {
    /// Id of the session the task was created for.
    session_id: u32,
    /// Id of the channel the task was created for.
    channel_id: u32,
}
47
/// Normalizes a command string received from the client.
///
/// Surrounding whitespace is always trimmed. If the trimmed text both starts
/// and ends with `"`, one quote is removed from the front and then one from
/// the back (a lone `"` therefore becomes the empty string). Otherwise the
/// trimmed text is returned unchanged.
fn trim_quotation_for_cmd(cmd_input: String) -> String {
    let trimmed = cmd_input.trim();
    let quoted = trimmed.starts_with('"') && trimmed.ends_with('"');
    if !quoted {
        return trimmed.to_string();
    }

    // Strip the leading quote first; the trailing one may be the same
    // character when the input is a single quote, in which case there is
    // nothing left to strip.
    let without_head = trimmed.strip_prefix('"').unwrap_or(trimmed);
    let without_tail = without_head.strip_suffix('"').unwrap_or(without_head);
    without_tail.to_string()
}
62
shell_channel_close(channel_id: u32, session_id: u32)63 async fn shell_channel_close(channel_id: u32, session_id: u32){
64 let message = TaskMessage {
65 channel_id,
66 command: HdcCommand::KernelChannelClose,
67 payload: [1].to_vec(),
68 };
69 transfer::put(session_id, message).await;
70 }
71
/// Stops the shell tasks of `session_id`: first the interactive (pty) tasks
/// via `PtyMap`, then the non-interactive ones via `ShellExecuteMap`.
pub async fn stop_task(session_id: u32) {
    PtyMap::stop_task(session_id).await;
    ShellExecuteMap::stop_task(session_id).await;
}
76
/// Returns the textual dump produced by `PtyMap::dump_task`.
///
/// Only the interactive (pty) tasks are included; `ShellExecuteMap` is not
/// consulted here.
pub async fn dump_task() -> String {
    PtyMap::dump_task().await
}
80
// -----interactive shell implementation-----
/// Handle to one interactive (pty) shell task.
pub struct PtyTask {
    /// Join handle of the spawned `subprocess_task` future.
    pub handle: ylong_runtime::task::JoinHandle<()>,
    /// Sending half of the bounded channel whose receiver is owned by the task.
    pub tx: mpsc::BoundedSender<Vec<u8>>,
    /// Id of the session the task belongs to.
    pub session_id: u32,
    /// Id of the channel the task belongs to.
    pub channel_id: u32,
    /// Command the task was created with; `None` when no command was given.
    pub cmd: Option<String>,
}
89
90 struct PtyProcess {
91 pub pty: Pty,
92 pub child: Arc<Mutex<Child>>,
93 }
94
95 impl PtyProcess {
new(pty: Pty, child: Arc<Mutex<Child>>) -> Self96 fn new(pty: Pty, child: Arc<Mutex<Child>>) -> Self {
97 Self {
98 pty,
99 child,
100 }
101 }
102 }
103
104 // hdc shell "/system/bin/uitest start-daemon /data/app/el2/100/base/com.ohos.devicetest/cache/shmf &"
105 // hdc shell "nohup test.sh &"
106 // async cmd will ignor stdout and stderr, if you want the output, cmd format is:
107 // hdc shell "cmd_xxx >/data/local/tmp/log 2>&1 &"
108 // example:
109 // hdc shell "nohup /data/local/tmp/test.sh >/data/local/tmp/log 2>&1 &"
110 // hdc shell "/data/local/tmp/test.sh >/data/local/tmp/log 2>&1 &"
init_pty_process(cmd: Option<String>, _channel_id: u32) -> io::Result<PtyProcess>111 fn init_pty_process(cmd: Option<String>, _channel_id: u32) -> io::Result<PtyProcess> {
112 let pty = match Pty::new() {
113 Ok(pty) => pty,
114 Err(e) => {
115 let msg = format!("pty create error: {}", e);
116 crate::error!("{msg}");
117 return Err(io::Error::new(io::ErrorKind::Other, msg));
118 }
119 };
120
121 let pts = match pty.pts() {
122 Ok(pts) => pts,
123 Err(e) => {
124 let msg = format!("pty pts error: {}", e);
125 crate::error!("{msg}");
126 return Err(io::Error::new(io::ErrorKind::Other, msg));
127 }
128 };
129 let child = match cmd {
130 None => {
131 crate::debug!("input cmd is None. channel_id {_channel_id}");
132 let mut command = PtyCommand::new(SHELL_PROG);
133
134 unsafe {
135 command.pre_exec(|| {
136 Base::de_init_process();
137
138 let home_dir = match env::var("HOME") {
139 Ok(dir) => dir,
140 Err(_) => String::from("/")
141 };
142 let _ = std::env::set_current_dir(home_dir);
143
144 Ok(())
145 });
146 }
147
148 command.spawn(&pts)?
149 }
150 Some(mut cmd) => {
151 crate::debug!("input cmd [{}]", cmd);
152 cmd = trim_quotation_for_cmd(cmd);
153 let params = ["-c", cmd.as_str()].to_vec();
154 let mut proc = PtyCommand::new(SHELL_PROG);
155 let command = proc.args(params);
156 command.spawn(&pts)?
157 }
158 };
159 Ok(PtyProcess::new(
160 pty,
161 Arc::new(Mutex::new(child)),
162 ))
163 }
164
/// Body of one interactive shell task.
///
/// Spawns the shell on a pty (see `init_pty_process`), registers the child in
/// `PtyChildProcessMap`, then loops forwarding pty output to the client as
/// `ret_command` messages and forwarding data received on `rx` to the pty.
/// When the loop ends the child is killed and waited for, the task is removed
/// from `PtyMap`, and a `KernelChannelClose` message is sent.
///
/// If the pty process cannot be created, the failure is echoed to the client,
/// the channel is closed and the function returns without entering the loop.
async fn subprocess_task(
    cmd: Option<String>,
    session_id: u32,
    channel_id: u32,
    ret_command: HdcCommand,
    mut rx: mpsc::BoundedReceiver<Vec<u8>>,
) {
    let mut pty_process = match init_pty_process(cmd.clone(), channel_id) {
        Err(e) => {
            let msg = format!("execute cmd [{cmd:?}] fail: {e:?}");
            crate::error!("{}", msg);
            crate::common::hdctransfer::echo_client(
                session_id,
                channel_id,
                &msg,
                MessageLevel::Fail,
            )
            .await;
            shell_channel_close(channel_id, session_id).await;
            return;
        }
        Ok(pty) => pty,
    };
    // Register the child so other code can look it up by (session, channel).
    PtyChildProcessMap::put(session_id, channel_id, pty_process.child.clone()).await;
    let mut buf = [0_u8; 30720];
    loop {
        ylong_runtime::select! {
            // Output from the pty: forward the bytes read to the client.
            read_res = pty_process.pty.read(&mut buf) => {
                match read_res {
                    Ok(bytes) => {
                        let message = TaskMessage {
                            channel_id,
                            command: ret_command,
                            payload: buf[..bytes].to_vec(),
                        };
                        transfer::put(session_id, message).await;
                    }
                    Err(e) => {
                        crate::warn!("pty read failed: {e:?}");
                        break;
                    }
                }
            },
            // Input from the client. The control-byte checks use `contains`,
            // so they trigger on any payload that includes the byte.
            recv_res = rx.recv() => {
                match recv_res {
                    Ok(val) => {
                        if val[..].contains(&0x4_u8) {
                            // ctrl-D: end pty
                            crate::info!("ctrl-D: end pty");
                            break;
                        } else if val[..].contains(&0x3_u8) {
                            // ctrl-C: end process
                            // SIGINT goes to the pty's foreground process group;
                            // the payload itself is not written to the pty
                            // because of the `continue` below.
                            crate::info!("ctrl-C: end process");
                            unsafe {
                                let tpgid = libc::tcgetpgrp(pty_process.pty.as_raw_fd());
                                if tpgid > 1 {
                                    libc::kill(tpgid,libc::SIGINT);
                                }
                            }
                            continue;
                        } else if val[..].contains(&0x11_u8) {
                            // ctrl-Q: dump process
                            // The dump is only sent to the client when the
                            // `hdc_debug` feature is enabled; either way the
                            // payload is still written to the pty below.
                            crate::info!("ctrl-Q: dump process");
                            let dump_message = task_manager::dump_running_task_info().await;
                            crate::debug!("dump_message: {}", dump_message);
                            #[cfg(feature = "hdc_debug")]
                            let message = TaskMessage {
                                channel_id,
                                command: ret_command,
                                payload: dump_message.as_bytes().to_vec(),
                            };
                            #[cfg(feature = "hdc_debug")]
                            transfer::put(session_id, message).await;
                        }
                        if let Err(e) = pty_process.pty.write_all(&val).await {
                            crate::warn!(
                                "session_id: {} channel_id: {}, pty write failed: {e:?}",
                                session_id, channel_id
                            );
                            break;
                        }
                    }
                    Err(e) => {
                        // NOTE(review): the error is only logged and the loop
                        // keeps going; if `recv` fails persistently (e.g. the
                        // sender was dropped) this branch presumably fires on
                        // every iteration — confirm this cannot busy-loop.
                        crate::debug!("rx recv failed: {e:?}");
                    }
                }
            }
        }

        // Poll the child after every event. An exited child is only logged
        // here; the loop is left on a `try_wait` error or via the breaks above.
        {
            let mut child_lock = pty_process.child.lock().await;
            let status = child_lock.try_wait();
            match status {
                Ok(Some(exit_status)) => {
                    crate::debug!("interactive shell finish a process {:?}", exit_status);
                }
                Ok(None) => {}
                Err(e) => {
                    crate::error!("interactive shell wait failed: {e:?}");
                    break;
                }
            }
        }
    }

    // NOTE(review): `child_lock` stays held until the end of the function,
    // including across the `PtyMap::del` calls below, which take the pty-map
    // lock; code that locks in the opposite order could deadlock with this.
    let mut child_lock = pty_process.child.lock().await;

    let kill_result = child_lock.kill().await;
    crate::debug!("subprocess_task kill child(session_id {session_id}, channel_id {channel_id}), result:{:?}", kill_result);
    // First wait: on success drop the task from the maps, on failure try to
    // kill the child once more.
    match child_lock.wait().await {
        Ok(exit_status) => {
            PtyMap::del(session_id, channel_id).await;
            crate::debug!(
                "subprocess_task waiting child exit success, status:{:?}.",
                exit_status
            );
        }
        Err(e) => {
            let kill_result = child_lock.kill().await;
            crate::debug!(
                "subprocess_task child exit status {:?}, kill child, result:{:?}",
                e,
                kill_result
            );
        }
    }

    // Second wait: runs unconditionally, so after a successful first wait the
    // removal and the log line are repeated.
    match child_lock.wait().await {
        Ok(exit_status) => {
            PtyMap::del(session_id, channel_id).await;
            crate::debug!(
                "subprocess_task waiting child exit success, status:{:?}.",
                exit_status
            );
        }
        Err(e) => {
            crate::debug!("subprocess_task waiting child exit fail, error:{:?}.", e);
        }
    }

    // Tell the client the channel is finished.
    let message = TaskMessage {
        channel_id,
        command: HdcCommand::KernelChannelClose,
        payload: vec![1],
    };
    transfer::put(session_id, message).await;
}
312
313 impl PtyTask {
new( session_id: u32, channel_id: u32, option_cmd: Option<String>, ret_command: HdcCommand, ) -> Self314 pub fn new(
315 session_id: u32,
316 channel_id: u32,
317 option_cmd: Option<String>,
318 ret_command: HdcCommand,
319 ) -> Self {
320 let (tx, rx) = ylong_runtime::sync::mpsc::bounded_channel::<Vec<u8>>(16);
321 let cmd = option_cmd.clone();
322 crate::debug!("PtyTask new session_id {session_id}, channel_id {channel_id}");
323 let handle = ylong_runtime::spawn(subprocess_task(
324 option_cmd,
325 session_id,
326 channel_id,
327 ret_command,
328 rx,
329 ));
330 Self {
331 handle,
332 tx,
333 session_id,
334 channel_id,
335 cmd,
336 }
337 }
338 }
339
340 impl Drop for PtyTask {
drop(&mut self)341 fn drop(&mut self) {
342 crate::info!(
343 "PtyTask Drop session_id {}, channel_id {}",
344 self.session_id,
345 self.channel_id
346 );
347 }
348 }
349
350 type Child_ = Arc<Mutex<Child>>;
351 type PtyChildProcessMap_ = Arc<Mutex<HashMap<(u32, u32), Child_>>>;
352 pub struct PtyChildProcessMap {}
353 impl PtyChildProcessMap {
get_instance() -> PtyChildProcessMap_354 fn get_instance() -> PtyChildProcessMap_ {
355 static mut PTY_CHILD_MAP: Option<PtyChildProcessMap_> = None;
356 unsafe {
357 PTY_CHILD_MAP
358 .get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
359 .clone()
360 }
361 }
362
get(session_id: u32, channel_id: u32) -> Option<Child_>363 pub async fn get(session_id: u32, channel_id: u32) -> Option<Child_> {
364 let pty_child_map = Self::get_instance();
365 let map = pty_child_map.lock().await;
366 if let Some(pty_child) = map.get(&(session_id, channel_id)) {
367 return Some(pty_child.clone());
368 }
369 None
370 }
371
372 #[allow(unused)]
put(session_id: u32, channel_id: u32, pty_child: Child_)373 pub async fn put(session_id: u32, channel_id: u32, pty_child: Child_) {
374 let pty_child_map = Self::get_instance();
375 let mut map = pty_child_map.lock().await;
376 map.insert((session_id, channel_id), pty_child);
377 }
378
del(session_id: u32, channel_id: u32)379 pub async fn del(session_id: u32, channel_id: u32) {
380 let pty_child_map = Self::get_instance();
381 let mut map = pty_child_map.lock().await;
382 map.remove(&(session_id, channel_id));
383 }
384 }
385
386 type PtyMap_ = Arc<Mutex<HashMap<(u32, u32), Arc<PtyTask>>>>;
387 pub struct PtyMap {}
388 impl PtyMap {
get_instance() -> PtyMap_389 fn get_instance() -> PtyMap_ {
390 static mut PTY_MAP: Option<PtyMap_> = None;
391 unsafe {
392 PTY_MAP
393 .get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
394 .clone()
395 }
396 }
397
get(session_id: u32, channel_id: u32) -> Option<Arc<PtyTask>>398 pub async fn get(session_id: u32, channel_id: u32) -> Option<Arc<PtyTask>> {
399 let pty_map = Self::get_instance();
400 let map = pty_map.lock().await;
401 if let Some(pty_task) = map.get(&(session_id, channel_id)) {
402 return Some(pty_task.clone());
403 }
404 None
405 }
406
put(session_id: u32, channel_id: u32, pty_task: PtyTask)407 pub async fn put(session_id: u32, channel_id: u32, pty_task: PtyTask) {
408 let pty_map = Self::get_instance();
409 let mut map = pty_map.lock().await;
410 let arc_pty_task = Arc::new(pty_task);
411 map.insert((session_id, channel_id), arc_pty_task);
412 }
413
del(session_id: u32, channel_id: u32)414 pub async fn del(session_id: u32, channel_id: u32) {
415 let pty_map = Self::get_instance();
416 let mut map = pty_map.lock().await;
417 map.remove(&(session_id, channel_id));
418 PtyChildProcessMap::del(session_id, channel_id).await;
419 }
420
stop_task(session_id: u32)421 pub async fn stop_task(session_id: u32) {
422 let pty_map = Self::get_instance();
423 {
424 let map = pty_map.lock().await;
425 crate::info!("hdc shell stop_task, session_id:{}, task_size: {}", session_id, map.len());
426 for _iter in map.iter() {
427 if _iter.0 .0 != session_id {
428 continue;
429 }
430 if let Some(pty_child) = PtyChildProcessMap::get(session_id, _iter.0 .1).await {
431 let mut child = pty_child.lock().await;
432 let kill_result = child.kill().await;
433 crate::debug!(
434 "do map clear kill child, result:{:?}, session_id {}, channel_id {}",
435 kill_result,
436 session_id,
437 _iter.0 .1
438 );
439 match child.wait().await {
440 Ok(exit_status) => {
441 crate::debug!(
442 "waiting child exit success, status:{:?}, session_id {}, channel_id {}",
443 exit_status,
444 session_id,
445 _iter.0.1
446 );
447 }
448 Err(e) => {
449 crate::error!(
450 "waiting child exit fail, error:{:?}, session_id {}, channel_id {}",
451 e,
452 session_id,
453 _iter.0 .1
454 );
455 }
456 }
457 PtyChildProcessMap::del(session_id, _iter.0 .1).await;
458 }
459 crate::debug!(
460 "Clear tty task, session_id: {}, channel_id:{}",
461 _iter.0 .0,
462 session_id
463 );
464 }
465 }
466 }
467
dump_task() -> String468 pub async fn dump_task() -> String {
469 let arc = Self::get_instance();
470 let map = arc.lock().await;
471 let mut result = String::new();
472 for _iter in map.iter() {
473 let command = match &_iter.1.cmd {
474 Some(b) => b,
475 _ => "",
476 };
477 result.push_str(&format!(
478 "session_id:{},\tchannel_id:{},\tcommand:{}\n",
479 _iter.1.session_id, _iter.1.channel_id, command
480 ));
481 }
482 result
483 }
484 }
485
486 // -----noninteractive shell implementation-----
487
488 type ShellExecuteMap_ = Mutex<HashMap<(u32, u32), Arc<ShellExecuteTask>>>;
489 pub struct ShellExecuteMap {}
490 impl ShellExecuteMap {
get_instance() -> &'static ShellExecuteMap_491 fn get_instance() -> &'static ShellExecuteMap_ {
492 static mut SHELLEXECUTE_MAP: MaybeUninit<ShellExecuteMap_> = MaybeUninit::uninit();
493 static ONCE: Once = Once::new();
494
495 unsafe {
496 ONCE.call_once(|| {
497 SHELLEXECUTE_MAP = MaybeUninit::new(Mutex::new(HashMap::new()));
498 }
499 );
500 &*SHELLEXECUTE_MAP.as_ptr()
501 }
502 }
503
put(session_id: u32, channel_id: u32, shell_execute_task: ShellExecuteTask)504 pub async fn put(session_id: u32, channel_id: u32, shell_execute_task: ShellExecuteTask) {
505 let shell_execute_map = Self::get_instance();
506 let mut map = shell_execute_map.lock().await;
507 let arc_shell_execute_task = Arc::new(shell_execute_task);
508 map.insert((session_id, channel_id), arc_shell_execute_task);
509 }
510
del(session_id: u32, channel_id: u32)511 pub async fn del(session_id: u32, channel_id: u32) {
512 let shell_execute_map = Self::get_instance();
513 let mut map = shell_execute_map.lock().await;
514 map.remove(&(session_id, channel_id));
515 }
516
stop_task(session_id: u32)517 pub async fn stop_task(session_id: u32) {
518 let shell_execute_map = Self::get_instance();
519 {
520 let mut map = shell_execute_map.lock().await;
521 let mut channel_vec = vec![];
522 for iter in map.iter() {
523 if iter.0 .0 != session_id {
524 continue;
525 }
526 iter.1.handle.cancel();
527 channel_vec.push(iter.0 .1);
528 crate::debug!(
529 "Clear shell_execute_map task, session_id: {}, channel_id:{}, task_size: {}",
530 session_id,
531 iter.0 .1,
532 map.len(),
533 );
534 }
535 for channel_id in channel_vec{
536 map.remove(&(session_id, channel_id));
537 }
538 }
539 }
540
541 }
542
/// Handle to one non-interactive shell task.
pub struct ShellExecuteTask {
    /// Join handle of the spawned `task_for_shell_execute` future.
    pub handle: ylong_runtime::task::JoinHandle<()>,
    /// Sending half of the bounded channel whose receiver is owned by the task.
    pub tx: mpsc::BoundedSender<Vec<u8>>,
    /// Id of the session the task belongs to.
    pub session_id: u32,
    /// Id of the channel the task belongs to.
    pub channel_id: u32,
    /// Command string the task was created with.
    pub cmd: String,
}
550
551
watch_pipe_states(rx: &mut mpsc::BoundedReceiver<Vec<u8>>, child_in: &mut ChildStdin) -> io::Result<()>552 async fn watch_pipe_states(rx: &mut mpsc::BoundedReceiver<Vec<u8>>, child_in: &mut ChildStdin) -> io::Result<()> {
553 match rx.try_recv() {
554 Err(e) => {
555 if e == Closed {
556 return Err(Error::new(ErrorKind::Other, "pipe closed"));
557 }
558 // 执行top指令时,存在短暂无返回值场景,此时返回值为Err(Empty),需要返回Empty
559 Ok(())
560 },
561 Ok(val) => {
562 crate::debug!("pipe recv {:?}", val);
563 let _ = child_in.write_all(&val).await;
564 Ok(())
565 }
566 }
567 }
568
569
read_buf_from_stdout_stderr(child_out_reader: &mut AsyncBufReader<ChildStdout>, child_err_reader: &mut AsyncBufReader<ChildStderr>, shell_task_id: &ShellTaskID, ret_command: HdcCommand)570 async fn read_buf_from_stdout_stderr(child_out_reader: &mut AsyncBufReader<ChildStdout>, child_err_reader: &mut AsyncBufReader<ChildStderr>, shell_task_id: &ShellTaskID, ret_command: HdcCommand) {
571 let mut buffer = Vec::new();
572 if let Ok(n) = child_out_reader.read_to_end(&mut buffer).await {
573 crate::debug!("read {n} bytes child_out after child exit");
574 if n > 0 {
575 let message = TaskMessage {
576 channel_id: shell_task_id.channel_id,
577 command: ret_command,
578 payload: buffer,
579 };
580 transfer::put(shell_task_id.session_id, message).await;
581 }
582 }
583
584 let mut buffer = Vec::new();
585 if let Ok(n) = child_err_reader.read_to_end(&mut buffer).await {
586 crate::debug!("read {n} bytes child_err child exit");
587 if n > 0 {
588 let message = TaskMessage {
589 channel_id: shell_task_id.channel_id,
590 command: ret_command,
591 payload: buffer,
592 };
593 transfer::put(shell_task_id.session_id, message).await;
594 }
595 }
596 }
597
task_for_shell_execute( cmd_param: String, shell_task_id: ShellTaskID, ret_command: HdcCommand, mut rx: mpsc::BoundedReceiver<Vec<u8>>, )598 async fn task_for_shell_execute(
599 cmd_param: String,
600 shell_task_id: ShellTaskID,
601 ret_command: HdcCommand,
602 mut rx: mpsc::BoundedReceiver<Vec<u8>>,
603 ) {
604 crate::info!(
605 "Execute cmd:[{:?}], session_id: {}, channel_id: {}",
606 cmd_param,
607 shell_task_id.session_id,
608 shell_task_id.channel_id,
609 );
610 let cmd = trim_quotation_for_cmd(cmd_param);
611 let mut shell_cmd = Command::new(SHELL_PROG);
612 shell_cmd.args(["-c", &cmd])
613 .stdout(Stdio::piped())
614 .stdin(Stdio::piped())
615 .stderr(Stdio::piped())
616 .kill_on_drop(true);
617
618 unsafe {
619 shell_cmd.pre_exec(|| {
620 Base::de_init_process();
621 libc::setsid();
622 let pid = libc::getpid();
623 libc::setpgid(pid, pid);
624 Ok(())
625 });
626 }
627
628 if let Ok(mut child) = shell_cmd.spawn() {
629
630 let mut child_in = match child.take_stdin() {
631 Some(child_in_inner) => {
632 child_in_inner
633 },
634 None => {
635 crate::error!("take_stdin failed");
636 shell_channel_close(shell_task_id.channel_id, shell_task_id.session_id).await;
637 return;
638 },
639 };
640
641 let child_out = match child.take_stdout() {
642 Some(child_out_inner) => {
643 child_out_inner
644 },
645 None => {
646 crate::error!("take_stdin failed");
647 shell_channel_close(shell_task_id.channel_id, shell_task_id.session_id).await;
648 return;
649 },
650 };
651
652 let child_err = match child.take_stderr() {
653 Some(child_err_inner) => {
654 child_err_inner
655 },
656 None => {
657 crate::error!("take_stdin failed");
658 shell_channel_close(shell_task_id.channel_id, shell_task_id.session_id).await;
659 return;
660 },
661 };
662
663 let mut child_out_reader = ylong_runtime::io::AsyncBufReader::new(child_out);
664 let mut child_err_reader = ylong_runtime::io::AsyncBufReader::new(child_err);
665 let mut buf_out = [0u8; 30720];
666 let mut buf_err = [0u8; 30720];
667
668 loop {
669 ylong_runtime::select! {
670 read_res = child_out_reader.read(&mut buf_out) => {
671 match read_res {
672 Ok(bytes) => {
673 let message = TaskMessage {
674 channel_id: shell_task_id.channel_id,
675 command: ret_command,
676 payload: buf_out[..bytes].to_vec(),
677 };
678 transfer::put(shell_task_id.session_id, message).await;
679 }
680 Err(e) => {
681 crate::warn!("pty read failed: {e:?}");
682 break;
683 }
684 }
685 },
686
687 read_res = child_err_reader.read(&mut buf_err) => {
688 match read_res {
689 Ok(bytes) => {
690 let message = TaskMessage {
691 channel_id: shell_task_id.channel_id,
692 command: ret_command,
693 payload: buf_err[..bytes].to_vec(),
694 };
695 transfer::put(shell_task_id.session_id, message).await;
696 }
697 Err(e) => {
698 crate::warn!("pty read failed: {e:?}");
699 break;
700 }
701 }
702 }
703 }
704
705 if (watch_pipe_states(&mut rx, &mut child_in).await).is_err() {
706 crate::warn!("pipe closed shell_task_id:{:?}", shell_task_id);
707 break;
708 }
709
710 match child.try_wait() {
711 Ok(Some(status)) => {
712 crate::debug!("child exited with:{status} shell_task_id:{:?}", shell_task_id);
713 read_buf_from_stdout_stderr(&mut child_out_reader, &mut child_err_reader, &shell_task_id, ret_command).await;
714 break;
715 },
716 Ok(None) => {},
717 Err(e) => {
718 crate::warn!("child exited with: {:?} shell_task_id:{:?}", e, shell_task_id);
719 break;
720 }
721 }
722 }
723
724 let _ = child.kill().await;
725 crate::debug!("child kill shell_task_id:{:?}", shell_task_id);
726 let _ = child.wait().await;
727 crate::debug!("shell execute finish shell_task_id:{:?}", shell_task_id);
728 } else {
729 crate::debug!("shell spawn failed shell_task_id:{:?}", shell_task_id);
730 }
731
732 ShellExecuteMap::del(shell_task_id.session_id, shell_task_id.channel_id).await;
733 shell_channel_close(shell_task_id.channel_id, shell_task_id.session_id).await;
734 }
735
736
737
738 impl ShellExecuteTask {
new( session_id: u32, channel_id: u32, cmd_param: String, ret_command: HdcCommand, ) -> Self739 pub fn new(
740 session_id: u32,
741 channel_id: u32,
742 cmd_param: String,
743 ret_command: HdcCommand,
744 ) -> Self {
745 let (tx, rx) = ylong_runtime::sync::mpsc::bounded_channel::<Vec<u8>>(16);
746 let cmd = cmd_param.clone();
747 crate::debug!("ShellExecuteTask new session_id {session_id}, channel_id {channel_id}");
748 let shell_task_id = ShellTaskID {session_id, channel_id};
749 let handle = ylong_runtime::spawn(task_for_shell_execute(
750 cmd_param,
751 shell_task_id,
752 ret_command,
753 rx,
754 ));
755 Self {
756 handle,
757 tx,
758 session_id,
759 channel_id,
760 cmd,
761 }
762 }
763 }
764