1 // Protocol Buffers - Google's data interchange format
2 // Copyright 2008 Google Inc. All rights reserved.
3 // http://code.google.com/p/protobuf/
4 //
5 // Redistribution and use in source and binary forms, with or without
6 // modification, are permitted provided that the following conditions are
7 // met:
8 //
9 // * Redistributions of source code must retain the above copyright
10 // notice, this list of conditions and the following disclaimer.
11 // * Redistributions in binary form must reproduce the above
12 // copyright notice, this list of conditions and the following disclaimer
13 // in the documentation and/or other materials provided with the
14 // distribution.
15 // * Neither the name of Google Inc. nor the names of its
16 // contributors may be used to endorse or promote products derived from
17 // this software without specific prior written permission.
18 //
19 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30
31 // Author: kenton@google.com (Kenton Varda)
32 // Based on original Protocol Buffers design by
33 // Sanjay Ghemawat, Jeff Dean, and others.
34
35 #ifdef _MSC_VER
36 #include <io.h>
37 #else
38 #include <unistd.h>
39 #include <sys/types.h>
40 #include <sys/stat.h>
41 #include <fcntl.h>
42 #endif
43 #include <errno.h>
44 #include <iostream>
45 #include <algorithm>
46
47 #include <google/protobuf/io/zero_copy_stream_impl.h>
48 #include <google/protobuf/stubs/common.h>
49 #include <google/protobuf/stubs/stl_util-inl.h>
50
51 namespace google {
52 namespace protobuf {
53 namespace io {
54
55 #ifdef _WIN32
56 // Win32 lseek is broken: If invoked on a non-seekable file descriptor, its
57 // return value is undefined. We re-define it to always produce an error.
58 #define lseek(fd, offset, origin) ((off_t)-1)
59 #endif
60
61 namespace {
62
63 // EINTR sucks.
close_no_eintr(int fd)64 int close_no_eintr(int fd) {
65 int result;
66 do {
67 result = close(fd);
68 } while (result < 0 && errno == EINTR);
69 return result;
70 }
71
72 } // namespace
73
74
75 // ===================================================================
76
FileInputStream(int file_descriptor,int block_size)77 FileInputStream::FileInputStream(int file_descriptor, int block_size)
78 : copying_input_(file_descriptor),
79 impl_(©ing_input_, block_size) {
80 }
81
~FileInputStream()82 FileInputStream::~FileInputStream() {}
83
Close()84 bool FileInputStream::Close() {
85 return copying_input_.Close();
86 }
87
Next(const void ** data,int * size)88 bool FileInputStream::Next(const void** data, int* size) {
89 return impl_.Next(data, size);
90 }
91
BackUp(int count)92 void FileInputStream::BackUp(int count) {
93 impl_.BackUp(count);
94 }
95
Skip(int count)96 bool FileInputStream::Skip(int count) {
97 return impl_.Skip(count);
98 }
99
ByteCount() const100 int64 FileInputStream::ByteCount() const {
101 return impl_.ByteCount();
102 }
103
CopyingFileInputStream(int file_descriptor)104 FileInputStream::CopyingFileInputStream::CopyingFileInputStream(
105 int file_descriptor)
106 : file_(file_descriptor),
107 close_on_delete_(false),
108 is_closed_(false),
109 errno_(0),
110 previous_seek_failed_(false) {
111 }
112
~CopyingFileInputStream()113 FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() {
114 if (close_on_delete_) {
115 if (!Close()) {
116 GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
117 }
118 }
119 }
120
Close()121 bool FileInputStream::CopyingFileInputStream::Close() {
122 GOOGLE_CHECK(!is_closed_);
123
124 is_closed_ = true;
125 if (close_no_eintr(file_) != 0) {
126 // The docs on close() do not specify whether a file descriptor is still
127 // open after close() fails with EIO. However, the glibc source code
128 // seems to indicate that it is not.
129 errno_ = errno;
130 return false;
131 }
132
133 return true;
134 }
135
Read(void * buffer,int size)136 int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) {
137 GOOGLE_CHECK(!is_closed_);
138
139 int result;
140 do {
141 result = read(file_, buffer, size);
142 } while (result < 0 && errno == EINTR);
143
144 if (result < 0) {
145 // Read error (not EOF).
146 errno_ = errno;
147 }
148
149 return result;
150 }
151
Skip(int count)152 int FileInputStream::CopyingFileInputStream::Skip(int count) {
153 GOOGLE_CHECK(!is_closed_);
154
155 if (!previous_seek_failed_ &&
156 lseek(file_, count, SEEK_CUR) != (off_t)-1) {
157 // Seek succeeded.
158 return count;
159 } else {
160 // Failed to seek.
161
162 // Note to self: Don't seek again. This file descriptor doesn't
163 // support it.
164 previous_seek_failed_ = true;
165
166 // Use the default implementation.
167 return CopyingInputStream::Skip(count);
168 }
169 }
170
171 // ===================================================================
172
FileOutputStream(int file_descriptor,int block_size)173 FileOutputStream::FileOutputStream(int file_descriptor, int block_size)
174 : copying_output_(file_descriptor),
175 impl_(©ing_output_, block_size) {
176 }
177
~FileOutputStream()178 FileOutputStream::~FileOutputStream() {
179 impl_.Flush();
180 }
181
Close()182 bool FileOutputStream::Close() {
183 bool flush_succeeded = impl_.Flush();
184 return copying_output_.Close() && flush_succeeded;
185 }
186
Flush()187 bool FileOutputStream::Flush() {
188 return impl_.Flush();
189 }
190
Next(void ** data,int * size)191 bool FileOutputStream::Next(void** data, int* size) {
192 return impl_.Next(data, size);
193 }
194
BackUp(int count)195 void FileOutputStream::BackUp(int count) {
196 impl_.BackUp(count);
197 }
198
ByteCount() const199 int64 FileOutputStream::ByteCount() const {
200 return impl_.ByteCount();
201 }
202
CopyingFileOutputStream(int file_descriptor)203 FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream(
204 int file_descriptor)
205 : file_(file_descriptor),
206 close_on_delete_(false),
207 is_closed_(false),
208 errno_(0) {
209 }
210
~CopyingFileOutputStream()211 FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() {
212 if (close_on_delete_) {
213 if (!Close()) {
214 GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
215 }
216 }
217 }
218
Close()219 bool FileOutputStream::CopyingFileOutputStream::Close() {
220 GOOGLE_CHECK(!is_closed_);
221
222 is_closed_ = true;
223 if (close_no_eintr(file_) != 0) {
224 // The docs on close() do not specify whether a file descriptor is still
225 // open after close() fails with EIO. However, the glibc source code
226 // seems to indicate that it is not.
227 errno_ = errno;
228 return false;
229 }
230
231 return true;
232 }
233
Write(const void * buffer,int size)234 bool FileOutputStream::CopyingFileOutputStream::Write(
235 const void* buffer, int size) {
236 GOOGLE_CHECK(!is_closed_);
237 int total_written = 0;
238
239 const uint8* buffer_base = reinterpret_cast<const uint8*>(buffer);
240
241 while (total_written < size) {
242 int bytes;
243 do {
244 bytes = write(file_, buffer_base + total_written, size - total_written);
245 } while (bytes < 0 && errno == EINTR);
246
247 if (bytes <= 0) {
248 // Write error.
249
250 // FIXME(kenton): According to the man page, if write() returns zero,
251 // there was no error; write() simply did not write anything. It's
252 // unclear under what circumstances this might happen, but presumably
253 // errno won't be set in this case. I am confused as to how such an
254 // event should be handled. For now I'm treating it as an error, since
255 // retrying seems like it could lead to an infinite loop. I suspect
256 // this never actually happens anyway.
257
258 if (bytes < 0) {
259 errno_ = errno;
260 }
261 return false;
262 }
263 total_written += bytes;
264 }
265
266 return true;
267 }
268
269 // ===================================================================
270
IstreamInputStream(istream * input,int block_size)271 IstreamInputStream::IstreamInputStream(istream* input, int block_size)
272 : copying_input_(input),
273 impl_(©ing_input_, block_size) {
274 }
275
~IstreamInputStream()276 IstreamInputStream::~IstreamInputStream() {}
277
Next(const void ** data,int * size)278 bool IstreamInputStream::Next(const void** data, int* size) {
279 return impl_.Next(data, size);
280 }
281
BackUp(int count)282 void IstreamInputStream::BackUp(int count) {
283 impl_.BackUp(count);
284 }
285
Skip(int count)286 bool IstreamInputStream::Skip(int count) {
287 return impl_.Skip(count);
288 }
289
ByteCount() const290 int64 IstreamInputStream::ByteCount() const {
291 return impl_.ByteCount();
292 }
293
CopyingIstreamInputStream(istream * input)294 IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream(
295 istream* input)
296 : input_(input) {
297 }
298
~CopyingIstreamInputStream()299 IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {}
300
Read(void * buffer,int size)301 int IstreamInputStream::CopyingIstreamInputStream::Read(
302 void* buffer, int size) {
303 input_->read(reinterpret_cast<char*>(buffer), size);
304 int result = input_->gcount();
305 if (result == 0 && input_->fail() && !input_->eof()) {
306 return -1;
307 }
308 return result;
309 }
310
311 // ===================================================================
312
OstreamOutputStream(ostream * output,int block_size)313 OstreamOutputStream::OstreamOutputStream(ostream* output, int block_size)
314 : copying_output_(output),
315 impl_(©ing_output_, block_size) {
316 }
317
~OstreamOutputStream()318 OstreamOutputStream::~OstreamOutputStream() {
319 impl_.Flush();
320 }
321
Next(void ** data,int * size)322 bool OstreamOutputStream::Next(void** data, int* size) {
323 return impl_.Next(data, size);
324 }
325
BackUp(int count)326 void OstreamOutputStream::BackUp(int count) {
327 impl_.BackUp(count);
328 }
329
ByteCount() const330 int64 OstreamOutputStream::ByteCount() const {
331 return impl_.ByteCount();
332 }
333
CopyingOstreamOutputStream(ostream * output)334 OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream(
335 ostream* output)
336 : output_(output) {
337 }
338
~CopyingOstreamOutputStream()339 OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() {
340 }
341
Write(const void * buffer,int size)342 bool OstreamOutputStream::CopyingOstreamOutputStream::Write(
343 const void* buffer, int size) {
344 output_->write(reinterpret_cast<const char*>(buffer), size);
345 return output_->good();
346 }
347
348 // ===================================================================
349
ConcatenatingInputStream(ZeroCopyInputStream * const streams[],int count)350 ConcatenatingInputStream::ConcatenatingInputStream(
351 ZeroCopyInputStream* const streams[], int count)
352 : streams_(streams), stream_count_(count), bytes_retired_(0) {
353 }
354
~ConcatenatingInputStream()355 ConcatenatingInputStream::~ConcatenatingInputStream() {
356 }
357
Next(const void ** data,int * size)358 bool ConcatenatingInputStream::Next(const void** data, int* size) {
359 while (stream_count_ > 0) {
360 if (streams_[0]->Next(data, size)) return true;
361
362 // That stream is done. Advance to the next one.
363 bytes_retired_ += streams_[0]->ByteCount();
364 ++streams_;
365 --stream_count_;
366 }
367
368 // No more streams.
369 return false;
370 }
371
BackUp(int count)372 void ConcatenatingInputStream::BackUp(int count) {
373 if (stream_count_ > 0) {
374 streams_[0]->BackUp(count);
375 } else {
376 GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next().";
377 }
378 }
379
Skip(int count)380 bool ConcatenatingInputStream::Skip(int count) {
381 while (stream_count_ > 0) {
382 // Assume that ByteCount() can be used to find out how much we actually
383 // skipped when Skip() fails.
384 int64 target_byte_count = streams_[0]->ByteCount() + count;
385 if (streams_[0]->Skip(count)) return true;
386
387 // Hit the end of the stream. Figure out how many more bytes we still have
388 // to skip.
389 int64 final_byte_count = streams_[0]->ByteCount();
390 GOOGLE_DCHECK_LT(final_byte_count, target_byte_count);
391 count = target_byte_count - final_byte_count;
392
393 // That stream is done. Advance to the next one.
394 bytes_retired_ += final_byte_count;
395 ++streams_;
396 --stream_count_;
397 }
398
399 return false;
400 }
401
ByteCount() const402 int64 ConcatenatingInputStream::ByteCount() const {
403 if (stream_count_ == 0) {
404 return bytes_retired_;
405 } else {
406 return bytes_retired_ + streams_[0]->ByteCount();
407 }
408 }
409
410
411 // ===================================================================
412
LimitingInputStream(ZeroCopyInputStream * input,int64 limit)413 LimitingInputStream::LimitingInputStream(ZeroCopyInputStream* input,
414 int64 limit)
415 : input_(input), limit_(limit) {}
416
~LimitingInputStream()417 LimitingInputStream::~LimitingInputStream() {
418 // If we overshot the limit, back up.
419 if (limit_ < 0) input_->BackUp(-limit_);
420 }
421
Next(const void ** data,int * size)422 bool LimitingInputStream::Next(const void** data, int* size) {
423 if (limit_ <= 0) return false;
424 if (!input_->Next(data, size)) return false;
425
426 limit_ -= *size;
427 if (limit_ < 0) {
428 // We overshot the limit. Reduce *size to hide the rest of the buffer.
429 *size += limit_;
430 }
431 return true;
432 }
433
BackUp(int count)434 void LimitingInputStream::BackUp(int count) {
435 if (limit_ < 0) {
436 input_->BackUp(count - limit_);
437 limit_ = count;
438 } else {
439 input_->BackUp(count);
440 limit_ += count;
441 }
442 }
443
Skip(int count)444 bool LimitingInputStream::Skip(int count) {
445 if (count > limit_) {
446 if (limit_ < 0) return false;
447 input_->Skip(limit_);
448 limit_ = 0;
449 return false;
450 } else {
451 if (!input_->Skip(count)) return false;
452 limit_ -= count;
453 return true;
454 }
455 }
456
ByteCount() const457 int64 LimitingInputStream::ByteCount() const {
458 if (limit_ < 0) {
459 return input_->ByteCount() + limit_;
460 } else {
461 return input_->ByteCount();
462 }
463 }
464
465
466 // ===================================================================
467
468 } // namespace io
469 } // namespace protobuf
470 } // namespace google
471