1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/platform/cloud/gcs_file_system.h"
17 
18 #include <stdio.h>
19 #include <unistd.h>
20 
21 #include <algorithm>
22 #include <cstdio>
23 #include <cstdlib>
24 #include <cstring>
25 #include <fstream>
26 #include <vector>
27 
28 #include "tensorflow/core/platform/file_statistics.h"
29 #include "tensorflow/core/platform/strcat.h"
30 #ifdef _WIN32
31 #include <io.h>  // for _mktemp
32 #endif
33 #include "absl/base/macros.h"
34 #include "json/json.h"
35 #include "tensorflow/core/lib/gtl/map_util.h"
36 #include "tensorflow/core/platform/cloud/curl_http_request.h"
37 #include "tensorflow/core/platform/cloud/file_block_cache.h"
38 #include "tensorflow/core/platform/cloud/google_auth_provider.h"
39 #include "tensorflow/core/platform/cloud/ram_file_block_cache.h"
40 #include "tensorflow/core/platform/cloud/time_util.h"
41 #include "tensorflow/core/platform/env.h"
42 #include "tensorflow/core/platform/errors.h"
43 #include "tensorflow/core/platform/mutex.h"
44 #include "tensorflow/core/platform/numbers.h"
45 #include "tensorflow/core/platform/path.h"
46 #include "tensorflow/core/platform/protobuf.h"
47 #include "tensorflow/core/platform/retrying_utils.h"
48 #include "tensorflow/core/platform/str_util.h"
49 #include "tensorflow/core/platform/stringprintf.h"
50 #include "tensorflow/core/platform/thread_annotations.h"
51 #include "tensorflow/core/profiler/lib/traceme.h"
52 
53 #ifdef _WIN32
54 #ifdef DeleteFile
55 #undef DeleteFile
56 #endif
57 #endif
58 
59 namespace tensorflow {
60 namespace {
61 
62 constexpr char kGcsUriBase[] = "https://www.googleapis.com/storage/v1/";
63 constexpr char kGcsUploadUriBase[] =
64     "https://www.googleapis.com/upload/storage/v1/";
65 constexpr char kStorageHost[] = "storage.googleapis.com";
66 constexpr char kBucketMetadataLocationKey[] = "location";
67 constexpr size_t kReadAppendableFileBufferSize = 1024 * 1024;  // In bytes.
68 constexpr int kGetChildrenDefaultPageSize = 1000;
69 // The HTTP response code "308 Resume Incomplete".
70 constexpr uint64 HTTP_CODE_RESUME_INCOMPLETE = 308;
71 // The HTTP response code "412 Precondition Failed".
72 constexpr uint64 HTTP_CODE_PRECONDITION_FAILED = 412;
73 // The environment variable that overrides the size of the readahead buffer.
74 ABSL_DEPRECATED("Use GCS_READ_CACHE_BLOCK_SIZE_MB instead.")
75 constexpr char kReadaheadBufferSize[] = "GCS_READAHEAD_BUFFER_SIZE_BYTES";
76 // The environment variable that overrides the maximum age of entries in the
77 // Stat cache. A value of 0 means nothing is cached.
78 constexpr char kStatCacheMaxAge[] = "GCS_STAT_CACHE_MAX_AGE";
79 constexpr uint64 kStatCacheDefaultMaxAge = 5;
80 // The environment variable that overrides the maximum number of entries in the
81 // Stat cache.
82 constexpr char kStatCacheMaxEntries[] = "GCS_STAT_CACHE_MAX_ENTRIES";
83 constexpr size_t kStatCacheDefaultMaxEntries = 1024;
84 // The environment variable that overrides the maximum age of entries in the
85 // GetMatchingPaths cache. A value of 0 (the default) means nothing is cached.
86 constexpr char kMatchingPathsCacheMaxAge[] = "GCS_MATCHING_PATHS_CACHE_MAX_AGE";
87 constexpr uint64 kMatchingPathsCacheDefaultMaxAge = 0;
88 // The environment variable that overrides the maximum number of entries in the
89 // GetMatchingPaths cache.
90 constexpr char kMatchingPathsCacheMaxEntries[] =
91     "GCS_MATCHING_PATHS_CACHE_MAX_ENTRIES";
92 constexpr size_t kMatchingPathsCacheDefaultMaxEntries = 1024;
93 // Number of bucket locations cached; most workloads won't touch more than one
94 // bucket, so this limit is set fairly low.
95 constexpr size_t kBucketLocationCacheMaxEntries = 10;
96 // ExpiringLRUCache doesn't support a "cache forever" option.
97 constexpr size_t kCacheNeverExpire = std::numeric_limits<uint64>::max();
98 // The file statistics returned by Stat() for directories.
99 const FileStatistics DIRECTORY_STAT(0, 0, true);
100 // Some environments exhibit unreliable DNS resolution. Set this environment
101 // variable to a positive integer giving how often (in seconds) to refresh the
102 // userspace DNS cache.
103 constexpr char kResolveCacheSecs[] = "GCS_RESOLVE_REFRESH_SECS";
104 // The environment variable to configure the http request's connection timeout.
105 constexpr char kRequestConnectionTimeout[] =
106     "GCS_REQUEST_CONNECTION_TIMEOUT_SECS";
107 // The environment variable to configure the http request's idle timeout.
108 constexpr char kRequestIdleTimeout[] = "GCS_REQUEST_IDLE_TIMEOUT_SECS";
109 // The environment variable to configure the overall request timeout for
110 // metadata requests.
111 constexpr char kMetadataRequestTimeout[] = "GCS_METADATA_REQUEST_TIMEOUT_SECS";
112 // The environment variable to configure the overall request timeout for
113 // block read requests.
114 constexpr char kReadRequestTimeout[] = "GCS_READ_REQUEST_TIMEOUT_SECS";
115 // The environment variable to configure the overall request timeout for
116 // upload requests.
117 constexpr char kWriteRequestTimeout[] = "GCS_WRITE_REQUEST_TIMEOUT_SECS";
118 // The environment variable to configure an additional header to send with
119 // all requests to GCS (format: HEADERNAME:HEADERCONTENT).
120 constexpr char kAdditionalRequestHeader[] = "GCS_ADDITIONAL_REQUEST_HEADER";
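// For illustration only (hypothetical header): setting
//   GCS_ADDITIONAL_REQUEST_HEADER="x-goog-user-project:my-project"
// makes the constructor below split the value on the first ':' and attach the
// resulting name/value pair to every request sent to GCS.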
121 // The environment variable to configure the throttle (format: <int64>)
122 constexpr char kThrottleRate[] = "GCS_THROTTLE_TOKEN_RATE";
123 // The environment variable to configure the token bucket size (format: <int64>)
124 constexpr char kThrottleBucket[] = "GCS_THROTTLE_BUCKET_SIZE";
125 // The environment variable that controls the number of tokens per request.
126 // (format: <int64>)
127 constexpr char kTokensPerRequest[] = "GCS_TOKENS_PER_REQUEST";
128 // The environment variable to configure the initial tokens (format: <int64>)
129 constexpr char kInitialTokens[] = "GCS_INITIAL_TOKENS";
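// Sketch of how the throttle variables map onto GcsThrottleConfig in the
// constructor below (hypothetical numbers):
//   GCS_THROTTLE_TOKEN_RATE=100000    -> enabled = true, token_rate = 100000
//   GCS_THROTTLE_BUCKET_SIZE=10000000 -> bucket_size = 10000000
//   GCS_TOKENS_PER_REQUEST=100        -> tokens_per_request = 100
//   GCS_INITIAL_TOKENS=0              -> initial_tokens = 0
// The last three overrides only take effect when GCS_THROTTLE_TOKEN_RATE is set.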
130 
131 // The environment variable to customize which GCS bucket locations are allowed
132 // (format: comma-delimited list); if the list is empty, it defaults to the region
133 // of the zone TensorFlow is running in. Requires 'storage.buckets.get' permission.
134 constexpr char kAllowedBucketLocations[] = "GCS_ALLOWED_BUCKET_LOCATIONS";
135 // When this value is passed as an allowed location, the filesystem detects the
136 // zone TensorFlow is running in and restricts access to buckets in that region.
137 constexpr char kDetectZoneSentinelValue[] = "auto";
138 
139 // How to upload new data when Flush() is called multiple times.
140 // By default the entire file is reuploaded.
141 constexpr char kAppendMode[] = "GCS_APPEND_MODE";
142 // If GCS_APPEND_MODE=compose then instead the new data is uploaded to a
143 // temporary object and composed with the original object. This is disabled by
144 // default as the multiple API calls required add a risk of stranding temporary
145 // objects.
146 constexpr char kComposeAppend[] = "compose";
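// Usage sketch: GCS_APPEND_MODE=compose selects the compose path; any other
// value (or leaving the variable unset) keeps the default behavior of
// reuploading the whole temporary file on every Flush()/Sync().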
147 
148 Status GetTmpFilename(string* filename) {
149   *filename = io::GetTempFilename("");
150   return Status::OK();
151 }
152 
153 /// Appends a trailing slash if the name doesn't already have one.
154 string MaybeAppendSlash(const string& name) {
155   if (name.empty()) {
156     return "/";
157   }
158   if (name.back() != '/') {
159     return strings::StrCat(name, "/");
160   }
161   return name;
162 }
163 
164 // io::JoinPath() doesn't work in cases when we want an empty subpath
165 // to result in an appended slash in order for directory markers
166 // to be processed correctly: "gs://a/b" + "" should give "gs://a/b/".
167 string JoinGcsPath(const string& path, const string& subpath) {
168   return strings::StrCat(MaybeAppendSlash(path), subpath);
169 }
170 
171 /// \brief Returns the given paths appending all their subfolders.
172 ///
173 /// For every path X in the list, every subfolder in X is added to the
174 /// resulting list.
175 /// For example:
176 ///  - for 'a/b/c/d' it will append 'a', 'a/b' and 'a/b/c'
177 ///  - for 'a/b/c/' it will append 'a', 'a/b' and 'a/b/c'
178 ///  - for 'a//b/c/' it will append 'a', 'a//b' and 'a//b/c'
179 ///  - for '/a/b/c/' it will append '/a', '/a/b' and '/a/b/c'
180 std::set<string> AddAllSubpaths(const std::vector<string>& paths) {
181   std::set<string> result;
182   result.insert(paths.begin(), paths.end());
183   for (const string& path : paths) {
184     StringPiece subpath = io::Dirname(path);
185     // If `path` starts with `/`, `subpath` will be `/` and then we get into an
186     // infinite loop. Same behavior happens if there is a `//` pattern in
187     // `path`, so we check for that and leave the loop quicker.
188     while (!(subpath.empty() || subpath == "/")) {
189       result.emplace(string(subpath));
190       subpath = io::Dirname(subpath);
191     }
192   }
193   return result;
194 }
195 
196 Status ParseJson(StringPiece json, Json::Value* result) {
197   Json::Reader reader;
198   if (!reader.parse(json.data(), json.data() + json.size(), *result)) {
199     return errors::Internal("Couldn't parse JSON response from GCS.");
200   }
201   return Status::OK();
202 }
203 
204 Status ParseJson(const std::vector<char>& json, Json::Value* result) {
205   return ParseJson(StringPiece{json.data(), json.size()}, result);
206 }
207 
208 /// Reads a JSON value with the given name from a parent JSON value.
209 Status GetValue(const Json::Value& parent, const char* name,
210                 Json::Value* result) {
211   *result = parent.get(name, Json::Value::null);
212   if (result->isNull()) {
213     return errors::Internal("The field '", name,
214                             "' was expected in the JSON response.");
215   }
216   return Status::OK();
217 }
218 
219 /// Reads a string JSON value with the given name from a parent JSON value.
220 Status GetStringValue(const Json::Value& parent, const char* name,
221                       string* result) {
222   Json::Value result_value;
223   TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
224   if (!result_value.isString()) {
225     return errors::Internal(
226         "The field '", name,
227         "' in the JSON response was expected to be a string.");
228   }
229   *result = result_value.asString();
230   return Status::OK();
231 }
232 
233 /// Reads a long JSON value with the given name from a parent JSON value.
234 Status GetInt64Value(const Json::Value& parent, const char* name,
235                      int64* result) {
236   Json::Value result_value;
237   TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
238   if (result_value.isNumeric()) {
239     *result = result_value.asInt64();
240     return Status::OK();
241   }
242   if (result_value.isString() &&
243       strings::safe_strto64(result_value.asCString(), result)) {
244     return Status::OK();
245   }
246   return errors::Internal(
247       "The field '", name,
248       "' in the JSON response was expected to be a number.");
249 }
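// Example of the handling above: for a parent parsed from either
// {"size": 1024} or {"size": "1024"}, GetInt64Value(parent, "size", &n) sets n
// to 1024, while a missing field surfaces as an Internal error from GetValue.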
250 
251 /// Reads a boolean JSON value with the given name from a parent JSON value.
252 Status GetBoolValue(const Json::Value& parent, const char* name, bool* result) {
253   Json::Value result_value;
254   TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
255   if (!result_value.isBool()) {
256     return errors::Internal(
257         "The field '", name,
258         "' in the JSON response was expected to be a boolean.");
259   }
260   *result = result_value.asBool();
261   return Status::OK();
262 }
263 
264 /// A GCS-based implementation of a random access file with an LRU block cache.
265 class GcsRandomAccessFile : public RandomAccessFile {
266  public:
267   using ReadFn =
268       std::function<Status(const string& filename, uint64 offset, size_t n,
269                            StringPiece* result, char* scratch)>;
270 
271   GcsRandomAccessFile(const string& filename, ReadFn read_fn)
272       : filename_(filename), read_fn_(std::move(read_fn)) {}
273 
274   Status Name(StringPiece* result) const override {
275     *result = filename_;
276     return Status::OK();
277   }
278 
279   /// The implementation of reads with an LRU block cache. Thread safe.
280   Status Read(uint64 offset, size_t n, StringPiece* result,
281               char* scratch) const override {
282     return read_fn_(filename_, offset, n, result, scratch);
283   }
284 
285  private:
286   /// The filename of this file.
287   const string filename_;
288   /// The implementation of the read operation (provided by the GCSFileSystem).
289   const ReadFn read_fn_;
290 };
291 
292 /// A GCS-based implementation of a random access file with a read buffer.
293 class BufferedGcsRandomAccessFile : public RandomAccessFile {
294  public:
295   using ReadFn =
296       std::function<Status(const string& filename, uint64 offset, size_t n,
297                            StringPiece* result, char* scratch)>;
298 
299   // Initializes the reader. The provided read_fn must be thread-safe.
300   BufferedGcsRandomAccessFile(const string& filename, uint64 buffer_size,
301                               ReadFn read_fn)
302       : filename_(filename),
303         read_fn_(std::move(read_fn)),
304         buffer_size_(buffer_size),
305         buffer_start_(0),
306         buffer_end_is_past_eof_(false) {}
307 
308   Status Name(StringPiece* result) const override {
309     *result = filename_;
310     return Status::OK();
311   }
312 
313   /// The implementation of reads with a read buffer. Thread safe.
314   /// Returns `OUT_OF_RANGE` if fewer than n bytes were stored in `*result`
315   /// because of EOF.
316   Status Read(uint64 offset, size_t n, StringPiece* result,
317               char* scratch) const override {
318     if (n > buffer_size_) {
319       return read_fn_(filename_, offset, n, result, scratch);
320     }
321     {
322       mutex_lock l(buffer_mutex_);
323       size_t buffer_end = buffer_start_ + buffer_.size();
324       size_t copy_size = 0;
325       if (offset < buffer_end && offset >= buffer_start_) {
326         copy_size = std::min(n, static_cast<size_t>(buffer_end - offset));
327         memcpy(scratch, buffer_.data() + (offset - buffer_start_), copy_size);
328         *result = StringPiece(scratch, copy_size);
329       }
330       bool consumed_buffer_to_eof =
331           offset + copy_size >= buffer_end && buffer_end_is_past_eof_;
332       if (copy_size < n && !consumed_buffer_to_eof) {
333         Status status = FillBuffer(offset + copy_size);
334         if (!status.ok() && status.code() != errors::Code::OUT_OF_RANGE) {
335           // Empty the buffer to avoid caching bad reads.
336           buffer_.resize(0);
337           return status;
338         }
339         size_t remaining_copy = std::min(n - copy_size, buffer_.size());
340         memcpy(scratch + copy_size, buffer_.data(), remaining_copy);
341         copy_size += remaining_copy;
342         *result = StringPiece(scratch, copy_size);
343       }
344       if (copy_size < n) {
345         // Forget the end-of-file flag to allow for clients that poll on the
346         // same file.
347         buffer_end_is_past_eof_ = false;
348         return errors::OutOfRange("EOF reached. Requested to read ", n,
349                                   " bytes from ", offset, ".");
350       }
351     }
352     return Status::OK();
353   }
354 
355  private:
356   Status FillBuffer(uint64 start) const
357       TF_EXCLUSIVE_LOCKS_REQUIRED(buffer_mutex_) {
358     buffer_start_ = start;
359     buffer_.resize(buffer_size_);
360     StringPiece str_piece;
361     Status status = read_fn_(filename_, buffer_start_, buffer_size_, &str_piece,
362                              &(buffer_[0]));
363     buffer_end_is_past_eof_ = status.code() == errors::Code::OUT_OF_RANGE;
364     buffer_.resize(str_piece.size());
365     return status;
366   }
367 
368   // The filename of this file.
369   const string filename_;
370 
371   // The implementation of the read operation (provided by the GCSFileSystem).
372   const ReadFn read_fn_;
373 
374   // Size of buffer that we read from GCS each time we send a request.
375   const uint64 buffer_size_;
376 
377   // Mutex for buffering operations that can be accessed from multiple threads.
378   // The following members are mutable in order to provide a const Read.
379   mutable mutex buffer_mutex_;
380 
381   // Offset of buffer from start of the file.
382   mutable uint64 buffer_start_ TF_GUARDED_BY(buffer_mutex_);
383 
384   mutable bool buffer_end_is_past_eof_ TF_GUARDED_BY(buffer_mutex_);
385 
386   mutable string buffer_ TF_GUARDED_BY(buffer_mutex_);
387 };
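// Worked example of the buffered read path above (hypothetical sizes): with
// buffer_size_ = 8 and a 10-byte object, Read(0, 4) finds the buffer empty,
// calls FillBuffer(0) to load bytes [0, 8), and copies 4 bytes out. A later
// Read(6, 4) copies bytes [6, 8) from the buffer, then calls FillBuffer(8),
// which reads the last 2 bytes, sees OUT_OF_RANGE from read_fn_, and sets
// buffer_end_is_past_eof_; the remaining 2 bytes are copied and the call
// returns OK because all 4 requested bytes were delivered.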
388 
389 // Function object declaration with params needed to create upload sessions.
390 typedef std::function<Status(
391     uint64 start_offset, const std::string& object_to_upload,
392     const std::string& bucket, uint64 file_size, const std::string& gcs_path,
393     UploadSessionHandle* session_handle)>
394     SessionCreator;
395 
396 // Function object declaration with params needed to upload objects.
397 typedef std::function<Status(const std::string& session_uri,
398                              uint64 start_offset, uint64 already_uploaded,
399                              const std::string& tmp_content_filename,
400                              uint64 file_size, const std::string& file_path)>
401     ObjectUploader;
402 
403 // Function object declaration with params needed to poll upload status.
404 typedef std::function<Status(const string& session_uri, uint64 file_size,
405                              const std::string& gcs_path, bool* completed,
406                              uint64* uploaded)>
407     StatusPoller;
408 
409 // Function object declaration with params needed to get the object generation.
410 typedef std::function<Status(const string& fname, const string& bucket,
411                              const string& object, int64* generation)>
412     GenerationGetter;
413 
414 /// \brief GCS-based implementation of a writeable file.
415 ///
416 /// Since GCS objects are immutable, this implementation writes to a local
417 /// tmp file and copies it to GCS on flush/close.
418 class GcsWritableFile : public WritableFile {
419  public:
420   GcsWritableFile(const string& bucket, const string& object,
421                   GcsFileSystem* filesystem,
422                   GcsFileSystem::TimeoutConfig* timeouts,
423                   std::function<void()> file_cache_erase,
424                   RetryConfig retry_config, bool compose_append,
425                   SessionCreator session_creator,
426                   ObjectUploader object_uploader, StatusPoller status_poller,
427                   GenerationGetter generation_getter)
428       : bucket_(bucket),
429         object_(object),
430         filesystem_(filesystem),
431         timeouts_(timeouts),
432         file_cache_erase_(std::move(file_cache_erase)),
433         sync_needed_(true),
434         retry_config_(retry_config),
435         compose_append_(compose_append),
436         start_offset_(0),
437         session_creator_(std::move(session_creator)),
438         object_uploader_(std::move(object_uploader)),
439         status_poller_(std::move(status_poller)),
440         generation_getter_(std::move(generation_getter)) {
441     // TODO: to make it safer, outfile_ should be constructed from an FD
442     VLOG(3) << "GcsWritableFile: " << GetGcsPath();
443     if (GetTmpFilename(&tmp_content_filename_).ok()) {
444       outfile_.open(tmp_content_filename_,
445                     std::ofstream::binary | std::ofstream::app);
446     }
447   }
448 
449   /// \brief Constructs the writable file in append mode.
450   ///
451   /// tmp_content_filename should contain a path of an existing temporary file
452   /// with the content to be appended. The class takes ownership of the
453   /// specified tmp file and deletes it on close.
454   GcsWritableFile(const string& bucket, const string& object,
455                   GcsFileSystem* filesystem, const string& tmp_content_filename,
456                   GcsFileSystem::TimeoutConfig* timeouts,
457                   std::function<void()> file_cache_erase,
458                   RetryConfig retry_config, bool compose_append,
459                   SessionCreator session_creator,
460                   ObjectUploader object_uploader, StatusPoller status_poller,
461                   GenerationGetter generation_getter)
462       : bucket_(bucket),
463         object_(object),
464         filesystem_(filesystem),
465         timeouts_(timeouts),
466         file_cache_erase_(std::move(file_cache_erase)),
467         sync_needed_(true),
468         retry_config_(retry_config),
469         compose_append_(compose_append),
470         start_offset_(0),
471         session_creator_(std::move(session_creator)),
472         object_uploader_(std::move(object_uploader)),
473         status_poller_(std::move(status_poller)),
474         generation_getter_(std::move(generation_getter)) {
475     VLOG(3) << "GcsWritableFile: " << GetGcsPath() << "with existing file "
476             << tmp_content_filename;
477     tmp_content_filename_ = tmp_content_filename;
478     outfile_.open(tmp_content_filename_,
479                   std::ofstream::binary | std::ofstream::app);
480   }
481 
482   ~GcsWritableFile() override {
483     Close().IgnoreError();
484     std::remove(tmp_content_filename_.c_str());
485   }
486 
487   Status Append(StringPiece data) override {
488     TF_RETURN_IF_ERROR(CheckWritable());
489     VLOG(3) << "Append: " << GetGcsPath() << " size " << data.length();
490     sync_needed_ = true;
491     outfile_ << data;
492     if (!outfile_.good()) {
493       return errors::Internal(
494           "Could not append to the internal temporary file.");
495     }
496     return Status::OK();
497   }
498 
499   Status Close() override {
500     VLOG(3) << "Close:" << GetGcsPath();
501     if (outfile_.is_open()) {
502       Status sync_status = Sync();
503       if (sync_status.ok()) {
504         outfile_.close();
505       }
506       return sync_status;
507     }
508     return Status::OK();
509   }
510 
511   Status Flush() override {
512     VLOG(3) << "Flush:" << GetGcsPath();
513     return Sync();
514   }
515 
516   Status Name(StringPiece* result) const override {
517     return errors::Unimplemented("GCSWritableFile does not support Name()");
518   }
519 
520   Status Sync() override {
521     VLOG(3) << "Sync started:" << GetGcsPath();
522     TF_RETURN_IF_ERROR(CheckWritable());
523     if (!sync_needed_) {
524       return Status::OK();
525     }
526     Status status = SyncImpl();
527     VLOG(3) << "Sync finished " << GetGcsPath();
528     if (status.ok()) {
529       sync_needed_ = false;
530     }
531     return status;
532   }
533 
534   Status Tell(int64* position) override {
535     *position = outfile_.tellp();
536     if (*position == -1) {
537       return errors::Internal("tellp on the internal temporary file failed");
538     }
539     return Status::OK();
540   }
541 
542  private:
543   /// Copies the current version of the file to GCS.
544   ///
545   /// This SyncImpl() uploads the object to GCS.
546   /// In case of a failure, it resumes failed uploads as recommended by the GCS
547   /// resumable API documentation. When the whole upload needs to be
548   /// restarted, Sync() returns UNAVAILABLE and relies on RetryingFileSystem.
549   Status SyncImpl() {
550     outfile_.flush();
551     if (!outfile_.good()) {
552       return errors::Internal(
553           "Could not write to the internal temporary file.");
554     }
555     UploadSessionHandle session_handle;
556     uint64 start_offset = 0;
557     string object_to_upload = object_;
558     bool should_compose = false;
559     if (compose_append_) {
560       start_offset = start_offset_;
561       // Only compose if the object has already been uploaded to GCS
562       should_compose = start_offset > 0;
563       if (should_compose) {
564         object_to_upload =
565             strings::StrCat(io::Dirname(object_), "/.tmpcompose/",
566                             io::Basename(object_), ".", start_offset_);
567       }
568     }
569     TF_RETURN_IF_ERROR(CreateNewUploadSession(start_offset, object_to_upload,
570                                               &session_handle));
571     uint64 already_uploaded = 0;
572     bool first_attempt = true;
573     const Status upload_status = RetryingUtils::CallWithRetries(
574         [&first_attempt, &already_uploaded, &session_handle, &start_offset,
575          this]() {
576           if (session_handle.resumable && !first_attempt) {
577             bool completed;
578             TF_RETURN_IF_ERROR(RequestUploadSessionStatus(
579                 session_handle.session_uri, &completed, &already_uploaded));
580             LOG(INFO) << "### RequestUploadSessionStatus: completed = "
581                       << completed
582                       << ", already_uploaded = " << already_uploaded
583                       << ", file = " << GetGcsPath();
584             if (completed) {
585               // Erase the file from the file cache on every successful write.
586               file_cache_erase_();
587               // It's unclear why UploadToSession didn't return OK in the
588               // previous attempt, but GCS reports that the file is fully
589               // uploaded, so succeed.
590               return Status::OK();
591             }
592           }
593           first_attempt = false;
594           return UploadToSession(session_handle.session_uri, start_offset,
595                                  already_uploaded);
596         },
597         retry_config_);
598     if (upload_status.code() == errors::Code::NOT_FOUND) {
599       // GCS docs recommend retrying the whole upload. We're relying on the
600       // RetryingFileSystem to retry the Sync() call.
601       return errors::Unavailable(
602           strings::StrCat("Upload to gs://", bucket_, "/", object_,
603                           " failed, caused by: ", upload_status.ToString()));
604     }
605     if (upload_status.ok()) {
606       if (should_compose) {
607         TF_RETURN_IF_ERROR(AppendObject(object_to_upload));
608       }
609       TF_RETURN_IF_ERROR(GetCurrentFileSize(&start_offset_));
610     }
611     return upload_status;
612   }
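  // Hypothetical compose-append sequence for object_ = "logs/model.ckpt": the
  // first Sync() uploads the whole temp file to gs://<bucket>/logs/model.ckpt
  // and records its size (say 100) in start_offset_; a later Sync() uploads
  // only bytes [100, end) to "logs/.tmpcompose/model.ckpt.100", composes that
  // object onto the original (guarded by ifGenerationMatch), deletes the
  // temporary object, and advances start_offset_ again.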
613 
614   Status CheckWritable() const {
615     if (!outfile_.is_open()) {
616       return errors::FailedPrecondition(
617           "The internal temporary file is not writable.");
618     }
619     return Status::OK();
620   }
621 
622   Status GetCurrentFileSize(uint64* size) {
623     const auto tellp = outfile_.tellp();
624     if (tellp == static_cast<std::streampos>(-1)) {
625       return errors::Internal(
626           "Could not get the size of the internal temporary file.");
627     }
628     *size = tellp;
629     return Status::OK();
630   }
631 
632   /// Initiates a new resumable upload session.
633   Status CreateNewUploadSession(uint64 start_offset,
634                                 std::string object_to_upload,
635                                 UploadSessionHandle* session_handle) {
636     uint64 file_size;
637     TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
638     return session_creator_(start_offset, object_to_upload, bucket_, file_size,
639                             GetGcsPath(), session_handle);
640   }
641 
642   /// Appends the data of append_object to the original object and deletes
643   /// append_object.
644   Status AppendObject(string append_object) {
645     const string append_object_path = GetGcsPathWithObject(append_object);
646     VLOG(3) << "AppendObject: " << append_object_path << " to " << GetGcsPath();
647 
648     int64_t generation = 0;
649     TF_RETURN_IF_ERROR(
650         generation_getter_(GetGcsPath(), bucket_, object_, &generation));
651 
652     TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
653         [&append_object, &generation, this]() {
654           std::unique_ptr<HttpRequest> request;
655           TF_RETURN_IF_ERROR(filesystem_->CreateHttpRequest(&request));
656 
657           request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket_, "/o/",
658                                           request->EscapeString(object_),
659                                           "/compose"));
660 
661           const string request_body = strings::StrCat(
662               "{'sourceObjects': [{'name': '", object_,
663               "','objectPrecondition':{'ifGenerationMatch':", generation,
664               "}},{'name': '", append_object, "'}]}");
665           request->SetTimeouts(timeouts_->connect, timeouts_->idle,
666                                timeouts_->metadata);
667           request->AddHeader("content-type", "application/json");
668           request->SetPostFromBuffer(request_body.c_str(), request_body.size());
669           TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
670                                           " when composing to ", GetGcsPath());
671           return Status::OK();
672         },
673         retry_config_));
674 
675     return RetryingUtils::DeleteWithRetries(
676         [&append_object_path, this]() {
677           return filesystem_->DeleteFile(append_object_path, nullptr);
678         },
679         retry_config_);
680   }
681 
682   /// \brief Requests status of a previously initiated upload session.
683   ///
684   /// If the upload has already succeeded, sets 'completed' to true.
685   /// Otherwise sets 'completed' to false and 'uploaded' to the currently
686   /// uploaded size in bytes.
687   Status RequestUploadSessionStatus(const string& session_uri, bool* completed,
688                                     uint64* uploaded) {
689     uint64 file_size;
690     TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
691     return status_poller_(session_uri, file_size, GetGcsPath(), completed,
692                           uploaded);
693   }
694 
695   /// Uploads data to object.
696   Status UploadToSession(const string& session_uri, uint64 start_offset,
697                          uint64 already_uploaded) {
698     uint64 file_size;
699     TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
700     Status status =
701         object_uploader_(session_uri, start_offset, already_uploaded,
702                          tmp_content_filename_, file_size, GetGcsPath());
703     if (status.ok()) {
704       // Erase the file from the file cache on every successful write.
705       // Note: this only clears the local cache; a distributed cache is not
706       // affected here and invalidates its entries on its own as needed.
707       file_cache_erase_();
708     }
709 
710     return status;
711   }
712 
713   string GetGcsPathWithObject(string object) const {
714     return strings::StrCat("gs://", bucket_, "/", object);
715   }
716   string GetGcsPath() const { return GetGcsPathWithObject(object_); }
717 
718   string bucket_;
719   string object_;
720   GcsFileSystem* const filesystem_;  // Not owned.
721   string tmp_content_filename_;
722   std::ofstream outfile_;
723   GcsFileSystem::TimeoutConfig* timeouts_;
724   std::function<void()> file_cache_erase_;
725   bool sync_needed_;  // whether there is buffered data that needs to be synced
726   RetryConfig retry_config_;
727   bool compose_append_;
728   uint64 start_offset_;
729   // Callbacks to the file system used to upload object into GCS.
730   const SessionCreator session_creator_;
731   const ObjectUploader object_uploader_;
732   const StatusPoller status_poller_;
733   const GenerationGetter generation_getter_;
734 };
735 
736 class GcsReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
737  public:
738   GcsReadOnlyMemoryRegion(std::unique_ptr<char[]> data, uint64 length)
739       : data_(std::move(data)), length_(length) {}
740   const void* data() override { return reinterpret_cast<void*>(data_.get()); }
741   uint64 length() override { return length_; }
742 
743  private:
744   std::unique_ptr<char[]> data_;
745   uint64 length_;
746 };
747 
748 bool StringPieceIdentity(StringPiece str, StringPiece* value) {
749   *value = str;
750   return true;
751 }
752 
753 /// \brief Utility function to split a comma delimited list of strings to an
754 /// unordered set, lowercasing all values.
755 bool SplitByCommaToLowercaseSet(StringPiece list,
756                                 std::unordered_set<string>* set) {
757   std::vector<string> vector = absl::StrSplit(absl::AsciiStrToLower(list), ',');
758   *set = std::unordered_set<string>(vector.begin(), vector.end());
759   return true;
760 }
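// e.g. a list value of "US-EAST1,EU" becomes the set {"us-east1", "eu"}; note
// that entries are lowercased but not trimmed of surrounding whitespace.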
761 
762 // \brief Convert Compute Engine zone to region
763 string ZoneToRegion(string* zone) {
764   return zone->substr(0, zone->find_last_of('-'));
765 }
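// Usage sketch (hypothetical zone):
//   string zone = "us-central1-b";
//   string region = ZoneToRegion(&zone);  // "us-central1"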
766 
767 }  // namespace
768 
769 GcsFileSystem::GcsFileSystem(bool make_default_cache) {
770   uint64 value;
771   block_size_ = kDefaultBlockSize;
772   size_t max_bytes = kDefaultMaxCacheSize;
773 
774   uint64 max_staleness = kDefaultMaxStaleness;
775 
776   http_request_factory_ = std::make_shared<CurlHttpRequest::Factory>();
777   compute_engine_metadata_client_ =
778       std::make_shared<ComputeEngineMetadataClient>(http_request_factory_);
779   auth_provider_ = std::unique_ptr<AuthProvider>(
780       new GoogleAuthProvider(compute_engine_metadata_client_));
781   zone_provider_ = std::unique_ptr<ZoneProvider>(
782       new ComputeEngineZoneProvider(compute_engine_metadata_client_));
783 
784   // Apply the sys env override for the readahead buffer size if it's provided.
785   if (GetEnvVar(kReadaheadBufferSize, strings::safe_strtou64, &value)) {
786     block_size_ = value;
787   }
788 
789   // Apply the overrides for the block size (MB), max bytes (MB), and max
790   // staleness (seconds) if provided.
791   if (GetEnvVar(kBlockSize, strings::safe_strtou64, &value)) {
792     block_size_ = value * 1024 * 1024;
793   }
794 
795   if (GetEnvVar(kMaxCacheSize, strings::safe_strtou64, &value)) {
796     max_bytes = value * 1024 * 1024;
797   }
798 
799   if (GetEnvVar(kMaxStaleness, strings::safe_strtou64, &value)) {
800     max_staleness = value;
801   }
802   if (!make_default_cache) {
803     max_bytes = 0;
804   }
805   VLOG(1) << "GCS cache max size = " << max_bytes << " ; "
806           << "block size = " << block_size_ << " ; "
807           << "max staleness = " << max_staleness;
808   file_block_cache_ = MakeFileBlockCache(block_size_, max_bytes, max_staleness);
809   // Apply overrides for the stat cache max age and max entries, if provided.
810   uint64 stat_cache_max_age = kStatCacheDefaultMaxAge;
811   size_t stat_cache_max_entries = kStatCacheDefaultMaxEntries;
812   if (GetEnvVar(kStatCacheMaxAge, strings::safe_strtou64, &value)) {
813     stat_cache_max_age = value;
814   }
815   if (GetEnvVar(kStatCacheMaxEntries, strings::safe_strtou64, &value)) {
816     stat_cache_max_entries = value;
817   }
818   stat_cache_.reset(new ExpiringLRUCache<GcsFileStat>(stat_cache_max_age,
819                                                       stat_cache_max_entries));
820   // Apply overrides for the matching paths cache max age and max entries, if
821   // provided.
822   uint64 matching_paths_cache_max_age = kMatchingPathsCacheDefaultMaxAge;
823   size_t matching_paths_cache_max_entries =
824       kMatchingPathsCacheDefaultMaxEntries;
825   if (GetEnvVar(kMatchingPathsCacheMaxAge, strings::safe_strtou64, &value)) {
826     matching_paths_cache_max_age = value;
827   }
828   if (GetEnvVar(kMatchingPathsCacheMaxEntries, strings::safe_strtou64,
829                 &value)) {
830     matching_paths_cache_max_entries = value;
831   }
832   matching_paths_cache_.reset(new ExpiringLRUCache<std::vector<string>>(
833       matching_paths_cache_max_age, matching_paths_cache_max_entries));
834 
835   bucket_location_cache_.reset(new ExpiringLRUCache<string>(
836       kCacheNeverExpire, kBucketLocationCacheMaxEntries));
837 
838   int64_t resolve_frequency_secs;
839   if (GetEnvVar(kResolveCacheSecs, strings::safe_strto64,
840                 &resolve_frequency_secs)) {
841     dns_cache_.reset(new GcsDnsCache(resolve_frequency_secs));
842     VLOG(1) << "GCS DNS cache is enabled.  " << kResolveCacheSecs << " = "
843             << resolve_frequency_secs;
844   } else {
845     VLOG(1) << "GCS DNS cache is disabled, because " << kResolveCacheSecs
846             << " = 0 (or is not set)";
847   }
848 
849   // Get the additional header
850   StringPiece add_header_contents;
851   if (GetEnvVar(kAdditionalRequestHeader, StringPieceIdentity,
852                 &add_header_contents)) {
853     size_t split = add_header_contents.find(':', 0);
854 
855     if (split != StringPiece::npos) {
856       StringPiece header_name = add_header_contents.substr(0, split);
857       StringPiece header_value = add_header_contents.substr(split + 1);
858 
859       if (!header_name.empty() && !header_value.empty()) {
860         additional_header_.reset(new std::pair<const string, const string>(
861             string(header_name), string(header_value)));
862 
863         VLOG(1) << "GCS additional header ENABLED. "
864                 << "Name: " << additional_header_->first << ", "
865                 << "Value: " << additional_header_->second;
866       } else {
867         LOG(ERROR) << "GCS additional header DISABLED. Invalid contents: "
868                    << add_header_contents;
869       }
870     } else {
871       LOG(ERROR) << "GCS additional header DISABLED. Invalid contents: "
872                  << add_header_contents;
873     }
874   } else {
875     VLOG(1) << "GCS additional header DISABLED. No environment variable set.";
876   }
877 
878   // Apply the overrides for request timeouts
879   uint32 timeout_value;
880   if (GetEnvVar(kRequestConnectionTimeout, strings::safe_strtou32,
881                 &timeout_value)) {
882     timeouts_.connect = timeout_value;
883   }
884   if (GetEnvVar(kRequestIdleTimeout, strings::safe_strtou32, &timeout_value)) {
885     timeouts_.idle = timeout_value;
886   }
887   if (GetEnvVar(kMetadataRequestTimeout, strings::safe_strtou32,
888                 &timeout_value)) {
889     timeouts_.metadata = timeout_value;
890   }
891   if (GetEnvVar(kReadRequestTimeout, strings::safe_strtou32, &timeout_value)) {
892     timeouts_.read = timeout_value;
893   }
894   if (GetEnvVar(kWriteRequestTimeout, strings::safe_strtou32, &timeout_value)) {
895     timeouts_.write = timeout_value;
896   }
897 
898   int64_t token_value;
899   if (GetEnvVar(kThrottleRate, strings::safe_strto64, &token_value)) {
900     GcsThrottleConfig config;
901     config.enabled = true;
902     config.token_rate = token_value;
903 
904     if (GetEnvVar(kThrottleBucket, strings::safe_strto64, &token_value)) {
905       config.bucket_size = token_value;
906     }
907 
908     if (GetEnvVar(kTokensPerRequest, strings::safe_strto64, &token_value)) {
909       config.tokens_per_request = token_value;
910     }
911 
912     if (GetEnvVar(kInitialTokens, strings::safe_strto64, &token_value)) {
913       config.initial_tokens = token_value;
914     }
915     throttle_.SetConfig(config);
916   }
917 
918   GetEnvVar(kAllowedBucketLocations, SplitByCommaToLowercaseSet,
919             &allowed_locations_);
920 
921   StringPiece append_mode;
922   GetEnvVar(kAppendMode, StringPieceIdentity, &append_mode);
923   if (append_mode == kComposeAppend) {
924     compose_append_ = true;
925   } else {
926     compose_append_ = false;
927   }
928 }
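// Example configuration of the block cache built above (hypothetical values):
// GCS_READ_CACHE_BLOCK_SIZE_MB=16 sets block_size_ to 16 MiB, while the
// variables named by kMaxCacheSize and kMaxStaleness (in MB and seconds,
// respectively) size the RamFileBlockCache and bound how long stale cached
// blocks may be served.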
929 
930 GcsFileSystem::GcsFileSystem(
931     std::unique_ptr<AuthProvider> auth_provider,
932     std::unique_ptr<HttpRequest::Factory> http_request_factory,
933     std::unique_ptr<ZoneProvider> zone_provider, size_t block_size,
934     size_t max_bytes, uint64 max_staleness, uint64 stat_cache_max_age,
935     size_t stat_cache_max_entries, uint64 matching_paths_cache_max_age,
936     size_t matching_paths_cache_max_entries, RetryConfig retry_config,
937     TimeoutConfig timeouts, const std::unordered_set<string>& allowed_locations,
938     std::pair<const string, const string>* additional_header,
939     bool compose_append)
940     : timeouts_(timeouts),
941       retry_config_(retry_config),
942       auth_provider_(std::move(auth_provider)),
943       http_request_factory_(std::move(http_request_factory)),
944       zone_provider_(std::move(zone_provider)),
945       block_size_(block_size),
946       file_block_cache_(
947           MakeFileBlockCache(block_size, max_bytes, max_staleness)),
948       stat_cache_(new StatCache(stat_cache_max_age, stat_cache_max_entries)),
949       matching_paths_cache_(new MatchingPathsCache(
950           matching_paths_cache_max_age, matching_paths_cache_max_entries)),
951       bucket_location_cache_(new BucketLocationCache(
952           kCacheNeverExpire, kBucketLocationCacheMaxEntries)),
953       allowed_locations_(allowed_locations),
954       compose_append_(compose_append),
955       additional_header_(additional_header) {}
956 
957 Status GcsFileSystem::NewRandomAccessFile(
958     const string& fname, TransactionToken* token,
959     std::unique_ptr<RandomAccessFile>* result) {
960   string bucket, object;
961   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
962   TF_RETURN_IF_ERROR(CheckBucketLocationConstraint(bucket));
963   if (cache_enabled_) {
964     result->reset(new GcsRandomAccessFile(fname, [this, bucket, object](
965                                                      const string& fname,
966                                                      uint64 offset, size_t n,
967                                                      StringPiece* result,
968                                                      char* scratch) {
969       tf_shared_lock l(block_cache_lock_);
970       GcsFileStat stat;
971       TF_RETURN_IF_ERROR(stat_cache_->LookupOrCompute(
972           fname, &stat,
973           [this, bucket, object](const string& fname, GcsFileStat* stat) {
974             return UncachedStatForObject(fname, bucket, object, stat);
975           }));
976       if (!file_block_cache_->ValidateAndUpdateFileSignature(
977               fname, stat.generation_number)) {
978         VLOG(1)
979             << "File signature has been changed. Refreshing the cache. Path: "
980             << fname;
981       }
982       *result = StringPiece();
983       size_t bytes_transferred;
984       TF_RETURN_IF_ERROR(file_block_cache_->Read(fname, offset, n, scratch,
985                                                  &bytes_transferred));
986       *result = StringPiece(scratch, bytes_transferred);
987       if (bytes_transferred < n) {
988         return errors::OutOfRange("EOF reached, ", result->size(),
989                                   " bytes were read out of ", n,
990                                   " bytes requested.");
991       }
992       return Status::OK();
993     }));
994   } else {
995     result->reset(new BufferedGcsRandomAccessFile(
996         fname, block_size_,
997         [this, bucket, object](const string& fname, uint64 offset, size_t n,
998                                StringPiece* result, char* scratch) {
999           *result = StringPiece();
1000           size_t bytes_transferred;
1001           TF_RETURN_IF_ERROR(
1002               LoadBufferFromGCS(fname, offset, n, scratch, &bytes_transferred));
1003           *result = StringPiece(scratch, bytes_transferred);
1004           if (bytes_transferred < n) {
1005             return errors::OutOfRange("EOF reached, ", result->size(),
1006                                       " bytes were read out of ", n,
1007                                       " bytes requested.");
1008           }
1009           return Status::OK();
1010         }));
1011   }
1012   return Status::OK();
1013 }
1014 
1015 void GcsFileSystem::ResetFileBlockCache(size_t block_size_bytes,
1016                                         size_t max_bytes,
1017                                         uint64 max_staleness_secs) {
1018   mutex_lock l(block_cache_lock_);
1019   file_block_cache_ =
1020       MakeFileBlockCache(block_size_bytes, max_bytes, max_staleness_secs);
1021   if (stats_ != nullptr) {
1022     stats_->Configure(this, &throttle_, file_block_cache_.get());
1023   }
1024 }
1025 
1026 // A helper function to build a FileBlockCache for GcsFileSystem.
1027 std::unique_ptr<FileBlockCache> GcsFileSystem::MakeFileBlockCache(
1028     size_t block_size, size_t max_bytes, uint64 max_staleness) {
1029   std::unique_ptr<FileBlockCache> file_block_cache(new RamFileBlockCache(
1030       block_size, max_bytes, max_staleness,
1031       [this](const string& filename, size_t offset, size_t n, char* buffer,
1032              size_t* bytes_transferred) {
1033         return LoadBufferFromGCS(filename, offset, n, buffer,
1034                                  bytes_transferred);
1035       }));
1036 
1037   // Check if cache is enabled here to avoid unnecessary mutex contention.
1038   cache_enabled_ = file_block_cache->IsCacheEnabled();
1039   return file_block_cache;
1040 }
1041 
1042 // A helper function to actually read the data from GCS.
1043 Status GcsFileSystem::LoadBufferFromGCS(const string& fname, size_t offset,
1044                                         size_t n, char* buffer,
1045                                         size_t* bytes_transferred) {
1046   *bytes_transferred = 0;
1047 
1048   string bucket, object;
1049   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1050 
1051   profiler::TraceMe activity(
1052       [fname]() { return absl::StrCat("LoadBufferFromGCS ", fname); });
1053 
1054   std::unique_ptr<HttpRequest> request;
1055   TF_RETURN_WITH_CONTEXT_IF_ERROR(CreateHttpRequest(&request),
1056                                   "when reading gs://", bucket, "/", object);
1057 
1058   request->SetUri(strings::StrCat("https://", kStorageHost, "/", bucket, "/",
1059                                   request->EscapeString(object)));
1060   request->SetRange(offset, offset + n - 1);
1061   request->SetResultBufferDirect(buffer, n);
1062   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.read);
1063 
1064   if (stats_ != nullptr) {
1065     stats_->RecordBlockLoadRequest(fname, offset);
1066   }
1067 
1068   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading gs://",
1069                                   bucket, "/", object);
1070 
1071   size_t bytes_read = request->GetResultBufferDirectBytesTransferred();
1072   *bytes_transferred = bytes_read;
1073   VLOG(1) << "Successful read of gs://" << bucket << "/" << object << " @ "
1074           << offset << " of size: " << bytes_read;
1075   activity.AppendMetadata([bytes_read]() {
1076     return profiler::TraceMeEncode({{"block_size", bytes_read}});
1077   });
1078 
1079   if (stats_ != nullptr) {
1080     stats_->RecordBlockRetrieved(fname, offset, bytes_read);
1081   }
1082 
1083   throttle_.RecordResponse(bytes_read);
1084 
1085   if (bytes_read < n) {
1086     // Check stat cache to see if we encountered an interrupted read.
1087     GcsFileStat stat;
1088     if (stat_cache_->Lookup(fname, &stat)) {
1089       if (offset + bytes_read < stat.base.length) {
1090         return errors::Internal(strings::Printf(
1091             "File contents are inconsistent for file: %s @ %lu.", fname.c_str(),
1092             offset));
1093       }
1094       VLOG(2) << "Successful integrity check for: gs://" << bucket << "/"
1095               << object << " @ " << offset;
1096     }
1097   }
1098 
1099   return Status::OK();
1100 }
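// For a hypothetical read of 8 bytes at offset 100 from gs://my-bucket/data.bin,
// LoadBufferFromGCS issues a GET to
//   https://storage.googleapis.com/my-bucket/data.bin
// requesting the byte range [100, 107] via SetRange, and reports however many
// bytes the server actually returned through *bytes_transferred.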
1101 
1102 /// Initiates a new upload session.
1103 Status GcsFileSystem::CreateNewUploadSession(
1104     uint64 start_offset, const std::string& object_to_upload,
1105     const std::string& bucket, uint64 file_size, const std::string& gcs_path,
1106     UploadSessionHandle* session_handle) {
1107   std::vector<char> output_buffer;
1108   std::unique_ptr<HttpRequest> request;
1109   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1110 
1111   std::string uri = strings::StrCat(
1112       kGcsUploadUriBase, "b/", bucket,
1113       "/o?uploadType=resumable&name=", request->EscapeString(object_to_upload));
1114   request->SetUri(uri);
1115   request->AddHeader("X-Upload-Content-Length",
1116                      absl::StrCat(file_size - start_offset));
1117   request->SetPostEmptyBody();
1118   request->SetResultBuffer(&output_buffer);
1119   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1120   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
1121                                   " when initiating an upload to ", gcs_path);
1122   if (session_handle != nullptr) {
1123     session_handle->resumable = true;
1124     session_handle->session_uri = request->GetResponseHeader("Location");
1125     if (session_handle->session_uri.empty()) {
1126       return errors::Internal("Unexpected response from GCS when writing to ",
1127                               gcs_path, ": 'Location' header not returned.");
1128     }
1129   }
1130   return Status::OK();
1131 }
1132 
1133 Status GcsFileSystem::UploadToSession(const std::string& session_uri,
1134                                       uint64 start_offset,
1135                                       uint64 already_uploaded,
1136                                       const std::string& tmp_content_filename,
1137                                       uint64 file_size,
1138                                       const std::string& file_path) {
1139   std::unique_ptr<HttpRequest> request;
1140   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1141   request->SetUri(session_uri);
1142   if (file_size > 0) {
1143     request->AddHeader("Content-Range",
1144                        strings::StrCat("bytes ", already_uploaded, "-",
1145                                        file_size - start_offset - 1, "/",
1146                                        file_size - start_offset));
1147   }
1148   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.write);
1149 
1150   TF_RETURN_IF_ERROR(request->SetPutFromFile(tmp_content_filename,
1151                                              start_offset + already_uploaded));
1152   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when uploading ",
1153                                   file_path);
1154   return Status::OK();
1155 }
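// Worked example (hypothetical sizes): resuming a session with file_size = 1000,
// start_offset = 0, and already_uploaded = 300 sends
//   Content-Range: bytes 300-999/1000
// and PUTs the temporary file's contents starting at byte offset 300.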
1156 
1157 Status GcsFileSystem::RequestUploadSessionStatus(const string& session_uri,
1158                                                  uint64 file_size,
1159                                                  const std::string& gcs_path,
1160                                                  bool* completed,
1161                                                  uint64* uploaded) {
1162   CHECK(completed != nullptr) << "RequestUploadSessionStatus() called with out "
1163                                  "param 'completed' == nullptr.";  // Crash ok
1164   CHECK(uploaded != nullptr) << "RequestUploadSessionStatus() called with out "
1165                                 "param 'uploaded' == nullptr.";  // Crash ok
1166   std::unique_ptr<HttpRequest> request;
1167   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1168   request->SetUri(session_uri);
1169   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1170   request->AddHeader("Content-Range", strings::StrCat("bytes */", file_size));
1171   request->SetPutEmptyBody();
1172   Status status = request->Send();
1173   if (status.ok()) {
1174     *completed = true;
1175     return Status::OK();
1176   }
1177   *completed = false;
1178   if (request->GetResponseCode() != HTTP_CODE_RESUME_INCOMPLETE) {
1179     TF_RETURN_WITH_CONTEXT_IF_ERROR(status, " when resuming upload ", gcs_path);
1180   }
1181   const std::string received_range = request->GetResponseHeader("Range");
1182   if (received_range.empty()) {
1183     // This means GCS doesn't have any bytes of the file yet.
1184     *uploaded = 0;
1185   } else {
1186     StringPiece range_piece(received_range);
1187     absl::ConsumePrefix(&range_piece,
1188                         "bytes=");  // May or may not be present.
1189 
1190     auto return_error = [](const std::string& gcs_path,
1191                            const std::string& error_message) {
1192       return errors::Internal("Unexpected response from GCS when writing ",
1193                               gcs_path, ": ", error_message);
1194     };
1195 
1196     std::vector<string> range_strs = str_util::Split(range_piece, '-');
1197     if (range_strs.size() != 2) {
1198       return return_error(gcs_path, "Range header '" + received_range +
1199                                         "' could not be parsed.");
1200     }
1201 
1202     std::vector<int64> range_parts;
1203     for (const std::string& range_str : range_strs) {
1204       int64_t tmp;
1205       if (strings::safe_strto64(range_str, &tmp)) {
1206         range_parts.push_back(tmp);
1207       } else {
1208         return return_error(gcs_path, "Range header '" + received_range +
1209                                           "' could not be parsed.");
1210       }
1211     }
1212 
1213     if (range_parts[0] != 0) {
1214       return return_error(gcs_path, "The returned range '" + received_range +
1215                                         "' does not start at zero.");
1216     }
1217     // If GCS returned "Range: 0-10", this means 11 bytes were uploaded.
1218     *uploaded = range_parts[1] + 1;
1219   }
1220   return Status::OK();
1221 }
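// Illustrative example of the Range parsing above (hypothetical header): a
// response of "Range: bytes=0-42" splits into {0, 42}, the lower bound must be
// zero, and *uploaded becomes 43; an absent Range header means no bytes have
// been committed yet, so *uploaded is 0.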
1222 
1223 Status GcsFileSystem::ParseGcsPathForScheme(StringPiece fname, string scheme,
1224                                             bool empty_object_ok,
1225                                             string* bucket, string* object) {
1226   StringPiece parsed_scheme, bucketp, objectp;
1227   io::ParseURI(fname, &parsed_scheme, &bucketp, &objectp);
1228   if (parsed_scheme != scheme) {
1229     return errors::InvalidArgument("GCS path doesn't start with 'gs://': ",
1230                                    fname);
1231   }
1232   *bucket = string(bucketp);
1233   if (bucket->empty() || *bucket == ".") {
1234     return errors::InvalidArgument("GCS path doesn't contain a bucket name: ",
1235                                    fname);
1236   }
1237   absl::ConsumePrefix(&objectp, "/");
1238   *object = string(objectp);
1239   if (!empty_object_ok && object->empty()) {
1240     return errors::InvalidArgument("GCS path doesn't contain an object name: ",
1241                                    fname);
1242   }
1243   return Status::OK();
1244 }
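// Example of the parsing above (hypothetical path): with scheme "gs" and
// empty_object_ok == false, "gs://my-bucket/dir/file.txt" yields bucket
// "my-bucket" and object "dir/file.txt", while "gs://my-bucket" fails because
// the object name is empty.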
1245 
1246 Status GcsFileSystem::ParseGcsPath(StringPiece fname, bool empty_object_ok,
1247                                    string* bucket, string* object) {
1248   return ParseGcsPathForScheme(fname, "gs", empty_object_ok, bucket, object);
1249 }
1250 
1251 void GcsFileSystem::ClearFileCaches(const string& fname) {
1252   tf_shared_lock l(block_cache_lock_);
1253   file_block_cache_->RemoveFile(fname);
1254   stat_cache_->Delete(fname);
1255   // TODO(rxsang): Remove the patterns that match the file in
1256   // MatchingPathsCache as well.
1257 }
1258 
1259 Status GcsFileSystem::NewWritableFile(const string& fname,
1260                                       TransactionToken* token,
1261                                       std::unique_ptr<WritableFile>* result) {
1262   string bucket, object;
1263   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1264 
1265   auto session_creator =
1266       [this](uint64 start_offset, const std::string& object_to_upload,
1267              const std::string& bucket, uint64 file_size,
1268              const std::string& gcs_path, UploadSessionHandle* session_handle) {
1269         return CreateNewUploadSession(start_offset, object_to_upload, bucket,
1270                                       file_size, gcs_path, session_handle);
1271       };
1272   auto object_uploader =
1273       [this](const std::string& session_uri, uint64 start_offset,
1274              uint64 already_uploaded, const std::string& tmp_content_filename,
1275              uint64 file_size, const std::string& file_path) {
1276         return UploadToSession(session_uri, start_offset, already_uploaded,
1277                                tmp_content_filename, file_size, file_path);
1278       };
1279   auto status_poller = [this](const string& session_uri, uint64 file_size,
1280                               const std::string& gcs_path, bool* completed,
1281                               uint64* uploaded) {
1282     return RequestUploadSessionStatus(session_uri, file_size, gcs_path,
1283                                       completed, uploaded);
1284   };
1285 
1286   auto generation_getter = [this](const string& fname, const string& bucket,
1287                                   const string& object, int64* generation) {
1288     GcsFileStat stat;
1289     TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
1290         [&fname, &bucket, &object, &stat, this]() {
1291           return UncachedStatForObject(fname, bucket, object, &stat);
1292         },
1293         retry_config_));
1294     *generation = stat.generation_number;
1295     return Status::OK();
1296   };
1297 
1298   result->reset(new GcsWritableFile(
1299       bucket, object, this, &timeouts_,
1300       [this, fname]() { ClearFileCaches(fname); }, retry_config_,
1301       compose_append_, session_creator, object_uploader, status_poller,
1302       generation_getter));
1303   return Status::OK();
1304 }
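// A minimal caller-side sketch, not part of this file (assumes the standard
// tensorflow::Env API and an accessible bucket named "my-bucket"):
//
//   std::unique_ptr<WritableFile> file;
//   TF_RETURN_IF_ERROR(
//       Env::Default()->NewWritableFile("gs://my-bucket/out.txt", &file));
//   TF_RETURN_IF_ERROR(file->Append("hello world\n"));
//   TF_RETURN_IF_ERROR(file->Close());  // Finalizes the resumable upload.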
1305 
1306 // Reads the file from GCS in chunks and stores it in a tmp file,
1307 // which is then passed to GcsWritableFile.
1308 Status GcsFileSystem::NewAppendableFile(const string& fname,
1309                                         TransactionToken* token,
1310                                         std::unique_ptr<WritableFile>* result) {
1311   std::unique_ptr<RandomAccessFile> reader;
1312   TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &reader));
1313   std::unique_ptr<char[]> buffer(new char[kReadAppendableFileBufferSize]);
1314   Status status;
1315   uint64 offset = 0;
1316   StringPiece read_chunk;
1317 
1318   // Read the file from GCS in chunks and save it to a tmp file.
1319   string old_content_filename;
1320   TF_RETURN_IF_ERROR(GetTmpFilename(&old_content_filename));
1321   std::ofstream old_content(old_content_filename, std::ofstream::binary);
1322   while (true) {
1323     status = reader->Read(offset, kReadAppendableFileBufferSize, &read_chunk,
1324                           buffer.get());
1325     if (status.ok()) {
1326       old_content << read_chunk;
1327       offset += kReadAppendableFileBufferSize;
1328     } else if (status.code() == error::NOT_FOUND) {
1329       // New file, there is no existing content in it.
1330       break;
1331     } else if (status.code() == error::OUT_OF_RANGE) {
1332       // Expected, this means we reached EOF.
1333       old_content << read_chunk;
1334       break;
1335     } else {
1336       return status;
1337     }
1338   }
1339   old_content.close();
1340 
1341   auto session_creator =
1342       [this](uint64 start_offset, const std::string& object_to_upload,
1343              const std::string& bucket, uint64 file_size,
1344              const std::string& gcs_path, UploadSessionHandle* session_handle) {
1345         return CreateNewUploadSession(start_offset, object_to_upload, bucket,
1346                                       file_size, gcs_path, session_handle);
1347       };
1348   auto object_uploader =
1349       [this](const std::string& session_uri, uint64 start_offset,
1350              uint64 already_uploaded, const std::string& tmp_content_filename,
1351              uint64 file_size, const std::string& file_path) {
1352         return UploadToSession(session_uri, start_offset, already_uploaded,
1353                                tmp_content_filename, file_size, file_path);
1354       };
1355 
1356   auto status_poller = [this](const string& session_uri, uint64 file_size,
1357                               const std::string& gcs_path, bool* completed,
1358                               uint64* uploaded) {
1359     return RequestUploadSessionStatus(session_uri, file_size, gcs_path,
1360                                       completed, uploaded);
1361   };
1362 
1363   auto generation_getter = [this](const string& fname, const string& bucket,
1364                                   const string& object, int64* generation) {
1365     GcsFileStat stat;
1366     TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
1367         [&fname, &bucket, &object, &stat, this]() {
1368           return UncachedStatForObject(fname, bucket, object, &stat);
1369         },
1370         retry_config_));
1371     *generation = stat.generation_number;
1372     return Status::OK();
1373   };
1374 
1375   // Create a writable file and pass the old content to it.
1376   string bucket, object;
1377   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1378   result->reset(new GcsWritableFile(
1379       bucket, object, this, old_content_filename, &timeouts_,
1380       [this, fname]() { ClearFileCaches(fname); }, retry_config_,
1381       compose_append_, session_creator, object_uploader, status_poller,
1382       generation_getter));
1383   return Status::OK();
1384 }
1385 
1386 Status GcsFileSystem::NewReadOnlyMemoryRegionFromFile(
1387     const string& fname, TransactionToken* token,
1388     std::unique_ptr<ReadOnlyMemoryRegion>* result) {
1389   uint64 size;
1390   TF_RETURN_IF_ERROR(GetFileSize(fname, token, &size));
1391   std::unique_ptr<char[]> data(new char[size]);
1392 
1393   std::unique_ptr<RandomAccessFile> file;
1394   TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &file));
1395 
1396   StringPiece piece;
1397   TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get()));
1398 
1399   result->reset(new GcsReadOnlyMemoryRegion(std::move(data), size));
1400   return Status::OK();
1401 }
1402 
1403 Status GcsFileSystem::FileExists(const string& fname, TransactionToken* token) {
1404   string bucket, object;
1405   TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
1406   if (object.empty()) {
1407     bool result;
1408     TF_RETURN_IF_ERROR(BucketExists(bucket, &result));
1409     if (result) {
1410       return Status::OK();
1411     }
1412   }
1413 
1414   // Check if the object exists.
1415   GcsFileStat stat;
1416   const Status status = StatForObject(fname, bucket, object, &stat);
1417   if (status.code() != errors::Code::NOT_FOUND) {
1418     return status;
1419   }
1420 
1421   // Check if the folder exists.
1422   bool result;
1423   TF_RETURN_IF_ERROR(FolderExists(fname, &result));
1424   if (result) {
1425     return Status::OK();
1426   }
1427   return errors::NotFound("The specified path ", fname, " was not found.");
1428 }
1429 
1430 Status GcsFileSystem::ObjectExists(const string& fname, const string& bucket,
1431                                    const string& object, bool* result) {
1432   GcsFileStat stat;
1433   const Status status = StatForObject(fname, bucket, object, &stat);
1434   switch (status.code()) {
1435     case errors::Code::OK:
1436       *result = !stat.base.is_directory;
1437       return Status::OK();
1438     case errors::Code::NOT_FOUND:
1439       *result = false;
1440       return Status::OK();
1441     default:
1442       return status;
1443   }
1444 }
1445 
1446 Status GcsFileSystem::UncachedStatForObject(const string& fname,
1447                                             const string& bucket,
1448                                             const string& object,
1449                                             GcsFileStat* stat) {
1450   std::vector<char> output_buffer;
1451   std::unique_ptr<HttpRequest> request;
1452   TF_RETURN_WITH_CONTEXT_IF_ERROR(CreateHttpRequest(&request),
1453                                   " when reading metadata of gs://", bucket,
1454                                   "/", object);
1455 
1456   request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/",
1457                                   request->EscapeString(object),
1458                                   "?fields=size%2Cgeneration%2Cupdated"));
1459   request->SetResultBuffer(&output_buffer);
1460   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1461 
1462   if (stats_ != nullptr) {
1463     stats_->RecordStatObjectRequest();
1464   }
1465 
1466   TF_RETURN_WITH_CONTEXT_IF_ERROR(
1467       request->Send(), " when reading metadata of gs://", bucket, "/", object);
1468 
1469   Json::Value root;
1470   TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
1471 
1472   // Parse file size.
1473   TF_RETURN_IF_ERROR(GetInt64Value(root, "size", &stat->base.length));
1474 
1475   // Parse generation number.
1476   TF_RETURN_IF_ERROR(
1477       GetInt64Value(root, "generation", &stat->generation_number));
1478 
1479   // Parse file modification time.
1480   string updated;
1481   TF_RETURN_IF_ERROR(GetStringValue(root, "updated", &updated));
1482   TF_RETURN_IF_ERROR(ParseRfc3339Time(updated, &(stat->base.mtime_nsec)));
1483 
1484   VLOG(1) << "Stat of: gs://" << bucket << "/" << object << " -- "
1485           << " length: " << stat->base.length
1486           << " generation: " << stat->generation_number
1487           << "; mtime_nsec: " << stat->base.mtime_nsec
1488           << "; updated: " << updated;
1489 
1490   if (str_util::EndsWith(fname, "/")) {
1491     // In GCS a path can be both a directory and a file, which is uncommon for
1492     // other file systems. To avoid the ambiguity, if a path ends with "/" in
1493     // GCS, we always regard it as a directory marker or a virtual directory.
1494     stat->base.is_directory = true;
1495   } else {
1496     stat->base.is_directory = false;
1497   }
1498   return Status::OK();
1499 }
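// Illustrative request/response (hypothetical object): the metadata GET above
// targets ".../b/my-bucket/o/dir%2Ffile.txt?fields=size%2Cgeneration%2Cupdated"
// and a typical reply looks like
//   {"size": "1024", "generation": "1617000000000000",
//    "updated": "2021-03-29T00:00:00.000Z"},
// from which length, generation_number, and mtime_nsec are filled in.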
1500 
1501 Status GcsFileSystem::StatForObject(const string& fname, const string& bucket,
1502                                     const string& object, GcsFileStat* stat) {
1503   if (object.empty()) {
1504     return errors::InvalidArgument(strings::Printf(
1505         "'object' must be a non-empty string. (File: %s)", fname.c_str()));
1506   }
1507 
1508   TF_RETURN_IF_ERROR(stat_cache_->LookupOrCompute(
1509       fname, stat,
1510       [this, &bucket, &object](const string& fname, GcsFileStat* stat) {
1511         return UncachedStatForObject(fname, bucket, object, stat);
1512       }));
1513   return Status::OK();
1514 }
1515 
1516 Status GcsFileSystem::BucketExists(const string& bucket, bool* result) {
1517   const Status status = GetBucketMetadata(bucket, nullptr);
1518   switch (status.code()) {
1519     case errors::Code::OK:
1520       *result = true;
1521       return Status::OK();
1522     case errors::Code::NOT_FOUND:
1523       *result = false;
1524       return Status::OK();
1525     default:
1526       return status;
1527   }
1528 }
1529 
1530 Status GcsFileSystem::CheckBucketLocationConstraint(const string& bucket) {
1531   if (allowed_locations_.empty()) {
1532     return Status::OK();
1533   }
1534 
1535   // Avoid calling external APIs in the constructor.
1536   if (allowed_locations_.erase(kDetectZoneSentinelValue) == 1) {
1537     string zone;
1538     TF_RETURN_IF_ERROR(zone_provider_->GetZone(&zone));
1539     allowed_locations_.insert(ZoneToRegion(&zone));
1540   }
1541 
1542   string location;
1543   TF_RETURN_IF_ERROR(GetBucketLocation(bucket, &location));
1544   if (allowed_locations_.find(location) != allowed_locations_.end()) {
1545     return Status::OK();
1546   }
1547 
1548   return errors::FailedPrecondition(strings::Printf(
1549       "Bucket '%s' is in '%s' location, allowed locations are: (%s).",
1550       bucket.c_str(), location.c_str(),
1551       absl::StrJoin(allowed_locations_, ", ").c_str()));
1552 }
1553 
1554 Status GcsFileSystem::GetBucketLocation(const string& bucket,
1555                                         string* location) {
1556   auto compute_func = [this](const string& bucket, string* location) {
1557     std::vector<char> result_buffer;
1558     Status status = GetBucketMetadata(bucket, &result_buffer);
1559     Json::Value result;
1560     TF_RETURN_IF_ERROR(ParseJson(result_buffer, &result));
1561     string bucket_location;
1562     TF_RETURN_IF_ERROR(
1563         GetStringValue(result, kBucketMetadataLocationKey, &bucket_location));
1564     // Lowercase the location so the allowed-locations check is case-insensitive.
1565     *location = absl::AsciiStrToLower(bucket_location);
1566     return Status::OK();
1567   };
1568 
1569   TF_RETURN_IF_ERROR(
1570       bucket_location_cache_->LookupOrCompute(bucket, location, compute_func));
1571 
1572   return Status::OK();
1573 }
1574 
1575 Status GcsFileSystem::GetBucketMetadata(const string& bucket,
1576                                         std::vector<char>* result_buffer) {
1577   std::unique_ptr<HttpRequest> request;
1578   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1579   request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket));
1580 
1581   if (result_buffer != nullptr) {
1582     request->SetResultBuffer(result_buffer);
1583   }
1584 
1585   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1586   return request->Send();
1587 }
1588 
1589 Status GcsFileSystem::FolderExists(const string& dirname, bool* result) {
1590   StatCache::ComputeFunc compute_func = [this](const string& dirname,
1591                                                GcsFileStat* stat) {
1592     std::vector<string> children;
1593     TF_RETURN_IF_ERROR(
1594         GetChildrenBounded(dirname, 1, &children, true /* recursively */,
1595                            true /* include_self_directory_marker */));
1596     if (!children.empty()) {
1597       stat->base = DIRECTORY_STAT;
1598       return Status::OK();
1599     } else {
1600       return errors::InvalidArgument("Not a directory!");
1601     }
1602   };
1603   GcsFileStat stat;
1604   Status s = stat_cache_->LookupOrCompute(MaybeAppendSlash(dirname), &stat,
1605                                           compute_func);
1606   if (s.ok()) {
1607     *result = stat.base.is_directory;
1608     return Status::OK();
1609   }
1610   if (errors::IsInvalidArgument(s)) {
1611     *result = false;
1612     return Status::OK();
1613   }
1614   return s;
1615 }
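// Example of the detection above (hypothetical paths): "gs://my-bucket/dir" is
// reported as a folder if either a "dir/" marker object or any object with the
// "dir/" prefix exists; otherwise the InvalidArgument status from the compute
// function is translated into *result == false.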
1616 
1617 Status GcsFileSystem::GetChildren(const string& dirname,
1618                                   TransactionToken* token,
1619                                   std::vector<string>* result) {
1620   return GetChildrenBounded(dirname, UINT64_MAX, result,
1621                             false /* recursively */,
1622                             false /* include_self_directory_marker */);
1623 }
1624 
1625 Status GcsFileSystem::GetMatchingPaths(const string& pattern,
1626                                        TransactionToken* token,
1627                                        std::vector<string>* results) {
1628   MatchingPathsCache::ComputeFunc compute_func =
1629       [this](const string& pattern, std::vector<string>* results) {
1630         results->clear();
1631         // Find the fixed prefix by looking for the first wildcard.
1632         const string& fixed_prefix =
1633             pattern.substr(0, pattern.find_first_of("*?[\\"));
1634         const string dir(this->Dirname(fixed_prefix));
1635         if (dir.empty()) {
1636           return errors::InvalidArgument(
1637               "A GCS pattern doesn't have a bucket name: ", pattern);
1638         }
1639         std::vector<string> all_files;
1640         TF_RETURN_IF_ERROR(GetChildrenBounded(
1641             dir, UINT64_MAX, &all_files, true /* recursively */,
1642             false /* include_self_directory_marker */));
1643 
1644         const auto& files_and_folders = AddAllSubpaths(all_files);
1645 
1646         // To handle `/` in the object names, we need to remove it from `dir`
1647         // and then use `StrCat` to insert it back.
1648         const StringPiece dir_no_slash = str_util::StripSuffix(dir, "/");
1649 
1650         // Match all obtained paths to the input pattern.
1651         for (const auto& path : files_and_folders) {
1652           // Manually construct the path instead of using `JoinPath` for the
1653           // cases where `path` starts with a `/` (which is a valid character in
1654           // the filenames of GCS objects). `JoinPath` canonicalizes the result,
1655           // removing duplicate slashes. We know that `dir_no_slash` does not
1656           // end in `/`, so we are safe inserting the new `/` here as the path
1657           // separator.
1658           const string full_path = strings::StrCat(dir_no_slash, "/", path);
1659           if (this->Match(full_path, pattern)) {
1660             results->push_back(full_path);
1661           }
1662         }
1663         return Status::OK();
1664       };
1665   TF_RETURN_IF_ERROR(
1666       matching_paths_cache_->LookupOrCompute(pattern, results, compute_func));
1667   return Status::OK();
1668 }
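// Example of the prefix logic above (hypothetical pattern): for
// "gs://my-bucket/data/*.tfrecord" the fixed prefix is "gs://my-bucket/data/",
// so `dir` becomes "gs://my-bucket/data"; everything under that directory is
// listed recursively and then matched against the full pattern.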
1669 
1670 Status GcsFileSystem::GetChildrenBounded(const string& dirname,
1671                                          uint64 max_results,
1672                                          std::vector<string>* result,
1673                                          bool recursive,
1674                                          bool include_self_directory_marker) {
1675   if (!result) {
1676     return errors::InvalidArgument("'result' cannot be null");
1677   }
1678   string bucket, object_prefix;
1679   TF_RETURN_IF_ERROR(
1680       ParseGcsPath(MaybeAppendSlash(dirname), true, &bucket, &object_prefix));
1681 
1682   string nextPageToken;
1683   uint64 retrieved_results = 0;
1684   while (true) {  // A loop over multiple result pages.
1685     std::vector<char> output_buffer;
1686     std::unique_ptr<HttpRequest> request;
1687     TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1688     auto uri = strings::StrCat(kGcsUriBase, "b/", bucket, "/o");
1689     if (recursive) {
1690       uri = strings::StrCat(uri, "?fields=items%2Fname%2CnextPageToken");
1691     } else {
1692       // Set "/" as a delimiter to ask GCS to treat subfolders as children
1693       // and return them in "prefixes".
1694       uri = strings::StrCat(uri,
1695                             "?fields=items%2Fname%2Cprefixes%2CnextPageToken");
1696       uri = strings::StrCat(uri, "&delimiter=%2F");
1697     }
1698     if (!object_prefix.empty()) {
1699       uri = strings::StrCat(uri,
1700                             "&prefix=", request->EscapeString(object_prefix));
1701     }
1702     if (!nextPageToken.empty()) {
1703       uri = strings::StrCat(
1704           uri, "&pageToken=", request->EscapeString(nextPageToken));
1705     }
1706     if (max_results - retrieved_results < kGetChildrenDefaultPageSize) {
1707       uri =
1708           strings::StrCat(uri, "&maxResults=", max_results - retrieved_results);
1709     }
1710     request->SetUri(uri);
1711     request->SetResultBuffer(&output_buffer);
1712     request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1713 
1714     TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading ", dirname);
1715     Json::Value root;
1716     TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
1717     const auto items = root.get("items", Json::Value::null);
1718     if (!items.isNull()) {
1719       if (!items.isArray()) {
1720         return errors::Internal(
1721             "Expected an array 'items' in the GCS response.");
1722       }
1723       for (size_t i = 0; i < items.size(); i++) {
1724         const auto item = items.get(i, Json::Value::null);
1725         if (!item.isObject()) {
1726           return errors::Internal(
1727               "Unexpected JSON format: 'items' should be a list of objects.");
1728         }
1729         string name;
1730         TF_RETURN_IF_ERROR(GetStringValue(item, "name", &name));
1731         // The names should be relative to the 'dirname'. That means the
1732         // 'object_prefix', which is part of 'dirname', should be removed from
1733         // the beginning of 'name'.
1734         StringPiece relative_path(name);
1735         if (!absl::ConsumePrefix(&relative_path, object_prefix)) {
1736           return errors::Internal(strings::StrCat(
1737               "Unexpected response: the returned file name ", name,
1738               " doesn't match the prefix ", object_prefix));
1739         }
1740         if (!relative_path.empty() || include_self_directory_marker) {
1741           result->emplace_back(relative_path);
1742         }
1743         if (++retrieved_results >= max_results) {
1744           return Status::OK();
1745         }
1746       }
1747     }
1748     const auto prefixes = root.get("prefixes", Json::Value::null);
1749     if (!prefixes.isNull()) {
1750       // Subfolders are returned for the non-recursive mode.
1751       if (!prefixes.isArray()) {
1752         return errors::Internal(
1753             "'prefixes' was expected to be an array in the GCS response.");
1754       }
1755       for (size_t i = 0; i < prefixes.size(); i++) {
1756         const auto prefix = prefixes.get(i, Json::Value::null);
1757         if (prefix.isNull() || !prefix.isString()) {
1758           return errors::Internal(
1759               "'prefixes' was expected to be an array of strings in the GCS "
1760               "response.");
1761         }
1762         const string& prefix_str = prefix.asString();
1763         StringPiece relative_path(prefix_str);
1764         if (!absl::ConsumePrefix(&relative_path, object_prefix)) {
1765           return errors::Internal(
1766               "Unexpected response: the returned folder name ", prefix_str,
1767               " doesn't match the prefix ", object_prefix);
1768         }
1769         result->emplace_back(relative_path);
1770         if (++retrieved_results >= max_results) {
1771           return Status::OK();
1772         }
1773       }
1774     }
1775     const auto token = root.get("nextPageToken", Json::Value::null);
1776     if (token.isNull()) {
1777       return Status::OK();
1778     }
1779     if (!token.isString()) {
1780       return errors::Internal(
1781           "Unexpected response: nextPageToken is not a string");
1782     }
1783     nextPageToken = token.asString();
1784   }
1785 }
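// Illustrative listing request (hypothetical bucket and prefix): a
// non-recursive call for "gs://my-bucket/models" issues
//   GET .../b/my-bucket/o?fields=items%2Fname%2Cprefixes%2CnextPageToken
//       &delimiter=%2F&prefix=models%2F
// and keeps following "nextPageToken" until the results run out or max_results
// is reached.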
1786 
1787 Status GcsFileSystem::Stat(const string& fname, TransactionToken* token,
1788                            FileStatistics* stat) {
1789   if (!stat) {
1790     return errors::Internal("'stat' cannot be nullptr.");
1791   }
1792   string bucket, object;
1793   TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
1794   if (object.empty()) {
1795     bool is_bucket;
1796     TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
1797     if (is_bucket) {
1798       *stat = DIRECTORY_STAT;
1799       return Status::OK();
1800     }
1801     return errors::NotFound("The specified bucket ", fname, " was not found.");
1802   }
1803 
1804   GcsFileStat gcs_stat;
1805   const Status status = StatForObject(fname, bucket, object, &gcs_stat);
1806   if (status.ok()) {
1807     *stat = gcs_stat.base;
1808     return Status::OK();
1809   }
1810   if (status.code() != errors::Code::NOT_FOUND) {
1811     return status;
1812   }
1813   bool is_folder;
1814   TF_RETURN_IF_ERROR(FolderExists(fname, &is_folder));
1815   if (is_folder) {
1816     *stat = DIRECTORY_STAT;
1817     return Status::OK();
1818   }
1819   return errors::NotFound("The specified path ", fname, " was not found.");
1820 }
1821 
1822 Status GcsFileSystem::DeleteFile(const string& fname, TransactionToken* token) {
1823   string bucket, object;
1824   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1825 
1826   std::unique_ptr<HttpRequest> request;
1827   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1828   request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/",
1829                                   request->EscapeString(object)));
1830   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1831   request->SetDeleteRequest();
1832 
1833   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when deleting ", fname);
1834   ClearFileCaches(fname);
1835   return Status::OK();
1836 }
1837 
1838 Status GcsFileSystem::CreateDir(const string& dirname,
1839                                 TransactionToken* token) {
1840   string dirname_with_slash = MaybeAppendSlash(dirname);
1841   VLOG(3) << "CreateDir: creating directory with dirname: " << dirname
1842           << " and dirname_with_slash: " << dirname_with_slash;
1843   string bucket, object;
1844   TF_RETURN_IF_ERROR(ParseGcsPath(dirname_with_slash, /*empty_object_ok=*/true,
1845                                   &bucket, &object));
1846   if (object.empty()) {
1847     bool is_bucket;
1848     TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
1849     return is_bucket ? Status::OK()
1850                      : errors::NotFound("The specified bucket ",
1851                                         dirname_with_slash, " was not found.");
1852   }
1853 
1854   if (FileExists(dirname_with_slash, token).ok()) {
1855     // Use the original name for a correct error here.
1856     VLOG(3) << "CreateDir: directory already exists, not uploading " << dirname;
1857     return errors::AlreadyExists(dirname);
1858   }
1859 
1860   std::unique_ptr<HttpRequest> request;
1861   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1862 
1863   request->SetUri(strings::StrCat(
1864       kGcsUploadUriBase, "b/", bucket,
1865       "/o?uploadType=media&name=", request->EscapeString(object),
1866       // Adding this parameter means HTTP_CODE_PRECONDITION_FAILED
1867       // will be returned if the object already exists, so avoid reuploading.
1868       "&ifGenerationMatch=0"));
1869 
1870   request->SetPostEmptyBody();
1871   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1872   const Status& status = request->Send();
1873   if (status.ok()) {
1874     VLOG(3) << "CreateDir: finished uploading directory " << dirname;
1875     return Status::OK();
1876   }
1877   if (request->GetResponseCode() != HTTP_CODE_PRECONDITION_FAILED) {
1878     TF_RETURN_WITH_CONTEXT_IF_ERROR(status, " when uploading ",
1879                                     dirname_with_slash);
1880   }
1881   VLOG(3) << "Ignoring directory already exists on object "
1882           << dirname_with_slash;
1883   return errors::AlreadyExists(dirname);
1884 }
1885 
1886 // Checks that the directory is empty (i.e no objects with this prefix exist).
1887 // Deletes the GCS directory marker if it exists.
1888 Status GcsFileSystem::DeleteDir(const string& dirname,
1889                                 TransactionToken* token) {
1890   std::vector<string> children;
1891   // A directory is considered empty either if there are no matching objects
1892   // with the corresponding name prefix or if there is exactly one matching
1893   // object and it is the directory marker. Therefore we need to retrieve
1894   // at most two children for the prefix to detect if a directory is empty.
1895   TF_RETURN_IF_ERROR(
1896       GetChildrenBounded(dirname, 2, &children, true /* recursively */,
1897                          true /* include_self_directory_marker */));
1898 
1899   if (children.size() > 1 || (children.size() == 1 && !children[0].empty())) {
1900     return errors::FailedPrecondition("Cannot delete a non-empty directory.");
1901   }
1902   if (children.size() == 1 && children[0].empty()) {
1903     // This is the directory marker object. Delete it.
1904     return DeleteFile(MaybeAppendSlash(dirname), token);
1905   }
1906   return Status::OK();
1907 }
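// Example of the emptiness check above (hypothetical listings): for
// "gs://my-bucket/dir", a children vector of {""} means only the "dir/" marker
// object exists, so the marker is deleted; {"a.txt"} or {"", "a.txt"} means the
// directory is non-empty and FailedPrecondition is returned.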
1908 
1909 Status GcsFileSystem::GetFileSize(const string& fname, TransactionToken* token,
1910                                   uint64* file_size) {
1911   if (!file_size) {
1912     return errors::Internal("'file_size' cannot be nullptr.");
1913   }
1914 
1915   // Only validate the name.
1916   string bucket, object;
1917   TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1918 
1919   FileStatistics stat;
1920   TF_RETURN_IF_ERROR(Stat(fname, token, &stat));
1921   *file_size = stat.length;
1922   return Status::OK();
1923 }
1924 
1925 Status GcsFileSystem::RenameFile(const string& src, const string& target,
1926                                  TransactionToken* token) {
1927   if (!IsDirectory(src, token).ok()) {
1928     return RenameObject(src, target);
1929   }
1930   // Rename all individual objects in the directory one by one.
1931   std::vector<string> children;
1932   TF_RETURN_IF_ERROR(
1933       GetChildrenBounded(src, UINT64_MAX, &children, true /* recursively */,
1934                          true /* include_self_directory_marker */));
1935   for (const string& subpath : children) {
1936     TF_RETURN_IF_ERROR(
1937         RenameObject(JoinGcsPath(src, subpath), JoinGcsPath(target, subpath)));
1938   }
1939   return Status::OK();
1940 }
1941 
1942 // Uses a GCS API command to copy the object and then deletes the old one.
1943 Status GcsFileSystem::RenameObject(const string& src, const string& target) {
1944   VLOG(3) << "RenameObject: started gs://" << src << " to " << target;
1945   string src_bucket, src_object, target_bucket, target_object;
1946   TF_RETURN_IF_ERROR(ParseGcsPath(src, false, &src_bucket, &src_object));
1947   TF_RETURN_IF_ERROR(
1948       ParseGcsPath(target, false, &target_bucket, &target_object));
1949 
1950   std::unique_ptr<HttpRequest> request;
1951   TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1952   request->SetUri(strings::StrCat(kGcsUriBase, "b/", src_bucket, "/o/",
1953                                   request->EscapeString(src_object),
1954                                   "/rewriteTo/b/", target_bucket, "/o/",
1955                                   request->EscapeString(target_object)));
1956   request->SetPostEmptyBody();
1957   request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1958   std::vector<char> output_buffer;
1959   request->SetResultBuffer(&output_buffer);
1960   TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when renaming ", src,
1961                                   " to ", target);
1962   // Flush the target from the caches.  The source will be flushed in the
1963   // DeleteFile call below.
1964   ClearFileCaches(target);
1965   Json::Value root;
1966   TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
1967   bool done;
1968   TF_RETURN_IF_ERROR(GetBoolValue(root, "done", &done));
1969   if (!done) {
1970     // If GCS didn't complete rewrite in one call, this means that a large file
1971     // is being copied to a bucket with a different storage class or location,
1972     // which requires multiple rewrite calls.
1973     // TODO(surkov): implement multi-step rewrites.
1974     return errors::Unimplemented(
1975         "Couldn't rename ", src, " to ", target,
1976         ": moving large files between buckets with different "
1977         "locations or storage classes is not supported.");
1978   }
1979 
1980   VLOG(3) << "RenameObject: finished from: gs://" << src << " to " << target;
1981   // In case the delete API call failed, but the deletion actually happened
1982   // on the server side, we can't just retry the whole RenameFile operation
1983   // because the source object is already gone.
1984   return RetryingUtils::DeleteWithRetries(
1985       [this, &src]() { return DeleteFile(src, nullptr); }, retry_config_);
1986 }
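// Illustrative rewrite call (hypothetical objects): renaming
// "gs://my-bucket/a.txt" to "gs://my-bucket/b.txt" POSTs to
//   .../b/my-bucket/o/a.txt/rewriteTo/b/my-bucket/o/b.txt
// and, once the response reports "done": true, deletes the source object with
// retries.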
1987 
1988 Status GcsFileSystem::IsDirectory(const string& fname,
1989                                   TransactionToken* token) {
1990   string bucket, object;
1991   TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
1992   if (object.empty()) {
1993     bool is_bucket;
1994     TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
1995     if (is_bucket) {
1996       return Status::OK();
1997     }
1998     return errors::NotFound("The specified bucket gs://", bucket,
1999                             " was not found.");
2000   }
2001   bool is_folder;
2002   TF_RETURN_IF_ERROR(FolderExists(fname, &is_folder));
2003   if (is_folder) {
2004     return Status::OK();
2005   }
2006   bool is_object;
2007   TF_RETURN_IF_ERROR(ObjectExists(fname, bucket, object, &is_object));
2008   if (is_object) {
2009     return errors::FailedPrecondition("The specified path ", fname,
2010                                       " is not a directory.");
2011   }
2012   return errors::NotFound("The specified path ", fname, " was not found.");
2013 }
2014 
2015 Status GcsFileSystem::DeleteRecursively(const string& dirname,
2016                                         TransactionToken* token,
2017                                         int64* undeleted_files,
2018                                         int64* undeleted_dirs) {
2019   if (!undeleted_files || !undeleted_dirs) {
2020     return errors::Internal(
2021         "'undeleted_files' and 'undeleted_dirs' cannot be nullptr.");
2022   }
2023   *undeleted_files = 0;
2024   *undeleted_dirs = 0;
2025   if (!IsDirectory(dirname, token).ok()) {
2026     *undeleted_dirs = 1;
2027     return Status(
2028         error::NOT_FOUND,
2029         strings::StrCat(dirname, " doesn't exist or is not a directory."));
2030   }
2031   std::vector<string> all_objects;
2032   // Get all children in the directory recursively.
2033   TF_RETURN_IF_ERROR(GetChildrenBounded(
2034       dirname, UINT64_MAX, &all_objects, true /* recursively */,
2035       true /* include_self_directory_marker */));
2036   for (const string& object : all_objects) {
2037     const string& full_path = JoinGcsPath(dirname, object);
2038     // Delete all objects including directory markers for subfolders.
2039     // Since DeleteRecursively returns OK if individual file deletions fail,
2040     // and therefore RetryingFileSystem won't pay attention to the failures,
2041     // we need to make sure these failures are properly retried.
2042     const auto& delete_file_status = RetryingUtils::DeleteWithRetries(
2043         [this, &full_path, token]() { return DeleteFile(full_path, token); },
2044         retry_config_);
2045     if (!delete_file_status.ok()) {
2046       if (IsDirectory(full_path, token).ok()) {
2047         // The object is a directory marker.
2048         (*undeleted_dirs)++;
2049       } else {
2050         (*undeleted_files)++;
2051       }
2052     }
2053   }
2054   return Status::OK();
2055 }
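// A minimal caller-side sketch, not part of this file (assumes the standard
// tensorflow::Env API):
//
//   int64_t undeleted_files = 0, undeleted_dirs = 0;
//   TF_RETURN_IF_ERROR(Env::Default()->DeleteRecursively(
//       "gs://my-bucket/tmp", &undeleted_files, &undeleted_dirs));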
2056 
2057 // Flushes all caches for filesystem metadata and file contents. Useful for
2058 // reclaiming memory once filesystem operations are done (e.g. model is loaded),
2059 // or for resetting the filesystem to a consistent state.
2060 void GcsFileSystem::FlushCaches(TransactionToken* token) {
2061   tf_shared_lock l(block_cache_lock_);
2062   file_block_cache_->Flush();
2063   stat_cache_->Clear();
2064   matching_paths_cache_->Clear();
2065   bucket_location_cache_->Clear();
2066 }
2067 
2068 void GcsFileSystem::SetStats(GcsStatsInterface* stats) {
2069   CHECK(stats_ == nullptr) << "SetStats() has already been called.";
2070   CHECK(stats != nullptr);
2071   mutex_lock l(block_cache_lock_);
2072   stats_ = stats;
2073   stats_->Configure(this, &throttle_, file_block_cache_.get());
2074 }
2075 
2076 void GcsFileSystem::SetCacheStats(FileBlockCacheStatsInterface* cache_stats) {
2077   tf_shared_lock l(block_cache_lock_);
2078   if (file_block_cache_ == nullptr) {
2079     LOG(ERROR) << "Tried to set cache stats of non-initialized file block "
2080                   "cache object. This may result in not exporting the intended "
2081                   "monitoring data";
2082     return;
2083   }
2084   file_block_cache_->SetStats(cache_stats);
2085 }
2086 
2087 void GcsFileSystem::SetAuthProvider(
2088     std::unique_ptr<AuthProvider> auth_provider) {
2089   mutex_lock l(mu_);
2090   auth_provider_ = std::move(auth_provider);
2091 }
2092 
2093 // Creates an HttpRequest and sets several parameters that are common to all
2094 // requests.  All code (in GcsFileSystem) that creates an HttpRequest should
2095 // go through this method, rather than directly using http_request_factory_.
2096 Status GcsFileSystem::CreateHttpRequest(std::unique_ptr<HttpRequest>* request) {
2097   std::unique_ptr<HttpRequest> new_request{http_request_factory_->Create()};
2098   if (dns_cache_) {
2099     dns_cache_->AnnotateRequest(new_request.get());
2100   }
2101 
2102   string auth_token;
2103   {
2104     tf_shared_lock l(mu_);
2105     TF_RETURN_IF_ERROR(
2106         AuthProvider::GetToken(auth_provider_.get(), &auth_token));
2107   }
2108 
2109   new_request->AddAuthBearerHeader(auth_token);
2110 
2111   if (additional_header_) {
2112     new_request->AddHeader(additional_header_->first,
2113                            additional_header_->second);
2114   }
2115 
2116   if (stats_ != nullptr) {
2117     new_request->SetRequestStats(stats_->HttpStats());
2118   }
2119 
2120   if (!throttle_.AdmitRequest()) {
2121     return errors::Unavailable("Request throttled");
2122   }
2123 
2124   *request = std::move(new_request);
2125   return Status::OK();
2126 }
2127 
2128 }  // namespace tensorflow
2129 
2130 // The TPU_GCS_FS option enables a TPU-on-GCS optimized file system that lets
2131 // TPU pods operate more efficiently. When TPU_GCS_FS is enabled,
2132 // gcs_file_system is not registered as a file system, since
2133 // tpu_gcs_file_system takes over its responsibilities. The TPU file system is
2134 // a child of the GCS file system with TPU-pod-on-GCS optimizations.
2135 // This option is toggled in the GCP TPU TensorFlow config.
2136 // Initialize gcs_file_system
2137 REGISTER_LEGACY_FILE_SYSTEM("gs", ::tensorflow::RetryingGcsFileSystem);
2138