• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 //! tcp
16 #![allow(missing_docs)]
17 
18 use crate::config::*;
19 use crate::serializer;
20 #[allow(unused)]
21 use crate::utils::hdc_log::*;
22 
23 use std::io::{self, Error, ErrorKind};
24 
25 #[cfg(feature = "host")]
26 extern crate ylong_runtime_static as ylong_runtime;
27 use ylong_runtime::io::AsyncReadExt;
28 use ylong_runtime::net::SplitReadHalf;
29 
read_frame(rd: &mut SplitReadHalf, expected_size: usize) -> io::Result<Vec<u8>>30 async fn read_frame(rd: &mut SplitReadHalf, expected_size: usize) -> io::Result<Vec<u8>> {
31     if expected_size == 0 {
32         return Ok(vec![]);
33     }
34     let mut data = vec![0_u8; expected_size];
35     let mut index: usize = 0;
36     while index < expected_size {
37         crate::trace!("before read {index} / {expected_size}");
38         match rd.read(&mut data[index..]).await {
39             Ok(recv_size) => {
40                 crate::trace!("after read {recv_size}");
41                 if recv_size == 0 {
42                     crate::debug!("peer shutdown");
43                     return Err(Error::new(ErrorKind::ConnectionAborted, "peer shutdown"));
44                 }
45                 index += recv_size;
46             }
47             Err(e) => {
48                 crate::warn!("read tcp failed: {}", e.to_string());
49                 return Err(Error::new(ErrorKind::Other, "read tcp failed"));
50             }
51         }
52     }
53     Ok(data)
54 }
55 
unpack_task_message(rd: &mut SplitReadHalf) -> io::Result<TaskMessage>56 pub async fn unpack_task_message(rd: &mut SplitReadHalf) -> io::Result<TaskMessage> {
57     let data = read_frame(rd, serializer::HEAD_SIZE).await?;
58     let payload_head = serializer::unpack_payload_head(data)?;
59     crate::trace!("get payload_head: {:?}", payload_head);
60 
61     let expected_head_size = u16::from_be(payload_head.head_size) as usize;
62     let expected_data_size = u32::from_be(payload_head.data_size) as usize;
63     if expected_head_size + expected_data_size == 0
64         || expected_head_size + expected_data_size > HDC_BUF_MAX_SIZE
65     {
66         return Err(Error::new(ErrorKind::Other, "Packet size incorrect"));
67     }
68 
69     let data = read_frame(rd, expected_head_size).await?;
70     let payload_protect = serializer::unpack_payload_protect(data)?;
71     crate::trace!("get payload_protect: {:?}", payload_protect);
72     let channel_id = payload_protect.channel_id;
73 
74     let command = match HdcCommand::try_from(payload_protect.command_flag) {
75         Ok(command) => command,
76         Err(_) => {
77             return Err(Error::new(ErrorKind::Other, "unknown command"));
78         }
79     };
80 
81     let payload = read_frame(rd, expected_data_size).await?;
82     Ok(TaskMessage {
83         channel_id,
84         command,
85         payload,
86     })
87 }
88 
recv_channel_message(rd: &mut SplitReadHalf) -> io::Result<Vec<u8>>89 pub async fn recv_channel_message(rd: &mut SplitReadHalf) -> io::Result<Vec<u8>> {
90     let data = read_frame(rd, 4).await?;
91     let Ok(data) = data.try_into() else {
92         return Err(Error::new(
93             ErrorKind::Other,
94             "Data forced conversion failed",
95         ));
96     };
97     let expected_size = u32::from_be_bytes(data);
98     read_frame(rd, expected_size as usize).await
99 }
100