1 // Copyright (C) 2024 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::collections::hash_map::Entry; 15 use std::collections::HashMap; 16 use std::sync::Arc; 17 18 use ylong_runtime::sync::mpsc::{self, UnboundedReceiver}; 19 20 use super::database::{CustomizedNotification, NotificationDb}; 21 use super::ffi::{NotifyContent, PublishNotification}; 22 use super::task_handle::cancel_notification; 23 use crate::config::Action; 24 use crate::info::State; 25 use crate::manage::database::RequestDb; 26 use crate::utils::{get_current_timestamp, runtime_spawn}; 27 28 const NOTIFY_PROGRESS_INTERVAL: u64 = if cfg!(test) { 1 } else { 500 }; 29 30 pub(crate) struct NotifyFlow { 31 database: Arc<NotificationDb>, 32 // key for task_id. 33 notify_type_map: HashMap<u32, NotifyType>, 34 35 // key for request_id, group or task. 36 last_notify_map: HashMap<u32, u64>, 37 38 group_notify_progress: HashMap<u32, GroupProgress>, 39 // value 1 for title, 2 for text. 40 group_customized_notify: HashMap<u32, Option<CustomizedNotification>>, 41 group_gauge: HashMap<u32, bool>, 42 task_customized_notify: HashMap<u32, Option<CustomizedNotification>>, 43 rx: mpsc::UnboundedReceiver<NotifyInfo>, 44 } 45 46 pub(crate) struct GroupProgress { 47 task_progress: HashMap<u32, u64>, 48 total_progress: u64, 49 task_state: HashMap<u32, State>, 50 successful: usize, 51 failed: usize, 52 } 53 54 impl GroupProgress { new() -> Self55 pub(crate) fn new() -> Self { 56 Self { 57 task_progress: HashMap::new(), 58 total_progress: 0, 59 task_state: HashMap::new(), 60 successful: 0, 61 failed: 0, 62 } 63 } 64 update_task_progress(&mut self, task_id: u32, processed: u64)65 pub(crate) fn update_task_progress(&mut self, task_id: u32, processed: u64) { 66 let prev = match self.task_progress.entry(task_id) { 67 Entry::Occupied(entry) => entry.into_mut(), 68 Entry::Vacant(entry) => entry.insert(0), 69 }; 70 self.total_progress += processed - *prev; 71 *prev = processed; 72 } 73 update_task_state(&mut self, task_id: u32, state: State)74 pub(crate) fn update_task_state(&mut self, task_id: u32, state: State) { 75 let prev = match self.task_state.get_mut(&task_id) { 76 Some(prev) => prev, 77 None => { 78 self.task_state.insert(task_id, state); 79 if state == State::Completed { 80 self.successful += 1; 81 } else if state == State::Failed { 82 self.failed += 1; 83 } 84 return; 85 } 86 }; 87 if *prev == state { 88 return; 89 } 90 if *prev != State::Completed && *prev != State::Failed { 91 if state == State::Completed { 92 self.successful += 1; 93 } else if state == State::Failed { 94 self.failed += 1; 95 } 96 } else if state == State::Completed { 97 self.successful += 1; 98 self.failed -= 1; 99 } else if state == State::Failed { 100 self.failed += 1; 101 self.successful -= 1; 102 } 103 *prev = state; 104 } 105 successful(&self) -> usize106 pub(crate) fn successful(&self) -> usize { 107 self.successful 108 } 109 failed(&self) -> usize110 pub(crate) fn failed(&self) -> usize { 111 self.failed 112 } 113 total(&self) -> usize114 pub(crate) fn total(&self) -> usize { 115 self.task_state.len() 116 } processed(&self) -> u64117 pub(crate) fn processed(&self) -> u64 { 118 self.total_progress 119 } 120 is_finish(&self) -> bool121 pub(crate) fn is_finish(&self) -> bool { 122 self.total() == self.successful + self.failed 123 } 124 } 125 126 #[derive(Clone, Debug)] 127 pub struct ProgressNotify { 128 pub(crate) action: Action, 129 pub(crate) task_id: u32, 130 pub(crate) uid: u64, 131 pub(crate) processed: u64, 132 pub(crate) total: Option<u64>, 133 pub(crate) multi_upload: Option<(usize, usize)>, 134 pub(crate) file_name: String, 135 } 136 137 #[derive(Clone, Debug)] 138 pub(crate) struct EventualNotify { 139 pub(crate) action: Action, 140 pub(crate) task_id: u32, 141 pub(crate) uid: u64, 142 pub(crate) processed: u64, 143 pub(crate) file_name: String, 144 pub(crate) is_successful: bool, 145 } 146 147 #[derive(Debug)] 148 pub(crate) enum NotifyInfo { 149 Eventual(EventualNotify), 150 Progress(ProgressNotify), 151 AttachGroup(u32, u64, Vec<u32>), 152 Unregister(u64, u32, u32), 153 GroupEventual(u32, u64), 154 } 155 156 #[derive(Clone, Copy)] 157 enum NotifyType { 158 Group(u32), 159 Task, 160 } 161 162 impl NotifyFlow { new(rx: UnboundedReceiver<NotifyInfo>, database: Arc<NotificationDb>) -> Self163 pub(crate) fn new(rx: UnboundedReceiver<NotifyInfo>, database: Arc<NotificationDb>) -> Self { 164 Self { 165 database, 166 notify_type_map: HashMap::new(), 167 last_notify_map: HashMap::new(), 168 group_notify_progress: HashMap::new(), 169 group_gauge: HashMap::new(), 170 task_customized_notify: HashMap::new(), 171 group_customized_notify: HashMap::new(), 172 rx, 173 } 174 } 175 run(mut self)176 pub(crate) fn run(mut self) { 177 runtime_spawn(async move { 178 loop { 179 let info = match self.rx.recv().await { 180 Ok(message) => message, 181 Err(e) => { 182 error!("Notification flow channel error: {:?}", e); 183 sys_event!( 184 ExecFault, 185 DfxCode::UDS_FAULT_03, 186 &format!("Notification flow channel error: {:?}", e) 187 ); 188 continue; 189 } 190 }; 191 192 if let Some(content) = match info { 193 NotifyInfo::Eventual(info) => self.publish_completed_notify(&info), 194 NotifyInfo::Progress(info) => self.publish_progress_notification(info), 195 NotifyInfo::GroupEventual(group_id, uid) => self.group_eventual(group_id, uid), 196 NotifyInfo::AttachGroup(group_id, uid, task_ids) => { 197 self.attach_group(group_id, task_ids, uid) 198 } 199 NotifyInfo::Unregister(uid, task_id, group_id) => { 200 self.unregister_task(uid, task_id, group_id) 201 } 202 } { 203 PublishNotification(&content); 204 } 205 } 206 }); 207 } 208 unregister_task(&mut self, uid: u64, task_id: u32, group_id: u32) -> Option<NotifyContent>209 fn unregister_task(&mut self, uid: u64, task_id: u32, group_id: u32) -> Option<NotifyContent> { 210 info!( 211 "Unregister task: uid: {}, task_id: {}, group_id: {}", 212 uid, task_id, group_id 213 ); 214 let customized = self.group_customized_notify(group_id); 215 let progress = match self.group_notify_progress.entry(group_id) { 216 Entry::Occupied(entry) => entry.into_mut(), 217 Entry::Vacant(entry) => { 218 let progress = Self::get_group_progress(&self.database, group_id); 219 entry.insert(progress) 220 } 221 }; 222 if progress 223 .task_state 224 .get(&task_id) 225 .is_some_and(|state| *state != State::Completed && *state != State::Failed) 226 { 227 progress.task_state.remove(&task_id); 228 } 229 if progress.task_state.is_empty() { 230 cancel_notification(group_id); 231 return None; 232 } 233 if !Self::group_eventual_check(&self.database, progress, group_id) { 234 return None; 235 } 236 Some(NotifyContent::group_eventual_notify( 237 customized, 238 Action::Download, 239 group_id, 240 uid as u32, 241 progress.processed(), 242 progress.successful() as i32, 243 progress.failed() as i32, 244 )) 245 } 246 update_db_task_state_and_progress(group_progress: &mut GroupProgress, task_id: u32)247 fn update_db_task_state_and_progress(group_progress: &mut GroupProgress, task_id: u32) { 248 let Some(processed) = RequestDb::get_instance().query_task_total_processed(task_id) else { 249 return; 250 }; 251 let Some(state) = RequestDb::get_instance().query_task_state(task_id) else { 252 return; 253 }; 254 if state == State::Removed.repr { 255 return; 256 } 257 group_progress.update_task_state(task_id, State::from(state)); 258 group_progress.update_task_progress(task_id, processed as u64); 259 } 260 get_group_progress(database: &NotificationDb, group_id: u32) -> GroupProgress261 fn get_group_progress(database: &NotificationDb, group_id: u32) -> GroupProgress { 262 let mut group_progress = GroupProgress::new(); 263 for task_id in database.query_group_tasks(group_id) { 264 Self::update_db_task_state_and_progress(&mut group_progress, task_id); 265 } 266 group_progress 267 } 268 attach_group( &mut self, group_id: u32, task_ids: Vec<u32>, uid: u64, ) -> Option<NotifyContent>269 fn attach_group( 270 &mut self, 271 group_id: u32, 272 task_ids: Vec<u32>, 273 uid: u64, 274 ) -> Option<NotifyContent> { 275 let is_gauge = self.check_gauge(group_id); 276 let customized = self.group_customized_notify(group_id); 277 let progress = match self.group_notify_progress.entry(group_id) { 278 Entry::Occupied(entry) => { 279 let progress = entry.into_mut(); 280 for task_id in task_ids { 281 Self::update_db_task_state_and_progress(progress, task_id); 282 } 283 progress 284 } 285 Entry::Vacant(entry) => { 286 let progress = Self::get_group_progress(&self.database, group_id); 287 entry.insert(progress) 288 } 289 }; 290 if !is_gauge { 291 return None; 292 } 293 Some(NotifyContent::group_progress_notify( 294 customized, 295 Action::Download, 296 group_id, 297 uid as u32, 298 progress, 299 )) 300 } 301 check_gauge(&mut self, group_id: u32) -> bool302 fn check_gauge(&mut self, group_id: u32) -> bool { 303 match self.group_gauge.get(&group_id) { 304 Some(gauge) => *gauge, 305 None => { 306 let gauge = self.database.is_gauge(group_id); 307 self.group_gauge.insert(group_id, gauge); 308 gauge 309 } 310 } 311 } 312 group_customized_notify(&mut self, group_id: u32) -> Option<CustomizedNotification>313 fn group_customized_notify(&mut self, group_id: u32) -> Option<CustomizedNotification> { 314 match self.group_customized_notify.entry(group_id) { 315 Entry::Occupied(entry) => entry.get().clone(), 316 Entry::Vacant(entry) => { 317 let customized = self.database.query_group_customized_notification(group_id); 318 entry.insert(customized).clone() 319 } 320 } 321 } 322 task_customized_notify(&mut self, task_id: u32) -> Option<CustomizedNotification>323 fn task_customized_notify(&mut self, task_id: u32) -> Option<CustomizedNotification> { 324 match self.task_customized_notify.entry(task_id) { 325 Entry::Occupied(entry) => entry.get().clone(), 326 Entry::Vacant(entry) => { 327 let customized = self.database.query_task_customized_notification(task_id); 328 entry.insert(customized).clone() 329 } 330 } 331 } 332 publish_progress_notification(&mut self, info: ProgressNotify) -> Option<NotifyContent>333 fn publish_progress_notification(&mut self, info: ProgressNotify) -> Option<NotifyContent> { 334 let content = match self.get_request_id(info.task_id) { 335 NotifyType::Group(group_id) => { 336 if !self.check_gauge(group_id) { 337 return None; 338 } 339 let progress_interval_check = self.progress_interval_check(group_id); 340 341 let customized = self.group_customized_notify(group_id); 342 let progress = match self.group_notify_progress.entry(group_id) { 343 Entry::Occupied(entry) => entry.into_mut(), 344 Entry::Vacant(entry) => { 345 let progress = Self::get_group_progress(&self.database, group_id); 346 entry.insert(progress) 347 } 348 }; 349 progress.update_task_progress(info.task_id, info.processed); 350 351 if !progress_interval_check { 352 return None; 353 } 354 NotifyContent::group_progress_notify( 355 customized, 356 info.action, 357 group_id, 358 info.uid as u32, 359 progress, 360 ) 361 } 362 NotifyType::Task => NotifyContent::task_progress_notify( 363 self.task_customized_notify(info.task_id), 364 &info, 365 ), 366 }; 367 Some(content) 368 } 369 progress_interval_check(&mut self, request_id: u32) -> bool370 fn progress_interval_check(&mut self, request_id: u32) -> bool { 371 match self.last_notify_map.entry(request_id) { 372 Entry::Occupied(mut entry) => { 373 let last_notify = entry.get_mut(); 374 let current = get_current_timestamp(); 375 if current < NOTIFY_PROGRESS_INTERVAL + *last_notify { 376 return false; 377 } 378 *last_notify = current; 379 true 380 } 381 Entry::Vacant(entry) => { 382 let last_notify = get_current_timestamp(); 383 entry.insert(last_notify); 384 true 385 } 386 } 387 } 388 publish_completed_notify(&mut self, info: &EventualNotify) -> Option<NotifyContent>389 fn publish_completed_notify(&mut self, info: &EventualNotify) -> Option<NotifyContent> { 390 let content = match self.get_request_id(info.task_id) { 391 NotifyType::Group(group_id) => { 392 let is_gauge = self.check_gauge(group_id); 393 394 let customized = self.group_customized_notify(group_id); 395 let group_progress = match self.group_notify_progress.entry(group_id) { 396 Entry::Occupied(entry) => { 397 let progress = entry.into_mut(); 398 progress.update_task_progress(info.task_id, info.processed); 399 if info.is_successful { 400 progress.update_task_state(info.task_id, State::Completed); 401 } else { 402 progress.update_task_state(info.task_id, State::Failed); 403 } 404 progress 405 } 406 Entry::Vacant(entry) => { 407 let progress = Self::get_group_progress(&self.database, group_id); 408 entry.insert(progress) 409 } 410 }; 411 412 let group_eventual = 413 Self::group_eventual_check(&self.database, group_progress, group_id); 414 415 if !group_eventual { 416 if is_gauge { 417 NotifyContent::group_progress_notify( 418 customized, 419 info.action, 420 group_id, 421 info.uid as u32, 422 group_progress, 423 ) 424 } else { 425 return None; 426 } 427 } else { 428 self.database.clear_group_info(group_id); 429 NotifyContent::group_eventual_notify( 430 customized, 431 info.action, 432 group_id, 433 info.uid as u32, 434 group_progress.processed(), 435 group_progress.successful() as i32, 436 group_progress.failed() as i32, 437 ) 438 } 439 } 440 NotifyType::Task => { 441 let content = NotifyContent::task_eventual_notify( 442 self.task_customized_notify(info.task_id), 443 info.action, 444 info.task_id, 445 info.uid as u32, 446 info.file_name.clone(), 447 info.is_successful, 448 ); 449 if info.is_successful { 450 self.database.clear_task_info(info.task_id); 451 } 452 content 453 } 454 }; 455 Some(content) 456 } 457 group_eventual(&mut self, group_id: u32, uid: u64) -> Option<NotifyContent>458 fn group_eventual(&mut self, group_id: u32, uid: u64) -> Option<NotifyContent> { 459 let customized = self.group_customized_notify(group_id); 460 let group_progress = match self.group_notify_progress.entry(group_id) { 461 Entry::Occupied(entry) => entry.into_mut(), 462 Entry::Vacant(entry) => { 463 let progress = Self::get_group_progress(&self.database, group_id); 464 entry.insert(progress) 465 } 466 }; 467 468 let group_eventual = Self::group_eventual_check(&self.database, group_progress, group_id); 469 470 if !group_eventual { 471 return None; 472 } 473 Some(NotifyContent::group_eventual_notify( 474 customized, 475 Action::Download, 476 group_id, 477 uid as u32, 478 group_progress.processed(), 479 group_progress.successful() as i32, 480 group_progress.failed() as i32, 481 )) 482 } 483 get_request_id(&mut self, task_id: u32) -> NotifyType484 fn get_request_id(&mut self, task_id: u32) -> NotifyType { 485 if let Some(n_type) = self.notify_type_map.get(&task_id) { 486 return *n_type; 487 } 488 let n_type = match self.database.query_task_gid(task_id) { 489 Some(group_id) => NotifyType::Group(group_id), 490 None => NotifyType::Task, 491 }; 492 493 self.notify_type_map.insert(task_id, n_type); 494 n_type 495 } 496 group_eventual_check( database: &NotificationDb, group_progress: &mut GroupProgress, group_id: u32, ) -> bool497 fn group_eventual_check( 498 database: &NotificationDb, 499 group_progress: &mut GroupProgress, 500 group_id: u32, 501 ) -> bool { 502 !database.attach_able(group_id) && group_progress.is_finish() 503 } 504 } 505 506 #[cfg(test)] 507 mod ut_notify_flow { 508 include!("../../../tests/ut/service/notification_bar/ut_notify_flow.rs"); 509 } 510