• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 //! tcp
16 #![allow(missing_docs)]
17 
18 use crate::config::*;
19 use crate::serializer;
20 #[allow(unused)]
21 use crate::utils::hdc_log::*;
22 
23 use std::io::{self, Error, ErrorKind};
24 
25 use ylong_runtime::io::AsyncReadExt;
26 use ylong_runtime::net::SplitReadHalf;
27 
read_frame(rd: &mut SplitReadHalf, expected_size: usize) -> io::Result<Vec<u8>>28 async fn read_frame(rd: &mut SplitReadHalf, expected_size: usize) -> io::Result<Vec<u8>> {
29     if expected_size == 0 {
30         return Ok(vec![]);
31     }
32     let mut data = vec![0_u8; expected_size];
33     let mut index: usize = 0;
34     while index < expected_size {
35         crate::trace!("before read {index} / {expected_size}");
36         match rd.read(&mut data[index..]).await {
37             Ok(recv_size) => {
38                 crate::trace!("after read {recv_size}");
39                 if recv_size == 0 {
40                     crate::debug!("peer shutdown");
41                     return Err(Error::new(ErrorKind::ConnectionAborted, "peer shutdown"));
42                 }
43                 index += recv_size;
44             }
45             Err(e) => {
46                 crate::warn!("read tcp failed: {}", e.to_string());
47                 return Err(Error::new(ErrorKind::Other, "read tcp failed"));
48             }
49         }
50     }
51     Ok(data)
52 }
53 
unpack_task_message(rd: &mut SplitReadHalf) -> io::Result<TaskMessage>54 pub async fn unpack_task_message(rd: &mut SplitReadHalf) -> io::Result<TaskMessage> {
55     let data = read_frame(rd, serializer::HEAD_SIZE).await?;
56     let payload_head = serializer::unpack_payload_head(data)?;
57     crate::trace!("get payload_head: {:#?}", payload_head);
58 
59     let expected_head_size = u16::from_be(payload_head.head_size) as usize;
60     let expected_data_size = u32::from_be(payload_head.data_size) as usize;
61     if expected_head_size + expected_data_size == 0
62         || expected_head_size + expected_data_size > HDC_BUF_MAX_SIZE
63     {
64         return Err(Error::new(ErrorKind::Other, "Packet size incorrect"));
65     }
66 
67     let data = read_frame(rd, expected_head_size).await?;
68     let payload_protect = serializer::unpack_payload_protect(data)?;
69     crate::trace!("get payload_protect: {:#?}", payload_protect);
70     let channel_id = payload_protect.channel_id;
71 
72     let command = match HdcCommand::try_from(payload_protect.command_flag) {
73         Ok(command) => command,
74         Err(_) => {
75             return Err(Error::new(ErrorKind::Other, "unknown command"));
76         }
77     };
78 
79     let payload = read_frame(rd, expected_data_size).await?;
80     Ok(TaskMessage {
81         channel_id,
82         command,
83         payload,
84     })
85 }
86 
recv_channel_message(rd: &mut SplitReadHalf) -> io::Result<Vec<u8>>87 pub async fn recv_channel_message(rd: &mut SplitReadHalf) -> io::Result<Vec<u8>> {
88     let data = read_frame(rd, 4).await?;
89     let expected_size = u32::from_be_bytes(data.try_into().unwrap());
90     read_frame(rd, expected_size as usize).await
91 }
92