1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 //! uart_wrapper
16 #![allow(missing_docs)]
17 use super::uart::UartWriter;
18 use super::{uart, UartMap};
19 use crate::config::{self, TaskMessage};
20 use crate::serializer::native_struct::UartHead;
21 use crate::serializer::serialize::Serialization;
22 use crate::serializer::{self, UART_HEAD_SIZE};
23 use crate::utils;
24 #[allow(unused)]
25 use crate::utils::hdc_log::*;
26 use std::collections::HashMap;
27 use std::sync::Arc;
28 #[cfg(feature = "host")]
29 extern crate ylong_runtime_static as ylong_runtime;
30 use ylong_runtime::sync::waiter::Waiter;
31 use ylong_runtime::sync::Mutex;
32 use ylong_runtime::task::JoinHandle;
33
/// Flag bits carried in the `option` field of a UART package header.
///
/// Each variant occupies its own bit, so several of them can be OR-ed
/// together in a single header.
#[derive(PartialEq, Debug, Clone, Copy)]
#[repr(u8)]
pub enum UartOption {
    /// Marks the last package of a message; the message can be handed to the session.
    Tail = 0x01,
    /// Host requests a session reset in the daemon.
    Reset = 0x02,
    /// Confirms that a package was received.
    Ack = 0x04,
    /// Asks the sender to transmit the package again.
    Nak = 0x08,
    /// Requests that the session be freed because an unrecoverable error happened.
    Free = 0x10,
}

impl TryFrom<u8> for UartOption {
    type Error = ();

    /// Maps a raw byte to the variant with exactly that value.
    ///
    /// Only single-flag values are accepted; combinations of flags and
    /// unknown bits yield `Err(())`.
    fn try_from(cmd: u8) -> Result<Self, Self::Error> {
        const KNOWN: [UartOption; 5] = [
            UartOption::Tail,
            UartOption::Reset,
            UartOption::Ack,
            UartOption::Nak,
            UartOption::Free,
        ];
        KNOWN
            .iter()
            .copied()
            .find(|flag| *flag as u8 == cmd)
            .ok_or(())
    }
}
57
58 struct WaiterManager {
59 // waiter used for sync package send-response one by one.
60 response_waiters: HashMap<u32, Waiter>,
61 // waiter used for waiting if no packages.
62 empty_waiters: HashMap<u32, Waiter>,
63 }
64
65 impl WaiterManager {
get_instance() -> &'static mut WaiterManager66 fn get_instance() -> &'static mut WaiterManager {
67 static mut INSTANCE: Option<WaiterManager> = None;
68 unsafe {
69 INSTANCE.get_or_insert(WaiterManager {
70 response_waiters: HashMap::new(),
71 empty_waiters: HashMap::new(),
72 })
73 }
74 }
75
start_session(session_id: u32)76 async fn start_session(session_id: u32) {
77 let instance = Self::get_instance();
78 instance.response_waiters.insert(session_id, Waiter::new());
79 instance.empty_waiters.insert(session_id, Waiter::new());
80 }
81
82 #[allow(unused)]
wait_response(session_id: u32)83 async fn wait_response(session_id: u32) {
84 let instance = Self::get_instance();
85 let waiter = instance.response_waiters.get(&session_id);
86 if let Some(w) = waiter {
87 w.wait().await;
88 }
89 }
90
91 #[allow(unused)]
wakeup_response_wait(session_id: u32)92 async fn wakeup_response_wait(session_id: u32) {
93 let instance = Self::get_instance();
94 let waiter = instance.response_waiters.get(&session_id);
95 if let Some(w) = waiter {
96 w.wake_one();
97 }
98 }
99
100 #[allow(unused)]
wait_empty(session_id: u32)101 async fn wait_empty(session_id: u32) {
102 let instance = Self::get_instance();
103 let waiter = instance.empty_waiters.get(&session_id);
104 if let Some(w) = waiter {
105 w.wait().await;
106 }
107 }
108
109 #[allow(unused)]
wakeup_empty_wait(session_id: u32)110 async fn wakeup_empty_wait(session_id: u32) {
111 let instance = Self::get_instance();
112 let waiter = instance.empty_waiters.get(&session_id);
113 if let Some(w) = waiter {
114 w.wake_one();
115 }
116 }
117 }
118
/// Delivery state of a queued package.
#[derive(PartialEq, Debug, Clone, Copy)]
#[repr(u8)]
enum OutputDataStatus {
    /// Queued and not transmitted yet (also set again when a NAK arrives).
    WaitSend = 0,
    /// Transmitted; the send loop is waiting for the peer's answer.
    WaitResponse = 1,
    /// The peer acknowledged the package.
    ResponseOk = 2,
}

/// One outgoing UART package together with its bookkeeping.
#[derive(PartialEq, Debug, Clone)]
struct OutputData {
    /// Session the package belongs to.
    session_id: u32,
    /// `true` when the package is itself a response and needs no answer.
    response: bool,
    /// Raw option flags placed in the header.
    option: u8,
    /// Index identifying the package.
    package_index: u32,
    /// Bytes to transmit.
    data: Vec<u8>,
    /// Current delivery state.
    status: OutputDataStatus,
    /// Number of transmissions still allowed.
    retry_count: u32,
}

impl std::fmt::Display for OutputData {
    /// Writes a one-line summary; the payload is reported by length only.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            session_id,
            response,
            option,
            package_index,
            data,
            status,
            retry_count,
        } = self;
        let data_len = data.len();
        f.write_fmt(format_args!(
            "OutputData: session_id:{}, response:{}, option:{:#?}, package_index:{}, status:{:#?}, retry_count:{}, data size:{}",
            session_id, response, option, package_index, status, retry_count, data_len
        ))
    }
}
144
145 type OutputData_ = Arc<Mutex<OutputData>>;
146
147 type OutputDataVec_ = Arc<Mutex<Vec<OutputData_>>>;
148
149 struct DataQueue {
150 data_map: HashMap<u32, OutputDataVec_>,
151 thread_map: HashMap<u32, JoinHandle<()>>,
152 stop_flag_map: HashMap<u32, Arc<Mutex<u8>>>,
153 }
154
155 impl DataQueue {
new() -> Self156 fn new() -> Self {
157 Self {
158 data_map: HashMap::new(),
159 thread_map: HashMap::new(),
160 stop_flag_map: HashMap::new(),
161 }
162 }
163 }
164
165 type DataQueue_ = Arc<Mutex<DataQueue>>;
166
167 pub struct QueueManager {}
168
169 impl QueueManager {
get_instance() -> DataQueue_170 fn get_instance() -> DataQueue_ {
171 static mut INSTANCE: Option<DataQueue_> = None;
172 unsafe {
173 INSTANCE
174 .get_or_insert_with(|| Arc::new(Mutex::new(DataQueue::new())))
175 .clone()
176 }
177 }
178
get_package(session_id: u32, index: usize) -> Option<OutputData>179 async fn get_package(session_id: u32, index: usize) -> Option<OutputData> {
180 let instance = Self::get_instance();
181 let mtx = instance.lock().await;
182 let data_map = &mtx.data_map;
183 if let Some(vec) = data_map.get(&session_id) {
184 let vec = vec.lock().await;
185 if !vec.is_empty() {
186 let Some(arc) = vec.get(index) else {
187 crate::error!("get_package get is None");
188 return None;
189 };
190 let data_mtx = arc.lock().await;
191 return Some(data_mtx.clone());
192 }
193 }
194 None
195 }
196
put_package(session_id: u32, data: OutputData)197 async fn put_package(session_id: u32, data: OutputData) {
198 let instance = Self::get_instance();
199 let mut mtx = instance.lock().await;
200 let data_map = &mut mtx.data_map;
201 if let Some(vec) = data_map.get(&session_id) {
202 let mut vec = vec.lock().await;
203 let item = Arc::new(Mutex::new(data));
204 vec.push(item);
205 } else {
206 let mut vec = Vec::<Arc<Mutex<OutputData>>>::new();
207 let d = Arc::new(Mutex::new(data));
208 vec.push(d);
209 let v = Arc::new(Mutex::new(vec));
210 data_map.insert(session_id, v);
211 }
212 }
213
update_package(session_id: u32, index: usize, data: OutputData) -> bool214 async fn update_package(session_id: u32, index: usize, data: OutputData) -> bool {
215 let instance = Self::get_instance();
216 let mtx = instance.lock().await;
217 let data_map = &mtx.data_map;
218 if let Some(vec) = data_map.get(&session_id) {
219 let vec = vec.lock().await;
220 if !vec.is_empty() {
221 let Some(arc) = vec.get(index) else {
222 crate::error!("update_package get is None");
223 return false;
224 };
225 let mut data_mtx = arc.lock().await;
226 *data_mtx = data;
227 return true;
228 }
229 }
230 false
231 }
232
get_stop_flag(session_id: u32) -> Option<u8>233 async fn get_stop_flag(session_id: u32) -> Option<u8> {
234 let instance = Self::get_instance();
235 let mtx = instance.lock().await;
236 let stop_flag_map = &mtx.stop_flag_map;
237 if let Some(flag) = stop_flag_map.get(&session_id) {
238 let v = flag.lock().await;
239 Some(*v)
240 } else {
241 None
242 }
243 }
244
245 #[allow(unused)]
set_stop_flag(session_id: u32)246 async fn set_stop_flag(session_id: u32) {
247 let instance = Self::get_instance();
248 let mut mtx = instance.lock().await;
249 let stop_flag_map = &mut mtx.stop_flag_map;
250 stop_flag_map.insert(session_id, Arc::new(Mutex::new(1)));
251 }
252
remove_package(session_id: u32, index: usize) -> bool253 async fn remove_package(session_id: u32, index: usize) -> bool {
254 let instance = Self::get_instance();
255 let mtx = instance.lock().await;
256 let data_map = &mtx.data_map;
257 if let Some(vec) = data_map.get(&session_id) {
258 let mut vec = vec.lock().await;
259 if !vec.is_empty() && index < vec.len() {
260 vec.remove(index);
261 return true;
262 }
263 }
264 false
265 }
266
remove_session(session_id: u32)267 async fn remove_session(session_id: u32) {
268 let instance = Self::get_instance();
269 let mut mtx = instance.lock().await;
270 mtx.data_map.remove(&session_id);
271 mtx.stop_flag_map.remove(&session_id);
272 mtx.thread_map.remove(&session_id);
273 crate::info!("remove_session:{session_id}");
274 }
275
check_stop(session_id: u32) -> bool276 async fn check_stop(session_id: u32) -> bool {
277 if let Some(stop) = Self::get_stop_flag(session_id).await {
278 return stop == 0;
279 }
280 false
281 }
282
session_loop(session_id: u32)283 async fn session_loop(session_id: u32) {
284 // 1. 取第[0]个outputdata, 如果是WaitSend 则发送 改变状态为WaitResponse 同时wait
285 // 2. 收到response, 如果是ACK 则改变为ResponseOK 同时wakeup
286 // 3.收到wakeup,则检查状态是否为ResponseOK 如果是,则remove掉,继续step 1;
287 // 如果不是,则检查retry_count, 自减1,继续send, 同时继续超时wait(如果超时,则继续检查状态,retry count 减1,继续send, 超时wait)
288 // retry count为0, 则表示连接中断,stop session
289 crate::info!("session_loop for {}", session_id);
290 loop {
291 if Self::check_stop(session_id).await {
292 crate::info!("session_loop stop");
293 break;
294 }
295 let mut first_pkg = Self::get_package(session_id, 0).await;
296 while first_pkg.is_none() {
297 WaiterManager::wait_empty(session_id).await;
298 first_pkg = Self::get_package(session_id, 0).await;
299 if Self::check_stop(session_id).await {
300 crate::info!("session_loop stop");
301 break;
302 }
303 }
304 if Self::check_stop(session_id).await {
305 crate::info!("session_loop stop");
306 break;
307 }
308 let Some(mut first_pkg) = first_pkg else {
309 crate::info!("session_loop first_pkg is None");
310 break;
311 };
312 let mut status = first_pkg.status;
313 let mut retry_count = first_pkg.retry_count;
314
315 if status == OutputDataStatus::WaitSend {
316 // 发送数据
317 let data = first_pkg.data.clone();
318 let _ret = UartMap::put(session_id, data).await;
319 // 如果是ack报文 则不需要等待回应
320 if first_pkg.response {
321 QueueManager::remove_package(session_id, 0).await;
322 continue;
323 }
324 // 修改data 的status = WaitResponse
325 first_pkg.status = OutputDataStatus::WaitResponse;
326 retry_count -= 1;
327 first_pkg.retry_count = retry_count;
328 // 更新数据
329 QueueManager::update_package(session_id, 0, first_pkg.clone()).await;
330 // 等待response
331 WaiterManager::wait_response(session_id).await;
332
333 if Self::check_stop(session_id).await {
334 crate::info!("session_loop stop");
335 break;
336 }
337 // 收到回复
338 // 重新获取数据
339
340 let Some(mut first_pkg) = Self::get_package(session_id, 0).await else {
341 crate::info!("session_loop first_pkg is None");
342 break;
343 };
344 // 得到新状态
345 status = first_pkg.status;
346
347 if status == OutputDataStatus::ResponseOk {
348 // 删除当前data
349 QueueManager::remove_package(session_id, 0).await;
350 continue;
351 }
352 retry_count = first_pkg.retry_count;
353 while retry_count > 0 && status == OutputDataStatus::WaitResponse {
354 // 保存retry_count
355 retry_count -= 1;
356 first_pkg.retry_count = retry_count;
357 QueueManager::update_package(session_id, 0, first_pkg.clone()).await;
358
359 // 再次发送数据
360 let data = first_pkg.data.clone();
361 let _ret = UartMap::put(session_id, data).await;
362 WaiterManager::wait_response(session_id).await;
363
364 if Self::check_stop(session_id).await {
365 break;
366 }
367
368 let Some(first_pkg) = Self::get_package(session_id, 0).await else {
369 break;
370 };
371 status = first_pkg.status;
372
373 match status {
374 OutputDataStatus::ResponseOk => {
375 QueueManager::remove_package(session_id, 0).await;
376 break;
377 }
378 OutputDataStatus::WaitResponse => {
379 let Some(first_pkg) = Self::get_package(session_id, 0).await else {
380 break;
381 };
382 status = first_pkg.status;
383 retry_count = first_pkg.retry_count;
384 continue;
385 }
386 OutputDataStatus::WaitSend => {
387 QueueManager::remove_package(session_id, 0).await;
388 break;
389 }
390 }
391 }
392 }
393 }
394 Self::remove_session(session_id).await;
395 crate::info!("session_loop for {} end.", session_id);
396 }
397 }
398
start_session(session_id: u32)399 pub async fn start_session(session_id: u32) {
400 let instance = QueueManager::get_instance();
401 let mut mtx = instance.lock().await;
402 let thread_map = &mut mtx.thread_map;
403 if thread_map.contains_key(&session_id) {
404 crate::error!("session thread has started.");
405 return;
406 }
407
408 WaiterManager::start_session(session_id).await;
409
410 let handle = utils::spawn(QueueManager::session_loop(session_id));
411 thread_map.insert(session_id, handle);
412
413 let stop_flag_map = &mut mtx.stop_flag_map;
414 stop_flag_map.insert(session_id, Arc::new(Mutex::new(1)));
415 }
416
stop_session(session_id: u32)417 async fn stop_session(session_id: u32) {
418 let instance = QueueManager::get_instance();
419 let mut mtx = instance.lock().await;
420 let stop_flag_map = &mut mtx.stop_flag_map;
421 stop_flag_map.insert(session_id, Arc::new(Mutex::new(0)));
422
423 WaiterManager::wakeup_empty_wait(session_id).await;
424 WaiterManager::wakeup_response_wait(session_id).await;
425 }
426
stop_other_session(session_id: u32)427 pub async fn stop_other_session(session_id: u32) {
428 let instance = QueueManager::get_instance();
429 let mtx = instance.lock().await;
430 let session_ids = mtx.data_map.keys();
431 let mut remove_sessions = Vec::new();
432 for k in session_ids {
433 if *k != session_id {
434 remove_sessions.push(*k);
435 }
436 }
437 drop(mtx);
438 for id in remove_sessions {
439 stop_session(id).await;
440 }
441 }
442
output_package( session_id: u32, response: bool, option: u8, package_index: u32, data: Vec<u8>, )443 async fn output_package(
444 session_id: u32,
445 response: bool,
446 option: u8,
447 package_index: u32,
448 data: Vec<u8>,
449 ) {
450 let pkg = OutputData {
451 session_id,
452 response,
453 option,
454 package_index,
455 data: data.clone(),
456 retry_count: 5,
457 status: OutputDataStatus::WaitSend,
458 };
459 QueueManager::put_package(session_id, pkg).await;
460 WaiterManager::wakeup_empty_wait(session_id).await;
461 }
462
463 #[allow(unused)]
is_response(option: u8) -> bool464 fn is_response(option: u8) -> bool {
465 let ret = (option & UartOption::Ack as u8) | (option & UartOption::Nak as u8);
466 ret != 0
467 }
468
on_read_head(head: UartHead)469 pub async fn on_read_head(head: UartHead) {
470 let session_id = head.session_id;
471 let option = head.option;
472 let package_index = head.package_index;
473 if option & (UartOption::Free as u16) != 0 {
474 stop_session(session_id).await;
475 return;
476 }
477 if is_response(option as u8) {
478 let Some(mut pkg) = QueueManager::get_package(session_id, 0).await else {
479 return;
480 };
481 pkg.status = if option & (UartOption::Ack as u16) > 1 {
482 OutputDataStatus::ResponseOk
483 } else {
484 OutputDataStatus::WaitSend
485 };
486 QueueManager::update_package(session_id, 0, pkg).await;
487 WaiterManager::wakeup_response_wait(session_id).await;
488 } else {
489 let mut header_obj =
490 uart::build_header_obj(session_id, UartOption::Ack as u16, 0, package_index);
491 let header = header_obj.serialize();
492 let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32);
493 header_obj.head_checksum = u32::to_le(head_sum);
494 let data = header_obj.serialize();
495 output_package(session_id, true, UartOption::Ack as u8, package_index, data).await;
496 }
497 }
498
/// Returns a package index from the process-wide counter, which starts at 888.
///
/// * `is_create == true`: advances the counter by one and returns the new value.
/// * `is_create == false`: returns the current value without changing it.
///
/// The counter is atomic, so concurrent callers never receive the same new
/// index, and it wraps around on overflow instead of panicking.
#[allow(unused)]
fn get_package_index(is_create: bool) -> u32 {
    static PACKAGE_INDEX: std::sync::atomic::AtomicU32 =
        std::sync::atomic::AtomicU32::new(888);

    if is_create {
        // `fetch_add` returns the previous value; callers expect the new one.
        PACKAGE_INDEX
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
            .wrapping_add(1)
    } else {
        PACKAGE_INDEX.load(std::sync::atomic::Ordering::SeqCst)
    }
}
512
start_uart(session_id: u32, wr: UartWriter)513 pub async fn start_uart(session_id: u32, wr: UartWriter) {
514 UartMap::start(session_id, wr).await;
515 }
516
517 #[allow(unused)]
wrap_put(session_id: u32, data: TaskMessage, package_index: u32, option: u8)518 pub async fn wrap_put(session_id: u32, data: TaskMessage, package_index: u32, option: u8) {
519 let mut pkg_index = package_index;
520 if package_index == 0 {
521 pkg_index = get_package_index(true);
522 }
523 let send = serializer::concat_pack(data);
524 crate::info!("wrap_put send len:{}, send:{:#?}", send.len(), send);
525
526 let payload_max_len = config::MAX_UART_SIZE_IOBUF as usize - UART_HEAD_SIZE;
527 let mut index = 0;
528 let len = send.len();
529
530 loop {
531 if index >= len {
532 println!("wrap_put break");
533 break;
534 }
535 let size;
536 let mut op = option;
537 if index + payload_max_len <= len {
538 size = payload_max_len;
539 } else {
540 size = len - index;
541 op = UartOption::Tail as u8 | option;
542 }
543
544 let data = send[index..index + size].to_vec().clone();
545 let data_sum = data.iter().fold(0u32, |acc, &x| acc + x as u32);
546 let mut header_obj = uart::build_header_obj(session_id, op as u16, size, pkg_index);
547 header_obj.data_checksum = u32::to_le(data_sum);
548
549 let header = header_obj.serialize();
550 let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32);
551 header_obj.head_checksum = u32::to_le(head_sum);
552
553 let header = header_obj.serialize();
554 crate::info!("header, header_len:{}", header.len());
555 let total = [header, send[index..index + size].to_vec().clone()].concat();
556
557 output_package(
558 session_id,
559 (op & UartOption::Ack as u8) > 0,
560 op,
561 pkg_index,
562 total,
563 )
564 .await;
565 pkg_index = get_package_index(true);
566 index += size;
567 }
568 }
569