1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 //! tcp
16 #![allow(missing_docs)]
17
18 use crate::config::*;
19 use crate::serializer;
20 #[allow(unused)]
21 use crate::utils::hdc_log::*;
22
23 use std::io::{self, Error, ErrorKind};
24
25 use ylong_runtime::io::AsyncReadExt;
26 use ylong_runtime::net::SplitReadHalf;
27
read_frame(rd: &mut SplitReadHalf, expected_size: usize) -> io::Result<Vec<u8>>28 async fn read_frame(rd: &mut SplitReadHalf, expected_size: usize) -> io::Result<Vec<u8>> {
29 if expected_size == 0 {
30 return Ok(vec![]);
31 }
32 let mut data = vec![0_u8; expected_size];
33 let mut index: usize = 0;
34 while index < expected_size {
35 crate::trace!("before read {index} / {expected_size}");
36 match rd.read(&mut data[index..]).await {
37 Ok(recv_size) => {
38 crate::trace!("after read {recv_size}");
39 if recv_size == 0 {
40 crate::debug!("peer shutdown");
41 return Err(Error::new(ErrorKind::ConnectionAborted, "peer shutdown"));
42 }
43 index += recv_size;
44 }
45 Err(e) => {
46 crate::warn!("read tcp failed: {}", e.to_string());
47 return Err(Error::new(ErrorKind::Other, "read tcp failed"));
48 }
49 }
50 }
51 Ok(data)
52 }
53
unpack_task_message(rd: &mut SplitReadHalf) -> io::Result<TaskMessage>54 pub async fn unpack_task_message(rd: &mut SplitReadHalf) -> io::Result<TaskMessage> {
55 let data = read_frame(rd, serializer::HEAD_SIZE).await?;
56 let payload_head = serializer::unpack_payload_head(data)?;
57 crate::trace!("get payload_head: {:#?}", payload_head);
58
59 let expected_head_size = u16::from_be(payload_head.head_size) as usize;
60 let expected_data_size = u32::from_be(payload_head.data_size) as usize;
61 if expected_head_size + expected_data_size == 0
62 || expected_head_size + expected_data_size > HDC_BUF_MAX_SIZE
63 {
64 return Err(Error::new(ErrorKind::Other, "Packet size incorrect"));
65 }
66
67 let data = read_frame(rd, expected_head_size).await?;
68 let payload_protect = serializer::unpack_payload_protect(data)?;
69 crate::trace!("get payload_protect: {:#?}", payload_protect);
70 let channel_id = payload_protect.channel_id;
71
72 let command = match HdcCommand::try_from(payload_protect.command_flag) {
73 Ok(command) => command,
74 Err(_) => {
75 return Err(Error::new(ErrorKind::Other, "unknown command"));
76 }
77 };
78
79 let payload = read_frame(rd, expected_data_size).await?;
80 Ok(TaskMessage {
81 channel_id,
82 command,
83 payload,
84 })
85 }
86
recv_channel_message(rd: &mut SplitReadHalf) -> io::Result<Vec<u8>>87 pub async fn recv_channel_message(rd: &mut SplitReadHalf) -> io::Result<Vec<u8>> {
88 let data = read_frame(rd, 4).await?;
89 let expected_size = u32::from_be_bytes(data.try_into().unwrap());
90 read_frame(rd, expected_size as usize).await
91 }
92