• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2024 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::collections::hash_map::Entry;
15 use std::collections::HashMap;
16 use std::sync::Arc;
17 
18 use ylong_runtime::sync::mpsc::{self, UnboundedReceiver};
19 
20 use super::database::{CustomizedNotification, NotificationDb};
21 use super::ffi::{NotifyContent, PublishNotification};
22 use super::task_handle::cancel_notification;
23 use crate::config::Action;
24 use crate::info::State;
25 use crate::manage::database::RequestDb;
26 use crate::utils::{get_current_timestamp, runtime_spawn};
27 
28 const NOTIFY_PROGRESS_INTERVAL: u64 = if cfg!(test) { 1 } else { 500 };
29 
30 pub(crate) struct NotifyFlow {
31     database: Arc<NotificationDb>,
32     // key for task_id.
33     notify_type_map: HashMap<u32, NotifyType>,
34 
35     // key for request_id, group or task.
36     last_notify_map: HashMap<u32, u64>,
37 
38     group_notify_progress: HashMap<u32, GroupProgress>,
39     // value 1 for title, 2 for text.
40     group_customized_notify: HashMap<u32, Option<CustomizedNotification>>,
41     group_gauge: HashMap<u32, bool>,
42     task_customized_notify: HashMap<u32, Option<CustomizedNotification>>,
43     rx: mpsc::UnboundedReceiver<NotifyInfo>,
44 }
45 
46 pub(crate) struct GroupProgress {
47     task_progress: HashMap<u32, u64>,
48     total_progress: u64,
49     task_state: HashMap<u32, State>,
50     successful: usize,
51     failed: usize,
52 }
53 
54 impl GroupProgress {
new() -> Self55     pub(crate) fn new() -> Self {
56         Self {
57             task_progress: HashMap::new(),
58             total_progress: 0,
59             task_state: HashMap::new(),
60             successful: 0,
61             failed: 0,
62         }
63     }
64 
update_task_progress(&mut self, task_id: u32, processed: u64)65     pub(crate) fn update_task_progress(&mut self, task_id: u32, processed: u64) {
66         let prev = match self.task_progress.entry(task_id) {
67             Entry::Occupied(entry) => entry.into_mut(),
68             Entry::Vacant(entry) => entry.insert(0),
69         };
70         self.total_progress += processed - *prev;
71         *prev = processed;
72     }
73 
update_task_state(&mut self, task_id: u32, state: State)74     pub(crate) fn update_task_state(&mut self, task_id: u32, state: State) {
75         let prev = match self.task_state.get_mut(&task_id) {
76             Some(prev) => prev,
77             None => {
78                 self.task_state.insert(task_id, state);
79                 if state == State::Completed {
80                     self.successful += 1;
81                 } else if state == State::Failed {
82                     self.failed += 1;
83                 }
84                 return;
85             }
86         };
87         if *prev == state {
88             return;
89         }
90         if *prev != State::Completed && *prev != State::Failed {
91             if state == State::Completed {
92                 self.successful += 1;
93             } else if state == State::Failed {
94                 self.failed += 1;
95             }
96         } else if state == State::Completed {
97             self.successful += 1;
98             self.failed -= 1;
99         } else if state == State::Failed {
100             self.failed += 1;
101             self.successful -= 1;
102         }
103         *prev = state;
104     }
105 
successful(&self) -> usize106     pub(crate) fn successful(&self) -> usize {
107         self.successful
108     }
109 
failed(&self) -> usize110     pub(crate) fn failed(&self) -> usize {
111         self.failed
112     }
113 
total(&self) -> usize114     pub(crate) fn total(&self) -> usize {
115         self.task_state.len()
116     }
processed(&self) -> u64117     pub(crate) fn processed(&self) -> u64 {
118         self.total_progress
119     }
120 
is_finish(&self) -> bool121     pub(crate) fn is_finish(&self) -> bool {
122         self.total() == self.successful + self.failed
123     }
124 }
125 
126 #[derive(Clone, Debug)]
127 pub struct ProgressNotify {
128     pub(crate) action: Action,
129     pub(crate) task_id: u32,
130     pub(crate) uid: u64,
131     pub(crate) processed: u64,
132     pub(crate) total: Option<u64>,
133     pub(crate) multi_upload: Option<(usize, usize)>,
134     pub(crate) file_name: String,
135 }
136 
137 #[derive(Clone, Debug)]
138 pub(crate) struct EventualNotify {
139     pub(crate) action: Action,
140     pub(crate) task_id: u32,
141     pub(crate) uid: u64,
142     pub(crate) processed: u64,
143     pub(crate) file_name: String,
144     pub(crate) is_successful: bool,
145 }
146 
147 #[derive(Debug)]
148 pub(crate) enum NotifyInfo {
149     Eventual(EventualNotify),
150     Progress(ProgressNotify),
151     AttachGroup(u32, u64, Vec<u32>),
152     Unregister(u64, u32, u32),
153     GroupEventual(u32, u64),
154 }
155 
156 #[derive(Clone, Copy)]
157 enum NotifyType {
158     Group(u32),
159     Task,
160 }
161 
162 impl NotifyFlow {
new(rx: UnboundedReceiver<NotifyInfo>, database: Arc<NotificationDb>) -> Self163     pub(crate) fn new(rx: UnboundedReceiver<NotifyInfo>, database: Arc<NotificationDb>) -> Self {
164         Self {
165             database,
166             notify_type_map: HashMap::new(),
167             last_notify_map: HashMap::new(),
168             group_notify_progress: HashMap::new(),
169             group_gauge: HashMap::new(),
170             task_customized_notify: HashMap::new(),
171             group_customized_notify: HashMap::new(),
172             rx,
173         }
174     }
175 
run(mut self)176     pub(crate) fn run(mut self) {
177         runtime_spawn(async move {
178             loop {
179                 let info = match self.rx.recv().await {
180                     Ok(message) => message,
181                     Err(e) => {
182                         error!("Notification flow channel error: {:?}", e);
183                         sys_event!(
184                             ExecFault,
185                             DfxCode::UDS_FAULT_03,
186                             &format!("Notification flow channel error: {:?}", e)
187                         );
188                         continue;
189                     }
190                 };
191 
192                 if let Some(content) = match info {
193                     NotifyInfo::Eventual(info) => self.publish_completed_notify(&info),
194                     NotifyInfo::Progress(info) => self.publish_progress_notification(info),
195                     NotifyInfo::GroupEventual(group_id, uid) => self.group_eventual(group_id, uid),
196                     NotifyInfo::AttachGroup(group_id, uid, task_ids) => {
197                         self.attach_group(group_id, task_ids, uid)
198                     }
199                     NotifyInfo::Unregister(uid, task_id, group_id) => {
200                         self.unregister_task(uid, task_id, group_id)
201                     }
202                 } {
203                     PublishNotification(&content);
204                 }
205             }
206         });
207     }
208 
unregister_task(&mut self, uid: u64, task_id: u32, group_id: u32) -> Option<NotifyContent>209     fn unregister_task(&mut self, uid: u64, task_id: u32, group_id: u32) -> Option<NotifyContent> {
210         info!(
211             "Unregister task: uid: {}, task_id: {}, group_id: {}",
212             uid, task_id, group_id
213         );
214         let customized = self.group_customized_notify(group_id);
215         let progress = match self.group_notify_progress.entry(group_id) {
216             Entry::Occupied(entry) => entry.into_mut(),
217             Entry::Vacant(entry) => {
218                 let progress = Self::get_group_progress(&self.database, group_id);
219                 entry.insert(progress)
220             }
221         };
222         if progress
223             .task_state
224             .get(&task_id)
225             .is_some_and(|state| *state != State::Completed && *state != State::Failed)
226         {
227             progress.task_state.remove(&task_id);
228         }
229         if progress.task_state.is_empty() {
230             cancel_notification(group_id);
231             return None;
232         }
233         if !Self::group_eventual_check(&self.database, progress, group_id) {
234             return None;
235         }
236         Some(NotifyContent::group_eventual_notify(
237             customized,
238             Action::Download,
239             group_id,
240             uid as u32,
241             progress.processed(),
242             progress.successful() as i32,
243             progress.failed() as i32,
244         ))
245     }
246 
update_db_task_state_and_progress(group_progress: &mut GroupProgress, task_id: u32)247     fn update_db_task_state_and_progress(group_progress: &mut GroupProgress, task_id: u32) {
248         let Some(processed) = RequestDb::get_instance().query_task_total_processed(task_id) else {
249             return;
250         };
251         let Some(state) = RequestDb::get_instance().query_task_state(task_id) else {
252             return;
253         };
254         if state == State::Removed.repr {
255             return;
256         }
257         group_progress.update_task_state(task_id, State::from(state));
258         group_progress.update_task_progress(task_id, processed as u64);
259     }
260 
get_group_progress(database: &NotificationDb, group_id: u32) -> GroupProgress261     fn get_group_progress(database: &NotificationDb, group_id: u32) -> GroupProgress {
262         let mut group_progress = GroupProgress::new();
263         for task_id in database.query_group_tasks(group_id) {
264             Self::update_db_task_state_and_progress(&mut group_progress, task_id);
265         }
266         group_progress
267     }
268 
attach_group( &mut self, group_id: u32, task_ids: Vec<u32>, uid: u64, ) -> Option<NotifyContent>269     fn attach_group(
270         &mut self,
271         group_id: u32,
272         task_ids: Vec<u32>,
273         uid: u64,
274     ) -> Option<NotifyContent> {
275         let is_gauge = self.check_gauge(group_id);
276         let customized = self.group_customized_notify(group_id);
277         let progress = match self.group_notify_progress.entry(group_id) {
278             Entry::Occupied(entry) => {
279                 let progress = entry.into_mut();
280                 for task_id in task_ids {
281                     Self::update_db_task_state_and_progress(progress, task_id);
282                 }
283                 progress
284             }
285             Entry::Vacant(entry) => {
286                 let progress = Self::get_group_progress(&self.database, group_id);
287                 entry.insert(progress)
288             }
289         };
290         if !is_gauge {
291             return None;
292         }
293         Some(NotifyContent::group_progress_notify(
294             customized,
295             Action::Download,
296             group_id,
297             uid as u32,
298             progress,
299         ))
300     }
301 
check_gauge(&mut self, group_id: u32) -> bool302     fn check_gauge(&mut self, group_id: u32) -> bool {
303         match self.group_gauge.get(&group_id) {
304             Some(gauge) => *gauge,
305             None => {
306                 let gauge = self.database.is_gauge(group_id);
307                 self.group_gauge.insert(group_id, gauge);
308                 gauge
309             }
310         }
311     }
312 
group_customized_notify(&mut self, group_id: u32) -> Option<CustomizedNotification>313     fn group_customized_notify(&mut self, group_id: u32) -> Option<CustomizedNotification> {
314         match self.group_customized_notify.entry(group_id) {
315             Entry::Occupied(entry) => entry.get().clone(),
316             Entry::Vacant(entry) => {
317                 let customized = self.database.query_group_customized_notification(group_id);
318                 entry.insert(customized).clone()
319             }
320         }
321     }
322 
task_customized_notify(&mut self, task_id: u32) -> Option<CustomizedNotification>323     fn task_customized_notify(&mut self, task_id: u32) -> Option<CustomizedNotification> {
324         match self.task_customized_notify.entry(task_id) {
325             Entry::Occupied(entry) => entry.get().clone(),
326             Entry::Vacant(entry) => {
327                 let customized = self.database.query_task_customized_notification(task_id);
328                 entry.insert(customized).clone()
329             }
330         }
331     }
332 
publish_progress_notification(&mut self, info: ProgressNotify) -> Option<NotifyContent>333     fn publish_progress_notification(&mut self, info: ProgressNotify) -> Option<NotifyContent> {
334         let content = match self.get_request_id(info.task_id) {
335             NotifyType::Group(group_id) => {
336                 if !self.check_gauge(group_id) {
337                     return None;
338                 }
339                 let progress_interval_check = self.progress_interval_check(group_id);
340 
341                 let customized = self.group_customized_notify(group_id);
342                 let progress = match self.group_notify_progress.entry(group_id) {
343                     Entry::Occupied(entry) => entry.into_mut(),
344                     Entry::Vacant(entry) => {
345                         let progress = Self::get_group_progress(&self.database, group_id);
346                         entry.insert(progress)
347                     }
348                 };
349                 progress.update_task_progress(info.task_id, info.processed);
350 
351                 if !progress_interval_check {
352                     return None;
353                 }
354                 NotifyContent::group_progress_notify(
355                     customized,
356                     info.action,
357                     group_id,
358                     info.uid as u32,
359                     progress,
360                 )
361             }
362             NotifyType::Task => NotifyContent::task_progress_notify(
363                 self.task_customized_notify(info.task_id),
364                 &info,
365             ),
366         };
367         Some(content)
368     }
369 
progress_interval_check(&mut self, request_id: u32) -> bool370     fn progress_interval_check(&mut self, request_id: u32) -> bool {
371         match self.last_notify_map.entry(request_id) {
372             Entry::Occupied(mut entry) => {
373                 let last_notify = entry.get_mut();
374                 let current = get_current_timestamp();
375                 if current < NOTIFY_PROGRESS_INTERVAL + *last_notify {
376                     return false;
377                 }
378                 *last_notify = current;
379                 true
380             }
381             Entry::Vacant(entry) => {
382                 let last_notify = get_current_timestamp();
383                 entry.insert(last_notify);
384                 true
385             }
386         }
387     }
388 
publish_completed_notify(&mut self, info: &EventualNotify) -> Option<NotifyContent>389     fn publish_completed_notify(&mut self, info: &EventualNotify) -> Option<NotifyContent> {
390         let content = match self.get_request_id(info.task_id) {
391             NotifyType::Group(group_id) => {
392                 let is_gauge = self.check_gauge(group_id);
393 
394                 let customized = self.group_customized_notify(group_id);
395                 let group_progress = match self.group_notify_progress.entry(group_id) {
396                     Entry::Occupied(entry) => {
397                         let progress = entry.into_mut();
398                         progress.update_task_progress(info.task_id, info.processed);
399                         if info.is_successful {
400                             progress.update_task_state(info.task_id, State::Completed);
401                         } else {
402                             progress.update_task_state(info.task_id, State::Failed);
403                         }
404                         progress
405                     }
406                     Entry::Vacant(entry) => {
407                         let progress = Self::get_group_progress(&self.database, group_id);
408                         entry.insert(progress)
409                     }
410                 };
411 
412                 let group_eventual =
413                     Self::group_eventual_check(&self.database, group_progress, group_id);
414 
415                 if !group_eventual {
416                     if is_gauge {
417                         NotifyContent::group_progress_notify(
418                             customized,
419                             info.action,
420                             group_id,
421                             info.uid as u32,
422                             group_progress,
423                         )
424                     } else {
425                         return None;
426                     }
427                 } else {
428                     self.database.clear_group_info(group_id);
429                     NotifyContent::group_eventual_notify(
430                         customized,
431                         info.action,
432                         group_id,
433                         info.uid as u32,
434                         group_progress.processed(),
435                         group_progress.successful() as i32,
436                         group_progress.failed() as i32,
437                     )
438                 }
439             }
440             NotifyType::Task => {
441                 let content = NotifyContent::task_eventual_notify(
442                     self.task_customized_notify(info.task_id),
443                     info.action,
444                     info.task_id,
445                     info.uid as u32,
446                     info.file_name.clone(),
447                     info.is_successful,
448                 );
449                 if info.is_successful {
450                     self.database.clear_task_info(info.task_id);
451                 }
452                 content
453             }
454         };
455         Some(content)
456     }
457 
group_eventual(&mut self, group_id: u32, uid: u64) -> Option<NotifyContent>458     fn group_eventual(&mut self, group_id: u32, uid: u64) -> Option<NotifyContent> {
459         let customized = self.group_customized_notify(group_id);
460         let group_progress = match self.group_notify_progress.entry(group_id) {
461             Entry::Occupied(entry) => entry.into_mut(),
462             Entry::Vacant(entry) => {
463                 let progress = Self::get_group_progress(&self.database, group_id);
464                 entry.insert(progress)
465             }
466         };
467 
468         let group_eventual = Self::group_eventual_check(&self.database, group_progress, group_id);
469 
470         if !group_eventual {
471             return None;
472         }
473         Some(NotifyContent::group_eventual_notify(
474             customized,
475             Action::Download,
476             group_id,
477             uid as u32,
478             group_progress.processed(),
479             group_progress.successful() as i32,
480             group_progress.failed() as i32,
481         ))
482     }
483 
get_request_id(&mut self, task_id: u32) -> NotifyType484     fn get_request_id(&mut self, task_id: u32) -> NotifyType {
485         if let Some(n_type) = self.notify_type_map.get(&task_id) {
486             return *n_type;
487         }
488         let n_type = match self.database.query_task_gid(task_id) {
489             Some(group_id) => NotifyType::Group(group_id),
490             None => NotifyType::Task,
491         };
492 
493         self.notify_type_map.insert(task_id, n_type);
494         n_type
495     }
496 
group_eventual_check( database: &NotificationDb, group_progress: &mut GroupProgress, group_id: u32, ) -> bool497     fn group_eventual_check(
498         database: &NotificationDb,
499         group_progress: &mut GroupProgress,
500         group_id: u32,
501     ) -> bool {
502         !database.attach_able(group_id) && group_progress.is_finish()
503     }
504 }
505 
506 #[cfg(test)]
507 mod ut_notify_flow {
508     include!("../../../tests/ut/service/notification_bar/ut_notify_flow.rs");
509 }
510