• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 //! uart_wrapper
16 #![allow(missing_docs)]
17 use super::uart::UartWriter;
18 use super::{uart, UartMap};
19 use crate::config::{self, TaskMessage};
20 use crate::serializer::native_struct::UartHead;
21 use crate::serializer::serialize::Serialization;
22 use crate::serializer::{self, UART_HEAD_SIZE};
23 #[allow(unused)]
24 use crate::utils::hdc_log::*;
25 use std::collections::HashMap;
26 use std::sync::Arc;
27 #[cfg(feature = "host")]
28 extern crate ylong_runtime_static as ylong_runtime;
29 use ylong_runtime::sync::waiter::Waiter;
30 use ylong_runtime::sync::Mutex;
31 use ylong_runtime::task::JoinHandle;
32 
33 #[derive(PartialEq, Debug, Clone, Copy)]
34 #[repr(u8)]
35 pub enum UartOption {
36     Tail = 1,  // makr is the last packget, can be send to session.
37     Reset = 2, // host request reset session in daemon
38     Ack = 4,   // response the pkg is received
39     Nak = 8,   // request resend pkg again
40     Free = 16, // request free this session, some unable recovery error happened
41 }
42 
43 impl TryFrom<u8> for UartOption {
44     type Error = ();
try_from(cmd: u8) -> Result<Self, ()>45     fn try_from(cmd: u8) -> Result<Self, ()> {
46         match cmd {
47             1 => Ok(Self::Tail),
48             2 => Ok(Self::Reset),
49             4 => Ok(Self::Ack),
50             8 => Ok(Self::Nak),
51             16 => Ok(Self::Free),
52             _ => Err(()),
53         }
54     }
55 }
56 
57 struct WaiterManager {
58     // waiter used for sync package send-response one by one.
59     response_waiters: HashMap<u32, Waiter>,
60     // waiter used for waiting if no packages.
61     empty_waiters: HashMap<u32, Waiter>,
62 }
63 
64 impl WaiterManager {
get_instance() -> &'static mut WaiterManager65     fn get_instance() -> &'static mut WaiterManager {
66         static mut INSTANCE: Option<WaiterManager> = None;
67         unsafe {
68             INSTANCE.get_or_insert(WaiterManager {
69                 response_waiters: HashMap::new(),
70                 empty_waiters: HashMap::new(),
71             })
72         }
73     }
74 
start_session(session_id: u32)75     async fn start_session(session_id: u32) {
76         let instance = Self::get_instance();
77         instance.response_waiters.insert(session_id, Waiter::new());
78         instance.empty_waiters.insert(session_id, Waiter::new());
79     }
80 
81     #[allow(unused)]
wait_response(session_id: u32)82     async fn wait_response(session_id: u32) {
83         let instance = Self::get_instance();
84         let waiter = instance.response_waiters.get(&session_id);
85         if let Some(w) = waiter {
86             w.wait().await;
87         }
88     }
89 
90     #[allow(unused)]
wakeup_response_wait(session_id: u32)91     async fn wakeup_response_wait(session_id: u32) {
92         let instance = Self::get_instance();
93         let waiter = instance.response_waiters.get(&session_id);
94         if let Some(w) = waiter {
95             w.wake_one();
96         }
97     }
98 
99     #[allow(unused)]
wait_empty(session_id: u32)100     async fn wait_empty(session_id: u32) {
101         let instance = Self::get_instance();
102         let waiter = instance.empty_waiters.get(&session_id);
103         if let Some(w) = waiter {
104             w.wait().await;
105         }
106     }
107 
108     #[allow(unused)]
wakeup_empty_wait(session_id: u32)109     async fn wakeup_empty_wait(session_id: u32) {
110         let instance = Self::get_instance();
111         let waiter = instance.empty_waiters.get(&session_id);
112         if let Some(w) = waiter {
113             w.wake_one();
114         }
115     }
116 }
117 
118 #[derive(PartialEq, Debug, Clone, Copy)]
119 #[repr(u8)]
120 enum OutputDataStatus {
121     WaitSend = 0,
122     WaitResponse = 1,
123     ResponseOk = 2,
124 }
125 
126 #[derive(PartialEq, Debug, Clone)]
127 struct OutputData {
128     session_id: u32,
129     response: bool,
130     option: u8,
131     package_index: u32,
132     data: Vec<u8>,
133     status: OutputDataStatus,
134     retry_count: u32,
135 }
136 
137 impl std::fmt::Display for OutputData {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result138     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139         write!(f, "OutputData: session_id:{}, response:{}, option:{:#?}, package_index:{}, status:{:#?}, retry_count:{}, data size:{}",
140         self.session_id, self.response, self.option, self.package_index, self.status, self.retry_count, self.data.len())
141     }
142 }
143 
144 type OutputData_ = Arc<Mutex<OutputData>>;
145 
146 type OutputDataVec_ = Arc<Mutex<Vec<OutputData_>>>;
147 
148 struct DataQueue {
149     data_map: HashMap<u32, OutputDataVec_>,
150     thread_map: HashMap<u32, JoinHandle<()>>,
151     stop_flag_map: HashMap<u32, Arc<Mutex<u8>>>,
152 }
153 
154 impl DataQueue {
new() -> Self155     fn new() -> Self {
156         Self {
157             data_map: HashMap::new(),
158             thread_map: HashMap::new(),
159             stop_flag_map: HashMap::new(),
160         }
161     }
162 }
163 
164 type DataQueue_ = Arc<Mutex<DataQueue>>;
165 
166 pub struct QueueManager {}
167 
168 impl QueueManager {
get_instance() -> DataQueue_169     fn get_instance() -> DataQueue_ {
170         static mut INSTANCE: Option<DataQueue_> = None;
171         unsafe {
172             INSTANCE
173                 .get_or_insert_with(|| Arc::new(Mutex::new(DataQueue::new())))
174                 .clone()
175         }
176     }
177 
get_package(session_id: u32, index: usize) -> Option<OutputData>178     async fn get_package(session_id: u32, index: usize) -> Option<OutputData> {
179         let instance = Self::get_instance();
180         let mtx = instance.lock().await;
181         let data_map = &mtx.data_map;
182         if let Some(vec) = data_map.get(&session_id) {
183             let vec = vec.lock().await;
184             if !vec.is_empty() {
185                 let Some(arc) = vec.get(index) else {
186                     crate::error!("get_package get is None");
187                     return None;
188                 };
189                 let data_mtx = arc.lock().await;
190                 return Some(data_mtx.clone());
191             }
192         }
193         None
194     }
195 
put_package(session_id: u32, data: OutputData)196     async fn put_package(session_id: u32, data: OutputData) {
197         let instance = Self::get_instance();
198         let mut mtx = instance.lock().await;
199         let data_map = &mut mtx.data_map;
200         if let Some(vec) = data_map.get(&session_id) {
201             let mut vec = vec.lock().await;
202             let item = Arc::new(Mutex::new(data));
203             vec.push(item);
204         } else {
205             let mut vec = Vec::<Arc<Mutex<OutputData>>>::new();
206             let d = Arc::new(Mutex::new(data));
207             vec.push(d);
208             let v = Arc::new(Mutex::new(vec));
209             data_map.insert(session_id, v);
210         }
211     }
212 
update_package(session_id: u32, index: usize, data: OutputData) -> bool213     async fn update_package(session_id: u32, index: usize, data: OutputData) -> bool {
214         let instance = Self::get_instance();
215         let mtx = instance.lock().await;
216         let data_map = &mtx.data_map;
217         if let Some(vec) = data_map.get(&session_id) {
218             let vec = vec.lock().await;
219             if !vec.is_empty() {
220                 let Some(arc) = vec.get(index) else {
221                     crate::error!("update_package get is None");
222                     return false;
223                 };
224                 let mut data_mtx = arc.lock().await;
225                 *data_mtx = data;
226                 return true;
227             }
228         }
229         false
230     }
231 
get_stop_flag(session_id: u32) -> Option<u8>232     async fn get_stop_flag(session_id: u32) -> Option<u8> {
233         let instance = Self::get_instance();
234         let mtx = instance.lock().await;
235         let stop_flag_map = &mtx.stop_flag_map;
236         if let Some(flag) = stop_flag_map.get(&session_id) {
237             let v = flag.lock().await;
238             Some(*v)
239         } else {
240             None
241         }
242     }
243 
244     #[allow(unused)]
set_stop_flag(session_id: u32)245     async fn set_stop_flag(session_id: u32) {
246         let instance = Self::get_instance();
247         let mut mtx = instance.lock().await;
248         let stop_flag_map = &mut mtx.stop_flag_map;
249         stop_flag_map.insert(session_id, Arc::new(Mutex::new(1)));
250     }
251 
remove_package(session_id: u32, index: usize) -> bool252     async fn remove_package(session_id: u32, index: usize) -> bool {
253         let instance = Self::get_instance();
254         let mtx = instance.lock().await;
255         let data_map = &mtx.data_map;
256         if let Some(vec) = data_map.get(&session_id) {
257             let mut vec = vec.lock().await;
258             if !vec.is_empty() && index < vec.len() {
259                 vec.remove(index);
260                 return true;
261             }
262         }
263         false
264     }
265 
remove_session(session_id: u32)266     async fn remove_session(session_id: u32) {
267         let instance = Self::get_instance();
268         let mut mtx = instance.lock().await;
269         mtx.data_map.remove(&session_id);
270         mtx.stop_flag_map.remove(&session_id);
271         mtx.thread_map.remove(&session_id);
272         crate::info!("remove_session:{session_id}");
273     }
274 
check_stop(session_id: u32) -> bool275     async fn check_stop(session_id: u32) -> bool {
276         if let Some(stop) = Self::get_stop_flag(session_id).await {
277             return stop == 0;
278         }
279         false
280     }
281 
session_loop(session_id: u32)282     async fn session_loop(session_id: u32) {
283         // 1. 取第[0]个outputdata, 如果是WaitSend 则发送 改变状态为WaitResponse 同时wait
284         //   2. 收到response, 如果是ACK 则改变为ResponseOK 同时wakeup
285         //   3.收到wakeup,则检查状态是否为ResponseOK 如果是,则remove掉,继续step 1;
286         //      如果不是,则检查retry_count, 自减1,继续send, 同时继续超时wait(如果超时,则继续检查状态,retry count 减1,继续send, 超时wait)
287         //      retry count为0, 则表示连接中断,stop session
288         crate::info!("session_loop for {}", session_id);
289         loop {
290             if Self::check_stop(session_id).await {
291                 crate::info!("session_loop stop");
292                 break;
293             }
294             let mut first_pkg = Self::get_package(session_id, 0).await;
295             while first_pkg.is_none() {
296                 WaiterManager::wait_empty(session_id).await;
297                 first_pkg = Self::get_package(session_id, 0).await;
298                 if Self::check_stop(session_id).await {
299                     crate::info!("session_loop stop");
300                     break;
301                 }
302             }
303             if Self::check_stop(session_id).await {
304                 crate::info!("session_loop stop");
305                 break;
306             }
307             let Some(mut first_pkg) = first_pkg else {
308                 crate::info!("session_loop first_pkg is None");
309                 break;
310             };
311             let mut status = first_pkg.status;
312             let mut retry_count = first_pkg.retry_count;
313 
314             if status == OutputDataStatus::WaitSend {
315                 // 发送数据
316                 let data = first_pkg.data.clone();
317                 let _ret = UartMap::put(session_id, data).await;
318                 // 如果是ack报文 则不需要等待回应
319                 if first_pkg.response {
320                     QueueManager::remove_package(session_id, 0).await;
321                     continue;
322                 }
323                 // 修改data 的status = WaitResponse
324                 first_pkg.status = OutputDataStatus::WaitResponse;
325                 retry_count -= 1;
326                 first_pkg.retry_count = retry_count;
327                 // 更新数据
328                 QueueManager::update_package(session_id, 0, first_pkg.clone()).await;
329                 // 等待response
330                 WaiterManager::wait_response(session_id).await;
331 
332                 if Self::check_stop(session_id).await {
333                     crate::info!("session_loop stop");
334                     break;
335                 }
336                 // 收到回复
337                 // 重新获取数据
338 
339                 let Some(mut first_pkg) = Self::get_package(session_id, 0).await else {
340                     crate::info!("session_loop first_pkg is None");
341                     break;
342                 };
343                 // 得到新状态
344                 status = first_pkg.status;
345 
346                 if status == OutputDataStatus::ResponseOk {
347                     // 删除当前data
348                     QueueManager::remove_package(session_id, 0).await;
349                     continue;
350                 }
351                 retry_count = first_pkg.retry_count;
352                 while retry_count > 0 && status == OutputDataStatus::WaitResponse {
353                     // 保存retry_count
354                     retry_count -= 1;
355                     first_pkg.retry_count = retry_count;
356                     QueueManager::update_package(session_id, 0, first_pkg.clone()).await;
357 
358                     // 再次发送数据
359                     let data = first_pkg.data.clone();
360                     let _ret = UartMap::put(session_id, data).await;
361                     WaiterManager::wait_response(session_id).await;
362 
363                     if Self::check_stop(session_id).await {
364                         break;
365                     }
366 
367                     let Some(first_pkg) = Self::get_package(session_id, 0).await else {
368                         break;
369                     };
370                     status = first_pkg.status;
371 
372                     match status {
373                         OutputDataStatus::ResponseOk => {
374                             QueueManager::remove_package(session_id, 0).await;
375                             break;
376                         }
377                         OutputDataStatus::WaitResponse => {
378                             let Some(first_pkg) = Self::get_package(session_id, 0).await else {
379                                 break;
380                             };
381                             status = first_pkg.status;
382                             retry_count = first_pkg.retry_count;
383                             continue;
384                         }
385                         OutputDataStatus::WaitSend => {
386                             QueueManager::remove_package(session_id, 0).await;
387                             break;
388                         }
389                     }
390                 }
391             }
392         }
393         Self::remove_session(session_id).await;
394         crate::info!("session_loop for {} end.", session_id);
395     }
396 }
397 
start_session(session_id: u32)398 pub async fn start_session(session_id: u32) {
399     let instance = QueueManager::get_instance();
400     let mut mtx = instance.lock().await;
401     let thread_map = &mut mtx.thread_map;
402     if thread_map.contains_key(&session_id) {
403         crate::error!("session thread has started.");
404         return;
405     }
406 
407     WaiterManager::start_session(session_id).await;
408 
409     let handle = ylong_runtime::spawn(QueueManager::session_loop(session_id));
410     thread_map.insert(session_id, handle);
411 
412     let stop_flag_map = &mut mtx.stop_flag_map;
413     stop_flag_map.insert(session_id, Arc::new(Mutex::new(1)));
414 }
415 
stop_session(session_id: u32)416 async fn stop_session(session_id: u32) {
417     let instance = QueueManager::get_instance();
418     let mut mtx = instance.lock().await;
419     let stop_flag_map = &mut mtx.stop_flag_map;
420     stop_flag_map.insert(session_id, Arc::new(Mutex::new(0)));
421 
422     WaiterManager::wakeup_empty_wait(session_id).await;
423     WaiterManager::wakeup_response_wait(session_id).await;
424 }
425 
stop_other_session(session_id: u32)426 pub async fn stop_other_session(session_id: u32) {
427     let instance = QueueManager::get_instance();
428     let mtx = instance.lock().await;
429     let session_ids = mtx.data_map.keys();
430     let mut remove_sessions = Vec::new();
431     for k in session_ids {
432         if *k != session_id {
433             remove_sessions.push(*k);
434         }
435     }
436     drop(mtx);
437     for id in remove_sessions {
438         stop_session(id).await;
439     }
440 }
441 
output_package( session_id: u32, response: bool, option: u8, package_index: u32, data: Vec<u8>, )442 async fn output_package(
443     session_id: u32,
444     response: bool,
445     option: u8,
446     package_index: u32,
447     data: Vec<u8>,
448 ) {
449     let pkg = OutputData {
450         session_id,
451         response,
452         option,
453         package_index,
454         data: data.clone(),
455         retry_count: 5,
456         status: OutputDataStatus::WaitSend,
457     };
458     QueueManager::put_package(session_id, pkg).await;
459     WaiterManager::wakeup_empty_wait(session_id).await;
460 }
461 
462 #[allow(unused)]
is_response(option: u8) -> bool463 fn is_response(option: u8) -> bool {
464     let ret = (option & UartOption::Ack as u8) | (option & UartOption::Nak as u8);
465     ret != 0
466 }
467 
on_read_head(head: UartHead)468 pub async fn on_read_head(head: UartHead) {
469     let session_id = head.session_id;
470     let option = head.option;
471     let package_index = head.package_index;
472     if option & (UartOption::Free as u16) != 0 {
473         stop_session(session_id).await;
474         return;
475     }
476     if is_response(option as u8) {
477         let Some(mut pkg) = QueueManager::get_package(session_id, 0).await else {
478             return;
479         };
480         pkg.status = if option & (UartOption::Ack as u16) > 1 {
481             OutputDataStatus::ResponseOk
482         } else {
483             OutputDataStatus::WaitSend
484         };
485         QueueManager::update_package(session_id, 0, pkg).await;
486         WaiterManager::wakeup_response_wait(session_id).await;
487     } else {
488         let mut header_obj =
489             uart::build_header_obj(session_id, UartOption::Ack as u16, 0, package_index);
490         let header = header_obj.serialize();
491         let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32);
492         header_obj.head_checksum = u32::to_le(head_sum);
493         let data = header_obj.serialize();
494         output_package(session_id, true, UartOption::Ack as u8, package_index, data).await;
495     }
496 }
497 
498 #[allow(unused)]
get_package_index(is_create: bool) -> u32499 fn get_package_index(is_create: bool) -> u32 {
500     static mut PACKAGE_INDEX: u32 = 888;
501 
502     unsafe {
503         if is_create {
504             PACKAGE_INDEX += 1;
505             PACKAGE_INDEX
506         } else {
507             PACKAGE_INDEX
508         }
509     }
510 }
511 
start_uart(session_id: u32, wr: UartWriter)512 pub async fn start_uart(session_id: u32, wr: UartWriter) {
513     UartMap::start(session_id, wr).await;
514 }
515 
516 #[allow(unused)]
wrap_put(session_id: u32, data: TaskMessage, package_index: u32, option: u8)517 pub async fn wrap_put(session_id: u32, data: TaskMessage, package_index: u32, option: u8) {
518     let mut pkg_index = package_index;
519     if package_index == 0 {
520         pkg_index = get_package_index(true);
521     }
522     let send = serializer::concat_pack(data);
523     crate::info!("wrap_put send len:{}, send:{:#?}", send.len(), send);
524 
525     let payload_max_len = config::MAX_UART_SIZE_IOBUF as usize - UART_HEAD_SIZE;
526     let mut index = 0;
527     let len = send.len();
528 
529     loop {
530         if index >= len {
531             println!("wrap_put break");
532             break;
533         }
534         let size;
535         let mut op = option;
536         if index + payload_max_len <= len {
537             size = payload_max_len;
538         } else {
539             size = len - index;
540             op = UartOption::Tail as u8 | option;
541         }
542 
543         let data = send[index..index + size].to_vec().clone();
544         let data_sum = data.iter().fold(0u32, |acc, &x| acc + x as u32);
545         let mut header_obj = uart::build_header_obj(session_id, op as u16, size, pkg_index);
546         header_obj.data_checksum = u32::to_le(data_sum);
547 
548         let header = header_obj.serialize();
549         let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32);
550         header_obj.head_checksum = u32::to_le(head_sum);
551 
552         let header = header_obj.serialize();
553         crate::info!("header, header_len:{}", header.len());
554         let total = [header, send[index..index + size].to_vec().clone()].concat();
555 
556         output_package(
557             session_id,
558             (op & UartOption::Ack as u8) > 0,
559             op,
560             pkg_index,
561             total,
562         )
563         .await;
564         pkg_index = get_package_index(true);
565         index += size;
566     }
567 }
568