1 // Protocol Buffers - Google's data interchange format
2 // Copyright 2008 Google Inc. All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style
5 // license that can be found in the LICENSE file or at
6 // https://developers.google.com/open-source/licenses/bsd
7
8 // Author: kenton@google.com (Kenton Varda)
9 // Based on original Protocol Buffers design by
10 // Sanjay Ghemawat, Jeff Dean, and others.
11 //
12 // This file contains common implementations of the interfaces defined in
13 // zero_copy_stream.h which are included in the "lite" protobuf library.
14 // These implementations cover I/O on raw arrays and strings, as well as
15 // adaptors which make it easy to implement streams based on traditional
16 // streams. Of course, many users will probably want to write their own
17 // implementations of these interfaces specific to the particular I/O
18 // abstractions they prefer to use, but these should cover the most common
19 // cases.
20
21 #ifndef GOOGLE_PROTOBUF_IO_ZERO_COPY_STREAM_IMPL_LITE_H__
22 #define GOOGLE_PROTOBUF_IO_ZERO_COPY_STREAM_IMPL_LITE_H__
23
24 #include <iosfwd>
25 #include <memory>
26 #include <string>
27 #include <utility>
28
29 #include "google/protobuf/stubs/callback.h"
30 #include "google/protobuf/stubs/common.h"
31 #include "absl/base/attributes.h"
32 #include "absl/base/macros.h"
33 #include "absl/strings/cord.h"
34 #include "absl/strings/cord_buffer.h"
35 #include "google/protobuf/io/zero_copy_stream.h"
36 #include "google/protobuf/port.h"
37
38
39 // Must be included last.
40 #include "google/protobuf/port_def.inc"
41
42 namespace google {
43 namespace protobuf {
44 namespace io {
45
46 // ===================================================================
47
48 // A ZeroCopyInputStream backed by an in-memory array of bytes.
49 class PROTOBUF_EXPORT ArrayInputStream final : public ZeroCopyInputStream {
50 public:
51 // Create an InputStream that returns the bytes pointed to by "data".
52 // "data" remains the property of the caller but must remain valid until
53 // the stream is destroyed. If a block_size is given, calls to Next()
54 // will return data blocks no larger than the given size. Otherwise, the
55 // first call to Next() returns the entire array. block_size is mainly
56 // useful for testing; in production you would probably never want to set
57 // it.
58 ArrayInputStream(const void* data, int size, int block_size = -1);
59 ~ArrayInputStream() override = default;
60
61 // `ArrayInputStream` is neither copiable nor assignable
62 ArrayInputStream(const ArrayInputStream&) = delete;
63 ArrayInputStream& operator=(const ArrayInputStream&) = delete;
64
65 // implements ZeroCopyInputStream ----------------------------------
66 bool Next(const void** data, int* size) override;
67 void BackUp(int count) override;
68 bool Skip(int count) override;
69 int64_t ByteCount() const override;
70
71
72 private:
73 const uint8_t* const data_; // The byte array.
74 const int size_; // Total size of the array.
75 const int block_size_; // How many bytes to return at a time.
76
77 int position_;
78 int last_returned_size_; // How many bytes we returned last time Next()
79 // was called (used for error checking only).
80 };
81
82 // ===================================================================
83
84 // A ZeroCopyOutputStream backed by an in-memory array of bytes.
85 class PROTOBUF_EXPORT ArrayOutputStream final : public ZeroCopyOutputStream {
86 public:
87 // Create an OutputStream that writes to the bytes pointed to by "data".
88 // "data" remains the property of the caller but must remain valid until
89 // the stream is destroyed. If a block_size is given, calls to Next()
90 // will return data blocks no larger than the given size. Otherwise, the
91 // first call to Next() returns the entire array. block_size is mainly
92 // useful for testing; in production you would probably never want to set
93 // it.
94 ArrayOutputStream(void* data, int size, int block_size = -1);
95 ~ArrayOutputStream() override = default;
96
97 // `ArrayOutputStream` is neither copiable nor assignable
98 ArrayOutputStream(const ArrayOutputStream&) = delete;
99 ArrayOutputStream& operator=(const ArrayOutputStream&) = delete;
100
101 // implements ZeroCopyOutputStream ---------------------------------
102 bool Next(void** data, int* size) override;
103 void BackUp(int count) override;
104 int64_t ByteCount() const override;
105
106 private:
107 uint8_t* const data_; // The byte array.
108 const int size_; // Total size of the array.
109 const int block_size_; // How many bytes to return at a time.
110
111 int position_;
112 int last_returned_size_; // How many bytes we returned last time Next()
113 // was called (used for error checking only).
114 };
115
116 // ===================================================================
117
118 // A ZeroCopyOutputStream which appends bytes to a string.
119 class PROTOBUF_EXPORT StringOutputStream final : public ZeroCopyOutputStream {
120 public:
121 // Create a StringOutputStream which appends bytes to the given string.
122 // The string remains property of the caller, but it is mutated in arbitrary
123 // ways and MUST NOT be accessed in any way until you're done with the
124 // stream. Either be sure there's no further usage, or (safest) destroy the
125 // stream before using the contents.
126 //
127 // Hint: If you call target->reserve(n) before creating the stream,
128 // the first call to Next() will return at least n bytes of buffer
129 // space.
130 explicit StringOutputStream(std::string* target);
131 ~StringOutputStream() override = default;
132
133 // `StringOutputStream` is neither copiable nor assignable
134 StringOutputStream(const StringOutputStream&) = delete;
135 StringOutputStream& operator=(const StringOutputStream&) = delete;
136
137 // implements ZeroCopyOutputStream ---------------------------------
138 bool Next(void** data, int* size) override;
139 void BackUp(int count) override;
140 int64_t ByteCount() const override;
141
142 private:
143 static constexpr size_t kMinimumSize = 16;
144
145 std::string* target_;
146 };
147
148 // Note: There is no StringInputStream. Instead, just create an
149 // ArrayInputStream as follows:
150 // ArrayInputStream input(str.data(), str.size());
151
152 // ===================================================================
153
154 // A generic traditional input stream interface.
155 //
156 // Lots of traditional input streams (e.g. file descriptors, C stdio
157 // streams, and C++ iostreams) expose an interface where every read
158 // involves copying bytes into a buffer. If you want to take such an
159 // interface and make a ZeroCopyInputStream based on it, simply implement
160 // CopyingInputStream and then use CopyingInputStreamAdaptor.
161 //
162 // CopyingInputStream implementations should avoid buffering if possible.
163 // CopyingInputStreamAdaptor does its own buffering and will read data
164 // in large blocks.
165 class PROTOBUF_EXPORT CopyingInputStream {
166 public:
~CopyingInputStream()167 virtual ~CopyingInputStream() {}
168
169 // Reads up to "size" bytes into the given buffer. Returns the number of
170 // bytes read. Read() waits until at least one byte is available, or
171 // returns zero if no bytes will ever become available (EOF), or -1 if a
172 // permanent read error occurred.
173 virtual int Read(void* buffer, int size) = 0;
174
175 // Skips the next "count" bytes of input. Returns the number of bytes
176 // actually skipped. This will always be exactly equal to "count" unless
177 // EOF was reached or a permanent read error occurred.
178 //
179 // The default implementation just repeatedly calls Read() into a scratch
180 // buffer.
181 virtual int Skip(int count);
182 };
183
184 // A ZeroCopyInputStream which reads from a CopyingInputStream. This is
185 // useful for implementing ZeroCopyInputStreams that read from traditional
186 // streams. Note that this class is not really zero-copy.
187 //
188 // If you want to read from file descriptors or C++ istreams, this is
189 // already implemented for you: use FileInputStream or IstreamInputStream
190 // respectively.
191 class PROTOBUF_EXPORT CopyingInputStreamAdaptor : public ZeroCopyInputStream {
192 public:
193 // Creates a stream that reads from the given CopyingInputStream.
194 // If a block_size is given, it specifies the number of bytes that
195 // should be read and returned with each call to Next(). Otherwise,
196 // a reasonable default is used. The caller retains ownership of
197 // copying_stream unless SetOwnsCopyingStream(true) is called.
198 explicit CopyingInputStreamAdaptor(CopyingInputStream* copying_stream,
199 int block_size = -1);
200 ~CopyingInputStreamAdaptor() override;
201
202 // `CopyingInputStreamAdaptor` is neither copiable nor assignable
203 CopyingInputStreamAdaptor(const CopyingInputStreamAdaptor&) = delete;
204 CopyingInputStreamAdaptor& operator=(const CopyingInputStreamAdaptor&) = delete;
205
206 // Call SetOwnsCopyingStream(true) to tell the CopyingInputStreamAdaptor to
207 // delete the underlying CopyingInputStream when it is destroyed.
SetOwnsCopyingStream(bool value)208 void SetOwnsCopyingStream(bool value) { owns_copying_stream_ = value; }
209
210 // implements ZeroCopyInputStream ----------------------------------
211 bool Next(const void** data, int* size) override;
212 void BackUp(int count) override;
213 bool Skip(int count) override;
214 int64_t ByteCount() const override;
215
216 private:
217 // Insures that buffer_ is not NULL.
218 void AllocateBufferIfNeeded();
219 // Frees the buffer and resets buffer_used_.
220 void FreeBuffer();
221
222 // The underlying copying stream.
223 CopyingInputStream* copying_stream_;
224 bool owns_copying_stream_;
225
226 // True if we have seen a permanent error from the underlying stream.
227 bool failed_;
228
229 // The current position of copying_stream_, relative to the point where
230 // we started reading.
231 int64_t position_;
232
233 // Data is read into this buffer. It may be NULL if no buffer is currently
234 // in use. Otherwise, it points to an array of size buffer_size_.
235 std::unique_ptr<uint8_t[]> buffer_;
236 const int buffer_size_;
237
238 // Number of valid bytes currently in the buffer (i.e. the size last
239 // returned by Next()). 0 <= buffer_used_ <= buffer_size_.
240 int buffer_used_;
241
242 // Number of bytes in the buffer which were backed up over by a call to
243 // BackUp(). These need to be returned again.
244 // 0 <= backup_bytes_ <= buffer_used_
245 int backup_bytes_;
246 };
247
248 // ===================================================================
249
250 // A generic traditional output stream interface.
251 //
252 // Lots of traditional output streams (e.g. file descriptors, C stdio
253 // streams, and C++ iostreams) expose an interface where every write
254 // involves copying bytes from a buffer. If you want to take such an
255 // interface and make a ZeroCopyOutputStream based on it, simply implement
256 // CopyingOutputStream and then use CopyingOutputStreamAdaptor.
257 //
258 // CopyingOutputStream implementations should avoid buffering if possible.
259 // CopyingOutputStreamAdaptor does its own buffering and will write data
260 // in large blocks.
261 class PROTOBUF_EXPORT CopyingOutputStream {
262 public:
~CopyingOutputStream()263 virtual ~CopyingOutputStream() {}
264
265 // Writes "size" bytes from the given buffer to the output. Returns true
266 // if successful, false on a write error.
267 virtual bool Write(const void* buffer, int size) = 0;
268 };
269
270 // A ZeroCopyOutputStream which writes to a CopyingOutputStream. This is
271 // useful for implementing ZeroCopyOutputStreams that write to traditional
272 // streams. Note that this class is not really zero-copy.
273 //
274 // If you want to write to file descriptors or C++ ostreams, this is
275 // already implemented for you: use FileOutputStream or OstreamOutputStream
276 // respectively.
277 class PROTOBUF_EXPORT CopyingOutputStreamAdaptor : public ZeroCopyOutputStream {
278 public:
279 // Creates a stream that writes to the given Unix file descriptor.
280 // If a block_size is given, it specifies the size of the buffers
281 // that should be returned by Next(). Otherwise, a reasonable default
282 // is used.
283 explicit CopyingOutputStreamAdaptor(CopyingOutputStream* copying_stream,
284 int block_size = -1);
285 ~CopyingOutputStreamAdaptor() override;
286
287 // `CopyingOutputStreamAdaptor` is neither copiable nor assignable
288 CopyingOutputStreamAdaptor(const CopyingOutputStreamAdaptor&) = delete;
289 CopyingOutputStreamAdaptor& operator=(const CopyingOutputStreamAdaptor&) = delete;
290
291 // Writes all pending data to the underlying stream. Returns false if a
292 // write error occurred on the underlying stream. (The underlying
293 // stream itself is not necessarily flushed.)
294 bool Flush();
295
296 // Call SetOwnsCopyingStream(true) to tell the CopyingOutputStreamAdaptor to
297 // delete the underlying CopyingOutputStream when it is destroyed.
SetOwnsCopyingStream(bool value)298 void SetOwnsCopyingStream(bool value) { owns_copying_stream_ = value; }
299
300 // implements ZeroCopyOutputStream ---------------------------------
301 bool Next(void** data, int* size) override;
302 void BackUp(int count) override;
303 int64_t ByteCount() const override;
304 bool WriteAliasedRaw(const void* data, int size) override;
AllowsAliasing()305 bool AllowsAliasing() const override { return true; }
306 bool WriteCord(const absl::Cord& cord) override;
307
308 private:
309 // Write the current buffer, if it is present.
310 bool WriteBuffer();
311 // Insures that buffer_ is not NULL.
312 void AllocateBufferIfNeeded();
313 // Frees the buffer.
314 void FreeBuffer();
315
316 // The underlying copying stream.
317 CopyingOutputStream* copying_stream_;
318 bool owns_copying_stream_;
319
320 // True if we have seen a permanent error from the underlying stream.
321 bool failed_;
322
323 // The current position of copying_stream_, relative to the point where
324 // we started writing.
325 int64_t position_;
326
327 // Data is written from this buffer. It may be NULL if no buffer is
328 // currently in use. Otherwise, it points to an array of size buffer_size_.
329 std::unique_ptr<uint8_t[]> buffer_;
330 const int buffer_size_;
331
332 // Number of valid bytes currently in the buffer (i.e. the size last
333 // returned by Next()). When BackUp() is called, we just reduce this.
334 // 0 <= buffer_used_ <= buffer_size_.
335 int buffer_used_;
336 };
337
338 // ===================================================================
339
340 // A ZeroCopyInputStream which wraps some other stream and limits it to
341 // a particular byte count.
342 class PROTOBUF_EXPORT LimitingInputStream final : public ZeroCopyInputStream {
343 public:
344 LimitingInputStream(ZeroCopyInputStream* input, int64_t limit);
345 ~LimitingInputStream() override;
346
347 // `LimitingInputStream` is neither copiable nor assignable
348 LimitingInputStream(const LimitingInputStream&) = delete;
349 LimitingInputStream& operator=(const LimitingInputStream&) = delete;
350
351 // implements ZeroCopyInputStream ----------------------------------
352 bool Next(const void** data, int* size) override;
353 void BackUp(int count) override;
354 bool Skip(int count) override;
355 int64_t ByteCount() const override;
356 bool ReadCord(absl::Cord* cord, int count) override;
357
358
359 private:
360 ZeroCopyInputStream* input_;
361 int64_t limit_; // Decreases as we go, becomes negative if we overshoot.
362 int64_t prior_bytes_read_; // Bytes read on underlying stream at construction
363 };
364
365 // ===================================================================
366
367 // A ZeroCopyInputStream backed by a Cord. This stream implements ReadCord()
368 // in a way that can share memory between the source and destination cords
369 // rather than copying.
370 class PROTOBUF_EXPORT CordInputStream final : public ZeroCopyInputStream {
371 public:
372 // Creates an InputStream that reads from the given Cord. `cord` must
373 // not be null and must outlive this CordInputStream instance. `cord` must
374 // not be modified while this instance is actively being used: any change
375 // to `cord` will lead to undefined behavior on any subsequent call into
376 // this instance.
377 explicit CordInputStream(
378 const absl::Cord* cord ABSL_ATTRIBUTE_LIFETIME_BOUND);
379
380
381 // `CordInputStream` is neither copiable nor assignable
382 CordInputStream(const CordInputStream&) = delete;
383 CordInputStream& operator=(const CordInputStream&) = delete;
384
385 // implements ZeroCopyInputStream ----------------------------------
386 bool Next(const void** data, int* size) override;
387 void BackUp(int count) override;
388 bool Skip(int count) override;
389 int64_t ByteCount() const override;
390 bool ReadCord(absl::Cord* cord, int count) override;
391
392
393 private:
394 // Moves `it_` to the next available chunk skipping `skip` extra bytes
395 // and updates the chunk data pointers.
396 bool NextChunk(size_t skip);
397
398 // Updates the current chunk data context `data_`, `size_` and `available_`.
399 // If `bytes_remaining_` is zero, sets `size_` and `available_` to zero.
400 // Returns true if more data is available, false otherwise.
401 bool LoadChunkData();
402
403 absl::Cord::CharIterator it_;
404 size_t length_;
405 size_t bytes_remaining_;
406 const char* data_;
407 size_t size_;
408 size_t available_;
409 };
410
411 // ===================================================================
412
413 // A ZeroCopyOutputStream that writes to a Cord. This stream implements
414 // WriteCord() in a way that can share memory between the source and
415 // destination cords rather than copying.
416 class PROTOBUF_EXPORT CordOutputStream final : public ZeroCopyOutputStream {
417 public:
418 // Creates an OutputStream streaming serialized data into a Cord. `size_hint`,
419 // if given, is the expected total size of the resulting Cord. This is a hint
420 // only, used for optimization. Callers can obtain the generated Cord value by
421 // invoking `Consume()`.
422 explicit CordOutputStream(size_t size_hint = 0);
423
424 // Creates an OutputStream with an initial Cord value. This constructor can be
425 // used by applications wanting to directly append serialization data to a
426 // given cord. In such cases, donating the existing value as in:
427 //
428 // CordOutputStream stream(std::move(cord));
429 // message.SerializeToZeroCopyStream(&stream);
430 // cord = std::move(stream.Consume());
431 //
432 // is more efficient then appending the serialized cord in application code:
433 //
434 // CordOutputStream stream;
435 // message.SerializeToZeroCopyStream(&stream);
436 // cord.Append(stream.Consume());
437 //
438 // The former allows `CordOutputStream` to utilize pre-existing privately
439 // owned Cord buffers from the donated cord where the latter does not, which
440 // may lead to more memory usage when serialuzing data into existing cords.
441 explicit CordOutputStream(absl::Cord cord, size_t size_hint = 0);
442
443 // Creates an OutputStream with an initial Cord value and initial buffer.
444 // This donates both the preexisting cord in `cord`, as well as any
445 // pre-existing data and additional capacity in `buffer`.
446 // This function is mainly intended to be used in internal serialization logic
447 // using eager buffer initialization in EpsCopyOutputStream.
448 // The donated buffer can be empty, partially empty or full: the outputstream
449 // will DTRT in all cases and preserve any pre-existing data.
450 explicit CordOutputStream(absl::Cord cord, absl::CordBuffer buffer,
451 size_t size_hint = 0);
452
453 // Creates an OutputStream with an initial buffer.
454 // This method is logically identical to, but more efficient than:
455 // `CordOutputStream(absl::Cord(), std::move(buffer), size_hint)`
456 explicit CordOutputStream(absl::CordBuffer buffer, size_t size_hint = 0);
457
458 // `CordOutputStream` is neither copiable nor assignable
459 CordOutputStream(const CordOutputStream&) = delete;
460 CordOutputStream& operator=(const CordOutputStream&) = delete;
461
462 // implements `ZeroCopyOutputStream` ---------------------------------
463 bool Next(void** data, int* size) final;
464 void BackUp(int count) final;
465 int64_t ByteCount() const final;
466 bool WriteCord(const absl::Cord& cord) final;
467
468 // Consumes the serialized data as a cord value. `Consume()` internally
469 // flushes any pending state 'as if' BackUp(0) was called. While a final call
470 // to BackUp() is generally required by the `ZeroCopyOutputStream` contract,
471 // applications using `CordOutputStream` directly can call `Consume()` without
472 // a preceding call to `BackUp()`.
473 //
474 // While it will rarely be useful in practice (and especially in the presence
475 // of size hints) an instance is safe to be used after a call to `Consume()`.
476 // The only logical change in state is that all serialized data is extracted,
477 // and any new serialization calls will serialize into new cord data.
478 absl::Cord Consume();
479
480 private:
481 // State of `buffer_` and 'cord_. As a default CordBuffer instance always has
482 // inlined capacity, we track state explicitly to avoid returning 'existing
483 // capacity' from the default or 'moved from' CordBuffer. 'kSteal' indicates
484 // we should (attempt to) steal the next buffer from the cord.
485 enum class State { kEmpty, kFull, kPartial, kSteal };
486
487 absl::Cord cord_;
488 size_t size_hint_;
489 State state_ = State::kEmpty;
490 absl::CordBuffer buffer_;
491 };
492
493
494 // ===================================================================
495
496 // Return a pointer to mutable characters underlying the given string. The
497 // return value is valid until the next time the string is resized. We
498 // trust the caller to treat the return value as an array of length s->size().
mutable_string_data(std::string * s)499 inline char* mutable_string_data(std::string* s) {
500 return &(*s)[0];
501 }
502
503 // as_string_data(s) is equivalent to
504 // ({ char* p = mutable_string_data(s); make_pair(p, p != NULL); })
505 // Sometimes it's faster: in some scenarios p cannot be NULL, and then the
506 // code can avoid that check.
as_string_data(std::string * s)507 inline std::pair<char*, bool> as_string_data(std::string* s) {
508 char* p = mutable_string_data(s);
509 return std::make_pair(p, true);
510 }
511
512 } // namespace io
513 } // namespace protobuf
514 } // namespace google
515
516 #include "google/protobuf/port_undef.inc"
517
518 #endif // GOOGLE_PROTOBUF_IO_ZERO_COPY_STREAM_IMPL_LITE_H__
519