1 // Copyright (C) 2024 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::collections::HashMap; 15 use std::io; 16 use std::sync::{Arc, Mutex, OnceLock, Weak}; 17 18 use request_utils::lru::LRUCache; 19 use request_utils::task_id::TaskId; 20 21 use super::data::{self, restore_files, FileCache, RamCache}; 22 use crate::data::MAX_CACHE_SIZE; 23 24 const DEFAULT_RAM_CACHE_SIZE: u64 = 1024 * 1024 * 20; 25 const DEFAULT_FILE_CACHE_SIZE: u64 = 1024 * 1024 * 100; 26 27 pub struct CacheManager { 28 pub(crate) rams: Mutex<LRUCache<TaskId, Arc<RamCache>>>, 29 pub(crate) backup_rams: Mutex<HashMap<TaskId, Arc<RamCache>>>, 30 pub(crate) files: Mutex<LRUCache<TaskId, FileCache>>, 31 32 pub(crate) update_from_file_once: 33 Mutex<HashMap<TaskId, Arc<OnceLock<io::Result<Weak<RamCache>>>>>>, 34 pub(crate) ram_handle: Mutex<data::Handle>, 35 pub(crate) file_handle: Mutex<data::Handle>, 36 } 37 38 impl CacheManager { new() -> Self39 pub fn new() -> Self { 40 Self { 41 rams: Mutex::new(LRUCache::new()), 42 files: Mutex::new(LRUCache::new()), 43 backup_rams: Mutex::new(HashMap::new()), 44 update_from_file_once: Mutex::new(HashMap::new()), 45 46 ram_handle: Mutex::new(data::Handle::new(DEFAULT_RAM_CACHE_SIZE)), 47 file_handle: Mutex::new(data::Handle::new(DEFAULT_FILE_CACHE_SIZE)), 48 } 49 } 50 set_ram_cache_size(&self, size: u64)51 pub fn set_ram_cache_size(&self, size: u64) { 52 self.ram_handle.lock().unwrap().change_total_size(size); 53 CacheManager::apply_cache(&self.ram_handle, &self.rams, |a| RamCache::task_id(a), 0); 54 } 55 set_file_cache_size(&self, size: u64)56 pub fn set_file_cache_size(&self, size: u64) { 57 self.file_handle.lock().unwrap().change_total_size(size); 58 CacheManager::apply_cache(&self.file_handle, &self.files, FileCache::task_id, 0); 59 } 60 restore_files(&'static self)61 pub fn restore_files(&'static self) { 62 for task_id in restore_files() { 63 let Some(file_cache) = FileCache::try_restore(task_id.clone(), self) else { 64 continue; 65 }; 66 self.files.lock().unwrap().insert(task_id, file_cache); 67 } 68 } 69 fetch(&'static self, task_id: &TaskId) -> Option<Arc<RamCache>>70 pub fn fetch(&'static self, task_id: &TaskId) -> Option<Arc<RamCache>> { 71 self.get_cache(task_id) 72 } 73 remove(&self, task_id: TaskId)74 pub fn remove(&self, task_id: TaskId) { 75 self.files.lock().unwrap().remove(&task_id); 76 self.backup_rams.lock().unwrap().remove(&task_id); 77 self.rams.lock().unwrap().remove(&task_id); 78 self.update_from_file_once.lock().unwrap().remove(&task_id); 79 } 80 contains(&self, task_id: &TaskId) -> bool81 pub fn contains(&self, task_id: &TaskId) -> bool { 82 self.files.lock().unwrap().contains_key(task_id) 83 || self.backup_rams.lock().unwrap().contains_key(task_id) 84 || self.rams.lock().unwrap().contains_key(task_id) 85 } 86 get_cache(&'static self, task_id: &TaskId) -> Option<Arc<RamCache>>87 pub(crate) fn get_cache(&'static self, task_id: &TaskId) -> Option<Arc<RamCache>> { 88 let res = self.rams.lock().unwrap().get(task_id).cloned(); 89 res.or_else(|| self.backup_rams.lock().unwrap().get(task_id).cloned()) 90 .or_else(|| self.update_ram_from_file(task_id)) 91 } 92 apply_cache<T>( handle: &Mutex<data::Handle>, caches: &Mutex<LRUCache<TaskId, T>>, task_id: fn(&T) -> &TaskId, size: usize, ) -> bool93 pub(super) fn apply_cache<T>( 94 handle: &Mutex<data::Handle>, 95 caches: &Mutex<LRUCache<TaskId, T>>, 96 task_id: fn(&T) -> &TaskId, 97 size: usize, 98 ) -> bool { 99 loop { 100 if size > MAX_CACHE_SIZE as usize { 101 return false; 102 } 103 if handle.lock().unwrap().apply_cache_size(size as u64) { 104 return true; 105 }; 106 107 match caches.lock().unwrap().pop() { 108 Some(cache) => { 109 info!("CacheManager release cache {}", task_id(&cache).brief()); 110 } 111 None => { 112 info!("CacheManager release cache failed"); 113 return false; 114 } 115 } 116 } 117 } 118 } 119 120 #[cfg(test)] 121 mod test { 122 use std::io::{Read, Write}; 123 use std::sync::LazyLock; 124 use std::thread; 125 use std::time::Duration; 126 127 use request_utils::fastrand::fast_random; 128 use request_utils::test::log::init; 129 130 use super::*; 131 const TEST_STRING: &str = "你这猴子真让我欢喜"; 132 const TEST_STRING_SIZE: usize = TEST_STRING.len(); 133 134 #[test] ut_cache_manager_update_file()135 fn ut_cache_manager_update_file() { 136 init(); 137 let task_id = TaskId::new(fast_random().to_string()); 138 static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new); 139 140 // update cache 141 let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE)); 142 cache.write_all(TEST_STRING.as_bytes()).unwrap(); 143 cache.finish_write(); 144 thread::sleep(Duration::from_millis(100)); 145 146 // files contain cache 147 let mut file = CACHE_MANAGER 148 .files 149 .lock() 150 .unwrap() 151 .remove(&task_id) 152 .unwrap() 153 .open() 154 .unwrap(); 155 let mut buf = String::new(); 156 file.read_to_string(&mut buf).unwrap(); 157 assert_eq!(buf, TEST_STRING); 158 159 // backup caches removed for file exist 160 assert!(!CACHE_MANAGER 161 .backup_rams 162 .lock() 163 .unwrap() 164 .contains_key(&task_id)); 165 } 166 167 #[test] ut_cache_manager_get()168 fn ut_cache_manager_get() { 169 init(); 170 let task_id = TaskId::new(fast_random().to_string()); 171 static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new); 172 173 let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE)); 174 175 cache.write_all(TEST_STRING.as_bytes()).unwrap(); 176 cache.finish_write(); 177 178 let cache = CACHE_MANAGER.get_cache(&task_id).unwrap(); 179 let mut buf = String::new(); 180 cache.cursor().read_to_string(&mut buf).unwrap(); 181 assert_eq!(buf, TEST_STRING); 182 } 183 184 #[test] ut_cache_manager_cache_from_file()185 fn ut_cache_manager_cache_from_file() { 186 init(); 187 let task_id = TaskId::new(fast_random().to_string()); 188 189 static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new); 190 let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE)); 191 cache.write_all(TEST_STRING.as_bytes()).unwrap(); 192 cache.finish_write(); 193 194 thread::sleep(Duration::from_millis(100)); 195 CACHE_MANAGER.rams.lock().unwrap().remove(&task_id); 196 197 let mut v = vec![]; 198 for _ in 0..1 { 199 let task_id = task_id.clone(); 200 v.push(std::thread::spawn(move || { 201 let cache = CACHE_MANAGER.get_cache(&task_id).unwrap(); 202 let mut buf = String::new(); 203 cache.cursor().read_to_string(&mut buf).unwrap(); 204 buf == TEST_STRING 205 })); 206 } 207 for t in v { 208 assert!(t.join().unwrap()); 209 } 210 } 211 212 #[test] ut_cache_manager_cache_from_file_clean()213 fn ut_cache_manager_cache_from_file_clean() { 214 init(); 215 let task_id = TaskId::new(fast_random().to_string()); 216 static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new); 217 218 let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE)); 219 cache.write_all(TEST_STRING.as_bytes()).unwrap(); 220 cache.finish_write(); 221 thread::sleep(Duration::from_millis(100)); 222 CACHE_MANAGER.rams.lock().unwrap().remove(&task_id); 223 224 CACHE_MANAGER.get_cache(&task_id).unwrap(); 225 assert!(CACHE_MANAGER.rams.lock().unwrap().contains_key(&task_id)); 226 assert!(!CACHE_MANAGER 227 .backup_rams 228 .lock() 229 .unwrap() 230 .contains_key(&task_id)); 231 assert!(!CACHE_MANAGER 232 .update_from_file_once 233 .lock() 234 .unwrap() 235 .contains_key(&task_id)); 236 } 237 238 #[test] ut_cache_manager_update_same()239 fn ut_cache_manager_update_same() { 240 init(); 241 let task_id = TaskId::new(fast_random().to_string()); 242 static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new); 243 244 let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE)); 245 246 cache.write_all(TEST_STRING.as_bytes()).unwrap(); 247 cache.finish_write(); 248 249 let mut test_string = TEST_STRING.to_string(); 250 test_string.push_str(TEST_STRING); 251 252 let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(test_string.len())); 253 cache.write_all(test_string.as_bytes()).unwrap(); 254 cache.finish_write(); 255 256 let cache = CACHE_MANAGER.get_cache(&task_id).unwrap(); 257 let mut buf = String::new(); 258 cache.cursor().read_to_string(&mut buf).unwrap(); 259 assert_eq!(buf, test_string); 260 261 CACHE_MANAGER.rams.lock().unwrap().remove(&task_id); 262 263 let mut buf = String::new(); 264 cache.cursor().read_to_string(&mut buf).unwrap(); 265 assert_eq!(buf, test_string); 266 } 267 } 268