• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 //! uart_wrapper
16 #![allow(missing_docs)]
17 use super::uart::UartWriter;
18 use super::{uart, UartMap};
19 use crate::config::{self, TaskMessage};
20 use crate::serializer::native_struct::UartHead;
21 use crate::serializer::serialize::Serialization;
22 use crate::serializer::{self, UART_HEAD_SIZE};
23 use crate::utils;
24 #[allow(unused)]
25 use crate::utils::hdc_log::*;
26 use std::collections::HashMap;
27 use std::sync::Arc;
28 #[cfg(feature = "host")]
29 extern crate ylong_runtime_static as ylong_runtime;
30 use ylong_runtime::sync::waiter::Waiter;
31 use ylong_runtime::sync::Mutex;
32 use ylong_runtime::task::JoinHandle;
33 
34 #[derive(PartialEq, Debug, Clone, Copy)]
35 #[repr(u8)]
36 pub enum UartOption {
37     Tail = 1,  // makr is the last packget, can be send to session.
38     Reset = 2, // host request reset session in daemon
39     Ack = 4,   // response the pkg is received
40     Nak = 8,   // request resend pkg again
41     Free = 16, // request free this session, some unable recovery error happened
42 }
43 
44 impl TryFrom<u8> for UartOption {
45     type Error = ();
try_from(cmd: u8) -> Result<Self, ()>46     fn try_from(cmd: u8) -> Result<Self, ()> {
47         match cmd {
48             1 => Ok(Self::Tail),
49             2 => Ok(Self::Reset),
50             4 => Ok(Self::Ack),
51             8 => Ok(Self::Nak),
52             16 => Ok(Self::Free),
53             _ => Err(()),
54         }
55     }
56 }
57 
58 struct WaiterManager {
59     // waiter used for sync package send-response one by one.
60     response_waiters: HashMap<u32, Waiter>,
61     // waiter used for waiting if no packages.
62     empty_waiters: HashMap<u32, Waiter>,
63 }
64 
65 impl WaiterManager {
get_instance() -> &'static mut WaiterManager66     fn get_instance() -> &'static mut WaiterManager {
67         static mut INSTANCE: Option<WaiterManager> = None;
68         unsafe {
69             INSTANCE.get_or_insert(WaiterManager {
70                 response_waiters: HashMap::new(),
71                 empty_waiters: HashMap::new(),
72             })
73         }
74     }
75 
start_session(session_id: u32)76     async fn start_session(session_id: u32) {
77         let instance = Self::get_instance();
78         instance.response_waiters.insert(session_id, Waiter::new());
79         instance.empty_waiters.insert(session_id, Waiter::new());
80     }
81 
82     #[allow(unused)]
wait_response(session_id: u32)83     async fn wait_response(session_id: u32) {
84         let instance = Self::get_instance();
85         let waiter = instance.response_waiters.get(&session_id);
86         if let Some(w) = waiter {
87             w.wait().await;
88         }
89     }
90 
91     #[allow(unused)]
wakeup_response_wait(session_id: u32)92     async fn wakeup_response_wait(session_id: u32) {
93         let instance = Self::get_instance();
94         let waiter = instance.response_waiters.get(&session_id);
95         if let Some(w) = waiter {
96             w.wake_one();
97         }
98     }
99 
100     #[allow(unused)]
wait_empty(session_id: u32)101     async fn wait_empty(session_id: u32) {
102         let instance = Self::get_instance();
103         let waiter = instance.empty_waiters.get(&session_id);
104         if let Some(w) = waiter {
105             w.wait().await;
106         }
107     }
108 
109     #[allow(unused)]
wakeup_empty_wait(session_id: u32)110     async fn wakeup_empty_wait(session_id: u32) {
111         let instance = Self::get_instance();
112         let waiter = instance.empty_waiters.get(&session_id);
113         if let Some(w) = waiter {
114             w.wake_one();
115         }
116     }
117 }
118 
119 #[derive(PartialEq, Debug, Clone, Copy)]
120 #[repr(u8)]
121 enum OutputDataStatus {
122     WaitSend = 0,
123     WaitResponse = 1,
124     ResponseOk = 2,
125 }
126 
127 #[derive(PartialEq, Debug, Clone)]
128 struct OutputData {
129     session_id: u32,
130     response: bool,
131     option: u8,
132     package_index: u32,
133     data: Vec<u8>,
134     status: OutputDataStatus,
135     retry_count: u32,
136 }
137 
138 impl std::fmt::Display for OutputData {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result139     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140         write!(f, "OutputData: session_id:{}, response:{}, option:{:#?}, package_index:{}, status:{:#?}, retry_count:{}, data size:{}",
141         self.session_id, self.response, self.option, self.package_index, self.status, self.retry_count, self.data.len())
142     }
143 }
144 
145 type OutputData_ = Arc<Mutex<OutputData>>;
146 
147 type OutputDataVec_ = Arc<Mutex<Vec<OutputData_>>>;
148 
149 struct DataQueue {
150     data_map: HashMap<u32, OutputDataVec_>,
151     thread_map: HashMap<u32, JoinHandle<()>>,
152     stop_flag_map: HashMap<u32, Arc<Mutex<u8>>>,
153 }
154 
155 impl DataQueue {
new() -> Self156     fn new() -> Self {
157         Self {
158             data_map: HashMap::new(),
159             thread_map: HashMap::new(),
160             stop_flag_map: HashMap::new(),
161         }
162     }
163 }
164 
165 type DataQueue_ = Arc<Mutex<DataQueue>>;
166 
167 pub struct QueueManager {}
168 
169 impl QueueManager {
get_instance() -> DataQueue_170     fn get_instance() -> DataQueue_ {
171         static mut INSTANCE: Option<DataQueue_> = None;
172         unsafe {
173             INSTANCE
174                 .get_or_insert_with(|| Arc::new(Mutex::new(DataQueue::new())))
175                 .clone()
176         }
177     }
178 
get_package(session_id: u32, index: usize) -> Option<OutputData>179     async fn get_package(session_id: u32, index: usize) -> Option<OutputData> {
180         let instance = Self::get_instance();
181         let mtx = instance.lock().await;
182         let data_map = &mtx.data_map;
183         if let Some(vec) = data_map.get(&session_id) {
184             let vec = vec.lock().await;
185             if !vec.is_empty() {
186                 let Some(arc) = vec.get(index) else {
187                     crate::error!("get_package get is None");
188                     return None;
189                 };
190                 let data_mtx = arc.lock().await;
191                 return Some(data_mtx.clone());
192             }
193         }
194         None
195     }
196 
put_package(session_id: u32, data: OutputData)197     async fn put_package(session_id: u32, data: OutputData) {
198         let instance = Self::get_instance();
199         let mut mtx = instance.lock().await;
200         let data_map = &mut mtx.data_map;
201         if let Some(vec) = data_map.get(&session_id) {
202             let mut vec = vec.lock().await;
203             let item = Arc::new(Mutex::new(data));
204             vec.push(item);
205         } else {
206             let mut vec = Vec::<Arc<Mutex<OutputData>>>::new();
207             let d = Arc::new(Mutex::new(data));
208             vec.push(d);
209             let v = Arc::new(Mutex::new(vec));
210             data_map.insert(session_id, v);
211         }
212     }
213 
update_package(session_id: u32, index: usize, data: OutputData) -> bool214     async fn update_package(session_id: u32, index: usize, data: OutputData) -> bool {
215         let instance = Self::get_instance();
216         let mtx = instance.lock().await;
217         let data_map = &mtx.data_map;
218         if let Some(vec) = data_map.get(&session_id) {
219             let vec = vec.lock().await;
220             if !vec.is_empty() {
221                 let Some(arc) = vec.get(index) else {
222                     crate::error!("update_package get is None");
223                     return false;
224                 };
225                 let mut data_mtx = arc.lock().await;
226                 *data_mtx = data;
227                 return true;
228             }
229         }
230         false
231     }
232 
get_stop_flag(session_id: u32) -> Option<u8>233     async fn get_stop_flag(session_id: u32) -> Option<u8> {
234         let instance = Self::get_instance();
235         let mtx = instance.lock().await;
236         let stop_flag_map = &mtx.stop_flag_map;
237         if let Some(flag) = stop_flag_map.get(&session_id) {
238             let v = flag.lock().await;
239             Some(*v)
240         } else {
241             None
242         }
243     }
244 
245     #[allow(unused)]
set_stop_flag(session_id: u32)246     async fn set_stop_flag(session_id: u32) {
247         let instance = Self::get_instance();
248         let mut mtx = instance.lock().await;
249         let stop_flag_map = &mut mtx.stop_flag_map;
250         stop_flag_map.insert(session_id, Arc::new(Mutex::new(1)));
251     }
252 
remove_package(session_id: u32, index: usize) -> bool253     async fn remove_package(session_id: u32, index: usize) -> bool {
254         let instance = Self::get_instance();
255         let mtx = instance.lock().await;
256         let data_map = &mtx.data_map;
257         if let Some(vec) = data_map.get(&session_id) {
258             let mut vec = vec.lock().await;
259             if !vec.is_empty() && index < vec.len() {
260                 vec.remove(index);
261                 return true;
262             }
263         }
264         false
265     }
266 
remove_session(session_id: u32)267     async fn remove_session(session_id: u32) {
268         let instance = Self::get_instance();
269         let mut mtx = instance.lock().await;
270         mtx.data_map.remove(&session_id);
271         mtx.stop_flag_map.remove(&session_id);
272         mtx.thread_map.remove(&session_id);
273         crate::info!("remove_session:{session_id}");
274     }
275 
check_stop(session_id: u32) -> bool276     async fn check_stop(session_id: u32) -> bool {
277         if let Some(stop) = Self::get_stop_flag(session_id).await {
278             return stop == 0;
279         }
280         false
281     }
282 
session_loop(session_id: u32)283     async fn session_loop(session_id: u32) {
284         // 1. 取第[0]个outputdata, 如果是WaitSend 则发送 改变状态为WaitResponse 同时wait
285         //   2. 收到response, 如果是ACK 则改变为ResponseOK 同时wakeup
286         //   3.收到wakeup,则检查状态是否为ResponseOK 如果是,则remove掉,继续step 1;
287         //      如果不是,则检查retry_count, 自减1,继续send, 同时继续超时wait(如果超时,则继续检查状态,retry count 减1,继续send, 超时wait)
288         //      retry count为0, 则表示连接中断,stop session
289         crate::info!("session_loop for {}", session_id);
290         loop {
291             if Self::check_stop(session_id).await {
292                 crate::info!("session_loop stop");
293                 break;
294             }
295             let mut first_pkg = Self::get_package(session_id, 0).await;
296             while first_pkg.is_none() {
297                 WaiterManager::wait_empty(session_id).await;
298                 first_pkg = Self::get_package(session_id, 0).await;
299                 if Self::check_stop(session_id).await {
300                     crate::info!("session_loop stop");
301                     break;
302                 }
303             }
304             if Self::check_stop(session_id).await {
305                 crate::info!("session_loop stop");
306                 break;
307             }
308             let Some(mut first_pkg) = first_pkg else {
309                 crate::info!("session_loop first_pkg is None");
310                 break;
311             };
312             let mut status = first_pkg.status;
313             let mut retry_count = first_pkg.retry_count;
314 
315             if status == OutputDataStatus::WaitSend {
316                 // 发送数据
317                 let data = first_pkg.data.clone();
318                 let _ret = UartMap::put(session_id, data).await;
319                 // 如果是ack报文 则不需要等待回应
320                 if first_pkg.response {
321                     QueueManager::remove_package(session_id, 0).await;
322                     continue;
323                 }
324                 // 修改data 的status = WaitResponse
325                 first_pkg.status = OutputDataStatus::WaitResponse;
326                 retry_count -= 1;
327                 first_pkg.retry_count = retry_count;
328                 // 更新数据
329                 QueueManager::update_package(session_id, 0, first_pkg.clone()).await;
330                 // 等待response
331                 WaiterManager::wait_response(session_id).await;
332 
333                 if Self::check_stop(session_id).await {
334                     crate::info!("session_loop stop");
335                     break;
336                 }
337                 // 收到回复
338                 // 重新获取数据
339 
340                 let Some(mut first_pkg) = Self::get_package(session_id, 0).await else {
341                     crate::info!("session_loop first_pkg is None");
342                     break;
343                 };
344                 // 得到新状态
345                 status = first_pkg.status;
346 
347                 if status == OutputDataStatus::ResponseOk {
348                     // 删除当前data
349                     QueueManager::remove_package(session_id, 0).await;
350                     continue;
351                 }
352                 retry_count = first_pkg.retry_count;
353                 while retry_count > 0 && status == OutputDataStatus::WaitResponse {
354                     // 保存retry_count
355                     retry_count -= 1;
356                     first_pkg.retry_count = retry_count;
357                     QueueManager::update_package(session_id, 0, first_pkg.clone()).await;
358 
359                     // 再次发送数据
360                     let data = first_pkg.data.clone();
361                     let _ret = UartMap::put(session_id, data).await;
362                     WaiterManager::wait_response(session_id).await;
363 
364                     if Self::check_stop(session_id).await {
365                         break;
366                     }
367 
368                     let Some(first_pkg) = Self::get_package(session_id, 0).await else {
369                         break;
370                     };
371                     status = first_pkg.status;
372 
373                     match status {
374                         OutputDataStatus::ResponseOk => {
375                             QueueManager::remove_package(session_id, 0).await;
376                             break;
377                         }
378                         OutputDataStatus::WaitResponse => {
379                             let Some(first_pkg) = Self::get_package(session_id, 0).await else {
380                                 break;
381                             };
382                             status = first_pkg.status;
383                             retry_count = first_pkg.retry_count;
384                             continue;
385                         }
386                         OutputDataStatus::WaitSend => {
387                             QueueManager::remove_package(session_id, 0).await;
388                             break;
389                         }
390                     }
391                 }
392             }
393         }
394         Self::remove_session(session_id).await;
395         crate::info!("session_loop for {} end.", session_id);
396     }
397 }
398 
start_session(session_id: u32)399 pub async fn start_session(session_id: u32) {
400     let instance = QueueManager::get_instance();
401     let mut mtx = instance.lock().await;
402     let thread_map = &mut mtx.thread_map;
403     if thread_map.contains_key(&session_id) {
404         crate::error!("session thread has started.");
405         return;
406     }
407 
408     WaiterManager::start_session(session_id).await;
409 
410     let handle = utils::spawn(QueueManager::session_loop(session_id));
411     thread_map.insert(session_id, handle);
412 
413     let stop_flag_map = &mut mtx.stop_flag_map;
414     stop_flag_map.insert(session_id, Arc::new(Mutex::new(1)));
415 }
416 
stop_session(session_id: u32)417 async fn stop_session(session_id: u32) {
418     let instance = QueueManager::get_instance();
419     let mut mtx = instance.lock().await;
420     let stop_flag_map = &mut mtx.stop_flag_map;
421     stop_flag_map.insert(session_id, Arc::new(Mutex::new(0)));
422 
423     WaiterManager::wakeup_empty_wait(session_id).await;
424     WaiterManager::wakeup_response_wait(session_id).await;
425 }
426 
stop_other_session(session_id: u32)427 pub async fn stop_other_session(session_id: u32) {
428     let instance = QueueManager::get_instance();
429     let mtx = instance.lock().await;
430     let session_ids = mtx.data_map.keys();
431     let mut remove_sessions = Vec::new();
432     for k in session_ids {
433         if *k != session_id {
434             remove_sessions.push(*k);
435         }
436     }
437     drop(mtx);
438     for id in remove_sessions {
439         stop_session(id).await;
440     }
441 }
442 
output_package( session_id: u32, response: bool, option: u8, package_index: u32, data: Vec<u8>, )443 async fn output_package(
444     session_id: u32,
445     response: bool,
446     option: u8,
447     package_index: u32,
448     data: Vec<u8>,
449 ) {
450     let pkg = OutputData {
451         session_id,
452         response,
453         option,
454         package_index,
455         data: data.clone(),
456         retry_count: 5,
457         status: OutputDataStatus::WaitSend,
458     };
459     QueueManager::put_package(session_id, pkg).await;
460     WaiterManager::wakeup_empty_wait(session_id).await;
461 }
462 
463 #[allow(unused)]
is_response(option: u8) -> bool464 fn is_response(option: u8) -> bool {
465     let ret = (option & UartOption::Ack as u8) | (option & UartOption::Nak as u8);
466     ret != 0
467 }
468 
on_read_head(head: UartHead)469 pub async fn on_read_head(head: UartHead) {
470     let session_id = head.session_id;
471     let option = head.option;
472     let package_index = head.package_index;
473     if option & (UartOption::Free as u16) != 0 {
474         stop_session(session_id).await;
475         return;
476     }
477     if is_response(option as u8) {
478         let Some(mut pkg) = QueueManager::get_package(session_id, 0).await else {
479             return;
480         };
481         pkg.status = if option & (UartOption::Ack as u16) > 1 {
482             OutputDataStatus::ResponseOk
483         } else {
484             OutputDataStatus::WaitSend
485         };
486         QueueManager::update_package(session_id, 0, pkg).await;
487         WaiterManager::wakeup_response_wait(session_id).await;
488     } else {
489         let mut header_obj =
490             uart::build_header_obj(session_id, UartOption::Ack as u16, 0, package_index);
491         let header = header_obj.serialize();
492         let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32);
493         header_obj.head_checksum = u32::to_le(head_sum);
494         let data = header_obj.serialize();
495         output_package(session_id, true, UartOption::Ack as u8, package_index, data).await;
496     }
497 }
498 
499 #[allow(unused)]
get_package_index(is_create: bool) -> u32500 fn get_package_index(is_create: bool) -> u32 {
501     static mut PACKAGE_INDEX: u32 = 888;
502 
503     unsafe {
504         if is_create {
505             PACKAGE_INDEX += 1;
506             PACKAGE_INDEX
507         } else {
508             PACKAGE_INDEX
509         }
510     }
511 }
512 
start_uart(session_id: u32, wr: UartWriter)513 pub async fn start_uart(session_id: u32, wr: UartWriter) {
514     UartMap::start(session_id, wr).await;
515 }
516 
517 #[allow(unused)]
wrap_put(session_id: u32, data: TaskMessage, package_index: u32, option: u8)518 pub async fn wrap_put(session_id: u32, data: TaskMessage, package_index: u32, option: u8) {
519     let mut pkg_index = package_index;
520     if package_index == 0 {
521         pkg_index = get_package_index(true);
522     }
523     let send = serializer::concat_pack(data);
524     crate::info!("wrap_put send len:{}, send:{:#?}", send.len(), send);
525 
526     let payload_max_len = config::MAX_UART_SIZE_IOBUF as usize - UART_HEAD_SIZE;
527     let mut index = 0;
528     let len = send.len();
529 
530     loop {
531         if index >= len {
532             println!("wrap_put break");
533             break;
534         }
535         let size;
536         let mut op = option;
537         if index + payload_max_len <= len {
538             size = payload_max_len;
539         } else {
540             size = len - index;
541             op = UartOption::Tail as u8 | option;
542         }
543 
544         let data = send[index..index + size].to_vec().clone();
545         let data_sum = data.iter().fold(0u32, |acc, &x| acc + x as u32);
546         let mut header_obj = uart::build_header_obj(session_id, op as u16, size, pkg_index);
547         header_obj.data_checksum = u32::to_le(data_sum);
548 
549         let header = header_obj.serialize();
550         let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32);
551         header_obj.head_checksum = u32::to_le(head_sum);
552 
553         let header = header_obj.serialize();
554         crate::info!("header, header_len:{}", header.len());
555         let total = [header, send[index..index + size].to_vec().clone()].concat();
556 
557         output_package(
558             session_id,
559             (op & UartOption::Ack as u8) > 0,
560             op,
561             pkg_index,
562             total,
563         )
564         .await;
565         pkg_index = get_package_index(true);
566         index += size;
567     }
568 }
569