1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_RETRYING_FILE_SYSTEM_H_
17 #define TENSORFLOW_CORE_PLATFORM_CLOUD_RETRYING_FILE_SYSTEM_H_
18
19 #include <functional>
20 #include <string>
21 #include <vector>
22
23 #include "tensorflow/core/lib/random/random.h"
24 #include "tensorflow/core/platform/env.h"
25 #include "tensorflow/core/platform/errors.h"
26 #include "tensorflow/core/platform/file_system.h"
27 #include "tensorflow/core/platform/retrying_utils.h"
28 #include "tensorflow/core/platform/status.h"
29
30 namespace tensorflow {
31
32 /// A wrapper to add retry logic to another file system.
33 template <typename Underlying>
34 class RetryingFileSystem : public FileSystem {
35 public:
RetryingFileSystem(std::unique_ptr<Underlying> base_file_system,const RetryConfig & retry_config)36 RetryingFileSystem(std::unique_ptr<Underlying> base_file_system,
37 const RetryConfig& retry_config)
38 : base_file_system_(std::move(base_file_system)),
39 retry_config_(retry_config) {}
40
41 TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
42
43 Status NewRandomAccessFile(
44 const string& filename, TransactionToken* token,
45 std::unique_ptr<RandomAccessFile>* result) override;
46
47 Status NewWritableFile(const string& filename, TransactionToken* token,
48 std::unique_ptr<WritableFile>* result) override;
49
50 Status NewAppendableFile(const string& filename, TransactionToken* token,
51 std::unique_ptr<WritableFile>* result) override;
52
53 Status NewReadOnlyMemoryRegionFromFile(
54 const string& filename, TransactionToken* token,
55 std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
56
FileExists(const string & fname,TransactionToken * token)57 Status FileExists(const string& fname, TransactionToken* token) override {
58 return RetryingUtils::CallWithRetries(
59 [this, &fname, token]() {
60 return base_file_system_->FileExists(fname, token);
61 },
62 retry_config_);
63 }
64
GetChildren(const string & dir,TransactionToken * token,std::vector<string> * result)65 Status GetChildren(const string& dir, TransactionToken* token,
66 std::vector<string>* result) override {
67 return RetryingUtils::CallWithRetries(
68 [this, &dir, result, token]() {
69 return base_file_system_->GetChildren(dir, token, result);
70 },
71 retry_config_);
72 }
73
GetMatchingPaths(const string & pattern,TransactionToken * token,std::vector<string> * result)74 Status GetMatchingPaths(const string& pattern, TransactionToken* token,
75 std::vector<string>* result) override {
76 return RetryingUtils::CallWithRetries(
77 [this, &pattern, result, token]() {
78 return base_file_system_->GetMatchingPaths(pattern, token, result);
79 },
80 retry_config_);
81 }
82
Stat(const string & fname,TransactionToken * token,FileStatistics * stat)83 Status Stat(const string& fname, TransactionToken* token,
84 FileStatistics* stat) override {
85 return RetryingUtils::CallWithRetries(
86 [this, &fname, stat, token]() {
87 return base_file_system_->Stat(fname, token, stat);
88 },
89 retry_config_);
90 }
91
DeleteFile(const string & fname,TransactionToken * token)92 Status DeleteFile(const string& fname, TransactionToken* token) override {
93 return RetryingUtils::DeleteWithRetries(
94 [this, &fname, token]() {
95 return base_file_system_->DeleteFile(fname, token);
96 },
97 retry_config_);
98 }
99
CreateDir(const string & dirname,TransactionToken * token)100 Status CreateDir(const string& dirname, TransactionToken* token) override {
101 return RetryingUtils::CallWithRetries(
102 [this, &dirname, token]() {
103 return base_file_system_->CreateDir(dirname, token);
104 },
105 retry_config_);
106 }
107
DeleteDir(const string & dirname,TransactionToken * token)108 Status DeleteDir(const string& dirname, TransactionToken* token) override {
109 return RetryingUtils::DeleteWithRetries(
110 [this, &dirname, token]() {
111 return base_file_system_->DeleteDir(dirname, token);
112 },
113 retry_config_);
114 }
115
GetFileSize(const string & fname,TransactionToken * token,uint64 * file_size)116 Status GetFileSize(const string& fname, TransactionToken* token,
117 uint64* file_size) override {
118 return RetryingUtils::CallWithRetries(
119 [this, &fname, file_size, token]() {
120 return base_file_system_->GetFileSize(fname, token, file_size);
121 },
122 retry_config_);
123 }
124
RenameFile(const string & src,const string & target,TransactionToken * token)125 Status RenameFile(const string& src, const string& target,
126 TransactionToken* token) override {
127 return RetryingUtils::CallWithRetries(
128 [this, &src, &target, token]() {
129 return base_file_system_->RenameFile(src, target, token);
130 },
131 retry_config_);
132 }
133
IsDirectory(const string & dirname,TransactionToken * token)134 Status IsDirectory(const string& dirname, TransactionToken* token) override {
135 return RetryingUtils::CallWithRetries(
136 [this, &dirname, token]() {
137 return base_file_system_->IsDirectory(dirname, token);
138 },
139 retry_config_);
140 }
141
HasAtomicMove(const string & path,bool * has_atomic_move)142 Status HasAtomicMove(const string& path, bool* has_atomic_move) override {
143 // this method does not need to be retried
144 return base_file_system_->HasAtomicMove(path, has_atomic_move);
145 }
146
DeleteRecursively(const string & dirname,TransactionToken * token,int64 * undeleted_files,int64 * undeleted_dirs)147 Status DeleteRecursively(const string& dirname, TransactionToken* token,
148 int64* undeleted_files,
149 int64* undeleted_dirs) override {
150 return RetryingUtils::DeleteWithRetries(
151 [this, &dirname, token, undeleted_files, undeleted_dirs]() {
152 return base_file_system_->DeleteRecursively(
153 dirname, token, undeleted_files, undeleted_dirs);
154 },
155 retry_config_);
156 }
157
FlushCaches(TransactionToken * token)158 void FlushCaches(TransactionToken* token) override {
159 base_file_system_->FlushCaches(token);
160 }
161
underlying()162 Underlying* underlying() const { return base_file_system_.get(); }
163
164 private:
165 std::unique_ptr<Underlying> base_file_system_;
166 const RetryConfig retry_config_;
167
168 TF_DISALLOW_COPY_AND_ASSIGN(RetryingFileSystem);
169 };
170
171 namespace retrying_internals {
172
173 class RetryingRandomAccessFile : public RandomAccessFile {
174 public:
RetryingRandomAccessFile(std::unique_ptr<RandomAccessFile> base_file,const RetryConfig & retry_config)175 RetryingRandomAccessFile(std::unique_ptr<RandomAccessFile> base_file,
176 const RetryConfig& retry_config)
177 : base_file_(std::move(base_file)), retry_config_(retry_config) {}
178
Name(StringPiece * result)179 Status Name(StringPiece* result) const override {
180 return base_file_->Name(result);
181 }
182
Read(uint64 offset,size_t n,StringPiece * result,char * scratch)183 Status Read(uint64 offset, size_t n, StringPiece* result,
184 char* scratch) const override {
185 return RetryingUtils::CallWithRetries(
186 [this, offset, n, result, scratch]() {
187 return base_file_->Read(offset, n, result, scratch);
188 },
189 retry_config_);
190 }
191
192 private:
193 std::unique_ptr<RandomAccessFile> base_file_;
194 const RetryConfig retry_config_;
195 };
196
197 class RetryingWritableFile : public WritableFile {
198 public:
RetryingWritableFile(std::unique_ptr<WritableFile> base_file,const RetryConfig & retry_config)199 RetryingWritableFile(std::unique_ptr<WritableFile> base_file,
200 const RetryConfig& retry_config)
201 : base_file_(std::move(base_file)), retry_config_(retry_config) {}
202
~RetryingWritableFile()203 ~RetryingWritableFile() override {
204 // Makes sure the retrying version of Close() is called in the destructor.
205 Close().IgnoreError();
206 }
207
Append(StringPiece data)208 Status Append(StringPiece data) override {
209 return RetryingUtils::CallWithRetries(
210 [this, &data]() { return base_file_->Append(data); }, retry_config_);
211 }
Close()212 Status Close() override {
213 return RetryingUtils::CallWithRetries(
214 [this]() { return base_file_->Close(); }, retry_config_);
215 }
Flush()216 Status Flush() override {
217 return RetryingUtils::CallWithRetries(
218 [this]() { return base_file_->Flush(); }, retry_config_);
219 }
Name(StringPiece * result)220 Status Name(StringPiece* result) const override {
221 return base_file_->Name(result);
222 }
Sync()223 Status Sync() override {
224 return RetryingUtils::CallWithRetries(
225 [this]() { return base_file_->Sync(); }, retry_config_);
226 }
Tell(int64 * position)227 Status Tell(int64* position) override {
228 return RetryingUtils::CallWithRetries(
229 [this, &position]() { return base_file_->Tell(position); },
230 retry_config_);
231 }
232
233 private:
234 std::unique_ptr<WritableFile> base_file_;
235 const RetryConfig retry_config_;
236 };
237
238 } // namespace retrying_internals
239
240 template <typename Underlying>
NewRandomAccessFile(const string & filename,TransactionToken * token,std::unique_ptr<RandomAccessFile> * result)241 Status RetryingFileSystem<Underlying>::NewRandomAccessFile(
242 const string& filename, TransactionToken* token,
243 std::unique_ptr<RandomAccessFile>* result) {
244 std::unique_ptr<RandomAccessFile> base_file;
245 TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
246 [this, &filename, &base_file, token]() {
247 return base_file_system_->NewRandomAccessFile(filename, token,
248 &base_file);
249 },
250 retry_config_));
251 result->reset(new retrying_internals::RetryingRandomAccessFile(
252 std::move(base_file), retry_config_));
253 return Status::OK();
254 }
255
256 template <typename Underlying>
NewWritableFile(const string & filename,TransactionToken * token,std::unique_ptr<WritableFile> * result)257 Status RetryingFileSystem<Underlying>::NewWritableFile(
258 const string& filename, TransactionToken* token,
259 std::unique_ptr<WritableFile>* result) {
260 std::unique_ptr<WritableFile> base_file;
261 TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
262 [this, &filename, &base_file, token]() {
263 return base_file_system_->NewWritableFile(filename, token, &base_file);
264 },
265 retry_config_));
266 result->reset(new retrying_internals::RetryingWritableFile(
267 std::move(base_file), retry_config_));
268 return Status::OK();
269 }
270
271 template <typename Underlying>
NewAppendableFile(const string & filename,TransactionToken * token,std::unique_ptr<WritableFile> * result)272 Status RetryingFileSystem<Underlying>::NewAppendableFile(
273 const string& filename, TransactionToken* token,
274 std::unique_ptr<WritableFile>* result) {
275 std::unique_ptr<WritableFile> base_file;
276 TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
277 [this, &filename, &base_file, token]() {
278 return base_file_system_->NewAppendableFile(filename, token,
279 &base_file);
280 },
281 retry_config_));
282 result->reset(new retrying_internals::RetryingWritableFile(
283 std::move(base_file), retry_config_));
284 return Status::OK();
285 }
286
287 template <typename Underlying>
NewReadOnlyMemoryRegionFromFile(const string & filename,TransactionToken * token,std::unique_ptr<ReadOnlyMemoryRegion> * result)288 Status RetryingFileSystem<Underlying>::NewReadOnlyMemoryRegionFromFile(
289 const string& filename, TransactionToken* token,
290 std::unique_ptr<ReadOnlyMemoryRegion>* result) {
291 return RetryingUtils::CallWithRetries(
292 [this, &filename, result, token]() {
293 return base_file_system_->NewReadOnlyMemoryRegionFromFile(
294 filename, token, result);
295 },
296 retry_config_);
297 }
298
299 } // namespace tensorflow
300
301 #endif // TENSORFLOW_CORE_PLATFORM_CLOUD_RETRYING_FILE_SYSTEM_H_
302