1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 //! uart_wrapper
16 #![allow(missing_docs)]
17 use super::uart::UartWriter;
18 use super::{uart, UartMap};
19 use crate::config::{self, TaskMessage};
20 use crate::serializer::native_struct::UartHead;
21 use crate::serializer::serialize::Serialization;
22 use crate::serializer::{self, UART_HEAD_SIZE};
23 #[allow(unused)]
24 use crate::utils::hdc_log::*;
25 use std::collections::HashMap;
26 use std::sync::Arc;
27 use ylong_runtime::sync::waiter::Waiter;
28 use ylong_runtime::sync::Mutex;
29 use ylong_runtime::task::JoinHandle;
30
/// Bit flags carried in the `option` field of a UART package header.
#[derive(Debug, Clone, Copy, PartialEq)]
#[repr(u8)]
pub enum UartOption {
    /// Marks the last package; the data can be sent to the session.
    Tail = 1,
    /// Host requests a reset of the session in the daemon.
    Reset = 2,
    /// Response: the package was received.
    Ack = 4,
    /// Request to send the package again.
    Nak = 8,
    /// Request to free this session; an unrecoverable error happened.
    Free = 16,
}

impl TryFrom<u8> for UartOption {
    type Error = ();

    /// Converts a raw byte into the variant whose discriminant equals it.
    ///
    /// Returns `Err(())` for every other value, including combinations of
    /// several flags.
    fn try_from(cmd: u8) -> Result<Self, ()> {
        const VARIANTS: [UartOption; 5] = [
            UartOption::Tail,
            UartOption::Reset,
            UartOption::Ack,
            UartOption::Nak,
            UartOption::Free,
        ];
        VARIANTS
            .iter()
            .copied()
            .find(|variant| *variant as u8 == cmd)
            .ok_or(())
    }
}
54
55 struct WaiterManager {
56 // waiter used for sync package send-response one by one.
57 response_waiters: HashMap<u32, Waiter>,
58 // waiter used for waiting if no packages.
59 empty_waiters: HashMap<u32, Waiter>,
60 }
61
62 impl WaiterManager {
get_instance() -> &'static mut WaiterManager63 fn get_instance() -> &'static mut WaiterManager {
64 static mut INSTANCE: Option<WaiterManager> = None;
65 unsafe {
66 INSTANCE.get_or_insert(WaiterManager {
67 response_waiters: HashMap::new(),
68 empty_waiters: HashMap::new(),
69 })
70 }
71 }
72
start_session(session_id: u32)73 async fn start_session(session_id: u32) {
74 let instance = Self::get_instance();
75 instance.response_waiters.insert(session_id, Waiter::new());
76 instance.empty_waiters.insert(session_id, Waiter::new());
77 }
78
79 #[allow(unused)]
wait_response(session_id: u32)80 async fn wait_response(session_id: u32) {
81 let instance = Self::get_instance();
82 let waiter = instance.response_waiters.get(&session_id);
83 if let Some(w) = waiter {
84 w.wait().await;
85 }
86 }
87
88 #[allow(unused)]
wakeup_response_wait(session_id: u32)89 async fn wakeup_response_wait(session_id: u32) {
90 let instance = Self::get_instance();
91 let waiter = instance.response_waiters.get(&session_id);
92 if let Some(w) = waiter {
93 w.wake_one();
94 }
95 }
96
97 #[allow(unused)]
wait_empty(session_id: u32)98 async fn wait_empty(session_id: u32) {
99 let instance = Self::get_instance();
100 let waiter = instance.empty_waiters.get(&session_id);
101 if let Some(w) = waiter {
102 w.wait().await;
103 }
104 }
105
106 #[allow(unused)]
wakeup_empty_wait(session_id: u32)107 async fn wakeup_empty_wait(session_id: u32) {
108 let instance = Self::get_instance();
109 let waiter = instance.empty_waiters.get(&session_id);
110 if let Some(w) = waiter {
111 w.wake_one();
112 }
113 }
114 }
115
/// Send state of a queued outgoing package.
#[derive(Debug, Clone, Copy, PartialEq)]
#[repr(u8)]
enum OutputDataStatus {
    /// Queued and waiting to be sent (also set again when the peer answers NAK).
    WaitSend = 0,
    /// Sent; the send loop is waiting for the peer's response.
    WaitResponse = 1,
    /// The peer acknowledged the package.
    ResponseOk = 2,
}
123
124 #[derive(PartialEq, Debug, Clone)]
125 struct OutputData {
126 session_id: u32,
127 response: bool,
128 option: u8,
129 package_index: u32,
130 data: Vec<u8>,
131 status: OutputDataStatus,
132 retry_count: u32,
133 }
134
135 impl std::fmt::Display for OutputData {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 write!(f, "OutputData: session_id:{}, response:{}, option:{:#?}, package_index:{}, status:{:#?}, retry_count:{}, data size:{}",
138 self.session_id, self.response, self.option, self.package_index, self.status, self.retry_count, self.data.len())
139 }
140 }
141
142 type OutputData_ = Arc<Mutex<OutputData>>;
143
144 type OutputDataVec_ = Arc<Mutex<Vec<OutputData_>>>;
145
146 struct DataQueue {
147 data_map: HashMap<u32, OutputDataVec_>,
148 thread_map: HashMap<u32, JoinHandle<()>>,
149 stop_flag_map: HashMap<u32, Arc<Mutex<u8>>>,
150 }
151
152 impl DataQueue {
new() -> Self153 fn new() -> Self {
154 Self {
155 data_map: HashMap::new(),
156 thread_map: HashMap::new(),
157 stop_flag_map: HashMap::new(),
158 }
159 }
160 }
161
162 type DataQueue_ = Arc<Mutex<DataQueue>>;
163
164 pub struct QueueManager {}
165
166 impl QueueManager {
get_instance() -> DataQueue_167 fn get_instance() -> DataQueue_ {
168 static mut INSTANCE: Option<DataQueue_> = None;
169 unsafe {
170 INSTANCE
171 .get_or_insert_with(|| Arc::new(Mutex::new(DataQueue::new())))
172 .clone()
173 }
174 }
175
get_package(session_id: u32, index: usize) -> Option<OutputData>176 async fn get_package(session_id: u32, index: usize) -> Option<OutputData> {
177 let instance = Self::get_instance();
178 let mtx = instance.lock().await;
179 let data_map = &mtx.data_map;
180 if let Some(vec) = data_map.get(&session_id) {
181 let vec = vec.lock().await;
182 if !vec.is_empty() {
183 let arc = vec.get(index).unwrap();
184 let data_mtx = arc.lock().await;
185 return Some(data_mtx.clone());
186 }
187 }
188 None
189 }
190
put_package(session_id: u32, data: OutputData)191 async fn put_package(session_id: u32, data: OutputData) {
192 let instance = Self::get_instance();
193 let mut mtx = instance.lock().await;
194 let data_map = &mut mtx.data_map;
195 if let Some(vec) = data_map.get(&session_id) {
196 let mut vec = vec.lock().await;
197 let item = Arc::new(Mutex::new(data));
198 vec.push(item);
199 } else {
200 let mut vec = Vec::<Arc<Mutex<OutputData>>>::new();
201 let d = Arc::new(Mutex::new(data));
202 vec.push(d);
203 let v = Arc::new(Mutex::new(vec));
204 data_map.insert(session_id, v);
205 }
206 }
207
update_package(session_id: u32, index: usize, data: OutputData) -> bool208 async fn update_package(session_id: u32, index: usize, data: OutputData) -> bool {
209 let instance = Self::get_instance();
210 let mtx = instance.lock().await;
211 let data_map = &mtx.data_map;
212 if let Some(vec) = data_map.get(&session_id) {
213 let vec = vec.lock().await;
214 if !vec.is_empty() {
215 let arc = vec.get(index).unwrap();
216 let mut data_mtx = arc.lock().await;
217 *data_mtx = data;
218 return true;
219 }
220 }
221 false
222 }
223
get_stop_flag(session_id: u32) -> Option<u8>224 async fn get_stop_flag(session_id: u32) -> Option<u8> {
225 let instance = Self::get_instance();
226 let mtx = instance.lock().await;
227 let stop_flag_map = &mtx.stop_flag_map;
228 if let Some(flag) = stop_flag_map.get(&session_id) {
229 let v = flag.lock().await;
230 Some(*v)
231 } else {
232 None
233 }
234 }
235
236 #[allow(unused)]
set_stop_flag(session_id: u32)237 async fn set_stop_flag(session_id: u32) {
238 let instance = Self::get_instance();
239 let mut mtx = instance.lock().await;
240 let stop_flag_map = &mut mtx.stop_flag_map;
241 stop_flag_map.insert(session_id, Arc::new(Mutex::new(1)));
242 }
243
remove_package(session_id: u32, index: usize) -> bool244 async fn remove_package(session_id: u32, index: usize) -> bool {
245 let instance = Self::get_instance();
246 let mtx = instance.lock().await;
247 let data_map = &mtx.data_map;
248 if let Some(vec) = data_map.get(&session_id) {
249 let mut vec = vec.lock().await;
250 if !vec.is_empty() && index < vec.len() {
251 vec.remove(index);
252 return true;
253 }
254 }
255 false
256 }
257
remove_session(session_id: u32)258 async fn remove_session(session_id: u32) {
259 let instance = Self::get_instance();
260 let mut mtx = instance.lock().await;
261 mtx.data_map.remove(&session_id);
262 mtx.stop_flag_map.remove(&session_id);
263 mtx.thread_map.remove(&session_id);
264 println!("remove_session:{session_id}");
265 }
266
check_stop(session_id: u32) -> bool267 async fn check_stop(session_id: u32) -> bool {
268 if let Some(stop) = Self::get_stop_flag(session_id).await {
269 return stop == 0;
270 }
271 false
272 }
273
session_loop(session_id: u32)274 async fn session_loop(session_id: u32) {
275 // 1. 取第[0]个outputdata, 如果是WaitSend 则发送 改变状态为WaitResponse 同时wait
276 // 2. 收到response, 如果是ACK 则改变为ResponseOK 同时wakeup
277 // 3.收到wakeup,则检查状态是否为ResponseOK 如果是,则remove掉,继续step 1;
278 // 如果不是,则检查retry_count, 自减1,继续send, 同时继续超时wait(如果超时,则继续检查状态,retry count 减1,继续send, 超时wait)
279 // retry count为0, 则表示连接中断,stop session
280 println!("session_loop for {}", session_id);
281 loop {
282 if Self::check_stop(session_id).await {
283 break;
284 }
285 let mut first_pkg = Self::get_package(session_id, 0).await;
286 while first_pkg.is_none() {
287 WaiterManager::wait_empty(session_id).await;
288 first_pkg = Self::get_package(session_id, 0).await;
289 if Self::check_stop(session_id).await {
290 break;
291 }
292 }
293 if Self::check_stop(session_id).await {
294 break;
295 }
296 let mut first_pkg = first_pkg.unwrap();
297 let mut status = first_pkg.status;
298 let mut retry_count = first_pkg.retry_count;
299
300 if status == OutputDataStatus::WaitSend {
301 // 发送数据
302 let data = first_pkg.data.clone();
303 let _ret = UartMap::put(session_id, data).await;
304 // 如果是ack报文 则不需要等待回应
305 if first_pkg.response {
306 QueueManager::remove_package(session_id, 0).await;
307 continue;
308 }
309 // 修改data 的status = WaitResponse
310 first_pkg.status = OutputDataStatus::WaitResponse;
311 retry_count -= 1;
312 first_pkg.retry_count = retry_count;
313 // 更新数据
314 QueueManager::update_package(session_id, 0, first_pkg.clone()).await;
315 // 等待response
316 WaiterManager::wait_response(session_id).await;
317
318 if Self::check_stop(session_id).await {
319 break;
320 }
321 // 收到回复
322 // 重新获取数据
323
324 let first_pkg = Self::get_package(session_id, 0).await;
325
326 let mut first_pkg = first_pkg.unwrap();
327 // 得到新状态
328 status = first_pkg.status;
329
330 if status == OutputDataStatus::ResponseOk {
331 // 删除当前data
332 QueueManager::remove_package(session_id, 0).await;
333 continue;
334 }
335 retry_count = first_pkg.retry_count;
336 while retry_count > 0 && status == OutputDataStatus::WaitResponse {
337 // 保存retry_count
338 retry_count -= 1;
339 first_pkg.retry_count = retry_count;
340 QueueManager::update_package(session_id, 0, first_pkg.clone()).await;
341
342 // 再次发送数据
343 let data = first_pkg.data.clone();
344 let _ret = UartMap::put(session_id, data).await;
345 WaiterManager::wait_response(session_id).await;
346
347 if Self::check_stop(session_id).await {
348 break;
349 }
350
351 let first_pkg = Self::get_package(session_id, 0).await;
352
353 let first_pkg = first_pkg.unwrap();
354 status = first_pkg.status;
355
356 match status {
357 OutputDataStatus::ResponseOk => {
358 QueueManager::remove_package(session_id, 0).await;
359 break;
360 }
361 OutputDataStatus::WaitResponse => {
362 let first_pkg = Self::get_package(session_id, 0).await;
363 let first_pkg = first_pkg.unwrap();
364 status = first_pkg.status;
365 retry_count = first_pkg.retry_count;
366 continue;
367 }
368 OutputDataStatus::WaitSend => {
369 QueueManager::remove_package(session_id, 0).await;
370 break;
371 }
372 }
373 }
374 }
375 }
376 Self::remove_session(session_id).await;
377 println!("session_loop for {} end.", session_id);
378 }
379 }
380
start_session(session_id: u32)381 pub async fn start_session(session_id: u32) {
382 let instance = QueueManager::get_instance();
383 let mut mtx = instance.lock().await;
384 let thread_map = &mut mtx.thread_map;
385 if thread_map.contains_key(&session_id) {
386 println!("session thread has started.");
387 return;
388 }
389
390 WaiterManager::start_session(session_id).await;
391
392 let handle = ylong_runtime::spawn(QueueManager::session_loop(session_id));
393 thread_map.insert(session_id, handle);
394
395 let stop_flag_map = &mut mtx.stop_flag_map;
396 stop_flag_map.insert(session_id, Arc::new(Mutex::new(1)));
397 }
398
stop_session(session_id: u32)399 async fn stop_session(session_id: u32) {
400 let instance = QueueManager::get_instance();
401 let mut mtx = instance.lock().await;
402 let stop_flag_map = &mut mtx.stop_flag_map;
403 stop_flag_map.insert(session_id, Arc::new(Mutex::new(0)));
404
405 WaiterManager::wakeup_empty_wait(session_id).await;
406 WaiterManager::wakeup_response_wait(session_id).await;
407 }
408
stop_other_session(session_id: u32)409 pub async fn stop_other_session(session_id: u32) {
410 let instance = QueueManager::get_instance();
411 let mtx = instance.lock().await;
412 let session_ids = mtx.data_map.keys();
413 let mut remove_sessions = Vec::new();
414 for k in session_ids {
415 if *k != session_id {
416 remove_sessions.push(*k);
417 }
418 }
419 drop(mtx);
420 for id in remove_sessions {
421 stop_session(id).await;
422 }
423 }
424
output_package( session_id: u32, response: bool, option: u8, package_index: u32, data: Vec<u8>, )425 async fn output_package(
426 session_id: u32,
427 response: bool,
428 option: u8,
429 package_index: u32,
430 data: Vec<u8>,
431 ) {
432 let pkg = OutputData {
433 session_id,
434 response,
435 option,
436 package_index,
437 data: data.clone(),
438 retry_count: 5,
439 status: OutputDataStatus::WaitSend,
440 };
441 QueueManager::put_package(session_id, pkg).await;
442 WaiterManager::wakeup_empty_wait(session_id).await;
443 }
444
445 #[allow(unused)]
is_response(option: u8) -> bool446 fn is_response(option: u8) -> bool {
447 let ret = (option & UartOption::Ack as u8) | (option & UartOption::Nak as u8);
448 ret != 0
449 }
450
on_read_head(head: UartHead)451 pub async fn on_read_head(head: UartHead) {
452 let session_id = head.session_id;
453 let option = head.option;
454 let package_index = head.package_index;
455 if option & (UartOption::Free as u16) != 0 {
456 stop_session(session_id).await;
457 return;
458 }
459 if is_response(option as u8) {
460 let pkg = QueueManager::get_package(session_id, 0).await;
461 let mut pkg = pkg.unwrap();
462 pkg.status = if option & (UartOption::Ack as u16) > 1 {
463 OutputDataStatus::ResponseOk
464 } else {
465 OutputDataStatus::WaitSend
466 };
467 QueueManager::update_package(session_id, 0, pkg).await;
468 WaiterManager::wakeup_response_wait(session_id).await;
469 } else {
470 let mut header_obj =
471 uart::build_header_obj(session_id, UartOption::Ack as u16, 0, package_index);
472 let header = header_obj.serialize();
473 let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32);
474 header_obj.head_checksum = u32::to_le(head_sum);
475 let data = header_obj.serialize();
476 output_package(session_id, true, UartOption::Ack as u8, package_index, data).await;
477 }
478 }
479
/// Returns the process-wide package index.
///
/// With `is_create == true` the counter is advanced by one and the new value
/// is returned; otherwise the current value is returned unchanged. The
/// counter starts at 888, so the first created index is 889.
///
/// The counter is atomic, so concurrent callers never receive the same new
/// index, and it wraps around at `u32::MAX` instead of panicking.
/// NOTE(review): after a wrap the value 0 can be returned, which `wrap_put`
/// treats as "allocate a new index" — confirm that is acceptable.
#[allow(unused)]
fn get_package_index(is_create: bool) -> u32 {
    static PACKAGE_INDEX: std::sync::atomic::AtomicU32 =
        std::sync::atomic::AtomicU32::new(888);

    if is_create {
        // `fetch_add` returns the previous value; callers expect the new one.
        PACKAGE_INDEX
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
            .wrapping_add(1)
    } else {
        PACKAGE_INDEX.load(std::sync::atomic::Ordering::SeqCst)
    }
}
493
start_uart(session_id: u32, wr: UartWriter)494 pub async fn start_uart(session_id: u32, wr: UartWriter) {
495 UartMap::start(session_id, wr).await;
496 }
497
498 #[allow(unused)]
wrap_put(session_id: u32, data: TaskMessage, package_index: u32, option: u8)499 pub async fn wrap_put(session_id: u32, data: TaskMessage, package_index: u32, option: u8) {
500 let mut pkg_index = package_index;
501 if package_index == 0 {
502 pkg_index = get_package_index(true);
503 }
504 let send = serializer::concat_pack(data);
505 crate::info!("wrap_put send len:{}, send:{:#?}", send.len(), send);
506
507 let payload_max_len = config::MAX_UART_SIZE_IOBUF as usize - UART_HEAD_SIZE;
508 let mut index = 0;
509 let len = send.len();
510
511 loop {
512 if index >= len {
513 println!("wrap_put break");
514 break;
515 }
516 let size;
517 let mut op = option;
518 if index + payload_max_len <= len {
519 size = payload_max_len;
520 } else {
521 size = len - index;
522 op = UartOption::Tail as u8 | option;
523 }
524
525 let data = send[index..index + size].to_vec().clone();
526 let data_sum = data.iter().fold(0u32, |acc, &x| acc + x as u32);
527 let mut header_obj = uart::build_header_obj(session_id, op as u16, size, pkg_index);
528 header_obj.data_checksum = u32::to_le(data_sum);
529
530 let header = header_obj.serialize();
531 let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32);
532 header_obj.head_checksum = u32::to_le(head_sum);
533
534 let header = header_obj.serialize();
535 crate::info!("header, header_len:{}", header.len());
536 let total = [header, send[index..index + size].to_vec().clone()].concat();
537
538 output_package(
539 session_id,
540 (op & UartOption::Ack as u8) > 0,
541 op,
542 pkg_index,
543 total,
544 )
545 .await;
546 pkg_index = get_package_index(true);
547 index += size;
548 }
549 }
550