1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
//! Session-keyed writer registries (TCP, USB, UART) and helpers that route outgoing messages to them.
16 #![allow(missing_docs)]
17
18 use super::base::{self, Writer};
19 use super::uart::UartWriter;
20 use super::usb::{self, UsbReader, UsbWriter};
21 use super::{tcp, uart_wrapper};
22 #[cfg(feature = "emulator")]
23 use crate::daemon_lib::bridge::BridgeMap;
24 #[cfg(feature = "host")]
25 use crate::host_transfer::host_usb::HostUsbMap;
26
27 use crate::config::TaskMessage;
28 use crate::config::{self, ConnectType};
29 use crate::serializer;
30 #[allow(unused)]
31 use crate::utils::hdc_log::*;
32 use crate::daemon_lib::task_manager;
33 use std::collections::{HashMap, HashSet, VecDeque};
34 use std::io::{self, Error, ErrorKind};
35 use std::sync::Arc;
36 use std::sync::Once;
37 use std::mem::MaybeUninit;
38 use std::time::Duration;
39
40 #[cfg(feature = "host")]
41 extern crate ylong_runtime_static as ylong_runtime;
42 use ylong_runtime::io::AsyncWriteExt;
43 use ylong_runtime::net::{SplitReadHalf, SplitWriteHalf};
44 use ylong_runtime::sync::{mpsc, Mutex, RwLock};
45
46 type ConnectTypeMap_ = Arc<RwLock<HashMap<u32, ConnectType>>>;
47
48 pub struct ConnectTypeMap {}
49 impl ConnectTypeMap {
get_instance() -> ConnectTypeMap_50 fn get_instance() -> ConnectTypeMap_ {
51 static mut CONNECT_TYPE_MAP: Option<ConnectTypeMap_> = None;
52 unsafe {
53 CONNECT_TYPE_MAP
54 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
55 .clone()
56 }
57 }
58
put(session_id: u32, conn_type: ConnectType)59 pub async fn put(session_id: u32, conn_type: ConnectType) {
60 let arc_map = Self::get_instance();
61 let mut map = arc_map.write().await;
62 map.insert(session_id, conn_type.clone());
63 crate::debug!("connect map: add {session_id}, {:?}", conn_type);
64 }
65
get(session_id: u32) -> Option<ConnectType>66 pub async fn get(session_id: u32) -> Option<ConnectType> {
67 let arc_map = Self::get_instance();
68 let map = arc_map.read().await;
69 map.get(&session_id).cloned()
70 }
71
del(session_id: u32)72 pub async fn del(session_id: u32) {
73 let arc_map = Self::get_instance();
74 let mut map = arc_map.write().await;
75 let item = map.remove(&session_id);
76 DiedSession::add(session_id).await;
77 crate::debug!("connect map: del {session_id}: {:?}", item);
78 }
79
get_all_session() -> Vec<u32>80 pub async fn get_all_session() -> Vec<u32> {
81 let mut sessiones = Vec::new();
82 let arc_map = Self::get_instance();
83 let map = arc_map.read().await;
84 for item in map.iter() {
85 sessiones.push(*item.0);
86 }
87 sessiones
88 }
89
dump() -> String90 pub async fn dump() -> String {
91 let arc_map = Self::get_instance();
92 let map = arc_map.read().await;
93 let mut result = "".to_string();
94 for item in map.iter() {
95 let line = format!("session_id:{},\tconnect_type:{:?}\n", item.0, item.1);
96 result.push_str(line.as_str());
97 }
98 result
99 }
100 }
101
dump_session() -> String102 pub async fn dump_session() -> String {
103 ConnectTypeMap::dump().await
104 }
105
106 type TcpWriter_ = Arc<Mutex<SplitWriteHalf>>;
107
108 pub struct TcpMap {
109 map: Mutex<HashMap<u32, TcpWriter_>>,
110 }
111 impl TcpMap {
get_instance() -> &'static TcpMap112 pub(crate) fn get_instance() -> &'static TcpMap {
113 static mut TCP_MAP: MaybeUninit<TcpMap> = MaybeUninit::uninit();
114 static ONCE: Once = Once::new();
115
116 unsafe {
117 ONCE.call_once(|| {
118 let global = TcpMap {
119 map: Mutex::new(HashMap::new())
120 };
121 TCP_MAP = MaybeUninit::new(global);
122 });
123 &*TCP_MAP.as_ptr()
124 }
125 }
126
put(session_id: u32, data: TaskMessage)127 async fn put(session_id: u32, data: TaskMessage) {
128 let send = serializer::concat_pack(data);
129 let instance = Self::get_instance();
130 let map = instance.map.lock().await;
131 let Some(arc_wr) = map.get(&session_id) else {
132 crate::error!("get tcp is None, session_id={session_id}");
133 return;
134 };
135 let mut wr = arc_wr.lock().await;
136 let _ = wr.write_all(send.as_slice()).await;
137 }
138
send_channel_message(channel_id: u32, buf: Vec<u8>) -> io::Result<()>139 pub async fn send_channel_message(channel_id: u32, buf: Vec<u8>) -> io::Result<()> {
140 crate::trace!(
141 "send channel({channel_id}) msg: {:?}",
142 buf.iter()
143 .map(|&c| format!("{c:02X}"))
144 .collect::<Vec<_>>()
145 .join(" ")
146 );
147 let send = [
148 u32::to_be_bytes(buf.len() as u32).as_slice(),
149 buf.as_slice(),
150 ]
151 .concat();
152 let instance = Self::get_instance();
153 let map = instance.map.lock().await;
154 if let Some(guard) = map.get(&channel_id) {
155 let mut wr = guard.lock().await;
156 let _ = wr.write_all(send.as_slice()).await;
157 return Ok(());
158 }
159 Err(Error::new(ErrorKind::NotFound, "channel not found"))
160 }
161
start(id: u32, wr: SplitWriteHalf)162 pub async fn start(id: u32, wr: SplitWriteHalf) {
163 let instance = Self::get_instance();
164 let mut map = instance.map.lock().await;
165 let arc_wr = Arc::new(Mutex::new(wr));
166 map.insert(id, arc_wr);
167 ConnectTypeMap::put(id, ConnectType::Tcp).await;
168 crate::warn!("tcp start {id}");
169 }
170
end(id: u32)171 pub async fn end(id: u32) {
172 let instance = Self::get_instance();
173 let mut map = instance.map.lock().await;
174 if let Some(arc_wr) = map.remove(&id) {
175 let mut wr = arc_wr.lock().await;
176 let _ = wr.shutdown().await;
177 }
178 crate::warn!("tcp end {id}");
179 ConnectTypeMap::del(id).await;
180 }
181 }
182
183 pub struct UsbMap {
184 map: std::sync::Mutex<HashMap<u32, UsbWriter>>,
185 }
186 impl UsbMap {
get_instance() -> &'static UsbMap187 pub(crate) fn get_instance() -> &'static UsbMap {
188 static mut USB_MAP: MaybeUninit<UsbMap> = MaybeUninit::uninit();
189 static ONCE: Once = Once::new();
190
191 unsafe {
192 ONCE.call_once(|| {
193 let global = UsbMap {
194 map: std::sync::Mutex::new(HashMap::new())
195 };
196 USB_MAP = MaybeUninit::new(global);
197 });
198 &*USB_MAP.as_ptr()
199 }
200 }
201
202 #[allow(unused)]
put(session_id: u32, data: TaskMessage) -> io::Result<()>203 async fn put(session_id: u32, data: TaskMessage) -> io::Result<()> {
204 if DiedSession::get(session_id).await {
205 return Err(Error::new(ErrorKind::NotFound, "session already died"));;
206 }
207 let instance = Self::get_instance();
208 let body = serializer::concat_pack(data);
209 let head = usb::build_header(session_id, 1, body.len());
210 let mut child_ret = 0;
211 let mut map = instance.map.lock().unwrap();
212 let Some(wr) = map.get(&session_id) else {
213 return Err(Error::new(ErrorKind::NotFound, "session not found"));
214 };
215 match wr.write_all(head) {
216 Ok(_) => {}
217 Err(e) => {
218 return Err(Error::new(ErrorKind::Other, "Error writing head"));
219 }
220 }
221
222 match wr.write_all(body) {
223 Ok(ret) => {
224 child_ret = ret;
225 }
226 Err(e) => {
227 return Err(Error::new(ErrorKind::Other, "Error writing body"));
228 }
229 }
230
231 if ((child_ret % config::MAX_PACKET_SIZE_HISPEED) == 0) && (child_ret > 0) {
232 let tail = usb::build_header(session_id, 0, 0);
233 // win32 send ZLP will block winusb driver and LIBUSB_TRANSFER_ADD_ZERO_PACKET not effect
234 // so, we send dummy packet to prevent zero packet generate
235 match wr.write_all(tail) {
236 Ok(_) => {}
237 Err(e) => {
238 return Err(Error::new(ErrorKind::Other, "Error writing tail"));
239 }
240 }
241 }
242 Ok(())
243 }
244
start(session_id: u32, wr: UsbWriter)245 pub async fn start(session_id: u32, wr: UsbWriter) {
246 let buffer_map = Self::get_instance();
247 let mut try_times = 0;
248 let max_try_time = 10;
249 let wait_one_seconds = 1000;
250 loop {
251 try_times += 1;
252 if let Ok(mut map) = buffer_map.map.try_lock() {
253 map.insert(session_id, wr);
254 crate::error!("start usb session_id:{session_id} get lock success after try {try_times} times");
255 break;
256 } else {
257 if try_times > max_try_time {
258 crate::error!("start usb session_id:{session_id} get lock failed will restart hdcd");
259 std::process::exit(0);
260 }
261 crate::error!("start usb session_id:{session_id} try lock failed {try_times} times");
262 std::thread::sleep(Duration::from_millis(wait_one_seconds));
263 }
264 }
265 ConnectTypeMap::put(session_id, ConnectType::Usb("some_mount_point".to_string())).await;
266 }
267
end(session_id: u32)268 pub async fn end(session_id: u32) {
269 let buffer_map = Self::get_instance();
270 let mut try_times = 0;
271 let max_try_time = 10;
272 let wait_ten_ms = 10;
273 loop {
274 try_times += 1;
275 if let Ok(mut map) = buffer_map.map.try_lock() {
276 let _ = map.remove(&session_id);
277 crate::error!("end usb session_id:{session_id} get lock success after try {try_times} times");
278 break;
279 } else {
280 if try_times > max_try_time {
281 crate::error!("end usb session_id:{session_id} get lock failed will force break");
282 break;
283 }
284 crate::warn!("end usb session_id:{session_id} get lock failed {try_times} times");
285 std::thread::sleep(Duration::from_millis(wait_ten_ms));
286 }
287 }
288 ConnectTypeMap::del(session_id).await;
289 }
290 }
291
292 type UartWriter_ = Arc<Mutex<UartWriter>>;
293 type UartMap_ = Arc<RwLock<HashMap<u32, UartWriter_>>>;
294
295 pub struct UartMap {}
296 impl UartMap {
get_instance() -> UartMap_297 fn get_instance() -> UartMap_ {
298 static mut UART_MAP: Option<UartMap_> = None;
299 unsafe {
300 UART_MAP
301 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
302 .clone()
303 }
304 }
305
306 #[allow(unused)]
put(session_id: u32, data: Vec<u8>) -> io::Result<()>307 pub async fn put(session_id: u32, data: Vec<u8>) -> io::Result<()> {
308 let instance = Self::get_instance();
309 let map = instance.read().await;
310 let Some(arc_wr) = map.get(&session_id) else {
311 return Err(Error::new(ErrorKind::NotFound, "session not found"));
312 };
313 let wr = arc_wr.lock().await;
314 wr.write_all(data)?;
315 Ok(())
316 }
317
start(session_id: u32, wr: UartWriter)318 pub async fn start(session_id: u32, wr: UartWriter) {
319 let instance = Self::get_instance();
320 let mut map = instance.write().await;
321 let arc_wr = Arc::new(Mutex::new(wr));
322 if map.contains_key(&session_id) {
323 crate::error!("uart start, contain session:{}", session_id);
324 return;
325 }
326 map.insert(session_id, arc_wr);
327 ConnectTypeMap::put(session_id, ConnectType::Uart).await;
328 }
329 }
330
put(session_id: u32, data: TaskMessage)331 pub async fn put(session_id: u32, data: TaskMessage) {
332 match ConnectTypeMap::get(session_id).await {
333 Some(ConnectType::Tcp) => {
334 TcpMap::put(session_id, data).await;
335 }
336 Some(ConnectType::Usb(_mount_point)) => {
337 if let Err(e) = UsbMap::put(session_id, data).await {
338 crate::error!("{e:?}");
339 task_manager::free_session(session_id).await;
340 }
341 }
342 Some(ConnectType::Uart) => {
343 uart_wrapper::wrap_put(session_id, data, 0, 0).await;
344 }
345 Some(ConnectType::Bt) => {}
346 Some(ConnectType::Bridge) => {
347 #[cfg(feature = "emulator")]
348 BridgeMap::put(session_id, data).await;
349 }
350 Some(ConnectType::HostUsb(_mount_point)) => {
351 #[cfg(feature = "host")]
352 if let Err(e) = HostUsbMap::put(session_id, data).await {
353 crate::error!("{e:?}");
354 }
355 }
356 None => {
357 crate::warn!("fail to get connect type for session:{}, command:{:?}", session_id, data.command);
358 }
359 }
360 }
361
send_channel_data(channel_id: u32, data: Vec<u8>)362 pub async fn send_channel_data(channel_id: u32, data: Vec<u8>) {
363 let _ = TcpMap::send_channel_message(channel_id, data).await;
364 }
365
/// Kind of text echoed back to a channel.
pub enum EchoLevel {
    INFO,
    FAIL,
    RAW,
    /// The text may already carry its own carriage return and newline.
    OK,
}

impl EchoLevel {
    /// Maps a numeric message level to an [`EchoLevel`].
    ///
    /// `0` is `FAIL`, `1` is `INFO` and `2` is `OK`.
    ///
    /// # Errors
    ///
    /// Any other value yields an `ErrorKind::Other` error.
    pub fn convert_from_message_level(cmd: u8) -> Result<Self, Error> {
        let level = match cmd {
            0 => Self::FAIL,
            1 => Self::INFO,
            2 => Self::OK,
            _ => return Err(Error::new(ErrorKind::Other, "invalid message level type")),
        };
        Ok(level)
    }
}
383
send_channel_msg(channel_id: u32, level: EchoLevel, msg: String) -> io::Result<()>384 pub async fn send_channel_msg(channel_id: u32, level: EchoLevel, msg: String) -> io::Result<()> {
385 let data = match level {
386 EchoLevel::INFO => format!("[Info]{msg}") + "\r\n",
387 EchoLevel::FAIL => format!("[Fail]{msg}") + "\r\n",
388 EchoLevel::RAW => msg.to_string() + "\r\n",
389 EchoLevel::OK => msg.to_string(),
390 };
391 TcpMap::send_channel_message(channel_id, data.as_bytes().to_vec()).await
392 }
393
394 // client recv and print msg
395 type TcpRecver_ = Arc<Mutex<SplitReadHalf>>;
396 type ChannelMap_ = Arc<RwLock<HashMap<u32, TcpRecver_>>>;
397
398 pub struct ChannelMap {}
399 impl ChannelMap {
get_instance() -> ChannelMap_400 fn get_instance() -> ChannelMap_ {
401 static mut TCP_RECVER: Option<ChannelMap_> = None;
402 unsafe {
403 TCP_RECVER
404 .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new())))
405 .clone()
406 }
407 }
408
start(rd: SplitReadHalf)409 pub async fn start(rd: SplitReadHalf) {
410 let instance = Self::get_instance();
411 let mut map = instance.write().await;
412 let arc_rd = Arc::new(Mutex::new(rd));
413 map.insert(0, arc_rd);
414 }
415
recv() -> io::Result<Vec<u8>>416 pub async fn recv() -> io::Result<Vec<u8>> {
417 let instance = Self::get_instance();
418 let map = instance.read().await;
419 let Some(arc_rd) = map.get(&0) else {
420 return Err(Error::new(ErrorKind::NotFound, "channel not found"));
421 };
422 let mut rd = arc_rd.lock().await;
423 tcp::recv_channel_message(&mut rd).await
424 }
425 }
426
usb_start_recv(fd: i32, _session_id: u32) -> mpsc::BoundedReceiver<(TaskMessage, u32, u32)>427 pub fn usb_start_recv(fd: i32, _session_id: u32) -> mpsc::BoundedReceiver<(TaskMessage, u32, u32)> {
428 let (tx, rx) = mpsc::bounded_channel::<(TaskMessage, u32, u32)>(config::USB_QUEUE_LEN);
429 ylong_runtime::spawn(async move {
430 let mut rd = UsbReader { fd };
431 loop {
432 if let Err(e) = base::unpack_task_message(&mut rd, tx.clone()) {
433 crate::warn!("unpack task failed: {}, reopen fd...", e.to_string());
434 break;
435 }
436 }
437 });
438 rx
439 }
440
441 pub struct DiedSession {
442 set: Arc<RwLock<HashSet<u32>>>,
443 queue: Arc<RwLock<VecDeque<u32>>>,
444 }
445 impl DiedSession {
get_instance() -> &'static DiedSession446 pub(crate) fn get_instance() -> &'static DiedSession {
447 static mut DIED_SESSION: MaybeUninit<DiedSession> = MaybeUninit::uninit();
448 static ONCE: Once = Once::new();
449
450 unsafe {
451 ONCE.call_once(|| {
452 let global = DiedSession {
453 set: Arc::new(RwLock::new(HashSet::with_capacity(config::MAX_DIED_SESSION_NUM))),
454 queue: Arc::new(RwLock::new(VecDeque::with_capacity(config::MAX_DIED_SESSION_NUM)))
455 };
456 DIED_SESSION = MaybeUninit::new(global);
457 });
458 &*DIED_SESSION.as_ptr()
459 }
460 }
461
add(session_id: u32)462 pub async fn add(session_id: u32) {
463 let instance = Self::get_instance();
464 let mut set = instance.set.write().await;
465 let mut queue = instance.queue.write().await;
466 if queue.len() >= config::MAX_DIED_SESSION_NUM {
467 if let Some(front_session) = queue.pop_front(){
468 set.remove(&front_session);
469 }
470 }
471 if !set.contains(&session_id) {
472 set.insert(session_id);
473 queue.push_back(session_id)
474 }
475 }
476
get(session_id: u32) -> bool477 pub async fn get(session_id: u32) -> bool {
478 let instance = Self::get_instance();
479 let set = instance.set.read().await;
480 set.contains(&session_id)
481 }
482 }