• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2024 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::collections::HashMap;
15 use std::sync::atomic::{AtomicBool, Ordering};
16 use std::sync::{Arc, LazyLock, Mutex};
17 
18 use ylong_runtime::fastrand::fast_random;
19 use ylong_runtime::sync::mpsc::{self, unbounded_channel};
20 
21 use super::database::NotificationDb;
22 use super::notify_flow::{EventualNotify, NotifyFlow, NotifyInfo, ProgressNotify};
23 use super::task_handle::{cancel_notification, NotificationCheck};
24 use crate::info::TaskInfo;
25 use crate::service::notification_bar::NotificationConfig;
26 use crate::task::request_task::RequestTask;
27 use crate::utils::get_current_duration;
28 
29 pub(crate) const NOTIFY_PROGRESS_INTERVAL: u64 = 500;
30 
31 pub struct NotificationDispatcher {
32     database: Arc<NotificationDb>,
33     task_gauge: Mutex<HashMap<u32, Arc<AtomicBool>>>,
34     flow: mpsc::UnboundedSender<NotifyInfo>,
35 }
36 
37 impl NotificationDispatcher {
new() -> Self38     fn new() -> Self {
39         let database = Arc::new(NotificationDb::new());
40         let (tx, rx) = unbounded_channel();
41         NotifyFlow::new(rx, database.clone()).run();
42         Self {
43             database: database.clone(),
44             task_gauge: Mutex::new(HashMap::new()),
45             flow: tx,
46         }
47     }
48 
get_instance() -> &'static Self49     pub(crate) fn get_instance() -> &'static Self {
50         static INSTANCE: LazyLock<NotificationDispatcher> =
51             LazyLock::new(NotificationDispatcher::new);
52         &INSTANCE
53     }
54 
clear_task_info(&self, task_id: u32)55     pub(crate) fn clear_task_info(&self, task_id: u32) {
56         self.database.clear_task_info(task_id);
57     }
58 
clear_group_info(&self)59     pub(crate) fn clear_group_info(&self) {
60         self.database.clear_group_info_a_week_ago();
61     }
62 
disable_task_notification(&self, uid: u64, task_id: u32)63     pub(crate) fn disable_task_notification(&self, uid: u64, task_id: u32) {
64         self.database.disable_task_notification(task_id);
65         self.unregister_task(uid, task_id, true);
66     }
67 
enable_task_progress_notification(&self, task_id: u32)68     pub(crate) fn enable_task_progress_notification(&self, task_id: u32) {
69         if let Some(gauge) = self.task_gauge.lock().unwrap().get(&task_id) {
70             gauge.store(true, Ordering::Release);
71         }
72     }
73 
update_task_customized_notification(&self, config: &NotificationConfig)74     pub(crate) fn update_task_customized_notification(&self, config: &NotificationConfig) {
75         self.database.update_task_customized_notification(config);
76     }
77 
check_task_notification_available(&self, task_id: u32) -> bool78     pub(crate) fn check_task_notification_available(&self, task_id: u32) -> bool {
79         self.database.check_task_notification_available(&task_id)
80     }
81 
register_task(&self, task: &RequestTask) -> Arc<AtomicBool>82     pub(crate) fn register_task(&self, task: &RequestTask) -> Arc<AtomicBool> {
83         let gauge = if let Some(gid) = self.database.query_task_gid(task.task_id()) {
84             if self.database.check_group_notification_available(&gid) && self.database.is_gauge(gid)
85             {
86                 Arc::new(AtomicBool::new(true))
87             } else {
88                 Arc::new(AtomicBool::new(false))
89             }
90         } else {
91             let gauge = task.notification_check(&self.database);
92             Arc::new(AtomicBool::new(gauge))
93         };
94         self.task_gauge
95             .lock()
96             .unwrap()
97             .insert(task.task_id(), gauge.clone());
98         gauge
99     }
100 
unregister_task(&self, uid: u64, task_id: u32, affect_group: bool)101     pub(crate) fn unregister_task(&self, uid: u64, task_id: u32, affect_group: bool) {
102         match (
103             self.task_gauge.lock().unwrap().get(&task_id).cloned(),
104             self.database.query_task_gid(task_id),
105         ) {
106             (Some(gauge), Some(gid)) => {
107                 if affect_group {
108                     gauge.store(false, Ordering::Release);
109                     let _ = self.flow.send(NotifyInfo::Unregister(uid, task_id, gid));
110                 }
111             }
112             (None, Some(gid)) => {
113                 if affect_group {
114                     let _ = self.flow.send(NotifyInfo::Unregister(uid, task_id, gid));
115                 }
116             }
117             (Some(gauge), None) => {
118                 gauge.store(false, Ordering::Release);
119                 cancel_notification(task_id);
120             }
121             (None, None) => {}
122         }
123     }
124 
publish_progress_notification(&self, task: &RequestTask)125     pub(crate) fn publish_progress_notification(&self, task: &RequestTask) {
126         let progress = task.progress.lock().unwrap();
127         let mut total = Some(0);
128         for size in progress.sizes.iter() {
129             if *size < 0 {
130                 total = None;
131                 break;
132             }
133             *total.as_mut().unwrap() += *size as u64;
134         }
135         let multi_upload = match progress.sizes.len() {
136             0 | 1 => None,
137             len => Some((progress.common_data.index, len)),
138         };
139         let notify = ProgressNotify {
140             action: task.action(),
141             task_id: task.task_id(),
142             uid: task.uid(),
143             file_name: match task.conf.file_specs.first() {
144                 Some(spec) => spec.file_name.clone(),
145                 None => {
146                     error!("Failed to get the first file_spec from an empty vector in TaskConfig");
147                     String::new()
148                 }
149             },
150             processed: progress.common_data.total_processed as u64,
151             total,
152             multi_upload,
153         };
154         let _ = self.flow.send(NotifyInfo::Progress(notify));
155     }
156 
publish_success_notification(&self, info: &TaskInfo)157     pub(crate) fn publish_success_notification(&self, info: &TaskInfo) {
158         self.task_gauge
159             .lock()
160             .unwrap()
161             .remove(&info.common_data.task_id);
162         if !info.notification_check(&self.database) {
163             return;
164         }
165         let notify = EventualNotify {
166             action: info.action(),
167             task_id: info.common_data.task_id,
168             processed: info.progress.common_data.total_processed as u64,
169             uid: info.uid(),
170             file_name: match info.file_specs.first() {
171                 Some(spec) => spec.file_name.clone(),
172                 None => {
173                     error!("Failed to get the first file_spec from an empty vector in TaskInfo");
174                     String::new()
175                 }
176             },
177             is_successful: true,
178         };
179         let _ = self.flow.send(NotifyInfo::Eventual(notify));
180     }
181 
publish_failed_notification(&self, info: &TaskInfo)182     pub(crate) fn publish_failed_notification(&self, info: &TaskInfo) {
183         self.task_gauge
184             .lock()
185             .unwrap()
186             .remove(&info.common_data.task_id);
187         if !info.notification_check(&self.database) {
188             return;
189         }
190         let notify = EventualNotify {
191             action: info.action(),
192             task_id: info.common_data.task_id,
193             processed: info.progress.common_data.total_processed as u64,
194             uid: info.uid(),
195             file_name: match info.file_specs.first() {
196                 Some(spec) => spec.file_name.clone(),
197                 None => {
198                     error!("Failed to get the first file_spec from an empty vector in TaskInfo");
199                     String::new()
200                 }
201             },
202             is_successful: false,
203         };
204         let _ = self.flow.send(NotifyInfo::Eventual(notify));
205     }
206 
attach_group(&self, task_ids: Vec<u32>, group_id: u32, uid: u64) -> bool207     pub(crate) fn attach_group(&self, task_ids: Vec<u32>, group_id: u32, uid: u64) -> bool {
208         if !self.database.attach_able(group_id) {
209             return false;
210         }
211         info!("Attach task {:?} to group {}", task_ids, group_id);
212         let is_gauge = self.database.is_gauge(group_id);
213         for task_id in task_ids.iter().copied() {
214             self.database.update_task_group(task_id, group_id);
215             if let Some(gauge) = self.task_gauge.lock().unwrap().get(&task_id) {
216                 gauge.store(is_gauge, std::sync::atomic::Ordering::Release);
217             }
218         }
219         if !self.database.check_group_notification_available(&group_id) {
220             return true;
221         }
222 
223         let _ = self
224             .flow
225             .send(NotifyInfo::AttachGroup(group_id, uid, task_ids));
226         true
227     }
228 
delete_group(&self, group_id: u32, uid: u64) -> bool229     pub(crate) fn delete_group(&self, group_id: u32, uid: u64) -> bool {
230         info!("Delete group {}", group_id);
231         if !self.database.attach_able(group_id) {
232             return false;
233         }
234         self.database.disable_attach_group(group_id);
235         if !self.database.check_group_notification_available(&group_id) {
236             return true;
237         }
238         let notify = NotifyInfo::GroupEventual(group_id, uid);
239         let _ = self.flow.send(notify);
240         true
241     }
242 
create_group( &self, gauge: bool, title: Option<String>, text: Option<String>, disable: bool, ) -> u32243     pub(crate) fn create_group(
244         &self,
245         gauge: bool,
246         title: Option<String>,
247         text: Option<String>,
248         disable: bool,
249     ) -> u32 {
250         let new_group_id = loop {
251             let candidate = fast_random() as u32;
252             if !self.database.contains_group(candidate) {
253                 break candidate;
254             }
255         };
256         info!(
257             "Create group {} gauge {} customized_title {:?} customized_text {:?} disable {}",
258             new_group_id, gauge, title, text, disable
259         );
260 
261         let current_time = get_current_duration().as_millis() as u64;
262         self.database
263             .update_group_config(new_group_id, gauge, current_time, !disable);
264         if title.is_some() || text.is_some() {
265             self.database
266                 .update_group_customized_notification(new_group_id, title, text);
267         }
268         new_group_id
269     }
270 }
271