1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/platform/cloud/gcs_file_system.h"
17 
18 #include <stdio.h>
19 #include <unistd.h>
20 
21 #include <algorithm>
22 #include <cstdio>
23 #include <cstdlib>
24 #include <cstring>
25 #include <fstream>
26 #include <vector>
27 
28 #include "tensorflow/core/platform/file_statistics.h"
29 #include "tensorflow/core/platform/strcat.h"
30 #ifdef _WIN32
31 #include <io.h>  // for _mktemp
32 #endif
33 #include "absl/base/macros.h"
34 #include "json/json.h"
35 #include "tensorflow/core/lib/gtl/map_util.h"
36 #include "tensorflow/core/platform/cloud/curl_http_request.h"
37 #include "tensorflow/core/platform/cloud/file_block_cache.h"
38 #include "tensorflow/core/platform/cloud/google_auth_provider.h"
39 #include "tensorflow/core/platform/cloud/ram_file_block_cache.h"
40 #include "tensorflow/core/platform/cloud/time_util.h"
41 #include "tensorflow/core/platform/env.h"
42 #include "tensorflow/core/platform/errors.h"
43 #include "tensorflow/core/platform/mutex.h"
44 #include "tensorflow/core/platform/numbers.h"
45 #include "tensorflow/core/platform/path.h"
46 #include "tensorflow/core/platform/protobuf.h"
47 #include "tensorflow/core/platform/retrying_utils.h"
48 #include "tensorflow/core/platform/str_util.h"
49 #include "tensorflow/core/platform/stringprintf.h"
50 #include "tensorflow/core/platform/thread_annotations.h"
51 #include "tensorflow/core/profiler/lib/traceme.h"
52 
53 #ifdef _WIN32
54 #ifdef DeleteFile
55 #undef DeleteFile
56 #endif
57 #endif
58 
59 namespace tensorflow {
60 namespace {
61 
62 constexpr char kGcsUriBase[] = "https://www.googleapis.com/storage/v1/";
63 constexpr char kGcsUploadUriBase[] =
64     "https://www.googleapis.com/upload/storage/v1/";
65 constexpr char kStorageHost[] = "storage.googleapis.com";
66 constexpr char kBucketMetadataLocationKey[] = "location";
67 constexpr size_t kReadAppendableFileBufferSize = 1024 * 1024;  // In bytes.
68 constexpr int kGetChildrenDefaultPageSize = 1000;
69 // The HTTP response code "308 Resume Incomplete".
70 constexpr uint64 HTTP_CODE_RESUME_INCOMPLETE = 308;
71 // The HTTP response code "412 Precondition Failed".
72 constexpr uint64 HTTP_CODE_PRECONDITION_FAILED = 412;
73 // The environment variable that overrides the size of the readahead buffer.
74 ABSL_DEPRECATED("Use GCS_READ_CACHE_BLOCK_SIZE_MB instead.")
75 constexpr char kReadaheadBufferSize[] = "GCS_READAHEAD_BUFFER_SIZE_BYTES";
76 // The environment variable that overrides the maximum age of entries in the
77 // Stat cache. A value of 0 means nothing is cached.
78 constexpr char kStatCacheMaxAge[] = "GCS_STAT_CACHE_MAX_AGE";
79 constexpr uint64 kStatCacheDefaultMaxAge = 5;
80 // The environment variable that overrides the maximum number of entries in the
81 // Stat cache.
82 constexpr char kStatCacheMaxEntries[] = "GCS_STAT_CACHE_MAX_ENTRIES";
83 constexpr size_t kStatCacheDefaultMaxEntries = 1024;
84 // The environment variable that overrides the maximum age of entries in the
85 // GetMatchingPaths cache. A value of 0 (the default) means nothing is cached.
86 constexpr char kMatchingPathsCacheMaxAge[] = "GCS_MATCHING_PATHS_CACHE_MAX_AGE";
87 constexpr uint64 kMatchingPathsCacheDefaultMaxAge = 0;
88 // The environment variable that overrides the maximum number of entries in the
89 // GetMatchingPaths cache.
90 constexpr char kMatchingPathsCacheMaxEntries[] =
91     "GCS_MATCHING_PATHS_CACHE_MAX_ENTRIES";
92 constexpr size_t kMatchingPathsCacheDefaultMaxEntries = 1024;
93 // Number of bucket locations cached; most workloads won't touch more than one
94 // bucket, so this limit is set fairly low.
95 constexpr size_t kBucketLocationCacheMaxEntries = 10;
96 // ExpiringLRUCache doesn't support any "cache forever" option.
97 constexpr size_t kCacheNeverExpire = std::numeric_limits<uint64>::max();
98 // The file statistics returned by Stat() for directories.
99 const FileStatistics DIRECTORY_STAT(0, 0, true);
100 // Some environments exhibit unreliable DNS resolution. Set this environment
101 // variable to a positive integer giving the refresh interval, in seconds, of
102 // the userspace DNS cache.
103 constexpr char kResolveCacheSecs[] = "GCS_RESOLVE_REFRESH_SECS";
104 // The environment variable to configure the http request's connection timeout.
105 constexpr char kRequestConnectionTimeout[] =
106     "GCS_REQUEST_CONNECTION_TIMEOUT_SECS";
107 // The environment variable to configure the http request's idle timeout.
108 constexpr char kRequestIdleTimeout[] = "GCS_REQUEST_IDLE_TIMEOUT_SECS";
109 // The environment variable to configure the overall request timeout for
110 // metadata requests.
111 constexpr char kMetadataRequestTimeout[] = "GCS_METADATA_REQUEST_TIMEOUT_SECS";
112 // The environment variable to configure the overall request timeout for
113 // block reads requests.
114 constexpr char kReadRequestTimeout[] = "GCS_READ_REQUEST_TIMEOUT_SECS";
115 // The environment variable to configure the overall request timeout for
116 // upload requests.
117 constexpr char kWriteRequestTimeout[] = "GCS_WRITE_REQUEST_TIMEOUT_SECS";
118 // The environment variable to configure an additional header to send with
119 // all requests to GCS (format HEADERNAME:HEADERCONTENT)
120 constexpr char kAdditionalRequestHeader[] = "GCS_ADDITIONAL_REQUEST_HEADER";
121 // The environment variable to configure the throttle token rate (format: <int64>)
122 constexpr char kThrottleRate[] = "GCS_THROTTLE_TOKEN_RATE";
123 // The environment variable to configure the token bucket size (format: <int64>)
124 constexpr char kThrottleBucket[] = "GCS_THROTTLE_BUCKET_SIZE";
125 // The environment variable that controls the number of tokens per request.
126 // (format: <int64>)
127 constexpr char kTokensPerRequest[] = "GCS_TOKENS_PER_REQUEST";
128 // The environment variable to configure the initial tokens (format: <int64>)
129 constexpr char kInitialTokens[] = "GCS_INITIAL_TOKENS";
130 
131 // The environment variable to customize which GCS bucket locations are
132 // allowed; if the list is empty, it defaults to the region of the current zone
133 // (format: comma-delimited list). Requires 'storage.buckets.get' permission.
134 constexpr char kAllowedBucketLocations[] = "GCS_ALLOWED_BUCKET_LOCATIONS";
135 // When this value is passed as an allowed location, the zone TensorFlow is
136 // running in is detected and access is restricted to buckets in that region.
137 constexpr char kDetectZoneSentinelValue[] = "auto";
138 
139 // How to upload new data when Flush() is called multiple times.
140 // By default the entire file is reuploaded.
141 constexpr char kAppendMode[] = "GCS_APPEND_MODE";
142 // If GCS_APPEND_MODE=compose, the new data is instead uploaded to a
143 // temporary object and composed with the original object. This is disabled by
144 // default as the multiple API calls required add a risk of stranding temporary
145 // objects.
146 constexpr char kComposeAppend[] = "compose";
147 
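// Illustrative sketch of tuning these knobs before the filesystem is created
// (assumes a POSIX environment; the values below are examples, not defaults):
//
//   setenv("GCS_READ_CACHE_BLOCK_SIZE_MB", "16", 1 /* overwrite */);
//   setenv("GCS_STAT_CACHE_MAX_AGE", "300", 1 /* overwrite */);
//   setenv("GCS_APPEND_MODE", "compose", 1 /* overwrite */);
//   GcsFileSystem fs(/*make_default_cache=*/true);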
148 Status GetTmpFilename(string* filename) {
149   *filename = io::GetTempFilename("");
150   return Status::OK();
151 }
152 
153 /// Appends a trailing slash if the name doesn't already have one.
154 string MaybeAppendSlash(const string& name) {
155   if (name.empty()) {
156     return "/";
157   }
158   if (name.back() != '/') {
159     return strings::StrCat(name, "/");
160   }
161   return name;
162 }
163 
164 // io::JoinPath() doesn't work in cases when we want an empty subpath
165 // to result in an appended slash in order for directory markers
166 // to be processed correctly: "gs://a/b" + "" should give "gs://a/b/".
167 string JoinGcsPath(const string& path, const string& subpath) {
168   return strings::StrCat(MaybeAppendSlash(path), subpath);
169 }
170 
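// For example (illustrative):
//   MaybeAppendSlash("gs://a/b") == "gs://a/b/"
//   JoinGcsPath("gs://a/b", "")  == "gs://a/b/"
//   JoinGcsPath("gs://a/b", "c") == "gs://a/b/c"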
171 /// \brief Returns the given paths appending all their subfolders.
172 ///
173 /// For every path X in the list, every subfolder in X is added to the
174 /// resulting list.
175 /// For example:
176 ///  - for 'a/b/c/d' it will append 'a', 'a/b' and 'a/b/c'
177 ///  - for 'a/b/c/' it will append 'a', 'a/b' and 'a/b/c'
178 ///  - for 'a//b/c/' it will append 'a', 'a//b' and 'a//b/c'
179 ///  - for '/a/b/c/' it will append '/a', '/a/b' and '/a/b/c'
180 std::set<string> AddAllSubpaths(const std::vector<string>& paths) {
181   std::set<string> result;
182   result.insert(paths.begin(), paths.end());
183   for (const string& path : paths) {
184     StringPiece subpath = io::Dirname(path);
185     // If `path` starts with `/`, `subpath` will be `/` and then we get into an
186     // infinite loop. Same behavior happens if there is a `//` pattern in
187     // `path`, so we check for that and leave the loop quicker.
188     while (!(subpath.empty() || subpath == "/")) {
189       result.emplace(string(subpath));
190       subpath = io::Dirname(subpath);
191     }
192   }
193   return result;
194 }
195 
196 Status ParseJson(StringPiece json, Json::Value* result) {
197   Json::Reader reader;
198   if (!reader.parse(json.data(), json.data() + json.size(), *result)) {
199     return errors::Internal("Couldn't parse JSON response from GCS.");
200   }
201   return Status::OK();
202 }
203 
204 Status ParseJson(const std::vector<char>& json, Json::Value* result) {
205   return ParseJson(StringPiece{json.data(), json.size()}, result);
206 }
207 
208 /// Reads a JSON value with the given name from a parent JSON value.
209 Status GetValue(const Json::Value& parent, const char* name,
210                 Json::Value* result) {
211   *result = parent.get(name, Json::Value::null);
212   if (result->isNull()) {
213     return errors::Internal("The field '", name,
214                             "' was expected in the JSON response.");
215   }
216   return Status::OK();
217 }
218 
219 /// Reads a string JSON value with the given name from a parent JSON value.
220 Status GetStringValue(const Json::Value& parent, const char* name,
221                       string* result) {
222   Json::Value result_value;
223   TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
224   if (!result_value.isString()) {
225     return errors::Internal(
226         "The field '", name,
227         "' in the JSON response was expected to be a string.");
228   }
229   *result = result_value.asString();
230   return Status::OK();
231 }
232 
233 /// Reads a long JSON value with the given name from a parent JSON value.
234 Status GetInt64Value(const Json::Value& parent, const char* name,
235                      int64* result) {
236   Json::Value result_value;
237   TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
238   if (result_value.isNumeric()) {
239     *result = result_value.asInt64();
240     return Status::OK();
241   }
242   if (result_value.isString() &&
243       strings::safe_strto64(result_value.asCString(), result)) {
244     return Status::OK();
245   }
246   return errors::Internal(
247       "The field '", name,
248       "' in the JSON response was expected to be a number.");
249 }
250 
251 /// Reads a boolean JSON value with the given name from a parent JSON value.
252 Status GetBoolValue(const Json::Value& parent, const char* name, bool* result) {
253   Json::Value result_value;
254   TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
255   if (!result_value.isBool()) {
256     return errors::Internal(
257         "The field '", name,
258         "' in the JSON response was expected to be a boolean.");
259   }
260   *result = result_value.asBool();
261   return Status::OK();
262 }
263 
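// Illustrative sketch of how these helpers are combined when parsing a GCS
// metadata response (the JSON body below is a made-up example; GCS encodes
// numeric fields such as "size" as strings, which GetInt64Value handles):
//
//   Json::Value root;
//   TF_RETURN_IF_ERROR(ParseJson(R"({"size": "1024", "etag": "abc"})", &root));
//   int64 size;
//   TF_RETURN_IF_ERROR(GetInt64Value(root, "size", &size));   // size == 1024
//   string etag;
//   TF_RETURN_IF_ERROR(GetStringValue(root, "etag", &etag));  // etag == "abc"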
264 /// A GCS-based implementation of a random access file with an LRU block cache.
265 class GcsRandomAccessFile : public RandomAccessFile {
266  public:
267   using ReadFn =
268       std::function<Status(const string& filename, uint64 offset, size_t n,
269                            StringPiece* result, char* scratch)>;
270 
271   GcsRandomAccessFile(const string& filename, ReadFn read_fn)
272       : filename_(filename), read_fn_(std::move(read_fn)) {}
273 
274   Status Name(StringPiece* result) const override {
275     *result = filename_;
276     return Status::OK();
277   }
278 
279   /// The implementation of reads with an LRU block cache. Thread safe.
280   Status Read(uint64 offset, size_t n, StringPiece* result,
281               char* scratch) const override {
282     return read_fn_(filename_, offset, n, result, scratch);
283   }
284 
285  private:
286   /// The filename of this file.
287   const string filename_;
288   /// The implementation of the read operation (provided by the GCSFileSystem).
289   const ReadFn read_fn_;
290 };
291 
292 /// A GCS-based implementation of a random access file with a read buffer.
293 class BufferedGcsRandomAccessFile : public RandomAccessFile {
294  public:
295   using ReadFn =
296       std::function<Status(const string& filename, uint64 offset, size_t n,
297                            StringPiece* result, char* scratch)>;
298 
299   // Initialize the reader. Provided read_fn should be thread safe.
300   BufferedGcsRandomAccessFile(const string& filename, uint64 buffer_size,
301                               ReadFn read_fn)
302       : filename_(filename),
303         read_fn_(std::move(read_fn)),
304         buffer_size_(buffer_size),
305         buffer_start_(0),
306         buffer_end_is_past_eof_(false) {}
307 
308   Status Name(StringPiece* result) const override {
309     *result = filename_;
310     return Status::OK();
311   }
312 
313   /// The implementation of reads with a read buffer. Thread safe.
314   /// Returns `OUT_OF_RANGE` if fewer than n bytes were stored in `*result`
315   /// because of EOF.
316   Status Read(uint64 offset, size_t n, StringPiece* result,
317               char* scratch) const override {
318     if (n > buffer_size_) {
319       return read_fn_(filename_, offset, n, result, scratch);
320     }
321     {
322       mutex_lock l(buffer_mutex_);
323       size_t buffer_end = buffer_start_ + buffer_.size();
324       size_t copy_size = 0;
325       if (offset < buffer_end && offset >= buffer_start_) {
326         copy_size = std::min(n, static_cast<size_t>(buffer_end - offset));
327         memcpy(scratch, buffer_.data() + (offset - buffer_start_), copy_size);
328         *result = StringPiece(scratch, copy_size);
329       }
330       bool consumed_buffer_to_eof =
331           offset + copy_size >= buffer_end && buffer_end_is_past_eof_;
332       if (copy_size < n && !consumed_buffer_to_eof) {
333         Status status = FillBuffer(offset + copy_size);
334         if (!status.ok() && status.code() != errors::Code::OUT_OF_RANGE) {
335           // Empty the buffer to avoid caching bad reads.
336           buffer_.resize(0);
337           return status;
338         }
339         size_t remaining_copy = std::min(n - copy_size, buffer_.size());
340         memcpy(scratch + copy_size, buffer_.data(), remaining_copy);
341         copy_size += remaining_copy;
342         *result = StringPiece(scratch, copy_size);
343       }
344       if (copy_size < n) {
345         // Forget the end-of-file flag to allow for clients that poll on the
346         // same file.
347         buffer_end_is_past_eof_ = false;
348         return errors::OutOfRange("EOF reached. Requested to read ", n,
349                                   " bytes from ", offset, ".");
350       }
351     }
352     return Status::OK();
353   }
354 
355  private:
356   Status FillBuffer(uint64 start) const
357       TF_EXCLUSIVE_LOCKS_REQUIRED(buffer_mutex_) {
358     buffer_start_ = start;
359     buffer_.resize(buffer_size_);
360     StringPiece str_piece;
361     Status status = read_fn_(filename_, buffer_start_, buffer_size_, &str_piece,
362                              &(buffer_[0]));
363     buffer_end_is_past_eof_ = status.code() == errors::Code::OUT_OF_RANGE;
364     buffer_.resize(str_piece.size());
365     return status;
366   }
367 
368   // The filename of this file.
369   const string filename_;
370 
371   // The implementation of the read operation (provided by the GCSFileSystem).
372   const ReadFn read_fn_;
373 
374   // Size of buffer that we read from GCS each time we send a request.
375   const uint64 buffer_size_;
376 
377   // Mutex for buffering operations that can be accessed from multiple threads.
378   // The following members are mutable in order to provide a const Read.
379   mutable mutex buffer_mutex_;
380 
381   // Offset of buffer from start of the file.
382   mutable uint64 buffer_start_ TF_GUARDED_BY(buffer_mutex_);
383 
384   mutable bool buffer_end_is_past_eof_ TF_GUARDED_BY(buffer_mutex_);
385 
386   mutable string buffer_ TF_GUARDED_BY(buffer_mutex_);
387 };
388 
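// Minimal usage sketch (the lambda stands in for the GCS-backed read function;
// all names and sizes here are illustrative):
//
//   BufferedGcsRandomAccessFile file(
//       "gs://bucket/object", /*buffer_size=*/1024 * 1024,
//       [](const string& fname, uint64 offset, size_t n, StringPiece* result,
//          char* scratch) {
//         // Issue a ranged read into `scratch` and point `*result` at it.
//         return Status::OK();
//       });
//   char scratch[4096];
//   StringPiece chunk;
//   TF_RETURN_IF_ERROR(file.Read(/*offset=*/0, sizeof(scratch), &chunk, scratch));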
389 // Function object declaration with params needed to create upload sessions.
390 typedef std::function<Status(
391     uint64 start_offset, const std::string& object_to_upload,
392     const std::string& bucket, uint64 file_size, const std::string& gcs_path,
393     UploadSessionHandle* session_handle)>
394     SessionCreator;
395 
396 // Function object declaration with params needed to upload objects.
397 typedef std::function<Status(const std::string& session_uri,
398                              uint64 start_offset, uint64 already_uploaded,
399                              const std::string& tmp_content_filename,
400                              uint64 file_size, const std::string& file_path)>
401     ObjectUploader;
402 
403 // Function object declaration with params needed to poll upload status.
404 typedef std::function<Status(const string& session_uri, uint64 file_size,
405                              const std::string& gcs_path, bool* completed,
406                              uint64* uploaded)>
407     StatusPoller;
408 
409 // Function object declaration with params needed to fetch an object's generation.
410 typedef std::function<Status(const string& fname, const string& bucket,
411                              const string& object, int64* generation)>
412     GenerationGetter;
413 
414 /// \brief GCS-based implementation of a writeable file.
415 ///
416 /// Since GCS objects are immutable, this implementation writes to a local
417 /// tmp file and copies it to GCS on flush/close.
418 class GcsWritableFile : public WritableFile {
419  public:
420   GcsWritableFile(const string& bucket, const string& object,
421                   GcsFileSystem* filesystem,
422                   GcsFileSystem::TimeoutConfig* timeouts,
423                   std::function<void()> file_cache_erase,
424                   RetryConfig retry_config, bool compose_append,
425                   SessionCreator session_creator,
426                   ObjectUploader object_uploader, StatusPoller status_poller,
427                   GenerationGetter generation_getter)
428       : bucket_(bucket),
429         object_(object),
430         filesystem_(filesystem),
431         timeouts_(timeouts),
432         file_cache_erase_(std::move(file_cache_erase)),
433         sync_needed_(true),
434         retry_config_(retry_config),
435         compose_append_(compose_append),
436         start_offset_(0),
437         session_creator_(std::move(session_creator)),
438         object_uploader_(std::move(object_uploader)),
439         status_poller_(std::move(status_poller)),
440         generation_getter_(std::move(generation_getter)) {
441     // TODO: to make it safer, outfile_ should be constructed from an FD
442     VLOG(3) << "GcsWritableFile: " << GetGcsPath();
443     if (GetTmpFilename(&tmp_content_filename_).ok()) {
444       outfile_.open(tmp_content_filename_,
445                     std::ofstream::binary | std::ofstream::app);
446     }
447   }
448 
449   /// \brief Constructs the writable file in append mode.
450   ///
451   /// tmp_content_filename should contain a path of an existing temporary file
452   /// with the content to be appended. The class takes ownership of the
453   /// specified tmp file and deletes it on close.
454   GcsWritableFile(const string& bucket, const string& object,
455                   GcsFileSystem* filesystem, const string& tmp_content_filename,
456                   GcsFileSystem::TimeoutConfig* timeouts,
457                   std::function<void()> file_cache_erase,
458                   RetryConfig retry_config, bool compose_append,
459                   SessionCreator session_creator,
460                   ObjectUploader object_uploader, StatusPoller status_poller,
461                   GenerationGetter generation_getter)
462       : bucket_(bucket),
463         object_(object),
464         filesystem_(filesystem),
465         timeouts_(timeouts),
466         file_cache_erase_(std::move(file_cache_erase)),
467         sync_needed_(true),
468         retry_config_(retry_config),
469         compose_append_(compose_append),
470         start_offset_(0),
471         session_creator_(std::move(session_creator)),
472         object_uploader_(std::move(object_uploader)),
473         status_poller_(std::move(status_poller)),
474         generation_getter_(std::move(generation_getter)) {
475     VLOG(3) << "GcsWritableFile: " << GetGcsPath() << " with existing file "
476             << tmp_content_filename;
477     tmp_content_filename_ = tmp_content_filename;
478     outfile_.open(tmp_content_filename_,
479                   std::ofstream::binary | std::ofstream::app);
480   }
481 
482   ~GcsWritableFile() override {
483     Close().IgnoreError();
484     std::remove(tmp_content_filename_.c_str());
485   }
486 
487   Status Append(StringPiece data) override {
488     TF_RETURN_IF_ERROR(CheckWritable());
489     VLOG(3) << "Append: " << GetGcsPath() << " size " << data.length();
490     sync_needed_ = true;
491     outfile_ << data;
492     if (!outfile_.good()) {
493       return errors::Internal(
494           "Could not append to the internal temporary file.");
495     }
496     return Status::OK();
497   }
498 
499   Status Close() override {
500     VLOG(3) << "Close:" << GetGcsPath();
501     if (outfile_.is_open()) {
502       Status sync_status = Sync();
503       if (sync_status.ok()) {
504         outfile_.close();
505       }
506       return sync_status;
507     }
508     return Status::OK();
509   }
510 
511   Status Flush() override {
512     VLOG(3) << "Flush:" << GetGcsPath();
513     return Sync();
514   }
515 
516   Status Name(StringPiece* result) const override {
517     return errors::Unimplemented("GCSWritableFile does not support Name()");
518   }
519 
520   Status Sync() override {
521     VLOG(3) << "Sync started:" << GetGcsPath();
522     TF_RETURN_IF_ERROR(CheckWritable());
523     if (!sync_needed_) {
524       return Status::OK();
525     }
526     Status status = SyncImpl();
527     VLOG(3) << "Sync finished " << GetGcsPath();
528     if (status.ok()) {
529       sync_needed_ = false;
530     }
531     return status;
532   }
533 
534   Status Tell(int64* position) override {
535     *position = outfile_.tellp();
536     if (*position == -1) {
537       return errors::Internal("tellp on the internal temporary file failed");
538     }
539     return Status::OK();
540   }
541 
542  private:
543   /// Copies the current version of the file to GCS.
544   ///
545   /// This SyncImpl() uploads the object to GCS.
546   /// In case of a failure, it resumes failed uploads as recommended by the GCS
547   /// resumable API documentation. When the whole upload needs to be
548   /// restarted, Sync() returns UNAVAILABLE and relies on RetryingFileSystem.
549   Status SyncImpl() {
550     outfile_.flush();
551     if (!outfile_.good()) {
552       return errors::Internal(
553           "Could not write to the internal temporary file.");
554     }
555     UploadSessionHandle session_handle;
556     uint64 start_offset = 0;
557     string object_to_upload = object_;
558     bool should_compose = false;
559     if (compose_append_) {
560       start_offset = start_offset_;
561       // Only compose if the object has already been uploaded to GCS
562       should_compose = start_offset > 0;
563       if (should_compose) {
564         object_to_upload =
565             strings::StrCat(io::Dirname(object_), "/.tmpcompose/",
566                             io::Basename(object_), ".", start_offset_);
567       }
568     }
569     TF_RETURN_IF_ERROR(CreateNewUploadSession(start_offset, object_to_upload,
570                                               &session_handle));
571     uint64 already_uploaded = 0;
572     bool first_attempt = true;
573     const Status upload_status = RetryingUtils::CallWithRetries(
574         [&first_attempt, &already_uploaded, &session_handle, &start_offset,
575          this]() {
576           if (session_handle.resumable && !first_attempt) {
577             bool completed;
578             TF_RETURN_IF_ERROR(RequestUploadSessionStatus(
579                 session_handle.session_uri, &completed, &already_uploaded));
580             LOG(INFO) << "### RequestUploadSessionStatus: completed = "
581                       << completed
582                       << ", already_uploaded = " << already_uploaded
583                       << ", file = " << GetGcsPath();
584             if (completed) {
585               // Erase the file from the file cache on every successful write.
586               file_cache_erase_();
587               // It's unclear why UploadToSession didn't return OK in the
588               // previous attempt, but GCS reports that the file is fully
589               // uploaded, so succeed.
590               return Status::OK();
591             }
592           }
593           first_attempt = false;
594           return UploadToSession(session_handle.session_uri, start_offset,
595                                  already_uploaded);
596         },
597         retry_config_);
598     if (upload_status.code() == errors::Code::NOT_FOUND) {
599       // GCS docs recommend retrying the whole upload. We're relying on the
600       // RetryingFileSystem to retry the Sync() call.
601       return errors::Unavailable(
602           strings::StrCat("Upload to gs://", bucket_, "/", object_,
603                           " failed, caused by: ", upload_status.ToString()));
604     }
605     if (upload_status.ok()) {
606       if (should_compose) {
607         TF_RETURN_IF_ERROR(AppendObject(object_to_upload));
608       }
609       TF_RETURN_IF_ERROR(GetCurrentFileSize(&start_offset_));
610     }
611     return upload_status;
612   }
613 
614   Status CheckWritable() const {
615     if (!outfile_.is_open()) {
616       return errors::FailedPrecondition(
617           "The internal temporary file is not writable.");
618     }
619     return Status::OK();
620   }
621 
622   Status GetCurrentFileSize(uint64* size) {
623     const auto tellp = outfile_.tellp();
624     if (tellp == static_cast<std::streampos>(-1)) {
625       return errors::Internal(
626           "Could not get the size of the internal temporary file.");
627     }
628     *size = tellp;
629     return Status::OK();
630   }
631 
632   /// Initiates a new resumable upload session.
633   Status CreateNewUploadSession(uint64 start_offset,
634                                 std::string object_to_upload,
635                                 UploadSessionHandle* session_handle) {
636     uint64 file_size;
637     TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
638     return session_creator_(start_offset, object_to_upload, bucket_, file_size,
639                             GetGcsPath(), session_handle);
640   }
641 
642   /// Appends the data of append_object to the original object and deletes
643   /// append_object.
644   Status AppendObject(string append_object) {
645     const string append_object_path = GetGcsPathWithObject(append_object);
646     VLOG(3) << "AppendObject: " << append_object_path << " to " << GetGcsPath();
647 
648     int64 generation = 0;
649     TF_RETURN_IF_ERROR(
650         generation_getter_(GetGcsPath(), bucket_, object_, &generation));
651 
652     TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
653         [&append_object, &generation, this]() {
654           std::unique_ptr<HttpRequest> request;
655           TF_RETURN_IF_ERROR(filesystem_->CreateHttpRequest(&request));
656 
657           request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket_, "/o/",
658                                           request->EscapeString(object_),
659                                           "/compose"));
660 
661           const string request_body = strings::StrCat(
662               "{'sourceObjects': [{'name': '", object_,
663               "','objectPrecondition':{'ifGenerationMatch':", generation,
664               "}},{'name': '", append_object, "'}]}");
665           request->SetTimeouts(timeouts_->connect, timeouts_->idle,
666                                timeouts_->metadata);
667           request->AddHeader("content-type", "application/json");
668           request->SetPostFromBuffer(request_body.c_str(), request_body.size());
669           TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
670                                           " when composing to ", GetGcsPath());
671           return Status::OK();
672         },
673         retry_config_));
674 
675     return RetryingUtils::DeleteWithRetries(
676         [&append_object_path, this]() {
677           return filesystem_->DeleteFile(append_object_path, nullptr);
678         },
679         retry_config_);
680   }
681 
682   /// \brief Requests status of a previously initiated upload session.
683   ///
684   /// If the upload has already succeeded, sets 'completed' to true.
685   /// Otherwise sets 'completed' to false and 'uploaded' to the currently
686   /// uploaded size in bytes.
687   Status RequestUploadSessionStatus(const string& session_uri, bool* completed,
688                                     uint64* uploaded) {
689     uint64 file_size;
690     TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
691     return status_poller_(session_uri, file_size, GetGcsPath(), completed,
692                           uploaded);
693   }
694 
695   /// Uploads data to object.
696   Status UploadToSession(const string& session_uri, uint64 start_offset,
697                          uint64 already_uploaded) {
698     uint64 file_size;
699     TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
700     Status status =
701         object_uploader_(session_uri, start_offset, already_uploaded,
702                          tmp_content_filename_, file_size, GetGcsPath());
703     if (status.ok()) {
704       // Erase the file from the file cache on every successful write.
705       // Note: this only affects the local cache; it does nothing for a
706       // distributed cache, which clears its entries as needed.
707       file_cache_erase_();
708     }
709 
710     return status;
711   }
712 
713   string GetGcsPathWithObject(string object) const {
714     return strings::StrCat("gs://", bucket_, "/", object);
715   }
716   string GetGcsPath() const { return GetGcsPathWithObject(object_); }
717 
718   string bucket_;
719   string object_;
720   GcsFileSystem* const filesystem_;  // Not owned.
721   string tmp_content_filename_;
722   std::ofstream outfile_;
723   GcsFileSystem::TimeoutConfig* timeouts_;
724   std::function<void()> file_cache_erase_;
725   bool sync_needed_;  // whether there is buffered data that needs to be synced
726   RetryConfig retry_config_;
727   bool compose_append_;
728   uint64 start_offset_;
729   // Callbacks to the file system used to upload object into GCS.
730   const SessionCreator session_creator_;
731   const ObjectUploader object_uploader_;
732   const StatusPoller status_poller_;
733   const GenerationGetter generation_getter_;
734 };
735 
736 class GcsReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
737  public:
738   GcsReadOnlyMemoryRegion(std::unique_ptr<char[]> data, uint64 length)
739       : data_(std::move(data)), length_(length) {}
740   const void* data() override { return reinterpret_cast<void*>(data_.get()); }
741   uint64 length() override { return length_; }
742 
743  private:
744   std::unique_ptr<char[]> data_;
745   uint64 length_;
746 };
747 
748 bool StringPieceIdentity(StringPiece str, StringPiece* value) {
749   *value = str;
750   return true;
751 }
752 
753 /// \brief Utility function to split a comma delimited list of strings to an
754 /// unordered set, lowercasing all values.
755 bool SplitByCommaToLowercaseSet(StringPiece list,
756                                 std::unordered_set<string>* set) {
757   std::vector<string> vector = absl::StrSplit(absl::AsciiStrToLower(list), ',');
758   *set = std::unordered_set<string>(vector.begin(), vector.end());
759   return true;
760 }
761 
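// For example (illustrative): the list "US-EAST1,EU" is split into the set
// {"us-east1", "eu"}.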
762 // \brief Convert Compute Engine zone to region
763 string ZoneToRegion(string* zone) {
764   return zone->substr(0, zone->find_last_of('-'));
765 }
766 
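// For example (illustrative): a zone string such as "us-central1-a" maps to
// the region "us-central1".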
767 }  // namespace
768 
769 GcsFileSystem::GcsFileSystem(bool make_default_cache) {
770   uint64 value;
771   block_size_ = kDefaultBlockSize;
772   size_t max_bytes = kDefaultMaxCacheSize;
773   uint64 max_staleness = kDefaultMaxStaleness;
774 
775   http_request_factory_ = std::make_shared<CurlHttpRequest::Factory>();
776   compute_engine_metadata_client_ =
777       std::make_shared<ComputeEngineMetadataClient>(http_request_factory_);
778   auth_provider_ = std::unique_ptr<AuthProvider>(
779       new GoogleAuthProvider(compute_engine_metadata_client_));
780   zone_provider_ = std::unique_ptr<ZoneProvider>(
781       new ComputeEngineZoneProvider(compute_engine_metadata_client_));
782 
783   // Apply the sys env override for the readahead buffer size if it's provided.
784   if (GetEnvVar(kReadaheadBufferSize, strings::safe_strtou64, &value)) {
785     block_size_ = value;
786   }
787 
788   // Apply the overrides for the block size (MB), max bytes (MB), and max
789   // staleness (seconds) if provided.
790   if (GetEnvVar(kBlockSize, strings::safe_strtou64, &value)) {
791     block_size_ = value * 1024 * 1024;
792   }
793 
794   if (GetEnvVar(kMaxCacheSize, strings::safe_strtou64, &value)) {
795     max_bytes = value * 1024 * 1024;
796   }
797 
798   if (GetEnvVar(kMaxStaleness, strings::safe_strtou64, &value)) {
799     max_staleness = value;
800   }
801   if (!make_default_cache) {
802     max_bytes = 0;
803   }
804   VLOG(1) << "GCS cache max size = " << max_bytes << " ; "
805           << "block size = " << block_size_ << " ; "
806           << "max staleness = " << max_staleness;
807   file_block_cache_ = MakeFileBlockCache(block_size_, max_bytes, max_staleness);
808   // Apply overrides for the stat cache max age and max entries, if provided.
809   uint64 stat_cache_max_age = kStatCacheDefaultMaxAge;
810   size_t stat_cache_max_entries = kStatCacheDefaultMaxEntries;
811   if (GetEnvVar(kStatCacheMaxAge, strings::safe_strtou64, &value)) {
812     stat_cache_max_age = value;
813   }
814   if (GetEnvVar(kStatCacheMaxEntries, strings::safe_strtou64, &value)) {
815     stat_cache_max_entries = value;
816   }
817   stat_cache_.reset(new ExpiringLRUCache<GcsFileStat>(stat_cache_max_age,
818                                                       stat_cache_max_entries));
819   // Apply overrides for the matching paths cache max age and max entries, if
820   // provided.
821   uint64 matching_paths_cache_max_age = kMatchingPathsCacheDefaultMaxAge;
822   size_t matching_paths_cache_max_entries =
823       kMatchingPathsCacheDefaultMaxEntries;
824   if (GetEnvVar(kMatchingPathsCacheMaxAge, strings::safe_strtou64, &value)) {
825     matching_paths_cache_max_age = value;
826   }
827   if (GetEnvVar(kMatchingPathsCacheMaxEntries, strings::safe_strtou64,
828                 &value)) {
829     matching_paths_cache_max_entries = value;
830   }
831   matching_paths_cache_.reset(new ExpiringLRUCache<std::vector<string>>(
832       matching_paths_cache_max_age, matching_paths_cache_max_entries));
833 
834   bucket_location_cache_.reset(new ExpiringLRUCache<string>(
835       kCacheNeverExpire, kBucketLocationCacheMaxEntries));
836 
837   int64 resolve_frequency_secs;
838   if (GetEnvVar(kResolveCacheSecs, strings::safe_strto64,
839                 &resolve_frequency_secs)) {
840     dns_cache_.reset(new GcsDnsCache(resolve_frequency_secs));
841     VLOG(1) << "GCS DNS cache is enabled.  " << kResolveCacheSecs << " = "
842             << resolve_frequency_secs;
843   } else {
844     VLOG(1) << "GCS DNS cache is disabled, because " << kResolveCacheSecs
845             << " = 0 (or is not set)";
846   }
847 
848   // Get the additional header
849   StringPiece add_header_contents;
850   if (GetEnvVar(kAdditionalRequestHeader, StringPieceIdentity,
851                 &add_header_contents)) {
852     size_t split = add_header_contents.find(':', 0);
853 
854     if (split != StringPiece::npos) {
855       StringPiece header_name = add_header_contents.substr(0, split);
856       StringPiece header_value = add_header_contents.substr(split + 1);
857 
858       if (!header_name.empty() && !header_value.empty()) {
859         additional_header_.reset(new std::pair<const string, const string>(
860             string(header_name), string(header_value)));
861 
862         VLOG(1) << "GCS additional header ENABLED. "
863                 << "Name: " << additional_header_->first << ", "
864                 << "Value: " << additional_header_->second;
865       } else {
866         LOG(ERROR) << "GCS additional header DISABLED. Invalid contents: "
867                    << add_header_contents;
868       }
869     } else {
870       LOG(ERROR) << "GCS additional header DISABLED. Invalid contents: "
871                  << add_header_contents;
872     }
873   } else {
874     VLOG(1) << "GCS additional header DISABLED. No environment variable set.";
875   }
876 
877   // Apply the overrides for request timeouts
878   uint32 timeout_value;
879   if (GetEnvVar(kRequestConnectionTimeout, strings::safe_strtou32,
880                 &timeout_value)) {
881     timeouts_.connect = timeout_value;
882   }
883   if (GetEnvVar(kRequestIdleTimeout, strings::safe_strtou32, &timeout_value)) {
884     timeouts_.idle = timeout_value;
885   }
886   if (GetEnvVar(kMetadataRequestTimeout, strings::safe_strtou32,
887                 &timeout_value)) {
888     timeouts_.metadata = timeout_value;
889   }
890   if (GetEnvVar(kReadRequestTimeout, strings::safe_strtou32, &timeout_value)) {
891     timeouts_.read = timeout_value;
892   }
893   if (GetEnvVar(kWriteRequestTimeout, strings::safe_strtou32, &timeout_value)) {
894     timeouts_.write = timeout_value;
895   }
896 
897   int64 token_value;
898   if (GetEnvVar(kThrottleRate, strings::safe_strto64, &token_value)) {
899     GcsThrottleConfig config;
900     config.enabled = true;
901     config.token_rate = token_value;
902 
903     if (GetEnvVar(kThrottleBucket, strings::safe_strto64, &token_value)) {
904       config.bucket_size = token_value;
905     }
906 
907     if (GetEnvVar(kTokensPerRequest, strings::safe_strto64, &token_value)) {
908       config.tokens_per_request = token_value;
909     }
910 
911     if (GetEnvVar(kInitialTokens, strings::safe_strto64, &token_value)) {
912       config.initial_tokens = token_value;
913     }
914     throttle_.SetConfig(config);
915   }
916 
917   GetEnvVar(kAllowedBucketLocations, SplitByCommaToLowercaseSet,
918             &allowed_locations_);
919 
920   StringPiece append_mode;
921   GetEnvVar(kAppendMode, StringPieceIdentity, &append_mode);
922   if (append_mode == kComposeAppend) {
923     compose_append_ = true;
924   } else {
925     compose_append_ = false;
926   }
927 }
928 
929 GcsFileSystem::GcsFileSystem(
930     std::unique_ptr<AuthProvider> auth_provider,
931     std::unique_ptr<HttpRequest::Factory> http_request_factory,
932     std::unique_ptr<ZoneProvider> zone_provider, size_t block_size,
933     size_t max_bytes, uint64 max_staleness, uint64 stat_cache_max_age,
934     size_t stat_cache_max_entries, uint64 matching_paths_cache_max_age,
935     size_t matching_paths_cache_max_entries, RetryConfig retry_config,
936     TimeoutConfig timeouts, const std::unordered_set<string>& allowed_locations,
937     std::pair<const string, const string>* additional_header,
938     bool compose_append)
939     : timeouts_(timeouts),
940       retry_config_(retry_config),
941       auth_provider_(std::move(auth_provider)),
942       http_request_factory_(std::move(http_request_factory)),
943       zone_provider_(std::move(zone_provider)),
944       block_size_(block_size),
945       file_block_cache_(
946           MakeFileBlockCache(block_size, max_bytes, max_staleness)),
947       stat_cache_(new StatCache(stat_cache_max_age, stat_cache_max_entries)),
948       matching_paths_cache_(new MatchingPathsCache(
949           matching_paths_cache_max_age, matching_paths_cache_max_entries)),
950       bucket_location_cache_(new BucketLocationCache(
951           kCacheNeverExpire, kBucketLocationCacheMaxEntries)),
952       allowed_locations_(allowed_locations),
953       compose_append_(compose_append),
954       additional_header_(additional_header) {}
955 
956 Status GcsFileSystem::NewRandomAccessFile(
957     const string& fname, TransactionToken* token,
958     std::unique_ptr<RandomAccessFile>* result) {
959   string bucket, object;
960   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
961   TF_RETURN_IF_ERROR(CheckBucketLocationConstraint(bucket));
962   bool cache_enabled;
963   {
964     mutex_lock l(block_cache_lock_);
965     cache_enabled = file_block_cache_->IsCacheEnabled();
966   }
967   if (cache_enabled) {
968     result->reset(new GcsRandomAccessFile(fname, [this, bucket, object](
969                                                      const string& fname,
970                                                      uint64 offset, size_t n,
971                                                      StringPiece* result,
972                                                      char* scratch) {
973       tf_shared_lock l(block_cache_lock_);
974       GcsFileStat stat;
975       TF_RETURN_IF_ERROR(stat_cache_->LookupOrCompute(
976           fname, &stat,
977           [this, bucket, object](const string& fname, GcsFileStat* stat) {
978             return UncachedStatForObject(fname, bucket, object, stat);
979           }));
980       if (!file_block_cache_->ValidateAndUpdateFileSignature(
981               fname, stat.generation_number)) {
982         VLOG(1)
983             << "File signature has been changed. Refreshing the cache. Path: "
984             << fname;
985       }
986       *result = StringPiece();
987       size_t bytes_transferred;
988       TF_RETURN_IF_ERROR(file_block_cache_->Read(fname, offset, n, scratch,
989                                                  &bytes_transferred));
990       *result = StringPiece(scratch, bytes_transferred);
991       if (bytes_transferred < n) {
992         return errors::OutOfRange("EOF reached, ", result->size(),
993                                   " bytes were read out of ", n,
994                                   " bytes requested.");
995       }
996       return Status::OK();
997     }));
998   } else {
999     result->reset(new BufferedGcsRandomAccessFile(
1000         fname, block_size_,
1001         [this, bucket, object](const string& fname, uint64 offset, size_t n,
1002                                StringPiece* result, char* scratch) {
1003           *result = StringPiece();
1004           size_t bytes_transferred;
1005           TF_RETURN_IF_ERROR(
1006               LoadBufferFromGCS(fname, offset, n, scratch, &bytes_transferred));
1007           *result = StringPiece(scratch, bytes_transferred);
1008           if (bytes_transferred < n) {
1009             return errors::OutOfRange("EOF reached, ", result->size(),
1010                                       " bytes were read out of ", n,
1011                                       " bytes requested.");
1012           }
1013           return Status::OK();
1014         }));
1015   }
1016   return Status::OK();
1017 }
1018 
1019 void GcsFileSystem::ResetFileBlockCache(size_t block_size_bytes,
1020                                         size_t max_bytes,
1021                                         uint64 max_staleness_secs) {
1022   mutex_lock l(block_cache_lock_);
1023   file_block_cache_ =
1024       MakeFileBlockCache(block_size_bytes, max_bytes, max_staleness_secs);
1025   if (stats_ != nullptr) {
1026     stats_->Configure(this, &throttle_, file_block_cache_.get());
1027   }
1028 }
1029 
1030 // A helper function to build a FileBlockCache for GcsFileSystem.
1031 std::unique_ptr<FileBlockCache> GcsFileSystem::MakeFileBlockCache(
1032     size_t block_size, size_t max_bytes, uint64 max_staleness) {
1033   std::unique_ptr<FileBlockCache> file_block_cache(new RamFileBlockCache(
1034       block_size, max_bytes, max_staleness,
1035       [this](const string& filename, size_t offset, size_t n, char* buffer,
1036              size_t* bytes_transferred) {
1037         return LoadBufferFromGCS(filename, offset, n, buffer,
1038                                  bytes_transferred);
1039       }));
1040   return file_block_cache;
1041 }
1042 
1043 // A helper function to actually read the data from GCS.
1044 Status GcsFileSystem::LoadBufferFromGCS(const string& fname, size_t offset,
1045                                         size_t n, char* buffer,
1046                                         size_t* bytes_transferred) {
1047   *bytes_transferred = 0;
1048 
1049   string bucket, object;
1050   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1051 
1052   profiler::TraceMe activity(
1053       [fname]() { return absl::StrCat("LoadBufferFromGCS ", fname); });
1054 
1055   std::unique_ptr<HttpRequest> request;
1056   TF_RETURN_WITH_CONTEXT_IF_ERROR(CreateHttpRequest(&request),
1057                                   "when reading gs://", bucket, "/", object);
1058 
1059   request->SetUri(strings::StrCat("https://", kStorageHost, "/", bucket, "/",
1060                                   request->EscapeString(object)));
1061   request->SetRange(offset, offset + n - 1);
1062   request->SetResultBufferDirect(buffer, n);
1063   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.read);
1064 
1065   if (stats_ != nullptr) {
1066     stats_->RecordBlockLoadRequest(fname, offset);
1067   }
1068 
1069   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading gs://",
1070                                   bucket, "/", object);
1071 
1072   size_t bytes_read = request->GetResultBufferDirectBytesTransferred();
1073   *bytes_transferred = bytes_read;
1074   VLOG(1) << "Successful read of gs://" << bucket << "/" << object << " @ "
1075           << offset << " of size: " << bytes_read;
1076   activity.AppendMetadata([bytes_read]() {
1077     return profiler::TraceMeEncode({{"block_size", bytes_read}});
1078   });
1079 
1080   if (stats_ != nullptr) {
1081     stats_->RecordBlockRetrieved(fname, offset, bytes_read);
1082   }
1083 
1084   throttle_.RecordResponse(bytes_read);
1085 
1086   if (bytes_read < n) {
1087     // Check stat cache to see if we encountered an interrupted read.
1088     GcsFileStat stat;
1089     if (stat_cache_->Lookup(fname, &stat)) {
1090       if (offset + bytes_read < stat.base.length) {
1091         return errors::Internal(strings::Printf(
1092             "File contents are inconsistent for file: %s @ %lu.", fname.c_str(),
1093             offset));
1094       }
1095       VLOG(2) << "Successful integrity check for: gs://" << bucket << "/"
1096               << object << " @ " << offset;
1097     }
1098   }
1099 
1100   return Status::OK();
1101 }
1102 
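// Illustrative example of the ranged read above: a call with offset == 0 and
// n == 4096 requests bytes [0, 4095] of
//   https://storage.googleapis.com/<bucket>/<object>
// directly into `buffer`, and *bytes_transferred reports how many of those
// bytes GCS actually returned (fewer than n near EOF).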
1103 /// Initiates a new upload session.
1104 Status GcsFileSystem::CreateNewUploadSession(
1105     uint64 start_offset, const std::string& object_to_upload,
1106     const std::string& bucket, uint64 file_size, const std::string& gcs_path,
1107     UploadSessionHandle* session_handle) {
1108   std::vector<char> output_buffer;
1109   std::unique_ptr<HttpRequest> request;
1110   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1111 
1112   std::string uri = strings::StrCat(
1113       kGcsUploadUriBase, "b/", bucket,
1114       "/o?uploadType=resumable&name=", request->EscapeString(object_to_upload));
1115   request->SetUri(uri);
1116   request->AddHeader("X-Upload-Content-Length",
1117                      absl::StrCat(file_size - start_offset));
1118   request->SetPostEmptyBody();
1119   request->SetResultBuffer(&output_buffer);
1120   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1121   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
1122                                   " when initiating an upload to ", gcs_path);
1123   if (session_handle != nullptr) {
1124     session_handle->resumable = true;
1125     session_handle->session_uri = request->GetResponseHeader("Location");
1126     if (session_handle->session_uri.empty()) {
1127       return errors::Internal("Unexpected response from GCS when writing to ",
1128                               gcs_path, ": 'Location' header not returned.");
1129     }
1130   }
1131   return Status::OK();
1132 }
1133 
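// Illustrative flow tying the upload helpers together: the session URI taken
// from the "Location" header above is passed to UploadToSession(); if that PUT
// is interrupted, RequestUploadSessionStatus() reports how many bytes GCS
// already holds so the caller can resume from that offset.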
1134 Status GcsFileSystem::UploadToSession(const std::string& session_uri,
1135                                       uint64 start_offset,
1136                                       uint64 already_uploaded,
1137                                       const std::string& tmp_content_filename,
1138                                       uint64 file_size,
1139                                       const std::string& file_path) {
1140   std::unique_ptr<HttpRequest> request;
1141   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1142   request->SetUri(session_uri);
1143   if (file_size > 0) {
1144     request->AddHeader("Content-Range",
1145                        strings::StrCat("bytes ", already_uploaded, "-",
1146                                        file_size - start_offset - 1, "/",
1147                                        file_size - start_offset));
1148   }
1149   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.write);
1150 
1151   TF_RETURN_IF_ERROR(request->SetPutFromFile(tmp_content_filename,
1152                                              start_offset + already_uploaded));
1153   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when uploading ",
1154                                   file_path);
1155   return Status::OK();
1156 }
1157 
1158 Status GcsFileSystem::RequestUploadSessionStatus(const string& session_uri,
1159                                                  uint64 file_size,
1160                                                  const std::string& gcs_path,
1161                                                  bool* completed,
1162                                                  uint64* uploaded) {
1163   CHECK(completed != nullptr) << "RequestUploadSessionStatus() called with out "
1164                                  "param 'completed' == nullptr.";  // Crash ok
1165   CHECK(uploaded != nullptr) << "RequestUploadSessionStatus() called with out "
1166                                 "param 'uploaded' == nullptr.";  // Crash ok
1167   std::unique_ptr<HttpRequest> request;
1168   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1169   request->SetUri(session_uri);
1170   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1171   request->AddHeader("Content-Range", strings::StrCat("bytes */", file_size));
1172   request->SetPutEmptyBody();
1173   Status status = request->Send();
1174   if (status.ok()) {
1175     *completed = true;
1176     return Status::OK();
1177   }
1178   *completed = false;
1179   if (request->GetResponseCode() != HTTP_CODE_RESUME_INCOMPLETE) {
1180     TF_RETURN_WITH_CONTEXT_IF_ERROR(status, " when resuming upload ", gcs_path);
1181   }
1182   const std::string received_range = request->GetResponseHeader("Range");
1183   if (received_range.empty()) {
1184     // This means GCS doesn't have any bytes of the file yet.
1185     *uploaded = 0;
1186   } else {
1187     StringPiece range_piece(received_range);
1188     absl::ConsumePrefix(&range_piece,
1189                         "bytes=");  // May or may not be present.
1190 
1191     auto return_error = [](const std::string& gcs_path,
1192                            const std::string& error_message) {
1193       return errors::Internal("Unexpected response from GCS when writing ",
1194                               gcs_path, ": ", error_message);
1195     };
1196 
1197     std::vector<string> range_strs = str_util::Split(range_piece, '-');
1198     if (range_strs.size() != 2) {
1199       return return_error(gcs_path, "Range header '" + received_range +
1200                                         "' could not be parsed.");
1201     }
1202 
1203     std::vector<int64> range_parts;
1204     for (const std::string& range_str : range_strs) {
1205       int64 tmp;
1206       if (strings::safe_strto64(range_str, &tmp)) {
1207         range_parts.push_back(tmp);
1208       } else {
1209         return return_error(gcs_path, "Range header '" + received_range +
1210                                           "' could not be parsed.");
1211       }
1212     }
1213 
1214     if (range_parts[0] != 0) {
1215       return return_error(gcs_path, "The returned range '" + received_range +
1216                                         "' does not start at zero.");
1217     }
1218     // If GCS returned "Range: 0-10", this means 11 bytes were uploaded.
1219     *uploaded = range_parts[1] + 1;
1220   }
1221   return Status::OK();
1222 }
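
// A sketch of the status-check exchange this method performs (illustrative
// only, not taken verbatim from the GCS documentation): polling a resumable
// session for a 1000-byte file sends
//   PUT <session_uri>
//   Content-Range: bytes */1000
// with an empty body. A 2xx response means the upload already completed; a
// 308 response with a header such as "Range: bytes=0-499" means the first
// 500 bytes have been persisted, so *uploaded is set to 500.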
1223 
1224 Status GcsFileSystem::ParseGcsPathForScheme(StringPiece fname, string scheme,
1225                                             bool empty_object_ok,
1226                                             string* bucket, string* object) {
1227   StringPiece parsed_scheme, bucketp, objectp;
1228   io::ParseURI(fname, &parsed_scheme, &bucketp, &objectp);
1229   if (parsed_scheme != scheme) {
1230     return errors::InvalidArgument("GCS path doesn't start with '", scheme,
1231                                    "://': ", fname);
1232   }
1233   *bucket = string(bucketp);
1234   if (bucket->empty() || *bucket == ".") {
1235     return errors::InvalidArgument("GCS path doesn't contain a bucket name: ",
1236                                    fname);
1237   }
1238   absl::ConsumePrefix(&objectp, "/");
1239   *object = string(objectp);
1240   if (!empty_object_ok && object->empty()) {
1241     return errors::InvalidArgument("GCS path doesn't contain an object name: ",
1242                                    fname);
1243   }
1244   return Status::OK();
1245 }
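
// Illustrative example (not part of the original source): for
// fname = "gs://my-bucket/path/to/object" and scheme = "gs", this yields
//   *bucket = "my-bucket" and *object = "path/to/object";
// "gs://my-bucket" alone is only accepted when empty_object_ok is true.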
1246 
1247 Status GcsFileSystem::ParseGcsPath(StringPiece fname, bool empty_object_ok,
1248                                    string* bucket, string* object) {
1249   return ParseGcsPathForScheme(fname, "gs", empty_object_ok, bucket, object);
1250 }
1251 
1252 void GcsFileSystem::ClearFileCaches(const string& fname) {
1253   tf_shared_lock l(block_cache_lock_);
1254   file_block_cache_->RemoveFile(fname);
1255   stat_cache_->Delete(fname);
1256   // TODO(rxsang): Remove the patterns that match the file in
1257   // MatchingPathsCache as well.
1258 }
1259 
1260 Status GcsFileSystem::NewWritableFile(const string& fname,
1261                                       TransactionToken* token,
1262                                       std::unique_ptr<WritableFile>* result) {
1263   string bucket, object;
1264   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1265 
1266   auto session_creator =
1267       [this](uint64 start_offset, const std::string& object_to_upload,
1268              const std::string& bucket, uint64 file_size,
1269              const std::string& gcs_path, UploadSessionHandle* session_handle) {
1270         return CreateNewUploadSession(start_offset, object_to_upload, bucket,
1271                                       file_size, gcs_path, session_handle);
1272       };
1273   auto object_uploader =
1274       [this](const std::string& session_uri, uint64 start_offset,
1275              uint64 already_uploaded, const std::string& tmp_content_filename,
1276              uint64 file_size, const std::string& file_path) {
1277         return UploadToSession(session_uri, start_offset, already_uploaded,
1278                                tmp_content_filename, file_size, file_path);
1279       };
1280   auto status_poller = [this](const string& session_uri, uint64 file_size,
1281                               const std::string& gcs_path, bool* completed,
1282                               uint64* uploaded) {
1283     return RequestUploadSessionStatus(session_uri, file_size, gcs_path,
1284                                       completed, uploaded);
1285   };
1286 
1287   auto generation_getter = [this](const string& fname, const string& bucket,
1288                                   const string& object, int64* generation) {
1289     GcsFileStat stat;
1290     TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
1291         [&fname, &bucket, &object, &stat, this]() {
1292           return UncachedStatForObject(fname, bucket, object, &stat);
1293         },
1294         retry_config_));
1295     *generation = stat.generation_number;
1296     return Status::OK();
1297   };
1298 
1299   result->reset(new GcsWritableFile(
1300       bucket, object, this, &timeouts_,
1301       [this, fname]() { ClearFileCaches(fname); }, retry_config_,
1302       compose_append_, session_creator, object_uploader, status_poller,
1303       generation_getter));
1304   return Status::OK();
1305 }
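
// Usage sketch (illustrative only; it assumes this file system is registered
// for the "gs" scheme, as done at the bottom of this file):
//   std::unique_ptr<WritableFile> file;
//   TF_CHECK_OK(Env::Default()->NewWritableFile("gs://my-bucket/out.txt", &file));
//   TF_CHECK_OK(file->Append("hello"));
//   TF_CHECK_OK(file->Close());  // Triggers the resumable upload to GCS.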
1306 
1307 // Reads the file from GCS in chunks and stores it in a tmp file,
1308 // which is then passed to GcsWritableFile.
1309 Status GcsFileSystem::NewAppendableFile(const string& fname,
1310                                         TransactionToken* token,
1311                                         std::unique_ptr<WritableFile>* result) {
1312   std::unique_ptr<RandomAccessFile> reader;
1313   TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &reader));
1314   std::unique_ptr<char[]> buffer(new char[kReadAppendableFileBufferSize]);
1315   Status status;
1316   uint64 offset = 0;
1317   StringPiece read_chunk;
1318 
1319   // Read the file from GCS in chunks and save it to a tmp file.
1320   string old_content_filename;
1321   TF_RETURN_IF_ERROR(GetTmpFilename(&old_content_filename));
1322   std::ofstream old_content(old_content_filename, std::ofstream::binary);
1323   while (true) {
1324     status = reader->Read(offset, kReadAppendableFileBufferSize, &read_chunk,
1325                           buffer.get());
1326     if (status.ok()) {
1327       old_content << read_chunk;
1328       offset += kReadAppendableFileBufferSize;
1329     } else if (status.code() == error::NOT_FOUND) {
1330       // New file, there is no existing content in it.
1331       break;
1332     } else if (status.code() == error::OUT_OF_RANGE) {
1333       // Expected, this means we reached EOF.
1334       old_content << read_chunk;
1335       break;
1336     } else {
1337       return status;
1338     }
1339   }
1340   old_content.close();
1341 
1342   auto session_creator =
1343       [this](uint64 start_offset, const std::string& object_to_upload,
1344              const std::string& bucket, uint64 file_size,
1345              const std::string& gcs_path, UploadSessionHandle* session_handle) {
1346         return CreateNewUploadSession(start_offset, object_to_upload, bucket,
1347                                       file_size, gcs_path, session_handle);
1348       };
1349   auto object_uploader =
1350       [this](const std::string& session_uri, uint64 start_offset,
1351              uint64 already_uploaded, const std::string& tmp_content_filename,
1352              uint64 file_size, const std::string& file_path) {
1353         return UploadToSession(session_uri, start_offset, already_uploaded,
1354                                tmp_content_filename, file_size, file_path);
1355       };
1356 
1357   auto status_poller = [this](const string& session_uri, uint64 file_size,
1358                               const std::string& gcs_path, bool* completed,
1359                               uint64* uploaded) {
1360     return RequestUploadSessionStatus(session_uri, file_size, gcs_path,
1361                                       completed, uploaded);
1362   };
1363 
1364   auto generation_getter = [this](const string& fname, const string& bucket,
1365                                   const string& object, int64* generation) {
1366     GcsFileStat stat;
1367     TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
1368         [&fname, &bucket, &object, &stat, this]() {
1369           return UncachedStatForObject(fname, bucket, object, &stat);
1370         },
1371         retry_config_));
1372     *generation = stat.generation_number;
1373     return Status::OK();
1374   };
1375 
1376   // Create a writable file and pass the old content to it.
1377   string bucket, object;
1378   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1379   result->reset(new GcsWritableFile(
1380       bucket, object, this, old_content_filename, &timeouts_,
1381       [this, fname]() { ClearFileCaches(fname); }, retry_config_,
1382       compose_append_, session_creator, object_uploader, status_poller,
1383       generation_getter));
1384   return Status::OK();
1385 }
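
// Usage sketch (illustrative only, assuming a registered "gs" scheme): an
// appendable file first downloads any existing object content into a local
// tmp file, so new appends land after the old bytes:
//   std::unique_ptr<WritableFile> file;
//   TF_CHECK_OK(Env::Default()->NewAppendableFile("gs://my-bucket/log.txt", &file));
//   TF_CHECK_OK(file->Append("new line\n"));
//   TF_CHECK_OK(file->Close());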
1386 
1387 Status GcsFileSystem::NewReadOnlyMemoryRegionFromFile(
1388     const string& fname, TransactionToken* token,
1389     std::unique_ptr<ReadOnlyMemoryRegion>* result) {
1390   uint64 size;
1391   TF_RETURN_IF_ERROR(GetFileSize(fname, token, &size));
1392   std::unique_ptr<char[]> data(new char[size]);
1393 
1394   std::unique_ptr<RandomAccessFile> file;
1395   TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &file));
1396 
1397   StringPiece piece;
1398   TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get()));
1399 
1400   result->reset(new GcsReadOnlyMemoryRegion(std::move(data), size));
1401   return Status::OK();
1402 }
1403 
1404 Status GcsFileSystem::FileExists(const string& fname, TransactionToken* token) {
1405   string bucket, object;
1406   TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
1407   if (object.empty()) {
1408     bool result;
1409     TF_RETURN_IF_ERROR(BucketExists(bucket, &result));
1410     if (result) {
1411       return Status::OK();
1412     }
1413   }
1414 
1415   // Check if the object exists.
1416   GcsFileStat stat;
1417   const Status status = StatForObject(fname, bucket, object, &stat);
1418   if (status.code() != errors::Code::NOT_FOUND) {
1419     return status;
1420   }
1421 
1422   // Check if the folder exists.
1423   bool result;
1424   TF_RETURN_IF_ERROR(FolderExists(fname, &result));
1425   if (result) {
1426     return Status::OK();
1427   }
1428   return errors::NotFound("The specified path ", fname, " was not found.");
1429 }
1430 
1431 Status GcsFileSystem::ObjectExists(const string& fname, const string& bucket,
1432                                    const string& object, bool* result) {
1433   GcsFileStat stat;
1434   const Status status = StatForObject(fname, bucket, object, &stat);
1435   switch (status.code()) {
1436     case errors::Code::OK:
1437       *result = !stat.base.is_directory;
1438       return Status::OK();
1439     case errors::Code::NOT_FOUND:
1440       *result = false;
1441       return Status::OK();
1442     default:
1443       return status;
1444   }
1445 }
1446 
1447 Status GcsFileSystem::UncachedStatForObject(const string& fname,
1448                                             const string& bucket,
1449                                             const string& object,
1450                                             GcsFileStat* stat) {
1451   std::vector<char> output_buffer;
1452   std::unique_ptr<HttpRequest> request;
1453   TF_RETURN_WITH_CONTEXT_IF_ERROR(CreateHttpRequest(&request),
1454                                   " when reading metadata of gs://", bucket,
1455                                   "/", object);
1456 
1457   request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/",
1458                                   request->EscapeString(object),
1459                                   "?fields=size%2Cgeneration%2Cupdated"));
1460   request->SetResultBuffer(&output_buffer);
1461   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1462 
1463   if (stats_ != nullptr) {
1464     stats_->RecordStatObjectRequest();
1465   }
1466 
1467   TF_RETURN_WITH_CONTEXT_IF_ERROR(
1468       request->Send(), " when reading metadata of gs://", bucket, "/", object);
1469 
1470   Json::Value root;
1471   TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
1472 
1473   // Parse file size.
1474   TF_RETURN_IF_ERROR(GetInt64Value(root, "size", &stat->base.length));
1475 
1476   // Parse generation number.
1477   TF_RETURN_IF_ERROR(
1478       GetInt64Value(root, "generation", &stat->generation_number));
1479 
1480   // Parse file modification time.
1481   string updated;
1482   TF_RETURN_IF_ERROR(GetStringValue(root, "updated", &updated));
1483   TF_RETURN_IF_ERROR(ParseRfc3339Time(updated, &(stat->base.mtime_nsec)));
1484 
1485   VLOG(1) << "Stat of: gs://" << bucket << "/" << object << " -- "
1486           << " length: " << stat->base.length
1487           << " generation: " << stat->generation_number
1488           << "; mtime_nsec: " << stat->base.mtime_nsec
1489           << "; updated: " << updated;
1490 
1491   if (str_util::EndsWith(fname, "/")) {
1492     // In GCS a path can be both a directory and a file, which is uncommon for
1493     // other file systems. To avoid the ambiguity, if a path ends with "/" in
1494     // GCS, we always regard it as a directory marker or a virtual directory.
1495     stat->base.is_directory = true;
1496   } else {
1497     stat->base.is_directory = false;
1498   }
1499   return Status::OK();
1500 }
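
// Illustrative request/response (not from the original source): for
// gs://my-bucket/dir/file the code above issues roughly
//   GET https://www.googleapis.com/storage/v1/b/my-bucket/o/dir%2Ffile
//       ?fields=size%2Cgeneration%2Cupdated
// and a response such as
//   {"size": "1024", "generation": "1617000000000000",
//    "updated": "2021-03-29T00:00:00.000Z"}
// populates stat->base.length, stat->generation_number and
// stat->base.mtime_nsec.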
1501 
1502 Status GcsFileSystem::StatForObject(const string& fname, const string& bucket,
1503                                     const string& object, GcsFileStat* stat) {
1504   if (object.empty()) {
1505     return errors::InvalidArgument(strings::Printf(
1506         "'object' must be a non-empty string. (File: %s)", fname.c_str()));
1507   }
1508 
1509   TF_RETURN_IF_ERROR(stat_cache_->LookupOrCompute(
1510       fname, stat,
1511       [this, &bucket, &object](const string& fname, GcsFileStat* stat) {
1512         return UncachedStatForObject(fname, bucket, object, stat);
1513       }));
1514   return Status::OK();
1515 }
1516 
1517 Status GcsFileSystem::BucketExists(const string& bucket, bool* result) {
1518   const Status status = GetBucketMetadata(bucket, nullptr);
1519   switch (status.code()) {
1520     case errors::Code::OK:
1521       *result = true;
1522       return Status::OK();
1523     case errors::Code::NOT_FOUND:
1524       *result = false;
1525       return Status::OK();
1526     default:
1527       return status;
1528   }
1529 }
1530 
1531 Status GcsFileSystem::CheckBucketLocationConstraint(const string& bucket) {
1532   if (allowed_locations_.empty()) {
1533     return Status::OK();
1534   }
1535 
1536   // Avoid calling external APIs in the constructor.
1537   if (allowed_locations_.erase(kDetectZoneSentinelValue) == 1) {
1538     string zone;
1539     TF_RETURN_IF_ERROR(zone_provider_->GetZone(&zone));
1540     allowed_locations_.insert(ZoneToRegion(&zone));
1541   }
1542 
1543   string location;
1544   TF_RETURN_IF_ERROR(GetBucketLocation(bucket, &location));
1545   if (allowed_locations_.find(location) != allowed_locations_.end()) {
1546     return Status::OK();
1547   }
1548 
1549   return errors::FailedPrecondition(strings::Printf(
1550       "Bucket '%s' is in '%s' location, allowed locations are: (%s).",
1551       bucket.c_str(), location.c_str(),
1552       absl::StrJoin(allowed_locations_, ", ").c_str()));
1553 }
1554 
1555 Status GcsFileSystem::GetBucketLocation(const string& bucket,
1556                                         string* location) {
1557   auto compute_func = [this](const string& bucket, string* location) {
1558     std::vector<char> result_buffer;
1559     Status status = GetBucketMetadata(bucket, &result_buffer);
1560     Json::Value result;
1561     TF_RETURN_IF_ERROR(ParseJson(result_buffer, &result));
1562     string bucket_location;
1563     TF_RETURN_IF_ERROR(
1564         GetStringValue(result, kBucketMetadataLocationKey, &bucket_location));
1565     // Lowercase the GCS location so the allowed-locations check is
1566     // case insensitive.
1566     *location = absl::AsciiStrToLower(bucket_location);
1567     return Status::OK();
1568   };
1569 
1570   TF_RETURN_IF_ERROR(
1571       bucket_location_cache_->LookupOrCompute(bucket, location, compute_func));
1572 
1573   return Status::OK();
1574 }
1575 
1576 Status GcsFileSystem::GetBucketMetadata(const string& bucket,
1577                                         std::vector<char>* result_buffer) {
1578   std::unique_ptr<HttpRequest> request;
1579   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1580   request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket));
1581 
1582   if (result_buffer != nullptr) {
1583     request->SetResultBuffer(result_buffer);
1584   }
1585 
1586   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1587   return request->Send();
1588 }
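
// Illustrative example (not part of the original source): GetBucketMetadata
// for bucket "my-bucket" issues roughly
//   GET https://www.googleapis.com/storage/v1/b/my-bucket
// and the response body, e.g. {"location": "US-EAST1", ...}, is what
// GetBucketLocation parses and lowercases to "us-east1".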
1589 
1590 Status GcsFileSystem::FolderExists(const string& dirname, bool* result) {
1591   StatCache::ComputeFunc compute_func = [this](const string& dirname,
1592                                                GcsFileStat* stat) {
1593     std::vector<string> children;
1594     TF_RETURN_IF_ERROR(
1595         GetChildrenBounded(dirname, 1, &children, true /* recursively */,
1596                            true /* include_self_directory_marker */));
1597     if (!children.empty()) {
1598       stat->base = DIRECTORY_STAT;
1599       return Status::OK();
1600     } else {
1601       return errors::InvalidArgument("Not a directory!");
1602     }
1603   };
1604   GcsFileStat stat;
1605   Status s = stat_cache_->LookupOrCompute(MaybeAppendSlash(dirname), &stat,
1606                                           compute_func);
1607   if (s.ok()) {
1608     *result = stat.base.is_directory;
1609     return Status::OK();
1610   }
1611   if (errors::IsInvalidArgument(s)) {
1612     *result = false;
1613     return Status::OK();
1614   }
1615   return s;
1616 }
1617 
1618 Status GcsFileSystem::GetChildren(const string& dirname,
1619                                   TransactionToken* token,
1620                                   std::vector<string>* result) {
1621   return GetChildrenBounded(dirname, UINT64_MAX, result,
1622                             false /* recursively */,
1623                             false /* include_self_directory_marker */);
1624 }
1625 
1626 Status GcsFileSystem::GetMatchingPaths(const string& pattern,
1627                                        TransactionToken* token,
1628                                        std::vector<string>* results) {
1629   MatchingPathsCache::ComputeFunc compute_func =
1630       [this](const string& pattern, std::vector<string>* results) {
1631         results->clear();
1632         // Find the fixed prefix by looking for the first wildcard.
1633         const string& fixed_prefix =
1634             pattern.substr(0, pattern.find_first_of("*?[\\"));
1635         const string dir(this->Dirname(fixed_prefix));
1636         if (dir.empty()) {
1637           return errors::InvalidArgument(
1638               "A GCS pattern doesn't have a bucket name: ", pattern);
1639         }
1640         std::vector<string> all_files;
1641         TF_RETURN_IF_ERROR(GetChildrenBounded(
1642             dir, UINT64_MAX, &all_files, true /* recursively */,
1643             false /* include_self_directory_marker */));
1644 
1645         const auto& files_and_folders = AddAllSubpaths(all_files);
1646 
1647         // To handle `/` in the object names, we need to remove it from `dir`
1648         // and then use `StrCat` to insert it back.
1649         const StringPiece dir_no_slash = str_util::StripSuffix(dir, "/");
1650 
1651         // Match all obtained paths to the input pattern.
1652         for (const auto& path : files_and_folders) {
1653           // Manually construct the path instead of using `JoinPath` for the
1654           // cases where `path` starts with a `/` (which is a valid character in
1655           // the filenames of GCS objects). `JoinPath` canonicalizes the result,
1656           // removing duplicate slashes. We know that `dir_no_slash` does not
1657           // end in `/`, so we are safe inserting the new `/` here as the path
1658           // separator.
1659           const string full_path = strings::StrCat(dir_no_slash, "/", path);
1660           if (this->Match(full_path, pattern)) {
1661             results->push_back(full_path);
1662           }
1663         }
1664         return Status::OK();
1665       };
1666   TF_RETURN_IF_ERROR(
1667       matching_paths_cache_->LookupOrCompute(pattern, results, compute_func));
1668   return Status::OK();
1669 }
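
// Illustrative example (not from the original source): for the pattern
// "gs://my-bucket/data/*.tfrecord" the fixed prefix is "gs://my-bucket/data/",
// so dir becomes "gs://my-bucket/data"; all objects under that directory are
// listed recursively, and each reconstructed full path is matched against the
// pattern before being added to 'results'.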
1670 
1671 Status GcsFileSystem::GetChildrenBounded(const string& dirname,
1672                                          uint64 max_results,
1673                                          std::vector<string>* result,
1674                                          bool recursive,
1675                                          bool include_self_directory_marker) {
1676   if (!result) {
1677     return errors::InvalidArgument("'result' cannot be null");
1678   }
1679   string bucket, object_prefix;
1680   TF_RETURN_IF_ERROR(
1681       ParseGcsPath(MaybeAppendSlash(dirname), true, &bucket, &object_prefix));
1682 
1683   string nextPageToken;
1684   uint64 retrieved_results = 0;
1685   while (true) {  // A loop over multiple result pages.
1686     std::vector<char> output_buffer;
1687     std::unique_ptr<HttpRequest> request;
1688     TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1689     auto uri = strings::StrCat(kGcsUriBase, "b/", bucket, "/o");
1690     if (recursive) {
1691       uri = strings::StrCat(uri, "?fields=items%2Fname%2CnextPageToken");
1692     } else {
1693       // Set "/" as a delimiter to ask GCS to treat subfolders as children
1694       // and return them in "prefixes".
1695       uri = strings::StrCat(uri,
1696                             "?fields=items%2Fname%2Cprefixes%2CnextPageToken");
1697       uri = strings::StrCat(uri, "&delimiter=%2F");
1698     }
1699     if (!object_prefix.empty()) {
1700       uri = strings::StrCat(uri,
1701                             "&prefix=", request->EscapeString(object_prefix));
1702     }
1703     if (!nextPageToken.empty()) {
1704       uri = strings::StrCat(
1705           uri, "&pageToken=", request->EscapeString(nextPageToken));
1706     }
1707     if (max_results - retrieved_results < kGetChildrenDefaultPageSize) {
1708       uri =
1709           strings::StrCat(uri, "&maxResults=", max_results - retrieved_results);
1710     }
1711     request->SetUri(uri);
1712     request->SetResultBuffer(&output_buffer);
1713     request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1714 
1715     TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading ", dirname);
1716     Json::Value root;
1717     TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
1718     const auto items = root.get("items", Json::Value::null);
1719     if (!items.isNull()) {
1720       if (!items.isArray()) {
1721         return errors::Internal(
1722             "Expected an array 'items' in the GCS response.");
1723       }
1724       for (size_t i = 0; i < items.size(); i++) {
1725         const auto item = items.get(i, Json::Value::null);
1726         if (!item.isObject()) {
1727           return errors::Internal(
1728               "Unexpected JSON format: 'items' should be a list of objects.");
1729         }
1730         string name;
1731         TF_RETURN_IF_ERROR(GetStringValue(item, "name", &name));
1732         // The names should be relative to the 'dirname'. That means the
1733         // 'object_prefix', which is part of 'dirname', should be removed from
1734         // the beginning of 'name'.
1735         StringPiece relative_path(name);
1736         if (!absl::ConsumePrefix(&relative_path, object_prefix)) {
1737           return errors::Internal(strings::StrCat(
1738               "Unexpected response: the returned file name ", name,
1739               " doesn't match the prefix ", object_prefix));
1740         }
1741         if (!relative_path.empty() || include_self_directory_marker) {
1742           result->emplace_back(relative_path);
1743         }
1744         if (++retrieved_results >= max_results) {
1745           return Status::OK();
1746         }
1747       }
1748     }
1749     const auto prefixes = root.get("prefixes", Json::Value::null);
1750     if (!prefixes.isNull()) {
1751       // Subfolders are returned for the non-recursive mode.
1752       if (!prefixes.isArray()) {
1753         return errors::Internal(
1754             "'prefixes' was expected to be an array in the GCS response.");
1755       }
1756       for (size_t i = 0; i < prefixes.size(); i++) {
1757         const auto prefix = prefixes.get(i, Json::Value::null);
1758         if (prefix.isNull() || !prefix.isString()) {
1759           return errors::Internal(
1760               "'prefixes' was expected to be an array of strings in the GCS "
1761               "response.");
1762         }
1763         const string& prefix_str = prefix.asString();
1764         StringPiece relative_path(prefix_str);
1765         if (!absl::ConsumePrefix(&relative_path, object_prefix)) {
1766           return errors::Internal(
1767               "Unexpected response: the returned folder name ", prefix_str,
1768               " doesn't match the prefix ", object_prefix);
1769         }
1770         result->emplace_back(relative_path);
1771         if (++retrieved_results >= max_results) {
1772           return Status::OK();
1773         }
1774       }
1775     }
1776     const auto token = root.get("nextPageToken", Json::Value::null);
1777     if (token.isNull()) {
1778       return Status::OK();
1779     }
1780     if (!token.isString()) {
1781       return errors::Internal(
1782           "Unexpected response: nextPageToken is not a string");
1783     }
1784     nextPageToken = token.asString();
1785   }
1786 }
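
// Illustrative request (not from the original source): listing the immediate
// (non-recursive) children of gs://my-bucket/dir issues roughly
//   GET https://www.googleapis.com/storage/v1/b/my-bucket/o
//       ?fields=items%2Fname%2Cprefixes%2CnextPageToken&delimiter=%2F&prefix=dir%2F
// Objects come back under "items", subdirectories under "prefixes", and
// additional pages are fetched while a "nextPageToken" is present.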
1787 
1788 Status GcsFileSystem::Stat(const string& fname, TransactionToken* token,
1789                            FileStatistics* stat) {
1790   if (!stat) {
1791     return errors::Internal("'stat' cannot be nullptr.");
1792   }
1793   string bucket, object;
1794   TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
1795   if (object.empty()) {
1796     bool is_bucket;
1797     TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
1798     if (is_bucket) {
1799       *stat = DIRECTORY_STAT;
1800       return Status::OK();
1801     }
1802     return errors::NotFound("The specified bucket ", fname, " was not found.");
1803   }
1804 
1805   GcsFileStat gcs_stat;
1806   const Status status = StatForObject(fname, bucket, object, &gcs_stat);
1807   if (status.ok()) {
1808     *stat = gcs_stat.base;
1809     return Status::OK();
1810   }
1811   if (status.code() != errors::Code::NOT_FOUND) {
1812     return status;
1813   }
1814   bool is_folder;
1815   TF_RETURN_IF_ERROR(FolderExists(fname, &is_folder));
1816   if (is_folder) {
1817     *stat = DIRECTORY_STAT;
1818     return Status::OK();
1819   }
1820   return errors::NotFound("The specified path ", fname, " was not found.");
1821 }
1822 
1823 Status GcsFileSystem::DeleteFile(const string& fname, TransactionToken* token) {
1824   string bucket, object;
1825   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1826 
1827   std::unique_ptr<HttpRequest> request;
1828   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1829   request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/",
1830                                   request->EscapeString(object)));
1831   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1832   request->SetDeleteRequest();
1833 
1834   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when deleting ", fname);
1835   ClearFileCaches(fname);
1836   return Status::OK();
1837 }
1838 
1839 Status GcsFileSystem::CreateDir(const string& dirname,
1840                                 TransactionToken* token) {
1841   string dirname_with_slash = MaybeAppendSlash(dirname);
1842   VLOG(3) << "CreateDir: creating directory with dirname: " << dirname
1843           << " and dirname_with_slash: " << dirname_with_slash;
1844   string bucket, object;
1845   TF_RETURN_IF_ERROR(ParseGcsPath(dirname_with_slash, /*empty_object_ok=*/true,
1846                                   &bucket, &object));
1847   if (object.empty()) {
1848     bool is_bucket;
1849     TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
1850     return is_bucket ? Status::OK()
1851                      : errors::NotFound("The specified bucket ",
1852                                         dirname_with_slash, " was not found.");
1853   }
1854 
1855   if (FileExists(dirname_with_slash, token).ok()) {
1856     // Use the original name for a correct error here.
1857     VLOG(3) << "CreateDir: directory already exists, not uploading " << dirname;
1858     return errors::AlreadyExists(dirname);
1859   }
1860 
1861   std::unique_ptr<HttpRequest> request;
1862   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1863 
1864   request->SetUri(strings::StrCat(
1865       kGcsUploadUriBase, "b/", bucket,
1866       "/o?uploadType=media&name=", request->EscapeString(object),
1867       // Adding this parameter means HTTP_CODE_PRECONDITION_FAILED
1868       // will be returned if the object already exists, so avoid reuploading.
1869       "&ifGenerationMatch=0"));
1870 
1871   request->SetPostEmptyBody();
1872   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1873   const Status& status = request->Send();
1874   if (status.ok()) {
1875     VLOG(3) << "CreateDir: finished uploading directory " << dirname;
1876     return Status::OK();
1877   }
1878   if (request->GetResponseCode() != HTTP_CODE_PRECONDITION_FAILED) {
1879     TF_RETURN_WITH_CONTEXT_IF_ERROR(status, " when uploading ",
1880                                     dirname_with_slash);
1881   }
1882   VLOG(3) << "Ignoring directory already exists on object "
1883           << dirname_with_slash;
1884   return errors::AlreadyExists(dirname);
1885 }
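
// Illustrative request (not from the original source): CreateDir for
// gs://my-bucket/dir uploads an empty object named "dir/" via roughly
//   POST https://www.googleapis.com/upload/storage/v1/b/my-bucket/o
//        ?uploadType=media&name=dir%2F&ifGenerationMatch=0
// A 412 (precondition failed) response means the marker already exists, and
// the call reports AlreadyExists.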
1886 
1887 // Checks that the directory is empty (i.e no objects with this prefix exist).
1888 // Deletes the GCS directory marker if it exists.
1889 Status GcsFileSystem::DeleteDir(const string& dirname,
1890                                 TransactionToken* token) {
1891   std::vector<string> children;
1892   // A directory is considered empty either if there are no matching objects
1893   // with the corresponding name prefix or if there is exactly one matching
1894   // object and it is the directory marker. Therefore we need to retrieve
1895   // at most two children for the prefix to detect if a directory is empty.
1896   TF_RETURN_IF_ERROR(
1897       GetChildrenBounded(dirname, 2, &children, true /* recursively */,
1898                          true /* include_self_directory_marker */));
1899 
1900   if (children.size() > 1 || (children.size() == 1 && !children[0].empty())) {
1901     return errors::FailedPrecondition("Cannot delete a non-empty directory.");
1902   }
1903   if (children.size() == 1 && children[0].empty()) {
1904     // This is the directory marker object. Delete it.
1905     return DeleteFile(MaybeAppendSlash(dirname), token);
1906   }
1907   return Status::OK();
1908 }
1909 
1910 Status GcsFileSystem::GetFileSize(const string& fname, TransactionToken* token,
1911                                   uint64* file_size) {
1912   if (!file_size) {
1913     return errors::Internal("'file_size' cannot be nullptr.");
1914   }
1915 
1916   // Only validate the name.
1917   string bucket, object;
1918   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1919 
1920   FileStatistics stat;
1921   TF_RETURN_IF_ERROR(Stat(fname, token, &stat));
1922   *file_size = stat.length;
1923   return Status::OK();
1924 }
1925 
1926 Status GcsFileSystem::RenameFile(const string& src, const string& target,
1927                                  TransactionToken* token) {
1928   if (!IsDirectory(src, token).ok()) {
1929     return RenameObject(src, target);
1930   }
1931   // Rename all individual objects in the directory one by one.
1932   std::vector<string> children;
1933   TF_RETURN_IF_ERROR(
1934       GetChildrenBounded(src, UINT64_MAX, &children, true /* recursively */,
1935                          true /* include_self_directory_marker */));
1936   for (const string& subpath : children) {
1937     TF_RETURN_IF_ERROR(
1938         RenameObject(JoinGcsPath(src, subpath), JoinGcsPath(target, subpath)));
1939   }
1940   return Status::OK();
1941 }
1942 
1943 // Uses a GCS API command to copy the object and then deletes the old one.
1944 Status GcsFileSystem::RenameObject(const string& src, const string& target) {
1945   VLOG(3) << "RenameObject: started gs://" << src << " to " << target;
1946   string src_bucket, src_object, target_bucket, target_object;
1947   TF_RETURN_IF_ERROR(ParseGcsPath(src, false, &src_bucket, &src_object));
1948   TF_RETURN_IF_ERROR(
1949       ParseGcsPath(target, false, &target_bucket, &target_object));
1950 
1951   std::unique_ptr<HttpRequest> request;
1952   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1953   request->SetUri(strings::StrCat(kGcsUriBase, "b/", src_bucket, "/o/",
1954                                   request->EscapeString(src_object),
1955                                   "/rewriteTo/b/", target_bucket, "/o/",
1956                                   request->EscapeString(target_object)));
1957   request->SetPostEmptyBody();
1958   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1959   std::vector<char> output_buffer;
1960   request->SetResultBuffer(&output_buffer);
1961   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when renaming ", src,
1962                                   " to ", target);
1963   // Flush the target from the caches.  The source will be flushed in the
1964   // DeleteFile call below.
1965   ClearFileCaches(target);
1966   Json::Value root;
1967   TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
1968   bool done;
1969   TF_RETURN_IF_ERROR(GetBoolValue(root, "done", &done));
1970   if (!done) {
1971     // If GCS didn't complete rewrite in one call, this means that a large file
1972     // is being copied to a bucket with a different storage class or location,
1973     // which requires multiple rewrite calls.
1974     // TODO(surkov): implement multi-step rewrites.
1975     return errors::Unimplemented(
1976         "Couldn't rename ", src, " to ", target,
1977         ": moving large files between buckets with different "
1978         "locations or storage classes is not supported.");
1979   }
1980 
1981   VLOG(3) << "RenameObject: finished from: gs://" << src << " to " << target;
1982   // In case the delete API call failed, but the deletion actually happened
1983   // on the server side, we can't just retry the whole RenameFile operation
1984   // because the source object is already gone.
1985   return RetryingUtils::DeleteWithRetries(
1986       [this, &src]() { return DeleteFile(src, nullptr); }, retry_config_);
1987 }
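
// Illustrative request (not from the original source): renaming
// gs://my-bucket/a to gs://my-bucket/b issues roughly
//   POST https://www.googleapis.com/storage/v1/b/my-bucket/o/a/rewriteTo/b/my-bucket/o/b
// and, once the rewrite response reports "done": true, the source object is
// deleted with retries.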
1988 
1989 Status GcsFileSystem::IsDirectory(const string& fname,
1990                                   TransactionToken* token) {
1991   string bucket, object;
1992   TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
1993   if (object.empty()) {
1994     bool is_bucket;
1995     TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
1996     if (is_bucket) {
1997       return Status::OK();
1998     }
1999     return errors::NotFound("The specified bucket gs://", bucket,
2000                             " was not found.");
2001   }
2002   bool is_folder;
2003   TF_RETURN_IF_ERROR(FolderExists(fname, &is_folder));
2004   if (is_folder) {
2005     return Status::OK();
2006   }
2007   bool is_object;
2008   TF_RETURN_IF_ERROR(ObjectExists(fname, bucket, object, &is_object));
2009   if (is_object) {
2010     return errors::FailedPrecondition("The specified path ", fname,
2011                                       " is not a directory.");
2012   }
2013   return errors::NotFound("The specified path ", fname, " was not found.");
2014 }
2015 
2016 Status GcsFileSystem::DeleteRecursively(const string& dirname,
2017                                         TransactionToken* token,
2018                                         int64* undeleted_files,
2019                                         int64* undeleted_dirs) {
2020   if (!undeleted_files || !undeleted_dirs) {
2021     return errors::Internal(
2022         "'undeleted_files' and 'undeleted_dirs' cannot be nullptr.");
2023   }
2024   *undeleted_files = 0;
2025   *undeleted_dirs = 0;
2026   if (!IsDirectory(dirname, token).ok()) {
2027     *undeleted_dirs = 1;
2028     return Status(
2029         error::NOT_FOUND,
2030         strings::StrCat(dirname, " doesn't exist or is not a directory."));
2031   }
2032   std::vector<string> all_objects;
2033   // Get all children in the directory recursively.
2034   TF_RETURN_IF_ERROR(GetChildrenBounded(
2035       dirname, UINT64_MAX, &all_objects, true /* recursively */,
2036       true /* include_self_directory_marker */));
2037   for (const string& object : all_objects) {
2038     const string& full_path = JoinGcsPath(dirname, object);
2039     // Delete all objects including directory markers for subfolders.
2040     // Since DeleteRecursively returns OK even if individual file deletions
2041     // fail, RetryingFileSystem won't retry those failures, so we need to
2042     // make sure they are properly retried here.
2043     const auto& delete_file_status = RetryingUtils::DeleteWithRetries(
2044         [this, &full_path, token]() { return DeleteFile(full_path, token); },
2045         retry_config_);
2046     if (!delete_file_status.ok()) {
2047       if (IsDirectory(full_path, token).ok()) {
2048         // The object is a directory marker.
2049         (*undeleted_dirs)++;
2050       } else {
2051         (*undeleted_files)++;
2052       }
2053     }
2054   }
2055   return Status::OK();
2056 }
2057 
2058 // Flushes all caches for filesystem metadata and file contents. Useful for
2059 // reclaiming memory once filesystem operations are done (e.g. model is loaded),
2060 // or for resetting the filesystem to a consistent state.
2061 void GcsFileSystem::FlushCaches(TransactionToken* token) {
2062   tf_shared_lock l(block_cache_lock_);
2063   file_block_cache_->Flush();
2064   stat_cache_->Clear();
2065   matching_paths_cache_->Clear();
2066   bucket_location_cache_->Clear();
2067 }
2068 
2069 void GcsFileSystem::SetStats(GcsStatsInterface* stats) {
2070   CHECK(stats_ == nullptr) << "SetStats() has already been called.";
2071   CHECK(stats != nullptr);
2072   mutex_lock l(block_cache_lock_);
2073   stats_ = stats;
2074   stats_->Configure(this, &throttle_, file_block_cache_.get());
2075 }
2076 
2077 void GcsFileSystem::SetCacheStats(FileBlockCacheStatsInterface* cache_stats) {
2078   tf_shared_lock l(block_cache_lock_);
2079   if (file_block_cache_ == nullptr) {
2080     LOG(ERROR) << "Tried to set cache stats of non-initialized file block "
2081                   "cache object. This may result in not exporting the intended "
2082                   "monitoring data";
2083     return;
2084   }
2085   file_block_cache_->SetStats(cache_stats);
2086 }
2087 
2088 void GcsFileSystem::SetAuthProvider(
2089     std::unique_ptr<AuthProvider> auth_provider) {
2090   mutex_lock l(mu_);
2091   auth_provider_ = std::move(auth_provider);
2092 }
2093 
2094 // Creates an HttpRequest and sets several parameters that are common to all
2095 // requests.  All code (in GcsFileSystem) that creates an HttpRequest should
2096 // go through this method, rather than directly using http_request_factory_.
2097 Status GcsFileSystem::CreateHttpRequest(std::unique_ptr<HttpRequest>* request) {
2098   std::unique_ptr<HttpRequest> new_request{http_request_factory_->Create()};
2099   if (dns_cache_) {
2100     dns_cache_->AnnotateRequest(new_request.get());
2101   }
2102 
2103   string auth_token;
2104   {
2105     tf_shared_lock l(mu_);
2106     TF_RETURN_IF_ERROR(
2107         AuthProvider::GetToken(auth_provider_.get(), &auth_token));
2108   }
2109 
2110   new_request->AddAuthBearerHeader(auth_token);
2111 
2112   if (additional_header_) {
2113     new_request->AddHeader(additional_header_->first,
2114                            additional_header_->second);
2115   }
2116 
2117   if (stats_ != nullptr) {
2118     new_request->SetRequestStats(stats_->HttpStats());
2119   }
2120 
2121   if (!throttle_.AdmitRequest()) {
2122     return errors::Unavailable("Request throttled");
2123   }
2124 
2125   *request = std::move(new_request);
2126   return Status::OK();
2127 }
2128 
2129 }  // namespace tensorflow
2130 
2131 // The TPU_GCS_FS option sets a TPU-on-GCS optimized file system that allows
2132 // TPU pods to function more optimally. When TPU_GCS_FS is enabled,
2133 // gcs_file_system will not be registered as a file system, since
2134 // tpu_gcs_file_system takes over its responsibilities. The TPU file system
2135 // is a child of the GCS file system with TPU-pod-on-GCS optimizations.
2136 // This option is set ON/OFF in the GCP TPU TensorFlow config.
2137 // Initialize gcs_file_system.
2138 REGISTER_FILE_SYSTEM("gs", ::tensorflow::RetryingGcsFileSystem);
2139