• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #include "tensorflow/c/experimental/filesystem/modular_filesystem.h"
16 
17 #include <algorithm>
18 #include <string>
19 #include <utility>
20 
21 #include "tensorflow/c/experimental/filesystem/modular_filesystem_registration.h"
22 #include "tensorflow/c/tf_status_helper.h"
23 #include "tensorflow/core/platform/env.h"
24 #include "tensorflow/core/platform/file_system_helper.h"
25 #include "tensorflow/core/util/ptr_util.h"
26 
27 // TODO(mihaimaruseac): After all filesystems are converted, all calls to
28 // methods from `FileSystem` will have to be replaced to calls to private
29 // methods here, as part of making this class a singleton and the only way to
30 // register/use filesystems.
31 
32 namespace tensorflow {
33 
34 using UniquePtrTo_TF_Status =
35     ::std::unique_ptr<TF_Status, decltype(&TF_DeleteStatus)>;
36 
NewRandomAccessFile(const std::string & fname,TransactionToken * token,std::unique_ptr<RandomAccessFile> * result)37 Status ModularFileSystem::NewRandomAccessFile(
38     const std::string& fname, TransactionToken* token,
39     std::unique_ptr<RandomAccessFile>* result) {
40   if (ops_->new_random_access_file == nullptr)
41     return errors::Unimplemented(tensorflow::strings::StrCat(
42         "Filesystem for ", fname, " does not support NewRandomAccessFile()"));
43 
44   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
45   auto file = MakeUnique<TF_RandomAccessFile>();
46   std::string translated_name = TranslateName(fname);
47   ops_->new_random_access_file(filesystem_.get(), translated_name.c_str(),
48                                file.get(), plugin_status.get());
49 
50   if (TF_GetCode(plugin_status.get()) == TF_OK)
51     *result = MakeUnique<ModularRandomAccessFile>(
52         translated_name, std::move(file), random_access_file_ops_.get());
53 
54   return StatusFromTF_Status(plugin_status.get());
55 }
56 
NewWritableFile(const std::string & fname,TransactionToken * token,std::unique_ptr<WritableFile> * result)57 Status ModularFileSystem::NewWritableFile(
58     const std::string& fname, TransactionToken* token,
59     std::unique_ptr<WritableFile>* result) {
60   if (ops_->new_writable_file == nullptr)
61     return errors::Unimplemented(tensorflow::strings::StrCat(
62         "Filesystem for ", fname, " does not support NewWritableFile()"));
63 
64   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
65   auto file = MakeUnique<TF_WritableFile>();
66   std::string translated_name = TranslateName(fname);
67   ops_->new_writable_file(filesystem_.get(), translated_name.c_str(),
68                           file.get(), plugin_status.get());
69 
70   if (TF_GetCode(plugin_status.get()) == TF_OK)
71     *result = MakeUnique<ModularWritableFile>(translated_name, std::move(file),
72                                               writable_file_ops_.get());
73 
74   return StatusFromTF_Status(plugin_status.get());
75 }
76 
NewAppendableFile(const std::string & fname,TransactionToken * token,std::unique_ptr<WritableFile> * result)77 Status ModularFileSystem::NewAppendableFile(
78     const std::string& fname, TransactionToken* token,
79     std::unique_ptr<WritableFile>* result) {
80   if (ops_->new_appendable_file == nullptr)
81     return errors::Unimplemented(tensorflow::strings::StrCat(
82         "Filesystem for ", fname, " does not support NewAppendableFile()"));
83 
84   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
85   auto file = MakeUnique<TF_WritableFile>();
86   std::string translated_name = TranslateName(fname);
87   ops_->new_appendable_file(filesystem_.get(), translated_name.c_str(),
88                             file.get(), plugin_status.get());
89 
90   if (TF_GetCode(plugin_status.get()) == TF_OK)
91     *result = MakeUnique<ModularWritableFile>(translated_name, std::move(file),
92                                               writable_file_ops_.get());
93 
94   return StatusFromTF_Status(plugin_status.get());
95 }
96 
NewReadOnlyMemoryRegionFromFile(const std::string & fname,TransactionToken * token,std::unique_ptr<ReadOnlyMemoryRegion> * result)97 Status ModularFileSystem::NewReadOnlyMemoryRegionFromFile(
98     const std::string& fname, TransactionToken* token,
99     std::unique_ptr<ReadOnlyMemoryRegion>* result) {
100   if (ops_->new_read_only_memory_region_from_file == nullptr)
101     return errors::Unimplemented(tensorflow::strings::StrCat(
102         "Filesystem for ", fname,
103         " does not support NewReadOnlyMemoryRegionFromFile()"));
104 
105   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
106   auto region = MakeUnique<TF_ReadOnlyMemoryRegion>();
107   std::string translated_name = TranslateName(fname);
108   ops_->new_read_only_memory_region_from_file(
109       filesystem_.get(), translated_name.c_str(), region.get(),
110       plugin_status.get());
111 
112   if (TF_GetCode(plugin_status.get()) == TF_OK)
113     *result = MakeUnique<ModularReadOnlyMemoryRegion>(
114         std::move(region), read_only_memory_region_ops_.get());
115 
116   return StatusFromTF_Status(plugin_status.get());
117 }
118 
FileExists(const std::string & fname,TransactionToken * token)119 Status ModularFileSystem::FileExists(const std::string& fname,
120                                      TransactionToken* token) {
121   if (ops_->path_exists == nullptr)
122     return errors::Unimplemented(tensorflow::strings::StrCat(
123         "Filesystem for ", fname, " does not support FileExists()"));
124 
125   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
126   const std::string translated_name = TranslateName(fname);
127   ops_->path_exists(filesystem_.get(), translated_name.c_str(),
128                     plugin_status.get());
129   return StatusFromTF_Status(plugin_status.get());
130 }
131 
FilesExist(const std::vector<std::string> & files,TransactionToken * token,std::vector<Status> * status)132 bool ModularFileSystem::FilesExist(const std::vector<std::string>& files,
133                                    TransactionToken* token,
134                                    std::vector<Status>* status) {
135   if (ops_->paths_exist == nullptr)
136     return FileSystem::FilesExist(files, token, status);
137 
138   std::vector<char*> translated_names;
139   translated_names.reserve(files.size());
140   for (int i = 0; i < files.size(); i++)
141     translated_names.push_back(strdup(TranslateName(files[i]).c_str()));
142 
143   bool result;
144   if (status == nullptr) {
145     result = ops_->paths_exist(filesystem_.get(), translated_names.data(),
146                                files.size(), nullptr);
147   } else {
148     std::vector<TF_Status*> plugin_status;
149     plugin_status.reserve(files.size());
150     for (int i = 0; i < files.size(); i++)
151       plugin_status.push_back(TF_NewStatus());
152     result = ops_->paths_exist(filesystem_.get(), translated_names.data(),
153                                files.size(), plugin_status.data());
154     for (int i = 0; i < files.size(); i++) {
155       status->push_back(StatusFromTF_Status(plugin_status[i]));
156       TF_DeleteStatus(plugin_status[i]);
157     }
158   }
159 
160   for (int i = 0; i < files.size(); i++) free(translated_names[i]);
161 
162   return result;
163 }
164 
GetChildren(const std::string & dir,TransactionToken * token,std::vector<std::string> * result)165 Status ModularFileSystem::GetChildren(const std::string& dir,
166                                       TransactionToken* token,
167                                       std::vector<std::string>* result) {
168   if (ops_->get_children == nullptr)
169     return errors::Unimplemented(tensorflow::strings::StrCat(
170         "Filesystem for ", dir, " does not support GetChildren()"));
171 
172   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
173   std::string translated_name = TranslateName(dir);
174   // Note that `children` is allocated by the plugin and freed by core
175   // TensorFlow, so we need to use `plugin_memory_free_` here.
176   char** children = nullptr;
177   const int num_children =
178       ops_->get_children(filesystem_.get(), translated_name.c_str(), &children,
179                          plugin_status.get());
180   if (num_children >= 0) {
181     for (int i = 0; i < num_children; i++) {
182       result->push_back(std::string(children[i]));
183       plugin_memory_free_(children[i]);
184     }
185     plugin_memory_free_(children);
186   }
187 
188   return StatusFromTF_Status(plugin_status.get());
189 }
190 
GetMatchingPaths(const std::string & pattern,TransactionToken * token,std::vector<std::string> * result)191 Status ModularFileSystem::GetMatchingPaths(const std::string& pattern,
192                                            TransactionToken* token,
193                                            std::vector<std::string>* result) {
194   if (ops_->get_matching_paths == nullptr)
195     return internal::GetMatchingPaths(this, Env::Default(), pattern, result);
196 
197   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
198   // Note that `matches` is allocated by the plugin and freed by core
199   // TensorFlow, so we need to use `plugin_memory_free_` here.
200   char** matches = nullptr;
201   const int num_matches = ops_->get_matching_paths(
202       filesystem_.get(), pattern.c_str(), &matches, plugin_status.get());
203   if (num_matches >= 0) {
204     for (int i = 0; i < num_matches; i++) {
205       result->push_back(std::string(matches[i]));
206       plugin_memory_free_(matches[i]);
207     }
208     plugin_memory_free_(matches);
209   }
210 
211   return StatusFromTF_Status(plugin_status.get());
212 }
213 
DeleteFile(const std::string & fname,TransactionToken * token)214 Status ModularFileSystem::DeleteFile(const std::string& fname,
215                                      TransactionToken* token) {
216   if (ops_->delete_file == nullptr)
217     return errors::Unimplemented(tensorflow::strings::StrCat(
218         "Filesystem for ", fname, " does not support DeleteFile()"));
219 
220   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
221   std::string translated_name = TranslateName(fname);
222   ops_->delete_file(filesystem_.get(), translated_name.c_str(),
223                     plugin_status.get());
224   return StatusFromTF_Status(plugin_status.get());
225 }
226 
DeleteRecursively(const std::string & dirname,TransactionToken * token,int64 * undeleted_files,int64 * undeleted_dirs)227 Status ModularFileSystem::DeleteRecursively(const std::string& dirname,
228                                             TransactionToken* token,
229                                             int64* undeleted_files,
230                                             int64* undeleted_dirs) {
231   if (undeleted_files == nullptr || undeleted_dirs == nullptr)
232     return errors::FailedPrecondition(
233         "DeleteRecursively must not be called with `undeleted_files` or "
234         "`undeleted_dirs` set to NULL");
235 
236   if (ops_->delete_recursively == nullptr)
237     return FileSystem::DeleteRecursively(dirname, token, undeleted_files,
238                                          undeleted_dirs);
239 
240   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
241   std::string translated_name = TranslateName(dirname);
242   uint64_t plugin_undeleted_files, plugin_undeleted_dirs;
243   ops_->delete_recursively(filesystem_.get(), translated_name.c_str(),
244                            &plugin_undeleted_files, &plugin_undeleted_dirs,
245                            plugin_status.get());
246   *undeleted_files = plugin_undeleted_files;
247   *undeleted_dirs = plugin_undeleted_dirs;
248   return StatusFromTF_Status(plugin_status.get());
249 }
250 
DeleteDir(const std::string & dirname,TransactionToken * token)251 Status ModularFileSystem::DeleteDir(const std::string& dirname,
252                                     TransactionToken* token) {
253   if (ops_->delete_dir == nullptr)
254     return errors::Unimplemented(tensorflow::strings::StrCat(
255         "Filesystem for ", dirname, " does not support DeleteDir()"));
256 
257   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
258   std::string translated_name = TranslateName(dirname);
259   ops_->delete_dir(filesystem_.get(), translated_name.c_str(),
260                    plugin_status.get());
261   return StatusFromTF_Status(plugin_status.get());
262 }
263 
RecursivelyCreateDir(const std::string & dirname,TransactionToken * token)264 Status ModularFileSystem::RecursivelyCreateDir(const std::string& dirname,
265                                                TransactionToken* token) {
266   if (ops_->recursively_create_dir == nullptr)
267     return FileSystem::RecursivelyCreateDir(dirname, token);
268 
269   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
270   std::string translated_name = TranslateName(dirname);
271   ops_->recursively_create_dir(filesystem_.get(), translated_name.c_str(),
272                                plugin_status.get());
273   return StatusFromTF_Status(plugin_status.get());
274 }
275 
CreateDir(const std::string & dirname,TransactionToken * token)276 Status ModularFileSystem::CreateDir(const std::string& dirname,
277                                     TransactionToken* token) {
278   if (ops_->create_dir == nullptr)
279     return errors::Unimplemented(tensorflow::strings::StrCat(
280         "Filesystem for ", dirname, " does not support CreateDir()"));
281 
282   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
283   std::string translated_name = TranslateName(dirname);
284   ops_->create_dir(filesystem_.get(), translated_name.c_str(),
285                    plugin_status.get());
286   return StatusFromTF_Status(plugin_status.get());
287 }
288 
Stat(const std::string & fname,TransactionToken * token,FileStatistics * stat)289 Status ModularFileSystem::Stat(const std::string& fname,
290                                TransactionToken* token, FileStatistics* stat) {
291   if (ops_->stat == nullptr)
292     return errors::Unimplemented(tensorflow::strings::StrCat(
293         "Filesystem for ", fname, " does not support Stat()"));
294 
295   if (stat == nullptr)
296     return errors::InvalidArgument("FileStatistics pointer must not be NULL");
297 
298   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
299   std::string translated_name = TranslateName(fname);
300   TF_FileStatistics stats;
301   ops_->stat(filesystem_.get(), translated_name.c_str(), &stats,
302              plugin_status.get());
303 
304   if (TF_GetCode(plugin_status.get()) == TF_OK) {
305     stat->length = stats.length;
306     stat->mtime_nsec = stats.mtime_nsec;
307     stat->is_directory = stats.is_directory;
308   }
309 
310   return StatusFromTF_Status(plugin_status.get());
311 }
312 
IsDirectory(const std::string & name,TransactionToken * token)313 Status ModularFileSystem::IsDirectory(const std::string& name,
314                                       TransactionToken* token) {
315   if (ops_->is_directory == nullptr)
316     return FileSystem::IsDirectory(name, token);
317 
318   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
319   std::string translated_name = TranslateName(name);
320   ops_->is_directory(filesystem_.get(), translated_name.c_str(),
321                      plugin_status.get());
322   return StatusFromTF_Status(plugin_status.get());
323 }
324 
GetFileSize(const std::string & fname,TransactionToken * token,uint64 * file_size)325 Status ModularFileSystem::GetFileSize(const std::string& fname,
326                                       TransactionToken* token,
327                                       uint64* file_size) {
328   if (ops_->get_file_size == nullptr) {
329     FileStatistics stat;
330     Status status = Stat(fname, &stat);
331     if (!status.ok()) return status;
332     if (stat.is_directory)
333       return errors::FailedPrecondition("Called GetFileSize on a directory");
334 
335     *file_size = stat.length;
336     return status;
337   }
338 
339   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
340   std::string translated_name = TranslateName(fname);
341   *file_size = ops_->get_file_size(filesystem_.get(), translated_name.c_str(),
342                                    plugin_status.get());
343   return StatusFromTF_Status(plugin_status.get());
344 }
345 
RenameFile(const std::string & src,const std::string & target,TransactionToken * token)346 Status ModularFileSystem::RenameFile(const std::string& src,
347                                      const std::string& target,
348                                      TransactionToken* token) {
349   if (ops_->rename_file == nullptr) {
350     Status status = CopyFile(src, target);
351     if (status.ok()) status = DeleteFile(src);
352     return status;
353   }
354 
355   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
356   std::string translated_src = TranslateName(src);
357   std::string translated_target = TranslateName(target);
358   ops_->rename_file(filesystem_.get(), translated_src.c_str(),
359                     translated_target.c_str(), plugin_status.get());
360   return StatusFromTF_Status(plugin_status.get());
361 }
362 
CopyFile(const std::string & src,const std::string & target,TransactionToken * token)363 Status ModularFileSystem::CopyFile(const std::string& src,
364                                    const std::string& target,
365                                    TransactionToken* token) {
366   if (ops_->copy_file == nullptr)
367     return FileSystem::CopyFile(src, target, token);
368 
369   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
370   std::string translated_src = TranslateName(src);
371   std::string translated_target = TranslateName(target);
372   ops_->copy_file(filesystem_.get(), translated_src.c_str(),
373                   translated_target.c_str(), plugin_status.get());
374   return StatusFromTF_Status(plugin_status.get());
375 }
376 
TranslateName(const std::string & name) const377 std::string ModularFileSystem::TranslateName(const std::string& name) const {
378   if (ops_->translate_name == nullptr) return FileSystem::TranslateName(name);
379 
380   char* p = ops_->translate_name(filesystem_.get(), name.c_str());
381   CHECK(p != nullptr) << "TranslateName(" << name << ") returned nullptr";
382 
383   std::string ret(p);
384   // Since `p` is allocated by plugin, free it using plugin's method.
385   plugin_memory_free_(p);
386   return ret;
387 }
388 
FlushCaches(TransactionToken * token)389 void ModularFileSystem::FlushCaches(TransactionToken* token) {
390   if (ops_->flush_caches != nullptr) ops_->flush_caches(filesystem_.get());
391 }
392 
Read(uint64 offset,size_t n,StringPiece * result,char * scratch) const393 Status ModularRandomAccessFile::Read(uint64 offset, size_t n,
394                                      StringPiece* result, char* scratch) const {
395   if (ops_->read == nullptr)
396     return errors::Unimplemented(
397         tensorflow::strings::StrCat("Read() not implemented for ", filename_));
398 
399   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
400   int64_t read =
401       ops_->read(file_.get(), offset, n, scratch, plugin_status.get());
402   if (read > 0) *result = StringPiece(scratch, read);
403   return StatusFromTF_Status(plugin_status.get());
404 }
405 
Name(StringPiece * result) const406 Status ModularRandomAccessFile::Name(StringPiece* result) const {
407   *result = filename_;
408   return Status::OK();
409 }
410 
Append(StringPiece data)411 Status ModularWritableFile::Append(StringPiece data) {
412   if (ops_->append == nullptr)
413     return errors::Unimplemented(tensorflow::strings::StrCat(
414         "Append() not implemented for ", filename_));
415 
416   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
417   ops_->append(file_.get(), data.data(), data.size(), plugin_status.get());
418   return StatusFromTF_Status(plugin_status.get());
419 }
420 
Close()421 Status ModularWritableFile::Close() {
422   if (ops_->close == nullptr)
423     return errors::Unimplemented(
424         tensorflow::strings::StrCat("Close() not implemented for ", filename_));
425 
426   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
427   ops_->close(file_.get(), plugin_status.get());
428   return StatusFromTF_Status(plugin_status.get());
429 }
430 
Flush()431 Status ModularWritableFile::Flush() {
432   if (ops_->flush == nullptr) return Status::OK();
433 
434   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
435   ops_->flush(file_.get(), plugin_status.get());
436   return StatusFromTF_Status(plugin_status.get());
437 }
438 
Sync()439 Status ModularWritableFile::Sync() {
440   if (ops_->sync == nullptr) return Flush();
441 
442   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
443   ops_->sync(file_.get(), plugin_status.get());
444   return StatusFromTF_Status(plugin_status.get());
445 }
446 
Name(StringPiece * result) const447 Status ModularWritableFile::Name(StringPiece* result) const {
448   *result = filename_;
449   return Status::OK();
450 }
451 
Tell(int64 * position)452 Status ModularWritableFile::Tell(int64* position) {
453   if (ops_->tell == nullptr)
454     return errors::Unimplemented(
455         tensorflow::strings::StrCat("Tell() not implemented for ", filename_));
456 
457   UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
458   *position = ops_->tell(file_.get(), plugin_status.get());
459   return StatusFromTF_Status(plugin_status.get());
460 }
461 
RegisterFilesystemPlugin(const std::string & dso_path)462 Status RegisterFilesystemPlugin(const std::string& dso_path) {
463   // Step 1: Load plugin
464   Env* env = Env::Default();
465   void* dso_handle;
466   TF_RETURN_IF_ERROR(env->LoadDynamicLibrary(dso_path.c_str(), &dso_handle));
467 
468   // Step 2: Load symbol for `TF_InitPlugin`
469   void* dso_symbol;
470   TF_RETURN_IF_ERROR(
471       env->GetSymbolFromLibrary(dso_handle, "TF_InitPlugin", &dso_symbol));
472 
473   // Step 3: Call `TF_InitPlugin`
474   TF_FilesystemPluginInfo info;
475   memset(&info, 0, sizeof(info));
476   auto TF_InitPlugin =
477       reinterpret_cast<int (*)(TF_FilesystemPluginInfo*)>(dso_symbol);
478   TF_InitPlugin(&info);
479 
480   // Step 4: Do the actual registration
481   return filesystem_registration::RegisterFilesystemPluginImpl(&info);
482 }
483 
484 }  // namespace tensorflow
485