1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_ 17 #define TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_ 18 19 #include <functional> 20 #include <list> 21 #include <map> 22 #include <memory> 23 #include <string> 24 #include <vector> 25 #include "tensorflow/core/lib/core/status.h" 26 #include "tensorflow/core/lib/core/stringpiece.h" 27 #include "tensorflow/core/platform/cloud/file_block_cache.h" 28 #include "tensorflow/core/platform/env.h" 29 #include "tensorflow/core/platform/mutex.h" 30 #include "tensorflow/core/platform/notification.h" 31 #include "tensorflow/core/platform/thread_annotations.h" 32 #include "tensorflow/core/platform/types.h" 33 34 namespace tensorflow { 35 36 /// \brief An LRU block cache of file contents, keyed by {filename, offset}. 37 /// 38 /// This class should be shared by read-only random access files on a remote 39 /// filesystem (e.g. GCS). 40 class RamFileBlockCache : public FileBlockCache { 41 public: 42 /// The callback executed when a block is not found in the cache, and needs to 43 /// be fetched from the backing filesystem. This callback is provided when the 44 /// cache is constructed. The returned Status should be OK as long as the 45 /// read from the remote filesystem succeeded (similar to the semantics of the 46 /// read(2) system call). 47 typedef std::function<Status(const string& filename, size_t offset, 48 size_t buffer_size, char* buffer, 49 size_t* bytes_transferred)> 50 BlockFetcher; 51 52 RamFileBlockCache(size_t block_size, size_t max_bytes, uint64 max_staleness, 53 BlockFetcher block_fetcher, Env* env = Env::Default()) block_size_(block_size)54 : block_size_(block_size), 55 max_bytes_(max_bytes), 56 max_staleness_(max_staleness), 57 block_fetcher_(block_fetcher), 58 env_(env) { 59 if (max_staleness_ > 0) { 60 pruning_thread_.reset(env_->StartThread(ThreadOptions(), "TF_prune_FBC", 61 [this] { Prune(); })); 62 } 63 VLOG(1) << "GCS file block cache is " 64 << (IsCacheEnabled() ? "enabled" : "disabled"); 65 } 66 ~RamFileBlockCache()67 ~RamFileBlockCache() override { 68 if (pruning_thread_) { 69 stop_pruning_thread_.Notify(); 70 // Destroying pruning_thread_ will block until Prune() receives the above 71 // notification and returns. 72 pruning_thread_.reset(); 73 } 74 } 75 76 /// Read `n` bytes from `filename` starting at `offset` into `out`. This 77 /// method will return: 78 /// 79 /// 1) The error from the remote filesystem, if the read from the remote 80 /// filesystem failed. 81 /// 2) PRECONDITION_FAILED if the read from the remote filesystem succeeded, 82 /// but the read returned a partial block, and the LRU cache contained a 83 /// block at a higher offset (indicating that the partial block should have 84 /// been a full block). 85 /// 3) OUT_OF_RANGE if the read from the remote filesystem succeeded, but 86 /// the file contents do not extend past `offset` and thus nothing was 87 /// placed in `out`. 88 /// 4) OK otherwise (i.e. the read succeeded, and at least one byte was placed 89 /// in `out`). 90 Status Read(const string& filename, size_t offset, size_t n, char* buffer, 91 size_t* bytes_transferred) override; 92 93 // Validate the given file signature with the existing file signature in the 94 // cache. Returns true if the signature doesn't change or the file doesn't 95 // exist before. If the signature changes, update the existing signature with 96 // the new one and remove the file from cache. 97 bool ValidateAndUpdateFileSignature(const string& filename, 98 int64 file_signature) override 99 LOCKS_EXCLUDED(mu_); 100 101 /// Remove all cached blocks for `filename`. 102 void RemoveFile(const string& filename) override LOCKS_EXCLUDED(mu_); 103 104 /// Remove all cached data. 105 void Flush() override LOCKS_EXCLUDED(mu_); 106 107 /// Accessors for cache parameters. block_size()108 size_t block_size() const override { return block_size_; } max_bytes()109 size_t max_bytes() const override { return max_bytes_; } max_staleness()110 uint64 max_staleness() const override { return max_staleness_; } 111 112 /// The current size (in bytes) of the cache. 113 size_t CacheSize() const override LOCKS_EXCLUDED(mu_); 114 115 // Returns true if the cache is enabled. If false, the BlockFetcher callback 116 // is always executed during Read. IsCacheEnabled()117 bool IsCacheEnabled() const override { 118 return block_size_ > 0 && max_bytes_ > 0; 119 } 120 121 private: 122 /// The size of the blocks stored in the LRU cache, as well as the size of the 123 /// reads from the underlying filesystem. 124 const size_t block_size_; 125 /// The maximum number of bytes (sum of block sizes) allowed in the LRU cache. 126 const size_t max_bytes_; 127 /// The maximum staleness of any block in the LRU cache, in seconds. 128 const uint64 max_staleness_; 129 /// The callback to read a block from the underlying filesystem. 130 const BlockFetcher block_fetcher_; 131 /// The Env from which we read timestamps. 132 Env* const env_; // not owned 133 134 /// \brief The key type for the file block cache. 135 /// 136 /// The file block cache key is a {filename, offset} pair. 137 typedef std::pair<string, size_t> Key; 138 139 /// \brief The state of a block. 140 /// 141 /// A block begins in the CREATED stage. The first thread will attempt to read 142 /// the block from the filesystem, transitioning the state of the block to 143 /// FETCHING. After completing, if the read was successful the state should 144 /// be FINISHED. Otherwise the state should be ERROR. A subsequent read can 145 /// re-fetch the block if the state is ERROR. 146 enum class FetchState { 147 CREATED, 148 FETCHING, 149 FINISHED, 150 ERROR, 151 }; 152 153 /// \brief A block of a file. 154 /// 155 /// A file block consists of the block data, the block's current position in 156 /// the LRU cache, the timestamp (seconds since epoch) at which the block 157 /// was cached, a coordination lock, and state & condition variables. 158 /// 159 /// Thread safety: 160 /// The iterator and timestamp fields should only be accessed while holding 161 /// the block-cache-wide mu_ instance variable. The state variable should only 162 /// be accessed while holding the Block's mu lock. The data vector should only 163 /// be accessed after state == FINISHED, and it should never be modified. 164 /// 165 /// In order to prevent deadlocks, never grab the block-cache-wide mu_ lock 166 /// AFTER grabbing any block's mu lock. It is safe to grab mu without locking 167 /// mu_. 168 struct Block { 169 /// The block data. 170 std::vector<char> data; 171 /// A list iterator pointing to the block's position in the LRU list. 172 std::list<Key>::iterator lru_iterator; 173 /// A list iterator pointing to the block's position in the LRA list. 174 std::list<Key>::iterator lra_iterator; 175 /// The timestamp (seconds since epoch) at which the block was cached. 176 uint64 timestamp; 177 /// Mutex to guard state variable 178 mutex mu; 179 /// The state of the block. 180 FetchState state GUARDED_BY(mu) = FetchState::CREATED; 181 /// Wait on cond_var if state is FETCHING. 182 condition_variable cond_var; 183 }; 184 185 /// \brief The block map type for the file block cache. 186 /// 187 /// The block map is an ordered map from Key to Block. 188 typedef std::map<Key, std::shared_ptr<Block>> BlockMap; 189 190 /// Prune the cache by removing files with expired blocks. 191 void Prune() LOCKS_EXCLUDED(mu_); 192 193 bool BlockNotStale(const std::shared_ptr<Block>& block) 194 EXCLUSIVE_LOCKS_REQUIRED(mu_); 195 196 /// Look up a Key in the block cache. 197 std::shared_ptr<Block> Lookup(const Key& key) LOCKS_EXCLUDED(mu_); 198 199 Status MaybeFetch(const Key& key, const std::shared_ptr<Block>& block) 200 LOCKS_EXCLUDED(mu_); 201 202 /// Trim the block cache to make room for another entry. 203 void Trim() EXCLUSIVE_LOCKS_REQUIRED(mu_); 204 205 /// Update the LRU iterator for the block at `key`. 206 Status UpdateLRU(const Key& key, const std::shared_ptr<Block>& block) 207 LOCKS_EXCLUDED(mu_); 208 209 /// Remove all blocks of a file, with mu_ already held. 210 void RemoveFile_Locked(const string& filename) EXCLUSIVE_LOCKS_REQUIRED(mu_); 211 212 /// Remove the block `entry` from the block map and LRU list, and update the 213 /// cache size accordingly. 214 void RemoveBlock(BlockMap::iterator entry) EXCLUSIVE_LOCKS_REQUIRED(mu_); 215 216 /// The cache pruning thread that removes files with expired blocks. 217 std::unique_ptr<Thread> pruning_thread_; 218 219 /// Notification for stopping the cache pruning thread. 220 Notification stop_pruning_thread_; 221 222 /// Guards access to the block map, LRU list, and cached byte count. 223 mutable mutex mu_; 224 225 /// The block map (map from Key to Block). 226 BlockMap block_map_ GUARDED_BY(mu_); 227 228 /// The LRU list of block keys. The front of the list identifies the most 229 /// recently accessed block. 230 std::list<Key> lru_list_ GUARDED_BY(mu_); 231 232 /// The LRA (least recently added) list of block keys. The front of the list 233 /// identifies the most recently added block. 234 /// 235 /// Note: blocks are added to lra_list_ only after they have successfully been 236 /// fetched from the underlying block store. 237 std::list<Key> lra_list_ GUARDED_BY(mu_); 238 239 /// The combined number of bytes in all of the cached blocks. 240 size_t cache_size_ GUARDED_BY(mu_) = 0; 241 242 // A filename->file_signature map. 243 std::map<string, int64> file_signature_map_ GUARDED_BY(mu_); 244 }; 245 246 } // namespace tensorflow 247 248 #endif // TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_ 249