• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2024 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::collections::HashMap;
15 use std::sync::atomic::{AtomicBool, Ordering};
16 use std::sync::{Arc, LazyLock, Mutex};
17 
18 use ylong_runtime::fastrand::fast_random;
19 use ylong_runtime::sync::mpsc::{self, unbounded_channel};
20 
21 use super::database::NotificationDb;
22 use super::notify_flow::{EventualNotify, NotifyFlow, NotifyInfo, ProgressNotify};
23 use super::task_handle::{cancel_notification, NotificationCheck};
24 use crate::info::TaskInfo;
25 use crate::task::request_task::RequestTask;
26 
27 pub(crate) const NOTIFY_PROGRESS_INTERVAL: u64 = 500;
28 
29 pub struct NotificationDispatcher {
30     database: Arc<NotificationDb>,
31     task_gauge: Mutex<HashMap<u32, Arc<AtomicBool>>>,
32     flow: mpsc::UnboundedSender<NotifyInfo>,
33 }
34 
35 impl NotificationDispatcher {
new() -> Self36     fn new() -> Self {
37         let database = Arc::new(NotificationDb::new());
38         let (tx, rx) = unbounded_channel();
39         NotifyFlow::new(rx, database.clone()).run();
40         Self {
41             database: database.clone(),
42             task_gauge: Mutex::new(HashMap::new()),
43             flow: tx,
44         }
45     }
46 
get_instance() -> &'static Self47     pub(crate) fn get_instance() -> &'static Self {
48         static INSTANCE: LazyLock<NotificationDispatcher> =
49             LazyLock::new(NotificationDispatcher::new);
50         &INSTANCE
51     }
52 
clear_task_info(&self, task_id: u32)53     pub(crate) fn clear_task_info(&self, task_id: u32) {
54         self.database.clear_task_info(task_id);
55     }
56 
clear_group_info(&self)57     pub(crate) fn clear_group_info(&self) {
58         self.database.clear_group_info_a_week_ago();
59     }
60 
update_task_customized_notification( &self, task_id: u32, title: Option<String>, text: Option<String>, )61     pub(crate) fn update_task_customized_notification(
62         &self,
63         task_id: u32,
64         title: Option<String>,
65         text: Option<String>,
66     ) {
67         self.database
68             .update_task_customized_notification(task_id, title, text);
69     }
70 
register_task(&self, task: &RequestTask) -> Arc<AtomicBool>71     pub(crate) fn register_task(&self, task: &RequestTask) -> Arc<AtomicBool> {
72         let gauge = if let Some(gid) = self.database.query_task_gid(task.task_id()) {
73             if self.database.is_gauge(gid) {
74                 Arc::new(AtomicBool::new(true))
75             } else {
76                 Arc::new(AtomicBool::new(false))
77             }
78         } else {
79             let gauge = task.notification_check(&self.database);
80             Arc::new(AtomicBool::new(gauge))
81         };
82         self.task_gauge
83             .lock()
84             .unwrap()
85             .insert(task.task_id(), gauge.clone());
86         gauge
87     }
88 
unregister_task(&self, uid: u64, task_id: u32)89     pub(crate) fn unregister_task(&self, uid: u64, task_id: u32) {
90         match (
91             self.task_gauge.lock().unwrap().get(&task_id).cloned(),
92             self.database.query_task_gid(task_id),
93         ) {
94             (Some(gauge), Some(gid)) => {
95                 gauge.store(false, Ordering::Release);
96                 let _ = self.flow.send(NotifyInfo::Unregister(uid, task_id, gid));
97             }
98             (None, Some(gid)) => {
99                 let _ = self.flow.send(NotifyInfo::Unregister(uid, task_id, gid));
100             }
101             (Some(gauge), None) => {
102                 gauge.store(false, Ordering::Release);
103                 cancel_notification(task_id);
104             }
105             (None, None) => {}
106         }
107     }
108 
publish_progress_notification(&self, task: &RequestTask)109     pub(crate) fn publish_progress_notification(&self, task: &RequestTask) {
110         let progress = task.progress.lock().unwrap();
111         let mut total = Some(0);
112         for size in progress.sizes.iter() {
113             if *size < 0 {
114                 total = None;
115                 break;
116             }
117             *total.as_mut().unwrap() += *size as u64;
118         }
119         let multi_upload = match progress.sizes.len() {
120             0 | 1 => None,
121             len => Some((progress.common_data.index, len)),
122         };
123         let notify = ProgressNotify {
124             action: task.action(),
125             task_id: task.task_id(),
126             uid: task.uid(),
127             file_name: task.conf.file_specs[0].file_name.clone(),
128             processed: progress.common_data.total_processed as u64,
129             total,
130             multi_upload,
131         };
132         let _ = self.flow.send(NotifyInfo::Progress(notify));
133     }
134 
publish_success_notification(&self, info: &TaskInfo)135     pub(crate) fn publish_success_notification(&self, info: &TaskInfo) {
136         self.task_gauge
137             .lock()
138             .unwrap()
139             .remove(&info.common_data.task_id);
140         if !info.notification_check(&self.database) {
141             return;
142         }
143         let notify = EventualNotify {
144             action: info.action(),
145             task_id: info.common_data.task_id,
146             processed: info.progress.common_data.total_processed as u64,
147             uid: info.uid(),
148             file_name: info.file_specs[0].file_name.clone(),
149             is_successful: true,
150         };
151         let _ = self.flow.send(NotifyInfo::Eventual(notify));
152     }
153 
publish_failed_notification(&self, info: &TaskInfo)154     pub(crate) fn publish_failed_notification(&self, info: &TaskInfo) {
155         self.task_gauge
156             .lock()
157             .unwrap()
158             .remove(&info.common_data.task_id);
159         if !info.notification_check(&self.database) {
160             return;
161         }
162         let notify = EventualNotify {
163             action: info.action(),
164             task_id: info.common_data.task_id,
165             processed: info.progress.common_data.total_processed as u64,
166             uid: info.uid(),
167             file_name: info.file_specs[0].file_name.clone(),
168             is_successful: false,
169         };
170         let _ = self.flow.send(NotifyInfo::Eventual(notify));
171     }
172 
attach_group(&self, task_ids: Vec<u32>, group_id: u32, uid: u64) -> bool173     pub(crate) fn attach_group(&self, task_ids: Vec<u32>, group_id: u32, uid: u64) -> bool {
174         if !self.database.attach_able(group_id) {
175             return false;
176         }
177         info!("Attach task {:?} to group {}", task_ids, group_id);
178         let is_gauge = self.database.is_gauge(group_id);
179         for task_id in task_ids.iter().copied() {
180             self.database.update_task_group(task_id, group_id);
181             if let Some(gauge) = self.task_gauge.lock().unwrap().get(&task_id) {
182                 gauge.store(is_gauge, std::sync::atomic::Ordering::Release);
183             }
184         }
185         let _ = self
186             .flow
187             .send(NotifyInfo::AttachGroup(group_id, uid, task_ids));
188         true
189     }
190 
delete_group(&self, group_id: u32, uid: u64) -> bool191     pub(crate) fn delete_group(&self, group_id: u32, uid: u64) -> bool {
192         info!("Delete group {}", group_id);
193         if !self.database.attach_able(group_id) {
194             return false;
195         }
196         self.database.disable_attach_group(group_id);
197         let notify = NotifyInfo::GroupEventual(group_id, uid);
198         let _ = self.flow.send(notify);
199         true
200     }
201 
create_group( &self, gauge: bool, title: Option<String>, text: Option<String>, ) -> u32202     pub(crate) fn create_group(
203         &self,
204         gauge: bool,
205         title: Option<String>,
206         text: Option<String>,
207     ) -> u32 {
208         let new_group_id = loop {
209             let candidate = fast_random() as u32;
210             if !self.database.contains_group(candidate) {
211                 break candidate;
212             }
213         };
214         info!(
215             "Create group {} gauge {} customized_title {:?} customized_text {:?}",
216             new_group_id, gauge, title, text
217         );
218 
219         let current_time = std::time::SystemTime::now()
220             .duration_since(std::time::UNIX_EPOCH)
221             .unwrap()
222             .as_millis() as u64;
223         self.database
224             .update_group_config(new_group_id, gauge, current_time);
225         if title.is_some() || text.is_some() {
226             self.database
227                 .update_group_customized_notification(new_group_id, title, text);
228         }
229         new_group_id
230     }
231 }
232