1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_FILE_SYSTEM_H_ 17 #define TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_FILE_SYSTEM_H_ 18 19 #include <string> 20 #include <utility> 21 #include <vector> 22 23 #include "tensorflow/core/lib/core/status.h" 24 #include "tensorflow/core/platform/cloud/auth_provider.h" 25 #include "tensorflow/core/platform/cloud/compute_engine_metadata_client.h" 26 #include "tensorflow/core/platform/cloud/compute_engine_zone_provider.h" 27 #include "tensorflow/core/platform/cloud/expiring_lru_cache.h" 28 #include "tensorflow/core/platform/cloud/file_block_cache.h" 29 #include "tensorflow/core/platform/cloud/gcs_dns_cache.h" 30 #include "tensorflow/core/platform/cloud/gcs_throttle.h" 31 #include "tensorflow/core/platform/cloud/http_request.h" 32 #include "tensorflow/core/platform/cloud/retrying_file_system.h" 33 #include "tensorflow/core/platform/file_system.h" 34 35 namespace tensorflow { 36 37 class GcsFileSystem; 38 39 /// GcsStatsInterface allows for instrumentation of the GCS file system. 40 /// 41 /// GcsStatsInterface and its subclasses must be safe to use from multiple 42 /// threads concurrently. 43 /// 44 /// WARNING! This is an experimental interface that may change or go away at any 45 /// time. 46 class GcsStatsInterface { 47 public: 48 /// Configure is called by the GcsFileSystem to provide instrumentation hooks. 49 /// 50 /// Note: Configure can be called multiple times (e.g. if the block cache is 51 /// re-initialized). 52 virtual void Configure(GcsFileSystem* fs, GcsThrottle* throttle, 53 const FileBlockCache* block_cache) = 0; 54 55 /// RecordBlockLoadRequest is called to record a block load request is about 56 /// to be made. 57 virtual void RecordBlockLoadRequest(const string& file, size_t offset) = 0; 58 59 /// RecordBlockRetrieved is called once a block within the file has been 60 /// retrieved. 61 virtual void RecordBlockRetrieved(const string& file, size_t offset, 62 size_t bytes_transferred) = 0; 63 64 // RecordStatObjectRequest is called once a statting object request over GCS 65 // is about to be made. 66 virtual void RecordStatObjectRequest() = 0; 67 68 /// HttpStats is called to optionally provide a RequestStats listener 69 /// to be annotated on every HTTP request made to the GCS API. 70 /// 71 /// HttpStats() may return nullptr. 72 virtual HttpRequest::RequestStats* HttpStats() = 0; 73 74 virtual ~GcsStatsInterface() = default; 75 }; 76 77 /// Google Cloud Storage implementation of a file system. 78 /// 79 /// The clients should use RetryingGcsFileSystem defined below, 80 /// which adds retry logic to GCS operations. 81 class GcsFileSystem : public FileSystem { 82 public: 83 struct TimeoutConfig; 84 85 // Main constructor used (via RetryingFileSystem) throughout Tensorflow 86 GcsFileSystem(); 87 // Used mostly for unit testing or use cases which need to customize the 88 // filesystem from defaults 89 GcsFileSystem(std::unique_ptr<AuthProvider> auth_provider, 90 std::unique_ptr<HttpRequest::Factory> http_request_factory, 91 std::unique_ptr<ZoneProvider> zone_provider, size_t block_size, 92 size_t max_bytes, uint64 max_staleness, 93 uint64 stat_cache_max_age, size_t stat_cache_max_entries, 94 uint64 matching_paths_cache_max_age, 95 size_t matching_paths_cache_max_entries, 96 RetryConfig retry_config, TimeoutConfig timeouts, 97 const std::unordered_set<string>& allowed_locations, 98 std::pair<const string, const string>* additional_header); 99 100 Status NewRandomAccessFile( 101 const string& filename, 102 std::unique_ptr<RandomAccessFile>* result) override; 103 104 Status NewWritableFile(const string& fname, 105 std::unique_ptr<WritableFile>* result) override; 106 107 Status NewAppendableFile(const string& fname, 108 std::unique_ptr<WritableFile>* result) override; 109 110 Status NewReadOnlyMemoryRegionFromFile( 111 const string& filename, 112 std::unique_ptr<ReadOnlyMemoryRegion>* result) override; 113 114 Status FileExists(const string& fname) override; 115 116 Status Stat(const string& fname, FileStatistics* stat) override; 117 118 Status GetChildren(const string& dir, std::vector<string>* result) override; 119 120 Status GetMatchingPaths(const string& pattern, 121 std::vector<string>* results) override; 122 123 Status DeleteFile(const string& fname) override; 124 125 Status CreateDir(const string& dirname) override; 126 127 Status DeleteDir(const string& dirname) override; 128 129 Status GetFileSize(const string& fname, uint64* file_size) override; 130 131 Status RenameFile(const string& src, const string& target) override; 132 133 Status IsDirectory(const string& fname) override; 134 135 Status DeleteRecursively(const string& dirname, int64* undeleted_files, 136 int64* undeleted_dirs) override; 137 138 void FlushCaches() override; 139 140 /// Set an object to collect runtime statistics from the GcsFilesystem. 141 void SetStats(GcsStatsInterface* stats); 142 143 /// These accessors are mainly for testing purposes, to verify that the 144 /// environment variables that control these parameters are handled correctly. block_size()145 size_t block_size() { 146 tf_shared_lock l(block_cache_lock_); 147 return file_block_cache_->block_size(); 148 } max_bytes()149 size_t max_bytes() { 150 tf_shared_lock l(block_cache_lock_); 151 return file_block_cache_->max_bytes(); 152 } max_staleness()153 uint64 max_staleness() { 154 tf_shared_lock l(block_cache_lock_); 155 return file_block_cache_->max_staleness(); 156 } timeouts()157 TimeoutConfig timeouts() const { return timeouts_; } allowed_locations()158 std::unordered_set<string> allowed_locations() const { 159 return allowed_locations_; 160 } additional_header_name()161 string additional_header_name() const { 162 return additional_header_ ? additional_header_->first : ""; 163 } additional_header_value()164 string additional_header_value() const { 165 return additional_header_ ? additional_header_->second : ""; 166 } 167 stat_cache_max_age()168 uint64 stat_cache_max_age() const { return stat_cache_->max_age(); } stat_cache_max_entries()169 size_t stat_cache_max_entries() const { return stat_cache_->max_entries(); } 170 matching_paths_cache_max_age()171 uint64 matching_paths_cache_max_age() const { 172 return matching_paths_cache_->max_age(); 173 } matching_paths_cache_max_entries()174 size_t matching_paths_cache_max_entries() const { 175 return matching_paths_cache_->max_entries(); 176 } 177 178 /// Structure containing the information for timeouts related to accessing the 179 /// GCS APIs. 180 /// 181 /// All values are in seconds. 182 struct TimeoutConfig { 183 // The request connection timeout. If a connection cannot be established 184 // within `connect` seconds, abort the request. 185 uint32 connect = 120; // 2 minutes 186 187 // The request idle timeout. If a request has seen no activity in `idle` 188 // seconds, abort the request. 189 uint32 idle = 60; // 1 minute 190 191 // The maximum total time a metadata request can take. If a request has not 192 // completed within `metadata` seconds, the request is aborted. 193 uint32 metadata = 3600; // 1 hour 194 195 // The maximum total time a block read request can take. If a request has 196 // not completed within `read` seconds, the request is aborted. 197 uint32 read = 3600; // 1 hour 198 199 // The maximum total time an upload request can take. If a request has not 200 // completed within `write` seconds, the request is aborted. 201 uint32 write = 3600; // 1 hour 202 TimeoutConfigTimeoutConfig203 TimeoutConfig() {} TimeoutConfigTimeoutConfig204 TimeoutConfig(uint32 connect, uint32 idle, uint32 metadata, uint32 read, 205 uint32 write) 206 : connect(connect), 207 idle(idle), 208 metadata(metadata), 209 read(read), 210 write(write) {} 211 }; 212 213 Status CreateHttpRequest(std::unique_ptr<HttpRequest>* request); 214 215 /// \brief Sets a new AuthProvider on the GCS FileSystem. 216 /// 217 /// The new auth provider will be used for all subsequent requests. 218 void SetAuthProvider(std::unique_ptr<AuthProvider> auth_provider); 219 220 /// \brief Resets the block cache and re-instantiates it with the new values. 221 /// 222 /// This method can be used to clear the existing block cache and/or to 223 /// re-configure the block cache for different values. 224 /// 225 /// Note: the existing block cache is not cleaned up until all existing files 226 /// have been closed. 227 void ResetFileBlockCache(size_t block_size_bytes, size_t max_bytes, 228 uint64 max_staleness_secs); 229 230 private: 231 // GCS file statistics. 232 struct GcsFileStat { 233 FileStatistics base; 234 int64 generation_number = 0; 235 }; 236 237 /// \brief Checks if the bucket exists. Returns OK if the check succeeded. 238 /// 239 /// 'result' is set if the function returns OK. 'result' cannot be nullptr. 240 Status BucketExists(const string& bucket, bool* result); 241 242 /// \brief Retrieves the GCS bucket location. Returns OK if the location was 243 /// retrieved. 244 /// 245 /// Given a string bucket the GCS bucket metadata API will be called and the 246 /// location string filled with the location of the bucket. 247 /// 248 /// This requires the bucket metadata permission. 249 /// Repeated calls for the same bucket are cached so this function can be 250 /// called frequently without causing an extra API call 251 Status GetBucketLocation(const string& bucket, string* location); 252 253 /// \brief Check if the GCS buckets location is allowed with the current 254 /// constraint configuration 255 Status CheckBucketLocationConstraint(const string& bucket); 256 257 /// \brief Given the input bucket `bucket`, fills `result_buffer` with the 258 /// results of the metadata. Returns OK if the API call succeeds without 259 /// error. 260 Status GetBucketMetadata(const string& bucket, 261 std::vector<char>* result_buffer); 262 263 /// \brief Checks if the object exists. Returns OK if the check succeeded. 264 /// 265 /// 'result' is set if the function returns OK. 'result' cannot be nullptr. 266 Status ObjectExists(const string& fname, const string& bucket, 267 const string& object, bool* result); 268 269 /// \brief Checks if the folder exists. Returns OK if the check succeeded. 270 /// 271 /// 'result' is set if the function returns OK. 'result' cannot be nullptr. 272 Status FolderExists(const string& dirname, bool* result); 273 274 /// \brief Internal version of GetChildren with more knobs. 275 /// 276 /// If 'recursively' is true, returns all objects in all subfolders. 277 /// Otherwise only returns the immediate children in the directory. 278 /// 279 /// If 'include_self_directory_marker' is true and there is a GCS directory 280 /// marker at the path 'dir', GetChildrenBound will return an empty string 281 /// as one of the children that represents this marker. 282 Status GetChildrenBounded(const string& dir, uint64 max_results, 283 std::vector<string>* result, bool recursively, 284 bool include_self_directory_marker); 285 286 /// Retrieves file statistics assuming fname points to a GCS object. The data 287 /// may be read from cache or from GCS directly. 288 Status StatForObject(const string& fname, const string& bucket, 289 const string& object, GcsFileStat* stat); 290 /// Retrieves file statistics of file fname directly from GCS. 291 Status UncachedStatForObject(const string& fname, const string& bucket, 292 const string& object, GcsFileStat* stat); 293 294 Status RenameObject(const string& src, const string& target); 295 296 std::unique_ptr<FileBlockCache> MakeFileBlockCache(size_t block_size, 297 size_t max_bytes, 298 uint64 max_staleness); 299 300 /// Loads file contents from GCS for a given filename, offset, and length. 301 Status LoadBufferFromGCS(const string& filename, size_t offset, size_t n, 302 char* buffer, size_t* bytes_transferred); 303 304 // Clear all the caches related to the file with name `filename`. 305 void ClearFileCaches(const string& fname); 306 307 mutex mu_; 308 std::unique_ptr<AuthProvider> auth_provider_ GUARDED_BY(mu_); 309 std::shared_ptr<HttpRequest::Factory> http_request_factory_; 310 std::unique_ptr<ZoneProvider> zone_provider_; 311 // block_cache_lock_ protects the file_block_cache_ pointer (Note that 312 // FileBlockCache instances are themselves threadsafe). 313 mutex block_cache_lock_; 314 std::unique_ptr<FileBlockCache> file_block_cache_ 315 GUARDED_BY(block_cache_lock_); 316 std::shared_ptr<ComputeEngineMetadataClient> compute_engine_metadata_client_; 317 std::unique_ptr<GcsDnsCache> dns_cache_; 318 GcsThrottle throttle_; 319 320 using StatCache = ExpiringLRUCache<GcsFileStat>; 321 std::unique_ptr<StatCache> stat_cache_; 322 323 using MatchingPathsCache = ExpiringLRUCache<std::vector<string>>; 324 std::unique_ptr<MatchingPathsCache> matching_paths_cache_; 325 326 using BucketLocationCache = ExpiringLRUCache<string>; 327 std::unique_ptr<BucketLocationCache> bucket_location_cache_; 328 std::unordered_set<string> allowed_locations_; 329 330 TimeoutConfig timeouts_; 331 332 GcsStatsInterface* stats_ = nullptr; // Not owned. 333 334 /// The initial delay for exponential backoffs when retrying failed calls. 335 RetryConfig retry_config_; 336 337 // Additional header material to be transmitted with all GCS requests 338 std::unique_ptr<std::pair<const string, const string>> additional_header_; 339 340 TF_DISALLOW_COPY_AND_ASSIGN(GcsFileSystem); 341 }; 342 343 /// Google Cloud Storage implementation of a file system with retry on failures. 344 class RetryingGcsFileSystem : public RetryingFileSystem<GcsFileSystem> { 345 public: RetryingGcsFileSystem()346 RetryingGcsFileSystem() 347 : RetryingFileSystem(std::unique_ptr<GcsFileSystem>(new GcsFileSystem), 348 RetryConfig(100000 /* init_delay_time_us */)) {} 349 }; 350 351 } // namespace tensorflow 352 353 #endif // TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_FILE_SYSTEM_H_ 354