1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 //! forward
16 #![allow(missing_docs)]
17 #[cfg(feature = "host")]
18 extern crate ylong_runtime_static as ylong_runtime;
19
20 #[cfg(not(feature = "host"))]
21 use libc::SOCK_STREAM;
22 #[cfg(not(target_os = "windows"))]
23 use libc::{AF_LOCAL, AF_UNIX, FD_CLOEXEC, F_SETFD};
24 use std::collections::HashMap;
25 use std::fs;
26 #[cfg(not(target_os = "windows"))]
27 use std::fs::File;
28 #[cfg(not(target_os = "windows"))]
29 use std::io::Read;
30 use std::io::{self, Error, ErrorKind};
31 use ylong_runtime::sync::{Mutex, RwLock};
32
33 use crate::common::base::Base;
34 use crate::common::hdctransfer::transfer_task_finish;
35 use crate::common::hdctransfer::{self, HdcTransferBase};
36 #[cfg(not(feature = "host"))]
37 use crate::common::jdwp::Jdwp;
38 #[cfg(not(target_os = "windows"))]
39 use crate::common::uds::{UdsAddr, UdsClient, UdsServer};
40 use crate::config;
41 use crate::config::HdcCommand;
42 use crate::config::MessageLevel;
43 use crate::config::TaskMessage;
44 use crate::transfer;
45 #[allow(unused)]
46 use crate::utils::hdc_log::*;
47 use std::sync::Arc;
48 #[cfg(not(feature = "host"))]
49 use std::time::Duration;
50 use ylong_runtime::io::AsyncReadExt;
51 use ylong_runtime::io::AsyncWriteExt;
52 use ylong_runtime::net::{SplitReadHalf, SplitWriteHalf, TcpListener, TcpStream};
53 use ylong_runtime::task::JoinHandle;
54
55 pub const ARG_COUNT2: u32 = 2;
56 pub const BUF_SIZE_SMALL: usize = 256;
57 pub const SOCKET_BUFFER_SIZE: usize = 65535;
58 pub const HARMONY_RESERVED_SOCKET_PREFIX: &str = "/dev/socket";
59 pub const FILE_SYSTEM_SOCKET_PREFIX: &str = "/tmp/";
60
61 #[cfg(feature = "host")]
62 #[derive(Clone, Debug)]
63 pub struct HdcForwardInfo {
64 pub session_id: u32,
65 pub channel_id: u32,
66 pub forward_direction: bool,
67 pub task_string: String,
68 pub connect_key: String,
69 }
70
71 #[cfg(feature = "host")]
72 impl HdcForwardInfo {
new( session_id: u32, channel_id: u32, forward_direction: bool, task_string: String, connect_key: String, ) -> Self73 fn new(
74 session_id: u32,
75 channel_id: u32,
76 forward_direction: bool,
77 task_string: String,
78 connect_key: String,
79 ) -> Self {
80 Self {
81 session_id,
82 channel_id,
83 forward_direction,
84 task_string,
85 connect_key,
86 }
87 }
88 }
89
90 #[cfg(feature = "host")]
91 type HdcForwardInfo_ = Arc<Mutex<HdcForwardInfo>>;
92 #[cfg(feature = "host")]
93 type HdcForwardInfoMap_ = Arc<Mutex<HashMap<String, HdcForwardInfo_>>>;
94 #[cfg(feature = "host")]
95 pub struct HdcForwardInfoMap {}
96 #[cfg(feature = "host")]
97 impl HdcForwardInfoMap {
get_instance() -> HdcForwardInfoMap_98 fn get_instance() -> HdcForwardInfoMap_ {
99 static mut MAP: Option<HdcForwardInfoMap_> = None;
100 unsafe {
101 MAP.get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
102 .clone()
103 }
104 }
105
put(forward_info: HdcForwardInfo)106 async fn put(forward_info: HdcForwardInfo) {
107 let instance = Self::get_instance();
108 let mut map = instance.lock().await;
109 map.insert(
110 forward_info.task_string.clone(),
111 Arc::new(Mutex::new(forward_info)),
112 );
113 }
114
get_all_forward_infos() -> Vec<HdcForwardInfo>115 pub async fn get_all_forward_infos() -> Vec<HdcForwardInfo> {
116 let instance = Self::get_instance();
117 let map = instance.lock().await;
118 let mut result = Vec::new();
119 for (_key, value) in map.iter() {
120 let info = value.lock().await;
121 result.push((*info).clone());
122 }
123 result
124 }
125
remove_forward(task_string: String, forward_direction: bool) -> bool126 pub async fn remove_forward(task_string: String, forward_direction: bool) -> bool {
127 crate::info!(
128 "remove_forward task_string:{}, direction:{}",
129 task_string,
130 forward_direction
131 );
132 let instance = Self::get_instance();
133 let map = instance.lock().await;
134 let mut remove_key = String::new();
135 let prefix = if forward_direction {
136 "1|".to_string()
137 } else {
138 "0|".to_string()
139 };
140 let mut task_string1 = prefix;
141 task_string1.push_str(task_string.as_str());
142 for (key, value) in map.iter() {
143 let info = value.lock().await;
144 if info.task_string.contains(&task_string1)
145 && info.forward_direction == forward_direction
146 {
147 remove_key = (*key.clone()).to_string();
148 break;
149 }
150 }
151 drop(map);
152 if remove_key.is_empty() {
153 return false;
154 }
155
156 let mut map = instance.lock().await;
157 let result = map.remove(&remove_key);
158 result.is_some()
159 }
160 }
161
162 type TcpRead = Arc<Mutex<SplitReadHalf>>;
163 type TcpReadMap_ = Arc<RwLock<HashMap<u32, TcpRead>>>;
164 pub struct TcpReadStreamMap {}
165 impl TcpReadStreamMap {
get_instance() -> TcpReadMap_166 fn get_instance() -> TcpReadMap_ {
167 static mut TCP_MAP: Option<TcpReadMap_> = None;
168 unsafe {
169 TCP_MAP
170 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
171 .clone()
172 }
173 }
174 #[allow(unused)]
put(id: u32, rd: SplitReadHalf)175 async fn put(id: u32, rd: SplitReadHalf) {
176 let instance = Self::get_instance();
177 let mut map = instance.write().await;
178 let arc_rd = Arc::new(Mutex::new(rd));
179 map.insert(id, arc_rd);
180 }
181 #[allow(unused)]
read(session_id: u32, channel_id: u32, cid: u32)182 async fn read(session_id: u32, channel_id: u32, cid: u32) {
183 let arc_map = Self::get_instance();
184 let map = arc_map.read().await;
185 let Some(arc_rd) = map.get(&cid) else {
186 crate::error!("TcpReadStreamMap failed to get cid {:#?}", cid);
187 return;
188 };
189 let rd = &mut arc_rd.lock().await;
190 let mut data = vec![0_u8; SOCKET_BUFFER_SIZE];
191 loop {
192 match rd.read(&mut data).await {
193 Ok(recv_size) => {
194 if recv_size == 0 {
195 free_context(session_id, channel_id, 0, true).await;
196 crate::info!("tcp close shutdown, channel_id = {:#?}", channel_id);
197 return;
198 }
199 if send_to_task(
200 session_id,
201 channel_id,
202 HdcCommand::ForwardData,
203 &data[0..recv_size],
204 recv_size,
205 cid,
206 )
207 .await
208 {
209 crate::info!("send task success");
210 }
211 }
212 Err(_e) => {
213 crate::error!("tcp stream rd read failed");
214 }
215 }
216 }
217 }
218 }
219
220 type TcpWriter = Arc<Mutex<SplitWriteHalf>>;
221 type TcpWriterMap_ = Arc<RwLock<HashMap<u32, TcpWriter>>>;
222 pub struct TcpWriteStreamMap {}
223 impl TcpWriteStreamMap {
get_instance() -> TcpWriterMap_224 fn get_instance() -> TcpWriterMap_ {
225 static mut TCP_MAP: Option<TcpWriterMap_> = None;
226 unsafe {
227 TCP_MAP
228 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
229 .clone()
230 }
231 }
232 #[allow(unused)]
put(id: u32, wr: SplitWriteHalf)233 async fn put(id: u32, wr: SplitWriteHalf) {
234 let instance = Self::get_instance();
235 let mut map = instance.write().await;
236 let arc_wr = Arc::new(Mutex::new(wr));
237 map.insert(id, arc_wr);
238 }
239 #[allow(unused)]
write(id: u32, data: Vec<u8>)240 async fn write(id: u32, data: Vec<u8>) {
241 let arc_map = Self::get_instance();
242 let map = arc_map.write().await;
243 let Some(arc_wr) = map.get(&id) else {
244 crate::error!("TcpReadStreamMap failed to get id {:#?}", id);
245 return;
246 };
247 let mut wr = arc_wr.lock().await;
248 let _ = wr.write_all(data.as_slice()).await;
249 }
250
end(id: u32)251 pub async fn end(id: u32) {
252 let instance = Self::get_instance();
253 let mut map = instance.write().await;
254 if let Some(arc_wr) = map.remove(&id) {
255 let mut wr = arc_wr.lock().await;
256 let _ = wr.shutdown().await;
257 }
258 }
259 }
260
261 type TcpListener_ = Arc<Mutex<JoinHandle<()>>>;
262 type TcpListenerMap_ = Arc<RwLock<HashMap<u32, TcpListener_>>>;
263 pub struct TcpListenerMap {}
264 impl TcpListenerMap {
get_instance() -> TcpListenerMap_265 fn get_instance() -> TcpListenerMap_ {
266 static mut TCP_MAP: Option<TcpListenerMap_> = None;
267 unsafe {
268 TCP_MAP
269 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
270 .clone()
271 }
272 }
273 #[allow(unused)]
put(id: u32, listener: JoinHandle<()>)274 async fn put(id: u32, listener: JoinHandle<()>) {
275 let instance = Self::get_instance();
276 let mut map = instance.write().await;
277 let arc_listener = Arc::new(Mutex::new(listener));
278 map.insert(id, arc_listener);
279 crate::info!("forward tcp put listener id = {id}");
280 }
281
end(id: u32)282 pub async fn end(id: u32) {
283 let instance = Self::get_instance();
284 let mut map = instance.write().await;
285 if let Some(arc_listener) = map.remove(&id) {
286 let join_handle = arc_listener.lock().await;
287 join_handle.cancel();
288 }
289 }
290 }
291
292 #[derive(Default, Eq, PartialEq, Clone, Debug)]
293 enum ForwardType {
294 #[default]
295 Tcp = 0,
296 Device,
297 Abstract,
298 FileSystem,
299 Jdwp,
300 Ark,
301 Reserved,
302 }
303
304 #[derive(Debug, Default, PartialEq, Eq, Clone)]
305 pub struct ContextForward {
306 session_id: u32,
307 channel_id: u32,
308 check_order: bool,
309 id: u32,
310 fd: i32,
311 remote_parameters: String,
312 last_error: String,
313 forward_type: ForwardType,
314 }
315
316 type MapForward_ = Arc<Mutex<HashMap<(u32, u32), HdcForward>>>;
317 pub struct ForwardTaskMap {}
318 impl ForwardTaskMap {
get_instance() -> MapForward_319 fn get_instance() -> MapForward_ {
320 static mut FORWARD_MAP: Option<MapForward_> = None;
321 unsafe {
322 FORWARD_MAP
323 .get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
324 .clone()
325 }
326 }
327
update(session_id: u32, channel_id: u32, value: HdcForward)328 pub async fn update(session_id: u32, channel_id: u32, value: HdcForward) {
329 let map = Self::get_instance();
330 let mut map = map.lock().await;
331 map.insert((session_id, channel_id), value.clone());
332 }
333
remove(session_id: u32, channel_id: u32)334 pub async fn remove(session_id: u32, channel_id: u32) {
335 crate::info!("remove, session:{}, channel:{}", session_id, channel_id);
336 let map = Self::get_instance();
337 let mut map = map.lock().await;
338 let _ = map.remove(&(session_id, channel_id));
339 }
340
get(session_id: u32, channel_id: u32) -> Option<HdcForward>341 pub async fn get(session_id: u32, channel_id: u32) -> Option<HdcForward> {
342 let arc = Self::get_instance();
343 let map = arc.lock().await;
344 let task = map.get(&(session_id, channel_id));
345 match task {
346 Some(task) => Some(task.clone()),
347 None => {
348 crate::error!(
349 "ForwardTaskMap result:is none,session_id={:#?}, channel_id={:#?}",
350 session_id,
351 channel_id
352 );
353 Option::None
354 }
355 }
356 }
357
get_channel_id(session_id: u32, task_string: String) -> Option<u32>358 pub async fn get_channel_id(session_id: u32, task_string: String) -> Option<u32> {
359 let arc = Self::get_instance();
360 let map = arc.lock().await;
361 for ((_session_id, _channel_id), value) in map.iter() {
362 if *_session_id == session_id && task_string.contains(&value.task_command) {
363 return Some(*_channel_id);
364 }
365 }
366 None
367 }
368
clear(session_id: u32)369 pub async fn clear(session_id: u32) {
370 let arc = Self::get_instance();
371 let mut channel_list = Vec::new();
372 {
373 let map = arc.lock().await;
374 if map.is_empty() {
375 return;
376 }
377 for (&key, _) in map.iter() {
378 if key.0 == session_id {
379 let id = key;
380 channel_list.push(id);
381 }
382 }
383 }
384 for id in channel_list {
385 free_channel_task(id.0, id.1).await;
386 }
387 }
388
dump_task() -> String389 pub async fn dump_task() -> String {
390 let arc = Self::get_instance();
391 let map = arc.lock().await;
392 let mut result = String::new();
393 for (_id, forward_task) in map.iter() {
394 let forward_type = match forward_task.remote_args.len() {
395 0 => "fport".to_string(),
396 2 => "rport".to_string(),
397 _ => "unknown".to_string(),
398 };
399 let first_args = match forward_task.remote_args.len() {
400 0 => "unknown".to_string(),
401 2 => format!(
402 "{}:{}",
403 forward_task.local_args[0], forward_task.local_args[1]
404 ),
405 _ => "unknown".to_string(),
406 };
407 let second_args = match forward_task.remote_args.len() {
408 0 => format!(
409 "{}:{}",
410 forward_task.local_args[0], forward_task.local_args[1]
411 ),
412 2 => format!(
413 "{}:{}",
414 forward_task.remote_args[0], forward_task.remote_args[1]
415 ),
416 _ => "unknown".to_string(),
417 };
418 result.push_str(&format!(
419 "session_id:{},\tchannel_id:{},\tcommand:{:#} {:#} {:#}\n",
420 forward_task.session_id,
421 forward_task.channel_id,
422 forward_type,
423 first_args,
424 second_args
425 ));
426 }
427 result
428 }
429 }
430
free_channel_task(session_id: u32, channel_id: u32)431 pub async fn free_channel_task(session_id: u32, channel_id: u32) {
432 free_context(session_id, channel_id, 0, false).await;
433 }
434
stop_task(session_id: u32)435 pub async fn stop_task(session_id: u32) {
436 ForwardTaskMap::clear(session_id).await;
437 }
438
dump_task() -> String439 pub async fn dump_task() -> String {
440 ForwardTaskMap::dump_task().await
441 }
442
443 #[derive(Debug, Default, Clone, PartialEq, Eq)]
444 pub struct HdcForward {
445 session_id: u32,
446 channel_id: u32,
447 is_master: bool,
448 local_args: Vec<String>,
449 remote_args: Vec<String>,
450 remote_parameters: String,
451 task_command: String,
452 forward_type: ForwardType,
453 context_forward: ContextForward,
454 map_ctx_point: HashMap<u32, ContextForward>,
455 pub transfer: HdcTransferBase,
456 }
457
458 impl HdcForward {
new(session_id: u32, channel_id: u32) -> Self459 pub fn new(session_id: u32, channel_id: u32) -> Self {
460 Self {
461 session_id,
462 channel_id,
463 is_master: Default::default(),
464 local_args: Default::default(),
465 remote_args: Default::default(),
466 task_command: Default::default(),
467 remote_parameters: Default::default(),
468 forward_type: Default::default(),
469 context_forward: Default::default(),
470 map_ctx_point: HashMap::new(),
471 transfer: HdcTransferBase::new(session_id, channel_id),
472 }
473 }
474 }
475
get_id(_payload: &[u8]) -> u32476 pub fn get_id(_payload: &[u8]) -> u32 {
477 let mut id_bytes = [0u8; 4];
478 id_bytes.copy_from_slice(&_payload[0..4]);
479 let id: u32 = u32::from_be_bytes(id_bytes);
480 id
481 }
482
check_node_info(value: &String, arg: &mut Vec<String>) -> bool483 pub async fn check_node_info(value: &String, arg: &mut Vec<String>) -> bool {
484 crate::info!("check cmd args value is: {:#?}", value);
485 if !value.contains(':') {
486 return false;
487 }
488 let array = value.split(':').collect::<Vec<&str>>();
489
490 if array[0] == "tcp" {
491 if array[1].len() > config::MAX_PORT_LEN {
492 crate::error!(
493 "forward port = {:#?} it'slength is wrong, can not more than five",
494 array[1]
495 );
496 return false;
497 }
498
499 match array[1].parse::<u32>() {
500 Ok(port) => {
501 if port == 0 || port > config::MAX_PORT_NUM {
502 crate::error!("port can not greater than: 65535");
503 return false;
504 }
505 }
506 Err(_) => {
507 crate::error!("port must is int type, port is: {:#?}", array[1]);
508 return false;
509 }
510 }
511 }
512 for item in array.iter() {
513 arg.push(String::from(item.to_owned()));
514 }
515 true
516 }
517
518 #[cfg(feature = "host")]
on_forward_success(task_message: TaskMessage, session_id: u32) -> io::Result<()>519 pub async fn on_forward_success(task_message: TaskMessage, session_id: u32) -> io::Result<()> {
520 crate::info!("on_forward_success");
521 let channel_id = task_message.channel_id;
522 let payload = task_message.payload;
523 let forward_direction = payload[0] == b'1';
524 let task_string = String::from_utf8(payload);
525 let connect_key = "unknow key".to_string();
526 if task_string.is_ok() {
527 let info = HdcForwardInfo::new(
528 session_id,
529 channel_id,
530 forward_direction,
531 task_string.unwrap(),
532 connect_key,
533 );
534 HdcForwardInfoMap::put(info).await;
535 }
536 transfer::TcpMap::end(task_message.channel_id).await;
537 Ok(())
538 }
539
check_command(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool540 pub async fn check_command(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool {
541 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
542 crate::error!("check_command task is none");
543 return false;
544 };
545 let task = &mut task.clone();
546 if !_payload.is_empty() {
547 echo_client(
548 session_id,
549 channel_id,
550 "Forwardport result:OK",
551 MessageLevel::Ok,
552 )
553 .await;
554 let map_info = String::from(if task.transfer.server_or_daemon {
555 "1|"
556 } else {
557 "0|"
558 }) + &task.task_command;
559
560 let mut command_string = vec![0_u8; map_info.len() + 1];
561 map_info
562 .as_bytes()
563 .to_vec()
564 .iter()
565 .enumerate()
566 .for_each(|(i, e)| {
567 command_string[i] = *e;
568 });
569 let forward_success_message = TaskMessage {
570 channel_id,
571 command: HdcCommand::ForwardSuccess,
572 payload: command_string,
573 };
574 #[cfg(feature = "host")]
575 {
576 let _ = on_forward_success(forward_success_message, session_id).await;
577 }
578 #[cfg(not(feature = "host"))]
579 {
580 transfer::put(session_id, forward_success_message).await;
581 }
582 } else {
583 echo_client(
584 session_id,
585 channel_id,
586 "Forwardport result: Failed",
587 MessageLevel::Fail,
588 )
589 .await;
590 free_context(session_id, channel_id, 0, false).await;
591 return false;
592 }
593 true
594 }
595
detech_forward_type(session_id: u32, channel_id: u32) -> bool596 pub async fn detech_forward_type(session_id: u32, channel_id: u32) -> bool {
597 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
598 crate::error!("detech_forward_type get task is none session_id = {session_id}, channel_id = {channel_id}");
599 return false;
600 };
601 let task = &mut task.clone();
602
603 let type_str = &task.local_args[0];
604
605 match type_str.as_str() {
606 "tcp" => {
607 task.forward_type = ForwardType::Tcp;
608 }
609 "dev" => {
610 task.forward_type = ForwardType::Device;
611 }
612 "localabstract" => {
613 task.forward_type = ForwardType::Abstract;
614 }
615 "localfilesystem" => {
616 task.local_args[1] = HARMONY_RESERVED_SOCKET_PREFIX.to_owned() + &task.local_args[1];
617 task.forward_type = ForwardType::FileSystem;
618 }
619 "jdwp" => {
620 task.forward_type = ForwardType::Jdwp;
621 }
622 "ark" => {
623 task.forward_type = ForwardType::Ark;
624 }
625 "localreserved" => {
626 task.local_args[1] = FILE_SYSTEM_SOCKET_PREFIX.to_owned() + &task.local_args[1];
627 task.forward_type = ForwardType::Reserved;
628 }
629 _ => {
630 crate::error!("this forward type may is not expected");
631 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
632 return false;
633 }
634 }
635 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
636 true
637 }
638
forward_tcp_accept( session_id: u32, channel_id: u32, port: u32, value: String, cid: u32, ) -> io::Result<()>639 pub async fn forward_tcp_accept(
640 session_id: u32,
641 channel_id: u32,
642 port: u32,
643 value: String,
644 cid: u32,
645 ) -> io::Result<()> {
646 let saddr = format!("127.0.0.1:{}", port);
647 crate::info!("forward_tcp_accept bind addr:{:#?}", saddr);
648 let result = TcpListener::bind(saddr.clone()).await;
649 match result {
650 Ok(listener) => {
651 crate::info!("forward_tcp_accept bind ok");
652 let join_handle = ylong_runtime::spawn(async move {
653 loop {
654 let client = listener.accept().await;
655 if client.is_err() {
656 continue;
657 }
658 let (stream, _addr) = client.unwrap();
659 let (rd, wr) = stream.into_split();
660 TcpWriteStreamMap::put(channel_id, wr).await;
661 ylong_runtime::spawn(on_accept(session_id, channel_id, value.clone(), cid));
662 recv_tcp_msg(session_id, channel_id, rd, cid).await;
663 }
664 });
665 TcpListenerMap::put(channel_id, join_handle).await;
666 Ok(())
667 }
668 Err(e) => {
669 crate::error!("forward_tcp_accept fail:{:#?}", e);
670 Err(e)
671 }
672 }
673 }
674
recv_tcp_msg(session_id: u32, channel_id: u32, mut rd: SplitReadHalf, cid: u32)675 pub async fn recv_tcp_msg(session_id: u32, channel_id: u32, mut rd: SplitReadHalf, cid: u32) {
676 let mut data = vec![0_u8; SOCKET_BUFFER_SIZE];
677 loop {
678 match rd.read(&mut data).await {
679 Ok(recv_size) => {
680 if recv_size == 0 {
681 free_context(session_id, channel_id, 0, true).await;
682 drop(rd);
683 crate::info!("recv_size is 0, tcp close shutdown");
684 return;
685 }
686 if send_to_task(
687 session_id,
688 channel_id,
689 HdcCommand::ForwardData,
690 &data[0..recv_size],
691 recv_size,
692 cid,
693 )
694 .await
695 {
696 crate::info!("send task success");
697 }
698 }
699 Err(_e) => {
700 crate::error!(
701 "recv tcp msg read failed session_id={session_id},channel_id={channel_id}"
702 );
703 }
704 }
705 }
706 }
707
on_accept(session_id: u32, channel_id: u32, value: String, cid: u32)708 pub async fn on_accept(session_id: u32, channel_id: u32, value: String, cid: u32) {
709 let buf_string: Vec<u8> = value.as_bytes().to_vec();
710 let mut new_buf = vec![0_u8; buf_string.len() + 9];
711
712 buf_string.iter().enumerate().for_each(|(i, e)| {
713 new_buf[i + 8] = *e;
714 });
715
716 send_to_task(
717 session_id,
718 channel_id,
719 HdcCommand::ForwardActiveSlave,
720 &new_buf,
721 buf_string.len() + 9,
722 cid,
723 )
724 .await;
725 }
726
daemon_connect_tcp(session_id: u32, channel_id: u32, port: u32, cid: u32)727 pub async fn daemon_connect_tcp(session_id: u32, channel_id: u32, port: u32, cid: u32) {
728 let saddr = format!("127.0.0.1:{}", port);
729 let stream = match TcpStream::connect(saddr).await {
730 Err(err) => {
731 crate::error!("TcpStream::stream failed {:?}", err);
732 free_context(session_id, channel_id, 0, false).await;
733 return;
734 }
735 Ok(addr) => addr,
736 };
737 send_active_master(session_id, channel_id).await;
738 let (rd, wr) = stream.into_split();
739 TcpWriteStreamMap::put(channel_id, wr).await;
740 recv_tcp_msg(session_id, channel_id, rd, cid).await;
741 }
742
743 #[cfg(not(target_os = "windows"))]
deamon_read_socket_msg(session_id: u32, channel_id: u32, fd: i32)744 pub async fn deamon_read_socket_msg(session_id: u32, channel_id: u32, fd: i32) {
745 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
746 crate::error!("deamon_read_socket_msg get task is none session_id={session_id},channel_id={channel_id}");
747 return;
748 };
749 let task = &mut task.clone();
750 loop {
751 let mut buffer: [u8; SOCKET_BUFFER_SIZE] = [0; SOCKET_BUFFER_SIZE];
752 let recv_size = UdsClient::wrap_recv(fd, &mut buffer);
753 if recv_size <= 0 {
754 free_context(session_id, channel_id, 0, true).await;
755 crate::info!("local abstract close shutdown");
756 return;
757 }
758 if send_to_task(
759 session_id,
760 channel_id,
761 HdcCommand::ForwardData,
762 &buffer[0..recv_size as usize],
763 recv_size as usize,
764 task.context_forward.id,
765 )
766 .await
767 {
768 crate::info!("send task success");
769 }
770 }
771 }
772
free_context(session_id: u32, channel_id: u32, id: u32, notify_remote: bool)773 pub async fn free_context(session_id: u32, channel_id: u32, id: u32, notify_remote: bool) {
774 crate::info!("free context id = {id}");
775 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
776 crate::error!(
777 "free_context get task is none session_id={session_id},channel_id={channel_id}"
778 );
779 return;
780 };
781 let task = &mut task.clone();
782 if notify_remote {
783 let vec_none = Vec::<u8>::new();
784 send_to_task(
785 session_id,
786 channel_id,
787 HdcCommand::ForwardFreeContext,
788 &vec_none,
789 0,
790 task.context_forward.id,
791 )
792 .await;
793 }
794 match task.forward_type {
795 ForwardType::Tcp | ForwardType::Jdwp | ForwardType::Ark => {
796 TcpWriteStreamMap::end(channel_id).await;
797 TcpListenerMap::end(channel_id).await;
798 }
799 ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
800 #[cfg(not(target_os = "windows"))]
801 UdsServer::wrap_close(task.context_forward.fd);
802 }
803 ForwardType::Device => {
804 return;
805 }
806 }
807 ForwardTaskMap::remove(session_id, channel_id).await;
808 }
809
setup_tcp_point(session_id: u32, channel_id: u32) -> bool810 pub async fn setup_tcp_point(session_id: u32, channel_id: u32) -> bool {
811 let Some(mut task) = ForwardTaskMap::get(session_id, channel_id).await else {
812 crate::error!(
813 "setup_tcp_point get task is none session_id={session_id},channel_id={channel_id}"
814 );
815 return false;
816 };
817 let task = &mut task;
818 let Ok(port) = task.local_args[1].parse::<u32>() else {
819 crate::error!("setup_tcp_point parse error");
820 return false;
821 };
822 let cid = task.context_forward.id;
823 if task.is_master {
824 let parameters = task.remote_parameters.clone();
825 let result = forward_tcp_accept(session_id, channel_id, port, parameters, cid).await;
826 crate::info!("setup_tcp_point result:{:?}", result);
827 task.context_forward.last_error = format!("TCP Port listen failed at {}", port);
828 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
829 return result.is_ok();
830 } else {
831 crate::info!("setup_tcp_point slaver");
832 ylong_runtime::spawn(
833 async move { daemon_connect_tcp(session_id, channel_id, port, cid).await },
834 );
835 }
836 true
837 }
838
839 #[cfg(not(target_os = "windows"))]
server_socket_bind_listen( session_id: u32, channel_id: u32, path: String, cid: u32, ) -> bool840 async fn server_socket_bind_listen(
841 session_id: u32,
842 channel_id: u32,
843 path: String,
844 cid: u32,
845 ) -> bool {
846 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
847 crate::error!(
848 "setup_tcp_point get task is none session_id={session_id},channel_id={channel_id}"
849 );
850 return false;
851 };
852 let task = &mut task.clone();
853 let parameters = task.remote_parameters.clone();
854 let fd: i32 = UdsClient::wrap_socket(AF_UNIX);
855 task.context_forward.fd = fd;
856 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
857
858 let name: Vec<u8> = path.as_bytes().to_vec();
859 let mut socket_name = vec![0_u8; name.len() + 1];
860 socket_name[0] = b'\0';
861 name.iter().enumerate().for_each(|(i, e)| {
862 socket_name[i + 1] = *e;
863 });
864 let addr = UdsAddr::parse_abstract(&socket_name[1..]);
865 if let Ok(addr_obj) = &addr {
866 let ret = UdsServer::wrap_bind(fd, addr_obj);
867 if ret.is_err() {
868 echo_client(
869 session_id,
870 channel_id,
871 "Unix pipe bind failed",
872 MessageLevel::Fail,
873 )
874 .await;
875 crate::error!("bind fail");
876 return false;
877 }
878 let ret = UdsServer::wrap_listen(fd);
879 if ret < 0 {
880 echo_client(
881 session_id,
882 channel_id,
883 "Unix pipe listen failed",
884 MessageLevel::Fail,
885 )
886 .await;
887 crate::error!("listen fail");
888 return false;
889 }
890 ylong_runtime::spawn(async move {
891 loop {
892 let client_fd = UdsServer::wrap_accept(fd);
893 if client_fd == -1 {
894 break;
895 }
896 ylong_runtime::spawn(on_accept(session_id, channel_id, parameters.clone(), cid));
897 }
898 });
899 }
900 true
901 }
902
canonicalize(path: String) -> Result<String, Error>903 pub async fn canonicalize(path: String) -> Result<String, Error> {
904 match fs::canonicalize(path) {
905 Ok(abs_path) => match abs_path.to_str() {
906 Some(path) => Ok(path.to_string()),
907 None => Err(Error::new(ErrorKind::Other, "forward canonicalize failed")),
908 },
909 Err(_) => Err(Error::new(ErrorKind::Other, "forward canonicalize failed")),
910 }
911 }
912
913 #[cfg(target_os = "windows")]
setup_device_point(_session_id: u32, _channel_id: u32) -> bool914 pub async fn setup_device_point(_session_id: u32, _channel_id: u32) -> bool {
915 false
916 }
917
918 #[cfg(not(target_os = "windows"))]
setup_device_point(session_id: u32, channel_id: u32) -> bool919 pub async fn setup_device_point(session_id: u32, channel_id: u32) -> bool {
920 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
921 crate::error!(
922 "setup_device_point get task is none session_id={session_id},channel_id={channel_id}"
923 );
924 return false;
925 };
926 let task = &mut task.clone();
927 let s_node_cfg = task.local_args[1].clone();
928 let cid = task.context_forward.id;
929
930 let Ok(resolv_path) = canonicalize(s_node_cfg).await else {
931 crate::error!("Open unix-dev failed");
932 return false;
933 };
934 let thread_path_ref = Arc::new(Mutex::new(resolv_path));
935 if !send_active_master(session_id, channel_id).await {
936 crate::error!(
937 "send_active_master return failed channel_id={:?}",
938 channel_id
939 );
940 return false;
941 }
942
943 ylong_runtime::spawn(async move {
944 loop {
945 let path = thread_path_ref.lock().await;
946 let Ok(mut file) = File::open(&*path) else {
947 crate::error!("open {} failed.", *path);
948 break;
949 };
950 let mut total = Vec::new();
951 let mut buf: [u8; config::FILE_PACKAGE_PAYLOAD_SIZE] =
952 [0; config::FILE_PACKAGE_PAYLOAD_SIZE];
953 let Ok(read_len) = file.read(&mut buf[4..]) else {
954 crate::error!("read {} failed.", *path);
955 break;
956 };
957 if read_len == 0 {
958 free_context(session_id, channel_id, 0, true).await;
959 break;
960 }
961 total.append(&mut buf[0..read_len].to_vec());
962 send_to_task(
963 session_id,
964 channel_id,
965 HdcCommand::ForwardData,
966 &total,
967 read_len,
968 cid,
969 )
970 .await;
971 }
972 });
973 true
974 }
975
976 #[cfg(not(feature = "host"))]
get_pid(parameter: &str, forward_type: ForwardType) -> u32977 fn get_pid(parameter: &str, forward_type: ForwardType) -> u32 {
978 match forward_type == ForwardType::Jdwp {
979 true => parameter.parse::<u32>().unwrap_or_else(|e| {
980 crate::error!("Jdwp get pid err :{:?}", e);
981 0_u32
982 }),
983 false => {
984 let params: Vec<&str> = parameter.split('@').collect();
985 params[0].parse::<u32>().unwrap_or_else(|e| {
986 crate::error!("get pid err :{:?}", e);
987 0_u32
988 })
989 }
990 }
991 }
992 #[cfg(feature = "host")]
setup_jdwp_point(_session_id: u32, _channel_id: u32) -> bool993 pub async fn setup_jdwp_point(_session_id: u32, _channel_id: u32) -> bool {
994 crate::info!("not daemon setup_jdwp_point");
995 false
996 }
997
998 #[cfg(not(feature = "host"))]
setup_jdwp_point(session_id: u32, channel_id: u32) -> bool999 pub async fn setup_jdwp_point(session_id: u32, channel_id: u32) -> bool {
1000 crate::info!("setup_jdwp_point start.");
1001 let Some(task): Option<HdcForward> = ForwardTaskMap::get(session_id, channel_id).await else {
1002 crate::error!(
1003 "setup_jdwp_point get task is none session_id={session_id},channel_id={channel_id}"
1004 );
1005 return false;
1006 };
1007 let task = &mut task.clone();
1008 let local_args = task.local_args[1].clone();
1009 let parameter = local_args.as_str();
1010 let style = &task.forward_type;
1011 let pid = get_pid(parameter, style.clone());
1012 let cid = task.context_forward.id;
1013 if pid == 0 {
1014 crate::error!("setup_jdwp_point get pid is 0");
1015 return false;
1016 }
1017
1018 let result = UdsServer::wrap_socketpair(SOCK_STREAM);
1019 if result.is_err() {
1020 crate::error!("wrap socketpair failed");
1021 return false;
1022 }
1023 let mut target_fd = 0;
1024 let mut local_fd = 0;
1025 if let Ok((fd0, fd1)) = result {
1026 crate::info!("pipe, fd0:{}, fd1:{}", fd0, fd1);
1027 local_fd = fd0;
1028 task.context_forward.fd = local_fd;
1029 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1030 target_fd = fd1;
1031 }
1032
1033 ylong_runtime::spawn(async move {
1034 loop {
1035 let mut buffer = [0u8; 1024];
1036 let size = UdsServer::wrap_read(local_fd, &mut buffer);
1037 if size < 0 {
1038 crate::error!("disconnect, error:{:?}", size);
1039 free_context(session_id, channel_id, 0, true).await;
1040 break;
1041 }
1042 if size == 0 {
1043 ylong_runtime::time::sleep(Duration::from_millis(200)).await;
1044 continue;
1045 }
1046 send_to_task(
1047 session_id,
1048 channel_id,
1049 HdcCommand::ForwardData,
1050 &buffer[0..size as usize],
1051 size as usize,
1052 cid,
1053 )
1054 .await;
1055 }
1056 });
1057
1058 let jdwp = Jdwp::get_instance();
1059 let mut param = task.local_args[0].clone();
1060 param.push(':');
1061 param.push_str(parameter);
1062
1063 let ret = jdwp.send_fd_to_target(pid, target_fd, param.as_str()).await;
1064 if !ret {
1065 crate::error!("not found pid:{:?}", pid);
1066 echo_client(
1067 session_id,
1068 channel_id,
1069 format!("fport fail:pid not found:{}", pid).as_str(),
1070 MessageLevel::Fail,
1071 )
1072 .await;
1073 task_finish(session_id, channel_id).await;
1074 return false;
1075 }
1076
1077 let vec_none = Vec::<u8>::new();
1078 send_to_task(
1079 session_id,
1080 channel_id,
1081 HdcCommand::ForwardActiveMaster, // 04
1082 &vec_none,
1083 0,
1084 cid,
1085 )
1086 .await;
1087 crate::info!("setup_jdwp_point return true");
1088 true
1089 }
1090
echo_client(_session_id: u32, channel_id: u32, message: &str, _level: MessageLevel)1091 async fn echo_client(_session_id: u32, channel_id: u32, message: &str, _level: MessageLevel) {
1092 #[cfg(feature = "host")]
1093 {
1094 let level = match _level {
1095 MessageLevel::Ok => transfer::EchoLevel::OK,
1096 MessageLevel::Fail => transfer::EchoLevel::FAIL,
1097 MessageLevel::Info => transfer::EchoLevel::INFO,
1098 };
1099 let _ =
1100 transfer::send_channel_msg(channel_id, level, message.to_string())
1101 .await;
1102 return;
1103 }
1104 #[allow(unreachable_code)]
1105 {
1106 hdctransfer::echo_client(_session_id, channel_id, message.as_bytes().to_vec(), _level)
1107 .await;
1108 }
1109 }
1110
task_finish(session_id: u32, channel_id: u32)1111 async fn task_finish(session_id: u32, channel_id: u32) {
1112 transfer_task_finish(channel_id, session_id).await;
1113 }
1114
1115 #[cfg(not(target_os = "windows"))]
daemon_connect_pipe(session_id: u32, channel_id: u32, fd: i32, path: String)1116 pub async fn daemon_connect_pipe(session_id: u32, channel_id: u32, fd: i32, path: String) {
1117 let name: Vec<u8> = path.as_bytes().to_vec();
1118 let mut socket_name = vec![0_u8; name.len() + 1];
1119 socket_name[0] = b'\0';
1120 name.iter().enumerate().for_each(|(i, e)| {
1121 socket_name[i + 1] = *e;
1122 });
1123 let addr = UdsAddr::parse_abstract(&socket_name[1..]);
1124 if let Ok(addr_obj) = &addr {
1125 let ret: Result<(), Error> = UdsClient::wrap_connect(fd, addr_obj);
1126 if ret.is_err() {
1127 echo_client(
1128 session_id,
1129 channel_id,
1130 "localabstract connect fail",
1131 MessageLevel::Fail,
1132 )
1133 .await;
1134 free_context(session_id, channel_id, 0, true).await;
1135 return;
1136 }
1137 send_active_master(session_id, channel_id).await;
1138 read_data_to_forward(session_id, channel_id).await;
1139 }
1140 }
1141
1142 #[cfg(target_os = "windows")]
setup_file_point(_session_id: u32, _channel_id: u32) -> bool1143 pub async fn setup_file_point(_session_id: u32, _channel_id: u32) -> bool {
1144 false
1145 }
1146
1147 #[cfg(not(target_os = "windows"))]
setup_file_point(session_id: u32, channel_id: u32) -> bool1148 pub async fn setup_file_point(session_id: u32, channel_id: u32) -> bool {
1149 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1150 crate::error!(
1151 "setup_file_point get task is none session_id={session_id},channel_id={channel_id}"
1152 );
1153 return false;
1154 };
1155 let task = &mut task.clone();
1156 let s_node_cfg = task.local_args[1].clone();
1157 if task.is_master {
1158 if task.forward_type == ForwardType::Reserved
1159 || task.forward_type == ForwardType::FileSystem
1160 {
1161 let _ = fs::remove_file(s_node_cfg.clone());
1162 }
1163 if !server_socket_bind_listen(session_id, channel_id, s_node_cfg, task.context_forward.id)
1164 .await
1165 {
1166 crate::error!(
1167 "server socket bind listen failed channel_id={:?}",
1168 channel_id
1169 );
1170 task_finish(session_id, channel_id).await;
1171 return false;
1172 }
1173 } else if task.forward_type == ForwardType::Abstract {
1174 let fd: i32 = UdsClient::wrap_socket(AF_LOCAL);
1175 unsafe {
1176 libc::fcntl(fd, F_SETFD, FD_CLOEXEC);
1177 }
1178 task.context_forward.fd = fd;
1179 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1180 daemon_connect_pipe(session_id, channel_id, fd, s_node_cfg).await;
1181 } else {
1182 let fd: i32 = UdsClient::wrap_socket(AF_UNIX);
1183 task.context_forward.fd = fd;
1184 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1185 daemon_connect_pipe(session_id, channel_id, fd, s_node_cfg).await;
1186 }
1187 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1188 true
1189 }
1190
setup_point(session_id: u32, channel_id: u32) -> bool1191 pub async fn setup_point(session_id: u32, channel_id: u32) -> bool {
1192 if !detech_forward_type(session_id, channel_id).await {
1193 crate::error!("forward type is not true");
1194 return false;
1195 }
1196 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1197 crate::error!(
1198 "setup_point get task is none session_id={session_id},channel_id={channel_id}"
1199 );
1200 return false;
1201 };
1202 let task = &mut task.clone();
1203 if cfg!(target_os = "windows") && task.forward_type != ForwardType::Tcp {
1204 task.context_forward.last_error = String::from("Not support forward-type");
1205 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1206 return false;
1207 }
1208 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1209 let mut ret = false;
1210 crate::info!("setup_point forward type:{:#?}", task.forward_type);
1211 match task.forward_type {
1212 ForwardType::Tcp => {
1213 ret = setup_tcp_point(session_id, channel_id).await;
1214 }
1215 ForwardType::Device => {
1216 if !cfg!(target_os = "windows") {
1217 ret = setup_device_point(session_id, channel_id).await;
1218 }
1219 }
1220 ForwardType::Jdwp | ForwardType::Ark => {
1221 crate::info!("setup_point ark case");
1222 if !cfg!(feature = "host") {
1223 ret = setup_jdwp_point(session_id, channel_id).await;
1224 }
1225 }
1226 ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
1227 if !cfg!(target_os = "windows") {
1228 ret = setup_file_point(session_id, channel_id).await;
1229 }
1230 }
1231 };
1232 crate::info!("setup_point, ret:{ret}");
1233 ret
1234 }
1235
send_to_task( session_id: u32, channel_id: u32, command: HdcCommand, buf_ptr: &[u8], buf_size: usize, cid: u32, ) -> bool1236 pub async fn send_to_task(
1237 session_id: u32,
1238 channel_id: u32,
1239 command: HdcCommand,
1240 buf_ptr: &[u8],
1241 buf_size: usize,
1242 cid: u32,
1243 ) -> bool {
1244 if buf_size > (config::MAX_SIZE_IOBUF * 2) {
1245 crate::error!("send task buf_size oversize");
1246 return false;
1247 }
1248
1249 let mut new_buf = [u32::to_be_bytes(cid).as_slice(), buf_ptr].concat();
1250 new_buf[4..].copy_from_slice(&buf_ptr[0..buf_size]);
1251 let file_check_message = TaskMessage {
1252 channel_id,
1253 command,
1254 payload: new_buf,
1255 };
1256 transfer::put(session_id, file_check_message).await;
1257 true
1258 }
1259
filter_command(_payload: &[u8]) -> io::Result<(String, u32)>1260 pub async fn filter_command(_payload: &[u8]) -> io::Result<(String, u32)> {
1261 let bytes = &_payload[4..];
1262 let ct: Result<String, std::string::FromUtf8Error> = String::from_utf8(bytes.to_vec());
1263 if let Ok(content) = ct {
1264 let mut id_bytes = [0u8; 4];
1265 id_bytes.copy_from_slice(&_payload[0..4]);
1266 let id: u32 = u32::from_be_bytes(id_bytes);
1267 return Ok((content, id));
1268 }
1269 Err(Error::new(ErrorKind::Other, "filter command failure"))
1270 }
1271
send_active_master(session_id: u32, channel_id: u32) -> bool1272 pub async fn send_active_master(session_id: u32, channel_id: u32) -> bool {
1273 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1274 crate::error!(
1275 "send_active_master get task is none session_id={session_id},channel_id={channel_id}"
1276 );
1277 return false;
1278 };
1279 let task = &mut task.clone();
1280 if task.context_forward.check_order {
1281 let flag = [0u8; 1];
1282 send_to_task(
1283 session_id,
1284 channel_id,
1285 HdcCommand::ForwardCheckResult,
1286 &flag,
1287 1,
1288 task.context_forward.id,
1289 )
1290 .await;
1291 free_context(session_id, channel_id, 0, false).await;
1292 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1293 return true;
1294 }
1295 if !send_to_task(
1296 session_id,
1297 channel_id,
1298 HdcCommand::ForwardActiveMaster,
1299 &Vec::<u8>::new(),
1300 0,
1301 task.context_forward.id,
1302 )
1303 .await
1304 {
1305 free_context(session_id, channel_id, 0, true).await;
1306 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1307 return false;
1308 }
1309 true
1310 }
1311
begin_forward(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool1312 pub async fn begin_forward(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool {
1313 let Ok(command) = String::from_utf8(_payload.to_vec()) else {
1314 crate::error!("cmd argv is not int utf8");
1315 return false;
1316 };
1317 crate::info!("begin forward, command: {:?}", command);
1318 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1319 crate::error!("begin forward get task is none");
1320 return false;
1321 };
1322 let task = &mut task.clone();
1323 task.task_command = command.clone();
1324 let result = Base::split_command_to_args(&command);
1325 let argv = result.0;
1326 let argc = result.1;
1327 task.context_forward.id = get_id(_payload);
1328 task.is_master = true;
1329
1330 if argc < ARG_COUNT2 {
1331 crate::error!("argc < 2 parse is failed.");
1332 return false;
1333 }
1334 if argv[0].len() > BUF_SIZE_SMALL || argv[1].len() > BUF_SIZE_SMALL {
1335 crate::error!("parse's length is flase.");
1336 return false;
1337 }
1338 if !check_node_info(&argv[0], &mut task.local_args).await {
1339 crate::error!("check argv[0] node info is flase.");
1340 return false;
1341 }
1342 if !check_node_info(&argv[1], &mut task.remote_args).await {
1343 crate::error!("check argv[1] node info is flase.");
1344 return false;
1345 }
1346 task.remote_parameters = argv[1].clone();
1347 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1348 if !setup_point(session_id, channel_id).await {
1349 crate::error!("setup point return false");
1350 return false;
1351 }
1352
1353 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1354 crate::error!("begin forward get task is none");
1355 return false;
1356 };
1357 let task = &mut task.clone();
1358 task.map_ctx_point
1359 .insert(task.context_forward.id, task.context_forward.clone());
1360
1361 let wake_up_message = TaskMessage {
1362 channel_id,
1363 command: HdcCommand::KernelWakeupSlavetask,
1364 payload: Vec::<u8>::new(),
1365 };
1366 transfer::put(session_id, wake_up_message).await;
1367
1368 let buf_string: Vec<u8> = argv[1].as_bytes().to_vec();
1369 let mut new_buf = vec![0_u8; buf_string.len() + 9];
1370 buf_string.iter().enumerate().for_each(|(i, e)| {
1371 new_buf[i + 8] = *e;
1372 });
1373 send_to_task(
1374 session_id,
1375 channel_id,
1376 HdcCommand::ForwardCheck,
1377 &new_buf,
1378 buf_string.len() + 9,
1379 task.context_forward.id,
1380 )
1381 .await;
1382 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1383 true
1384 }
1385
slave_connect( session_id: u32, channel_id: u32, _payload: &[u8], check_order: bool, ) -> bool1386 pub async fn slave_connect(
1387 session_id: u32,
1388 channel_id: u32,
1389 _payload: &[u8],
1390 check_order: bool,
1391 ) -> bool {
1392 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1393 crate::error!(
1394 "slave_connect get task is none session_id={session_id},channel_id={channel_id}"
1395 );
1396 return false;
1397 };
1398 let task = &mut task.clone();
1399 task.is_master = false;
1400 task.context_forward.check_order = check_order;
1401 if let Ok((content, id)) = filter_command(_payload).await {
1402 let content = &content[8..].trim_end_matches('\0').to_string();
1403 task.task_command = content.clone();
1404 if !check_node_info(content, &mut task.local_args).await {
1405 crate::error!("check local args is false");
1406 return false;
1407 }
1408 task.context_forward.id = id;
1409 }
1410 task.map_ctx_point
1411 .insert(task.context_forward.id, task.context_forward.clone());
1412 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1413 if !check_order {
1414 if !setup_point(session_id, channel_id).await {
1415 crate::error!("setup point return false, free context");
1416 free_context(session_id, channel_id, 0, true).await;
1417 return false;
1418 }
1419 } else {
1420 send_active_master(session_id, channel_id).await;
1421 }
1422 true
1423 }
1424
read_data_to_forward(session_id: u32, channel_id: u32) -> bool1425 pub async fn read_data_to_forward(session_id: u32, channel_id: u32) -> bool {
1426 let Some(mut task) = ForwardTaskMap::get(session_id, channel_id).await else {
1427 crate::error!(
1428 "read_data_to_forward get task is none session_id={session_id},channel_id={channel_id}"
1429 );
1430 return false;
1431 };
1432 let task = &mut task;
1433 let cid = task.context_forward.id;
1434 match task.forward_type {
1435 ForwardType::Tcp | ForwardType::Jdwp | ForwardType::Ark => {
1436 ylong_runtime::spawn(async move {
1437 TcpReadStreamMap::read(session_id, channel_id, cid).await
1438 });
1439 }
1440 ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
1441 let _fd = task.context_forward.fd;
1442 #[cfg(not(target_os = "windows"))]
1443 ylong_runtime::spawn(async move {
1444 deamon_read_socket_msg(session_id, channel_id, _fd).await
1445 });
1446 }
1447 ForwardType::Device =>
1448 {
1449 #[cfg(not(target_os = "windows"))]
1450 if !setup_device_point(session_id, channel_id).await {
1451 return false;
1452 }
1453 }
1454 }
1455 true
1456 }
1457
write_forward_bufer( session_id: u32, channel_id: u32, _id: u32, content: Vec<u8>, ) -> bool1458 pub async fn write_forward_bufer(
1459 session_id: u32,
1460 channel_id: u32,
1461 _id: u32,
1462 content: Vec<u8>,
1463 ) -> bool {
1464 let Some(mut task) = ForwardTaskMap::get(session_id, channel_id).await else {
1465 crate::error!(
1466 "write_forward_bufer get task is none session_id={session_id},channel_id={channel_id}"
1467 );
1468 return false;
1469 };
1470 let task = &mut task;
1471 if task.forward_type == ForwardType::Tcp {
1472 TcpWriteStreamMap::write(channel_id, content).await;
1473 } else {
1474 #[cfg(not(target_os = "windows"))]
1475 {
1476 let fd = task.context_forward.fd;
1477 UdsClient::wrap_send(fd, &content);
1478 }
1479 }
1480 true
1481 }
1482
forward_command_dispatch( session_id: u32, channel_id: u32, command: HdcCommand, _payload: &[u8], ) -> bool1483 pub async fn forward_command_dispatch(
1484 session_id: u32,
1485 channel_id: u32,
1486 command: HdcCommand,
1487 _payload: &[u8],
1488 ) -> bool {
1489 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1490 crate::error!("forward_command_dispatch get task is none session_id={session_id},channel_id={channel_id}"
1491 );
1492 return false;
1493 };
1494 let task: &mut HdcForward = &mut task.clone();
1495 let mut ret: bool = true;
1496 if let Ok((_content, id)) = filter_command(_payload).await {
1497 task.context_forward.id = id;
1498 }
1499 let send_msg = _payload[4..].to_vec();
1500 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1501 match command {
1502 HdcCommand::ForwardCheckResult => {
1503 ret = check_command(session_id, channel_id, _payload).await;
1504 }
1505 HdcCommand::ForwardData => {
1506 ret = write_forward_bufer(session_id, channel_id, task.context_forward.id, send_msg)
1507 .await;
1508 }
1509 HdcCommand::ForwardFreeContext => {
1510 free_context(session_id, channel_id, 0, false).await;
1511 }
1512 HdcCommand::ForwardActiveMaster => {
1513 ret = true;
1514 }
1515 _ => {
1516 ret = false;
1517 }
1518 }
1519 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1520 ret
1521 }
1522
get_last_error(session_id: u32, channel_id: u32) -> io::Result<String>1523 async fn get_last_error(session_id: u32, channel_id: u32) -> io::Result<String> {
1524 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1525 return Err(Error::new(ErrorKind::Other, "task not found."));
1526 };
1527 Ok(task.context_forward.last_error)
1528 }
1529
print_error_info(session_id: u32, channel_id: u32)1530 async fn print_error_info(session_id: u32, channel_id: u32) {
1531 if let Ok(error) = get_last_error(session_id, channel_id).await {
1532 echo_client(
1533 session_id,
1534 channel_id,
1535 error.as_str(),
1536 MessageLevel::Fail,
1537 )
1538 .await;
1539 }
1540 }
1541
command_dispatch( session_id: u32, channel_id: u32, _command: HdcCommand, _payload: &[u8], _payload_size: u16, ) -> bool1542 pub async fn command_dispatch(
1543 session_id: u32,
1544 channel_id: u32,
1545 _command: HdcCommand,
1546 _payload: &[u8],
1547 _payload_size: u16,
1548 ) -> bool {
1549 crate::info!("command_dispatch command recv: {:?}", _command);
1550 let ret = match _command {
1551 HdcCommand::ForwardInit => begin_forward(session_id, channel_id, _payload).await,
1552 HdcCommand::ForwardCheck => {
1553 slave_connect(session_id, channel_id, _payload, true).await
1554 }
1555 HdcCommand::ForwardActiveSlave => {
1556 slave_connect(session_id, channel_id, _payload, false).await
1557 }
1558 _ => forward_command_dispatch(session_id, channel_id, _command, _payload).await,
1559 };
1560 crate::info!("command dispatch ret: {:?}", ret);
1561 if !ret {
1562 print_error_info(session_id, channel_id).await;
1563 task_finish(session_id, channel_id).await;
1564 return false;
1565 }
1566 ret
1567 }
1568