1 // Copyright (C) 2024 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 mod manager; 15 16 use std::collections::HashMap; 17 use std::net::Shutdown; 18 use std::sync::Arc; 19 use std::time::Duration; 20 21 pub(crate) use manager::{ClientManager, ClientManagerEntry}; 22 use ylong_http_client::Headers; 23 use ylong_runtime::net::UnixDatagram; 24 use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; 25 use ylong_runtime::sync::oneshot::{channel, Sender}; 26 27 use crate::config::Version; 28 use crate::error::ErrorCode; 29 use crate::task::notify::{NotifyData, SubscribeType, WaitingCause}; 30 use crate::task::reason::Reason; 31 use crate::utils::{runtime_spawn, Recv}; 32 33 const REQUEST_MAGIC_NUM: u32 = 0x43434646; 34 const HEADERS_MAX_SIZE: u16 = 8 * 1024; 35 const POSITION_OF_LENGTH: u32 = 10; 36 37 #[derive(Debug)] 38 pub(crate) enum ClientEvent { 39 OpenChannel(u64, Sender<Result<Arc<UnixDatagram>, ErrorCode>>), 40 Subscribe(u32, u64, u64, u64, Sender<ErrorCode>), 41 Unsubscribe(u32, Sender<ErrorCode>), 42 TaskFinished(u32), 43 Terminate(u64, Sender<ErrorCode>), 44 SendResponse(u32, String, u32, String, Headers), 45 SendNotifyData(SubscribeType, NotifyData), 46 SendFaults(u32, SubscribeType, Reason), 47 SendWaitNotify(u32, WaitingCause), 48 Shutdown, 49 } 50 51 pub(crate) enum MessageType { 52 HttpResponse = 0, 53 NotifyData, 54 Faults, 55 Waiting, 56 } 57 58 impl ClientManagerEntry { open_channel(&self, pid: u64) -> Result<Arc<UnixDatagram>, ErrorCode>59 pub(crate) fn open_channel(&self, pid: u64) -> Result<Arc<UnixDatagram>, ErrorCode> { 60 let (tx, rx) = channel::<Result<Arc<UnixDatagram>, ErrorCode>>(); 61 let event = ClientEvent::OpenChannel(pid, tx); 62 if !self.send_event(event) { 63 return Err(ErrorCode::Other); 64 } 65 let rx = Recv::new(rx); 66 match rx.get() { 67 Some(ret) => ret, 68 None => { 69 error!("open channel fail, recv none"); 70 sys_event!( 71 ExecFault, 72 DfxCode::UDS_FAULT_03, 73 "open channel fail, recv none" 74 ); 75 Err(ErrorCode::Other) 76 } 77 } 78 } 79 subscribe(&self, tid: u32, pid: u64, uid: u64, token_id: u64) -> ErrorCode80 pub(crate) fn subscribe(&self, tid: u32, pid: u64, uid: u64, token_id: u64) -> ErrorCode { 81 let (tx, rx) = channel::<ErrorCode>(); 82 let event = ClientEvent::Subscribe(tid, pid, uid, token_id, tx); 83 if !self.send_event(event) { 84 return ErrorCode::Other; 85 } 86 let rx = Recv::new(rx); 87 match rx.get() { 88 Some(ret) => ret, 89 None => { 90 error!("subscribe fail, recv none"); 91 sys_event!( 92 ExecFault, 93 DfxCode::UDS_FAULT_03, 94 "subscribe fail, recv none" 95 ); 96 ErrorCode::Other 97 } 98 } 99 } 100 unsubscribe(&self, tid: u32) -> ErrorCode101 pub(crate) fn unsubscribe(&self, tid: u32) -> ErrorCode { 102 let (tx, rx) = channel::<ErrorCode>(); 103 let event = ClientEvent::Unsubscribe(tid, tx); 104 if !self.send_event(event) { 105 return ErrorCode::Other; 106 } 107 let rx = Recv::new(rx); 108 match rx.get() { 109 Some(ret) => ret, 110 None => { 111 error!("unsubscribe failed"); 112 sys_event!(ExecFault, DfxCode::UDS_FAULT_03, "unsubscribe failed"); 113 ErrorCode::Other 114 } 115 } 116 } 117 notify_task_finished(&self, tid: u32)118 pub(crate) fn notify_task_finished(&self, tid: u32) { 119 let event = ClientEvent::TaskFinished(tid); 120 self.send_event(event); 121 } 122 notify_process_terminate(&self, pid: u64) -> ErrorCode123 pub(crate) fn notify_process_terminate(&self, pid: u64) -> ErrorCode { 124 let (tx, rx) = channel::<ErrorCode>(); 125 let event = ClientEvent::Terminate(pid, tx); 126 if !self.send_event(event) { 127 return ErrorCode::Other; 128 } 129 let rx = Recv::new(rx); 130 match rx.get() { 131 Some(ret) => ret, 132 None => { 133 error!("notify_process_terminate failed"); 134 sys_event!( 135 ExecFault, 136 DfxCode::UDS_FAULT_03, 137 "notify_process_terminate failed" 138 ); 139 ErrorCode::Other 140 } 141 } 142 } 143 send_response( &self, tid: u32, version: String, status_code: u32, reason: String, headers: Headers, )144 pub(crate) fn send_response( 145 &self, 146 tid: u32, 147 version: String, 148 status_code: u32, 149 reason: String, 150 headers: Headers, 151 ) { 152 let event = ClientEvent::SendResponse(tid, version, status_code, reason, headers); 153 let _ = self.send_event(event); 154 } 155 send_notify_data(&self, subscribe_type: SubscribeType, notify_data: NotifyData)156 pub(crate) fn send_notify_data(&self, subscribe_type: SubscribeType, notify_data: NotifyData) { 157 let event = ClientEvent::SendNotifyData(subscribe_type, notify_data); 158 let _ = self.send_event(event); 159 } 160 send_faults(&self, tid: u32, subscribe_type: SubscribeType, reason: Reason)161 pub(crate) fn send_faults(&self, tid: u32, subscribe_type: SubscribeType, reason: Reason) { 162 let event = ClientEvent::SendFaults(tid, subscribe_type, reason); 163 let _ = self.send_event(event); 164 } 165 send_wait_reason(&self, tid: u32, reason: WaitingCause)166 pub(crate) fn send_wait_reason(&self, tid: u32, reason: WaitingCause) { 167 let event = ClientEvent::SendWaitNotify(tid, reason); 168 let _ = self.send_event(event); 169 } 170 } 171 172 // uid and token_id will be used later 173 pub(crate) struct Client { 174 pub(crate) pid: u64, 175 pub(crate) message_id: u32, 176 pub(crate) server_sock_fd: UnixDatagram, 177 pub(crate) client_sock_fd: Arc<UnixDatagram>, 178 rx: UnboundedReceiver<ClientEvent>, 179 } 180 181 impl Client { constructor( pid: u64, ) -> Option<(UnboundedSender<ClientEvent>, Arc<UnixDatagram>)>182 pub(crate) fn constructor( 183 pid: u64, 184 ) -> Option<(UnboundedSender<ClientEvent>, Arc<UnixDatagram>)> { 185 let (tx, rx) = unbounded_channel(); 186 let (server_sock_fd, client_sock_fd) = match UnixDatagram::pair() { 187 Ok((server_sock_fd, client_sock_fd)) => (server_sock_fd, client_sock_fd), 188 Err(err) => { 189 error!("can't create a pair of sockets, {:?}", err); 190 sys_event!( 191 ExecFault, 192 DfxCode::TASK_FAULT_09, 193 &format!("can't create a pair of sockets, {:?}", err) 194 ); 195 return None; 196 } 197 }; 198 let client_sock_fd = Arc::new(client_sock_fd); 199 let client = Client { 200 pid, 201 message_id: 1, 202 server_sock_fd, 203 client_sock_fd: client_sock_fd.clone(), 204 rx, 205 }; 206 207 runtime_spawn(client.run()); 208 Some((tx, client_sock_fd)) 209 } 210 run(mut self)211 async fn run(mut self) { 212 loop { 213 // for one task, only send last progress message 214 let mut progress_index = HashMap::new(); 215 let mut temp_notify_data: Vec<(SubscribeType, NotifyData)> = Vec::new(); 216 let mut len = self.rx.len(); 217 if len == 0 { 218 len = 1; 219 } 220 for index in 0..len { 221 let recv = match self.rx.recv().await { 222 Ok(message) => message, 223 Err(e) => { 224 error!("ClientManager recv error {:?}", e); 225 sys_event!( 226 ExecFault, 227 DfxCode::UDS_FAULT_03, 228 &format!("ClientManager recv error {:?}", e) 229 ); 230 continue; 231 } 232 }; 233 match recv { 234 ClientEvent::Shutdown => { 235 let _ = self.client_sock_fd.shutdown(Shutdown::Both); 236 let _ = self.server_sock_fd.shutdown(Shutdown::Both); 237 self.rx.close(); 238 info!("client terminate, pid {}", self.pid); 239 return; 240 } 241 ClientEvent::SendResponse(tid, version, status_code, reason, headers) => { 242 self.handle_send_response(tid, version, status_code, reason, headers) 243 .await; 244 } 245 ClientEvent::SendFaults(tid, subscribe_type, reason) => { 246 self.handle_send_faults(tid, subscribe_type, reason).await; 247 } 248 ClientEvent::SendNotifyData(subscribe_type, notify_data) => { 249 if subscribe_type == SubscribeType::Progress { 250 progress_index.insert(notify_data.task_id, index); 251 } 252 temp_notify_data.push((subscribe_type, notify_data)); 253 } 254 ClientEvent::SendWaitNotify(task_id, waiting_reason) => { 255 self.handle_send_waiting_notify(task_id, waiting_reason) 256 .await; 257 } 258 _ => {} 259 } 260 } 261 for (index, (subscribe_type, notify_data)) in temp_notify_data.into_iter().enumerate() { 262 if subscribe_type != SubscribeType::Progress 263 || progress_index.get(¬ify_data.task_id) == Some(&index) 264 { 265 self.handle_send_notify_data(subscribe_type, notify_data) 266 .await; 267 } 268 } 269 debug!("Client handle message done"); 270 } 271 } 272 handle_send_faults( &mut self, tid: u32, subscribe_type: SubscribeType, reason: Reason, )273 async fn handle_send_faults( 274 &mut self, 275 tid: u32, 276 subscribe_type: SubscribeType, 277 reason: Reason, 278 ) { 279 let mut message = Vec::<u8>::new(); 280 message.extend_from_slice(&REQUEST_MAGIC_NUM.to_le_bytes()); 281 282 message.extend_from_slice(&self.message_id.to_le_bytes()); 283 self.message_id += 1; 284 285 let message_type = MessageType::Faults as u16; 286 message.extend_from_slice(&message_type.to_le_bytes()); 287 288 let message_body_size: u16 = 0; 289 message.extend_from_slice(&message_body_size.to_le_bytes()); 290 291 message.extend_from_slice(&tid.to_le_bytes()); 292 293 message.extend_from_slice(&(subscribe_type as u32).to_le_bytes()); 294 295 message.extend_from_slice(&(reason.repr as u32).to_le_bytes()); 296 297 let size = message.len() as u16; 298 info!("send faults size, {:?}", size); 299 let size = size.to_le_bytes(); 300 message[POSITION_OF_LENGTH as usize] = size[0]; 301 message[(POSITION_OF_LENGTH + 1) as usize] = size[1]; 302 self.send_message(message).await; 303 } 304 handle_send_waiting_notify(&mut self, task_id: u32, waiting_reason: WaitingCause)305 async fn handle_send_waiting_notify(&mut self, task_id: u32, waiting_reason: WaitingCause) { 306 let mut message = Vec::<u8>::new(); 307 308 message.extend_from_slice(&REQUEST_MAGIC_NUM.to_le_bytes()); 309 310 message.extend_from_slice(&self.message_id.to_le_bytes()); 311 self.message_id += 1; 312 313 let message_type = MessageType::Waiting as u16; 314 message.extend_from_slice(&message_type.to_le_bytes()); 315 316 let message_body_size: u16 = 0; 317 message.extend_from_slice(&message_body_size.to_le_bytes()); 318 319 message.extend_from_slice(&task_id.to_le_bytes()); 320 321 message.extend_from_slice(&(waiting_reason.clone() as u32).to_le_bytes()); 322 323 let size = message.len() as u16; 324 debug!( 325 "send wait notify, tid {:?} reason {:?} size {:?}", 326 task_id, waiting_reason, size 327 ); 328 let size = size.to_le_bytes(); 329 message[POSITION_OF_LENGTH as usize] = size[0]; 330 message[(POSITION_OF_LENGTH + 1) as usize] = size[1]; 331 332 self.send_message(message).await; 333 } 334 handle_send_response( &mut self, tid: u32, version: String, status_code: u32, reason: String, headers: Headers, )335 async fn handle_send_response( 336 &mut self, 337 tid: u32, 338 version: String, 339 status_code: u32, 340 reason: String, 341 headers: Headers, 342 ) { 343 let mut response = Vec::<u8>::new(); 344 345 response.extend_from_slice(&REQUEST_MAGIC_NUM.to_le_bytes()); 346 347 response.extend_from_slice(&self.message_id.to_le_bytes()); 348 self.message_id += 1; 349 350 let message_type = MessageType::HttpResponse as u16; 351 response.extend_from_slice(&message_type.to_le_bytes()); 352 353 let message_body_size: u16 = 0; 354 response.extend_from_slice(&message_body_size.to_le_bytes()); 355 356 response.extend_from_slice(&tid.to_le_bytes()); 357 358 response.extend_from_slice(&version.into_bytes()); 359 response.push(b'\0'); 360 361 response.extend_from_slice(&status_code.to_le_bytes()); 362 363 response.extend_from_slice(&reason.into_bytes()); 364 response.push(b'\0'); 365 366 // The maximum length of the headers in uds should not exceed 8192 367 let mut buf_size = 0; 368 for (k, v) in headers { 369 buf_size += k.as_bytes().len() + v.iter().map(|f| f.len()).sum::<usize>(); 370 if buf_size > HEADERS_MAX_SIZE as usize { 371 break; 372 } 373 374 response.extend_from_slice(k.as_bytes()); 375 response.push(b':'); 376 for (i, sub_value) in v.iter().enumerate() { 377 if i != 0 { 378 response.push(b','); 379 } 380 response.extend_from_slice(sub_value); 381 } 382 response.push(b'\n'); 383 } 384 385 let mut size = response.len() as u16; 386 if size > HEADERS_MAX_SIZE { 387 info!("send response too long"); 388 response.truncate(HEADERS_MAX_SIZE as usize); 389 size = HEADERS_MAX_SIZE; 390 } 391 debug!("send response size, {:?}", size); 392 let size = size.to_le_bytes(); 393 response[POSITION_OF_LENGTH as usize] = size[0]; 394 response[(POSITION_OF_LENGTH + 1) as usize] = size[1]; 395 396 self.send_message(response).await; 397 } 398 handle_send_notify_data( &mut self, subscribe_type: SubscribeType, notify_data: NotifyData, )399 async fn handle_send_notify_data( 400 &mut self, 401 subscribe_type: SubscribeType, 402 notify_data: NotifyData, 403 ) { 404 let mut message = Vec::<u8>::new(); 405 406 message.extend_from_slice(&REQUEST_MAGIC_NUM.to_le_bytes()); 407 408 message.extend_from_slice(&self.message_id.to_le_bytes()); 409 self.message_id += 1; 410 411 let message_type = MessageType::NotifyData as u16; 412 message.extend_from_slice(&message_type.to_le_bytes()); 413 414 let message_body_size: u16 = 0; 415 message.extend_from_slice(&message_body_size.to_le_bytes()); 416 417 message.extend_from_slice(&(subscribe_type as u32).to_le_bytes()); 418 419 message.extend_from_slice(¬ify_data.task_id.to_le_bytes()); 420 421 message.extend_from_slice(&(notify_data.progress.common_data.state as u32).to_le_bytes()); 422 423 let index = notify_data.progress.common_data.index; 424 message.extend_from_slice(&(index as u32).to_le_bytes()); 425 // for one task, only send last progress message 426 message.extend_from_slice(&(notify_data.progress.processed[index] as u64).to_le_bytes()); 427 428 message.extend_from_slice( 429 &(notify_data.progress.common_data.total_processed as u64).to_le_bytes(), 430 ); 431 432 message.extend_from_slice(&(notify_data.progress.sizes.len() as u32).to_le_bytes()); 433 for size in notify_data.progress.sizes { 434 message.extend_from_slice(&size.to_le_bytes()); 435 } 436 437 // The maximum length of the headers in uds should not exceed 8192 438 let mut buf_size = 0; 439 let index = notify_data 440 .progress 441 .extras 442 .iter() 443 .take_while(|x| { 444 buf_size += x.0.len() + x.1.len(); 445 buf_size < HEADERS_MAX_SIZE as usize 446 }) 447 .count(); 448 449 message.extend_from_slice(&(index as u32).to_le_bytes()); 450 for (key, value) in notify_data.progress.extras.iter().take(index) { 451 message.extend_from_slice(key.as_bytes()); 452 message.push(b'\0'); 453 message.extend_from_slice(value.as_bytes()); 454 message.push(b'\0'); 455 } 456 457 message.extend_from_slice(&(notify_data.action.repr as u32).to_le_bytes()); 458 459 message.extend_from_slice(&(notify_data.version as u32).to_le_bytes()); 460 461 // Param taskstates used for UploadFile when complete or fail 462 message.extend_from_slice(&(notify_data.each_file_status.len() as u32).to_le_bytes()); 463 for status in notify_data.each_file_status { 464 if notify_data.version == Version::API9 { 465 message.extend_from_slice(&status.path.into_bytes()); 466 } 467 message.push(b'\0'); 468 message.extend_from_slice(&(status.reason.repr as u32).to_le_bytes()); 469 message.extend_from_slice(&status.message.into_bytes()); 470 message.push(b'\0'); 471 } 472 473 let size = message.len() as u16; 474 if subscribe_type == SubscribeType::Progress { 475 debug!( 476 "send tid {} {:?} size {}", 477 notify_data.task_id, subscribe_type, size 478 ); 479 } else { 480 info!( 481 "send tid {} {:?} size {}", 482 notify_data.task_id, subscribe_type, size 483 ); 484 } 485 486 let size = size.to_le_bytes(); 487 message[POSITION_OF_LENGTH as usize] = size[0]; 488 message[(POSITION_OF_LENGTH + 1) as usize] = size[1]; 489 490 self.send_message(message).await; 491 } 492 send_message(&mut self, message: Vec<u8>)493 async fn send_message(&mut self, message: Vec<u8>) { 494 let ret = self.server_sock_fd.send(&message).await; 495 match ret { 496 Ok(size) => { 497 debug!("send message ok, pid: {}, size: {}", self.pid, size); 498 let mut buf: [u8; 4] = [0; 4]; 499 500 match ylong_runtime::time::timeout( 501 Duration::from_millis(500), 502 self.server_sock_fd.recv(&mut buf), 503 ) 504 .await 505 { 506 Ok(ret) => match ret { 507 Ok(len) => { 508 debug!("message recv len {:}", len); 509 } 510 Err(e) => { 511 debug!("message recv error: {:?}", e); 512 } 513 }, 514 Err(e) => { 515 debug!("message recv {}", e); 516 return; 517 } 518 }; 519 520 let len: u32 = u32::from_le_bytes(buf); 521 if len != message.len() as u32 { 522 debug!("message len bad, send {:?}, recv {:?}", message.len(), len); 523 } else { 524 debug!("notify done, pid: {}", self.pid); 525 } 526 } 527 Err(err) => { 528 error!("message send error: {:?}", err); 529 } 530 } 531 } 532 } 533