• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Protocol Buffers - Google's data interchange format
2 // Copyright 2008 Google Inc.  All rights reserved.
3 // http://code.google.com/p/protobuf/
4 //
5 // Redistribution and use in source and binary forms, with or without
6 // modification, are permitted provided that the following conditions are
7 // met:
8 //
9 //     * Redistributions of source code must retain the above copyright
10 // notice, this list of conditions and the following disclaimer.
11 //     * Redistributions in binary form must reproduce the above
12 // copyright notice, this list of conditions and the following disclaimer
13 // in the documentation and/or other materials provided with the
14 // distribution.
15 //     * Neither the name of Google Inc. nor the names of its
16 // contributors may be used to endorse or promote products derived from
17 // this software without specific prior written permission.
18 //
19 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 
31 // Author: kenton@google.com (Kenton Varda)
32 //  Based on original Protocol Buffers design by
33 //  Sanjay Ghemawat, Jeff Dean, and others.
34 
35 #ifdef _MSC_VER
36 #include <io.h>
37 #else
38 #include <unistd.h>
39 #include <sys/types.h>
40 #include <sys/stat.h>
41 #include <fcntl.h>
42 #endif
43 #include <errno.h>
44 #include <iostream>
45 #include <algorithm>
46 
47 #include <google/protobuf/io/zero_copy_stream_impl.h>
48 #include <google/protobuf/stubs/common.h>
49 #include <google/protobuf/stubs/stl_util-inl.h>
50 
51 namespace google {
52 namespace protobuf {
53 namespace io {
54 
55 #ifdef _WIN32
56 // Win32 lseek is broken:  If invoked on a non-seekable file descriptor, its
57 // return value is undefined.  We re-define it to always produce an error.
58 #define lseek(fd, offset, origin) ((off_t)-1)
59 #endif
60 
61 namespace {
62 
63 // EINTR sucks.
// Closes |fd| without ever reporting EINTR to the caller.  Returns 0 on
// success and -1 (with errno set) on any other failure.
int close_no_eintr(int fd) {
#if defined(__linux__)
  // Linux releases the descriptor even when close() is interrupted, so the
  // call must not be repeated: a retry would fail with EBADF at best and, in
  // a multi-threaded program, could close an unrelated descriptor that
  // another thread was just handed under the same number.  Treat EINTR as
  // "closed".
  const int result = close(fd);
  if (result < 0 && errno == EINTR) return 0;
  return result;
#else
  // POSIX leaves the descriptor state unspecified after EINTR; keep the
  // historical retry loop on platforms other than Linux.
  int result;
  do {
    result = close(fd);
  } while (result < 0 && errno == EINTR);
  return result;
#endif
}
71 
72 }  // namespace
73 
74 
75 // ===================================================================
76 
// Builds a stream over |file_descriptor|: copying_input_ is bound to the
// descriptor and impl_ is constructed over copying_input_ with |block_size|.
FileInputStream::FileInputStream(int file_descriptor, int block_size)
  : copying_input_(file_descriptor),
    impl_(&copying_input_, block_size) {
}
81 
// The destructor body performs no explicit work.
FileInputStream::~FileInputStream() {}
83 
// Closes the file by delegating to copying_input_.Close() and returns its
// result.
bool FileInputStream::Close() {
  return copying_input_.Close();
}
87 
// Forwards to impl_.Next() and returns its result.
bool FileInputStream::Next(const void** data, int* size) {
  return impl_.Next(data, size);
}
91 
// Forwards to impl_.BackUp().
void FileInputStream::BackUp(int count) {
  impl_.BackUp(count);
}
95 
// Forwards to impl_.Skip() and returns its result.
bool FileInputStream::Skip(int count) {
  return impl_.Skip(count);
}
99 
// Forwards to impl_.ByteCount() and returns its result.
int64 FileInputStream::ByteCount() const {
  return impl_.ByteCount();
}
103 
// Records |file_descriptor| and sets the initial state: close-on-delete off,
// not closed, no saved errno, and no seek failure observed yet.
FileInputStream::CopyingFileInputStream::CopyingFileInputStream(
    int file_descriptor)
  : file_(file_descriptor),
    close_on_delete_(false),
    is_closed_(false),
    errno_(0),
    previous_seek_failed_(false) {
}
112 
~CopyingFileInputStream()113 FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() {
114   if (close_on_delete_) {
115     if (!Close()) {
116       GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
117     }
118   }
119 }
120 
Close()121 bool FileInputStream::CopyingFileInputStream::Close() {
122   GOOGLE_CHECK(!is_closed_);
123 
124   is_closed_ = true;
125   if (close_no_eintr(file_) != 0) {
126     // The docs on close() do not specify whether a file descriptor is still
127     // open after close() fails with EIO.  However, the glibc source code
128     // seems to indicate that it is not.
129     errno_ = errno;
130     return false;
131   }
132 
133   return true;
134 }
135 
Read(void * buffer,int size)136 int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) {
137   GOOGLE_CHECK(!is_closed_);
138 
139   int result;
140   do {
141     result = read(file_, buffer, size);
142   } while (result < 0 && errno == EINTR);
143 
144   if (result < 0) {
145     // Read error (not EOF).
146     errno_ = errno;
147   }
148 
149   return result;
150 }
151 
Skip(int count)152 int FileInputStream::CopyingFileInputStream::Skip(int count) {
153   GOOGLE_CHECK(!is_closed_);
154 
155   if (!previous_seek_failed_ &&
156       lseek(file_, count, SEEK_CUR) != (off_t)-1) {
157     // Seek succeeded.
158     return count;
159   } else {
160     // Failed to seek.
161 
162     // Note to self:  Don't seek again.  This file descriptor doesn't
163     // support it.
164     previous_seek_failed_ = true;
165 
166     // Use the default implementation.
167     return CopyingInputStream::Skip(count);
168   }
169 }
170 
171 // ===================================================================
172 
// Builds a stream over |file_descriptor|: copying_output_ is bound to the
// descriptor and impl_ is constructed over copying_output_ with |block_size|.
FileOutputStream::FileOutputStream(int file_descriptor, int block_size)
  : copying_output_(file_descriptor),
    impl_(&copying_output_, block_size) {
}
177 
// Flushes impl_ on destruction; the result of Flush() is ignored here.
FileOutputStream::~FileOutputStream() {
  impl_.Flush();
}
181 
Close()182 bool FileOutputStream::Close() {
183   bool flush_succeeded = impl_.Flush();
184   return copying_output_.Close() && flush_succeeded;
185 }
186 
// Forwards to impl_.Flush() and returns its result.
bool FileOutputStream::Flush() {
  return impl_.Flush();
}
190 
// Forwards to impl_.Next() and returns its result.
bool FileOutputStream::Next(void** data, int* size) {
  return impl_.Next(data, size);
}
194 
// Forwards to impl_.BackUp().
void FileOutputStream::BackUp(int count) {
  impl_.BackUp(count);
}
198 
// Forwards to impl_.ByteCount() and returns its result.
int64 FileOutputStream::ByteCount() const {
  return impl_.ByteCount();
}
202 
// Records |file_descriptor| and sets the initial state: close-on-delete off,
// not closed, and no saved errno.
FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream(
    int file_descriptor)
  : file_(file_descriptor),
    close_on_delete_(false),
    is_closed_(false),
    errno_(0) {
}
210 
~CopyingFileOutputStream()211 FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() {
212   if (close_on_delete_) {
213     if (!Close()) {
214       GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
215     }
216   }
217 }
218 
Close()219 bool FileOutputStream::CopyingFileOutputStream::Close() {
220   GOOGLE_CHECK(!is_closed_);
221 
222   is_closed_ = true;
223   if (close_no_eintr(file_) != 0) {
224     // The docs on close() do not specify whether a file descriptor is still
225     // open after close() fails with EIO.  However, the glibc source code
226     // seems to indicate that it is not.
227     errno_ = errno;
228     return false;
229   }
230 
231   return true;
232 }
233 
Write(const void * buffer,int size)234 bool FileOutputStream::CopyingFileOutputStream::Write(
235     const void* buffer, int size) {
236   GOOGLE_CHECK(!is_closed_);
237   int total_written = 0;
238 
239   const uint8* buffer_base = reinterpret_cast<const uint8*>(buffer);
240 
241   while (total_written < size) {
242     int bytes;
243     do {
244       bytes = write(file_, buffer_base + total_written, size - total_written);
245     } while (bytes < 0 && errno == EINTR);
246 
247     if (bytes <= 0) {
248       // Write error.
249 
250       // FIXME(kenton):  According to the man page, if write() returns zero,
251       //   there was no error; write() simply did not write anything.  It's
252       //   unclear under what circumstances this might happen, but presumably
253       //   errno won't be set in this case.  I am confused as to how such an
254       //   event should be handled.  For now I'm treating it as an error, since
255       //   retrying seems like it could lead to an infinite loop.  I suspect
256       //   this never actually happens anyway.
257 
258       if (bytes < 0) {
259         errno_ = errno;
260       }
261       return false;
262     }
263     total_written += bytes;
264   }
265 
266   return true;
267 }
268 
269 // ===================================================================
270 
// Builds a stream over |input|: copying_input_ is bound to the istream and
// impl_ is constructed over copying_input_ with |block_size|.
IstreamInputStream::IstreamInputStream(istream* input, int block_size)
  : copying_input_(input),
    impl_(&copying_input_, block_size) {
}
275 
// The destructor body performs no explicit work.
IstreamInputStream::~IstreamInputStream() {}
277 
// Forwards to impl_.Next() and returns its result.
bool IstreamInputStream::Next(const void** data, int* size) {
  return impl_.Next(data, size);
}
281 
// Forwards to impl_.BackUp().
void IstreamInputStream::BackUp(int count) {
  impl_.BackUp(count);
}
285 
// Forwards to impl_.Skip() and returns its result.
bool IstreamInputStream::Skip(int count) {
  return impl_.Skip(count);
}
289 
// Forwards to impl_.ByteCount() and returns its result.
int64 IstreamInputStream::ByteCount() const {
  return impl_.ByteCount();
}
293 
// Stores the |input| pointer in input_.
IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream(
    istream* input)
  : input_(input) {
}
298 
// The destructor body performs no explicit work; input_ is not deleted here.
IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {}
300 
Read(void * buffer,int size)301 int IstreamInputStream::CopyingIstreamInputStream::Read(
302     void* buffer, int size) {
303   input_->read(reinterpret_cast<char*>(buffer), size);
304   int result = input_->gcount();
305   if (result == 0 && input_->fail() && !input_->eof()) {
306     return -1;
307   }
308   return result;
309 }
310 
311 // ===================================================================
312 
// Builds a stream over |output|: copying_output_ is bound to the ostream and
// impl_ is constructed over copying_output_ with |block_size|.
OstreamOutputStream::OstreamOutputStream(ostream* output, int block_size)
  : copying_output_(output),
    impl_(&copying_output_, block_size) {
}
317 
// Flushes impl_ on destruction; the result of Flush() is ignored here.
OstreamOutputStream::~OstreamOutputStream() {
  impl_.Flush();
}
321 
// Forwards to impl_.Next() and returns its result.
bool OstreamOutputStream::Next(void** data, int* size) {
  return impl_.Next(data, size);
}
325 
// Forwards to impl_.BackUp().
void OstreamOutputStream::BackUp(int count) {
  impl_.BackUp(count);
}
329 
// Forwards to impl_.ByteCount() and returns its result.
int64 OstreamOutputStream::ByteCount() const {
  return impl_.ByteCount();
}
333 
// Stores the |output| pointer in output_.
OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream(
    ostream* output)
  : output_(output) {
}
338 
// The destructor body performs no explicit work; output_ is not deleted here.
OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() {
}
341 
// Writes |size| bytes from |buffer| to output_ and returns whether the
// stream reports good() afterwards.
bool OstreamOutputStream::CopyingOstreamOutputStream::Write(
    const void* buffer, int size) {
  output_->write(reinterpret_cast<const char*>(buffer), size);
  return output_->good();
}
347 
348 // ===================================================================
349 
// Records the |streams| array pointer and its |count|; the retired-byte
// total starts at zero.
ConcatenatingInputStream::ConcatenatingInputStream(
    ZeroCopyInputStream* const streams[], int count)
  : streams_(streams), stream_count_(count), bytes_retired_(0) {
}
354 
// The destructor body performs no explicit work; the streams are not
// deleted here.
ConcatenatingInputStream::~ConcatenatingInputStream() {
}
357 
Next(const void ** data,int * size)358 bool ConcatenatingInputStream::Next(const void** data, int* size) {
359   while (stream_count_ > 0) {
360     if (streams_[0]->Next(data, size)) return true;
361 
362     // That stream is done.  Advance to the next one.
363     bytes_retired_ += streams_[0]->ByteCount();
364     ++streams_;
365     --stream_count_;
366   }
367 
368   // No more streams.
369   return false;
370 }
371 
BackUp(int count)372 void ConcatenatingInputStream::BackUp(int count) {
373   if (stream_count_ > 0) {
374     streams_[0]->BackUp(count);
375   } else {
376     GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next().";
377   }
378 }
379 
Skip(int count)380 bool ConcatenatingInputStream::Skip(int count) {
381   while (stream_count_ > 0) {
382     // Assume that ByteCount() can be used to find out how much we actually
383     // skipped when Skip() fails.
384     int64 target_byte_count = streams_[0]->ByteCount() + count;
385     if (streams_[0]->Skip(count)) return true;
386 
387     // Hit the end of the stream.  Figure out how many more bytes we still have
388     // to skip.
389     int64 final_byte_count = streams_[0]->ByteCount();
390     GOOGLE_DCHECK_LT(final_byte_count, target_byte_count);
391     count = target_byte_count - final_byte_count;
392 
393     // That stream is done.  Advance to the next one.
394     bytes_retired_ += final_byte_count;
395     ++streams_;
396     --stream_count_;
397   }
398 
399   return false;
400 }
401 
ByteCount() const402 int64 ConcatenatingInputStream::ByteCount() const {
403   if (stream_count_ == 0) {
404     return bytes_retired_;
405   } else {
406     return bytes_retired_ + streams_[0]->ByteCount();
407   }
408 }
409 
410 
411 // ===================================================================
412 
// Stores the wrapped |input| stream and the byte |limit|.
LimitingInputStream::LimitingInputStream(ZeroCopyInputStream* input,
                                         int64 limit)
  : input_(input), limit_(limit) {}
416 
// On destruction, hands back to input_ any bytes that were fetched beyond
// the limit.  A negative limit_ holds the size of that overshoot.
LimitingInputStream::~LimitingInputStream() {
  // If we overshot the limit, back up.
  if (limit_ < 0) input_->BackUp(-limit_);
}
421 
Next(const void ** data,int * size)422 bool LimitingInputStream::Next(const void** data, int* size) {
423   if (limit_ <= 0) return false;
424   if (!input_->Next(data, size)) return false;
425 
426   limit_ -= *size;
427   if (limit_ < 0) {
428     // We overshot the limit.  Reduce *size to hide the rest of the buffer.
429     *size += limit_;
430   }
431   return true;
432 }
433 
BackUp(int count)434 void LimitingInputStream::BackUp(int count) {
435   if (limit_ < 0) {
436     input_->BackUp(count - limit_);
437     limit_ = count;
438   } else {
439     input_->BackUp(count);
440     limit_ += count;
441   }
442 }
443 
Skip(int count)444 bool LimitingInputStream::Skip(int count) {
445   if (count > limit_) {
446     if (limit_ < 0) return false;
447     input_->Skip(limit_);
448     limit_ = 0;
449     return false;
450   } else {
451     if (!input_->Skip(count)) return false;
452     limit_ -= count;
453     return true;
454   }
455 }
456 
ByteCount() const457 int64 LimitingInputStream::ByteCount() const {
458   if (limit_ < 0) {
459     return input_->ByteCount() + limit_;
460   } else {
461     return input_->ByteCount();
462   }
463 }
464 
465 
466 // ===================================================================
467 
468 }  // namespace io
469 }  // namespace protobuf
470 }  // namespace google
471