• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Protocol Buffers - Google's data interchange format
2 // Copyright 2008 Google Inc.  All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style
5 // license that can be found in the LICENSE file or at
6 // https://developers.google.com/open-source/licenses/bsd
7 
8 // Author: kenton@google.com (Kenton Varda)
9 //  Based on original Protocol Buffers design by
10 //  Sanjay Ghemawat, Jeff Dean, and others.
11 
12 #ifndef _MSC_VER
13 #include <fcntl.h>
14 #include <sys/stat.h>
15 #include <sys/types.h>
16 #include <unistd.h>
17 #endif
18 #include <errno.h>
19 
20 #include <algorithm>
21 #include <istream>
22 #include <ostream>
23 
24 #include "google/protobuf/stubs/common.h"
25 #include "absl/log/absl_check.h"
26 #include "absl/log/absl_log.h"
27 #include "google/protobuf/io/io_win32.h"
28 #include "google/protobuf/io/zero_copy_stream_impl.h"
29 
30 
31 namespace google {
32 namespace protobuf {
33 namespace io {
34 
35 #ifdef _WIN32
36 // Win32 lseek is broken:  If invoked on a non-seekable file descriptor, its
37 // return value is undefined.  We re-define it to always produce an error.
38 #define lseek(fd, offset, origin) ((off_t)-1)
39 // DO NOT include <io.h>, instead create functions in io_win32.{h,cc} and import
40 // them like we do below.
41 using google::protobuf::io::win32::access;
42 using google::protobuf::io::win32::close;
43 using google::protobuf::io::win32::open;
44 using google::protobuf::io::win32::read;
45 using google::protobuf::io::win32::write;
46 #endif
47 
48 namespace {
49 
50 // EINTR sucks.
// Closes `fd` and returns 0 on success or a negative value on failure (with
// errno set by close()).
//
// On Linux the descriptor is released by the time close() returns, even when
// the call reports EINTR.  Calling close() again there could close an
// unrelated descriptor that another thread opened in the meantime and that was
// assigned the same number, so close() is issued exactly once and EINTR is
// reported as success.  On other platforms the call is retried for as long as
// it keeps failing with EINTR.
int close_no_eintr(int fd) {
#if defined(__linux__)
  const int result = close(fd);
  if (result < 0 && errno == EINTR) {
    // The descriptor is already gone; there is nothing left to retry.
    return 0;
  }
  return result;
#else
  int result;
  do {
    result = close(fd);
  } while (result < 0 && errno == EINTR);
  return result;
#endif
}
58 
59 }  // namespace
60 
61 // ===================================================================
62 
FileInputStream(int file_descriptor,int block_size)63 FileInputStream::FileInputStream(int file_descriptor, int block_size)
64     : copying_input_(file_descriptor), impl_(&copying_input_, block_size) {}
65 
Close()66 bool FileInputStream::Close() { return copying_input_.Close(); }
67 
Next(const void ** data,int * size)68 bool FileInputStream::Next(const void** data, int* size) {
69   return impl_.Next(data, size);
70 }
71 
BackUp(int count)72 void FileInputStream::BackUp(int count) { impl_.BackUp(count); }
73 
Skip(int count)74 bool FileInputStream::Skip(int count) { return impl_.Skip(count); }
75 
ByteCount() const76 int64_t FileInputStream::ByteCount() const { return impl_.ByteCount(); }
77 
CopyingFileInputStream(int file_descriptor)78 FileInputStream::CopyingFileInputStream::CopyingFileInputStream(
79     int file_descriptor)
80     : file_(file_descriptor),
81       close_on_delete_(false),
82       is_closed_(false),
83       errno_(0),
84       previous_seek_failed_(false) {
85 #ifndef _WIN32
86   int flags = fcntl(file_, F_GETFL);
87   flags &= ~O_NONBLOCK;
88   fcntl(file_, F_SETFL, flags);
89 #endif
90 }
91 
~CopyingFileInputStream()92 FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() {
93   if (close_on_delete_) {
94     if (!Close()) {
95       ABSL_LOG(ERROR) << "close() failed: " << strerror(errno_);
96     }
97   }
98 }
99 
Close()100 bool FileInputStream::CopyingFileInputStream::Close() {
101   ABSL_CHECK(!is_closed_);
102 
103   is_closed_ = true;
104   if (close_no_eintr(file_) != 0) {
105     // The docs on close() do not specify whether a file descriptor is still
106     // open after close() fails with EIO.  However, the glibc source code
107     // seems to indicate that it is not.
108     errno_ = errno;
109     return false;
110   }
111 
112   return true;
113 }
114 
Read(void * buffer,int size)115 int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) {
116   ABSL_CHECK(!is_closed_);
117 
118   int result;
119   do {
120     result = read(file_, buffer, size);
121   } while (result < 0 && errno == EINTR);
122 
123   if (result < 0) {
124     // Read error (not EOF).
125     errno_ = errno;
126   }
127 
128   return result;
129 }
130 
Skip(int count)131 int FileInputStream::CopyingFileInputStream::Skip(int count) {
132   ABSL_CHECK(!is_closed_);
133 
134   if (!previous_seek_failed_ && lseek(file_, count, SEEK_CUR) != (off_t)-1) {
135     // Seek succeeded.
136     return count;
137   } else {
138     // Failed to seek.
139 
140     // Note to self:  Don't seek again.  This file descriptor doesn't
141     // support it.
142     previous_seek_failed_ = true;
143 
144     // Use the default implementation.
145     return CopyingInputStream::Skip(count);
146   }
147 }
148 
149 // ===================================================================
150 
FileOutputStream(int file_descriptor,int block_size)151 FileOutputStream::FileOutputStream(int file_descriptor, int block_size)
152     : CopyingOutputStreamAdaptor(&copying_output_, block_size),
153       copying_output_(file_descriptor) {}
154 
Close()155 bool FileOutputStream::Close() {
156   bool flush_succeeded = Flush();
157   return copying_output_.Close() && flush_succeeded;
158 }
159 
CopyingFileOutputStream(int file_descriptor)160 FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream(
161     int file_descriptor)
162     : file_(file_descriptor),
163       close_on_delete_(false),
164       is_closed_(false),
165       errno_(0) {}
166 
~FileOutputStream()167 FileOutputStream::~FileOutputStream() { Flush(); }
168 
~CopyingFileOutputStream()169 FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() {
170   if (close_on_delete_) {
171     if (!Close()) {
172       ABSL_LOG(ERROR) << "close() failed: " << strerror(errno_);
173     }
174   }
175 }
176 
Close()177 bool FileOutputStream::CopyingFileOutputStream::Close() {
178   ABSL_CHECK(!is_closed_);
179 
180   is_closed_ = true;
181   if (close_no_eintr(file_) != 0) {
182     // The docs on close() do not specify whether a file descriptor is still
183     // open after close() fails with EIO.  However, the glibc source code
184     // seems to indicate that it is not.
185     errno_ = errno;
186     return false;
187   }
188 
189   return true;
190 }
191 
Write(const void * buffer,int size)192 bool FileOutputStream::CopyingFileOutputStream::Write(const void* buffer,
193                                                       int size) {
194   ABSL_CHECK(!is_closed_);
195   int total_written = 0;
196 
197   const uint8_t* buffer_base = reinterpret_cast<const uint8_t*>(buffer);
198 
199   while (total_written < size) {
200     int bytes;
201     do {
202       bytes = write(file_, buffer_base + total_written, size - total_written);
203     } while (bytes < 0 && errno == EINTR);
204 
205     if (bytes <= 0) {
206       // Write error.
207 
208       // FIXME(kenton):  According to the man page, if write() returns zero,
209       //   there was no error; write() simply did not write anything.  It's
210       //   unclear under what circumstances this might happen, but presumably
211       //   errno won't be set in this case.  I am confused as to how such an
212       //   event should be handled.  For now I'm treating it as an error, since
213       //   retrying seems like it could lead to an infinite loop.  I suspect
214       //   this never actually happens anyway.
215 
216       if (bytes < 0) {
217         errno_ = errno;
218       }
219       return false;
220     }
221     total_written += bytes;
222   }
223 
224   return true;
225 }
226 
227 // ===================================================================
228 
IstreamInputStream(std::istream * input,int block_size)229 IstreamInputStream::IstreamInputStream(std::istream* input, int block_size)
230     : copying_input_(input), impl_(&copying_input_, block_size) {}
231 
Next(const void ** data,int * size)232 bool IstreamInputStream::Next(const void** data, int* size) {
233   return impl_.Next(data, size);
234 }
235 
BackUp(int count)236 void IstreamInputStream::BackUp(int count) { impl_.BackUp(count); }
237 
Skip(int count)238 bool IstreamInputStream::Skip(int count) { return impl_.Skip(count); }
239 
ByteCount() const240 int64_t IstreamInputStream::ByteCount() const { return impl_.ByteCount(); }
241 
CopyingIstreamInputStream(std::istream * input)242 IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream(
243     std::istream* input)
244     : input_(input) {}
245 
~CopyingIstreamInputStream()246 IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {}
247 
Read(void * buffer,int size)248 int IstreamInputStream::CopyingIstreamInputStream::Read(void* buffer,
249                                                         int size) {
250   input_->read(reinterpret_cast<char*>(buffer), size);
251   int result = input_->gcount();
252   if (result == 0 && input_->fail() && !input_->eof()) {
253     return -1;
254   }
255   return result;
256 }
257 
258 // ===================================================================
259 
OstreamOutputStream(std::ostream * output,int block_size)260 OstreamOutputStream::OstreamOutputStream(std::ostream* output, int block_size)
261     : copying_output_(output), impl_(&copying_output_, block_size) {}
262 
~OstreamOutputStream()263 OstreamOutputStream::~OstreamOutputStream() { impl_.Flush(); }
264 
Next(void ** data,int * size)265 bool OstreamOutputStream::Next(void** data, int* size) {
266   return impl_.Next(data, size);
267 }
268 
BackUp(int count)269 void OstreamOutputStream::BackUp(int count) { impl_.BackUp(count); }
270 
ByteCount() const271 int64_t OstreamOutputStream::ByteCount() const { return impl_.ByteCount(); }
272 
CopyingOstreamOutputStream(std::ostream * output)273 OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream(
274     std::ostream* output)
275     : output_(output) {}
276 
~CopyingOstreamOutputStream()277 OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() {
278 }
279 
Write(const void * buffer,int size)280 bool OstreamOutputStream::CopyingOstreamOutputStream::Write(const void* buffer,
281                                                             int size) {
282   output_->write(reinterpret_cast<const char*>(buffer), size);
283   return output_->good();
284 }
285 
286 // ===================================================================
287 
ConcatenatingInputStream(ZeroCopyInputStream * const streams[],int count)288 ConcatenatingInputStream::ConcatenatingInputStream(
289     ZeroCopyInputStream* const streams[], int count)
290     : streams_(streams), stream_count_(count), bytes_retired_(0) {
291 }
292 
Next(const void ** data,int * size)293 bool ConcatenatingInputStream::Next(const void** data, int* size) {
294   while (stream_count_ > 0) {
295     if (streams_[0]->Next(data, size)) return true;
296 
297     // That stream is done.  Advance to the next one.
298     bytes_retired_ += streams_[0]->ByteCount();
299     ++streams_;
300     --stream_count_;
301   }
302 
303   // No more streams.
304   return false;
305 }
306 
BackUp(int count)307 void ConcatenatingInputStream::BackUp(int count) {
308   if (stream_count_ > 0) {
309     streams_[0]->BackUp(count);
310   } else {
311     ABSL_DLOG(FATAL) << "Can't BackUp() after failed Next().";
312   }
313 }
314 
Skip(int count)315 bool ConcatenatingInputStream::Skip(int count) {
316   while (stream_count_ > 0) {
317     // Assume that ByteCount() can be used to find out how much we actually
318     // skipped when Skip() fails.
319     int64_t target_byte_count = streams_[0]->ByteCount() + count;
320     if (streams_[0]->Skip(count)) return true;
321 
322     // Hit the end of the stream.  Figure out how many more bytes we still have
323     // to skip.
324     int64_t final_byte_count = streams_[0]->ByteCount();
325     ABSL_DCHECK_LT(final_byte_count, target_byte_count);
326     count = target_byte_count - final_byte_count;
327 
328     // That stream is done.  Advance to the next one.
329     bytes_retired_ += final_byte_count;
330     ++streams_;
331     --stream_count_;
332   }
333 
334   return false;
335 }
336 
ByteCount() const337 int64_t ConcatenatingInputStream::ByteCount() const {
338   if (stream_count_ == 0) {
339     return bytes_retired_;
340   } else {
341     return bytes_retired_ + streams_[0]->ByteCount();
342   }
343 }
344 
345 
346 // ===================================================================
347 
348 }  // namespace io
349 }  // namespace protobuf
350 }  // namespace google
351