1 // Copyright (C) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::future::Future; 15 use std::pin::Pin; 16 use std::sync::atomic::{AtomicBool, Ordering}; 17 use std::sync::Arc; 18 use std::task::{Context, Poll}; 19 use std::time::Duration; 20 21 use ylong_http_client::HttpClientError; 22 use ylong_runtime::io::AsyncWrite; 23 use ylong_runtime::time::{sleep, Sleep}; 24 25 use crate::manage::notifier::Notifier; 26 use crate::service::notification_bar::{NotificationDispatcher, NOTIFY_PROGRESS_INTERVAL}; 27 use crate::task::request_task::RequestTask; 28 use crate::utils::get_current_timestamp; 29 30 const SPEED_LIMIT_INTERVAL: u64 = 1000; 31 const FRONT_NOTIFY_INTERVAL: u64 = 1000; 32 33 pub(crate) struct TaskOperator { 34 pub(crate) sleep: Option<Pin<Box<Sleep>>>, 35 pub(crate) task: Arc<RequestTask>, 36 pub(crate) last_time: u64, 37 pub(crate) last_size: u64, 38 pub(crate) more_sleep_time: u64, 39 pub(crate) abort_flag: Arc<AtomicBool>, 40 } 41 42 impl TaskOperator { new(task: Arc<RequestTask>, abort_flag: Arc<AtomicBool>) -> Self43 pub(crate) fn new(task: Arc<RequestTask>, abort_flag: Arc<AtomicBool>) -> Self { 44 Self { 45 sleep: None, 46 task, 47 last_time: 0, 48 last_size: 0, 49 more_sleep_time: 0, 50 abort_flag, 51 } 52 } 53 poll_progress_common( &mut self, cx: &mut Context<'_>, ) -> Poll<Result<(), HttpClientError>>54 pub(crate) fn poll_progress_common( 55 &mut self, 56 cx: &mut Context<'_>, 57 ) -> Poll<Result<(), HttpClientError>> { 58 if self.abort_flag.load(Ordering::Acquire) { 59 return Poll::Ready(Err(HttpClientError::user_aborted())); 60 } 61 let current = get_current_timestamp(); 62 63 if current >= self.task.last_notify.load(Ordering::SeqCst) + FRONT_NOTIFY_INTERVAL { 64 let notify_data = self.task.build_notify_data(); 65 self.task.last_notify.store(current, Ordering::SeqCst); 66 Notifier::progress(&self.task.client_manager, notify_data); 67 } 68 69 if self.task.background_notify.load(Ordering::Acquire) 70 && current 71 > self.task.background_notify_time.load(Ordering::SeqCst) + NOTIFY_PROGRESS_INTERVAL 72 { 73 self.task 74 .background_notify_time 75 .store(current, Ordering::SeqCst); 76 NotificationDispatcher::get_instance().publish_progress_notification(&self.task); 77 } 78 79 let total_processed = self 80 .task 81 .progress 82 .lock() 83 .unwrap() 84 .common_data 85 .total_processed as u64; 86 87 self.sleep = None; 88 let speed_limit = self.task.rate_limiting.load(Ordering::SeqCst); 89 if speed_limit != 0 { 90 if self.more_sleep_time != 0 { 91 // wake up for notify, sleep until speed limit conditions are met 92 self.sleep = Some(Box::pin(sleep(Duration::from_millis(self.more_sleep_time)))); 93 self.more_sleep_time = 0; 94 } else if self.last_time == 0 { 95 // get the init time and size, for speed caculate 96 self.last_time = current; 97 self.last_size = total_processed; 98 } else if current - self.last_time < SPEED_LIMIT_INTERVAL 99 && ((total_processed - self.last_size) > speed_limit * SPEED_LIMIT_INTERVAL) 100 { 101 // sleep until notification is required or speed limit conditions are met 102 let limit_time = 103 (total_processed - self.last_size) / speed_limit - (current - self.last_time); 104 let notify_time = FRONT_NOTIFY_INTERVAL 105 - (current - self.task.last_notify.load(Ordering::SeqCst)); 106 let sleep_time = if limit_time > notify_time { 107 self.more_sleep_time = limit_time - notify_time; 108 notify_time 109 } else { 110 limit_time 111 }; 112 self.sleep = Some(Box::pin(sleep(Duration::from_millis(sleep_time)))); 113 } else if current - self.last_time >= SPEED_LIMIT_INTERVAL { 114 // last caculate window has meet speed limit, update last_time and last_size, 115 // for next poll's speed compare 116 self.last_time = current; 117 self.last_size = total_processed; 118 } 119 } 120 121 if self.sleep.is_some() { 122 match Pin::new(self.sleep.as_mut().unwrap()).poll(cx) { 123 Poll::Ready(_) => return Poll::Ready(Ok(())), 124 Poll::Pending => return Poll::Pending, 125 } 126 } 127 Poll::Ready(Ok(())) 128 } 129 poll_write_file( &self, cx: &mut Context<'_>, data: &[u8], skip_size: usize, ) -> Poll<Result<usize, HttpClientError>>130 pub(crate) fn poll_write_file( 131 &self, 132 cx: &mut Context<'_>, 133 data: &[u8], 134 skip_size: usize, 135 ) -> Poll<Result<usize, HttpClientError>> { 136 let file = self.task.files.get_mut(0).unwrap(); 137 let mut progress_guard = self.task.progress.lock().unwrap(); 138 match Pin::new(file).poll_write(cx, data) { 139 Poll::Ready(Ok(size)) => { 140 progress_guard.processed[0] += size; 141 progress_guard.common_data.total_processed += size; 142 Poll::Ready(Ok(size + skip_size)) 143 } 144 Poll::Pending => Poll::Pending, 145 Poll::Ready(Err(e)) => Poll::Ready(Err(HttpClientError::other(e))), 146 } 147 } 148 } 149