• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 //! forward
16 #![allow(missing_docs)]
17 use libc::SOCK_STREAM;
18 use libc::{AF_LOCAL, AF_UNIX, FD_CLOEXEC, F_SETFD};
19 use std::collections::HashMap;
20 use std::fs::{self, File};
21 use std::io::{self, Error, ErrorKind};
22 use ylong_runtime::sync::{Mutex, RwLock};
23 
24 use crate::common::base::Base;
25 use crate::common::hdctransfer::transfer_task_finish;
26 use crate::common::hdctransfer::HdcTransferBase;
27 use crate::common::jdwp::Jdwp;
28 use crate::common::uds::{UdsAddr, UdsClient, UdsServer};
29 use crate::config;
30 use crate::config::HdcCommand;
31 use crate::config::TaskMessage;
32 use crate::transfer;
33 use crate::utils::hdc_log::*;
34 use std::io::Read;
35 use std::sync::Arc;
36 use ylong_runtime::io::AsyncReadExt;
37 use ylong_runtime::io::AsyncWriteExt;
38 use ylong_runtime::net::{SplitReadHalf, SplitWriteHalf, TcpListener, TcpStream};
39 
/// Constant `2`; presumably an expected argument count. Not referenced in the reviewed part
/// of this file.
pub const ARG_COUNT2: u32 = 2;
/// Constant `256`; presumably a small buffer size in bytes. Not referenced in the reviewed
/// part of this file.
pub const BUF_SIZE_SMALL: usize = 256;
/// Size in bytes of the buffers used for TCP and Unix-socket reads in this module.
pub const SOCKET_BUFFER_SIZE: usize = 65535;
/// Prefix prepended to the node name by `detech_forward_type` for `localfilesystem` forwards.
// NOTE(review): "/dev/seocket" looks like a typo for "/dev/socket" — confirm against the
// peer implementation before changing, because this string is a runtime path.
pub const HARMONY_RESERVED_SOCKET_PREFIX: &str = "/dev/seocket";
/// Prefix prepended to the node name by `detech_forward_type` for `localreserved` forwards.
pub const FILE_SYSTEM_SOCKET_PREFIX: &str = "/tmp/";
45 
46 type TcpRead = Arc<Mutex<SplitReadHalf>>;
47 type TcpReadMap_ = Arc<RwLock<HashMap<u32, TcpRead>>>;
48 pub struct TcpReadStreamMap {}
49 impl TcpReadStreamMap {
get_instance() -> TcpReadMap_50     fn get_instance() -> TcpReadMap_ {
51         static mut TCP_MAP: Option<TcpReadMap_> = None;
52         unsafe {
53             TCP_MAP
54                 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
55                 .clone()
56         }
57     }
58     #[allow(unused)]
put(id: u32, rd: SplitReadHalf)59     async fn put(id: u32, rd: SplitReadHalf) {
60         let instance = Self::get_instance();
61         let mut map = instance.write().await;
62         let arc_rd = Arc::new(Mutex::new(rd));
63         map.insert(id, arc_rd);
64     }
65     #[allow(unused)]
read(session_id: u32, channel_id: u32, cid: u32)66     async fn read(session_id: u32, channel_id: u32, cid: u32) {
67         let arc_map = Self::get_instance();
68         let map = arc_map.read().await;
69         if map.get(&cid).is_none() {
70             return;
71         }
72         let arc_rd = map.get(&cid).unwrap();
73         let rd = &mut arc_rd.lock().await;
74         let mut data = vec![0_u8; SOCKET_BUFFER_SIZE];
75         loop {
76             match rd.read(&mut data).await {
77                 Ok(recv_size) => {
78                     if recv_size == 0 {
79                         free_context(session_id, channel_id, 0, true).await;
80                         crate::info!("tcp close shutdown");
81                         return;
82                     }
83                     if send_to_task(
84                         session_id,
85                         channel_id,
86                         HdcCommand::ForwardData,
87                         &data[0..recv_size],
88                         recv_size,
89                         cid,
90                     )
91                     .await
92                     {
93                         crate::info!("send task success");
94                     }
95                 }
96                 Err(_e) => {
97                     crate::error!("tcp stream rd read failed");
98                 }
99             }
100         }
101     }
102 }
103 
104 type TcpWriter = Arc<Mutex<SplitWriteHalf>>;
105 type TcpWriterMap_ = Arc<RwLock<HashMap<u32, TcpWriter>>>;
106 pub struct TcpWriteStreamMap {}
107 impl TcpWriteStreamMap {
get_instance() -> TcpWriterMap_108     fn get_instance() -> TcpWriterMap_ {
109         static mut TCP_MAP: Option<TcpWriterMap_> = None;
110         unsafe {
111             TCP_MAP
112                 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
113                 .clone()
114         }
115     }
116     #[allow(unused)]
put(id: u32, wr: SplitWriteHalf)117     async fn put(id: u32, wr: SplitWriteHalf) {
118         let instance = Self::get_instance();
119         let mut map = instance.write().await;
120         let arc_wr = Arc::new(Mutex::new(wr));
121         map.insert(id, arc_wr);
122     }
123     #[allow(unused)]
write(id: u32, data: Vec<u8>)124     async fn write(id: u32, data: Vec<u8>) {
125         let arc_map = Self::get_instance();
126         let map = arc_map.write().await;
127         if map.get(&id).is_none() {
128             return;
129         }
130         let arc_wr = map.get(&id).unwrap();
131         let mut wr = arc_wr.lock().await;
132         let _ = wr.write_all(data.as_slice()).await;
133     }
134 
end(id: u32)135     pub async fn end(id: u32) {
136         let instance = Self::get_instance();
137         let mut map = instance.write().await;
138         if let Some(arc_wr) = map.remove(&id) {
139             let mut wr = arc_wr.lock().await;
140             let _ = wr.shutdown().await;
141         }
142     }
143 }
144 
/// Kind of endpoint a forward task is attached to.
///
/// The variant is chosen from the type keyword in the task's first local argument.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
enum ForwardType {
    /// `tcp` keyword; also the default value.
    #[default]
    Tcp = 0,
    /// `dev` keyword.
    Device = 1,
    /// `localabstract` keyword.
    Abstract = 2,
    /// `localfilesystem` keyword.
    FileSystem = 3,
    /// `jdwp` keyword.
    Jdwp = 4,
    /// `ark` keyword.
    Ark = 5,
    /// `localreserved` keyword.
    Reserved = 6,
}
156 
157 #[derive(Debug, Default, PartialEq, Eq, Clone)]
158 pub struct ContextForward {
159     session_id: u32,
160     channel_id: u32,
161     check_order: bool,
162     id: u32,
163     fd: i32,
164     remote_parameters: String,
165     last_error: String,
166     forward_type: ForwardType,
167 }
168 
169 type MapForward_ = Arc<Mutex<HashMap<(u32, u32), HdcForward>>>;
170 pub struct ForwardTaskMap {}
171 impl ForwardTaskMap {
get_instance() -> MapForward_172     fn get_instance() -> MapForward_ {
173         static mut FORWARD_MAP: Option<MapForward_> = None;
174         unsafe {
175             FORWARD_MAP
176                 .get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
177                 .clone()
178         }
179     }
180 
update(session_id: u32, channel_id: u32, value: HdcForward)181     pub async fn update(session_id: u32, channel_id: u32, value: HdcForward) {
182         let map = Self::get_instance();
183         let mut map = map.lock().await;
184         map.insert((session_id, channel_id), value.clone());
185     }
186 
get(session_id: u32, channel_id: u32) -> Option<HdcForward>187     pub async fn get(session_id: u32, channel_id: u32) -> Option<HdcForward> {
188         let arc = Self::get_instance();
189         let map = arc.lock().await;
190         let task = map.get(&(session_id, channel_id));
191         if task.is_none() {
192             crate::error!("ForwardTaskMap result: is none");
193             return Option::None;
194         }
195 
196         Some(task.unwrap().clone())
197     }
198 }
199 
200 #[derive(Debug, Default, Clone, PartialEq, Eq)]
201 pub struct HdcForward {
202     session_id: u32,
203     channel_id: u32,
204     is_master: bool,
205     local_args: Vec<String>,
206     remote_args: Vec<String>,
207     remote_parameters: String,
208     task_command: String,
209     forward_type: ForwardType,
210     context_forward: ContextForward,
211     map_ctx_point: HashMap<u32, ContextForward>,
212     pub transfer: HdcTransferBase,
213 }
214 
215 impl HdcForward {
new(session_id: u32, channel_id: u32) -> Self216     pub fn new(session_id: u32, channel_id: u32) -> Self {
217         Self {
218             session_id,
219             channel_id,
220             is_master: Default::default(),
221             local_args: Default::default(),
222             remote_args: Default::default(),
223             task_command: Default::default(),
224             remote_parameters: Default::default(),
225             forward_type: Default::default(),
226             context_forward: Default::default(),
227             map_ctx_point: HashMap::new(),
228             transfer: HdcTransferBase::new(session_id, channel_id),
229         }
230     }
231 }
232 
/// Decodes the big-endian `u32` id stored in the first four bytes of `_payload`.
///
/// # Panics
///
/// Panics when `_payload` holds fewer than four bytes.
pub fn get_id(_payload: &[u8]) -> u32 {
    _payload[0..4]
        .iter()
        .fold(0_u32, |acc, byte| (acc << 8) | u32::from(*byte))
}
239 
check_node_info(value: &String, arg: &mut Vec<String>) -> bool240 pub async fn check_node_info(value: &String, arg: &mut Vec<String>) -> bool {
241     crate::info!("check cmd args value is: {:#?}", value);
242     if !value.contains(':') {
243         return false;
244     }
245     let array = value.split(':').collect::<Vec<&str>>();
246 
247     if array[0] == "tcp" {
248         if array[1].len() > config::MAX_PORT_LEN {
249             return false;
250         }
251         let port = array[1].parse::<u32>();
252         if port.is_err() {
253             crate::error!("port must is int type, port is: {:#?}", array[1]);
254             return false;
255         }
256 
257         if port.clone().unwrap() == 0 || port.unwrap() > config::MAX_PORT_NUM {
258             crate::error!("port can not greater than: 65535");
259             return false;
260         }
261     }
262     for item in array.iter() {
263         arg.push(String::from(item.to_owned()));
264     }
265     true
266 }
267 
check_command(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool268 pub async fn check_command(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool {
269     let task = ForwardTaskMap::get(session_id, channel_id).await;
270     if task.is_none() {
271         return false;
272     }
273     let task = &mut task.unwrap().clone();
274     if !_payload.is_empty() {
275         echo_client(session_id, channel_id, "Forwardport result: Ok").await;
276         let map_info = String::from(if task.transfer.server_or_daemon {
277             "1|"
278         } else {
279             "0|"
280         }) + &task.task_command;
281 
282         let mut command_string = vec![0_u8; map_info.len() + 1];
283         map_info
284             .as_bytes()
285             .to_vec()
286             .iter()
287             .enumerate()
288             .for_each(|(i, e)| {
289                 command_string[i] = *e;
290             });
291         let file_check_message = TaskMessage {
292             channel_id,
293             command: HdcCommand::ForwardSuccess,
294             payload: command_string,
295         };
296         transfer::put(session_id, file_check_message).await;
297         log::error!("Forwardport result: Ok");
298     } else {
299         echo_client(session_id, channel_id, "Forwardport result: Failed").await;
300         free_context(session_id, channel_id, 0, false).await;
301         return false;
302     }
303     true
304 }
305 
detech_forward_type(session_id: u32, channel_id: u32) -> bool306 pub async fn detech_forward_type(session_id: u32, channel_id: u32) -> bool {
307     let task = ForwardTaskMap::get(session_id, channel_id).await;
308     if task.is_none() {
309         return false;
310     }
311     let task = &mut task.unwrap().clone();
312 
313     let type_str = &task.local_args[0];
314 
315     match type_str.as_str() {
316         "tcp" => {
317             task.forward_type = ForwardType::Tcp;
318         }
319         "dev" => {
320             task.forward_type = ForwardType::Device;
321         }
322         "localabstract" => {
323             task.forward_type = ForwardType::Abstract;
324         }
325         "localfilesystem" => {
326             task.local_args[1] = HARMONY_RESERVED_SOCKET_PREFIX.to_owned() + &task.local_args[1];
327             task.forward_type = ForwardType::FileSystem;
328         }
329         "jdwp" => {
330             task.forward_type = ForwardType::Jdwp;
331         }
332         "ark" => {
333             task.forward_type = ForwardType::Ark;
334         }
335         "localreserved" => {
336             task.local_args[1] = FILE_SYSTEM_SOCKET_PREFIX.to_owned() + &task.local_args[1];
337             task.forward_type = ForwardType::Reserved;
338         }
339         _ => {
340             crate::error!("this forward type may is not expected");
341             ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
342             return false;
343         }
344     }
345     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
346     true
347 }
348 
forward_tcp_accept( session_id: u32, channel_id: u32, port: u32, value: String, cid: u32, ) -> io::Result<()>349 pub async fn forward_tcp_accept(
350     session_id: u32,
351     channel_id: u32,
352     port: u32,
353     value: String,
354     cid: u32,
355 ) -> io::Result<()> {
356     let saddr = format!("127.0.0.1:{}", port);
357     let listener: TcpListener = TcpListener::bind(saddr.clone()).await?;
358     loop {
359         let (stream, _addr) = listener.accept().await?;
360         let (rd, wr) = stream.into_split();
361         TcpWriteStreamMap::put(cid, wr).await;
362         ylong_runtime::spawn(on_accept(session_id, channel_id, value.clone(), cid));
363         recv_tcp_msg(session_id, channel_id, rd, cid).await;
364     }
365 }
366 
recv_tcp_msg(session_id: u32, channel_id: u32, mut rd: SplitReadHalf, cid: u32)367 pub async fn recv_tcp_msg(session_id: u32, channel_id: u32, mut rd: SplitReadHalf, cid: u32) {
368     let mut data = vec![0_u8; SOCKET_BUFFER_SIZE];
369     loop {
370         match rd.read(&mut data).await {
371             Ok(recv_size) => {
372                 if recv_size == 0 {
373                     free_context(session_id, channel_id, 0, true).await;
374                     drop(rd);
375                     crate::info!("recv_size is 0, tcp close shutdown");
376                     return;
377                 }
378                 if send_to_task(
379                     session_id,
380                     channel_id,
381                     HdcCommand::ForwardData,
382                     &data[0..recv_size],
383                     recv_size,
384                     cid,
385                 )
386                 .await
387                 {
388                     crate::info!("send task success");
389                 }
390             }
391             Err(_e) => {
392                 crate::error!("recv tcp msg read failed");
393             }
394         }
395     }
396 }
397 
on_accept(session_id: u32, channel_id: u32, value: String, cid: u32)398 pub async fn on_accept(session_id: u32, channel_id: u32, value: String, cid: u32) {
399     let buf_string: Vec<u8> = value.as_bytes().to_vec();
400     let mut new_buf = vec![0_u8; buf_string.len() + 9];
401 
402     buf_string.iter().enumerate().for_each(|(i, e)| {
403         new_buf[i + 8] = *e;
404     });
405 
406     send_to_task(
407         session_id,
408         channel_id,
409         HdcCommand::ForwardActiveSlave,
410         &new_buf,
411         buf_string.len() + 9,
412         cid,
413     )
414     .await;
415 }
416 
daemon_connect_tcp(session_id: u32, channel_id: u32, port: u32, cid: u32)417 pub async fn daemon_connect_tcp(session_id: u32, channel_id: u32, port: u32, cid: u32) {
418     let saddr = format!("127.0.0.1:{}", port);
419     let stream = match TcpStream::connect(saddr).await {
420         Err(err) => {
421             crate::error!("TcpStream::stream failed {:#?}", err);
422             free_context(session_id, channel_id, 0, false).await;
423             return;
424         }
425         Ok(addr) => addr,
426     };
427     send_active_master(session_id, channel_id).await;
428     let (rd, wr) = stream.into_split();
429     TcpWriteStreamMap::put(cid, wr).await;
430     recv_tcp_msg(session_id, channel_id, rd, cid).await;
431 }
432 
deamon_read_socket_msg(session_id: u32, channel_id: u32, fd: i32)433 pub async fn deamon_read_socket_msg(session_id: u32, channel_id: u32, fd: i32) {
434     let task = ForwardTaskMap::get(session_id, channel_id).await;
435     if task.is_none() {
436         return;
437     }
438     let task = &mut task.unwrap().clone();
439     loop {
440         let mut buffer: [u8; SOCKET_BUFFER_SIZE] = [0; SOCKET_BUFFER_SIZE];
441         let recv_size = UdsClient::wrap_recv(fd, &mut buffer);
442         if recv_size <= 0 {
443             free_context(session_id, channel_id, 0, true).await;
444             crate::info!("local abstract close shutdown");
445             return;
446         }
447         if send_to_task(
448             session_id,
449             channel_id,
450             HdcCommand::ForwardData,
451             &buffer[0..recv_size as usize],
452             recv_size as usize,
453             task.context_forward.id,
454         )
455         .await
456         {
457             crate::info!("send task success");
458         }
459     }
460 }
461 
free_context(session_id: u32, channel_id: u32, id: u32, notify_remote: bool)462 async fn free_context(session_id: u32, channel_id: u32, id: u32, notify_remote: bool) {
463     crate::info!("free context id = {id}");
464     let task = ForwardTaskMap::get(session_id, channel_id).await;
465     if task.is_none() {
466         return;
467     }
468     let task = &mut task.unwrap().clone();
469     if notify_remote {
470         let vec_none = Vec::<u8>::new();
471         send_to_task(
472             session_id,
473             channel_id,
474             HdcCommand::ForwardFreeContext,
475             &vec_none,
476             0,
477             task.context_forward.id,
478         )
479         .await;
480     }
481     match task.forward_type {
482         ForwardType::Tcp | ForwardType::Jdwp | ForwardType::Ark => {
483             TcpWriteStreamMap::end(task.context_forward.id).await;
484         }
485         ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
486             UdsServer::wrap_close(task.context_forward.fd);
487         }
488         ForwardType::Device => {
489             return;
490         }
491     }
492     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
493 }
494 
setup_tcp_point(session_id: u32, channel_id: u32) -> bool495 pub async fn setup_tcp_point(session_id: u32, channel_id: u32) -> bool {
496     let task = ForwardTaskMap::get(session_id, channel_id).await;
497     if task.is_none() {
498         return false;
499     }
500     let task = &mut task.unwrap();
501     let port = task.local_args[1].parse::<u32>().unwrap();
502     let cid = task.context_forward.id;
503     if task.is_master {
504         let parameters = task.remote_parameters.clone();
505         ylong_runtime::spawn(async move {
506             forward_tcp_accept(session_id, channel_id, port, parameters, cid).await
507         });
508     } else {
509         ylong_runtime::spawn(
510             async move { daemon_connect_tcp(session_id, channel_id, port, cid).await },
511         );
512     }
513     true
514 }
515 
server_socket_bind_listen( session_id: u32, channel_id: u32, path: String, cid: u32, ) -> bool516 async fn server_socket_bind_listen(
517     session_id: u32,
518     channel_id: u32,
519     path: String,
520     cid: u32,
521 ) -> bool {
522     let task = ForwardTaskMap::get(session_id, channel_id).await;
523     let task = &mut task.unwrap().clone();
524     let parameters = task.remote_parameters.clone();
525     let fd: i32 = UdsClient::wrap_socket(AF_UNIX);
526     task.context_forward.fd = fd;
527     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
528 
529     let name: Vec<u8> = path.as_bytes().to_vec();
530     let mut socket_name = vec![0_u8; name.len() + 1];
531     socket_name[0] = b'\0';
532     name.iter().enumerate().for_each(|(i, e)| {
533         socket_name[i + 1] = *e;
534     });
535     let addr = UdsAddr::parse_abstract(&socket_name[1..]);
536     if let Ok(addr_obj) = &addr {
537         let ret = UdsServer::wrap_bind(fd, addr_obj);
538         if ret.is_err() {
539             echo_client(session_id, channel_id, "Unix pipe bind failed").await;
540             crate::error!("bind fail");
541             return false;
542         }
543         let ret = UdsServer::wrap_listen(fd);
544         if ret < 0 {
545             echo_client(session_id, channel_id, "Unix pipe listen failed").await;
546             crate::error!("listen fail");
547             return false;
548         }
549         ylong_runtime::spawn(async move {
550             loop {
551                 let client_fd = UdsServer::wrap_accept(fd);
552                 if client_fd == -1 {
553                     break;
554                 }
555                 ylong_runtime::spawn(on_accept(session_id, channel_id, parameters.clone(), cid));
556             }
557         });
558     }
559     true
560 }
561 
/// Resolves `path` to its canonical absolute form and returns it as a `String`.
///
/// # Errors
///
/// Returns an `ErrorKind::Other` error with the message `forward canonicalize failed` when
/// the path cannot be resolved or the resolved path is not valid UTF-8.
pub async fn canonicalize(path: String) -> Result<String, Error> {
    let failure = || Error::new(ErrorKind::Other, "forward canonicalize failed");
    let abs_path = fs::canonicalize(path).map_err(|_| failure())?;
    abs_path.to_str().map(str::to_string).ok_or_else(failure)
}
571 
setup_device_point(session_id: u32, channel_id: u32) -> bool572 pub async fn setup_device_point(session_id: u32, channel_id: u32) -> bool {
573     let task = ForwardTaskMap::get(session_id, channel_id).await;
574     if task.is_none() {
575         return false;
576     }
577     let task = &mut task.unwrap().clone();
578     let s_node_cfg = task.local_args[1].clone();
579     let cid = task.context_forward.id;
580 
581     let resolve = canonicalize(s_node_cfg).await;
582     if resolve.is_err() {
583         crate::error!("Open unix-dev failed");
584         return false;
585     }
586     let resolv_path = resolve.unwrap();
587     let thread_path_ref = Arc::new(Mutex::new(resolv_path));
588     if !send_active_master(session_id, channel_id).await {
589         return false;
590     }
591 
592     ylong_runtime::spawn(async move {
593         loop {
594             let path = thread_path_ref.lock().await;
595             let mut file = File::open(&*path).unwrap();
596             let mut total = Vec::new();
597             let mut buf: [u8; config::FILE_PACKAGE_PAYLOAD_SIZE] =
598                 [0; config::FILE_PACKAGE_PAYLOAD_SIZE];
599             let read_len = file.read(&mut buf[4..]).unwrap();
600             if read_len == 0 {
601                 free_context(session_id, channel_id, 0, true).await;
602                 break;
603             }
604             total.append(&mut buf[0..read_len].to_vec());
605             send_to_task(
606                 session_id,
607                 channel_id,
608                 HdcCommand::ForwardData,
609                 &total,
610                 read_len,
611                 cid,
612             )
613             .await;
614         }
615     });
616     true
617 }
618 
get_pid(parameter: &str, forward_type: ForwardType) -> u32619 fn get_pid(parameter: &str, forward_type: ForwardType) -> u32 {
620     let mut res: u32 = 0;
621     if forward_type == ForwardType::Jdwp {
622         let pid = parameter.parse::<u32>();
623         if pid.is_err() {
624             crate::error!("get pid err :{:#?}", pid);
625             return res;
626         }
627         res = pid.unwrap();
628     } else {
629         let params: Vec<&str> = parameter.split('@').collect();
630         let pid = params[0].parse::<u32>();
631         if pid.is_err() {
632             return res;
633         }
634         res = pid.unwrap();
635     }
636     res
637 }
638 
setup_jdwp_point(session_id: u32, channel_id: u32) -> bool639 pub async fn setup_jdwp_point(session_id: u32, channel_id: u32) -> bool {
640     let task: Option<HdcForward> = ForwardTaskMap::get(session_id, channel_id).await;
641     if task.is_none() {
642         return false;
643     }
644     let task = &mut task.unwrap().clone();
645     let local_args = task.local_args[1].clone();
646     let parameter = local_args.as_str();
647     let style = &task.forward_type;
648     let pid = get_pid(parameter, style.clone());
649     let cid = task.context_forward.id;
650     if pid == 0 {
651         return false;
652     }
653 
654     let result = UdsServer::wrap_socketpair(SOCK_STREAM);
655     if result.is_err() {
656         return false;
657     }
658     let mut target_fd = 0;
659     let mut local_fd = 0;
660     if let Ok((fd0, fd1)) = result {
661         crate::info!("pipe, fd0:{}, fd1:{}", fd0, fd1);
662         local_fd = fd0;
663         task.context_forward.fd = local_fd;
664         ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
665         target_fd = fd1;
666     }
667 
668     ylong_runtime::spawn(async move {
669         loop {
670             let mut buffer = [0u8; 1024];
671             crate::info!("jdwp pipe read....");
672             let size = UdsServer::wrap_read(local_fd, &mut buffer);
673             crate::info!("jdwp pipe read.... size: {:#?}", size);
674             if size < 0 {
675                 crate::error!("disconnect, error:{:#?}", size);
676                 free_context(session_id, channel_id, 0, true).await;
677                 break;
678             }
679             send_to_task(
680                 session_id,
681                 channel_id,
682                 HdcCommand::ForwardData,
683                 &buffer[0..size as usize],
684                 size as usize,
685                 cid,
686             )
687             .await;
688         }
689     });
690 
691     let jdwp = Jdwp::get_instance();
692     let mut param = task.local_args[0].clone();
693     param.push(':');
694     param.push_str(parameter);
695 
696     let ret = jdwp.send_fd_to_target(pid, target_fd, param.as_str()).await;
697     if !ret {
698         crate::error!("not found pid:{:#?}", pid);
699         echo_client(
700             session_id,
701             channel_id,
702             format!("fport fail:pid not found:{}", pid).as_str(),
703         )
704         .await;
705         task_finish(session_id, channel_id).await;
706         return false;
707     }
708 
709     let vec_none = Vec::<u8>::new();
710     send_to_task(
711         session_id,
712         channel_id,
713         HdcCommand::ForwardActiveMaster, // 04
714         &vec_none,
715         0,
716         cid,
717     )
718     .await;
719 
720     true
721 }
722 
echo_client(session_id: u32, channel_id: u32, message: &str)723 async fn echo_client(session_id: u32, channel_id: u32, message: &str) {
724     let echo_message = TaskMessage {
725         channel_id,
726         command: HdcCommand::KernelEchoRaw,
727         payload: message.as_bytes().to_vec(),
728     };
729     transfer::put(session_id, echo_message).await;
730 }
731 
/// Thin wrapper around `transfer_task_finish` for this task's session and channel.
///
/// Note the argument order: the callee takes the channel id first, then the session id.
async fn task_finish(session_id: u32, channel_id: u32) {
    transfer_task_finish(channel_id, session_id).await;
}
735 
daemon_connect_pipe(session_id: u32, channel_id: u32, fd: i32, path: String)736 pub async fn daemon_connect_pipe(session_id: u32, channel_id: u32, fd: i32, path: String) {
737     let name: Vec<u8> = path.as_bytes().to_vec();
738     let mut socket_name = vec![0_u8; name.len() + 1];
739     socket_name[0] = b'\0';
740     name.iter().enumerate().for_each(|(i, e)| {
741         socket_name[i + 1] = *e;
742     });
743     let addr = UdsAddr::parse_abstract(&socket_name[1..]);
744     if let Ok(addr_obj) = &addr {
745         let ret: Result<(), Error> = UdsClient::wrap_connect(fd, addr_obj);
746         if ret.is_err() {
747             echo_client(session_id, channel_id, "localabstract connect fail").await;
748             free_context(session_id, channel_id, 0, true).await;
749             return;
750         }
751         send_active_master(session_id, channel_id).await;
752         read_data_to_forward(session_id, channel_id).await;
753     }
754 }
755 
setup_file_point(session_id: u32, channel_id: u32) -> bool756 pub async fn setup_file_point(session_id: u32, channel_id: u32) -> bool {
757     let task: Option<HdcForward> = ForwardTaskMap::get(session_id, channel_id).await;
758     if task.is_none() {
759         return false;
760     }
761     let task = &mut task.unwrap().clone();
762     let s_node_cfg = task.local_args[1].clone();
763     if task.is_master {
764         if task.forward_type == ForwardType::Reserved
765             || task.forward_type == ForwardType::FileSystem
766         {
767             let _ = fs::remove_file(s_node_cfg.clone());
768         }
769         if !server_socket_bind_listen(session_id, channel_id, s_node_cfg, task.context_forward.id)
770             .await
771         {
772             task_finish(session_id, channel_id).await;
773             return false;
774         }
775     } else if task.forward_type == ForwardType::Abstract {
776         let fd: i32 = UdsClient::wrap_socket(AF_LOCAL);
777         unsafe {
778             libc::fcntl(fd, F_SETFD, FD_CLOEXEC);
779         }
780         task.context_forward.fd = fd;
781         ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
782         daemon_connect_pipe(session_id, channel_id, fd, s_node_cfg).await;
783     } else {
784         let fd: i32 = UdsClient::wrap_socket(AF_UNIX);
785         task.context_forward.fd = fd;
786         ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
787         daemon_connect_pipe(session_id, channel_id, fd, s_node_cfg).await;
788     }
789     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
790     true
791 }
792 
setup_point(session_id: u32, channel_id: u32) -> bool793 pub async fn setup_point(session_id: u32, channel_id: u32) -> bool {
794     if !detech_forward_type(session_id, channel_id).await {
795         crate::error!("forward type is not true");
796         return false;
797     }
798     let task = ForwardTaskMap::get(session_id, channel_id).await;
799     if task.is_none() {
800         return false;
801     }
802     let task = &mut task.unwrap().clone();
803     if cfg!(target_os = "windows") && task.forward_type != ForwardType::Tcp {
804         task.context_forward.last_error = String::from("Not support forward-type");
805         return false;
806     }
807     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
808     let ret = match task.forward_type {
809         ForwardType::Tcp => setup_tcp_point(session_id, channel_id).await,
810         ForwardType::Device => setup_device_point(session_id, channel_id).await,
811         ForwardType::Jdwp | ForwardType::Ark => setup_jdwp_point(session_id, channel_id).await,
812         ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
813             setup_file_point(session_id, channel_id).await
814         }
815     };
816     ret
817 }
818 
send_to_task( session_id: u32, channel_id: u32, command: HdcCommand, buf_ptr: &[u8], buf_size: usize, cid: u32, ) -> bool819 pub async fn send_to_task(
820     session_id: u32,
821     channel_id: u32,
822     command: HdcCommand,
823     buf_ptr: &[u8],
824     buf_size: usize,
825     cid: u32,
826 ) -> bool {
827     if buf_size > (config::MAX_SIZE_IOBUF * 2) {
828         return false;
829     }
830 
831     let mut new_buf = [u32::to_be_bytes(cid).as_slice(), buf_ptr].concat();
832     new_buf[4..].copy_from_slice(&buf_ptr[0..buf_size]);
833     let file_check_message = TaskMessage {
834         channel_id,
835         command,
836         payload: new_buf,
837     };
838     transfer::put(session_id, file_check_message).await;
839     true
840 }
841 
/// Splits a forward payload into its textual content and its context id.
///
/// The first four bytes of `_payload` are decoded as a big-endian `u32` id;
/// the remaining bytes must be valid UTF-8 and are returned as an owned
/// `String`.
///
/// # Errors
///
/// Returns an `ErrorKind::Other` error ("filter command failure") when the
/// payload is shorter than the four-byte id prefix or when the content is not
/// valid UTF-8.
pub async fn filter_command(_payload: &[u8]) -> io::Result<(String, u32)> {
    const ID_LEN: usize = 4;
    let failure = || Error::new(ErrorKind::Other, "filter command failure");

    // A payload without the complete id prefix is malformed; report it as an
    // error rather than panicking on the slice below.
    if _payload.len() < ID_LEN {
        return Err(failure());
    }
    let (id_bytes, content_bytes) = _payload.split_at(ID_LEN);

    // Validate in place and allocate only once the bytes are known to be UTF-8.
    let content = std::str::from_utf8(content_bytes).map_err(|_| failure())?;

    let mut id_buf = [0u8; ID_LEN];
    id_buf.copy_from_slice(id_bytes);
    Ok((content.to_owned(), u32::from_be_bytes(id_buf)))
}
853 
send_active_master(session_id: u32, channel_id: u32) -> bool854 pub async fn send_active_master(session_id: u32, channel_id: u32) -> bool {
855     let task = ForwardTaskMap::get(session_id, channel_id).await;
856     if task.is_none() {
857         return false;
858     }
859     let task = &mut task.unwrap().clone();
860     if task.context_forward.check_order {
861         let flag = [0u8; 1];
862         send_to_task(
863             session_id,
864             channel_id,
865             HdcCommand::ForwardCheckResult,
866             &flag,
867             1,
868             task.context_forward.id,
869         )
870         .await;
871         free_context(session_id, channel_id, 0, false).await;
872         ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
873         return true;
874     }
875     if !send_to_task(
876         session_id,
877         channel_id,
878         HdcCommand::ForwardActiveMaster,
879         &Vec::<u8>::new(),
880         0,
881         task.context_forward.id,
882     )
883     .await
884     {
885         free_context(session_id, channel_id, 0, true).await;
886         ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
887         return false;
888     }
889     true
890 }
891 
begin_forward(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool892 pub async fn begin_forward(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool {
893     let s = String::from_utf8(_payload.to_vec());
894     if s.is_err() {
895         crate::error!("cmd argv  is not int utf8");
896         return false;
897     }
898     let command = s.unwrap();
899     crate::info!("begin forward, command: {:#?}", command);
900     let task = ForwardTaskMap::get(session_id, channel_id).await;
901     if task.is_none() {
902         crate::error!("begin forward get task is none");
903         return false;
904     }
905     let task = &mut task.unwrap().clone();
906     let result = Base::split_command_to_args(&command);
907     let argv = result.0;
908     let argc = result.1;
909     task.context_forward.id = get_id(_payload);
910     task.is_master = true;
911 
912     if argc < ARG_COUNT2 {
913         return false;
914     }
915     if argv[0].len() > BUF_SIZE_SMALL || argv[1].len() > BUF_SIZE_SMALL {
916         return false;
917     }
918     if !check_node_info(&argv[0], &mut task.local_args).await {
919         return false;
920     }
921     if !check_node_info(&argv[1], &mut task.remote_args).await {
922         return false;
923     }
924     task.remote_parameters = argv[1].clone();
925     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
926     if !setup_point(session_id, channel_id).await {
927         crate::error!("setup point return false");
928         return false;
929     }
930 
931     let task = ForwardTaskMap::get(session_id, channel_id).await;
932     let task = &mut task.unwrap().clone();
933     task.map_ctx_point
934         .insert(task.context_forward.id, task.context_forward.clone());
935 
936     let wake_up_message = TaskMessage {
937         channel_id,
938         command: HdcCommand::KernelWakeupSlavetask,
939         payload: Vec::<u8>::new(),
940     };
941     transfer::put(session_id, wake_up_message).await;
942 
943     let buf_string: Vec<u8> = argv[1].as_bytes().to_vec();
944     let mut new_buf = vec![0_u8; buf_string.len() + 9];
945     buf_string.iter().enumerate().for_each(|(i, e)| {
946         new_buf[i + 8] = *e;
947     });
948     send_to_task(
949         session_id,
950         channel_id,
951         HdcCommand::ForwardCheck,
952         &new_buf,
953         buf_string.len() + 9,
954         task.context_forward.id,
955     )
956     .await;
957     task.task_command = command.clone();
958     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
959     true
960 }
961 
slave_connect( session_id: u32, channel_id: u32, _payload: &[u8], check_order: bool, error: &mut String, ) -> bool962 pub async fn slave_connect(
963     session_id: u32,
964     channel_id: u32,
965     _payload: &[u8],
966     check_order: bool,
967     error: &mut String,
968 ) -> bool {
969     let task = ForwardTaskMap::get(session_id, channel_id).await;
970     if task.is_none() {
971         return false;
972     }
973     let task = &mut task.unwrap().clone();
974     task.is_master = false;
975     task.context_forward.check_order = check_order;
976     if let Ok((content, id)) = filter_command(_payload).await {
977         let content = &content[8..].trim_end_matches('\0').to_string();
978         if !check_node_info(content, &mut task.local_args).await {
979             crate::error!("check local args is false");
980             return false;
981         }
982         task.context_forward.id = id;
983     }
984     task.map_ctx_point
985         .insert(task.context_forward.id, task.context_forward.clone());
986     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
987     if !check_order {
988         if !setup_point(session_id, channel_id).await {
989             crate::error!("setup point return false, free context");
990             free_context(session_id, channel_id, 0, true).await;
991             return false;
992         }
993         *error = task.context_forward.last_error.clone();
994     } else {
995         send_active_master(session_id, channel_id).await;
996     }
997     *error = task.context_forward.last_error.clone();
998     true
999 }
1000 
read_data_to_forward(session_id: u32, channel_id: u32) -> bool1001 pub async fn read_data_to_forward(session_id: u32, channel_id: u32) -> bool {
1002     let task = ForwardTaskMap::get(session_id, channel_id).await;
1003     if task.is_none() {
1004         return false;
1005     }
1006     let task = &mut task.unwrap();
1007     let cid = task.context_forward.id;
1008     match task.forward_type {
1009         ForwardType::Tcp | ForwardType::Jdwp | ForwardType::Ark => {
1010             ylong_runtime::spawn(async move {
1011                 TcpReadStreamMap::read(session_id, channel_id, cid).await
1012             });
1013         }
1014         ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
1015             let fd = task.context_forward.fd;
1016             ylong_runtime::spawn(async move {
1017                 deamon_read_socket_msg(session_id, channel_id, fd).await
1018             });
1019         }
1020         ForwardType::Device => {
1021             if !setup_device_point(session_id, channel_id).await {
1022                 return false;
1023             }
1024         }
1025     }
1026     true
1027 }
1028 
write_forward_bufer( session_id: u32, channel_id: u32, id: u32, content: Vec<u8>, ) -> bool1029 pub async fn write_forward_bufer(
1030     session_id: u32,
1031     channel_id: u32,
1032     id: u32,
1033     content: Vec<u8>,
1034 ) -> bool {
1035     let task = ForwardTaskMap::get(session_id, channel_id).await;
1036     if task.is_none() {
1037         return false;
1038     }
1039     let task = &mut task.unwrap();
1040     if task.forward_type == ForwardType::Tcp {
1041         TcpWriteStreamMap::write(id, content).await;
1042     } else {
1043         let fd = task.context_forward.fd;
1044         UdsClient::wrap_send(fd, &content);
1045     }
1046     true
1047 }
1048 
forward_command_dispatch( session_id: u32, channel_id: u32, command: HdcCommand, _payload: &[u8], ) -> bool1049 pub async fn forward_command_dispatch(
1050     session_id: u32,
1051     channel_id: u32,
1052     command: HdcCommand,
1053     _payload: &[u8],
1054 ) -> bool {
1055     let task = ForwardTaskMap::get(session_id, channel_id).await;
1056     if task.is_none() {
1057         return false;
1058     }
1059     let task: &mut HdcForward = &mut task.unwrap().clone();
1060     let mut ret: bool = true;
1061     if let Ok((_content, id)) = filter_command(_payload).await {
1062         task.context_forward.id = id;
1063     }
1064     let send_msg = _payload[4..].to_vec();
1065     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1066     match command {
1067         HdcCommand::ForwardCheckResult => {
1068             ret = check_command(session_id, channel_id, _payload).await;
1069         }
1070         HdcCommand::ForwardData => {
1071             ret = write_forward_bufer(session_id, channel_id, task.context_forward.id, send_msg)
1072                 .await;
1073         }
1074         HdcCommand::ForwardFreeContext => {
1075             free_context(session_id, channel_id, 0, false).await;
1076         }
1077         HdcCommand::ForwardActiveMaster => {
1078             ret = true;
1079         }
1080         _ => {
1081             ret = false;
1082         }
1083     }
1084     ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1085     ret
1086 }
1087 
print_error_info(session_id: u32, channel_id: u32, error: &mut String)1088 pub async fn print_error_info(session_id: u32, channel_id: u32, error: &mut String) {
1089     if error.is_empty() {
1090         echo_client(session_id, channel_id, "forward arguments parse is fail").await;
1091     } else {
1092         echo_client(session_id, channel_id, error.as_str()).await;
1093     }
1094 }
1095 
command_dispatch( session_id: u32, channel_id: u32, _command: HdcCommand, _payload: &[u8], _payload_size: u16, ) -> bool1096 pub async fn command_dispatch(
1097     session_id: u32,
1098     channel_id: u32,
1099     _command: HdcCommand,
1100     _payload: &[u8],
1101     _payload_size: u16,
1102 ) -> bool {
1103     let mut error = String::from("");
1104     crate::info!("command_dispatch command recv: {:#?}", _command);
1105     let ret = match _command {
1106         HdcCommand::ForwardInit => begin_forward(session_id, channel_id, _payload).await,
1107         HdcCommand::ForwardCheck => {
1108             slave_connect(session_id, channel_id, _payload, true, &mut error).await
1109         }
1110         HdcCommand::ForwardActiveSlave => {
1111             slave_connect(session_id, channel_id, _payload, false, &mut error).await
1112         }
1113         _ => forward_command_dispatch(session_id, channel_id, _command, _payload).await,
1114     };
1115     crate::info!("command dispatch ret: {:#?}", ret);
1116     if !ret {
1117         print_error_info(session_id, channel_id, &mut error).await;
1118         task_finish(session_id, channel_id).await;
1119         return false;
1120     }
1121     ret
1122 }
1123