• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 //! forward
16 #![allow(missing_docs)]
17 #[cfg(feature = "host")]
18 extern crate ylong_runtime_static as ylong_runtime;
19 
20 #[cfg(not(feature = "host"))]
21 use libc::SOCK_STREAM;
22 #[cfg(not(target_os = "windows"))]
23 use libc::{AF_LOCAL, AF_UNIX, FD_CLOEXEC, F_SETFD};
24 use std::collections::HashMap;
25 #[cfg(not(target_os = "windows"))]
26 use std::fs::{self, File, OpenOptions};
27 #[cfg(not(target_os = "windows"))]
28 use std::io::{self, Error, ErrorKind, Read, Write};
29 use ylong_runtime::sync::{Mutex, RwLock};
30 
31 use crate::common::base::Base;
32 use crate::common::hdctransfer::transfer_task_finish;
33 use crate::common::hdctransfer::{self, HdcTransferBase};
34 #[cfg(not(feature = "host"))]
35 use crate::common::jdwp::Jdwp;
36 #[cfg(not(target_os = "windows"))]
37 use crate::common::uds::{UdsAddr, UdsClient, UdsServer};
38 use crate::config::HdcCommand;
39 use crate::config::MessageLevel;
40 use crate::config::TaskMessage;
41 use crate::transfer;
42 #[allow(unused)]
43 use crate::utils::hdc_log::*;
44 use crate::{config, utils};
45 use std::sync::Arc;
46 #[cfg(not(feature = "host"))]
47 use std::time::Duration;
48 use ylong_runtime::io::AsyncReadExt;
49 use ylong_runtime::io::AsyncWriteExt;
50 use ylong_runtime::net::{SplitReadHalf, SplitWriteHalf, TcpListener, TcpStream};
51 use ylong_runtime::task::JoinHandle;
52 
/// The argument count two.
pub const ARG_COUNT2: u32 = 2;
/// Size in bytes of a small scratch buffer.
pub const BUF_SIZE_SMALL: usize = 256;
/// Size in bytes of the buffer used for each socket read when forwarding data.
pub const SOCKET_BUFFER_SIZE: usize = 65535;
/// Path prefix that `detech_forward_type` prepends to `localfilesystem` nodes.
pub const HARMONY_RESERVED_SOCKET_PREFIX: &str = "/dev/socket";
/// Path prefix that `detech_forward_type` prepends to `localreserved` nodes.
pub const FILE_SYSTEM_SOCKET_PREFIX: &str = "/tmp/";
58 
/// Host-side record of one forward that the peer reported as established.
#[cfg(feature = "host")]
#[derive(Clone, Debug)]
pub struct HdcForwardInfo {
    /// Session the forward was created on.
    pub session_id: u32,
    /// Channel that carried the forward request.
    pub channel_id: u32,
    /// Direction flag; `on_forward_success` sets it when the payload starts with `'1'`.
    pub forward_direction: bool,
    /// Raw task description; also used as the key in `HdcForwardInfoMap`.
    pub task_string: String,
    /// Connect key of the target.
    pub connect_key: String,
}
68 
#[cfg(feature = "host")]
impl HdcForwardInfo {
    /// Builds a record from its five fields; every argument is stored unchanged.
    fn new(
        session_id: u32,
        channel_id: u32,
        forward_direction: bool,
        task_string: String,
        connect_key: String,
    ) -> Self {
        HdcForwardInfo {
            session_id,
            channel_id,
            forward_direction,
            task_string,
            connect_key,
        }
    }
}
87 
/// Kind of endpoint a forward node refers to, as selected by the node's type
/// keyword in `detech_forward_type`.
#[derive(Default, Eq, PartialEq, Clone, Debug)]
enum ForwardType {
    /// `tcp` node; also the default.
    #[default]
    Tcp = 0,
    /// `dev` node.
    Device,
    /// `localabstract` node.
    Abstract,
    /// `localfilesystem` node.
    FileSystem,
    /// `jdwp` node.
    Jdwp,
    /// `ark` node.
    Ark,
    /// `localreserved` node.
    Reserved,
}
99 
100 #[derive(Debug, Default, PartialEq, Eq, Clone)]
101 pub struct ContextForward {
102     session_id: u32,
103     channel_id: u32,
104     check_order: bool,
105     is_master: bool,
106     id: u32,
107     fd: i32,
108     target_fd: i32,
109     forward_type: ForwardType,
110     local_args: Vec<String>,
111     remote_args: Vec<String>,
112     task_command: String,
113     last_error: String,
114     remote_parameters: String,
115     dev_path: String,
116 }
117 
/// Shared handle to a single forward record.
#[cfg(feature = "host")]
type HdcForwardInfo_ = Arc<Mutex<HdcForwardInfo>>;
/// Shared map from task string to its forward record.
#[cfg(feature = "host")]
type HdcForwardInfoMap_ = Arc<Mutex<HashMap<String, HdcForwardInfo_>>>;
/// Namespace for the process-wide registry of forward records (host builds only).
#[cfg(feature = "host")]
pub struct HdcForwardInfoMap {}
#[cfg(feature = "host")]
impl HdcForwardInfoMap {
    /// Returns a handle to the process-wide registry, creating it on first use.
    fn get_instance() -> HdcForwardInfoMap_ {
        // A std mutex guards the lazily created handle so two threads making the
        // first call at the same time cannot race on initialisation. The guard is
        // never held across an `.await`.
        static MAP: std::sync::Mutex<Option<HdcForwardInfoMap_>> = std::sync::Mutex::new(None);
        let mut slot = MAP.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        slot.get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
            .clone()
    }

    /// Stores `forward_info` under its `task_string`, replacing any previous record.
    async fn put(forward_info: HdcForwardInfo) {
        let instance = Self::get_instance();
        let mut map = instance.lock().await;
        map.insert(
            forward_info.task_string.clone(),
            Arc::new(Mutex::new(forward_info)),
        );
    }

    /// Returns a snapshot (clones) of every registered forward record.
    pub async fn get_all_forward_infos() -> Vec<HdcForwardInfo> {
        let instance = Self::get_instance();
        let map = instance.lock().await;
        let mut result = Vec::with_capacity(map.len());
        for value in map.values() {
            let info = value.lock().await;
            result.push((*info).clone());
        }
        result
    }

    /// Removes the first record whose task string contains
    /// `"<direction>|<task_string>"` (direction is `1` or `0`) and whose
    /// direction flag equals `forward_direction`.
    ///
    /// Returns `true` when a record was removed.
    pub async fn remove_forward(task_string: String, forward_direction: bool) -> bool {
        crate::info!(
            "remove_forward task_string:{}, direction:{}",
            task_string,
            forward_direction
        );
        let direction_tag = if forward_direction { "1|" } else { "0|" };
        let needle = format!("{}{}", direction_tag, task_string);

        let instance = Self::get_instance();
        // Keep the lock from lookup through removal so the map cannot change
        // between finding the key and removing it.
        let mut map = instance.lock().await;
        let mut remove_key: Option<String> = None;
        for (key, value) in map.iter() {
            let info = value.lock().await;
            if info.task_string.contains(&needle) && info.forward_direction == forward_direction {
                remove_key = Some(key.clone());
                break;
            }
        }
        match remove_key {
            Some(key) => map.remove(&key).is_some(),
            None => false,
        }
    }
}
189 
190 type TcpWriter = Arc<Mutex<SplitWriteHalf>>;
191 type TcpWriterMap_ = Arc<RwLock<HashMap<u32, TcpWriter>>>;
192 pub struct TcpWriteStreamMap {}
193 impl TcpWriteStreamMap {
get_instance() -> TcpWriterMap_194     fn get_instance() -> TcpWriterMap_ {
195         static mut TCP_MAP: Option<TcpWriterMap_> = None;
196         unsafe {
197             TCP_MAP
198                 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
199                 .clone()
200         }
201     }
202     #[allow(unused)]
put(id: u32, wr: SplitWriteHalf)203     async fn put(id: u32, wr: SplitWriteHalf) {
204         let instance = Self::get_instance();
205         let mut map = instance.write().await;
206         let arc_wr = Arc::new(Mutex::new(wr));
207         map.insert(id, arc_wr);
208     }
209     #[allow(unused)]
write(id: u32, data: Vec<u8>) -> bool210     async fn write(id: u32, data: Vec<u8>) -> bool {
211         let arc_map = Self::get_instance();
212         let map = arc_map.write().await;
213         let Some(arc_wr) = map.get(&id) else {
214             crate::error!("TcpWriteStreamMap failed to get id {:#?}", id);
215             return false;
216         };
217         let mut wr = arc_wr.lock().await;
218         let write_result = wr.write_all(data.as_slice()).await;
219         if write_result.is_err() {
220             crate::error!("TcpWriteStreamMap write_all error. id = {:#?}", id);
221         }
222         true
223     }
224 
end(id: u32)225     pub async fn end(id: u32) {
226         let instance = Self::get_instance();
227         let mut map = instance.write().await;
228         if let Some(arc_wr) = map.remove(&id) {
229             let mut wr = arc_wr.lock().await;
230             let _ = wr.shutdown().await;
231         }
232     }
233 }
234 
235 type TcpListener_ = Arc<Mutex<JoinHandle<()>>>;
236 type TcpListenerMap_ = Arc<RwLock<HashMap<u32, TcpListener_>>>;
237 pub struct TcpListenerMap {}
238 impl TcpListenerMap {
get_instance() -> TcpListenerMap_239     fn get_instance() -> TcpListenerMap_ {
240         static mut TCP_MAP: Option<TcpListenerMap_> = None;
241         unsafe {
242             TCP_MAP
243                 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
244                 .clone()
245         }
246     }
247     #[allow(unused)]
put(id: u32, listener: JoinHandle<()>)248     async fn put(id: u32, listener: JoinHandle<()>) {
249         let instance = Self::get_instance();
250         let mut map = instance.write().await;
251         let arc_listener = Arc::new(Mutex::new(listener));
252         map.insert(id, arc_listener);
253         crate::info!("forward tcp put listener id = {id}");
254     }
255 
end(id: u32)256     pub async fn end(id: u32) {
257         let instance = Self::get_instance();
258         let mut map = instance.write().await;
259         if let Some(arc_listener) = map.remove(&id) {
260             let join_handle = arc_listener.lock().await;
261             join_handle.cancel();
262         }
263     }
264 }
265 
266 type MapContextForward_ = Arc<Mutex<HashMap<u32, ContextForward>>>;
267 pub struct ForwardContextMap {}
268 impl ForwardContextMap {
get_instance() -> MapContextForward_269     fn get_instance() -> MapContextForward_ {
270         static mut FORWARD_MAP: Option<MapContextForward_> = None;
271         unsafe {
272             FORWARD_MAP
273                 .get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
274                 .clone()
275         }
276     }
277 
add(cid: u32, value: ContextForward)278     pub async fn add(cid: u32, value: ContextForward) {
279         let map = Self::get_instance();
280         let mut map = map.lock().await;
281         map.insert(cid, value.clone());
282     }
283 
update(cid: u32, value: ContextForward)284     pub async fn update(cid: u32, value: ContextForward) {
285         let map = Self::get_instance();
286         let mut map = map.lock().await;
287         map.insert(cid, value.clone());
288     }
289 
remove(cid: u32)290     pub async fn remove(cid: u32) {
291         crate::info!("ContextForward remove, cid:{}", cid);
292         let map = Self::get_instance();
293         let mut map = map.lock().await;
294         let _ = map.remove(&cid);
295     }
296 
get(cid: u32) -> Option<ContextForward>297     pub async fn get(cid: u32) -> Option<ContextForward> {
298         let arc = Self::get_instance();
299         let map = arc.lock().await;
300         let task = map.get(&cid);
301         match task {
302             Some(task) => Some(task.clone()),
303             None => {
304                 crate::debug!("ContextForward result:is none,cid={:#?}", cid,);
305                 Option::None
306             }
307         }
308     }
309 }
310 
311 type MapForward_ = Arc<Mutex<HashMap<(u32, u32), HdcForward>>>;
312 pub struct ForwardTaskMap {}
313 impl ForwardTaskMap {
get_instance() -> MapForward_314     fn get_instance() -> MapForward_ {
315         static mut FORWARD_MAP: Option<MapForward_> = None;
316         unsafe {
317             FORWARD_MAP
318                 .get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
319                 .clone()
320         }
321     }
322 
update(session_id: u32, channel_id: u32, value: HdcForward)323     pub async fn update(session_id: u32, channel_id: u32, value: HdcForward) {
324         let map = Self::get_instance();
325         let mut map = map.lock().await;
326         map.insert((session_id, channel_id), value.clone());
327     }
328 
remove(session_id: u32, channel_id: u32)329     pub async fn remove(session_id: u32, channel_id: u32) {
330         crate::info!("remove, session:{}, channel:{}", session_id, channel_id);
331         let map = Self::get_instance();
332         let mut map = map.lock().await;
333         let _ = map.remove(&(session_id, channel_id));
334     }
335 
get(session_id: u32, channel_id: u32) -> Option<HdcForward>336     pub async fn get(session_id: u32, channel_id: u32) -> Option<HdcForward> {
337         let arc = Self::get_instance();
338         let map = arc.lock().await;
339         let task = map.get(&(session_id, channel_id));
340         match task {
341             Some(task) => Some(task.clone()),
342             None => {
343                 crate::debug!(
344                     "Forward TaskMap result:is none,session_id={:#?}, channel_id={:#?}",
345                     session_id,
346                     channel_id
347                 );
348                 Option::None
349             }
350         }
351     }
352 
get_channel_id(session_id: u32, task_string: String) -> Option<u32>353     pub async fn get_channel_id(session_id: u32, task_string: String) -> Option<u32> {
354         let arc = Self::get_instance();
355         let map = arc.lock().await;
356         for ((_session_id, _channel_id), value) in map.iter() {
357             if *_session_id == session_id && task_string.contains(&value.task_command) {
358                 return Some(*_channel_id);
359             }
360         }
361         None
362     }
363 
clear(session_id: u32)364     pub async fn clear(session_id: u32) {
365         let arc = Self::get_instance();
366         let mut channel_list = Vec::new();
367         {
368             let map = arc.lock().await;
369             if map.is_empty() {
370                 return;
371             }
372             for (&key, _) in map.iter() {
373                 if key.0 == session_id {
374                     let id = key;
375                     channel_list.push(id);
376                 }
377             }
378         }
379         for id in channel_list {
380             free_channel_task(id.0, id.1).await;
381         }
382     }
383 
dump_task() -> String384     pub async fn dump_task() -> String {
385         let arc = Self::get_instance();
386         let map = arc.lock().await;
387         let mut result = String::new();
388         for (_id, forward_task) in map.iter() {
389             let forward_type = match forward_task.remote_args.len() {
390                 0 => "fport".to_string(),
391                 2 => "rport".to_string(),
392                 _ => "unknown".to_string(),
393             };
394             let first_args = match forward_task.remote_args.len() {
395                 0 => "unknown".to_string(),
396                 2 => format!(
397                     "{}:{}",
398                     forward_task.local_args[0], forward_task.local_args[1]
399                 ),
400                 _ => "unknown".to_string(),
401             };
402             let second_args = match forward_task.remote_args.len() {
403                 0 => format!(
404                     "{}:{}",
405                     forward_task.local_args[0], forward_task.local_args[1]
406                 ),
407                 2 => format!(
408                     "{}:{}",
409                     forward_task.remote_args[0], forward_task.remote_args[1]
410                 ),
411                 _ => "unknown".to_string(),
412             };
413             result.push_str(&format!(
414                 "session_id:{},\tchannel_id:{},\tcommand:{:#} {:#} {:#}\n",
415                 forward_task.session_id,
416                 forward_task.channel_id,
417                 forward_type,
418                 first_args,
419                 second_args
420             ));
421         }
422         result
423     }
424 }
425 
free_channel_task(session_id: u32, channel_id: u32)426 pub async fn free_channel_task(session_id: u32, channel_id: u32) {
427     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
428         return;
429     };
430     crate::info!("free channel context session_id:{session_id}, channel_id:{channel_id}");
431     let task = &mut task.clone();
432     let cid = task.context_forward.id;
433     match task.forward_type {
434         ForwardType::Tcp => {
435             TcpWriteStreamMap::end(cid).await;
436             TcpListenerMap::end(channel_id).await;
437         }
438         ForwardType::Jdwp | ForwardType::Ark => {
439             TcpWriteStreamMap::end(cid).await;
440             let ret = unsafe { libc::close(task.context_forward.fd) };
441             crate::debug!(
442                 "close context_forward fd, ret={}, session_id={}, channel_id={}",
443                 ret,
444                 session_id,
445                 channel_id,
446             );
447             let target_fd_ret = unsafe { libc::close(task.context_forward.target_fd) };
448             crate::debug!(
449                 "close context_forward target fd, ret={}, session_id={}, channel_id={}",
450                 target_fd_ret,
451                 session_id,
452                 channel_id,
453             );
454             TcpListenerMap::end(channel_id).await;
455         }
456         ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
457             #[cfg(not(target_os = "windows"))]
458             UdsServer::wrap_close(task.context_forward.fd);
459         }
460         ForwardType::Device => {
461             return;
462         }
463     }
464     ForwardTaskMap::remove(session_id, channel_id).await;
465 }
466 
stop_task(session_id: u32)467 pub async fn stop_task(session_id: u32) {
468     ForwardTaskMap::clear(session_id).await;
469 }
470 
dump_task() -> String471 pub async fn dump_task() -> String {
472     ForwardTaskMap::dump_task().await
473 }
474 
475 #[derive(Debug, Default, Clone, PartialEq, Eq)]
476 pub struct HdcForward {
477     session_id: u32,
478     channel_id: u32,
479     server_or_daemon: bool,
480     local_args: Vec<String>,
481     remote_args: Vec<String>,
482     task_command: String,
483     forward_type: ForwardType,
484     context_forward: ContextForward,
485     pub transfer: HdcTransferBase,
486 }
487 
488 impl HdcForward {
new(session_id: u32, channel_id: u32, server_or_daemon: bool) -> Self489     pub fn new(session_id: u32, channel_id: u32, server_or_daemon: bool) -> Self {
490         Self {
491             session_id,
492             channel_id,
493             server_or_daemon,
494             local_args: Default::default(),
495             remote_args: Default::default(),
496             task_command: Default::default(),
497             forward_type: Default::default(),
498             context_forward: Default::default(),
499             transfer: HdcTransferBase::new(session_id, channel_id),
500         }
501     }
502 }
503 
check_node_info(value: &String, arg: &mut Vec<String>) -> bool504 pub fn check_node_info(value: &String, arg: &mut Vec<String>) -> bool {
505     crate::info!("check cmd args is : {:#?}", value);
506     if !value.contains(':') {
507         return false;
508     }
509     let array = value.split(':').collect::<Vec<&str>>();
510     if array[0] == "tcp" {
511         if array[1].len() > config::MAX_PORT_LEN {
512             crate::error!(
513                 "forward port = {:#?} it'slength is wrong, can not more than five",
514                 array[1]
515             );
516             return false;
517         }
518 
519         match array[1].parse::<u32>() {
520             Ok(port) => {
521                 if port == 0 || port > config::MAX_PORT_NUM {
522                     crate::error!("port can not greater than: 65535");
523                     return false;
524                 }
525             }
526             Err(_) => {
527                 crate::error!("port must is int type, port is: {:#?}", array[1]);
528                 return false;
529             }
530         }
531     }
532     for item in array.iter() {
533         arg.push(String::from(item.to_owned()));
534     }
535     true
536 }
537 
/// Records a forward reported as successful and closes the channel's TCP entry.
///
/// The first payload byte selects the direction (`'1'` means `true`) and the
/// whole payload, decoded as UTF-8, is stored as the task string. An empty or
/// non-UTF-8 payload is logged and not recorded. The channel is ended in every
/// case and the function always returns `Ok(())`.
#[cfg(feature = "host")]
pub async fn on_forward_success(task_message: TaskMessage, session_id: u32) -> io::Result<()> {
    crate::info!("on_forward_success");
    let channel_id = task_message.channel_id;
    let payload = task_message.payload;
    // Indexing `payload[0]` would panic on an empty payload, so read it checked.
    let Some(&direction_byte) = payload.first() else {
        crate::error!("forward success payload is empty, channel_id = {channel_id}");
        transfer::TcpMap::end(channel_id).await;
        return Ok(());
    };
    let forward_direction = direction_byte == b'1';
    let connect_key = "unknow key".to_string();
    match String::from_utf8(payload) {
        Ok(task_string) => {
            let info = HdcForwardInfo::new(
                session_id,
                channel_id,
                forward_direction,
                task_string,
                connect_key,
            );
            HdcForwardInfoMap::put(info).await;
        }
        Err(err) => {
            crate::error!("payload to String failed. {err}");
        }
    }
    transfer::TcpMap::end(channel_id).await;
    Ok(())
}
564 
check_command( ctx: &mut ContextForward, _payload: &[u8], server_or_daemon: bool, ) -> bool565 pub async fn check_command(
566     ctx: &mut ContextForward,
567     _payload: &[u8],
568     server_or_daemon: bool,
569 ) -> bool {
570     let channel_id = ctx.channel_id;
571     if !_payload.is_empty() {
572         hdctransfer::echo_client(
573             ctx.session_id,
574             channel_id,
575             "Forwardport result:OK",
576             MessageLevel::Ok,
577         )
578         .await;
579         let map_info = String::from(if server_or_daemon { "1|" } else { "0|" }) + &ctx.task_command;
580 
581         let mut command_string = vec![0_u8; map_info.len() + 1];
582         map_info
583             .as_bytes()
584             .to_vec()
585             .iter()
586             .enumerate()
587             .for_each(|(i, e)| {
588                 command_string[i] = *e;
589             });
590         let forward_success_message = TaskMessage {
591             channel_id,
592             command: HdcCommand::ForwardSuccess,
593             payload: command_string,
594         };
595         #[cfg(feature = "host")]
596         {
597             let _ = on_forward_success(forward_success_message, ctx.session_id).await;
598         }
599         #[cfg(not(feature = "host"))]
600         {
601             transfer::put(ctx.session_id, forward_success_message).await;
602         }
603     } else {
604         hdctransfer::echo_client(
605             ctx.session_id,
606             channel_id,
607             "Forwardport result: Failed",
608             MessageLevel::Fail,
609         )
610         .await;
611         free_context(ctx.id, true).await;
612         return false;
613     }
614     true
615 }
616 
detech_forward_type(ctx_point: &mut ContextForward) -> bool617 pub async fn detech_forward_type(ctx_point: &mut ContextForward) -> bool {
618     let type_str = &ctx_point.local_args[0];
619     match type_str.as_str() {
620         "tcp" => {
621             ctx_point.forward_type = ForwardType::Tcp;
622         }
623         "dev" => {
624             ctx_point.forward_type = ForwardType::Device;
625         }
626         "localabstract" => {
627             ctx_point.forward_type = ForwardType::Abstract;
628         }
629         "localfilesystem" => {
630             ctx_point.local_args[1] =
631                 HARMONY_RESERVED_SOCKET_PREFIX.to_owned() + &ctx_point.local_args[1];
632             ctx_point.forward_type = ForwardType::FileSystem;
633         }
634         "jdwp" => {
635             ctx_point.forward_type = ForwardType::Jdwp;
636         }
637         "ark" => {
638             ctx_point.forward_type = ForwardType::Ark;
639         }
640         "localreserved" => {
641             ctx_point.local_args[1] =
642                 FILE_SYSTEM_SOCKET_PREFIX.to_owned() + &ctx_point.local_args[1];
643             ctx_point.forward_type = ForwardType::Reserved;
644         }
645         _ => {
646             crate::error!("this forward type may is not expected");
647             return false;
648         }
649     }
650     true
651 }
652 
forward_tcp_accept(ctx: &mut ContextForward, port: u32) -> io::Result<()>653 pub async fn forward_tcp_accept(ctx: &mut ContextForward, port: u32) -> io::Result<()> {
654     let saddr = format!("127.0.0.1:{}", port);
655     let cid = ctx.id;
656     let session_tmp = ctx.session_id;
657     let channel_tmp = ctx.channel_id;
658     crate::info!(
659         "forward_ tcp_accept bind addr:{:#?}, cid = {:#?}",
660         saddr,
661         cid
662     );
663     let result = TcpListener::bind(saddr.clone()).await;
664     match result {
665         Ok(listener) => {
666             crate::info!("forward_ tcp_accept bind ok");
667             let join_handle = utils::spawn(async move {
668                 loop {
669                     let (stream, _addr) = match listener.accept().await {
670                         Ok((stream, _addr)) => (stream, _addr),
671                         Err(err) => {
672                             crate::error!("listener accept failed, {err}");
673                             continue;
674                         }
675                     };
676                     let (rd, wr) = stream.into_split();
677                     TcpWriteStreamMap::put(cid, wr).await;
678                     on_accept(cid).await;
679                     recv_tcp_msg(session_tmp, channel_tmp, rd, cid).await;
680                 }
681             });
682             TcpListenerMap::put(channel_tmp, join_handle).await;
683             Ok(())
684         }
685         Err(e) => {
686             crate::error!("forward_ tcp_accept fail:{:#?}", e);
687             Err(e)
688         }
689     }
690 }
691 
on_accept(cid: u32)692 pub async fn on_accept(cid: u32) {
693     let Some(context_forward) = ForwardContextMap::get(cid).await else {
694         crate::error!("daemon_connect _tcp2 get context is none cid={cid}");
695         return;
696     };
697     let ctx = &mut context_forward.clone();
698 
699     let buf_string: Vec<u8> = ctx.remote_parameters.clone().as_bytes().to_vec();
700     let mut new_buf = vec![0_u8; buf_string.len() + 9];
701 
702     buf_string.iter().enumerate().for_each(|(i, e)| {
703         new_buf[i + 8] = *e;
704     });
705     send_to_task(
706         ctx.session_id,
707         ctx.channel_id,
708         HdcCommand::ForwardActiveSlave,
709         &new_buf,
710         buf_string.len() + 9,
711         ctx.id,
712     )
713     .await;
714 }
715 
daemon_connect_tcp(cid: u32, port: u32)716 pub async fn daemon_connect_tcp(cid: u32, port: u32) {
717     let Some(context_forward) = ForwardContextMap::get(cid).await else {
718         crate::error!("daemon_connect _tcp2 get context is none cid={cid}");
719         return;
720     };
721     let ctx = &mut context_forward.clone();
722 
723     let saddr = format!("127.0.0.1:{}", port);
724     let stream = match TcpStream::connect(saddr).await {
725         Err(err) => {
726             crate::error!("TcpStream::stream failed {:?}", err);
727             free_context(cid, false).await;
728             return;
729         }
730         Ok(addr) => addr,
731     };
732     send_active_master(ctx).await;
733     let (rd, wr) = stream.into_split();
734     TcpWriteStreamMap::put(ctx.id, wr).await;
735     ForwardContextMap::update(ctx.id, ctx.clone()).await;
736     recv_tcp_msg(ctx.session_id, ctx.channel_id, rd, ctx.id).await;
737 }
738 
recv_tcp_msg(session_id: u32, channel_id: u32, mut rd: SplitReadHalf, cid: u32)739 pub async fn recv_tcp_msg(session_id: u32, channel_id: u32, mut rd: SplitReadHalf, cid: u32) {
740     let mut data = vec![0_u8; SOCKET_BUFFER_SIZE];
741     loop {
742         match rd.read(&mut data).await {
743             Ok(recv_size) => {
744                 if recv_size == 0 {
745                     crate::info!("recv_size is 0, tcp temporarily closed");
746                     free_context(cid, true).await;
747                     return;
748                 }
749                 send_to_task(
750                     session_id,
751                     channel_id,
752                     HdcCommand::ForwardData,
753                     &data[0..recv_size],
754                     recv_size,
755                     cid,
756                 )
757                 .await;
758             }
759             Err(_e) => {
760                 crate::error!(
761                     "recv tcp msg read failed session_id={session_id},channel_id={channel_id}"
762                 );
763             }
764         }
765     }
766 }
767 
768 #[cfg(not(target_os = "windows"))]
deamon_read_socket_msg(session_id: u32, channel_id: u32, fd: i32, cid: u32)769 pub async fn deamon_read_socket_msg(session_id: u32, channel_id: u32, fd: i32, cid: u32) {
770     loop {
771         let result = ylong_runtime::spawn_blocking(move || {
772             let mut buffer: [u8; SOCKET_BUFFER_SIZE] = [0; SOCKET_BUFFER_SIZE];
773             let recv_size = UdsClient::wrap_recv(fd, &mut buffer);
774             (recv_size, buffer)
775         })
776         .await;
777         let (recv_size, buffer) = match result {
778             Ok((recv_size, _)) if recv_size < 0 => {
779                 crate::error!("local abstract close shutdown fd = {fd}");
780                 free_context(cid, true).await;
781                 return;
782             }
783             Ok((recv_size, buffer)) => (recv_size, buffer),
784             Err(err) => {
785                 crate::error!("read socket msg failed. {err}");
786                 free_context(cid, true).await;
787                 return;
788             }
789         };
790         send_to_task(
791             session_id,
792             channel_id,
793             HdcCommand::ForwardData,
794             &buffer[0..recv_size as usize],
795             recv_size as usize,
796             cid,
797         )
798         .await;
799     }
800 }
801 
free_context(cid: u32, notify_remote: bool)802 pub async fn free_context(cid: u32, notify_remote: bool) {
803     let Some(context_forward) = ForwardContextMap::get(cid).await else {
804         crate::error!("free forward context get cid  is none. cid = {cid}");
805         return;
806     };
807     let ctx = &mut context_forward.clone();
808 
809     if notify_remote {
810         let vec_none = Vec::<u8>::new();
811         send_to_task(
812             ctx.session_id,
813             ctx.channel_id,
814             HdcCommand::ForwardFreeContext,
815             &vec_none,
816             0,
817             ctx.id,
818         )
819         .await;
820     }
821     crate::error!("begin to free forward context cid. cid = {cid}");
822     match ctx.forward_type {
823         ForwardType::Tcp => {
824             TcpWriteStreamMap::end(ctx.id).await;
825         }
826         ForwardType::Jdwp | ForwardType::Ark => {
827             TcpWriteStreamMap::end(ctx.id).await;
828             let ret = unsafe { libc::close(ctx.fd) };
829             crate::debug!("close context_forward fd, ret={}, id={}", ret, ctx.id,);
830             let target_fd_ret = unsafe { libc::close(ctx.target_fd) };
831             crate::debug!(
832                 "close context_forward target fd, ret={}, id={}",
833                 target_fd_ret,
834                 ctx.id,
835             );
836         }
837         ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
838             crate::error!("Abstract begin to free forward close fd = {:#?}", ctx.fd);
839             #[cfg(not(target_os = "windows"))]
840             UdsServer::wrap_close(ctx.fd);
841             ctx.fd = -1;
842         }
843         ForwardType::Device => {}
844     }
845     ForwardContextMap::remove(cid).await;
846 }
847 
setup_tcp_point(ctx: &mut ContextForward) -> bool848 pub async fn setup_tcp_point(ctx: &mut ContextForward) -> bool {
849     let Ok(port) = ctx.local_args[1].parse::<u32>() else {
850         crate::error!("setup tcp point parse error");
851         return false;
852     };
853     let cid = ctx.id;
854     if ctx.is_master {
855         let result = forward_tcp_accept(ctx, port).await;
856         ctx.last_error = format!("TCP Port listen failed at {}", port);
857         return result.is_ok();
858     } else {
859         ForwardContextMap::update(ctx.id, ctx.clone()).await;
860         utils::spawn(async move { daemon_connect_tcp(cid, port).await });
861     }
862     true
863 }
864 
865 #[cfg(not(target_os = "windows"))]
server_socket_bind_listen(ctx: &mut ContextForward, path: String) -> bool866 async fn server_socket_bind_listen(ctx: &mut ContextForward, path: String) -> bool {
867     let fd: i32 = UdsClient::wrap_socket(AF_UNIX);
868     ctx.fd = fd;
869     let name: Vec<u8> = path.as_bytes().to_vec();
870     let mut socket_name = vec![0_u8; name.len() + 1];
871     socket_name[0] = b'\0';
872     name.iter().enumerate().for_each(|(i, e)| {
873         socket_name[i + 1] = *e;
874     });
875     let addr = UdsAddr::parse_abstract(&socket_name[1..]);
876     ForwardContextMap::update(ctx.id, ctx.clone()).await;
877     let cid = ctx.id;
878     if let Ok(addr_obj) = &addr {
879         let ret = UdsServer::wrap_bind(fd, addr_obj);
880         if ret.is_err() {
881             hdctransfer::echo_client(
882                 ctx.session_id,
883                 ctx.channel_id,
884                 "Unix pipe bind failed",
885                 MessageLevel::Fail,
886             )
887             .await;
888             crate::error!("bind fail");
889             return false;
890         }
891         let ret = UdsServer::wrap_listen(fd);
892         if ret < 0 {
893             hdctransfer::echo_client(
894                 ctx.session_id,
895                 ctx.channel_id,
896                 "Unix pipe listen failed",
897                 MessageLevel::Fail,
898             )
899             .await;
900             crate::error!("listen fail");
901             return false;
902         }
903         utils::spawn(async move {
904             loop {
905                 let client_fd = UdsServer::wrap_accept(fd);
906                 if client_fd == -1 {
907                     break;
908                 }
909                 on_accept(cid).await;
910             }
911         });
912     }
913     true
914 }
915 
/// Resolves `path` to its canonical absolute form and returns it as a
/// `String`.
///
/// # Errors
///
/// Returns an `ErrorKind::Other` error with the message
/// "forward canonicalize failed" when the path cannot be resolved or when the
/// resolved path is not valid UTF-8.
pub async fn canonicalize(path: String) -> Result<String, Error> {
    fs::canonicalize(path)
        .ok()
        .and_then(|abs_path| abs_path.to_str().map(String::from))
        .ok_or_else(|| Error::new(ErrorKind::Other, "forward canonicalize failed"))
}
925 
/// Windows build of `setup_device_point`: always reports failure.
#[cfg(target_os = "windows")]
pub async fn setup_device_point(_ctx: &mut ContextForward) -> bool {
    false
}
930 
931 #[cfg(not(target_os = "windows"))]
setup_device_point(ctx: &mut ContextForward) -> bool932 pub async fn setup_device_point(ctx: &mut ContextForward) -> bool {
933     let s_node_cfg = ctx.local_args[1].clone();
934     let Ok(resolv_path) = canonicalize(s_node_cfg).await else {
935         crate::error!("Open unix-dev failed");
936         return false;
937     };
938     ctx.dev_path = resolv_path.clone();
939     crate::info!("setup_ device_point resolv_path={:?}", resolv_path);
940     let thread_path_ref = Arc::new(Mutex::new(resolv_path));
941     if !send_active_master(ctx).await {
942         crate::error!("send active_master return failed ctx={:?}", ctx);
943         return false;
944     }
945     let session = ctx.session_id;
946     let channel = ctx.channel_id;
947     let cid = ctx.id;
948 
949     ForwardContextMap::update(ctx.id, ctx.clone()).await;
950     utils::spawn(async move {
951         loop {
952             let path = thread_path_ref.lock().await;
953             let Ok(mut file) = File::open(&*path) else {
954                 crate::error!("open {} failed.", *path);
955                 break;
956             };
957             let mut total = Vec::new();
958             let mut buf: [u8; config::FILE_PACKAGE_PAYLOAD_SIZE] =
959                 [0; config::FILE_PACKAGE_PAYLOAD_SIZE];
960             let Ok(read_len) = file.read(&mut buf[4..]) else {
961                 crate::error!("read {} failed.", *path);
962                 break;
963             };
964             if read_len == 0 {
965                 free_context(cid, true).await;
966                 break;
967             }
968             total.append(&mut buf[0..read_len].to_vec());
969             send_to_task(
970                 session,
971                 channel,
972                 HdcCommand::ForwardData,
973                 &total,
974                 read_len,
975                 cid,
976             )
977             .await;
978         }
979     });
980     true
981 }
982 
983 #[cfg(not(feature = "host"))]
get_pid(parameter: &str, forward_type: ForwardType) -> u32984 fn get_pid(parameter: &str, forward_type: ForwardType) -> u32 {
985     match forward_type == ForwardType::Jdwp {
986         true => parameter.parse::<u32>().unwrap_or_else(|e| {
987             crate::error!("Jdwp get pid err :{:?}", e);
988             0_u32
989         }),
990         false => {
991             let params: Vec<&str> = parameter.split('@').collect();
992             params[0].parse::<u32>().unwrap_or_else(|e| {
993                 crate::error!("get pid err :{:?}", e);
994                 0_u32
995             })
996         }
997     }
998 }
999 
/// Host build of `setup_jdwp_point`: logs that the host side does not set up
/// a JDWP point and reports failure.
#[cfg(feature = "host")]
pub async fn setup_jdwp_point(_ctx: &mut ContextForward) -> bool {
    crate::info!("host not setup_jdwp _point");
    false
}
1005 
1006 #[cfg(not(feature = "host"))]
setup_jdwp_point(ctx: &mut ContextForward) -> bool1007 pub async fn setup_jdwp_point(ctx: &mut ContextForward) -> bool {
1008     let local_args = ctx.local_args[1].clone();
1009     let parameter = local_args.as_str();
1010     let style = &ctx.forward_type;
1011     let pid = get_pid(parameter, style.clone());
1012     let cid = ctx.id;
1013     let session = ctx.session_id;
1014     let channel = ctx.channel_id;
1015     if pid == 0 {
1016         crate::error!("setup jdwp point get pid is 0");
1017         return false;
1018     }
1019 
1020     let result = UdsServer::wrap_socketpair(SOCK_STREAM);
1021     if result.is_err() {
1022         crate::error!("wrap socketpair failed");
1023         return false;
1024     }
1025     let mut target_fd = 0;
1026     let mut local_fd = 0;
1027     if let Ok((fd0, fd1)) = result {
1028         crate::info!("pipe, fd0:{}, fd1:{}", fd0, fd1);
1029         local_fd = fd0;
1030         target_fd = fd1;
1031         ctx.fd = local_fd;
1032         ctx.target_fd = target_fd;
1033         target_fd = fd1;
1034     }
1035 
1036     utils::spawn(async move {
1037         loop {
1038             let result = ylong_runtime::spawn_blocking(move || {
1039                 let mut buffer = [0u8; SOCKET_BUFFER_SIZE];
1040                 let size = UdsServer::wrap_read(local_fd, &mut buffer);
1041                 (size, buffer)
1042             })
1043             .await;
1044             let (size, buffer) = match result {
1045                 Ok((size, _)) if size < 0 => {
1046                     crate::error!("disconnect fd:({local_fd}, {target_fd}), error:{:?}", size);
1047                     free_context(cid, true).await;
1048                     break;
1049                 }
1050                 Ok((0, _)) => {
1051                     ylong_runtime::time::sleep(Duration::from_millis(200)).await;
1052                     continue;
1053                 }
1054                 Ok((size, buffer)) => (size, buffer),
1055                 Err(err) => {
1056                     crate::error!("spawn_blocking failed. disconnect fd:({local_fd}, {target_fd}), error:{err}");
1057                     free_context(cid, true).await;
1058                     break;
1059                 }
1060             };
1061             send_to_task(
1062                 session,
1063                 channel,
1064                 HdcCommand::ForwardData,
1065                 &buffer[0..size as usize],
1066                 size as usize,
1067                 cid,
1068             )
1069             .await;
1070         }
1071     });
1072 
1073     let jdwp = Jdwp::get_instance();
1074     let mut param = ctx.local_args[0].clone();
1075     param.push(':');
1076     param.push_str(parameter);
1077 
1078     let ret = jdwp
1079         .send_fd_to_target(pid, target_fd, local_fd, param.as_str())
1080         .await;
1081     if !ret {
1082         crate::error!("not found pid:{:?}", pid);
1083         hdctransfer::echo_client(
1084             session,
1085             channel,
1086             format!("fport fail:pid not found:{}", pid).as_str(),
1087             MessageLevel::Fail,
1088         )
1089         .await;
1090         task_finish(session, channel).await;
1091         return false;
1092     }
1093 
1094     let vec_none = Vec::<u8>::new();
1095     send_to_task(
1096         session,
1097         channel,
1098         HdcCommand::ForwardActiveMaster, // 04
1099         &vec_none,
1100         0,
1101         cid,
1102     )
1103     .await;
1104     crate::info!("setup_jdwp_ point return true");
1105     true
1106 }
1107 
task_finish(session_id: u32, channel_id: u32)1108 async fn task_finish(session_id: u32, channel_id: u32) {
1109     transfer_task_finish(channel_id, session_id).await;
1110 }
1111 
1112 #[cfg(not(target_os = "windows"))]
daemon_connect_pipe(ctx: &mut ContextForward)1113 pub async fn daemon_connect_pipe(ctx: &mut ContextForward) {
1114     let name: Vec<u8> = ctx.local_args[1].clone().as_bytes().to_vec();
1115     let mut socket_name = vec![0_u8; name.len() + 1];
1116     socket_name[0] = b'\0';
1117     name.iter().enumerate().for_each(|(i, e)| {
1118         socket_name[i + 1] = *e;
1119     });
1120     let addr = UdsAddr::parse_abstract(&socket_name[1..]);
1121     if let Ok(addr_obj) = &addr {
1122         let ret: Result<(), Error> = UdsClient::wrap_connect(ctx.fd, addr_obj);
1123         if ret.is_err() {
1124             hdctransfer::echo_client(
1125                 ctx.session_id,
1126                 ctx.channel_id,
1127                 "localabstract connect fail",
1128                 MessageLevel::Fail,
1129             )
1130             .await;
1131             free_context(ctx.id, true).await;
1132             return;
1133         }
1134         send_active_master(ctx).await;
1135         read_data_to_forward(ctx).await;
1136     }
1137 }
1138 
/// Windows build of `setup_file_point`: always reports failure.
#[cfg(target_os = "windows")]
pub async fn setup_file_point(_ctx: &mut ContextForward) -> bool {
    false
}
1143 
1144 #[cfg(not(target_os = "windows"))]
setup_file_point(ctx: &mut ContextForward) -> bool1145 pub async fn setup_file_point(ctx: &mut ContextForward) -> bool {
1146     let s_node_cfg = ctx.local_args[1].clone();
1147     if ctx.is_master {
1148         if ctx.forward_type == ForwardType::Reserved || ctx.forward_type == ForwardType::FileSystem
1149         {
1150             let _ = fs::remove_file(s_node_cfg.clone());
1151         }
1152         if !server_socket_bind_listen(ctx, s_node_cfg).await {
1153             crate::error!("server socket bind listen failed id={:?}", ctx.id);
1154             task_finish(ctx.session_id, ctx.channel_id).await;
1155             return false;
1156         }
1157     } else {
1158         if ctx.fd <= 0 {
1159             crate::info!("setup_file _point fd: {:?}", ctx.fd);
1160             if ctx.forward_type == ForwardType::Abstract {
1161                 ctx.fd = UdsClient::wrap_socket(AF_LOCAL);
1162                 unsafe {
1163                     libc::fcntl(ctx.fd, F_SETFD, FD_CLOEXEC);
1164                 }
1165             } else {
1166                 ctx.fd = UdsClient::wrap_socket(AF_UNIX);
1167             }
1168         }
1169         ForwardContextMap::update(ctx.id, ctx.clone()).await;
1170         daemon_connect_pipe(ctx).await;
1171     }
1172     true
1173 }
1174 
setup_point(ctx: &mut ContextForward) -> bool1175 pub async fn setup_point(ctx: &mut ContextForward) -> bool {
1176     if !detech_forward_type(ctx).await {
1177         crate::error!("forward type is not true");
1178         return false;
1179     }
1180 
1181     if cfg!(target_os = "windows") && ctx.forward_type != ForwardType::Tcp {
1182         ctx.last_error = String::from("Not support forward-type");
1183         return false;
1184     }
1185 
1186     let mut ret = false;
1187     match ctx.forward_type {
1188         ForwardType::Tcp => {
1189             ret = setup_tcp_point(ctx).await;
1190         }
1191         ForwardType::Device => {
1192             if !cfg!(target_os = "windows") {
1193                 ret = setup_device_point(ctx).await;
1194             }
1195         }
1196         ForwardType::Jdwp | ForwardType::Ark => {
1197             crate::info!("setup point ark case");
1198             if !cfg!(feature = "host") {
1199                 ret = setup_jdwp_point(ctx).await;
1200             }
1201         }
1202         ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
1203             if !cfg!(target_os = "windows") {
1204                 ret = setup_file_point(ctx).await;
1205             }
1206         }
1207     };
1208     ForwardContextMap::update(ctx.id, ctx.clone()).await;
1209     ret
1210 }
1211 
send_to_task( session_id: u32, channel_id: u32, command: HdcCommand, buf_ptr: &[u8], buf_size: usize, cid: u32, ) -> bool1212 pub async fn send_to_task(
1213     session_id: u32,
1214     channel_id: u32,
1215     command: HdcCommand,
1216     buf_ptr: &[u8],
1217     buf_size: usize,
1218     cid: u32,
1219 ) -> bool {
1220     if buf_size > (config::MAX_SIZE_IOBUF * 2) {
1221         crate::error!("send task buf_size oversize");
1222         return false;
1223     }
1224 
1225     let mut new_buf = [u32::to_be_bytes(cid).as_slice(), buf_ptr].concat();
1226     new_buf[4..].copy_from_slice(&buf_ptr[0..buf_size]);
1227     let file_check_message = TaskMessage {
1228         channel_id,
1229         command,
1230         payload: new_buf,
1231     };
1232     transfer::put(session_id, file_check_message).await;
1233     true
1234 }
1235 
/// Reads the context id stored big-endian in the first four bytes of
/// `_payload`.
///
/// A payload shorter than four bytes yields `0` instead of panicking, so a
/// truncated message cannot bring the process down.
pub fn get_cid(_payload: &[u8]) -> u32 {
    match _payload.get(..4) {
        Some(head) => {
            let mut id_bytes = [0u8; 4];
            id_bytes.copy_from_slice(head);
            u32::from_be_bytes(id_bytes)
        }
        None => 0,
    }
}
1242 
send_active_master(ctx: &mut ContextForward) -> bool1243 pub async fn send_active_master(ctx: &mut ContextForward) -> bool {
1244     if ctx.check_order {
1245         let flag = [0u8; 1];
1246         send_to_task(
1247             ctx.session_id,
1248             ctx.channel_id,
1249             HdcCommand::ForwardCheckResult,
1250             &flag,
1251             1,
1252             ctx.id,
1253         )
1254         .await;
1255         free_context(ctx.id, false).await;
1256         return true;
1257     }
1258     if !send_to_task(
1259         ctx.session_id,
1260         ctx.channel_id,
1261         HdcCommand::ForwardActiveMaster,
1262         &Vec::<u8>::new(),
1263         0,
1264         ctx.id,
1265     )
1266     .await
1267     {
1268         free_context(ctx.id, true).await;
1269         return false;
1270     }
1271     true
1272 }
1273 
forward_parse_cmd(context_forward: &mut ContextForward) -> bool1274 pub fn forward_parse_cmd(context_forward: &mut ContextForward) -> bool {
1275     let command = context_forward.task_command.clone();
1276     let result = Base::split_command_to_args(&command);
1277     let argv = result.0;
1278     let argc = result.1;
1279 
1280     if argc < ARG_COUNT2 {
1281         crate::error!("argc < 2 parse is failed.");
1282         context_forward.last_error = "Too few arguments.".to_string();
1283         return false;
1284     }
1285     if argv[0].len() > BUF_SIZE_SMALL || argv[1].len() > BUF_SIZE_SMALL {
1286         crate::error!("parse's length is flase.");
1287         context_forward.last_error = "Some argument too long.".to_string();
1288         return false;
1289     }
1290     if !check_node_info(&argv[0], &mut context_forward.local_args) {
1291         crate::error!("check argv[0] node info is flase.");
1292         context_forward.last_error = "Arguments parsing failed.".to_string();
1293         return false;
1294     }
1295     if !check_node_info(&argv[1], &mut context_forward.remote_args) {
1296         crate::error!("check argv[1] node info is flase.");
1297         context_forward.last_error = "Arguments parsing failed.".to_string();
1298         return false;
1299     }
1300     context_forward.remote_parameters = argv[1].clone();
1301     true
1302 }
1303 
begin_forward(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool1304 pub async fn begin_forward(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool {
1305     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1306         crate::error!("begin forward get task is none session_id={session_id},channel_id={channel_id}"
1307         );
1308         return false;
1309     };
1310     let task: &mut HdcForward = &mut task.clone();
1311     let Ok(command) = String::from_utf8(_payload.to_vec()) else {
1312         crate::error!("cmd argv is not int utf8");
1313         return false;
1314     };
1315     let mut context_forward = malloc_context(session_id, channel_id, true).await;
1316     context_forward.task_command = command.clone();
1317 
1318     if !forward_parse_cmd(&mut context_forward) {
1319         task.context_forward = context_forward.clone();
1320         ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1321         return false;
1322     }
1323     if !setup_point(&mut context_forward).await {
1324         task.context_forward = context_forward.clone();
1325         ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1326         return false;
1327     }
1328 
1329     let wake_up_message = TaskMessage {
1330         channel_id,
1331         command: HdcCommand::KernelWakeupSlavetask,
1332         payload: Vec::<u8>::new(),
1333     };
1334     transfer::put(context_forward.session_id, wake_up_message).await;
1335 
1336     let buf_string: Vec<u8> = context_forward.remote_parameters.as_bytes().to_vec();
1337     let mut new_buf = vec![0_u8; buf_string.len() + 9];
1338     buf_string.iter().enumerate().for_each(|(i, e)| {
1339         new_buf[i + 8] = *e;
1340     });
1341     send_to_task(
1342         context_forward.session_id,
1343         context_forward.channel_id,
1344         HdcCommand::ForwardCheck,
1345         &new_buf,
1346         buf_string.len() + 9,
1347         context_forward.id,
1348     )
1349     .await;
1350     true
1351 }
1352 
slave_connect( session_id: u32, channel_id: u32, payload: &[u8], check_order: bool, ) -> bool1353 pub async fn slave_connect(
1354     session_id: u32,
1355     channel_id: u32,
1356     payload: &[u8],
1357     check_order: bool,
1358 ) -> bool {
1359     let mut context_forward = malloc_context(session_id, channel_id, false).await;
1360     context_forward.check_order = check_order;
1361     if let Ok((content, id)) = filter_command(payload) {
1362         let content = &content[8..].trim_end_matches('\0').to_string();
1363         context_forward.task_command = content.clone();
1364         if !check_node_info(content, &mut context_forward.local_args) {
1365             crate::error!("check local args is false");
1366             return false;
1367         }
1368         context_forward.id = id;
1369         ForwardContextMap::update(id, context_forward.clone()).await;
1370     }
1371 
1372     if !check_order {
1373         if !setup_point(&mut context_forward).await {
1374             crate::error!("setup point return false, free context");
1375             free_context(context_forward.id, true).await;
1376             ForwardContextMap::update(context_forward.id, context_forward.clone()).await;
1377             return false;
1378         }
1379     } else {
1380         send_active_master(&mut context_forward).await;
1381     }
1382     ForwardContextMap::update(context_forward.id, context_forward.clone()).await;
1383     true
1384 }
1385 
read_data_to_forward(ctx: &mut ContextForward)1386 pub async fn read_data_to_forward(ctx: &mut ContextForward) {
1387     let cid = ctx.id;
1388     let session = ctx.session_id;
1389     let channel = ctx.channel_id;
1390     match ctx.forward_type {
1391         ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
1392             #[cfg(not(target_os = "windows"))]
1393             {
1394                 let fd_temp = ctx.fd;
1395                 utils::spawn(async move {
1396                     deamon_read_socket_msg(session, channel, fd_temp, cid).await
1397                 });
1398             }
1399         }
1400         ForwardType::Device => {
1401             #[cfg(not(target_os = "windows"))]
1402             setup_device_point(ctx).await;
1403         }
1404         _ => {}
1405     }
1406 }
1407 
/// Splits a forward payload into its text content and context id.
///
/// The first four bytes are the big-endian id; the remainder must be valid
/// UTF-8 and is returned as the content.
///
/// # Errors
///
/// Returns an `ErrorKind::Other` error ("filter command failure") when the
/// payload is shorter than four bytes or the remainder is not valid UTF-8.
/// Short payloads used to panic on the slice index.
pub fn filter_command(_payload: &[u8]) -> io::Result<(String, u32)> {
    let failure = || Error::new(ErrorKind::Other, "filter command failure");

    if _payload.len() < 4 {
        return Err(failure());
    }
    let (head, body) = _payload.split_at(4);

    let mut id_bytes = [0u8; 4];
    id_bytes.copy_from_slice(head);
    let id = u32::from_be_bytes(id_bytes);

    match String::from_utf8(body.to_vec()) {
        Ok(content) => Ok((content, id)),
        Err(_) => Err(failure()),
    }
}
1419 
dev_write_bufer(path: String, content: Vec<u8>)1420 pub async fn dev_write_bufer(path: String, content: Vec<u8>) {
1421     utils::spawn(async move {
1422         let open_result = OpenOptions::new()
1423             .write(true)
1424             .create(true)
1425             .open(path.clone());
1426 
1427         match open_result {
1428             Ok(mut file) => {
1429                 let write_result = file.write_all(content.as_slice());
1430                 match write_result {
1431                     Ok(()) => {}
1432                     Err(e) => {
1433                         crate::error!("dev write bufer to file fail:{:#?}", e);
1434                     }
1435                 }
1436             }
1437             Err(e) => {
1438                 crate::error!("dev write bufer fail:{:#?}", e);
1439             }
1440         }
1441     });
1442 }
1443 
write_forward_bufer(ctx: &mut ContextForward, content: Vec<u8>) -> bool1444 pub async fn write_forward_bufer(ctx: &mut ContextForward, content: Vec<u8>) -> bool {
1445     if ctx.forward_type == ForwardType::Tcp {
1446         return TcpWriteStreamMap::write(ctx.id, content).await;
1447     } else if ctx.forward_type == ForwardType::Device {
1448         let path_ref = ctx.dev_path.clone();
1449         if path_ref.is_empty() {
1450             crate::error!(
1451                 "write_forward_bufer get dev_path  is failed ctx.id = {:#?}",
1452                 ctx.id
1453             );
1454             return false;
1455         }
1456         dev_write_bufer(path_ref, content).await;
1457     } else {
1458         #[cfg(not(target_os = "windows"))]
1459         {
1460             let fd = ctx.fd;
1461             if UdsClient::wrap_send(fd, &content) < 0 {
1462                 crate::info!("write forward bufer failed. fd = {fd}");
1463                 return false;
1464             }
1465         }
1466     }
1467     true
1468 }
1469 
1470 #[allow(unused)]
malloc_context( session_id: u32, channel_id: u32, master_slave: bool, ) -> ContextForward1471 pub async fn malloc_context(
1472     session_id: u32,
1473     channel_id: u32,
1474     master_slave: bool,
1475 ) -> ContextForward {
1476     let mut ctx = ContextForward {
1477         ..Default::default()
1478     };
1479     ctx.id = utils::get_current_time() as u32;
1480     ctx.session_id = session_id;
1481     ctx.channel_id = channel_id;
1482     ctx.is_master = master_slave;
1483     ForwardContextMap::add(ctx.id, ctx.clone());
1484     ctx
1485 }
1486 
forward_command_dispatch( session_id: u32, channel_id: u32, command: HdcCommand, _payload: &[u8], ) -> bool1487 pub async fn forward_command_dispatch(
1488     session_id: u32,
1489     channel_id: u32,
1490     command: HdcCommand,
1491     _payload: &[u8],
1492 ) -> bool {
1493     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1494         crate::error!("forward_command_dispatch get task is none session_id={session_id},channel_id={channel_id}"
1495         );
1496         return false;
1497     };
1498     let task: &mut HdcForward = &mut task.clone();
1499     let mut ret: bool = true;
1500     let cid = get_cid(_payload);
1501     let send_msg = _payload[4..].to_vec();
1502 
1503     let Some(context_forward) = ForwardContextMap::get(cid).await else {
1504         crate::error!("forward command dispatch get context is none cid={cid}");
1505         return false;
1506     };
1507     let ctx = &mut context_forward.clone();
1508     match command {
1509         HdcCommand::ForwardCheckResult => {
1510             ret = check_command(ctx, _payload, task.server_or_daemon).await;
1511         }
1512         HdcCommand::ForwardData => {
1513             ret = write_forward_bufer(ctx, send_msg).await;
1514         }
1515         HdcCommand::ForwardFreeContext => {
1516             free_context(ctx.id, false).await;
1517         }
1518         HdcCommand::ForwardActiveMaster => {
1519             read_data_to_forward(ctx).await;
1520             ret = true;
1521         }
1522         _ => {
1523             ret = false;
1524         }
1525     }
1526     ret
1527 }
1528 
print_error_info(session_id: u32, channel_id: u32)1529 async fn print_error_info(session_id: u32, channel_id: u32) {
1530     let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1531         crate::error!(
1532             "detech_forward_type get task is none session_id = {:#?}, channel_id = {:#?}",
1533             session_id,
1534             channel_id
1535         );
1536         return;
1537     };
1538     let task = &mut task.clone();
1539     let ctx = &task.context_forward;
1540     let mut echo_content = ctx.last_error.clone();
1541 
1542     if echo_content.is_empty() {
1543         echo_content = "Forward parament failed".to_string();
1544     }
1545 
1546     hdctransfer::echo_client(
1547         session_id,
1548         channel_id,
1549         echo_content.as_str(),
1550         MessageLevel::Fail,
1551     )
1552     .await;
1553 }
1554 
command_dispatch( session_id: u32, channel_id: u32, command: HdcCommand, payload: &[u8], _payload_size: u16, ) -> bool1555 pub async fn command_dispatch(
1556     session_id: u32,
1557     channel_id: u32,
1558     command: HdcCommand,
1559     payload: &[u8],
1560     _payload_size: u16,
1561 ) -> bool {
1562     if command != HdcCommand::ForwardData {
1563         crate::info!("command_dispatch command recv: {:?}", command);
1564     }
1565 
1566     let ret = match command {
1567         HdcCommand::ForwardInit => begin_forward(session_id, channel_id, payload).await,
1568         HdcCommand::ForwardCheck => slave_connect(session_id, channel_id, payload, true).await,
1569         HdcCommand::ForwardActiveSlave => {
1570             slave_connect(session_id, channel_id, payload, false).await
1571         }
1572         _ => forward_command_dispatch(session_id, channel_id, command, payload).await,
1573     };
1574     if !ret {
1575         print_error_info(session_id, channel_id).await;
1576         task_finish(session_id, channel_id).await;
1577         return false;
1578     }
1579     ret
1580 }
1581