1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 //! tcp
16 #![allow(missing_docs)]
17
18 use crate::config::*;
19 use crate::serializer;
20 #[allow(unused)]
21 use crate::utils::hdc_log::*;
22
23 use std::io::{self, Error, ErrorKind};
24
25 #[cfg(feature = "host")]
26 extern crate ylong_runtime_static as ylong_runtime;
27 use ylong_runtime::io::AsyncReadExt;
28 use ylong_runtime::net::SplitReadHalf;
29
read_frame(rd: &mut SplitReadHalf, expected_size: usize) -> io::Result<Vec<u8>>30 async fn read_frame(rd: &mut SplitReadHalf, expected_size: usize) -> io::Result<Vec<u8>> {
31 if expected_size == 0 {
32 return Ok(vec![]);
33 }
34 let mut data = vec![0_u8; expected_size];
35 let mut index: usize = 0;
36 while index < expected_size {
37 crate::trace!("before read {index} / {expected_size}");
38 match rd.read(&mut data[index..]).await {
39 Ok(recv_size) => {
40 crate::trace!("after read {recv_size}");
41 if recv_size == 0 {
42 crate::debug!("peer shutdown");
43 return Err(Error::new(ErrorKind::ConnectionAborted, "peer shutdown"));
44 }
45 index += recv_size;
46 }
47 Err(e) => {
48 crate::warn!("read tcp failed: {}", e.to_string());
49 return Err(Error::new(ErrorKind::Other, "read tcp failed"));
50 }
51 }
52 }
53 Ok(data)
54 }
55
unpack_task_message(rd: &mut SplitReadHalf) -> io::Result<TaskMessage>56 pub async fn unpack_task_message(rd: &mut SplitReadHalf) -> io::Result<TaskMessage> {
57 let data = read_frame(rd, serializer::HEAD_SIZE).await?;
58 let payload_head = serializer::unpack_payload_head(data)?;
59 crate::trace!("get payload_head: {:?}", payload_head);
60
61 let expected_head_size = u16::from_be(payload_head.head_size) as usize;
62 let expected_data_size = u32::from_be(payload_head.data_size) as usize;
63 if expected_head_size + expected_data_size == 0
64 || expected_head_size + expected_data_size > HDC_BUF_MAX_SIZE
65 {
66 return Err(Error::new(ErrorKind::Other, "Packet size incorrect"));
67 }
68
69 let data = read_frame(rd, expected_head_size).await?;
70 let payload_protect = serializer::unpack_payload_protect(data)?;
71 crate::trace!("get payload_protect: {:?}", payload_protect);
72 let channel_id = payload_protect.channel_id;
73
74 let command = match HdcCommand::try_from(payload_protect.command_flag) {
75 Ok(command) => command,
76 Err(_) => {
77 return Err(Error::new(ErrorKind::Other, "unknown command"));
78 }
79 };
80
81 let payload = read_frame(rd, expected_data_size).await?;
82 Ok(TaskMessage {
83 channel_id,
84 command,
85 payload,
86 })
87 }
88
recv_channel_message(rd: &mut SplitReadHalf) -> io::Result<Vec<u8>>89 pub async fn recv_channel_message(rd: &mut SplitReadHalf) -> io::Result<Vec<u8>> {
90 let data = read_frame(rd, 4).await?;
91 let Ok(data) = data.try_into() else {
92 return Err(Error::new(
93 ErrorKind::Other,
94 "Data forced conversion failed",
95 ));
96 };
97 let expected_size = u32::from_be_bytes(data);
98 read_frame(rd, expected_size as usize).await
99 }
100