• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 mod keeper;
15 mod running_task;
16 use std::collections::{HashMap, HashSet};
17 use std::sync::atomic::{AtomicBool, Ordering};
18 use std::sync::Arc;
19 
20 use keeper::SAKeeper;
21 
22 cfg_oh! {
23     use crate::ability::SYSTEM_CONFIG_MANAGER;
24 }
25 use ylong_runtime::task::JoinHandle;
26 
27 use crate::config::Mode;
28 use crate::error::ErrorCode;
29 use crate::manage::database::RequestDb;
30 use crate::manage::events::{TaskEvent, TaskManagerEvent};
31 use crate::manage::scheduler::qos::{QosChanges, QosDirection};
32 use crate::manage::scheduler::queue::running_task::RunningTask;
33 use crate::manage::task_manager::TaskManagerTx;
34 use crate::service::client::ClientManagerEntry;
35 use crate::service::run_count::RunCountManagerEntry;
36 use crate::task::config::Action;
37 use crate::task::info::State;
38 use crate::task::reason::Reason;
39 use crate::task::request_task::RequestTask;
40 use crate::utils::runtime_spawn;
41 
42 pub(crate) struct RunningQueue {
43     download_queue: HashMap<(u64, u32), Arc<RequestTask>>,
44     upload_queue: HashMap<(u64, u32), Arc<RequestTask>>,
45     running_tasks: HashMap<(u64, u32), Option<AbortHandle>>,
46     keeper: SAKeeper,
47     tx: TaskManagerTx,
48     run_count_manager: RunCountManagerEntry,
49     client_manager: ClientManagerEntry,
50     // paused and then resume upload task need to upload from the breakpoint
51     pub(crate) upload_resume: HashSet<u32>,
52 }
53 
54 impl RunningQueue {
new( tx: TaskManagerTx, run_count_manager: RunCountManagerEntry, client_manager: ClientManagerEntry, ) -> Self55     pub(crate) fn new(
56         tx: TaskManagerTx,
57         run_count_manager: RunCountManagerEntry,
58         client_manager: ClientManagerEntry,
59     ) -> Self {
60         Self {
61             download_queue: HashMap::new(),
62             upload_queue: HashMap::new(),
63             keeper: SAKeeper::new(tx.clone()),
64             tx,
65             running_tasks: HashMap::new(),
66             run_count_manager,
67             client_manager,
68             upload_resume: HashSet::new(),
69         }
70     }
71 
get_task(&self, uid: u64, task_id: u32) -> Option<&Arc<RequestTask>>72     pub(crate) fn get_task(&self, uid: u64, task_id: u32) -> Option<&Arc<RequestTask>> {
73         self.download_queue
74             .get(&(uid, task_id))
75             .or_else(|| self.upload_queue.get(&(uid, task_id)))
76     }
77 
task_finish(&mut self, uid: u64, task_id: u32)78     pub(crate) fn task_finish(&mut self, uid: u64, task_id: u32) {
79         self.running_tasks.remove(&(uid, task_id));
80     }
81 
try_restart(&mut self, uid: u64, task_id: u32) -> bool82     pub(crate) fn try_restart(&mut self, uid: u64, task_id: u32) -> bool {
83         if let Some(task) = self
84             .download_queue
85             .get(&(uid, task_id))
86             .or(self.upload_queue.get(&(uid, task_id)))
87         {
88             if self.running_tasks.contains_key(&(uid, task_id)) {
89                 return false;
90             }
91             info!("{} restart running", task_id);
92             let running_task = RunningTask::new(task.clone(), self.tx.clone(), self.keeper.clone());
93             let abort_flag = Arc::new(AtomicBool::new(false));
94             let abort_flag_clone = abort_flag.clone();
95             let join_handle = runtime_spawn(async move {
96                 running_task.run(abort_flag_clone.clone()).await;
97             });
98             let uid = task.uid();
99             let task_id = task.task_id();
100             self.running_tasks.insert(
101                 (uid, task_id),
102                 Some(AbortHandle::new(abort_flag, join_handle)),
103             );
104             true
105         } else {
106             false
107         }
108     }
109 
tasks(&self) -> impl Iterator<Item = &Arc<RequestTask>>110     pub(crate) fn tasks(&self) -> impl Iterator<Item = &Arc<RequestTask>> {
111         self.download_queue
112             .values()
113             .chain(self.upload_queue.values())
114     }
115 
running_tasks(&self) -> usize116     pub(crate) fn running_tasks(&self) -> usize {
117         self.download_queue.len() + self.upload_queue.len()
118     }
119 
dump_tasks(&self)120     pub(crate) fn dump_tasks(&self) {
121         info!("dump all running {}", self.running_tasks());
122 
123         for ((uid, task_id), task) in self.download_queue.iter().chain(self.upload_queue.iter()) {
124             let task_status = task.status.lock().unwrap();
125             info!(
126                 "dump task {}, uid {}, action {}, mode {}, bundle {}, status {:?}",
127                 task_id,
128                 uid,
129                 task.action().repr,
130                 task.mode().repr,
131                 task.bundle(),
132                 *task_status
133             );
134         }
135     }
136 
reschedule(&mut self, qos: QosChanges, qos_remove_queue: &mut Vec<(u64, u32)>)137     pub(crate) fn reschedule(&mut self, qos: QosChanges, qos_remove_queue: &mut Vec<(u64, u32)>) {
138         if let Some(vec) = qos.download {
139             self.reschedule_inner(Action::Download, vec, qos_remove_queue)
140         }
141         if let Some(vec) = qos.upload {
142             self.reschedule_inner(Action::Upload, vec, qos_remove_queue)
143         }
144     }
145 
reschedule_inner( &mut self, action: Action, qos_vec: Vec<QosDirection>, qos_remove_queue: &mut Vec<(u64, u32)>, )146     pub(crate) fn reschedule_inner(
147         &mut self,
148         action: Action,
149         qos_vec: Vec<QosDirection>,
150         qos_remove_queue: &mut Vec<(u64, u32)>,
151     ) {
152         let mut new_queue = HashMap::new();
153 
154         let queue = if action == Action::Download {
155             &mut self.download_queue
156         } else {
157             &mut self.upload_queue
158         };
159 
160         // We need to decide which tasks need to continue running based on `QosChanges`.
161         for qos_direction in qos_vec.iter() {
162             let uid = qos_direction.uid();
163             let task_id = qos_direction.task_id();
164 
165             if let Some(task) = queue.remove(&(uid, task_id)) {
166                 // If we can find that the task is running in `running_tasks`,
167                 // we just need to adjust its rate.
168                 task.speed_limit(qos_direction.direction() as u64);
169                 // Then we put it into `satisfied_tasks`.
170                 new_queue.insert((uid, task_id), task);
171                 continue;
172             }
173 
174             // If the task is not in the current running queue, retrieve
175             // the corresponding task from the database and start it.
176 
177             #[cfg(feature = "oh")]
178             let system_config = unsafe { SYSTEM_CONFIG_MANAGER.assume_init_ref().system_config() };
179             let upload_resume = self.upload_resume.remove(&task_id);
180 
181             let task = match RequestDb::get_instance().get_task(
182                 task_id,
183                 #[cfg(feature = "oh")]
184                 system_config,
185                 &self.client_manager,
186                 upload_resume,
187             ) {
188                 Ok(task) => task,
189                 Err(ErrorCode::TaskNotFound) => continue, // If we cannot find the task, skip it.
190                 Err(ErrorCode::TaskStateErr) => continue, // If we cannot find the task, skip it.
191                 Err(e) => {
192                     info!("get task {} error:{:?}", task_id, e);
193                     if let Some(info) = RequestDb::get_instance().get_task_qos_info(task_id) {
194                         self.tx.send_event(TaskManagerEvent::Task(TaskEvent::Failed(
195                             task_id,
196                             uid,
197                             Reason::OthersError,
198                             Mode::from(info.mode),
199                         )));
200                     }
201                     qos_remove_queue.push((uid, task_id));
202                     continue;
203                 }
204             };
205             task.speed_limit(qos_direction.direction() as u64);
206 
207             new_queue.insert((uid, task_id), task.clone());
208 
209             if self.running_tasks.contains_key(&(uid, task_id)) {
210                 info!("task {} not finished", task_id);
211                 continue;
212             }
213 
214             info!("{} create running", task_id);
215             let running_task = RunningTask::new(task.clone(), self.tx.clone(), self.keeper.clone());
216             RequestDb::get_instance().update_task_state(
217                 running_task.task_id(),
218                 State::Running,
219                 Reason::Default,
220             );
221             let abort_flag = Arc::new(AtomicBool::new(false));
222             let abort_flag_clone = abort_flag.clone();
223             let join_handle = runtime_spawn(async move {
224                 running_task.run(abort_flag_clone).await;
225             });
226 
227             let uid = task.uid();
228             let task_id = task.task_id();
229             self.running_tasks.insert(
230                 (uid, task_id),
231                 Some(AbortHandle::new(abort_flag, join_handle)),
232             );
233         }
234         // every satisfied tasks in running has been moved, set left tasks to Waiting
235 
236         for task in queue.values() {
237             if let Some(join_handle) = self.running_tasks.get_mut(&(task.uid(), task.task_id())) {
238                 if let Some(join_handle) = join_handle.take() {
239                     join_handle.cancel();
240                 };
241             }
242         }
243         *queue = new_queue;
244 
245         #[cfg(feature = "oh")]
246         self.run_count_manager
247             .notify_run_count(self.download_queue.len() + self.upload_queue.len());
248     }
249 
retry_all_tasks(&mut self)250     pub(crate) fn retry_all_tasks(&mut self) {
251         for task in self.running_tasks.drain() {
252             if let Some(handle) = task.1 {
253                 handle.cancel();
254             }
255         }
256     }
257 }
258 
259 struct AbortHandle {
260     abort_flag: Arc<AtomicBool>,
261     join_handle: JoinHandle<()>,
262 }
263 
264 impl AbortHandle {
new(abort_flag: Arc<AtomicBool>, join_handle: JoinHandle<()>) -> Self265     fn new(abort_flag: Arc<AtomicBool>, join_handle: JoinHandle<()>) -> Self {
266         Self {
267             abort_flag,
268             join_handle,
269         }
270     }
cancel(self)271     fn cancel(self) {
272         self.abort_flag.store(true, Ordering::Release);
273         self.join_handle.cancel();
274     }
275 }
276