1 // Copyright (C) 2024 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::collections::{hash_map, HashMap}; 15 use std::sync::Arc; 16 17 use ylong_runtime::net::UnixDatagram; 18 use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; 19 use ylong_runtime::sync::oneshot::Sender; 20 21 use super::{Client, ClientEvent}; 22 23 cfg_oh! { 24 use crate::ability::PANIC_INFO; 25 } 26 use crate::error::ErrorCode; 27 use crate::utils::runtime_spawn; 28 29 #[derive(Clone)] 30 pub(crate) struct ClientManagerEntry { 31 tx: UnboundedSender<ClientEvent>, 32 } 33 34 impl ClientManagerEntry { new(tx: UnboundedSender<ClientEvent>) -> Self35 pub(crate) fn new(tx: UnboundedSender<ClientEvent>) -> Self { 36 Self { tx } 37 } 38 send_event(&self, event: ClientEvent) -> bool39 pub(crate) fn send_event(&self, event: ClientEvent) -> bool { 40 if self.tx.send(event).is_err() { 41 #[cfg(feature = "oh")] 42 unsafe { 43 if let Some(e) = PANIC_INFO.as_ref() { 44 error!("Sends ClientManager event failed {}", e); 45 sys_event!( 46 ExecFault, 47 DfxCode::UDS_FAULT_02, 48 &format!("Sends ClientManager event failed {}", e) 49 ); 50 } else { 51 info!("ClientManager is unloading"); 52 } 53 } 54 return false; 55 } 56 true 57 } 58 } 59 pub(crate) struct ClientManager { 60 // map from pid to client and fd 61 clients: HashMap<u64, (UnboundedSender<ClientEvent>, Arc<UnixDatagram>)>, 62 pid_map: HashMap<u32, u64>, 63 rx: UnboundedReceiver<ClientEvent>, 64 } 65 66 impl ClientManager { init() -> ClientManagerEntry67 pub(crate) fn init() -> ClientManagerEntry { 68 debug!("ClientManager init"); 69 let (tx, rx) = unbounded_channel(); 70 let client_manager = ClientManager { 71 clients: HashMap::new(), 72 pid_map: HashMap::new(), 73 rx, 74 }; 75 runtime_spawn(client_manager.run()); 76 ClientManagerEntry::new(tx) 77 } 78 run(mut self)79 async fn run(mut self) { 80 loop { 81 let recv = match self.rx.recv().await { 82 Ok(message) => message, 83 Err(e) => { 84 error!("ClientManager recv error {:?}", e); 85 sys_event!( 86 ExecFault, 87 DfxCode::UDS_FAULT_03, 88 &format!("ClientManager recv error {:?}", e) 89 ); 90 continue; 91 } 92 }; 93 94 match recv { 95 ClientEvent::OpenChannel(pid, tx) => self.handle_open_channel(pid, tx), 96 ClientEvent::Subscribe(tid, pid, uid, token_id, tx) => { 97 self.handle_subscribe(tid, pid, uid, token_id, tx) 98 } 99 ClientEvent::Unsubscribe(tid, tx) => self.handle_unsubscribe(tid, tx), 100 ClientEvent::TaskFinished(tid) => self.handle_task_finished(tid), 101 ClientEvent::Terminate(pid, tx) => self.handle_process_terminated(pid, tx), 102 ClientEvent::SendResponse(tid, version, status_code, reason, headers) => { 103 if let Some(&pid) = self.pid_map.get(&tid) { 104 if let Some((tx, _fd)) = self.clients.get_mut(&pid) { 105 if let Err(err) = tx.send(ClientEvent::SendResponse( 106 tid, 107 version, 108 status_code, 109 reason, 110 headers, 111 )) { 112 error!("send response error, {}", err); 113 sys_event!( 114 ExecFault, 115 DfxCode::UDS_FAULT_02, 116 &format!("send response error, {}", err) 117 ); 118 } 119 } else { 120 debug!("response client not found"); 121 } 122 } else { 123 debug!("response pid not found"); 124 } 125 } 126 ClientEvent::SendNotifyData(subscribe_type, notify_data) => { 127 if let Some(&pid) = self.pid_map.get(&(notify_data.task_id)) { 128 if let Some((tx, _fd)) = self.clients.get_mut(&pid) { 129 if let Err(err) = 130 tx.send(ClientEvent::SendNotifyData(subscribe_type, notify_data)) 131 { 132 error!("send notify data error, {}", err); 133 sys_event!( 134 ExecFault, 135 DfxCode::UDS_FAULT_02, 136 &format!("send notify data error, {}", err) 137 ); 138 } 139 } else { 140 debug!("response client not found"); 141 } 142 } else { 143 debug!("notify data pid not found"); 144 } 145 } 146 ClientEvent::SendFaults(tid, subscribe_type, reason) => { 147 if let Some(&pid) = self.pid_map.get(&tid) { 148 if let Some((tx, _fd)) = self.clients.get_mut(&pid) { 149 if let Err(err) = 150 tx.send(ClientEvent::SendFaults(tid, subscribe_type, reason)) 151 { 152 error!("send faults error, {}", err); 153 sys_event!( 154 ExecFault, 155 DfxCode::UDS_FAULT_02, 156 &format!("send faults error, {}", err) 157 ); 158 } 159 } 160 } 161 } 162 ClientEvent::SendWaitNotify(tid, reason) => { 163 if let Some(&pid) = self.pid_map.get(&tid) { 164 if let Some((tx, _fd)) = self.clients.get_mut(&pid) { 165 if let Err(err) = tx.send(ClientEvent::SendWaitNotify(tid, reason)) { 166 error!("send faults error, {}", err); 167 sys_event!( 168 ExecFault, 169 DfxCode::UDS_FAULT_02, 170 &format!("send faults error, {}", err) 171 ); 172 } 173 } 174 } 175 } 176 _ => {} 177 } 178 179 debug!("ClientManager handle message done"); 180 } 181 } 182 handle_open_channel(&mut self, pid: u64, tx: Sender<Result<Arc<UnixDatagram>, ErrorCode>>)183 fn handle_open_channel(&mut self, pid: u64, tx: Sender<Result<Arc<UnixDatagram>, ErrorCode>>) { 184 match self.clients.entry(pid) { 185 hash_map::Entry::Occupied(o) => { 186 let (_, fd) = o.get(); 187 let _ = tx.send(Ok(fd.clone())); 188 } 189 hash_map::Entry::Vacant(v) => match Client::constructor(pid) { 190 Some((client, ud_fd)) => { 191 let _ = tx.send(Ok(ud_fd.clone())); 192 v.insert((client, ud_fd)); 193 } 194 None => { 195 let _ = tx.send(Err(ErrorCode::Other)); 196 } 197 }, 198 } 199 } 200 handle_subscribe( &mut self, tid: u32, pid: u64, _uid: u64, _token_id: u64, tx: Sender<ErrorCode>, )201 fn handle_subscribe( 202 &mut self, 203 tid: u32, 204 pid: u64, 205 _uid: u64, 206 _token_id: u64, 207 tx: Sender<ErrorCode>, 208 ) { 209 if let Some(_client) = self.clients.get_mut(&pid) { 210 self.pid_map.insert(tid, pid); 211 let _ = tx.send(ErrorCode::ErrOk); 212 } else { 213 info!("channel not open, pid {}", pid); 214 let _ = tx.send(ErrorCode::ChannelNotOpen); 215 } 216 } 217 handle_unsubscribe(&mut self, tid: u32, tx: Sender<ErrorCode>)218 fn handle_unsubscribe(&mut self, tid: u32, tx: Sender<ErrorCode>) { 219 if let Some(&pid) = self.pid_map.get(&tid) { 220 self.pid_map.remove(&tid); 221 if let Some(_client) = self.clients.get_mut(&pid) { 222 let _ = tx.send(ErrorCode::ErrOk); 223 return; 224 } else { 225 debug!("client not found"); 226 } 227 } else { 228 debug!("unsubscribe tid not found"); 229 } 230 let _ = tx.send(ErrorCode::Other); 231 } 232 handle_task_finished(&mut self, tid: u32)233 fn handle_task_finished(&mut self, tid: u32) { 234 if self.pid_map.remove(&tid).is_some() { 235 debug!("unsubscribe tid {:?}", tid); 236 } else { 237 debug!("unsubscribe tid not found"); 238 } 239 } 240 handle_process_terminated(&mut self, pid: u64, tx: Sender<ErrorCode>)241 fn handle_process_terminated(&mut self, pid: u64, tx: Sender<ErrorCode>) { 242 if let Some((tx, _)) = self.clients.get_mut(&pid) { 243 let _ = tx.send(ClientEvent::Shutdown); 244 self.clients.remove(&pid); 245 } else { 246 debug!("terminate pid not found"); 247 } 248 let _ = tx.send(ErrorCode::ErrOk); 249 } 250 } 251