1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 use glob::{Paths, Pattern};
17 #[cfg(feature = "compress_lz4_flex")]
18 use lz4_flex::frame::FrameEncoder;
19 use std::error::Error;
20 use std::fs::File;
21 use std::io;
22 use std::path::{Path, PathBuf};
23 use tar::Builder;
24 use ylong_runtime::sync::mpsc::bounded::{BoundedReceiver, BoundedSender};
25
/// Compression applied to the tar stream produced by the backup.
///
/// The enum is `#[non_exhaustive]`; variants other than `None` are only available when the
/// matching cargo feature is enabled.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum CompressAlgorithm {
    /// Write a plain, uncompressed tar.
    #[allow(dead_code)]
    None,

    /// Wrap the tar stream in an LZ4 frame (requires the `compress_lz4_flex` feature).
    #[cfg(feature = "compress_lz4_flex")]
    #[allow(dead_code)]
    Lz4Flex,
}
36
37 pub struct Options {
38 /// The working directory to store the files, default is "backup".
39 pub stash_dir: PathBuf,
40
41 /// The threshold to determine if a file is huge in byte. A huge file will be transferred directly without
42 /// archiving. Currently the default value is `usize::MAX`, which means no file is huge.
43 pub threshold_huge_file: usize,
44
45 /// The threshold to split the tar in byte. A new tar will be created if the size of the current tar exceeds this
46 /// threshold. Currently the default value is `usize::MAX`, which means no tar will be split.
47 pub threshold_split_tar: usize,
48
49 /// The compress algorithm to use, default is `CompressAlgorithm::None`.
50 pub compress_algorithm: CompressAlgorithm,
51 }
52
53 #[derive(Debug)]
54 pub struct Archive {
55 #[allow(dead_code)]
56 /// The path of the archive file
57 path: PathBuf,
58
59 #[allow(dead_code)]
60 /// The type of the archive file
61 archive_type: ArchiveType,
62
63 #[allow(dead_code)]
64 /// The size of the file's content in byte
65 content_size: usize,
66 }
67
/// The form in which backed-up content is delivered.
#[derive(Debug)]
pub enum ArchiveType {
    /// The file is passed on directly, without being packed.
    Plain,

    /// The files are packed into a tar.
    Tar,
}
76
77 struct BackupContext {
78 option: Options,
79
80 cur_tar: Option<Output>,
81 cur_tar_size: usize,
82 tar_cnt: usize,
83
84 inputs: BoundedReceiver<PathBuf>,
85 outputs: BoundedSender<Archive>,
86 }
87
88 enum Output {
89 Uncompressed(Builder<File>),
90
91 #[cfg(feature = "compress_lz4_flex")]
92 CompressedLz4Flex(Builder<FrameEncoder<File>>),
93 }
94
scan_files<S>( includes: Vec<S>, excludes: Vec<S>, sender: BoundedSender<PathBuf>, ) -> Result<(), Box<dyn Error>> where S: AsRef<str>,95 pub async fn scan_files<S>(
96 includes: Vec<S>,
97 excludes: Vec<S>,
98 sender: BoundedSender<PathBuf>,
99 ) -> Result<(), Box<dyn Error>>
100 where
101 S: AsRef<str>,
102 {
103 let exclude_patterns = build_exclude_patterns(excludes)?;
104 traverse_and_send(includes, exclude_patterns, sender).await
105 }
106
backup_files( option: Options, paths_receiver: BoundedReceiver<PathBuf>, archives_sender: BoundedSender<Archive>, ) -> Result<(), Box<dyn Error>>107 pub async fn backup_files(
108 option: Options,
109 paths_receiver: BoundedReceiver<PathBuf>,
110 archives_sender: BoundedSender<Archive>,
111 ) -> Result<(), Box<dyn Error>> {
112 BackupContext::new(option, paths_receiver, archives_sender)
113 .archive()
114 .await?;
115
116 Ok(())
117 }
118
build_exclude_patterns<S>(excludes: Vec<S>) -> Result<Vec<Pattern>, Box<dyn Error>> where S: AsRef<str>,119 fn build_exclude_patterns<S>(excludes: Vec<S>) -> Result<Vec<Pattern>, Box<dyn Error>>
120 where
121 S: AsRef<str>,
122 {
123 for exclude in &excludes {
124 if !exclude.as_ref().starts_with("/") {
125 let err = io::Error::new(
126 io::ErrorKind::InvalidInput,
127 "exclude pattern must be absolute",
128 );
129 return Err(Box::new(err));
130 }
131 }
132
133 let excludes = excludes.iter().map(|exclude| {
134 if let Some(stripped) = exclude.as_ref().strip_suffix("/") {
135 stripped
136 } else {
137 exclude.as_ref()
138 }
139 });
140
141 let exclude_patterns = excludes
142 .into_iter()
143 .map(|p| glob::Pattern::new(p.as_ref()))
144 .collect::<Result<Vec<Pattern>, _>>()?;
145
146 Ok(exclude_patterns)
147 }
148
traverse_and_send<S>( includes: Vec<S>, excludes: Vec<Pattern>, sender: BoundedSender<PathBuf>, ) -> Result<(), Box<dyn Error>> where S: AsRef<str>,149 async fn traverse_and_send<S>(
150 includes: Vec<S>,
151 excludes: Vec<Pattern>,
152 sender: BoundedSender<PathBuf>,
153 ) -> Result<(), Box<dyn Error>>
154 where
155 S: AsRef<str>,
156 {
157 let mut option = glob::MatchOptions::new();
158 option.require_literal_separator = true;
159
160 let includes = includes
161 .iter()
162 .map(|p| glob::glob_with(p.as_ref(), option))
163 .collect::<Result<Vec<Paths>, _>>()?;
164
165 for path in includes.into_iter().flatten() {
166 let path = path?;
167 if excludes.iter().any(|p| p.matches_path(&path)) {
168 continue;
169 }
170
171 let is_path_exclude =
172 |path: &Path| -> bool { excludes.iter().any(|p| p.matches_path(path)) };
173
174 let metadata = path.metadata()?;
175 if metadata.is_file() {
176 if !is_path_exclude(&path.as_path()) {
177 sender.send(path).await?;
178 }
179 } else if metadata.is_dir() {
180 let walker = walkdir::WalkDir::new(path);
181 for entry in walker
182 .into_iter()
183 .filter_entry(|e| !is_path_exclude(e.path()))
184 {
185 let entry = entry?;
186 if entry.file_type().is_file() {
187 sender.send(entry.path().to_path_buf()).await?;
188 }
189 }
190 }
191 }
192
193 Ok(())
194 }
195
196 impl Default for Options {
default() -> Self197 fn default() -> Self {
198 Self {
199 stash_dir: PathBuf::from("backup"),
200 threshold_huge_file: usize::MAX, // TODO: fix me
201 threshold_split_tar: usize::MAX, // TODO: fix me
202 compress_algorithm: CompressAlgorithm::None,
203 }
204 }
205 }
206
207 impl Output {
new(path: PathBuf, compress_algorithm: CompressAlgorithm) -> Result<Self, io::Error>208 fn new(path: PathBuf, compress_algorithm: CompressAlgorithm) -> Result<Self, io::Error> {
209 let prefix = path
210 .parent()
211 .expect(format!("failed to get parent of {:?}", path).as_str());
212 if !prefix.exists() {
213 std::fs::create_dir_all(prefix)
214 .expect(format!("failed to create {:?}", prefix).as_str());
215 }
216
217 match compress_algorithm {
218 CompressAlgorithm::None => Ok(Self::Uncompressed(Builder::new(File::create(path)?))),
219 #[cfg(feature = "compress_lz4_flex")]
220 CompressAlgorithm::Lz4Flex => Ok(Self::CompressedLz4Flex(Builder::new(
221 FrameEncoder::new(File::create(path)?),
222 ))),
223 }
224 }
225
append_file(&mut self, achievable_path: &PathBuf) -> Result<usize, io::Error>226 fn append_file(&mut self, achievable_path: &PathBuf) -> Result<usize, io::Error> {
227 #[cfg(feature = "ohos")]
228 assert_eq!(std::env::current_dir(), r#"/"#);
229
230 let abs_path = achievable_path
231 .canonicalize()
232 .expect("File to append is not achievable");
233 let relative_path = abs_path
234 .strip_prefix(std::env::current_dir().expect("Cannot get current dir"))
235 .expect("File to append is not in current dir");
236
237 match self {
238 Self::Uncompressed(builder) => {
239 builder.append_path_with_name(&achievable_path, relative_path)?
240 }
241
242 #[cfg(feature = "compress_lz4_flex")]
243 Self::CompressedLz4Flex(builder) => {
244 builder.append_path_with_name(&achievable_path, relative_path)?
245 }
246 };
247 Ok(std::fs::metadata(achievable_path)?.len() as usize)
248 }
249
finish(self) -> Result<(), io::Error>250 fn finish(self) -> Result<(), io::Error> {
251 match self {
252 Self::Uncompressed(mut builder) => {
253 builder.finish()?;
254 }
255 #[cfg(feature = "compress_lz4_flex")]
256 Self::CompressedLz4Flex(builder) => {
257 let encoder = builder.into_inner().expect("failed to get encoder");
258 encoder.finish()?;
259 }
260 }
261 Ok(())
262 }
263 }
264
265 impl BackupContext {
new( option: Options, inputs: BoundedReceiver<PathBuf>, outputs: BoundedSender<Archive>, ) -> Self266 fn new(
267 option: Options,
268 inputs: BoundedReceiver<PathBuf>,
269 outputs: BoundedSender<Archive>,
270 ) -> Self {
271 Self {
272 option,
273 cur_tar: None,
274 cur_tar_size: 0,
275 tar_cnt: 0,
276 inputs,
277 outputs,
278 }
279 }
280
archive(&mut self) -> Result<(), Box<dyn Error>>281 async fn archive(&mut self) -> Result<(), Box<dyn Error>> {
282 while let Ok(path) = self.inputs.recv().await {
283 assert!(path.exists());
284 assert!(path.metadata()?.is_file());
285
286 if let Some(archive) = self.archive_single_file(path).await? {
287 self.outputs.send(archive).await?;
288 }
289 }
290
291 if let Some(archive) = self.retrieve_cur_tar()? {
292 self.outputs.send(archive).await?;
293 }
294
295 Ok(())
296 }
297
archive_single_file(&mut self, path: PathBuf) -> Result<Option<Archive>, io::Error>298 async fn archive_single_file(&mut self, path: PathBuf) -> Result<Option<Archive>, io::Error> {
299 let file_size = path.metadata()?.len() as usize;
300
301 if file_size > self.option.threshold_huge_file {
302 return Ok(Some(Archive {
303 path,
304 archive_type: ArchiveType::Plain,
305 content_size: file_size,
306 }));
307 }
308
309 if self.cur_tar.is_none() {
310 self.cur_tar = Some(Output::new(
311 self.next_tar_path(),
312 self.option.compress_algorithm,
313 )?);
314 }
315
316 let cur_tar = self.cur_tar.as_mut().unwrap();
317 self.cur_tar_size += cur_tar.append_file(&path)?;
318 if self.cur_tar_size > self.option.threshold_split_tar {
319 return self.retrieve_cur_tar();
320 }
321
322 Ok(None)
323 }
324
cur_tar_path(&self) -> PathBuf325 fn cur_tar_path(&self) -> PathBuf {
326 let path = self.option.stash_dir.join(self.tar_cnt.to_string());
327 match self.option.compress_algorithm {
328 CompressAlgorithm::None => path.with_extension("tar"),
329
330 #[cfg(feature = "compress_lz4_flex")]
331 CompressAlgorithm::Lz4Flex => path.with_extension("tar.lz4"),
332 }
333 }
334
next_tar_path(&mut self) -> PathBuf335 fn next_tar_path(&mut self) -> PathBuf {
336 self.tar_cnt += 1;
337 self.cur_tar_path()
338 }
339
retrieve_cur_tar(&mut self) -> Result<Option<Archive>, io::Error>340 fn retrieve_cur_tar(&mut self) -> Result<Option<Archive>, io::Error> {
341 if let None = self.cur_tar {
342 return Ok(None);
343 }
344 let last_tar = self
345 .cur_tar
346 .take()
347 .expect("last_tar is guaranteed to be Some");
348 last_tar.finish()?;
349 let archive = Archive {
350 path: self.cur_tar_path(),
351 archive_type: ArchiveType::Tar,
352 content_size: self.cur_tar_size,
353 };
354 Ok(Some(archive))
355 }
356 }
357