1 // Copyright (C) 2024 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 #![warn(unused)]
15
16 use std::collections::hash_map::Entry;
17 use std::fs::{self, DirEntry, File, OpenOptions};
18 use std::io::{self, Seek, Write};
19 use std::os::unix::fs::MetadataExt;
20 use std::path::{Path, PathBuf};
21 use std::str::FromStr;
22 use std::sync::{Arc, LazyLock, OnceLock, Weak};
23 use std::time::SystemTime;
24
25 use request_utils::task_id::TaskId;
26
27 use super::ram::RamCache;
28 use crate::manage::CacheManager;
29 use crate::spawn;
30
31 const FINISH_SUFFIX: &str = "_F";
32
33 static CACHE_DIR_PATH: LazyLock<PathBuf> = LazyLock::new(|| {
34 #[cfg(feature = "ohos")]
35 let mut path = match request_utils::context::get_cache_dir() {
36 Some(dir) => PathBuf::from_str(&dir).unwrap(),
37 None => {
38 error!("get cache dir failed");
39 PathBuf::from_str("/data/storage/el2/base/cache").unwrap()
40 }
41 };
42 #[cfg(not(feature = "ohos"))]
43 let mut path = PathBuf::from_str("./").unwrap();
44
45 path.push("preload_caches");
46 if let Err(e) = fs::create_dir_all(path.as_path()) {
47 error!("create cache dir error {}", e);
48 }
49 path
50 });
51
52 pub(crate) struct FileCache {
53 task_id: TaskId,
54 handle: &'static CacheManager,
55 }
56
57 impl Drop for FileCache {
drop(&mut self)58 fn drop(&mut self) {
59 fn drop_inner(me: &mut FileCache) -> Result<(), io::Error> {
60 let path = FileCache::path(&me.task_id);
61 let metadata = fs::metadata(&path)?;
62 debug!(
63 "try drop file cache {} for task {}",
64 metadata.len(),
65 me.task_id.brief()
66 );
67 fs::remove_file(path)?;
68 me.handle
69 .file_handle
70 .lock()
71 .unwrap()
72 .release(metadata.len());
73 Ok(())
74 }
75
76 if let Err(e) = drop_inner(self) {
77 error!("{} drop file cache error: {}", self.task_id, e);
78 } else {
79 info!("{} file cache drop", self.task_id.brief());
80 }
81 }
82 }
83
84 impl FileCache {
try_restore(task_id: TaskId, handle: &'static CacheManager) -> Option<Self>85 pub(crate) fn try_restore(task_id: TaskId, handle: &'static CacheManager) -> Option<Self> {
86 let metadata = fs::metadata(Self::path(&task_id)).ok()?;
87 if !CacheManager::apply_cache(
88 &handle.file_handle,
89 &handle.files,
90 FileCache::task_id,
91 metadata.len() as usize,
92 ) {
93 info!("apply file cache for task {} failed", task_id.brief());
94 let path = FileCache::path(&task_id);
95 let _ = fs::remove_file(path);
96 return None;
97 }
98
99 Some(Self { task_id, handle })
100 }
101
try_create( task_id: TaskId, handle: &'static CacheManager, cache: Arc<RamCache>, ) -> Option<Self>102 pub(crate) fn try_create(
103 task_id: TaskId,
104 handle: &'static CacheManager,
105 cache: Arc<RamCache>,
106 ) -> Option<Self> {
107 let size = cache.size();
108 debug!(
109 "try apply new file cache {} for task {}",
110 size,
111 task_id.brief()
112 );
113
114 if !CacheManager::apply_cache(&handle.file_handle, &handle.files, FileCache::task_id, size)
115 {
116 info!("apply file cache for task {} failed", task_id.brief());
117 return None;
118 }
119
120 if let Err(e) = Self::create_file(&task_id, cache) {
121 error!("create file cache error: {}", e);
122 handle.file_handle.lock().unwrap().release(size as u64);
123 return None;
124 }
125 Some(Self { task_id, handle })
126 }
127
create_file(task_id: &TaskId, cache: Arc<RamCache>) -> Result<(), io::Error>128 fn create_file(task_id: &TaskId, cache: Arc<RamCache>) -> Result<(), io::Error> {
129 let path = Self::path(task_id);
130 let mut file = OpenOptions::new()
131 .write(true)
132 .create(true)
133 .truncate(true)
134 .open(path.as_path())?;
135 io::copy(&mut cache.cursor(), &mut file)?;
136 file.flush()?;
137 file.rewind()?;
138 let file_name = format!("{}{}", task_id, FINISH_SUFFIX);
139 let new_path = CACHE_DIR_PATH.join(file_name);
140 fs::rename(path, new_path)?;
141 Ok(())
142 }
143
open(&self) -> Result<File, io::Error>144 pub(crate) fn open(&self) -> Result<File, io::Error> {
145 OpenOptions::new()
146 .read(true)
147 .open(Self::path(&self.task_id))
148 }
149
task_id(&self) -> &TaskId150 pub(crate) fn task_id(&self) -> &TaskId {
151 &self.task_id
152 }
153
path(task_id: &TaskId) -> PathBuf154 fn path(task_id: &TaskId) -> PathBuf {
155 CACHE_DIR_PATH.join(task_id.to_string() + FINISH_SUFFIX)
156 }
157 }
158
restore_files() -> impl Iterator<Item = TaskId>159 pub(crate) fn restore_files() -> impl Iterator<Item = TaskId> {
160 restore_files_inner(CACHE_DIR_PATH.as_path())
161 }
162
restore_files_inner(path: &Path) -> impl Iterator<Item = TaskId>163 pub(crate) fn restore_files_inner(path: &Path) -> impl Iterator<Item = TaskId> {
164 let closure = |(path, _)| path;
165
166 let files = match fs::read_dir(path) {
167 Ok(files) => files,
168 Err(e) => {
169 error!("read dir error {}", e);
170 return vec![].into_iter().map(closure);
171 }
172 };
173 let mut v = files
174 .into_iter()
175 .filter_map(|entry| match filter_map_entry(entry, path) {
176 Ok((path, time)) => Some((path, time)),
177 Err(e) => {
178 error!("restore file error {}", e);
179 None
180 }
181 })
182 .collect::<Vec<_>>();
183 v.sort_by_key(|(_, time)| *time);
184 v.into_iter().map(closure)
185 }
186
filter_map_entry( entry: Result<DirEntry, io::Error>, path: &Path, ) -> Result<(TaskId, SystemTime), io::Error>187 fn filter_map_entry(
188 entry: Result<DirEntry, io::Error>,
189 path: &Path,
190 ) -> Result<(TaskId, SystemTime), io::Error> {
191 let file_name = entry?.file_name();
192 let file_name = file_name.to_str().ok_or(io::Error::new(
193 io::ErrorKind::InvalidData,
194 format!("invalid file name {:?}", file_name),
195 ))?;
196 if !file_name.ends_with(FINISH_SUFFIX) {
197 let _ = fs::remove_file(path.join(file_name));
198 return Err(io::Error::new(
199 io::ErrorKind::InvalidData,
200 format!("incomplete file {}", file_name),
201 ));
202 }
203 let task_id = TaskId::new(file_name.trim_end_matches(FINISH_SUFFIX).to_string());
204 let path = path.join(file_name);
205 let time = fs::metadata(path)?.modified()?;
206 Ok((task_id, time))
207 }
208
209 impl CacheManager {
update_file_cache(&'static self, task_id: TaskId, cache: Arc<RamCache>)210 pub(super) fn update_file_cache(&'static self, task_id: TaskId, cache: Arc<RamCache>) {
211 self.update_from_file_once.lock().unwrap().remove(&task_id);
212 spawn(move || {
213 self.backup_rams
214 .lock()
215 .unwrap()
216 .insert(task_id.clone(), cache.clone());
217 self.files.lock().unwrap().remove(&task_id);
218 if let Some(file_cache) = FileCache::try_create(task_id.clone(), self, cache) {
219 info!("{} file cache updated", task_id.brief());
220 self.files
221 .lock()
222 .unwrap()
223 .insert(task_id.clone(), file_cache);
224 };
225 self.backup_rams.lock().unwrap().remove(&task_id);
226 });
227 }
228
update_ram_from_file(&'static self, task_id: &TaskId) -> Option<Arc<RamCache>>229 pub(crate) fn update_ram_from_file(&'static self, task_id: &TaskId) -> Option<Arc<RamCache>> {
230 let mut retry = false;
231 loop {
232 let ret = self.update_ram_from_file_inner(task_id, &mut retry);
233 if !retry || ret.is_some() {
234 break ret;
235 } else {
236 self.update_from_file_once.lock().unwrap().remove(task_id);
237 }
238 }
239 }
240
update_ram_from_file_inner( &'static self, task_id: &TaskId, retry: &mut bool, ) -> Option<Arc<RamCache>>241 pub(crate) fn update_ram_from_file_inner(
242 &'static self,
243 task_id: &TaskId,
244 retry: &mut bool,
245 ) -> Option<Arc<RamCache>> {
246 *retry = false;
247 let once = match self
248 .update_from_file_once
249 .lock()
250 .unwrap()
251 .entry(task_id.clone())
252 {
253 Entry::Occupied(entry) => entry.into_mut().clone(),
254 Entry::Vacant(entry) => {
255 let res = self.rams.lock().unwrap().get(task_id).cloned();
256 let res = res.or_else(|| self.backup_rams.lock().unwrap().get(task_id).cloned());
257 if res.is_some() {
258 return res;
259 } else {
260 entry.insert(Arc::new(OnceLock::new())).clone()
261 }
262 }
263 };
264
265 let mut ret = None;
266 let res = once.get_or_init(|| {
267 debug!("{} ram updated from file", task_id.brief());
268 let mut file = self
269 .files
270 .lock()
271 .unwrap()
272 .get(task_id)
273 .ok_or(io::Error::new(io::ErrorKind::NotFound, "not found"))?
274 .open()?;
275
276 let size = file.metadata()?.size();
277
278 let mut cache = RamCache::new(task_id.clone(), self, Some(size as usize));
279 io::copy(&mut file, &mut cache).unwrap();
280
281 let is_cache = cache.check_size();
282 let cache = Arc::new(cache);
283
284 if is_cache {
285 self.update_ram_cache(cache.clone());
286 }
287
288 ret = Some(cache.clone());
289 let weak_cache = Arc::downgrade(&cache);
290 Ok(weak_cache)
291 });
292
293 if ret.is_some() {
294 return ret;
295 }
296 let res = match res {
297 Err(e) => {
298 info!("{} ram update from file failed {}", task_id.brief(), e);
299 None
300 }
301 Ok(weak) => {
302 *retry = true;
303 Weak::upgrade(weak)
304 }
305 };
306 res
307 }
308 }
309
310 #[cfg(test)]
311 mod ut_file {
312 include!("../../tests/ut/data/ut_file.rs");
313 }
314