• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2024 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::collections::{hash_map, HashMap};
15 use std::sync::Arc;
16 
17 use ylong_runtime::net::UnixDatagram;
18 use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
19 use ylong_runtime::sync::oneshot::Sender;
20 
21 use super::{Client, ClientEvent};
22 
23 cfg_oh! {
24     use crate::ability::PANIC_INFO;
25 }
26 use crate::error::ErrorCode;
27 use crate::utils::runtime_spawn;
28 
29 #[derive(Clone)]
30 pub(crate) struct ClientManagerEntry {
31     tx: UnboundedSender<ClientEvent>,
32 }
33 
34 impl ClientManagerEntry {
new(tx: UnboundedSender<ClientEvent>) -> Self35     pub(crate) fn new(tx: UnboundedSender<ClientEvent>) -> Self {
36         Self { tx }
37     }
38 
send_event(&self, event: ClientEvent) -> bool39     pub(crate) fn send_event(&self, event: ClientEvent) -> bool {
40         if self.tx.send(event).is_err() {
41             #[cfg(feature = "oh")]
42             unsafe {
43                 if let Some(e) = PANIC_INFO.as_ref() {
44                     error!("Sends ClientManager event failed {}", e);
45                     sys_event!(
46                         ExecFault,
47                         DfxCode::UDS_FAULT_02,
48                         &format!("Sends ClientManager event failed {}", e)
49                     );
50                 } else {
51                     info!("ClientManager is unloading");
52                 }
53             }
54             return false;
55         }
56         true
57     }
58 }
59 pub(crate) struct ClientManager {
60     // map from pid to client and fd
61     clients: HashMap<u64, (UnboundedSender<ClientEvent>, Arc<UnixDatagram>)>,
62     pid_map: HashMap<u32, u64>,
63     rx: UnboundedReceiver<ClientEvent>,
64 }
65 
66 impl ClientManager {
init() -> ClientManagerEntry67     pub(crate) fn init() -> ClientManagerEntry {
68         debug!("ClientManager init");
69         let (tx, rx) = unbounded_channel();
70         let client_manager = ClientManager {
71             clients: HashMap::new(),
72             pid_map: HashMap::new(),
73             rx,
74         };
75         runtime_spawn(client_manager.run());
76         ClientManagerEntry::new(tx)
77     }
78 
run(mut self)79     async fn run(mut self) {
80         loop {
81             let recv = match self.rx.recv().await {
82                 Ok(message) => message,
83                 Err(e) => {
84                     error!("ClientManager recv error {:?}", e);
85                     sys_event!(
86                         ExecFault,
87                         DfxCode::UDS_FAULT_03,
88                         &format!("ClientManager recv error {:?}", e)
89                     );
90                     continue;
91                 }
92             };
93 
94             match recv {
95                 ClientEvent::OpenChannel(pid, tx) => self.handle_open_channel(pid, tx),
96                 ClientEvent::Subscribe(tid, pid, uid, token_id, tx) => {
97                     self.handle_subscribe(tid, pid, uid, token_id, tx)
98                 }
99                 ClientEvent::Unsubscribe(tid, tx) => self.handle_unsubscribe(tid, tx),
100                 ClientEvent::TaskFinished(tid) => self.handle_task_finished(tid),
101                 ClientEvent::Terminate(pid, tx) => self.handle_process_terminated(pid, tx),
102                 ClientEvent::SendResponse(tid, version, status_code, reason, headers) => {
103                     if let Some(&pid) = self.pid_map.get(&tid) {
104                         if let Some((tx, _fd)) = self.clients.get_mut(&pid) {
105                             if let Err(err) = tx.send(ClientEvent::SendResponse(
106                                 tid,
107                                 version,
108                                 status_code,
109                                 reason,
110                                 headers,
111                             )) {
112                                 error!("send response error, {}", err);
113                                 sys_event!(
114                                     ExecFault,
115                                     DfxCode::UDS_FAULT_02,
116                                     &format!("send response error, {}", err)
117                                 );
118                             }
119                         } else {
120                             debug!("response client not found");
121                         }
122                     } else {
123                         debug!("response pid not found");
124                     }
125                 }
126                 ClientEvent::SendNotifyData(subscribe_type, notify_data) => {
127                     if let Some(&pid) = self.pid_map.get(&(notify_data.task_id)) {
128                         if let Some((tx, _fd)) = self.clients.get_mut(&pid) {
129                             if let Err(err) =
130                                 tx.send(ClientEvent::SendNotifyData(subscribe_type, notify_data))
131                             {
132                                 error!("send notify data error, {}", err);
133                                 sys_event!(
134                                     ExecFault,
135                                     DfxCode::UDS_FAULT_02,
136                                     &format!("send notify data error, {}", err)
137                                 );
138                             }
139                         } else {
140                             debug!("response client not found");
141                         }
142                     } else {
143                         debug!("notify data pid not found");
144                     }
145                 }
146                 ClientEvent::SendFaults(tid, subscribe_type, reason) => {
147                     if let Some(&pid) = self.pid_map.get(&tid) {
148                         if let Some((tx, _fd)) = self.clients.get_mut(&pid) {
149                             if let Err(err) =
150                                 tx.send(ClientEvent::SendFaults(tid, subscribe_type, reason))
151                             {
152                                 error!("send faults error, {}", err);
153                                 sys_event!(
154                                     ExecFault,
155                                     DfxCode::UDS_FAULT_02,
156                                     &format!("send faults error, {}", err)
157                                 );
158                             }
159                         }
160                     }
161                 }
162                 ClientEvent::SendWaitNotify(tid, reason) => {
163                     if let Some(&pid) = self.pid_map.get(&tid) {
164                         if let Some((tx, _fd)) = self.clients.get_mut(&pid) {
165                             if let Err(err) = tx.send(ClientEvent::SendWaitNotify(tid, reason)) {
166                                 error!("send faults error, {}", err);
167                                 sys_event!(
168                                     ExecFault,
169                                     DfxCode::UDS_FAULT_02,
170                                     &format!("send faults error, {}", err)
171                                 );
172                             }
173                         }
174                     }
175                 }
176                 _ => {}
177             }
178 
179             debug!("ClientManager handle message done");
180         }
181     }
182 
handle_open_channel(&mut self, pid: u64, tx: Sender<Result<Arc<UnixDatagram>, ErrorCode>>)183     fn handle_open_channel(&mut self, pid: u64, tx: Sender<Result<Arc<UnixDatagram>, ErrorCode>>) {
184         match self.clients.entry(pid) {
185             hash_map::Entry::Occupied(o) => {
186                 let (_, fd) = o.get();
187                 let _ = tx.send(Ok(fd.clone()));
188             }
189             hash_map::Entry::Vacant(v) => match Client::constructor(pid) {
190                 Some((client, ud_fd)) => {
191                     let _ = tx.send(Ok(ud_fd.clone()));
192                     v.insert((client, ud_fd));
193                 }
194                 None => {
195                     let _ = tx.send(Err(ErrorCode::Other));
196                 }
197             },
198         }
199     }
200 
handle_subscribe( &mut self, tid: u32, pid: u64, _uid: u64, _token_id: u64, tx: Sender<ErrorCode>, )201     fn handle_subscribe(
202         &mut self,
203         tid: u32,
204         pid: u64,
205         _uid: u64,
206         _token_id: u64,
207         tx: Sender<ErrorCode>,
208     ) {
209         if let Some(_client) = self.clients.get_mut(&pid) {
210             self.pid_map.insert(tid, pid);
211             let _ = tx.send(ErrorCode::ErrOk);
212         } else {
213             info!("channel not open, pid {}", pid);
214             let _ = tx.send(ErrorCode::ChannelNotOpen);
215         }
216     }
217 
handle_unsubscribe(&mut self, tid: u32, tx: Sender<ErrorCode>)218     fn handle_unsubscribe(&mut self, tid: u32, tx: Sender<ErrorCode>) {
219         if let Some(&pid) = self.pid_map.get(&tid) {
220             self.pid_map.remove(&tid);
221             if let Some(_client) = self.clients.get_mut(&pid) {
222                 let _ = tx.send(ErrorCode::ErrOk);
223                 return;
224             } else {
225                 debug!("client not found");
226             }
227         } else {
228             debug!("unsubscribe tid not found");
229         }
230         let _ = tx.send(ErrorCode::Other);
231     }
232 
handle_task_finished(&mut self, tid: u32)233     fn handle_task_finished(&mut self, tid: u32) {
234         if self.pid_map.remove(&tid).is_some() {
235             debug!("unsubscribe tid {:?}", tid);
236         } else {
237             debug!("unsubscribe tid not found");
238         }
239     }
240 
handle_process_terminated(&mut self, pid: u64, tx: Sender<ErrorCode>)241     fn handle_process_terminated(&mut self, pid: u64, tx: Sender<ErrorCode>) {
242         if let Some((tx, _)) = self.clients.get_mut(&pid) {
243             let _ = tx.send(ClientEvent::Shutdown);
244             self.clients.remove(&pid);
245         } else {
246             debug!("terminate pid not found");
247         }
248         let _ = tx.send(ErrorCode::ErrOk);
249     }
250 }
251