1 // Protocol Buffers - Google's data interchange format
2 // Copyright 2008 Google Inc. All rights reserved.
3 // https://developers.google.com/protocol-buffers/
4 //
5 // Redistribution and use in source and binary forms, with or without
6 // modification, are permitted provided that the following conditions are
7 // met:
8 //
9 // * Redistributions of source code must retain the above copyright
10 // notice, this list of conditions and the following disclaimer.
11 // * Redistributions in binary form must reproduce the above
12 // copyright notice, this list of conditions and the following disclaimer
13 // in the documentation and/or other materials provided with the
14 // distribution.
15 // * Neither the name of Google Inc. nor the names of its
16 // contributors may be used to endorse or promote products derived from
17 // this software without specific prior written permission.
18 //
19 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30
31 // Author: kenton@google.com (Kenton Varda)
32 // Based on original Protocol Buffers design by
33 // Sanjay Ghemawat, Jeff Dean, and others.
34
35 #ifndef _MSC_VER
36 #include <fcntl.h>
37 #include <sys/stat.h>
38 #include <sys/types.h>
39 #include <unistd.h>
40 #endif
41 #include <errno.h>
42 #include <algorithm>
43 #include <iostream>
44 #include <google/protobuf/stubs/common.h>
45 #include <google/protobuf/stubs/logging.h>
46 #include <google/protobuf/io/io_win32.h>
47 #include <google/protobuf/io/zero_copy_stream_impl.h>
48 #include <google/protobuf/stubs/stl_util.h>
49
50
51 namespace google {
52 namespace protobuf {
53 namespace io {
54
#ifdef _WIN32
// On Win32, lseek() on a non-seekable file descriptor returns an undefined
// value instead of failing.  Replace it with a macro that always yields the
// error value, so every seek attempt is treated as failed.
#define lseek(fd, offset, origin) (static_cast<off_t>(-1))
// Never include <io.h> from this file.  Wrappers belong in io_win32.{h,cc} and
// are pulled in with using-declarations, as done here.
using google::protobuf::io::win32::access;
using google::protobuf::io::win32::close;
using google::protobuf::io::win32::open;
using google::protobuf::io::win32::read;
using google::protobuf::io::win32::write;
#endif  // _WIN32
67
68 namespace {
69
// close() wrapper that hides EINTR from the caller: the call is repeated for
// as long as it fails with errno == EINTR.  Returns the result of the last
// close() call (0 on success, negative on any other failure).
int close_no_eintr(int fd) {
  for (;;) {
    const int status = close(fd);
    if (status >= 0 || errno != EINTR) {
      return status;
    }
  }
}
78
79 } // namespace
80
81 // ===================================================================
82
FileInputStream(int file_descriptor,int block_size)83 FileInputStream::FileInputStream(int file_descriptor, int block_size)
84 : copying_input_(file_descriptor), impl_(©ing_input_, block_size) {}
85
Close()86 bool FileInputStream::Close() { return copying_input_.Close(); }
87
Next(const void ** data,int * size)88 bool FileInputStream::Next(const void** data, int* size) {
89 return impl_.Next(data, size);
90 }
91
BackUp(int count)92 void FileInputStream::BackUp(int count) { impl_.BackUp(count); }
93
Skip(int count)94 bool FileInputStream::Skip(int count) { return impl_.Skip(count); }
95
ByteCount() const96 int64 FileInputStream::ByteCount() const { return impl_.ByteCount(); }
97
CopyingFileInputStream(int file_descriptor)98 FileInputStream::CopyingFileInputStream::CopyingFileInputStream(
99 int file_descriptor)
100 : file_(file_descriptor),
101 close_on_delete_(false),
102 is_closed_(false),
103 errno_(0),
104 previous_seek_failed_(false) {}
105
~CopyingFileInputStream()106 FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() {
107 if (close_on_delete_) {
108 if (!Close()) {
109 GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
110 }
111 }
112 }
113
Close()114 bool FileInputStream::CopyingFileInputStream::Close() {
115 GOOGLE_CHECK(!is_closed_);
116
117 is_closed_ = true;
118 if (close_no_eintr(file_) != 0) {
119 // The docs on close() do not specify whether a file descriptor is still
120 // open after close() fails with EIO. However, the glibc source code
121 // seems to indicate that it is not.
122 errno_ = errno;
123 return false;
124 }
125
126 return true;
127 }
128
Read(void * buffer,int size)129 int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) {
130 GOOGLE_CHECK(!is_closed_);
131
132 int result;
133 do {
134 result = read(file_, buffer, size);
135 } while (result < 0 && errno == EINTR);
136
137 if (result < 0) {
138 // Read error (not EOF).
139 errno_ = errno;
140 }
141
142 return result;
143 }
144
Skip(int count)145 int FileInputStream::CopyingFileInputStream::Skip(int count) {
146 GOOGLE_CHECK(!is_closed_);
147
148 if (!previous_seek_failed_ && lseek(file_, count, SEEK_CUR) != (off_t)-1) {
149 // Seek succeeded.
150 return count;
151 } else {
152 // Failed to seek.
153
154 // Note to self: Don't seek again. This file descriptor doesn't
155 // support it.
156 previous_seek_failed_ = true;
157
158 // Use the default implementation.
159 return CopyingInputStream::Skip(count);
160 }
161 }
162
163 // ===================================================================
164
FileOutputStream(int file_descriptor,int block_size)165 FileOutputStream::FileOutputStream(int file_descriptor, int block_size)
166 : copying_output_(file_descriptor), impl_(©ing_output_, block_size) {}
167
~FileOutputStream()168 FileOutputStream::~FileOutputStream() { impl_.Flush(); }
169
Close()170 bool FileOutputStream::Close() {
171 bool flush_succeeded = impl_.Flush();
172 return copying_output_.Close() && flush_succeeded;
173 }
174
Flush()175 bool FileOutputStream::Flush() { return impl_.Flush(); }
176
Next(void ** data,int * size)177 bool FileOutputStream::Next(void** data, int* size) {
178 return impl_.Next(data, size);
179 }
180
BackUp(int count)181 void FileOutputStream::BackUp(int count) { impl_.BackUp(count); }
182
ByteCount() const183 int64 FileOutputStream::ByteCount() const { return impl_.ByteCount(); }
184
CopyingFileOutputStream(int file_descriptor)185 FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream(
186 int file_descriptor)
187 : file_(file_descriptor),
188 close_on_delete_(false),
189 is_closed_(false),
190 errno_(0) {}
191
~CopyingFileOutputStream()192 FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() {
193 if (close_on_delete_) {
194 if (!Close()) {
195 GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
196 }
197 }
198 }
199
Close()200 bool FileOutputStream::CopyingFileOutputStream::Close() {
201 GOOGLE_CHECK(!is_closed_);
202
203 is_closed_ = true;
204 if (close_no_eintr(file_) != 0) {
205 // The docs on close() do not specify whether a file descriptor is still
206 // open after close() fails with EIO. However, the glibc source code
207 // seems to indicate that it is not.
208 errno_ = errno;
209 return false;
210 }
211
212 return true;
213 }
214
Write(const void * buffer,int size)215 bool FileOutputStream::CopyingFileOutputStream::Write(const void* buffer,
216 int size) {
217 GOOGLE_CHECK(!is_closed_);
218 int total_written = 0;
219
220 const uint8* buffer_base = reinterpret_cast<const uint8*>(buffer);
221
222 while (total_written < size) {
223 int bytes;
224 do {
225 bytes = write(file_, buffer_base + total_written, size - total_written);
226 } while (bytes < 0 && errno == EINTR);
227
228 if (bytes <= 0) {
229 // Write error.
230
231 // FIXME(kenton): According to the man page, if write() returns zero,
232 // there was no error; write() simply did not write anything. It's
233 // unclear under what circumstances this might happen, but presumably
234 // errno won't be set in this case. I am confused as to how such an
235 // event should be handled. For now I'm treating it as an error, since
236 // retrying seems like it could lead to an infinite loop. I suspect
237 // this never actually happens anyway.
238
239 if (bytes < 0) {
240 errno_ = errno;
241 }
242 return false;
243 }
244 total_written += bytes;
245 }
246
247 return true;
248 }
249
250 // ===================================================================
251
IstreamInputStream(std::istream * input,int block_size)252 IstreamInputStream::IstreamInputStream(std::istream* input, int block_size)
253 : copying_input_(input), impl_(©ing_input_, block_size) {}
254
Next(const void ** data,int * size)255 bool IstreamInputStream::Next(const void** data, int* size) {
256 return impl_.Next(data, size);
257 }
258
BackUp(int count)259 void IstreamInputStream::BackUp(int count) { impl_.BackUp(count); }
260
Skip(int count)261 bool IstreamInputStream::Skip(int count) { return impl_.Skip(count); }
262
ByteCount() const263 int64 IstreamInputStream::ByteCount() const { return impl_.ByteCount(); }
264
CopyingIstreamInputStream(std::istream * input)265 IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream(
266 std::istream* input)
267 : input_(input) {}
268
~CopyingIstreamInputStream()269 IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {}
270
Read(void * buffer,int size)271 int IstreamInputStream::CopyingIstreamInputStream::Read(void* buffer,
272 int size) {
273 input_->read(reinterpret_cast<char*>(buffer), size);
274 int result = input_->gcount();
275 if (result == 0 && input_->fail() && !input_->eof()) {
276 return -1;
277 }
278 return result;
279 }
280
281 // ===================================================================
282
OstreamOutputStream(std::ostream * output,int block_size)283 OstreamOutputStream::OstreamOutputStream(std::ostream* output, int block_size)
284 : copying_output_(output), impl_(©ing_output_, block_size) {}
285
~OstreamOutputStream()286 OstreamOutputStream::~OstreamOutputStream() { impl_.Flush(); }
287
Next(void ** data,int * size)288 bool OstreamOutputStream::Next(void** data, int* size) {
289 return impl_.Next(data, size);
290 }
291
BackUp(int count)292 void OstreamOutputStream::BackUp(int count) { impl_.BackUp(count); }
293
ByteCount() const294 int64 OstreamOutputStream::ByteCount() const { return impl_.ByteCount(); }
295
CopyingOstreamOutputStream(std::ostream * output)296 OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream(
297 std::ostream* output)
298 : output_(output) {}
299
~CopyingOstreamOutputStream()300 OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() {
301 }
302
Write(const void * buffer,int size)303 bool OstreamOutputStream::CopyingOstreamOutputStream::Write(const void* buffer,
304 int size) {
305 output_->write(reinterpret_cast<const char*>(buffer), size);
306 return output_->good();
307 }
308
309 // ===================================================================
310
ConcatenatingInputStream(ZeroCopyInputStream * const streams[],int count)311 ConcatenatingInputStream::ConcatenatingInputStream(
312 ZeroCopyInputStream* const streams[], int count)
313 : streams_(streams), stream_count_(count), bytes_retired_(0) {
314 }
315
Next(const void ** data,int * size)316 bool ConcatenatingInputStream::Next(const void** data, int* size) {
317 while (stream_count_ > 0) {
318 if (streams_[0]->Next(data, size)) return true;
319
320 // That stream is done. Advance to the next one.
321 bytes_retired_ += streams_[0]->ByteCount();
322 ++streams_;
323 --stream_count_;
324 }
325
326 // No more streams.
327 return false;
328 }
329
BackUp(int count)330 void ConcatenatingInputStream::BackUp(int count) {
331 if (stream_count_ > 0) {
332 streams_[0]->BackUp(count);
333 } else {
334 GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next().";
335 }
336 }
337
Skip(int count)338 bool ConcatenatingInputStream::Skip(int count) {
339 while (stream_count_ > 0) {
340 // Assume that ByteCount() can be used to find out how much we actually
341 // skipped when Skip() fails.
342 int64 target_byte_count = streams_[0]->ByteCount() + count;
343 if (streams_[0]->Skip(count)) return true;
344
345 // Hit the end of the stream. Figure out how many more bytes we still have
346 // to skip.
347 int64 final_byte_count = streams_[0]->ByteCount();
348 GOOGLE_DCHECK_LT(final_byte_count, target_byte_count);
349 count = target_byte_count - final_byte_count;
350
351 // That stream is done. Advance to the next one.
352 bytes_retired_ += final_byte_count;
353 ++streams_;
354 --stream_count_;
355 }
356
357 return false;
358 }
359
ByteCount() const360 int64 ConcatenatingInputStream::ByteCount() const {
361 if (stream_count_ == 0) {
362 return bytes_retired_;
363 } else {
364 return bytes_retired_ + streams_[0]->ByteCount();
365 }
366 }
367
368
369 // ===================================================================
370
371 } // namespace io
372 } // namespace protobuf
373 } // namespace google
374