• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Protocol Buffers - Google's data interchange format
2 // Copyright 2008 Google Inc.  All rights reserved.
3 // https://developers.google.com/protocol-buffers/
4 //
5 // Redistribution and use in source and binary forms, with or without
6 // modification, are permitted provided that the following conditions are
7 // met:
8 //
9 //     * Redistributions of source code must retain the above copyright
10 // notice, this list of conditions and the following disclaimer.
11 //     * Redistributions in binary form must reproduce the above
12 // copyright notice, this list of conditions and the following disclaimer
13 // in the documentation and/or other materials provided with the
14 // distribution.
15 //     * Neither the name of Google Inc. nor the names of its
16 // contributors may be used to endorse or promote products derived from
17 // this software without specific prior written permission.
18 //
19 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 
31 // Author: kenton@google.com (Kenton Varda)
32 //  Based on original Protocol Buffers design by
33 //  Sanjay Ghemawat, Jeff Dean, and others.
34 
35 #ifdef _MSC_VER
36 #include <io.h>
37 #else
38 #include <unistd.h>
39 #include <sys/types.h>
40 #include <sys/stat.h>
41 #include <fcntl.h>
42 #endif
43 #include <errno.h>
44 #include <iostream>
45 #include <algorithm>
46 
47 #include <google/protobuf/io/zero_copy_stream_impl.h>
48 #include <google/protobuf/stubs/common.h>
49 #include <google/protobuf/stubs/logging.h>
50 #include <google/protobuf/stubs/stl_util.h>
51 
52 
53 namespace google {
54 namespace protobuf {
55 namespace io {
56 
57 #ifdef _WIN32
58 // Win32 lseek is broken:  If invoked on a non-seekable file descriptor, its
59 // return value is undefined.  We re-define it to always produce an error.
60 #define lseek(fd, offset, origin) ((off_t)-1)
61 #endif
62 
63 namespace {
64 
// Closes |fd|, re-issuing close() for as long as the call fails with EINTR.
// Returns the result of the final close() call; errno is left as close() set
// it.
int close_no_eintr(int fd) {
  for (;;) {
    const int status = close(fd);
    if (status >= 0 || errno != EINTR) {
      return status;
    }
  }
}
73 
74 }  // namespace
75 
76 
77 // ===================================================================
78 
FileInputStream(int file_descriptor,int block_size)79 FileInputStream::FileInputStream(int file_descriptor, int block_size)
80   : copying_input_(file_descriptor),
81     impl_(&copying_input_, block_size) {
82 }
83 
~FileInputStream()84 FileInputStream::~FileInputStream() {}
85 
Close()86 bool FileInputStream::Close() {
87   return copying_input_.Close();
88 }
89 
Next(const void ** data,int * size)90 bool FileInputStream::Next(const void** data, int* size) {
91   return impl_.Next(data, size);
92 }
93 
BackUp(int count)94 void FileInputStream::BackUp(int count) {
95   impl_.BackUp(count);
96 }
97 
Skip(int count)98 bool FileInputStream::Skip(int count) {
99   return impl_.Skip(count);
100 }
101 
ByteCount() const102 int64 FileInputStream::ByteCount() const {
103   return impl_.ByteCount();
104 }
105 
CopyingFileInputStream(int file_descriptor)106 FileInputStream::CopyingFileInputStream::CopyingFileInputStream(
107     int file_descriptor)
108   : file_(file_descriptor),
109     close_on_delete_(false),
110     is_closed_(false),
111     errno_(0),
112     previous_seek_failed_(false) {
113 }
114 
~CopyingFileInputStream()115 FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() {
116   if (close_on_delete_) {
117     if (!Close()) {
118       GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
119     }
120   }
121 }
122 
Close()123 bool FileInputStream::CopyingFileInputStream::Close() {
124   GOOGLE_CHECK(!is_closed_);
125 
126   is_closed_ = true;
127   if (close_no_eintr(file_) != 0) {
128     // The docs on close() do not specify whether a file descriptor is still
129     // open after close() fails with EIO.  However, the glibc source code
130     // seems to indicate that it is not.
131     errno_ = errno;
132     return false;
133   }
134 
135   return true;
136 }
137 
Read(void * buffer,int size)138 int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) {
139   GOOGLE_CHECK(!is_closed_);
140 
141   int result;
142   do {
143     result = read(file_, buffer, size);
144   } while (result < 0 && errno == EINTR);
145 
146   if (result < 0) {
147     // Read error (not EOF).
148     errno_ = errno;
149   }
150 
151   return result;
152 }
153 
Skip(int count)154 int FileInputStream::CopyingFileInputStream::Skip(int count) {
155   GOOGLE_CHECK(!is_closed_);
156 
157   if (!previous_seek_failed_ &&
158       lseek(file_, count, SEEK_CUR) != (off_t)-1) {
159     // Seek succeeded.
160     return count;
161   } else {
162     // Failed to seek.
163 
164     // Note to self:  Don't seek again.  This file descriptor doesn't
165     // support it.
166     previous_seek_failed_ = true;
167 
168     // Use the default implementation.
169     return CopyingInputStream::Skip(count);
170   }
171 }
172 
173 // ===================================================================
174 
FileOutputStream(int file_descriptor,int block_size)175 FileOutputStream::FileOutputStream(int file_descriptor, int block_size)
176   : copying_output_(file_descriptor),
177     impl_(&copying_output_, block_size) {
178 }
179 
~FileOutputStream()180 FileOutputStream::~FileOutputStream() {
181   impl_.Flush();
182 }
183 
Close()184 bool FileOutputStream::Close() {
185   bool flush_succeeded = impl_.Flush();
186   return copying_output_.Close() && flush_succeeded;
187 }
188 
Flush()189 bool FileOutputStream::Flush() {
190   return impl_.Flush();
191 }
192 
Next(void ** data,int * size)193 bool FileOutputStream::Next(void** data, int* size) {
194   return impl_.Next(data, size);
195 }
196 
BackUp(int count)197 void FileOutputStream::BackUp(int count) {
198   impl_.BackUp(count);
199 }
200 
ByteCount() const201 int64 FileOutputStream::ByteCount() const {
202   return impl_.ByteCount();
203 }
204 
CopyingFileOutputStream(int file_descriptor)205 FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream(
206     int file_descriptor)
207   : file_(file_descriptor),
208     close_on_delete_(false),
209     is_closed_(false),
210     errno_(0) {
211 }
212 
~CopyingFileOutputStream()213 FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() {
214   if (close_on_delete_) {
215     if (!Close()) {
216       GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
217     }
218   }
219 }
220 
Close()221 bool FileOutputStream::CopyingFileOutputStream::Close() {
222   GOOGLE_CHECK(!is_closed_);
223 
224   is_closed_ = true;
225   if (close_no_eintr(file_) != 0) {
226     // The docs on close() do not specify whether a file descriptor is still
227     // open after close() fails with EIO.  However, the glibc source code
228     // seems to indicate that it is not.
229     errno_ = errno;
230     return false;
231   }
232 
233   return true;
234 }
235 
Write(const void * buffer,int size)236 bool FileOutputStream::CopyingFileOutputStream::Write(
237     const void* buffer, int size) {
238   GOOGLE_CHECK(!is_closed_);
239   int total_written = 0;
240 
241   const uint8* buffer_base = reinterpret_cast<const uint8*>(buffer);
242 
243   while (total_written < size) {
244     int bytes;
245     do {
246       bytes = write(file_, buffer_base + total_written, size - total_written);
247     } while (bytes < 0 && errno == EINTR);
248 
249     if (bytes <= 0) {
250       // Write error.
251 
252       // FIXME(kenton):  According to the man page, if write() returns zero,
253       //   there was no error; write() simply did not write anything.  It's
254       //   unclear under what circumstances this might happen, but presumably
255       //   errno won't be set in this case.  I am confused as to how such an
256       //   event should be handled.  For now I'm treating it as an error, since
257       //   retrying seems like it could lead to an infinite loop.  I suspect
258       //   this never actually happens anyway.
259 
260       if (bytes < 0) {
261         errno_ = errno;
262       }
263       return false;
264     }
265     total_written += bytes;
266   }
267 
268   return true;
269 }
270 
271 // ===================================================================
272 
IstreamInputStream(istream * input,int block_size)273 IstreamInputStream::IstreamInputStream(istream* input, int block_size)
274   : copying_input_(input),
275     impl_(&copying_input_, block_size) {
276 }
277 
~IstreamInputStream()278 IstreamInputStream::~IstreamInputStream() {}
279 
Next(const void ** data,int * size)280 bool IstreamInputStream::Next(const void** data, int* size) {
281   return impl_.Next(data, size);
282 }
283 
BackUp(int count)284 void IstreamInputStream::BackUp(int count) {
285   impl_.BackUp(count);
286 }
287 
Skip(int count)288 bool IstreamInputStream::Skip(int count) {
289   return impl_.Skip(count);
290 }
291 
ByteCount() const292 int64 IstreamInputStream::ByteCount() const {
293   return impl_.ByteCount();
294 }
295 
CopyingIstreamInputStream(istream * input)296 IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream(
297     istream* input)
298   : input_(input) {
299 }
300 
~CopyingIstreamInputStream()301 IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {}
302 
Read(void * buffer,int size)303 int IstreamInputStream::CopyingIstreamInputStream::Read(
304     void* buffer, int size) {
305   input_->read(reinterpret_cast<char*>(buffer), size);
306   int result = input_->gcount();
307   if (result == 0 && input_->fail() && !input_->eof()) {
308     return -1;
309   }
310   return result;
311 }
312 
313 // ===================================================================
314 
OstreamOutputStream(ostream * output,int block_size)315 OstreamOutputStream::OstreamOutputStream(ostream* output, int block_size)
316   : copying_output_(output),
317     impl_(&copying_output_, block_size) {
318 }
319 
~OstreamOutputStream()320 OstreamOutputStream::~OstreamOutputStream() {
321   impl_.Flush();
322 }
323 
Next(void ** data,int * size)324 bool OstreamOutputStream::Next(void** data, int* size) {
325   return impl_.Next(data, size);
326 }
327 
BackUp(int count)328 void OstreamOutputStream::BackUp(int count) {
329   impl_.BackUp(count);
330 }
331 
ByteCount() const332 int64 OstreamOutputStream::ByteCount() const {
333   return impl_.ByteCount();
334 }
335 
CopyingOstreamOutputStream(ostream * output)336 OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream(
337     ostream* output)
338   : output_(output) {
339 }
340 
~CopyingOstreamOutputStream()341 OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() {
342 }
343 
Write(const void * buffer,int size)344 bool OstreamOutputStream::CopyingOstreamOutputStream::Write(
345     const void* buffer, int size) {
346   output_->write(reinterpret_cast<const char*>(buffer), size);
347   return output_->good();
348 }
349 
350 // ===================================================================
351 
ConcatenatingInputStream(ZeroCopyInputStream * const streams[],int count)352 ConcatenatingInputStream::ConcatenatingInputStream(
353     ZeroCopyInputStream* const streams[], int count)
354   : streams_(streams), stream_count_(count), bytes_retired_(0) {
355 }
356 
~ConcatenatingInputStream()357 ConcatenatingInputStream::~ConcatenatingInputStream() {
358 }
359 
Next(const void ** data,int * size)360 bool ConcatenatingInputStream::Next(const void** data, int* size) {
361   while (stream_count_ > 0) {
362     if (streams_[0]->Next(data, size)) return true;
363 
364     // That stream is done.  Advance to the next one.
365     bytes_retired_ += streams_[0]->ByteCount();
366     ++streams_;
367     --stream_count_;
368   }
369 
370   // No more streams.
371   return false;
372 }
373 
BackUp(int count)374 void ConcatenatingInputStream::BackUp(int count) {
375   if (stream_count_ > 0) {
376     streams_[0]->BackUp(count);
377   } else {
378     GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next().";
379   }
380 }
381 
Skip(int count)382 bool ConcatenatingInputStream::Skip(int count) {
383   while (stream_count_ > 0) {
384     // Assume that ByteCount() can be used to find out how much we actually
385     // skipped when Skip() fails.
386     int64 target_byte_count = streams_[0]->ByteCount() + count;
387     if (streams_[0]->Skip(count)) return true;
388 
389     // Hit the end of the stream.  Figure out how many more bytes we still have
390     // to skip.
391     int64 final_byte_count = streams_[0]->ByteCount();
392     GOOGLE_DCHECK_LT(final_byte_count, target_byte_count);
393     count = target_byte_count - final_byte_count;
394 
395     // That stream is done.  Advance to the next one.
396     bytes_retired_ += final_byte_count;
397     ++streams_;
398     --stream_count_;
399   }
400 
401   return false;
402 }
403 
ByteCount() const404 int64 ConcatenatingInputStream::ByteCount() const {
405   if (stream_count_ == 0) {
406     return bytes_retired_;
407   } else {
408     return bytes_retired_ + streams_[0]->ByteCount();
409   }
410 }
411 
412 
413 // ===================================================================
414 
LimitingInputStream(ZeroCopyInputStream * input,int64 limit)415 LimitingInputStream::LimitingInputStream(ZeroCopyInputStream* input,
416                                          int64 limit)
417   : input_(input), limit_(limit) {
418   prior_bytes_read_ = input_->ByteCount();
419 }
420 
~LimitingInputStream()421 LimitingInputStream::~LimitingInputStream() {
422   // If we overshot the limit, back up.
423   if (limit_ < 0) input_->BackUp(-limit_);
424 }
425 
Next(const void ** data,int * size)426 bool LimitingInputStream::Next(const void** data, int* size) {
427   if (limit_ <= 0) return false;
428   if (!input_->Next(data, size)) return false;
429 
430   limit_ -= *size;
431   if (limit_ < 0) {
432     // We overshot the limit.  Reduce *size to hide the rest of the buffer.
433     *size += limit_;
434   }
435   return true;
436 }
437 
BackUp(int count)438 void LimitingInputStream::BackUp(int count) {
439   if (limit_ < 0) {
440     input_->BackUp(count - limit_);
441     limit_ = count;
442   } else {
443     input_->BackUp(count);
444     limit_ += count;
445   }
446 }
447 
Skip(int count)448 bool LimitingInputStream::Skip(int count) {
449   if (count > limit_) {
450     if (limit_ < 0) return false;
451     input_->Skip(limit_);
452     limit_ = 0;
453     return false;
454   } else {
455     if (!input_->Skip(count)) return false;
456     limit_ -= count;
457     return true;
458   }
459 }
460 
ByteCount() const461 int64 LimitingInputStream::ByteCount() const {
462   if (limit_ < 0) {
463     return input_->ByteCount() + limit_ - prior_bytes_read_;
464   } else {
465     return input_->ByteCount() - prior_bytes_read_;
466   }
467 }
468 
469 
470 // ===================================================================
471 
472 }  // namespace io
473 }  // namespace protobuf
474 }  // namespace google
475