1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/platform/cloud/gcs_file_system.h"
17
18 #include <stdio.h>
19 #include <unistd.h>
20
21 #include <algorithm>
22 #include <cstdio>
23 #include <cstdlib>
24 #include <cstring>
25 #include <fstream>
26 #include <vector>
27
28 #include "tensorflow/core/platform/file_statistics.h"
29 #include "tensorflow/core/platform/strcat.h"
30 #ifdef _WIN32
31 #include <io.h> // for _mktemp
32 #endif
33 #include "absl/base/macros.h"
34 #include "json/json.h"
35 #include "tensorflow/core/lib/gtl/map_util.h"
36 #include "tensorflow/core/platform/cloud/curl_http_request.h"
37 #include "tensorflow/core/platform/cloud/file_block_cache.h"
38 #include "tensorflow/core/platform/cloud/google_auth_provider.h"
39 #include "tensorflow/core/platform/cloud/ram_file_block_cache.h"
40 #include "tensorflow/core/platform/cloud/time_util.h"
41 #include "tensorflow/core/platform/env.h"
42 #include "tensorflow/core/platform/errors.h"
43 #include "tensorflow/core/platform/mutex.h"
44 #include "tensorflow/core/platform/numbers.h"
45 #include "tensorflow/core/platform/path.h"
46 #include "tensorflow/core/platform/protobuf.h"
47 #include "tensorflow/core/platform/retrying_utils.h"
48 #include "tensorflow/core/platform/str_util.h"
49 #include "tensorflow/core/platform/stringprintf.h"
50 #include "tensorflow/core/platform/thread_annotations.h"
51 #include "tensorflow/core/profiler/lib/traceme.h"
52
53 #ifdef _WIN32
54 #ifdef DeleteFile
55 #undef DeleteFile
56 #endif
57 #endif
58
59 namespace tensorflow {
60 namespace {
61
// Base URL for GCS JSON API requests.
constexpr char kGcsUriBase[] = "https://www.googleapis.com/storage/v1/";
// Base URL for GCS resumable/multipart upload requests.
constexpr char kGcsUploadUriBase[] =
    "https://www.googleapis.com/upload/storage/v1/";
// Host used when addressing objects as https://<host>/<bucket>/<object>.
constexpr char kStorageHost[] = "storage.googleapis.com";
// JSON field name under which the bucket metadata API reports the location.
constexpr char kBucketMetadataLocationKey[] = "location";
constexpr size_t kReadAppendableFileBufferSize = 1024 * 1024;  // In bytes.
// Default number of entries requested per page when listing children.
constexpr int kGetChildrenDefaultPageSize = 1000;
// The HTTP response code "308 Resume Incomplete".
constexpr uint64 HTTP_CODE_RESUME_INCOMPLETE = 308;
// The HTTP response code "412 Precondition Failed".
constexpr uint64 HTTP_CODE_PRECONDITION_FAILED = 412;
// The environment variable that overrides the size of the readahead buffer.
ABSL_DEPRECATED("Use GCS_READ_CACHE_BLOCK_SIZE_MB instead.")
constexpr char kReadaheadBufferSize[] = "GCS_READAHEAD_BUFFER_SIZE_BYTES";
// The environment variable that overrides the maximum age of entries in the
// Stat cache. A value of 0 (the default) means nothing is cached.
constexpr char kStatCacheMaxAge[] = "GCS_STAT_CACHE_MAX_AGE";
constexpr uint64 kStatCacheDefaultMaxAge = 5;
// The environment variable that overrides the maximum number of entries in the
// Stat cache.
constexpr char kStatCacheMaxEntries[] = "GCS_STAT_CACHE_MAX_ENTRIES";
constexpr size_t kStatCacheDefaultMaxEntries = 1024;
// The environment variable that overrides the maximum age of entries in the
// GetMatchingPaths cache. A value of 0 (the default) means nothing is cached.
constexpr char kMatchingPathsCacheMaxAge[] = "GCS_MATCHING_PATHS_CACHE_MAX_AGE";
constexpr uint64 kMatchingPathsCacheDefaultMaxAge = 0;
// The environment variable that overrides the maximum number of entries in the
// GetMatchingPaths cache.
constexpr char kMatchingPathsCacheMaxEntries[] =
    "GCS_MATCHING_PATHS_CACHE_MAX_ENTRIES";
constexpr size_t kMatchingPathsCacheDefaultMaxEntries = 1024;
// Number of bucket locations cached, most workloads won't touch more than one
// bucket so this limit is set fairly low
constexpr size_t kBucketLocationCacheMaxEntries = 10;
// ExpiringLRUCache doesn't support any "cache forever" option
constexpr size_t kCacheNeverExpire = std::numeric_limits<uint64>::max();
// The file statistics returned by Stat() for directories.
const FileStatistics DIRECTORY_STAT(0, 0, true);
// Some environments exhibit unreliable DNS resolution. Set this environment
// variable to a positive integer describing the frequency used to refresh the
// userspace DNS cache.
constexpr char kResolveCacheSecs[] = "GCS_RESOLVE_REFRESH_SECS";
// The environment variable to configure the http request's connection timeout.
constexpr char kRequestConnectionTimeout[] =
    "GCS_REQUEST_CONNECTION_TIMEOUT_SECS";
// The environment variable to configure the http request's idle timeout.
constexpr char kRequestIdleTimeout[] = "GCS_REQUEST_IDLE_TIMEOUT_SECS";
// The environment variable to configure the overall request timeout for
// metadata requests.
constexpr char kMetadataRequestTimeout[] = "GCS_METADATA_REQUEST_TIMEOUT_SECS";
// The environment variable to configure the overall request timeout for
// block reads requests.
constexpr char kReadRequestTimeout[] = "GCS_READ_REQUEST_TIMEOUT_SECS";
// The environment variable to configure the overall request timeout for
// upload requests.
constexpr char kWriteRequestTimeout[] = "GCS_WRITE_REQUEST_TIMEOUT_SECS";
// The environment variable to configure an additional header to send with
// all requests to GCS (format HEADERNAME:HEADERCONTENT)
constexpr char kAdditionalRequestHeader[] = "GCS_ADDITIONAL_REQUEST_HEADER";
// The environment variable to configure the throttle (format: <int64>)
constexpr char kThrottleRate[] = "GCS_THROTTLE_TOKEN_RATE";
// The environment variable to configure the token bucket size (format: <int64>)
constexpr char kThrottleBucket[] = "GCS_THROTTLE_BUCKET_SIZE";
// The environment variable that controls the number of tokens per request.
// (format: <int64>)
constexpr char kTokensPerRequest[] = "GCS_TOKENS_PER_REQUEST";
// The environment variable to configure the initial tokens (format: <int64>)
constexpr char kInitialTokens[] = "GCS_INITIAL_TOKENS";

// The environment variable to customize which GCS bucket locations are allowed,
// if the list is empty defaults to using the region of the zone (format, comma
// delimited list). Requires 'storage.buckets.get' permission.
constexpr char kAllowedBucketLocations[] = "GCS_ALLOWED_BUCKET_LOCATIONS";
// When this value is passed as an allowed location detects the zone tensorflow
// is running in and restricts to buckets in that region.
constexpr char kDetectZoneSentinelValue[] = "auto";

// How to upload new data when Flush() is called multiple times.
// By default the entire file is reuploaded.
constexpr char kAppendMode[] = "GCS_APPEND_MODE";
// If GCS_APPEND_MODE=compose then instead the new data is uploaded to a
// temporary object and composed with the original object. This is disabled by
// default as the multiple API calls required add a risk of stranding temporary
// objects.
constexpr char kComposeAppend[] = "compose";
147
GetTmpFilename(string * filename)148 Status GetTmpFilename(string* filename) {
149 *filename = io::GetTempFilename("");
150 return Status::OK();
151 }
152
153 /// Appends a trailing slash if the name doesn't already have one.
MaybeAppendSlash(const string & name)154 string MaybeAppendSlash(const string& name) {
155 if (name.empty()) {
156 return "/";
157 }
158 if (name.back() != '/') {
159 return strings::StrCat(name, "/");
160 }
161 return name;
162 }
163
164 // io::JoinPath() doesn't work in cases when we want an empty subpath
165 // to result in an appended slash in order for directory markers
166 // to be processed correctly: "gs://a/b" + "" should give "gs://a/b/".
JoinGcsPath(const string & path,const string & subpath)167 string JoinGcsPath(const string& path, const string& subpath) {
168 return strings::StrCat(MaybeAppendSlash(path), subpath);
169 }
170
171 /// \brief Returns the given paths appending all their subfolders.
172 ///
173 /// For every path X in the list, every subfolder in X is added to the
174 /// resulting list.
175 /// For example:
176 /// - for 'a/b/c/d' it will append 'a', 'a/b' and 'a/b/c'
177 /// - for 'a/b/c/' it will append 'a', 'a/b' and 'a/b/c'
178 /// - for 'a//b/c/' it will append 'a', 'a//b' and 'a//b/c'
179 /// - for '/a/b/c/' it will append '/a', '/a/b' and '/a/b/c'
AddAllSubpaths(const std::vector<string> & paths)180 std::set<string> AddAllSubpaths(const std::vector<string>& paths) {
181 std::set<string> result;
182 result.insert(paths.begin(), paths.end());
183 for (const string& path : paths) {
184 StringPiece subpath = io::Dirname(path);
185 // If `path` starts with `/`, `subpath` will be `/` and then we get into an
186 // infinite loop. Same behavior happens if there is a `//` pattern in
187 // `path`, so we check for that and leave the loop quicker.
188 while (!(subpath.empty() || subpath == "/")) {
189 result.emplace(string(subpath));
190 subpath = io::Dirname(subpath);
191 }
192 }
193 return result;
194 }
195
ParseJson(StringPiece json,Json::Value * result)196 Status ParseJson(StringPiece json, Json::Value* result) {
197 Json::Reader reader;
198 if (!reader.parse(json.data(), json.data() + json.size(), *result)) {
199 return errors::Internal("Couldn't parse JSON response from GCS.");
200 }
201 return Status::OK();
202 }
203
ParseJson(const std::vector<char> & json,Json::Value * result)204 Status ParseJson(const std::vector<char>& json, Json::Value* result) {
205 return ParseJson(StringPiece{json.data(), json.size()}, result);
206 }
207
208 /// Reads a JSON value with the given name from a parent JSON value.
GetValue(const Json::Value & parent,const char * name,Json::Value * result)209 Status GetValue(const Json::Value& parent, const char* name,
210 Json::Value* result) {
211 *result = parent.get(name, Json::Value::null);
212 if (result->isNull()) {
213 return errors::Internal("The field '", name,
214 "' was expected in the JSON response.");
215 }
216 return Status::OK();
217 }
218
219 /// Reads a string JSON value with the given name from a parent JSON value.
GetStringValue(const Json::Value & parent,const char * name,string * result)220 Status GetStringValue(const Json::Value& parent, const char* name,
221 string* result) {
222 Json::Value result_value;
223 TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
224 if (!result_value.isString()) {
225 return errors::Internal(
226 "The field '", name,
227 "' in the JSON response was expected to be a string.");
228 }
229 *result = result_value.asString();
230 return Status::OK();
231 }
232
233 /// Reads a long JSON value with the given name from a parent JSON value.
GetInt64Value(const Json::Value & parent,const char * name,int64 * result)234 Status GetInt64Value(const Json::Value& parent, const char* name,
235 int64* result) {
236 Json::Value result_value;
237 TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
238 if (result_value.isNumeric()) {
239 *result = result_value.asInt64();
240 return Status::OK();
241 }
242 if (result_value.isString() &&
243 strings::safe_strto64(result_value.asCString(), result)) {
244 return Status::OK();
245 }
246 return errors::Internal(
247 "The field '", name,
248 "' in the JSON response was expected to be a number.");
249 }
250
251 /// Reads a boolean JSON value with the given name from a parent JSON value.
GetBoolValue(const Json::Value & parent,const char * name,bool * result)252 Status GetBoolValue(const Json::Value& parent, const char* name, bool* result) {
253 Json::Value result_value;
254 TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
255 if (!result_value.isBool()) {
256 return errors::Internal(
257 "The field '", name,
258 "' in the JSON response was expected to be a boolean.");
259 }
260 *result = result_value.asBool();
261 return Status::OK();
262 }
263
/// A GCS-based implementation of a random access file with an LRU block cache.
class GcsRandomAccessFile : public RandomAccessFile {
 public:
  // Signature of the callback that performs the actual read. It is provided
  // by GcsFileSystem, which routes reads through its block cache.
  using ReadFn =
      std::function<Status(const string& filename, uint64 offset, size_t n,
                           StringPiece* result, char* scratch)>;

  GcsRandomAccessFile(const string& filename, ReadFn read_fn)
      : filename_(filename), read_fn_(std::move(read_fn)) {}

  // Returns the GCS path this file was opened with.
  Status Name(StringPiece* result) const override {
    *result = filename_;
    return Status::OK();
  }

  /// The implementation of reads with an LRU block cache. Thread safe.
  Status Read(uint64 offset, size_t n, StringPiece* result,
              char* scratch) const override {
    return read_fn_(filename_, offset, n, result, scratch);
  }

 private:
  /// The filename of this file.
  const string filename_;
  /// The implementation of the read operation (provided by the GCSFileSystem).
  const ReadFn read_fn_;
};
291
/// A GCS-based implementation of a random access file with a read buffer.
class GcsRandomAccessFile;  // (see above for the uncached variant)
class BufferedGcsRandomAccessFile : public RandomAccessFile {
 public:
  // Signature of the callback that performs the actual (unbuffered) read.
  using ReadFn =
      std::function<Status(const string& filename, uint64 offset, size_t n,
                           StringPiece* result, char* scratch)>;

  // Initialize the reader. Provided read_fn should be thread safe.
  BufferedGcsRandomAccessFile(const string& filename, uint64 buffer_size,
                              ReadFn read_fn)
      : filename_(filename),
        read_fn_(std::move(read_fn)),
        buffer_size_(buffer_size),
        buffer_start_(0),
        buffer_end_is_past_eof_(false) {}

  // Returns the GCS path this file was opened with.
  Status Name(StringPiece* result) const override {
    *result = filename_;
    return Status::OK();
  }

  /// The implementation of reads with an read buffer. Thread safe.
  /// Returns `OUT_OF_RANGE` if fewer than n bytes were stored in `*result`
  /// because of EOF.
  Status Read(uint64 offset, size_t n, StringPiece* result,
              char* scratch) const override {
    // Requests larger than the buffer bypass it entirely.
    if (n > buffer_size_) {
      return read_fn_(filename_, offset, n, result, scratch);
    }
    {
      mutex_lock l(buffer_mutex_);
      size_t buffer_end = buffer_start_ + buffer_.size();
      size_t copy_size = 0;
      // First serve whatever part of [offset, offset + n) is already buffered.
      if (offset < buffer_end && offset >= buffer_start_) {
        copy_size = std::min(n, static_cast<size_t>(buffer_end - offset));
        memcpy(scratch, buffer_.data() + (offset - buffer_start_), copy_size);
        *result = StringPiece(scratch, copy_size);
      }
      bool consumed_buffer_to_eof =
          offset + copy_size >= buffer_end && buffer_end_is_past_eof_;
      if (copy_size < n && !consumed_buffer_to_eof) {
        // Refill the buffer starting at the first unserved byte, then copy
        // the remainder of the request from the fresh buffer.
        Status status = FillBuffer(offset + copy_size);
        if (!status.ok() && status.code() != errors::Code::OUT_OF_RANGE) {
          // Empty the buffer to avoid caching bad reads.
          buffer_.resize(0);
          return status;
        }
        size_t remaining_copy = std::min(n - copy_size, buffer_.size());
        memcpy(scratch + copy_size, buffer_.data(), remaining_copy);
        copy_size += remaining_copy;
        *result = StringPiece(scratch, copy_size);
      }
      if (copy_size < n) {
        // Forget the end-of-file flag to allow for clients that poll on the
        // same file.
        buffer_end_is_past_eof_ = false;
        return errors::OutOfRange("EOF reached. Requested to read ", n,
                                  " bytes from ", offset, ".");
      }
    }
    return Status::OK();
  }

 private:
  // Reads up to buffer_size_ bytes starting at `start` into buffer_ via
  // read_fn_, recording whether the read ran past end-of-file. The buffer is
  // shrunk to the number of bytes actually read. Caller holds buffer_mutex_.
  Status FillBuffer(uint64 start) const
      TF_EXCLUSIVE_LOCKS_REQUIRED(buffer_mutex_) {
    buffer_start_ = start;
    buffer_.resize(buffer_size_);
    StringPiece str_piece;
    Status status = read_fn_(filename_, buffer_start_, buffer_size_, &str_piece,
                             &(buffer_[0]));
    buffer_end_is_past_eof_ = status.code() == errors::Code::OUT_OF_RANGE;
    buffer_.resize(str_piece.size());
    return status;
  }

  // The filename of this file.
  const string filename_;

  // The implementation of the read operation (provided by the GCSFileSystem).
  const ReadFn read_fn_;

  // Size of buffer that we read from GCS each time we send a request.
  const uint64 buffer_size_;

  // Mutex for buffering operations that can be accessed from multiple threads.
  // The following members are mutable in order to provide a const Read.
  mutable mutex buffer_mutex_;

  // Offset of buffer from start of the file.
  mutable uint64 buffer_start_ TF_GUARDED_BY(buffer_mutex_);

  // Whether the most recent FillBuffer() hit end-of-file.
  mutable bool buffer_end_is_past_eof_ TF_GUARDED_BY(buffer_mutex_);

  // Cached bytes covering [buffer_start_, buffer_start_ + buffer_.size()).
  mutable string buffer_ TF_GUARDED_BY(buffer_mutex_);
};
388
// Function object declaration with params needed to create upload sessions.
typedef std::function<Status(
    uint64 start_offset, const std::string& object_to_upload,
    const std::string& bucket, uint64 file_size, const std::string& gcs_path,
    UploadSessionHandle* session_handle)>
    SessionCreator;

// Function object declaration with params needed to upload objects.
typedef std::function<Status(const std::string& session_uri,
                             uint64 start_offset, uint64 already_uploaded,
                             const std::string& tmp_content_filename,
                             uint64 file_size, const std::string& file_path)>
    ObjectUploader;

// Function object declaration with params needed to poll upload status.
typedef std::function<Status(const string& session_uri, uint64 file_size,
                             const std::string& gcs_path, bool* completed,
                             uint64* uploaded)>
    StatusPoller;

// Function object declaration with params needed to look up an object's
// current generation number (used as a compose precondition).
typedef std::function<Status(const string& fname, const string& bucket,
                             const string& object, int64* generation)>
    GenerationGetter;
413
/// \brief GCS-based implementation of a writeable file.
///
/// Since GCS objects are immutable, this implementation writes to a local
/// tmp file and copies it to GCS on flush/close.
class GcsWritableFile : public WritableFile {
 public:
  // Constructs a writable file that starts out empty. All GCS interactions
  // (session creation, uploading, status polling, generation lookup) are
  // injected via callbacks so GcsFileSystem supplies real implementations and
  // tests can supply fakes.
  GcsWritableFile(const string& bucket, const string& object,
                  GcsFileSystem* filesystem,
                  GcsFileSystem::TimeoutConfig* timeouts,
                  std::function<void()> file_cache_erase,
                  RetryConfig retry_config, bool compose_append,
                  SessionCreator session_creator,
                  ObjectUploader object_uploader, StatusPoller status_poller,
                  GenerationGetter generation_getter)
      : bucket_(bucket),
        object_(object),
        filesystem_(filesystem),
        timeouts_(timeouts),
        file_cache_erase_(std::move(file_cache_erase)),
        sync_needed_(true),
        retry_config_(retry_config),
        compose_append_(compose_append),
        start_offset_(0),
        session_creator_(std::move(session_creator)),
        object_uploader_(std::move(object_uploader)),
        status_poller_(std::move(status_poller)),
        generation_getter_(std::move(generation_getter)) {
    // TODO: to make it safer, outfile_ should be constructed from an FD
    VLOG(3) << "GcsWritableFile: " << GetGcsPath();
    // If no temp filename can be obtained, outfile_ stays closed and the
    // first operation fails CheckWritable().
    if (GetTmpFilename(&tmp_content_filename_).ok()) {
      outfile_.open(tmp_content_filename_,
                    std::ofstream::binary | std::ofstream::app);
    }
  }

  /// \brief Constructs the writable file in append mode.
  ///
  /// tmp_content_filename should contain a path of an existing temporary file
  /// with the content to be appended. The class takes ownership of the
  /// specified tmp file and deletes it on close.
  GcsWritableFile(const string& bucket, const string& object,
                  GcsFileSystem* filesystem, const string& tmp_content_filename,
                  GcsFileSystem::TimeoutConfig* timeouts,
                  std::function<void()> file_cache_erase,
                  RetryConfig retry_config, bool compose_append,
                  SessionCreator session_creator,
                  ObjectUploader object_uploader, StatusPoller status_poller,
                  GenerationGetter generation_getter)
      : bucket_(bucket),
        object_(object),
        filesystem_(filesystem),
        timeouts_(timeouts),
        file_cache_erase_(std::move(file_cache_erase)),
        sync_needed_(true),
        retry_config_(retry_config),
        compose_append_(compose_append),
        start_offset_(0),
        session_creator_(std::move(session_creator)),
        object_uploader_(std::move(object_uploader)),
        status_poller_(std::move(status_poller)),
        generation_getter_(std::move(generation_getter)) {
    VLOG(3) << "GcsWritableFile: " << GetGcsPath() << "with existing file "
            << tmp_content_filename;
    tmp_content_filename_ = tmp_content_filename;
    outfile_.open(tmp_content_filename_,
                  std::ofstream::binary | std::ofstream::app);
  }

  // Best-effort: syncs pending data, then removes the local temp file.
  ~GcsWritableFile() override {
    Close().IgnoreError();
    std::remove(tmp_content_filename_.c_str());
  }

  // Buffers `data` in the local temp file; the actual upload happens on
  // Sync()/Flush()/Close().
  Status Append(StringPiece data) override {
    TF_RETURN_IF_ERROR(CheckWritable());
    VLOG(3) << "Append: " << GetGcsPath() << " size " << data.length();
    sync_needed_ = true;
    outfile_ << data;
    if (!outfile_.good()) {
      return errors::Internal(
          "Could not append to the internal temporary file.");
    }
    return Status::OK();
  }

  // Uploads pending data and closes the temp file. The temp file is kept
  // open when the upload fails so Close() can be retried.
  Status Close() override {
    VLOG(3) << "Close:" << GetGcsPath();
    if (outfile_.is_open()) {
      Status sync_status = Sync();
      if (sync_status.ok()) {
        outfile_.close();
      }
      return sync_status;
    }
    return Status::OK();
  }

  Status Flush() override {
    VLOG(3) << "Flush:" << GetGcsPath();
    return Sync();
  }

  Status Name(StringPiece* result) const override {
    return errors::Unimplemented("GCSWritableFile does not support Name()");
  }

  // Uploads the buffered content to GCS if there is unsynced data; no-op
  // otherwise.
  Status Sync() override {
    VLOG(3) << "Sync started:" << GetGcsPath();
    TF_RETURN_IF_ERROR(CheckWritable());
    if (!sync_needed_) {
      return Status::OK();
    }
    Status status = SyncImpl();
    VLOG(3) << "Sync finished " << GetGcsPath();
    if (status.ok()) {
      sync_needed_ = false;
    }
    return status;
  }

  // Reports the current write position within the local temp file.
  Status Tell(int64* position) override {
    *position = outfile_.tellp();
    if (*position == -1) {
      return errors::Internal("tellp on the internal temporary file failed");
    }
    return Status::OK();
  }

 private:
  /// Copies the current version of the file to GCS.
  ///
  /// This SyncImpl() uploads the object to GCS.
  /// In case of a failure, it resumes failed uploads as recommended by the GCS
  /// resumable API documentation. When the whole upload needs to be
  /// restarted, Sync() returns UNAVAILABLE and relies on RetryingFileSystem.
  Status SyncImpl() {
    outfile_.flush();
    if (!outfile_.good()) {
      return errors::Internal(
          "Could not write to the internal temporary file.");
    }
    UploadSessionHandle session_handle;
    uint64 start_offset = 0;
    string object_to_upload = object_;
    bool should_compose = false;
    if (compose_append_) {
      start_offset = start_offset_;
      // Only compose if the object has already been uploaded to GCS
      should_compose = start_offset > 0;
      if (should_compose) {
        // New data goes to a temporary sibling object, which is composed onto
        // the original (and then deleted) in AppendObject() below.
        object_to_upload =
            strings::StrCat(io::Dirname(object_), "/.tmpcompose/",
                            io::Basename(object_), ".", start_offset_);
      }
    }
    TF_RETURN_IF_ERROR(CreateNewUploadSession(start_offset, object_to_upload,
                                              &session_handle));
    uint64 already_uploaded = 0;
    bool first_attempt = true;
    const Status upload_status = RetryingUtils::CallWithRetries(
        [&first_attempt, &already_uploaded, &session_handle, &start_offset,
         this]() {
          // On a retry of a resumable session, first ask GCS how much it has
          // already received so the upload can continue from there.
          if (session_handle.resumable && !first_attempt) {
            bool completed;
            TF_RETURN_IF_ERROR(RequestUploadSessionStatus(
                session_handle.session_uri, &completed, &already_uploaded));
            LOG(INFO) << "### RequestUploadSessionStatus: completed = "
                      << completed
                      << ", already_uploaded = " << already_uploaded
                      << ", file = " << GetGcsPath();
            if (completed) {
              // Erase the file from the file cache on every successful write.
              file_cache_erase_();
              // It's unclear why UploadToSession didn't return OK in the
              // previous attempt, but GCS reports that the file is fully
              // uploaded, so succeed.
              return Status::OK();
            }
          }
          first_attempt = false;
          return UploadToSession(session_handle.session_uri, start_offset,
                                 already_uploaded);
        },
        retry_config_);
    if (upload_status.code() == errors::Code::NOT_FOUND) {
      // GCS docs recommend retrying the whole upload. We're relying on the
      // RetryingFileSystem to retry the Sync() call.
      return errors::Unavailable(
          strings::StrCat("Upload to gs://", bucket_, "/", object_,
                          " failed, caused by: ", upload_status.ToString()));
    }
    if (upload_status.ok()) {
      if (should_compose) {
        TF_RETURN_IF_ERROR(AppendObject(object_to_upload));
      }
      // Record how much is now on GCS so a later compose-append only uploads
      // the delta.
      TF_RETURN_IF_ERROR(GetCurrentFileSize(&start_offset_));
    }
    return upload_status;
  }

  // Returns OK iff the local temp file is open for writing.
  Status CheckWritable() const {
    if (!outfile_.is_open()) {
      return errors::FailedPrecondition(
          "The internal temporary file is not writable.");
    }
    return Status::OK();
  }

  // Reports the size of the buffered content, taken from the temp file's
  // current put position.
  Status GetCurrentFileSize(uint64* size) {
    const auto tellp = outfile_.tellp();
    if (tellp == static_cast<std::streampos>(-1)) {
      return errors::Internal(
          "Could not get the size of the internal temporary file.");
    }
    *size = tellp;
    return Status::OK();
  }

  /// Initiates a new resumable upload session.
  Status CreateNewUploadSession(uint64 start_offset,
                                std::string object_to_upload,
                                UploadSessionHandle* session_handle) {
    uint64 file_size;
    TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
    return session_creator_(start_offset, object_to_upload, bucket_, file_size,
                            GetGcsPath(), session_handle);
  }

  /// Appends the data of append_object to the original object and deletes
  /// append_object.
  Status AppendObject(string append_object) {
    const string append_object_path = GetGcsPathWithObject(append_object);
    VLOG(3) << "AppendObject: " << append_object_path << " to " << GetGcsPath();

    // The compose request carries an ifGenerationMatch precondition on the
    // original object, so a concurrent modification fails the request instead
    // of silently losing data.
    int64 generation = 0;
    TF_RETURN_IF_ERROR(
        generation_getter_(GetGcsPath(), bucket_, object_, &generation));

    TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
        [&append_object, &generation, this]() {
          std::unique_ptr<HttpRequest> request;
          TF_RETURN_IF_ERROR(filesystem_->CreateHttpRequest(&request));

          request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket_, "/o/",
                                          request->EscapeString(object_),
                                          "/compose"));

          const string request_body = strings::StrCat(
              "{'sourceObjects': [{'name': '", object_,
              "','objectPrecondition':{'ifGenerationMatch':", generation,
              "}},{'name': '", append_object, "'}]}");
          request->SetTimeouts(timeouts_->connect, timeouts_->idle,
                               timeouts_->metadata);
          request->AddHeader("content-type", "application/json");
          request->SetPostFromBuffer(request_body.c_str(), request_body.size());
          TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
                                          " when composing to ", GetGcsPath());
          return Status::OK();
        },
        retry_config_));

    // Clean up the temporary object that has now been composed in.
    return RetryingUtils::DeleteWithRetries(
        [&append_object_path, this]() {
          return filesystem_->DeleteFile(append_object_path, nullptr);
        },
        retry_config_);
  }

  /// \brief Requests status of a previously initiated upload session.
  ///
  /// If the upload has already succeeded, sets 'completed' to true.
  /// Otherwise sets 'completed' to false and 'uploaded' to the currently
  /// uploaded size in bytes.
  Status RequestUploadSessionStatus(const string& session_uri, bool* completed,
                                    uint64* uploaded) {
    uint64 file_size;
    TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
    return status_poller_(session_uri, file_size, GetGcsPath(), completed,
                          uploaded);
  }

  /// Uploads data to object.
  Status UploadToSession(const string& session_uri, uint64 start_offset,
                         uint64 already_uploaded) {
    uint64 file_size;
    TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
    Status status =
        object_uploader_(session_uri, start_offset, already_uploaded,
                         tmp_content_filename_, file_size, GetGcsPath());
    if (status.ok()) {
      // Erase the file from the file cache on every successful write.
      // Note: Only local cache, this does nothing on distributed cache. The
      // distributed cache clears the cache as it is needed.
      file_cache_erase_();
    }

    return status;
  }

  // Builds a "gs://<bucket>/<object>" URI for the given object name.
  string GetGcsPathWithObject(string object) const {
    return strings::StrCat("gs://", bucket_, "/", object);
  }
  // The full "gs://..." path of the object this file writes to.
  string GetGcsPath() const { return GetGcsPathWithObject(object_); }

  string bucket_;
  string object_;
  GcsFileSystem* const filesystem_;  // Not owned.
  string tmp_content_filename_;  // Local temp file buffering the content.
  std::ofstream outfile_;        // Write handle for tmp_content_filename_.
  GcsFileSystem::TimeoutConfig* timeouts_;
  std::function<void()> file_cache_erase_;
  bool sync_needed_;  // whether there is buffered data that needs to be synced
  RetryConfig retry_config_;
  bool compose_append_;  // If true, Flush() uploads deltas and composes them.
  uint64 start_offset_;  // Bytes already uploaded to GCS (compose mode only).
  // Callbacks to the file system used to upload object into GCS.
  const SessionCreator session_creator_;
  const ObjectUploader object_uploader_;
  const StatusPoller status_poller_;
  const GenerationGetter generation_getter_;
};
735
736 class GcsReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
737 public:
GcsReadOnlyMemoryRegion(std::unique_ptr<char[]> data,uint64 length)738 GcsReadOnlyMemoryRegion(std::unique_ptr<char[]> data, uint64 length)
739 : data_(std::move(data)), length_(length) {}
data()740 const void* data() override { return reinterpret_cast<void*>(data_.get()); }
length()741 uint64 length() override { return length_; }
742
743 private:
744 std::unique_ptr<char[]> data_;
745 uint64 length_;
746 };
747
// Identity "converter" for string-typed environment variables: passes the raw
// value through unchanged. Always returns true (parsing cannot fail).
bool StringPieceIdentity(StringPiece str, StringPiece* value) {
  *value = str;
  return true;
}
752
753 /// \brief Utility function to split a comma delimited list of strings to an
754 /// unordered set, lowercasing all values.
SplitByCommaToLowercaseSet(StringPiece list,std::unordered_set<string> * set)755 bool SplitByCommaToLowercaseSet(StringPiece list,
756 std::unordered_set<string>* set) {
757 std::vector<string> vector = absl::StrSplit(absl::AsciiStrToLower(list), ',');
758 *set = std::unordered_set<string>(vector.begin(), vector.end());
759 return true;
760 }
761
762 // \brief Convert Compute Engine zone to region
ZoneToRegion(string * zone)763 string ZoneToRegion(string* zone) {
764 return zone->substr(0, zone->find_last_of('-'));
765 }
766
767 } // namespace
768
// Environment-driven constructor. Every knob — cache geometry, timeouts,
// throttle, DNS caching, extra request headers, append mode — is read from
// environment variables, falling back to the kDefault* constants.
// `make_default_cache` == false forces the block cache byte budget to zero.
GcsFileSystem::GcsFileSystem(bool make_default_cache) {
  // Scratch slot reused by every GetEnvVar(..., safe_strtou64, ...) call.
  uint64 value;
  block_size_ = kDefaultBlockSize;
  size_t max_bytes = kDefaultMaxCacheSize;
  uint64 max_staleness = kDefaultMaxStaleness;

  // The metadata client and both providers share one HTTP request factory.
  http_request_factory_ = std::make_shared<CurlHttpRequest::Factory>();
  compute_engine_metadata_client_ =
      std::make_shared<ComputeEngineMetadataClient>(http_request_factory_);
  auth_provider_ = std::unique_ptr<AuthProvider>(
      new GoogleAuthProvider(compute_engine_metadata_client_));
  zone_provider_ = std::unique_ptr<ZoneProvider>(
      new ComputeEngineZoneProvider(compute_engine_metadata_client_));

  // Apply the sys env override for the readahead buffer size if it's provided.
  if (GetEnvVar(kReadaheadBufferSize, strings::safe_strtou64, &value)) {
    block_size_ = value;
  }

  // Apply the overrides for the block size (MB), max bytes (MB), and max
  // staleness (seconds) if provided.
  // Note: kBlockSize is applied after kReadaheadBufferSize, so when both are
  // set the kBlockSize value wins.
  if (GetEnvVar(kBlockSize, strings::safe_strtou64, &value)) {
    block_size_ = value * 1024 * 1024;
  }

  if (GetEnvVar(kMaxCacheSize, strings::safe_strtou64, &value)) {
    max_bytes = value * 1024 * 1024;
  }

  if (GetEnvVar(kMaxStaleness, strings::safe_strtou64, &value)) {
    max_staleness = value;
  }
  // A zero byte budget disables block caching regardless of the env settings.
  if (!make_default_cache) {
    max_bytes = 0;
  }
  VLOG(1) << "GCS cache max size = " << max_bytes << " ; "
          << "block size = " << block_size_ << " ; "
          << "max staleness = " << max_staleness;
  file_block_cache_ = MakeFileBlockCache(block_size_, max_bytes, max_staleness);
  // Apply overrides for the stat cache max age and max entries, if provided.
  uint64 stat_cache_max_age = kStatCacheDefaultMaxAge;
  size_t stat_cache_max_entries = kStatCacheDefaultMaxEntries;
  if (GetEnvVar(kStatCacheMaxAge, strings::safe_strtou64, &value)) {
    stat_cache_max_age = value;
  }
  if (GetEnvVar(kStatCacheMaxEntries, strings::safe_strtou64, &value)) {
    stat_cache_max_entries = value;
  }
  stat_cache_.reset(new ExpiringLRUCache<GcsFileStat>(stat_cache_max_age,
                                                      stat_cache_max_entries));
  // Apply overrides for the matching paths cache max age and max entries, if
  // provided.
  uint64 matching_paths_cache_max_age = kMatchingPathsCacheDefaultMaxAge;
  size_t matching_paths_cache_max_entries =
      kMatchingPathsCacheDefaultMaxEntries;
  if (GetEnvVar(kMatchingPathsCacheMaxAge, strings::safe_strtou64, &value)) {
    matching_paths_cache_max_age = value;
  }
  if (GetEnvVar(kMatchingPathsCacheMaxEntries, strings::safe_strtou64,
                &value)) {
    matching_paths_cache_max_entries = value;
  }
  matching_paths_cache_.reset(new ExpiringLRUCache<std::vector<string>>(
      matching_paths_cache_max_age, matching_paths_cache_max_entries));

  // Bucket locations never change, so this cache entry never expires.
  bucket_location_cache_.reset(new ExpiringLRUCache<string>(
      kCacheNeverExpire, kBucketLocationCacheMaxEntries));

  int64 resolve_frequency_secs;
  if (GetEnvVar(kResolveCacheSecs, strings::safe_strto64,
                &resolve_frequency_secs)) {
    dns_cache_.reset(new GcsDnsCache(resolve_frequency_secs));
    VLOG(1) << "GCS DNS cache is enabled. " << kResolveCacheSecs << " = "
            << resolve_frequency_secs;
  } else {
    VLOG(1) << "GCS DNS cache is disabled, because " << kResolveCacheSecs
            << " = 0 (or is not set)";
  }

  // Get the additional header; expected format is "Name:Value".
  StringPiece add_header_contents;
  if (GetEnvVar(kAdditionalRequestHeader, StringPieceIdentity,
                &add_header_contents)) {
    size_t split = add_header_contents.find(':', 0);

    if (split != StringPiece::npos) {
      StringPiece header_name = add_header_contents.substr(0, split);
      StringPiece header_value = add_header_contents.substr(split + 1);

      if (!header_name.empty() && !header_value.empty()) {
        additional_header_.reset(new std::pair<const string, const string>(
            string(header_name), string(header_value)));

        VLOG(1) << "GCS additional header ENABLED. "
                << "Name: " << additional_header_->first << ", "
                << "Value: " << additional_header_->second;
      } else {
        LOG(ERROR) << "GCS additional header DISABLED. Invalid contents: "
                   << add_header_contents;
      }
    } else {
      LOG(ERROR) << "GCS additional header DISABLED. Invalid contents: "
                 << add_header_contents;
    }
  } else {
    VLOG(1) << "GCS additional header DISABLED. No environment variable set.";
  }

  // Apply the overrides for request timeouts
  uint32 timeout_value;
  if (GetEnvVar(kRequestConnectionTimeout, strings::safe_strtou32,
                &timeout_value)) {
    timeouts_.connect = timeout_value;
  }
  if (GetEnvVar(kRequestIdleTimeout, strings::safe_strtou32, &timeout_value)) {
    timeouts_.idle = timeout_value;
  }
  if (GetEnvVar(kMetadataRequestTimeout, strings::safe_strtou32,
                &timeout_value)) {
    timeouts_.metadata = timeout_value;
  }
  if (GetEnvVar(kReadRequestTimeout, strings::safe_strtou32, &timeout_value)) {
    timeouts_.read = timeout_value;
  }
  if (GetEnvVar(kWriteRequestTimeout, strings::safe_strtou32, &timeout_value)) {
    timeouts_.write = timeout_value;
  }

  // Throttling is only enabled when a token rate is configured; the other
  // three knobs refine the token bucket.
  int64 token_value;
  if (GetEnvVar(kThrottleRate, strings::safe_strto64, &token_value)) {
    GcsThrottleConfig config;
    config.enabled = true;
    config.token_rate = token_value;

    if (GetEnvVar(kThrottleBucket, strings::safe_strto64, &token_value)) {
      config.bucket_size = token_value;
    }

    if (GetEnvVar(kTokensPerRequest, strings::safe_strto64, &token_value)) {
      config.tokens_per_request = token_value;
    }

    if (GetEnvVar(kInitialTokens, strings::safe_strto64, &token_value)) {
      config.initial_tokens = token_value;
    }
    throttle_.SetConfig(config);
  }

  GetEnvVar(kAllowedBucketLocations, SplitByCommaToLowercaseSet,
            &allowed_locations_);

  // Any value other than kComposeAppend leaves compose-append disabled.
  StringPiece append_mode;
  GetEnvVar(kAppendMode, StringPieceIdentity, &append_mode);
  if (append_mode == kComposeAppend) {
    compose_append_ = true;
  } else {
    compose_append_ = false;
  }
}
928
// Constructor with every collaborator and tuning value supplied explicitly;
// no environment variables are consulted. `additional_header` is stored in
// the smart-pointer member `additional_header_` (see the reset() in the
// default constructor), so ownership transfers to this object — callers
// should not free it. NOTE(review): member initialization follows the
// declaration order in the header, not the order of this init list.
GcsFileSystem::GcsFileSystem(
    std::unique_ptr<AuthProvider> auth_provider,
    std::unique_ptr<HttpRequest::Factory> http_request_factory,
    std::unique_ptr<ZoneProvider> zone_provider, size_t block_size,
    size_t max_bytes, uint64 max_staleness, uint64 stat_cache_max_age,
    size_t stat_cache_max_entries, uint64 matching_paths_cache_max_age,
    size_t matching_paths_cache_max_entries, RetryConfig retry_config,
    TimeoutConfig timeouts, const std::unordered_set<string>& allowed_locations,
    std::pair<const string, const string>* additional_header,
    bool compose_append)
    : timeouts_(timeouts),
      retry_config_(retry_config),
      auth_provider_(std::move(auth_provider)),
      http_request_factory_(std::move(http_request_factory)),
      zone_provider_(std::move(zone_provider)),
      block_size_(block_size),
      file_block_cache_(
          MakeFileBlockCache(block_size, max_bytes, max_staleness)),
      stat_cache_(new StatCache(stat_cache_max_age, stat_cache_max_entries)),
      matching_paths_cache_(new MatchingPathsCache(
          matching_paths_cache_max_age, matching_paths_cache_max_entries)),
      bucket_location_cache_(new BucketLocationCache(
          kCacheNeverExpire, kBucketLocationCacheMaxEntries)),
      allowed_locations_(allowed_locations),
      compose_append_(compose_append),
      additional_header_(additional_header) {}
955
// Creates a RandomAccessFile for `fname`. When the block cache is enabled,
// reads are served through file_block_cache_ with a generation-number
// signature check; otherwise a BufferedGcsRandomAccessFile fetches ranges
// directly via LoadBufferFromGCS.
Status GcsFileSystem::NewRandomAccessFile(
    const string& fname, TransactionToken* token,
    std::unique_ptr<RandomAccessFile>* result) {
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
  TF_RETURN_IF_ERROR(CheckBucketLocationConstraint(bucket));
  bool cache_enabled;
  {
    // Sample the flag under the exclusive lock; the read callback below
    // re-acquires a shared lock on every read.
    mutex_lock l(block_cache_lock_);
    cache_enabled = file_block_cache_->IsCacheEnabled();
  }
  if (cache_enabled) {
    result->reset(new GcsRandomAccessFile(fname, [this, bucket, object](
                                                     const string& fname,
                                                     uint64 offset, size_t n,
                                                     StringPiece* result,
                                                     char* scratch) {
      tf_shared_lock l(block_cache_lock_);
      GcsFileStat stat;
      // Stat (cached) to learn the object's current generation number.
      TF_RETURN_IF_ERROR(stat_cache_->LookupOrCompute(
          fname, &stat,
          [this, bucket, object](const string& fname, GcsFileStat* stat) {
            return UncachedStatForObject(fname, bucket, object, stat);
          }));
      // A changed generation invalidates previously cached blocks.
      if (!file_block_cache_->ValidateAndUpdateFileSignature(
              fname, stat.generation_number)) {
        VLOG(1)
            << "File signature has been changed. Refreshing the cache. Path: "
            << fname;
      }
      *result = StringPiece();
      size_t bytes_transferred;
      TF_RETURN_IF_ERROR(file_block_cache_->Read(fname, offset, n, scratch,
                                                 &bytes_transferred));
      *result = StringPiece(scratch, bytes_transferred);
      // A short read signals EOF; report it with OutOfRange as the
      // RandomAccessFile contract expects.
      if (bytes_transferred < n) {
        return errors::OutOfRange("EOF reached, ", result->size(),
                                  " bytes were read out of ", n,
                                  " bytes requested.");
      }
      return Status::OK();
    }));
  } else {
    result->reset(new BufferedGcsRandomAccessFile(
        fname, block_size_,
        [this, bucket, object](const string& fname, uint64 offset, size_t n,
                               StringPiece* result, char* scratch) {
          *result = StringPiece();
          size_t bytes_transferred;
          // Uncached path: fetch the requested range straight from GCS.
          TF_RETURN_IF_ERROR(
              LoadBufferFromGCS(fname, offset, n, scratch, &bytes_transferred));
          *result = StringPiece(scratch, bytes_transferred);
          if (bytes_transferred < n) {
            return errors::OutOfRange("EOF reached, ", result->size(),
                                      " bytes were read out of ", n,
                                      " bytes requested.");
          }
          return Status::OK();
        }));
  }
  return Status::OK();
}
1018
ResetFileBlockCache(size_t block_size_bytes,size_t max_bytes,uint64 max_staleness_secs)1019 void GcsFileSystem::ResetFileBlockCache(size_t block_size_bytes,
1020 size_t max_bytes,
1021 uint64 max_staleness_secs) {
1022 mutex_lock l(block_cache_lock_);
1023 file_block_cache_ =
1024 MakeFileBlockCache(block_size_bytes, max_bytes, max_staleness_secs);
1025 if (stats_ != nullptr) {
1026 stats_->Configure(this, &throttle_, file_block_cache_.get());
1027 }
1028 }
1029
1030 // A helper function to build a FileBlockCache for GcsFileSystem.
MakeFileBlockCache(size_t block_size,size_t max_bytes,uint64 max_staleness)1031 std::unique_ptr<FileBlockCache> GcsFileSystem::MakeFileBlockCache(
1032 size_t block_size, size_t max_bytes, uint64 max_staleness) {
1033 std::unique_ptr<FileBlockCache> file_block_cache(new RamFileBlockCache(
1034 block_size, max_bytes, max_staleness,
1035 [this](const string& filename, size_t offset, size_t n, char* buffer,
1036 size_t* bytes_transferred) {
1037 return LoadBufferFromGCS(filename, offset, n, buffer,
1038 bytes_transferred);
1039 }));
1040 return file_block_cache;
1041 }
1042
// A helper function to actually read the data from GCS. Fetches up to `n`
// bytes of the object backing `fname` starting at `offset` directly into
// `buffer`, reporting the actual count via `bytes_transferred`.
Status GcsFileSystem::LoadBufferFromGCS(const string& fname, size_t offset,
                                        size_t n, char* buffer,
                                        size_t* bytes_transferred) {
  *bytes_transferred = 0;

  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));

  profiler::TraceMe activity(
      [fname]() { return absl::StrCat("LoadBufferFromGCS ", fname); });

  std::unique_ptr<HttpRequest> request;
  TF_RETURN_WITH_CONTEXT_IF_ERROR(CreateHttpRequest(&request),
                                  "when reading gs://", bucket, "/", object);

  request->SetUri(strings::StrCat("https://", kStorageHost, "/", bucket, "/",
                                  request->EscapeString(object)));
  // HTTP Range headers are inclusive on both ends, hence offset + n - 1.
  request->SetRange(offset, offset + n - 1);
  request->SetResultBufferDirect(buffer, n);
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.read);

  if (stats_ != nullptr) {
    stats_->RecordBlockLoadRequest(fname, offset);
  }

  TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading gs://",
                                  bucket, "/", object);

  size_t bytes_read = request->GetResultBufferDirectBytesTransferred();
  *bytes_transferred = bytes_read;
  VLOG(1) << "Successful read of gs://" << bucket << "/" << object << " @ "
          << offset << " of size: " << bytes_read;
  activity.AppendMetadata([bytes_read]() {
    return profiler::TraceMeEncode({{"block_size", bytes_read}});
  });

  if (stats_ != nullptr) {
    stats_->RecordBlockRetrieved(fname, offset, bytes_read);
  }

  throttle_.RecordResponse(bytes_read);

  if (bytes_read < n) {
    // Check stat cache to see if we encountered an interrupted read.
    // A short read is only legitimate at end-of-file; if the cached length
    // says more bytes should exist past `offset + bytes_read`, the object
    // changed or the read was truncated mid-flight.
    GcsFileStat stat;
    if (stat_cache_->Lookup(fname, &stat)) {
      if (offset + bytes_read < stat.base.length) {
        return errors::Internal(strings::Printf(
            "File contents are inconsistent for file: %s @ %lu.", fname.c_str(),
            offset));
      }
      VLOG(2) << "Successful integrity check for: gs://" << bucket << "/"
              << object << " @ " << offset;
    }
  }

  return Status::OK();
}
1102
1103 /// Initiates a new upload session.
CreateNewUploadSession(uint64 start_offset,const std::string & object_to_upload,const std::string & bucket,uint64 file_size,const std::string & gcs_path,UploadSessionHandle * session_handle)1104 Status GcsFileSystem::CreateNewUploadSession(
1105 uint64 start_offset, const std::string& object_to_upload,
1106 const std::string& bucket, uint64 file_size, const std::string& gcs_path,
1107 UploadSessionHandle* session_handle) {
1108 std::vector<char> output_buffer;
1109 std::unique_ptr<HttpRequest> request;
1110 TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1111
1112 std::string uri = strings::StrCat(
1113 kGcsUploadUriBase, "b/", bucket,
1114 "/o?uploadType=resumable&name=", request->EscapeString(object_to_upload));
1115 request->SetUri(uri);
1116 request->AddHeader("X-Upload-Content-Length",
1117 absl::StrCat(file_size - start_offset));
1118 request->SetPostEmptyBody();
1119 request->SetResultBuffer(&output_buffer);
1120 request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1121 TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
1122 " when initiating an upload to ", gcs_path);
1123 if (session_handle != nullptr) {
1124 session_handle->resumable = true;
1125 session_handle->session_uri = request->GetResponseHeader("Location");
1126 if (session_handle->session_uri.empty()) {
1127 return errors::Internal("Unexpected response from GCS when writing to ",
1128 gcs_path, ": 'Location' header not returned.");
1129 }
1130 }
1131 return Status::OK();
1132 }
1133
UploadToSession(const std::string & session_uri,uint64 start_offset,uint64 already_uploaded,const std::string & tmp_content_filename,uint64 file_size,const std::string & file_path)1134 Status GcsFileSystem::UploadToSession(const std::string& session_uri,
1135 uint64 start_offset,
1136 uint64 already_uploaded,
1137 const std::string& tmp_content_filename,
1138 uint64 file_size,
1139 const std::string& file_path) {
1140 std::unique_ptr<HttpRequest> request;
1141 TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1142 request->SetUri(session_uri);
1143 if (file_size > 0) {
1144 request->AddHeader("Content-Range",
1145 strings::StrCat("bytes ", already_uploaded, "-",
1146 file_size - start_offset - 1, "/",
1147 file_size - start_offset));
1148 }
1149 request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.write);
1150
1151 TF_RETURN_IF_ERROR(request->SetPutFromFile(tmp_content_filename,
1152 start_offset + already_uploaded));
1153 TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when uploading ",
1154 file_path);
1155 return Status::OK();
1156 }
1157
// Queries how much of a resumable upload GCS has already received. Sends an
// empty PUT with "Content-Range: bytes */<size>": a success reply means the
// upload finished; HTTP_CODE_RESUME_INCOMPLETE means it is partial, and the
// "Range" response header (e.g. "bytes=0-10") reports the received span.
Status GcsFileSystem::RequestUploadSessionStatus(const string& session_uri,
                                                 uint64 file_size,
                                                 const std::string& gcs_path,
                                                 bool* completed,
                                                 uint64* uploaded) {
  CHECK(completed != nullptr) << "RequestUploadSessionStatus() called with out "
                                 "param 'completed' == nullptr.";  // Crash ok
  CHECK(uploaded != nullptr) << "RequestUploadSessionStatus() called with out "
                                "param 'uploaded' == nullptr.";  // Crash ok
  std::unique_ptr<HttpRequest> request;
  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
  request->SetUri(session_uri);
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
  request->AddHeader("Content-Range", strings::StrCat("bytes */", file_size));
  request->SetPutEmptyBody();
  Status status = request->Send();
  if (status.ok()) {
    *completed = true;
    return Status::OK();
  }
  *completed = false;
  // Any failure other than "resume incomplete" is a real error.
  if (request->GetResponseCode() != HTTP_CODE_RESUME_INCOMPLETE) {
    TF_RETURN_WITH_CONTEXT_IF_ERROR(status, " when resuming upload ", gcs_path);
  }
  const std::string received_range = request->GetResponseHeader("Range");
  if (received_range.empty()) {
    // This means GCS doesn't have any bytes of the file yet.
    *uploaded = 0;
  } else {
    StringPiece range_piece(received_range);
    absl::ConsumePrefix(&range_piece,
                        "bytes=");  // May or may not be present.

    // Helper that wraps malformed-header failures in a uniform error.
    auto return_error = [](const std::string& gcs_path,
                           const std::string& error_message) {
      return errors::Internal("Unexpected response from GCS when writing ",
                              gcs_path, ": ", error_message);
    };

    // Expect exactly "<first>-<last>".
    std::vector<string> range_strs = str_util::Split(range_piece, '-');
    if (range_strs.size() != 2) {
      return return_error(gcs_path, "Range header '" + received_range +
                                        "' could not be parsed.");
    }

    std::vector<int64> range_parts;
    for (const std::string& range_str : range_strs) {
      int64 tmp;
      if (strings::safe_strto64(range_str, &tmp)) {
        range_parts.push_back(tmp);
      } else {
        return return_error(gcs_path, "Range header '" + received_range +
                                          "' could not be parsed.");
      }
    }

    if (range_parts[0] != 0) {
      return return_error(gcs_path, "The returned range '" + received_range +
                                        "' does not start at zero.");
    }
    // If GCS returned "Range: 0-10", this means 11 bytes were uploaded.
    *uploaded = range_parts[1] + 1;
  }
  return Status::OK();
}
1223
ParseGcsPathForScheme(StringPiece fname,string scheme,bool empty_object_ok,string * bucket,string * object)1224 Status GcsFileSystem::ParseGcsPathForScheme(StringPiece fname, string scheme,
1225 bool empty_object_ok,
1226 string* bucket, string* object) {
1227 StringPiece parsed_scheme, bucketp, objectp;
1228 io::ParseURI(fname, &parsed_scheme, &bucketp, &objectp);
1229 if (parsed_scheme != scheme) {
1230 return errors::InvalidArgument("GCS path doesn't start with 'gs://': ",
1231 fname);
1232 }
1233 *bucket = string(bucketp);
1234 if (bucket->empty() || *bucket == ".") {
1235 return errors::InvalidArgument("GCS path doesn't contain a bucket name: ",
1236 fname);
1237 }
1238 absl::ConsumePrefix(&objectp, "/");
1239 *object = string(objectp);
1240 if (!empty_object_ok && object->empty()) {
1241 return errors::InvalidArgument("GCS path doesn't contain an object name: ",
1242 fname);
1243 }
1244 return Status::OK();
1245 }
1246
// Parses a GCS path under the canonical "gs" scheme; see
// ParseGcsPathForScheme for the validation rules applied.
Status GcsFileSystem::ParseGcsPath(StringPiece fname, bool empty_object_ok,
                                   string* bucket, string* object) {
  return ParseGcsPathForScheme(fname, "gs", empty_object_ok, bucket, object);
}
1251
// Drops the cached blocks and the cached stat entry for `fname`, e.g. after
// the file has been written or deleted.
void GcsFileSystem::ClearFileCaches(const string& fname) {
  tf_shared_lock l(block_cache_lock_);
  file_block_cache_->RemoveFile(fname);
  stat_cache_->Delete(fname);
  // TODO(rxsang): Remove the patterns that match the file in
  // MatchingPathsCache as well.
}
1259
// Creates a writable file that uploads to `fname` via the GCS resumable
// upload protocol. The four callbacks close over `this` so GcsWritableFile
// can drive the protocol without depending directly on GcsFileSystem.
Status GcsFileSystem::NewWritableFile(const string& fname,
                                      TransactionToken* token,
                                      std::unique_ptr<WritableFile>* result) {
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));

  // Starts a resumable upload session for the object.
  auto session_creator =
      [this](uint64 start_offset, const std::string& object_to_upload,
             const std::string& bucket, uint64 file_size,
             const std::string& gcs_path, UploadSessionHandle* session_handle) {
        return CreateNewUploadSession(start_offset, object_to_upload, bucket,
                                      file_size, gcs_path, session_handle);
      };
  // Pushes local temp-file contents into an open session.
  auto object_uploader =
      [this](const std::string& session_uri, uint64 start_offset,
             uint64 already_uploaded, const std::string& tmp_content_filename,
             uint64 file_size, const std::string& file_path) {
        return UploadToSession(session_uri, start_offset, already_uploaded,
                               tmp_content_filename, file_size, file_path);
      };
  // Queries how much of an interrupted upload GCS has received.
  auto status_poller = [this](const string& session_uri, uint64 file_size,
                              const std::string& gcs_path, bool* completed,
                              uint64* uploaded) {
    return RequestUploadSessionStatus(session_uri, file_size, gcs_path,
                                      completed, uploaded);
  };

  // Fetches the object's current generation number (with retries), bypassing
  // the stat cache.
  auto generation_getter = [this](const string& fname, const string& bucket,
                                  const string& object, int64* generation) {
    GcsFileStat stat;
    TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
        [&fname, &bucket, &object, &stat, this]() {
          return UncachedStatForObject(fname, bucket, object, &stat);
        },
        retry_config_));
    *generation = stat.generation_number;
    return Status::OK();
  };

  result->reset(new GcsWritableFile(
      bucket, object, this, &timeouts_,
      [this, fname]() { ClearFileCaches(fname); }, retry_config_,
      compose_append_, session_creator, object_uploader, status_poller,
      generation_getter));
  return Status::OK();
}
1306
// Reads the file from GCS in chunks and stores it in a tmp file, which is
// then passed to GcsWritableFile so appended data is written after the
// existing content.
Status GcsFileSystem::NewAppendableFile(const string& fname,
                                        TransactionToken* token,
                                        std::unique_ptr<WritableFile>* result) {
  std::unique_ptr<RandomAccessFile> reader;
  TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &reader));
  std::unique_ptr<char[]> buffer(new char[kReadAppendableFileBufferSize]);
  Status status;
  uint64 offset = 0;
  StringPiece read_chunk;

  // Read the file from GCS in chunks and save it to a tmp file.
  string old_content_filename;
  TF_RETURN_IF_ERROR(GetTmpFilename(&old_content_filename));
  std::ofstream old_content(old_content_filename, std::ofstream::binary);
  while (true) {
    status = reader->Read(offset, kReadAppendableFileBufferSize, &read_chunk,
                          buffer.get());
    if (status.ok()) {
      old_content << read_chunk;
      offset += kReadAppendableFileBufferSize;
    } else if (status.code() == error::NOT_FOUND) {
      // New file, there is no existing content in it.
      break;
    } else if (status.code() == error::OUT_OF_RANGE) {
      // Expected, this means we reached EOF. The final (short) chunk still
      // needs to be appended before stopping.
      old_content << read_chunk;
      break;
    } else {
      return status;
    }
  }
  old_content.close();

  // Starts a resumable upload session for the object.
  auto session_creator =
      [this](uint64 start_offset, const std::string& object_to_upload,
             const std::string& bucket, uint64 file_size,
             const std::string& gcs_path, UploadSessionHandle* session_handle) {
        return CreateNewUploadSession(start_offset, object_to_upload, bucket,
                                      file_size, gcs_path, session_handle);
      };
  // Pushes local temp-file contents into an open session.
  auto object_uploader =
      [this](const std::string& session_uri, uint64 start_offset,
             uint64 already_uploaded, const std::string& tmp_content_filename,
             uint64 file_size, const std::string& file_path) {
        return UploadToSession(session_uri, start_offset, already_uploaded,
                               tmp_content_filename, file_size, file_path);
      };

  // Queries how much of an interrupted upload GCS has received.
  auto status_poller = [this](const string& session_uri, uint64 file_size,
                              const std::string& gcs_path, bool* completed,
                              uint64* uploaded) {
    return RequestUploadSessionStatus(session_uri, file_size, gcs_path,
                                      completed, uploaded);
  };

  // Fetches the object's current generation number (with retries), bypassing
  // the stat cache.
  auto generation_getter = [this](const string& fname, const string& bucket,
                                  const string& object, int64* generation) {
    GcsFileStat stat;
    TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
        [&fname, &bucket, &object, &stat, this]() {
          return UncachedStatForObject(fname, bucket, object, &stat);
        },
        retry_config_));
    *generation = stat.generation_number;
    return Status::OK();
  };

  // Create a writable file and pass the old content to it.
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
  result->reset(new GcsWritableFile(
      bucket, object, this, old_content_filename, &timeouts_,
      [this, fname]() { ClearFileCaches(fname); }, retry_config_,
      compose_append_, session_creator, object_uploader, status_poller,
      generation_getter));
  return Status::OK();
}
1386
NewReadOnlyMemoryRegionFromFile(const string & fname,TransactionToken * token,std::unique_ptr<ReadOnlyMemoryRegion> * result)1387 Status GcsFileSystem::NewReadOnlyMemoryRegionFromFile(
1388 const string& fname, TransactionToken* token,
1389 std::unique_ptr<ReadOnlyMemoryRegion>* result) {
1390 uint64 size;
1391 TF_RETURN_IF_ERROR(GetFileSize(fname, token, &size));
1392 std::unique_ptr<char[]> data(new char[size]);
1393
1394 std::unique_ptr<RandomAccessFile> file;
1395 TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &file));
1396
1397 StringPiece piece;
1398 TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get()));
1399
1400 result->reset(new GcsReadOnlyMemoryRegion(std::move(data), size));
1401 return Status::OK();
1402 }
1403
FileExists(const string & fname,TransactionToken * token)1404 Status GcsFileSystem::FileExists(const string& fname, TransactionToken* token) {
1405 string bucket, object;
1406 TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
1407 if (object.empty()) {
1408 bool result;
1409 TF_RETURN_IF_ERROR(BucketExists(bucket, &result));
1410 if (result) {
1411 return Status::OK();
1412 }
1413 }
1414
1415 // Check if the object exists.
1416 GcsFileStat stat;
1417 const Status status = StatForObject(fname, bucket, object, &stat);
1418 if (status.code() != errors::Code::NOT_FOUND) {
1419 return status;
1420 }
1421
1422 // Check if the folder exists.
1423 bool result;
1424 TF_RETURN_IF_ERROR(FolderExists(fname, &result));
1425 if (result) {
1426 return Status::OK();
1427 }
1428 return errors::NotFound("The specified path ", fname, " was not found.");
1429 }
1430
ObjectExists(const string & fname,const string & bucket,const string & object,bool * result)1431 Status GcsFileSystem::ObjectExists(const string& fname, const string& bucket,
1432 const string& object, bool* result) {
1433 GcsFileStat stat;
1434 const Status status = StatForObject(fname, bucket, object, &stat);
1435 switch (status.code()) {
1436 case errors::Code::OK:
1437 *result = !stat.base.is_directory;
1438 return Status::OK();
1439 case errors::Code::NOT_FOUND:
1440 *result = false;
1441 return Status::OK();
1442 default:
1443 return status;
1444 }
1445 }
1446
// Fetches size, generation number and modification time for one object
// directly from the GCS metadata endpoint, bypassing the stat cache.
Status GcsFileSystem::UncachedStatForObject(const string& fname,
                                            const string& bucket,
                                            const string& object,
                                            GcsFileStat* stat) {
  std::vector<char> output_buffer;
  std::unique_ptr<HttpRequest> request;
  TF_RETURN_WITH_CONTEXT_IF_ERROR(CreateHttpRequest(&request),
                                  " when reading metadata of gs://", bucket,
                                  "/", object);

  // Only request the three fields that are parsed below.
  request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/",
                                  request->EscapeString(object),
                                  "?fields=size%2Cgeneration%2Cupdated"));
  request->SetResultBuffer(&output_buffer);
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);

  if (stats_ != nullptr) {
    stats_->RecordStatObjectRequest();
  }

  TF_RETURN_WITH_CONTEXT_IF_ERROR(
      request->Send(), " when reading metadata of gs://", bucket, "/", object);

  Json::Value root;
  TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));

  // Parse file size.
  TF_RETURN_IF_ERROR(GetInt64Value(root, "size", &stat->base.length));

  // Parse generation number.
  TF_RETURN_IF_ERROR(
      GetInt64Value(root, "generation", &stat->generation_number));

  // Parse file modification time.
  string updated;
  TF_RETURN_IF_ERROR(GetStringValue(root, "updated", &updated));
  TF_RETURN_IF_ERROR(ParseRfc3339Time(updated, &(stat->base.mtime_nsec)));

  VLOG(1) << "Stat of: gs://" << bucket << "/" << object << " -- "
          << " length: " << stat->base.length
          << " generation: " << stat->generation_number
          << "; mtime_nsec: " << stat->base.mtime_nsec
          << "; updated: " << updated;

  if (str_util::EndsWith(fname, "/")) {
    // In GCS a path can be both a directory and a file, but it is uncommon for
    // other file systems. To avoid the ambiguity, if a path ends with "/" in
    // GCS, we always regard it as a directory mark or a virtual directory.
    stat->base.is_directory = true;
  } else {
    stat->base.is_directory = false;
  }
  return Status::OK();
}
1501
StatForObject(const string & fname,const string & bucket,const string & object,GcsFileStat * stat)1502 Status GcsFileSystem::StatForObject(const string& fname, const string& bucket,
1503 const string& object, GcsFileStat* stat) {
1504 if (object.empty()) {
1505 return errors::InvalidArgument(strings::Printf(
1506 "'object' must be a non-empty string. (File: %s)", fname.c_str()));
1507 }
1508
1509 TF_RETURN_IF_ERROR(stat_cache_->LookupOrCompute(
1510 fname, stat,
1511 [this, &bucket, &object](const string& fname, GcsFileStat* stat) {
1512 return UncachedStatForObject(fname, bucket, object, stat);
1513 }));
1514 return Status::OK();
1515 }
1516
BucketExists(const string & bucket,bool * result)1517 Status GcsFileSystem::BucketExists(const string& bucket, bool* result) {
1518 const Status status = GetBucketMetadata(bucket, nullptr);
1519 switch (status.code()) {
1520 case errors::Code::OK:
1521 *result = true;
1522 return Status::OK();
1523 case errors::Code::NOT_FOUND:
1524 *result = false;
1525 return Status::OK();
1526 default:
1527 return status;
1528 }
1529 }
1530
// Verifies that `bucket` resides in one of the configured allowed locations.
// An empty allow-list disables the check entirely.
Status GcsFileSystem::CheckBucketLocationConstraint(const string& bucket) {
  if (allowed_locations_.empty()) {
    return Status::OK();
  }

  // Avoid calling external API's in the constructor: the "detect zone"
  // sentinel is resolved lazily here on first use, by querying the zone
  // provider and replacing the sentinel with this instance's own region.
  if (allowed_locations_.erase(kDetectZoneSentinelValue) == 1) {
    string zone;
    TF_RETURN_IF_ERROR(zone_provider_->GetZone(&zone));
    allowed_locations_.insert(ZoneToRegion(&zone));
  }

  string location;
  TF_RETURN_IF_ERROR(GetBucketLocation(bucket, &location));
  if (allowed_locations_.find(location) != allowed_locations_.end()) {
    return Status::OK();
  }

  // The bucket is in a disallowed location; report where it is and what is
  // permitted so the user can fix their configuration.
  return errors::FailedPrecondition(strings::Printf(
      "Bucket '%s' is in '%s' location, allowed locations are: (%s).",
      bucket.c_str(), location.c_str(),
      absl::StrJoin(allowed_locations_, ", ").c_str()));
}
1554
GetBucketLocation(const string & bucket,string * location)1555 Status GcsFileSystem::GetBucketLocation(const string& bucket,
1556 string* location) {
1557 auto compute_func = [this](const string& bucket, string* location) {
1558 std::vector<char> result_buffer;
1559 Status status = GetBucketMetadata(bucket, &result_buffer);
1560 Json::Value result;
1561 TF_RETURN_IF_ERROR(ParseJson(result_buffer, &result));
1562 string bucket_location;
1563 TF_RETURN_IF_ERROR(
1564 GetStringValue(result, kBucketMetadataLocationKey, &bucket_location));
1565 // Lowercase the GCS location to be case insensitive for allowed locations.
1566 *location = absl::AsciiStrToLower(bucket_location);
1567 return Status::OK();
1568 };
1569
1570 TF_RETURN_IF_ERROR(
1571 bucket_location_cache_->LookupOrCompute(bucket, location, compute_func));
1572
1573 return Status::OK();
1574 }
1575
GetBucketMetadata(const string & bucket,std::vector<char> * result_buffer)1576 Status GcsFileSystem::GetBucketMetadata(const string& bucket,
1577 std::vector<char>* result_buffer) {
1578 std::unique_ptr<HttpRequest> request;
1579 TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1580 request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket));
1581
1582 if (result_buffer != nullptr) {
1583 request->SetResultBuffer(result_buffer);
1584 }
1585
1586 request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1587 return request->Send();
1588 }
1589
// Sets *result to whether `dirname` exists as a (possibly virtual) directory,
// i.e. whether at least one object exists under the "dirname/" prefix.
Status GcsFileSystem::FolderExists(const string& dirname, bool* result) {
  StatCache::ComputeFunc compute_func = [this](const string& dirname,
                                               GcsFileStat* stat) {
    std::vector<string> children;
    // One child (possibly the directory marker itself) is enough to prove
    // the folder exists, so bound the listing to a single result.
    TF_RETURN_IF_ERROR(
        GetChildrenBounded(dirname, 1, &children, true /* recursively */,
                           true /* include_self_directory_marker */));
    if (!children.empty()) {
      stat->base = DIRECTORY_STAT;
      return Status::OK();
    } else {
      // InvalidArgument is used as an internal "not a directory" signal; it
      // is translated into *result = false below, not propagated to callers.
      return errors::InvalidArgument("Not a directory!");
    }
  };
  GcsFileStat stat;
  // Cache under the slash-terminated name so directory stats don't collide
  // with object stats for the same path.
  Status s = stat_cache_->LookupOrCompute(MaybeAppendSlash(dirname), &stat,
                                          compute_func);
  if (s.ok()) {
    *result = stat.base.is_directory;
    return Status::OK();
  }
  if (errors::IsInvalidArgument(s)) {
    *result = false;
    return Status::OK();
  }
  return s;
}
1617
GetChildren(const string & dirname,TransactionToken * token,std::vector<string> * result)1618 Status GcsFileSystem::GetChildren(const string& dirname,
1619 TransactionToken* token,
1620 std::vector<string>* result) {
1621 return GetChildrenBounded(dirname, UINT64_MAX, result,
1622 false /* recursively */,
1623 false /* include_self_directory_marker */);
1624 }
1625
// Expands a glob `pattern` into the set of matching GCS paths, caching the
// expansion in `matching_paths_cache_`.
Status GcsFileSystem::GetMatchingPaths(const string& pattern,
                                       TransactionToken* token,
                                       std::vector<string>* results) {
  MatchingPathsCache::ComputeFunc compute_func =
      [this](const string& pattern, std::vector<string>* results) {
        results->clear();
        // Find the fixed prefix by looking for the first wildcard.
        const string& fixed_prefix =
            pattern.substr(0, pattern.find_first_of("*?[\\"));
        const string dir(this->Dirname(fixed_prefix));
        if (dir.empty()) {
          return errors::InvalidArgument(
              "A GCS pattern doesn't have a bucket name: ", pattern);
        }
        // List everything under the fixed directory once and match locally,
        // instead of issuing one GCS request per candidate path.
        std::vector<string> all_files;
        TF_RETURN_IF_ERROR(GetChildrenBounded(
            dir, UINT64_MAX, &all_files, true /* recursively */,
            false /* include_self_directory_marker */));

        // Also consider the implicit intermediate "folders" of each object,
        // since they don't appear in the listing as objects themselves.
        const auto& files_and_folders = AddAllSubpaths(all_files);

        // To handle `/` in the object names, we need to remove it from `dir`
        // and then use `StrCat` to insert it back.
        const StringPiece dir_no_slash = str_util::StripSuffix(dir, "/");

        // Match all obtained paths to the input pattern.
        for (const auto& path : files_and_folders) {
          // Manually construct the path instead of using `JoinPath` for the
          // cases where `path` starts with a `/` (which is a valid character
          // in the filenames of GCS objects). `JoinPath` canonicalizes the
          // result, removing duplicate slashes. We know that `dir_no_slash`
          // does not end in `/`, so we are safe inserting the new `/` here as
          // the path separator.
          const string full_path = strings::StrCat(dir_no_slash, "/", path);
          if (this->Match(full_path, pattern)) {
            results->push_back(full_path);
          }
        }
        return Status::OK();
      };
  TF_RETURN_IF_ERROR(
      matching_paths_cache_->LookupOrCompute(pattern, results, compute_func));
  return Status::OK();
}
1670
// Lists up to `max_results` children of `dirname` into `result`, paging
// through the GCS objects-list API. In recursive mode all objects under the
// prefix are returned; otherwise "/" is used as a delimiter so immediate
// subfolders come back as "prefixes". Returned names are made relative to
// `dirname`.
Status GcsFileSystem::GetChildrenBounded(const string& dirname,
                                         uint64 max_results,
                                         std::vector<string>* result,
                                         bool recursive,
                                         bool include_self_directory_marker) {
  if (!result) {
    return errors::InvalidArgument("'result' cannot be null");
  }
  string bucket, object_prefix;
  TF_RETURN_IF_ERROR(
      ParseGcsPath(MaybeAppendSlash(dirname), true, &bucket, &object_prefix));

  string nextPageToken;
  uint64 retrieved_results = 0;
  while (true) {  // A loop over multiple result pages.
    std::vector<char> output_buffer;
    std::unique_ptr<HttpRequest> request;
    TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
    auto uri = strings::StrCat(kGcsUriBase, "b/", bucket, "/o");
    if (recursive) {
      uri = strings::StrCat(uri, "?fields=items%2Fname%2CnextPageToken");
    } else {
      // Set "/" as a delimiter to ask GCS to treat subfolders as children
      // and return them in "prefixes".
      uri = strings::StrCat(uri,
                            "?fields=items%2Fname%2Cprefixes%2CnextPageToken");
      uri = strings::StrCat(uri, "&delimiter=%2F");
    }
    if (!object_prefix.empty()) {
      uri = strings::StrCat(uri,
                            "&prefix=", request->EscapeString(object_prefix));
    }
    if (!nextPageToken.empty()) {
      uri = strings::StrCat(
          uri, "&pageToken=", request->EscapeString(nextPageToken));
    }
    // Once the remaining budget drops below the default page size, ask the
    // server for exactly the number of entries still needed.
    if (max_results - retrieved_results < kGetChildrenDefaultPageSize) {
      uri =
          strings::StrCat(uri, "&maxResults=", max_results - retrieved_results);
    }
    request->SetUri(uri);
    request->SetResultBuffer(&output_buffer);
    request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);

    TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading ", dirname);
    Json::Value root;
    TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
    // "items" holds the objects (files) on this result page.
    const auto items = root.get("items", Json::Value::null);
    if (!items.isNull()) {
      if (!items.isArray()) {
        return errors::Internal(
            "Expected an array 'items' in the GCS response.");
      }
      for (size_t i = 0; i < items.size(); i++) {
        const auto item = items.get(i, Json::Value::null);
        if (!item.isObject()) {
          return errors::Internal(
              "Unexpected JSON format: 'items' should be a list of objects.");
        }
        string name;
        TF_RETURN_IF_ERROR(GetStringValue(item, "name", &name));
        // The names should be relative to the 'dirname'. That means the
        // 'object_prefix', which is part of 'dirname', should be removed from
        // the beginning of 'name'.
        StringPiece relative_path(name);
        if (!absl::ConsumePrefix(&relative_path, object_prefix)) {
          return errors::Internal(strings::StrCat(
              "Unexpected response: the returned file name ", name,
              " doesn't match the prefix ", object_prefix));
        }
        // An empty relative path is the directory's own marker object; only
        // emit it when the caller asked for it.
        if (!relative_path.empty() || include_self_directory_marker) {
          result->emplace_back(relative_path);
        }
        if (++retrieved_results >= max_results) {
          return Status::OK();
        }
      }
    }
    const auto prefixes = root.get("prefixes", Json::Value::null);
    if (!prefixes.isNull()) {
      // Subfolders are returned for the non-recursive mode.
      if (!prefixes.isArray()) {
        return errors::Internal(
            "'prefixes' was expected to be an array in the GCS response.");
      }
      for (size_t i = 0; i < prefixes.size(); i++) {
        const auto prefix = prefixes.get(i, Json::Value::null);
        if (prefix.isNull() || !prefix.isString()) {
          return errors::Internal(
              "'prefixes' was expected to be an array of strings in the GCS "
              "response.");
        }
        const string& prefix_str = prefix.asString();
        StringPiece relative_path(prefix_str);
        if (!absl::ConsumePrefix(&relative_path, object_prefix)) {
          return errors::Internal(
              "Unexpected response: the returned folder name ", prefix_str,
              " doesn't match the prefix ", object_prefix);
        }
        result->emplace_back(relative_path);
        if (++retrieved_results >= max_results) {
          return Status::OK();
        }
      }
    }
    // Keep paging until the server stops returning a nextPageToken.
    const auto token = root.get("nextPageToken", Json::Value::null);
    if (token.isNull()) {
      return Status::OK();
    }
    if (!token.isString()) {
      return errors::Internal(
          "Unexpected response: nextPageToken is not a string");
    }
    nextPageToken = token.asString();
  }
}
1787
// Stats `fname`. A bare bucket path stats as a directory if the bucket
// exists; otherwise the object is looked up first, and a folder check is
// used as a fallback for virtual directories.
Status GcsFileSystem::Stat(const string& fname, TransactionToken* token,
                           FileStatistics* stat) {
  if (!stat) {
    return errors::Internal("'stat' cannot be nullptr.");
  }
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
  if (object.empty()) {
    // The path names only a bucket root.
    bool is_bucket;
    TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
    if (is_bucket) {
      *stat = DIRECTORY_STAT;
      return Status::OK();
    }
    return errors::NotFound("The specified bucket ", fname, " was not found.");
  }

  GcsFileStat gcs_stat;
  const Status status = StatForObject(fname, bucket, object, &gcs_stat);
  if (status.ok()) {
    *stat = gcs_stat.base;
    return Status::OK();
  }
  // Only NOT_FOUND falls through to the folder check below; any other
  // failure is propagated as-is.
  if (status.code() != errors::Code::NOT_FOUND) {
    return status;
  }
  bool is_folder;
  TF_RETURN_IF_ERROR(FolderExists(fname, &is_folder));
  if (is_folder) {
    *stat = DIRECTORY_STAT;
    return Status::OK();
  }
  return errors::NotFound("The specified path ", fname, " was not found.");
}
1822
DeleteFile(const string & fname,TransactionToken * token)1823 Status GcsFileSystem::DeleteFile(const string& fname, TransactionToken* token) {
1824 string bucket, object;
1825 TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1826
1827 std::unique_ptr<HttpRequest> request;
1828 TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1829 request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/",
1830 request->EscapeString(object)));
1831 request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1832 request->SetDeleteRequest();
1833
1834 TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when deleting ", fname);
1835 ClearFileCaches(fname);
1836 return Status::OK();
1837 }
1838
// Creates a directory marker object ("dirname/") in GCS. For a bare bucket
// path, only verifies the bucket exists. Returns AlreadyExists if the marker
// is already present (including when created concurrently).
Status GcsFileSystem::CreateDir(const string& dirname,
                                TransactionToken* token) {
  string dirname_with_slash = MaybeAppendSlash(dirname);
  VLOG(3) << "CreateDir: creating directory with dirname: " << dirname
          << " and dirname_with_slash: " << dirname_with_slash;
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(dirname_with_slash, /*empty_object_ok=*/true,
                                  &bucket, &object));
  if (object.empty()) {
    // A bucket root: nothing to upload, just check existence.
    bool is_bucket;
    TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
    return is_bucket ? Status::OK()
                     : errors::NotFound("The specified bucket ",
                                        dirname_with_slash, " was not found.");
  }

  if (FileExists(dirname_with_slash, token).ok()) {
    // Use the original name for a correct error here.
    VLOG(3) << "CreateDir: directory already exists, not uploading " << dirname;
    return errors::AlreadyExists(dirname);
  }

  std::unique_ptr<HttpRequest> request;
  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));

  // Upload an empty object named "object/" as the directory marker.
  request->SetUri(strings::StrCat(
      kGcsUploadUriBase, "b/", bucket,
      "/o?uploadType=media&name=", request->EscapeString(object),
      // Adding this parameter means HTTP_CODE_PRECONDITION_FAILED
      // will be returned if the object already exists, so avoid reuploading.
      "&ifGenerationMatch=0"));

  request->SetPostEmptyBody();
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
  const Status& status = request->Send();
  if (status.ok()) {
    VLOG(3) << "CreateDir: finished uploading directory " << dirname;
    return Status::OK();
  }
  // PRECONDITION_FAILED means the marker appeared between the FileExists
  // check and the upload; treat it as AlreadyExists rather than a failure.
  if (request->GetResponseCode() != HTTP_CODE_PRECONDITION_FAILED) {
    TF_RETURN_WITH_CONTEXT_IF_ERROR(status, " when uploading ",
                                    dirname_with_slash);
  }
  VLOG(3) << "Ignoring directory already exists on object "
          << dirname_with_slash;
  return errors::AlreadyExists(dirname);
}
1886
// Checks that the directory is empty (i.e no objects with this prefix exist).
// Deletes the GCS directory marker if it exists.
Status GcsFileSystem::DeleteDir(const string& dirname,
                                TransactionToken* token) {
  std::vector<string> children;
  // A directory is considered empty either if there are no matching objects
  // with the corresponding name prefix or if there is exactly one matching
  // object and it is the directory marker. Therefore we need to retrieve
  // at most two children for the prefix to detect if a directory is empty.
  TF_RETURN_IF_ERROR(
      GetChildrenBounded(dirname, 2, &children, true /* recursively */,
                         true /* include_self_directory_marker */));

  if (children.size() > 1 || (children.size() == 1 && !children[0].empty())) {
    // A non-marker child exists, so the directory is non-empty.
    return errors::FailedPrecondition("Cannot delete a non-empty directory.");
  }
  if (children.size() == 1 && children[0].empty()) {
    // This is the directory marker object. Delete it.
    return DeleteFile(MaybeAppendSlash(dirname), token);
  }
  // No marker object exists (a purely virtual directory); nothing to delete.
  return Status::OK();
}
1909
GetFileSize(const string & fname,TransactionToken * token,uint64 * file_size)1910 Status GcsFileSystem::GetFileSize(const string& fname, TransactionToken* token,
1911 uint64* file_size) {
1912 if (!file_size) {
1913 return errors::Internal("'file_size' cannot be nullptr.");
1914 }
1915
1916 // Only validate the name.
1917 string bucket, object;
1918 TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1919
1920 FileStatistics stat;
1921 TF_RETURN_IF_ERROR(Stat(fname, token, &stat));
1922 *file_size = stat.length;
1923 return Status::OK();
1924 }
1925
RenameFile(const string & src,const string & target,TransactionToken * token)1926 Status GcsFileSystem::RenameFile(const string& src, const string& target,
1927 TransactionToken* token) {
1928 if (!IsDirectory(src, token).ok()) {
1929 return RenameObject(src, target);
1930 }
1931 // Rename all individual objects in the directory one by one.
1932 std::vector<string> children;
1933 TF_RETURN_IF_ERROR(
1934 GetChildrenBounded(src, UINT64_MAX, &children, true /* recursively */,
1935 true /* include_self_directory_marker */));
1936 for (const string& subpath : children) {
1937 TF_RETURN_IF_ERROR(
1938 RenameObject(JoinGcsPath(src, subpath), JoinGcsPath(target, subpath)));
1939 }
1940 return Status::OK();
1941 }
1942
// Uses a GCS API command to copy the object and then deletes the old one.
Status GcsFileSystem::RenameObject(const string& src, const string& target) {
  VLOG(3) << "RenameObject: started gs://" << src << " to " << target;
  string src_bucket, src_object, target_bucket, target_object;
  TF_RETURN_IF_ERROR(ParseGcsPath(src, false, &src_bucket, &src_object));
  TF_RETURN_IF_ERROR(
      ParseGcsPath(target, false, &target_bucket, &target_object));

  // Issue a server-side "rewriteTo" to copy src to target without
  // downloading the data.
  std::unique_ptr<HttpRequest> request;
  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
  request->SetUri(strings::StrCat(kGcsUriBase, "b/", src_bucket, "/o/",
                                  request->EscapeString(src_object),
                                  "/rewriteTo/b/", target_bucket, "/o/",
                                  request->EscapeString(target_object)));
  request->SetPostEmptyBody();
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
  std::vector<char> output_buffer;
  request->SetResultBuffer(&output_buffer);
  TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when renaming ", src,
                                  " to ", target);
  // Flush the target from the caches. The source will be flushed in the
  // DeleteFile call below.
  ClearFileCaches(target);
  Json::Value root;
  TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
  bool done;
  TF_RETURN_IF_ERROR(GetBoolValue(root, "done", &done));
  if (!done) {
    // If GCS didn't complete rewrite in one call, this means that a large file
    // is being copied to a bucket with a different storage class or location,
    // which requires multiple rewrite calls.
    // TODO(surkov): implement multi-step rewrites.
    return errors::Unimplemented(
        "Couldn't rename ", src, " to ", target,
        ": moving large files between buckets with different "
        "locations or storage classes is not supported.");
  }

  VLOG(3) << "RenameObject: finished from: gs://" << src << " to " << target;
  // In case the delete API call failed, but the deletion actually happened
  // on the server side, we can't just retry the whole RenameFile operation
  // because the source object is already gone.
  return RetryingUtils::DeleteWithRetries(
      [this, &src]() { return DeleteFile(src, nullptr); }, retry_config_);
}
1988
IsDirectory(const string & fname,TransactionToken * token)1989 Status GcsFileSystem::IsDirectory(const string& fname,
1990 TransactionToken* token) {
1991 string bucket, object;
1992 TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
1993 if (object.empty()) {
1994 bool is_bucket;
1995 TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
1996 if (is_bucket) {
1997 return Status::OK();
1998 }
1999 return errors::NotFound("The specified bucket gs://", bucket,
2000 " was not found.");
2001 }
2002 bool is_folder;
2003 TF_RETURN_IF_ERROR(FolderExists(fname, &is_folder));
2004 if (is_folder) {
2005 return Status::OK();
2006 }
2007 bool is_object;
2008 TF_RETURN_IF_ERROR(ObjectExists(fname, bucket, object, &is_object));
2009 if (is_object) {
2010 return errors::FailedPrecondition("The specified path ", fname,
2011 " is not a directory.");
2012 }
2013 return errors::NotFound("The specified path ", fname, " was not found.");
2014 }
2015
// Recursively deletes everything under `dirname`. Individual deletion
// failures are counted in *undeleted_files / *undeleted_dirs instead of
// aborting the whole operation.
Status GcsFileSystem::DeleteRecursively(const string& dirname,
                                        TransactionToken* token,
                                        int64* undeleted_files,
                                        int64* undeleted_dirs) {
  if (!undeleted_files || !undeleted_dirs) {
    return errors::Internal(
        "'undeleted_files' and 'undeleted_dirs' cannot be nullptr.");
  }
  *undeleted_files = 0;
  *undeleted_dirs = 0;
  if (!IsDirectory(dirname, token).ok()) {
    // The target itself is missing or not a directory; report it as a single
    // undeleted directory.
    *undeleted_dirs = 1;
    return Status(
        error::NOT_FOUND,
        strings::StrCat(dirname, " doesn't exist or not a directory."));
  }
  std::vector<string> all_objects;
  // Get all children in the directory recursively.
  TF_RETURN_IF_ERROR(GetChildrenBounded(
      dirname, UINT64_MAX, &all_objects, true /* recursively */,
      true /* include_self_directory_marker */));
  for (const string& object : all_objects) {
    const string& full_path = JoinGcsPath(dirname, object);
    // Delete all objects including directory markers for subfolders.
    // Since DeleteRecursively returns OK if individual file deletions fail,
    // and therefore RetryingFileSystem won't pay attention to the failures,
    // we need to make sure these failures are properly retried.
    const auto& delete_file_status = RetryingUtils::DeleteWithRetries(
        [this, &full_path, token]() { return DeleteFile(full_path, token); },
        retry_config_);
    if (!delete_file_status.ok()) {
      if (IsDirectory(full_path, token).ok()) {
        // The object is a directory marker.
        (*undeleted_dirs)++;
      } else {
        (*undeleted_files)++;
      }
    }
  }
  return Status::OK();
}
2057
2058 // Flushes all caches for filesystem metadata and file contents. Useful for
2059 // reclaiming memory once filesystem operations are done (e.g. model is loaded),
2060 // or for resetting the filesystem to a consistent state.
FlushCaches(TransactionToken * token)2061 void GcsFileSystem::FlushCaches(TransactionToken* token) {
2062 tf_shared_lock l(block_cache_lock_);
2063 file_block_cache_->Flush();
2064 stat_cache_->Clear();
2065 matching_paths_cache_->Clear();
2066 bucket_location_cache_->Clear();
2067 }
2068
SetStats(GcsStatsInterface * stats)2069 void GcsFileSystem::SetStats(GcsStatsInterface* stats) {
2070 CHECK(stats_ == nullptr) << "SetStats() has already been called.";
2071 CHECK(stats != nullptr);
2072 mutex_lock l(block_cache_lock_);
2073 stats_ = stats;
2074 stats_->Configure(this, &throttle_, file_block_cache_.get());
2075 }
2076
SetCacheStats(FileBlockCacheStatsInterface * cache_stats)2077 void GcsFileSystem::SetCacheStats(FileBlockCacheStatsInterface* cache_stats) {
2078 tf_shared_lock l(block_cache_lock_);
2079 if (file_block_cache_ == nullptr) {
2080 LOG(ERROR) << "Tried to set cache stats of non-initialized file block "
2081 "cache object. This may result in not exporting the intended "
2082 "monitoring data";
2083 return;
2084 }
2085 file_block_cache_->SetStats(cache_stats);
2086 }
2087
SetAuthProvider(std::unique_ptr<AuthProvider> auth_provider)2088 void GcsFileSystem::SetAuthProvider(
2089 std::unique_ptr<AuthProvider> auth_provider) {
2090 mutex_lock l(mu_);
2091 auth_provider_ = std::move(auth_provider);
2092 }
2093
// Creates an HttpRequest and sets several parameters that are common to all
// requests. All code (in GcsFileSystem) that creates an HttpRequest should
// go through this method, rather than directly using http_request_factory_.
Status GcsFileSystem::CreateHttpRequest(std::unique_ptr<HttpRequest>* request) {
  std::unique_ptr<HttpRequest> new_request{http_request_factory_->Create()};
  if (dns_cache_) {
    dns_cache_->AnnotateRequest(new_request.get());
  }

  string auth_token;
  {
    // Fetch the token under a shared lock, since auth_provider_ can be
    // swapped concurrently via SetAuthProvider().
    tf_shared_lock l(mu_);
    TF_RETURN_IF_ERROR(
        AuthProvider::GetToken(auth_provider_.get(), &auth_token));
  }

  new_request->AddAuthBearerHeader(auth_token);

  if (additional_header_) {
    new_request->AddHeader(additional_header_->first,
                           additional_header_->second);
  }

  if (stats_ != nullptr) {
    new_request->SetRequestStats(stats_->HttpStats());
  }

  // Client-side throttling: refuse to hand out a request when over budget.
  if (!throttle_.AdmitRequest()) {
    return errors::Unavailable("Request throttled");
  }

  *request = std::move(new_request);
  return Status::OK();
}
2128
2129 } // namespace tensorflow
2130
2131 // The TPU_GCS_FS option sets a TPU-on-GCS optimized file system that allows
2132 // TPU pods to function more optimally. When TPU_GCS_FS is enabled then
2133 // gcs_file_system will not be registered as a file system since the
2134 // tpu_gcs_file_system is going to take over its responsibilities. The tpu file
2135 // system is a child of gcs file system with TPU-pod on GCS optimizations.
2136 // This option is set ON/OFF in the GCP TPU tensorflow config.
2137 // Initialize gcs_file_system
2138 REGISTER_FILE_SYSTEM("gs", ::tensorflow::RetryingGcsFileSystem);
2139