1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/platform/cloud/ram_file_block_cache.h"
17 #include <cstring>
18 #include <memory>
19 #include "tensorflow/core/lib/gtl/cleanup.h"
20 #include "tensorflow/core/platform/env.h"
21
22 namespace tensorflow {
23
BlockNotStale(const std::shared_ptr<Block> & block)24 bool RamFileBlockCache::BlockNotStale(const std::shared_ptr<Block>& block) {
25 mutex_lock l(block->mu);
26 if (block->state != FetchState::FINISHED) {
27 return true; // No need to check for staleness.
28 }
29 if (max_staleness_ == 0) return true; // Not enforcing staleness.
30 return env_->NowSeconds() - block->timestamp <= max_staleness_;
31 }
32
Lookup(const Key & key)33 std::shared_ptr<RamFileBlockCache::Block> RamFileBlockCache::Lookup(
34 const Key& key) {
35 mutex_lock lock(mu_);
36 auto entry = block_map_.find(key);
37 if (entry != block_map_.end()) {
38 if (BlockNotStale(entry->second)) {
39 return entry->second;
40 } else {
41 // Remove the stale block and continue.
42 RemoveFile_Locked(key.first);
43 }
44 }
45
46 // Insert a new empty block, setting the bookkeeping to sentinel values
47 // in order to update them as appropriate.
48 auto new_entry = std::make_shared<Block>();
49 lru_list_.push_front(key);
50 lra_list_.push_front(key);
51 new_entry->lru_iterator = lru_list_.begin();
52 new_entry->lra_iterator = lra_list_.begin();
53 new_entry->timestamp = env_->NowSeconds();
54 block_map_.emplace(std::make_pair(key, new_entry));
55 return new_entry;
56 }
57
58 // Remove blocks from the cache until we do not exceed our maximum size.
Trim()59 void RamFileBlockCache::Trim() {
60 while (!lru_list_.empty() && cache_size_ > max_bytes_) {
61 RemoveBlock(block_map_.find(lru_list_.back()));
62 }
63 }
64
65 /// Move the block to the front of the LRU list if it isn't already there.
// Marks `block` as most recently used, validates cache consistency for the
// block's file, and trims the cache back under its byte budget.
// Returns errors::Internal when the cache contents look inconsistent;
// Status::OK() otherwise (including when the block was concurrently evicted).
Status RamFileBlockCache::UpdateLRU(const Key& key,
                                    const std::shared_ptr<Block>& block) {
  mutex_lock lock(mu_);
  if (block->timestamp == 0) {
    // The block was evicted from another thread. Allow it to remain evicted.
    return Status::OK();
  }
  if (block->lru_iterator != lru_list_.begin()) {
    // Splice the key to the front of the LRU list; the stored iterator must
    // be refreshed since erase invalidates it.
    lru_list_.erase(block->lru_iterator);
    lru_list_.push_front(key);
    block->lru_iterator = lru_list_.begin();
  }

  // Check for inconsistent state. If there is a block later in the same file
  // in the cache, and our current block is not block size, this likely means
  // we have inconsistent state within the cache. Note: it's possible some
  // incomplete reads may still go undetected.
  if (block->data.size() < block_size_) {
    // Keys sort by (filename, offset), so upper_bound of (filename, max
    // offset) is the first entry past this file; stepping back one yields the
    // file's last cached block. If that block's key is greater than ours, a
    // later block exists even though ours is short — inconsistent.
    Key fmax = std::make_pair(key.first, std::numeric_limits<size_t>::max());
    auto fcmp = block_map_.upper_bound(fmax);
    if (fcmp != block_map_.begin() && key < (--fcmp)->first) {
      return errors::Internal("Block cache contents are inconsistent.");
    }
  }

  // Enforce the byte budget now that this block has been touched.
  Trim();

  return Status::OK();
}
95
// Ensures `block` holds downloaded contents for `key`, invoking
// block_fetcher_ if no other thread has already done (or is doing) so.
// Concurrent callers for the same block wait on the block's condition
// variable instead of issuing duplicate fetches. Returns the fetcher's
// status on failure.
Status RamFileBlockCache::MaybeFetch(const Key& key,
                                     const std::shared_ptr<Block>& block) {
  bool downloaded_block = false;
  auto reconcile_state =
      gtl::MakeCleanup([this, &downloaded_block, &key, &block] {
        // Perform this action in a cleanup callback to avoid locking mu_ after
        // locking block->mu.
        if (downloaded_block) {
          mutex_lock l(mu_);
          // Do not update state if the block is already to be evicted.
          if (block->timestamp != 0) {
            // Account for the freshly downloaded bytes and record the block
            // as the most recently added entry with a fresh timestamp.
            cache_size_ += block->data.size();
            // Put to beginning of LRA list.
            lra_list_.erase(block->lra_iterator);
            lra_list_.push_front(key);
            block->lra_iterator = lra_list_.begin();
            block->timestamp = env_->NowSeconds();
          }
        }
      });
  // Loop until either block content is successfully fetched, or our request
  // encounters an error.
  mutex_lock l(block->mu);
  Status status = Status::OK();
  while (true) {
    switch (block->state) {
      case FetchState::ERROR:
        // A previous fetch failed; retry as if the block were newly created.
        TF_FALLTHROUGH_INTENDED;
      case FetchState::CREATED:
        block->state = FetchState::FETCHING;
        block->mu.unlock();  // Release the lock while making the API call.
        block->data.clear();
        block->data.resize(block_size_, 0);
        size_t bytes_transferred;
        status.Update(block_fetcher_(key.first, key.second, block_size_,
                                     block->data.data(), &bytes_transferred));
        block->mu.lock();  // Reacquire the lock immediately afterwards
        if (status.ok()) {
          // Trim the buffer down to the bytes actually read; the final block
          // of a file may be shorter than block_size_.
          block->data.resize(bytes_transferred, 0);
          block->data.shrink_to_fit();
          downloaded_block = true;
          block->state = FetchState::FINISHED;
        } else {
          block->state = FetchState::ERROR;
        }
        // Wake any threads parked in the FETCHING branch below.
        block->cond_var.notify_all();
        return status;
      case FetchState::FETCHING:
        // Another thread is downloading this block; wait (with a 60-second
        // timeout) for it to finish or fail, then re-examine the state.
        block->cond_var.wait_for(l, std::chrono::seconds(60));
        if (block->state == FetchState::FINISHED) {
          return Status::OK();
        }
        // Re-loop in case of errors.
        break;
      case FetchState::FINISHED:
        return Status::OK();
    }
  }
  return errors::Internal(
      "Control flow should never reach the end of RamFileBlockCache::Fetch.");
}
157
Read(const string & filename,size_t offset,size_t n,char * buffer,size_t * bytes_transferred)158 Status RamFileBlockCache::Read(const string& filename, size_t offset, size_t n,
159 char* buffer, size_t* bytes_transferred) {
160 *bytes_transferred = 0;
161 if (n == 0) {
162 return Status::OK();
163 }
164 if (!IsCacheEnabled()) {
165 // The cache is effectively disabled, so we pass the read through to the
166 // fetcher without breaking it up into blocks.
167 return block_fetcher_(filename, offset, n, buffer, bytes_transferred);
168 }
169 // Calculate the block-aligned start and end of the read.
170 size_t start = block_size_ * (offset / block_size_);
171 size_t finish = block_size_ * ((offset + n) / block_size_);
172 if (finish < offset + n) {
173 finish += block_size_;
174 }
175 size_t total_bytes_transferred = 0;
176 // Now iterate through the blocks, reading them one at a time.
177 for (size_t pos = start; pos < finish; pos += block_size_) {
178 Key key = std::make_pair(filename, pos);
179 // Look up the block, fetching and inserting it if necessary, and update the
180 // LRU iterator for the key and block.
181 std::shared_ptr<Block> block = Lookup(key);
182 DCHECK(block) << "No block for key " << key.first << "@" << key.second;
183 TF_RETURN_IF_ERROR(MaybeFetch(key, block));
184 TF_RETURN_IF_ERROR(UpdateLRU(key, block));
185 // Copy the relevant portion of the block into the result buffer.
186 const auto& data = block->data;
187 if (offset >= pos + data.size()) {
188 // The requested offset is at or beyond the end of the file. This can
189 // happen if `offset` is not block-aligned, and the read returns the last
190 // block in the file, which does not extend all the way out to `offset`.
191 *bytes_transferred = total_bytes_transferred;
192 return errors::OutOfRange("EOF at offset ", offset, " in file ", filename,
193 " at position ", pos, "with data size ",
194 data.size());
195 }
196 auto begin = data.begin();
197 if (offset > pos) {
198 // The block begins before the slice we're reading.
199 begin += offset - pos;
200 }
201 auto end = data.end();
202 if (pos + data.size() > offset + n) {
203 // The block extends past the end of the slice we're reading.
204 end -= (pos + data.size()) - (offset + n);
205 }
206 if (begin < end) {
207 size_t bytes_to_copy = end - begin;
208 memcpy(&buffer[total_bytes_transferred], &*begin, bytes_to_copy);
209 total_bytes_transferred += bytes_to_copy;
210 }
211 if (data.size() < block_size_) {
212 // The block was a partial block and thus signals EOF at its upper bound.
213 break;
214 }
215 }
216 *bytes_transferred = total_bytes_transferred;
217 return Status::OK();
218 }
219
// Records or checks the signature for `filename`. Returns true when the
// signature is new or unchanged; returns false — after evicting the file's
// cached blocks and storing the new signature — when it differs from the one
// previously seen.
bool RamFileBlockCache::ValidateAndUpdateFileSignature(const string& filename,
                                                       int64 file_signature) {
  mutex_lock lock(mu_);
  auto iter = file_signature_map_.find(filename);
  if (iter == file_signature_map_.end()) {
    // First sighting of this file; remember its signature.
    file_signature_map_[filename] = file_signature;
    return true;
  }
  if (iter->second == file_signature) {
    return true;
  }
  // The signatures don't match: the remote file changed, so any cached
  // blocks are invalid. Drop them and record the new signature.
  RemoveFile_Locked(filename);
  iter->second = file_signature;
  return false;
}
236
CacheSize() const237 size_t RamFileBlockCache::CacheSize() const {
238 mutex_lock lock(mu_);
239 return cache_size_;
240 }
241
// Body of the background pruning thread: periodically evicts blocks whose
// age exceeds max_staleness_, until stop_pruning_thread_ is notified.
// The timeout value 1000000 is presumably microseconds (i.e. a one-second
// poll interval) — confirm against WaitForNotificationWithTimeout.
void RamFileBlockCache::Prune() {
  while (!WaitForNotificationWithTimeout(&stop_pruning_thread_, 1000000)) {
    mutex_lock lock(mu_);
    uint64 now = env_->NowSeconds();
    // The back of lra_list_ holds the least-recently-added (oldest) block;
    // keep evicting from the back until we reach a block that is still fresh.
    while (!lra_list_.empty()) {
      auto it = block_map_.find(lra_list_.back());
      if (now - it->second->timestamp <= max_staleness_) {
        // The oldest block is not yet expired. Come back later.
        break;
      }
      // We need to make a copy of the filename here, since it could otherwise
      // be used within RemoveFile_Locked after `it` is deleted.
      RemoveFile_Locked(std::string(it->first.first));
    }
  }
}
258
Flush()259 void RamFileBlockCache::Flush() {
260 mutex_lock lock(mu_);
261 block_map_.clear();
262 lru_list_.clear();
263 lra_list_.clear();
264 cache_size_ = 0;
265 }
266
// Public entry point for evicting all cached blocks of `filename`; acquires
// mu_ and delegates to the locked implementation.
void RamFileBlockCache::RemoveFile(const string& filename) {
  mutex_lock guard(mu_);
  RemoveFile_Locked(filename);
}
271
// Evicts every cached block belonging to `filename`. Caller must hold mu_.
void RamFileBlockCache::RemoveFile_Locked(const string& filename) {
  // Keys sort by (filename, offset), so all of a file's blocks are
  // contiguous in the map, starting at offset 0.
  Key first_key(filename, 0);
  for (auto it = block_map_.lower_bound(first_key);
       it != block_map_.end() && it->first.first == filename;) {
    // Advance before erasing, since RemoveBlock invalidates `it`.
    auto doomed = it++;
    RemoveBlock(doomed);
  }
}
281
RemoveBlock(BlockMap::iterator entry)282 void RamFileBlockCache::RemoveBlock(BlockMap::iterator entry) {
283 // This signals that the block is removed, and should not be inadvertently
284 // reinserted into the cache in UpdateLRU.
285 entry->second->timestamp = 0;
286 lru_list_.erase(entry->second->lru_iterator);
287 lra_list_.erase(entry->second->lra_iterator);
288 cache_size_ -= entry->second->data.size();
289 block_map_.erase(entry);
290 }
291
292 } // namespace tensorflow
293