• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2024 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::collections::HashMap;
15 use std::io;
16 use std::sync::{Arc, Mutex, OnceLock, Weak};
17 
18 use request_utils::lru::LRUCache;
19 use request_utils::task_id::TaskId;
20 
21 use super::data::{self, restore_files, FileCache, RamCache};
22 use crate::data::MAX_CACHE_SIZE;
23 
24 const DEFAULT_RAM_CACHE_SIZE: u64 = 1024 * 1024 * 20;
25 const DEFAULT_FILE_CACHE_SIZE: u64 = 1024 * 1024 * 100;
26 
27 pub struct CacheManager {
28     pub(crate) rams: Mutex<LRUCache<TaskId, Arc<RamCache>>>,
29     pub(crate) backup_rams: Mutex<HashMap<TaskId, Arc<RamCache>>>,
30     pub(crate) files: Mutex<LRUCache<TaskId, FileCache>>,
31 
32     pub(crate) update_from_file_once:
33         Mutex<HashMap<TaskId, Arc<OnceLock<io::Result<Weak<RamCache>>>>>>,
34     pub(crate) ram_handle: Mutex<data::Handle>,
35     pub(crate) file_handle: Mutex<data::Handle>,
36 }
37 
38 impl CacheManager {
new() -> Self39     pub fn new() -> Self {
40         Self {
41             rams: Mutex::new(LRUCache::new()),
42             files: Mutex::new(LRUCache::new()),
43             backup_rams: Mutex::new(HashMap::new()),
44             update_from_file_once: Mutex::new(HashMap::new()),
45 
46             ram_handle: Mutex::new(data::Handle::new(DEFAULT_RAM_CACHE_SIZE)),
47             file_handle: Mutex::new(data::Handle::new(DEFAULT_FILE_CACHE_SIZE)),
48         }
49     }
50 
set_ram_cache_size(&self, size: u64)51     pub fn set_ram_cache_size(&self, size: u64) {
52         self.ram_handle.lock().unwrap().change_total_size(size);
53         CacheManager::apply_cache(&self.ram_handle, &self.rams, |a| RamCache::task_id(a), 0);
54     }
55 
set_file_cache_size(&self, size: u64)56     pub fn set_file_cache_size(&self, size: u64) {
57         self.file_handle.lock().unwrap().change_total_size(size);
58         CacheManager::apply_cache(&self.file_handle, &self.files, FileCache::task_id, 0);
59     }
60 
restore_files(&'static self)61     pub fn restore_files(&'static self) {
62         for task_id in restore_files() {
63             let Some(file_cache) = FileCache::try_restore(task_id.clone(), self) else {
64                 continue;
65             };
66             self.files.lock().unwrap().insert(task_id, file_cache);
67         }
68     }
69 
fetch(&'static self, task_id: &TaskId) -> Option<Arc<RamCache>>70     pub fn fetch(&'static self, task_id: &TaskId) -> Option<Arc<RamCache>> {
71         self.get_cache(task_id)
72     }
73 
remove(&self, task_id: TaskId)74     pub fn remove(&self, task_id: TaskId) {
75         self.files.lock().unwrap().remove(&task_id);
76         self.backup_rams.lock().unwrap().remove(&task_id);
77         self.rams.lock().unwrap().remove(&task_id);
78         self.update_from_file_once.lock().unwrap().remove(&task_id);
79     }
80 
contains(&self, task_id: &TaskId) -> bool81     pub fn contains(&self, task_id: &TaskId) -> bool {
82         self.files.lock().unwrap().contains_key(task_id)
83             || self.backup_rams.lock().unwrap().contains_key(task_id)
84             || self.rams.lock().unwrap().contains_key(task_id)
85     }
86 
get_cache(&'static self, task_id: &TaskId) -> Option<Arc<RamCache>>87     pub(crate) fn get_cache(&'static self, task_id: &TaskId) -> Option<Arc<RamCache>> {
88         let res = self.rams.lock().unwrap().get(task_id).cloned();
89         res.or_else(|| self.backup_rams.lock().unwrap().get(task_id).cloned())
90             .or_else(|| self.update_ram_from_file(task_id))
91     }
92 
apply_cache<T>( handle: &Mutex<data::Handle>, caches: &Mutex<LRUCache<TaskId, T>>, task_id: fn(&T) -> &TaskId, size: usize, ) -> bool93     pub(super) fn apply_cache<T>(
94         handle: &Mutex<data::Handle>,
95         caches: &Mutex<LRUCache<TaskId, T>>,
96         task_id: fn(&T) -> &TaskId,
97         size: usize,
98     ) -> bool {
99         loop {
100             if size > MAX_CACHE_SIZE as usize {
101                 return false;
102             }
103             if handle.lock().unwrap().apply_cache_size(size as u64) {
104                 return true;
105             };
106 
107             match caches.lock().unwrap().pop() {
108                 Some(cache) => {
109                     info!("CacheManager release cache {}", task_id(&cache).brief());
110                 }
111                 None => {
112                     info!("CacheManager release cache failed");
113                     return false;
114                 }
115             }
116         }
117     }
118 }
119 
120 #[cfg(test)]
121 mod test {
122     use std::io::{Read, Write};
123     use std::sync::LazyLock;
124     use std::thread;
125     use std::time::Duration;
126 
127     use request_utils::fastrand::fast_random;
128     use request_utils::test::log::init;
129 
130     use super::*;
131     const TEST_STRING: &str = "你这猴子真让我欢喜";
132     const TEST_STRING_SIZE: usize = TEST_STRING.len();
133 
134     #[test]
ut_cache_manager_update_file()135     fn ut_cache_manager_update_file() {
136         init();
137         let task_id = TaskId::new(fast_random().to_string());
138         static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
139 
140         // update cache
141         let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
142         cache.write_all(TEST_STRING.as_bytes()).unwrap();
143         cache.finish_write();
144         thread::sleep(Duration::from_millis(100));
145 
146         // files contain cache
147         let mut file = CACHE_MANAGER
148             .files
149             .lock()
150             .unwrap()
151             .remove(&task_id)
152             .unwrap()
153             .open()
154             .unwrap();
155         let mut buf = String::new();
156         file.read_to_string(&mut buf).unwrap();
157         assert_eq!(buf, TEST_STRING);
158 
159         // backup caches removed for file exist
160         assert!(!CACHE_MANAGER
161             .backup_rams
162             .lock()
163             .unwrap()
164             .contains_key(&task_id));
165     }
166 
167     #[test]
ut_cache_manager_get()168     fn ut_cache_manager_get() {
169         init();
170         let task_id = TaskId::new(fast_random().to_string());
171         static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
172 
173         let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
174 
175         cache.write_all(TEST_STRING.as_bytes()).unwrap();
176         cache.finish_write();
177 
178         let cache = CACHE_MANAGER.get_cache(&task_id).unwrap();
179         let mut buf = String::new();
180         cache.cursor().read_to_string(&mut buf).unwrap();
181         assert_eq!(buf, TEST_STRING);
182     }
183 
184     #[test]
ut_cache_manager_cache_from_file()185     fn ut_cache_manager_cache_from_file() {
186         init();
187         let task_id = TaskId::new(fast_random().to_string());
188 
189         static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
190         let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
191         cache.write_all(TEST_STRING.as_bytes()).unwrap();
192         cache.finish_write();
193 
194         thread::sleep(Duration::from_millis(100));
195         CACHE_MANAGER.rams.lock().unwrap().remove(&task_id);
196 
197         let mut v = vec![];
198         for _ in 0..1 {
199             let task_id = task_id.clone();
200             v.push(std::thread::spawn(move || {
201                 let cache = CACHE_MANAGER.get_cache(&task_id).unwrap();
202                 let mut buf = String::new();
203                 cache.cursor().read_to_string(&mut buf).unwrap();
204                 buf == TEST_STRING
205             }));
206         }
207         for t in v {
208             assert!(t.join().unwrap());
209         }
210     }
211 
212     #[test]
ut_cache_manager_cache_from_file_clean()213     fn ut_cache_manager_cache_from_file_clean() {
214         init();
215         let task_id = TaskId::new(fast_random().to_string());
216         static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
217 
218         let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
219         cache.write_all(TEST_STRING.as_bytes()).unwrap();
220         cache.finish_write();
221         thread::sleep(Duration::from_millis(100));
222         CACHE_MANAGER.rams.lock().unwrap().remove(&task_id);
223 
224         CACHE_MANAGER.get_cache(&task_id).unwrap();
225         assert!(CACHE_MANAGER.rams.lock().unwrap().contains_key(&task_id));
226         assert!(!CACHE_MANAGER
227             .backup_rams
228             .lock()
229             .unwrap()
230             .contains_key(&task_id));
231         assert!(!CACHE_MANAGER
232             .update_from_file_once
233             .lock()
234             .unwrap()
235             .contains_key(&task_id));
236     }
237 
238     #[test]
ut_cache_manager_update_same()239     fn ut_cache_manager_update_same() {
240         init();
241         let task_id = TaskId::new(fast_random().to_string());
242         static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new);
243 
244         let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE));
245 
246         cache.write_all(TEST_STRING.as_bytes()).unwrap();
247         cache.finish_write();
248 
249         let mut test_string = TEST_STRING.to_string();
250         test_string.push_str(TEST_STRING);
251 
252         let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(test_string.len()));
253         cache.write_all(test_string.as_bytes()).unwrap();
254         cache.finish_write();
255 
256         let cache = CACHE_MANAGER.get_cache(&task_id).unwrap();
257         let mut buf = String::new();
258         cache.cursor().read_to_string(&mut buf).unwrap();
259         assert_eq!(buf, test_string);
260 
261         CACHE_MANAGER.rams.lock().unwrap().remove(&task_id);
262 
263         let mut buf = String::new();
264         cache.cursor().read_to_string(&mut buf).unwrap();
265         assert_eq!(buf, test_string);
266     }
267 }
268