• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2024 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::cmp::Ordering;
15 use std::io::{Cursor, Write};
16 use std::sync::Arc;
17 
18 use request_utils::task_id::TaskId;
19 
20 use super::MAX_CACHE_SIZE;
21 use crate::manage::CacheManager;
22 
/// Initial capacity of a `RamCache` buffer when `RamCache::new` is called
/// without an expected size.
const DEFAULT_TRUNK_CAPACITY: usize = 512;
24 
/// An in-memory buffer of the bytes written for one task, together with the
/// amount of RAM-cache budget currently reserved for it.
///
/// The reservation is handed back to `handle.ram_handle` when the value is
/// dropped (see the `Drop` impl).
pub struct RamCache {
    /// Identifier of the task whose data this cache holds.
    pub(super) task_id: TaskId,
    /// Bytes accumulated through the `Write` impl.
    data: Vec<u8>,
    /// Amount currently reserved through `handle.ram_handle`, compared
    /// against `data.len()` in `check_size`; `0` means nothing is reserved.
    applied: u64,
    /// Manager that owns the accounting handle and the cache tables.
    handle: &'static CacheManager,
}
31 
32 impl Drop for RamCache {
drop(&mut self)33     fn drop(&mut self) {
34         if self.applied != 0 {
35             info!(
36                 "ram cache {} released {}",
37                 self.task_id.brief(),
38                 self.applied
39             );
40             self.handle.ram_handle.lock().unwrap().release(self.applied);
41         }
42     }
43 }
44 
45 impl RamCache {
new(task_id: TaskId, handle: &'static CacheManager, size: Option<usize>) -> Self46     pub(crate) fn new(task_id: TaskId, handle: &'static CacheManager, size: Option<usize>) -> Self {
47         let applied = match size {
48             Some(size) => {
49                 if CacheManager::apply_cache(
50                     &handle.ram_handle,
51                     &handle.rams,
52                     |a| RamCache::task_id(a),
53                     size,
54                 ) {
55                     info!(
56                         "apply ram cache {} for task {} success",
57                         size,
58                         task_id.brief()
59                     );
60                     size as u64
61                 } else {
62                     error!(
63                         "apply ram cache {} for task {} failed",
64                         size,
65                         task_id.brief()
66                     );
67                     0
68                 }
69             }
70             None => 0,
71         };
72 
73         Self {
74             task_id,
75             data: Vec::with_capacity(size.unwrap_or(DEFAULT_TRUNK_CAPACITY)),
76             applied,
77             handle,
78         }
79     }
80 
finish_write(mut self) -> Arc<RamCache>81     pub(crate) fn finish_write(mut self) -> Arc<RamCache> {
82         let is_cache = self.check_size();
83         let me = Arc::new(self);
84 
85         if is_cache {
86             me.handle.update_ram_cache(me.clone());
87         }
88         me.handle.update_file_cache(me.task_id.clone(), me.clone());
89         me
90     }
91 
check_size(&mut self) -> bool92     pub(crate) fn check_size(&mut self) -> bool {
93         match (self.data.len() as u64).cmp(&self.applied) {
94             Ordering::Equal => true,
95             Ordering::Greater => {
96                 let diff = self.data.len() - self.applied as usize;
97                 if self.data.len() > MAX_CACHE_SIZE as usize
98                     || !CacheManager::apply_cache(
99                         &self.handle.ram_handle,
100                         &self.handle.rams,
101                         |a| RamCache::task_id(a),
102                         diff,
103                     )
104                 {
105                     info!(
106                         "apply extra ram {} cache for task {} failed",
107                         diff,
108                         self.task_id.brief()
109                     );
110                     self.handle.ram_handle.lock().unwrap().release(self.applied);
111                     self.applied = 0;
112                     false
113                 } else {
114                     info!(
115                         "apply extra ram {} cache for task {} success",
116                         diff,
117                         self.task_id.brief()
118                     );
119                     self.applied = self.data.len() as u64;
120                     true
121                 }
122             }
123             Ordering::Less => {
124                 self.handle
125                     .ram_handle
126                     .lock()
127                     .unwrap()
128                     .release(self.applied - self.data.len() as u64);
129                 self.applied = self.data.len() as u64;
130                 true
131             }
132         }
133     }
134 
task_id(&self) -> &TaskId135     pub(crate) fn task_id(&self) -> &TaskId {
136         &self.task_id
137     }
138 
size(&self) -> usize139     pub fn size(&self) -> usize {
140         self.data.len()
141     }
142 
cursor(&self) -> Cursor<&[u8]>143     pub fn cursor(&self) -> Cursor<&[u8]> {
144         Cursor::new(&self.data)
145     }
146 }
147 
148 impl Write for RamCache {
write(&mut self, buf: &[u8]) -> std::io::Result<usize>149     fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
150         self.data.write(buf)
151     }
152 
flush(&mut self) -> std::io::Result<()>153     fn flush(&mut self) -> std::io::Result<()> {
154         self.data.flush()
155     }
156 }
157 
158 impl CacheManager {
update_ram_cache(&'static self, cache: Arc<RamCache>)159     pub(crate) fn update_ram_cache(&'static self, cache: Arc<RamCache>) {
160         let task_id = cache.task_id().clone();
161 
162         if self
163             .rams
164             .lock()
165             .unwrap()
166             .insert(task_id.clone(), cache.clone())
167             .is_some()
168         {
169             self.files.lock().unwrap().remove(&task_id);
170             info!("{} old caches delete", task_id.brief());
171         }
172         self.update_from_file_once.lock().unwrap().remove(&task_id);
173     }
174 }
175 
// Unit tests live in a separate file and are textually included here, so they
// are compiled as a child module of this one; built only under `cfg(test)`.
#[cfg(test)]
mod ut_ram {
    include!("../../tests/ut/data/ut_ram.rs");
}
180