1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #ifndef TENSORFLOW_CORE_LIB_IO_RECORD_WRITER_H_
17 #define TENSORFLOW_CORE_LIB_IO_RECORD_WRITER_H_
18
19 #include "tensorflow/core/lib/core/coding.h"
20 #include "tensorflow/core/lib/core/status.h"
21 #include "tensorflow/core/lib/core/stringpiece.h"
22 #include "tensorflow/core/lib/hash/crc32c.h"
23 #if !defined(IS_SLIM_BUILD)
24 #include "tensorflow/core/lib/io/snappy/snappy_compression_options.h"
25 #include "tensorflow/core/lib/io/snappy/snappy_outputbuffer.h"
26 #include "tensorflow/core/lib/io/zlib_compression_options.h"
27 #include "tensorflow/core/lib/io/zlib_outputbuffer.h"
28 #endif // IS_SLIM_BUILD
29 #include "tensorflow/core/platform/cord.h"
30 #include "tensorflow/core/platform/macros.h"
31 #include "tensorflow/core/platform/types.h"
32
33 namespace tensorflow {
34
35 class WritableFile;
36
37 namespace io {
38
39 struct RecordWriterOptions {
40 public:
41 enum CompressionType {
42 NONE = 0,
43 ZLIB_COMPRESSION = 1,
44 SNAPPY_COMPRESSION = 2
45 };
46 CompressionType compression_type = NONE;
47
48 static RecordWriterOptions CreateRecordWriterOptions(
49 const string& compression_type);
50
51 #if !defined(IS_SLIM_BUILD)
52 // Options specific to compression.
53 tensorflow::io::ZlibCompressionOptions zlib_options;
54 tensorflow::io::SnappyCompressionOptions snappy_options;
55 #endif // IS_SLIM_BUILD
56 };
57
58 class RecordWriter {
59 public:
60 // Format of a single record:
61 // uint64 length
62 // uint32 masked crc of length
63 // byte data[length]
64 // uint32 masked crc of data
65 static constexpr size_t kHeaderSize = sizeof(uint64) + sizeof(uint32);
66 static constexpr size_t kFooterSize = sizeof(uint32);
67
68 // Create a writer that will append data to "*dest".
69 // "*dest" must be initially empty.
70 // "*dest" must remain live while this Writer is in use.
71 explicit RecordWriter(WritableFile* dest, const RecordWriterOptions& options =
72 RecordWriterOptions());
73
74 // Calls Close() and logs if an error occurs.
75 //
76 // TODO(jhseu): Require that callers explicitly call Close() and remove the
77 // implicit Close() call in the destructor.
78 ~RecordWriter();
79
80 Status WriteRecord(StringPiece data);
81
82 #if defined(TF_CORD_SUPPORT)
83 Status WriteRecord(const absl::Cord& data);
84 #endif
85
86 // Flushes any buffered data held by underlying containers of the
87 // RecordWriter to the WritableFile. Does *not* flush the
88 // WritableFile.
89 Status Flush();
90
91 // Writes all output to the file. Does *not* close the WritableFile.
92 //
93 // After calling Close(), any further calls to `WriteRecord()` or `Flush()`
94 // are invalid.
95 Status Close();
96
97 // Utility method to populate TFRecord headers. Populates record-header in
98 // "header[0,kHeaderSize-1]". The record-header is based on data[0, n-1].
99 inline static void PopulateHeader(char* header, const char* data, size_t n);
100
101 inline static void PopulateHeader(char* header, const absl::Cord& data);
102
103 // Utility method to populate TFRecord footers. Populates record-footer in
104 // "footer[0,kFooterSize-1]". The record-footer is based on data[0, n-1].
105 inline static void PopulateFooter(char* footer, const char* data, size_t n);
106
107 #if defined(TF_CORD_SUPPORT)
108 inline static void PopulateFooter(char* footer, const absl::Cord& data);
109 #endif
110
111 private:
112 WritableFile* dest_;
113 RecordWriterOptions options_;
114
MaskedCrc(const char * data,size_t n)115 inline static uint32 MaskedCrc(const char* data, size_t n) {
116 return crc32c::Mask(crc32c::Value(data, n));
117 }
118
119 #if defined(TF_CORD_SUPPORT)
MaskedCrc(const absl::Cord & data)120 inline static uint32 MaskedCrc(const absl::Cord& data) {
121 return crc32c::Mask(crc32c::Value(data));
122 }
123 #endif
124
125 TF_DISALLOW_COPY_AND_ASSIGN(RecordWriter);
126 };
127
PopulateHeader(char * header,const char * data,size_t n)128 void RecordWriter::PopulateHeader(char* header, const char* data, size_t n) {
129 core::EncodeFixed64(header + 0, n);
130 core::EncodeFixed32(header + sizeof(uint64),
131 MaskedCrc(header, sizeof(uint64)));
132 }
133
PopulateFooter(char * footer,const char * data,size_t n)134 void RecordWriter::PopulateFooter(char* footer, const char* data, size_t n) {
135 core::EncodeFixed32(footer, MaskedCrc(data, n));
136 }
137
138 #if defined(TF_CORD_SUPPORT)
PopulateHeader(char * header,const absl::Cord & data)139 void RecordWriter::PopulateHeader(char* header, const absl::Cord& data) {
140 core::EncodeFixed64(header + 0, data.size());
141 core::EncodeFixed32(header + sizeof(uint64),
142 MaskedCrc(header, sizeof(uint64)));
143 }
144
PopulateFooter(char * footer,const absl::Cord & data)145 void RecordWriter::PopulateFooter(char* footer, const absl::Cord& data) {
146 core::EncodeFixed32(footer, MaskedCrc(data));
147 }
148 #endif
149
150 } // namespace io
151 } // namespace tensorflow
152
153 #endif // TENSORFLOW_CORE_LIB_IO_RECORD_WRITER_H_
154