1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use crate::info::State;
15 use crate::service::client::ClientManagerEntry;
16 use crate::task::notify::{NotifyData, SubscribeType};
17
/// Stateless namespace type: every notification helper is an associated
/// function on `Notifier`, so no instance is ever required.
pub(crate) struct Notifier;
19
20 impl Notifier {
complete(client_manager: &ClientManagerEntry, notify_data: NotifyData)21 pub(crate) fn complete(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
22 #[cfg(feature = "oh")]
23 let _ = publish_state_change_event(
24 notify_data.bundle.as_str(),
25 notify_data.task_id,
26 State::Completed.repr as i32,
27 );
28 client_manager.send_notify_data(SubscribeType::Complete, notify_data)
29 }
30
fail(client_manager: &ClientManagerEntry, notify_data: NotifyData)31 pub(crate) fn fail(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
32 #[cfg(feature = "oh")]
33 let _ = publish_state_change_event(
34 notify_data.bundle.as_str(),
35 notify_data.task_id,
36 State::Failed.repr as i32,
37 );
38 client_manager.send_notify_data(SubscribeType::Fail, notify_data)
39 }
40
pause(client_manager: &ClientManagerEntry, notify_data: NotifyData)41 pub(crate) fn pause(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
42 client_manager.send_notify_data(SubscribeType::Pause, notify_data)
43 }
44
resume(client_manager: &ClientManagerEntry, notify_data: NotifyData)45 pub(crate) fn resume(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
46 client_manager.send_notify_data(SubscribeType::Resume, notify_data)
47 }
48
header_receive(client_manager: &ClientManagerEntry, notify_data: NotifyData)49 pub(crate) fn header_receive(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
50 client_manager.send_notify_data(SubscribeType::HeaderReceive, notify_data)
51 }
52
progress(client_manager: &ClientManagerEntry, notify_data: NotifyData)53 pub(crate) fn progress(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
54 let total_processed = notify_data.progress.common_data.total_processed;
55 let file_total_size: i64 = notify_data.progress.sizes.iter().sum();
56 if total_processed == 0 && file_total_size < 0 {
57 return;
58 }
59 client_manager.send_notify_data(SubscribeType::Progress, notify_data)
60 }
61
remove(client_manager: &ClientManagerEntry, notify_data: NotifyData)62 pub(crate) fn remove(client_manager: &ClientManagerEntry, notify_data: NotifyData) {
63 let task_id = notify_data.task_id;
64 client_manager.send_notify_data(SubscribeType::Remove, notify_data);
65 client_manager.notify_task_finished(task_id);
66 }
67 }
68
/// Publishes a task state-change event through
/// `crate::utils::PublishStateChangeEvent`.
///
/// Returns `Ok(())` when that call reports `true` and `Err(())` when it
/// reports `false`.
#[cfg(feature = "oh")]
pub(crate) fn publish_state_change_event(
    bundle_name: &str,
    task_id: u32,
    state: i32,
) -> Result<(), ()> {
    if crate::utils::PublishStateChangeEvent(bundle_name, task_id, state) {
        Ok(())
    } else {
        Err(())
    }
}
80 #[allow(unused)]
81 #[cfg(test)]
82 mod test {
83 use std::fs::File;
84 use std::sync::Arc;
85 use std::time::Duration;
86
87 use cxx::UniquePtr;
88 use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver};
89
90 use crate::config::{Action, ConfigBuilder, Mode};
91 use crate::error::ErrorCode;
92 use crate::info::{State, TaskInfo};
93 use crate::manage::database::RequestDb;
94 use crate::manage::events::{TaskEvent, TaskManagerEvent};
95 use crate::manage::network::{Network, NetworkInfo, NetworkInner, NetworkState, NetworkType};
96 use crate::manage::network_manager::NetworkManager;
97 use crate::manage::task_manager::{TaskManagerRx, TaskManagerTx};
98 use crate::manage::TaskManager;
99 use crate::service::client::{ClientEvent, ClientManager, ClientManagerEntry};
100 use crate::service::run_count::RunCountManagerEntry;
101 use crate::task::notify::SubscribeType;
102 use crate::task::reason::Reason;
103 use crate::tests::{lock_database, test_init};
104
/// Total processed byte count at which the progress test stops waiting for
/// further progress notifications.
const GITEE_FILE_LEN: usize = 1_042_003;
106
init_manager() -> (TaskManager, UnboundedReceiver<ClientEvent>)107 fn init_manager() -> (TaskManager, UnboundedReceiver<ClientEvent>) {
108 let (tx, rx) = unbounded_channel();
109 let task_manager_tx = TaskManagerTx::new(tx);
110 let rx = TaskManagerRx::new(rx);
111 {
112 let network_manager = NetworkManager::get_instance().lock().unwrap();
113 let notifier = network_manager.network.inner.clone();
114 notifier.notify_online(NetworkInfo {
115 network_type: NetworkType::Wifi,
116 is_metered: false,
117 is_roaming: false,
118 });
119 }
120 let (tx, _rx) = unbounded_channel();
121 let run_count = RunCountManagerEntry::new(tx);
122 let (tx, client_rx) = unbounded_channel();
123 let client = ClientManagerEntry::new(tx);
124 (
125 TaskManager::new(task_manager_tx, rx, run_count, client),
126 client_rx,
127 )
128 }
129
/// Drives the shared network notifier through Wi-Fi online, offline and
/// cellular online, checking `NetworkManager::is_online` and
/// `NetworkManager::query_network` after each transition.
#[cfg(feature = "oh")]
#[test]
fn ut_network() {
    // Each helper builds a fresh value, so the same description can be both
    // sent to the notifier and used as the expected query result.
    fn wifi() -> NetworkInfo {
        NetworkInfo {
            network_type: NetworkType::Wifi,
            is_metered: false,
            is_roaming: false,
        }
    }
    fn cellular() -> NetworkInfo {
        NetworkInfo {
            network_type: NetworkType::Cellular,
            is_metered: true,
            is_roaming: true,
        }
    }

    test_init();

    // Clone the inner handle inside a scope so the manager lock is released
    // before any of the queries below run.
    let notifier = {
        let guard = NetworkManager::get_instance().lock().unwrap();
        let inner = guard.network.inner.clone();
        inner
    };

    notifier.notify_online(wifi());
    assert!(NetworkManager::is_online());
    assert_eq!(NetworkManager::query_network(), NetworkState::Online(wifi()));

    notifier.notify_offline();
    assert!(!NetworkManager::is_online());

    notifier.notify_online(cellular());
    assert!(NetworkManager::is_online());
    assert_eq!(
        NetworkManager::query_network(),
        NetworkState::Online(cellular())
    );
}
171
/// Checks the boolean returned by `NetworkInner::notify_online`: `true` for
/// the first online report after going offline and for every report whose
/// info differs from the previous one, `false` for an identical repeat.
#[cfg(feature = "oh")]
#[test]
fn ut_network_notify() {
    // Every case in this test is roaming; only type and metering vary.
    fn roaming(network_type: NetworkType, is_metered: bool) -> NetworkInfo {
        NetworkInfo {
            network_type,
            is_metered,
            is_roaming: true,
        }
    }

    test_init();
    let notifier = NetworkInner::new();
    notifier.notify_offline();

    assert!(notifier.notify_online(roaming(NetworkType::Wifi, true)));
    // Identical info reported twice in a row.
    assert!(!notifier.notify_online(roaming(NetworkType::Wifi, true)));
    // Metering flag changed.
    assert!(notifier.notify_online(roaming(NetworkType::Wifi, false)));
    // Network type changed.
    assert!(notifier.notify_online(roaming(NetworkType::Cellular, false)));
}
199
/// Starts a background download and checks the events delivered to the
/// client: first a `SendResponse` (HTTP/1.1, 200, "OK", non-empty headers),
/// then a stream of `Progress` notifications whose `total_processed` never
/// decreases, until it reaches `GITEE_FILE_LEN`.
#[test]
fn ut_notify_progress() {
    test_init();
    let _lock = lock_database();
    let (mut manager, mut client_rx) = init_manager();

    let file_path = "test_files/ut_notify_completed.txt";

    let file = File::create(file_path).unwrap();
    let config = ConfigBuilder::new()
        .action(Action::Download)
        .retry(true)
        .mode(Mode::BackGround)
        .file_spec(file)
        .url("https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt")
        .redirect(true)
        .build();
    let uid = config.common_data.uid;
    let task_id = manager.create(config).unwrap();
    manager.start(uid, task_id);
    manager.scheduler.reschedule();
    ylong_runtime::block_on(async {
        let info = client_rx.recv().await.unwrap();
        let ClientEvent::SendResponse(tid, version, status_code, reason, headers) = info else {
            panic!("unexpected event: {:?}", info);
        };
        assert_eq!(tid, task_id);
        assert_eq!(version, "HTTP/1.1");
        assert_eq!(status_code, 200);
        assert_eq!(reason, "OK");
        assert!(!headers.is_empty());

        // Last total seen so far. It must live outside the loop: declared
        // inside, it would reset to 0 on every iteration and the
        // monotonicity assertion below could never fail.
        let mut previous = 0;
        loop {
            let info = client_rx.recv().await.unwrap();
            let ClientEvent::SendNotifyData(subscribe_type, data) = info else {
                panic!("unexpected event: {:?}", info);
            };
            assert_eq!(subscribe_type, SubscribeType::Progress);
            assert_eq!(data.task_id, task_id);
            assert!(!data.progress.extras.is_empty());
            assert_eq!(data.progress.common_data.state, State::Running.repr);
            assert_eq!(data.progress.common_data.index, 0);
            assert_eq!(
                data.progress.processed[0],
                data.progress.common_data.total_processed
            );

            // Progress must be monotonically non-decreasing across events.
            assert!(data.progress.common_data.total_processed >= previous);
            previous = data.progress.common_data.total_processed;
            if data.progress.common_data.total_processed == GITEE_FILE_LEN {
                break;
            }
        }
    })
}
255
/// Starts a download task, pauses and resumes it, then checks that the
/// client receives a `Pause` notification (state `Paused`) followed by a
/// `Resume` notification (state `Waiting`), both with index 0 and zero
/// bytes processed.
#[test]
fn ut_notify_pause_resume() {
    test_init();
    let _lock = lock_database();
    let (mut manager, mut client_rx) = init_manager();

    let url = "https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt";
    let file = File::create("test_files/ut_notify").unwrap();
    let config = ConfigBuilder::new()
        .action(Action::Download)
        .retry(true)
        .mode(Mode::BackGround)
        .file_spec(file)
        .url(url)
        .redirect(true)
        .build();
    let uid = config.common_data.uid;
    let task_id = manager.create(config).unwrap();
    manager.start(uid, task_id);
    manager.pause(uid, task_id);
    manager.resume(uid, task_id);

    ylong_runtime::block_on(async {
        // Expected notifications, in arrival order, with the task state each
        // one must carry.
        let expected = [
            (SubscribeType::Pause, State::Paused.repr),
            (SubscribeType::Resume, State::Waiting.repr),
        ];
        for (expected_type, expected_state) in expected {
            let event = client_rx.recv().await.unwrap();
            let (subscribe_type, data) = match event {
                ClientEvent::SendNotifyData(subscribe_type, data) => (subscribe_type, data),
                other => panic!("unexpected event: {:?}", other),
            };
            assert_eq!(subscribe_type, expected_type);
            assert!(data.progress.extras.is_empty());
            assert_eq!(data.progress.common_data.state, expected_state);
            assert_eq!(data.progress.common_data.index, 0);
            assert_eq!(
                data.progress.processed[0],
                data.progress.common_data.total_processed
            );
            assert_eq!(data.progress.common_data.total_processed, 0);
        }
    });
}
307
/// Creates a download task and removes it without starting it, then checks
/// that the client receives a `Remove` notification with state `Removed`,
/// index 0 and zero bytes processed.
#[test]
fn ut_notify_remove() {
    test_init();
    let _lock = lock_database();
    let (mut manager, mut client_rx) = init_manager();

    let url = "https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt";
    let file = File::create("test_files/ut_notify").unwrap();
    let config = ConfigBuilder::new()
        .action(Action::Download)
        .retry(true)
        .mode(Mode::BackGround)
        .file_spec(file)
        .url(url)
        .redirect(true)
        .build();
    let uid = config.common_data.uid;
    let task_id = manager.create(config).unwrap();
    manager.remove(uid, task_id);

    ylong_runtime::block_on(async {
        let event = client_rx.recv().await.unwrap();
        let (subscribe_type, data) = match event {
            ClientEvent::SendNotifyData(subscribe_type, data) => (subscribe_type, data),
            other => panic!("unexpected event: {:?}", other),
        };
        assert_eq!(subscribe_type, SubscribeType::Remove);
        assert!(data.progress.extras.is_empty());
        assert_eq!(data.progress.common_data.state, State::Removed.repr);
        assert_eq!(data.progress.common_data.index, 0);
        assert_eq!(
            data.progress.processed[0],
            data.progress.common_data.total_processed
        );
        assert_eq!(data.progress.common_data.total_processed, 0);
    });
}
344
/// Starts a download task and reports it as completed directly through the
/// scheduler, then checks that the client receives a `Complete`
/// notification with state `Completed`, index 0 and zero bytes processed.
#[test]
fn ut_notify_completed() {
    test_init();
    let _lock = lock_database();
    let (mut manager, mut client_rx) = init_manager();

    let url = "https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt";
    let file = File::create("test_files/ut_notify").unwrap();
    let config = ConfigBuilder::new()
        .action(Action::Download)
        .retry(true)
        .mode(Mode::BackGround)
        .file_spec(file)
        .url(url)
        .redirect(true)
        .build();
    let uid = config.common_data.uid;
    let task_id = manager.create(config).unwrap();
    manager.start(uid, task_id);
    manager.scheduler.task_completed(uid, task_id);

    ylong_runtime::block_on(async {
        let event = client_rx.recv().await.unwrap();
        let (subscribe_type, data) = match event {
            ClientEvent::SendNotifyData(subscribe_type, data) => (subscribe_type, data),
            other => panic!("unexpected event: {:?}", other),
        };
        assert_eq!(subscribe_type, SubscribeType::Complete);
        assert!(data.progress.extras.is_empty());
        assert_eq!(data.progress.common_data.state, State::Completed.repr);
        assert_eq!(data.progress.common_data.index, 0);
        assert_eq!(
            data.progress.processed[0],
            data.progress.common_data.total_processed
        );
        assert_eq!(data.progress.common_data.total_processed, 0);
    });
}
382
/// Starts a download task and reports it as failed with `Reason::IoError`
/// directly through the scheduler, then checks that the client receives a
/// `Fail` notification with state `Failed`, index 0 and zero bytes
/// processed.
#[test]
fn ut_notify_failed() {
    test_init();
    let _lock = lock_database();
    let (mut manager, mut client_rx) = init_manager();

    let url = "https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt";
    let file = File::create("test_files/ut_notify").unwrap();
    let config = ConfigBuilder::new()
        .action(Action::Download)
        .retry(true)
        .mode(Mode::BackGround)
        .file_spec(file)
        .url(url)
        .redirect(true)
        .build();
    let uid = config.common_data.uid;
    let task_id = manager.create(config).unwrap();
    manager.start(uid, task_id);
    manager.scheduler.task_failed(uid, task_id, Reason::IoError);

    ylong_runtime::block_on(async {
        let event = client_rx.recv().await.unwrap();
        let (subscribe_type, data) = match event {
            ClientEvent::SendNotifyData(subscribe_type, data) => (subscribe_type, data),
            other => panic!("unexpected event: {:?}", other),
        };
        assert_eq!(subscribe_type, SubscribeType::Fail);
        assert!(data.progress.extras.is_empty());
        assert_eq!(data.progress.common_data.state, State::Failed.repr);
        assert_eq!(data.progress.common_data.index, 0);
        assert_eq!(
            data.progress.processed[0],
            data.progress.common_data.total_processed
        );
        assert_eq!(data.progress.common_data.total_processed, 0);
    });
}
420
/// Starts a task, pauses it, reports completion through the scheduler while
/// it is paused, and resumes it. The client must see exactly a `Pause` and
/// then a `Resume` notification, with nothing else left in the channel.
#[test]
fn ut_notify_pause_resume_completed() {
    test_init();
    let _lock = lock_database();
    let (mut manager, mut client_rx) = init_manager();

    let url = "https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt";
    let file = File::create("test_files/ut_notify").unwrap();
    let config = ConfigBuilder::new()
        .action(Action::Download)
        .retry(true)
        .mode(Mode::BackGround)
        .file_spec(file)
        .url(url)
        .redirect(true)
        .build();
    let uid = config.common_data.uid;
    let task_id = manager.create(config).unwrap();
    manager.start(uid, task_id);
    manager.pause(uid, task_id);
    manager.scheduler.task_completed(uid, task_id);
    manager.resume(uid, task_id);

    ylong_runtime::block_on(async {
        // Notification kinds expected, in arrival order.
        for expected_type in [SubscribeType::Pause, SubscribeType::Resume] {
            let event = client_rx.recv().await.unwrap();
            let (subscribe_type, _data) = match event {
                ClientEvent::SendNotifyData(subscribe_type, data) => (subscribe_type, data),
                other => panic!("unexpected event: {:?}", other),
            };
            assert_eq!(subscribe_type, expected_type);
        }
        // No further notification may be pending after Pause and Resume.
        assert!(client_rx.is_empty());
    });
}
458
/// Starts a task, pauses it, reports a failure (`Reason::IoError`) through
/// the scheduler while it is paused, and resumes it. The client must see
/// exactly a `Pause` and then a `Resume` notification, with nothing else
/// left in the channel.
#[test]
fn ut_notify_pause_resume_failed() {
    test_init();
    let _lock = lock_database();
    let (mut manager, mut client_rx) = init_manager();

    let url = "https://www.gitee.com/tiga-ultraman/downloadTests/releases/download/v1.01/test.txt";
    let file = File::create("test_files/ut_notify").unwrap();
    let config = ConfigBuilder::new()
        .action(Action::Download)
        .retry(true)
        .mode(Mode::BackGround)
        .file_spec(file)
        .url(url)
        .redirect(true)
        .build();
    let uid = config.common_data.uid;
    let task_id = manager.create(config).unwrap();
    manager.start(uid, task_id);
    manager.pause(uid, task_id);
    manager.scheduler.task_failed(uid, task_id, Reason::IoError);
    manager.resume(uid, task_id);

    ylong_runtime::block_on(async {
        // Notification kinds expected, in arrival order.
        for expected_type in [SubscribeType::Pause, SubscribeType::Resume] {
            let event = client_rx.recv().await.unwrap();
            let (subscribe_type, _data) = match event {
                ClientEvent::SendNotifyData(subscribe_type, data) => (subscribe_type, data),
                other => panic!("unexpected event: {:?}", other),
            };
            assert_eq!(subscribe_type, expected_type);
        }
        // No further notification may be pending after Pause and Resume.
        assert!(client_rx.is_empty());
    });
}
496 }
497