• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2024 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 #![warn(unused)]
15 
16 use std::collections::hash_map::Entry;
17 use std::fs::{self, DirEntry, File, OpenOptions};
18 use std::io::{self, Seek, Write};
19 use std::os::unix::fs::MetadataExt;
20 use std::path::{Path, PathBuf};
21 use std::str::FromStr;
22 use std::sync::{Arc, LazyLock, OnceLock, Weak};
23 use std::time::SystemTime;
24 
25 use request_utils::task_id::TaskId;
26 
27 use super::ram::RamCache;
28 use crate::manage::CacheManager;
29 use crate::spawn;
30 
31 const FINISH_SUFFIX: &str = "_F";
32 
33 static CACHE_DIR_PATH: LazyLock<PathBuf> = LazyLock::new(|| {
34     #[cfg(feature = "ohos")]
35     let mut path = match request_utils::context::get_cache_dir() {
36         Some(dir) => PathBuf::from_str(&dir).unwrap(),
37         None => {
38             error!("get cache dir failed");
39             PathBuf::from_str("/data/storage/el2/base/cache").unwrap()
40         }
41     };
42     #[cfg(not(feature = "ohos"))]
43     let mut path = PathBuf::from_str("./").unwrap();
44 
45     path.push("preload_caches");
46     if let Err(e) = fs::create_dir_all(path.as_path()) {
47         error!("create cache dir error {}", e);
48     }
49     path
50 });
51 
52 pub(crate) struct FileCache {
53     task_id: TaskId,
54     handle: &'static CacheManager,
55 }
56 
57 impl Drop for FileCache {
drop(&mut self)58     fn drop(&mut self) {
59         fn drop_inner(me: &mut FileCache) -> Result<(), io::Error> {
60             let path = FileCache::path(&me.task_id);
61             let metadata = fs::metadata(&path)?;
62             debug!(
63                 "try drop file cache {} for task {}",
64                 metadata.len(),
65                 me.task_id.brief()
66             );
67             fs::remove_file(path)?;
68             me.handle
69                 .file_handle
70                 .lock()
71                 .unwrap()
72                 .release(metadata.len());
73             Ok(())
74         }
75 
76         if let Err(e) = drop_inner(self) {
77             error!("{} drop file cache error: {}", self.task_id, e);
78         } else {
79             info!("{} file cache drop", self.task_id.brief());
80         }
81     }
82 }
83 
84 impl FileCache {
try_restore(task_id: TaskId, handle: &'static CacheManager) -> Option<Self>85     pub(crate) fn try_restore(task_id: TaskId, handle: &'static CacheManager) -> Option<Self> {
86         let metadata = fs::metadata(Self::path(&task_id)).ok()?;
87         if !CacheManager::apply_cache(
88             &handle.file_handle,
89             &handle.files,
90             FileCache::task_id,
91             metadata.len() as usize,
92         ) {
93             info!("apply file cache for task {} failed", task_id.brief());
94             let path = FileCache::path(&task_id);
95             let _ = fs::remove_file(path);
96             return None;
97         }
98 
99         Some(Self { task_id, handle })
100     }
101 
try_create( task_id: TaskId, handle: &'static CacheManager, cache: Arc<RamCache>, ) -> Option<Self>102     pub(crate) fn try_create(
103         task_id: TaskId,
104         handle: &'static CacheManager,
105         cache: Arc<RamCache>,
106     ) -> Option<Self> {
107         let size = cache.size();
108         debug!(
109             "try apply new file cache {} for task {}",
110             size,
111             task_id.brief()
112         );
113 
114         if !CacheManager::apply_cache(&handle.file_handle, &handle.files, FileCache::task_id, size)
115         {
116             info!("apply file cache for task {} failed", task_id.brief());
117             return None;
118         }
119 
120         if let Err(e) = Self::create_file(&task_id, cache) {
121             error!("create file cache error: {}", e);
122             handle.file_handle.lock().unwrap().release(size as u64);
123             return None;
124         }
125         Some(Self { task_id, handle })
126     }
127 
create_file(task_id: &TaskId, cache: Arc<RamCache>) -> Result<(), io::Error>128     fn create_file(task_id: &TaskId, cache: Arc<RamCache>) -> Result<(), io::Error> {
129         let path = Self::path(task_id);
130         let mut file = OpenOptions::new()
131             .write(true)
132             .create(true)
133             .truncate(true)
134             .open(path.as_path())?;
135         io::copy(&mut cache.cursor(), &mut file)?;
136         file.flush()?;
137         file.rewind()?;
138         let file_name = format!("{}{}", task_id, FINISH_SUFFIX);
139         let new_path = CACHE_DIR_PATH.join(file_name);
140         fs::rename(path, new_path)?;
141         Ok(())
142     }
143 
open(&self) -> Result<File, io::Error>144     pub(crate) fn open(&self) -> Result<File, io::Error> {
145         OpenOptions::new()
146             .read(true)
147             .open(Self::path(&self.task_id))
148     }
149 
task_id(&self) -> &TaskId150     pub(crate) fn task_id(&self) -> &TaskId {
151         &self.task_id
152     }
153 
path(task_id: &TaskId) -> PathBuf154     fn path(task_id: &TaskId) -> PathBuf {
155         CACHE_DIR_PATH.join(task_id.to_string() + FINISH_SUFFIX)
156     }
157 }
158 
restore_files() -> impl Iterator<Item = TaskId>159 pub(crate) fn restore_files() -> impl Iterator<Item = TaskId> {
160     restore_files_inner(CACHE_DIR_PATH.as_path())
161 }
162 
restore_files_inner(path: &Path) -> impl Iterator<Item = TaskId>163 pub(crate) fn restore_files_inner(path: &Path) -> impl Iterator<Item = TaskId> {
164     let closure = |(path, _)| path;
165 
166     let files = match fs::read_dir(path) {
167         Ok(files) => files,
168         Err(e) => {
169             error!("read dir error {}", e);
170             return vec![].into_iter().map(closure);
171         }
172     };
173     let mut v = files
174         .into_iter()
175         .filter_map(|entry| match filter_map_entry(entry, path) {
176             Ok((path, time)) => Some((path, time)),
177             Err(e) => {
178                 error!("restore file error {}", e);
179                 None
180             }
181         })
182         .collect::<Vec<_>>();
183     v.sort_by_key(|(_, time)| *time);
184     v.into_iter().map(closure)
185 }
186 
filter_map_entry( entry: Result<DirEntry, io::Error>, path: &Path, ) -> Result<(TaskId, SystemTime), io::Error>187 fn filter_map_entry(
188     entry: Result<DirEntry, io::Error>,
189     path: &Path,
190 ) -> Result<(TaskId, SystemTime), io::Error> {
191     let file_name = entry?.file_name();
192     let file_name = file_name.to_str().ok_or(io::Error::new(
193         io::ErrorKind::InvalidData,
194         format!("invalid file name {:?}", file_name),
195     ))?;
196     if !file_name.ends_with(FINISH_SUFFIX) {
197         let _ = fs::remove_file(path.join(file_name));
198         return Err(io::Error::new(
199             io::ErrorKind::InvalidData,
200             format!("incomplete file {}", file_name),
201         ));
202     }
203     let task_id = TaskId::new(file_name.trim_end_matches(FINISH_SUFFIX).to_string());
204     let path = path.join(file_name);
205     let time = fs::metadata(path)?.modified()?;
206     Ok((task_id, time))
207 }
208 
209 impl CacheManager {
update_file_cache(&'static self, task_id: TaskId, cache: Arc<RamCache>)210     pub(super) fn update_file_cache(&'static self, task_id: TaskId, cache: Arc<RamCache>) {
211         self.update_from_file_once.lock().unwrap().remove(&task_id);
212         spawn(move || {
213             self.backup_rams
214                 .lock()
215                 .unwrap()
216                 .insert(task_id.clone(), cache.clone());
217             self.files.lock().unwrap().remove(&task_id);
218             if let Some(file_cache) = FileCache::try_create(task_id.clone(), self, cache) {
219                 info!("{} file cache updated", task_id.brief());
220                 self.files
221                     .lock()
222                     .unwrap()
223                     .insert(task_id.clone(), file_cache);
224             };
225             self.backup_rams.lock().unwrap().remove(&task_id);
226         });
227     }
228 
update_ram_from_file(&'static self, task_id: &TaskId) -> Option<Arc<RamCache>>229     pub(crate) fn update_ram_from_file(&'static self, task_id: &TaskId) -> Option<Arc<RamCache>> {
230         let mut retry = false;
231         loop {
232             let ret = self.update_ram_from_file_inner(task_id, &mut retry);
233             if !retry || ret.is_some() {
234                 break ret;
235             } else {
236                 self.update_from_file_once.lock().unwrap().remove(task_id);
237             }
238         }
239     }
240 
update_ram_from_file_inner( &'static self, task_id: &TaskId, retry: &mut bool, ) -> Option<Arc<RamCache>>241     pub(crate) fn update_ram_from_file_inner(
242         &'static self,
243         task_id: &TaskId,
244         retry: &mut bool,
245     ) -> Option<Arc<RamCache>> {
246         *retry = false;
247         let once = match self
248             .update_from_file_once
249             .lock()
250             .unwrap()
251             .entry(task_id.clone())
252         {
253             Entry::Occupied(entry) => entry.into_mut().clone(),
254             Entry::Vacant(entry) => {
255                 let res = self.rams.lock().unwrap().get(task_id).cloned();
256                 let res = res.or_else(|| self.backup_rams.lock().unwrap().get(task_id).cloned());
257                 if res.is_some() {
258                     return res;
259                 } else {
260                     entry.insert(Arc::new(OnceLock::new())).clone()
261                 }
262             }
263         };
264 
265         let mut ret = None;
266         let res = once.get_or_init(|| {
267             debug!("{} ram updated from file", task_id.brief());
268             let mut file = self
269                 .files
270                 .lock()
271                 .unwrap()
272                 .get(task_id)
273                 .ok_or(io::Error::new(io::ErrorKind::NotFound, "not found"))?
274                 .open()?;
275 
276             let size = file.metadata()?.size();
277 
278             let mut cache = RamCache::new(task_id.clone(), self, Some(size as usize));
279             io::copy(&mut file, &mut cache).unwrap();
280 
281             let is_cache = cache.check_size();
282             let cache = Arc::new(cache);
283 
284             if is_cache {
285                 self.update_ram_cache(cache.clone());
286             }
287 
288             ret = Some(cache.clone());
289             let weak_cache = Arc::downgrade(&cache);
290             Ok(weak_cache)
291         });
292 
293         if ret.is_some() {
294             return ret;
295         }
296         let res = match res {
297             Err(e) => {
298                 info!("{} ram update from file failed {}", task_id.brief(), e);
299                 None
300             }
301             Ok(weak) => {
302                 *retry = true;
303                 Weak::upgrade(weak)
304             }
305         };
306         res
307     }
308 }
309 
310 #[cfg(test)]
311 mod ut_file {
312     include!("../../tests/ut/data/ut_file.rs");
313 }
314