/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_FILE_SYSTEM_H_
17 #define TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_FILE_SYSTEM_H_
18 
#include <cstdlib>
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "tensorflow/core/platform/cloud/auth_provider.h"
#include "tensorflow/core/platform/cloud/compute_engine_metadata_client.h"
#include "tensorflow/core/platform/cloud/compute_engine_zone_provider.h"
#include "tensorflow/core/platform/cloud/expiring_lru_cache.h"
#include "tensorflow/core/platform/cloud/file_block_cache.h"
#include "tensorflow/core/platform/cloud/gcs_dns_cache.h"
#include "tensorflow/core/platform/cloud/gcs_throttle.h"
#include "tensorflow/core/platform/cloud/http_request.h"
#include "tensorflow/core/platform/file_system.h"
#include "tensorflow/core/platform/retrying_file_system.h"
#include "tensorflow/core/platform/status.h"
35 
36 namespace tensorflow {
37 
38 class GcsFileSystem;
39 
40 // The environment variable that overrides the block size for aligned reads from
41 // GCS. Specified in MB (e.g. "16" = 16 x 1024 x 1024 = 16777216 bytes).
42 constexpr char kBlockSize[] = "GCS_READ_CACHE_BLOCK_SIZE_MB";
43 constexpr size_t kDefaultBlockSize = 64 * 1024 * 1024;
44 // The environment variable that overrides the max size of the LRU cache of
45 // blocks read from GCS. Specified in MB.
46 constexpr char kMaxCacheSize[] = "GCS_READ_CACHE_MAX_SIZE_MB";
47 constexpr size_t kDefaultMaxCacheSize = 0;
48 // The environment variable that overrides the maximum staleness of cached file
49 // contents. Once any block of a file reaches this staleness, all cached blocks
50 // will be evicted on the next read.
51 constexpr char kMaxStaleness[] = "GCS_READ_CACHE_MAX_STALENESS";
52 constexpr uint64 kDefaultMaxStaleness = 0;
53 
54 // Helper function to extract an environment variable and convert it into a
55 // value of type T.
56 template <typename T>
GetEnvVar(const char * varname,bool (* convert)(StringPiece,T *),T * value)57 bool GetEnvVar(const char* varname, bool (*convert)(StringPiece, T*),
58                T* value) {
59   const char* env_value = std::getenv(varname);
60   if (env_value == nullptr) {
61     return false;
62   }
63   return convert(env_value, value);
64 }
65 
66 /// GcsStatsInterface allows for instrumentation of the GCS file system.
67 ///
68 /// GcsStatsInterface and its subclasses must be safe to use from multiple
69 /// threads concurrently.
70 ///
71 /// WARNING! This is an experimental interface that may change or go away at any
72 /// time.
73 class GcsStatsInterface {
74  public:
75   /// Configure is called by the GcsFileSystem to provide instrumentation hooks.
76   ///
77   /// Note: Configure can be called multiple times (e.g. if the block cache is
78   /// re-initialized).
79   virtual void Configure(GcsFileSystem* fs, GcsThrottle* throttle,
80                          const FileBlockCache* block_cache) = 0;
81 
82   /// RecordBlockLoadRequest is called to record a block load request is about
83   /// to be made.
84   virtual void RecordBlockLoadRequest(const string& file, size_t offset) = 0;
85 
86   /// RecordBlockRetrieved is called once a block within the file has been
87   /// retrieved.
88   virtual void RecordBlockRetrieved(const string& file, size_t offset,
89                                     size_t bytes_transferred) = 0;
90 
91   // RecordStatObjectRequest is called once a statting object request over GCS
92   // is about to be made.
93   virtual void RecordStatObjectRequest() = 0;
94 
95   /// HttpStats is called to optionally provide a RequestStats listener
96   /// to be annotated on every HTTP request made to the GCS API.
97   ///
98   /// HttpStats() may return nullptr.
99   virtual HttpRequest::RequestStats* HttpStats() = 0;
100 
101   virtual ~GcsStatsInterface() = default;
102 };
103 
/// Plain data describing an upload session. GcsFileSystem::
/// CreateNewUploadSession receives a pointer to one of these as its
/// `session_handle` output argument.
struct UploadSessionHandle {
  // URI of the upload session.
  // NOTE(review): presumably the value later passed as `session_uri` to
  // UploadToSession / RequestUploadSessionStatus -- confirm in the .cc file.
  std::string session_uri;
  // Defaults to false so a default-constructed handle never carries an
  // indeterminate value.
  // NOTE(review): presumably true when the session can be resumed -- confirm.
  bool resumable = false;
};
108 
109 /// Google Cloud Storage implementation of a file system.
110 ///
111 /// The clients should use RetryingGcsFileSystem defined below,
112 /// which adds retry logic to GCS operations.
113 class GcsFileSystem : public FileSystem {
114  public:
115   struct TimeoutConfig;
116 
117   // Main constructor used (via RetryingFileSystem) throughout Tensorflow
118   explicit GcsFileSystem(bool make_default_cache = true);
119   // Used mostly for unit testing or use cases which need to customize the
120   // filesystem from defaults
121   GcsFileSystem(std::unique_ptr<AuthProvider> auth_provider,
122                 std::unique_ptr<HttpRequest::Factory> http_request_factory,
123                 std::unique_ptr<ZoneProvider> zone_provider, size_t block_size,
124                 size_t max_bytes, uint64 max_staleness,
125                 uint64 stat_cache_max_age, size_t stat_cache_max_entries,
126                 uint64 matching_paths_cache_max_age,
127                 size_t matching_paths_cache_max_entries,
128                 RetryConfig retry_config, TimeoutConfig timeouts,
129                 const std::unordered_set<string>& allowed_locations,
130                 std::pair<const string, const string>* additional_header,
131                 bool compose_append);
132 
133   TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
134 
135   Status NewRandomAccessFile(
136       const string& fname, TransactionToken* token,
137       std::unique_ptr<RandomAccessFile>* result) override;
138 
139   Status NewWritableFile(const string& fname, TransactionToken* token,
140                          std::unique_ptr<WritableFile>* result) override;
141 
142   Status NewAppendableFile(const string& fname, TransactionToken* token,
143                            std::unique_ptr<WritableFile>* result) override;
144 
145   Status NewReadOnlyMemoryRegionFromFile(
146       const string& fname, TransactionToken* token,
147       std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
148 
149   Status FileExists(const string& fname, TransactionToken* token) override;
150 
151   Status Stat(const string& fname, TransactionToken* token,
152               FileStatistics* stat) override;
153 
154   Status GetChildren(const string& dir, TransactionToken* token,
155                      std::vector<string>* result) override;
156 
157   Status GetMatchingPaths(const string& pattern, TransactionToken* token,
158                           std::vector<string>* results) override;
159 
160   Status DeleteFile(const string& fname, TransactionToken* token) override;
161 
162   Status CreateDir(const string& dirname, TransactionToken* token) override;
163 
164   Status DeleteDir(const string& dirname, TransactionToken* token) override;
165 
166   Status GetFileSize(const string& fname, TransactionToken* token,
167                      uint64* file_size) override;
168 
169   Status RenameFile(const string& src, const string& target,
170                     TransactionToken* token) override;
171 
172   Status IsDirectory(const string& fname, TransactionToken* token) override;
173 
174   Status DeleteRecursively(const string& dirname, TransactionToken* token,
175                            int64* undeleted_files,
176                            int64* undeleted_dirs) override;
177 
178   void FlushCaches(TransactionToken* token) override;
179 
180   /// Set an object to collect runtime statistics from the GcsFilesystem.
181   void SetStats(GcsStatsInterface* stats);
182 
183   /// Set an object to collect file block cache stats.
184   void SetCacheStats(FileBlockCacheStatsInterface* cache_stats);
185 
186   /// These accessors are mainly for testing purposes, to verify that the
187   /// environment variables that control these parameters are handled correctly.
block_size()188   size_t block_size() {
189     tf_shared_lock l(block_cache_lock_);
190     return file_block_cache_->block_size();
191   }
max_bytes()192   size_t max_bytes() {
193     tf_shared_lock l(block_cache_lock_);
194     return file_block_cache_->max_bytes();
195   }
max_staleness()196   uint64 max_staleness() {
197     tf_shared_lock l(block_cache_lock_);
198     return file_block_cache_->max_staleness();
199   }
timeouts()200   TimeoutConfig timeouts() const { return timeouts_; }
allowed_locations()201   std::unordered_set<string> allowed_locations() const {
202     return allowed_locations_;
203   }
204 
compose_append()205   bool compose_append() const { return compose_append_; }
additional_header_name()206   string additional_header_name() const {
207     return additional_header_ ? additional_header_->first : "";
208   }
additional_header_value()209   string additional_header_value() const {
210     return additional_header_ ? additional_header_->second : "";
211   }
212 
stat_cache_max_age()213   uint64 stat_cache_max_age() const { return stat_cache_->max_age(); }
stat_cache_max_entries()214   size_t stat_cache_max_entries() const { return stat_cache_->max_entries(); }
215 
matching_paths_cache_max_age()216   uint64 matching_paths_cache_max_age() const {
217     return matching_paths_cache_->max_age();
218   }
matching_paths_cache_max_entries()219   size_t matching_paths_cache_max_entries() const {
220     return matching_paths_cache_->max_entries();
221   }
222 
223   /// Structure containing the information for timeouts related to accessing the
224   /// GCS APIs.
225   ///
226   /// All values are in seconds.
227   struct TimeoutConfig {
228     // The request connection timeout. If a connection cannot be established
229     // within `connect` seconds, abort the request.
230     uint32 connect = 120;  // 2 minutes
231 
232     // The request idle timeout. If a request has seen no activity in `idle`
233     // seconds, abort the request.
234     uint32 idle = 60;  // 1 minute
235 
236     // The maximum total time a metadata request can take. If a request has not
237     // completed within `metadata` seconds, the request is aborted.
238     uint32 metadata = 3600;  // 1 hour
239 
240     // The maximum total time a block read request can take. If a request has
241     // not completed within `read` seconds, the request is aborted.
242     uint32 read = 3600;  // 1 hour
243 
244     // The maximum total time an upload request can take. If a request has not
245     // completed within `write` seconds, the request is aborted.
246     uint32 write = 3600;  // 1 hour
247 
TimeoutConfigTimeoutConfig248     TimeoutConfig() {}
TimeoutConfigTimeoutConfig249     TimeoutConfig(uint32 connect, uint32 idle, uint32 metadata, uint32 read,
250                   uint32 write)
251         : connect(connect),
252           idle(idle),
253           metadata(metadata),
254           read(read),
255           write(write) {}
256   };
257 
258   Status CreateHttpRequest(std::unique_ptr<HttpRequest>* request);
259 
260   /// \brief Sets a new AuthProvider on the GCS FileSystem.
261   ///
262   /// The new auth provider will be used for all subsequent requests.
263   void SetAuthProvider(std::unique_ptr<AuthProvider> auth_provider);
264 
265   /// \brief Resets the block cache and re-instantiates it with the new values.
266   ///
267   /// This method can be used to clear the existing block cache and/or to
268   /// re-configure the block cache for different values.
269   ///
270   /// Note: the existing block cache is not cleaned up until all existing files
271   /// have been closed.
272   void ResetFileBlockCache(size_t block_size_bytes, size_t max_bytes,
273                            uint64 max_staleness_secs);
274 
275  protected:
276   virtual std::unique_ptr<FileBlockCache> MakeFileBlockCache(
277       size_t block_size, size_t max_bytes, uint64 max_staleness);
278 
279   /// Loads file contents from GCS for a given filename, offset, and length.
280   virtual Status LoadBufferFromGCS(const string& fname, size_t offset, size_t n,
281                                    char* buffer, size_t* bytes_transferred);
282 
283   // Creates an upload session for an upcoming GCS object upload.
284   virtual Status CreateNewUploadSession(uint64 start_offset,
285                                         const std::string& object_to_upload,
286                                         const std::string& bucket,
287                                         uint64 file_size,
288                                         const std::string& gcs_path,
289                                         UploadSessionHandle* session_handle);
290 
291   // Uploads object data to session.
292   virtual Status UploadToSession(const std::string& session_uri,
293                                  uint64 start_offset, uint64 already_uploaded,
294                                  const std::string& tmp_content_filename,
295                                  uint64 file_size,
296                                  const std::string& file_path);
297 
298   /// \brief Requests status of a previously initiated upload session.
299   ///
300   /// If the upload has already succeeded, sets 'completed' to true.
301   /// Otherwise sets 'completed' to false and 'uploaded' to the currently
302   /// uploaded size in bytes.
303   virtual Status RequestUploadSessionStatus(const string& session_uri,
304                                             uint64 file_size,
305                                             const std::string& gcs_path,
306                                             bool* completed, uint64* uploaded);
307 
308   Status ParseGcsPathForScheme(StringPiece fname, string scheme,
309                                bool empty_object_ok, string* bucket,
310                                string* object);
311 
312   /// \brief Splits a GCS path to a bucket and an object.
313   ///
314   /// For example, "gs://bucket-name/path/to/file.txt" gets split into
315   /// "bucket-name" and "path/to/file.txt".
316   /// If fname only contains the bucket and empty_object_ok = true, the returned
317   /// object is empty.
318   virtual Status ParseGcsPath(StringPiece fname, bool empty_object_ok,
319                               string* bucket, string* object);
320 
321   std::shared_ptr<ComputeEngineMetadataClient> compute_engine_metadata_client_;
322 
323   // Used by a subclass.
324   TimeoutConfig timeouts_;
325 
326   /// The retry configuration used for retrying failed calls.
327   RetryConfig retry_config_;
328 
329  private:
330   // GCS file statistics.
331   struct GcsFileStat {
332     FileStatistics base;
333     int64 generation_number = 0;
334   };
335 
336   /// \brief Checks if the bucket exists. Returns OK if the check succeeded.
337   ///
338   /// 'result' is set if the function returns OK. 'result' cannot be nullptr.
339   Status BucketExists(const string& bucket, bool* result);
340 
341   /// \brief Retrieves the GCS bucket location. Returns OK if the location was
342   /// retrieved.
343   ///
344   /// Given a string bucket the GCS bucket metadata API will be called and the
345   /// location string filled with the location of the bucket.
346   ///
347   /// This requires the bucket metadata permission.
348   /// Repeated calls for the same bucket are cached so this function can be
349   /// called frequently without causing an extra API call
350   Status GetBucketLocation(const string& bucket, string* location);
351 
352   /// \brief Check if the GCS buckets location is allowed with the current
353   /// constraint configuration
354   Status CheckBucketLocationConstraint(const string& bucket);
355 
356   /// \brief Given the input bucket `bucket`, fills `result_buffer` with the
357   /// results of the metadata. Returns OK if the API call succeeds without
358   /// error.
359   Status GetBucketMetadata(const string& bucket,
360                            std::vector<char>* result_buffer);
361 
362   /// \brief Checks if the object exists. Returns OK if the check succeeded.
363   ///
364   /// 'result' is set if the function returns OK. 'result' cannot be nullptr.
365   Status ObjectExists(const string& fname, const string& bucket,
366                       const string& object, bool* result);
367 
368   /// \brief Checks if the folder exists. Returns OK if the check succeeded.
369   ///
370   /// 'result' is set if the function returns OK. 'result' cannot be nullptr.
371   Status FolderExists(const string& dirname, bool* result);
372 
373   /// \brief Internal version of GetChildren with more knobs.
374   ///
375   /// If 'recursively' is true, returns all objects in all subfolders.
376   /// Otherwise only returns the immediate children in the directory.
377   ///
378   /// If 'include_self_directory_marker' is true and there is a GCS directory
379   /// marker at the path 'dir', GetChildrenBound will return an empty string
380   /// as one of the children that represents this marker.
381   Status GetChildrenBounded(const string& dir, uint64 max_results,
382                             std::vector<string>* result, bool recursively,
383                             bool include_self_directory_marker);
384 
385   /// Retrieves file statistics assuming fname points to a GCS object. The data
386   /// may be read from cache or from GCS directly.
387   Status StatForObject(const string& fname, const string& bucket,
388                        const string& object, GcsFileStat* stat);
389   /// Retrieves file statistics of file fname directly from GCS.
390   Status UncachedStatForObject(const string& fname, const string& bucket,
391                                const string& object, GcsFileStat* stat);
392 
393   Status RenameObject(const string& src, const string& target);
394 
395   // Clear all the caches related to the file with name `filename`.
396   void ClearFileCaches(const string& fname);
397 
398   mutex mu_;
399   std::unique_ptr<AuthProvider> auth_provider_ TF_GUARDED_BY(mu_);
400   std::shared_ptr<HttpRequest::Factory> http_request_factory_;
401   std::unique_ptr<ZoneProvider> zone_provider_;
402 
403   // Reads smaller than block_size_ will trigger a read of block_size_.
404   uint64 block_size_;
405 
406   // block_cache_lock_ protects the file_block_cache_ pointer (Note that
407   // FileBlockCache instances are themselves threadsafe).
408   mutex block_cache_lock_;
409   std::unique_ptr<FileBlockCache> file_block_cache_
410       TF_GUARDED_BY(block_cache_lock_);
411   std::unique_ptr<GcsDnsCache> dns_cache_;
412   GcsThrottle throttle_;
413 
414   using StatCache = ExpiringLRUCache<GcsFileStat>;
415   std::unique_ptr<StatCache> stat_cache_;
416 
417   using MatchingPathsCache = ExpiringLRUCache<std::vector<string>>;
418   std::unique_ptr<MatchingPathsCache> matching_paths_cache_;
419 
420   using BucketLocationCache = ExpiringLRUCache<string>;
421   std::unique_ptr<BucketLocationCache> bucket_location_cache_;
422   std::unordered_set<string> allowed_locations_;
423   bool compose_append_;
424 
425   GcsStatsInterface* stats_ = nullptr;  // Not owned.
426 
427   // Additional header material to be transmitted with all GCS requests
428   std::unique_ptr<std::pair<const string, const string>> additional_header_;
429 
430   TF_DISALLOW_COPY_AND_ASSIGN(GcsFileSystem);
431 };
432 
433 /// Google Cloud Storage implementation of a file system with retry on failures.
434 class RetryingGcsFileSystem : public RetryingFileSystem<GcsFileSystem> {
435  public:
RetryingGcsFileSystem()436   RetryingGcsFileSystem()
437       : RetryingFileSystem(std::unique_ptr<GcsFileSystem>(new GcsFileSystem),
438                            RetryConfig(100000 /* init_delay_time_us */)) {}
439 };
440 
441 }  // namespace tensorflow
442 
443 #endif  // TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_FILE_SYSTEM_H_
444