• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2024 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 mod manager;
15 
16 use std::collections::HashMap;
17 use std::net::Shutdown;
18 use std::sync::Arc;
19 use std::time::Duration;
20 
21 pub(crate) use manager::{ClientManager, ClientManagerEntry};
22 use ylong_http_client::Headers;
23 use ylong_runtime::net::UnixDatagram;
24 use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
25 use ylong_runtime::sync::oneshot::{channel, Sender};
26 
27 use crate::config::Version;
28 use crate::error::ErrorCode;
29 use crate::task::notify::{NotifyData, SubscribeType, WaitingCause};
30 use crate::task::reason::Reason;
31 use crate::utils::{runtime_spawn, Recv};
32 
33 const REQUEST_MAGIC_NUM: u32 = 0x43434646;
34 const HEADERS_MAX_SIZE: u16 = 8 * 1024;
35 const POSITION_OF_LENGTH: u32 = 10;
36 
37 #[derive(Debug)]
38 pub(crate) enum ClientEvent {
39     OpenChannel(u64, Sender<Result<Arc<UnixDatagram>, ErrorCode>>),
40     Subscribe(u32, u64, u64, u64, Sender<ErrorCode>),
41     Unsubscribe(u32, Sender<ErrorCode>),
42     TaskFinished(u32),
43     Terminate(u64, Sender<ErrorCode>),
44     SendResponse(u32, String, u32, String, Headers),
45     SendNotifyData(SubscribeType, NotifyData),
46     SendFaults(u32, SubscribeType, Reason),
47     SendWaitNotify(u32, WaitingCause),
48     Shutdown,
49 }
50 
51 pub(crate) enum MessageType {
52     HttpResponse = 0,
53     NotifyData,
54     Faults,
55     Waiting,
56 }
57 
58 impl ClientManagerEntry {
open_channel(&self, pid: u64) -> Result<Arc<UnixDatagram>, ErrorCode>59     pub(crate) fn open_channel(&self, pid: u64) -> Result<Arc<UnixDatagram>, ErrorCode> {
60         let (tx, rx) = channel::<Result<Arc<UnixDatagram>, ErrorCode>>();
61         let event = ClientEvent::OpenChannel(pid, tx);
62         if !self.send_event(event) {
63             return Err(ErrorCode::Other);
64         }
65         let rx = Recv::new(rx);
66         match rx.get() {
67             Some(ret) => ret,
68             None => {
69                 error!("open channel fail, recv none");
70                 sys_event!(
71                     ExecFault,
72                     DfxCode::UDS_FAULT_03,
73                     "open channel fail, recv none"
74                 );
75                 Err(ErrorCode::Other)
76             }
77         }
78     }
79 
subscribe(&self, tid: u32, pid: u64, uid: u64, token_id: u64) -> ErrorCode80     pub(crate) fn subscribe(&self, tid: u32, pid: u64, uid: u64, token_id: u64) -> ErrorCode {
81         let (tx, rx) = channel::<ErrorCode>();
82         let event = ClientEvent::Subscribe(tid, pid, uid, token_id, tx);
83         if !self.send_event(event) {
84             return ErrorCode::Other;
85         }
86         let rx = Recv::new(rx);
87         match rx.get() {
88             Some(ret) => ret,
89             None => {
90                 error!("subscribe fail, recv none");
91                 sys_event!(
92                     ExecFault,
93                     DfxCode::UDS_FAULT_03,
94                     "subscribe fail, recv none"
95                 );
96                 ErrorCode::Other
97             }
98         }
99     }
100 
unsubscribe(&self, tid: u32) -> ErrorCode101     pub(crate) fn unsubscribe(&self, tid: u32) -> ErrorCode {
102         let (tx, rx) = channel::<ErrorCode>();
103         let event = ClientEvent::Unsubscribe(tid, tx);
104         if !self.send_event(event) {
105             return ErrorCode::Other;
106         }
107         let rx = Recv::new(rx);
108         match rx.get() {
109             Some(ret) => ret,
110             None => {
111                 error!("unsubscribe failed");
112                 sys_event!(ExecFault, DfxCode::UDS_FAULT_03, "unsubscribe failed");
113                 ErrorCode::Other
114             }
115         }
116     }
117 
notify_task_finished(&self, tid: u32)118     pub(crate) fn notify_task_finished(&self, tid: u32) {
119         let event = ClientEvent::TaskFinished(tid);
120         self.send_event(event);
121     }
122 
notify_process_terminate(&self, pid: u64) -> ErrorCode123     pub(crate) fn notify_process_terminate(&self, pid: u64) -> ErrorCode {
124         let (tx, rx) = channel::<ErrorCode>();
125         let event = ClientEvent::Terminate(pid, tx);
126         if !self.send_event(event) {
127             return ErrorCode::Other;
128         }
129         let rx = Recv::new(rx);
130         match rx.get() {
131             Some(ret) => ret,
132             None => {
133                 error!("notify_process_terminate failed");
134                 sys_event!(
135                     ExecFault,
136                     DfxCode::UDS_FAULT_03,
137                     "notify_process_terminate failed"
138                 );
139                 ErrorCode::Other
140             }
141         }
142     }
143 
send_response( &self, tid: u32, version: String, status_code: u32, reason: String, headers: Headers, )144     pub(crate) fn send_response(
145         &self,
146         tid: u32,
147         version: String,
148         status_code: u32,
149         reason: String,
150         headers: Headers,
151     ) {
152         let event = ClientEvent::SendResponse(tid, version, status_code, reason, headers);
153         let _ = self.send_event(event);
154     }
155 
send_notify_data(&self, subscribe_type: SubscribeType, notify_data: NotifyData)156     pub(crate) fn send_notify_data(&self, subscribe_type: SubscribeType, notify_data: NotifyData) {
157         let event = ClientEvent::SendNotifyData(subscribe_type, notify_data);
158         let _ = self.send_event(event);
159     }
160 
send_faults(&self, tid: u32, subscribe_type: SubscribeType, reason: Reason)161     pub(crate) fn send_faults(&self, tid: u32, subscribe_type: SubscribeType, reason: Reason) {
162         let event = ClientEvent::SendFaults(tid, subscribe_type, reason);
163         let _ = self.send_event(event);
164     }
165 
send_wait_reason(&self, tid: u32, reason: WaitingCause)166     pub(crate) fn send_wait_reason(&self, tid: u32, reason: WaitingCause) {
167         let event = ClientEvent::SendWaitNotify(tid, reason);
168         let _ = self.send_event(event);
169     }
170 }
171 
172 // uid and token_id will be used later
173 pub(crate) struct Client {
174     pub(crate) pid: u64,
175     pub(crate) message_id: u32,
176     pub(crate) server_sock_fd: UnixDatagram,
177     pub(crate) client_sock_fd: Arc<UnixDatagram>,
178     rx: UnboundedReceiver<ClientEvent>,
179 }
180 
181 impl Client {
constructor( pid: u64, ) -> Option<(UnboundedSender<ClientEvent>, Arc<UnixDatagram>)>182     pub(crate) fn constructor(
183         pid: u64,
184     ) -> Option<(UnboundedSender<ClientEvent>, Arc<UnixDatagram>)> {
185         let (tx, rx) = unbounded_channel();
186         let (server_sock_fd, client_sock_fd) = match UnixDatagram::pair() {
187             Ok((server_sock_fd, client_sock_fd)) => (server_sock_fd, client_sock_fd),
188             Err(err) => {
189                 error!("can't create a pair of sockets, {:?}", err);
190                 sys_event!(
191                     ExecFault,
192                     DfxCode::TASK_FAULT_09,
193                     &format!("can't create a pair of sockets, {:?}", err)
194                 );
195                 return None;
196             }
197         };
198         let client_sock_fd = Arc::new(client_sock_fd);
199         let client = Client {
200             pid,
201             message_id: 1,
202             server_sock_fd,
203             client_sock_fd: client_sock_fd.clone(),
204             rx,
205         };
206 
207         runtime_spawn(client.run());
208         Some((tx, client_sock_fd))
209     }
210 
run(mut self)211     async fn run(mut self) {
212         loop {
213             // for one task, only send last progress message
214             let mut progress_index = HashMap::new();
215             let mut temp_notify_data: Vec<(SubscribeType, NotifyData)> = Vec::new();
216             let mut len = self.rx.len();
217             if len == 0 {
218                 len = 1;
219             }
220             for index in 0..len {
221                 let recv = match self.rx.recv().await {
222                     Ok(message) => message,
223                     Err(e) => {
224                         error!("ClientManager recv error {:?}", e);
225                         sys_event!(
226                             ExecFault,
227                             DfxCode::UDS_FAULT_03,
228                             &format!("ClientManager recv error {:?}", e)
229                         );
230                         continue;
231                     }
232                 };
233                 match recv {
234                     ClientEvent::Shutdown => {
235                         let _ = self.client_sock_fd.shutdown(Shutdown::Both);
236                         let _ = self.server_sock_fd.shutdown(Shutdown::Both);
237                         self.rx.close();
238                         info!("client terminate, pid {}", self.pid);
239                         return;
240                     }
241                     ClientEvent::SendResponse(tid, version, status_code, reason, headers) => {
242                         self.handle_send_response(tid, version, status_code, reason, headers)
243                             .await;
244                     }
245                     ClientEvent::SendFaults(tid, subscribe_type, reason) => {
246                         self.handle_send_faults(tid, subscribe_type, reason).await;
247                     }
248                     ClientEvent::SendNotifyData(subscribe_type, notify_data) => {
249                         if subscribe_type == SubscribeType::Progress {
250                             progress_index.insert(notify_data.task_id, index);
251                         }
252                         temp_notify_data.push((subscribe_type, notify_data));
253                     }
254                     ClientEvent::SendWaitNotify(task_id, waiting_reason) => {
255                         self.handle_send_waiting_notify(task_id, waiting_reason)
256                             .await;
257                     }
258                     _ => {}
259                 }
260             }
261             for (index, (subscribe_type, notify_data)) in temp_notify_data.into_iter().enumerate() {
262                 if subscribe_type != SubscribeType::Progress
263                     || progress_index.get(&notify_data.task_id) == Some(&index)
264                 {
265                     self.handle_send_notify_data(subscribe_type, notify_data)
266                         .await;
267                 }
268             }
269             debug!("Client handle message done");
270         }
271     }
272 
handle_send_faults( &mut self, tid: u32, subscribe_type: SubscribeType, reason: Reason, )273     async fn handle_send_faults(
274         &mut self,
275         tid: u32,
276         subscribe_type: SubscribeType,
277         reason: Reason,
278     ) {
279         let mut message = Vec::<u8>::new();
280         message.extend_from_slice(&REQUEST_MAGIC_NUM.to_le_bytes());
281 
282         message.extend_from_slice(&self.message_id.to_le_bytes());
283         self.message_id += 1;
284 
285         let message_type = MessageType::Faults as u16;
286         message.extend_from_slice(&message_type.to_le_bytes());
287 
288         let message_body_size: u16 = 0;
289         message.extend_from_slice(&message_body_size.to_le_bytes());
290 
291         message.extend_from_slice(&tid.to_le_bytes());
292 
293         message.extend_from_slice(&(subscribe_type as u32).to_le_bytes());
294 
295         message.extend_from_slice(&(reason.repr as u32).to_le_bytes());
296 
297         let size = message.len() as u16;
298         info!("send faults size, {:?}", size);
299         let size = size.to_le_bytes();
300         message[POSITION_OF_LENGTH as usize] = size[0];
301         message[(POSITION_OF_LENGTH + 1) as usize] = size[1];
302         self.send_message(message).await;
303     }
304 
handle_send_waiting_notify(&mut self, task_id: u32, waiting_reason: WaitingCause)305     async fn handle_send_waiting_notify(&mut self, task_id: u32, waiting_reason: WaitingCause) {
306         let mut message = Vec::<u8>::new();
307 
308         message.extend_from_slice(&REQUEST_MAGIC_NUM.to_le_bytes());
309 
310         message.extend_from_slice(&self.message_id.to_le_bytes());
311         self.message_id += 1;
312 
313         let message_type = MessageType::Waiting as u16;
314         message.extend_from_slice(&message_type.to_le_bytes());
315 
316         let message_body_size: u16 = 0;
317         message.extend_from_slice(&message_body_size.to_le_bytes());
318 
319         message.extend_from_slice(&task_id.to_le_bytes());
320 
321         message.extend_from_slice(&(waiting_reason.clone() as u32).to_le_bytes());
322 
323         let size = message.len() as u16;
324         debug!(
325             "send wait notify, tid {:?} reason {:?} size {:?}",
326             task_id, waiting_reason, size
327         );
328         let size = size.to_le_bytes();
329         message[POSITION_OF_LENGTH as usize] = size[0];
330         message[(POSITION_OF_LENGTH + 1) as usize] = size[1];
331 
332         self.send_message(message).await;
333     }
334 
handle_send_response( &mut self, tid: u32, version: String, status_code: u32, reason: String, headers: Headers, )335     async fn handle_send_response(
336         &mut self,
337         tid: u32,
338         version: String,
339         status_code: u32,
340         reason: String,
341         headers: Headers,
342     ) {
343         let mut response = Vec::<u8>::new();
344 
345         response.extend_from_slice(&REQUEST_MAGIC_NUM.to_le_bytes());
346 
347         response.extend_from_slice(&self.message_id.to_le_bytes());
348         self.message_id += 1;
349 
350         let message_type = MessageType::HttpResponse as u16;
351         response.extend_from_slice(&message_type.to_le_bytes());
352 
353         let message_body_size: u16 = 0;
354         response.extend_from_slice(&message_body_size.to_le_bytes());
355 
356         response.extend_from_slice(&tid.to_le_bytes());
357 
358         response.extend_from_slice(&version.into_bytes());
359         response.push(b'\0');
360 
361         response.extend_from_slice(&status_code.to_le_bytes());
362 
363         response.extend_from_slice(&reason.into_bytes());
364         response.push(b'\0');
365 
366         // The maximum length of the headers in uds should not exceed 8192
367         let mut buf_size = 0;
368         for (k, v) in headers {
369             buf_size += k.as_bytes().len() + v.iter().map(|f| f.len()).sum::<usize>();
370             if buf_size > HEADERS_MAX_SIZE as usize {
371                 break;
372             }
373 
374             response.extend_from_slice(k.as_bytes());
375             response.push(b':');
376             for (i, sub_value) in v.iter().enumerate() {
377                 if i != 0 {
378                     response.push(b',');
379                 }
380                 response.extend_from_slice(sub_value);
381             }
382             response.push(b'\n');
383         }
384 
385         let mut size = response.len() as u16;
386         if size > HEADERS_MAX_SIZE {
387             info!("send response too long");
388             response.truncate(HEADERS_MAX_SIZE as usize);
389             size = HEADERS_MAX_SIZE;
390         }
391         debug!("send response size, {:?}", size);
392         let size = size.to_le_bytes();
393         response[POSITION_OF_LENGTH as usize] = size[0];
394         response[(POSITION_OF_LENGTH + 1) as usize] = size[1];
395 
396         self.send_message(response).await;
397     }
398 
handle_send_notify_data( &mut self, subscribe_type: SubscribeType, notify_data: NotifyData, )399     async fn handle_send_notify_data(
400         &mut self,
401         subscribe_type: SubscribeType,
402         notify_data: NotifyData,
403     ) {
404         let mut message = Vec::<u8>::new();
405 
406         message.extend_from_slice(&REQUEST_MAGIC_NUM.to_le_bytes());
407 
408         message.extend_from_slice(&self.message_id.to_le_bytes());
409         self.message_id += 1;
410 
411         let message_type = MessageType::NotifyData as u16;
412         message.extend_from_slice(&message_type.to_le_bytes());
413 
414         let message_body_size: u16 = 0;
415         message.extend_from_slice(&message_body_size.to_le_bytes());
416 
417         message.extend_from_slice(&(subscribe_type as u32).to_le_bytes());
418 
419         message.extend_from_slice(&notify_data.task_id.to_le_bytes());
420 
421         message.extend_from_slice(&(notify_data.progress.common_data.state as u32).to_le_bytes());
422 
423         let index = notify_data.progress.common_data.index;
424         message.extend_from_slice(&(index as u32).to_le_bytes());
425         // for one task, only send last progress message
426         message.extend_from_slice(&(notify_data.progress.processed[index] as u64).to_le_bytes());
427 
428         message.extend_from_slice(
429             &(notify_data.progress.common_data.total_processed as u64).to_le_bytes(),
430         );
431 
432         message.extend_from_slice(&(notify_data.progress.sizes.len() as u32).to_le_bytes());
433         for size in notify_data.progress.sizes {
434             message.extend_from_slice(&size.to_le_bytes());
435         }
436 
437         // The maximum length of the headers in uds should not exceed 8192
438         let mut buf_size = 0;
439         let index = notify_data
440             .progress
441             .extras
442             .iter()
443             .take_while(|x| {
444                 buf_size += x.0.len() + x.1.len();
445                 buf_size < HEADERS_MAX_SIZE as usize
446             })
447             .count();
448 
449         message.extend_from_slice(&(index as u32).to_le_bytes());
450         for (key, value) in notify_data.progress.extras.iter().take(index) {
451             message.extend_from_slice(key.as_bytes());
452             message.push(b'\0');
453             message.extend_from_slice(value.as_bytes());
454             message.push(b'\0');
455         }
456 
457         message.extend_from_slice(&(notify_data.action.repr as u32).to_le_bytes());
458 
459         message.extend_from_slice(&(notify_data.version as u32).to_le_bytes());
460 
461         // Param taskstates used for UploadFile when complete or fail
462         message.extend_from_slice(&(notify_data.each_file_status.len() as u32).to_le_bytes());
463         for status in notify_data.each_file_status {
464             if notify_data.version == Version::API9 {
465                 message.extend_from_slice(&status.path.into_bytes());
466             }
467             message.push(b'\0');
468             message.extend_from_slice(&(status.reason.repr as u32).to_le_bytes());
469             message.extend_from_slice(&status.message.into_bytes());
470             message.push(b'\0');
471         }
472 
473         let size = message.len() as u16;
474         if subscribe_type == SubscribeType::Progress {
475             debug!(
476                 "send tid {} {:?} size {}",
477                 notify_data.task_id, subscribe_type, size
478             );
479         } else {
480             info!(
481                 "send tid {} {:?} size {}",
482                 notify_data.task_id, subscribe_type, size
483             );
484         }
485 
486         let size = size.to_le_bytes();
487         message[POSITION_OF_LENGTH as usize] = size[0];
488         message[(POSITION_OF_LENGTH + 1) as usize] = size[1];
489 
490         self.send_message(message).await;
491     }
492 
send_message(&mut self, message: Vec<u8>)493     async fn send_message(&mut self, message: Vec<u8>) {
494         let ret = self.server_sock_fd.send(&message).await;
495         match ret {
496             Ok(size) => {
497                 debug!("send message ok, pid: {}, size: {}", self.pid, size);
498                 let mut buf: [u8; 4] = [0; 4];
499 
500                 match ylong_runtime::time::timeout(
501                     Duration::from_millis(500),
502                     self.server_sock_fd.recv(&mut buf),
503                 )
504                 .await
505                 {
506                     Ok(ret) => match ret {
507                         Ok(len) => {
508                             debug!("message recv len {:}", len);
509                         }
510                         Err(e) => {
511                             debug!("message recv error: {:?}", e);
512                         }
513                     },
514                     Err(e) => {
515                         debug!("message recv {}", e);
516                         return;
517                     }
518                 };
519 
520                 let len: u32 = u32::from_le_bytes(buf);
521                 if len != message.len() as u32 {
522                     debug!("message len bad, send {:?}, recv {:?}", message.len(), len);
523                 } else {
524                     debug!("notify done, pid: {}", self.pid);
525                 }
526             }
527             Err(err) => {
528                 error!("message send error: {:?}", err);
529             }
530         }
531     }
532 }
533