1 // Copyright (C) 2024 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::cmp::Ordering; 15 use std::io::{Cursor, Write}; 16 use std::sync::Arc; 17 18 use request_utils::task_id::TaskId; 19 20 use super::MAX_CACHE_SIZE; 21 use crate::manage::CacheManager; 22 23 const DEFAULT_TRUNK_CAPACITY: usize = 512; 24 25 pub struct RamCache { 26 pub(super) task_id: TaskId, 27 data: Vec<u8>, 28 applied: u64, 29 handle: &'static CacheManager, 30 } 31 32 impl Drop for RamCache { drop(&mut self)33 fn drop(&mut self) { 34 if self.applied != 0 { 35 info!( 36 "ram cache {} released {}", 37 self.task_id.brief(), 38 self.applied 39 ); 40 self.handle.ram_handle.lock().unwrap().release(self.applied); 41 } 42 } 43 } 44 45 impl RamCache { new(task_id: TaskId, handle: &'static CacheManager, size: Option<usize>) -> Self46 pub(crate) fn new(task_id: TaskId, handle: &'static CacheManager, size: Option<usize>) -> Self { 47 let applied = match size { 48 Some(size) => { 49 if CacheManager::apply_cache( 50 &handle.ram_handle, 51 &handle.rams, 52 |a| RamCache::task_id(a), 53 size, 54 ) { 55 info!( 56 "apply ram cache {} for task {} success", 57 size, 58 task_id.brief() 59 ); 60 size as u64 61 } else { 62 error!( 63 "apply ram cache {} for task {} failed", 64 size, 65 task_id.brief() 66 ); 67 0 68 } 69 } 70 None => 0, 71 }; 72 73 Self { 74 task_id, 75 data: Vec::with_capacity(size.unwrap_or(DEFAULT_TRUNK_CAPACITY)), 76 applied, 77 handle, 78 } 79 } 80 finish_write(mut self) -> Arc<RamCache>81 pub(crate) fn finish_write(mut self) -> Arc<RamCache> { 82 let is_cache = self.check_size(); 83 let me = Arc::new(self); 84 85 if is_cache { 86 me.handle.update_ram_cache(me.clone()); 87 } 88 me.handle.update_file_cache(me.task_id.clone(), me.clone()); 89 me 90 } 91 check_size(&mut self) -> bool92 pub(crate) fn check_size(&mut self) -> bool { 93 match (self.data.len() as u64).cmp(&self.applied) { 94 Ordering::Equal => true, 95 Ordering::Greater => { 96 let diff = self.data.len() - self.applied as usize; 97 if self.data.len() > MAX_CACHE_SIZE as usize 98 || !CacheManager::apply_cache( 99 &self.handle.ram_handle, 100 &self.handle.rams, 101 |a| RamCache::task_id(a), 102 diff, 103 ) 104 { 105 info!( 106 "apply extra ram {} cache for task {} failed", 107 diff, 108 self.task_id.brief() 109 ); 110 self.handle.ram_handle.lock().unwrap().release(self.applied); 111 self.applied = 0; 112 false 113 } else { 114 info!( 115 "apply extra ram {} cache for task {} success", 116 diff, 117 self.task_id.brief() 118 ); 119 self.applied = self.data.len() as u64; 120 true 121 } 122 } 123 Ordering::Less => { 124 self.handle 125 .ram_handle 126 .lock() 127 .unwrap() 128 .release(self.applied - self.data.len() as u64); 129 self.applied = self.data.len() as u64; 130 true 131 } 132 } 133 } 134 task_id(&self) -> &TaskId135 pub(crate) fn task_id(&self) -> &TaskId { 136 &self.task_id 137 } 138 size(&self) -> usize139 pub fn size(&self) -> usize { 140 self.data.len() 141 } 142 cursor(&self) -> Cursor<&[u8]>143 pub fn cursor(&self) -> Cursor<&[u8]> { 144 Cursor::new(&self.data) 145 } 146 } 147 148 impl Write for RamCache { write(&mut self, buf: &[u8]) -> std::io::Result<usize>149 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { 150 self.data.write(buf) 151 } 152 flush(&mut self) -> std::io::Result<()>153 fn flush(&mut self) -> std::io::Result<()> { 154 self.data.flush() 155 } 156 } 157 158 impl CacheManager { update_ram_cache(&'static self, cache: Arc<RamCache>)159 pub(crate) fn update_ram_cache(&'static self, cache: Arc<RamCache>) { 160 let task_id = cache.task_id().clone(); 161 162 if self 163 .rams 164 .lock() 165 .unwrap() 166 .insert(task_id.clone(), cache.clone()) 167 .is_some() 168 { 169 self.files.lock().unwrap().remove(&task_id); 170 info!("{} old caches delete", task_id.brief()); 171 } 172 self.update_from_file_once.lock().unwrap().remove(&task_id); 173 } 174 } 175 176 #[cfg(test)] 177 mod test { 178 179 use std::sync::LazyLock; 180 use std::thread; 181 use std::time::Duration; 182 183 use request_utils::fastrand::fast_random; 184 use request_utils::test::log::init; 185 186 use super::*; 187 188 const TEST_STRING: &str = "你这猴子真让我欢喜"; 189 const TEST_STRING_SIZE: usize = TEST_STRING.len(); 190 const TEST_SIZE: u64 = 128; 191 192 #[test] ut_cache_ram_try_new()193 fn ut_cache_ram_try_new() { 194 init(); 195 static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new); 196 CACHE_MANAGER.set_ram_cache_size(TEST_SIZE); 197 198 // cache not update 199 for _ in 0..1000 { 200 let task_id = TaskId::new(fast_random().to_string()); 201 let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE)); 202 cache.write_all(TEST_STRING.as_bytes()).unwrap(); 203 } 204 205 // cache update 206 for _ in 0..1000 { 207 let task_id = TaskId::new(fast_random().to_string()); 208 let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE)); 209 210 cache.write_all(TEST_STRING.as_bytes()).unwrap(); 211 CACHE_MANAGER.update_ram_cache(Arc::new(cache)); 212 } 213 214 // cache update and save to file 215 for _ in 0..1000 { 216 let task_id = TaskId::new(fast_random().to_string()); 217 let mut cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE)); 218 cache.write_all(TEST_STRING.as_bytes()).unwrap(); 219 cache.finish_write(); 220 assert!(CACHE_MANAGER.rams.lock().unwrap().contains_key(&task_id)); 221 thread::sleep(Duration::from_millis(5)); 222 } 223 } 224 225 #[test] ut_cache_ram_try_new_fail()226 fn ut_cache_ram_try_new_fail() { 227 init(); 228 static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new); 229 CACHE_MANAGER.set_ram_cache_size(TEST_SIZE); 230 231 let mut total = TEST_STRING_SIZE as u64; 232 let mut v = vec![]; 233 while total < TEST_SIZE { 234 let task_id = TaskId::new(fast_random().to_string()); 235 v.push(RamCache::new( 236 task_id.clone(), 237 &CACHE_MANAGER, 238 Some(TEST_STRING_SIZE), 239 )); 240 total += TEST_STRING_SIZE as u64; 241 } 242 assert_eq!( 243 RamCache::new( 244 TaskId::new(fast_random().to_string()), 245 &CACHE_MANAGER, 246 Some(TEST_STRING_SIZE) 247 ) 248 .applied, 249 0 250 ); 251 v.pop(); 252 RamCache::new( 253 TaskId::new(fast_random().to_string()), 254 &CACHE_MANAGER, 255 Some(TEST_STRING_SIZE), 256 ); 257 } 258 #[test] ut_cache_ram_drop()259 fn ut_cache_ram_drop() { 260 init(); 261 static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new); 262 CACHE_MANAGER.set_ram_cache_size(TEST_SIZE); 263 264 let task_id = TaskId::new(fast_random().to_string()); 265 let cache = RamCache::new(task_id.clone(), &CACHE_MANAGER, Some(TEST_STRING_SIZE)); 266 assert_eq!( 267 CACHE_MANAGER.ram_handle.lock().unwrap().used_ram, 268 TEST_STRING_SIZE as u64 269 ); 270 drop(cache); 271 assert_eq!(CACHE_MANAGER.ram_handle.lock().unwrap().used_ram, 0); 272 } 273 274 #[test] ut_cache_ram_temp()275 fn ut_cache_ram_temp() { 276 init(); 277 static CACHE_MANAGER: LazyLock<CacheManager> = LazyLock::new(CacheManager::new); 278 CACHE_MANAGER.set_ram_cache_size(TEST_SIZE); 279 } 280 } 281