1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #include "tensorflow/c/experimental/filesystem/modular_filesystem.h"
16
17 #include <algorithm>
18 #include <string>
19 #include <utility>
20
21 #include "tensorflow/c/experimental/filesystem/modular_filesystem_registration.h"
22 #include "tensorflow/c/tf_status_helper.h"
23 #include "tensorflow/core/platform/env.h"
24 #include "tensorflow/core/platform/file_system_helper.h"
25 #include "tensorflow/core/util/ptr_util.h"
26
// TODO(mihaimaruseac): After all filesystems are converted, all calls to
// methods from `FileSystem` will have to be replaced with calls to private
// methods here, as part of making this class a singleton and the only way to
// register/use filesystems.
31
32 namespace tensorflow {
33
// Owning handle for a `TF_Status`. The deleter type is a pointer to a function
// with the signature of `TF_DeleteStatus`; every use in this file constructs
// the handle from `TF_NewStatus()` and passes `TF_DeleteStatus` as the deleter.
using UniquePtrTo_TF_Status =
    ::std::unique_ptr<TF_Status, decltype(&TF_DeleteStatus)>;
36
NewRandomAccessFile(const std::string & fname,TransactionToken * token,std::unique_ptr<RandomAccessFile> * result)37 Status ModularFileSystem::NewRandomAccessFile(
38 const std::string& fname, TransactionToken* token,
39 std::unique_ptr<RandomAccessFile>* result) {
40 if (ops_->new_random_access_file == nullptr)
41 return errors::Unimplemented(tensorflow::strings::StrCat(
42 "Filesystem for ", fname, " does not support NewRandomAccessFile()"));
43
44 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
45 auto file = MakeUnique<TF_RandomAccessFile>();
46 std::string translated_name = TranslateName(fname);
47 ops_->new_random_access_file(filesystem_.get(), translated_name.c_str(),
48 file.get(), plugin_status.get());
49
50 if (TF_GetCode(plugin_status.get()) == TF_OK)
51 *result = MakeUnique<ModularRandomAccessFile>(
52 translated_name, std::move(file), random_access_file_ops_.get());
53
54 return StatusFromTF_Status(plugin_status.get());
55 }
56
NewWritableFile(const std::string & fname,TransactionToken * token,std::unique_ptr<WritableFile> * result)57 Status ModularFileSystem::NewWritableFile(
58 const std::string& fname, TransactionToken* token,
59 std::unique_ptr<WritableFile>* result) {
60 if (ops_->new_writable_file == nullptr)
61 return errors::Unimplemented(tensorflow::strings::StrCat(
62 "Filesystem for ", fname, " does not support NewWritableFile()"));
63
64 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
65 auto file = MakeUnique<TF_WritableFile>();
66 std::string translated_name = TranslateName(fname);
67 ops_->new_writable_file(filesystem_.get(), translated_name.c_str(),
68 file.get(), plugin_status.get());
69
70 if (TF_GetCode(plugin_status.get()) == TF_OK)
71 *result = MakeUnique<ModularWritableFile>(translated_name, std::move(file),
72 writable_file_ops_.get());
73
74 return StatusFromTF_Status(plugin_status.get());
75 }
76
NewAppendableFile(const std::string & fname,TransactionToken * token,std::unique_ptr<WritableFile> * result)77 Status ModularFileSystem::NewAppendableFile(
78 const std::string& fname, TransactionToken* token,
79 std::unique_ptr<WritableFile>* result) {
80 if (ops_->new_appendable_file == nullptr)
81 return errors::Unimplemented(tensorflow::strings::StrCat(
82 "Filesystem for ", fname, " does not support NewAppendableFile()"));
83
84 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
85 auto file = MakeUnique<TF_WritableFile>();
86 std::string translated_name = TranslateName(fname);
87 ops_->new_appendable_file(filesystem_.get(), translated_name.c_str(),
88 file.get(), plugin_status.get());
89
90 if (TF_GetCode(plugin_status.get()) == TF_OK)
91 *result = MakeUnique<ModularWritableFile>(translated_name, std::move(file),
92 writable_file_ops_.get());
93
94 return StatusFromTF_Status(plugin_status.get());
95 }
96
NewReadOnlyMemoryRegionFromFile(const std::string & fname,TransactionToken * token,std::unique_ptr<ReadOnlyMemoryRegion> * result)97 Status ModularFileSystem::NewReadOnlyMemoryRegionFromFile(
98 const std::string& fname, TransactionToken* token,
99 std::unique_ptr<ReadOnlyMemoryRegion>* result) {
100 if (ops_->new_read_only_memory_region_from_file == nullptr)
101 return errors::Unimplemented(tensorflow::strings::StrCat(
102 "Filesystem for ", fname,
103 " does not support NewReadOnlyMemoryRegionFromFile()"));
104
105 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
106 auto region = MakeUnique<TF_ReadOnlyMemoryRegion>();
107 std::string translated_name = TranslateName(fname);
108 ops_->new_read_only_memory_region_from_file(
109 filesystem_.get(), translated_name.c_str(), region.get(),
110 plugin_status.get());
111
112 if (TF_GetCode(plugin_status.get()) == TF_OK)
113 *result = MakeUnique<ModularReadOnlyMemoryRegion>(
114 std::move(region), read_only_memory_region_ops_.get());
115
116 return StatusFromTF_Status(plugin_status.get());
117 }
118
FileExists(const std::string & fname,TransactionToken * token)119 Status ModularFileSystem::FileExists(const std::string& fname,
120 TransactionToken* token) {
121 if (ops_->path_exists == nullptr)
122 return errors::Unimplemented(tensorflow::strings::StrCat(
123 "Filesystem for ", fname, " does not support FileExists()"));
124
125 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
126 const std::string translated_name = TranslateName(fname);
127 ops_->path_exists(filesystem_.get(), translated_name.c_str(),
128 plugin_status.get());
129 return StatusFromTF_Status(plugin_status.get());
130 }
131
FilesExist(const std::vector<std::string> & files,TransactionToken * token,std::vector<Status> * status)132 bool ModularFileSystem::FilesExist(const std::vector<std::string>& files,
133 TransactionToken* token,
134 std::vector<Status>* status) {
135 if (ops_->paths_exist == nullptr)
136 return FileSystem::FilesExist(files, token, status);
137
138 std::vector<char*> translated_names;
139 translated_names.reserve(files.size());
140 for (int i = 0; i < files.size(); i++)
141 translated_names.push_back(strdup(TranslateName(files[i]).c_str()));
142
143 bool result;
144 if (status == nullptr) {
145 result = ops_->paths_exist(filesystem_.get(), translated_names.data(),
146 files.size(), nullptr);
147 } else {
148 std::vector<TF_Status*> plugin_status;
149 plugin_status.reserve(files.size());
150 for (int i = 0; i < files.size(); i++)
151 plugin_status.push_back(TF_NewStatus());
152 result = ops_->paths_exist(filesystem_.get(), translated_names.data(),
153 files.size(), plugin_status.data());
154 for (int i = 0; i < files.size(); i++) {
155 status->push_back(StatusFromTF_Status(plugin_status[i]));
156 TF_DeleteStatus(plugin_status[i]);
157 }
158 }
159
160 for (int i = 0; i < files.size(); i++) free(translated_names[i]);
161
162 return result;
163 }
164
GetChildren(const std::string & dir,TransactionToken * token,std::vector<std::string> * result)165 Status ModularFileSystem::GetChildren(const std::string& dir,
166 TransactionToken* token,
167 std::vector<std::string>* result) {
168 if (ops_->get_children == nullptr)
169 return errors::Unimplemented(tensorflow::strings::StrCat(
170 "Filesystem for ", dir, " does not support GetChildren()"));
171
172 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
173 std::string translated_name = TranslateName(dir);
174 // Note that `children` is allocated by the plugin and freed by core
175 // TensorFlow, so we need to use `plugin_memory_free_` here.
176 char** children = nullptr;
177 const int num_children =
178 ops_->get_children(filesystem_.get(), translated_name.c_str(), &children,
179 plugin_status.get());
180 if (num_children >= 0) {
181 for (int i = 0; i < num_children; i++) {
182 result->push_back(std::string(children[i]));
183 plugin_memory_free_(children[i]);
184 }
185 plugin_memory_free_(children);
186 }
187
188 return StatusFromTF_Status(plugin_status.get());
189 }
190
GetMatchingPaths(const std::string & pattern,TransactionToken * token,std::vector<std::string> * result)191 Status ModularFileSystem::GetMatchingPaths(const std::string& pattern,
192 TransactionToken* token,
193 std::vector<std::string>* result) {
194 if (ops_->get_matching_paths == nullptr)
195 return internal::GetMatchingPaths(this, Env::Default(), pattern, result);
196
197 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
198 // Note that `matches` is allocated by the plugin and freed by core
199 // TensorFlow, so we need to use `plugin_memory_free_` here.
200 char** matches = nullptr;
201 const int num_matches = ops_->get_matching_paths(
202 filesystem_.get(), pattern.c_str(), &matches, plugin_status.get());
203 if (num_matches >= 0) {
204 for (int i = 0; i < num_matches; i++) {
205 result->push_back(std::string(matches[i]));
206 plugin_memory_free_(matches[i]);
207 }
208 plugin_memory_free_(matches);
209 }
210
211 return StatusFromTF_Status(plugin_status.get());
212 }
213
DeleteFile(const std::string & fname,TransactionToken * token)214 Status ModularFileSystem::DeleteFile(const std::string& fname,
215 TransactionToken* token) {
216 if (ops_->delete_file == nullptr)
217 return errors::Unimplemented(tensorflow::strings::StrCat(
218 "Filesystem for ", fname, " does not support DeleteFile()"));
219
220 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
221 std::string translated_name = TranslateName(fname);
222 ops_->delete_file(filesystem_.get(), translated_name.c_str(),
223 plugin_status.get());
224 return StatusFromTF_Status(plugin_status.get());
225 }
226
DeleteRecursively(const std::string & dirname,TransactionToken * token,int64 * undeleted_files,int64 * undeleted_dirs)227 Status ModularFileSystem::DeleteRecursively(const std::string& dirname,
228 TransactionToken* token,
229 int64* undeleted_files,
230 int64* undeleted_dirs) {
231 if (undeleted_files == nullptr || undeleted_dirs == nullptr)
232 return errors::FailedPrecondition(
233 "DeleteRecursively must not be called with `undeleted_files` or "
234 "`undeleted_dirs` set to NULL");
235
236 if (ops_->delete_recursively == nullptr)
237 return FileSystem::DeleteRecursively(dirname, token, undeleted_files,
238 undeleted_dirs);
239
240 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
241 std::string translated_name = TranslateName(dirname);
242 uint64_t plugin_undeleted_files, plugin_undeleted_dirs;
243 ops_->delete_recursively(filesystem_.get(), translated_name.c_str(),
244 &plugin_undeleted_files, &plugin_undeleted_dirs,
245 plugin_status.get());
246 *undeleted_files = plugin_undeleted_files;
247 *undeleted_dirs = plugin_undeleted_dirs;
248 return StatusFromTF_Status(plugin_status.get());
249 }
250
DeleteDir(const std::string & dirname,TransactionToken * token)251 Status ModularFileSystem::DeleteDir(const std::string& dirname,
252 TransactionToken* token) {
253 if (ops_->delete_dir == nullptr)
254 return errors::Unimplemented(tensorflow::strings::StrCat(
255 "Filesystem for ", dirname, " does not support DeleteDir()"));
256
257 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
258 std::string translated_name = TranslateName(dirname);
259 ops_->delete_dir(filesystem_.get(), translated_name.c_str(),
260 plugin_status.get());
261 return StatusFromTF_Status(plugin_status.get());
262 }
263
RecursivelyCreateDir(const std::string & dirname,TransactionToken * token)264 Status ModularFileSystem::RecursivelyCreateDir(const std::string& dirname,
265 TransactionToken* token) {
266 if (ops_->recursively_create_dir == nullptr)
267 return FileSystem::RecursivelyCreateDir(dirname, token);
268
269 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
270 std::string translated_name = TranslateName(dirname);
271 ops_->recursively_create_dir(filesystem_.get(), translated_name.c_str(),
272 plugin_status.get());
273 return StatusFromTF_Status(plugin_status.get());
274 }
275
CreateDir(const std::string & dirname,TransactionToken * token)276 Status ModularFileSystem::CreateDir(const std::string& dirname,
277 TransactionToken* token) {
278 if (ops_->create_dir == nullptr)
279 return errors::Unimplemented(tensorflow::strings::StrCat(
280 "Filesystem for ", dirname, " does not support CreateDir()"));
281
282 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
283 std::string translated_name = TranslateName(dirname);
284 ops_->create_dir(filesystem_.get(), translated_name.c_str(),
285 plugin_status.get());
286 return StatusFromTF_Status(plugin_status.get());
287 }
288
Stat(const std::string & fname,TransactionToken * token,FileStatistics * stat)289 Status ModularFileSystem::Stat(const std::string& fname,
290 TransactionToken* token, FileStatistics* stat) {
291 if (ops_->stat == nullptr)
292 return errors::Unimplemented(tensorflow::strings::StrCat(
293 "Filesystem for ", fname, " does not support Stat()"));
294
295 if (stat == nullptr)
296 return errors::InvalidArgument("FileStatistics pointer must not be NULL");
297
298 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
299 std::string translated_name = TranslateName(fname);
300 TF_FileStatistics stats;
301 ops_->stat(filesystem_.get(), translated_name.c_str(), &stats,
302 plugin_status.get());
303
304 if (TF_GetCode(plugin_status.get()) == TF_OK) {
305 stat->length = stats.length;
306 stat->mtime_nsec = stats.mtime_nsec;
307 stat->is_directory = stats.is_directory;
308 }
309
310 return StatusFromTF_Status(plugin_status.get());
311 }
312
IsDirectory(const std::string & name,TransactionToken * token)313 Status ModularFileSystem::IsDirectory(const std::string& name,
314 TransactionToken* token) {
315 if (ops_->is_directory == nullptr)
316 return FileSystem::IsDirectory(name, token);
317
318 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
319 std::string translated_name = TranslateName(name);
320 ops_->is_directory(filesystem_.get(), translated_name.c_str(),
321 plugin_status.get());
322 return StatusFromTF_Status(plugin_status.get());
323 }
324
GetFileSize(const std::string & fname,TransactionToken * token,uint64 * file_size)325 Status ModularFileSystem::GetFileSize(const std::string& fname,
326 TransactionToken* token,
327 uint64* file_size) {
328 if (ops_->get_file_size == nullptr) {
329 FileStatistics stat;
330 Status status = Stat(fname, &stat);
331 if (!status.ok()) return status;
332 if (stat.is_directory)
333 return errors::FailedPrecondition("Called GetFileSize on a directory");
334
335 *file_size = stat.length;
336 return status;
337 }
338
339 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
340 std::string translated_name = TranslateName(fname);
341 *file_size = ops_->get_file_size(filesystem_.get(), translated_name.c_str(),
342 plugin_status.get());
343 return StatusFromTF_Status(plugin_status.get());
344 }
345
RenameFile(const std::string & src,const std::string & target,TransactionToken * token)346 Status ModularFileSystem::RenameFile(const std::string& src,
347 const std::string& target,
348 TransactionToken* token) {
349 if (ops_->rename_file == nullptr) {
350 Status status = CopyFile(src, target);
351 if (status.ok()) status = DeleteFile(src);
352 return status;
353 }
354
355 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
356 std::string translated_src = TranslateName(src);
357 std::string translated_target = TranslateName(target);
358 ops_->rename_file(filesystem_.get(), translated_src.c_str(),
359 translated_target.c_str(), plugin_status.get());
360 return StatusFromTF_Status(plugin_status.get());
361 }
362
CopyFile(const std::string & src,const std::string & target,TransactionToken * token)363 Status ModularFileSystem::CopyFile(const std::string& src,
364 const std::string& target,
365 TransactionToken* token) {
366 if (ops_->copy_file == nullptr)
367 return FileSystem::CopyFile(src, target, token);
368
369 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
370 std::string translated_src = TranslateName(src);
371 std::string translated_target = TranslateName(target);
372 ops_->copy_file(filesystem_.get(), translated_src.c_str(),
373 translated_target.c_str(), plugin_status.get());
374 return StatusFromTF_Status(plugin_status.get());
375 }
376
TranslateName(const std::string & name) const377 std::string ModularFileSystem::TranslateName(const std::string& name) const {
378 if (ops_->translate_name == nullptr) return FileSystem::TranslateName(name);
379
380 char* p = ops_->translate_name(filesystem_.get(), name.c_str());
381 CHECK(p != nullptr) << "TranslateName(" << name << ") returned nullptr";
382
383 std::string ret(p);
384 // Since `p` is allocated by plugin, free it using plugin's method.
385 plugin_memory_free_(p);
386 return ret;
387 }
388
FlushCaches(TransactionToken * token)389 void ModularFileSystem::FlushCaches(TransactionToken* token) {
390 if (ops_->flush_caches != nullptr) ops_->flush_caches(filesystem_.get());
391 }
392
Read(uint64 offset,size_t n,StringPiece * result,char * scratch) const393 Status ModularRandomAccessFile::Read(uint64 offset, size_t n,
394 StringPiece* result, char* scratch) const {
395 if (ops_->read == nullptr)
396 return errors::Unimplemented(
397 tensorflow::strings::StrCat("Read() not implemented for ", filename_));
398
399 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
400 int64_t read =
401 ops_->read(file_.get(), offset, n, scratch, plugin_status.get());
402 if (read > 0) *result = StringPiece(scratch, read);
403 return StatusFromTF_Status(plugin_status.get());
404 }
405
Name(StringPiece * result) const406 Status ModularRandomAccessFile::Name(StringPiece* result) const {
407 *result = filename_;
408 return Status::OK();
409 }
410
Append(StringPiece data)411 Status ModularWritableFile::Append(StringPiece data) {
412 if (ops_->append == nullptr)
413 return errors::Unimplemented(tensorflow::strings::StrCat(
414 "Append() not implemented for ", filename_));
415
416 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
417 ops_->append(file_.get(), data.data(), data.size(), plugin_status.get());
418 return StatusFromTF_Status(plugin_status.get());
419 }
420
Close()421 Status ModularWritableFile::Close() {
422 if (ops_->close == nullptr)
423 return errors::Unimplemented(
424 tensorflow::strings::StrCat("Close() not implemented for ", filename_));
425
426 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
427 ops_->close(file_.get(), plugin_status.get());
428 return StatusFromTF_Status(plugin_status.get());
429 }
430
Flush()431 Status ModularWritableFile::Flush() {
432 if (ops_->flush == nullptr) return Status::OK();
433
434 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
435 ops_->flush(file_.get(), plugin_status.get());
436 return StatusFromTF_Status(plugin_status.get());
437 }
438
Sync()439 Status ModularWritableFile::Sync() {
440 if (ops_->sync == nullptr) return Flush();
441
442 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
443 ops_->sync(file_.get(), plugin_status.get());
444 return StatusFromTF_Status(plugin_status.get());
445 }
446
Name(StringPiece * result) const447 Status ModularWritableFile::Name(StringPiece* result) const {
448 *result = filename_;
449 return Status::OK();
450 }
451
Tell(int64 * position)452 Status ModularWritableFile::Tell(int64* position) {
453 if (ops_->tell == nullptr)
454 return errors::Unimplemented(
455 tensorflow::strings::StrCat("Tell() not implemented for ", filename_));
456
457 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
458 *position = ops_->tell(file_.get(), plugin_status.get());
459 return StatusFromTF_Status(plugin_status.get());
460 }
461
RegisterFilesystemPlugin(const std::string & dso_path)462 Status RegisterFilesystemPlugin(const std::string& dso_path) {
463 // Step 1: Load plugin
464 Env* env = Env::Default();
465 void* dso_handle;
466 TF_RETURN_IF_ERROR(env->LoadDynamicLibrary(dso_path.c_str(), &dso_handle));
467
468 // Step 2: Load symbol for `TF_InitPlugin`
469 void* dso_symbol;
470 TF_RETURN_IF_ERROR(
471 env->GetSymbolFromLibrary(dso_handle, "TF_InitPlugin", &dso_symbol));
472
473 // Step 3: Call `TF_InitPlugin`
474 TF_FilesystemPluginInfo info;
475 memset(&info, 0, sizeof(info));
476 auto TF_InitPlugin =
477 reinterpret_cast<int (*)(TF_FilesystemPluginInfo*)>(dso_symbol);
478 TF_InitPlugin(&info);
479
480 // Step 4: Do the actual registration
481 return filesystem_registration::RegisterFilesystemPluginImpl(&info);
482 }
483
484 } // namespace tensorflow
485