1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_FILE_SYSTEM_H_
17 #define TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_FILE_SYSTEM_H_
18
19 #include <string>
20 #include <unordered_set>
21 #include <utility>
22 #include <vector>
23
24 #include "tensorflow/core/platform/cloud/auth_provider.h"
25 #include "tensorflow/core/platform/cloud/compute_engine_metadata_client.h"
26 #include "tensorflow/core/platform/cloud/compute_engine_zone_provider.h"
27 #include "tensorflow/core/platform/cloud/expiring_lru_cache.h"
28 #include "tensorflow/core/platform/cloud/file_block_cache.h"
29 #include "tensorflow/core/platform/cloud/gcs_dns_cache.h"
30 #include "tensorflow/core/platform/cloud/gcs_throttle.h"
31 #include "tensorflow/core/platform/cloud/http_request.h"
32 #include "tensorflow/core/platform/file_system.h"
33 #include "tensorflow/core/platform/retrying_file_system.h"
34 #include "tensorflow/core/platform/status.h"
35
36 namespace tensorflow {
37
38 class GcsFileSystem;
39
// Name of the environment variable that overrides the block size used for
// aligned reads from GCS. The value is given in MB; for example "16" means
// 16 x 1024 x 1024 = 16777216 bytes.
constexpr char kBlockSize[] = "GCS_READ_CACHE_BLOCK_SIZE_MB";
// Block size, in bytes, that applies when kBlockSize is not set (64 MB).
constexpr size_t kDefaultBlockSize = size_t{64} * 1024 * 1024;
// Name of the environment variable that overrides the maximum size of the LRU
// cache holding blocks read from GCS. The value is given in MB.
constexpr char kMaxCacheSize[] = "GCS_READ_CACHE_MAX_SIZE_MB";
// Maximum cache size that applies when kMaxCacheSize is not set.
constexpr size_t kDefaultMaxCacheSize = 0;
48 // The environment variable that overrides the maximum staleness of cached file
49 // contents. Once any block of a file reaches this staleness, all cached blocks
50 // will be evicted on the next read.
51 constexpr char kMaxStaleness[] = "GCS_READ_CACHE_MAX_STALENESS";
52 constexpr uint64 kDefaultMaxStaleness = 0;
53
54 // Helper function to extract an environment variable and convert it into a
55 // value of type T.
56 template <typename T>
GetEnvVar(const char * varname,bool (* convert)(StringPiece,T *),T * value)57 bool GetEnvVar(const char* varname, bool (*convert)(StringPiece, T*),
58 T* value) {
59 const char* env_value = std::getenv(varname);
60 if (env_value == nullptr) {
61 return false;
62 }
63 return convert(env_value, value);
64 }
65
66 /// GcsStatsInterface allows for instrumentation of the GCS file system.
67 ///
68 /// GcsStatsInterface and its subclasses must be safe to use from multiple
69 /// threads concurrently.
70 ///
71 /// WARNING! This is an experimental interface that may change or go away at any
72 /// time.
73 class GcsStatsInterface {
74 public:
75 /// Configure is called by the GcsFileSystem to provide instrumentation hooks.
76 ///
77 /// Note: Configure can be called multiple times (e.g. if the block cache is
78 /// re-initialized).
79 virtual void Configure(GcsFileSystem* fs, GcsThrottle* throttle,
80 const FileBlockCache* block_cache) = 0;
81
82 /// RecordBlockLoadRequest is called to record a block load request is about
83 /// to be made.
84 virtual void RecordBlockLoadRequest(const string& file, size_t offset) = 0;
85
86 /// RecordBlockRetrieved is called once a block within the file has been
87 /// retrieved.
88 virtual void RecordBlockRetrieved(const string& file, size_t offset,
89 size_t bytes_transferred) = 0;
90
91 // RecordStatObjectRequest is called once a statting object request over GCS
92 // is about to be made.
93 virtual void RecordStatObjectRequest() = 0;
94
95 /// HttpStats is called to optionally provide a RequestStats listener
96 /// to be annotated on every HTTP request made to the GCS API.
97 ///
98 /// HttpStats() may return nullptr.
99 virtual HttpRequest::RequestStats* HttpStats() = 0;
100
101 virtual ~GcsStatsInterface() = default;
102 };
103
/// Describes an upload session; GcsFileSystem::CreateNewUploadSession fills
/// one in through its `session_handle` output argument.
struct UploadSessionHandle {
  // NOTE(review): presumably the URI identifying the upload session (the
  // upload helpers take a `session_uri` argument) -- confirm.
  std::string session_uri;
  // Explicitly initialized so that a default-constructed handle never exposes
  // an indeterminate value before it has been filled in.
  bool resumable = false;
};
108
109 /// Google Cloud Storage implementation of a file system.
110 ///
111 /// The clients should use RetryingGcsFileSystem defined below,
112 /// which adds retry logic to GCS operations.
113 class GcsFileSystem : public FileSystem {
114 public:
115 struct TimeoutConfig;
116
117 // Main constructor used (via RetryingFileSystem) throughout Tensorflow
118 explicit GcsFileSystem(bool make_default_cache = true);
119 // Used mostly for unit testing or use cases which need to customize the
120 // filesystem from defaults
121 GcsFileSystem(std::unique_ptr<AuthProvider> auth_provider,
122 std::unique_ptr<HttpRequest::Factory> http_request_factory,
123 std::unique_ptr<ZoneProvider> zone_provider, size_t block_size,
124 size_t max_bytes, uint64 max_staleness,
125 uint64 stat_cache_max_age, size_t stat_cache_max_entries,
126 uint64 matching_paths_cache_max_age,
127 size_t matching_paths_cache_max_entries,
128 RetryConfig retry_config, TimeoutConfig timeouts,
129 const std::unordered_set<string>& allowed_locations,
130 std::pair<const string, const string>* additional_header,
131 bool compose_append);
132
133 TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
134
135 Status NewRandomAccessFile(
136 const string& fname, TransactionToken* token,
137 std::unique_ptr<RandomAccessFile>* result) override;
138
139 Status NewWritableFile(const string& fname, TransactionToken* token,
140 std::unique_ptr<WritableFile>* result) override;
141
142 Status NewAppendableFile(const string& fname, TransactionToken* token,
143 std::unique_ptr<WritableFile>* result) override;
144
145 Status NewReadOnlyMemoryRegionFromFile(
146 const string& fname, TransactionToken* token,
147 std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
148
149 Status FileExists(const string& fname, TransactionToken* token) override;
150
151 Status Stat(const string& fname, TransactionToken* token,
152 FileStatistics* stat) override;
153
154 Status GetChildren(const string& dir, TransactionToken* token,
155 std::vector<string>* result) override;
156
157 Status GetMatchingPaths(const string& pattern, TransactionToken* token,
158 std::vector<string>* results) override;
159
160 Status DeleteFile(const string& fname, TransactionToken* token) override;
161
162 Status CreateDir(const string& dirname, TransactionToken* token) override;
163
164 Status DeleteDir(const string& dirname, TransactionToken* token) override;
165
166 Status GetFileSize(const string& fname, TransactionToken* token,
167 uint64* file_size) override;
168
169 Status RenameFile(const string& src, const string& target,
170 TransactionToken* token) override;
171
172 Status IsDirectory(const string& fname, TransactionToken* token) override;
173
174 Status DeleteRecursively(const string& dirname, TransactionToken* token,
175 int64* undeleted_files,
176 int64* undeleted_dirs) override;
177
178 void FlushCaches(TransactionToken* token) override;
179
180 /// Set an object to collect runtime statistics from the GcsFilesystem.
181 void SetStats(GcsStatsInterface* stats);
182
183 /// Set an object to collect file block cache stats.
184 void SetCacheStats(FileBlockCacheStatsInterface* cache_stats);
185
186 /// These accessors are mainly for testing purposes, to verify that the
187 /// environment variables that control these parameters are handled correctly.
block_size()188 size_t block_size() {
189 tf_shared_lock l(block_cache_lock_);
190 return file_block_cache_->block_size();
191 }
max_bytes()192 size_t max_bytes() {
193 tf_shared_lock l(block_cache_lock_);
194 return file_block_cache_->max_bytes();
195 }
max_staleness()196 uint64 max_staleness() {
197 tf_shared_lock l(block_cache_lock_);
198 return file_block_cache_->max_staleness();
199 }
timeouts()200 TimeoutConfig timeouts() const { return timeouts_; }
allowed_locations()201 std::unordered_set<string> allowed_locations() const {
202 return allowed_locations_;
203 }
204
compose_append()205 bool compose_append() const { return compose_append_; }
additional_header_name()206 string additional_header_name() const {
207 return additional_header_ ? additional_header_->first : "";
208 }
additional_header_value()209 string additional_header_value() const {
210 return additional_header_ ? additional_header_->second : "";
211 }
212
stat_cache_max_age()213 uint64 stat_cache_max_age() const { return stat_cache_->max_age(); }
stat_cache_max_entries()214 size_t stat_cache_max_entries() const { return stat_cache_->max_entries(); }
215
matching_paths_cache_max_age()216 uint64 matching_paths_cache_max_age() const {
217 return matching_paths_cache_->max_age();
218 }
matching_paths_cache_max_entries()219 size_t matching_paths_cache_max_entries() const {
220 return matching_paths_cache_->max_entries();
221 }
222
223 /// Structure containing the information for timeouts related to accessing the
224 /// GCS APIs.
225 ///
226 /// All values are in seconds.
227 struct TimeoutConfig {
228 // The request connection timeout. If a connection cannot be established
229 // within `connect` seconds, abort the request.
230 uint32 connect = 120; // 2 minutes
231
232 // The request idle timeout. If a request has seen no activity in `idle`
233 // seconds, abort the request.
234 uint32 idle = 60; // 1 minute
235
236 // The maximum total time a metadata request can take. If a request has not
237 // completed within `metadata` seconds, the request is aborted.
238 uint32 metadata = 3600; // 1 hour
239
240 // The maximum total time a block read request can take. If a request has
241 // not completed within `read` seconds, the request is aborted.
242 uint32 read = 3600; // 1 hour
243
244 // The maximum total time an upload request can take. If a request has not
245 // completed within `write` seconds, the request is aborted.
246 uint32 write = 3600; // 1 hour
247
TimeoutConfigTimeoutConfig248 TimeoutConfig() {}
TimeoutConfigTimeoutConfig249 TimeoutConfig(uint32 connect, uint32 idle, uint32 metadata, uint32 read,
250 uint32 write)
251 : connect(connect),
252 idle(idle),
253 metadata(metadata),
254 read(read),
255 write(write) {}
256 };
257
258 Status CreateHttpRequest(std::unique_ptr<HttpRequest>* request);
259
260 /// \brief Sets a new AuthProvider on the GCS FileSystem.
261 ///
262 /// The new auth provider will be used for all subsequent requests.
263 void SetAuthProvider(std::unique_ptr<AuthProvider> auth_provider);
264
265 /// \brief Resets the block cache and re-instantiates it with the new values.
266 ///
267 /// This method can be used to clear the existing block cache and/or to
268 /// re-configure the block cache for different values.
269 ///
270 /// Note: the existing block cache is not cleaned up until all existing files
271 /// have been closed.
272 void ResetFileBlockCache(size_t block_size_bytes, size_t max_bytes,
273 uint64 max_staleness_secs);
274
275 protected:
276 virtual std::unique_ptr<FileBlockCache> MakeFileBlockCache(
277 size_t block_size, size_t max_bytes, uint64 max_staleness);
278
279 /// Loads file contents from GCS for a given filename, offset, and length.
280 virtual Status LoadBufferFromGCS(const string& fname, size_t offset, size_t n,
281 char* buffer, size_t* bytes_transferred);
282
283 // Creates an upload session for an upcoming GCS object upload.
284 virtual Status CreateNewUploadSession(uint64 start_offset,
285 const std::string& object_to_upload,
286 const std::string& bucket,
287 uint64 file_size,
288 const std::string& gcs_path,
289 UploadSessionHandle* session_handle);
290
291 // Uploads object data to session.
292 virtual Status UploadToSession(const std::string& session_uri,
293 uint64 start_offset, uint64 already_uploaded,
294 const std::string& tmp_content_filename,
295 uint64 file_size,
296 const std::string& file_path);
297
298 /// \brief Requests status of a previously initiated upload session.
299 ///
300 /// If the upload has already succeeded, sets 'completed' to true.
301 /// Otherwise sets 'completed' to false and 'uploaded' to the currently
302 /// uploaded size in bytes.
303 virtual Status RequestUploadSessionStatus(const string& session_uri,
304 uint64 file_size,
305 const std::string& gcs_path,
306 bool* completed, uint64* uploaded);
307
308 Status ParseGcsPathForScheme(StringPiece fname, string scheme,
309 bool empty_object_ok, string* bucket,
310 string* object);
311
312 /// \brief Splits a GCS path to a bucket and an object.
313 ///
314 /// For example, "gs://bucket-name/path/to/file.txt" gets split into
315 /// "bucket-name" and "path/to/file.txt".
316 /// If fname only contains the bucket and empty_object_ok = true, the returned
317 /// object is empty.
318 virtual Status ParseGcsPath(StringPiece fname, bool empty_object_ok,
319 string* bucket, string* object);
320
321 std::shared_ptr<ComputeEngineMetadataClient> compute_engine_metadata_client_;
322
323 // Used by a subclass.
324 TimeoutConfig timeouts_;
325
326 /// The retry configuration used for retrying failed calls.
327 RetryConfig retry_config_;
328
329 private:
330 // GCS file statistics.
331 struct GcsFileStat {
332 FileStatistics base;
333 int64 generation_number = 0;
334 };
335
336 /// \brief Checks if the bucket exists. Returns OK if the check succeeded.
337 ///
338 /// 'result' is set if the function returns OK. 'result' cannot be nullptr.
339 Status BucketExists(const string& bucket, bool* result);
340
341 /// \brief Retrieves the GCS bucket location. Returns OK if the location was
342 /// retrieved.
343 ///
344 /// Given a string bucket the GCS bucket metadata API will be called and the
345 /// location string filled with the location of the bucket.
346 ///
347 /// This requires the bucket metadata permission.
348 /// Repeated calls for the same bucket are cached so this function can be
349 /// called frequently without causing an extra API call
350 Status GetBucketLocation(const string& bucket, string* location);
351
352 /// \brief Check if the GCS buckets location is allowed with the current
353 /// constraint configuration
354 Status CheckBucketLocationConstraint(const string& bucket);
355
356 /// \brief Given the input bucket `bucket`, fills `result_buffer` with the
357 /// results of the metadata. Returns OK if the API call succeeds without
358 /// error.
359 Status GetBucketMetadata(const string& bucket,
360 std::vector<char>* result_buffer);
361
362 /// \brief Checks if the object exists. Returns OK if the check succeeded.
363 ///
364 /// 'result' is set if the function returns OK. 'result' cannot be nullptr.
365 Status ObjectExists(const string& fname, const string& bucket,
366 const string& object, bool* result);
367
368 /// \brief Checks if the folder exists. Returns OK if the check succeeded.
369 ///
370 /// 'result' is set if the function returns OK. 'result' cannot be nullptr.
371 Status FolderExists(const string& dirname, bool* result);
372
373 /// \brief Internal version of GetChildren with more knobs.
374 ///
375 /// If 'recursively' is true, returns all objects in all subfolders.
376 /// Otherwise only returns the immediate children in the directory.
377 ///
378 /// If 'include_self_directory_marker' is true and there is a GCS directory
379 /// marker at the path 'dir', GetChildrenBound will return an empty string
380 /// as one of the children that represents this marker.
381 Status GetChildrenBounded(const string& dir, uint64 max_results,
382 std::vector<string>* result, bool recursively,
383 bool include_self_directory_marker);
384
385 /// Retrieves file statistics assuming fname points to a GCS object. The data
386 /// may be read from cache or from GCS directly.
387 Status StatForObject(const string& fname, const string& bucket,
388 const string& object, GcsFileStat* stat);
389 /// Retrieves file statistics of file fname directly from GCS.
390 Status UncachedStatForObject(const string& fname, const string& bucket,
391 const string& object, GcsFileStat* stat);
392
393 Status RenameObject(const string& src, const string& target);
394
395 // Clear all the caches related to the file with name `filename`.
396 void ClearFileCaches(const string& fname);
397
398 mutex mu_;
399 std::unique_ptr<AuthProvider> auth_provider_ TF_GUARDED_BY(mu_);
400 std::shared_ptr<HttpRequest::Factory> http_request_factory_;
401 std::unique_ptr<ZoneProvider> zone_provider_;
402
403 // Reads smaller than block_size_ will trigger a read of block_size_.
404 uint64 block_size_;
405
406 // block_cache_lock_ protects the file_block_cache_ pointer (Note that
407 // FileBlockCache instances are themselves threadsafe).
408 mutex block_cache_lock_;
409 std::unique_ptr<FileBlockCache> file_block_cache_
410 TF_GUARDED_BY(block_cache_lock_);
411 std::unique_ptr<GcsDnsCache> dns_cache_;
412 GcsThrottle throttle_;
413
414 using StatCache = ExpiringLRUCache<GcsFileStat>;
415 std::unique_ptr<StatCache> stat_cache_;
416
417 using MatchingPathsCache = ExpiringLRUCache<std::vector<string>>;
418 std::unique_ptr<MatchingPathsCache> matching_paths_cache_;
419
420 using BucketLocationCache = ExpiringLRUCache<string>;
421 std::unique_ptr<BucketLocationCache> bucket_location_cache_;
422 std::unordered_set<string> allowed_locations_;
423 bool compose_append_;
424
425 GcsStatsInterface* stats_ = nullptr; // Not owned.
426
427 // Additional header material to be transmitted with all GCS requests
428 std::unique_ptr<std::pair<const string, const string>> additional_header_;
429
430 TF_DISALLOW_COPY_AND_ASSIGN(GcsFileSystem);
431 };
432
433 /// Google Cloud Storage implementation of a file system with retry on failures.
434 class RetryingGcsFileSystem : public RetryingFileSystem<GcsFileSystem> {
435 public:
RetryingGcsFileSystem()436 RetryingGcsFileSystem()
437 : RetryingFileSystem(std::unique_ptr<GcsFileSystem>(new GcsFileSystem),
438 RetryConfig(100000 /* init_delay_time_us */)) {}
439 };
440
441 } // namespace tensorflow
442
443 #endif // TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_FILE_SYSTEM_H_
444