• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 //! uart_wrapper
16 #![allow(missing_docs)]
17 use super::uart::UartWriter;
18 use super::{uart, UartMap};
19 use crate::config::{self, TaskMessage};
20 use crate::serializer::native_struct::UartHead;
21 use crate::serializer::serialize::Serialization;
22 use crate::serializer::{self, UART_HEAD_SIZE};
23 #[allow(unused)]
24 use crate::utils::hdc_log::*;
25 use std::collections::HashMap;
26 use std::sync::Arc;
27 use ylong_runtime::sync::waiter::Waiter;
28 use ylong_runtime::sync::Mutex;
29 use ylong_runtime::task::JoinHandle;
30 
/// Option bits carried in the `option` field of a UART package header.
///
/// Every variant is a distinct bit, so callers combine them with `|` and
/// test them with `&` on the raw integer value.
#[derive(PartialEq, Eq, Hash, Debug, Clone, Copy)]
#[repr(u8)]
pub enum UartOption {
    /// Marks the last package of a message; the data can be sent to the session.
    Tail = 1,
    /// Host requests a session reset in the daemon.
    Reset = 2,
    /// Response: the package was received.
    Ack = 4,
    /// Response: request to send the package again.
    Nak = 8,
    /// Request to free this session because an unrecoverable error happened.
    Free = 16,
}

impl TryFrom<u8> for UartOption {
    type Error = ();

    /// Converts a raw byte into the option it encodes.
    ///
    /// Only a byte holding exactly one known option bit converts; zero,
    /// unknown bits and combinations of bits yield `Err(())`.
    fn try_from(cmd: u8) -> Result<Self, Self::Error> {
        match cmd {
            1 => Ok(Self::Tail),
            2 => Ok(Self::Reset),
            4 => Ok(Self::Ack),
            8 => Ok(Self::Nak),
            16 => Ok(Self::Free),
            _ => Err(()),
        }
    }
}
54 
55 struct WaiterManager {
56     // waiter used for sync package send-response one by one.
57     response_waiters: HashMap<u32, Waiter>,
58     // waiter used for waiting if no packages.
59     empty_waiters: HashMap<u32, Waiter>,
60 }
61 
62 impl WaiterManager {
get_instance() -> &'static mut WaiterManager63     fn get_instance() -> &'static mut WaiterManager {
64         static mut INSTANCE: Option<WaiterManager> = None;
65         unsafe {
66             INSTANCE.get_or_insert(WaiterManager {
67                 response_waiters: HashMap::new(),
68                 empty_waiters: HashMap::new(),
69             })
70         }
71     }
72 
start_session(session_id: u32)73     async fn start_session(session_id: u32) {
74         let instance = Self::get_instance();
75         instance.response_waiters.insert(session_id, Waiter::new());
76         instance.empty_waiters.insert(session_id, Waiter::new());
77     }
78 
79     #[allow(unused)]
wait_response(session_id: u32)80     async fn wait_response(session_id: u32) {
81         let instance = Self::get_instance();
82         let waiter = instance.response_waiters.get(&session_id);
83         if let Some(w) = waiter {
84             w.wait().await;
85         }
86     }
87 
88     #[allow(unused)]
wakeup_response_wait(session_id: u32)89     async fn wakeup_response_wait(session_id: u32) {
90         let instance = Self::get_instance();
91         let waiter = instance.response_waiters.get(&session_id);
92         if let Some(w) = waiter {
93             w.wake_one();
94         }
95     }
96 
97     #[allow(unused)]
wait_empty(session_id: u32)98     async fn wait_empty(session_id: u32) {
99         let instance = Self::get_instance();
100         let waiter = instance.empty_waiters.get(&session_id);
101         if let Some(w) = waiter {
102             w.wait().await;
103         }
104     }
105 
106     #[allow(unused)]
wakeup_empty_wait(session_id: u32)107     async fn wakeup_empty_wait(session_id: u32) {
108         let instance = Self::get_instance();
109         let waiter = instance.empty_waiters.get(&session_id);
110         if let Some(w) = waiter {
111             w.wake_one();
112         }
113     }
114 }
115 
/// Delivery state of a queued outgoing package.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq)]
enum OutputDataStatus {
    /// Queued and waiting to be written to the UART.
    WaitSend = 0,
    /// Written to the UART; the peer has not answered yet.
    WaitResponse = 1,
    /// The peer acknowledged the package.
    ResponseOk = 2,
}

/// One outgoing package together with its delivery bookkeeping.
#[derive(Clone, Debug, PartialEq)]
struct OutputData {
    /// Session the package belongs to.
    session_id: u32,
    /// `true` when the package is itself a response and needs no answer.
    response: bool,
    /// Raw option bits placed in the package header.
    option: u8,
    /// Index stored in the package header.
    package_index: u32,
    /// Bytes written to the UART for this package.
    data: Vec<u8>,
    /// Current delivery state.
    status: OutputDataStatus,
    /// Number of send attempts still allowed.
    retry_count: u32,
}

impl std::fmt::Display for OutputData {
    /// Writes a one-line summary; the payload is reported by size only.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            session_id,
            response,
            option,
            package_index,
            data,
            status,
            retry_count,
        } = self;
        f.write_fmt(format_args!(
            "OutputData: session_id:{}, response:{}, option:{:#?}, package_index:{}, status:{:#?}, retry_count:{}, data size:{}",
            session_id,
            response,
            option,
            package_index,
            status,
            retry_count,
            data.len()
        ))
    }
}

/// Shared, lock-protected handle to a single queued package.
type OutputData_ = Arc<Mutex<OutputData>>;

/// Shared, lock-protected list of the packages queued for one session.
type OutputDataVec_ = Arc<Mutex<Vec<OutputData_>>>;
145 
146 struct DataQueue {
147     data_map: HashMap<u32, OutputDataVec_>,
148     thread_map: HashMap<u32, JoinHandle<()>>,
149     stop_flag_map: HashMap<u32, Arc<Mutex<u8>>>,
150 }
151 
152 impl DataQueue {
new() -> Self153     fn new() -> Self {
154         Self {
155             data_map: HashMap::new(),
156             thread_map: HashMap::new(),
157             stop_flag_map: HashMap::new(),
158         }
159     }
160 }
161 
162 type DataQueue_ = Arc<Mutex<DataQueue>>;
163 
164 pub struct QueueManager {}
165 
166 impl QueueManager {
get_instance() -> DataQueue_167     fn get_instance() -> DataQueue_ {
168         static mut INSTANCE: Option<DataQueue_> = None;
169         unsafe {
170             INSTANCE
171                 .get_or_insert_with(|| Arc::new(Mutex::new(DataQueue::new())))
172                 .clone()
173         }
174     }
175 
get_package(session_id: u32, index: usize) -> Option<OutputData>176     async fn get_package(session_id: u32, index: usize) -> Option<OutputData> {
177         let instance = Self::get_instance();
178         let mtx = instance.lock().await;
179         let data_map = &mtx.data_map;
180         if let Some(vec) = data_map.get(&session_id) {
181             let vec = vec.lock().await;
182             if !vec.is_empty() {
183                 let arc = vec.get(index).unwrap();
184                 let data_mtx = arc.lock().await;
185                 return Some(data_mtx.clone());
186             }
187         }
188         None
189     }
190 
put_package(session_id: u32, data: OutputData)191     async fn put_package(session_id: u32, data: OutputData) {
192         let instance = Self::get_instance();
193         let mut mtx = instance.lock().await;
194         let data_map = &mut mtx.data_map;
195         if let Some(vec) = data_map.get(&session_id) {
196             let mut vec = vec.lock().await;
197             let item = Arc::new(Mutex::new(data));
198             vec.push(item);
199         } else {
200             let mut vec = Vec::<Arc<Mutex<OutputData>>>::new();
201             let d = Arc::new(Mutex::new(data));
202             vec.push(d);
203             let v = Arc::new(Mutex::new(vec));
204             data_map.insert(session_id, v);
205         }
206     }
207 
update_package(session_id: u32, index: usize, data: OutputData) -> bool208     async fn update_package(session_id: u32, index: usize, data: OutputData) -> bool {
209         let instance = Self::get_instance();
210         let mtx = instance.lock().await;
211         let data_map = &mtx.data_map;
212         if let Some(vec) = data_map.get(&session_id) {
213             let vec = vec.lock().await;
214             if !vec.is_empty() {
215                 let arc = vec.get(index).unwrap();
216                 let mut data_mtx = arc.lock().await;
217                 *data_mtx = data;
218                 return true;
219             }
220         }
221         false
222     }
223 
get_stop_flag(session_id: u32) -> Option<u8>224     async fn get_stop_flag(session_id: u32) -> Option<u8> {
225         let instance = Self::get_instance();
226         let mtx = instance.lock().await;
227         let stop_flag_map = &mtx.stop_flag_map;
228         if let Some(flag) = stop_flag_map.get(&session_id) {
229             let v = flag.lock().await;
230             Some(*v)
231         } else {
232             None
233         }
234     }
235 
236     #[allow(unused)]
set_stop_flag(session_id: u32)237     async fn set_stop_flag(session_id: u32) {
238         let instance = Self::get_instance();
239         let mut mtx = instance.lock().await;
240         let stop_flag_map = &mut mtx.stop_flag_map;
241         stop_flag_map.insert(session_id, Arc::new(Mutex::new(1)));
242     }
243 
remove_package(session_id: u32, index: usize) -> bool244     async fn remove_package(session_id: u32, index: usize) -> bool {
245         let instance = Self::get_instance();
246         let mtx = instance.lock().await;
247         let data_map = &mtx.data_map;
248         if let Some(vec) = data_map.get(&session_id) {
249             let mut vec = vec.lock().await;
250             if !vec.is_empty() && index < vec.len() {
251                 vec.remove(index);
252                 return true;
253             }
254         }
255         false
256     }
257 
remove_session(session_id: u32)258     async fn remove_session(session_id: u32) {
259         let instance = Self::get_instance();
260         let mut mtx = instance.lock().await;
261         mtx.data_map.remove(&session_id);
262         mtx.stop_flag_map.remove(&session_id);
263         mtx.thread_map.remove(&session_id);
264         println!("remove_session:{session_id}");
265     }
266 
check_stop(session_id: u32) -> bool267     async fn check_stop(session_id: u32) -> bool {
268         if let Some(stop) = Self::get_stop_flag(session_id).await {
269             return stop == 0;
270         }
271         false
272     }
273 
session_loop(session_id: u32)274     async fn session_loop(session_id: u32) {
275         // 1. 取第[0]个outputdata, 如果是WaitSend 则发送 改变状态为WaitResponse 同时wait
276         //   2. 收到response, 如果是ACK 则改变为ResponseOK 同时wakeup
277         //   3.收到wakeup,则检查状态是否为ResponseOK 如果是,则remove掉,继续step 1;
278         //      如果不是,则检查retry_count, 自减1,继续send, 同时继续超时wait(如果超时,则继续检查状态,retry count 减1,继续send, 超时wait)
279         //      retry count为0, 则表示连接中断,stop session
280         println!("session_loop for {}", session_id);
281         loop {
282             if Self::check_stop(session_id).await {
283                 break;
284             }
285             let mut first_pkg = Self::get_package(session_id, 0).await;
286             while first_pkg.is_none() {
287                 WaiterManager::wait_empty(session_id).await;
288                 first_pkg = Self::get_package(session_id, 0).await;
289                 if Self::check_stop(session_id).await {
290                     break;
291                 }
292             }
293             if Self::check_stop(session_id).await {
294                 break;
295             }
296             let mut first_pkg = first_pkg.unwrap();
297             let mut status = first_pkg.status;
298             let mut retry_count = first_pkg.retry_count;
299 
300             if status == OutputDataStatus::WaitSend {
301                 // 发送数据
302                 let data = first_pkg.data.clone();
303                 let _ret = UartMap::put(session_id, data).await;
304                 // 如果是ack报文 则不需要等待回应
305                 if first_pkg.response {
306                     QueueManager::remove_package(session_id, 0).await;
307                     continue;
308                 }
309                 // 修改data 的status = WaitResponse
310                 first_pkg.status = OutputDataStatus::WaitResponse;
311                 retry_count -= 1;
312                 first_pkg.retry_count = retry_count;
313                 // 更新数据
314                 QueueManager::update_package(session_id, 0, first_pkg.clone()).await;
315                 // 等待response
316                 WaiterManager::wait_response(session_id).await;
317 
318                 if Self::check_stop(session_id).await {
319                     break;
320                 }
321                 // 收到回复
322                 // 重新获取数据
323 
324                 let first_pkg = Self::get_package(session_id, 0).await;
325 
326                 let mut first_pkg = first_pkg.unwrap();
327                 // 得到新状态
328                 status = first_pkg.status;
329 
330                 if status == OutputDataStatus::ResponseOk {
331                     // 删除当前data
332                     QueueManager::remove_package(session_id, 0).await;
333                     continue;
334                 }
335                 retry_count = first_pkg.retry_count;
336                 while retry_count > 0 && status == OutputDataStatus::WaitResponse {
337                     // 保存retry_count
338                     retry_count -= 1;
339                     first_pkg.retry_count = retry_count;
340                     QueueManager::update_package(session_id, 0, first_pkg.clone()).await;
341 
342                     // 再次发送数据
343                     let data = first_pkg.data.clone();
344                     let _ret = UartMap::put(session_id, data).await;
345                     WaiterManager::wait_response(session_id).await;
346 
347                     if Self::check_stop(session_id).await {
348                         break;
349                     }
350 
351                     let first_pkg = Self::get_package(session_id, 0).await;
352 
353                     let first_pkg = first_pkg.unwrap();
354                     status = first_pkg.status;
355 
356                     match status {
357                         OutputDataStatus::ResponseOk => {
358                             QueueManager::remove_package(session_id, 0).await;
359                             break;
360                         }
361                         OutputDataStatus::WaitResponse => {
362                             let first_pkg = Self::get_package(session_id, 0).await;
363                             let first_pkg = first_pkg.unwrap();
364                             status = first_pkg.status;
365                             retry_count = first_pkg.retry_count;
366                             continue;
367                         }
368                         OutputDataStatus::WaitSend => {
369                             QueueManager::remove_package(session_id, 0).await;
370                             break;
371                         }
372                     }
373                 }
374             }
375         }
376         Self::remove_session(session_id).await;
377         println!("session_loop for {} end.", session_id);
378     }
379 }
380 
start_session(session_id: u32)381 pub async fn start_session(session_id: u32) {
382     let instance = QueueManager::get_instance();
383     let mut mtx = instance.lock().await;
384     let thread_map = &mut mtx.thread_map;
385     if thread_map.contains_key(&session_id) {
386         println!("session thread has started.");
387         return;
388     }
389 
390     WaiterManager::start_session(session_id).await;
391 
392     let handle = ylong_runtime::spawn(QueueManager::session_loop(session_id));
393     thread_map.insert(session_id, handle);
394 
395     let stop_flag_map = &mut mtx.stop_flag_map;
396     stop_flag_map.insert(session_id, Arc::new(Mutex::new(1)));
397 }
398 
stop_session(session_id: u32)399 async fn stop_session(session_id: u32) {
400     let instance = QueueManager::get_instance();
401     let mut mtx = instance.lock().await;
402     let stop_flag_map = &mut mtx.stop_flag_map;
403     stop_flag_map.insert(session_id, Arc::new(Mutex::new(0)));
404 
405     WaiterManager::wakeup_empty_wait(session_id).await;
406     WaiterManager::wakeup_response_wait(session_id).await;
407 }
408 
stop_other_session(session_id: u32)409 pub async fn stop_other_session(session_id: u32) {
410     let instance = QueueManager::get_instance();
411     let mtx = instance.lock().await;
412     let session_ids = mtx.data_map.keys();
413     let mut remove_sessions = Vec::new();
414     for k in session_ids {
415         if *k != session_id {
416             remove_sessions.push(*k);
417         }
418     }
419     drop(mtx);
420     for id in remove_sessions {
421         stop_session(id).await;
422     }
423 }
424 
output_package( session_id: u32, response: bool, option: u8, package_index: u32, data: Vec<u8>, )425 async fn output_package(
426     session_id: u32,
427     response: bool,
428     option: u8,
429     package_index: u32,
430     data: Vec<u8>,
431 ) {
432     let pkg = OutputData {
433         session_id,
434         response,
435         option,
436         package_index,
437         data: data.clone(),
438         retry_count: 5,
439         status: OutputDataStatus::WaitSend,
440     };
441     QueueManager::put_package(session_id, pkg).await;
442     WaiterManager::wakeup_empty_wait(session_id).await;
443 }
444 
445 #[allow(unused)]
is_response(option: u8) -> bool446 fn is_response(option: u8) -> bool {
447     let ret = (option & UartOption::Ack as u8) | (option & UartOption::Nak as u8);
448     ret != 0
449 }
450 
on_read_head(head: UartHead)451 pub async fn on_read_head(head: UartHead) {
452     let session_id = head.session_id;
453     let option = head.option;
454     let package_index = head.package_index;
455     if option & (UartOption::Free as u16) != 0 {
456         stop_session(session_id).await;
457         return;
458     }
459     if is_response(option as u8) {
460         let pkg = QueueManager::get_package(session_id, 0).await;
461         let mut pkg = pkg.unwrap();
462         pkg.status = if option & (UartOption::Ack as u16) > 1 {
463             OutputDataStatus::ResponseOk
464         } else {
465             OutputDataStatus::WaitSend
466         };
467         QueueManager::update_package(session_id, 0, pkg).await;
468         WaiterManager::wakeup_response_wait(session_id).await;
469     } else {
470         let mut header_obj =
471             uart::build_header_obj(session_id, UartOption::Ack as u16, 0, package_index);
472         let header = header_obj.serialize();
473         let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32);
474         header_obj.head_checksum = u32::to_le(head_sum);
475         let data = header_obj.serialize();
476         output_package(session_id, true, UartOption::Ack as u8, package_index, data).await;
477     }
478 }
479 
/// Returns a package index from the process-wide counter, which starts at 888.
///
/// With `is_create == true` the counter is advanced by one and the new value
/// is returned; with `false` the current value is returned unchanged.
///
/// The counter is atomic, so concurrent callers never receive the same new
/// index, and it wraps around at `u32::MAX` instead of overflowing.
#[allow(unused)]
fn get_package_index(is_create: bool) -> u32 {
    static PACKAGE_INDEX: std::sync::atomic::AtomicU32 =
        std::sync::atomic::AtomicU32::new(888);

    if is_create {
        // `fetch_add` returns the previous value; callers expect the new one.
        PACKAGE_INDEX
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
            .wrapping_add(1)
    } else {
        PACKAGE_INDEX.load(std::sync::atomic::Ordering::SeqCst)
    }
}
493 
start_uart(session_id: u32, wr: UartWriter)494 pub async fn start_uart(session_id: u32, wr: UartWriter) {
495     UartMap::start(session_id, wr).await;
496 }
497 
498 #[allow(unused)]
wrap_put(session_id: u32, data: TaskMessage, package_index: u32, option: u8)499 pub async fn wrap_put(session_id: u32, data: TaskMessage, package_index: u32, option: u8) {
500     let mut pkg_index = package_index;
501     if package_index == 0 {
502         pkg_index = get_package_index(true);
503     }
504     let send = serializer::concat_pack(data);
505     crate::info!("wrap_put send len:{}, send:{:#?}", send.len(), send);
506 
507     let payload_max_len = config::MAX_UART_SIZE_IOBUF as usize - UART_HEAD_SIZE;
508     let mut index = 0;
509     let len = send.len();
510 
511     loop {
512         if index >= len {
513             println!("wrap_put break");
514             break;
515         }
516         let size;
517         let mut op = option;
518         if index + payload_max_len <= len {
519             size = payload_max_len;
520         } else {
521             size = len - index;
522             op = UartOption::Tail as u8 | option;
523         }
524 
525         let data = send[index..index + size].to_vec().clone();
526         let data_sum = data.iter().fold(0u32, |acc, &x| acc + x as u32);
527         let mut header_obj = uart::build_header_obj(session_id, op as u16, size, pkg_index);
528         header_obj.data_checksum = u32::to_le(data_sum);
529 
530         let header = header_obj.serialize();
531         let head_sum = header.iter().fold(0u32, |acc, &x| acc + x as u32);
532         header_obj.head_checksum = u32::to_le(head_sum);
533 
534         let header = header_obj.serialize();
535         crate::info!("header, header_len:{}", header.len());
536         let total = [header, send[index..index + size].to_vec().clone()].concat();
537 
538         output_package(
539             session_id,
540             (op & UartOption::Ack as u8) > 0,
541             op,
542             pkg_index,
543             total,
544         )
545         .await;
546         pkg_index = get_package_index(true);
547         index += size;
548     }
549 }
550