• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2024 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 #![warn(unused)]
15 
16 use std::collections::hash_map::Entry;
17 use std::fs::{self, DirEntry, File, OpenOptions};
18 use std::io::{self, Seek, Write};
19 use std::os::unix::fs::MetadataExt;
20 use std::path::{Path, PathBuf};
21 use std::str::FromStr;
22 use std::sync::{Arc, LazyLock, OnceLock, Weak};
23 use std::time::SystemTime;
24 
25 use request_utils::task_id::TaskId;
26 
27 use super::ram::RamCache;
28 use crate::manage::CacheManager;
29 use crate::spawn;
30 
/// Suffix carried by the name of every finished cache file; directory entries
/// without it are treated as incomplete when restoring.
const FINISH_SUFFIX: &str = "_F";

/// Directory that holds all file caches, resolved and created on first access.
///
/// With the `ohos` feature the base is the cache directory reported by
/// `request_utils` (or a fixed fallback path when none is reported); without
/// it the base is the current working directory.
static CACHE_DIR_PATH: LazyLock<PathBuf> = LazyLock::new(|| {
    #[cfg(feature = "ohos")]
    let base = match request_utils::context::get_cache_dir() {
        None => PathBuf::from_str("/data/storage/el2/base/cache").unwrap(),
        Some(dir) => PathBuf::from_str(&dir).unwrap(),
    };
    #[cfg(not(feature = "ohos"))]
    let base = PathBuf::from_str("./").unwrap();

    let cache_dir = base.join("preload_caches");
    // Best effort: a creation failure is ignored here.
    let _ = fs::create_dir_all(cache_dir.as_path());
    cache_dir
});
46 
47 pub(crate) struct FileCache {
48     task_id: TaskId,
49     handle: &'static CacheManager,
50 }
51 
52 impl Drop for FileCache {
drop(&mut self)53     fn drop(&mut self) {
54         fn drop_inner(me: &mut FileCache) -> Result<(), io::Error> {
55             let path = FileCache::path(&me.task_id);
56             let metadata = fs::metadata(&path)?;
57             info!(
58                 "try drop file cache {} for task {}",
59                 metadata.len(),
60                 me.task_id.brief()
61             );
62             fs::remove_file(path)?;
63             me.handle
64                 .file_handle
65                 .lock()
66                 .unwrap()
67                 .release(metadata.len());
68             Ok(())
69         }
70 
71         if let Err(e) = drop_inner(self) {
72             error!("{} drop file cache error: {}", self.task_id, e);
73         } else {
74             info!("{} file cache drop", self.task_id.brief());
75         }
76     }
77 }
78 
79 impl FileCache {
try_restore(task_id: TaskId, handle: &'static CacheManager) -> Option<Self>80     pub(crate) fn try_restore(task_id: TaskId, handle: &'static CacheManager) -> Option<Self> {
81         let metadata = fs::metadata(Self::path(&task_id)).ok()?;
82         if !CacheManager::apply_cache(
83             &handle.file_handle,
84             &handle.files,
85             FileCache::task_id,
86             metadata.len() as usize,
87         ) {
88             info!("apply file cache for task {} failed", task_id.brief());
89             let path = FileCache::path(&task_id);
90             let _ = fs::remove_file(path);
91             return None;
92         }
93 
94         Some(Self { task_id, handle })
95     }
96 
try_create( task_id: TaskId, handle: &'static CacheManager, cache: Arc<RamCache>, ) -> Option<Self>97     pub(crate) fn try_create(
98         task_id: TaskId,
99         handle: &'static CacheManager,
100         cache: Arc<RamCache>,
101     ) -> Option<Self> {
102         let size = cache.size();
103         info!(
104             "try apply new file cache {} for task {}",
105             size,
106             task_id.brief()
107         );
108 
109         if !CacheManager::apply_cache(&handle.file_handle, &handle.files, FileCache::task_id, size)
110         {
111             info!("apply file cache for task {} failed", task_id.brief());
112             return None;
113         }
114 
115         if let Err(e) = Self::create_file(&task_id, cache) {
116             error!("create file cache error: {}", e);
117             handle.file_handle.lock().unwrap().release(size as u64);
118             return None;
119         }
120         info!("apply file cache for task {} success", task_id.brief());
121         Some(Self { task_id, handle })
122     }
123 
create_file(task_id: &TaskId, cache: Arc<RamCache>) -> Result<(), io::Error>124     fn create_file(task_id: &TaskId, cache: Arc<RamCache>) -> Result<(), io::Error> {
125         let path = Self::path(task_id);
126         let mut file = OpenOptions::new()
127             .write(true)
128             .create(true)
129             .truncate(true)
130             .open(path.as_path())?;
131         io::copy(&mut cache.cursor(), &mut file)?;
132         file.flush()?;
133         file.rewind()?;
134         let file_name = format!("{}{}", task_id, FINISH_SUFFIX);
135         let new_path = CACHE_DIR_PATH.join(file_name);
136         fs::rename(path, new_path)?;
137         Ok(())
138     }
139 
open(&self) -> Result<File, io::Error>140     pub(crate) fn open(&self) -> Result<File, io::Error> {
141         OpenOptions::new()
142             .read(true)
143             .open(Self::path(&self.task_id))
144     }
145 
task_id(&self) -> &TaskId146     pub(crate) fn task_id(&self) -> &TaskId {
147         &self.task_id
148     }
149 
path(task_id: &TaskId) -> PathBuf150     fn path(task_id: &TaskId) -> PathBuf {
151         CACHE_DIR_PATH.join(task_id.to_string() + FINISH_SUFFIX)
152     }
153 }
154 
restore_files() -> impl Iterator<Item = TaskId>155 pub(crate) fn restore_files() -> impl Iterator<Item = TaskId> {
156     restore_files_inner(CACHE_DIR_PATH.as_path())
157 }
158 
restore_files_inner(path: &Path) -> impl Iterator<Item = TaskId>159 pub(crate) fn restore_files_inner(path: &Path) -> impl Iterator<Item = TaskId> {
160     let files = fs::read_dir(path).unwrap();
161     let mut v = files
162         .into_iter()
163         .filter_map(|entry| match filter_map_entry(entry, path) {
164             Ok((path, time)) => Some((path, time)),
165             Err(e) => {
166                 error!("restore file error {}", e);
167                 None
168             }
169         })
170         .collect::<Vec<_>>();
171     v.sort_by_key(|(_, time)| *time);
172     v.into_iter().map(|(path, _)| path)
173 }
174 
filter_map_entry( entry: Result<DirEntry, io::Error>, path: &Path, ) -> Result<(TaskId, SystemTime), io::Error>175 fn filter_map_entry(
176     entry: Result<DirEntry, io::Error>,
177     path: &Path,
178 ) -> Result<(TaskId, SystemTime), io::Error> {
179     let file_name = entry?.file_name();
180     let file_name = file_name.to_str().ok_or(io::Error::new(
181         io::ErrorKind::InvalidData,
182         format!("invalid file name {:?}", file_name),
183     ))?;
184     if !file_name.ends_with(FINISH_SUFFIX) {
185         let _ = fs::remove_file(path.join(file_name));
186         return Err(io::Error::new(
187             io::ErrorKind::InvalidData,
188             format!("incomplete file {}", file_name),
189         ));
190     }
191     let task_id = TaskId::new(file_name.trim_end_matches(FINISH_SUFFIX).to_string());
192     let path = path.join(file_name);
193     let time = fs::metadata(path)?.modified()?;
194     Ok((task_id, time))
195 }
196 
197 impl CacheManager {
update_file_cache(&'static self, task_id: TaskId, cache: Arc<RamCache>)198     pub(super) fn update_file_cache(&'static self, task_id: TaskId, cache: Arc<RamCache>) {
199         self.update_from_file_once.lock().unwrap().remove(&task_id);
200         spawn(move || {
201             self.backup_rams
202                 .lock()
203                 .unwrap()
204                 .insert(task_id.clone(), cache.clone());
205             self.files.lock().unwrap().remove(&task_id);
206             if let Some(file_cache) = FileCache::try_create(task_id.clone(), self, cache) {
207                 info!("{} file cache updated", task_id.brief());
208                 self.files
209                     .lock()
210                     .unwrap()
211                     .insert(task_id.clone(), file_cache);
212             };
213             self.backup_rams.lock().unwrap().remove(&task_id);
214         });
215     }
216 
update_ram_from_file(&'static self, task_id: &TaskId) -> Option<Arc<RamCache>>217     pub(crate) fn update_ram_from_file(&'static self, task_id: &TaskId) -> Option<Arc<RamCache>> {
218         let mut retry = false;
219         loop {
220             let ret = self.update_ram_from_file_inner(task_id, &mut retry);
221             if !retry || ret.is_some() {
222                 break ret;
223             } else {
224                 self.update_from_file_once.lock().unwrap().remove(task_id);
225             }
226         }
227     }
228 
update_ram_from_file_inner( &'static self, task_id: &TaskId, retry: &mut bool, ) -> Option<Arc<RamCache>>229     pub(crate) fn update_ram_from_file_inner(
230         &'static self,
231         task_id: &TaskId,
232         retry: &mut bool,
233     ) -> Option<Arc<RamCache>> {
234         *retry = false;
235         let once = match self
236             .update_from_file_once
237             .lock()
238             .unwrap()
239             .entry(task_id.clone())
240         {
241             Entry::Occupied(entry) => entry.into_mut().clone(),
242             Entry::Vacant(entry) => {
243                 let res = self.rams.lock().unwrap().get(task_id).cloned();
244                 let res = res.or_else(|| self.backup_rams.lock().unwrap().get(task_id).cloned());
245                 if res.is_some() {
246                     return res;
247                 } else {
248                     entry.insert(Arc::new(OnceLock::new())).clone()
249                 }
250             }
251         };
252 
253         let mut ret = None;
254         let res = once.get_or_init(|| {
255             info!("{} ram updated from file", task_id.brief());
256             let mut file = self
257                 .files
258                 .lock()
259                 .unwrap()
260                 .get(task_id)
261                 .ok_or(io::Error::new(io::ErrorKind::NotFound, "not found"))?
262                 .open()?;
263 
264             let size = file.metadata()?.size();
265 
266             let mut cache = RamCache::new(task_id.clone(), self, Some(size as usize));
267             io::copy(&mut file, &mut cache).unwrap();
268 
269             let is_cache = cache.check_size();
270             let cache = Arc::new(cache);
271 
272             if is_cache {
273                 self.update_ram_cache(cache.clone());
274             }
275 
276             ret = Some(cache.clone());
277             let weak_cache = Arc::downgrade(&cache);
278             Ok(weak_cache)
279         });
280 
281         if ret.is_some() {
282             return ret;
283         }
284         let res = match res {
285             Err(e) => {
286                 error!("{} ram update from file failed {}", task_id.brief(), e);
287                 None
288             }
289             Ok(weak) => {
290                 *retry = true;
291                 Weak::upgrade(weak)
292             }
293         };
294         res
295     }
296 }
297 
#[cfg(test)]
mod test {
    use std::io::{Read, Write};
    use std::sync::{Arc, LazyLock};
    use std::time::Duration;
    use std::{fs, io};

    use request_utils::fastrand::fast_random;
    use request_utils::task_id::TaskId;
    use request_utils::test::log::init;

    use super::{CACHE_DIR_PATH, *};
    const TEST_STRING: &str = "你这猴子真让我欢喜";
    const TEST_STRING_SIZE: usize = TEST_STRING.len();
    const TEST_SIZE: u64 = 128;

    /// Creating file caches succeeds repeatedly, both when each cache is
    /// dropped right away and when it is registered in the manager.
    #[test]
    fn ut_cache_file_create() {
        init();
        static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
        CACHE_MANAGER.set_file_cache_size(TEST_SIZE);

        // cache not update
        for _ in 0..1000 {
            let task_id = TaskId::new(fast_random().to_string());
            let mut ram_cache =
                RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
            ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
            FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap();
        }

        // cache update
        for _ in 0..1000 {
            let task_id = TaskId::new(fast_random().to_string());
            let mut ram_cache =
                RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
            ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
            let file_cache =
                FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache))
                    .unwrap();
            CACHE_MANAGER
                .files
                .lock()
                .unwrap()
                .insert(task_id, file_cache);
        }
    }

    /// Creation fails once the configured size is exhausted and succeeds
    /// again after one held file cache is dropped.
    #[test]
    fn ut_cache_file_try_new_fail() {
        init();
        static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
        CACHE_MANAGER.set_file_cache_size(TEST_SIZE);

        let mut total = TEST_STRING_SIZE as u64;
        let mut v = vec![];
        while total < TEST_SIZE {
            let task_id = TaskId::new(fast_random().to_string());
            let mut ram_cache =
                RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
            ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
            v.push(
                FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache))
                    .unwrap(),
            );
            total += TEST_STRING_SIZE as u64;
        }
        let task_id = TaskId::new(fast_random().to_string());
        let mut ram_cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
        ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
        assert!(
            FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).is_none()
        );
        v.pop();
        let task_id = TaskId::new(fast_random().to_string());
        let mut ram_cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
        ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
        FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap();
    }

    /// Dropping a file cache releases the space it occupied.
    #[test]
    fn ut_cache_file_drop() {
        init();
        static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
        CACHE_MANAGER.set_file_cache_size(TEST_SIZE);

        let task_id = TaskId::new(fast_random().to_string());
        let mut ram_cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
        ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
        let file_cache =
            FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap();
        assert_eq!(
            CACHE_MANAGER.file_handle.lock().unwrap().used_ram,
            TEST_STRING_SIZE as u64
        );
        drop(file_cache);
        assert_eq!(CACHE_MANAGER.file_handle.lock().unwrap().used_ram, 0);
    }

    /// The file written by `try_create` holds exactly the RAM cache content.
    #[test]
    fn ut_cache_file_content() {
        init();
        static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
        CACHE_MANAGER.set_file_cache_size(TEST_SIZE);

        let task_id = TaskId::new(fast_random().to_string());
        let mut ram_cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
        ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
        let file_cache =
            FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap();
        let mut file = file_cache.open().unwrap();
        let mut buf = String::new();
        file.read_to_string(&mut buf).unwrap();
        assert_eq!(buf, TEST_STRING);
    }

    /// Restoring yields exactly the finished files, oldest first, and removes
    /// the unfinished ones.
    #[test]
    fn ut_cache_file_restore_files() {
        init();
        const TEST_DIR: &str = "restore_test";

        // The first to create are the first to come out
        let path = CACHE_DIR_PATH.join(TEST_DIR);
        // Start from a clean directory in case an earlier run was interrupted.
        let _ = fs::remove_dir_all(&path);
        fs::create_dir_all(&path).unwrap();
        for i in 0..10 {
            // not finished will not come out and will be deleted
            let path = if i % 2 == 0 {
                path.join(format!("{}{}", i, FINISH_SUFFIX))
            } else {
                path.join(format!("{}", i))
            };
            fs::OpenOptions::new()
                .write(true)
                .read(true)
                .create(true)
                .open(path)
                .unwrap();
            std::thread::sleep(Duration::from_millis(10));
        }

        // Compare the whole list so an empty or short result fails the test.
        let restored: Vec<String> = restore_files_inner(path.as_path())
            .map(|task_id| task_id.to_string())
            .collect();
        let expected: Vec<String> = (0..10).step_by(2).map(|i| i.to_string()).collect();
        assert_eq!(restored, expected);

        // The unfinished files are the odd-numbered ones.
        for i in (1..10).step_by(2) {
            let path = path.join(format!("{}", i));
            assert!(fs::metadata(path).is_err_and(|e| e.kind() == io::ErrorKind::NotFound));
        }
        fs::remove_dir_all(&path).unwrap();
    }

    /// Many threads loading the same task from file concurrently all obtain a
    /// RAM cache.
    #[test]
    fn ut_cache_file_update_ram_from_file() {
        init();
        static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
        CACHE_MANAGER.set_file_cache_size(TEST_SIZE);

        let task_id = TaskId::new(fast_random().to_string());
        let mut ram_cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
        ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
        let file_cache =
            FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap();
        CACHE_MANAGER
            .files
            .lock()
            .unwrap()
            .insert(task_id.clone(), file_cache);

        let mut v = vec![];
        for _ in 0..1000 {
            let task_id = task_id.clone();
            v.push(std::thread::spawn(move || {
                let Some(_) = CACHE_MANAGER.update_ram_from_file(&task_id) else {
                    return false;
                };
                true
            }))
        }
        for j in v {
            assert!(j.join().unwrap());
        }
    }
}
479