• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use crate::info::State;
15 use crate::service::client::ClientManagerEntry;
16 use crate::task::notify::{NotifyData, SubscribeType};
17 
18 pub(crate) struct Notifier;
19 
20 impl Notifier {
complete(client_manager: &ClientManagerEntry, notify_data: NotifyData)21     pub(crate) fn complete(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
22         #[cfg(feature = "oh")]
23         let _ = publish_state_change_event(
24             notify_data.bundle.as_str(),
25             notify_data.task_id,
26             State::Completed.repr as i32,
27         );
28         client_manager.send_notify_data(SubscribeType::Complete, notify_data)
29     }
30 
fail(client_manager: &ClientManagerEntry, notify_data: NotifyData)31     pub(crate) fn fail(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
32         #[cfg(feature = "oh")]
33         let _ = publish_state_change_event(
34             notify_data.bundle.as_str(),
35             notify_data.task_id,
36             State::Failed.repr as i32,
37         );
38         client_manager.send_notify_data(SubscribeType::Fail, notify_data)
39     }
40 
pause(client_manager: &ClientManagerEntry, notify_data: NotifyData)41     pub(crate) fn pause(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
42         client_manager.send_notify_data(SubscribeType::Pause, notify_data)
43     }
44 
resume(client_manager: &ClientManagerEntry, notify_data: NotifyData)45     pub(crate) fn resume(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
46         client_manager.send_notify_data(SubscribeType::Resume, notify_data)
47     }
48 
header_receive(client_manager: &ClientManagerEntry, notify_data: NotifyData)49     pub(crate) fn header_receive(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
50         client_manager.send_notify_data(SubscribeType::HeaderReceive, notify_data)
51     }
52 
progress(client_manager: &ClientManagerEntry, notify_data: NotifyData)53     pub(crate) fn progress(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
54         let total_processed = notify_data.progress.common_data.total_processed;
55         let file_total_size: i64 = notify_data.progress.sizes.iter().sum();
56         if total_processed == 0 && file_total_size < 0 {
57             return;
58         }
59         client_manager.send_notify_data(SubscribeType::Progress, notify_data)
60     }
61 
remove(client_manager: &ClientManagerEntry, notify_data: NotifyData)62     pub(crate) fn remove(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
63         let task_id = notify_data.task_id;
64         client_manager.send_notify_data(SubscribeType::Remove, notify_data);
65         client_manager.notify_task_finished(task_id);
66     }
67 }
68 
/// Publishes a state-change event for `task_id` of `bundle_name` through
/// `crate::utils::PublishStateChangeEvent`.
///
/// Returns `Ok(())` when that call reports `true` and `Err(())` otherwise.
#[cfg(feature = "oh")]
pub(crate) fn publish_state_change_event(
    bundle_name: &str,
    task_id: u32,
    state: i32,
) -> Result<(), ()> {
    if crate::utils::PublishStateChangeEvent(bundle_name, task_id, state) {
        Ok(())
    } else {
        Err(())
    }
}
80 #[allow(unused)]
81 #[cfg(test)]
82 mod test {
83     use std::fs::File;
84     use std::sync::Arc;
85     use std::time::Duration;
86 
87     use cxx::UniquePtr;
88     use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver};
89 
90     use crate::config::{Action, ConfigBuilder, Mode};
91     use crate::error::ErrorCode;
92     use crate::info::{State, TaskInfo};
93     use crate::manage::database::RequestDb;
94     use crate::manage::events::{TaskEvent, TaskManagerEvent};
95     use crate::manage::network::{Network, NetworkInfo, NetworkInner, NetworkState, NetworkType};
96     use crate::manage::network_manager::NetworkManager;
97     use crate::manage::task_manager::{TaskManagerRx, TaskManagerTx};
98     use crate::manage::TaskManager;
99     use crate::service::client::{ClientEvent, ClientManager, ClientManagerEntry};
100     use crate::service::run_count::RunCountManagerEntry;
101     use crate::task::notify::SubscribeType;
102     use crate::task::reason::Reason;
103     use crate::tests::{lock_database, test_init};
104 
105     const GITEE_FILE_LEN: usize = 1042003;
106 
init_manager() -> (TaskManager, UnboundedReceiver<ClientEvent>)107     fn init_manager() -> (TaskManager, UnboundedReceiver<ClientEvent>) {
108         let (tx, rx) = unbounded_channel();
109         let task_manager_tx = TaskManagerTx::new(tx);
110         let rx = TaskManagerRx::new(rx);
111         {
112             let network_manager = NetworkManager::get_instance().lock().unwrap();
113             let notifier = network_manager.network.inner.clone();
114             notifier.notify_online(NetworkInfo {
115                 network_type: NetworkType::Wifi,
116                 is_metered: false,
117                 is_roaming: false,
118             });
119         }
120         let (tx, _rx) = unbounded_channel();
121         let run_count = RunCountManagerEntry::new(tx);
122         let (tx, client_rx) = unbounded_channel();
123         let client = ClientManagerEntry::new(tx);
124         (
125             TaskManager::new(task_manager_tx, rx, run_count, client),
126             client_rx,
127         )
128     }
129 
130     #[cfg(feature = "oh")]
131     #[test]
ut_network()132     fn ut_network() {
133         test_init();
134         let notifier;
135         {
136             let network_manager = NetworkManager::get_instance().lock().unwrap();
137             notifier = network_manager.network.inner.clone();
138         }
139 
140         notifier.notify_online(NetworkInfo {
141             network_type: NetworkType::Wifi,
142             is_metered: false,
143             is_roaming: false,
144         });
145         assert!(NetworkManager::is_online());
146         assert_eq!(
147             NetworkManager::query_network(),
148             NetworkState::Online(NetworkInfo {
149                 network_type: NetworkType::Wifi,
150                 is_metered: false,
151                 is_roaming: false,
152             })
153         );
154         notifier.notify_offline();
155         assert!(!NetworkManager::is_online());
156         notifier.notify_online(NetworkInfo {
157             network_type: NetworkType::Cellular,
158             is_metered: true,
159             is_roaming: true,
160         });
161         assert!(NetworkManager::is_online());
162         assert_eq!(
163             NetworkManager::query_network(),
164             NetworkState::Online(NetworkInfo {
165                 network_type: NetworkType::Cellular,
166                 is_metered: true,
167                 is_roaming: true,
168             })
169         );
170     }
171 
172     #[cfg(feature = "oh")]
173     #[test]
ut_network_notify()174     fn ut_network_notify() {
175         test_init();
176         let notifier = NetworkInner::new();
177         notifier.notify_offline();
178         assert!(notifier.notify_online(NetworkInfo {
179             network_type: NetworkType::Wifi,
180             is_metered: true,
181             is_roaming: true,
182         }));
183         assert!(!notifier.notify_online(NetworkInfo {
184             network_type: NetworkType::Wifi,
185             is_metered: true,
186             is_roaming: true,
187         }));
188         assert!(notifier.notify_online(NetworkInfo {
189             network_type: NetworkType::Wifi,
190             is_metered: false,
191             is_roaming: true,
192         }));
193         assert!(notifier.notify_online(NetworkInfo {
194             network_type: NetworkType::Cellular,
195             is_metered: false,
196             is_roaming: true,
197         }));
198     }
199 
200     #[test]
ut_notify_progress()201     fn ut_notify_progress() {
202         test_init();
203         let _lock = lock_database();
204         let (mut manager, mut client_rx) = init_manager();
205 
206         let file_path = "test_files/ut_notify_completed.txt";
207 
208         let file = File::create(file_path).unwrap();
209         let config = ConfigBuilder::new()
210         .action(Action::Download)
211         .retry(true)
212         .mode(Mode::BackGround)
213         .file_spec(file)
214         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
215         .redirect(true)
216         .build();
217         let uid = config.common_data.uid;
218         let task_id = manager.create(config).unwrap();
219         manager.start(uid, task_id);
220         manager.scheduler.reschedule();
221         ylong_runtime::block_on(async {
222             let info = client_rx.recv().await.unwrap();
223             let ClientEvent::SendResponse(tid, version, status_code, reason, headers) = info else {
224                 panic!("unexpected event: {:?}", info);
225             };
226             assert_eq!(tid, task_id);
227             assert_eq!(version, "HTTP/1.1");
228             assert_eq!(status_code, 200);
229             assert_eq!(reason, "OK");
230             assert!(!headers.is_empty());
231             loop {
232                 let info = client_rx.recv().await.unwrap();
233                 let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
234                     panic!("unexpected event: {:?}", info);
235                 };
236                 let mut previous = 0;
237                 assert_eq!(subscribe_type, SubscribeType::Progress);
238                 assert_eq!(data.task_id, task_id);
239                 assert!(!data.progress.extras.is_empty());
240                 assert_eq!(data.progress.common_data.state, State::Running.repr);
241                 assert_eq!(data.progress.common_data.index, 0);
242                 assert_eq!(
243                     data.progress.processed[0],
244                     data.progress.common_data.total_processed
245                 );
246 
247                 assert!(data.progress.common_data.total_processed >= previous);
248                 previous = data.progress.common_data.total_processed;
249                 if data.progress.common_data.total_processed == GITEE_FILE_LEN {
250                     break;
251                 }
252             }
253         })
254     }
255 
256     #[test]
ut_notify_pause_resume()257     fn ut_notify_pause_resume() {
258         test_init();
259         let _lock = lock_database();
260         let (mut manager, mut client_rx) = init_manager();
261 
262         let file_path = "test_files/ut_notify";
263 
264         let file = File::create(file_path).unwrap();
265         let config = ConfigBuilder::new()
266         .action(Action::Download)
267         .retry(true)
268         .mode(Mode::BackGround)
269         .file_spec(file)
270         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
271         .redirect(true)
272         .build();
273         let uid = config.common_data.uid;
274         let task_id = manager.create(config).unwrap();
275         manager.start(uid, task_id);
276         manager.pause(uid, task_id);
277         manager.resume(uid, task_id);
278         ylong_runtime::block_on(async {
279             let info = client_rx.recv().await.unwrap();
280             let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
281                 panic!("unexpected event: {:?}", info);
282             };
283             assert_eq!(subscribe_type, SubscribeType::Pause);
284             assert!(data.progress.extras.is_empty());
285             assert_eq!(data.progress.common_data.state, State::Paused.repr);
286             assert_eq!(data.progress.common_data.index, 0);
287             assert_eq!(
288                 data.progress.processed[0],
289                 data.progress.common_data.total_processed
290             );
291             assert_eq!(data.progress.common_data.total_processed, 0);
292             let info = client_rx.recv().await.unwrap();
293             let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
294                 panic!("unexpected event: {:?}", info);
295             };
296             assert_eq!(subscribe_type, SubscribeType::Resume);
297             assert!(data.progress.extras.is_empty());
298             assert_eq!(data.progress.common_data.state, State::Waiting.repr);
299             assert_eq!(data.progress.common_data.index, 0);
300             assert_eq!(
301                 data.progress.processed[0],
302                 data.progress.common_data.total_processed
303             );
304             assert_eq!(data.progress.common_data.total_processed, 0);
305         })
306     }
307 
308     #[test]
ut_notify_remove()309     fn ut_notify_remove() {
310         test_init();
311         let _lock = lock_database();
312         let (mut manager, mut client_rx) = init_manager();
313 
314         let file_path = "test_files/ut_notify";
315 
316         let file = File::create(file_path).unwrap();
317         let config = ConfigBuilder::new()
318         .action(Action::Download)
319         .retry(true)
320         .mode(Mode::BackGround)
321         .file_spec(file)
322         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
323         .redirect(true)
324         .build();
325         let uid = config.common_data.uid;
326         let task_id = manager.create(config).unwrap();
327         manager.remove(uid, task_id);
328         ylong_runtime::block_on(async {
329             let info = client_rx.recv().await.unwrap();
330             let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
331                 panic!("unexpected event: {:?}", info);
332             };
333             assert_eq!(subscribe_type, SubscribeType::Remove);
334             assert!(data.progress.extras.is_empty());
335             assert_eq!(data.progress.common_data.state, State::Removed.repr);
336             assert_eq!(data.progress.common_data.index, 0);
337             assert_eq!(
338                 data.progress.processed[0],
339                 data.progress.common_data.total_processed
340             );
341             assert_eq!(data.progress.common_data.total_processed, 0);
342         })
343     }
344 
345     #[test]
ut_notify_completed()346     fn ut_notify_completed() {
347         test_init();
348         let _lock = lock_database();
349         let (mut manager, mut client_rx) = init_manager();
350 
351         let file_path = "test_files/ut_notify";
352 
353         let file = File::create(file_path).unwrap();
354         let config = ConfigBuilder::new()
355         .action(Action::Download)
356         .retry(true)
357         .mode(Mode::BackGround)
358         .file_spec(file)
359         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
360         .redirect(true)
361         .build();
362         let uid = config.common_data.uid;
363         let task_id = manager.create(config).unwrap();
364         manager.start(uid, task_id);
365         manager.scheduler.task_completed(uid, task_id);
366         ylong_runtime::block_on(async {
367             let info = client_rx.recv().await.unwrap();
368             let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
369                 panic!("unexpected event: {:?}", info);
370             };
371             assert_eq!(subscribe_type, SubscribeType::Complete);
372             assert!(data.progress.extras.is_empty());
373             assert_eq!(data.progress.common_data.state, State::Completed.repr);
374             assert_eq!(data.progress.common_data.index, 0);
375             assert_eq!(
376                 data.progress.processed[0],
377                 data.progress.common_data.total_processed
378             );
379             assert_eq!(data.progress.common_data.total_processed, 0);
380         })
381     }
382 
383     #[test]
ut_notify_failed()384     fn ut_notify_failed() {
385         test_init();
386         let _lock = lock_database();
387         let (mut manager, mut client_rx) = init_manager();
388 
389         let file_path = "test_files/ut_notify";
390 
391         let file = File::create(file_path).unwrap();
392         let config = ConfigBuilder::new()
393         .action(Action::Download)
394         .retry(true)
395         .mode(Mode::BackGround)
396         .file_spec(file)
397         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
398         .redirect(true)
399         .build();
400         let uid = config.common_data.uid;
401         let task_id = manager.create(config).unwrap();
402         manager.start(uid, task_id);
403         manager.scheduler.task_failed(uid, task_id, Reason::IoError);
404         ylong_runtime::block_on(async {
405             let info = client_rx.recv().await.unwrap();
406             let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
407                 panic!("unexpected event: {:?}", info);
408             };
409             assert_eq!(subscribe_type, SubscribeType::Fail);
410             assert!(data.progress.extras.is_empty());
411             assert_eq!(data.progress.common_data.state, State::Failed.repr);
412             assert_eq!(data.progress.common_data.index, 0);
413             assert_eq!(
414                 data.progress.processed[0],
415                 data.progress.common_data.total_processed
416             );
417             assert_eq!(data.progress.common_data.total_processed, 0);
418         })
419     }
420 
421     #[test]
ut_notify_pause_resume_completed()422     fn ut_notify_pause_resume_completed() {
423         test_init();
424         let _lock = lock_database();
425         let (mut manager, mut client_rx) = init_manager();
426 
427         let file_path = "test_files/ut_notify";
428 
429         let file = File::create(file_path).unwrap();
430         let config = ConfigBuilder::new()
431         .action(Action::Download)
432         .retry(true)
433         .mode(Mode::BackGround)
434         .file_spec(file)
435         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
436         .redirect(true)
437         .build();
438         let uid = config.common_data.uid;
439         let task_id = manager.create(config).unwrap();
440         manager.start(uid, task_id);
441         manager.pause(uid, task_id);
442         manager.scheduler.task_completed(uid, task_id);
443         manager.resume(uid, task_id);
444         ylong_runtime::block_on(async {
445             let info = client_rx.recv().await.unwrap();
446             let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
447                 panic!("unexpected event: {:?}", info);
448             };
449             assert_eq!(subscribe_type, SubscribeType::Pause);
450             let info = client_rx.recv().await.unwrap();
451             let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
452                 panic!("unexpected event: {:?}", info);
453             };
454             assert_eq!(subscribe_type, SubscribeType::Resume);
455             assert!(client_rx.is_empty());
456         })
457     }
458 
459     #[test]
ut_notify_pause_resume_failed()460     fn ut_notify_pause_resume_failed() {
461         test_init();
462         let _lock = lock_database();
463         let (mut manager, mut client_rx) = init_manager();
464 
465         let file_path = "test_files/ut_notify";
466 
467         let file = File::create(file_path).unwrap();
468         let config = ConfigBuilder::new()
469         .action(Action::Download)
470         .retry(true)
471         .mode(Mode::BackGround)
472         .file_spec(file)
473         .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
474         .redirect(true)
475         .build();
476         let uid = config.common_data.uid;
477         let task_id = manager.create(config).unwrap();
478         manager.start(uid, task_id);
479         manager.pause(uid, task_id);
480         manager.scheduler.task_failed(uid, task_id, Reason::IoError);
481         manager.resume(uid, task_id);
482         ylong_runtime::block_on(async {
483             let info = client_rx.recv().await.unwrap();
484             let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
485                 panic!("unexpected event: {:?}", info);
486             };
487             assert_eq!(subscribe_type, SubscribeType::Pause);
488             let info = client_rx.recv().await.unwrap();
489             let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
490                 panic!("unexpected event: {:?}", info);
491             };
492             assert_eq!(subscribe_type, SubscribeType::Resume);
493             assert!(client_rx.is_empty());
494         })
495     }
496 }
497