1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 //! forward
16 #![allow(missing_docs)]
17 use libc::SOCK_STREAM;
18 use libc::{AF_LOCAL, AF_UNIX, FD_CLOEXEC, F_SETFD};
19 use std::collections::HashMap;
20 use std::fs::{self, File};
21 use std::io::{self, Error, ErrorKind};
22 use ylong_runtime::sync::{Mutex, RwLock};
23
24 use crate::common::base::Base;
25 use crate::common::hdctransfer::transfer_task_finish;
26 use crate::common::hdctransfer::HdcTransferBase;
27 use crate::common::jdwp::Jdwp;
28 use crate::common::uds::{UdsAddr, UdsClient, UdsServer};
29 use crate::config;
30 use crate::config::HdcCommand;
31 use crate::config::TaskMessage;
32 use crate::transfer;
33 use crate::utils::hdc_log::*;
34 use std::io::Read;
35 use std::sync::Arc;
36 use ylong_runtime::io::AsyncReadExt;
37 use ylong_runtime::io::AsyncWriteExt;
38 use ylong_runtime::net::{SplitReadHalf, SplitWriteHalf, TcpListener, TcpStream};
39
/// Minimum argument count a forward command must have (checked in `begin_forward`).
pub const ARG_COUNT2: u32 = 2;
/// Maximum accepted length of each forward argument (checked in `begin_forward`).
pub const BUF_SIZE_SMALL: usize = 256;
/// Size in bytes of the receive buffers used for TCP and Unix-socket reads.
pub const SOCKET_BUFFER_SIZE: usize = 65535;
/// Prefix prepended to the node path of `localfilesystem` forwards.
// NOTE(review): "/dev/seocket" looks like a typo for "/dev/socket", and the
// name suggests this prefix was meant for `localreserved` forwards rather than
// `localfilesystem` ones — confirm against the peer implementation before
// changing either.
pub const HARMONY_RESERVED_SOCKET_PREFIX: &str = "/dev/seocket";
/// Prefix prepended to the node path of `localreserved` forwards.
pub const FILE_SYSTEM_SOCKET_PREFIX: &str = "/tmp/";
45
46 type TcpRead = Arc<Mutex<SplitReadHalf>>;
47 type TcpReadMap_ = Arc<RwLock<HashMap<u32, TcpRead>>>;
48 pub struct TcpReadStreamMap {}
49 impl TcpReadStreamMap {
get_instance() -> TcpReadMap_50 fn get_instance() -> TcpReadMap_ {
51 static mut TCP_MAP: Option<TcpReadMap_> = None;
52 unsafe {
53 TCP_MAP
54 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
55 .clone()
56 }
57 }
58 #[allow(unused)]
put(id: u32, rd: SplitReadHalf)59 async fn put(id: u32, rd: SplitReadHalf) {
60 let instance = Self::get_instance();
61 let mut map = instance.write().await;
62 let arc_rd = Arc::new(Mutex::new(rd));
63 map.insert(id, arc_rd);
64 }
65 #[allow(unused)]
read(session_id: u32, channel_id: u32, cid: u32)66 async fn read(session_id: u32, channel_id: u32, cid: u32) {
67 let arc_map = Self::get_instance();
68 let map = arc_map.read().await;
69 if map.get(&cid).is_none() {
70 return;
71 }
72 let arc_rd = map.get(&cid).unwrap();
73 let rd = &mut arc_rd.lock().await;
74 let mut data = vec![0_u8; SOCKET_BUFFER_SIZE];
75 loop {
76 match rd.read(&mut data).await {
77 Ok(recv_size) => {
78 if recv_size == 0 {
79 free_context(session_id, channel_id, 0, true).await;
80 crate::info!("tcp close shutdown");
81 return;
82 }
83 if send_to_task(
84 session_id,
85 channel_id,
86 HdcCommand::ForwardData,
87 &data[0..recv_size],
88 recv_size,
89 cid,
90 )
91 .await
92 {
93 crate::info!("send task success");
94 }
95 }
96 Err(_e) => {
97 crate::error!("tcp stream rd read failed");
98 }
99 }
100 }
101 }
102 }
103
104 type TcpWriter = Arc<Mutex<SplitWriteHalf>>;
105 type TcpWriterMap_ = Arc<RwLock<HashMap<u32, TcpWriter>>>;
106 pub struct TcpWriteStreamMap {}
107 impl TcpWriteStreamMap {
get_instance() -> TcpWriterMap_108 fn get_instance() -> TcpWriterMap_ {
109 static mut TCP_MAP: Option<TcpWriterMap_> = None;
110 unsafe {
111 TCP_MAP
112 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
113 .clone()
114 }
115 }
116 #[allow(unused)]
put(id: u32, wr: SplitWriteHalf)117 async fn put(id: u32, wr: SplitWriteHalf) {
118 let instance = Self::get_instance();
119 let mut map = instance.write().await;
120 let arc_wr = Arc::new(Mutex::new(wr));
121 map.insert(id, arc_wr);
122 }
123 #[allow(unused)]
write(id: u32, data: Vec<u8>)124 async fn write(id: u32, data: Vec<u8>) {
125 let arc_map = Self::get_instance();
126 let map = arc_map.write().await;
127 if map.get(&id).is_none() {
128 return;
129 }
130 let arc_wr = map.get(&id).unwrap();
131 let mut wr = arc_wr.lock().await;
132 let _ = wr.write_all(data.as_slice()).await;
133 }
134
end(id: u32)135 pub async fn end(id: u32) {
136 let instance = Self::get_instance();
137 let mut map = instance.write().await;
138 if let Some(arc_wr) = map.remove(&id) {
139 let mut wr = arc_wr.lock().await;
140 let _ = wr.shutdown().await;
141 }
142 }
143 }
144
/// Kind of endpoint a forward task is bound to.
///
/// `detech_forward_type` picks the variant from the first local argument.
#[derive(Default, Eq, PartialEq, Clone, Debug)]
enum ForwardType {
    /// Selected by the `tcp` argument; also the default value.
    #[default]
    Tcp = 0,
    /// Selected by the `dev` argument.
    Device,
    /// Selected by the `localabstract` argument.
    Abstract,
    /// Selected by the `localfilesystem` argument.
    FileSystem,
    /// Selected by the `jdwp` argument.
    Jdwp,
    /// Selected by the `ark` argument.
    Ark,
    /// Selected by the `localreserved` argument.
    Reserved,
}
156
/// Per-connection state of a forward task.
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub struct ContextForward {
    // NOTE(review): `session_id` and `channel_id` are never assigned in the
    // code visible in this file, so they keep their default value — confirm.
    session_id: u32,
    channel_id: u32,
    /// Set by `slave_connect`; when true `send_active_master` answers with a
    /// `ForwardCheckResult` message instead of activating the forward.
    check_order: bool,
    /// Context id, read from the first four bytes of incoming payloads and
    /// passed to `send_to_task` for outgoing ones.
    id: u32,
    /// Raw descriptor of the Unix socket or socket pair used by the forward.
    fd: i32,
    // NOTE(review): not assigned anywhere in the code visible here — confirm.
    remote_parameters: String,
    /// Error text recorded by `setup_point` for unsupported forward types.
    last_error: String,
    // NOTE(review): the code visible here reads `HdcForward::forward_type`
    // and never assigns this field — confirm whether it is still needed.
    forward_type: ForwardType,
}
168
169 type MapForward_ = Arc<Mutex<HashMap<(u32, u32), HdcForward>>>;
170 pub struct ForwardTaskMap {}
171 impl ForwardTaskMap {
get_instance() -> MapForward_172 fn get_instance() -> MapForward_ {
173 static mut FORWARD_MAP: Option<MapForward_> = None;
174 unsafe {
175 FORWARD_MAP
176 .get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
177 .clone()
178 }
179 }
180
update(session_id: u32, channel_id: u32, value: HdcForward)181 pub async fn update(session_id: u32, channel_id: u32, value: HdcForward) {
182 let map = Self::get_instance();
183 let mut map = map.lock().await;
184 map.insert((session_id, channel_id), value.clone());
185 }
186
get(session_id: u32, channel_id: u32) -> Option<HdcForward>187 pub async fn get(session_id: u32, channel_id: u32) -> Option<HdcForward> {
188 let arc = Self::get_instance();
189 let map = arc.lock().await;
190 let task = map.get(&(session_id, channel_id));
191 if task.is_none() {
192 crate::error!("ForwardTaskMap result: is none");
193 return Option::None;
194 }
195
196 Some(task.unwrap().clone())
197 }
198 }
199
/// State of one forward task, stored in `ForwardTaskMap` per
/// `(session_id, channel_id)`.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct HdcForward {
    session_id: u32,
    channel_id: u32,
    /// True on the side that received `ForwardInit` (`begin_forward`), false
    /// on the side handled by `slave_connect`.
    is_master: bool,
    /// Local endpoint description split on `:` by `check_node_info`; element
    /// 0 is the forward type, element 1 the port, path or pid.
    local_args: Vec<String>,
    /// Remote endpoint description split on `:` by `check_node_info`.
    remote_args: Vec<String>,
    /// Unsplit remote endpoint argument, sent to the peer on accept.
    remote_parameters: String,
    /// Full command text of the forward request.
    task_command: String,
    /// Endpoint kind chosen by `detech_forward_type`.
    forward_type: ForwardType,
    /// Connection state of the task.
    context_forward: ContextForward,
    /// Connection states keyed by context id.
    map_ctx_point: HashMap<u32, ContextForward>,
    pub transfer: HdcTransferBase,
}
214
215 impl HdcForward {
new(session_id: u32, channel_id: u32) -> Self216 pub fn new(session_id: u32, channel_id: u32) -> Self {
217 Self {
218 session_id,
219 channel_id,
220 is_master: Default::default(),
221 local_args: Default::default(),
222 remote_args: Default::default(),
223 task_command: Default::default(),
224 remote_parameters: Default::default(),
225 forward_type: Default::default(),
226 context_forward: Default::default(),
227 map_ctx_point: HashMap::new(),
228 transfer: HdcTransferBase::new(session_id, channel_id),
229 }
230 }
231 }
232
/// Decodes the context id stored big-endian in the first four bytes of
/// `_payload`.
///
/// Returns 0 when the payload holds fewer than four bytes instead of
/// panicking on the out-of-range slice.
pub fn get_id(_payload: &[u8]) -> u32 {
    match _payload {
        [b0, b1, b2, b3, ..] => u32::from_be_bytes([*b0, *b1, *b2, *b3]),
        _ => 0,
    }
}
239
check_node_info(value: &String, arg: &mut Vec<String>) -> bool240 pub async fn check_node_info(value: &String, arg: &mut Vec<String>) -> bool {
241 crate::info!("check cmd args value is: {:#?}", value);
242 if !value.contains(':') {
243 return false;
244 }
245 let array = value.split(':').collect::<Vec<&str>>();
246
247 if array[0] == "tcp" {
248 if array[1].len() > config::MAX_PORT_LEN {
249 return false;
250 }
251 let port = array[1].parse::<u32>();
252 if port.is_err() {
253 crate::error!("port must is int type, port is: {:#?}", array[1]);
254 return false;
255 }
256
257 if port.clone().unwrap() == 0 || port.unwrap() > config::MAX_PORT_NUM {
258 crate::error!("port can not greater than: 65535");
259 return false;
260 }
261 }
262 for item in array.iter() {
263 arg.push(String::from(item.to_owned()));
264 }
265 true
266 }
267
check_command(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool268 pub async fn check_command(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool {
269 let task = ForwardTaskMap::get(session_id, channel_id).await;
270 if task.is_none() {
271 return false;
272 }
273 let task = &mut task.unwrap().clone();
274 if !_payload.is_empty() {
275 echo_client(session_id, channel_id, "Forwardport result: Ok").await;
276 let map_info = String::from(if task.transfer.server_or_daemon {
277 "1|"
278 } else {
279 "0|"
280 }) + &task.task_command;
281
282 let mut command_string = vec![0_u8; map_info.len() + 1];
283 map_info
284 .as_bytes()
285 .to_vec()
286 .iter()
287 .enumerate()
288 .for_each(|(i, e)| {
289 command_string[i] = *e;
290 });
291 let file_check_message = TaskMessage {
292 channel_id,
293 command: HdcCommand::ForwardSuccess,
294 payload: command_string,
295 };
296 transfer::put(session_id, file_check_message).await;
297 log::error!("Forwardport result: Ok");
298 } else {
299 echo_client(session_id, channel_id, "Forwardport result: Failed").await;
300 free_context(session_id, channel_id, 0, false).await;
301 return false;
302 }
303 true
304 }
305
detech_forward_type(session_id: u32, channel_id: u32) -> bool306 pub async fn detech_forward_type(session_id: u32, channel_id: u32) -> bool {
307 let task = ForwardTaskMap::get(session_id, channel_id).await;
308 if task.is_none() {
309 return false;
310 }
311 let task = &mut task.unwrap().clone();
312
313 let type_str = &task.local_args[0];
314
315 match type_str.as_str() {
316 "tcp" => {
317 task.forward_type = ForwardType::Tcp;
318 }
319 "dev" => {
320 task.forward_type = ForwardType::Device;
321 }
322 "localabstract" => {
323 task.forward_type = ForwardType::Abstract;
324 }
325 "localfilesystem" => {
326 task.local_args[1] = HARMONY_RESERVED_SOCKET_PREFIX.to_owned() + &task.local_args[1];
327 task.forward_type = ForwardType::FileSystem;
328 }
329 "jdwp" => {
330 task.forward_type = ForwardType::Jdwp;
331 }
332 "ark" => {
333 task.forward_type = ForwardType::Ark;
334 }
335 "localreserved" => {
336 task.local_args[1] = FILE_SYSTEM_SOCKET_PREFIX.to_owned() + &task.local_args[1];
337 task.forward_type = ForwardType::Reserved;
338 }
339 _ => {
340 crate::error!("this forward type may is not expected");
341 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
342 return false;
343 }
344 }
345 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
346 true
347 }
348
forward_tcp_accept( session_id: u32, channel_id: u32, port: u32, value: String, cid: u32, ) -> io::Result<()>349 pub async fn forward_tcp_accept(
350 session_id: u32,
351 channel_id: u32,
352 port: u32,
353 value: String,
354 cid: u32,
355 ) -> io::Result<()> {
356 let saddr = format!("127.0.0.1:{}", port);
357 let listener: TcpListener = TcpListener::bind(saddr.clone()).await?;
358 loop {
359 let (stream, _addr) = listener.accept().await?;
360 let (rd, wr) = stream.into_split();
361 TcpWriteStreamMap::put(cid, wr).await;
362 ylong_runtime::spawn(on_accept(session_id, channel_id, value.clone(), cid));
363 recv_tcp_msg(session_id, channel_id, rd, cid).await;
364 }
365 }
366
recv_tcp_msg(session_id: u32, channel_id: u32, mut rd: SplitReadHalf, cid: u32)367 pub async fn recv_tcp_msg(session_id: u32, channel_id: u32, mut rd: SplitReadHalf, cid: u32) {
368 let mut data = vec![0_u8; SOCKET_BUFFER_SIZE];
369 loop {
370 match rd.read(&mut data).await {
371 Ok(recv_size) => {
372 if recv_size == 0 {
373 free_context(session_id, channel_id, 0, true).await;
374 drop(rd);
375 crate::info!("recv_size is 0, tcp close shutdown");
376 return;
377 }
378 if send_to_task(
379 session_id,
380 channel_id,
381 HdcCommand::ForwardData,
382 &data[0..recv_size],
383 recv_size,
384 cid,
385 )
386 .await
387 {
388 crate::info!("send task success");
389 }
390 }
391 Err(_e) => {
392 crate::error!("recv tcp msg read failed");
393 }
394 }
395 }
396 }
397
on_accept(session_id: u32, channel_id: u32, value: String, cid: u32)398 pub async fn on_accept(session_id: u32, channel_id: u32, value: String, cid: u32) {
399 let buf_string: Vec<u8> = value.as_bytes().to_vec();
400 let mut new_buf = vec![0_u8; buf_string.len() + 9];
401
402 buf_string.iter().enumerate().for_each(|(i, e)| {
403 new_buf[i + 8] = *e;
404 });
405
406 send_to_task(
407 session_id,
408 channel_id,
409 HdcCommand::ForwardActiveSlave,
410 &new_buf,
411 buf_string.len() + 9,
412 cid,
413 )
414 .await;
415 }
416
daemon_connect_tcp(session_id: u32, channel_id: u32, port: u32, cid: u32)417 pub async fn daemon_connect_tcp(session_id: u32, channel_id: u32, port: u32, cid: u32) {
418 let saddr = format!("127.0.0.1:{}", port);
419 let stream = match TcpStream::connect(saddr).await {
420 Err(err) => {
421 crate::error!("TcpStream::stream failed {:#?}", err);
422 free_context(session_id, channel_id, 0, false).await;
423 return;
424 }
425 Ok(addr) => addr,
426 };
427 send_active_master(session_id, channel_id).await;
428 let (rd, wr) = stream.into_split();
429 TcpWriteStreamMap::put(cid, wr).await;
430 recv_tcp_msg(session_id, channel_id, rd, cid).await;
431 }
432
deamon_read_socket_msg(session_id: u32, channel_id: u32, fd: i32)433 pub async fn deamon_read_socket_msg(session_id: u32, channel_id: u32, fd: i32) {
434 let task = ForwardTaskMap::get(session_id, channel_id).await;
435 if task.is_none() {
436 return;
437 }
438 let task = &mut task.unwrap().clone();
439 loop {
440 let mut buffer: [u8; SOCKET_BUFFER_SIZE] = [0; SOCKET_BUFFER_SIZE];
441 let recv_size = UdsClient::wrap_recv(fd, &mut buffer);
442 if recv_size <= 0 {
443 free_context(session_id, channel_id, 0, true).await;
444 crate::info!("local abstract close shutdown");
445 return;
446 }
447 if send_to_task(
448 session_id,
449 channel_id,
450 HdcCommand::ForwardData,
451 &buffer[0..recv_size as usize],
452 recv_size as usize,
453 task.context_forward.id,
454 )
455 .await
456 {
457 crate::info!("send task success");
458 }
459 }
460 }
461
free_context(session_id: u32, channel_id: u32, id: u32, notify_remote: bool)462 async fn free_context(session_id: u32, channel_id: u32, id: u32, notify_remote: bool) {
463 crate::info!("free context id = {id}");
464 let task = ForwardTaskMap::get(session_id, channel_id).await;
465 if task.is_none() {
466 return;
467 }
468 let task = &mut task.unwrap().clone();
469 if notify_remote {
470 let vec_none = Vec::<u8>::new();
471 send_to_task(
472 session_id,
473 channel_id,
474 HdcCommand::ForwardFreeContext,
475 &vec_none,
476 0,
477 task.context_forward.id,
478 )
479 .await;
480 }
481 match task.forward_type {
482 ForwardType::Tcp | ForwardType::Jdwp | ForwardType::Ark => {
483 TcpWriteStreamMap::end(task.context_forward.id).await;
484 }
485 ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
486 UdsServer::wrap_close(task.context_forward.fd);
487 }
488 ForwardType::Device => {
489 return;
490 }
491 }
492 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
493 }
494
setup_tcp_point(session_id: u32, channel_id: u32) -> bool495 pub async fn setup_tcp_point(session_id: u32, channel_id: u32) -> bool {
496 let task = ForwardTaskMap::get(session_id, channel_id).await;
497 if task.is_none() {
498 return false;
499 }
500 let task = &mut task.unwrap();
501 let port = task.local_args[1].parse::<u32>().unwrap();
502 let cid = task.context_forward.id;
503 if task.is_master {
504 let parameters = task.remote_parameters.clone();
505 ylong_runtime::spawn(async move {
506 forward_tcp_accept(session_id, channel_id, port, parameters, cid).await
507 });
508 } else {
509 ylong_runtime::spawn(
510 async move { daemon_connect_tcp(session_id, channel_id, port, cid).await },
511 );
512 }
513 true
514 }
515
server_socket_bind_listen( session_id: u32, channel_id: u32, path: String, cid: u32, ) -> bool516 async fn server_socket_bind_listen(
517 session_id: u32,
518 channel_id: u32,
519 path: String,
520 cid: u32,
521 ) -> bool {
522 let task = ForwardTaskMap::get(session_id, channel_id).await;
523 let task = &mut task.unwrap().clone();
524 let parameters = task.remote_parameters.clone();
525 let fd: i32 = UdsClient::wrap_socket(AF_UNIX);
526 task.context_forward.fd = fd;
527 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
528
529 let name: Vec<u8> = path.as_bytes().to_vec();
530 let mut socket_name = vec![0_u8; name.len() + 1];
531 socket_name[0] = b'\0';
532 name.iter().enumerate().for_each(|(i, e)| {
533 socket_name[i + 1] = *e;
534 });
535 let addr = UdsAddr::parse_abstract(&socket_name[1..]);
536 if let Ok(addr_obj) = &addr {
537 let ret = UdsServer::wrap_bind(fd, addr_obj);
538 if ret.is_err() {
539 echo_client(session_id, channel_id, "Unix pipe bind failed").await;
540 crate::error!("bind fail");
541 return false;
542 }
543 let ret = UdsServer::wrap_listen(fd);
544 if ret < 0 {
545 echo_client(session_id, channel_id, "Unix pipe listen failed").await;
546 crate::error!("listen fail");
547 return false;
548 }
549 ylong_runtime::spawn(async move {
550 loop {
551 let client_fd = UdsServer::wrap_accept(fd);
552 if client_fd == -1 {
553 break;
554 }
555 ylong_runtime::spawn(on_accept(session_id, channel_id, parameters.clone(), cid));
556 }
557 });
558 }
559 true
560 }
561
/// Resolves `path` to its canonical absolute form as a UTF-8 string.
///
/// # Errors
///
/// Returns an `ErrorKind::Other` error with the message
/// `forward canonicalize failed` when the path cannot be resolved or the
/// resolved path is not valid UTF-8.
pub async fn canonicalize(path: String) -> Result<String, Error> {
    fs::canonicalize(path)
        .ok()
        .and_then(|abs_path| abs_path.to_str().map(String::from))
        .ok_or_else(|| Error::new(ErrorKind::Other, "forward canonicalize failed"))
}
571
setup_device_point(session_id: u32, channel_id: u32) -> bool572 pub async fn setup_device_point(session_id: u32, channel_id: u32) -> bool {
573 let task = ForwardTaskMap::get(session_id, channel_id).await;
574 if task.is_none() {
575 return false;
576 }
577 let task = &mut task.unwrap().clone();
578 let s_node_cfg = task.local_args[1].clone();
579 let cid = task.context_forward.id;
580
581 let resolve = canonicalize(s_node_cfg).await;
582 if resolve.is_err() {
583 crate::error!("Open unix-dev failed");
584 return false;
585 }
586 let resolv_path = resolve.unwrap();
587 let thread_path_ref = Arc::new(Mutex::new(resolv_path));
588 if !send_active_master(session_id, channel_id).await {
589 return false;
590 }
591
592 ylong_runtime::spawn(async move {
593 loop {
594 let path = thread_path_ref.lock().await;
595 let mut file = File::open(&*path).unwrap();
596 let mut total = Vec::new();
597 let mut buf: [u8; config::FILE_PACKAGE_PAYLOAD_SIZE] =
598 [0; config::FILE_PACKAGE_PAYLOAD_SIZE];
599 let read_len = file.read(&mut buf[4..]).unwrap();
600 if read_len == 0 {
601 free_context(session_id, channel_id, 0, true).await;
602 break;
603 }
604 total.append(&mut buf[0..read_len].to_vec());
605 send_to_task(
606 session_id,
607 channel_id,
608 HdcCommand::ForwardData,
609 &total,
610 read_len,
611 cid,
612 )
613 .await;
614 }
615 });
616 true
617 }
618
get_pid(parameter: &str, forward_type: ForwardType) -> u32619 fn get_pid(parameter: &str, forward_type: ForwardType) -> u32 {
620 let mut res: u32 = 0;
621 if forward_type == ForwardType::Jdwp {
622 let pid = parameter.parse::<u32>();
623 if pid.is_err() {
624 crate::error!("get pid err :{:#?}", pid);
625 return res;
626 }
627 res = pid.unwrap();
628 } else {
629 let params: Vec<&str> = parameter.split('@').collect();
630 let pid = params[0].parse::<u32>();
631 if pid.is_err() {
632 return res;
633 }
634 res = pid.unwrap();
635 }
636 res
637 }
638
setup_jdwp_point(session_id: u32, channel_id: u32) -> bool639 pub async fn setup_jdwp_point(session_id: u32, channel_id: u32) -> bool {
640 let task: Option<HdcForward> = ForwardTaskMap::get(session_id, channel_id).await;
641 if task.is_none() {
642 return false;
643 }
644 let task = &mut task.unwrap().clone();
645 let local_args = task.local_args[1].clone();
646 let parameter = local_args.as_str();
647 let style = &task.forward_type;
648 let pid = get_pid(parameter, style.clone());
649 let cid = task.context_forward.id;
650 if pid == 0 {
651 return false;
652 }
653
654 let result = UdsServer::wrap_socketpair(SOCK_STREAM);
655 if result.is_err() {
656 return false;
657 }
658 let mut target_fd = 0;
659 let mut local_fd = 0;
660 if let Ok((fd0, fd1)) = result {
661 crate::info!("pipe, fd0:{}, fd1:{}", fd0, fd1);
662 local_fd = fd0;
663 task.context_forward.fd = local_fd;
664 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
665 target_fd = fd1;
666 }
667
668 ylong_runtime::spawn(async move {
669 loop {
670 let mut buffer = [0u8; 1024];
671 crate::info!("jdwp pipe read....");
672 let size = UdsServer::wrap_read(local_fd, &mut buffer);
673 crate::info!("jdwp pipe read.... size: {:#?}", size);
674 if size < 0 {
675 crate::error!("disconnect, error:{:#?}", size);
676 free_context(session_id, channel_id, 0, true).await;
677 break;
678 }
679 send_to_task(
680 session_id,
681 channel_id,
682 HdcCommand::ForwardData,
683 &buffer[0..size as usize],
684 size as usize,
685 cid,
686 )
687 .await;
688 }
689 });
690
691 let jdwp = Jdwp::get_instance();
692 let mut param = task.local_args[0].clone();
693 param.push(':');
694 param.push_str(parameter);
695
696 let ret = jdwp.send_fd_to_target(pid, target_fd, param.as_str()).await;
697 if !ret {
698 crate::error!("not found pid:{:#?}", pid);
699 echo_client(
700 session_id,
701 channel_id,
702 format!("fport fail:pid not found:{}", pid).as_str(),
703 )
704 .await;
705 task_finish(session_id, channel_id).await;
706 return false;
707 }
708
709 let vec_none = Vec::<u8>::new();
710 send_to_task(
711 session_id,
712 channel_id,
713 HdcCommand::ForwardActiveMaster, // 04
714 &vec_none,
715 0,
716 cid,
717 )
718 .await;
719
720 true
721 }
722
echo_client(session_id: u32, channel_id: u32, message: &str)723 async fn echo_client(session_id: u32, channel_id: u32, message: &str) {
724 let echo_message = TaskMessage {
725 channel_id,
726 command: HdcCommand::KernelEchoRaw,
727 payload: message.as_bytes().to_vec(),
728 };
729 transfer::put(session_id, echo_message).await;
730 }
731
/// Finishes the transfer task of the given channel.
///
/// Thin wrapper around `transfer_task_finish`; note that the helper takes
/// the channel id first and the session id second.
async fn task_finish(session_id: u32, channel_id: u32) {
    transfer_task_finish(channel_id, session_id).await;
}
735
daemon_connect_pipe(session_id: u32, channel_id: u32, fd: i32, path: String)736 pub async fn daemon_connect_pipe(session_id: u32, channel_id: u32, fd: i32, path: String) {
737 let name: Vec<u8> = path.as_bytes().to_vec();
738 let mut socket_name = vec![0_u8; name.len() + 1];
739 socket_name[0] = b'\0';
740 name.iter().enumerate().for_each(|(i, e)| {
741 socket_name[i + 1] = *e;
742 });
743 let addr = UdsAddr::parse_abstract(&socket_name[1..]);
744 if let Ok(addr_obj) = &addr {
745 let ret: Result<(), Error> = UdsClient::wrap_connect(fd, addr_obj);
746 if ret.is_err() {
747 echo_client(session_id, channel_id, "localabstract connect fail").await;
748 free_context(session_id, channel_id, 0, true).await;
749 return;
750 }
751 send_active_master(session_id, channel_id).await;
752 read_data_to_forward(session_id, channel_id).await;
753 }
754 }
755
setup_file_point(session_id: u32, channel_id: u32) -> bool756 pub async fn setup_file_point(session_id: u32, channel_id: u32) -> bool {
757 let task: Option<HdcForward> = ForwardTaskMap::get(session_id, channel_id).await;
758 if task.is_none() {
759 return false;
760 }
761 let task = &mut task.unwrap().clone();
762 let s_node_cfg = task.local_args[1].clone();
763 if task.is_master {
764 if task.forward_type == ForwardType::Reserved
765 || task.forward_type == ForwardType::FileSystem
766 {
767 let _ = fs::remove_file(s_node_cfg.clone());
768 }
769 if !server_socket_bind_listen(session_id, channel_id, s_node_cfg, task.context_forward.id)
770 .await
771 {
772 task_finish(session_id, channel_id).await;
773 return false;
774 }
775 } else if task.forward_type == ForwardType::Abstract {
776 let fd: i32 = UdsClient::wrap_socket(AF_LOCAL);
777 unsafe {
778 libc::fcntl(fd, F_SETFD, FD_CLOEXEC);
779 }
780 task.context_forward.fd = fd;
781 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
782 daemon_connect_pipe(session_id, channel_id, fd, s_node_cfg).await;
783 } else {
784 let fd: i32 = UdsClient::wrap_socket(AF_UNIX);
785 task.context_forward.fd = fd;
786 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
787 daemon_connect_pipe(session_id, channel_id, fd, s_node_cfg).await;
788 }
789 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
790 true
791 }
792
setup_point(session_id: u32, channel_id: u32) -> bool793 pub async fn setup_point(session_id: u32, channel_id: u32) -> bool {
794 if !detech_forward_type(session_id, channel_id).await {
795 crate::error!("forward type is not true");
796 return false;
797 }
798 let task = ForwardTaskMap::get(session_id, channel_id).await;
799 if task.is_none() {
800 return false;
801 }
802 let task = &mut task.unwrap().clone();
803 if cfg!(target_os = "windows") && task.forward_type != ForwardType::Tcp {
804 task.context_forward.last_error = String::from("Not support forward-type");
805 return false;
806 }
807 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
808 let ret = match task.forward_type {
809 ForwardType::Tcp => setup_tcp_point(session_id, channel_id).await,
810 ForwardType::Device => setup_device_point(session_id, channel_id).await,
811 ForwardType::Jdwp | ForwardType::Ark => setup_jdwp_point(session_id, channel_id).await,
812 ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
813 setup_file_point(session_id, channel_id).await
814 }
815 };
816 ret
817 }
818
send_to_task( session_id: u32, channel_id: u32, command: HdcCommand, buf_ptr: &[u8], buf_size: usize, cid: u32, ) -> bool819 pub async fn send_to_task(
820 session_id: u32,
821 channel_id: u32,
822 command: HdcCommand,
823 buf_ptr: &[u8],
824 buf_size: usize,
825 cid: u32,
826 ) -> bool {
827 if buf_size > (config::MAX_SIZE_IOBUF * 2) {
828 return false;
829 }
830
831 let mut new_buf = [u32::to_be_bytes(cid).as_slice(), buf_ptr].concat();
832 new_buf[4..].copy_from_slice(&buf_ptr[0..buf_size]);
833 let file_check_message = TaskMessage {
834 channel_id,
835 command,
836 payload: new_buf,
837 };
838 transfer::put(session_id, file_check_message).await;
839 true
840 }
841
/// Splits a forward payload into its text content and context id.
///
/// The first four bytes are the big-endian context id; the remainder must be
/// valid UTF-8.
///
/// # Errors
///
/// Returns an `ErrorKind::Other` error with the message
/// `filter command failure` when the payload is shorter than four bytes
/// (previously a panic) or the content is not valid UTF-8.
pub async fn filter_command(_payload: &[u8]) -> io::Result<(String, u32)> {
    if _payload.len() < 4 {
        return Err(Error::new(ErrorKind::Other, "filter command failure"));
    }
    let (id_bytes, content_bytes) = _payload.split_at(4);
    let content = String::from_utf8(content_bytes.to_vec())
        .map_err(|_| Error::new(ErrorKind::Other, "filter command failure"))?;
    let id = u32::from_be_bytes([id_bytes[0], id_bytes[1], id_bytes[2], id_bytes[3]]);
    Ok((content, id))
}
853
send_active_master(session_id: u32, channel_id: u32) -> bool854 pub async fn send_active_master(session_id: u32, channel_id: u32) -> bool {
855 let task = ForwardTaskMap::get(session_id, channel_id).await;
856 if task.is_none() {
857 return false;
858 }
859 let task = &mut task.unwrap().clone();
860 if task.context_forward.check_order {
861 let flag = [0u8; 1];
862 send_to_task(
863 session_id,
864 channel_id,
865 HdcCommand::ForwardCheckResult,
866 &flag,
867 1,
868 task.context_forward.id,
869 )
870 .await;
871 free_context(session_id, channel_id, 0, false).await;
872 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
873 return true;
874 }
875 if !send_to_task(
876 session_id,
877 channel_id,
878 HdcCommand::ForwardActiveMaster,
879 &Vec::<u8>::new(),
880 0,
881 task.context_forward.id,
882 )
883 .await
884 {
885 free_context(session_id, channel_id, 0, true).await;
886 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
887 return false;
888 }
889 true
890 }
891
begin_forward(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool892 pub async fn begin_forward(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool {
893 let s = String::from_utf8(_payload.to_vec());
894 if s.is_err() {
895 crate::error!("cmd argv is not int utf8");
896 return false;
897 }
898 let command = s.unwrap();
899 crate::info!("begin forward, command: {:#?}", command);
900 let task = ForwardTaskMap::get(session_id, channel_id).await;
901 if task.is_none() {
902 crate::error!("begin forward get task is none");
903 return false;
904 }
905 let task = &mut task.unwrap().clone();
906 let result = Base::split_command_to_args(&command);
907 let argv = result.0;
908 let argc = result.1;
909 task.context_forward.id = get_id(_payload);
910 task.is_master = true;
911
912 if argc < ARG_COUNT2 {
913 return false;
914 }
915 if argv[0].len() > BUF_SIZE_SMALL || argv[1].len() > BUF_SIZE_SMALL {
916 return false;
917 }
918 if !check_node_info(&argv[0], &mut task.local_args).await {
919 return false;
920 }
921 if !check_node_info(&argv[1], &mut task.remote_args).await {
922 return false;
923 }
924 task.remote_parameters = argv[1].clone();
925 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
926 if !setup_point(session_id, channel_id).await {
927 crate::error!("setup point return false");
928 return false;
929 }
930
931 let task = ForwardTaskMap::get(session_id, channel_id).await;
932 let task = &mut task.unwrap().clone();
933 task.map_ctx_point
934 .insert(task.context_forward.id, task.context_forward.clone());
935
936 let wake_up_message = TaskMessage {
937 channel_id,
938 command: HdcCommand::KernelWakeupSlavetask,
939 payload: Vec::<u8>::new(),
940 };
941 transfer::put(session_id, wake_up_message).await;
942
943 let buf_string: Vec<u8> = argv[1].as_bytes().to_vec();
944 let mut new_buf = vec![0_u8; buf_string.len() + 9];
945 buf_string.iter().enumerate().for_each(|(i, e)| {
946 new_buf[i + 8] = *e;
947 });
948 send_to_task(
949 session_id,
950 channel_id,
951 HdcCommand::ForwardCheck,
952 &new_buf,
953 buf_string.len() + 9,
954 task.context_forward.id,
955 )
956 .await;
957 task.task_command = command.clone();
958 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
959 true
960 }
961
slave_connect( session_id: u32, channel_id: u32, _payload: &[u8], check_order: bool, error: &mut String, ) -> bool962 pub async fn slave_connect(
963 session_id: u32,
964 channel_id: u32,
965 _payload: &[u8],
966 check_order: bool,
967 error: &mut String,
968 ) -> bool {
969 let task = ForwardTaskMap::get(session_id, channel_id).await;
970 if task.is_none() {
971 return false;
972 }
973 let task = &mut task.unwrap().clone();
974 task.is_master = false;
975 task.context_forward.check_order = check_order;
976 if let Ok((content, id)) = filter_command(_payload).await {
977 let content = &content[8..].trim_end_matches('\0').to_string();
978 if !check_node_info(content, &mut task.local_args).await {
979 crate::error!("check local args is false");
980 return false;
981 }
982 task.context_forward.id = id;
983 }
984 task.map_ctx_point
985 .insert(task.context_forward.id, task.context_forward.clone());
986 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
987 if !check_order {
988 if !setup_point(session_id, channel_id).await {
989 crate::error!("setup point return false, free context");
990 free_context(session_id, channel_id, 0, true).await;
991 return false;
992 }
993 *error = task.context_forward.last_error.clone();
994 } else {
995 send_active_master(session_id, channel_id).await;
996 }
997 *error = task.context_forward.last_error.clone();
998 true
999 }
1000
read_data_to_forward(session_id: u32, channel_id: u32) -> bool1001 pub async fn read_data_to_forward(session_id: u32, channel_id: u32) -> bool {
1002 let task = ForwardTaskMap::get(session_id, channel_id).await;
1003 if task.is_none() {
1004 return false;
1005 }
1006 let task = &mut task.unwrap();
1007 let cid = task.context_forward.id;
1008 match task.forward_type {
1009 ForwardType::Tcp | ForwardType::Jdwp | ForwardType::Ark => {
1010 ylong_runtime::spawn(async move {
1011 TcpReadStreamMap::read(session_id, channel_id, cid).await
1012 });
1013 }
1014 ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
1015 let fd = task.context_forward.fd;
1016 ylong_runtime::spawn(async move {
1017 deamon_read_socket_msg(session_id, channel_id, fd).await
1018 });
1019 }
1020 ForwardType::Device => {
1021 if !setup_device_point(session_id, channel_id).await {
1022 return false;
1023 }
1024 }
1025 }
1026 true
1027 }
1028
write_forward_bufer( session_id: u32, channel_id: u32, id: u32, content: Vec<u8>, ) -> bool1029 pub async fn write_forward_bufer(
1030 session_id: u32,
1031 channel_id: u32,
1032 id: u32,
1033 content: Vec<u8>,
1034 ) -> bool {
1035 let task = ForwardTaskMap::get(session_id, channel_id).await;
1036 if task.is_none() {
1037 return false;
1038 }
1039 let task = &mut task.unwrap();
1040 if task.forward_type == ForwardType::Tcp {
1041 TcpWriteStreamMap::write(id, content).await;
1042 } else {
1043 let fd = task.context_forward.fd;
1044 UdsClient::wrap_send(fd, &content);
1045 }
1046 true
1047 }
1048
forward_command_dispatch( session_id: u32, channel_id: u32, command: HdcCommand, _payload: &[u8], ) -> bool1049 pub async fn forward_command_dispatch(
1050 session_id: u32,
1051 channel_id: u32,
1052 command: HdcCommand,
1053 _payload: &[u8],
1054 ) -> bool {
1055 let task = ForwardTaskMap::get(session_id, channel_id).await;
1056 if task.is_none() {
1057 return false;
1058 }
1059 let task: &mut HdcForward = &mut task.unwrap().clone();
1060 let mut ret: bool = true;
1061 if let Ok((_content, id)) = filter_command(_payload).await {
1062 task.context_forward.id = id;
1063 }
1064 let send_msg = _payload[4..].to_vec();
1065 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1066 match command {
1067 HdcCommand::ForwardCheckResult => {
1068 ret = check_command(session_id, channel_id, _payload).await;
1069 }
1070 HdcCommand::ForwardData => {
1071 ret = write_forward_bufer(session_id, channel_id, task.context_forward.id, send_msg)
1072 .await;
1073 }
1074 HdcCommand::ForwardFreeContext => {
1075 free_context(session_id, channel_id, 0, false).await;
1076 }
1077 HdcCommand::ForwardActiveMaster => {
1078 ret = true;
1079 }
1080 _ => {
1081 ret = false;
1082 }
1083 }
1084 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1085 ret
1086 }
1087
print_error_info(session_id: u32, channel_id: u32, error: &mut String)1088 pub async fn print_error_info(session_id: u32, channel_id: u32, error: &mut String) {
1089 if error.is_empty() {
1090 echo_client(session_id, channel_id, "forward arguments parse is fail").await;
1091 } else {
1092 echo_client(session_id, channel_id, error.as_str()).await;
1093 }
1094 }
1095
command_dispatch( session_id: u32, channel_id: u32, _command: HdcCommand, _payload: &[u8], _payload_size: u16, ) -> bool1096 pub async fn command_dispatch(
1097 session_id: u32,
1098 channel_id: u32,
1099 _command: HdcCommand,
1100 _payload: &[u8],
1101 _payload_size: u16,
1102 ) -> bool {
1103 let mut error = String::from("");
1104 crate::info!("command_dispatch command recv: {:#?}", _command);
1105 let ret = match _command {
1106 HdcCommand::ForwardInit => begin_forward(session_id, channel_id, _payload).await,
1107 HdcCommand::ForwardCheck => {
1108 slave_connect(session_id, channel_id, _payload, true, &mut error).await
1109 }
1110 HdcCommand::ForwardActiveSlave => {
1111 slave_connect(session_id, channel_id, _payload, false, &mut error).await
1112 }
1113 _ => forward_command_dispatch(session_id, channel_id, _command, _payload).await,
1114 };
1115 crate::info!("command dispatch ret: {:#?}", ret);
1116 if !ret {
1117 print_error_info(session_id, channel_id, &mut error).await;
1118 task_finish(session_id, channel_id).await;
1119 return false;
1120 }
1121 ret
1122 }
1123