• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::io::{self};
15 use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, AtomicU8, Ordering};
16 use std::sync::{Arc, Mutex};
17 use std::time::Duration;
18 
19 use request_utils::file_control::{belong_app_base, check_standardized_path};
20 use ylong_http_client::async_impl::{Body, Client, Request, RequestBuilder, Response};
21 use ylong_http_client::{ErrorKind, HttpClientError};
22 
23 cfg_oh! {
24     use crate::manage::SystemConfig;
25 }
26 
27 use super::config::Version;
28 use super::info::{CommonTaskInfo, State, TaskInfo, UpdateInfo};
29 use super::notify::{EachFileStatus, NotifyData, Progress};
30 use super::reason::Reason;
31 use crate::error::ErrorCode;
32 use crate::manage::database::RequestDb;
33 use crate::manage::network_manager::NetworkManager;
34 use crate::manage::notifier::Notifier;
35 use crate::service::client::ClientManagerEntry;
36 use crate::service::notification_bar::NotificationDispatcher;
37 use crate::task::client::build_client;
38 use crate::task::config::{Action, TaskConfig};
39 use crate::task::files::{AttachedFiles, Files};
40 use crate::task::task_control;
41 use crate::utils::form_item::FileSpec;
42 use crate::utils::{get_current_duration, get_current_timestamp};
43 
44 const RETRY_TIMES: u32 = 4;
45 const RETRY_INTERVAL: u64 = 400;
46 
47 pub(crate) struct RequestTask {
48     pub(crate) conf: TaskConfig,
49     pub(crate) client: ylong_runtime::sync::Mutex<Client>,
50     pub(crate) files: Files,
51     pub(crate) body_files: Files,
52     pub(crate) ctime: u64,
53     pub(crate) mime_type: Mutex<String>,
54     pub(crate) progress: Mutex<Progress>,
55     pub(crate) status: Mutex<TaskStatus>,
56     pub(crate) code: Mutex<Vec<Reason>>,
57     pub(crate) tries: AtomicU32,
58     pub(crate) background_notify_time: AtomicU64,
59     pub(crate) background_notify: Arc<AtomicBool>,
60     pub(crate) file_total_size: AtomicI64,
61     pub(crate) rate_limiting: AtomicU64,
62     pub(crate) max_speed: AtomicI64,
63     pub(crate) last_notify: AtomicU64,
64     pub(crate) client_manager: ClientManagerEntry,
65     pub(crate) running_result: Mutex<Option<Result<(), Reason>>>,
66     pub(crate) timeout_tries: AtomicU32,
67     pub(crate) upload_resume: AtomicBool,
68     pub(crate) mode: AtomicU8,
69     pub(crate) start_time: AtomicU64,
70     pub(crate) task_time: AtomicU64,
71     pub(crate) rest_time: AtomicU64,
72 }
73 
74 impl RequestTask {
task_id(&self) -> u3275     pub(crate) fn task_id(&self) -> u32 {
76         self.conf.common_data.task_id
77     }
78 
uid(&self) -> u6479     pub(crate) fn uid(&self) -> u64 {
80         self.conf.common_data.uid
81     }
82 
config(&self) -> &TaskConfig83     pub(crate) fn config(&self) -> &TaskConfig {
84         &self.conf
85     }
86 
87     // only use for download task
mime_type(&self) -> String88     pub(crate) fn mime_type(&self) -> String {
89         self.mime_type.lock().unwrap().clone()
90     }
91 
action(&self) -> Action92     pub(crate) fn action(&self) -> Action {
93         self.conf.common_data.action
94     }
95 
speed_limit(&self, limit: u64)96     pub(crate) fn speed_limit(&self, limit: u64) {
97         let old = self.rate_limiting.swap(limit, Ordering::SeqCst);
98         if old != limit {
99             info!("task {} speed_limit {}", self.task_id(), limit);
100         }
101     }
102 
network_retry(&self) -> Result<(), TaskError>103     pub(crate) async fn network_retry(&self) -> Result<(), TaskError> {
104         if self.tries.load(Ordering::SeqCst) < RETRY_TIMES {
105             self.tries.fetch_add(1, Ordering::SeqCst);
106             if !NetworkManager::is_online() {
107                 return Err(TaskError::Waiting(TaskPhase::NetworkOffline));
108             } else {
109                 ylong_runtime::time::sleep(Duration::from_millis(RETRY_INTERVAL)).await;
110                 return Err(TaskError::Waiting(TaskPhase::NeedRetry));
111             }
112         }
113         Ok(())
114     }
115 }
116 
change_upload_size(begins: u64, mut ends: i64, size: i64) -> i64117 pub(crate) fn change_upload_size(begins: u64, mut ends: i64, size: i64) -> i64 {
118     if ends < 0 || ends >= size {
119         ends = size - 1;
120     }
121     if begins as i64 > ends {
122         return size;
123     }
124     ends - begins as i64 + 1
125 }
126 
127 impl RequestTask {
new( config: TaskConfig, files: AttachedFiles, client: Client, client_manager: ClientManagerEntry, upload_resume: bool, rest_time: u64, ) -> RequestTask128     pub(crate) fn new(
129         config: TaskConfig,
130         files: AttachedFiles,
131         client: Client,
132         client_manager: ClientManagerEntry,
133         upload_resume: bool,
134         rest_time: u64,
135     ) -> RequestTask {
136         let file_len = files.files.len();
137         let action = config.common_data.action;
138 
139         let file_total_size = match action {
140             Action::Upload => {
141                 let mut file_total_size = 0i64;
142                 // If the total size overflows, ignore it.
143                 for size in files.sizes.iter() {
144                     file_total_size += *size;
145                 }
146                 file_total_size
147             }
148             Action::Download => -1,
149             _ => unreachable!("Action::Any in RequestTask::new never reach"),
150         };
151 
152         let mut sizes = files.sizes.clone();
153 
154         if action == Action::Upload && config.common_data.index < sizes.len() as u32 {
155             sizes[config.common_data.index as usize] = change_upload_size(
156                 config.common_data.begins,
157                 config.common_data.ends,
158                 sizes[config.common_data.index as usize],
159             );
160         }
161 
162         let time = get_current_timestamp();
163         let status = TaskStatus::new(time);
164         let progress = Progress::new(sizes);
165         let mode = AtomicU8::new(config.common_data.mode.repr);
166 
167         RequestTask {
168             conf: config,
169             client: ylong_runtime::sync::Mutex::new(client),
170             files: files.files,
171             body_files: files.body_files,
172             ctime: time,
173             mime_type: Mutex::new(String::new()),
174             progress: Mutex::new(progress),
175             tries: AtomicU32::new(0),
176             status: Mutex::new(status),
177             code: Mutex::new(vec![Reason::Default; file_len]),
178             background_notify_time: AtomicU64::new(time),
179             background_notify: Arc::new(AtomicBool::new(false)),
180             file_total_size: AtomicI64::new(file_total_size),
181             rate_limiting: AtomicU64::new(0),
182             max_speed: AtomicI64::new(0),
183             last_notify: AtomicU64::new(time),
184             client_manager,
185             running_result: Mutex::new(None),
186             timeout_tries: AtomicU32::new(0),
187             upload_resume: AtomicBool::new(upload_resume),
188             mode,
189             start_time: AtomicU64::new(get_current_duration().as_secs()),
190             task_time: AtomicU64::new(0),
191             rest_time: AtomicU64::new(rest_time),
192         }
193     }
194 
new_by_info( config: TaskConfig, #[cfg(feature = "oh")] system: SystemConfig, info: TaskInfo, client_manager: ClientManagerEntry, upload_resume: bool, ) -> Result<RequestTask, ErrorCode>195     pub(crate) fn new_by_info(
196         config: TaskConfig,
197         #[cfg(feature = "oh")] system: SystemConfig,
198         info: TaskInfo,
199         client_manager: ClientManagerEntry,
200         upload_resume: bool,
201     ) -> Result<RequestTask, ErrorCode> {
202         let rest_time = get_rest_time(&config, info.task_time);
203         #[cfg(feature = "oh")]
204         let (files, client) = check_config(&config, rest_time, system)?;
205         #[cfg(not(feature = "oh"))]
206         let (files, client) = check_config(&config, rest_time)?;
207 
208         let file_len = files.files.len();
209         let action = config.common_data.action;
210         let time = get_current_timestamp();
211 
212         let file_total_size = match action {
213             Action::Upload => {
214                 let mut file_total_size = 0i64;
215                 // If the total size overflows, ignore it.
216                 for size in files.sizes.iter() {
217                     file_total_size += *size;
218                 }
219                 file_total_size
220             }
221             Action::Download => *info.progress.sizes.first().unwrap_or(&-1),
222             _ => unreachable!("Action::Any in RequestTask::new never reach"),
223         };
224 
225         // If `TaskInfo` is provided, use data of it.
226         let ctime = info.common_data.ctime;
227         let mime_type = info.mime_type.clone();
228         let tries = info.common_data.tries;
229         let status = TaskStatus {
230             mtime: time,
231             state: State::from(info.progress.common_data.state),
232             reason: Reason::from(info.common_data.reason),
233         };
234         let progress = info.progress;
235         let mode = AtomicU8::new(config.common_data.mode.repr);
236 
237         let mut task = RequestTask {
238             conf: config,
239             client: ylong_runtime::sync::Mutex::new(client),
240             files: files.files,
241             body_files: files.body_files,
242             ctime,
243             mime_type: Mutex::new(mime_type),
244             progress: Mutex::new(progress),
245             tries: AtomicU32::new(tries),
246             status: Mutex::new(status),
247             code: Mutex::new(vec![Reason::Default; file_len]),
248             background_notify_time: AtomicU64::new(time),
249             background_notify: Arc::new(AtomicBool::new(false)),
250             file_total_size: AtomicI64::new(file_total_size),
251             rate_limiting: AtomicU64::new(0),
252             max_speed: AtomicI64::new(info.max_speed),
253             last_notify: AtomicU64::new(time),
254             client_manager,
255             running_result: Mutex::new(None),
256             timeout_tries: AtomicU32::new(0),
257             upload_resume: AtomicBool::new(upload_resume),
258             mode,
259             start_time: AtomicU64::new(get_current_duration().as_secs()),
260             task_time: AtomicU64::new(info.task_time),
261             rest_time: AtomicU64::new(rest_time),
262         };
263         let background_notify = NotificationDispatcher::get_instance().register_task(&task);
264         task.background_notify = background_notify;
265         Ok(task)
266     }
267 
build_notify_data(&self) -> NotifyData268     pub(crate) fn build_notify_data(&self) -> NotifyData {
269         let vec = self.get_each_file_status();
270         NotifyData {
271             bundle: self.conf.bundle.clone(),
272             // `unwrap` for propagating panics among threads.
273             progress: self.progress.lock().unwrap().clone(),
274             action: self.conf.common_data.action,
275             version: self.conf.version,
276             each_file_status: vec,
277             task_id: self.conf.common_data.task_id,
278             uid: self.conf.common_data.uid,
279         }
280     }
281 
update_progress_in_database(&self)282     pub(crate) fn update_progress_in_database(&self) {
283         let mtime = self.status.lock().unwrap().mtime;
284         let reason = self.status.lock().unwrap().reason;
285         let progress = self.progress.lock().unwrap().clone();
286         let update_info = UpdateInfo {
287             mtime,
288             reason: reason.repr,
289             progress,
290             tries: self.tries.load(Ordering::SeqCst),
291             mime_type: self.mime_type(),
292         };
293         RequestDb::get_instance().update_task(self.task_id(), update_info);
294     }
295 
build_request_builder(&self) -> Result<RequestBuilder, HttpClientError>296     pub(crate) fn build_request_builder(&self) -> Result<RequestBuilder, HttpClientError> {
297         use ylong_http_client::async_impl::PercentEncoder;
298 
299         let url = self.conf.url.clone();
300         let url = match PercentEncoder::encode(url.as_str()) {
301             Ok(value) => value,
302             Err(e) => {
303                 error!("url percent encoding error is {:?}", e);
304                 sys_event!(
305                     ExecFault,
306                     DfxCode::TASK_FAULT_03,
307                     &format!("url percent encoding error is {:?}", e)
308                 );
309                 return Err(e);
310             }
311         };
312 
313         let method = match self.conf.method.to_uppercase().as_str() {
314             "PUT" => "PUT",
315             "POST" => "POST",
316             "GET" => "GET",
317             _ => match self.conf.common_data.action {
318                 Action::Upload => {
319                     if self.conf.version == Version::API10 {
320                         "PUT"
321                     } else {
322                         "POST"
323                     }
324                 }
325                 Action::Download => "GET",
326                 _ => "",
327             },
328         };
329         let mut request = RequestBuilder::new().method(method).url(url.as_str());
330         for (key, value) in self.conf.headers.iter() {
331             request = request.header(key.as_str(), value.as_str());
332         }
333         Ok(request)
334     }
335 
build_download_request( task: Arc<RequestTask>, ) -> Result<Request, TaskError>336     pub(crate) async fn build_download_request(
337         task: Arc<RequestTask>,
338     ) -> Result<Request, TaskError> {
339         let mut request_builder = task.build_request_builder()?;
340 
341         let file = task.files.get(0).unwrap();
342 
343         let has_downloaded = task_control::file_metadata(file).await?.len();
344         let resume_download = has_downloaded > 0;
345         let require_range = task.require_range();
346 
347         let begins = task.conf.common_data.begins;
348         let ends = task.conf.common_data.ends;
349 
350         debug!(
351             "task {} build download request, resume_download: {}, require_range: {}",
352             task.task_id(),
353             resume_download,
354             require_range
355         );
356         match (resume_download, require_range) {
357             (true, false) => {
358                 let (builder, support_range) = task.support_range(request_builder);
359                 request_builder = builder;
360                 if support_range {
361                     request_builder =
362                         task.range_request(request_builder, begins + has_downloaded, ends);
363                 } else {
364                     task_control::clear_downloaded_file(task.clone()).await?;
365                 }
366             }
367             (false, true) => {
368                 request_builder = task.range_request(request_builder, begins, ends);
369             }
370             (true, true) => {
371                 let (builder, support_range) = task.support_range(request_builder);
372                 request_builder = builder;
373                 if support_range {
374                     request_builder =
375                         task.range_request(request_builder, begins + has_downloaded, ends);
376                 } else {
377                     return Err(TaskError::Failed(Reason::UnsupportedRangeRequest));
378                 }
379             }
380             (false, false) => {}
381         };
382 
383         let body = if task.conf.data.is_empty() {
384             Body::empty()
385         } else {
386             Body::slice(task.conf.data.clone())
387         };
388         request_builder.body(body).map_err(Into::into)
389     }
390 
range_request( &self, request_builder: RequestBuilder, begins: u64, ends: i64, ) -> RequestBuilder391     fn range_request(
392         &self,
393         request_builder: RequestBuilder,
394         begins: u64,
395         ends: i64,
396     ) -> RequestBuilder {
397         let range = if ends < 0 {
398             format!("bytes={begins}-")
399         } else {
400             format!("bytes={begins}-{ends}")
401         };
402         request_builder.header("Range", range.as_str())
403     }
404 
support_range(&self, mut request_builder: RequestBuilder) -> (RequestBuilder, bool)405     fn support_range(&self, mut request_builder: RequestBuilder) -> (RequestBuilder, bool) {
406         let progress_guard = self.progress.lock().unwrap();
407         let mut support_range = false;
408         if let Some(etag) = progress_guard.extras.get("etag") {
409             request_builder = request_builder.header("If-Range", etag.as_str());
410             support_range = true;
411         } else if let Some(last_modified) = progress_guard.extras.get("last-modified") {
412             request_builder = request_builder.header("If-Range", last_modified.as_str());
413             support_range = true;
414         }
415         if !support_range {
416             info!("task {} not support range", self.task_id());
417         }
418         (request_builder, support_range)
419     }
420 
get_file_info(&self, response: &Response) -> Result<(), TaskError>421     pub(crate) fn get_file_info(&self, response: &Response) -> Result<(), TaskError> {
422         let content_type = response.headers().get("content-type");
423         if let Some(mime_type) = content_type {
424             if let Ok(value) = mime_type.to_string() {
425                 *self.mime_type.lock().unwrap() = value;
426             }
427         }
428 
429         let content_length = response.headers().get("content-length");
430         if let Some(Ok(len)) = content_length.map(|v| v.to_string()) {
431             match len.parse::<i64>() {
432                 Ok(v) => {
433                     let mut progress = self.progress.lock().unwrap();
434                     progress.sizes =
435                         vec![v + progress.processed.first().map_or_else(
436                         || {
437                             error!("Failed to get a process size from an empty vector in Progress");
438                             Default::default()
439                         },
440                         |x| *x as i64,
441                     )];
442                     self.file_total_size.store(v, Ordering::SeqCst);
443                     debug!("the download task content-length is {}", v);
444                 }
445                 Err(e) => {
446                     error!("convert string to i64 error: {:?}", e);
447                     sys_event!(
448                         ExecFault,
449                         DfxCode::TASK_FAULT_09,
450                         &format!("convert string to i64 error: {:?}", e)
451                     );
452                 }
453             }
454         } else {
455             error!("cannot get content-length of the task");
456             sys_event!(
457                 ExecFault,
458                 DfxCode::TASK_FAULT_09,
459                 "cannot get content-length of the task"
460             );
461             if self.conf.common_data.precise {
462                 return Err(TaskError::Failed(Reason::GetFileSizeFailed));
463             }
464         }
465         Ok(())
466     }
467 
handle_download_error( &self, err: HttpClientError, ) -> Result<(), TaskError>468     pub(crate) async fn handle_download_error(
469         &self,
470         err: HttpClientError,
471     ) -> Result<(), TaskError> {
472         if err.error_kind() != ErrorKind::UserAborted {
473             error!("Task {} {:?}", self.task_id(), err);
474         }
475         match err.error_kind() {
476             ErrorKind::Timeout => {
477                 sys_event!(
478                     ExecFault,
479                     DfxCode::TASK_FAULT_01,
480                     &format!("Task {} {:?}", self.task_id(), err)
481                 );
482                 Err(TaskError::Failed(Reason::ContinuousTaskTimeout))
483             }
484             // user triggered
485             ErrorKind::UserAborted => {
486                 sys_event!(
487                     ExecFault,
488                     DfxCode::TASK_FAULT_09,
489                     &format!("Task {} {:?}", self.task_id(), err)
490                 );
491                 Err(TaskError::Waiting(TaskPhase::UserAbort))
492             }
493             ErrorKind::BodyTransfer | ErrorKind::BodyDecode => {
494                 sys_event!(
495                     ExecFault,
496                     DfxCode::TASK_FAULT_09,
497                     &format!("Task {} {:?}", self.task_id(), err)
498                 );
499                 if format!("{}", err).contains("Below low speed limit") {
500                     Err(TaskError::Failed(Reason::LowSpeed))
501                 } else {
502                     self.network_retry().await?;
503                     Err(TaskError::Failed(Reason::OthersError))
504                 }
505             }
506             _ => {
507                 if format!("{}", err).contains("No space left on device") {
508                     sys_event!(
509                         ExecFault,
510                         DfxCode::TASK_FAULT_09,
511                         &format!("Task {} {:?}", self.task_id(), err)
512                     );
513                     Err(TaskError::Failed(Reason::InsufficientSpace))
514                 } else {
515                     sys_event!(
516                         ExecFault,
517                         DfxCode::TASK_FAULT_09,
518                         &format!("Task {} {:?}", self.task_id(), err)
519                     );
520                     Err(TaskError::Failed(Reason::OthersError))
521                 }
522             }
523         }
524     }
525 
526     #[cfg(feature = "oh")]
notify_response(&self, response: &Response)527     pub(crate) fn notify_response(&self, response: &Response) {
528         let tid = self.conf.common_data.task_id;
529         let version: String = response.version().as_str().into();
530         let status_code: u32 = response.status().as_u16() as u32;
531         let status_message: String;
532         if let Some(reason) = response.status().reason() {
533             status_message = reason.into();
534         } else {
535             error!("bad status_message {:?}", status_code);
536             sys_event!(
537                 ExecFault,
538                 DfxCode::TASK_FAULT_02,
539                 &format!("bad status_message {:?}", status_code)
540             );
541             return;
542         }
543         let headers = response.headers().clone();
544         debug!("notify_response");
545         self.client_manager
546             .send_response(tid, version, status_code, status_message, headers)
547     }
548 
require_range(&self) -> bool549     pub(crate) fn require_range(&self) -> bool {
550         self.conf.common_data.begins > 0 || self.conf.common_data.ends >= 0
551     }
552 
record_upload_response( &self, index: usize, response: Result<Response, HttpClientError>, )553     pub(crate) async fn record_upload_response(
554         &self,
555         index: usize,
556         response: Result<Response, HttpClientError>,
557     ) {
558         if let Ok(mut r) = response {
559             {
560                 let mut guard = self.progress.lock().unwrap();
561                 guard.extras.clear();
562                 for (k, v) in r.headers() {
563                     if let Ok(value) = v.to_string() {
564                         guard.extras.insert(k.to_string().to_lowercase(), value);
565                     }
566                 }
567             }
568 
569             let file = match self.body_files.get(index) {
570                 Some(file) => file,
571                 None => return,
572             };
573             let _ = task_control::file_set_len(file.clone(), 0).await;
574             loop {
575                 let mut buf = [0u8; 1024];
576                 let size = r.data(&mut buf).await;
577                 let size = match size {
578                     Ok(size) => size,
579                     Err(_e) => break,
580                 };
581 
582                 if size == 0 {
583                     break;
584                 }
585                 let _ = task_control::file_write_all(file.clone(), &buf[..size]).await;
586             }
587             // Makes sure all the data has been written to the target file.
588             let _ = task_control::file_sync_all(file).await;
589         }
590     }
591 
get_each_file_status(&self) -> Vec<EachFileStatus>592     pub(crate) fn get_each_file_status(&self) -> Vec<EachFileStatus> {
593         let mut vec = Vec::new();
594         // `unwrap` for propagating panics among threads.
595         let codes_guard = self.code.lock().unwrap();
596         for (i, file_spec) in self.conf.file_specs.iter().enumerate() {
597             let reason = *codes_guard.get(i).unwrap_or(&Reason::Default);
598             vec.push(EachFileStatus {
599                 path: file_spec.path.clone(),
600                 reason,
601                 message: reason.to_str().into(),
602             });
603         }
604         vec
605     }
606 
info(&self) -> TaskInfo607     pub(crate) fn info(&self) -> TaskInfo {
608         let status = self.status.lock().unwrap();
609         let progress = self.progress.lock().unwrap();
610         let mode = self.mode.load(Ordering::Acquire);
611         TaskInfo {
612             bundle: self.conf.bundle.clone(),
613             url: self.conf.url.clone(),
614             data: self.conf.data.clone(),
615             token: self.conf.token.clone(),
616             form_items: self.conf.form_items.clone(),
617             file_specs: self.conf.file_specs.clone(),
618             title: self.conf.title.clone(),
619             description: self.conf.description.clone(),
620             mime_type: {
621                 match self.conf.version {
622                     Version::API10 => match self.conf.common_data.action {
623                         Action::Download => match self.conf.headers.get("Content-Type") {
624                             None => "".into(),
625                             Some(v) => v.clone(),
626                         },
627                         Action::Upload => "multipart/form-data".into(),
628                         _ => "".into(),
629                     },
630                     Version::API9 => self.mime_type.lock().unwrap().clone(),
631                 }
632             },
633             progress: progress.clone(),
634             extras: progress.extras.clone(),
635             common_data: CommonTaskInfo {
636                 task_id: self.conf.common_data.task_id,
637                 uid: self.conf.common_data.uid,
638                 action: self.conf.common_data.action.repr,
639                 mode,
640                 ctime: self.ctime,
641                 mtime: status.mtime,
642                 reason: status.reason.repr,
643                 gauge: self.conf.common_data.gauge,
644                 retry: self.conf.common_data.retry,
645                 tries: self.tries.load(Ordering::SeqCst),
646                 version: self.conf.version as u8,
647                 priority: self.conf.common_data.priority,
648             },
649             max_speed: self.max_speed.load(Ordering::SeqCst),
650             task_time: self.task_time.load(Ordering::SeqCst),
651         }
652     }
653 
notify_header_receive(&self)654     pub(crate) fn notify_header_receive(&self) {
655         if self.conf.version == Version::API9 && self.conf.common_data.action == Action::Upload {
656             let notify_data = self.build_notify_data();
657 
658             Notifier::header_receive(&self.client_manager, notify_data);
659         }
660     }
661 }
662 
663 #[derive(Clone, Debug)]
664 pub(crate) struct TaskStatus {
665     pub(crate) mtime: u64,
666     pub(crate) state: State,
667     pub(crate) reason: Reason,
668 }
669 
670 impl TaskStatus {
new(mtime: u64) -> Self671     pub(crate) fn new(mtime: u64) -> Self {
672         TaskStatus {
673             mtime,
674             state: State::Initialized,
675             reason: Reason::Default,
676         }
677     }
678 }
679 
check_file_specs(file_specs: &[FileSpec]) -> bool680 fn check_file_specs(file_specs: &[FileSpec]) -> bool {
681     for (idx, spec) in file_specs.iter().enumerate() {
682         if spec.is_user_file {
683             continue;
684         }
685         let path = &spec.path;
686         if !check_standardized_path(path) {
687             error!("File path err - path: {}, idx: {}", path, idx);
688             return false;
689         }
690         if !belong_app_base(path) {
691             error!("File path invalid - path: {}, idx: {}", path, idx);
692             sys_event!(
693                 ExecFault,
694                 DfxCode::TASK_FAULT_09,
695                 &format!("File path invalid - path: {}, idx: {}", path, idx)
696             );
697             return false;
698         }
699     }
700     true
701 }
702 
check_config( config: &TaskConfig, total_timeout: u64, #[cfg(feature = "oh")] system: SystemConfig, ) -> Result<(AttachedFiles, Client), ErrorCode>703 pub(crate) fn check_config(
704     config: &TaskConfig,
705     total_timeout: u64,
706     #[cfg(feature = "oh")] system: SystemConfig,
707 ) -> Result<(AttachedFiles, Client), ErrorCode> {
708     if !check_file_specs(&config.file_specs) {
709         return Err(ErrorCode::Other);
710     }
711     let files = AttachedFiles::open(config).map_err(|_| ErrorCode::FileOperationErr)?;
712     #[cfg(feature = "oh")]
713     let client = build_client(config, total_timeout, system).map_err(|_| ErrorCode::Other)?;
714 
715     #[cfg(not(feature = "oh"))]
716     let client = build_client(config, total_timeout).map_err(|_| ErrorCode::Other)?;
717     Ok((files, client))
718 }
719 
get_rest_time(config: &TaskConfig, task_time: u64) -> u64720 pub(crate) fn get_rest_time(config: &TaskConfig, task_time: u64) -> u64 {
721     const SECONDS_IN_TEN_MINUTES: u64 = 10 * 60;
722     const DEFAULT_TOTAL_TIMEOUT: u64 = 60 * 60 * 24 * 7;
723 
724     let mut total_timeout = config.common_data.timeout.total_timeout;
725 
726     if total_timeout == 0 {
727         if !NotificationDispatcher::get_instance()
728             .check_task_notification_available(config.common_data.task_id)
729         {
730             total_timeout = SECONDS_IN_TEN_MINUTES;
731         } else {
732             total_timeout = DEFAULT_TOTAL_TIMEOUT;
733         }
734     }
735 
736     if total_timeout > task_time {
737         total_timeout - task_time
738     } else {
739         0
740     }
741 }
742 
743 impl From<HttpClientError> for TaskError {
from(_value: HttpClientError) -> Self744     fn from(_value: HttpClientError) -> Self {
745         TaskError::Failed(Reason::BuildRequestFailed)
746     }
747 }
748 
749 impl From<io::Error> for TaskError {
from(_value: io::Error) -> Self750     fn from(_value: io::Error) -> Self {
751         TaskError::Failed(Reason::IoError)
752     }
753 }
754 
755 #[derive(Debug, PartialEq, Eq)]
756 pub enum TaskPhase {
757     NeedRetry,
758     UserAbort,
759     NetworkOffline,
760 }
761 
762 #[derive(Debug, PartialEq, Eq)]
763 pub enum TaskError {
764     Failed(Reason),
765     Waiting(TaskPhase),
766 }
767 
768 #[cfg(test)]
769 mod ut_request_task {
770     include!("../../tests/ut/task/ut_request_task.rs");
771 }
772