1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 //! uart_wrapper
16 #![allow(missing_docs)]
17 use super::uart::UartWriter;
18 use super::{uart, UartMap};
19 use crate::config::{self, TaskMessage};
20 use crate::serializer::native_struct::UartHead;
21 use crate::serializer::serialize::Serialization;
22 use crate::serializer::{self, UART_HEAD_SIZE};
23 #[allow(unused)]
24 use crate::utils::hdc_log::*;
25 use std::collections::HashMap;
26 use std::sync::Arc;
27 #[cfg(feature = "host")]
28 extern crate ylong_runtime_static as ylong_runtime;
29 use ylong_runtime::sync::waiter::Waiter;
30 use ylong_runtime::sync::Mutex;
31 use ylong_runtime::task::JoinHandle;
32
/// Flag bits carried in the `option` field of a UART package head.
///
/// Every variant occupies its own bit, so several flags can be OR-ed into a
/// single option value.
#[derive(PartialEq, Debug, Clone, Copy)]
#[repr(u8)]
pub enum UartOption {
    /// Marks the last package of a message; it can be sent on to the session.
    Tail = 1,
    /// The host requests a session reset in the daemon.
    Reset = 2,
    /// Response: the package was received.
    Ack = 4,
    /// Response: the package must be sent again.
    Nak = 8,
    /// Request to free this session; an unrecoverable error happened.
    Free = 16,
}

impl TryFrom<u8> for UartOption {
    type Error = ();

    /// Converts a raw value that holds exactly one flag into its variant.
    ///
    /// Any other value, including a combination of flags, yields `Err(())`.
    fn try_from(cmd: u8) -> Result<Self, ()> {
        const ALL: [UartOption; 5] = [
            UartOption::Tail,
            UartOption::Reset,
            UartOption::Ack,
            UartOption::Nak,
            UartOption::Free,
        ];
        ALL.iter()
            .copied()
            .find(|flag| *flag as u8 == cmd)
            .ok_or(())
    }
}
56
57 struct WaiterManager {
58 // waiter used for sync package send-response one by one.
59 response_waiters: HashMap<u32, Waiter>,
60 // waiter used for waiting if no packages.
61 empty_waiters: HashMap<u32, Waiter>,
62 }
63
64 impl WaiterManager {
get_instance() -> &'static mut WaiterManager65 fn get_instance() -> &'static mut WaiterManager {
66 static mut INSTANCE: Option<WaiterManager> = None;
67 unsafe {
68 INSTANCE.get_or_insert(WaiterManager {
69 response_waiters: HashMap::new(),
70 empty_waiters: HashMap::new(),
71 })
72 }
73 }
74
start_session(session_id: u32)75 async fn start_session(session_id: u32) {
76 let instance = Self::get_instance();
77 instance.response_waiters.insert(session_id, Waiter::new());
78 instance.empty_waiters.insert(session_id, Waiter::new());
79 }
80
81 #[allow(unused)]
wait_response(session_id: u32)82 async fn wait_response(session_id: u32) {
83 let instance = Self::get_instance();
84 let waiter = instance.response_waiters.get(&session_id);
85 if let Some(w) = waiter {
86 w.wait().await;
87 }
88 }
89
90 #[allow(unused)]
wakeup_response_wait(session_id: u32)91 async fn wakeup_response_wait(session_id: u32) {
92 let instance = Self::get_instance();
93 let waiter = instance.response_waiters.get(&session_id);
94 if let Some(w) = waiter {
95 w.wake_one();
96 }
97 }
98
99 #[allow(unused)]
wait_empty(session_id: u32)100 async fn wait_empty(session_id: u32) {
101 let instance = Self::get_instance();
102 let waiter = instance.empty_waiters.get(&session_id);
103 if let Some(w) = waiter {
104 w.wait().await;
105 }
106 }
107
108 #[allow(unused)]
wakeup_empty_wait(session_id: u32)109 async fn wakeup_empty_wait(session_id: u32) {
110 let instance = Self::get_instance();
111 let waiter = instance.empty_waiters.get(&session_id);
112 if let Some(w) = waiter {
113 w.wake_one();
114 }
115 }
116 }
117
/// Send state of a queued package.
#[derive(PartialEq, Debug, Clone, Copy)]
#[repr(u8)]
enum OutputDataStatus {
    /// The package still has to be written to the UART.
    WaitSend = 0,
    /// The package was written and its response is awaited.
    WaitResponse = 1,
    /// The response for the package was an acknowledgement.
    ResponseOk = 2,
}

/// One outgoing package together with its send bookkeeping.
#[derive(PartialEq, Debug, Clone)]
struct OutputData {
    /// Session the package belongs to.
    session_id: u32,
    /// `true` when the package is itself a response.
    response: bool,
    /// Raw option flags of the package.
    option: u8,
    /// Index of the package.
    package_index: u32,
    /// Bytes to write to the UART.
    data: Vec<u8>,
    /// Current send state.
    status: OutputDataStatus,
    /// Number of sends that are still allowed.
    retry_count: u32,
}

impl std::fmt::Display for OutputData {
    /// Writes a one-line summary of the package; the payload is reported only
    /// by its length.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            session_id,
            response,
            option,
            package_index,
            data,
            status,
            retry_count,
        } = self;
        f.write_fmt(format_args!(
            "OutputData: session_id:{}, response:{}, option:{:#?}, package_index:{}, status:{:#?}, retry_count:{}, data size:{}",
            session_id,
            response,
            option,
            package_index,
            status,
            retry_count,
            data.len()
        ))
    }
}
143
/// A single queued package, shared and individually lockable.
type OutputData_ = Arc<Mutex<OutputData>>;

/// A shared, lockable list of queued packages.
type OutputDataVec_ = Arc<Mutex<Vec<OutputData_>>>;
147
148 struct DataQueue {
149 data_map: HashMap<u32, OutputDataVec_>,
150 thread_map: HashMap<u32, JoinHandle<()>>,
151 stop_flag_map: HashMap<u32, Arc<Mutex<u8>>>,
152 }
153
154 impl DataQueue {
new() -> Self155 fn new() -> Self {
156 Self {
157 data_map: HashMap::new(),
158 thread_map: HashMap::new(),
159 stop_flag_map: HashMap::new(),
160 }
161 }
162 }
163
/// Shared, lockable handle to a [`DataQueue`].
type DataQueue_ = Arc<Mutex<DataQueue>>;

/// Namespace type for the queue operations; it has no fields of its own.
pub struct QueueManager {}
167
168 impl QueueManager {
get_instance() -> DataQueue_169 fn get_instance() -> DataQueue_ {
170 static mut INSTANCE: Option<DataQueue_> = None;
171 unsafe {
172 INSTANCE
173 .get_or_insert_with(|| Arc::new(Mutex::new(DataQueue::new())))
174 .clone()
175 }
176 }
177
get_package(session_id: u32, index: usize) -> Option<OutputData>178 async fn get_package(session_id: u32, index: usize) -> Option<OutputData> {
179 let instance = Self::get_instance();
180 let mtx = instance.lock().await;
181 let data_map = &mtx.data_map;
182 if let Some(vec) = data_map.get(&session_id) {
183 let vec = vec.lock().await;
184 if !vec.is_empty() {
185 let Some(arc) = vec.get(index) else {
186 crate::error!("get_package get is None");
187 return None;
188 };
189 let data_mtx = arc.lock().await;
190 return Some(data_mtx.clone());
191 }
192 }
193 None
194 }
195
put_package(session_id: u32, data: OutputData)196 async fn put_package(session_id: u32, data: OutputData) {
197 let instance = Self::get_instance();
198 let mut mtx = instance.lock().await;
199 let data_map = &mut mtx.data_map;
200 if let Some(vec) = data_map.get(&session_id) {
201 let mut vec = vec.lock().await;
202 let item = Arc::new(Mutex::new(data));
203 vec.push(item);
204 } else {
205 let mut vec = Vec::<Arc<Mutex<OutputData>>>::new();
206 let d = Arc::new(Mutex::new(data));
207 vec.push(d);
208 let v = Arc::new(Mutex::new(vec));
209 data_map.insert(session_id, v);
210 }
211 }
212
update_package(session_id: u32, index: usize, data: OutputData) -> bool213 async fn update_package(session_id: u32, index: usize, data: OutputData) -> bool {
214 let instance = Self::get_instance();
215 let mtx = instance.lock().await;
216 let data_map = &mtx.data_map;
217 if let Some(vec) = data_map.get(&session_id) {
218 let vec = vec.lock().await;
219 if !vec.is_empty() {
220 let Some(arc) = vec.get(index) else {
221 crate::error!("update_package get is None");
222 return false;
223 };
224 let mut data_mtx = arc.lock().await;
225 *data_mtx = data;
226 return true;
227 }
228 }
229 false
230 }
231
get_stop_flag(session_id: u32) -> Option<u8>232 async fn get_stop_flag(session_id: u32) -> Option<u8> {
233 let instance = Self::get_instance();
234 let mtx = instance.lock().await;
235 let stop_flag_map = &mtx.stop_flag_map;
236 if let Some(flag) = stop_flag_map.get(&session_id) {
237 let v = flag.lock().await;
238 Some(*v)
239 } else {
240 None
241 }
242 }
243
244 #[allow(unused)]
set_stop_flag(session_id: u32)245 async fn set_stop_flag(session_id: u32) {
246 let instance = Self::get_instance();
247 let mut mtx = instance.lock().await;
248 let stop_flag_map = &mut mtx.stop_flag_map;
249 stop_flag_map.insert(session_id, Arc::new(Mutex::new(1)));
250 }
251
remove_package(session_id: u32, index: usize) -> bool252 async fn remove_package(session_id: u32, index: usize) -> bool {
253 let instance = Self::get_instance();
254 let mtx = instance.lock().await;
255 let data_map = &mtx.data_map;
256 if let Some(vec) = data_map.get(&session_id) {
257 let mut vec = vec.lock().await;
258 if !vec.is_empty() && index < vec.len() {
259 vec.remove(index);
260 return true;
261 }
262 }
263 false
264 }
265
remove_session(session_id: u32)266 async fn remove_session(session_id: u32) {
267 let instance = Self::get_instance();
268 let mut mtx = instance.lock().await;
269 mtx.data_map.remove(&session_id);
270 mtx.stop_flag_map.remove(&session_id);
271 mtx.thread_map.remove(&session_id);
272 crate::info!("remove_session:{session_id}");
273 }
274
check_stop(session_id: u32) -> bool275 async fn check_stop(session_id: u32) -> bool {
276 if let Some(stop) = Self::get_stop_flag(session_id).await {
277 return stop == 0;
278 }
279 false
280 }
281
session_loop(session_id: u32)282 async fn session_loop(session_id: u32) {
283 // 1. 取第[0]个outputdata, 如果是WaitSend 则发送 改变状态为WaitResponse 同时wait
284 // 2. 收到response, 如果是ACK 则改变为ResponseOK 同时wakeup
285 // 3.收到wakeup,则检查状态是否为ResponseOK 如果是,则remove掉,继续step 1;
286 // 如果不是,则检查retry_count, 自减1,继续send, 同时继续超时wait(如果超时,则继续检查状态,retry count 减1,继续send, 超时wait)
287 // retry count为0, 则表示连接中断,stop session
288 crate::info!("session_loop for {}", session_id);
289 loop {
290 if Self::check_stop(session_id).await {
291 crate::info!("session_loop stop");
292 break;
293 }
294 let mut first_pkg = Self::get_package(session_id, 0).await;
295 while first_pkg.is_none() {
296 WaiterManager::wait_empty(session_id).await;
297 first_pkg = Self::get_package(session_id, 0).await;
298 if Self::check_stop(session_id).await {
299 crate::info!("session_loop stop");
300 break;
301 }
302 }
303 if Self::check_stop(session_id).await {
304 crate::info!("session_loop stop");
305 break;
306 }
307 let Some(mut first_pkg) = first_pkg else {
308 crate::info!("session_loop first_pkg is None");
309 break;
310 };
311 let mut status = first_pkg.status;
312 let mut retry_count = first_pkg.retry_count;
313
314 if status == OutputDataStatus::WaitSend {
315 // 发送数据
316 let data = first_pkg.data.clone();
317 let _ret = UartMap::put(session_id, data).await;
318 // 如果是ack报文 则不需要等待回应
319 if first_pkg.response {
320 QueueManager::remove_package(session_id, 0).await;
321 continue;
322 }
323 // 修改data 的status = WaitResponse
324 first_pkg.status = OutputDataStatus::WaitResponse;
325 retry_count -= 1;
326 first_pkg.retry_count = retry_count;
327 // 更新数据
328 QueueManager::update_package(session_id, 0, first_pkg.clone()).await;
329 // 等待response
330 WaiterManager::wait_response(session_id).await;
331
332 if Self::check_stop(session_id).await {
333 crate::info!("session_loop stop");
334 break;
335 }
336 // 收到回复
337 // 重新获取数据
338
339 let Some(mut first_pkg) = Self::get_package(session_id, 0).await else {
340 crate::info!("session_loop first_pkg is None");
341 break;
342 };
343 // 得到新状态
344 status = first_pkg.status;
345
346 if status == OutputDataStatus::ResponseOk {
347 // 删除当前data
348 QueueManager::remove_package(session_id, 0).await;
349 continue;
350 }
351 retry_count = first_pkg.retry_count;
352 while retry_count > 0 && status == OutputDataStatus::WaitResponse {
353 // 保存retry_count
354 retry_count -= 1;
355 first_pkg.retry_count = retry_count;
356 QueueManager::update_package(session_id, 0, first_pkg.clone()).await;
357
358 // 再次发送数据
359 let data = first_pkg.data.clone();
360 let _ret = UartMap::put(session_id, data).await;
361 WaiterManager::wait_response(session_id).await;
362
363 if Self::check_stop(session_id).await {
364 break;
365 }
366
367 let Some(first_pkg) = Self::get_package(session_id, 0).await else {
368 break;
369 };
370 status = first_pkg.status;
371
372 match status {
373 OutputDataStatus::ResponseOk => {
374 QueueManager::remove_package(session_id, 0).await;
375 break;
376 }
377 OutputDataStatus::WaitResponse => {
378 let Some(first_pkg) = Self::get_package(session_id, 0).await else {
379 break;
380 };
381 status = first_pkg.status;
382 retry_count = first_pkg.retry_count;
383 continue;
384 }
385 OutputDataStatus::WaitSend => {
386 QueueManager::remove_package(session_id, 0).await;
387 break;
388 }
389 }
390 }
391 }
392 }
393 Self::remove_session(session_id).await;
394 crate::info!("session_loop for {} end.", session_id);
395 }
396 }
397
start_session(session_id: u32)398 pub async fn start_session(session_id: u32) {
399 let instance = QueueManager::get_instance();
400 let mut mtx = instance.lock().await;
401 let thread_map = &mut mtx.thread_map;
402 if thread_map.contains_key(&session_id) {
403 crate::error!("session thread has started.");
404 return;
405 }
406
407 WaiterManager::start_session(session_id).await;
408
409 let handle = ylong_runtime::spawn(QueueManager::session_loop(session_id));
410 thread_map.insert(session_id, handle);
411
412 let stop_flag_map = &mut mtx.stop_flag_map;
413 stop_flag_map.insert(session_id, Arc::new(Mutex::new(1)));
414 }
415
stop_session(session_id: u32)416 async fn stop_session(session_id: u32) {
417 let instance = QueueManager::get_instance();
418 let mut mtx = instance.lock().await;
419 let stop_flag_map = &mut mtx.stop_flag_map;
420 stop_flag_map.insert(session_id, Arc::new(Mutex::new(0)));
421
422 WaiterManager::wakeup_empty_wait(session_id).await;
423 WaiterManager::wakeup_response_wait(session_id).await;
424 }
425
stop_other_session(session_id: u32)426 pub async fn stop_other_session(session_id: u32) {
427 let instance = QueueManager::get_instance();
428 let mtx = instance.lock().await;
429 let session_ids = mtx.data_map.keys();
430 let mut remove_sessions = Vec::new();
431 for k in session_ids {
432 if *k != session_id {
433 remove_sessions.push(*k);
434 }
435 }
436 drop(mtx);
437 for id in remove_sessions {
438 stop_session(id).await;
439 }
440 }
441
output_package( session_id: u32, response: bool, option: u8, package_index: u32, data: Vec<u8>, )442 async fn output_package(
443 session_id: u32,
444 response: bool,
445 option: u8,
446 package_index: u32,
447 data: Vec<u8>,
448 ) {
449 let pkg = OutputData {
450 session_id,
451 response,
452 option,
453 package_index,
454 data: data.clone(),
455 retry_count: 5,
456 status: OutputDataStatus::WaitSend,
457 };
458 QueueManager::put_package(session_id, pkg).await;
459 WaiterManager::wakeup_empty_wait(session_id).await;
460 }
461
462 #[allow(unused)]
is_response(option: u8) -> bool463 fn is_response(option: u8) -> bool {
464 let ret = (option & UartOption::Ack as u8) | (option & UartOption::Nak as u8);
465 ret != 0
466 }
467
on_read_head(head: UartHead)468 pub async fn on_read_head(head: UartHead) {
469 let session_id = head.session_id;
470 let option = head.option;
471 let package_index = head.package_index;
472 if option & (UartOption::Free as u16) != 0 {
473 stop_session(session_id).await;
474 return;
475 }
476 if is_response(option as u8) {
477 let Some(mut pkg) = QueueManager::get_package(session_id, 0).await else {
478 return;
479 };
480 pkg.status = if option & (UartOption::Ack as u16) > 1 {
481 OutputDataStatus::ResponseOk
482 } else {
483 OutputDataStatus::WaitSend
484 };
485 QueueManager::update_package(session_id, 0, pkg).await;
486 WaiterManager::wakeup_response_wait(session_id).await;
487 } else {
488 let mut header_obj =
489 uart::build_header_obj(session_id, UartOption::Ack as u16, 0, package_index);
490 let header = header_obj.serialize();
491 let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32);
492 header_obj.head_checksum = u32::to_le(head_sum);
493 let data = header_obj.serialize();
494 output_package(session_id, true, UartOption::Ack as u8, package_index, data).await;
495 }
496 }
497
/// Returns the process-wide package index.
///
/// With `is_create` set the index is advanced by one first and the new value
/// is returned; otherwise the current value is returned unchanged. The
/// counter starts at 888, wraps around on overflow, and is atomic, so
/// concurrent callers never receive the same freshly created index.
#[allow(unused)]
fn get_package_index(is_create: bool) -> u32 {
    use std::sync::atomic::{AtomicU32, Ordering};

    static PACKAGE_INDEX: AtomicU32 = AtomicU32::new(888);

    if is_create {
        // `fetch_add` yields the previous value; callers get the new one.
        PACKAGE_INDEX.fetch_add(1, Ordering::SeqCst).wrapping_add(1)
    } else {
        PACKAGE_INDEX.load(Ordering::SeqCst)
    }
}
511
/// Hands the UART writer `wr` for `session_id` to `UartMap::start`.
pub async fn start_uart(session_id: u32, wr: UartWriter) {
    UartMap::start(session_id, wr).await;
}
515
516 #[allow(unused)]
wrap_put(session_id: u32, data: TaskMessage, package_index: u32, option: u8)517 pub async fn wrap_put(session_id: u32, data: TaskMessage, package_index: u32, option: u8) {
518 let mut pkg_index = package_index;
519 if package_index == 0 {
520 pkg_index = get_package_index(true);
521 }
522 let send = serializer::concat_pack(data);
523 crate::info!("wrap_put send len:{}, send:{:#?}", send.len(), send);
524
525 let payload_max_len = config::MAX_UART_SIZE_IOBUF as usize - UART_HEAD_SIZE;
526 let mut index = 0;
527 let len = send.len();
528
529 loop {
530 if index >= len {
531 println!("wrap_put break");
532 break;
533 }
534 let size;
535 let mut op = option;
536 if index + payload_max_len <= len {
537 size = payload_max_len;
538 } else {
539 size = len - index;
540 op = UartOption::Tail as u8 | option;
541 }
542
543 let data = send[index..index + size].to_vec().clone();
544 let data_sum = data.iter().fold(0u32, |acc, &x| acc + x as u32);
545 let mut header_obj = uart::build_header_obj(session_id, op as u16, size, pkg_index);
546 header_obj.data_checksum = u32::to_le(data_sum);
547
548 let header = header_obj.serialize();
549 let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32);
550 header_obj.head_checksum = u32::to_le(head_sum);
551
552 let header = header_obj.serialize();
553 crate::info!("header, header_len:{}", header.len());
554 let total = [header, send[index..index + size].to_vec().clone()].concat();
555
556 output_package(
557 session_id,
558 (op & UartOption::Ack as u8) > 0,
559 op,
560 pkg_index,
561 total,
562 )
563 .await;
564 pkg_index = get_package_index(true);
565 index += size;
566 }
567 }
568