1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/platform/cloud/gcs_file_system.h"
17
18 #include <stdio.h>
19 #include <unistd.h>
20
21 #include <algorithm>
22 #include <cstdio>
23 #include <cstdlib>
24 #include <cstring>
25 #include <fstream>
26 #include <vector>
27
28 #include "tensorflow/core/platform/file_statistics.h"
29 #include "tensorflow/core/platform/strcat.h"
30 #ifdef _WIN32
31 #include <io.h> // for _mktemp
32 #endif
33 #include "absl/base/macros.h"
34 #include "json/json.h"
35 #include "tensorflow/core/lib/gtl/map_util.h"
36 #include "tensorflow/core/platform/cloud/curl_http_request.h"
37 #include "tensorflow/core/platform/cloud/file_block_cache.h"
38 #include "tensorflow/core/platform/cloud/google_auth_provider.h"
39 #include "tensorflow/core/platform/cloud/ram_file_block_cache.h"
40 #include "tensorflow/core/platform/cloud/time_util.h"
41 #include "tensorflow/core/platform/env.h"
42 #include "tensorflow/core/platform/errors.h"
43 #include "tensorflow/core/platform/mutex.h"
44 #include "tensorflow/core/platform/numbers.h"
45 #include "tensorflow/core/platform/path.h"
46 #include "tensorflow/core/platform/protobuf.h"
47 #include "tensorflow/core/platform/retrying_utils.h"
48 #include "tensorflow/core/platform/str_util.h"
49 #include "tensorflow/core/platform/stringprintf.h"
50 #include "tensorflow/core/platform/thread_annotations.h"
51 #include "tensorflow/core/profiler/lib/traceme.h"
52
53 #ifdef _WIN32
54 #ifdef DeleteFile
55 #undef DeleteFile
56 #endif
57 #endif
58
59 namespace tensorflow {
60 namespace {
61
constexpr char kGcsUriBase[] = "https://www.googleapis.com/storage/v1/";
constexpr char kGcsUploadUriBase[] =
    "https://www.googleapis.com/upload/storage/v1/";
constexpr char kStorageHost[] = "storage.googleapis.com";
constexpr char kBucketMetadataLocationKey[] = "location";
constexpr size_t kReadAppendableFileBufferSize = 1024 * 1024;  // In bytes.
constexpr int kGetChildrenDefaultPageSize = 1000;
// The HTTP response code "308 Resume Incomplete".
constexpr uint64 HTTP_CODE_RESUME_INCOMPLETE = 308;
// The HTTP response code "412 Precondition Failed".
constexpr uint64 HTTP_CODE_PRECONDITION_FAILED = 412;
// The environment variable that overrides the size of the readahead buffer.
ABSL_DEPRECATED("Use GCS_READ_CACHE_BLOCK_SIZE_MB instead.")
constexpr char kReadaheadBufferSize[] = "GCS_READAHEAD_BUFFER_SIZE_BYTES";
// The environment variable that overrides the maximum age of entries in the
// Stat cache. A value of 0 means nothing is cached.
constexpr char kStatCacheMaxAge[] = "GCS_STAT_CACHE_MAX_AGE";
constexpr uint64 kStatCacheDefaultMaxAge = 5;  // In seconds.
// The environment variable that overrides the maximum number of entries in the
// Stat cache.
constexpr char kStatCacheMaxEntries[] = "GCS_STAT_CACHE_MAX_ENTRIES";
constexpr size_t kStatCacheDefaultMaxEntries = 1024;
// The environment variable that overrides the maximum age of entries in the
// GetMatchingPaths cache. A value of 0 (the default) means nothing is cached.
constexpr char kMatchingPathsCacheMaxAge[] = "GCS_MATCHING_PATHS_CACHE_MAX_AGE";
constexpr uint64 kMatchingPathsCacheDefaultMaxAge = 0;
// The environment variable that overrides the maximum number of entries in the
// GetMatchingPaths cache.
constexpr char kMatchingPathsCacheMaxEntries[] =
    "GCS_MATCHING_PATHS_CACHE_MAX_ENTRIES";
constexpr size_t kMatchingPathsCacheDefaultMaxEntries = 1024;
// Number of bucket locations cached. Most workloads won't touch more than one
// bucket, so this limit is set fairly low.
constexpr size_t kBucketLocationCacheMaxEntries = 10;
// ExpiringLRUCache doesn't support any "cache forever" option, so the largest
// representable age is used instead.
constexpr size_t kCacheNeverExpire = std::numeric_limits<uint64>::max();
// The file statistics returned by Stat() for directories.
const FileStatistics DIRECTORY_STAT(0, 0, true);
// Some environments exhibit unreliable DNS resolution. Set this environment
// variable to a positive integer describing the frequency used to refresh the
// userspace DNS cache.
constexpr char kResolveCacheSecs[] = "GCS_RESOLVE_REFRESH_SECS";
// The environment variable to configure the http request's connection timeout.
constexpr char kRequestConnectionTimeout[] =
    "GCS_REQUEST_CONNECTION_TIMEOUT_SECS";
// The environment variable to configure the http request's idle timeout.
constexpr char kRequestIdleTimeout[] = "GCS_REQUEST_IDLE_TIMEOUT_SECS";
// The environment variable to configure the overall request timeout for
// metadata requests.
constexpr char kMetadataRequestTimeout[] = "GCS_METADATA_REQUEST_TIMEOUT_SECS";
// The environment variable to configure the overall request timeout for
// block reads requests.
constexpr char kReadRequestTimeout[] = "GCS_READ_REQUEST_TIMEOUT_SECS";
// The environment variable to configure the overall request timeout for
// upload requests.
constexpr char kWriteRequestTimeout[] = "GCS_WRITE_REQUEST_TIMEOUT_SECS";
// The environment variable to configure an additional header to send with
// all requests to GCS (format HEADERNAME:HEADERCONTENT)
constexpr char kAdditionalRequestHeader[] = "GCS_ADDITIONAL_REQUEST_HEADER";
// The environment variable to configure the throttle (format: <int64>)
constexpr char kThrottleRate[] = "GCS_THROTTLE_TOKEN_RATE";
// The environment variable to configure the token bucket size (format: <int64>)
constexpr char kThrottleBucket[] = "GCS_THROTTLE_BUCKET_SIZE";
// The environment variable that controls the number of tokens per request.
// (format: <int64>)
constexpr char kTokensPerRequest[] = "GCS_TOKENS_PER_REQUEST";
// The environment variable to configure the initial tokens (format: <int64>)
constexpr char kInitialTokens[] = "GCS_INITIAL_TOKENS";

// The environment variable to customize which GCS bucket locations are allowed,
// if the list is empty defaults to using the region of the zone (format, comma
// delimited list). Requires 'storage.buckets.get' permission.
constexpr char kAllowedBucketLocations[] = "GCS_ALLOWED_BUCKET_LOCATIONS";
// When this value is passed as an allowed location, the zone tensorflow is
// running in is detected and access is restricted to buckets in that region.
constexpr char kDetectZoneSentinelValue[] = "auto";

// How to upload new data when Flush() is called multiple times.
// By default the entire file is reuploaded.
constexpr char kAppendMode[] = "GCS_APPEND_MODE";
// If GCS_APPEND_MODE=compose then instead the new data is uploaded to a
// temporary object and composed with the original object. This is disabled by
// default as the multiple API calls required add a risk of stranding temporary
// objects.
constexpr char kComposeAppend[] = "compose";
147
GetTmpFilename(string * filename)148 Status GetTmpFilename(string* filename) {
149 *filename = io::GetTempFilename("");
150 return Status::OK();
151 }
152
/// Returns `name` guaranteed to end in a single trailing '/'.
/// An empty name yields "/"; a name already ending in '/' is returned as-is.
string MaybeAppendSlash(const string& name) {
  if (name.empty() || name.back() != '/') {
    return name + "/";
  }
  return name;
}
163
164 // io::JoinPath() doesn't work in cases when we want an empty subpath
165 // to result in an appended slash in order for directory markers
166 // to be processed correctly: "gs://a/b" + "" should give "gs://a/b/".
JoinGcsPath(const string & path,const string & subpath)167 string JoinGcsPath(const string& path, const string& subpath) {
168 return strings::StrCat(MaybeAppendSlash(path), subpath);
169 }
170
171 /// \brief Returns the given paths appending all their subfolders.
172 ///
173 /// For every path X in the list, every subfolder in X is added to the
174 /// resulting list.
175 /// For example:
176 /// - for 'a/b/c/d' it will append 'a', 'a/b' and 'a/b/c'
177 /// - for 'a/b/c/' it will append 'a', 'a/b' and 'a/b/c'
178 /// - for 'a//b/c/' it will append 'a', 'a//b' and 'a//b/c'
179 /// - for '/a/b/c/' it will append '/a', '/a/b' and '/a/b/c'
AddAllSubpaths(const std::vector<string> & paths)180 std::set<string> AddAllSubpaths(const std::vector<string>& paths) {
181 std::set<string> result;
182 result.insert(paths.begin(), paths.end());
183 for (const string& path : paths) {
184 StringPiece subpath = io::Dirname(path);
185 // If `path` starts with `/`, `subpath` will be `/` and then we get into an
186 // infinite loop. Same behavior happens if there is a `//` pattern in
187 // `path`, so we check for that and leave the loop quicker.
188 while (!(subpath.empty() || subpath == "/")) {
189 result.emplace(string(subpath));
190 subpath = io::Dirname(subpath);
191 }
192 }
193 return result;
194 }
195
ParseJson(StringPiece json,Json::Value * result)196 Status ParseJson(StringPiece json, Json::Value* result) {
197 Json::Reader reader;
198 if (!reader.parse(json.data(), json.data() + json.size(), *result)) {
199 return errors::Internal("Couldn't parse JSON response from GCS.");
200 }
201 return Status::OK();
202 }
203
ParseJson(const std::vector<char> & json,Json::Value * result)204 Status ParseJson(const std::vector<char>& json, Json::Value* result) {
205 return ParseJson(StringPiece{json.data(), json.size()}, result);
206 }
207
208 /// Reads a JSON value with the given name from a parent JSON value.
GetValue(const Json::Value & parent,const char * name,Json::Value * result)209 Status GetValue(const Json::Value& parent, const char* name,
210 Json::Value* result) {
211 *result = parent.get(name, Json::Value::null);
212 if (result->isNull()) {
213 return errors::Internal("The field '", name,
214 "' was expected in the JSON response.");
215 }
216 return Status::OK();
217 }
218
219 /// Reads a string JSON value with the given name from a parent JSON value.
GetStringValue(const Json::Value & parent,const char * name,string * result)220 Status GetStringValue(const Json::Value& parent, const char* name,
221 string* result) {
222 Json::Value result_value;
223 TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
224 if (!result_value.isString()) {
225 return errors::Internal(
226 "The field '", name,
227 "' in the JSON response was expected to be a string.");
228 }
229 *result = result_value.asString();
230 return Status::OK();
231 }
232
233 /// Reads a long JSON value with the given name from a parent JSON value.
GetInt64Value(const Json::Value & parent,const char * name,int64 * result)234 Status GetInt64Value(const Json::Value& parent, const char* name,
235 int64* result) {
236 Json::Value result_value;
237 TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
238 if (result_value.isNumeric()) {
239 *result = result_value.asInt64();
240 return Status::OK();
241 }
242 if (result_value.isString() &&
243 strings::safe_strto64(result_value.asCString(), result)) {
244 return Status::OK();
245 }
246 return errors::Internal(
247 "The field '", name,
248 "' in the JSON response was expected to be a number.");
249 }
250
251 /// Reads a boolean JSON value with the given name from a parent JSON value.
GetBoolValue(const Json::Value & parent,const char * name,bool * result)252 Status GetBoolValue(const Json::Value& parent, const char* name, bool* result) {
253 Json::Value result_value;
254 TF_RETURN_IF_ERROR(GetValue(parent, name, &result_value));
255 if (!result_value.isBool()) {
256 return errors::Internal(
257 "The field '", name,
258 "' in the JSON response was expected to be a boolean.");
259 }
260 *result = result_value.asBool();
261 return Status::OK();
262 }
263
264 /// A GCS-based implementation of a random access file with an LRU block cache.
265 class GcsRandomAccessFile : public RandomAccessFile {
266 public:
267 using ReadFn =
268 std::function<Status(const string& filename, uint64 offset, size_t n,
269 StringPiece* result, char* scratch)>;
270
GcsRandomAccessFile(const string & filename,ReadFn read_fn)271 GcsRandomAccessFile(const string& filename, ReadFn read_fn)
272 : filename_(filename), read_fn_(std::move(read_fn)) {}
273
Name(StringPiece * result) const274 Status Name(StringPiece* result) const override {
275 *result = filename_;
276 return Status::OK();
277 }
278
279 /// The implementation of reads with an LRU block cache. Thread safe.
Read(uint64 offset,size_t n,StringPiece * result,char * scratch) const280 Status Read(uint64 offset, size_t n, StringPiece* result,
281 char* scratch) const override {
282 return read_fn_(filename_, offset, n, result, scratch);
283 }
284
285 private:
286 /// The filename of this file.
287 const string filename_;
288 /// The implementation of the read operation (provided by the GCSFileSystem).
289 const ReadFn read_fn_;
290 };
291
/// A GCS-based implementation of a random access file with a read buffer.
///
/// Small reads are satisfied from a single in-memory window of the object
/// (`buffer_`), refilled on demand; reads larger than the buffer bypass it.
class BufferedGcsRandomAccessFile : public RandomAccessFile {
 public:
  using ReadFn =
      std::function<Status(const string& filename, uint64 offset, size_t n,
                           StringPiece* result, char* scratch)>;

  // Initialize the reader. Provided read_fn should be thread safe.
  BufferedGcsRandomAccessFile(const string& filename, uint64 buffer_size,
                              ReadFn read_fn)
      : filename_(filename),
        read_fn_(std::move(read_fn)),
        buffer_size_(buffer_size),
        buffer_start_(0),
        buffer_end_is_past_eof_(false) {}

  // Reports the GCS path this file was opened with.
  Status Name(StringPiece* result) const override {
    *result = filename_;
    return Status::OK();
  }

  /// The implementation of reads with an read buffer. Thread safe.
  /// Returns `OUT_OF_RANGE` if fewer than n bytes were stored in `*result`
  /// because of EOF.
  Status Read(uint64 offset, size_t n, StringPiece* result,
              char* scratch) const override {
    // Requests larger than the buffer window bypass buffering entirely.
    if (n > buffer_size_) {
      return read_fn_(filename_, offset, n, result, scratch);
    }
    {
      mutex_lock l(buffer_mutex_);
      size_t buffer_end = buffer_start_ + buffer_.size();
      size_t copy_size = 0;
      // Serve the prefix of [offset, offset + n) that is already buffered.
      if (offset < buffer_end && offset >= buffer_start_) {
        copy_size = std::min(n, static_cast<size_t>(buffer_end - offset));
        memcpy(scratch, buffer_.data() + (offset - buffer_start_), copy_size);
        *result = StringPiece(scratch, copy_size);
      }
      // If everything up to the buffer's end was consumed and that end is
      // already EOF, there is nothing more to fetch.
      bool consumed_buffer_to_eof =
          offset + copy_size >= buffer_end && buffer_end_is_past_eof_;
      if (copy_size < n && !consumed_buffer_to_eof) {
        // Refill the buffer starting at the first missing byte, then copy
        // whatever it now holds.
        Status status = FillBuffer(offset + copy_size);
        if (!status.ok() && status.code() != errors::Code::OUT_OF_RANGE) {
          // Empty the buffer to avoid caching bad reads.
          buffer_.resize(0);
          return status;
        }
        size_t remaining_copy = std::min(n - copy_size, buffer_.size());
        memcpy(scratch + copy_size, buffer_.data(), remaining_copy);
        copy_size += remaining_copy;
        *result = StringPiece(scratch, copy_size);
      }
      if (copy_size < n) {
        // Forget the end-of-file flag to allow for clients that poll on the
        // same file.
        buffer_end_is_past_eof_ = false;
        return errors::OutOfRange("EOF reached. Requested to read ", n,
                                  " bytes from ", offset, ".");
      }
    }
    return Status::OK();
  }

 private:
  // Replaces the buffer with up to buffer_size_ bytes read from `start`, and
  // records whether that read hit end-of-file. On return, buffer_ holds
  // exactly the bytes the underlying read produced.
  Status FillBuffer(uint64 start) const
      TF_EXCLUSIVE_LOCKS_REQUIRED(buffer_mutex_) {
    buffer_start_ = start;
    buffer_.resize(buffer_size_);
    StringPiece str_piece;
    Status status = read_fn_(filename_, buffer_start_, buffer_size_, &str_piece,
                             &(buffer_[0]));
    buffer_end_is_past_eof_ = status.code() == errors::Code::OUT_OF_RANGE;
    buffer_.resize(str_piece.size());
    return status;
  }

  // The filename of this file.
  const string filename_;

  // The implementation of the read operation (provided by the GCSFileSystem).
  const ReadFn read_fn_;

  // Size of buffer that we read from GCS each time we send a request.
  const uint64 buffer_size_;

  // Mutex for buffering operations that can be accessed from multiple threads.
  // The following members are mutable in order to provide a const Read.
  mutable mutex buffer_mutex_;

  // Offset of buffer from start of the file.
  mutable uint64 buffer_start_ TF_GUARDED_BY(buffer_mutex_);

  // True iff the most recent FillBuffer() returned OUT_OF_RANGE, i.e. the
  // buffer currently extends to the end of the object.
  mutable bool buffer_end_is_past_eof_ TF_GUARDED_BY(buffer_mutex_);

  // The buffered bytes, covering [buffer_start_, buffer_start_ + size()).
  mutable string buffer_ TF_GUARDED_BY(buffer_mutex_);
};
388
// Function object declaration with params needed to create upload sessions.
typedef std::function<Status(
    uint64 start_offset, const std::string& object_to_upload,
    const std::string& bucket, uint64 file_size, const std::string& gcs_path,
    UploadSessionHandle* session_handle)>
    SessionCreator;

// Function object declaration with params needed to upload objects.
typedef std::function<Status(const std::string& session_uri,
                             uint64 start_offset, uint64 already_uploaded,
                             const std::string& tmp_content_filename,
                             uint64 file_size, const std::string& file_path)>
    ObjectUploader;

// Function object declaration with params needed to poll upload status.
typedef std::function<Status(const string& session_uri, uint64 file_size,
                             const std::string& gcs_path, bool* completed,
                             uint64* uploaded)>
    StatusPoller;

// Function object declaration with params needed to fetch an object's current
// generation number (used as a precondition for compose requests).
typedef std::function<Status(const string& fname, const string& bucket,
                             const string& object, int64* generation)>
    GenerationGetter;
413
/// \brief GCS-based implementation of a writeable file.
///
/// Since GCS objects are immutable, this implementation writes to a local
/// tmp file and copies it to GCS on flush/close.
class GcsWritableFile : public WritableFile {
 public:
  /// \brief Constructs a writable file starting from empty contents.
  ///
  /// `filesystem` and `timeouts` are not owned and must outlive this object.
  /// `file_cache_erase` is invoked after every successful upload so that any
  /// cached blocks of this object are dropped. If the temporary file cannot
  /// be created, outfile_ stays closed and subsequent operations fail via
  /// CheckWritable().
  GcsWritableFile(const string& bucket, const string& object,
                  GcsFileSystem* filesystem,
                  GcsFileSystem::TimeoutConfig* timeouts,
                  std::function<void()> file_cache_erase,
                  RetryConfig retry_config, bool compose_append,
                  SessionCreator session_creator,
                  ObjectUploader object_uploader, StatusPoller status_poller,
                  GenerationGetter generation_getter)
      : bucket_(bucket),
        object_(object),
        filesystem_(filesystem),
        timeouts_(timeouts),
        file_cache_erase_(std::move(file_cache_erase)),
        sync_needed_(true),
        retry_config_(retry_config),
        compose_append_(compose_append),
        start_offset_(0),
        session_creator_(std::move(session_creator)),
        object_uploader_(std::move(object_uploader)),
        status_poller_(std::move(status_poller)),
        generation_getter_(std::move(generation_getter)) {
    // TODO: to make it safer, outfile_ should be constructed from an FD
    VLOG(3) << "GcsWritableFile: " << GetGcsPath();
    if (GetTmpFilename(&tmp_content_filename_).ok()) {
      outfile_.open(tmp_content_filename_,
                    std::ofstream::binary | std::ofstream::app);
    }
  }

  /// \brief Constructs the writable file in append mode.
  ///
  /// tmp_content_filename should contain a path of an existing temporary file
  /// with the content to be appended. The class takes ownership of the
  /// specified tmp file and deletes it on close.
  GcsWritableFile(const string& bucket, const string& object,
                  GcsFileSystem* filesystem, const string& tmp_content_filename,
                  GcsFileSystem::TimeoutConfig* timeouts,
                  std::function<void()> file_cache_erase,
                  RetryConfig retry_config, bool compose_append,
                  SessionCreator session_creator,
                  ObjectUploader object_uploader, StatusPoller status_poller,
                  GenerationGetter generation_getter)
      : bucket_(bucket),
        object_(object),
        filesystem_(filesystem),
        timeouts_(timeouts),
        file_cache_erase_(std::move(file_cache_erase)),
        sync_needed_(true),
        retry_config_(retry_config),
        compose_append_(compose_append),
        start_offset_(0),
        session_creator_(std::move(session_creator)),
        object_uploader_(std::move(object_uploader)),
        status_poller_(std::move(status_poller)),
        generation_getter_(std::move(generation_getter)) {
    VLOG(3) << "GcsWritableFile: " << GetGcsPath() << "with existing file "
            << tmp_content_filename;
    tmp_content_filename_ = tmp_content_filename;
    outfile_.open(tmp_content_filename_,
                  std::ofstream::binary | std::ofstream::app);
  }

  // Best-effort sync on destruction; the local staging file is always removed.
  ~GcsWritableFile() override {
    Close().IgnoreError();
    std::remove(tmp_content_filename_.c_str());
  }

  // Buffers `data` in the local staging file. Nothing is sent to GCS until
  // Flush()/Sync()/Close().
  Status Append(StringPiece data) override {
    TF_RETURN_IF_ERROR(CheckWritable());
    VLOG(3) << "Append: " << GetGcsPath() << " size " << data.length();
    sync_needed_ = true;
    outfile_ << data;
    if (!outfile_.good()) {
      return errors::Internal(
          "Could not append to the internal temporary file.");
    }
    return Status::OK();
  }

  // Uploads pending data and closes the staging file. The file is left open
  // when the upload fails, so Close() may be retried.
  Status Close() override {
    VLOG(3) << "Close:" << GetGcsPath();
    if (outfile_.is_open()) {
      Status sync_status = Sync();
      if (sync_status.ok()) {
        outfile_.close();
      }
      return sync_status;
    }
    return Status::OK();
  }

  // Flushing means uploading the current contents to GCS.
  Status Flush() override {
    VLOG(3) << "Flush:" << GetGcsPath();
    return Sync();
  }

  Status Name(StringPiece* result) const override {
    return errors::Unimplemented("GCSWritableFile does not support Name()");
  }

  // Uploads buffered contents to GCS; a no-op when nothing was appended
  // since the last successful sync.
  Status Sync() override {
    VLOG(3) << "Sync started:" << GetGcsPath();
    TF_RETURN_IF_ERROR(CheckWritable());
    if (!sync_needed_) {
      return Status::OK();
    }
    Status status = SyncImpl();
    VLOG(3) << "Sync finished " << GetGcsPath();
    if (status.ok()) {
      sync_needed_ = false;
    }
    return status;
  }

  // Reports the current write position in the local staging file.
  Status Tell(int64* position) override {
    *position = outfile_.tellp();
    if (*position == -1) {
      return errors::Internal("tellp on the internal temporary file failed");
    }
    return Status::OK();
  }

 private:
  /// Copies the current version of the file to GCS.
  ///
  /// This SyncImpl() uploads the object to GCS.
  /// In case of a failure, it resumes failed uploads as recommended by the GCS
  /// resumable API documentation. When the whole upload needs to be
  /// restarted, Sync() returns UNAVAILABLE and relies on RetryingFileSystem.
  Status SyncImpl() {
    outfile_.flush();
    if (!outfile_.good()) {
      return errors::Internal(
          "Could not write to the internal temporary file.");
    }
    UploadSessionHandle session_handle;
    uint64 start_offset = 0;
    string object_to_upload = object_;
    bool should_compose = false;
    if (compose_append_) {
      start_offset = start_offset_;
      // Only compose if the object has already been uploaded to GCS
      should_compose = start_offset > 0;
      if (should_compose) {
        // New data goes to a temporary sibling object which is merged into
        // the original via a compose request below.
        object_to_upload =
            strings::StrCat(io::Dirname(object_), "/.tmpcompose/",
                            io::Basename(object_), ".", start_offset_);
      }
    }
    TF_RETURN_IF_ERROR(CreateNewUploadSession(start_offset, object_to_upload,
                                              &session_handle));
    uint64 already_uploaded = 0;
    bool first_attempt = true;
    const Status upload_status = RetryingUtils::CallWithRetries(
        [&first_attempt, &already_uploaded, &session_handle, &start_offset,
         this]() {
          // On retries of a resumable session, first ask GCS how much was
          // already received so the upload can continue from there.
          if (session_handle.resumable && !first_attempt) {
            bool completed;
            TF_RETURN_IF_ERROR(RequestUploadSessionStatus(
                session_handle.session_uri, &completed, &already_uploaded));
            LOG(INFO) << "### RequestUploadSessionStatus: completed = "
                      << completed
                      << ", already_uploaded = " << already_uploaded
                      << ", file = " << GetGcsPath();
            if (completed) {
              // Erase the file from the file cache on every successful write.
              file_cache_erase_();
              // It's unclear why UploadToSession didn't return OK in the
              // previous attempt, but GCS reports that the file is fully
              // uploaded, so succeed.
              return Status::OK();
            }
          }
          first_attempt = false;
          return UploadToSession(session_handle.session_uri, start_offset,
                                 already_uploaded);
        },
        retry_config_);
    if (upload_status.code() == errors::Code::NOT_FOUND) {
      // GCS docs recommend retrying the whole upload. We're relying on the
      // RetryingFileSystem to retry the Sync() call.
      return errors::Unavailable(
          strings::StrCat("Upload to gs://", bucket_, "/", object_,
                          " failed, caused by: ", upload_status.ToString()));
    }
    if (upload_status.ok()) {
      if (should_compose) {
        TF_RETURN_IF_ERROR(AppendObject(object_to_upload));
      }
      // Remember how much is already in GCS so a later compose-append sync
      // uploads only the delta.
      TF_RETURN_IF_ERROR(GetCurrentFileSize(&start_offset_));
    }
    return upload_status;
  }

  // Fails with FailedPrecondition when the local staging file is not open.
  Status CheckWritable() const {
    if (!outfile_.is_open()) {
      return errors::FailedPrecondition(
          "The internal temporary file is not writable.");
    }
    return Status::OK();
  }

  // Reports the size of the staging file using outfile_'s write position.
  Status GetCurrentFileSize(uint64* size) {
    const auto tellp = outfile_.tellp();
    if (tellp == static_cast<std::streampos>(-1)) {
      return errors::Internal(
          "Could not get the size of the internal temporary file.");
    }
    *size = tellp;
    return Status::OK();
  }

  /// Initiates a new resumable upload session.
  Status CreateNewUploadSession(uint64 start_offset,
                                std::string object_to_upload,
                                UploadSessionHandle* session_handle) {
    uint64 file_size;
    TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
    return session_creator_(start_offset, object_to_upload, bucket_, file_size,
                            GetGcsPath(), session_handle);
  }

  /// Appends the data of append_object to the original object and deletes
  /// append_object.
  Status AppendObject(string append_object) {
    const string append_object_path = GetGcsPathWithObject(append_object);
    VLOG(3) << "AppendObject: " << append_object_path << " to " << GetGcsPath();

    int64_t generation = 0;
    TF_RETURN_IF_ERROR(
        generation_getter_(GetGcsPath(), bucket_, object_, &generation));

    TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
        [&append_object, &generation, this]() {
          std::unique_ptr<HttpRequest> request;
          TF_RETURN_IF_ERROR(filesystem_->CreateHttpRequest(&request));

          request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket_, "/o/",
                                          request->EscapeString(object_),
                                          "/compose"));

          // The ifGenerationMatch precondition makes the compose fail if the
          // original object was modified concurrently since the generation
          // was fetched above.
          const string request_body = strings::StrCat(
              "{'sourceObjects': [{'name': '", object_,
              "','objectPrecondition':{'ifGenerationMatch':", generation,
              "}},{'name': '", append_object, "'}]}");
          request->SetTimeouts(timeouts_->connect, timeouts_->idle,
                               timeouts_->metadata);
          request->AddHeader("content-type", "application/json");
          request->SetPostFromBuffer(request_body.c_str(), request_body.size());
          TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
                                          " when composing to ", GetGcsPath());
          return Status::OK();
        },
        retry_config_));

    // The temporary object is no longer needed once composed into object_.
    return RetryingUtils::DeleteWithRetries(
        [&append_object_path, this]() {
          return filesystem_->DeleteFile(append_object_path, nullptr);
        },
        retry_config_);
  }

  /// \brief Requests status of a previously initiated upload session.
  ///
  /// If the upload has already succeeded, sets 'completed' to true.
  /// Otherwise sets 'completed' to false and 'uploaded' to the currently
  /// uploaded size in bytes.
  Status RequestUploadSessionStatus(const string& session_uri, bool* completed,
                                    uint64* uploaded) {
    uint64 file_size;
    TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
    return status_poller_(session_uri, file_size, GetGcsPath(), completed,
                          uploaded);
  }

  /// Uploads data to object.
  Status UploadToSession(const string& session_uri, uint64 start_offset,
                         uint64 already_uploaded) {
    uint64 file_size;
    TF_RETURN_IF_ERROR(GetCurrentFileSize(&file_size));
    Status status =
        object_uploader_(session_uri, start_offset, already_uploaded,
                         tmp_content_filename_, file_size, GetGcsPath());
    if (status.ok()) {
      // Erase the file from the file cache on every successful write.
      // Note: Only local cache, this does nothing on distributed cache. The
      // distributed cache clears the cache as it is needed.
      file_cache_erase_();
    }

    return status;
  }

  // Full gs:// path for an arbitrary object name in this file's bucket.
  string GetGcsPathWithObject(string object) const {
    return strings::StrCat("gs://", bucket_, "/", object);
  }
  string GetGcsPath() const { return GetGcsPathWithObject(object_); }

  string bucket_;
  string object_;
  GcsFileSystem* const filesystem_;  // Not owned.
  string tmp_content_filename_;  // Local staging file holding pending data.
  std::ofstream outfile_;  // Stream appending to the staging file.
  GcsFileSystem::TimeoutConfig* timeouts_;  // Not owned.
  std::function<void()> file_cache_erase_;  // Invalidates cached blocks.
  bool sync_needed_;  // whether there is buffered data that needs to be synced
  RetryConfig retry_config_;
  bool compose_append_;  // If true, Flush() composes deltas (GCS_APPEND_MODE).
  uint64 start_offset_;  // Bytes already uploaded by previous syncs.
  // Callbacks to the file system used to upload object into GCS.
  const SessionCreator session_creator_;
  const ObjectUploader object_uploader_;
  const StatusPoller status_poller_;
  const GenerationGetter generation_getter_;
};
735
736 class GcsReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
737 public:
GcsReadOnlyMemoryRegion(std::unique_ptr<char[]> data,uint64 length)738 GcsReadOnlyMemoryRegion(std::unique_ptr<char[]> data, uint64 length)
739 : data_(std::move(data)), length_(length) {}
data()740 const void* data() override { return reinterpret_cast<void*>(data_.get()); }
length()741 uint64 length() override { return length_; }
742
743 private:
744 std::unique_ptr<char[]> data_;
745 uint64 length_;
746 };
747
// Identity "converter": copies the raw value into `value` and always reports
// a successful parse. Used with GetEnvVar for string-valued variables.
bool StringPieceIdentity(StringPiece str, StringPiece* value) {
  *value = str;
  return true;
}
752
753 /// \brief Utility function to split a comma delimited list of strings to an
754 /// unordered set, lowercasing all values.
SplitByCommaToLowercaseSet(StringPiece list,std::unordered_set<string> * set)755 bool SplitByCommaToLowercaseSet(StringPiece list,
756 std::unordered_set<string>* set) {
757 std::vector<string> vector = absl::StrSplit(absl::AsciiStrToLower(list), ',');
758 *set = std::unordered_set<string>(vector.begin(), vector.end());
759 return true;
760 }
761
// \brief Converts a Compute Engine zone (e.g. "us-central1-b") to its region
// ("us-central1") by stripping everything after the last '-'. A name with no
// '-' is returned unchanged (find_last_of yields npos, so substr copies all).
std::string ZoneToRegion(std::string* zone) {
  const auto last_dash = zone->find_last_of('-');
  return zone->substr(0, last_dash);
}
766
767 } // namespace
768
// Default constructor: every knob is driven by environment variables. When
// `make_default_cache` is false the block cache is built with a zero byte
// budget, i.e. read caching is disabled regardless of the env settings.
GcsFileSystem::GcsFileSystem(bool make_default_cache) {
  uint64 value;
  block_size_ = kDefaultBlockSize;
  size_t max_bytes = kDefaultMaxCacheSize;

  uint64 max_staleness = kDefaultMaxStaleness;

  // All HTTP traffic (auth, metadata, and data) shares one request factory
  // and one Compute Engine metadata client.
  http_request_factory_ = std::make_shared<CurlHttpRequest::Factory>();
  compute_engine_metadata_client_ =
      std::make_shared<ComputeEngineMetadataClient>(http_request_factory_);
  auth_provider_ = std::unique_ptr<AuthProvider>(
      new GoogleAuthProvider(compute_engine_metadata_client_));
  zone_provider_ = std::unique_ptr<ZoneProvider>(
      new ComputeEngineZoneProvider(compute_engine_metadata_client_));

  // Apply the sys env override for the readahead buffer size if it's provided.
  // NOTE: kBlockSize below also assigns block_size_, so if both variables are
  // set, the MB-denominated kBlockSize value wins.
  if (GetEnvVar(kReadaheadBufferSize, strings::safe_strtou64, &value)) {
    block_size_ = value;
  }

  // Apply the overrides for the block size (MB), max bytes (MB), and max
  // staleness (seconds) if provided.
  if (GetEnvVar(kBlockSize, strings::safe_strtou64, &value)) {
    block_size_ = value * 1024 * 1024;
  }

  if (GetEnvVar(kMaxCacheSize, strings::safe_strtou64, &value)) {
    max_bytes = value * 1024 * 1024;
  }

  if (GetEnvVar(kMaxStaleness, strings::safe_strtou64, &value)) {
    max_staleness = value;
  }
  // A zero-byte budget makes MakeFileBlockCache produce a disabled cache.
  if (!make_default_cache) {
    max_bytes = 0;
  }
  VLOG(1) << "GCS cache max size = " << max_bytes << " ; "
          << "block size = " << block_size_ << " ; "
          << "max staleness = " << max_staleness;
  file_block_cache_ = MakeFileBlockCache(block_size_, max_bytes, max_staleness);
  // Apply overrides for the stat cache max age and max entries, if provided.
  uint64 stat_cache_max_age = kStatCacheDefaultMaxAge;
  size_t stat_cache_max_entries = kStatCacheDefaultMaxEntries;
  if (GetEnvVar(kStatCacheMaxAge, strings::safe_strtou64, &value)) {
    stat_cache_max_age = value;
  }
  if (GetEnvVar(kStatCacheMaxEntries, strings::safe_strtou64, &value)) {
    stat_cache_max_entries = value;
  }
  stat_cache_.reset(new ExpiringLRUCache<GcsFileStat>(stat_cache_max_age,
                                                      stat_cache_max_entries));
  // Apply overrides for the matching paths cache max age and max entries, if
  // provided.
  uint64 matching_paths_cache_max_age = kMatchingPathsCacheDefaultMaxAge;
  size_t matching_paths_cache_max_entries =
      kMatchingPathsCacheDefaultMaxEntries;
  if (GetEnvVar(kMatchingPathsCacheMaxAge, strings::safe_strtou64, &value)) {
    matching_paths_cache_max_age = value;
  }
  if (GetEnvVar(kMatchingPathsCacheMaxEntries, strings::safe_strtou64,
                &value)) {
    matching_paths_cache_max_entries = value;
  }
  matching_paths_cache_.reset(new ExpiringLRUCache<std::vector<string>>(
      matching_paths_cache_max_age, matching_paths_cache_max_entries));

  // Bucket locations never change, so cache them forever (bounded only by
  // entry count).
  bucket_location_cache_.reset(new ExpiringLRUCache<string>(
      kCacheNeverExpire, kBucketLocationCacheMaxEntries));

  // DNS caching is opt-in via kResolveCacheSecs.
  int64_t resolve_frequency_secs;
  if (GetEnvVar(kResolveCacheSecs, strings::safe_strto64,
                &resolve_frequency_secs)) {
    dns_cache_.reset(new GcsDnsCache(resolve_frequency_secs));
    VLOG(1) << "GCS DNS cache is enabled. " << kResolveCacheSecs << " = "
            << resolve_frequency_secs;
  } else {
    VLOG(1) << "GCS DNS cache is disabled, because " << kResolveCacheSecs
            << " = 0 (or is not set)";
  }

  // Get the additional header: a single "name:value" pair attached to every
  // outgoing request. Both name and value must be non-empty, otherwise the
  // header is rejected and disabled.
  StringPiece add_header_contents;
  if (GetEnvVar(kAdditionalRequestHeader, StringPieceIdentity,
                &add_header_contents)) {
    size_t split = add_header_contents.find(':', 0);

    if (split != StringPiece::npos) {
      StringPiece header_name = add_header_contents.substr(0, split);
      StringPiece header_value = add_header_contents.substr(split + 1);

      if (!header_name.empty() && !header_value.empty()) {
        additional_header_.reset(new std::pair<const string, const string>(
            string(header_name), string(header_value)));

        VLOG(1) << "GCS additional header ENABLED. "
                << "Name: " << additional_header_->first << ", "
                << "Value: " << additional_header_->second;
      } else {
        LOG(ERROR) << "GCS additional header DISABLED. Invalid contents: "
                   << add_header_contents;
      }
    } else {
      LOG(ERROR) << "GCS additional header DISABLED. Invalid contents: "
                 << add_header_contents;
    }
  } else {
    VLOG(1) << "GCS additional header DISABLED. No environment variable set.";
  }

  // Apply the overrides for request timeouts.
  uint32 timeout_value;
  if (GetEnvVar(kRequestConnectionTimeout, strings::safe_strtou32,
                &timeout_value)) {
    timeouts_.connect = timeout_value;
  }
  if (GetEnvVar(kRequestIdleTimeout, strings::safe_strtou32, &timeout_value)) {
    timeouts_.idle = timeout_value;
  }
  if (GetEnvVar(kMetadataRequestTimeout, strings::safe_strtou32,
                &timeout_value)) {
    timeouts_.metadata = timeout_value;
  }
  if (GetEnvVar(kReadRequestTimeout, strings::safe_strtou32, &timeout_value)) {
    timeouts_.read = timeout_value;
  }
  if (GetEnvVar(kWriteRequestTimeout, strings::safe_strtou32, &timeout_value)) {
    timeouts_.write = timeout_value;
  }

  // Throttling stays off unless a token rate is explicitly configured; the
  // remaining knobs refine the token bucket.
  int64_t token_value;
  if (GetEnvVar(kThrottleRate, strings::safe_strto64, &token_value)) {
    GcsThrottleConfig config;
    config.enabled = true;
    config.token_rate = token_value;

    if (GetEnvVar(kThrottleBucket, strings::safe_strto64, &token_value)) {
      config.bucket_size = token_value;
    }

    if (GetEnvVar(kTokensPerRequest, strings::safe_strto64, &token_value)) {
      config.tokens_per_request = token_value;
    }

    if (GetEnvVar(kInitialTokens, strings::safe_strto64, &token_value)) {
      config.initial_tokens = token_value;
    }
    throttle_.SetConfig(config);
  }

  // Optionally restrict reads to buckets in these (lowercased) locations.
  GetEnvVar(kAllowedBucketLocations, SplitByCommaToLowercaseSet,
            &allowed_locations_);

  // Select the append strategy; anything other than kComposeAppend falls back
  // to the default (download-and-rewrite) behavior.
  StringPiece append_mode;
  GetEnvVar(kAppendMode, StringPieceIdentity, &append_mode);
  if (append_mode == kComposeAppend) {
    compose_append_ = true;
  } else {
    compose_append_ = false;
  }
}
929
// Fully-injected constructor: every dependency and cache parameter is passed
// in explicitly and no environment variables are consulted. `additional_header`
// ownership transfers to this filesystem (additional_header_ is managed via
// reset() in the default constructor, i.e. it is a smart pointer member).
// NOTE: members initialize in declaration order, not in the order written
// below; MakeFileBlockCache also records cache_enabled_ as a side effect.
GcsFileSystem::GcsFileSystem(
    std::unique_ptr<AuthProvider> auth_provider,
    std::unique_ptr<HttpRequest::Factory> http_request_factory,
    std::unique_ptr<ZoneProvider> zone_provider, size_t block_size,
    size_t max_bytes, uint64 max_staleness, uint64 stat_cache_max_age,
    size_t stat_cache_max_entries, uint64 matching_paths_cache_max_age,
    size_t matching_paths_cache_max_entries, RetryConfig retry_config,
    TimeoutConfig timeouts, const std::unordered_set<string>& allowed_locations,
    std::pair<const string, const string>* additional_header,
    bool compose_append)
    : timeouts_(timeouts),
      retry_config_(retry_config),
      auth_provider_(std::move(auth_provider)),
      http_request_factory_(std::move(http_request_factory)),
      zone_provider_(std::move(zone_provider)),
      block_size_(block_size),
      file_block_cache_(
          MakeFileBlockCache(block_size, max_bytes, max_staleness)),
      stat_cache_(new StatCache(stat_cache_max_age, stat_cache_max_entries)),
      matching_paths_cache_(new MatchingPathsCache(
          matching_paths_cache_max_age, matching_paths_cache_max_entries)),
      bucket_location_cache_(new BucketLocationCache(
          kCacheNeverExpire, kBucketLocationCacheMaxEntries)),
      allowed_locations_(allowed_locations),
      compose_append_(compose_append),
      additional_header_(additional_header) {}
956
// Creates a random-access reader for a GCS object. When the block cache is
// enabled, reads go through file_block_cache_ with a generation-number
// signature check so stale cached blocks are refreshed; otherwise a buffered
// reader fetches directly via LoadBufferFromGCS. In both variants a short
// read (fewer than `n` bytes) is reported as OutOfRange, signaling EOF.
Status GcsFileSystem::NewRandomAccessFile(
    const string& fname, TransactionToken* token,
    std::unique_ptr<RandomAccessFile>* result) {
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
  TF_RETURN_IF_ERROR(CheckBucketLocationConstraint(bucket));
  if (cache_enabled_) {
    result->reset(new GcsRandomAccessFile(fname, [this, bucket, object](
                                                     const string& fname,
                                                     uint64 offset, size_t n,
                                                     StringPiece* result,
                                                     char* scratch) {
      // Shared lock: protects against concurrent replacement of the cache
      // object itself (see ResetFileBlockCache, which takes it exclusively).
      tf_shared_lock l(block_cache_lock_);
      GcsFileStat stat;
      TF_RETURN_IF_ERROR(stat_cache_->LookupOrCompute(
          fname, &stat,
          [this, bucket, object](const string& fname, GcsFileStat* stat) {
            return UncachedStatForObject(fname, bucket, object, stat);
          }));
      // If the object's generation changed, the cache drops its stale blocks.
      if (!file_block_cache_->ValidateAndUpdateFileSignature(
              fname, stat.generation_number)) {
        VLOG(1)
            << "File signature has been changed. Refreshing the cache. Path: "
            << fname;
      }
      *result = StringPiece();
      size_t bytes_transferred;
      TF_RETURN_IF_ERROR(file_block_cache_->Read(fname, offset, n, scratch,
                                                 &bytes_transferred));
      *result = StringPiece(scratch, bytes_transferred);
      if (bytes_transferred < n) {
        return errors::OutOfRange("EOF reached, ", result->size(),
                                  " bytes were read out of ", n,
                                  " bytes requested.");
      }
      return Status::OK();
    }));
  } else {
    // Cache disabled: read straight from GCS through a simple buffer.
    result->reset(new BufferedGcsRandomAccessFile(
        fname, block_size_,
        [this, bucket, object](const string& fname, uint64 offset, size_t n,
                               StringPiece* result, char* scratch) {
          *result = StringPiece();
          size_t bytes_transferred;
          TF_RETURN_IF_ERROR(
              LoadBufferFromGCS(fname, offset, n, scratch, &bytes_transferred));
          *result = StringPiece(scratch, bytes_transferred);
          if (bytes_transferred < n) {
            return errors::OutOfRange("EOF reached, ", result->size(),
                                      " bytes were read out of ", n,
                                      " bytes requested.");
          }
          return Status::OK();
        }));
  }
  return Status::OK();
}
1014
ResetFileBlockCache(size_t block_size_bytes,size_t max_bytes,uint64 max_staleness_secs)1015 void GcsFileSystem::ResetFileBlockCache(size_t block_size_bytes,
1016 size_t max_bytes,
1017 uint64 max_staleness_secs) {
1018 mutex_lock l(block_cache_lock_);
1019 file_block_cache_ =
1020 MakeFileBlockCache(block_size_bytes, max_bytes, max_staleness_secs);
1021 if (stats_ != nullptr) {
1022 stats_->Configure(this, &throttle_, file_block_cache_.get());
1023 }
1024 }
1025
1026 // A helper function to build a FileBlockCache for GcsFileSystem.
MakeFileBlockCache(size_t block_size,size_t max_bytes,uint64 max_staleness)1027 std::unique_ptr<FileBlockCache> GcsFileSystem::MakeFileBlockCache(
1028 size_t block_size, size_t max_bytes, uint64 max_staleness) {
1029 std::unique_ptr<FileBlockCache> file_block_cache(new RamFileBlockCache(
1030 block_size, max_bytes, max_staleness,
1031 [this](const string& filename, size_t offset, size_t n, char* buffer,
1032 size_t* bytes_transferred) {
1033 return LoadBufferFromGCS(filename, offset, n, buffer,
1034 bytes_transferred);
1035 }));
1036
1037 // Check if cache is enabled here to avoid unnecessary mutex contention.
1038 cache_enabled_ = file_block_cache->IsCacheEnabled();
1039 return file_block_cache;
1040 }
1041
// A helper function to actually read the data from GCS.
// Downloads up to `n` bytes of gs://bucket/object starting at `offset`
// directly into `buffer`, records stats and throttling, and — when the file
// length is known from the stat cache — distinguishes a legitimate EOF from
// a truncated transfer.
Status GcsFileSystem::LoadBufferFromGCS(const string& fname, size_t offset,
                                        size_t n, char* buffer,
                                        size_t* bytes_transferred) {
  *bytes_transferred = 0;

  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));

  profiler::TraceMe activity(
      [fname]() { return absl::StrCat("LoadBufferFromGCS ", fname); });

  std::unique_ptr<HttpRequest> request;
  TF_RETURN_WITH_CONTEXT_IF_ERROR(CreateHttpRequest(&request),
                                  "when reading gs://", bucket, "/", object);

  request->SetUri(strings::StrCat("https://", kStorageHost, "/", bucket, "/",
                                  request->EscapeString(object)));
  // HTTP byte ranges are inclusive on both ends, hence offset + n - 1.
  request->SetRange(offset, offset + n - 1);
  request->SetResultBufferDirect(buffer, n);
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.read);

  if (stats_ != nullptr) {
    stats_->RecordBlockLoadRequest(fname, offset);
  }

  TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading gs://",
                                  bucket, "/", object);

  size_t bytes_read = request->GetResultBufferDirectBytesTransferred();
  *bytes_transferred = bytes_read;
  VLOG(1) << "Successful read of gs://" << bucket << "/" << object << " @ "
          << offset << " of size: " << bytes_read;
  activity.AppendMetadata([bytes_read]() {
    return profiler::TraceMeEncode({{"block_size", bytes_read}});
  });

  if (stats_ != nullptr) {
    stats_->RecordBlockRetrieved(fname, offset, bytes_read);
  }

  // Feed actual bandwidth use back into the request throttle.
  throttle_.RecordResponse(bytes_read);

  if (bytes_read < n) {
    // Check stat cache to see if we encountered an interrupted read.
    // A short read that ends before the known file length means the transfer
    // was truncated, so surface an error instead of a silent partial result.
    GcsFileStat stat;
    if (stat_cache_->Lookup(fname, &stat)) {
      if (offset + bytes_read < stat.base.length) {
        return errors::Internal(strings::Printf(
            "File contents are inconsistent for file: %s @ %lu.", fname.c_str(),
            offset));
      }
      VLOG(2) << "Successful integrity check for: gs://" << bucket << "/"
              << object << " @ " << offset;
    }
  }

  return Status::OK();
}
1101
/// Initiates a new upload session.
// Sends a resumable-upload initiation request to GCS. On success the session
// URI (taken from the "Location" response header) is recorded in
// *session_handle for subsequent UploadToSession calls; a missing header is
// treated as an internal error. `session_handle` may be null when the caller
// does not need the session.
Status GcsFileSystem::CreateNewUploadSession(
    uint64 start_offset, const std::string& object_to_upload,
    const std::string& bucket, uint64 file_size, const std::string& gcs_path,
    UploadSessionHandle* session_handle) {
  std::vector<char> output_buffer;
  std::unique_ptr<HttpRequest> request;
  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));

  std::string uri = strings::StrCat(
      kGcsUploadUriBase, "b/", bucket,
      "/o?uploadType=resumable&name=", request->EscapeString(object_to_upload));
  request->SetUri(uri);
  // Announce up front how many bytes this session will carry.
  request->AddHeader("X-Upload-Content-Length",
                     absl::StrCat(file_size - start_offset));
  request->SetPostEmptyBody();
  request->SetResultBuffer(&output_buffer);
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
  TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(),
                                  " when initiating an upload to ", gcs_path);
  if (session_handle != nullptr) {
    session_handle->resumable = true;
    session_handle->session_uri = request->GetResponseHeader("Location");
    if (session_handle->session_uri.empty()) {
      return errors::Internal("Unexpected response from GCS when writing to ",
                              gcs_path, ": 'Location' header not returned.");
    }
  }
  return Status::OK();
}
1132
// Uploads the content of `tmp_content_filename` (from file position
// start_offset + already_uploaded onwards) to an existing resumable-upload
// session, allowing interrupted uploads to resume where they left off.
Status GcsFileSystem::UploadToSession(const std::string& session_uri,
                                      uint64 start_offset,
                                      uint64 already_uploaded,
                                      const std::string& tmp_content_filename,
                                      uint64 file_size,
                                      const std::string& file_path) {
  std::unique_ptr<HttpRequest> request;
  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
  request->SetUri(session_uri);
  if (file_size > 0) {
    // Content-Range is "bytes <first>-<last>/<total>", with offsets relative
    // to start_offset (the beginning of this upload session).
    request->AddHeader("Content-Range",
                       strings::StrCat("bytes ", already_uploaded, "-",
                                       file_size - start_offset - 1, "/",
                                       file_size - start_offset));
  }
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.write);

  // Skip the bytes GCS already holds; resume from the correct file position.
  TF_RETURN_IF_ERROR(request->SetPutFromFile(tmp_content_filename,
                                             start_offset + already_uploaded));
  TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when uploading ",
                                  file_path);
  return Status::OK();
}
1156
// Queries GCS for the state of a resumable-upload session. On return,
// *completed reports whether the upload finished; when it did not,
// *uploaded holds the number of bytes GCS has already accepted, parsed from
// the "Range" response header (absent header == zero bytes stored).
Status GcsFileSystem::RequestUploadSessionStatus(const string& session_uri,
                                                 uint64 file_size,
                                                 const std::string& gcs_path,
                                                 bool* completed,
                                                 uint64* uploaded) {
  CHECK(completed != nullptr) << "RequestUploadSessionStatus() called with out "
                                 "param 'completed' == nullptr.";  // Crash ok
  CHECK(uploaded != nullptr) << "RequestUploadSessionStatus() called with out "
                                 "param 'uploaded' == nullptr.";  // Crash ok
  std::unique_ptr<HttpRequest> request;
  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
  request->SetUri(session_uri);
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
  // An empty PUT with "bytes */<total>" asks GCS to report upload progress.
  request->AddHeader("Content-Range", strings::StrCat("bytes */", file_size));
  request->SetPutEmptyBody();
  Status status = request->Send();
  if (status.ok()) {
    *completed = true;
    return Status::OK();
  }
  *completed = false;
  // 308 (Resume Incomplete) is the expected "still in progress" response;
  // any other failure is propagated to the caller with context.
  if (request->GetResponseCode() != HTTP_CODE_RESUME_INCOMPLETE) {
    TF_RETURN_WITH_CONTEXT_IF_ERROR(status, " when resuming upload ", gcs_path);
  }
  const std::string received_range = request->GetResponseHeader("Range");
  if (received_range.empty()) {
    // This means GCS doesn't have any bytes of the file yet.
    *uploaded = 0;
  } else {
    StringPiece range_piece(received_range);
    absl::ConsumePrefix(&range_piece,
                        "bytes=");  // May or may not be present.

    auto return_error = [](const std::string& gcs_path,
                           const std::string& error_message) {
      return errors::Internal("Unexpected response from GCS when writing ",
                              gcs_path, ": ", error_message);
    };

    // Expect exactly "<first>-<last>" after the optional "bytes=" prefix.
    std::vector<string> range_strs = str_util::Split(range_piece, '-');
    if (range_strs.size() != 2) {
      return return_error(gcs_path, "Range header '" + received_range +
                                        "' could not be parsed.");
    }

    std::vector<int64> range_parts;
    for (const std::string& range_str : range_strs) {
      int64_t tmp;
      if (strings::safe_strto64(range_str, &tmp)) {
        range_parts.push_back(tmp);
      } else {
        return return_error(gcs_path, "Range header '" + received_range +
                                          "' could not be parsed.");
      }
    }

    // GCS ranges must begin at byte zero for a resumable session.
    if (range_parts[0] != 0) {
      return return_error(gcs_path, "The returned range '" + received_range +
                                        "' does not start at zero.");
    }
    // If GCS returned "Range: 0-10", this means 11 bytes were uploaded.
    *uploaded = range_parts[1] + 1;
  }
  return Status::OK();
}
1222
ParseGcsPathForScheme(StringPiece fname,string scheme,bool empty_object_ok,string * bucket,string * object)1223 Status GcsFileSystem::ParseGcsPathForScheme(StringPiece fname, string scheme,
1224 bool empty_object_ok,
1225 string* bucket, string* object) {
1226 StringPiece parsed_scheme, bucketp, objectp;
1227 io::ParseURI(fname, &parsed_scheme, &bucketp, &objectp);
1228 if (parsed_scheme != scheme) {
1229 return errors::InvalidArgument("GCS path doesn't start with 'gs://': ",
1230 fname);
1231 }
1232 *bucket = string(bucketp);
1233 if (bucket->empty() || *bucket == ".") {
1234 return errors::InvalidArgument("GCS path doesn't contain a bucket name: ",
1235 fname);
1236 }
1237 absl::ConsumePrefix(&objectp, "/");
1238 *object = string(objectp);
1239 if (!empty_object_ok && object->empty()) {
1240 return errors::InvalidArgument("GCS path doesn't contain an object name: ",
1241 fname);
1242 }
1243 return Status::OK();
1244 }
1245
// Parses a "gs://bucket/object" path into bucket and object parts.
// `empty_object_ok` permits bucket-only paths such as "gs://bucket".
Status GcsFileSystem::ParseGcsPath(StringPiece fname, bool empty_object_ok,
                                   string* bucket, string* object) {
  return ParseGcsPathForScheme(fname, "gs", empty_object_ok, bucket, object);
}
1250
// Drops cached blocks and the cached stat entry for `fname` so subsequent
// reads observe freshly-written data. The lock is taken shared because it
// guards replacement of the cache object itself, not its contents.
void GcsFileSystem::ClearFileCaches(const string& fname) {
  tf_shared_lock l(block_cache_lock_);
  file_block_cache_->RemoveFile(fname);
  stat_cache_->Delete(fname);
  // TODO(rxsang): Remove the patterns that match the file in
  // MatchingPathsCache as well.
}
1258
// Creates a WritableFile that buffers locally and pushes data to GCS via
// resumable uploads on sync/close. The four lambdas below adapt this
// filesystem's upload and stat entry points for GcsWritableFile.
// NOTE(review): the same four lambdas are duplicated in NewAppendableFile;
// consider extracting a shared private helper.
Status GcsFileSystem::NewWritableFile(const string& fname,
                                      TransactionToken* token,
                                      std::unique_ptr<WritableFile>* result) {
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));

  // Starts a resumable-upload session for the target object.
  auto session_creator =
      [this](uint64 start_offset, const std::string& object_to_upload,
             const std::string& bucket, uint64 file_size,
             const std::string& gcs_path, UploadSessionHandle* session_handle) {
        return CreateNewUploadSession(start_offset, object_to_upload, bucket,
                                      file_size, gcs_path, session_handle);
      };
  // Uploads the locally buffered bytes into an open session.
  auto object_uploader =
      [this](const std::string& session_uri, uint64 start_offset,
             uint64 already_uploaded, const std::string& tmp_content_filename,
             uint64 file_size, const std::string& file_path) {
        return UploadToSession(session_uri, start_offset, already_uploaded,
                               tmp_content_filename, file_size, file_path);
      };
  // Asks GCS how much of an interrupted upload it has already accepted.
  auto status_poller = [this](const string& session_uri, uint64 file_size,
                              const std::string& gcs_path, bool* completed,
                              uint64* uploaded) {
    return RequestUploadSessionStatus(session_uri, file_size, gcs_path,
                                      completed, uploaded);
  };

  // Fetches the object's generation number with retries, bypassing the stat
  // cache so the freshest value is used.
  auto generation_getter = [this](const string& fname, const string& bucket,
                                  const string& object, int64* generation) {
    GcsFileStat stat;
    TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
        [&fname, &bucket, &object, &stat, this]() {
          return UncachedStatForObject(fname, bucket, object, &stat);
        },
        retry_config_));
    *generation = stat.generation_number;
    return Status::OK();
  };

  result->reset(new GcsWritableFile(
      bucket, object, this, &timeouts_,
      [this, fname]() { ClearFileCaches(fname); }, retry_config_,
      compose_append_, session_creator, object_uploader, status_poller,
      generation_getter));
  return Status::OK();
}
1305
// Reads the file from GCS in chunks and stores it in a tmp file,
// which is then passed to GcsWritableFile.
// The resulting writable file therefore starts with the object's existing
// content; a NOT_FOUND read means the object is new and appending starts
// from empty.
Status GcsFileSystem::NewAppendableFile(const string& fname,
                                        TransactionToken* token,
                                        std::unique_ptr<WritableFile>* result) {
  std::unique_ptr<RandomAccessFile> reader;
  TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &reader));
  std::unique_ptr<char[]> buffer(new char[kReadAppendableFileBufferSize]);
  Status status;
  uint64 offset = 0;
  StringPiece read_chunk;

  // Read the file from GCS in chunks and save it to a tmp file.
  // NOTE(review): ofstream write failures are not checked here — confirm
  // that a full local disk is acceptable to surface later at upload time.
  string old_content_filename;
  TF_RETURN_IF_ERROR(GetTmpFilename(&old_content_filename));
  std::ofstream old_content(old_content_filename, std::ofstream::binary);
  while (true) {
    status = reader->Read(offset, kReadAppendableFileBufferSize, &read_chunk,
                          buffer.get());
    if (status.ok()) {
      // An OK read delivered a full buffer (short reads return OUT_OF_RANGE).
      old_content << read_chunk;
      offset += kReadAppendableFileBufferSize;
    } else if (status.code() == error::NOT_FOUND) {
      // New file, there is no existing content in it.
      break;
    } else if (status.code() == error::OUT_OF_RANGE) {
      // Expected, this means we reached EOF.
      old_content << read_chunk;
      break;
    } else {
      return status;
    }
  }
  old_content.close();

  // The lambdas below mirror NewWritableFile: session creation, data upload,
  // progress polling, and generation lookup for GcsWritableFile.
  auto session_creator =
      [this](uint64 start_offset, const std::string& object_to_upload,
             const std::string& bucket, uint64 file_size,
             const std::string& gcs_path, UploadSessionHandle* session_handle) {
        return CreateNewUploadSession(start_offset, object_to_upload, bucket,
                                      file_size, gcs_path, session_handle);
      };
  auto object_uploader =
      [this](const std::string& session_uri, uint64 start_offset,
             uint64 already_uploaded, const std::string& tmp_content_filename,
             uint64 file_size, const std::string& file_path) {
        return UploadToSession(session_uri, start_offset, already_uploaded,
                               tmp_content_filename, file_size, file_path);
      };

  auto status_poller = [this](const string& session_uri, uint64 file_size,
                              const std::string& gcs_path, bool* completed,
                              uint64* uploaded) {
    return RequestUploadSessionStatus(session_uri, file_size, gcs_path,
                                      completed, uploaded);
  };

  auto generation_getter = [this](const string& fname, const string& bucket,
                                  const string& object, int64* generation) {
    GcsFileStat stat;
    TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
        [&fname, &bucket, &object, &stat, this]() {
          return UncachedStatForObject(fname, bucket, object, &stat);
        },
        retry_config_));
    *generation = stat.generation_number;
    return Status::OK();
  };

  // Create a writable file and pass the old content to it.
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
  result->reset(new GcsWritableFile(
      bucket, object, this, old_content_filename, &timeouts_,
      [this, fname]() { ClearFileCaches(fname); }, retry_config_,
      compose_append_, session_creator, object_uploader, status_poller,
      generation_getter));
  return Status::OK();
}
1385
// Materializes an entire GCS object into a heap buffer wrapped in a
// read-only memory region. The whole object is read eagerly, so this is
// only suitable for files that fit in memory.
Status GcsFileSystem::NewReadOnlyMemoryRegionFromFile(
    const string& fname, TransactionToken* token,
    std::unique_ptr<ReadOnlyMemoryRegion>* result) {
  uint64 size;
  TF_RETURN_IF_ERROR(GetFileSize(fname, token, &size));
  std::unique_ptr<char[]> data(new char[size]);

  std::unique_ptr<RandomAccessFile> file;
  TF_RETURN_IF_ERROR(NewRandomAccessFile(fname, token, &file));

  StringPiece piece;
  // The reader returns OK only when all `size` bytes arrived (short reads
  // surface as OutOfRange in NewRandomAccessFile's read callbacks).
  TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get()));

  result->reset(new GcsReadOnlyMemoryRegion(std::move(data), size));
  return Status::OK();
}
1402
// Returns OK iff `fname` names an existing bucket, object, or folder
// (virtual directory), probed in that order.
Status GcsFileSystem::FileExists(const string& fname, TransactionToken* token) {
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
  if (object.empty()) {
    bool result;
    TF_RETURN_IF_ERROR(BucketExists(bucket, &result));
    if (result) {
      return Status::OK();
    }
    // NOTE(review): when the object is empty and the bucket does not exist,
    // StatForObject below receives an empty object and fails with
    // InvalidArgument rather than NotFound — confirm this is intended.
  }

  // Check if the object exists.
  GcsFileStat stat;
  const Status status = StatForObject(fname, bucket, object, &stat);
  // Any outcome other than NOT_FOUND (including OK) is the final answer.
  if (status.code() != errors::Code::NOT_FOUND) {
    return status;
  }

  // Check if the folder exists.
  bool result;
  TF_RETURN_IF_ERROR(FolderExists(fname, &result));
  if (result) {
    return Status::OK();
  }
  return errors::NotFound("The specified path ", fname, " was not found.");
}
1429
ObjectExists(const string & fname,const string & bucket,const string & object,bool * result)1430 Status GcsFileSystem::ObjectExists(const string& fname, const string& bucket,
1431 const string& object, bool* result) {
1432 GcsFileStat stat;
1433 const Status status = StatForObject(fname, bucket, object, &stat);
1434 switch (status.code()) {
1435 case errors::Code::OK:
1436 *result = !stat.base.is_directory;
1437 return Status::OK();
1438 case errors::Code::NOT_FOUND:
1439 *result = false;
1440 return Status::OK();
1441 default:
1442 return status;
1443 }
1444 }
1445
// Fetches size, generation number, and modification time for gs://bucket/
// object straight from the GCS metadata endpoint, bypassing the stat cache.
// Paths ending in "/" are always reported as directories.
Status GcsFileSystem::UncachedStatForObject(const string& fname,
                                            const string& bucket,
                                            const string& object,
                                            GcsFileStat* stat) {
  std::vector<char> output_buffer;
  std::unique_ptr<HttpRequest> request;
  TF_RETURN_WITH_CONTEXT_IF_ERROR(CreateHttpRequest(&request),
                                  " when reading metadata of gs://", bucket,
                                  "/", object);

  // Request only the three fields we parse below to keep the response small.
  request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/",
                                  request->EscapeString(object),
                                  "?fields=size%2Cgeneration%2Cupdated"));
  request->SetResultBuffer(&output_buffer);
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);

  if (stats_ != nullptr) {
    stats_->RecordStatObjectRequest();
  }

  TF_RETURN_WITH_CONTEXT_IF_ERROR(
      request->Send(), " when reading metadata of gs://", bucket, "/", object);

  Json::Value root;
  TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));

  // Parse file size.
  TF_RETURN_IF_ERROR(GetInt64Value(root, "size", &stat->base.length));

  // Parse generation number.
  TF_RETURN_IF_ERROR(
      GetInt64Value(root, "generation", &stat->generation_number));

  // Parse file modification time.
  string updated;
  TF_RETURN_IF_ERROR(GetStringValue(root, "updated", &updated));
  TF_RETURN_IF_ERROR(ParseRfc3339Time(updated, &(stat->base.mtime_nsec)));

  VLOG(1) << "Stat of: gs://" << bucket << "/" << object << " -- "
          << " length: " << stat->base.length
          << " generation: " << stat->generation_number
          << "; mtime_nsec: " << stat->base.mtime_nsec
          << "; updated: " << updated;

  if (str_util::EndsWith(fname, "/")) {
    // In GCS a path can be both a directory and a file, but it is uncommon
    // for other file systems. To avoid the ambiguity, if a path ends with "/"
    // in GCS, we always regard it as a directory mark or a virtual directory.
    stat->base.is_directory = true;
  } else {
    stat->base.is_directory = false;
  }
  return Status::OK();
}
1500
// Stats an object, serving from the stat cache when possible and falling back
// to UncachedStatForObject on a miss. Requires a non-empty object name.
Status GcsFileSystem::StatForObject(const string& fname, const string& bucket,
                                    const string& object, GcsFileStat* stat) {
  if (object.empty()) {
    return errors::InvalidArgument(strings::Printf(
        "'object' must be a non-empty string. (File: %s)", fname.c_str()));
  }

  TF_RETURN_IF_ERROR(stat_cache_->LookupOrCompute(
      fname, stat,
      [this, &bucket, &object](const string& fname, GcsFileStat* stat) {
        return UncachedStatForObject(fname, bucket, object, stat);
      }));
  return Status::OK();
}
1515
BucketExists(const string & bucket,bool * result)1516 Status GcsFileSystem::BucketExists(const string& bucket, bool* result) {
1517 const Status status = GetBucketMetadata(bucket, nullptr);
1518 switch (status.code()) {
1519 case errors::Code::OK:
1520 *result = true;
1521 return Status::OK();
1522 case errors::Code::NOT_FOUND:
1523 *result = false;
1524 return Status::OK();
1525 default:
1526 return status;
1527 }
1528 }
1529
// Verifies that `bucket` resides in one of allowed_locations_. An empty
// allow-list disables the check entirely.
Status GcsFileSystem::CheckBucketLocationConstraint(const string& bucket) {
  if (allowed_locations_.empty()) {
    return Status::OK();
  }

  // Avoid calling external API's in the constructor
  // — the "detect zone" sentinel is resolved lazily here, on first use, to
  // the VM's own region. NOTE(review): this mutates allowed_locations_
  // without synchronization; confirm concurrent first calls cannot race.
  if (allowed_locations_.erase(kDetectZoneSentinelValue) == 1) {
    string zone;
    TF_RETURN_IF_ERROR(zone_provider_->GetZone(&zone));
    allowed_locations_.insert(ZoneToRegion(&zone));
  }

  string location;
  TF_RETURN_IF_ERROR(GetBucketLocation(bucket, &location));
  if (allowed_locations_.find(location) != allowed_locations_.end()) {
    return Status::OK();
  }

  return errors::FailedPrecondition(strings::Printf(
      "Bucket '%s' is in '%s' location, allowed locations are: (%s).",
      bucket.c_str(), location.c_str(),
      absl::StrJoin(allowed_locations_, ", ").c_str()));
}
1553
GetBucketLocation(const string & bucket,string * location)1554 Status GcsFileSystem::GetBucketLocation(const string& bucket,
1555 string* location) {
1556 auto compute_func = [this](const string& bucket, string* location) {
1557 std::vector<char> result_buffer;
1558 Status status = GetBucketMetadata(bucket, &result_buffer);
1559 Json::Value result;
1560 TF_RETURN_IF_ERROR(ParseJson(result_buffer, &result));
1561 string bucket_location;
1562 TF_RETURN_IF_ERROR(
1563 GetStringValue(result, kBucketMetadataLocationKey, &bucket_location));
1564 // Lowercase the GCS location to be case insensitive for allowed locations.
1565 *location = absl::AsciiStrToLower(bucket_location);
1566 return Status::OK();
1567 };
1568
1569 TF_RETURN_IF_ERROR(
1570 bucket_location_cache_->LookupOrCompute(bucket, location, compute_func));
1571
1572 return Status::OK();
1573 }
1574
GetBucketMetadata(const string & bucket,std::vector<char> * result_buffer)1575 Status GcsFileSystem::GetBucketMetadata(const string& bucket,
1576 std::vector<char>* result_buffer) {
1577 std::unique_ptr<HttpRequest> request;
1578 TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1579 request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket));
1580
1581 if (result_buffer != nullptr) {
1582 request->SetResultBuffer(result_buffer);
1583 }
1584
1585 request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1586 return request->Send();
1587 }
1588
FolderExists(const string & dirname,bool * result)1589 Status GcsFileSystem::FolderExists(const string& dirname, bool* result) {
1590 StatCache::ComputeFunc compute_func = [this](const string& dirname,
1591 GcsFileStat* stat) {
1592 std::vector<string> children;
1593 TF_RETURN_IF_ERROR(
1594 GetChildrenBounded(dirname, 1, &children, true /* recursively */,
1595 true /* include_self_directory_marker */));
1596 if (!children.empty()) {
1597 stat->base = DIRECTORY_STAT;
1598 return Status::OK();
1599 } else {
1600 return errors::InvalidArgument("Not a directory!");
1601 }
1602 };
1603 GcsFileStat stat;
1604 Status s = stat_cache_->LookupOrCompute(MaybeAppendSlash(dirname), &stat,
1605 compute_func);
1606 if (s.ok()) {
1607 *result = stat.base.is_directory;
1608 return Status::OK();
1609 }
1610 if (errors::IsInvalidArgument(s)) {
1611 *result = false;
1612 return Status::OK();
1613 }
1614 return s;
1615 }
1616
GetChildren(const string & dirname,TransactionToken * token,std::vector<string> * result)1617 Status GcsFileSystem::GetChildren(const string& dirname,
1618 TransactionToken* token,
1619 std::vector<string>* result) {
1620 return GetChildrenBounded(dirname, UINT64_MAX, result,
1621 false /* recursively */,
1622 false /* include_self_directory_marker */);
1623 }
1624
// Expands the glob `pattern` into all matching GCS paths, caching the result
// per pattern. The listing starts from the directory of the longest
// wildcard-free prefix of the pattern and then filters candidates with
// Match().
Status GcsFileSystem::GetMatchingPaths(const string& pattern,
                                       TransactionToken* token,
                                       std::vector<string>* results) {
  MatchingPathsCache::ComputeFunc compute_func =
      [this](const string& pattern, std::vector<string>* results) {
        results->clear();
        // Find the fixed prefix by looking for the first wildcard.
        const string& fixed_prefix =
            pattern.substr(0, pattern.find_first_of("*?[\\"));
        const string dir(this->Dirname(fixed_prefix));
        if (dir.empty()) {
          return errors::InvalidArgument(
              "A GCS pattern doesn't have a bucket name: ", pattern);
        }
        // List everything under `dir`; matching against the pattern is done
        // client-side below.
        std::vector<string> all_files;
        TF_RETURN_IF_ERROR(GetChildrenBounded(
            dir, UINT64_MAX, &all_files, true /* recursively */,
            false /* include_self_directory_marker */));

        // Also consider every intermediate directory of each listed object,
        // so patterns can match folders, not just leaf objects.
        const auto& files_and_folders = AddAllSubpaths(all_files);

        // To handle `/` in the object names, we need to remove it from `dir`
        // and then use `StrCat` to insert it back.
        const StringPiece dir_no_slash = str_util::StripSuffix(dir, "/");

        // Match all obtained paths to the input pattern.
        for (const auto& path : files_and_folders) {
          // Manually construct the path instead of using `JoinPath` for the
          // cases where `path` starts with a `/` (which is a valid character in
          // the filenames of GCS objects). `JoinPath` canonicalizes the result,
          // removing duplicate slashes. We know that `dir_no_slash` does not
          // end in `/`, so we are safe inserting the new `/` here as the path
          // separator.
          const string full_path = strings::StrCat(dir_no_slash, "/", path);
          if (this->Match(full_path, pattern)) {
            results->push_back(full_path);
          }
        }
        return Status::OK();
      };
  TF_RETURN_IF_ERROR(
      matching_paths_cache_->LookupOrCompute(pattern, results, compute_func));
  return Status::OK();
}
1669
// Lists up to `max_results` entries under `dirname` by paging through the
// GCS objects.list API, storing names relative to `dirname` in `*result`.
//
// When `recursive` is true all objects under the prefix are returned; when
// false, GCS is queried with a '/' delimiter so immediate subfolders come
// back in "prefixes" and only direct children are listed. When
// `include_self_directory_marker` is true, the directory's own marker
// object (which has an empty relative name) is included.
Status GcsFileSystem::GetChildrenBounded(const string& dirname,
                                         uint64 max_results,
                                         std::vector<string>* result,
                                         bool recursive,
                                         bool include_self_directory_marker) {
  if (!result) {
    return errors::InvalidArgument("'result' cannot be null");
  }
  string bucket, object_prefix;
  TF_RETURN_IF_ERROR(
      ParseGcsPath(MaybeAppendSlash(dirname), true, &bucket, &object_prefix));

  string nextPageToken;
  uint64 retrieved_results = 0;
  while (true) {  // A loop over multiple result pages.
    std::vector<char> output_buffer;
    std::unique_ptr<HttpRequest> request;
    TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
    auto uri = strings::StrCat(kGcsUriBase, "b/", bucket, "/o");
    if (recursive) {
      // Only object names and the pagination token are needed.
      uri = strings::StrCat(uri, "?fields=items%2Fname%2CnextPageToken");
    } else {
      // Set "/" as a delimiter to ask GCS to treat subfolders as children
      // and return them in "prefixes".
      uri = strings::StrCat(uri,
                            "?fields=items%2Fname%2Cprefixes%2CnextPageToken");
      uri = strings::StrCat(uri, "&delimiter=%2F");
    }
    if (!object_prefix.empty()) {
      uri = strings::StrCat(uri,
                            "&prefix=", request->EscapeString(object_prefix));
    }
    if (!nextPageToken.empty()) {
      uri = strings::StrCat(
          uri, "&pageToken=", request->EscapeString(nextPageToken));
    }
    // Cap the page size only when fewer than a default page's worth of
    // results remain to be fetched.
    if (max_results - retrieved_results < kGetChildrenDefaultPageSize) {
      uri =
          strings::StrCat(uri, "&maxResults=", max_results - retrieved_results);
    }
    request->SetUri(uri);
    request->SetResultBuffer(&output_buffer);
    request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);

    TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when reading ", dirname);
    Json::Value root;
    TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
    // "items" carries the object names on this page (absent when empty).
    const auto items = root.get("items", Json::Value::null);
    if (!items.isNull()) {
      if (!items.isArray()) {
        return errors::Internal(
            "Expected an array 'items' in the GCS response.");
      }
      for (size_t i = 0; i < items.size(); i++) {
        const auto item = items.get(i, Json::Value::null);
        if (!item.isObject()) {
          return errors::Internal(
              "Unexpected JSON format: 'items' should be a list of objects.");
        }
        string name;
        TF_RETURN_IF_ERROR(GetStringValue(item, "name", &name));
        // The names should be relative to the 'dirname'. That means the
        // 'object_prefix', which is part of 'dirname', should be removed from
        // the beginning of 'name'.
        StringPiece relative_path(name);
        if (!absl::ConsumePrefix(&relative_path, object_prefix)) {
          return errors::Internal(strings::StrCat(
              "Unexpected response: the returned file name ", name,
              " doesn't match the prefix ", object_prefix));
        }
        // An empty relative path is the directory's own marker object.
        if (!relative_path.empty() || include_self_directory_marker) {
          result->emplace_back(relative_path);
        }
        if (++retrieved_results >= max_results) {
          return Status::OK();
        }
      }
    }
    const auto prefixes = root.get("prefixes", Json::Value::null);
    if (!prefixes.isNull()) {
      // Subfolders are returned for the non-recursive mode.
      if (!prefixes.isArray()) {
        return errors::Internal(
            "'prefixes' was expected to be an array in the GCS response.");
      }
      for (size_t i = 0; i < prefixes.size(); i++) {
        const auto prefix = prefixes.get(i, Json::Value::null);
        if (prefix.isNull() || !prefix.isString()) {
          return errors::Internal(
              "'prefixes' was expected to be an array of strings in the GCS "
              "response.");
        }
        const string& prefix_str = prefix.asString();
        StringPiece relative_path(prefix_str);
        if (!absl::ConsumePrefix(&relative_path, object_prefix)) {
          return errors::Internal(
              "Unexpected response: the returned folder name ", prefix_str,
              " doesn't match the prefix ", object_prefix);
        }
        result->emplace_back(relative_path);
        if (++retrieved_results >= max_results) {
          return Status::OK();
        }
      }
    }
    // Keep paging while the server supplies a continuation token; a missing
    // token means the listing is complete.
    const auto token = root.get("nextPageToken", Json::Value::null);
    if (token.isNull()) {
      return Status::OK();
    }
    if (!token.isString()) {
      return errors::Internal(
          "Unexpected response: nextPageToken is not a string");
    }
    nextPageToken = token.asString();
  }
}
1786
Stat(const string & fname,TransactionToken * token,FileStatistics * stat)1787 Status GcsFileSystem::Stat(const string& fname, TransactionToken* token,
1788 FileStatistics* stat) {
1789 if (!stat) {
1790 return errors::Internal("'stat' cannot be nullptr.");
1791 }
1792 string bucket, object;
1793 TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
1794 if (object.empty()) {
1795 bool is_bucket;
1796 TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
1797 if (is_bucket) {
1798 *stat = DIRECTORY_STAT;
1799 return Status::OK();
1800 }
1801 return errors::NotFound("The specified bucket ", fname, " was not found.");
1802 }
1803
1804 GcsFileStat gcs_stat;
1805 const Status status = StatForObject(fname, bucket, object, &gcs_stat);
1806 if (status.ok()) {
1807 *stat = gcs_stat.base;
1808 return Status::OK();
1809 }
1810 if (status.code() != errors::Code::NOT_FOUND) {
1811 return status;
1812 }
1813 bool is_folder;
1814 TF_RETURN_IF_ERROR(FolderExists(fname, &is_folder));
1815 if (is_folder) {
1816 *stat = DIRECTORY_STAT;
1817 return Status::OK();
1818 }
1819 return errors::NotFound("The specified path ", fname, " was not found.");
1820 }
1821
DeleteFile(const string & fname,TransactionToken * token)1822 Status GcsFileSystem::DeleteFile(const string& fname, TransactionToken* token) {
1823 string bucket, object;
1824 TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1825
1826 std::unique_ptr<HttpRequest> request;
1827 TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
1828 request->SetUri(strings::StrCat(kGcsUriBase, "b/", bucket, "/o/",
1829 request->EscapeString(object)));
1830 request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
1831 request->SetDeleteRequest();
1832
1833 TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when deleting ", fname);
1834 ClearFileCaches(fname);
1835 return Status::OK();
1836 }
1837
// Creates the directory marker object "<dirname>/" in GCS. Returns
// AlreadyExists if the marker already exists (checked both locally and via a
// server-side generation precondition), and NotFound when the path names a
// nonexistent bucket.
Status GcsFileSystem::CreateDir(const string& dirname,
                                TransactionToken* token) {
  string dirname_with_slash = MaybeAppendSlash(dirname);
  VLOG(3) << "CreateDir: creating directory with dirname: " << dirname
          << " and dirname_with_slash: " << dirname_with_slash;
  string bucket, object;
  TF_RETURN_IF_ERROR(ParseGcsPath(dirname_with_slash, /*empty_object_ok=*/true,
                                  &bucket, &object));
  // A bare bucket name: "creating" it just means verifying it exists.
  if (object.empty()) {
    bool is_bucket;
    TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
    return is_bucket ? Status::OK()
                     : errors::NotFound("The specified bucket ",
                                        dirname_with_slash, " was not found.");
  }

  if (FileExists(dirname_with_slash, token).ok()) {
    // Use the original name for a correct error here.
    VLOG(3) << "CreateDir: directory already exists, not uploading " << dirname;
    return errors::AlreadyExists(dirname);
  }

  std::unique_ptr<HttpRequest> request;
  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));

  request->SetUri(strings::StrCat(
      kGcsUploadUriBase, "b/", bucket,
      "/o?uploadType=media&name=", request->EscapeString(object),
      // Adding this parameter means HTTP_CODE_PRECONDITION_FAILED
      // will be returned if the object already exists, so avoid reuploading.
      "&ifGenerationMatch=0"));

  // The marker is an empty object; only its existence matters.
  request->SetPostEmptyBody();
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
  const Status& status = request->Send();
  if (status.ok()) {
    VLOG(3) << "CreateDir: finished uploading directory " << dirname;
    return Status::OK();
  }
  // PRECONDITION_FAILED means the marker was created concurrently; any other
  // failure is a genuine upload error and is propagated with context.
  if (request->GetResponseCode() != HTTP_CODE_PRECONDITION_FAILED) {
    TF_RETURN_WITH_CONTEXT_IF_ERROR(status, " when uploading ",
                                    dirname_with_slash);
  }
  VLOG(3) << "Ignoring directory already exists on object "
          << dirname_with_slash;
  return errors::AlreadyExists(dirname);
}
1885
1886 // Checks that the directory is empty (i.e no objects with this prefix exist).
1887 // Deletes the GCS directory marker if it exists.
DeleteDir(const string & dirname,TransactionToken * token)1888 Status GcsFileSystem::DeleteDir(const string& dirname,
1889 TransactionToken* token) {
1890 std::vector<string> children;
1891 // A directory is considered empty either if there are no matching objects
1892 // with the corresponding name prefix or if there is exactly one matching
1893 // object and it is the directory marker. Therefore we need to retrieve
1894 // at most two children for the prefix to detect if a directory is empty.
1895 TF_RETURN_IF_ERROR(
1896 GetChildrenBounded(dirname, 2, &children, true /* recursively */,
1897 true /* include_self_directory_marker */));
1898
1899 if (children.size() > 1 || (children.size() == 1 && !children[0].empty())) {
1900 return errors::FailedPrecondition("Cannot delete a non-empty directory.");
1901 }
1902 if (children.size() == 1 && children[0].empty()) {
1903 // This is the directory marker object. Delete it.
1904 return DeleteFile(MaybeAppendSlash(dirname), token);
1905 }
1906 return Status::OK();
1907 }
1908
GetFileSize(const string & fname,TransactionToken * token,uint64 * file_size)1909 Status GcsFileSystem::GetFileSize(const string& fname, TransactionToken* token,
1910 uint64* file_size) {
1911 if (!file_size) {
1912 return errors::Internal("'file_size' cannot be nullptr.");
1913 }
1914
1915 // Only validate the name.
1916 string bucket, object;
1917 TF_RETURN_IF_ERROR(ParseGcsPath(fname, false, &bucket, &object));
1918
1919 FileStatistics stat;
1920 TF_RETURN_IF_ERROR(Stat(fname, token, &stat));
1921 *file_size = stat.length;
1922 return Status::OK();
1923 }
1924
RenameFile(const string & src,const string & target,TransactionToken * token)1925 Status GcsFileSystem::RenameFile(const string& src, const string& target,
1926 TransactionToken* token) {
1927 if (!IsDirectory(src, token).ok()) {
1928 return RenameObject(src, target);
1929 }
1930 // Rename all individual objects in the directory one by one.
1931 std::vector<string> children;
1932 TF_RETURN_IF_ERROR(
1933 GetChildrenBounded(src, UINT64_MAX, &children, true /* recursively */,
1934 true /* include_self_directory_marker */));
1935 for (const string& subpath : children) {
1936 TF_RETURN_IF_ERROR(
1937 RenameObject(JoinGcsPath(src, subpath), JoinGcsPath(target, subpath)));
1938 }
1939 return Status::OK();
1940 }
1941
// Uses a GCS API command to copy the object and then deletes the old one.
// Returns Unimplemented when GCS reports the rewrite did not complete in a
// single call (large cross-location/storage-class copies).
Status GcsFileSystem::RenameObject(const string& src, const string& target) {
  VLOG(3) << "RenameObject: started gs://" << src << " to " << target;
  string src_bucket, src_object, target_bucket, target_object;
  TF_RETURN_IF_ERROR(ParseGcsPath(src, false, &src_bucket, &src_object));
  TF_RETURN_IF_ERROR(
      ParseGcsPath(target, false, &target_bucket, &target_object));

  // Server-side copy via the objects.rewrite endpoint; no data flows through
  // this process.
  std::unique_ptr<HttpRequest> request;
  TF_RETURN_IF_ERROR(CreateHttpRequest(&request));
  request->SetUri(strings::StrCat(kGcsUriBase, "b/", src_bucket, "/o/",
                                  request->EscapeString(src_object),
                                  "/rewriteTo/b/", target_bucket, "/o/",
                                  request->EscapeString(target_object)));
  request->SetPostEmptyBody();
  request->SetTimeouts(timeouts_.connect, timeouts_.idle, timeouts_.metadata);
  std::vector<char> output_buffer;
  request->SetResultBuffer(&output_buffer);
  TF_RETURN_WITH_CONTEXT_IF_ERROR(request->Send(), " when renaming ", src,
                                  " to ", target);
  // Flush the target from the caches. The source will be flushed in the
  // DeleteFile call below.
  ClearFileCaches(target);
  Json::Value root;
  TF_RETURN_IF_ERROR(ParseJson(output_buffer, &root));
  // The rewrite response's "done" flag indicates whether the copy finished
  // within this single call.
  bool done;
  TF_RETURN_IF_ERROR(GetBoolValue(root, "done", &done));
  if (!done) {
    // If GCS didn't complete rewrite in one call, this means that a large file
    // is being copied to a bucket with a different storage class or location,
    // which requires multiple rewrite calls.
    // TODO(surkov): implement multi-step rewrites.
    return errors::Unimplemented(
        "Couldn't rename ", src, " to ", target,
        ": moving large files between buckets with different "
        "locations or storage classes is not supported.");
  }

  VLOG(3) << "RenameObject: finished from: gs://" << src << " to " << target;
  // In case the delete API call failed, but the deletion actually happened
  // on the server side, we can't just retry the whole RenameFile operation
  // because the source object is already gone.
  return RetryingUtils::DeleteWithRetries(
      [this, &src]() { return DeleteFile(src, nullptr); }, retry_config_);
}
1987
IsDirectory(const string & fname,TransactionToken * token)1988 Status GcsFileSystem::IsDirectory(const string& fname,
1989 TransactionToken* token) {
1990 string bucket, object;
1991 TF_RETURN_IF_ERROR(ParseGcsPath(fname, true, &bucket, &object));
1992 if (object.empty()) {
1993 bool is_bucket;
1994 TF_RETURN_IF_ERROR(BucketExists(bucket, &is_bucket));
1995 if (is_bucket) {
1996 return Status::OK();
1997 }
1998 return errors::NotFound("The specified bucket gs://", bucket,
1999 " was not found.");
2000 }
2001 bool is_folder;
2002 TF_RETURN_IF_ERROR(FolderExists(fname, &is_folder));
2003 if (is_folder) {
2004 return Status::OK();
2005 }
2006 bool is_object;
2007 TF_RETURN_IF_ERROR(ObjectExists(fname, bucket, object, &is_object));
2008 if (is_object) {
2009 return errors::FailedPrecondition("The specified path ", fname,
2010 " is not a directory.");
2011 }
2012 return errors::NotFound("The specified path ", fname, " was not found.");
2013 }
2014
// Recursively deletes `dirname` and everything under it. Individual object
// deletion failures do not abort the traversal; they are tallied in
// `*undeleted_files` / `*undeleted_dirs` and the call still returns OK, so
// deletions here carry their own retry logic.
Status GcsFileSystem::DeleteRecursively(const string& dirname,
                                        TransactionToken* token,
                                        int64* undeleted_files,
                                        int64* undeleted_dirs) {
  if (!undeleted_files || !undeleted_dirs) {
    return errors::Internal(
        "'undeleted_files' and 'undeleted_dirs' cannot be nullptr.");
  }
  *undeleted_files = 0;
  *undeleted_dirs = 0;
  if (!IsDirectory(dirname, token).ok()) {
    // Count the root itself as an undeleted directory.
    *undeleted_dirs = 1;
    return Status(
        error::NOT_FOUND,
        strings::StrCat(dirname, " doesn't exist or not a directory."));
  }
  std::vector<string> all_objects;
  // Get all children in the directory recursively.
  TF_RETURN_IF_ERROR(GetChildrenBounded(
      dirname, UINT64_MAX, &all_objects, true /* recursively */,
      true /* include_self_directory_marker */));
  for (const string& object : all_objects) {
    const string& full_path = JoinGcsPath(dirname, object);
    // Delete all objects including directory markers for subfolders.
    // Since DeleteRecursively returns OK if individual file deletions fail,
    // and therefore RetryingFileSystem won't pay attention to the failures,
    // we need to make sure these failures are properly retried.
    const auto& delete_file_status = RetryingUtils::DeleteWithRetries(
        [this, &full_path, token]() { return DeleteFile(full_path, token); },
        retry_config_);
    if (!delete_file_status.ok()) {
      if (IsDirectory(full_path, token).ok()) {
        // The object is a directory marker.
        (*undeleted_dirs)++;
      } else {
        (*undeleted_files)++;
      }
    }
  }
  return Status::OK();
}
2056
2057 // Flushes all caches for filesystem metadata and file contents. Useful for
2058 // reclaiming memory once filesystem operations are done (e.g. model is loaded),
2059 // or for resetting the filesystem to a consistent state.
FlushCaches(TransactionToken * token)2060 void GcsFileSystem::FlushCaches(TransactionToken* token) {
2061 tf_shared_lock l(block_cache_lock_);
2062 file_block_cache_->Flush();
2063 stat_cache_->Clear();
2064 matching_paths_cache_->Clear();
2065 bucket_location_cache_->Clear();
2066 }
2067
SetStats(GcsStatsInterface * stats)2068 void GcsFileSystem::SetStats(GcsStatsInterface* stats) {
2069 CHECK(stats_ == nullptr) << "SetStats() has already been called.";
2070 CHECK(stats != nullptr);
2071 mutex_lock l(block_cache_lock_);
2072 stats_ = stats;
2073 stats_->Configure(this, &throttle_, file_block_cache_.get());
2074 }
2075
SetCacheStats(FileBlockCacheStatsInterface * cache_stats)2076 void GcsFileSystem::SetCacheStats(FileBlockCacheStatsInterface* cache_stats) {
2077 tf_shared_lock l(block_cache_lock_);
2078 if (file_block_cache_ == nullptr) {
2079 LOG(ERROR) << "Tried to set cache stats of non-initialized file block "
2080 "cache object. This may result in not exporting the intended "
2081 "monitoring data";
2082 return;
2083 }
2084 file_block_cache_->SetStats(cache_stats);
2085 }
2086
SetAuthProvider(std::unique_ptr<AuthProvider> auth_provider)2087 void GcsFileSystem::SetAuthProvider(
2088 std::unique_ptr<AuthProvider> auth_provider) {
2089 mutex_lock l(mu_);
2090 auth_provider_ = std::move(auth_provider);
2091 }
2092
2093 // Creates an HttpRequest and sets several parameters that are common to all
2094 // requests. All code (in GcsFileSystem) that creates an HttpRequest should
2095 // go through this method, rather than directly using http_request_factory_.
CreateHttpRequest(std::unique_ptr<HttpRequest> * request)2096 Status GcsFileSystem::CreateHttpRequest(std::unique_ptr<HttpRequest>* request) {
2097 std::unique_ptr<HttpRequest> new_request{http_request_factory_->Create()};
2098 if (dns_cache_) {
2099 dns_cache_->AnnotateRequest(new_request.get());
2100 }
2101
2102 string auth_token;
2103 {
2104 tf_shared_lock l(mu_);
2105 TF_RETURN_IF_ERROR(
2106 AuthProvider::GetToken(auth_provider_.get(), &auth_token));
2107 }
2108
2109 new_request->AddAuthBearerHeader(auth_token);
2110
2111 if (additional_header_) {
2112 new_request->AddHeader(additional_header_->first,
2113 additional_header_->second);
2114 }
2115
2116 if (stats_ != nullptr) {
2117 new_request->SetRequestStats(stats_->HttpStats());
2118 }
2119
2120 if (!throttle_.AdmitRequest()) {
2121 return errors::Unavailable("Request throttled");
2122 }
2123
2124 *request = std::move(new_request);
2125 return Status::OK();
2126 }
2127
2128 } // namespace tensorflow
2129
// The TPU_GCS_FS option sets a TPU-on-GCS optimized file system that allows
// TPU pods to function more optimally. When TPU_GCS_FS is enabled then
// gcs_file_system will not be registered as a file system since the
// tpu_gcs_file_system is going to take over its responsibilities. The tpu file
// system is a child of gcs file system with TPU-pod on GCS optimizations.
// This option is set ON/OFF in the GCP TPU tensorflow config.
// Initialize gcs_file_system
// Associates the "gs://" URI scheme with RetryingGcsFileSystem.
REGISTER_LEGACY_FILE_SYSTEM("gs", ::tensorflow::RetryingGcsFileSystem);
2138