1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #include "tensorflow/c/experimental/filesystem/modular_filesystem.h"
16
17 #include <algorithm>
18 #include <string>
19 #include <utility>
20
21 #include "tensorflow/c/experimental/filesystem/modular_filesystem_registration.h"
22 #include "tensorflow/c/tf_status_helper.h"
23 #include "tensorflow/core/platform/env.h"
24 #include "tensorflow/core/platform/file_system_helper.h"
25 #include "tensorflow/core/util/ptr_util.h"
26
// TODO(b/139060984): After all filesystems are converted, all calls to
// methods from `FileSystem` will have to be replaced with calls to private
// methods here, as part of making this class a singleton and the only way to
// register/use filesystems.
31
32 namespace tensorflow {
33
34 using UniquePtrTo_TF_Status =
35 ::std::unique_ptr<TF_Status, decltype(&TF_DeleteStatus)>;
36
NewRandomAccessFile(const std::string & fname,TransactionToken * token,std::unique_ptr<RandomAccessFile> * result)37 Status ModularFileSystem::NewRandomAccessFile(
38 const std::string& fname, TransactionToken* token,
39 std::unique_ptr<RandomAccessFile>* result) {
40 if (ops_->new_random_access_file == nullptr)
41 return errors::Unimplemented(tensorflow::strings::StrCat(
42 "Filesystem for ", fname, " does not support NewRandomAccessFile()"));
43
44 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
45 auto file = MakeUnique<TF_RandomAccessFile>();
46 std::string translated_name = TranslateName(fname);
47 ops_->new_random_access_file(filesystem_.get(), translated_name.c_str(),
48 file.get(), plugin_status.get());
49
50 if (TF_GetCode(plugin_status.get()) == TF_OK)
51 *result = MakeUnique<ModularRandomAccessFile>(
52 translated_name, std::move(file), random_access_file_ops_.get());
53
54 return StatusFromTF_Status(plugin_status.get());
55 }
56
NewWritableFile(const std::string & fname,TransactionToken * token,std::unique_ptr<WritableFile> * result)57 Status ModularFileSystem::NewWritableFile(
58 const std::string& fname, TransactionToken* token,
59 std::unique_ptr<WritableFile>* result) {
60 if (ops_->new_writable_file == nullptr)
61 return errors::Unimplemented(tensorflow::strings::StrCat(
62 "Filesystem for ", fname, " does not support NewWritableFile()"));
63
64 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
65 auto file = MakeUnique<TF_WritableFile>();
66 std::string translated_name = TranslateName(fname);
67 ops_->new_writable_file(filesystem_.get(), translated_name.c_str(),
68 file.get(), plugin_status.get());
69
70 if (TF_GetCode(plugin_status.get()) == TF_OK)
71 *result = MakeUnique<ModularWritableFile>(translated_name, std::move(file),
72 writable_file_ops_.get());
73
74 return StatusFromTF_Status(plugin_status.get());
75 }
76
NewAppendableFile(const std::string & fname,TransactionToken * token,std::unique_ptr<WritableFile> * result)77 Status ModularFileSystem::NewAppendableFile(
78 const std::string& fname, TransactionToken* token,
79 std::unique_ptr<WritableFile>* result) {
80 if (ops_->new_appendable_file == nullptr)
81 return errors::Unimplemented(tensorflow::strings::StrCat(
82 "Filesystem for ", fname, " does not support NewAppendableFile()"));
83
84 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
85 auto file = MakeUnique<TF_WritableFile>();
86 std::string translated_name = TranslateName(fname);
87 ops_->new_appendable_file(filesystem_.get(), translated_name.c_str(),
88 file.get(), plugin_status.get());
89
90 if (TF_GetCode(plugin_status.get()) == TF_OK)
91 *result = MakeUnique<ModularWritableFile>(translated_name, std::move(file),
92 writable_file_ops_.get());
93
94 return StatusFromTF_Status(plugin_status.get());
95 }
96
NewReadOnlyMemoryRegionFromFile(const std::string & fname,TransactionToken * token,std::unique_ptr<ReadOnlyMemoryRegion> * result)97 Status ModularFileSystem::NewReadOnlyMemoryRegionFromFile(
98 const std::string& fname, TransactionToken* token,
99 std::unique_ptr<ReadOnlyMemoryRegion>* result) {
100 if (ops_->new_read_only_memory_region_from_file == nullptr)
101 return errors::Unimplemented(tensorflow::strings::StrCat(
102 "Filesystem for ", fname,
103 " does not support NewReadOnlyMemoryRegionFromFile()"));
104
105 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
106 auto region = MakeUnique<TF_ReadOnlyMemoryRegion>();
107 std::string translated_name = TranslateName(fname);
108 ops_->new_read_only_memory_region_from_file(
109 filesystem_.get(), translated_name.c_str(), region.get(),
110 plugin_status.get());
111
112 if (TF_GetCode(plugin_status.get()) == TF_OK)
113 *result = MakeUnique<ModularReadOnlyMemoryRegion>(
114 std::move(region), read_only_memory_region_ops_.get());
115
116 return StatusFromTF_Status(plugin_status.get());
117 }
118
FileExists(const std::string & fname,TransactionToken * token)119 Status ModularFileSystem::FileExists(const std::string& fname,
120 TransactionToken* token) {
121 if (ops_->path_exists == nullptr)
122 return errors::Unimplemented(tensorflow::strings::StrCat(
123 "Filesystem for ", fname, " does not support FileExists()"));
124
125 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
126 const std::string translated_name = TranslateName(fname);
127 ops_->path_exists(filesystem_.get(), translated_name.c_str(),
128 plugin_status.get());
129 return StatusFromTF_Status(plugin_status.get());
130 }
131
FilesExist(const std::vector<std::string> & files,TransactionToken * token,std::vector<Status> * status)132 bool ModularFileSystem::FilesExist(const std::vector<std::string>& files,
133 TransactionToken* token,
134 std::vector<Status>* status) {
135 if (ops_->paths_exist == nullptr)
136 return FileSystem::FilesExist(files, token, status);
137
138 std::vector<char*> translated_names;
139 translated_names.reserve(files.size());
140 for (int i = 0; i < files.size(); i++)
141 translated_names.push_back(strdup(TranslateName(files[i]).c_str()));
142
143 bool result;
144 if (status == nullptr) {
145 result = ops_->paths_exist(filesystem_.get(), translated_names.data(),
146 files.size(), nullptr);
147 } else {
148 std::vector<TF_Status*> plugin_status;
149 plugin_status.reserve(files.size());
150 for (int i = 0; i < files.size(); i++)
151 plugin_status.push_back(TF_NewStatus());
152 result = ops_->paths_exist(filesystem_.get(), translated_names.data(),
153 files.size(), plugin_status.data());
154 for (int i = 0; i < files.size(); i++) {
155 status->push_back(StatusFromTF_Status(plugin_status[i]));
156 TF_DeleteStatus(plugin_status[i]);
157 }
158 }
159
160 for (int i = 0; i < files.size(); i++) free(translated_names[i]);
161
162 return result;
163 }
164
GetChildren(const std::string & dir,TransactionToken * token,std::vector<std::string> * result)165 Status ModularFileSystem::GetChildren(const std::string& dir,
166 TransactionToken* token,
167 std::vector<std::string>* result) {
168 if (ops_->get_children == nullptr)
169 return errors::Unimplemented(tensorflow::strings::StrCat(
170 "Filesystem for ", dir, " does not support GetChildren()"));
171
172 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
173 std::string translated_name = TranslateName(dir);
174 // Note that `children` is allocated by the plugin and freed by core
175 // TensorFlow, so we need to use `plugin_memory_free_` here.
176 char** children = nullptr;
177 const int num_children =
178 ops_->get_children(filesystem_.get(), translated_name.c_str(), &children,
179 plugin_status.get());
180 if (num_children >= 0) {
181 for (int i = 0; i < num_children; i++) {
182 result->push_back(std::string(children[i]));
183 plugin_memory_free_(children[i]);
184 }
185 plugin_memory_free_(children);
186 }
187
188 return StatusFromTF_Status(plugin_status.get());
189 }
190
GetMatchingPaths(const std::string & pattern,TransactionToken * token,std::vector<std::string> * result)191 Status ModularFileSystem::GetMatchingPaths(const std::string& pattern,
192 TransactionToken* token,
193 std::vector<std::string>* result) {
194 if (ops_->get_matching_paths == nullptr)
195 return internal::GetMatchingPaths(this, Env::Default(), pattern, result);
196
197 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
198 // Note that `matches` is allocated by the plugin and freed by core
199 // TensorFlow, so we need to use `plugin_memory_free_` here.
200 char** matches = nullptr;
201 const int num_matches = ops_->get_matching_paths(
202 filesystem_.get(), pattern.c_str(), &matches, plugin_status.get());
203 if (num_matches >= 0) {
204 for (int i = 0; i < num_matches; i++) {
205 result->push_back(std::string(matches[i]));
206 plugin_memory_free_(matches[i]);
207 }
208 plugin_memory_free_(matches);
209 }
210
211 return StatusFromTF_Status(plugin_status.get());
212 }
213
DeleteFile(const std::string & fname,TransactionToken * token)214 Status ModularFileSystem::DeleteFile(const std::string& fname,
215 TransactionToken* token) {
216 if (ops_->delete_file == nullptr)
217 return errors::Unimplemented(tensorflow::strings::StrCat(
218 "Filesystem for ", fname, " does not support DeleteFile()"));
219
220 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
221 std::string translated_name = TranslateName(fname);
222 ops_->delete_file(filesystem_.get(), translated_name.c_str(),
223 plugin_status.get());
224 return StatusFromTF_Status(plugin_status.get());
225 }
226
DeleteRecursively(const std::string & dirname,TransactionToken * token,int64_t * undeleted_files,int64_t * undeleted_dirs)227 Status ModularFileSystem::DeleteRecursively(const std::string& dirname,
228 TransactionToken* token,
229 int64_t* undeleted_files,
230 int64_t* undeleted_dirs) {
231 if (undeleted_files == nullptr || undeleted_dirs == nullptr)
232 return errors::FailedPrecondition(
233 "DeleteRecursively must not be called with `undeleted_files` or "
234 "`undeleted_dirs` set to NULL");
235
236 if (ops_->delete_recursively == nullptr)
237 return FileSystem::DeleteRecursively(dirname, token, undeleted_files,
238 undeleted_dirs);
239
240 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
241 std::string translated_name = TranslateName(dirname);
242 uint64_t plugin_undeleted_files, plugin_undeleted_dirs;
243 ops_->delete_recursively(filesystem_.get(), translated_name.c_str(),
244 &plugin_undeleted_files, &plugin_undeleted_dirs,
245 plugin_status.get());
246 *undeleted_files = plugin_undeleted_files;
247 *undeleted_dirs = plugin_undeleted_dirs;
248 return StatusFromTF_Status(plugin_status.get());
249 }
250
DeleteDir(const std::string & dirname,TransactionToken * token)251 Status ModularFileSystem::DeleteDir(const std::string& dirname,
252 TransactionToken* token) {
253 if (ops_->delete_dir == nullptr)
254 return errors::Unimplemented(tensorflow::strings::StrCat(
255 "Filesystem for ", dirname, " does not support DeleteDir()"));
256
257 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
258 std::string translated_name = TranslateName(dirname);
259 ops_->delete_dir(filesystem_.get(), translated_name.c_str(),
260 plugin_status.get());
261 return StatusFromTF_Status(plugin_status.get());
262 }
263
RecursivelyCreateDir(const std::string & dirname,TransactionToken * token)264 Status ModularFileSystem::RecursivelyCreateDir(const std::string& dirname,
265 TransactionToken* token) {
266 if (ops_->recursively_create_dir == nullptr)
267 return FileSystem::RecursivelyCreateDir(dirname, token);
268
269 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
270 std::string translated_name = TranslateName(dirname);
271 ops_->recursively_create_dir(filesystem_.get(), translated_name.c_str(),
272 plugin_status.get());
273 return StatusFromTF_Status(plugin_status.get());
274 }
275
CreateDir(const std::string & dirname,TransactionToken * token)276 Status ModularFileSystem::CreateDir(const std::string& dirname,
277 TransactionToken* token) {
278 if (ops_->create_dir == nullptr)
279 return errors::Unimplemented(tensorflow::strings::StrCat(
280 "Filesystem for ", dirname, " does not support CreateDir()"));
281
282 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
283 std::string translated_name = TranslateName(dirname);
284 ops_->create_dir(filesystem_.get(), translated_name.c_str(),
285 plugin_status.get());
286 return StatusFromTF_Status(plugin_status.get());
287 }
288
Stat(const std::string & fname,TransactionToken * token,FileStatistics * stat)289 Status ModularFileSystem::Stat(const std::string& fname,
290 TransactionToken* token, FileStatistics* stat) {
291 if (ops_->stat == nullptr)
292 return errors::Unimplemented(tensorflow::strings::StrCat(
293 "Filesystem for ", fname, " does not support Stat()"));
294
295 if (stat == nullptr)
296 return errors::InvalidArgument("FileStatistics pointer must not be NULL");
297
298 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
299 std::string translated_name = TranslateName(fname);
300 TF_FileStatistics stats;
301 ops_->stat(filesystem_.get(), translated_name.c_str(), &stats,
302 plugin_status.get());
303
304 if (TF_GetCode(plugin_status.get()) == TF_OK) {
305 stat->length = stats.length;
306 stat->mtime_nsec = stats.mtime_nsec;
307 stat->is_directory = stats.is_directory;
308 }
309
310 return StatusFromTF_Status(plugin_status.get());
311 }
312
IsDirectory(const std::string & name,TransactionToken * token)313 Status ModularFileSystem::IsDirectory(const std::string& name,
314 TransactionToken* token) {
315 if (ops_->is_directory == nullptr)
316 return FileSystem::IsDirectory(name, token);
317
318 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
319 std::string translated_name = TranslateName(name);
320 ops_->is_directory(filesystem_.get(), translated_name.c_str(),
321 plugin_status.get());
322 return StatusFromTF_Status(plugin_status.get());
323 }
324
GetFileSize(const std::string & fname,TransactionToken * token,uint64 * file_size)325 Status ModularFileSystem::GetFileSize(const std::string& fname,
326 TransactionToken* token,
327 uint64* file_size) {
328 if (ops_->get_file_size == nullptr) {
329 FileStatistics stat;
330 Status status = Stat(fname, &stat);
331 if (!status.ok()) return status;
332 if (stat.is_directory)
333 return errors::FailedPrecondition("Called GetFileSize on a directory");
334
335 *file_size = stat.length;
336 return status;
337 }
338
339 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
340 std::string translated_name = TranslateName(fname);
341 *file_size = ops_->get_file_size(filesystem_.get(), translated_name.c_str(),
342 plugin_status.get());
343 return StatusFromTF_Status(plugin_status.get());
344 }
345
RenameFile(const std::string & src,const std::string & target,TransactionToken * token)346 Status ModularFileSystem::RenameFile(const std::string& src,
347 const std::string& target,
348 TransactionToken* token) {
349 if (ops_->rename_file == nullptr) {
350 Status status = CopyFile(src, target);
351 if (status.ok()) status = DeleteFile(src);
352 return status;
353 }
354
355 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
356 std::string translated_src = TranslateName(src);
357 std::string translated_target = TranslateName(target);
358 ops_->rename_file(filesystem_.get(), translated_src.c_str(),
359 translated_target.c_str(), plugin_status.get());
360 return StatusFromTF_Status(plugin_status.get());
361 }
362
CopyFile(const std::string & src,const std::string & target,TransactionToken * token)363 Status ModularFileSystem::CopyFile(const std::string& src,
364 const std::string& target,
365 TransactionToken* token) {
366 if (ops_->copy_file == nullptr)
367 return FileSystem::CopyFile(src, target, token);
368
369 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
370 std::string translated_src = TranslateName(src);
371 std::string translated_target = TranslateName(target);
372 ops_->copy_file(filesystem_.get(), translated_src.c_str(),
373 translated_target.c_str(), plugin_status.get());
374 return StatusFromTF_Status(plugin_status.get());
375 }
376
TranslateName(const std::string & name) const377 std::string ModularFileSystem::TranslateName(const std::string& name) const {
378 if (ops_->translate_name == nullptr) return FileSystem::TranslateName(name);
379
380 char* p = ops_->translate_name(filesystem_.get(), name.c_str());
381 CHECK(p != nullptr) << "TranslateName(" << name << ") returned nullptr";
382
383 std::string ret(p);
384 // Since `p` is allocated by plugin, free it using plugin's method.
385 plugin_memory_free_(p);
386 return ret;
387 }
388
FlushCaches(TransactionToken * token)389 void ModularFileSystem::FlushCaches(TransactionToken* token) {
390 if (ops_->flush_caches != nullptr) ops_->flush_caches(filesystem_.get());
391 }
392
SetOption(const std::string & name,const std::vector<string> & values)393 Status ModularFileSystem::SetOption(const std::string& name,
394 const std::vector<string>& values) {
395 if (ops_->set_filesystem_configuration == nullptr) {
396 return errors::Unimplemented(
397 "Filesystem does not support SetConfiguration()");
398 }
399 if (values.empty()) {
400 return errors::InvalidArgument(
401 "SetConfiguration() needs number of values > 0");
402 }
403
404 TF_Filesystem_Option option;
405 memset(&option, 0, sizeof(option));
406 option.name = const_cast<char*>(name.c_str());
407 TF_Filesystem_Option_Value option_value;
408 memset(&option_value, 0, sizeof(option_value));
409 option_value.type_tag = TF_Filesystem_Option_Type_Buffer;
410 option_value.num_values = values.size();
411 std::vector<TF_Filesystem_Option_Value_Union> option_values(values.size());
412 for (size_t i = 0; i < values.size(); i++) {
413 memset(&option_values[i], 0, sizeof(option_values[i]));
414 option_values[i].buffer_val.buf = const_cast<char*>(values[i].c_str());
415 option_values[i].buffer_val.buf_length = values[i].size();
416 }
417 option_value.values = &option_values[0];
418 option.value = &option_value;
419 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
420 ops_->set_filesystem_configuration(filesystem_.get(), &option, 1,
421 plugin_status.get());
422 return StatusFromTF_Status(plugin_status.get());
423 }
424
SetOption(const std::string & name,const std::vector<int64_t> & values)425 Status ModularFileSystem::SetOption(const std::string& name,
426 const std::vector<int64_t>& values) {
427 if (ops_->set_filesystem_configuration == nullptr) {
428 return errors::Unimplemented(
429 "Filesystem does not support SetConfiguration()");
430 }
431 if (values.empty()) {
432 return errors::InvalidArgument(
433 "SetConfiguration() needs number of values > 0");
434 }
435
436 TF_Filesystem_Option option;
437 memset(&option, 0, sizeof(option));
438 option.name = const_cast<char*>(name.c_str());
439 TF_Filesystem_Option_Value option_value;
440 memset(&option_value, 0, sizeof(option_value));
441 option_value.type_tag = TF_Filesystem_Option_Type_Int;
442 option_value.num_values = values.size();
443 std::vector<TF_Filesystem_Option_Value_Union> option_values(values.size());
444 for (size_t i = 0; i < values.size(); i++) {
445 memset(&option_values[i], 0, sizeof(option_values[i]));
446 option_values[i].int_val = values[i];
447 }
448 option_value.values = &option_values[0];
449 option.value = &option_value;
450 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
451 ops_->set_filesystem_configuration(filesystem_.get(), &option, 1,
452 plugin_status.get());
453 return StatusFromTF_Status(plugin_status.get());
454 }
455
SetOption(const std::string & name,const std::vector<double> & values)456 Status ModularFileSystem::SetOption(const std::string& name,
457 const std::vector<double>& values) {
458 if (ops_->set_filesystem_configuration == nullptr) {
459 return errors::Unimplemented(
460 "Filesystem does not support SetConfiguration()");
461 }
462 if (values.empty()) {
463 return errors::InvalidArgument(
464 "SetConfiguration() needs number of values > 0");
465 }
466
467 TF_Filesystem_Option option;
468 memset(&option, 0, sizeof(option));
469 option.name = const_cast<char*>(name.c_str());
470 TF_Filesystem_Option_Value option_value;
471 memset(&option_value, 0, sizeof(option_value));
472 option_value.type_tag = TF_Filesystem_Option_Type_Real;
473 option_value.num_values = values.size();
474 std::vector<TF_Filesystem_Option_Value_Union> option_values(values.size());
475 for (size_t i = 0; i < values.size(); i++) {
476 memset(&option_values[i], 0, sizeof(option_values[i]));
477 option_values[i].real_val = values[i];
478 }
479 option_value.values = &option_values[0];
480 option.value = &option_value;
481 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
482 ops_->set_filesystem_configuration(filesystem_.get(), &option, 1,
483 plugin_status.get());
484 return StatusFromTF_Status(plugin_status.get());
485 }
486
Read(uint64 offset,size_t n,StringPiece * result,char * scratch) const487 Status ModularRandomAccessFile::Read(uint64 offset, size_t n,
488 StringPiece* result, char* scratch) const {
489 if (ops_->read == nullptr)
490 return errors::Unimplemented(
491 tensorflow::strings::StrCat("Read() not implemented for ", filename_));
492
493 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
494 int64_t read =
495 ops_->read(file_.get(), offset, n, scratch, plugin_status.get());
496 if (read > 0) *result = StringPiece(scratch, read);
497 return StatusFromTF_Status(plugin_status.get());
498 }
499
Name(StringPiece * result) const500 Status ModularRandomAccessFile::Name(StringPiece* result) const {
501 *result = filename_;
502 return OkStatus();
503 }
504
Append(StringPiece data)505 Status ModularWritableFile::Append(StringPiece data) {
506 if (ops_->append == nullptr)
507 return errors::Unimplemented(tensorflow::strings::StrCat(
508 "Append() not implemented for ", filename_));
509
510 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
511 ops_->append(file_.get(), data.data(), data.size(), plugin_status.get());
512 return StatusFromTF_Status(plugin_status.get());
513 }
514
Close()515 Status ModularWritableFile::Close() {
516 if (ops_->close == nullptr)
517 return errors::Unimplemented(
518 tensorflow::strings::StrCat("Close() not implemented for ", filename_));
519
520 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
521 ops_->close(file_.get(), plugin_status.get());
522 return StatusFromTF_Status(plugin_status.get());
523 }
524
Flush()525 Status ModularWritableFile::Flush() {
526 if (ops_->flush == nullptr) return OkStatus();
527
528 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
529 ops_->flush(file_.get(), plugin_status.get());
530 return StatusFromTF_Status(plugin_status.get());
531 }
532
Sync()533 Status ModularWritableFile::Sync() {
534 if (ops_->sync == nullptr) return Flush();
535
536 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
537 ops_->sync(file_.get(), plugin_status.get());
538 return StatusFromTF_Status(plugin_status.get());
539 }
540
Name(StringPiece * result) const541 Status ModularWritableFile::Name(StringPiece* result) const {
542 *result = filename_;
543 return OkStatus();
544 }
545
Tell(int64_t * position)546 Status ModularWritableFile::Tell(int64_t* position) {
547 if (ops_->tell == nullptr)
548 return errors::Unimplemented(
549 tensorflow::strings::StrCat("Tell() not implemented for ", filename_));
550
551 UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
552 *position = ops_->tell(file_.get(), plugin_status.get());
553 return StatusFromTF_Status(plugin_status.get());
554 }
555
RegisterFilesystemPlugin(const std::string & dso_path)556 Status RegisterFilesystemPlugin(const std::string& dso_path) {
557 // Step 1: Load plugin
558 Env* env = Env::Default();
559 void* dso_handle;
560 TF_RETURN_IF_ERROR(env->LoadDynamicLibrary(dso_path.c_str(), &dso_handle));
561
562 // Step 2: Load symbol for `TF_InitPlugin`
563 void* dso_symbol;
564 TF_RETURN_IF_ERROR(
565 env->GetSymbolFromLibrary(dso_handle, "TF_InitPlugin", &dso_symbol));
566
567 // Step 3: Call `TF_InitPlugin`
568 TF_FilesystemPluginInfo info;
569 memset(&info, 0, sizeof(info));
570 auto TF_InitPlugin =
571 reinterpret_cast<int (*)(TF_FilesystemPluginInfo*)>(dso_symbol);
572 TF_InitPlugin(&info);
573
574 // Step 4: Do the actual registration
575 return filesystem_registration::RegisterFilesystemPluginImpl(&info);
576 }
577
578 } // namespace tensorflow
579