1 // Copyright (C) 2024 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::collections::HashMap; 15 use std::sync::atomic::{AtomicBool, Ordering}; 16 use std::sync::{Arc, LazyLock, Mutex}; 17 18 use ylong_runtime::fastrand::fast_random; 19 use ylong_runtime::sync::mpsc::{self, unbounded_channel}; 20 21 use super::database::NotificationDb; 22 use super::notify_flow::{EventualNotify, NotifyFlow, NotifyInfo, ProgressNotify}; 23 use super::task_handle::{cancel_notification, NotificationCheck}; 24 use crate::info::TaskInfo; 25 use crate::service::notification_bar::NotificationConfig; 26 use crate::task::request_task::RequestTask; 27 use crate::utils::get_current_duration; 28 29 pub(crate) const NOTIFY_PROGRESS_INTERVAL: u64 = 500; 30 31 pub struct NotificationDispatcher { 32 database: Arc<NotificationDb>, 33 task_gauge: Mutex<HashMap<u32, Arc<AtomicBool>>>, 34 flow: mpsc::UnboundedSender<NotifyInfo>, 35 } 36 37 impl NotificationDispatcher { new() -> Self38 fn new() -> Self { 39 let database = Arc::new(NotificationDb::new()); 40 let (tx, rx) = unbounded_channel(); 41 NotifyFlow::new(rx, database.clone()).run(); 42 Self { 43 database: database.clone(), 44 task_gauge: Mutex::new(HashMap::new()), 45 flow: tx, 46 } 47 } 48 get_instance() -> &'static Self49 pub(crate) fn get_instance() -> &'static Self { 50 static INSTANCE: LazyLock<NotificationDispatcher> = 51 LazyLock::new(NotificationDispatcher::new); 52 &INSTANCE 53 } 54 clear_task_info(&self, task_id: u32)55 pub(crate) fn clear_task_info(&self, task_id: u32) { 56 self.database.clear_task_info(task_id); 57 } 58 clear_group_info(&self)59 pub(crate) fn clear_group_info(&self) { 60 self.database.clear_group_info_a_week_ago(); 61 } 62 disable_task_notification(&self, uid: u64, task_id: u32)63 pub(crate) fn disable_task_notification(&self, uid: u64, task_id: u32) { 64 self.database.disable_task_notification(task_id); 65 self.unregister_task(uid, task_id, true); 66 } 67 enable_task_progress_notification(&self, task_id: u32)68 pub(crate) fn enable_task_progress_notification(&self, task_id: u32) { 69 if let Some(gauge) = self.task_gauge.lock().unwrap().get(&task_id) { 70 gauge.store(true, Ordering::Release); 71 } 72 } 73 update_task_customized_notification(&self, config: &NotificationConfig)74 pub(crate) fn update_task_customized_notification(&self, config: &NotificationConfig) { 75 self.database.update_task_customized_notification(config); 76 } 77 check_task_notification_available(&self, task_id: u32) -> bool78 pub(crate) fn check_task_notification_available(&self, task_id: u32) -> bool { 79 self.database.check_task_notification_available(&task_id) 80 } 81 register_task(&self, task: &RequestTask) -> Arc<AtomicBool>82 pub(crate) fn register_task(&self, task: &RequestTask) -> Arc<AtomicBool> { 83 let gauge = if let Some(gid) = self.database.query_task_gid(task.task_id()) { 84 if self.database.check_group_notification_available(&gid) && self.database.is_gauge(gid) 85 { 86 Arc::new(AtomicBool::new(true)) 87 } else { 88 Arc::new(AtomicBool::new(false)) 89 } 90 } else { 91 let gauge = task.notification_check(&self.database); 92 Arc::new(AtomicBool::new(gauge)) 93 }; 94 self.task_gauge 95 .lock() 96 .unwrap() 97 .insert(task.task_id(), gauge.clone()); 98 gauge 99 } 100 unregister_task(&self, uid: u64, task_id: u32, affect_group: bool)101 pub(crate) fn unregister_task(&self, uid: u64, task_id: u32, affect_group: bool) { 102 match ( 103 self.task_gauge.lock().unwrap().get(&task_id).cloned(), 104 self.database.query_task_gid(task_id), 105 ) { 106 (Some(gauge), Some(gid)) => { 107 if affect_group { 108 gauge.store(false, Ordering::Release); 109 let _ = self.flow.send(NotifyInfo::Unregister(uid, task_id, gid)); 110 } 111 } 112 (None, Some(gid)) => { 113 if affect_group { 114 let _ = self.flow.send(NotifyInfo::Unregister(uid, task_id, gid)); 115 } 116 } 117 (Some(gauge), None) => { 118 gauge.store(false, Ordering::Release); 119 cancel_notification(task_id); 120 } 121 (None, None) => {} 122 } 123 } 124 publish_progress_notification(&self, task: &RequestTask)125 pub(crate) fn publish_progress_notification(&self, task: &RequestTask) { 126 let progress = task.progress.lock().unwrap(); 127 let mut total = Some(0); 128 for size in progress.sizes.iter() { 129 if *size < 0 { 130 total = None; 131 break; 132 } 133 *total.as_mut().unwrap() += *size as u64; 134 } 135 let multi_upload = match progress.sizes.len() { 136 0 | 1 => None, 137 len => Some((progress.common_data.index, len)), 138 }; 139 let notify = ProgressNotify { 140 action: task.action(), 141 task_id: task.task_id(), 142 uid: task.uid(), 143 file_name: match task.conf.file_specs.first() { 144 Some(spec) => spec.file_name.clone(), 145 None => { 146 error!("Failed to get the first file_spec from an empty vector in TaskConfig"); 147 String::new() 148 } 149 }, 150 processed: progress.common_data.total_processed as u64, 151 total, 152 multi_upload, 153 }; 154 let _ = self.flow.send(NotifyInfo::Progress(notify)); 155 } 156 publish_success_notification(&self, info: &TaskInfo)157 pub(crate) fn publish_success_notification(&self, info: &TaskInfo) { 158 self.task_gauge 159 .lock() 160 .unwrap() 161 .remove(&info.common_data.task_id); 162 if !info.notification_check(&self.database) { 163 return; 164 } 165 let notify = EventualNotify { 166 action: info.action(), 167 task_id: info.common_data.task_id, 168 processed: info.progress.common_data.total_processed as u64, 169 uid: info.uid(), 170 file_name: match info.file_specs.first() { 171 Some(spec) => spec.file_name.clone(), 172 None => { 173 error!("Failed to get the first file_spec from an empty vector in TaskInfo"); 174 String::new() 175 } 176 }, 177 is_successful: true, 178 }; 179 let _ = self.flow.send(NotifyInfo::Eventual(notify)); 180 } 181 publish_failed_notification(&self, info: &TaskInfo)182 pub(crate) fn publish_failed_notification(&self, info: &TaskInfo) { 183 self.task_gauge 184 .lock() 185 .unwrap() 186 .remove(&info.common_data.task_id); 187 if !info.notification_check(&self.database) { 188 return; 189 } 190 let notify = EventualNotify { 191 action: info.action(), 192 task_id: info.common_data.task_id, 193 processed: info.progress.common_data.total_processed as u64, 194 uid: info.uid(), 195 file_name: match info.file_specs.first() { 196 Some(spec) => spec.file_name.clone(), 197 None => { 198 error!("Failed to get the first file_spec from an empty vector in TaskInfo"); 199 String::new() 200 } 201 }, 202 is_successful: false, 203 }; 204 let _ = self.flow.send(NotifyInfo::Eventual(notify)); 205 } 206 attach_group(&self, task_ids: Vec<u32>, group_id: u32, uid: u64) -> bool207 pub(crate) fn attach_group(&self, task_ids: Vec<u32>, group_id: u32, uid: u64) -> bool { 208 if !self.database.attach_able(group_id) { 209 return false; 210 } 211 info!("Attach task {:?} to group {}", task_ids, group_id); 212 let is_gauge = self.database.is_gauge(group_id); 213 for task_id in task_ids.iter().copied() { 214 self.database.update_task_group(task_id, group_id); 215 if let Some(gauge) = self.task_gauge.lock().unwrap().get(&task_id) { 216 gauge.store(is_gauge, std::sync::atomic::Ordering::Release); 217 } 218 } 219 if !self.database.check_group_notification_available(&group_id) { 220 return true; 221 } 222 223 let _ = self 224 .flow 225 .send(NotifyInfo::AttachGroup(group_id, uid, task_ids)); 226 true 227 } 228 delete_group(&self, group_id: u32, uid: u64) -> bool229 pub(crate) fn delete_group(&self, group_id: u32, uid: u64) -> bool { 230 info!("Delete group {}", group_id); 231 if !self.database.attach_able(group_id) { 232 return false; 233 } 234 self.database.disable_attach_group(group_id); 235 if !self.database.check_group_notification_available(&group_id) { 236 return true; 237 } 238 let notify = NotifyInfo::GroupEventual(group_id, uid); 239 let _ = self.flow.send(notify); 240 true 241 } 242 create_group( &self, gauge: bool, title: Option<String>, text: Option<String>, disable: bool, ) -> u32243 pub(crate) fn create_group( 244 &self, 245 gauge: bool, 246 title: Option<String>, 247 text: Option<String>, 248 disable: bool, 249 ) -> u32 { 250 let new_group_id = loop { 251 let candidate = fast_random() as u32; 252 if !self.database.contains_group(candidate) { 253 break candidate; 254 } 255 }; 256 info!( 257 "Create group {} gauge {} customized_title {:?} customized_text {:?} disable {}", 258 new_group_id, gauge, title, text, disable 259 ); 260 261 let current_time = get_current_duration().as_millis() as u64; 262 self.database 263 .update_group_config(new_group_id, gauge, current_time, !disable); 264 if title.is_some() || text.is_some() { 265 self.database 266 .update_group_customized_notification(new_group_id, title, text); 267 } 268 new_group_id 269 } 270 } 271