1 // Copyright (C) 2024 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::collections::HashMap; 15 use std::sync::atomic::{AtomicBool, Ordering}; 16 use std::sync::{Arc, LazyLock, Mutex}; 17 18 use ylong_runtime::fastrand::fast_random; 19 use ylong_runtime::sync::mpsc::{self, unbounded_channel}; 20 21 use super::database::NotificationDb; 22 use super::notify_flow::{EventualNotify, NotifyFlow, NotifyInfo, ProgressNotify}; 23 use super::task_handle::{cancel_notification, NotificationCheck}; 24 use crate::info::TaskInfo; 25 use crate::task::request_task::RequestTask; 26 27 pub(crate) const NOTIFY_PROGRESS_INTERVAL: u64 = 500; 28 29 pub struct NotificationDispatcher { 30 database: Arc<NotificationDb>, 31 task_gauge: Mutex<HashMap<u32, Arc<AtomicBool>>>, 32 flow: mpsc::UnboundedSender<NotifyInfo>, 33 } 34 35 impl NotificationDispatcher { new() -> Self36 fn new() -> Self { 37 let database = Arc::new(NotificationDb::new()); 38 let (tx, rx) = unbounded_channel(); 39 NotifyFlow::new(rx, database.clone()).run(); 40 Self { 41 database: database.clone(), 42 task_gauge: Mutex::new(HashMap::new()), 43 flow: tx, 44 } 45 } 46 get_instance() -> &'static Self47 pub(crate) fn get_instance() -> &'static Self { 48 static INSTANCE: LazyLock<NotificationDispatcher> = 49 LazyLock::new(NotificationDispatcher::new); 50 &INSTANCE 51 } 52 clear_task_info(&self, task_id: u32)53 pub(crate) fn clear_task_info(&self, task_id: u32) { 54 self.database.clear_task_info(task_id); 55 } 56 clear_group_info(&self)57 pub(crate) fn clear_group_info(&self) { 58 self.database.clear_group_info_a_week_ago(); 59 } 60 update_task_customized_notification( &self, task_id: u32, title: Option<String>, text: Option<String>, )61 pub(crate) fn update_task_customized_notification( 62 &self, 63 task_id: u32, 64 title: Option<String>, 65 text: Option<String>, 66 ) { 67 self.database 68 .update_task_customized_notification(task_id, title, text); 69 } 70 register_task(&self, task: &RequestTask) -> Arc<AtomicBool>71 pub(crate) fn register_task(&self, task: &RequestTask) -> Arc<AtomicBool> { 72 let gauge = if let Some(gid) = self.database.query_task_gid(task.task_id()) { 73 if self.database.is_gauge(gid) { 74 Arc::new(AtomicBool::new(true)) 75 } else { 76 Arc::new(AtomicBool::new(false)) 77 } 78 } else { 79 let gauge = task.notification_check(&self.database); 80 Arc::new(AtomicBool::new(gauge)) 81 }; 82 self.task_gauge 83 .lock() 84 .unwrap() 85 .insert(task.task_id(), gauge.clone()); 86 gauge 87 } 88 unregister_task(&self, uid: u64, task_id: u32)89 pub(crate) fn unregister_task(&self, uid: u64, task_id: u32) { 90 match ( 91 self.task_gauge.lock().unwrap().get(&task_id).cloned(), 92 self.database.query_task_gid(task_id), 93 ) { 94 (Some(gauge), Some(gid)) => { 95 gauge.store(false, Ordering::Release); 96 let _ = self.flow.send(NotifyInfo::Unregister(uid, task_id, gid)); 97 } 98 (None, Some(gid)) => { 99 let _ = self.flow.send(NotifyInfo::Unregister(uid, task_id, gid)); 100 } 101 (Some(gauge), None) => { 102 gauge.store(false, Ordering::Release); 103 cancel_notification(task_id); 104 } 105 (None, None) => {} 106 } 107 } 108 publish_progress_notification(&self, task: &RequestTask)109 pub(crate) fn publish_progress_notification(&self, task: &RequestTask) { 110 let progress = task.progress.lock().unwrap(); 111 let mut total = Some(0); 112 for size in progress.sizes.iter() { 113 if *size < 0 { 114 total = None; 115 break; 116 } 117 *total.as_mut().unwrap() += *size as u64; 118 } 119 let multi_upload = match progress.sizes.len() { 120 0 | 1 => None, 121 len => Some((progress.common_data.index, len)), 122 }; 123 let notify = ProgressNotify { 124 action: task.action(), 125 task_id: task.task_id(), 126 uid: task.uid(), 127 file_name: task.conf.file_specs[0].file_name.clone(), 128 processed: progress.common_data.total_processed as u64, 129 total, 130 multi_upload, 131 }; 132 let _ = self.flow.send(NotifyInfo::Progress(notify)); 133 } 134 publish_success_notification(&self, info: &TaskInfo)135 pub(crate) fn publish_success_notification(&self, info: &TaskInfo) { 136 self.task_gauge 137 .lock() 138 .unwrap() 139 .remove(&info.common_data.task_id); 140 if !info.notification_check(&self.database) { 141 return; 142 } 143 let notify = EventualNotify { 144 action: info.action(), 145 task_id: info.common_data.task_id, 146 processed: info.progress.common_data.total_processed as u64, 147 uid: info.uid(), 148 file_name: info.file_specs[0].file_name.clone(), 149 is_successful: true, 150 }; 151 let _ = self.flow.send(NotifyInfo::Eventual(notify)); 152 } 153 publish_failed_notification(&self, info: &TaskInfo)154 pub(crate) fn publish_failed_notification(&self, info: &TaskInfo) { 155 self.task_gauge 156 .lock() 157 .unwrap() 158 .remove(&info.common_data.task_id); 159 if !info.notification_check(&self.database) { 160 return; 161 } 162 let notify = EventualNotify { 163 action: info.action(), 164 task_id: info.common_data.task_id, 165 processed: info.progress.common_data.total_processed as u64, 166 uid: info.uid(), 167 file_name: info.file_specs[0].file_name.clone(), 168 is_successful: false, 169 }; 170 let _ = self.flow.send(NotifyInfo::Eventual(notify)); 171 } 172 attach_group(&self, task_ids: Vec<u32>, group_id: u32, uid: u64) -> bool173 pub(crate) fn attach_group(&self, task_ids: Vec<u32>, group_id: u32, uid: u64) -> bool { 174 if !self.database.attach_able(group_id) { 175 return false; 176 } 177 info!("Attach task {:?} to group {}", task_ids, group_id); 178 let is_gauge = self.database.is_gauge(group_id); 179 for task_id in task_ids.iter().copied() { 180 self.database.update_task_group(task_id, group_id); 181 if let Some(gauge) = self.task_gauge.lock().unwrap().get(&task_id) { 182 gauge.store(is_gauge, std::sync::atomic::Ordering::Release); 183 } 184 } 185 let _ = self 186 .flow 187 .send(NotifyInfo::AttachGroup(group_id, uid, task_ids)); 188 true 189 } 190 delete_group(&self, group_id: u32, uid: u64) -> bool191 pub(crate) fn delete_group(&self, group_id: u32, uid: u64) -> bool { 192 info!("Delete group {}", group_id); 193 if !self.database.attach_able(group_id) { 194 return false; 195 } 196 self.database.disable_attach_group(group_id); 197 let notify = NotifyInfo::GroupEventual(group_id, uid); 198 let _ = self.flow.send(notify); 199 true 200 } 201 create_group( &self, gauge: bool, title: Option<String>, text: Option<String>, ) -> u32202 pub(crate) fn create_group( 203 &self, 204 gauge: bool, 205 title: Option<String>, 206 text: Option<String>, 207 ) -> u32 { 208 let new_group_id = loop { 209 let candidate = fast_random() as u32; 210 if !self.database.contains_group(candidate) { 211 break candidate; 212 } 213 }; 214 info!( 215 "Create group {} gauge {} customized_title {:?} customized_text {:?}", 216 new_group_id, gauge, title, text 217 ); 218 219 let current_time = std::time::SystemTime::now() 220 .duration_since(std::time::UNIX_EPOCH) 221 .unwrap() 222 .as_millis() as u64; 223 self.database 224 .update_group_config(new_group_id, gauge, current_time); 225 if title.is_some() || text.is_some() { 226 self.database 227 .update_group_customized_notification(new_group_id, title, text); 228 } 229 new_group_id 230 } 231 } 232