• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 //! forward
16 #![allow(missing_docs)]
17 #[cfg(feature = "host")]
18 extern crate ylong_runtime_static as ylong_runtime;
19 
20 #[cfg(not(feature = "host"))]
21 use libc::SOCK_STREAM;
22 #[cfg(not(target_os = "windows"))]
23 use libc::{AF_LOCAL, AF_UNIX, FD_CLOEXEC, F_SETFD};
24 use std::collections::HashMap;
25 use std::fs;
26 #[cfg(not(target_os = "windows"))]
27 use std::fs::File;
28 #[cfg(not(target_os = "windows"))]
29 use std::io::Read;
30 use std::io::{self, Error, ErrorKind};
31 use ylong_runtime::sync::{Mutex, RwLock};
32 
33 use crate::common::base::Base;
34 use crate::common::hdctransfer::transfer_task_finish;
35 use crate::common::hdctransfer::{self, HdcTransferBase};
36 #[cfg(not(feature = "host"))]
37 use crate::common::jdwp::Jdwp;
38 #[cfg(not(target_os = "windows"))]
39 use crate::common::uds::{UdsAddr, UdsClient, UdsServer};
40 use crate::config;
41 use crate::config::HdcCommand;
42 use crate::config::MessageLevel;
43 use crate::config::TaskMessage;
44 use crate::transfer;
45 #[allow(unused)]
46 use crate::utils::hdc_log::*;
47 use std::sync::Arc;
48 #[cfg(not(feature = "host"))]
49 use std::time::Duration;
50 use ylong_runtime::io::AsyncReadExt;
51 use ylong_runtime::io::AsyncWriteExt;
52 use ylong_runtime::net::{SplitReadHalf, SplitWriteHalf, TcpListener, TcpStream};
53 use ylong_runtime::task::JoinHandle;
54 
/// The number two, as an argument count.
pub const ARG_COUNT2: u32 = 2;
/// Size in bytes of a small buffer.
pub const BUF_SIZE_SMALL: usize = 1 << 8;
/// Size in bytes of the receive buffers allocated for socket reads in this module.
pub const SOCKET_BUFFER_SIZE: usize = 0xFFFF;
/// Path prefix for Harmony reserved sockets (note: no trailing `/`).
pub const HARMONY_RESERVED_SOCKET_PREFIX: &str = "/dev/socket";
/// Path prefix for file-system sockets.
pub const FILE_SYSTEM_SOCKET_PREFIX: &str = "/tmp/";
60 
/// Host-side record of one established forward.
#[cfg(feature = "host")]
#[derive(Clone, Debug)]
pub struct HdcForwardInfo {
    /// Session the forward belongs to.
    pub session_id: u32,
    /// Channel the forward belongs to.
    pub channel_id: u32,
    /// Direction flag; `on_forward_success` sets it from whether the payload starts with `b'1'`.
    pub forward_direction: bool,
    /// Full task string; `HdcForwardInfoMap::put` also uses it as the map key.
    pub task_string: String,
    /// Connect key associated with the forward.
    pub connect_key: String,
}

#[cfg(feature = "host")]
impl HdcForwardInfo {
    /// Builds a record from its five components, stored unchanged.
    fn new(
        session_id: u32,
        channel_id: u32,
        forward_direction: bool,
        task_string: String,
        connect_key: String,
    ) -> Self {
        HdcForwardInfo {
            session_id,
            channel_id,
            forward_direction,
            task_string,
            connect_key,
        }
    }
}
89 
#[cfg(feature = "host")]
type HdcForwardInfo_ = Arc<Mutex<HdcForwardInfo>>;
#[cfg(feature = "host")]
type HdcForwardInfoMap_ = Arc<Mutex<HashMap<String, HdcForwardInfo_>>>;
/// Process-wide registry of [`HdcForwardInfo`] records keyed by task string.
#[cfg(feature = "host")]
pub struct HdcForwardInfoMap {}
#[cfg(feature = "host")]
impl HdcForwardInfoMap {
    /// Returns a handle to the shared registry, creating it on first use.
    fn get_instance() -> HdcForwardInfoMap_ {
        static mut MAP: Option<HdcForwardInfoMap_> = None;
        // NOTE(review): the `static mut` is initialised without synchronisation;
        // confirm the first call cannot race with another thread.
        unsafe {
            MAP.get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
                .clone()
        }
    }

    /// Stores `forward_info` under its own `task_string`, replacing any
    /// previous record with the same key.
    async fn put(forward_info: HdcForwardInfo) {
        let instance = Self::get_instance();
        let mut map = instance.lock().await;
        let key = forward_info.task_string.clone();
        map.insert(key, Arc::new(Mutex::new(forward_info)));
    }

    /// Returns a snapshot (clones) of every registered record.
    pub async fn get_all_forward_infos() -> Vec<HdcForwardInfo> {
        let instance = Self::get_instance();
        let map = instance.lock().await;
        let mut result = Vec::with_capacity(map.len());
        for value in map.values() {
            let info = value.lock().await;
            result.push((*info).clone());
        }
        result
    }

    /// Removes the first record whose direction equals `forward_direction`
    /// and whose task string contains `"1|" + task_string` (direction true)
    /// or `"0|" + task_string` (direction false).
    ///
    /// Returns `true` when a record was removed. Lookup and removal happen
    /// under a single acquisition of the registry lock, so no other task can
    /// modify the registry between finding the key and removing it.
    pub async fn remove_forward(task_string: String, forward_direction: bool) -> bool {
        crate::info!(
            "remove_forward task_string:{}, direction:{}",
            task_string,
            forward_direction
        );
        let prefix = if forward_direction { "1|" } else { "0|" };
        let needle = format!("{}{}", prefix, task_string);

        let instance = Self::get_instance();
        let mut map = instance.lock().await;
        let mut remove_key: Option<String> = None;
        for (key, value) in map.iter() {
            let info = value.lock().await;
            if info.task_string.contains(&needle) && info.forward_direction == forward_direction {
                remove_key = Some(key.clone());
                break;
            }
        }
        match remove_key {
            Some(key) => map.remove(&key).is_some(),
            None => false,
        }
    }
}
161 
162 type TcpRead = Arc<Mutex<SplitReadHalf>>;
163 type TcpReadMap_ = Arc<RwLock<HashMap<u32, TcpRead>>>;
164 pub struct TcpReadStreamMap {}
165 impl TcpReadStreamMap {
get_instance() -> TcpReadMap_166     fn get_instance() -> TcpReadMap_ {
167         static mut TCP_MAP: Option<TcpReadMap_> = None;
168         unsafe {
169             TCP_MAP
170                 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
171                 .clone()
172         }
173     }
174     #[allow(unused)]
put(id: u32, rd: SplitReadHalf)175     async fn put(id: u32, rd: SplitReadHalf) {
176         let instance = Self::get_instance();
177         let mut map = instance.write().await;
178         let arc_rd = Arc::new(Mutex::new(rd));
179         map.insert(id, arc_rd);
180     }
181     #[allow(unused)]
read(session_id: u32, channel_id: u32, cid: u32)182     async fn read(session_id: u32, channel_id: u32, cid: u32) {
183         let arc_map = Self::get_instance();
184         let map = arc_map.read().await;
185         let Some(arc_rd) = map.get(&cid) else {
186             crate::error!("TcpReadStreamMap failed to get cid {:#?}", cid);
187             return;
188         };
189         let rd = &mut arc_rd.lock().await;
190         let mut data = vec![0_u8; SOCKET_BUFFER_SIZE];
191         loop {
192             match rd.read(&mut data).await {
193                 Ok(recv_size) => {
194                     if recv_size == 0 {
195                         free_context(session_id, channel_id, 0, true).await;
196                         crate::info!("tcp close shutdown, channel_id = {:#?}", channel_id);
197                         return;
198                     }
199                     if send_to_task(
200                         session_id,
201                         channel_id,
202                         HdcCommand::ForwardData,
203                         &data[0..recv_size],
204                         recv_size,
205                         cid,
206                     )
207                     .await
208                     {
209                         crate::info!("send task success");
210                     }
211                 }
212                 Err(_e) => {
213                     crate::error!("tcp stream rd read failed");
214                 }
215             }
216         }
217     }
218 }
219 
220 type TcpWriter = Arc<Mutex<SplitWriteHalf>>;
221 type TcpWriterMap_ = Arc<RwLock<HashMap<u32, TcpWriter>>>;
222 pub struct TcpWriteStreamMap {}
223 impl TcpWriteStreamMap {
get_instance() -> TcpWriterMap_224     fn get_instance() -> TcpWriterMap_ {
225         static mut TCP_MAP: Option<TcpWriterMap_> = None;
226         unsafe {
227             TCP_MAP
228                 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
229                 .clone()
230         }
231     }
232     #[allow(unused)]
put(id: u32, wr: SplitWriteHalf)233     async fn put(id: u32, wr: SplitWriteHalf) {
234         let instance = Self::get_instance();
235         let mut map = instance.write().await;
236         let arc_wr = Arc::new(Mutex::new(wr));
237         map.insert(id, arc_wr);
238     }
239     #[allow(unused)]
write(id: u32, data: Vec<u8>)240     async fn write(id: u32, data: Vec<u8>) {
241         let arc_map = Self::get_instance();
242         let map = arc_map.write().await;
243         let Some(arc_wr) = map.get(&id) else {
244             crate::error!("TcpReadStreamMap failed to get id {:#?}", id);
245             return;
246         };
247         let mut wr = arc_wr.lock().await;
248         let _ = wr.write_all(data.as_slice()).await;
249     }
250 
end(id: u32)251     pub async fn end(id: u32) {
252         let instance = Self::get_instance();
253         let mut map = instance.write().await;
254         if let Some(arc_wr) = map.remove(&id) {
255             let mut wr = arc_wr.lock().await;
256             let _ = wr.shutdown().await;
257         }
258     }
259 }
260 
261 type TcpListener_ = Arc<Mutex<JoinHandle<()>>>;
262 type TcpListenerMap_ = Arc<RwLock<HashMap<u32, TcpListener_>>>;
263 pub struct TcpListenerMap {}
264 impl TcpListenerMap {
get_instance() -> TcpListenerMap_265     fn get_instance() -> TcpListenerMap_ {
266         static mut TCP_MAP: Option<TcpListenerMap_> = None;
267         unsafe {
268             TCP_MAP
269                 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
270                 .clone()
271         }
272     }
273     #[allow(unused)]
put(id: u32, listener: JoinHandle<()>)274     async fn put(id: u32, listener: JoinHandle<()>) {
275         let instance = Self::get_instance();
276         let mut map = instance.write().await;
277         let arc_listener = Arc::new(Mutex::new(listener));
278         map.insert(id, arc_listener);
279         crate::info!("forward tcp put listener id = {id}");
280     }
281 
end(id: u32)282     pub async fn end(id: u32) {
283         let instance = Self::get_instance();
284         let mut map = instance.write().await;
285         if let Some(arc_listener) = map.remove(&id) {
286             let join_handle = arc_listener.lock().await;
287             join_handle.cancel();
288         }
289     }
290 }
291 
/// Kind of endpoint a forward task is attached to; `Tcp` is the default.
#[derive(Default, Eq, PartialEq, Clone, Debug)]
enum ForwardType {
    /// Selected by the `"tcp"` type string.
    #[default]
    Tcp = 0,
    /// Selected by the `"dev"` type string.
    Device = 1,
    /// Selected by the `"localabstract"` type string.
    Abstract = 2,
    /// Selected by the `"localfilesystem"` type string.
    FileSystem = 3,
    /// Selected by the `"jdwp"` type string.
    Jdwp = 4,
    /// Selected by the `"ark"` type string.
    Ark = 5,
    /// Selected by the `"localreserved"` type string.
    Reserved = 6,
}
303 
304 #[derive(Debug, Default, PartialEq, Eq, Clone)]
305 pub struct ContextForward {
306     session_id: u32,
307     channel_id: u32,
308     check_order: bool,
309     id: u32,
310     fd: i32,
311     remote_parameters: String,
312     last_error: String,
313     forward_type: ForwardType,
314 }
315 
316 type MapForward_ = Arc<Mutex<HashMap<(u32, u32), HdcForward>>>;
317 pub struct ForwardTaskMap {}
318 impl ForwardTaskMap {
get_instance() -> MapForward_319     fn get_instance() -> MapForward_ {
320         static mut FORWARD_MAP: Option<MapForward_> = None;
321         unsafe {
322             FORWARD_MAP
323                 .get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
324                 .clone()
325         }
326     }
327 
update(session_id: u32, channel_id: u32, value: HdcForward)328     pub async fn update(session_id: u32, channel_id: u32, value: HdcForward) {
329         let map = Self::get_instance();
330         let mut map = map.lock().await;
331         map.insert((session_id, channel_id), value.clone());
332     }
333 
remove(session_id: u32, channel_id: u32)334     pub async fn remove(session_id: u32, channel_id: u32) {
335         crate::info!("remove, session:{}, channel:{}", session_id, channel_id);
336         let map = Self::get_instance();
337         let mut map = map.lock().await;
338         let _ = map.remove(&(session_id, channel_id));
339     }
340 
get(session_id: u32, channel_id: u32) -> Option<HdcForward>341     pub async fn get(session_id: u32, channel_id: u32) -> Option<HdcForward> {
342         let arc = Self::get_instance();
343         let map = arc.lock().await;
344         let task = map.get(&(session_id, channel_id));
345         match task {
346             Some(task) => Some(task.clone()),
347             None => {
348                 crate::error!(
349                     "ForwardTaskMap result:is none,session_id={:#?}, channel_id={:#?}",
350                     session_id,
351                     channel_id
352                 );
353                 Option::None
354             }
355         }
356     }
357 
get_channel_id(session_id: u32, task_string: String) -> Option<u32>358     pub async fn get_channel_id(session_id: u32, task_string: String) -> Option<u32> {
359         let arc = Self::get_instance();
360         let map = arc.lock().await;
361         for ((_session_id, _channel_id), value) in map.iter() {
362             if *_session_id == session_id && task_string.contains(&value.task_command) {
363                 return Some(*_channel_id);
364             }
365         }
366         None
367     }
368 
clear(session_id: u32)369     pub async fn clear(session_id: u32) {
370         let arc = Self::get_instance();
371         let mut channel_list = Vec::new();
372         {
373             let map = arc.lock().await;
374             if map.is_empty() {
375                 return;
376             }
377             for (&key, _) in map.iter() {
378                 if key.0 == session_id {
379                     let id = key;
380                     channel_list.push(id);
381                 }
382             }
383         }
384         for id in channel_list {
385             free_channel_task(id.0, id.1).await;
386         }
387     }
388 
dump_task() -> String389     pub async fn dump_task() -> String {
390         let arc = Self::get_instance();
391         let map = arc.lock().await;
392         let mut result = String::new();
393         for (_id, forward_task) in map.iter() {
394             let forward_type = match forward_task.remote_args.len() {
395                 0 => "fport".to_string(),
396                 2 => "rport".to_string(),
397                 _ => "unknown".to_string(),
398             };
399             let first_args = match forward_task.remote_args.len() {
400                 0 => "unknown".to_string(),
401                 2 => format!(
402                     "{}:{}",
403                     forward_task.local_args[0], forward_task.local_args[1]
404                 ),
405                 _ => "unknown".to_string(),
406             };
407             let second_args = match forward_task.remote_args.len() {
408                 0 => format!(
409                     "{}:{}",
410                     forward_task.local_args[0], forward_task.local_args[1]
411                 ),
412                 2 => format!(
413                     "{}:{}",
414                     forward_task.remote_args[0], forward_task.remote_args[1]
415                 ),
416                 _ => "unknown".to_string(),
417             };
418             result.push_str(&format!(
419                 "session_id:{},\tchannel_id:{},\tcommand:{:#} {:#} {:#}\n",
420                 forward_task.session_id,
421                 forward_task.channel_id,
422                 forward_type,
423                 first_args,
424                 second_args
425             ));
426         }
427         result
428     }
429 }
430 
free_channel_task(session_id: u32, channel_id: u32)431 pub async fn free_channel_task(session_id: u32, channel_id: u32) {
432     free_context(session_id, channel_id, 0, false).await;
433 }
434 
stop_task(session_id: u32)435 pub async fn stop_task(session_id: u32) {
436     ForwardTaskMap::clear(session_id).await;
437 }
438 
dump_task() -> String439 pub async fn dump_task() -> String {
440     ForwardTaskMap::dump_task().await
441 }
442 
443 #[derive(Debug, Default, Clone, PartialEq, Eq)]
444 pub struct HdcForward {
445     session_id: u32,
446     channel_id: u32,
447     is_master: bool,
448     local_args: Vec<String>,
449     remote_args: Vec<String>,
450     remote_parameters: String,
451     task_command: String,
452     forward_type: ForwardType,
453     context_forward: ContextForward,
454     map_ctx_point: HashMap<u32, ContextForward>,
455     pub transfer: HdcTransferBase,
456 }
457 
458 impl HdcForward {
new(session_id: u32, channel_id: u32) -> Self459     pub fn new(session_id: u32, channel_id: u32) -> Self {
460         Self {
461             session_id,
462             channel_id,
463             is_master: Default::default(),
464             local_args: Default::default(),
465             remote_args: Default::default(),
466             task_command: Default::default(),
467             remote_parameters: Default::default(),
468             forward_type: Default::default(),
469             context_forward: Default::default(),
470             map_ctx_point: HashMap::new(),
471             transfer: HdcTransferBase::new(session_id, channel_id),
472         }
473     }
474 }
475 
/// Decodes the first four bytes of `_payload` as a big-endian `u32`.
///
/// # Panics
///
/// Panics when `_payload` is shorter than four bytes.
pub fn get_id(_payload: &[u8]) -> u32 {
    _payload[0..4]
        .iter()
        .fold(0_u32, |acc, &byte| (acc << 8) | u32::from(byte))
}
482 
check_node_info(value: &String, arg: &mut Vec<String>) -> bool483 pub async fn check_node_info(value: &String, arg: &mut Vec<String>) -> bool {
484     crate::info!("check cmd args value is: {:#?}", value);
485     if !value.contains(':') {
486         return false;
487     }
488     let array = value.split(':').collect::<Vec<&str>>();
489 
490     if array[0] == "tcp" {
491         if array[1].len() > config::MAX_PORT_LEN {
492             crate::error!(
493                 "forward port = {:#?} it'slength is wrong, can not more than five",
494                 array[1]
495             );
496             return false;
497         }
498 
499         match array[1].parse::<u32>() {
500             Ok(port) => {
501                 if port == 0 || port > config::MAX_PORT_NUM {
502                     crate::error!("port can not greater than: 65535");
503                     return false;
504                 }
505             }
506             Err(_) => {
507                 crate::error!("port must is int type, port is: {:#?}", array[1]);
508                 return false;
509             }
510         }
511     }
512     for item in array.iter() {
513         arg.push(String::from(item.to_owned()));
514     }
515     true
516 }
517 
/// Records a successfully established forward on the host side and ends the
/// TCP map entry of the message's channel.
///
/// The forward direction is `true` when the payload starts with `b'1'`; an
/// empty payload yields `false` instead of panicking on an out-of-range
/// index. The record is stored only when the payload is valid UTF-8.
///
/// # Errors
///
/// Currently always returns `Ok(())`.
#[cfg(feature = "host")]
pub async fn on_forward_success(task_message: TaskMessage, session_id: u32) -> io::Result<()> {
    crate::info!("on_forward_success");
    let channel_id = task_message.channel_id;
    let payload = task_message.payload;
    let forward_direction = payload.first() == Some(&b'1');
    if let Ok(task_string) = String::from_utf8(payload) {
        let info = HdcForwardInfo::new(
            session_id,
            channel_id,
            forward_direction,
            task_string,
            "unknow key".to_string(),
        );
        HdcForwardInfoMap::put(info).await;
    }
    transfer::TcpMap::end(channel_id).await;
    Ok(())
}
539 
check_command(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool540 pub async fn check_command(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool {
541     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
542         crate::error!("check_command task is none");
543         return false;
544     };
545     let task = &mut task.clone();
546     if !_payload.is_empty() {
547         echo_client(
548             session_id,
549             channel_id,
550             "Forwardport result:OK",
551             MessageLevel::Ok,
552         )
553         .await;
554         let map_info = String::from(if task.transfer.server_or_daemon {
555             "1|"
556         } else {
557             "0|"
558         }) + &task.task_command;
559 
560         let mut command_string = vec![0_u8; map_info.len() + 1];
561         map_info
562             .as_bytes()
563             .to_vec()
564             .iter()
565             .enumerate()
566             .for_each(|(i, e)| {
567                 command_string[i] = *e;
568             });
569         let forward_success_message = TaskMessage {
570             channel_id,
571             command: HdcCommand::ForwardSuccess,
572             payload: command_string,
573         };
574         #[cfg(feature = "host")]
575         {
576             let _ = on_forward_success(forward_success_message, session_id).await;
577         }
578         #[cfg(not(feature = "host"))]
579         {
580             transfer::put(session_id, forward_success_message).await;
581         }
582     } else {
583         echo_client(
584             session_id,
585             channel_id,
586             "Forwardport result: Failed",
587             MessageLevel::Fail,
588         )
589         .await;
590         free_context(session_id, channel_id, 0, false).await;
591         return false;
592     }
593     true
594 }
595 
detech_forward_type(session_id: u32, channel_id: u32) -> bool596 pub async fn detech_forward_type(session_id: u32, channel_id: u32) -> bool {
597     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
598         crate::error!("detech_forward_type get task is none session_id = {session_id}, channel_id = {channel_id}");
599         return false;
600     };
601     let task = &mut task.clone();
602 
603     let type_str = &task.local_args[0];
604 
605     match type_str.as_str() {
606         "tcp" => {
607             task.forward_type = ForwardType::Tcp;
608         }
609         "dev" => {
610             task.forward_type = ForwardType::Device;
611         }
612         "localabstract" => {
613             task.forward_type = ForwardType::Abstract;
614         }
615         "localfilesystem" => {
616             task.local_args[1] = HARMONY_RESERVED_SOCKET_PREFIX.to_owned() + &task.local_args[1];
617             task.forward_type = ForwardType::FileSystem;
618         }
619         "jdwp" => {
620             task.forward_type = ForwardType::Jdwp;
621         }
622         "ark" => {
623             task.forward_type = ForwardType::Ark;
624         }
625         "localreserved" => {
626             task.local_args[1] = FILE_SYSTEM_SOCKET_PREFIX.to_owned() + &task.local_args[1];
627             task.forward_type = ForwardType::Reserved;
628         }
629         _ => {
630             crate::error!("this forward type may is not expected");
631             ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
632             return false;
633         }
634     }
635     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
636     true
637 }
638 
forward_tcp_accept( session_id: u32, channel_id: u32, port: u32, value: String, cid: u32, ) -> io::Result<()>639 pub async fn forward_tcp_accept(
640     session_id: u32,
641     channel_id: u32,
642     port: u32,
643     value: String,
644     cid: u32,
645 ) -> io::Result<()> {
646     let saddr = format!("127.0.0.1:{}", port);
647     crate::info!("forward_tcp_accept bind addr:{:#?}", saddr);
648     let result = TcpListener::bind(saddr.clone()).await;
649     match result {
650         Ok(listener) => {
651             crate::info!("forward_tcp_accept bind ok");
652             let join_handle = ylong_runtime::spawn(async move {
653                 loop {
654                     let client = listener.accept().await;
655                     if client.is_err() {
656                         continue;
657                     }
658                     let (stream, _addr) = client.unwrap();
659                     let (rd, wr) = stream.into_split();
660                     TcpWriteStreamMap::put(channel_id, wr).await;
661                     ylong_runtime::spawn(on_accept(session_id, channel_id, value.clone(), cid));
662                     recv_tcp_msg(session_id, channel_id, rd, cid).await;
663                 }
664             });
665             TcpListenerMap::put(channel_id, join_handle).await;
666             Ok(())
667         }
668         Err(e) => {
669             crate::error!("forward_tcp_accept fail:{:#?}", e);
670             Err(e)
671         }
672     }
673 }
674 
recv_tcp_msg(session_id: u32, channel_id: u32, mut rd: SplitReadHalf, cid: u32)675 pub async fn recv_tcp_msg(session_id: u32, channel_id: u32, mut rd: SplitReadHalf, cid: u32) {
676     let mut data = vec![0_u8; SOCKET_BUFFER_SIZE];
677     loop {
678         match rd.read(&mut data).await {
679             Ok(recv_size) => {
680                 if recv_size == 0 {
681                     free_context(session_id, channel_id, 0, true).await;
682                     drop(rd);
683                     crate::info!("recv_size is 0, tcp close shutdown");
684                     return;
685                 }
686                 if send_to_task(
687                     session_id,
688                     channel_id,
689                     HdcCommand::ForwardData,
690                     &data[0..recv_size],
691                     recv_size,
692                     cid,
693                 )
694                 .await
695                 {
696                     crate::info!("send task success");
697                 }
698             }
699             Err(_e) => {
700                 crate::error!(
701                     "recv tcp msg read failed session_id={session_id},channel_id={channel_id}"
702                 );
703             }
704         }
705     }
706 }
707 
on_accept(session_id: u32, channel_id: u32, value: String, cid: u32)708 pub async fn on_accept(session_id: u32, channel_id: u32, value: String, cid: u32) {
709     let buf_string: Vec<u8> = value.as_bytes().to_vec();
710     let mut new_buf = vec![0_u8; buf_string.len() + 9];
711 
712     buf_string.iter().enumerate().for_each(|(i, e)| {
713         new_buf[i + 8] = *e;
714     });
715 
716     send_to_task(
717         session_id,
718         channel_id,
719         HdcCommand::ForwardActiveSlave,
720         &new_buf,
721         buf_string.len() + 9,
722         cid,
723     )
724     .await;
725 }
726 
daemon_connect_tcp(session_id: u32, channel_id: u32, port: u32, cid: u32)727 pub async fn daemon_connect_tcp(session_id: u32, channel_id: u32, port: u32, cid: u32) {
728     let saddr = format!("127.0.0.1:{}", port);
729     let stream = match TcpStream::connect(saddr).await {
730         Err(err) => {
731             crate::error!("TcpStream::stream failed {:?}", err);
732             free_context(session_id, channel_id, 0, false).await;
733             return;
734         }
735         Ok(addr) => addr,
736     };
737     send_active_master(session_id, channel_id).await;
738     let (rd, wr) = stream.into_split();
739     TcpWriteStreamMap::put(channel_id, wr).await;
740     recv_tcp_msg(session_id, channel_id, rd, cid).await;
741 }
742 
743 #[cfg(not(target_os = "windows"))]
deamon_read_socket_msg(session_id: u32, channel_id: u32, fd: i32)744 pub async fn deamon_read_socket_msg(session_id: u32, channel_id: u32, fd: i32) {
745     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
746         crate::error!("deamon_read_socket_msg get task is none session_id={session_id},channel_id={channel_id}");
747         return;
748     };
749     let task = &mut task.clone();
750     loop {
751         let mut buffer: [u8; SOCKET_BUFFER_SIZE] = [0; SOCKET_BUFFER_SIZE];
752         let recv_size = UdsClient::wrap_recv(fd, &mut buffer);
753         if recv_size <= 0 {
754             free_context(session_id, channel_id, 0, true).await;
755             crate::info!("local abstract close shutdown");
756             return;
757         }
758         if send_to_task(
759             session_id,
760             channel_id,
761             HdcCommand::ForwardData,
762             &buffer[0..recv_size as usize],
763             recv_size as usize,
764             task.context_forward.id,
765         )
766         .await
767         {
768             crate::info!("send task success");
769         }
770     }
771 }
772 
free_context(session_id: u32, channel_id: u32, id: u32, notify_remote: bool)773 pub async fn free_context(session_id: u32, channel_id: u32, id: u32, notify_remote: bool) {
774     crate::info!("free context id = {id}");
775     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
776         crate::error!(
777             "free_context get task is none session_id={session_id},channel_id={channel_id}"
778         );
779         return;
780     };
781     let task = &mut task.clone();
782     if notify_remote {
783         let vec_none = Vec::<u8>::new();
784         send_to_task(
785             session_id,
786             channel_id,
787             HdcCommand::ForwardFreeContext,
788             &vec_none,
789             0,
790             task.context_forward.id,
791         )
792         .await;
793     }
794     match task.forward_type {
795         ForwardType::Tcp | ForwardType::Jdwp | ForwardType::Ark => {
796             TcpWriteStreamMap::end(channel_id).await;
797             TcpListenerMap::end(channel_id).await;
798         }
799         ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
800             #[cfg(not(target_os = "windows"))]
801             UdsServer::wrap_close(task.context_forward.fd);
802         }
803         ForwardType::Device => {
804             return;
805         }
806     }
807     ForwardTaskMap::remove(session_id, channel_id).await;
808 }
809 
setup_tcp_point(session_id: u32, channel_id: u32) -> bool810 pub async fn setup_tcp_point(session_id: u32, channel_id: u32) -> bool {
811     let Some(mut task) = ForwardTaskMap::get(session_id, channel_id).await else {
812         crate::error!(
813             "setup_tcp_point get task is none session_id={session_id},channel_id={channel_id}"
814         );
815         return false;
816     };
817     let task = &mut task;
818     let Ok(port) = task.local_args[1].parse::<u32>() else {
819         crate::error!("setup_tcp_point parse error");
820         return false;
821     };
822     let cid = task.context_forward.id;
823     if task.is_master {
824         let parameters = task.remote_parameters.clone();
825         let result = forward_tcp_accept(session_id, channel_id, port, parameters, cid).await;
826         crate::info!("setup_tcp_point result:{:?}", result);
827         task.context_forward.last_error = format!("TCP Port listen failed at {}", port);
828         ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
829         return result.is_ok();
830     } else {
831         crate::info!("setup_tcp_point slaver");
832         ylong_runtime::spawn(
833             async move { daemon_connect_tcp(session_id, channel_id, port, cid).await },
834         );
835     }
836     true
837 }
838 
839 #[cfg(not(target_os = "windows"))]
server_socket_bind_listen( session_id: u32, channel_id: u32, path: String, cid: u32, ) -> bool840 async fn server_socket_bind_listen(
841     session_id: u32,
842     channel_id: u32,
843     path: String,
844     cid: u32,
845 ) -> bool {
846     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
847         crate::error!(
848             "setup_tcp_point get task is none session_id={session_id},channel_id={channel_id}"
849         );
850         return false;
851     };
852     let task = &mut task.clone();
853     let parameters = task.remote_parameters.clone();
854     let fd: i32 = UdsClient::wrap_socket(AF_UNIX);
855     task.context_forward.fd = fd;
856     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
857 
858     let name: Vec<u8> = path.as_bytes().to_vec();
859     let mut socket_name = vec![0_u8; name.len() + 1];
860     socket_name[0] = b'\0';
861     name.iter().enumerate().for_each(|(i, e)| {
862         socket_name[i + 1] = *e;
863     });
864     let addr = UdsAddr::parse_abstract(&socket_name[1..]);
865     if let Ok(addr_obj) = &addr {
866         let ret = UdsServer::wrap_bind(fd, addr_obj);
867         if ret.is_err() {
868             echo_client(
869                 session_id,
870                 channel_id,
871                 "Unix pipe bind failed",
872                 MessageLevel::Fail,
873             )
874             .await;
875             crate::error!("bind fail");
876             return false;
877         }
878         let ret = UdsServer::wrap_listen(fd);
879         if ret < 0 {
880             echo_client(
881                 session_id,
882                 channel_id,
883                 "Unix pipe listen failed",
884                 MessageLevel::Fail,
885             )
886             .await;
887             crate::error!("listen fail");
888             return false;
889         }
890         ylong_runtime::spawn(async move {
891             loop {
892                 let client_fd = UdsServer::wrap_accept(fd);
893                 if client_fd == -1 {
894                     break;
895                 }
896                 ylong_runtime::spawn(on_accept(session_id, channel_id, parameters.clone(), cid));
897             }
898         });
899     }
900     true
901 }
902 
/// Resolves `path` with `fs::canonicalize` and returns the result as a `String`.
///
/// # Errors
///
/// Returns an `ErrorKind::Other` error with the message "forward canonicalize failed" when the
/// path cannot be resolved or when the resolved path is not valid UTF-8.
pub async fn canonicalize(path: String) -> Result<String, Error> {
    let failure = || Error::new(ErrorKind::Other, "forward canonicalize failed");
    let resolved = fs::canonicalize(path).map_err(|_| failure())?;
    resolved.to_str().map(str::to_string).ok_or_else(failure)
}
912 
/// Windows build of `setup_device_point`: performs no work and always returns `false`.
#[cfg(target_os = "windows")]
pub async fn setup_device_point(_session_id: u32, _channel_id: u32) -> bool {
    false
}
917 
918 #[cfg(not(target_os = "windows"))]
setup_device_point(session_id: u32, channel_id: u32) -> bool919 pub async fn setup_device_point(session_id: u32, channel_id: u32) -> bool {
920     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
921         crate::error!(
922             "setup_device_point get task is none session_id={session_id},channel_id={channel_id}"
923         );
924         return false;
925     };
926     let task = &mut task.clone();
927     let s_node_cfg = task.local_args[1].clone();
928     let cid = task.context_forward.id;
929 
930     let Ok(resolv_path) = canonicalize(s_node_cfg).await else {
931         crate::error!("Open unix-dev failed");
932         return false;
933     };
934     let thread_path_ref = Arc::new(Mutex::new(resolv_path));
935     if !send_active_master(session_id, channel_id).await {
936         crate::error!(
937             "send_active_master return failed channel_id={:?}",
938             channel_id
939         );
940         return false;
941     }
942 
943     ylong_runtime::spawn(async move {
944         loop {
945             let path = thread_path_ref.lock().await;
946             let Ok(mut file) = File::open(&*path) else {
947                 crate::error!("open {} failed.", *path);
948                 break;
949             };
950             let mut total = Vec::new();
951             let mut buf: [u8; config::FILE_PACKAGE_PAYLOAD_SIZE] =
952                 [0; config::FILE_PACKAGE_PAYLOAD_SIZE];
953             let Ok(read_len) = file.read(&mut buf[4..]) else {
954                 crate::error!("read {} failed.", *path);
955                 break;
956             };
957             if read_len == 0 {
958                 free_context(session_id, channel_id, 0, true).await;
959                 break;
960             }
961             total.append(&mut buf[0..read_len].to_vec());
962             send_to_task(
963                 session_id,
964                 channel_id,
965                 HdcCommand::ForwardData,
966                 &total,
967                 read_len,
968                 cid,
969             )
970             .await;
971         }
972     });
973     true
974 }
975 
976 #[cfg(not(feature = "host"))]
get_pid(parameter: &str, forward_type: ForwardType) -> u32977 fn get_pid(parameter: &str, forward_type: ForwardType) -> u32 {
978     match forward_type == ForwardType::Jdwp {
979         true => parameter.parse::<u32>().unwrap_or_else(|e| {
980             crate::error!("Jdwp get pid err :{:?}", e);
981             0_u32
982         }),
983         false => {
984             let params: Vec<&str> = parameter.split('@').collect();
985             params[0].parse::<u32>().unwrap_or_else(|e| {
986                 crate::error!("get pid err :{:?}", e);
987                 0_u32
988             })
989         }
990     }
991 }
/// Build of `setup_jdwp_point` used when the `host` feature is enabled: logs an info line and
/// always returns `false`.
#[cfg(feature = "host")]
pub async fn setup_jdwp_point(_session_id: u32, _channel_id: u32) -> bool {
    crate::info!("not daemon setup_jdwp_point");
    false
}
997 
998 #[cfg(not(feature = "host"))]
setup_jdwp_point(session_id: u32, channel_id: u32) -> bool999 pub async fn setup_jdwp_point(session_id: u32, channel_id: u32) -> bool {
1000     crate::info!("setup_jdwp_point start.");
1001     let Some(task): Option<HdcForward> = ForwardTaskMap::get(session_id, channel_id).await else {
1002         crate::error!(
1003             "setup_jdwp_point get task is none session_id={session_id},channel_id={channel_id}"
1004         );
1005         return false;
1006     };
1007     let task = &mut task.clone();
1008     let local_args = task.local_args[1].clone();
1009     let parameter = local_args.as_str();
1010     let style = &task.forward_type;
1011     let pid = get_pid(parameter, style.clone());
1012     let cid = task.context_forward.id;
1013     if pid == 0 {
1014         crate::error!("setup_jdwp_point get pid is 0");
1015         return false;
1016     }
1017 
1018     let result = UdsServer::wrap_socketpair(SOCK_STREAM);
1019     if result.is_err() {
1020         crate::error!("wrap socketpair failed");
1021         return false;
1022     }
1023     let mut target_fd = 0;
1024     let mut local_fd = 0;
1025     if let Ok((fd0, fd1)) = result {
1026         crate::info!("pipe, fd0:{}, fd1:{}", fd0, fd1);
1027         local_fd = fd0;
1028         task.context_forward.fd = local_fd;
1029         ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1030         target_fd = fd1;
1031     }
1032 
1033     ylong_runtime::spawn(async move {
1034         loop {
1035             let mut buffer = [0u8; 1024];
1036             let size = UdsServer::wrap_read(local_fd, &mut buffer);
1037             if size < 0 {
1038                 crate::error!("disconnect, error:{:?}", size);
1039                 free_context(session_id, channel_id, 0, true).await;
1040                 break;
1041             }
1042             if size == 0 {
1043                 ylong_runtime::time::sleep(Duration::from_millis(200)).await;
1044                 continue;
1045             }
1046             send_to_task(
1047                 session_id,
1048                 channel_id,
1049                 HdcCommand::ForwardData,
1050                 &buffer[0..size as usize],
1051                 size as usize,
1052                 cid,
1053             )
1054             .await;
1055         }
1056     });
1057 
1058     let jdwp = Jdwp::get_instance();
1059     let mut param = task.local_args[0].clone();
1060     param.push(':');
1061     param.push_str(parameter);
1062 
1063     let ret = jdwp.send_fd_to_target(pid, target_fd, param.as_str()).await;
1064     if !ret {
1065         crate::error!("not found pid:{:?}", pid);
1066         echo_client(
1067             session_id,
1068             channel_id,
1069             format!("fport fail:pid not found:{}", pid).as_str(),
1070             MessageLevel::Fail,
1071         )
1072         .await;
1073         task_finish(session_id, channel_id).await;
1074         return false;
1075     }
1076 
1077     let vec_none = Vec::<u8>::new();
1078     send_to_task(
1079         session_id,
1080         channel_id,
1081         HdcCommand::ForwardActiveMaster, // 04
1082         &vec_none,
1083         0,
1084         cid,
1085     )
1086     .await;
1087     crate::info!("setup_jdwp_point return true");
1088     true
1089 }
1090 
echo_client(_session_id: u32, channel_id: u32, message: &str, _level: MessageLevel)1091 async fn echo_client(_session_id: u32, channel_id: u32, message: &str, _level: MessageLevel) {
1092     #[cfg(feature = "host")]
1093     {
1094         let level = match _level {
1095             MessageLevel::Ok => transfer::EchoLevel::OK,
1096             MessageLevel::Fail => transfer::EchoLevel::FAIL,
1097             MessageLevel::Info => transfer::EchoLevel::INFO,
1098         };
1099         let _ =
1100             transfer::send_channel_msg(channel_id, level, message.to_string())
1101                 .await;
1102         return;
1103     }
1104     #[allow(unreachable_code)]
1105     {
1106         hdctransfer::echo_client(_session_id, channel_id, message.as_bytes().to_vec(), _level)
1107             .await;
1108     }
1109 }
1110 
/// Finishes the forward task by delegating to `transfer_task_finish`.
///
/// Note the argument order: `transfer_task_finish` receives `channel_id` first and `session_id`
/// second.
async fn task_finish(session_id: u32, channel_id: u32) {
    transfer_task_finish(channel_id, session_id).await;
}
1114 
1115 #[cfg(not(target_os = "windows"))]
daemon_connect_pipe(session_id: u32, channel_id: u32, fd: i32, path: String)1116 pub async fn daemon_connect_pipe(session_id: u32, channel_id: u32, fd: i32, path: String) {
1117     let name: Vec<u8> = path.as_bytes().to_vec();
1118     let mut socket_name = vec![0_u8; name.len() + 1];
1119     socket_name[0] = b'\0';
1120     name.iter().enumerate().for_each(|(i, e)| {
1121         socket_name[i + 1] = *e;
1122     });
1123     let addr = UdsAddr::parse_abstract(&socket_name[1..]);
1124     if let Ok(addr_obj) = &addr {
1125         let ret: Result<(), Error> = UdsClient::wrap_connect(fd, addr_obj);
1126         if ret.is_err() {
1127             echo_client(
1128                 session_id,
1129                 channel_id,
1130                 "localabstract connect fail",
1131                 MessageLevel::Fail,
1132             )
1133             .await;
1134             free_context(session_id, channel_id, 0, true).await;
1135             return;
1136         }
1137         send_active_master(session_id, channel_id).await;
1138         read_data_to_forward(session_id, channel_id).await;
1139     }
1140 }
1141 
/// Windows build of `setup_file_point`: performs no work and always returns `false`.
#[cfg(target_os = "windows")]
pub async fn setup_file_point(_session_id: u32, _channel_id: u32) -> bool {
    false
}
1146 
1147 #[cfg(not(target_os = "windows"))]
setup_file_point(session_id: u32, channel_id: u32) -> bool1148 pub async fn setup_file_point(session_id: u32, channel_id: u32) -> bool {
1149     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1150         crate::error!(
1151             "setup_file_point get task is none session_id={session_id},channel_id={channel_id}"
1152         );
1153         return false;
1154     };
1155     let task = &mut task.clone();
1156     let s_node_cfg = task.local_args[1].clone();
1157     if task.is_master {
1158         if task.forward_type == ForwardType::Reserved
1159             || task.forward_type == ForwardType::FileSystem
1160         {
1161             let _ = fs::remove_file(s_node_cfg.clone());
1162         }
1163         if !server_socket_bind_listen(session_id, channel_id, s_node_cfg, task.context_forward.id)
1164             .await
1165         {
1166             crate::error!(
1167                 "server socket bind listen failed channel_id={:?}",
1168                 channel_id
1169             );
1170             task_finish(session_id, channel_id).await;
1171             return false;
1172         }
1173     } else if task.forward_type == ForwardType::Abstract {
1174         let fd: i32 = UdsClient::wrap_socket(AF_LOCAL);
1175         unsafe {
1176             libc::fcntl(fd, F_SETFD, FD_CLOEXEC);
1177         }
1178         task.context_forward.fd = fd;
1179         ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1180         daemon_connect_pipe(session_id, channel_id, fd, s_node_cfg).await;
1181     } else {
1182         let fd: i32 = UdsClient::wrap_socket(AF_UNIX);
1183         task.context_forward.fd = fd;
1184         ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1185         daemon_connect_pipe(session_id, channel_id, fd, s_node_cfg).await;
1186     }
1187     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1188     true
1189 }
1190 
setup_point(session_id: u32, channel_id: u32) -> bool1191 pub async fn setup_point(session_id: u32, channel_id: u32) -> bool {
1192     if !detech_forward_type(session_id, channel_id).await {
1193         crate::error!("forward type is not true");
1194         return false;
1195     }
1196     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1197         crate::error!(
1198             "setup_point get task is none session_id={session_id},channel_id={channel_id}"
1199         );
1200         return false;
1201     };
1202     let task = &mut task.clone();
1203     if cfg!(target_os = "windows") && task.forward_type != ForwardType::Tcp {
1204         task.context_forward.last_error = String::from("Not support forward-type");
1205         ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1206         return false;
1207     }
1208     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1209     let mut ret = false;
1210     crate::info!("setup_point forward type:{:#?}", task.forward_type);
1211     match task.forward_type {
1212         ForwardType::Tcp => {
1213             ret = setup_tcp_point(session_id, channel_id).await;
1214         }
1215         ForwardType::Device => {
1216             if !cfg!(target_os = "windows") {
1217                 ret = setup_device_point(session_id, channel_id).await;
1218             }
1219         }
1220         ForwardType::Jdwp | ForwardType::Ark => {
1221             crate::info!("setup_point ark case");
1222             if !cfg!(feature = "host") {
1223                 ret = setup_jdwp_point(session_id, channel_id).await;
1224             }
1225         }
1226         ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
1227             if !cfg!(target_os = "windows") {
1228                 ret = setup_file_point(session_id, channel_id).await;
1229             }
1230         }
1231     };
1232     crate::info!("setup_point, ret:{ret}");
1233     ret
1234 }
1235 
send_to_task( session_id: u32, channel_id: u32, command: HdcCommand, buf_ptr: &[u8], buf_size: usize, cid: u32, ) -> bool1236 pub async fn send_to_task(
1237     session_id: u32,
1238     channel_id: u32,
1239     command: HdcCommand,
1240     buf_ptr: &[u8],
1241     buf_size: usize,
1242     cid: u32,
1243 ) -> bool {
1244     if buf_size > (config::MAX_SIZE_IOBUF * 2) {
1245         crate::error!("send task buf_size oversize");
1246         return false;
1247     }
1248 
1249     let mut new_buf = [u32::to_be_bytes(cid).as_slice(), buf_ptr].concat();
1250     new_buf[4..].copy_from_slice(&buf_ptr[0..buf_size]);
1251     let file_check_message = TaskMessage {
1252         channel_id,
1253         command,
1254         payload: new_buf,
1255     };
1256     transfer::put(session_id, file_check_message).await;
1257     true
1258 }
1259 
/// Splits a forward payload into its text body and its leading id.
///
/// The first four bytes are read as a big-endian `u32` id; the remaining bytes must be valid
/// UTF-8 and are returned as the content string.
///
/// # Errors
///
/// Returns an `ErrorKind::Other` error with the message "filter command failure" when the
/// payload is shorter than four bytes or when the body is not valid UTF-8.
pub async fn filter_command(_payload: &[u8]) -> io::Result<(String, u32)> {
    const ID_LEN: usize = 4;
    let failure = || Error::new(ErrorKind::Other, "filter command failure");

    // Reject short payloads up front instead of panicking on the slice below.
    if _payload.len() < ID_LEN {
        return Err(failure());
    }
    let (head, body) = _payload.split_at(ID_LEN);
    let content = String::from_utf8(body.to_vec()).map_err(|_| failure())?;

    let mut id_bytes = [0u8; ID_LEN];
    id_bytes.copy_from_slice(head);
    Ok((content, u32::from_be_bytes(id_bytes)))
}
1271 
send_active_master(session_id: u32, channel_id: u32) -> bool1272 pub async fn send_active_master(session_id: u32, channel_id: u32) -> bool {
1273     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1274         crate::error!(
1275             "send_active_master get task is none session_id={session_id},channel_id={channel_id}"
1276         );
1277         return false;
1278     };
1279     let task = &mut task.clone();
1280     if task.context_forward.check_order {
1281         let flag = [0u8; 1];
1282         send_to_task(
1283             session_id,
1284             channel_id,
1285             HdcCommand::ForwardCheckResult,
1286             &flag,
1287             1,
1288             task.context_forward.id,
1289         )
1290         .await;
1291         free_context(session_id, channel_id, 0, false).await;
1292         ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1293         return true;
1294     }
1295     if !send_to_task(
1296         session_id,
1297         channel_id,
1298         HdcCommand::ForwardActiveMaster,
1299         &Vec::<u8>::new(),
1300         0,
1301         task.context_forward.id,
1302     )
1303     .await
1304     {
1305         free_context(session_id, channel_id, 0, true).await;
1306         ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1307         return false;
1308     }
1309     true
1310 }
1311 
begin_forward(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool1312 pub async fn begin_forward(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool {
1313     let Ok(command) = String::from_utf8(_payload.to_vec()) else {
1314         crate::error!("cmd argv  is not int utf8");
1315         return false;
1316     };
1317     crate::info!("begin forward, command: {:?}", command);
1318     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1319         crate::error!("begin forward get task is none");
1320         return false;
1321     };
1322     let task = &mut task.clone();
1323     task.task_command = command.clone();
1324     let result = Base::split_command_to_args(&command);
1325     let argv = result.0;
1326     let argc = result.1;
1327     task.context_forward.id = get_id(_payload);
1328     task.is_master = true;
1329 
1330     if argc < ARG_COUNT2 {
1331         crate::error!("argc < 2 parse is failed.");
1332         return false;
1333     }
1334     if argv[0].len() > BUF_SIZE_SMALL || argv[1].len() > BUF_SIZE_SMALL {
1335         crate::error!("parse's length is flase.");
1336         return false;
1337     }
1338     if !check_node_info(&argv[0], &mut task.local_args).await {
1339         crate::error!("check argv[0] node info is flase.");
1340         return false;
1341     }
1342     if !check_node_info(&argv[1], &mut task.remote_args).await {
1343         crate::error!("check argv[1] node info is flase.");
1344         return false;
1345     }
1346     task.remote_parameters = argv[1].clone();
1347     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1348     if !setup_point(session_id, channel_id).await {
1349         crate::error!("setup point return false");
1350         return false;
1351     }
1352 
1353     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1354         crate::error!("begin forward get task is none");
1355         return false;
1356     };
1357     let task = &mut task.clone();
1358     task.map_ctx_point
1359         .insert(task.context_forward.id, task.context_forward.clone());
1360 
1361     let wake_up_message = TaskMessage {
1362         channel_id,
1363         command: HdcCommand::KernelWakeupSlavetask,
1364         payload: Vec::<u8>::new(),
1365     };
1366     transfer::put(session_id, wake_up_message).await;
1367 
1368     let buf_string: Vec<u8> = argv[1].as_bytes().to_vec();
1369     let mut new_buf = vec![0_u8; buf_string.len() + 9];
1370     buf_string.iter().enumerate().for_each(|(i, e)| {
1371         new_buf[i + 8] = *e;
1372     });
1373     send_to_task(
1374         session_id,
1375         channel_id,
1376         HdcCommand::ForwardCheck,
1377         &new_buf,
1378         buf_string.len() + 9,
1379         task.context_forward.id,
1380     )
1381     .await;
1382     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1383     true
1384 }
1385 
slave_connect( session_id: u32, channel_id: u32, _payload: &[u8], check_order: bool, ) -> bool1386 pub async fn slave_connect(
1387     session_id: u32,
1388     channel_id: u32,
1389     _payload: &[u8],
1390     check_order: bool,
1391 ) -> bool {
1392     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1393         crate::error!(
1394             "slave_connect get task is none session_id={session_id},channel_id={channel_id}"
1395         );
1396         return false;
1397     };
1398     let task = &mut task.clone();
1399     task.is_master = false;
1400     task.context_forward.check_order = check_order;
1401     if let Ok((content, id)) = filter_command(_payload).await {
1402         let content = &content[8..].trim_end_matches('\0').to_string();
1403         task.task_command = content.clone();
1404         if !check_node_info(content, &mut task.local_args).await {
1405             crate::error!("check local args is false");
1406             return false;
1407         }
1408         task.context_forward.id = id;
1409     }
1410     task.map_ctx_point
1411         .insert(task.context_forward.id, task.context_forward.clone());
1412     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1413     if !check_order {
1414         if !setup_point(session_id, channel_id).await {
1415             crate::error!("setup point return false, free context");
1416             free_context(session_id, channel_id, 0, true).await;
1417             return false;
1418         }
1419     } else {
1420         send_active_master(session_id, channel_id).await;
1421     }
1422     true
1423 }
1424 
read_data_to_forward(session_id: u32, channel_id: u32) -> bool1425 pub async fn read_data_to_forward(session_id: u32, channel_id: u32) -> bool {
1426     let Some(mut task) = ForwardTaskMap::get(session_id, channel_id).await else {
1427         crate::error!(
1428             "read_data_to_forward get task is none session_id={session_id},channel_id={channel_id}"
1429         );
1430         return false;
1431     };
1432     let task = &mut task;
1433     let cid = task.context_forward.id;
1434     match task.forward_type {
1435         ForwardType::Tcp | ForwardType::Jdwp | ForwardType::Ark => {
1436             ylong_runtime::spawn(async move {
1437                 TcpReadStreamMap::read(session_id, channel_id, cid).await
1438             });
1439         }
1440         ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
1441             let _fd = task.context_forward.fd;
1442             #[cfg(not(target_os = "windows"))]
1443             ylong_runtime::spawn(async move {
1444                 deamon_read_socket_msg(session_id, channel_id, _fd).await
1445             });
1446         }
1447         ForwardType::Device =>
1448         {
1449             #[cfg(not(target_os = "windows"))]
1450             if !setup_device_point(session_id, channel_id).await {
1451                 return false;
1452             }
1453         }
1454     }
1455     true
1456 }
1457 
write_forward_bufer( session_id: u32, channel_id: u32, _id: u32, content: Vec<u8>, ) -> bool1458 pub async fn write_forward_bufer(
1459     session_id: u32,
1460     channel_id: u32,
1461     _id: u32,
1462     content: Vec<u8>,
1463 ) -> bool {
1464     let Some(mut task) = ForwardTaskMap::get(session_id, channel_id).await else {
1465         crate::error!(
1466             "write_forward_bufer get task is none session_id={session_id},channel_id={channel_id}"
1467         );
1468         return false;
1469     };
1470     let task = &mut task;
1471     if task.forward_type == ForwardType::Tcp {
1472         TcpWriteStreamMap::write(channel_id, content).await;
1473     } else {
1474         #[cfg(not(target_os = "windows"))]
1475         {
1476             let fd = task.context_forward.fd;
1477             UdsClient::wrap_send(fd, &content);
1478         }
1479     }
1480     true
1481 }
1482 
forward_command_dispatch( session_id: u32, channel_id: u32, command: HdcCommand, _payload: &[u8], ) -> bool1483 pub async fn forward_command_dispatch(
1484     session_id: u32,
1485     channel_id: u32,
1486     command: HdcCommand,
1487     _payload: &[u8],
1488 ) -> bool {
1489     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1490         crate::error!("forward_command_dispatch get task is none session_id={session_id},channel_id={channel_id}"
1491         );
1492         return false;
1493     };
1494     let task: &mut HdcForward = &mut task.clone();
1495     let mut ret: bool = true;
1496     if let Ok((_content, id)) = filter_command(_payload).await {
1497         task.context_forward.id = id;
1498     }
1499     let send_msg = _payload[4..].to_vec();
1500     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1501     match command {
1502         HdcCommand::ForwardCheckResult => {
1503             ret = check_command(session_id, channel_id, _payload).await;
1504         }
1505         HdcCommand::ForwardData => {
1506             ret = write_forward_bufer(session_id, channel_id, task.context_forward.id, send_msg)
1507                 .await;
1508         }
1509         HdcCommand::ForwardFreeContext => {
1510             free_context(session_id, channel_id, 0, false).await;
1511         }
1512         HdcCommand::ForwardActiveMaster => {
1513             ret = true;
1514         }
1515         _ => {
1516             ret = false;
1517         }
1518     }
1519     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1520     ret
1521 }
1522 
get_last_error(session_id: u32, channel_id: u32) -> io::Result<String>1523 async fn get_last_error(session_id: u32, channel_id: u32) -> io::Result<String> {
1524     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1525         return Err(Error::new(ErrorKind::Other, "task not found."));
1526     };
1527     Ok(task.context_forward.last_error)
1528 }
1529 
print_error_info(session_id: u32, channel_id: u32)1530 async fn print_error_info(session_id: u32, channel_id: u32) {
1531     if let Ok(error) = get_last_error(session_id, channel_id).await {
1532         echo_client(
1533             session_id,
1534             channel_id,
1535             error.as_str(),
1536             MessageLevel::Fail,
1537         )
1538         .await;
1539     }
1540 }
1541 
command_dispatch( session_id: u32, channel_id: u32, _command: HdcCommand, _payload: &[u8], _payload_size: u16, ) -> bool1542 pub async fn command_dispatch(
1543     session_id: u32,
1544     channel_id: u32,
1545     _command: HdcCommand,
1546     _payload: &[u8],
1547     _payload_size: u16,
1548 ) -> bool {
1549     crate::info!("command_dispatch command recv: {:?}", _command);
1550     let ret = match _command {
1551         HdcCommand::ForwardInit => begin_forward(session_id, channel_id, _payload).await,
1552         HdcCommand::ForwardCheck => {
1553             slave_connect(session_id, channel_id, _payload, true).await
1554         }
1555         HdcCommand::ForwardActiveSlave => {
1556             slave_connect(session_id, channel_id, _payload, false).await
1557         }
1558         _ => forward_command_dispatch(session_id, channel_id, _command, _payload).await,
1559     };
1560     crate::info!("command dispatch ret: {:?}", ret);
1561     if !ret {
1562         print_error_info(session_id, channel_id).await;
1563         task_finish(session_id, channel_id).await;
1564         return false;
1565     }
1566     ret
1567 }
1568