• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 //! buffer
16 #![allow(missing_docs)]
17 
18 use super::base::{self, Writer};
19 use super::uart::UartWriter;
20 use super::usb::{self, UsbReader, UsbWriter};
21 use super::{tcp, uart_wrapper};
22 #[cfg(feature = "emulator")]
23 use crate::daemon_lib::bridge::BridgeMap;
24 #[cfg(feature = "host")]
25 use crate::host_transfer::host_usb::HostUsbMap;
26 
27 use crate::config::TaskMessage;
28 use crate::config::{self, ConnectType};
29 use crate::serializer;
30 #[allow(unused)]
31 use crate::utils::hdc_log::*;
32 use crate::daemon_lib::task_manager;
33 use std::collections::{HashMap, HashSet, VecDeque};
34 use std::io::{self, Error, ErrorKind};
35 use std::sync::Arc;
36 use std::sync::Once;
37 use std::mem::MaybeUninit;
38 use std::time::Duration;
39 
40 #[cfg(feature = "host")]
41 extern crate ylong_runtime_static as ylong_runtime;
42 use ylong_runtime::io::AsyncWriteExt;
43 use ylong_runtime::net::{SplitReadHalf, SplitWriteHalf};
44 use ylong_runtime::sync::{mpsc, Mutex, RwLock};
45 
46 type ConnectTypeMap_ = Arc<RwLock<HashMap<u32, ConnectType>>>;
47 
48 pub struct ConnectTypeMap {}
49 impl ConnectTypeMap {
get_instance() -> ConnectTypeMap_50     fn get_instance() -> ConnectTypeMap_ {
51         static mut CONNECT_TYPE_MAP: Option<ConnectTypeMap_> = None;
52         unsafe {
53             CONNECT_TYPE_MAP
54                 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
55                 .clone()
56         }
57     }
58 
put(session_id: u32, conn_type: ConnectType)59     pub async fn put(session_id: u32, conn_type: ConnectType) {
60         let arc_map = Self::get_instance();
61         let mut map = arc_map.write().await;
62         map.insert(session_id, conn_type.clone());
63         crate::debug!("connect map: add {session_id}, {:?}", conn_type);
64     }
65 
get(session_id: u32) -> Option<ConnectType>66     pub async fn get(session_id: u32) -> Option<ConnectType> {
67         let arc_map = Self::get_instance();
68         let map = arc_map.read().await;
69         map.get(&session_id).cloned()
70     }
71 
del(session_id: u32)72     pub async fn del(session_id: u32) {
73         let arc_map = Self::get_instance();
74         let mut map = arc_map.write().await;
75         let item = map.remove(&session_id);
76         DiedSession::add(session_id).await;
77         crate::debug!("connect map: del {session_id}: {:?}", item);
78     }
79 
get_all_session() -> Vec<u32>80     pub async fn get_all_session() -> Vec<u32> {
81         let mut sessiones = Vec::new();
82         let arc_map = Self::get_instance();
83         let map = arc_map.read().await;
84         for item in map.iter() {
85             sessiones.push(*item.0);
86         }
87         sessiones
88     }
89 
dump() -> String90     pub async fn dump() -> String {
91         let arc_map = Self::get_instance();
92         let map = arc_map.read().await;
93         let mut result = "".to_string();
94         for item in map.iter() {
95             let line = format!("session_id:{},\tconnect_type:{:?}\n", item.0, item.1);
96             result.push_str(line.as_str());
97         }
98         result
99     }
100 }
101 
dump_session() -> String102 pub async fn dump_session() -> String {
103     ConnectTypeMap::dump().await
104 }
105 
106 type TcpWriter_ = Arc<Mutex<SplitWriteHalf>>;
107 
108 pub struct TcpMap {
109     map: Mutex<HashMap<u32, TcpWriter_>>,
110 }
111 impl TcpMap {
get_instance() -> &'static TcpMap112     pub(crate)  fn get_instance() -> &'static TcpMap {
113         static mut TCP_MAP: MaybeUninit<TcpMap> = MaybeUninit::uninit();
114         static ONCE: Once = Once::new();
115 
116         unsafe {
117             ONCE.call_once(|| {
118                 let global = TcpMap {
119                     map: Mutex::new(HashMap::new())
120                 };
121                 TCP_MAP = MaybeUninit::new(global);
122             });
123             &*TCP_MAP.as_ptr()
124         }
125     }
126 
put(session_id: u32, data: TaskMessage)127     async fn put(session_id: u32, data: TaskMessage) {
128         let send = serializer::concat_pack(data);
129         let instance = Self::get_instance();
130         let map = instance.map.lock().await;
131         let Some(arc_wr) = map.get(&session_id) else {
132             crate::error!("get tcp is None, session_id={session_id}");
133             return;
134         };
135         let mut wr = arc_wr.lock().await;
136         let _ = wr.write_all(send.as_slice()).await;
137     }
138 
send_channel_message(channel_id: u32, buf: Vec<u8>) -> io::Result<()>139     pub async fn send_channel_message(channel_id: u32, buf: Vec<u8>) -> io::Result<()> {
140         crate::trace!(
141             "send channel({channel_id}) msg: {:?}",
142             buf.iter()
143                 .map(|&c| format!("{c:02X}"))
144                 .collect::<Vec<_>>()
145                 .join(" ")
146         );
147         let send = [
148             u32::to_be_bytes(buf.len() as u32).as_slice(),
149             buf.as_slice(),
150         ]
151         .concat();
152         let instance = Self::get_instance();
153         let map = instance.map.lock().await;
154         if let Some(guard) = map.get(&channel_id) {
155             let mut wr = guard.lock().await;
156             let _ = wr.write_all(send.as_slice()).await;
157             return Ok(());
158         }
159         Err(Error::new(ErrorKind::NotFound, "channel not found"))
160     }
161 
start(id: u32, wr: SplitWriteHalf)162     pub async fn start(id: u32, wr: SplitWriteHalf) {
163         let instance = Self::get_instance();
164         let mut map = instance.map.lock().await;
165         let arc_wr = Arc::new(Mutex::new(wr));
166         map.insert(id, arc_wr);
167         ConnectTypeMap::put(id, ConnectType::Tcp).await;
168         crate::warn!("tcp start {id}");
169     }
170 
end(id: u32)171     pub async fn end(id: u32) {
172         let instance = Self::get_instance();
173         let mut map = instance.map.lock().await;
174         if let Some(arc_wr) = map.remove(&id) {
175             let mut wr = arc_wr.lock().await;
176             let _ = wr.shutdown().await;
177         }
178         crate::warn!("tcp end {id}");
179         ConnectTypeMap::del(id).await;
180     }
181 }
182 
183 pub struct UsbMap {
184     map: std::sync::Mutex<HashMap<u32, UsbWriter>>,
185 }
186 impl UsbMap {
get_instance() -> &'static UsbMap187     pub(crate)  fn get_instance() -> &'static UsbMap {
188         static mut USB_MAP: MaybeUninit<UsbMap> = MaybeUninit::uninit();
189         static ONCE: Once = Once::new();
190 
191         unsafe {
192             ONCE.call_once(|| {
193                 let global = UsbMap {
194                     map: std::sync::Mutex::new(HashMap::new())
195                 };
196                 USB_MAP = MaybeUninit::new(global);
197             });
198             &*USB_MAP.as_ptr()
199         }
200     }
201 
202     #[allow(unused)]
put(session_id: u32, data: TaskMessage) -> io::Result<()>203     async fn put(session_id: u32, data: TaskMessage) -> io::Result<()> {
204         if DiedSession::get(session_id).await {
205             return Err(Error::new(ErrorKind::NotFound, "session already died"));;
206         }
207         let instance = Self::get_instance();
208         let body = serializer::concat_pack(data);
209         let head = usb::build_header(session_id, 1, body.len());
210         let mut child_ret = 0;
211         let mut map = instance.map.lock().unwrap();
212         let Some(wr) = map.get(&session_id) else {
213             return Err(Error::new(ErrorKind::NotFound, "session not found"));
214         };
215         match wr.write_all(head) {
216             Ok(_) => {}
217             Err(e) => {
218                 return Err(Error::new(ErrorKind::Other, "Error writing head"));
219             }
220         }
221 
222         match wr.write_all(body) {
223             Ok(ret) => {
224                 child_ret = ret;
225             }
226             Err(e) => {
227                 return Err(Error::new(ErrorKind::Other, "Error writing body"));
228             }
229         }
230 
231         if ((child_ret % config::MAX_PACKET_SIZE_HISPEED) == 0) && (child_ret > 0) {
232             let tail = usb::build_header(session_id, 0, 0);
233             // win32 send ZLP will block winusb driver and LIBUSB_TRANSFER_ADD_ZERO_PACKET not effect
234             // so, we send dummy packet to prevent zero packet generate
235             match wr.write_all(tail) {
236                 Ok(_) => {}
237                 Err(e) => {
238                     return Err(Error::new(ErrorKind::Other, "Error writing tail"));
239                 }
240             }
241         }
242         Ok(())
243     }
244 
start(session_id: u32, wr: UsbWriter)245     pub async fn start(session_id: u32, wr: UsbWriter) {
246         let buffer_map = Self::get_instance();
247         let mut try_times = 0;
248         let max_try_time = 10;
249         let wait_one_seconds = 1000;
250         loop {
251             try_times += 1;
252             if let Ok(mut map) = buffer_map.map.try_lock() {
253                 map.insert(session_id, wr);
254                 crate::error!("start usb session_id:{session_id} get lock success after try {try_times} times");
255                 break;
256             } else {
257                 if try_times > max_try_time {
258                     crate::error!("start usb session_id:{session_id} get lock failed will restart hdcd");
259                     std::process::exit(0);
260                 }
261                 crate::error!("start usb session_id:{session_id} try lock failed {try_times} times");
262                 std::thread::sleep(Duration::from_millis(wait_one_seconds));
263             }
264         }
265         ConnectTypeMap::put(session_id, ConnectType::Usb("some_mount_point".to_string())).await;
266     }
267 
end(session_id: u32)268     pub async fn end(session_id: u32) {
269         let buffer_map = Self::get_instance();
270         let mut try_times = 0;
271         let max_try_time = 10;
272         let wait_ten_ms = 10;
273         loop {
274             try_times += 1;
275             if let Ok(mut map) = buffer_map.map.try_lock() {
276                 let _ = map.remove(&session_id);
277                 crate::error!("end usb session_id:{session_id} get lock success after try {try_times} times");
278                 break;
279             } else {
280                 if try_times > max_try_time {
281                     crate::error!("end usb session_id:{session_id} get lock failed will force break");
282                     break;
283                 }
284                 crate::warn!("end usb session_id:{session_id} get lock failed {try_times} times");
285                 std::thread::sleep(Duration::from_millis(wait_ten_ms));
286             }
287         }
288         ConnectTypeMap::del(session_id).await;
289     }
290 }
291 
292 type UartWriter_ = Arc<Mutex<UartWriter>>;
293 type UartMap_ = Arc<RwLock<HashMap<u32, UartWriter_>>>;
294 
295 pub struct UartMap {}
296 impl UartMap {
get_instance() -> UartMap_297     fn get_instance() -> UartMap_ {
298         static mut UART_MAP: Option<UartMap_> = None;
299         unsafe {
300             UART_MAP
301                 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
302                 .clone()
303         }
304     }
305 
306     #[allow(unused)]
put(session_id: u32, data: Vec<u8>) -> io::Result<()>307     pub async fn put(session_id: u32, data: Vec<u8>) -> io::Result<()> {
308         let instance = Self::get_instance();
309         let map = instance.read().await;
310         let Some(arc_wr) = map.get(&session_id) else {
311             return Err(Error::new(ErrorKind::NotFound, "session not found"));
312         };
313         let wr = arc_wr.lock().await;
314         wr.write_all(data)?;
315         Ok(())
316     }
317 
start(session_id: u32, wr: UartWriter)318     pub async fn start(session_id: u32, wr: UartWriter) {
319         let instance = Self::get_instance();
320         let mut map = instance.write().await;
321         let arc_wr = Arc::new(Mutex::new(wr));
322         if map.contains_key(&session_id) {
323             crate::error!("uart start, contain session:{}", session_id);
324             return;
325         }
326         map.insert(session_id, arc_wr);
327         ConnectTypeMap::put(session_id, ConnectType::Uart).await;
328     }
329 }
330 
put(session_id: u32, data: TaskMessage)331 pub async fn put(session_id: u32, data: TaskMessage) {
332     match ConnectTypeMap::get(session_id).await {
333         Some(ConnectType::Tcp) => {
334             TcpMap::put(session_id, data).await;
335         }
336         Some(ConnectType::Usb(_mount_point)) => {
337             if let Err(e) = UsbMap::put(session_id, data).await {
338                 crate::error!("{e:?}");
339                 task_manager::free_session(session_id).await;
340             }
341         }
342         Some(ConnectType::Uart) => {
343             uart_wrapper::wrap_put(session_id, data, 0, 0).await;
344         }
345         Some(ConnectType::Bt) => {}
346         Some(ConnectType::Bridge) => {
347             #[cfg(feature = "emulator")]
348             BridgeMap::put(session_id, data).await;
349         }
350         Some(ConnectType::HostUsb(_mount_point)) => {
351             #[cfg(feature = "host")]
352             if let Err(e) = HostUsbMap::put(session_id, data).await {
353                 crate::error!("{e:?}");
354             }
355         }
356         None => {
357             crate::warn!("fail to get connect type for session:{}, command:{:?}", session_id, data.command);
358         }
359     }
360 }
361 
send_channel_data(channel_id: u32, data: Vec<u8>)362 pub async fn send_channel_data(channel_id: u32, data: Vec<u8>) {
363     let _ = TcpMap::send_channel_message(channel_id, data).await;
364 }
365 
366 pub enum EchoLevel {
367     INFO,
368     FAIL,
369     RAW,
370     OK, // this echo maybe OK with carriage return and newline
371 }
372 
373 impl EchoLevel {
convert_from_message_level(cmd: u8) -> Result<Self, Error>374     pub fn convert_from_message_level(cmd: u8) -> Result<Self, Error> {
375         match cmd {
376             0 => Ok(Self::FAIL),
377             1 => Ok(Self::INFO),
378             2 => Ok(Self::OK),
379             _ => Err(Error::new(ErrorKind::Other, "invalid message level type"))
380         }
381     }
382 }
383 
send_channel_msg(channel_id: u32, level: EchoLevel, msg: String) -> io::Result<()>384 pub async fn send_channel_msg(channel_id: u32, level: EchoLevel, msg: String) -> io::Result<()> {
385     let data = match level {
386         EchoLevel::INFO => format!("[Info]{msg}") + "\r\n",
387         EchoLevel::FAIL => format!("[Fail]{msg}") + "\r\n",
388         EchoLevel::RAW => msg.to_string() + "\r\n",
389         EchoLevel::OK => msg.to_string(),
390     };
391     TcpMap::send_channel_message(channel_id, data.as_bytes().to_vec()).await
392 }
393 
394 // client recv and print msg
395 type TcpRecver_ = Arc<Mutex<SplitReadHalf>>;
396 type ChannelMap_ = Arc<RwLock<HashMap<u32, TcpRecver_>>>;
397 
398 pub struct ChannelMap {}
399 impl ChannelMap {
get_instance() -> ChannelMap_400     fn get_instance() -> ChannelMap_ {
401         static mut TCP_RECVER: Option<ChannelMap_> = None;
402         unsafe {
403             TCP_RECVER
404                 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
405                 .clone()
406         }
407     }
408 
start(rd: SplitReadHalf)409     pub async fn start(rd: SplitReadHalf) {
410         let instance = Self::get_instance();
411         let mut map = instance.write().await;
412         let arc_rd = Arc::new(Mutex::new(rd));
413         map.insert(0, arc_rd);
414     }
415 
recv() -> io::Result<Vec<u8>>416     pub async fn recv() -> io::Result<Vec<u8>> {
417         let instance = Self::get_instance();
418         let map = instance.read().await;
419         let Some(arc_rd) = map.get(&0) else {
420             return Err(Error::new(ErrorKind::NotFound, "channel not found"));
421         };
422         let mut rd = arc_rd.lock().await;
423         tcp::recv_channel_message(&mut rd).await
424     }
425 }
426 
usb_start_recv(fd: i32, _session_id: u32) -> mpsc::BoundedReceiver<(TaskMessage, u32, u32)>427 pub fn usb_start_recv(fd: i32, _session_id: u32) -> mpsc::BoundedReceiver<(TaskMessage, u32, u32)> {
428     let (tx, rx) = mpsc::bounded_channel::<(TaskMessage, u32, u32)>(config::USB_QUEUE_LEN);
429     ylong_runtime::spawn(async move {
430         let mut rd = UsbReader { fd };
431         loop {
432             if let Err(e) = base::unpack_task_message(&mut rd, tx.clone()) {
433                 crate::warn!("unpack task failed: {}, reopen fd...", e.to_string());
434                 break;
435             }
436         }
437     });
438     rx
439 }
440 
441 pub struct DiedSession {
442     set: Arc<RwLock<HashSet<u32>>>,
443     queue: Arc<RwLock<VecDeque<u32>>>,
444 }
445 impl DiedSession {
get_instance() -> &'static DiedSession446     pub(crate)  fn get_instance() -> &'static DiedSession {
447         static mut DIED_SESSION: MaybeUninit<DiedSession> = MaybeUninit::uninit();
448         static ONCE: Once = Once::new();
449 
450         unsafe {
451             ONCE.call_once(|| {
452                 let global = DiedSession {
453                     set: Arc::new(RwLock::new(HashSet::with_capacity(config::MAX_DIED_SESSION_NUM))),
454                     queue: Arc::new(RwLock::new(VecDeque::with_capacity(config::MAX_DIED_SESSION_NUM)))
455                 };
456                 DIED_SESSION = MaybeUninit::new(global);
457             });
458             &*DIED_SESSION.as_ptr()
459         }
460     }
461 
add(session_id: u32)462     pub async fn add(session_id: u32) {
463         let instance = Self::get_instance();
464         let mut set = instance.set.write().await;
465         let mut queue = instance.queue.write().await;
466         if queue.len() >= config::MAX_DIED_SESSION_NUM {
467             if let Some(front_session) = queue.pop_front(){
468                 set.remove(&front_session);
469             }
470         }
471         if !set.contains(&session_id) {
472             set.insert(session_id);
473             queue.push_back(session_id)
474         }
475     }
476 
get(session_id: u32) -> bool477     pub async fn get(session_id: u32) -> bool {
478         let instance = Self::get_instance();
479         let set = instance.set.read().await;
480         set.contains(&session_id)
481     }
482 }