• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Protocol Buffers - Google's data interchange format
2 // Copyright 2008 Google Inc.  All rights reserved.
3 // https://developers.google.com/protocol-buffers/
4 //
5 // Redistribution and use in source and binary forms, with or without
6 // modification, are permitted provided that the following conditions are
7 // met:
8 //
9 //     * Redistributions of source code must retain the above copyright
10 // notice, this list of conditions and the following disclaimer.
11 //     * Redistributions in binary form must reproduce the above
12 // copyright notice, this list of conditions and the following disclaimer
13 // in the documentation and/or other materials provided with the
14 // distribution.
15 //     * Neither the name of Google Inc. nor the names of its
16 // contributors may be used to endorse or promote products derived from
17 // this software without specific prior written permission.
18 //
19 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 
31 // Author: kenton@google.com (Kenton Varda)
32 //  Based on original Protocol Buffers design by
33 //  Sanjay Ghemawat, Jeff Dean, and others.
34 
35 #ifndef _MSC_VER
36 #include <fcntl.h>
37 #include <sys/stat.h>
38 #include <sys/types.h>
39 #include <unistd.h>
40 #endif
41 #include <errno.h>
42 
43 #include <algorithm>
44 #include <iostream>
45 
46 #include <google/protobuf/stubs/common.h>
47 #include <google/protobuf/stubs/logging.h>
48 #include <google/protobuf/io/io_win32.h>
49 #include <google/protobuf/io/zero_copy_stream_impl.h>
50 #include <google/protobuf/stubs/stl_util.h>
51 
52 
53 namespace google {
54 namespace protobuf {
55 namespace io {
56 
57 #ifdef _WIN32
58 // Win32 lseek is broken:  If invoked on a non-seekable file descriptor, its
59 // return value is undefined.  We re-define it to always produce an error.
60 #define lseek(fd, offset, origin) ((off_t)-1)
61 // DO NOT include <io.h>, instead create functions in io_win32.{h,cc} and import
62 // them like we do below.
63 using google::protobuf::io::win32::access;
64 using google::protobuf::io::win32::close;
65 using google::protobuf::io::win32::open;
66 using google::protobuf::io::win32::read;
67 using google::protobuf::io::win32::write;
68 #endif
69 
70 namespace {
71 
72 // EINTR sucks.
close_no_eintr(int fd)73 int close_no_eintr(int fd) {
74   int result;
75   do {
76     result = close(fd);
77   } while (result < 0 && errno == EINTR);
78   return result;
79 }
80 
81 }  // namespace
82 
83 // ===================================================================
84 
FileInputStream(int file_descriptor,int block_size)85 FileInputStream::FileInputStream(int file_descriptor, int block_size)
86     : copying_input_(file_descriptor), impl_(&copying_input_, block_size) {}
87 
Close()88 bool FileInputStream::Close() { return copying_input_.Close(); }
89 
Next(const void ** data,int * size)90 bool FileInputStream::Next(const void** data, int* size) {
91   return impl_.Next(data, size);
92 }
93 
BackUp(int count)94 void FileInputStream::BackUp(int count) { impl_.BackUp(count); }
95 
Skip(int count)96 bool FileInputStream::Skip(int count) { return impl_.Skip(count); }
97 
ByteCount() const98 int64_t FileInputStream::ByteCount() const { return impl_.ByteCount(); }
99 
CopyingFileInputStream(int file_descriptor)100 FileInputStream::CopyingFileInputStream::CopyingFileInputStream(
101     int file_descriptor)
102     : file_(file_descriptor),
103       close_on_delete_(false),
104       is_closed_(false),
105       errno_(0),
106       previous_seek_failed_(false) {}
107 
~CopyingFileInputStream()108 FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() {
109   if (close_on_delete_) {
110     if (!Close()) {
111       GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
112     }
113   }
114 }
115 
Close()116 bool FileInputStream::CopyingFileInputStream::Close() {
117   GOOGLE_CHECK(!is_closed_);
118 
119   is_closed_ = true;
120   if (close_no_eintr(file_) != 0) {
121     // The docs on close() do not specify whether a file descriptor is still
122     // open after close() fails with EIO.  However, the glibc source code
123     // seems to indicate that it is not.
124     errno_ = errno;
125     return false;
126   }
127 
128   return true;
129 }
130 
Read(void * buffer,int size)131 int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) {
132   GOOGLE_CHECK(!is_closed_);
133 
134   int result;
135   do {
136     result = read(file_, buffer, size);
137   } while (result < 0 && errno == EINTR);
138 
139   if (result < 0) {
140     // Read error (not EOF).
141     errno_ = errno;
142   }
143 
144   return result;
145 }
146 
Skip(int count)147 int FileInputStream::CopyingFileInputStream::Skip(int count) {
148   GOOGLE_CHECK(!is_closed_);
149 
150   if (!previous_seek_failed_ && lseek(file_, count, SEEK_CUR) != (off_t)-1) {
151     // Seek succeeded.
152     return count;
153   } else {
154     // Failed to seek.
155 
156     // Note to self:  Don't seek again.  This file descriptor doesn't
157     // support it.
158     previous_seek_failed_ = true;
159 
160     // Use the default implementation.
161     return CopyingInputStream::Skip(count);
162   }
163 }
164 
165 // ===================================================================
166 
FileOutputStream(int file_descriptor,int block_size)167 FileOutputStream::FileOutputStream(int file_descriptor, int block_size)
168     : copying_output_(file_descriptor), impl_(&copying_output_, block_size) {}
169 
~FileOutputStream()170 FileOutputStream::~FileOutputStream() { impl_.Flush(); }
171 
Close()172 bool FileOutputStream::Close() {
173   bool flush_succeeded = impl_.Flush();
174   return copying_output_.Close() && flush_succeeded;
175 }
176 
Flush()177 bool FileOutputStream::Flush() { return impl_.Flush(); }
178 
Next(void ** data,int * size)179 bool FileOutputStream::Next(void** data, int* size) {
180   return impl_.Next(data, size);
181 }
182 
BackUp(int count)183 void FileOutputStream::BackUp(int count) { impl_.BackUp(count); }
184 
ByteCount() const185 int64_t FileOutputStream::ByteCount() const { return impl_.ByteCount(); }
186 
CopyingFileOutputStream(int file_descriptor)187 FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream(
188     int file_descriptor)
189     : file_(file_descriptor),
190       close_on_delete_(false),
191       is_closed_(false),
192       errno_(0) {}
193 
~CopyingFileOutputStream()194 FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() {
195   if (close_on_delete_) {
196     if (!Close()) {
197       GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
198     }
199   }
200 }
201 
Close()202 bool FileOutputStream::CopyingFileOutputStream::Close() {
203   GOOGLE_CHECK(!is_closed_);
204 
205   is_closed_ = true;
206   if (close_no_eintr(file_) != 0) {
207     // The docs on close() do not specify whether a file descriptor is still
208     // open after close() fails with EIO.  However, the glibc source code
209     // seems to indicate that it is not.
210     errno_ = errno;
211     return false;
212   }
213 
214   return true;
215 }
216 
Write(const void * buffer,int size)217 bool FileOutputStream::CopyingFileOutputStream::Write(const void* buffer,
218                                                       int size) {
219   GOOGLE_CHECK(!is_closed_);
220   int total_written = 0;
221 
222   const uint8* buffer_base = reinterpret_cast<const uint8*>(buffer);
223 
224   while (total_written < size) {
225     int bytes;
226     do {
227       bytes = write(file_, buffer_base + total_written, size - total_written);
228     } while (bytes < 0 && errno == EINTR);
229 
230     if (bytes <= 0) {
231       // Write error.
232 
233       // FIXME(kenton):  According to the man page, if write() returns zero,
234       //   there was no error; write() simply did not write anything.  It's
235       //   unclear under what circumstances this might happen, but presumably
236       //   errno won't be set in this case.  I am confused as to how such an
237       //   event should be handled.  For now I'm treating it as an error, since
238       //   retrying seems like it could lead to an infinite loop.  I suspect
239       //   this never actually happens anyway.
240 
241       if (bytes < 0) {
242         errno_ = errno;
243       }
244       return false;
245     }
246     total_written += bytes;
247   }
248 
249   return true;
250 }
251 
252 // ===================================================================
253 
IstreamInputStream(std::istream * input,int block_size)254 IstreamInputStream::IstreamInputStream(std::istream* input, int block_size)
255     : copying_input_(input), impl_(&copying_input_, block_size) {}
256 
Next(const void ** data,int * size)257 bool IstreamInputStream::Next(const void** data, int* size) {
258   return impl_.Next(data, size);
259 }
260 
BackUp(int count)261 void IstreamInputStream::BackUp(int count) { impl_.BackUp(count); }
262 
Skip(int count)263 bool IstreamInputStream::Skip(int count) { return impl_.Skip(count); }
264 
ByteCount() const265 int64_t IstreamInputStream::ByteCount() const { return impl_.ByteCount(); }
266 
CopyingIstreamInputStream(std::istream * input)267 IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream(
268     std::istream* input)
269     : input_(input) {}
270 
~CopyingIstreamInputStream()271 IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {}
272 
Read(void * buffer,int size)273 int IstreamInputStream::CopyingIstreamInputStream::Read(void* buffer,
274                                                         int size) {
275   input_->read(reinterpret_cast<char*>(buffer), size);
276   int result = input_->gcount();
277   if (result == 0 && input_->fail() && !input_->eof()) {
278     return -1;
279   }
280   return result;
281 }
282 
283 // ===================================================================
284 
OstreamOutputStream(std::ostream * output,int block_size)285 OstreamOutputStream::OstreamOutputStream(std::ostream* output, int block_size)
286     : copying_output_(output), impl_(&copying_output_, block_size) {}
287 
~OstreamOutputStream()288 OstreamOutputStream::~OstreamOutputStream() { impl_.Flush(); }
289 
Next(void ** data,int * size)290 bool OstreamOutputStream::Next(void** data, int* size) {
291   return impl_.Next(data, size);
292 }
293 
BackUp(int count)294 void OstreamOutputStream::BackUp(int count) { impl_.BackUp(count); }
295 
ByteCount() const296 int64_t OstreamOutputStream::ByteCount() const { return impl_.ByteCount(); }
297 
CopyingOstreamOutputStream(std::ostream * output)298 OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream(
299     std::ostream* output)
300     : output_(output) {}
301 
~CopyingOstreamOutputStream()302 OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() {
303 }
304 
Write(const void * buffer,int size)305 bool OstreamOutputStream::CopyingOstreamOutputStream::Write(const void* buffer,
306                                                             int size) {
307   output_->write(reinterpret_cast<const char*>(buffer), size);
308   return output_->good();
309 }
310 
311 // ===================================================================
312 
ConcatenatingInputStream(ZeroCopyInputStream * const streams[],int count)313 ConcatenatingInputStream::ConcatenatingInputStream(
314     ZeroCopyInputStream* const streams[], int count)
315     : streams_(streams), stream_count_(count), bytes_retired_(0) {
316 }
317 
Next(const void ** data,int * size)318 bool ConcatenatingInputStream::Next(const void** data, int* size) {
319   while (stream_count_ > 0) {
320     if (streams_[0]->Next(data, size)) return true;
321 
322     // That stream is done.  Advance to the next one.
323     bytes_retired_ += streams_[0]->ByteCount();
324     ++streams_;
325     --stream_count_;
326   }
327 
328   // No more streams.
329   return false;
330 }
331 
BackUp(int count)332 void ConcatenatingInputStream::BackUp(int count) {
333   if (stream_count_ > 0) {
334     streams_[0]->BackUp(count);
335   } else {
336     GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next().";
337   }
338 }
339 
Skip(int count)340 bool ConcatenatingInputStream::Skip(int count) {
341   while (stream_count_ > 0) {
342     // Assume that ByteCount() can be used to find out how much we actually
343     // skipped when Skip() fails.
344     int64 target_byte_count = streams_[0]->ByteCount() + count;
345     if (streams_[0]->Skip(count)) return true;
346 
347     // Hit the end of the stream.  Figure out how many more bytes we still have
348     // to skip.
349     int64 final_byte_count = streams_[0]->ByteCount();
350     GOOGLE_DCHECK_LT(final_byte_count, target_byte_count);
351     count = target_byte_count - final_byte_count;
352 
353     // That stream is done.  Advance to the next one.
354     bytes_retired_ += final_byte_count;
355     ++streams_;
356     --stream_count_;
357   }
358 
359   return false;
360 }
361 
ByteCount() const362 int64_t ConcatenatingInputStream::ByteCount() const {
363   if (stream_count_ == 0) {
364     return bytes_retired_;
365   } else {
366     return bytes_retired_ + streams_[0]->ByteCount();
367   }
368 }
369 
370 
371 // ===================================================================
372 
373 }  // namespace io
374 }  // namespace protobuf
375 }  // namespace google
376