1 // Copyright (C) 2024 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 #![warn(unused)]
15
16 use std::collections::hash_map::Entry;
17 use std::fs::{self, DirEntry, File, OpenOptions};
18 use std::io::{self, Seek, Write};
19 use std::os::unix::fs::MetadataExt;
20 use std::path::{Path, PathBuf};
21 use std::str::FromStr;
22 use std::sync::{Arc, LazyLock, OnceLock, Weak};
23 use std::time::SystemTime;
24
25 use request_utils::task_id::TaskId;
26
27 use super::ram::RamCache;
28 use crate::manage::CacheManager;
29 use crate::spawn;
30
/// Suffix carried by the file name of a finished cache file.
///
/// `FileCache::create_file` renames a cache file to `<task_id>` + this suffix,
/// and `filter_map_entry` deletes directory entries whose name lacks it.
const FINISH_SUFFIX: &str = "_F";
32
/// Directory that holds the on-disk preload caches, resolved on first use.
///
/// With the `ohos` feature the base directory is the one reported by
/// `request_utils::context::get_cache_dir()`, falling back to
/// `/data/storage/el2/base/cache` when none is reported; without the feature
/// the base is `./`. A `preload_caches` component is appended and the
/// directory is created on a best-effort basis (creation errors are ignored).
static CACHE_DIR_PATH: LazyLock<PathBuf> = LazyLock::new(|| {
    #[cfg(feature = "ohos")]
    let base_dir = request_utils::context::get_cache_dir()
        .map(|dir| PathBuf::from_str(&dir).unwrap())
        .unwrap_or_else(|| PathBuf::from_str("/data/storage/el2/base/cache").unwrap());
    #[cfg(not(feature = "ohos"))]
    let base_dir = PathBuf::from_str("./").unwrap();

    let cache_dir = base_dir.join("preload_caches");
    let _ = fs::create_dir_all(&cache_dir);
    cache_dir
});
46
/// Handle to the cache file of one task inside `CACHE_DIR_PATH`.
///
/// Dropping the handle removes the file and calls `release` on the manager's
/// `file_handle` with the file's length (see the `Drop` impl).
pub(crate) struct FileCache {
    /// Task whose data the file holds; `FileCache::path` derives the file name from it.
    task_id: TaskId,
    /// Manager whose `file_handle` and `files` are used when applying for and releasing space.
    handle: &'static CacheManager,
}
51
52 impl Drop for FileCache {
drop(&mut self)53 fn drop(&mut self) {
54 fn drop_inner(me: &mut FileCache) -> Result<(), io::Error> {
55 let path = FileCache::path(&me.task_id);
56 let metadata = fs::metadata(&path)?;
57 info!(
58 "try drop file cache {} for task {}",
59 metadata.len(),
60 me.task_id.brief()
61 );
62 fs::remove_file(path)?;
63 me.handle
64 .file_handle
65 .lock()
66 .unwrap()
67 .release(metadata.len());
68 Ok(())
69 }
70
71 if let Err(e) = drop_inner(self) {
72 error!("{} drop file cache error: {}", self.task_id, e);
73 } else {
74 info!("{} file cache drop", self.task_id.brief());
75 }
76 }
77 }
78
79 impl FileCache {
try_restore(task_id: TaskId, handle: &'static CacheManager) -> Option<Self>80 pub(crate) fn try_restore(task_id: TaskId, handle: &'static CacheManager) -> Option<Self> {
81 let metadata = fs::metadata(Self::path(&task_id)).ok()?;
82 if !CacheManager::apply_cache(
83 &handle.file_handle,
84 &handle.files,
85 FileCache::task_id,
86 metadata.len() as usize,
87 ) {
88 info!("apply file cache for task {} failed", task_id.brief());
89 let path = FileCache::path(&task_id);
90 let _ = fs::remove_file(path);
91 return None;
92 }
93
94 Some(Self { task_id, handle })
95 }
96
try_create( task_id: TaskId, handle: &'static CacheManager, cache: Arc<RamCache>, ) -> Option<Self>97 pub(crate) fn try_create(
98 task_id: TaskId,
99 handle: &'static CacheManager,
100 cache: Arc<RamCache>,
101 ) -> Option<Self> {
102 let size = cache.size();
103 info!(
104 "try apply new file cache {} for task {}",
105 size,
106 task_id.brief()
107 );
108
109 if !CacheManager::apply_cache(&handle.file_handle, &handle.files, FileCache::task_id, size)
110 {
111 info!("apply file cache for task {} failed", task_id.brief());
112 return None;
113 }
114
115 if let Err(e) = Self::create_file(&task_id, cache) {
116 error!("create file cache error: {}", e);
117 handle.file_handle.lock().unwrap().release(size as u64);
118 return None;
119 }
120 info!("apply file cache for task {} success", task_id.brief());
121 Some(Self { task_id, handle })
122 }
123
create_file(task_id: &TaskId, cache: Arc<RamCache>) -> Result<(), io::Error>124 fn create_file(task_id: &TaskId, cache: Arc<RamCache>) -> Result<(), io::Error> {
125 let path = Self::path(task_id);
126 let mut file = OpenOptions::new()
127 .write(true)
128 .create(true)
129 .truncate(true)
130 .open(path.as_path())?;
131 io::copy(&mut cache.cursor(), &mut file)?;
132 file.flush()?;
133 file.rewind()?;
134 let file_name = format!("{}{}", task_id, FINISH_SUFFIX);
135 let new_path = CACHE_DIR_PATH.join(file_name);
136 fs::rename(path, new_path)?;
137 Ok(())
138 }
139
open(&self) -> Result<File, io::Error>140 pub(crate) fn open(&self) -> Result<File, io::Error> {
141 OpenOptions::new()
142 .read(true)
143 .open(Self::path(&self.task_id))
144 }
145
task_id(&self) -> &TaskId146 pub(crate) fn task_id(&self) -> &TaskId {
147 &self.task_id
148 }
149
path(task_id: &TaskId) -> PathBuf150 fn path(task_id: &TaskId) -> PathBuf {
151 CACHE_DIR_PATH.join(task_id.to_string() + FINISH_SUFFIX)
152 }
153 }
154
restore_files() -> impl Iterator<Item = TaskId>155 pub(crate) fn restore_files() -> impl Iterator<Item = TaskId> {
156 restore_files_inner(CACHE_DIR_PATH.as_path())
157 }
158
restore_files_inner(path: &Path) -> impl Iterator<Item = TaskId>159 pub(crate) fn restore_files_inner(path: &Path) -> impl Iterator<Item = TaskId> {
160 let files = fs::read_dir(path).unwrap();
161 let mut v = files
162 .into_iter()
163 .filter_map(|entry| match filter_map_entry(entry, path) {
164 Ok((path, time)) => Some((path, time)),
165 Err(e) => {
166 error!("restore file error {}", e);
167 None
168 }
169 })
170 .collect::<Vec<_>>();
171 v.sort_by_key(|(_, time)| *time);
172 v.into_iter().map(|(path, _)| path)
173 }
174
filter_map_entry( entry: Result<DirEntry, io::Error>, path: &Path, ) -> Result<(TaskId, SystemTime), io::Error>175 fn filter_map_entry(
176 entry: Result<DirEntry, io::Error>,
177 path: &Path,
178 ) -> Result<(TaskId, SystemTime), io::Error> {
179 let file_name = entry?.file_name();
180 let file_name = file_name.to_str().ok_or(io::Error::new(
181 io::ErrorKind::InvalidData,
182 format!("invalid file name {:?}", file_name),
183 ))?;
184 if !file_name.ends_with(FINISH_SUFFIX) {
185 let _ = fs::remove_file(path.join(file_name));
186 return Err(io::Error::new(
187 io::ErrorKind::InvalidData,
188 format!("incomplete file {}", file_name),
189 ));
190 }
191 let task_id = TaskId::new(file_name.trim_end_matches(FINISH_SUFFIX).to_string());
192 let path = path.join(file_name);
193 let time = fs::metadata(path)?.modified()?;
194 Ok((task_id, time))
195 }
196
197 impl CacheManager {
update_file_cache(&'static self, task_id: TaskId, cache: Arc<RamCache>)198 pub(super) fn update_file_cache(&'static self, task_id: TaskId, cache: Arc<RamCache>) {
199 self.update_from_file_once.lock().unwrap().remove(&task_id);
200 spawn(move || {
201 self.backup_rams
202 .lock()
203 .unwrap()
204 .insert(task_id.clone(), cache.clone());
205 self.files.lock().unwrap().remove(&task_id);
206 if let Some(file_cache) = FileCache::try_create(task_id.clone(), self, cache) {
207 info!("{} file cache updated", task_id.brief());
208 self.files
209 .lock()
210 .unwrap()
211 .insert(task_id.clone(), file_cache);
212 };
213 self.backup_rams.lock().unwrap().remove(&task_id);
214 });
215 }
216
update_ram_from_file(&'static self, task_id: &TaskId) -> Option<Arc<RamCache>>217 pub(crate) fn update_ram_from_file(&'static self, task_id: &TaskId) -> Option<Arc<RamCache>> {
218 let mut retry = false;
219 loop {
220 let ret = self.update_ram_from_file_inner(task_id, &mut retry);
221 if !retry || ret.is_some() {
222 break ret;
223 } else {
224 self.update_from_file_once.lock().unwrap().remove(task_id);
225 }
226 }
227 }
228
update_ram_from_file_inner( &'static self, task_id: &TaskId, retry: &mut bool, ) -> Option<Arc<RamCache>>229 pub(crate) fn update_ram_from_file_inner(
230 &'static self,
231 task_id: &TaskId,
232 retry: &mut bool,
233 ) -> Option<Arc<RamCache>> {
234 *retry = false;
235 let once = match self
236 .update_from_file_once
237 .lock()
238 .unwrap()
239 .entry(task_id.clone())
240 {
241 Entry::Occupied(entry) => entry.into_mut().clone(),
242 Entry::Vacant(entry) => {
243 let res = self.rams.lock().unwrap().get(task_id).cloned();
244 let res = res.or_else(|| self.backup_rams.lock().unwrap().get(task_id).cloned());
245 if res.is_some() {
246 return res;
247 } else {
248 entry.insert(Arc::new(OnceLock::new())).clone()
249 }
250 }
251 };
252
253 let mut ret = None;
254 let res = once.get_or_init(|| {
255 info!("{} ram updated from file", task_id.brief());
256 let mut file = self
257 .files
258 .lock()
259 .unwrap()
260 .get(task_id)
261 .ok_or(io::Error::new(io::ErrorKind::NotFound, "not found"))?
262 .open()?;
263
264 let size = file.metadata()?.size();
265
266 let mut cache = RamCache::new(task_id.clone(), self, Some(size as usize));
267 io::copy(&mut file, &mut cache).unwrap();
268
269 let is_cache = cache.check_size();
270 let cache = Arc::new(cache);
271
272 if is_cache {
273 self.update_ram_cache(cache.clone());
274 }
275
276 ret = Some(cache.clone());
277 let weak_cache = Arc::downgrade(&cache);
278 Ok(weak_cache)
279 });
280
281 if ret.is_some() {
282 return ret;
283 }
284 let res = match res {
285 Err(e) => {
286 error!("{} ram update from file failed {}", task_id.brief(), e);
287 None
288 }
289 Ok(weak) => {
290 *retry = true;
291 Weak::upgrade(weak)
292 }
293 };
294 res
295 }
296 }
297
#[cfg(test)]
mod test {
    use std::io::{Read, Write};
    use std::sync::{Arc, LazyLock};
    use std::time::Duration;
    use std::{fs, io};

    use request_utils::fastrand::fast_random;
    use request_utils::task_id::TaskId;
    use request_utils::test::log::init;

    use super::{CACHE_DIR_PATH, *};
    const TEST_STRING: &str = "你这猴子真让我欢喜";
    const TEST_STRING_SIZE: usize = TEST_STRING.len();
    const TEST_SIZE: u64 = 128;

    /// Creating file caches succeeds both when each cache is dropped right
    /// away and when it is registered in `files`.
    #[test]
    fn ut_cache_file_create() {
        init();
        static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
        CACHE_MANAGER.set_file_cache_size(TEST_SIZE);

        // The returned cache is dropped at the end of each iteration.
        for _ in 0..1000 {
            let task_id = TaskId::new(fast_random().to_string());
            let mut ram_cache =
                RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
            ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
            FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap();
        }

        // The returned cache is kept in the manager's `files` map.
        for _ in 0..1000 {
            let task_id = TaskId::new(fast_random().to_string());
            let mut ram_cache =
                RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
            ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
            let file_cache =
                FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache))
                    .unwrap();
            CACHE_MANAGER
                .files
                .lock()
                .unwrap()
                .insert(task_id, file_cache);
        }
    }

    /// Creation fails once the held caches exhaust `TEST_SIZE` and succeeds
    /// again after one of them is dropped.
    #[test]
    fn ut_cache_file_try_new_fail() {
        init();
        static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
        CACHE_MANAGER.set_file_cache_size(TEST_SIZE);

        let mut total = TEST_STRING_SIZE as u64;
        let mut v = vec![];
        while total < TEST_SIZE {
            let task_id = TaskId::new(fast_random().to_string());
            let mut ram_cache =
                RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
            ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
            v.push(
                FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache))
                    .unwrap(),
            );
            total += TEST_STRING_SIZE as u64;
        }
        let task_id = TaskId::new(fast_random().to_string());
        let mut ram_cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
        ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
        assert!(
            FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).is_none()
        );
        // Dropping one held cache makes room for a new one.
        v.pop();
        let task_id = TaskId::new(fast_random().to_string());
        let mut ram_cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
        ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
        FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap();
    }

    /// Dropping a file cache brings the file handle's usage counter back to zero.
    #[test]
    fn ut_cache_file_drop() {
        init();
        static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
        CACHE_MANAGER.set_file_cache_size(TEST_SIZE);

        let task_id = TaskId::new(fast_random().to_string());
        let mut ram_cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
        ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
        let file_cache =
            FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap();
        assert_eq!(
            CACHE_MANAGER.file_handle.lock().unwrap().used_ram,
            TEST_STRING_SIZE as u64
        );
        drop(file_cache);
        assert_eq!(CACHE_MANAGER.file_handle.lock().unwrap().used_ram, 0);
    }

    /// The file written by `try_create` contains exactly the RAM cache content.
    #[test]
    fn ut_cache_file_content() {
        init();
        static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
        CACHE_MANAGER.set_file_cache_size(TEST_SIZE);

        let task_id = TaskId::new(fast_random().to_string());
        let mut ram_cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
        ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
        let file_cache =
            FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap();
        let mut file = file_cache.open().unwrap();
        let mut buf = String::new();
        file.read_to_string(&mut buf).unwrap();
        assert_eq!(buf, TEST_STRING);
    }

    /// Finished files are restored oldest first; unfinished files are deleted.
    #[test]
    fn ut_cache_file_restore_files() {
        init();
        const TEST_DIR: &str = "restore_test";

        // The first to create are the first to come out
        let path = CACHE_DIR_PATH.join(TEST_DIR);
        fs::create_dir_all(&path).unwrap();
        for i in 0..10 {
            // not finished will not come out and will be deleted
            let path = if i % 2 == 0 {
                path.join(format!("{}{}", i, FINISH_SUFFIX))
            } else {
                path.join(format!("{}", i))
            };
            fs::OpenOptions::new()
                .write(true)
                .read(true)
                .create(true)
                .open(path)
                .unwrap();
            std::thread::sleep(Duration::from_millis(10));
        }

        // Compare the whole list so that a short or empty result fails too.
        let restored: Vec<String> = restore_files_inner(path.as_path())
            .map(|task_id| task_id.to_string())
            .collect();
        let expected: Vec<String> = (0..10).step_by(2).map(|i| i.to_string()).collect();
        assert_eq!(restored, expected);

        // Every unfinished (odd) file must have been removed by the scan.
        for i in (1..10).step_by(2) {
            let path = path.join(format!("{}", i));
            assert!(fs::metadata(path).is_err_and(|e| e.kind() == io::ErrorKind::NotFound));
        }
        fs::remove_dir_all(&path).unwrap();
    }

    /// Many threads loading the same task from its file all get a cache.
    #[test]
    fn ut_cache_file_update_ram_from_file() {
        init();
        static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
        CACHE_MANAGER.set_file_cache_size(TEST_SIZE);

        let task_id = TaskId::new(fast_random().to_string());
        let mut ram_cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
        ram_cache.write_all(TEST_STRING.as_bytes()).unwrap();
        let file_cache =
            FileCache::try_create(task_id.clone(), &CACHE_MANAGER, Arc::new(ram_cache)).unwrap();
        CACHE_MANAGER
            .files
            .lock()
            .unwrap()
            .insert(task_id.clone(), file_cache);

        let mut v = vec![];
        for _ in 0..1000 {
            let task_id = task_id.clone();
            v.push(std::thread::spawn(move || {
                let Some(_) = CACHE_MANAGER.update_ram_from_file(&task_id) else {
                    return false;
                };
                true
            }))
        }
        for j in v {
            assert!(j.join().unwrap());
        }
    }
}
479