1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_RETRYING_FILE_SYSTEM_H_
17 #define TENSORFLOW_CORE_PLATFORM_CLOUD_RETRYING_FILE_SYSTEM_H_
18
19 #include <functional>
20 #include <string>
21 #include <vector>
22
23 #include "tensorflow/core/lib/core/errors.h"
24 #include "tensorflow/core/lib/core/status.h"
25 #include "tensorflow/core/lib/random/random.h"
26 #include "tensorflow/core/platform/cloud/retrying_utils.h"
27 #include "tensorflow/core/platform/env.h"
28 #include "tensorflow/core/platform/file_system.h"
29
30 namespace tensorflow {
31
32 /// A wrapper to add retry logic to another file system.
33 template <typename Underlying>
34 class RetryingFileSystem : public FileSystem {
35 public:
RetryingFileSystem(std::unique_ptr<Underlying> base_file_system,const RetryConfig & retry_config)36 RetryingFileSystem(std::unique_ptr<Underlying> base_file_system,
37 const RetryConfig& retry_config)
38 : base_file_system_(std::move(base_file_system)),
39 retry_config_(retry_config) {}
40
41 Status NewRandomAccessFile(
42 const string& filename,
43 std::unique_ptr<RandomAccessFile>* result) override;
44
45 Status NewWritableFile(const string& fname,
46 std::unique_ptr<WritableFile>* result) override;
47
48 Status NewAppendableFile(const string& fname,
49 std::unique_ptr<WritableFile>* result) override;
50
51 Status NewReadOnlyMemoryRegionFromFile(
52 const string& filename,
53 std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
54
FileExists(const string & fname)55 Status FileExists(const string& fname) override {
56 return RetryingUtils::CallWithRetries(
57 [this, &fname]() { return base_file_system_->FileExists(fname); },
58 retry_config_);
59 }
60
GetChildren(const string & dir,std::vector<string> * result)61 Status GetChildren(const string& dir, std::vector<string>* result) override {
62 return RetryingUtils::CallWithRetries(
63 [this, &dir, result]() {
64 return base_file_system_->GetChildren(dir, result);
65 },
66 retry_config_);
67 }
68
GetMatchingPaths(const string & pattern,std::vector<string> * result)69 Status GetMatchingPaths(const string& pattern,
70 std::vector<string>* result) override {
71 return RetryingUtils::CallWithRetries(
72 [this, &pattern, result]() {
73 return base_file_system_->GetMatchingPaths(pattern, result);
74 },
75 retry_config_);
76 }
77
Stat(const string & fname,FileStatistics * stat)78 Status Stat(const string& fname, FileStatistics* stat) override {
79 return RetryingUtils::CallWithRetries(
80 [this, &fname, stat]() { return base_file_system_->Stat(fname, stat); },
81 retry_config_);
82 }
83
DeleteFile(const string & fname)84 Status DeleteFile(const string& fname) override {
85 return RetryingUtils::DeleteWithRetries(
86 [this, &fname]() { return base_file_system_->DeleteFile(fname); },
87 retry_config_);
88 }
89
CreateDir(const string & dirname)90 Status CreateDir(const string& dirname) override {
91 return RetryingUtils::CallWithRetries(
92 [this, &dirname]() { return base_file_system_->CreateDir(dirname); },
93 retry_config_);
94 }
95
DeleteDir(const string & dirname)96 Status DeleteDir(const string& dirname) override {
97 return RetryingUtils::DeleteWithRetries(
98 [this, &dirname]() { return base_file_system_->DeleteDir(dirname); },
99 retry_config_);
100 }
101
GetFileSize(const string & fname,uint64 * file_size)102 Status GetFileSize(const string& fname, uint64* file_size) override {
103 return RetryingUtils::CallWithRetries(
104 [this, &fname, file_size]() {
105 return base_file_system_->GetFileSize(fname, file_size);
106 },
107 retry_config_);
108 }
109
RenameFile(const string & src,const string & target)110 Status RenameFile(const string& src, const string& target) override {
111 return RetryingUtils::CallWithRetries(
112 [this, &src, &target]() {
113 return base_file_system_->RenameFile(src, target);
114 },
115 retry_config_);
116 }
117
IsDirectory(const string & dirname)118 Status IsDirectory(const string& dirname) override {
119 return RetryingUtils::CallWithRetries(
120 [this, &dirname]() { return base_file_system_->IsDirectory(dirname); },
121 retry_config_);
122 }
123
DeleteRecursively(const string & dirname,int64 * undeleted_files,int64 * undeleted_dirs)124 Status DeleteRecursively(const string& dirname, int64* undeleted_files,
125 int64* undeleted_dirs) override {
126 return RetryingUtils::DeleteWithRetries(
127 [this, &dirname, undeleted_files, undeleted_dirs]() {
128 return base_file_system_->DeleteRecursively(dirname, undeleted_files,
129 undeleted_dirs);
130 },
131 retry_config_);
132 }
133
FlushCaches()134 void FlushCaches() override { base_file_system_->FlushCaches(); }
135
underlying()136 Underlying* underlying() const { return base_file_system_.get(); }
137
138 private:
139 std::unique_ptr<Underlying> base_file_system_;
140 const RetryConfig retry_config_;
141
142 TF_DISALLOW_COPY_AND_ASSIGN(RetryingFileSystem);
143 };
144
145 namespace retrying_internals {
146
147 class RetryingRandomAccessFile : public RandomAccessFile {
148 public:
RetryingRandomAccessFile(std::unique_ptr<RandomAccessFile> base_file,const RetryConfig & retry_config)149 RetryingRandomAccessFile(std::unique_ptr<RandomAccessFile> base_file,
150 const RetryConfig& retry_config)
151 : base_file_(std::move(base_file)), retry_config_(retry_config) {}
152
Name(StringPiece * result)153 Status Name(StringPiece* result) const override {
154 return base_file_->Name(result);
155 }
156
Read(uint64 offset,size_t n,StringPiece * result,char * scratch)157 Status Read(uint64 offset, size_t n, StringPiece* result,
158 char* scratch) const override {
159 return RetryingUtils::CallWithRetries(
160 [this, offset, n, result, scratch]() {
161 return base_file_->Read(offset, n, result, scratch);
162 },
163 retry_config_);
164 }
165
166 private:
167 std::unique_ptr<RandomAccessFile> base_file_;
168 const RetryConfig retry_config_;
169 };
170
171 class RetryingWritableFile : public WritableFile {
172 public:
RetryingWritableFile(std::unique_ptr<WritableFile> base_file,const RetryConfig & retry_config)173 RetryingWritableFile(std::unique_ptr<WritableFile> base_file,
174 const RetryConfig& retry_config)
175 : base_file_(std::move(base_file)), retry_config_(retry_config) {}
176
~RetryingWritableFile()177 ~RetryingWritableFile() override {
178 // Makes sure the retrying version of Close() is called in the destructor.
179 Close().IgnoreError();
180 }
181
Append(StringPiece data)182 Status Append(StringPiece data) override {
183 return RetryingUtils::CallWithRetries(
184 [this, &data]() { return base_file_->Append(data); }, retry_config_);
185 }
Close()186 Status Close() override {
187 return RetryingUtils::CallWithRetries(
188 [this]() { return base_file_->Close(); }, retry_config_);
189 }
Flush()190 Status Flush() override {
191 return RetryingUtils::CallWithRetries(
192 [this]() { return base_file_->Flush(); }, retry_config_);
193 }
Name(StringPiece * result)194 Status Name(StringPiece* result) const override {
195 return base_file_->Name(result);
196 }
Sync()197 Status Sync() override {
198 return RetryingUtils::CallWithRetries(
199 [this]() { return base_file_->Sync(); }, retry_config_);
200 }
Tell(int64 * position)201 Status Tell(int64* position) override {
202 return RetryingUtils::CallWithRetries(
203 [this, &position]() { return base_file_->Tell(position); },
204 retry_config_);
205 }
206
207 private:
208 std::unique_ptr<WritableFile> base_file_;
209 const RetryConfig retry_config_;
210 };
211
212 } // namespace retrying_internals
213
214 template <typename Underlying>
NewRandomAccessFile(const string & filename,std::unique_ptr<RandomAccessFile> * result)215 Status RetryingFileSystem<Underlying>::NewRandomAccessFile(
216 const string& filename, std::unique_ptr<RandomAccessFile>* result) {
217 std::unique_ptr<RandomAccessFile> base_file;
218 TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
219 [this, &filename, &base_file]() {
220 return base_file_system_->NewRandomAccessFile(filename, &base_file);
221 },
222 retry_config_));
223 result->reset(new retrying_internals::RetryingRandomAccessFile(
224 std::move(base_file), retry_config_));
225 return Status::OK();
226 }
227
228 template <typename Underlying>
NewWritableFile(const string & filename,std::unique_ptr<WritableFile> * result)229 Status RetryingFileSystem<Underlying>::NewWritableFile(
230 const string& filename, std::unique_ptr<WritableFile>* result) {
231 std::unique_ptr<WritableFile> base_file;
232 TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
233 [this, &filename, &base_file]() {
234 return base_file_system_->NewWritableFile(filename, &base_file);
235 },
236 retry_config_));
237 result->reset(new retrying_internals::RetryingWritableFile(
238 std::move(base_file), retry_config_));
239 return Status::OK();
240 }
241
242 template <typename Underlying>
NewAppendableFile(const string & filename,std::unique_ptr<WritableFile> * result)243 Status RetryingFileSystem<Underlying>::NewAppendableFile(
244 const string& filename, std::unique_ptr<WritableFile>* result) {
245 std::unique_ptr<WritableFile> base_file;
246 TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
247 [this, &filename, &base_file]() {
248 return base_file_system_->NewAppendableFile(filename, &base_file);
249 },
250 retry_config_));
251 result->reset(new retrying_internals::RetryingWritableFile(
252 std::move(base_file), retry_config_));
253 return Status::OK();
254 }
255
256 template <typename Underlying>
NewReadOnlyMemoryRegionFromFile(const string & filename,std::unique_ptr<ReadOnlyMemoryRegion> * result)257 Status RetryingFileSystem<Underlying>::NewReadOnlyMemoryRegionFromFile(
258 const string& filename, std::unique_ptr<ReadOnlyMemoryRegion>* result) {
259 return RetryingUtils::CallWithRetries(
260 [this, &filename, result]() {
261 return base_file_system_->NewReadOnlyMemoryRegionFromFile(filename,
262 result);
263 },
264 retry_config_);
265 }
266
267 } // namespace tensorflow
268
269 #endif // TENSORFLOW_CORE_PLATFORM_CLOUD_RETRYING_FILE_SYSTEM_H_
270