1 // Protocol Buffers - Google's data interchange format
2 // Copyright 2008 Google Inc. All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style
5 // license that can be found in the LICENSE file or at
6 // https://developers.google.com/open-source/licenses/bsd
7
8 // Author: kenton@google.com (Kenton Varda)
9 // Based on original Protocol Buffers design by
10 // Sanjay Ghemawat, Jeff Dean, and others.
11
12 #ifndef _MSC_VER
13 #include <fcntl.h>
14 #include <sys/stat.h>
15 #include <sys/types.h>
16 #include <unistd.h>
17 #endif
18 #include <errno.h>
19
20 #include <algorithm>
21 #include <istream>
22 #include <ostream>
23
24 #include "google/protobuf/stubs/common.h"
25 #include "absl/log/absl_check.h"
26 #include "absl/log/absl_log.h"
27 #include "google/protobuf/io/io_win32.h"
28 #include "google/protobuf/io/zero_copy_stream_impl.h"
29
30
31 namespace google {
32 namespace protobuf {
33 namespace io {
34
35 #ifdef _WIN32
36 // Win32 lseek is broken: If invoked on a non-seekable file descriptor, its
37 // return value is undefined. We re-define it to always produce an error.
38 #define lseek(fd, offset, origin) ((off_t)-1)
39 // DO NOT include <io.h>, instead create functions in io_win32.{h,cc} and import
40 // them like we do below.
41 using google::protobuf::io::win32::access;
42 using google::protobuf::io::win32::close;
43 using google::protobuf::io::win32::open;
44 using google::protobuf::io::win32::read;
45 using google::protobuf::io::win32::write;
46 #endif
47
48 namespace {
49
// Closes `fd` without ever surfacing EINTR to the caller.
//
// Returns 0 on success, or -1 with errno set on failure.
//
// On Linux the descriptor is released even when close() reports EINTR, so
// retrying is wrong: by the time of the retry another thread may have been
// handed the same descriptor number, and the retry would close *its* file.
// There we call close() exactly once and treat EINTR as "closed". On other
// platforms the historical retry loop is kept, because some systems leave the
// descriptor open after an interrupted close().
int close_no_eintr(int fd) {
#if defined(__linux__)
  const int result = close(fd);
  if (result < 0 && errno == EINTR) {
    // The descriptor is already gone; there is nothing left to retry.
    return 0;
  }
  return result;
#else
  int result;
  do {
    result = close(fd);
  } while (result < 0 && errno == EINTR);
  return result;
#endif
}
58
59 } // namespace
60
61 // ===================================================================
62
FileInputStream(int file_descriptor,int block_size)63 FileInputStream::FileInputStream(int file_descriptor, int block_size)
64 : copying_input_(file_descriptor), impl_(©ing_input_, block_size) {}
65
Close()66 bool FileInputStream::Close() { return copying_input_.Close(); }
67
Next(const void ** data,int * size)68 bool FileInputStream::Next(const void** data, int* size) {
69 return impl_.Next(data, size);
70 }
71
BackUp(int count)72 void FileInputStream::BackUp(int count) { impl_.BackUp(count); }
73
Skip(int count)74 bool FileInputStream::Skip(int count) { return impl_.Skip(count); }
75
ByteCount() const76 int64_t FileInputStream::ByteCount() const { return impl_.ByteCount(); }
77
CopyingFileInputStream(int file_descriptor)78 FileInputStream::CopyingFileInputStream::CopyingFileInputStream(
79 int file_descriptor)
80 : file_(file_descriptor),
81 close_on_delete_(false),
82 is_closed_(false),
83 errno_(0),
84 previous_seek_failed_(false) {
85 #ifndef _WIN32
86 int flags = fcntl(file_, F_GETFL);
87 flags &= ~O_NONBLOCK;
88 fcntl(file_, F_SETFL, flags);
89 #endif
90 }
91
~CopyingFileInputStream()92 FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() {
93 if (close_on_delete_) {
94 if (!Close()) {
95 ABSL_LOG(ERROR) << "close() failed: " << strerror(errno_);
96 }
97 }
98 }
99
Close()100 bool FileInputStream::CopyingFileInputStream::Close() {
101 ABSL_CHECK(!is_closed_);
102
103 is_closed_ = true;
104 if (close_no_eintr(file_) != 0) {
105 // The docs on close() do not specify whether a file descriptor is still
106 // open after close() fails with EIO. However, the glibc source code
107 // seems to indicate that it is not.
108 errno_ = errno;
109 return false;
110 }
111
112 return true;
113 }
114
Read(void * buffer,int size)115 int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) {
116 ABSL_CHECK(!is_closed_);
117
118 int result;
119 do {
120 result = read(file_, buffer, size);
121 } while (result < 0 && errno == EINTR);
122
123 if (result < 0) {
124 // Read error (not EOF).
125 errno_ = errno;
126 }
127
128 return result;
129 }
130
Skip(int count)131 int FileInputStream::CopyingFileInputStream::Skip(int count) {
132 ABSL_CHECK(!is_closed_);
133
134 if (!previous_seek_failed_ && lseek(file_, count, SEEK_CUR) != (off_t)-1) {
135 // Seek succeeded.
136 return count;
137 } else {
138 // Failed to seek.
139
140 // Note to self: Don't seek again. This file descriptor doesn't
141 // support it.
142 previous_seek_failed_ = true;
143
144 // Use the default implementation.
145 return CopyingInputStream::Skip(count);
146 }
147 }
148
149 // ===================================================================
150
FileOutputStream(int file_descriptor,int block_size)151 FileOutputStream::FileOutputStream(int file_descriptor, int block_size)
152 : CopyingOutputStreamAdaptor(©ing_output_, block_size),
153 copying_output_(file_descriptor) {}
154
Close()155 bool FileOutputStream::Close() {
156 bool flush_succeeded = Flush();
157 return copying_output_.Close() && flush_succeeded;
158 }
159
CopyingFileOutputStream(int file_descriptor)160 FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream(
161 int file_descriptor)
162 : file_(file_descriptor),
163 close_on_delete_(false),
164 is_closed_(false),
165 errno_(0) {}
166
~FileOutputStream()167 FileOutputStream::~FileOutputStream() { Flush(); }
168
~CopyingFileOutputStream()169 FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() {
170 if (close_on_delete_) {
171 if (!Close()) {
172 ABSL_LOG(ERROR) << "close() failed: " << strerror(errno_);
173 }
174 }
175 }
176
Close()177 bool FileOutputStream::CopyingFileOutputStream::Close() {
178 ABSL_CHECK(!is_closed_);
179
180 is_closed_ = true;
181 if (close_no_eintr(file_) != 0) {
182 // The docs on close() do not specify whether a file descriptor is still
183 // open after close() fails with EIO. However, the glibc source code
184 // seems to indicate that it is not.
185 errno_ = errno;
186 return false;
187 }
188
189 return true;
190 }
191
Write(const void * buffer,int size)192 bool FileOutputStream::CopyingFileOutputStream::Write(const void* buffer,
193 int size) {
194 ABSL_CHECK(!is_closed_);
195 int total_written = 0;
196
197 const uint8_t* buffer_base = reinterpret_cast<const uint8_t*>(buffer);
198
199 while (total_written < size) {
200 int bytes;
201 do {
202 bytes = write(file_, buffer_base + total_written, size - total_written);
203 } while (bytes < 0 && errno == EINTR);
204
205 if (bytes <= 0) {
206 // Write error.
207
208 // FIXME(kenton): According to the man page, if write() returns zero,
209 // there was no error; write() simply did not write anything. It's
210 // unclear under what circumstances this might happen, but presumably
211 // errno won't be set in this case. I am confused as to how such an
212 // event should be handled. For now I'm treating it as an error, since
213 // retrying seems like it could lead to an infinite loop. I suspect
214 // this never actually happens anyway.
215
216 if (bytes < 0) {
217 errno_ = errno;
218 }
219 return false;
220 }
221 total_written += bytes;
222 }
223
224 return true;
225 }
226
227 // ===================================================================
228
IstreamInputStream(std::istream * input,int block_size)229 IstreamInputStream::IstreamInputStream(std::istream* input, int block_size)
230 : copying_input_(input), impl_(©ing_input_, block_size) {}
231
Next(const void ** data,int * size)232 bool IstreamInputStream::Next(const void** data, int* size) {
233 return impl_.Next(data, size);
234 }
235
BackUp(int count)236 void IstreamInputStream::BackUp(int count) { impl_.BackUp(count); }
237
Skip(int count)238 bool IstreamInputStream::Skip(int count) { return impl_.Skip(count); }
239
ByteCount() const240 int64_t IstreamInputStream::ByteCount() const { return impl_.ByteCount(); }
241
CopyingIstreamInputStream(std::istream * input)242 IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream(
243 std::istream* input)
244 : input_(input) {}
245
~CopyingIstreamInputStream()246 IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {}
247
Read(void * buffer,int size)248 int IstreamInputStream::CopyingIstreamInputStream::Read(void* buffer,
249 int size) {
250 input_->read(reinterpret_cast<char*>(buffer), size);
251 int result = input_->gcount();
252 if (result == 0 && input_->fail() && !input_->eof()) {
253 return -1;
254 }
255 return result;
256 }
257
258 // ===================================================================
259
OstreamOutputStream(std::ostream * output,int block_size)260 OstreamOutputStream::OstreamOutputStream(std::ostream* output, int block_size)
261 : copying_output_(output), impl_(©ing_output_, block_size) {}
262
~OstreamOutputStream()263 OstreamOutputStream::~OstreamOutputStream() { impl_.Flush(); }
264
Next(void ** data,int * size)265 bool OstreamOutputStream::Next(void** data, int* size) {
266 return impl_.Next(data, size);
267 }
268
BackUp(int count)269 void OstreamOutputStream::BackUp(int count) { impl_.BackUp(count); }
270
ByteCount() const271 int64_t OstreamOutputStream::ByteCount() const { return impl_.ByteCount(); }
272
CopyingOstreamOutputStream(std::ostream * output)273 OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream(
274 std::ostream* output)
275 : output_(output) {}
276
~CopyingOstreamOutputStream()277 OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() {
278 }
279
Write(const void * buffer,int size)280 bool OstreamOutputStream::CopyingOstreamOutputStream::Write(const void* buffer,
281 int size) {
282 output_->write(reinterpret_cast<const char*>(buffer), size);
283 return output_->good();
284 }
285
286 // ===================================================================
287
ConcatenatingInputStream(ZeroCopyInputStream * const streams[],int count)288 ConcatenatingInputStream::ConcatenatingInputStream(
289 ZeroCopyInputStream* const streams[], int count)
290 : streams_(streams), stream_count_(count), bytes_retired_(0) {
291 }
292
Next(const void ** data,int * size)293 bool ConcatenatingInputStream::Next(const void** data, int* size) {
294 while (stream_count_ > 0) {
295 if (streams_[0]->Next(data, size)) return true;
296
297 // That stream is done. Advance to the next one.
298 bytes_retired_ += streams_[0]->ByteCount();
299 ++streams_;
300 --stream_count_;
301 }
302
303 // No more streams.
304 return false;
305 }
306
BackUp(int count)307 void ConcatenatingInputStream::BackUp(int count) {
308 if (stream_count_ > 0) {
309 streams_[0]->BackUp(count);
310 } else {
311 ABSL_DLOG(FATAL) << "Can't BackUp() after failed Next().";
312 }
313 }
314
Skip(int count)315 bool ConcatenatingInputStream::Skip(int count) {
316 while (stream_count_ > 0) {
317 // Assume that ByteCount() can be used to find out how much we actually
318 // skipped when Skip() fails.
319 int64_t target_byte_count = streams_[0]->ByteCount() + count;
320 if (streams_[0]->Skip(count)) return true;
321
322 // Hit the end of the stream. Figure out how many more bytes we still have
323 // to skip.
324 int64_t final_byte_count = streams_[0]->ByteCount();
325 ABSL_DCHECK_LT(final_byte_count, target_byte_count);
326 count = target_byte_count - final_byte_count;
327
328 // That stream is done. Advance to the next one.
329 bytes_retired_ += final_byte_count;
330 ++streams_;
331 --stream_count_;
332 }
333
334 return false;
335 }
336
ByteCount() const337 int64_t ConcatenatingInputStream::ByteCount() const {
338 if (stream_count_ == 0) {
339 return bytes_retired_;
340 } else {
341 return bytes_retired_ + streams_[0]->ByteCount();
342 }
343 }
344
345
346 // ===================================================================
347
348 } // namespace io
349 } // namespace protobuf
350 } // namespace google
351