1 /* 2 * Copyright (C) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 /// provide C interface to C++ for calling 17 pub mod ffi; 18 pub(super) mod net_packet; 19 mod binding; 20 use hilog_rust::{error, hilog, debug, HiLogLabel, LogType}; 21 use std::ffi::{CString, c_char}; 22 use std::mem::size_of; 23 use binding::CSensorServiceClient; 24 use net_packet::{NetPacket, CNetPacket, PackHead}; 25 type ErrorStatus = crate::stream_buffer::ErrStatus; 26 /// function pointer alias 27 pub type ClientPacketCallBackFun = unsafe extern "C" fn ( 28 client: *const CSensorServiceClient, 29 pkt: *const CNetPacket, 30 ); 31 32 const ONCE_PROCESS_NETPACKET_LIMIT: i32 = 100; 33 const MAX_STREAM_BUF_SIZE: usize = 256; 34 /// max buffer size of packet 35 pub const MAX_PACKET_BUF_SIZE: usize = 256; 36 const PARAM_INPUT_INVALID: i32 = 5; 37 const MEM_OUT_OF_BOUNDS: i32 = 3; 38 const MEMCPY_SEC_FUN_FAIL: i32 = 4; 39 const STREAM_BUF_READ_FAIL: i32 = 1; 40 const MAX_VECTOR_SIZE: i32 = 10; 41 const LOG_LABEL: HiLogLabel = HiLogLabel { 42 log_type: LogType::LogCore, 43 domain: 0xD002700, 44 tag: "StreamBuffer" 45 }; 46 47 /// enum errstatus 48 #[derive(Copy, Clone, PartialEq)] 49 #[repr(C)] 50 pub enum ErrStatus { 51 /// status ok 52 Ok = 0, 53 /// readerror 54 Read = 1, 55 /// writeerror 56 Write = 2, 57 } 58 59 #[derive(Copy, Clone)] 60 #[repr(C)] 61 pub struct StreamBuffer { 62 rw_error_status: ErrorStatus, 63 r_count: usize, 64 w_count: usize, 65 r_pos: usize, 66 w_pos: usize, 67 sz_buff: [c_char; MAX_STREAM_BUF_SIZE + 1], 68 } 69 70 impl Default for StreamBuffer { default() -> Self71 fn default() -> Self { 72 Self { 73 rw_error_status: ErrorStatus::Ok, 74 r_count: 0, 75 w_count: 0, 76 r_pos: 0, 77 w_pos: 0, 78 sz_buff: [0; MAX_STREAM_BUF_SIZE + 1], 79 } 80 } 81 } 82 83 84 impl StreamBuffer { as_ref<'a>(object: *const Self) -> Option<&'a Self>85 fn as_ref<'a>(object: *const Self) -> Option<&'a Self> { 86 // SAFETY: as_ref has already done no-null verification inside 87 unsafe { 88 object.as_ref() 89 } 90 } as_mut<'a>(object: *mut Self) -> Option<&'a mut Self>91 fn as_mut<'a>(object: *mut Self) -> Option<&'a mut Self> { 92 // SAFETY: as_mut has already done no-null verification inside 93 unsafe { 94 object.as_mut() 95 } 96 } write<T>(&mut self, data: T)97 fn write<T>(&mut self, data: T) { 98 let data: *const c_char = &data as *const T as *const c_char; 99 let size = size_of::<T>(); 100 self.write_char_usize(data, size); 101 } reset(&mut self)102 fn reset(&mut self) { 103 self.r_pos = 0; 104 self.w_pos = 0; 105 self.r_count = 0; 106 self.w_count = 0; 107 self.rw_error_status = ErrorStatus::Ok; 108 } clean(&mut self)109 fn clean(&mut self) { 110 self.reset(); 111 let size = MAX_STREAM_BUF_SIZE + 1; 112 let reference = &(self.sz_buff); 113 let pointer = reference as *const c_char; 114 // SAFETY: memset_s is the security function of the C library 115 let ret = unsafe { 116 binding::memset_s(pointer as *mut libc::c_void, size, 0, size) 117 }; 118 if ret != 0 { 119 error!(LOG_LABEL, "Call memset_s fail"); 120 } 121 } seek_read_pos(&mut self, n: usize) -> bool122 fn seek_read_pos(&mut self, n: usize) -> bool { 123 let pos: usize = self.r_pos + n; 124 if pos > self.w_pos { 125 error!(LOG_LABEL, "The position in the calculation is not as expected. pos:{} [0, {}]", 126 pos, self.w_pos); 127 false 128 } else { 129 self.r_pos = pos; 130 true 131 } 132 } unread_size(&self) -> usize133 fn unread_size(&self) -> usize { 134 if self.w_pos <= self.r_pos { 135 0 136 } else { 137 self.w_pos - self.r_pos 138 } 139 } is_empty(&self) -> bool140 fn is_empty(&self) -> bool { 141 self.r_pos == self.w_pos 142 } write_streambuffer(&mut self, buf: &Self) -> bool143 fn write_streambuffer(&mut self, buf: &Self) -> bool { 144 self.write_char_usize(buf.data(), buf.size()) 145 } read_streambuffer(&self, buf: &mut Self) -> bool146 fn read_streambuffer(&self, buf: &mut Self) -> bool { 147 buf.write_char_usize(self.data(), self.size()) 148 } data(&self) -> *const c_char149 pub(crate) fn data(&self) -> *const c_char { 150 &(self.sz_buff[0]) as *const c_char 151 } size(&self) -> usize152 pub(crate) fn size(&self) -> usize { 153 self.w_pos 154 } chk_rwerror(&self) -> bool155 pub(crate) fn chk_rwerror(&self) -> bool { 156 self.rw_error_status != ErrorStatus::Ok 157 } get_available_buf_size(&self) -> usize158 fn get_available_buf_size(&self) -> usize { 159 if self.w_pos >= MAX_STREAM_BUF_SIZE { 160 0 161 } else { 162 MAX_STREAM_BUF_SIZE - self.w_pos 163 } 164 } get_error_status_remark(&self) -> *const c_char165 fn get_error_status_remark(&self) -> *const c_char { 166 let s: &[c_char] = match self.rw_error_status { 167 ErrorStatus::Ok => b"OK\0", 168 ErrorStatus::Read => b"READ_ERROR\0", 169 ErrorStatus::Write => b"WRITE_ERROR\0", 170 }; 171 s.as_ptr() 172 } read_buf(&self) -> *const c_char173 fn read_buf(&self) -> *const c_char { 174 &(self.sz_buff[self.r_pos]) as *const c_char 175 } write_char_usize(&mut self, buf: *const c_char, size: usize) -> bool176 fn write_char_usize(&mut self, buf: *const c_char, size: usize) -> bool { 177 if self.chk_rwerror() { 178 return false; 179 } 180 if buf.is_null() { 181 error!(LOG_LABEL, "Invalid input parameter buf=nullptr errCode:{}", PARAM_INPUT_INVALID); 182 self.rw_error_status = ErrorStatus::Write; 183 return false; 184 } 185 if size == 0 { 186 error!(LOG_LABEL, "Invalid input parameter size={} errCode:{}", size, PARAM_INPUT_INVALID); 187 self.rw_error_status = ErrorStatus::Write; 188 return false; 189 } 190 if (self.w_pos + size) > MAX_STREAM_BUF_SIZE { 191 error!(LOG_LABEL, "The write length exceeds buffer. wIdx:{} size:{} maxBufSize:{} errCode:{}", 192 self.w_pos, size, MAX_STREAM_BUF_SIZE, MEM_OUT_OF_BOUNDS); 193 self.rw_error_status = ErrorStatus::Write; 194 return false; 195 } 196 let pointer = &(self.sz_buff[0]) as *const c_char; 197 // SAFETY: memcpy_s is the security function of the C library 198 let ret = unsafe { 199 binding::memcpy_s(pointer.add(self.w_pos) as *mut libc::c_void, self.get_available_buf_size(), 200 buf as *mut libc::c_void, size) 201 }; 202 if ret != 0 { 203 error!(LOG_LABEL, "Failed to call memcpy_s. ret:{}", ret); 204 self.rw_error_status = ErrorStatus::Write; 205 return false; 206 } 207 self.w_pos += size; 208 self.w_count += 1; 209 true 210 } check_write(&mut self, size: usize) -> bool211 fn check_write(&mut self, size: usize) -> bool { 212 let buffer_size = size; 213 let mut avail_size = self.get_available_buf_size(); 214 if buffer_size > avail_size && self.r_pos > 0 { 215 self.copy_data_to_begin(); 216 avail_size = self.get_available_buf_size(); 217 } 218 avail_size >= buffer_size 219 } copy_data_to_begin(&mut self)220 fn copy_data_to_begin(&mut self) { 221 let unread_size = self.unread_size(); 222 if unread_size > 0 && self.r_pos > 0 { 223 for (index, value) in (self.r_pos..=self.w_pos).enumerate() { 224 self.sz_buff[index] = self.sz_buff[value]; 225 } 226 } 227 debug!(LOG_LABEL, "unread_size:{} rPos:{} wPos:{}", unread_size, self.r_pos, self.w_pos); 228 self.r_pos = 0; 229 self.w_pos = unread_size; 230 } read_char_usize(&mut self, buf: *const c_char, size: usize) -> bool231 fn read_char_usize(&mut self, buf: *const c_char, size: usize) -> bool { 232 if self.chk_rwerror() { 233 return false; 234 } 235 if buf.is_null() { 236 error!(LOG_LABEL, "Invalid input parameter buf=nullptr errCode:{}", PARAM_INPUT_INVALID); 237 self.rw_error_status = ErrorStatus::Read; 238 return false; 239 } 240 if size == 0 { 241 error!(LOG_LABEL, "Invalid input parameter size={} errCode:{}", size, PARAM_INPUT_INVALID); 242 self.rw_error_status = ErrorStatus::Read; 243 return false; 244 } 245 if (self.r_pos + size) > self.w_pos { 246 error!(LOG_LABEL, "Memory out of bounds on read... errCode:{}", MEM_OUT_OF_BOUNDS); 247 self.rw_error_status = ErrorStatus::Read; 248 return false; 249 } 250 // SAFETY: memcpy_s is the security function of the C library 251 let ret = unsafe { 252 binding::memcpy_s(buf as *mut libc::c_void, size, self.read_buf() as *const libc::c_void, size) 253 }; 254 if ret != 0 { 255 error!(LOG_LABEL, "Failed to call memcpy_s. ret:{}", ret); 256 self.rw_error_status = ErrorStatus::Read; 257 return false; 258 } 259 self.r_pos += size; 260 self.r_count += 1; 261 true 262 } circle_write(&mut self, buf: *const c_char, size: usize) -> bool263 fn circle_write(&mut self, buf: *const c_char, size: usize) -> bool { 264 if !self.check_write(size) { 265 error!(LOG_LABEL, "Out of buffer memory, availableSize:{}, size:{}, unreadSize:{}, rPos:{}, wPos:{}", 266 self.get_available_buf_size(), size, self.unread_size(), 267 self.r_pos, self.w_pos); 268 return false; 269 } 270 self.write_char_usize(buf, size) 271 } 272 read_client_packets(&mut self, client: *const CSensorServiceClient, callback_fun: ClientPacketCallBackFun)273 pub unsafe fn read_client_packets(&mut self, client: *const CSensorServiceClient, callback_fun: ClientPacketCallBackFun) { 274 const HEAD_SIZE: usize = size_of::<PackHead>(); 275 for _i in 0..ONCE_PROCESS_NETPACKET_LIMIT { 276 let unread_size = self.unread_size(); 277 if unread_size < HEAD_SIZE { 278 break; 279 } 280 let data_size = unread_size - HEAD_SIZE; 281 let buf: *const c_char = self.read_buf(); 282 if buf.is_null() { 283 error!(LOG_LABEL, "buf is null, skip then break"); 284 break; 285 } 286 let head: *const PackHead = buf as *const PackHead; 287 if head.is_null() { 288 error!(LOG_LABEL, "head is null, skip then break"); 289 break; 290 } 291 // SAFETY: head pointer should be not null certainly 292 let size = unsafe { 293 (*head).size 294 }; 295 // SAFETY: head pointer should be not null certainly 296 let id_msg = unsafe { 297 (*head).id_msg 298 }; 299 if !(0..=MAX_PACKET_BUF_SIZE).contains(&size) { 300 error!(LOG_LABEL, "Packet header parsing error, and this error cannot be recovered. \ 301 The buffer will be reset. size:{}, unreadSize:{}", size, unread_size); 302 self.reset(); 303 break; 304 } 305 if size > data_size { 306 break; 307 } 308 let mut pkt: NetPacket = NetPacket { 309 msg_id: id_msg, 310 ..Default::default() 311 }; 312 unsafe { 313 if size > 0 && 314 !pkt.stream_buffer.write_char_usize(buf.add(HEAD_SIZE) as *const c_char, size) { 315 error!(LOG_LABEL, "Error writing data in the NetPacket. It will be retried next time. \ 316 messageid:{}, size:{}", id_msg as i32, size); 317 break; 318 } 319 } 320 if !self.seek_read_pos(pkt.get_packet_length()) { 321 error!(LOG_LABEL, "Set read position error, and this error cannot be recovered, and the buffer \ 322 will be reset. packetSize:{} unreadSize:{}", pkt.get_packet_length(), unread_size); 323 self.reset(); 324 break; 325 } 326 let c_net_packet: CNetPacket = CNetPacket { 327 msg_id: pkt.msg_id, 328 stream_buffer_ptr: Box::into_raw(Box::new(pkt.stream_buffer)) 329 }; 330 unsafe { 331 callback_fun(client, &c_net_packet as *const CNetPacket); 332 } 333 if self.is_empty() { 334 self.reset(); 335 break; 336 } 337 } 338 } r_count(&self) -> usize339 fn r_count(&self) -> usize { 340 self.r_count 341 } w_count(&self) -> usize342 fn w_count(&self) -> usize { 343 self.w_count 344 } w_pos(&self) -> usize345 fn w_pos(&self) -> usize { 346 self.w_pos 347 } r_pos(&self) -> usize348 fn r_pos(&self) -> usize { 349 self.r_pos 350 } sz_buff(&self) -> *const c_char351 fn sz_buff(&self) -> *const c_char { 352 &self.sz_buff[0] as *const c_char 353 } set_rw_error_status(&mut self, rw_error_status: ErrorStatus)354 fn set_rw_error_status(&mut self, rw_error_status: ErrorStatus) { 355 self.rw_error_status = rw_error_status 356 } set_r_pos(&mut self, r_pos: usize)357 fn set_r_pos(&mut self, r_pos: usize) { 358 self.r_pos = r_pos 359 } 360 } 361 362