• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::future::Future;
15 use std::pin::Pin;
16 use std::sync::atomic::{AtomicBool, Ordering};
17 use std::sync::Arc;
18 use std::task::{Context, Poll};
19 use std::time::Duration;
20 
21 use ylong_http_client::HttpClientError;
22 use ylong_runtime::io::AsyncWrite;
23 use ylong_runtime::time::{sleep, Sleep};
24 
25 use crate::manage::notifier::Notifier;
26 use crate::service::notification_bar::{NotificationDispatcher, NOTIFY_PROGRESS_INTERVAL};
27 use crate::task::request_task::RequestTask;
28 use crate::utils::get_current_timestamp;
29 
30 const SPEED_LIMIT_INTERVAL: u64 = 1000;
31 const FRONT_NOTIFY_INTERVAL: u64 = 1000;
32 
33 pub(crate) struct TaskOperator {
34     pub(crate) sleep: Option<Pin<Box<Sleep>>>,
35     pub(crate) task: Arc<RequestTask>,
36     pub(crate) last_time: u64,
37     pub(crate) last_size: u64,
38     pub(crate) more_sleep_time: u64,
39     pub(crate) abort_flag: Arc<AtomicBool>,
40 }
41 
42 impl TaskOperator {
new(task: Arc<RequestTask>, abort_flag: Arc<AtomicBool>) -> Self43     pub(crate) fn new(task: Arc<RequestTask>, abort_flag: Arc<AtomicBool>) -> Self {
44         Self {
45             sleep: None,
46             task,
47             last_time: 0,
48             last_size: 0,
49             more_sleep_time: 0,
50             abort_flag,
51         }
52     }
53 
poll_progress_common( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), HttpClientError>>54     pub(crate) fn poll_progress_common(
55         &mut self,
56         cx: &mut Context<'_>,
57     ) -> Poll<Result<(), HttpClientError>> {
58         if self.abort_flag.load(Ordering::Acquire) {
59             return Poll::Ready(Err(HttpClientError::user_aborted()));
60         }
61         let current = get_current_timestamp();
62 
63         if current >= self.task.last_notify.load(Ordering::SeqCst) + FRONT_NOTIFY_INTERVAL {
64             let notify_data = self.task.build_notify_data();
65             self.task.last_notify.store(current, Ordering::SeqCst);
66             Notifier::progress(&self.task.client_manager, notify_data);
67         }
68 
69         if self.task.background_notify.load(Ordering::Acquire)
70             && current
71                 > self.task.background_notify_time.load(Ordering::SeqCst) + NOTIFY_PROGRESS_INTERVAL
72         {
73             self.task
74                 .background_notify_time
75                 .store(current, Ordering::SeqCst);
76             NotificationDispatcher::get_instance().publish_progress_notification(&self.task);
77         }
78 
79         let total_processed = self
80             .task
81             .progress
82             .lock()
83             .unwrap()
84             .common_data
85             .total_processed as u64;
86 
87         self.sleep = None;
88         let speed_limit = self.task.rate_limiting.load(Ordering::SeqCst);
89         if speed_limit != 0 {
90             if self.more_sleep_time != 0 {
91                 // wake up for notify, sleep until speed limit conditions are met
92                 self.sleep = Some(Box::pin(sleep(Duration::from_millis(self.more_sleep_time))));
93                 self.more_sleep_time = 0;
94             } else if self.last_time == 0 {
95                 // get the init time and size, for speed caculate
96                 self.last_time = current;
97                 self.last_size = total_processed;
98             } else if current - self.last_time < SPEED_LIMIT_INTERVAL
99                 && ((total_processed - self.last_size) > speed_limit * SPEED_LIMIT_INTERVAL)
100             {
101                 // sleep until notification is required or speed limit conditions are met
102                 let limit_time =
103                     (total_processed - self.last_size) / speed_limit - (current - self.last_time);
104                 let notify_time = FRONT_NOTIFY_INTERVAL
105                     - (current - self.task.last_notify.load(Ordering::SeqCst));
106                 let sleep_time = if limit_time > notify_time {
107                     self.more_sleep_time = limit_time - notify_time;
108                     notify_time
109                 } else {
110                     limit_time
111                 };
112                 self.sleep = Some(Box::pin(sleep(Duration::from_millis(sleep_time))));
113             } else if current - self.last_time >= SPEED_LIMIT_INTERVAL {
114                 // last caculate window has meet speed limit, update last_time and last_size,
115                 // for next poll's speed compare
116                 self.last_time = current;
117                 self.last_size = total_processed;
118             }
119         }
120 
121         if self.sleep.is_some() {
122             match Pin::new(self.sleep.as_mut().unwrap()).poll(cx) {
123                 Poll::Ready(_) => return Poll::Ready(Ok(())),
124                 Poll::Pending => return Poll::Pending,
125             }
126         }
127         Poll::Ready(Ok(()))
128     }
129 
poll_write_file( &self, cx: &mut Context<'_>, data: &[u8], skip_size: usize, ) -> Poll<Result<usize, HttpClientError>>130     pub(crate) fn poll_write_file(
131         &self,
132         cx: &mut Context<'_>,
133         data: &[u8],
134         skip_size: usize,
135     ) -> Poll<Result<usize, HttpClientError>> {
136         let file = self.task.files.get_mut(0).unwrap();
137         let mut progress_guard = self.task.progress.lock().unwrap();
138         match Pin::new(file).poll_write(cx, data) {
139             Poll::Ready(Ok(size)) => {
140                 progress_guard.processed[0] += size;
141                 progress_guard.common_data.total_processed += size;
142                 Poll::Ready(Ok(size + skip_size))
143             }
144             Poll::Pending => Poll::Pending,
145             Poll::Ready(Err(e)) => Poll::Ready(Err(HttpClientError::other(e))),
146         }
147     }
148 }
149