1 // Copyright (C) 2024 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::cmp::Ordering; 15 use std::io::{Cursor, Write}; 16 use std::sync::Arc; 17 18 use request_utils::task_id::TaskId; 19 20 use super::MAX_CACHE_SIZE; 21 use crate::manage::CacheManager; 22 23 const DEFAULT_TRUNK_CAPACITY: usize = 512; 24 25 pub struct RamCache { 26 pub(super) task_id: TaskId, 27 data: Vec<u8>, 28 applied: u64, 29 handle: &'static CacheManager, 30 } 31 32 impl Drop for RamCache { drop(&mut self)33 fn drop(&mut self) { 34 if self.applied != 0 { 35 info!( 36 "ram cache {} released {}", 37 self.task_id.brief(), 38 self.applied 39 ); 40 self.handle.ram_handle.lock().unwrap().release(self.applied); 41 } 42 } 43 } 44 45 impl RamCache { new(task_id: TaskId, handle: &'static CacheManager, size: Option<usize>) -> Self46 pub(crate) fn new(task_id: TaskId, handle: &'static CacheManager, size: Option<usize>) -> Self { 47 let applied = match size { 48 Some(size) => { 49 if CacheManager::apply_cache( 50 &handle.ram_handle, 51 &handle.rams, 52 |a| RamCache::task_id(a), 53 size, 54 ) { 55 info!( 56 "apply ram cache {} for task {} success", 57 size, 58 task_id.brief() 59 ); 60 size as u64 61 } else { 62 error!( 63 "apply ram cache {} for task {} failed", 64 size, 65 task_id.brief() 66 ); 67 0 68 } 69 } 70 None => 0, 71 }; 72 73 Self { 74 task_id, 75 data: Vec::with_capacity(size.unwrap_or(DEFAULT_TRUNK_CAPACITY)), 76 applied, 77 handle, 78 } 79 } 80 finish_write(mut self) -> Arc<RamCache>81 pub(crate) fn finish_write(mut self) -> Arc<RamCache> { 82 let is_cache = self.check_size(); 83 let me = Arc::new(self); 84 85 if is_cache { 86 me.handle.update_ram_cache(me.clone()); 87 } 88 me.handle.update_file_cache(me.task_id.clone(), me.clone()); 89 me 90 } 91 check_size(&mut self) -> bool92 pub(crate) fn check_size(&mut self) -> bool { 93 match (self.data.len() as u64).cmp(&self.applied) { 94 Ordering::Equal => true, 95 Ordering::Greater => { 96 let diff = self.data.len() - self.applied as usize; 97 if self.data.len() > MAX_CACHE_SIZE as usize 98 || !CacheManager::apply_cache( 99 &self.handle.ram_handle, 100 &self.handle.rams, 101 |a| RamCache::task_id(a), 102 diff, 103 ) 104 { 105 info!( 106 "apply extra ram {} cache for task {} failed", 107 diff, 108 self.task_id.brief() 109 ); 110 self.handle.ram_handle.lock().unwrap().release(self.applied); 111 self.applied = 0; 112 false 113 } else { 114 info!( 115 "apply extra ram {} cache for task {} success", 116 diff, 117 self.task_id.brief() 118 ); 119 self.applied = self.data.len() as u64; 120 true 121 } 122 } 123 Ordering::Less => { 124 self.handle 125 .ram_handle 126 .lock() 127 .unwrap() 128 .release(self.applied - self.data.len() as u64); 129 self.applied = self.data.len() as u64; 130 true 131 } 132 } 133 } 134 task_id(&self) -> &TaskId135 pub(crate) fn task_id(&self) -> &TaskId { 136 &self.task_id 137 } 138 size(&self) -> usize139 pub fn size(&self) -> usize { 140 self.data.len() 141 } 142 cursor(&self) -> Cursor<&[u8]>143 pub fn cursor(&self) -> Cursor<&[u8]> { 144 Cursor::new(&self.data) 145 } 146 } 147 148 impl Write for RamCache { write(&mut self, buf: &[u8]) -> std::io::Result<usize>149 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { 150 self.data.write(buf) 151 } 152 flush(&mut self) -> std::io::Result<()>153 fn flush(&mut self) -> std::io::Result<()> { 154 self.data.flush() 155 } 156 } 157 158 impl CacheManager { update_ram_cache(&'static self, cache: Arc<RamCache>)159 pub(crate) fn update_ram_cache(&'static self, cache: Arc<RamCache>) { 160 let task_id = cache.task_id().clone(); 161 162 if self 163 .rams 164 .lock() 165 .unwrap() 166 .insert(task_id.clone(), cache.clone()) 167 .is_some() 168 { 169 self.files.lock().unwrap().remove(&task_id); 170 info!("{} old caches delete", task_id.brief()); 171 } 172 self.update_from_file_once.lock().unwrap().remove(&task_id); 173 } 174 } 175 176 #[cfg(test)] 177 mod ut_ram { 178 include!("../../tests/ut/data/ut_ram.rs"); 179 } 180