• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_RETRYING_FILE_SYSTEM_H_
17 #define TENSORFLOW_CORE_PLATFORM_CLOUD_RETRYING_FILE_SYSTEM_H_
18 
19 #include <functional>
20 #include <string>
21 #include <vector>
22 
23 #include "tensorflow/core/lib/random/random.h"
24 #include "tensorflow/core/platform/env.h"
25 #include "tensorflow/core/platform/errors.h"
26 #include "tensorflow/core/platform/file_system.h"
27 #include "tensorflow/core/platform/retrying_utils.h"
28 #include "tensorflow/core/platform/status.h"
29 
30 namespace tensorflow {
31 
32 /// A wrapper to add retry logic to another file system.
33 template <typename Underlying>
34 class RetryingFileSystem : public FileSystem {
35  public:
RetryingFileSystem(std::unique_ptr<Underlying> base_file_system,const RetryConfig & retry_config)36   RetryingFileSystem(std::unique_ptr<Underlying> base_file_system,
37                      const RetryConfig& retry_config)
38       : base_file_system_(std::move(base_file_system)),
39         retry_config_(retry_config) {}
40 
41   TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
42 
43   Status NewRandomAccessFile(
44       const string& filename, TransactionToken* token,
45       std::unique_ptr<RandomAccessFile>* result) override;
46 
47   Status NewWritableFile(const string& filename, TransactionToken* token,
48                          std::unique_ptr<WritableFile>* result) override;
49 
50   Status NewAppendableFile(const string& filename, TransactionToken* token,
51                            std::unique_ptr<WritableFile>* result) override;
52 
53   Status NewReadOnlyMemoryRegionFromFile(
54       const string& filename, TransactionToken* token,
55       std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
56 
FileExists(const string & fname,TransactionToken * token)57   Status FileExists(const string& fname, TransactionToken* token) override {
58     return RetryingUtils::CallWithRetries(
59         [this, &fname, token]() {
60           return base_file_system_->FileExists(fname, token);
61         },
62         retry_config_);
63   }
64 
GetChildren(const string & dir,TransactionToken * token,std::vector<string> * result)65   Status GetChildren(const string& dir, TransactionToken* token,
66                      std::vector<string>* result) override {
67     return RetryingUtils::CallWithRetries(
68         [this, &dir, result, token]() {
69           return base_file_system_->GetChildren(dir, token, result);
70         },
71         retry_config_);
72   }
73 
GetMatchingPaths(const string & pattern,TransactionToken * token,std::vector<string> * result)74   Status GetMatchingPaths(const string& pattern, TransactionToken* token,
75                           std::vector<string>* result) override {
76     return RetryingUtils::CallWithRetries(
77         [this, &pattern, result, token]() {
78           return base_file_system_->GetMatchingPaths(pattern, token, result);
79         },
80         retry_config_);
81   }
82 
Stat(const string & fname,TransactionToken * token,FileStatistics * stat)83   Status Stat(const string& fname, TransactionToken* token,
84               FileStatistics* stat) override {
85     return RetryingUtils::CallWithRetries(
86         [this, &fname, stat, token]() {
87           return base_file_system_->Stat(fname, token, stat);
88         },
89         retry_config_);
90   }
91 
DeleteFile(const string & fname,TransactionToken * token)92   Status DeleteFile(const string& fname, TransactionToken* token) override {
93     return RetryingUtils::DeleteWithRetries(
94         [this, &fname, token]() {
95           return base_file_system_->DeleteFile(fname, token);
96         },
97         retry_config_);
98   }
99 
CreateDir(const string & dirname,TransactionToken * token)100   Status CreateDir(const string& dirname, TransactionToken* token) override {
101     return RetryingUtils::CallWithRetries(
102         [this, &dirname, token]() {
103           return base_file_system_->CreateDir(dirname, token);
104         },
105         retry_config_);
106   }
107 
DeleteDir(const string & dirname,TransactionToken * token)108   Status DeleteDir(const string& dirname, TransactionToken* token) override {
109     return RetryingUtils::DeleteWithRetries(
110         [this, &dirname, token]() {
111           return base_file_system_->DeleteDir(dirname, token);
112         },
113         retry_config_);
114   }
115 
GetFileSize(const string & fname,TransactionToken * token,uint64 * file_size)116   Status GetFileSize(const string& fname, TransactionToken* token,
117                      uint64* file_size) override {
118     return RetryingUtils::CallWithRetries(
119         [this, &fname, file_size, token]() {
120           return base_file_system_->GetFileSize(fname, token, file_size);
121         },
122         retry_config_);
123   }
124 
RenameFile(const string & src,const string & target,TransactionToken * token)125   Status RenameFile(const string& src, const string& target,
126                     TransactionToken* token) override {
127     return RetryingUtils::CallWithRetries(
128         [this, &src, &target, token]() {
129           return base_file_system_->RenameFile(src, target, token);
130         },
131         retry_config_);
132   }
133 
IsDirectory(const string & dirname,TransactionToken * token)134   Status IsDirectory(const string& dirname, TransactionToken* token) override {
135     return RetryingUtils::CallWithRetries(
136         [this, &dirname, token]() {
137           return base_file_system_->IsDirectory(dirname, token);
138         },
139         retry_config_);
140   }
141 
HasAtomicMove(const string & path,bool * has_atomic_move)142   Status HasAtomicMove(const string& path, bool* has_atomic_move) override {
143     // this method does not need to be retried
144     return base_file_system_->HasAtomicMove(path, has_atomic_move);
145   }
146 
DeleteRecursively(const string & dirname,TransactionToken * token,int64_t * undeleted_files,int64_t * undeleted_dirs)147   Status DeleteRecursively(const string& dirname, TransactionToken* token,
148                            int64_t* undeleted_files,
149                            int64_t* undeleted_dirs) override {
150     return RetryingUtils::DeleteWithRetries(
151         [this, &dirname, token, undeleted_files, undeleted_dirs]() {
152           return base_file_system_->DeleteRecursively(
153               dirname, token, undeleted_files, undeleted_dirs);
154         },
155         retry_config_);
156   }
157 
FlushCaches(TransactionToken * token)158   void FlushCaches(TransactionToken* token) override {
159     base_file_system_->FlushCaches(token);
160   }
161 
underlying()162   Underlying* underlying() const { return base_file_system_.get(); }
163 
164  private:
165   std::unique_ptr<Underlying> base_file_system_;
166   const RetryConfig retry_config_;
167 
168   TF_DISALLOW_COPY_AND_ASSIGN(RetryingFileSystem);
169 };
170 
171 namespace retrying_internals {
172 
173 class RetryingRandomAccessFile : public RandomAccessFile {
174  public:
RetryingRandomAccessFile(std::unique_ptr<RandomAccessFile> base_file,const RetryConfig & retry_config)175   RetryingRandomAccessFile(std::unique_ptr<RandomAccessFile> base_file,
176                            const RetryConfig& retry_config)
177       : base_file_(std::move(base_file)), retry_config_(retry_config) {}
178 
Name(StringPiece * result)179   Status Name(StringPiece* result) const override {
180     return base_file_->Name(result);
181   }
182 
Read(uint64 offset,size_t n,StringPiece * result,char * scratch)183   Status Read(uint64 offset, size_t n, StringPiece* result,
184               char* scratch) const override {
185     return RetryingUtils::CallWithRetries(
186         [this, offset, n, result, scratch]() {
187           return base_file_->Read(offset, n, result, scratch);
188         },
189         retry_config_);
190   }
191 
192  private:
193   std::unique_ptr<RandomAccessFile> base_file_;
194   const RetryConfig retry_config_;
195 };
196 
197 class RetryingWritableFile : public WritableFile {
198  public:
RetryingWritableFile(std::unique_ptr<WritableFile> base_file,const RetryConfig & retry_config)199   RetryingWritableFile(std::unique_ptr<WritableFile> base_file,
200                        const RetryConfig& retry_config)
201       : base_file_(std::move(base_file)), retry_config_(retry_config) {}
202 
~RetryingWritableFile()203   ~RetryingWritableFile() override {
204     // Makes sure the retrying version of Close() is called in the destructor.
205     Close().IgnoreError();
206   }
207 
Append(StringPiece data)208   Status Append(StringPiece data) override {
209     return RetryingUtils::CallWithRetries(
210         [this, &data]() { return base_file_->Append(data); }, retry_config_);
211   }
Close()212   Status Close() override {
213     return RetryingUtils::CallWithRetries(
214         [this]() { return base_file_->Close(); }, retry_config_);
215   }
Flush()216   Status Flush() override {
217     return RetryingUtils::CallWithRetries(
218         [this]() { return base_file_->Flush(); }, retry_config_);
219   }
Name(StringPiece * result)220   Status Name(StringPiece* result) const override {
221     return base_file_->Name(result);
222   }
Sync()223   Status Sync() override {
224     return RetryingUtils::CallWithRetries(
225         [this]() { return base_file_->Sync(); }, retry_config_);
226   }
Tell(int64_t * position)227   Status Tell(int64_t* position) override {
228     return RetryingUtils::CallWithRetries(
229         [this, &position]() { return base_file_->Tell(position); },
230         retry_config_);
231   }
232 
233  private:
234   std::unique_ptr<WritableFile> base_file_;
235   const RetryConfig retry_config_;
236 };
237 
238 }  // namespace retrying_internals
239 
240 template <typename Underlying>
NewRandomAccessFile(const string & filename,TransactionToken * token,std::unique_ptr<RandomAccessFile> * result)241 Status RetryingFileSystem<Underlying>::NewRandomAccessFile(
242     const string& filename, TransactionToken* token,
243     std::unique_ptr<RandomAccessFile>* result) {
244   std::unique_ptr<RandomAccessFile> base_file;
245   TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
246       [this, &filename, &base_file, token]() {
247         return base_file_system_->NewRandomAccessFile(filename, token,
248                                                       &base_file);
249       },
250       retry_config_));
251   result->reset(new retrying_internals::RetryingRandomAccessFile(
252       std::move(base_file), retry_config_));
253   return OkStatus();
254 }
255 
256 template <typename Underlying>
NewWritableFile(const string & filename,TransactionToken * token,std::unique_ptr<WritableFile> * result)257 Status RetryingFileSystem<Underlying>::NewWritableFile(
258     const string& filename, TransactionToken* token,
259     std::unique_ptr<WritableFile>* result) {
260   std::unique_ptr<WritableFile> base_file;
261   TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
262       [this, &filename, &base_file, token]() {
263         return base_file_system_->NewWritableFile(filename, token, &base_file);
264       },
265       retry_config_));
266   result->reset(new retrying_internals::RetryingWritableFile(
267       std::move(base_file), retry_config_));
268   return OkStatus();
269 }
270 
271 template <typename Underlying>
NewAppendableFile(const string & filename,TransactionToken * token,std::unique_ptr<WritableFile> * result)272 Status RetryingFileSystem<Underlying>::NewAppendableFile(
273     const string& filename, TransactionToken* token,
274     std::unique_ptr<WritableFile>* result) {
275   std::unique_ptr<WritableFile> base_file;
276   TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
277       [this, &filename, &base_file, token]() {
278         return base_file_system_->NewAppendableFile(filename, token,
279                                                     &base_file);
280       },
281       retry_config_));
282   result->reset(new retrying_internals::RetryingWritableFile(
283       std::move(base_file), retry_config_));
284   return OkStatus();
285 }
286 
287 template <typename Underlying>
NewReadOnlyMemoryRegionFromFile(const string & filename,TransactionToken * token,std::unique_ptr<ReadOnlyMemoryRegion> * result)288 Status RetryingFileSystem<Underlying>::NewReadOnlyMemoryRegionFromFile(
289     const string& filename, TransactionToken* token,
290     std::unique_ptr<ReadOnlyMemoryRegion>* result) {
291   return RetryingUtils::CallWithRetries(
292       [this, &filename, result, token]() {
293         return base_file_system_->NewReadOnlyMemoryRegionFromFile(
294             filename, token, result);
295       },
296       retry_config_);
297 }
298 
299 }  // namespace tensorflow
300 
301 #endif  // TENSORFLOW_CORE_PLATFORM_CLOUD_RETRYING_FILE_SYSTEM_H_
302