1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
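//! Port forwarding: per-channel forward tasks and contexts over TCP, Unix
//! sockets, device nodes and JDWP/Ark, plus the global tables that track them.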
16 #![allow(missing_docs)]
17 #[cfg(feature = "host")]
18 extern crate ylong_runtime_static as ylong_runtime;
19
20 #[cfg(not(feature = "host"))]
21 use libc::SOCK_STREAM;
22 #[cfg(not(target_os = "windows"))]
23 use libc::{AF_LOCAL, AF_UNIX, FD_CLOEXEC, F_SETFD};
24 use std::collections::HashMap;
25 #[cfg(not(target_os = "windows"))]
26 use std::fs::{self, File, OpenOptions};
27 #[cfg(not(target_os = "windows"))]
28 use std::io::{self, Error, ErrorKind, Read, Write};
29 use ylong_runtime::sync::{Mutex, RwLock};
30
31 use crate::common::base::Base;
32 use crate::common::hdctransfer::transfer_task_finish;
33 use crate::common::hdctransfer::{self, HdcTransferBase};
34 #[cfg(not(feature = "host"))]
35 use crate::common::jdwp::Jdwp;
36 #[cfg(not(target_os = "windows"))]
37 use crate::common::uds::{UdsAddr, UdsClient, UdsServer};
38 use crate::config::HdcCommand;
39 use crate::config::MessageLevel;
40 use crate::config::TaskMessage;
41 use crate::transfer;
42 #[allow(unused)]
43 use crate::utils::hdc_log::*;
44 use crate::{config, utils};
45 use std::sync::Arc;
46 #[cfg(not(feature = "host"))]
47 use std::time::Duration;
48 use ylong_runtime::io::AsyncReadExt;
49 use ylong_runtime::io::AsyncWriteExt;
50 use ylong_runtime::net::{SplitReadHalf, SplitWriteHalf, TcpListener, TcpStream};
51 use ylong_runtime::task::JoinHandle;
52
/// The literal count `2`. Not referenced in the visible part of this file;
/// presumably the number of `type:value` parts in a forward argument — TODO confirm.
pub const ARG_COUNT2: u32 = 2;
/// A small buffer size (256). Not referenced in the visible part of this file.
pub const BUF_SIZE_SMALL: usize = 256;
/// Size in bytes of the buffers used when reading from forwarded sockets.
pub const SOCKET_BUFFER_SIZE: usize = 65535;
/// Path prefix that `detech_forward_type` prepends to a socket name.
pub const HARMONY_RESERVED_SOCKET_PREFIX: &str = "/dev/socket";
/// Path prefix that `detech_forward_type` prepends to a socket name.
pub const FILE_SYSTEM_SOCKET_PREFIX: &str = "/tmp/";
58
/// Host-side record describing one successfully established forward.
#[cfg(feature = "host")]
#[derive(Debug, Clone)]
pub struct HdcForwardInfo {
    /// Session the forward was created on.
    pub session_id: u32,
    /// Channel the forward was created on.
    pub channel_id: u32,
    /// Direction flag; `on_forward_success` sets it when the payload starts with `'1'`.
    pub forward_direction: bool,
    /// Task description string; also used as the key in `HdcForwardInfoMap`.
    pub task_string: String,
    /// Connection key associated with the forward.
    pub connect_key: String,
}

#[cfg(feature = "host")]
impl HdcForwardInfo {
    /// Creates a record that stores every argument unchanged.
    fn new(
        session_id: u32,
        channel_id: u32,
        forward_direction: bool,
        task_string: String,
        connect_key: String,
    ) -> Self {
        HdcForwardInfo {
            connect_key,
            task_string,
            forward_direction,
            channel_id,
            session_id,
        }
    }
}
87
/// Kind of endpoint a forward context talks to.
///
/// The variant is chosen by `detech_forward_type` from the first local argument.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
enum ForwardType {
    /// Selected by `"tcp"`; also the default value.
    #[default]
    Tcp = 0,
    /// Selected by `"dev"`.
    Device,
    /// Selected by `"localabstract"`.
    Abstract,
    /// Selected by `"localfilesystem"`.
    FileSystem,
    /// Selected by `"jdwp"`.
    Jdwp,
    /// Selected by `"ark"`.
    Ark,
    /// Selected by `"localreserved"`.
    Reserved,
}
99
100 #[derive(Debug, Default, PartialEq, Eq, Clone)]
101 pub struct ContextForward {
102 session_id: u32,
103 channel_id: u32,
104 check_order: bool,
105 is_master: bool,
106 id: u32,
107 fd: i32,
108 target_fd: i32,
109 forward_type: ForwardType,
110 local_args: Vec<String>,
111 remote_args: Vec<String>,
112 task_command: String,
113 last_error: String,
114 remote_parameters: String,
115 dev_path: String,
116 }
117
/// Shared, lockable handle to one forward record.
#[cfg(feature = "host")]
type HdcForwardInfo_ = Arc<Mutex<HdcForwardInfo>>;
/// Shared table of forward records keyed by task string.
#[cfg(feature = "host")]
type HdcForwardInfoMap_ = Arc<Mutex<HashMap<String, HdcForwardInfo_>>>;
/// Namespace for the process-wide table of established forwards (host only).
#[cfg(feature = "host")]
pub struct HdcForwardInfoMap {}
#[cfg(feature = "host")]
impl HdcForwardInfoMap {
    /// Returns a handle to the global table, creating it on first use.
    ///
    /// A `OnceLock` replaces the previous `static mut`, whose unsynchronised
    /// lazy initialisation was a data race when first called from two threads.
    fn get_instance() -> HdcForwardInfoMap_ {
        static MAP: std::sync::OnceLock<HdcForwardInfoMap_> = std::sync::OnceLock::new();
        MAP.get_or_init(|| Arc::new(Mutex::new(HashMap::new())))
            .clone()
    }

    /// Stores `forward_info` under its own `task_string`, replacing any
    /// previous record with the same key.
    async fn put(forward_info: HdcForwardInfo) {
        let instance = Self::get_instance();
        let mut map = instance.lock().await;
        let key = forward_info.task_string.clone();
        map.insert(key, Arc::new(Mutex::new(forward_info)));
    }

    /// Returns a snapshot (clones) of every record currently in the table.
    pub async fn get_all_forward_infos() -> Vec<HdcForwardInfo> {
        let instance = Self::get_instance();
        let map = instance.lock().await;
        let mut result = Vec::with_capacity(map.len());
        for value in map.values() {
            let info = value.lock().await;
            result.push((*info).clone());
        }
        result
    }

    /// Removes the first record whose direction equals `forward_direction` and
    /// whose task string contains `"1|" + task_string` (or `"0|" + ...`).
    ///
    /// Returns `true` when a record was removed. The table lock is held for
    /// the whole search-and-remove so the entry cannot change in between.
    pub async fn remove_forward(task_string: String, forward_direction: bool) -> bool {
        crate::info!(
            "remove_forward task_string:{}, direction:{}",
            task_string,
            forward_direction
        );
        let prefix = if forward_direction { "1|" } else { "0|" };
        let wanted = format!("{}{}", prefix, task_string);

        let instance = Self::get_instance();
        let mut map = instance.lock().await;
        let mut remove_key: Option<String> = None;
        for (key, value) in map.iter() {
            let info = value.lock().await;
            if info.forward_direction == forward_direction && info.task_string.contains(&wanted) {
                remove_key = Some(key.clone());
                break;
            }
        }
        match remove_key {
            Some(key) => map.remove(&key).is_some(),
            None => false,
        }
    }
}
189
190 type TcpWriter = Arc<Mutex<SplitWriteHalf>>;
191 type TcpWriterMap_ = Arc<RwLock<HashMap<u32, TcpWriter>>>;
192 pub struct TcpWriteStreamMap {}
193 impl TcpWriteStreamMap {
get_instance() -> TcpWriterMap_194 fn get_instance() -> TcpWriterMap_ {
195 static mut TCP_MAP: Option<TcpWriterMap_> = None;
196 unsafe {
197 TCP_MAP
198 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
199 .clone()
200 }
201 }
202 #[allow(unused)]
put(id: u32, wr: SplitWriteHalf)203 async fn put(id: u32, wr: SplitWriteHalf) {
204 let instance = Self::get_instance();
205 let mut map = instance.write().await;
206 let arc_wr = Arc::new(Mutex::new(wr));
207 map.insert(id, arc_wr);
208 }
209 #[allow(unused)]
write(id: u32, data: Vec<u8>) -> bool210 async fn write(id: u32, data: Vec<u8>) -> bool {
211 let arc_map = Self::get_instance();
212 let map = arc_map.write().await;
213 let Some(arc_wr) = map.get(&id) else {
214 crate::error!("TcpWriteStreamMap failed to get id {:#?}", id);
215 return false;
216 };
217 let mut wr = arc_wr.lock().await;
218 let write_result = wr.write_all(data.as_slice()).await;
219 if write_result.is_err() {
220 crate::error!("TcpWriteStreamMap write_all error. id = {:#?}", id);
221 }
222 true
223 }
224
end(id: u32)225 pub async fn end(id: u32) {
226 let instance = Self::get_instance();
227 let mut map = instance.write().await;
228 if let Some(arc_wr) = map.remove(&id) {
229 let mut wr = arc_wr.lock().await;
230 let _ = wr.shutdown().await;
231 }
232 }
233 }
234
235 type TcpListener_ = Arc<Mutex<JoinHandle<()>>>;
236 type TcpListenerMap_ = Arc<RwLock<HashMap<u32, TcpListener_>>>;
237 pub struct TcpListenerMap {}
238 impl TcpListenerMap {
get_instance() -> TcpListenerMap_239 fn get_instance() -> TcpListenerMap_ {
240 static mut TCP_MAP: Option<TcpListenerMap_> = None;
241 unsafe {
242 TCP_MAP
243 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
244 .clone()
245 }
246 }
247 #[allow(unused)]
put(id: u32, listener: JoinHandle<()>)248 async fn put(id: u32, listener: JoinHandle<()>) {
249 let instance = Self::get_instance();
250 let mut map = instance.write().await;
251 let arc_listener = Arc::new(Mutex::new(listener));
252 map.insert(id, arc_listener);
253 crate::info!("forward tcp put listener id = {id}");
254 }
255
end(id: u32)256 pub async fn end(id: u32) {
257 let instance = Self::get_instance();
258 let mut map = instance.write().await;
259 if let Some(arc_listener) = map.remove(&id) {
260 let join_handle = arc_listener.lock().await;
261 join_handle.cancel();
262 }
263 }
264 }
265
266 type MapContextForward_ = Arc<Mutex<HashMap<u32, ContextForward>>>;
267 pub struct ForwardContextMap {}
268 impl ForwardContextMap {
get_instance() -> MapContextForward_269 fn get_instance() -> MapContextForward_ {
270 static mut FORWARD_MAP: Option<MapContextForward_> = None;
271 unsafe {
272 FORWARD_MAP
273 .get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
274 .clone()
275 }
276 }
277
add(cid: u32, value: ContextForward)278 pub async fn add(cid: u32, value: ContextForward) {
279 let map = Self::get_instance();
280 let mut map = map.lock().await;
281 map.insert(cid, value.clone());
282 }
283
update(cid: u32, value: ContextForward)284 pub async fn update(cid: u32, value: ContextForward) {
285 let map = Self::get_instance();
286 let mut map = map.lock().await;
287 map.insert(cid, value.clone());
288 }
289
remove(cid: u32)290 pub async fn remove(cid: u32) {
291 crate::info!("ContextForward remove, cid:{}", cid);
292 let map = Self::get_instance();
293 let mut map = map.lock().await;
294 let _ = map.remove(&cid);
295 }
296
get(cid: u32) -> Option<ContextForward>297 pub async fn get(cid: u32) -> Option<ContextForward> {
298 let arc = Self::get_instance();
299 let map = arc.lock().await;
300 let task = map.get(&cid);
301 match task {
302 Some(task) => Some(task.clone()),
303 None => {
304 crate::debug!("ContextForward result:is none,cid={:#?}", cid,);
305 Option::None
306 }
307 }
308 }
309 }
310
311 type MapForward_ = Arc<Mutex<HashMap<(u32, u32), HdcForward>>>;
312 pub struct ForwardTaskMap {}
313 impl ForwardTaskMap {
get_instance() -> MapForward_314 fn get_instance() -> MapForward_ {
315 static mut FORWARD_MAP: Option<MapForward_> = None;
316 unsafe {
317 FORWARD_MAP
318 .get_or_insert_with(|| Arc::new(Mutex::new(HashMap::new())))
319 .clone()
320 }
321 }
322
update(session_id: u32, channel_id: u32, value: HdcForward)323 pub async fn update(session_id: u32, channel_id: u32, value: HdcForward) {
324 let map = Self::get_instance();
325 let mut map = map.lock().await;
326 map.insert((session_id, channel_id), value.clone());
327 }
328
remove(session_id: u32, channel_id: u32)329 pub async fn remove(session_id: u32, channel_id: u32) {
330 crate::info!("remove, session:{}, channel:{}", session_id, channel_id);
331 let map = Self::get_instance();
332 let mut map = map.lock().await;
333 let _ = map.remove(&(session_id, channel_id));
334 }
335
get(session_id: u32, channel_id: u32) -> Option<HdcForward>336 pub async fn get(session_id: u32, channel_id: u32) -> Option<HdcForward> {
337 let arc = Self::get_instance();
338 let map = arc.lock().await;
339 let task = map.get(&(session_id, channel_id));
340 match task {
341 Some(task) => Some(task.clone()),
342 None => {
343 crate::debug!(
344 "Forward TaskMap result:is none,session_id={:#?}, channel_id={:#?}",
345 session_id,
346 channel_id
347 );
348 Option::None
349 }
350 }
351 }
352
get_channel_id(session_id: u32, task_string: String) -> Option<u32>353 pub async fn get_channel_id(session_id: u32, task_string: String) -> Option<u32> {
354 let arc = Self::get_instance();
355 let map = arc.lock().await;
356 for ((_session_id, _channel_id), value) in map.iter() {
357 if *_session_id == session_id && task_string.contains(&value.task_command) {
358 return Some(*_channel_id);
359 }
360 }
361 None
362 }
363
clear(session_id: u32)364 pub async fn clear(session_id: u32) {
365 let arc = Self::get_instance();
366 let mut channel_list = Vec::new();
367 {
368 let map = arc.lock().await;
369 if map.is_empty() {
370 return;
371 }
372 for (&key, _) in map.iter() {
373 if key.0 == session_id {
374 let id = key;
375 channel_list.push(id);
376 }
377 }
378 }
379 for id in channel_list {
380 free_channel_task(id.0, id.1).await;
381 }
382 }
383
dump_task() -> String384 pub async fn dump_task() -> String {
385 let arc = Self::get_instance();
386 let map = arc.lock().await;
387 let mut result = String::new();
388 for (_id, forward_task) in map.iter() {
389 let forward_type = match forward_task.remote_args.len() {
390 0 => "fport".to_string(),
391 2 => "rport".to_string(),
392 _ => "unknown".to_string(),
393 };
394 let first_args = match forward_task.remote_args.len() {
395 0 => "unknown".to_string(),
396 2 => format!(
397 "{}:{}",
398 forward_task.local_args[0], forward_task.local_args[1]
399 ),
400 _ => "unknown".to_string(),
401 };
402 let second_args = match forward_task.remote_args.len() {
403 0 => format!(
404 "{}:{}",
405 forward_task.local_args[0], forward_task.local_args[1]
406 ),
407 2 => format!(
408 "{}:{}",
409 forward_task.remote_args[0], forward_task.remote_args[1]
410 ),
411 _ => "unknown".to_string(),
412 };
413 result.push_str(&format!(
414 "session_id:{},\tchannel_id:{},\tcommand:{:#} {:#} {:#}\n",
415 forward_task.session_id,
416 forward_task.channel_id,
417 forward_type,
418 first_args,
419 second_args
420 ));
421 }
422 result
423 }
424 }
425
free_channel_task(session_id: u32, channel_id: u32)426 pub async fn free_channel_task(session_id: u32, channel_id: u32) {
427 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
428 return;
429 };
430 crate::info!("free channel context session_id:{session_id}, channel_id:{channel_id}");
431 let task = &mut task.clone();
432 let cid = task.context_forward.id;
433 match task.forward_type {
434 ForwardType::Tcp => {
435 TcpWriteStreamMap::end(cid).await;
436 TcpListenerMap::end(channel_id).await;
437 }
438 ForwardType::Jdwp | ForwardType::Ark => {
439 TcpWriteStreamMap::end(cid).await;
440 let ret = unsafe { libc::close(task.context_forward.fd) };
441 crate::debug!(
442 "close context_forward fd, ret={}, session_id={}, channel_id={}",
443 ret,
444 session_id,
445 channel_id,
446 );
447 let target_fd_ret = unsafe { libc::close(task.context_forward.target_fd) };
448 crate::debug!(
449 "close context_forward target fd, ret={}, session_id={}, channel_id={}",
450 target_fd_ret,
451 session_id,
452 channel_id,
453 );
454 TcpListenerMap::end(channel_id).await;
455 }
456 ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
457 #[cfg(not(target_os = "windows"))]
458 UdsServer::wrap_close(task.context_forward.fd);
459 }
460 ForwardType::Device => {
461 return;
462 }
463 }
464 ForwardTaskMap::remove(session_id, channel_id).await;
465 }
466
stop_task(session_id: u32)467 pub async fn stop_task(session_id: u32) {
468 ForwardTaskMap::clear(session_id).await;
469 }
470
dump_task() -> String471 pub async fn dump_task() -> String {
472 ForwardTaskMap::dump_task().await
473 }
474
475 #[derive(Debug, Default, Clone, PartialEq, Eq)]
476 pub struct HdcForward {
477 session_id: u32,
478 channel_id: u32,
479 server_or_daemon: bool,
480 local_args: Vec<String>,
481 remote_args: Vec<String>,
482 task_command: String,
483 forward_type: ForwardType,
484 context_forward: ContextForward,
485 pub transfer: HdcTransferBase,
486 }
487
488 impl HdcForward {
new(session_id: u32, channel_id: u32, server_or_daemon: bool) -> Self489 pub fn new(session_id: u32, channel_id: u32, server_or_daemon: bool) -> Self {
490 Self {
491 session_id,
492 channel_id,
493 server_or_daemon,
494 local_args: Default::default(),
495 remote_args: Default::default(),
496 task_command: Default::default(),
497 forward_type: Default::default(),
498 context_forward: Default::default(),
499 transfer: HdcTransferBase::new(session_id, channel_id),
500 }
501 }
502 }
503
check_node_info(value: &String, arg: &mut Vec<String>) -> bool504 pub fn check_node_info(value: &String, arg: &mut Vec<String>) -> bool {
505 crate::info!("check cmd args is : {:#?}", value);
506 if !value.contains(':') {
507 return false;
508 }
509 let array = value.split(':').collect::<Vec<&str>>();
510 if array[0] == "tcp" {
511 if array[1].len() > config::MAX_PORT_LEN {
512 crate::error!(
513 "forward port = {:#?} it'slength is wrong, can not more than five",
514 array[1]
515 );
516 return false;
517 }
518
519 match array[1].parse::<u32>() {
520 Ok(port) => {
521 if port == 0 || port > config::MAX_PORT_NUM {
522 crate::error!("port can not greater than: 65535");
523 return false;
524 }
525 }
526 Err(_) => {
527 crate::error!("port must is int type, port is: {:#?}", array[1]);
528 return false;
529 }
530 }
531 }
532 for item in array.iter() {
533 arg.push(String::from(item.to_owned()));
534 }
535 true
536 }
537
/// Records a successfully established forward in `HdcForwardInfoMap` and
/// ends the TCP mapping of the message's channel.
///
/// The payload is the task string; its first byte is `'1'` for one direction
/// and anything else for the other. An empty or non-UTF-8 payload is logged
/// and not recorded (an empty payload previously panicked on `payload[0]`).
/// Always returns `Ok(())`.
#[cfg(feature = "host")]
pub async fn on_forward_success(task_message: TaskMessage, session_id: u32) -> io::Result<()> {
    crate::info!("on_forward_success");
    let channel_id = task_message.channel_id;
    let payload = task_message.payload;
    match payload.first() {
        None => {
            crate::error!("forward success payload is empty");
        }
        Some(&first_byte) => {
            let forward_direction = first_byte == b'1';
            let connect_key = "unknow key".to_string();
            match String::from_utf8(payload) {
                Ok(task_string) => {
                    let info = HdcForwardInfo::new(
                        session_id,
                        channel_id,
                        forward_direction,
                        task_string,
                        connect_key,
                    );
                    HdcForwardInfoMap::put(info).await;
                }
                Err(err) => {
                    crate::error!("payload to String failed. {err}");
                }
            }
        }
    }
    transfer::TcpMap::end(channel_id).await;
    Ok(())
}
564
check_command( ctx: &mut ContextForward, _payload: &[u8], server_or_daemon: bool, ) -> bool565 pub async fn check_command(
566 ctx: &mut ContextForward,
567 _payload: &[u8],
568 server_or_daemon: bool,
569 ) -> bool {
570 let channel_id = ctx.channel_id;
571 if !_payload.is_empty() {
572 hdctransfer::echo_client(
573 ctx.session_id,
574 channel_id,
575 "Forwardport result:OK",
576 MessageLevel::Ok,
577 )
578 .await;
579 let map_info = String::from(if server_or_daemon { "1|" } else { "0|" }) + &ctx.task_command;
580
581 let mut command_string = vec![0_u8; map_info.len() + 1];
582 map_info
583 .as_bytes()
584 .to_vec()
585 .iter()
586 .enumerate()
587 .for_each(|(i, e)| {
588 command_string[i] = *e;
589 });
590 let forward_success_message = TaskMessage {
591 channel_id,
592 command: HdcCommand::ForwardSuccess,
593 payload: command_string,
594 };
595 #[cfg(feature = "host")]
596 {
597 let _ = on_forward_success(forward_success_message, ctx.session_id).await;
598 }
599 #[cfg(not(feature = "host"))]
600 {
601 transfer::put(ctx.session_id, forward_success_message).await;
602 }
603 } else {
604 hdctransfer::echo_client(
605 ctx.session_id,
606 channel_id,
607 "Forwardport result: Failed",
608 MessageLevel::Fail,
609 )
610 .await;
611 free_context(ctx.id, true).await;
612 return false;
613 }
614 true
615 }
616
detech_forward_type(ctx_point: &mut ContextForward) -> bool617 pub async fn detech_forward_type(ctx_point: &mut ContextForward) -> bool {
618 let type_str = &ctx_point.local_args[0];
619 match type_str.as_str() {
620 "tcp" => {
621 ctx_point.forward_type = ForwardType::Tcp;
622 }
623 "dev" => {
624 ctx_point.forward_type = ForwardType::Device;
625 }
626 "localabstract" => {
627 ctx_point.forward_type = ForwardType::Abstract;
628 }
629 "localfilesystem" => {
630 ctx_point.local_args[1] =
631 HARMONY_RESERVED_SOCKET_PREFIX.to_owned() + &ctx_point.local_args[1];
632 ctx_point.forward_type = ForwardType::FileSystem;
633 }
634 "jdwp" => {
635 ctx_point.forward_type = ForwardType::Jdwp;
636 }
637 "ark" => {
638 ctx_point.forward_type = ForwardType::Ark;
639 }
640 "localreserved" => {
641 ctx_point.local_args[1] =
642 FILE_SYSTEM_SOCKET_PREFIX.to_owned() + &ctx_point.local_args[1];
643 ctx_point.forward_type = ForwardType::Reserved;
644 }
645 _ => {
646 crate::error!("this forward type may is not expected");
647 return false;
648 }
649 }
650 true
651 }
652
forward_tcp_accept(ctx: &mut ContextForward, port: u32) -> io::Result<()>653 pub async fn forward_tcp_accept(ctx: &mut ContextForward, port: u32) -> io::Result<()> {
654 let saddr = format!("127.0.0.1:{}", port);
655 let cid = ctx.id;
656 let session_tmp = ctx.session_id;
657 let channel_tmp = ctx.channel_id;
658 crate::info!(
659 "forward_ tcp_accept bind addr:{:#?}, cid = {:#?}",
660 saddr,
661 cid
662 );
663 let result = TcpListener::bind(saddr.clone()).await;
664 match result {
665 Ok(listener) => {
666 crate::info!("forward_ tcp_accept bind ok");
667 let join_handle = utils::spawn(async move {
668 loop {
669 let (stream, _addr) = match listener.accept().await {
670 Ok((stream, _addr)) => (stream, _addr),
671 Err(err) => {
672 crate::error!("listener accept failed, {err}");
673 continue;
674 }
675 };
676 let (rd, wr) = stream.into_split();
677 TcpWriteStreamMap::put(cid, wr).await;
678 on_accept(cid).await;
679 recv_tcp_msg(session_tmp, channel_tmp, rd, cid).await;
680 }
681 });
682 TcpListenerMap::put(channel_tmp, join_handle).await;
683 Ok(())
684 }
685 Err(e) => {
686 crate::error!("forward_ tcp_accept fail:{:#?}", e);
687 Err(e)
688 }
689 }
690 }
691
on_accept(cid: u32)692 pub async fn on_accept(cid: u32) {
693 let Some(context_forward) = ForwardContextMap::get(cid).await else {
694 crate::error!("daemon_connect _tcp2 get context is none cid={cid}");
695 return;
696 };
697 let ctx = &mut context_forward.clone();
698
699 let buf_string: Vec<u8> = ctx.remote_parameters.clone().as_bytes().to_vec();
700 let mut new_buf = vec![0_u8; buf_string.len() + 9];
701
702 buf_string.iter().enumerate().for_each(|(i, e)| {
703 new_buf[i + 8] = *e;
704 });
705 send_to_task(
706 ctx.session_id,
707 ctx.channel_id,
708 HdcCommand::ForwardActiveSlave,
709 &new_buf,
710 buf_string.len() + 9,
711 ctx.id,
712 )
713 .await;
714 }
715
daemon_connect_tcp(cid: u32, port: u32)716 pub async fn daemon_connect_tcp(cid: u32, port: u32) {
717 let Some(context_forward) = ForwardContextMap::get(cid).await else {
718 crate::error!("daemon_connect _tcp2 get context is none cid={cid}");
719 return;
720 };
721 let ctx = &mut context_forward.clone();
722
723 let saddr = format!("127.0.0.1:{}", port);
724 let stream = match TcpStream::connect(saddr).await {
725 Err(err) => {
726 crate::error!("TcpStream::stream failed {:?}", err);
727 free_context(cid, false).await;
728 return;
729 }
730 Ok(addr) => addr,
731 };
732 send_active_master(ctx).await;
733 let (rd, wr) = stream.into_split();
734 TcpWriteStreamMap::put(ctx.id, wr).await;
735 ForwardContextMap::update(ctx.id, ctx.clone()).await;
736 recv_tcp_msg(ctx.session_id, ctx.channel_id, rd, ctx.id).await;
737 }
738
recv_tcp_msg(session_id: u32, channel_id: u32, mut rd: SplitReadHalf, cid: u32)739 pub async fn recv_tcp_msg(session_id: u32, channel_id: u32, mut rd: SplitReadHalf, cid: u32) {
740 let mut data = vec![0_u8; SOCKET_BUFFER_SIZE];
741 loop {
742 match rd.read(&mut data).await {
743 Ok(recv_size) => {
744 if recv_size == 0 {
745 crate::info!("recv_size is 0, tcp temporarily closed");
746 free_context(cid, true).await;
747 return;
748 }
749 send_to_task(
750 session_id,
751 channel_id,
752 HdcCommand::ForwardData,
753 &data[0..recv_size],
754 recv_size,
755 cid,
756 )
757 .await;
758 }
759 Err(_e) => {
760 crate::error!(
761 "recv tcp msg read failed session_id={session_id},channel_id={channel_id}"
762 );
763 }
764 }
765 }
766 }
767
768 #[cfg(not(target_os = "windows"))]
deamon_read_socket_msg(session_id: u32, channel_id: u32, fd: i32, cid: u32)769 pub async fn deamon_read_socket_msg(session_id: u32, channel_id: u32, fd: i32, cid: u32) {
770 loop {
771 let result = ylong_runtime::spawn_blocking(move || {
772 let mut buffer: [u8; SOCKET_BUFFER_SIZE] = [0; SOCKET_BUFFER_SIZE];
773 let recv_size = UdsClient::wrap_recv(fd, &mut buffer);
774 (recv_size, buffer)
775 })
776 .await;
777 let (recv_size, buffer) = match result {
778 Ok((recv_size, _)) if recv_size < 0 => {
779 crate::error!("local abstract close shutdown fd = {fd}");
780 free_context(cid, true).await;
781 return;
782 }
783 Ok((recv_size, buffer)) => (recv_size, buffer),
784 Err(err) => {
785 crate::error!("read socket msg failed. {err}");
786 free_context(cid, true).await;
787 return;
788 }
789 };
790 send_to_task(
791 session_id,
792 channel_id,
793 HdcCommand::ForwardData,
794 &buffer[0..recv_size as usize],
795 recv_size as usize,
796 cid,
797 )
798 .await;
799 }
800 }
801
free_context(cid: u32, notify_remote: bool)802 pub async fn free_context(cid: u32, notify_remote: bool) {
803 let Some(context_forward) = ForwardContextMap::get(cid).await else {
804 crate::error!("free forward context get cid is none. cid = {cid}");
805 return;
806 };
807 let ctx = &mut context_forward.clone();
808
809 if notify_remote {
810 let vec_none = Vec::<u8>::new();
811 send_to_task(
812 ctx.session_id,
813 ctx.channel_id,
814 HdcCommand::ForwardFreeContext,
815 &vec_none,
816 0,
817 ctx.id,
818 )
819 .await;
820 }
821 crate::error!("begin to free forward context cid. cid = {cid}");
822 match ctx.forward_type {
823 ForwardType::Tcp => {
824 TcpWriteStreamMap::end(ctx.id).await;
825 }
826 ForwardType::Jdwp | ForwardType::Ark => {
827 TcpWriteStreamMap::end(ctx.id).await;
828 let ret = unsafe { libc::close(ctx.fd) };
829 crate::debug!("close context_forward fd, ret={}, id={}", ret, ctx.id,);
830 let target_fd_ret = unsafe { libc::close(ctx.target_fd) };
831 crate::debug!(
832 "close context_forward target fd, ret={}, id={}",
833 target_fd_ret,
834 ctx.id,
835 );
836 }
837 ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
838 crate::error!("Abstract begin to free forward close fd = {:#?}", ctx.fd);
839 #[cfg(not(target_os = "windows"))]
840 UdsServer::wrap_close(ctx.fd);
841 ctx.fd = -1;
842 }
843 ForwardType::Device => {}
844 }
845 ForwardContextMap::remove(cid).await;
846 }
847
setup_tcp_point(ctx: &mut ContextForward) -> bool848 pub async fn setup_tcp_point(ctx: &mut ContextForward) -> bool {
849 let Ok(port) = ctx.local_args[1].parse::<u32>() else {
850 crate::error!("setup tcp point parse error");
851 return false;
852 };
853 let cid = ctx.id;
854 if ctx.is_master {
855 let result = forward_tcp_accept(ctx, port).await;
856 ctx.last_error = format!("TCP Port listen failed at {}", port);
857 return result.is_ok();
858 } else {
859 ForwardContextMap::update(ctx.id, ctx.clone()).await;
860 utils::spawn(async move { daemon_connect_tcp(cid, port).await });
861 }
862 true
863 }
864
865 #[cfg(not(target_os = "windows"))]
server_socket_bind_listen(ctx: &mut ContextForward, path: String) -> bool866 async fn server_socket_bind_listen(ctx: &mut ContextForward, path: String) -> bool {
867 let fd: i32 = UdsClient::wrap_socket(AF_UNIX);
868 ctx.fd = fd;
869 let name: Vec<u8> = path.as_bytes().to_vec();
870 let mut socket_name = vec![0_u8; name.len() + 1];
871 socket_name[0] = b'\0';
872 name.iter().enumerate().for_each(|(i, e)| {
873 socket_name[i + 1] = *e;
874 });
875 let addr = UdsAddr::parse_abstract(&socket_name[1..]);
876 ForwardContextMap::update(ctx.id, ctx.clone()).await;
877 let cid = ctx.id;
878 if let Ok(addr_obj) = &addr {
879 let ret = UdsServer::wrap_bind(fd, addr_obj);
880 if ret.is_err() {
881 hdctransfer::echo_client(
882 ctx.session_id,
883 ctx.channel_id,
884 "Unix pipe bind failed",
885 MessageLevel::Fail,
886 )
887 .await;
888 crate::error!("bind fail");
889 return false;
890 }
891 let ret = UdsServer::wrap_listen(fd);
892 if ret < 0 {
893 hdctransfer::echo_client(
894 ctx.session_id,
895 ctx.channel_id,
896 "Unix pipe listen failed",
897 MessageLevel::Fail,
898 )
899 .await;
900 crate::error!("listen fail");
901 return false;
902 }
903 utils::spawn(async move {
904 loop {
905 let client_fd = UdsServer::wrap_accept(fd);
906 if client_fd == -1 {
907 break;
908 }
909 on_accept(cid).await;
910 }
911 });
912 }
913 true
914 }
915
/// Resolves `path` to its canonical absolute form as a `String`.
///
/// Any failure — the path cannot be resolved, or the result is not valid
/// UTF-8 — is reported as `ErrorKind::Other` with the message
/// `"forward canonicalize failed"`.
pub async fn canonicalize(path: String) -> Result<String, Error> {
    let failure = || Error::new(ErrorKind::Other, "forward canonicalize failed");
    let abs_path = fs::canonicalize(path).map_err(|_| failure())?;
    abs_path.to_str().map(str::to_string).ok_or_else(failure)
}
925
/// Windows variant: device forwarding is not set up here, so this always
/// reports failure.
#[cfg(target_os = "windows")]
pub async fn setup_device_point(_ctx: &mut ContextForward) -> bool {
    let supported = false;
    supported
}
930
931 #[cfg(not(target_os = "windows"))]
setup_device_point(ctx: &mut ContextForward) -> bool932 pub async fn setup_device_point(ctx: &mut ContextForward) -> bool {
933 let s_node_cfg = ctx.local_args[1].clone();
934 let Ok(resolv_path) = canonicalize(s_node_cfg).await else {
935 crate::error!("Open unix-dev failed");
936 return false;
937 };
938 ctx.dev_path = resolv_path.clone();
939 crate::info!("setup_ device_point resolv_path={:?}", resolv_path);
940 let thread_path_ref = Arc::new(Mutex::new(resolv_path));
941 if !send_active_master(ctx).await {
942 crate::error!("send active_master return failed ctx={:?}", ctx);
943 return false;
944 }
945 let session = ctx.session_id;
946 let channel = ctx.channel_id;
947 let cid = ctx.id;
948
949 ForwardContextMap::update(ctx.id, ctx.clone()).await;
950 utils::spawn(async move {
951 loop {
952 let path = thread_path_ref.lock().await;
953 let Ok(mut file) = File::open(&*path) else {
954 crate::error!("open {} failed.", *path);
955 break;
956 };
957 let mut total = Vec::new();
958 let mut buf: [u8; config::FILE_PACKAGE_PAYLOAD_SIZE] =
959 [0; config::FILE_PACKAGE_PAYLOAD_SIZE];
960 let Ok(read_len) = file.read(&mut buf[4..]) else {
961 crate::error!("read {} failed.", *path);
962 break;
963 };
964 if read_len == 0 {
965 free_context(cid, true).await;
966 break;
967 }
968 total.append(&mut buf[0..read_len].to_vec());
969 send_to_task(
970 session,
971 channel,
972 HdcCommand::ForwardData,
973 &total,
974 read_len,
975 cid,
976 )
977 .await;
978 }
979 });
980 true
981 }
982
983 #[cfg(not(feature = "host"))]
get_pid(parameter: &str, forward_type: ForwardType) -> u32984 fn get_pid(parameter: &str, forward_type: ForwardType) -> u32 {
985 match forward_type == ForwardType::Jdwp {
986 true => parameter.parse::<u32>().unwrap_or_else(|e| {
987 crate::error!("Jdwp get pid err :{:?}", e);
988 0_u32
989 }),
990 false => {
991 let params: Vec<&str> = parameter.split('@').collect();
992 params[0].parse::<u32>().unwrap_or_else(|e| {
993 crate::error!("get pid err :{:?}", e);
994 0_u32
995 })
996 }
997 }
998 }
999
/// Host variant: no JDWP endpoint is set up on the host, so this logs and
/// always reports failure.
#[cfg(feature = "host")]
pub async fn setup_jdwp_point(_ctx: &mut ContextForward) -> bool {
    crate::info!("host not setup_jdwp _point");
    let configured = false;
    configured
}
1005
1006 #[cfg(not(feature = "host"))]
setup_jdwp_point(ctx: &mut ContextForward) -> bool1007 pub async fn setup_jdwp_point(ctx: &mut ContextForward) -> bool {
1008 let local_args = ctx.local_args[1].clone();
1009 let parameter = local_args.as_str();
1010 let style = &ctx.forward_type;
1011 let pid = get_pid(parameter, style.clone());
1012 let cid = ctx.id;
1013 let session = ctx.session_id;
1014 let channel = ctx.channel_id;
1015 if pid == 0 {
1016 crate::error!("setup jdwp point get pid is 0");
1017 return false;
1018 }
1019
1020 let result = UdsServer::wrap_socketpair(SOCK_STREAM);
1021 if result.is_err() {
1022 crate::error!("wrap socketpair failed");
1023 return false;
1024 }
1025 let mut target_fd = 0;
1026 let mut local_fd = 0;
1027 if let Ok((fd0, fd1)) = result {
1028 crate::info!("pipe, fd0:{}, fd1:{}", fd0, fd1);
1029 local_fd = fd0;
1030 target_fd = fd1;
1031 ctx.fd = local_fd;
1032 ctx.target_fd = target_fd;
1033 target_fd = fd1;
1034 }
1035
1036 utils::spawn(async move {
1037 loop {
1038 let result = ylong_runtime::spawn_blocking(move || {
1039 let mut buffer = [0u8; SOCKET_BUFFER_SIZE];
1040 let size = UdsServer::wrap_read(local_fd, &mut buffer);
1041 (size, buffer)
1042 })
1043 .await;
1044 let (size, buffer) = match result {
1045 Ok((size, _)) if size < 0 => {
1046 crate::error!("disconnect fd:({local_fd}, {target_fd}), error:{:?}", size);
1047 free_context(cid, true).await;
1048 break;
1049 }
1050 Ok((0, _)) => {
1051 ylong_runtime::time::sleep(Duration::from_millis(200)).await;
1052 continue;
1053 }
1054 Ok((size, buffer)) => (size, buffer),
1055 Err(err) => {
1056 crate::error!("spawn_blocking failed. disconnect fd:({local_fd}, {target_fd}), error:{err}");
1057 free_context(cid, true).await;
1058 break;
1059 }
1060 };
1061 send_to_task(
1062 session,
1063 channel,
1064 HdcCommand::ForwardData,
1065 &buffer[0..size as usize],
1066 size as usize,
1067 cid,
1068 )
1069 .await;
1070 }
1071 });
1072
1073 let jdwp = Jdwp::get_instance();
1074 let mut param = ctx.local_args[0].clone();
1075 param.push(':');
1076 param.push_str(parameter);
1077
1078 let ret = jdwp
1079 .send_fd_to_target(pid, target_fd, local_fd, param.as_str())
1080 .await;
1081 if !ret {
1082 crate::error!("not found pid:{:?}", pid);
1083 hdctransfer::echo_client(
1084 session,
1085 channel,
1086 format!("fport fail:pid not found:{}", pid).as_str(),
1087 MessageLevel::Fail,
1088 )
1089 .await;
1090 task_finish(session, channel).await;
1091 return false;
1092 }
1093
1094 let vec_none = Vec::<u8>::new();
1095 send_to_task(
1096 session,
1097 channel,
1098 HdcCommand::ForwardActiveMaster, // 04
1099 &vec_none,
1100 0,
1101 cid,
1102 )
1103 .await;
1104 crate::info!("setup_jdwp_ point return true");
1105 true
1106 }
1107
task_finish(session_id: u32, channel_id: u32)1108 async fn task_finish(session_id: u32, channel_id: u32) {
1109 transfer_task_finish(channel_id, session_id).await;
1110 }
1111
1112 #[cfg(not(target_os = "windows"))]
daemon_connect_pipe(ctx: &mut ContextForward)1113 pub async fn daemon_connect_pipe(ctx: &mut ContextForward) {
1114 let name: Vec<u8> = ctx.local_args[1].clone().as_bytes().to_vec();
1115 let mut socket_name = vec![0_u8; name.len() + 1];
1116 socket_name[0] = b'\0';
1117 name.iter().enumerate().for_each(|(i, e)| {
1118 socket_name[i + 1] = *e;
1119 });
1120 let addr = UdsAddr::parse_abstract(&socket_name[1..]);
1121 if let Ok(addr_obj) = &addr {
1122 let ret: Result<(), Error> = UdsClient::wrap_connect(ctx.fd, addr_obj);
1123 if ret.is_err() {
1124 hdctransfer::echo_client(
1125 ctx.session_id,
1126 ctx.channel_id,
1127 "localabstract connect fail",
1128 MessageLevel::Fail,
1129 )
1130 .await;
1131 free_context(ctx.id, true).await;
1132 return;
1133 }
1134 send_active_master(ctx).await;
1135 read_data_to_forward(ctx).await;
1136 }
1137 }
1138
/// Windows stand-in for the unix-socket forward endpoint setup.
///
/// Only compiled for Windows targets; it does nothing and always reports
/// failure.
#[cfg(target_os = "windows")]
pub async fn setup_file_point(_ctx: &mut ContextForward) -> bool {
    false
}
1143
1144 #[cfg(not(target_os = "windows"))]
setup_file_point(ctx: &mut ContextForward) -> bool1145 pub async fn setup_file_point(ctx: &mut ContextForward) -> bool {
1146 let s_node_cfg = ctx.local_args[1].clone();
1147 if ctx.is_master {
1148 if ctx.forward_type == ForwardType::Reserved || ctx.forward_type == ForwardType::FileSystem
1149 {
1150 let _ = fs::remove_file(s_node_cfg.clone());
1151 }
1152 if !server_socket_bind_listen(ctx, s_node_cfg).await {
1153 crate::error!("server socket bind listen failed id={:?}", ctx.id);
1154 task_finish(ctx.session_id, ctx.channel_id).await;
1155 return false;
1156 }
1157 } else {
1158 if ctx.fd <= 0 {
1159 crate::info!("setup_file _point fd: {:?}", ctx.fd);
1160 if ctx.forward_type == ForwardType::Abstract {
1161 ctx.fd = UdsClient::wrap_socket(AF_LOCAL);
1162 unsafe {
1163 libc::fcntl(ctx.fd, F_SETFD, FD_CLOEXEC);
1164 }
1165 } else {
1166 ctx.fd = UdsClient::wrap_socket(AF_UNIX);
1167 }
1168 }
1169 ForwardContextMap::update(ctx.id, ctx.clone()).await;
1170 daemon_connect_pipe(ctx).await;
1171 }
1172 true
1173 }
1174
setup_point(ctx: &mut ContextForward) -> bool1175 pub async fn setup_point(ctx: &mut ContextForward) -> bool {
1176 if !detech_forward_type(ctx).await {
1177 crate::error!("forward type is not true");
1178 return false;
1179 }
1180
1181 if cfg!(target_os = "windows") && ctx.forward_type != ForwardType::Tcp {
1182 ctx.last_error = String::from("Not support forward-type");
1183 return false;
1184 }
1185
1186 let mut ret = false;
1187 match ctx.forward_type {
1188 ForwardType::Tcp => {
1189 ret = setup_tcp_point(ctx).await;
1190 }
1191 ForwardType::Device => {
1192 if !cfg!(target_os = "windows") {
1193 ret = setup_device_point(ctx).await;
1194 }
1195 }
1196 ForwardType::Jdwp | ForwardType::Ark => {
1197 crate::info!("setup point ark case");
1198 if !cfg!(feature = "host") {
1199 ret = setup_jdwp_point(ctx).await;
1200 }
1201 }
1202 ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
1203 if !cfg!(target_os = "windows") {
1204 ret = setup_file_point(ctx).await;
1205 }
1206 }
1207 };
1208 ForwardContextMap::update(ctx.id, ctx.clone()).await;
1209 ret
1210 }
1211
send_to_task( session_id: u32, channel_id: u32, command: HdcCommand, buf_ptr: &[u8], buf_size: usize, cid: u32, ) -> bool1212 pub async fn send_to_task(
1213 session_id: u32,
1214 channel_id: u32,
1215 command: HdcCommand,
1216 buf_ptr: &[u8],
1217 buf_size: usize,
1218 cid: u32,
1219 ) -> bool {
1220 if buf_size > (config::MAX_SIZE_IOBUF * 2) {
1221 crate::error!("send task buf_size oversize");
1222 return false;
1223 }
1224
1225 let mut new_buf = [u32::to_be_bytes(cid).as_slice(), buf_ptr].concat();
1226 new_buf[4..].copy_from_slice(&buf_ptr[0..buf_size]);
1227 let file_check_message = TaskMessage {
1228 channel_id,
1229 command,
1230 payload: new_buf,
1231 };
1232 transfer::put(session_id, file_check_message).await;
1233 true
1234 }
1235
/// Extracts the forward context id from the head of a payload.
///
/// The id is stored in the first 4 bytes in big-endian order. Payloads
/// shorter than 4 bytes yield `0` instead of panicking, because the payload
/// originates from the remote peer and must not be able to crash the process.
pub fn get_cid(_payload: &[u8]) -> u32 {
    match _payload.get(..4) {
        Some(prefix) => {
            let mut id_bytes = [0u8; 4];
            id_bytes.copy_from_slice(prefix);
            u32::from_be_bytes(id_bytes)
        }
        None => 0,
    }
}
1242
send_active_master(ctx: &mut ContextForward) -> bool1243 pub async fn send_active_master(ctx: &mut ContextForward) -> bool {
1244 if ctx.check_order {
1245 let flag = [0u8; 1];
1246 send_to_task(
1247 ctx.session_id,
1248 ctx.channel_id,
1249 HdcCommand::ForwardCheckResult,
1250 &flag,
1251 1,
1252 ctx.id,
1253 )
1254 .await;
1255 free_context(ctx.id, false).await;
1256 return true;
1257 }
1258 if !send_to_task(
1259 ctx.session_id,
1260 ctx.channel_id,
1261 HdcCommand::ForwardActiveMaster,
1262 &Vec::<u8>::new(),
1263 0,
1264 ctx.id,
1265 )
1266 .await
1267 {
1268 free_context(ctx.id, true).await;
1269 return false;
1270 }
1271 true
1272 }
1273
forward_parse_cmd(context_forward: &mut ContextForward) -> bool1274 pub fn forward_parse_cmd(context_forward: &mut ContextForward) -> bool {
1275 let command = context_forward.task_command.clone();
1276 let result = Base::split_command_to_args(&command);
1277 let argv = result.0;
1278 let argc = result.1;
1279
1280 if argc < ARG_COUNT2 {
1281 crate::error!("argc < 2 parse is failed.");
1282 context_forward.last_error = "Too few arguments.".to_string();
1283 return false;
1284 }
1285 if argv[0].len() > BUF_SIZE_SMALL || argv[1].len() > BUF_SIZE_SMALL {
1286 crate::error!("parse's length is flase.");
1287 context_forward.last_error = "Some argument too long.".to_string();
1288 return false;
1289 }
1290 if !check_node_info(&argv[0], &mut context_forward.local_args) {
1291 crate::error!("check argv[0] node info is flase.");
1292 context_forward.last_error = "Arguments parsing failed.".to_string();
1293 return false;
1294 }
1295 if !check_node_info(&argv[1], &mut context_forward.remote_args) {
1296 crate::error!("check argv[1] node info is flase.");
1297 context_forward.last_error = "Arguments parsing failed.".to_string();
1298 return false;
1299 }
1300 context_forward.remote_parameters = argv[1].clone();
1301 true
1302 }
1303
begin_forward(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool1304 pub async fn begin_forward(session_id: u32, channel_id: u32, _payload: &[u8]) -> bool {
1305 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1306 crate::error!("begin forward get task is none session_id={session_id},channel_id={channel_id}"
1307 );
1308 return false;
1309 };
1310 let task: &mut HdcForward = &mut task.clone();
1311 let Ok(command) = String::from_utf8(_payload.to_vec()) else {
1312 crate::error!("cmd argv is not int utf8");
1313 return false;
1314 };
1315 let mut context_forward = malloc_context(session_id, channel_id, true).await;
1316 context_forward.task_command = command.clone();
1317
1318 if !forward_parse_cmd(&mut context_forward) {
1319 task.context_forward = context_forward.clone();
1320 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1321 return false;
1322 }
1323 if !setup_point(&mut context_forward).await {
1324 task.context_forward = context_forward.clone();
1325 ForwardTaskMap::update(session_id, channel_id, task.clone()).await;
1326 return false;
1327 }
1328
1329 let wake_up_message = TaskMessage {
1330 channel_id,
1331 command: HdcCommand::KernelWakeupSlavetask,
1332 payload: Vec::<u8>::new(),
1333 };
1334 transfer::put(context_forward.session_id, wake_up_message).await;
1335
1336 let buf_string: Vec<u8> = context_forward.remote_parameters.as_bytes().to_vec();
1337 let mut new_buf = vec![0_u8; buf_string.len() + 9];
1338 buf_string.iter().enumerate().for_each(|(i, e)| {
1339 new_buf[i + 8] = *e;
1340 });
1341 send_to_task(
1342 context_forward.session_id,
1343 context_forward.channel_id,
1344 HdcCommand::ForwardCheck,
1345 &new_buf,
1346 buf_string.len() + 9,
1347 context_forward.id,
1348 )
1349 .await;
1350 true
1351 }
1352
slave_connect( session_id: u32, channel_id: u32, payload: &[u8], check_order: bool, ) -> bool1353 pub async fn slave_connect(
1354 session_id: u32,
1355 channel_id: u32,
1356 payload: &[u8],
1357 check_order: bool,
1358 ) -> bool {
1359 let mut context_forward = malloc_context(session_id, channel_id, false).await;
1360 context_forward.check_order = check_order;
1361 if let Ok((content, id)) = filter_command(payload) {
1362 let content = &content[8..].trim_end_matches('\0').to_string();
1363 context_forward.task_command = content.clone();
1364 if !check_node_info(content, &mut context_forward.local_args) {
1365 crate::error!("check local args is false");
1366 return false;
1367 }
1368 context_forward.id = id;
1369 ForwardContextMap::update(id, context_forward.clone()).await;
1370 }
1371
1372 if !check_order {
1373 if !setup_point(&mut context_forward).await {
1374 crate::error!("setup point return false, free context");
1375 free_context(context_forward.id, true).await;
1376 ForwardContextMap::update(context_forward.id, context_forward.clone()).await;
1377 return false;
1378 }
1379 } else {
1380 send_active_master(&mut context_forward).await;
1381 }
1382 ForwardContextMap::update(context_forward.id, context_forward.clone()).await;
1383 true
1384 }
1385
read_data_to_forward(ctx: &mut ContextForward)1386 pub async fn read_data_to_forward(ctx: &mut ContextForward) {
1387 let cid = ctx.id;
1388 let session = ctx.session_id;
1389 let channel = ctx.channel_id;
1390 match ctx.forward_type {
1391 ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => {
1392 #[cfg(not(target_os = "windows"))]
1393 {
1394 let fd_temp = ctx.fd;
1395 utils::spawn(async move {
1396 deamon_read_socket_msg(session, channel, fd_temp, cid).await
1397 });
1398 }
1399 }
1400 ForwardType::Device => {
1401 #[cfg(not(target_os = "windows"))]
1402 setup_device_point(ctx).await;
1403 }
1404 _ => {}
1405 }
1406 }
1407
/// Splits a forward payload into its command text and context id.
///
/// The first 4 bytes are the context id in big-endian order; the remaining
/// bytes must be valid UTF-8 and are returned as the command text.
///
/// # Errors
///
/// Returns an `ErrorKind::Other` error ("filter command failure") when the
/// payload is shorter than 4 bytes or the text is not valid UTF-8. Short
/// payloads used to panic on the slice index.
pub fn filter_command(_payload: &[u8]) -> io::Result<(String, u32)> {
    let failure = || Error::new(ErrorKind::Other, "filter command failure");

    if _payload.len() < 4 {
        return Err(failure());
    }
    let (id_part, body) = _payload.split_at(4);
    let mut id_bytes = [0u8; 4];
    id_bytes.copy_from_slice(id_part);

    let content = String::from_utf8(body.to_vec()).map_err(|_| failure())?;
    Ok((content, u32::from_be_bytes(id_bytes)))
}
1419
dev_write_bufer(path: String, content: Vec<u8>)1420 pub async fn dev_write_bufer(path: String, content: Vec<u8>) {
1421 utils::spawn(async move {
1422 let open_result = OpenOptions::new()
1423 .write(true)
1424 .create(true)
1425 .open(path.clone());
1426
1427 match open_result {
1428 Ok(mut file) => {
1429 let write_result = file.write_all(content.as_slice());
1430 match write_result {
1431 Ok(()) => {}
1432 Err(e) => {
1433 crate::error!("dev write bufer to file fail:{:#?}", e);
1434 }
1435 }
1436 }
1437 Err(e) => {
1438 crate::error!("dev write bufer fail:{:#?}", e);
1439 }
1440 }
1441 });
1442 }
1443
write_forward_bufer(ctx: &mut ContextForward, content: Vec<u8>) -> bool1444 pub async fn write_forward_bufer(ctx: &mut ContextForward, content: Vec<u8>) -> bool {
1445 if ctx.forward_type == ForwardType::Tcp {
1446 return TcpWriteStreamMap::write(ctx.id, content).await;
1447 } else if ctx.forward_type == ForwardType::Device {
1448 let path_ref = ctx.dev_path.clone();
1449 if path_ref.is_empty() {
1450 crate::error!(
1451 "write_forward_bufer get dev_path is failed ctx.id = {:#?}",
1452 ctx.id
1453 );
1454 return false;
1455 }
1456 dev_write_bufer(path_ref, content).await;
1457 } else {
1458 #[cfg(not(target_os = "windows"))]
1459 {
1460 let fd = ctx.fd;
1461 if UdsClient::wrap_send(fd, &content) < 0 {
1462 crate::info!("write forward bufer failed. fd = {fd}");
1463 return false;
1464 }
1465 }
1466 }
1467 true
1468 }
1469
1470 #[allow(unused)]
malloc_context( session_id: u32, channel_id: u32, master_slave: bool, ) -> ContextForward1471 pub async fn malloc_context(
1472 session_id: u32,
1473 channel_id: u32,
1474 master_slave: bool,
1475 ) -> ContextForward {
1476 let mut ctx = ContextForward {
1477 ..Default::default()
1478 };
1479 ctx.id = utils::get_current_time() as u32;
1480 ctx.session_id = session_id;
1481 ctx.channel_id = channel_id;
1482 ctx.is_master = master_slave;
1483 ForwardContextMap::add(ctx.id, ctx.clone());
1484 ctx
1485 }
1486
forward_command_dispatch( session_id: u32, channel_id: u32, command: HdcCommand, _payload: &[u8], ) -> bool1487 pub async fn forward_command_dispatch(
1488 session_id: u32,
1489 channel_id: u32,
1490 command: HdcCommand,
1491 _payload: &[u8],
1492 ) -> bool {
1493 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1494 crate::error!("forward_command_dispatch get task is none session_id={session_id},channel_id={channel_id}"
1495 );
1496 return false;
1497 };
1498 let task: &mut HdcForward = &mut task.clone();
1499 let mut ret: bool = true;
1500 let cid = get_cid(_payload);
1501 let send_msg = _payload[4..].to_vec();
1502
1503 let Some(context_forward) = ForwardContextMap::get(cid).await else {
1504 crate::error!("forward command dispatch get context is none cid={cid}");
1505 return false;
1506 };
1507 let ctx = &mut context_forward.clone();
1508 match command {
1509 HdcCommand::ForwardCheckResult => {
1510 ret = check_command(ctx, _payload, task.server_or_daemon).await;
1511 }
1512 HdcCommand::ForwardData => {
1513 ret = write_forward_bufer(ctx, send_msg).await;
1514 }
1515 HdcCommand::ForwardFreeContext => {
1516 free_context(ctx.id, false).await;
1517 }
1518 HdcCommand::ForwardActiveMaster => {
1519 read_data_to_forward(ctx).await;
1520 ret = true;
1521 }
1522 _ => {
1523 ret = false;
1524 }
1525 }
1526 ret
1527 }
1528
print_error_info(session_id: u32, channel_id: u32)1529 async fn print_error_info(session_id: u32, channel_id: u32) {
1530 let Some(task) = ForwardTaskMap::get(session_id, channel_id).await else {
1531 crate::error!(
1532 "detech_forward_type get task is none session_id = {:#?}, channel_id = {:#?}",
1533 session_id,
1534 channel_id
1535 );
1536 return;
1537 };
1538 let task = &mut task.clone();
1539 let ctx = &task.context_forward;
1540 let mut echo_content = ctx.last_error.clone();
1541
1542 if echo_content.is_empty() {
1543 echo_content = "Forward parament failed".to_string();
1544 }
1545
1546 hdctransfer::echo_client(
1547 session_id,
1548 channel_id,
1549 echo_content.as_str(),
1550 MessageLevel::Fail,
1551 )
1552 .await;
1553 }
1554
command_dispatch( session_id: u32, channel_id: u32, command: HdcCommand, payload: &[u8], _payload_size: u16, ) -> bool1555 pub async fn command_dispatch(
1556 session_id: u32,
1557 channel_id: u32,
1558 command: HdcCommand,
1559 payload: &[u8],
1560 _payload_size: u16,
1561 ) -> bool {
1562 if command != HdcCommand::ForwardData {
1563 crate::info!("command_dispatch command recv: {:?}", command);
1564 }
1565
1566 let ret = match command {
1567 HdcCommand::ForwardInit => begin_forward(session_id, channel_id, payload).await,
1568 HdcCommand::ForwardCheck => slave_connect(session_id, channel_id, payload, true).await,
1569 HdcCommand::ForwardActiveSlave => {
1570 slave_connect(session_id, channel_id, payload, false).await
1571 }
1572 _ => forward_command_dispatch(session_id, channel_id, command, payload).await,
1573 };
1574 if !ret {
1575 print_error_info(session_id, channel_id).await;
1576 task_finish(session_id, channel_id).await;
1577 return false;
1578 }
1579 ret
1580 }
1581