• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 use glob::{Paths, Pattern};
17 #[cfg(feature = "compress_lz4_flex")]
18 use lz4_flex::frame::FrameEncoder;
19 use std::error::Error;
20 use std::fs::File;
21 use std::io;
22 use std::path::{Path, PathBuf};
23 use tar::Builder;
24 use ylong_runtime::sync::mpsc::bounded::{BoundedReceiver, BoundedSender};
25 
/// Compression applied to the tar stream written by the backup.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must keep a
/// wildcard arm when matching on it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum CompressAlgorithm {
    /// Write a plain tar without any compression.
    #[allow(dead_code)]
    None,

    /// Wrap the tar stream in an LZ4 frame.
    ///
    /// Only available when the `compress_lz4_flex` feature is enabled.
    #[cfg(feature = "compress_lz4_flex")]
    #[allow(dead_code)]
    Lz4Flex,
}
36 
37 pub struct Options {
38     /// The working directory to store the files, default is "backup".
39     pub stash_dir: PathBuf,
40 
41     /// The threshold to determine if a file is huge in byte. A huge file will be transferred directly without
42     /// archiving. Currently the default value is `usize::MAX`, which means no file is huge.
43     pub threshold_huge_file: usize,
44 
45     /// The threshold to split the tar in byte. A new tar will be created if the size of the current tar exceeds this
46     /// threshold. Currently the default value is `usize::MAX`, which means no tar will be split.
47     pub threshold_split_tar: usize,
48 
49     /// The compress algorithm to use, default is `CompressAlgorithm::None`.
50     pub compress_algorithm: CompressAlgorithm,
51 }
52 
53 #[derive(Debug)]
54 pub struct Archive {
55     #[allow(dead_code)]
56     /// The path of the archive file
57     path: PathBuf,
58 
59     #[allow(dead_code)]
60     /// The type of the archive file
61     archive_type: ArchiveType,
62 
63     #[allow(dead_code)]
64     /// The size of the file's content in byte
65     content_size: usize,
66 }
67 
/// How the content of an [`Archive`] is stored.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ArchiveType {
    /// The file is backed up directly, without being packed.
    Plain,

    /// The files are backed up inside a tar.
    Tar,
}
76 
77 struct BackupContext {
78     option: Options,
79 
80     cur_tar: Option<Output>,
81     cur_tar_size: usize,
82     tar_cnt: usize,
83 
84     inputs: BoundedReceiver<PathBuf>,
85     outputs: BoundedSender<Archive>,
86 }
87 
88 enum Output {
89     Uncompressed(Builder<File>),
90 
91     #[cfg(feature = "compress_lz4_flex")]
92     CompressedLz4Flex(Builder<FrameEncoder<File>>),
93 }
94 
scan_files<S>( includes: Vec<S>, excludes: Vec<S>, sender: BoundedSender<PathBuf>, ) -> Result<(), Box<dyn Error>> where S: AsRef<str>,95 pub async fn scan_files<S>(
96     includes: Vec<S>,
97     excludes: Vec<S>,
98     sender: BoundedSender<PathBuf>,
99 ) -> Result<(), Box<dyn Error>>
100 where
101     S: AsRef<str>,
102 {
103     let exclude_patterns = build_exclude_patterns(excludes)?;
104     traverse_and_send(includes, exclude_patterns, sender).await
105 }
106 
backup_files( option: Options, paths_receiver: BoundedReceiver<PathBuf>, archives_sender: BoundedSender<Archive>, ) -> Result<(), Box<dyn Error>>107 pub async fn backup_files(
108     option: Options,
109     paths_receiver: BoundedReceiver<PathBuf>,
110     archives_sender: BoundedSender<Archive>,
111 ) -> Result<(), Box<dyn Error>> {
112     BackupContext::new(option, paths_receiver, archives_sender)
113         .archive()
114         .await?;
115 
116     Ok(())
117 }
118 
build_exclude_patterns<S>(excludes: Vec<S>) -> Result<Vec<Pattern>, Box<dyn Error>> where S: AsRef<str>,119 fn build_exclude_patterns<S>(excludes: Vec<S>) -> Result<Vec<Pattern>, Box<dyn Error>>
120 where
121     S: AsRef<str>,
122 {
123     for exclude in &excludes {
124         if !exclude.as_ref().starts_with("/") {
125             let err = io::Error::new(
126                 io::ErrorKind::InvalidInput,
127                 "exclude pattern must be absolute",
128             );
129             return Err(Box::new(err));
130         }
131     }
132 
133     let excludes = excludes.iter().map(|exclude| {
134         if let Some(stripped) = exclude.as_ref().strip_suffix("/") {
135             stripped
136         } else {
137             exclude.as_ref()
138         }
139     });
140 
141     let exclude_patterns = excludes
142         .into_iter()
143         .map(|p| glob::Pattern::new(p.as_ref()))
144         .collect::<Result<Vec<Pattern>, _>>()?;
145 
146     Ok(exclude_patterns)
147 }
148 
traverse_and_send<S>( includes: Vec<S>, excludes: Vec<Pattern>, sender: BoundedSender<PathBuf>, ) -> Result<(), Box<dyn Error>> where S: AsRef<str>,149 async fn traverse_and_send<S>(
150     includes: Vec<S>,
151     excludes: Vec<Pattern>,
152     sender: BoundedSender<PathBuf>,
153 ) -> Result<(), Box<dyn Error>>
154 where
155     S: AsRef<str>,
156 {
157     let mut option = glob::MatchOptions::new();
158     option.require_literal_separator = true;
159 
160     let includes = includes
161         .iter()
162         .map(|p| glob::glob_with(p.as_ref(), option))
163         .collect::<Result<Vec<Paths>, _>>()?;
164 
165     for path in includes.into_iter().flatten() {
166         let path = path?;
167         if excludes.iter().any(|p| p.matches_path(&path)) {
168             continue;
169         }
170 
171         let is_path_exclude =
172             |path: &Path| -> bool { excludes.iter().any(|p| p.matches_path(path)) };
173 
174         let metadata = path.metadata()?;
175         if metadata.is_file() {
176             if !is_path_exclude(&path.as_path()) {
177                 sender.send(path).await?;
178             }
179         } else if metadata.is_dir() {
180             let walker = walkdir::WalkDir::new(path);
181             for entry in walker
182                 .into_iter()
183                 .filter_entry(|e| !is_path_exclude(e.path()))
184             {
185                 let entry = entry?;
186                 if entry.file_type().is_file() {
187                     sender.send(entry.path().to_path_buf()).await?;
188                 }
189             }
190         }
191     }
192 
193     Ok(())
194 }
195 
196 impl Default for Options {
default() -> Self197     fn default() -> Self {
198         Self {
199             stash_dir: PathBuf::from("backup"),
200             threshold_huge_file: usize::MAX, // TODO: fix me
201             threshold_split_tar: usize::MAX, // TODO: fix me
202             compress_algorithm: CompressAlgorithm::None,
203         }
204     }
205 }
206 
207 impl Output {
new(path: PathBuf, compress_algorithm: CompressAlgorithm) -> Result<Self, io::Error>208     fn new(path: PathBuf, compress_algorithm: CompressAlgorithm) -> Result<Self, io::Error> {
209         let prefix = path
210             .parent()
211             .expect(format!("failed to get parent of {:?}", path).as_str());
212         if !prefix.exists() {
213             std::fs::create_dir_all(prefix)
214                 .expect(format!("failed to create {:?}", prefix).as_str());
215         }
216 
217         match compress_algorithm {
218             CompressAlgorithm::None => Ok(Self::Uncompressed(Builder::new(File::create(path)?))),
219             #[cfg(feature = "compress_lz4_flex")]
220             CompressAlgorithm::Lz4Flex => Ok(Self::CompressedLz4Flex(Builder::new(
221                 FrameEncoder::new(File::create(path)?),
222             ))),
223         }
224     }
225 
append_file(&mut self, achievable_path: &PathBuf) -> Result<usize, io::Error>226     fn append_file(&mut self, achievable_path: &PathBuf) -> Result<usize, io::Error> {
227         #[cfg(feature = "ohos")]
228         assert_eq!(std::env::current_dir(), r#"/"#);
229 
230         let abs_path = achievable_path
231             .canonicalize()
232             .expect("File to append is not achievable");
233         let relative_path = abs_path
234             .strip_prefix(std::env::current_dir().expect("Cannot get current dir"))
235             .expect("File to append is not in current dir");
236 
237         match self {
238             Self::Uncompressed(builder) => {
239                 builder.append_path_with_name(&achievable_path, relative_path)?
240             }
241 
242             #[cfg(feature = "compress_lz4_flex")]
243             Self::CompressedLz4Flex(builder) => {
244                 builder.append_path_with_name(&achievable_path, relative_path)?
245             }
246         };
247         Ok(std::fs::metadata(achievable_path)?.len() as usize)
248     }
249 
finish(self) -> Result<(), io::Error>250     fn finish(self) -> Result<(), io::Error> {
251         match self {
252             Self::Uncompressed(mut builder) => {
253                 builder.finish()?;
254             }
255             #[cfg(feature = "compress_lz4_flex")]
256             Self::CompressedLz4Flex(builder) => {
257                 let encoder = builder.into_inner().expect("failed to get encoder");
258                 encoder.finish()?;
259             }
260         }
261         Ok(())
262     }
263 }
264 
265 impl BackupContext {
new( option: Options, inputs: BoundedReceiver<PathBuf>, outputs: BoundedSender<Archive>, ) -> Self266     fn new(
267         option: Options,
268         inputs: BoundedReceiver<PathBuf>,
269         outputs: BoundedSender<Archive>,
270     ) -> Self {
271         Self {
272             option,
273             cur_tar: None,
274             cur_tar_size: 0,
275             tar_cnt: 0,
276             inputs,
277             outputs,
278         }
279     }
280 
archive(&mut self) -> Result<(), Box<dyn Error>>281     async fn archive(&mut self) -> Result<(), Box<dyn Error>> {
282         while let Ok(path) = self.inputs.recv().await {
283             assert!(path.exists());
284             assert!(path.metadata()?.is_file());
285 
286             if let Some(archive) = self.archive_single_file(path).await? {
287                 self.outputs.send(archive).await?;
288             }
289         }
290 
291         if let Some(archive) = self.retrieve_cur_tar()? {
292             self.outputs.send(archive).await?;
293         }
294 
295         Ok(())
296     }
297 
archive_single_file(&mut self, path: PathBuf) -> Result<Option<Archive>, io::Error>298     async fn archive_single_file(&mut self, path: PathBuf) -> Result<Option<Archive>, io::Error> {
299         let file_size = path.metadata()?.len() as usize;
300 
301         if file_size > self.option.threshold_huge_file {
302             return Ok(Some(Archive {
303                 path,
304                 archive_type: ArchiveType::Plain,
305                 content_size: file_size,
306             }));
307         }
308 
309         if self.cur_tar.is_none() {
310             self.cur_tar = Some(Output::new(
311                 self.next_tar_path(),
312                 self.option.compress_algorithm,
313             )?);
314         }
315 
316         let cur_tar = self.cur_tar.as_mut().unwrap();
317         self.cur_tar_size += cur_tar.append_file(&path)?;
318         if self.cur_tar_size > self.option.threshold_split_tar {
319             return self.retrieve_cur_tar();
320         }
321 
322         Ok(None)
323     }
324 
cur_tar_path(&self) -> PathBuf325     fn cur_tar_path(&self) -> PathBuf {
326         let path = self.option.stash_dir.join(self.tar_cnt.to_string());
327         match self.option.compress_algorithm {
328             CompressAlgorithm::None => path.with_extension("tar"),
329 
330             #[cfg(feature = "compress_lz4_flex")]
331             CompressAlgorithm::Lz4Flex => path.with_extension("tar.lz4"),
332         }
333     }
334 
next_tar_path(&mut self) -> PathBuf335     fn next_tar_path(&mut self) -> PathBuf {
336         self.tar_cnt += 1;
337         self.cur_tar_path()
338     }
339 
retrieve_cur_tar(&mut self) -> Result<Option<Archive>, io::Error>340     fn retrieve_cur_tar(&mut self) -> Result<Option<Archive>, io::Error> {
341         if let None = self.cur_tar {
342             return Ok(None);
343         }
344         let last_tar = self
345             .cur_tar
346             .take()
347             .expect("last_tar is guaranteed to be Some");
348         last_tar.finish()?;
349         let archive = Archive {
350             path: self.cur_tar_path(),
351             archive_type: ArchiveType::Tar,
352             content_size: self.cur_tar_size,
353         };
354         Ok(Some(archive))
355     }
356 }
357