• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Protocol Buffers - Google's data interchange format
2 // Copyright 2008 Google Inc.  All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style
5 // license that can be found in the LICENSE file or at
6 // https://developers.google.com/open-source/licenses/bsd
7 
8 // Author: kenton@google.com (Kenton Varda)
9 //  Based on original Protocol Buffers design by
10 //  Sanjay Ghemawat, Jeff Dean, and others.
11 //
12 // This file contains common implementations of the interfaces defined in
13 // zero_copy_stream.h which are included in the "lite" protobuf library.
14 // These implementations cover I/O on raw arrays and strings, as well as
15 // adaptors which make it easy to implement streams based on traditional
16 // streams.  Of course, many users will probably want to write their own
17 // implementations of these interfaces specific to the particular I/O
18 // abstractions they prefer to use, but these should cover the most common
19 // cases.
20 
21 #ifndef GOOGLE_PROTOBUF_IO_ZERO_COPY_STREAM_IMPL_LITE_H__
22 #define GOOGLE_PROTOBUF_IO_ZERO_COPY_STREAM_IMPL_LITE_H__
23 
24 #include <iosfwd>
25 #include <memory>
26 #include <string>
27 #include <utility>
28 
29 #include "google/protobuf/stubs/callback.h"
30 #include "google/protobuf/stubs/common.h"
31 #include "absl/base/attributes.h"
32 #include "absl/base/macros.h"
33 #include "absl/strings/cord.h"
34 #include "absl/strings/cord_buffer.h"
35 #include "google/protobuf/io/zero_copy_stream.h"
36 #include "google/protobuf/port.h"
37 
38 
39 // Must be included last.
40 #include "google/protobuf/port_def.inc"
41 
42 namespace google {
43 namespace protobuf {
44 namespace io {
45 
46 // ===================================================================
47 
48 // A ZeroCopyInputStream backed by an in-memory array of bytes.
49 class PROTOBUF_EXPORT ArrayInputStream final : public ZeroCopyInputStream {
50  public:
51   // Create an InputStream that returns the bytes pointed to by "data".
52   // "data" remains the property of the caller but must remain valid until
53   // the stream is destroyed.  If a block_size is given, calls to Next()
54   // will return data blocks no larger than the given size.  Otherwise, the
55   // first call to Next() returns the entire array.  block_size is mainly
56   // useful for testing; in production you would probably never want to set
57   // it.
58   ArrayInputStream(const void* data, int size, int block_size = -1);
59   ~ArrayInputStream() override = default;
60 
61   // `ArrayInputStream` is neither copiable nor assignable
62   ArrayInputStream(const ArrayInputStream&) = delete;
63   ArrayInputStream& operator=(const ArrayInputStream&) = delete;
64 
65   // implements ZeroCopyInputStream ----------------------------------
66   bool Next(const void** data, int* size) override;
67   void BackUp(int count) override;
68   bool Skip(int count) override;
69   int64_t ByteCount() const override;
70 
71 
72  private:
73   const uint8_t* const data_;  // The byte array.
74   const int size_;           // Total size of the array.
75   const int block_size_;     // How many bytes to return at a time.
76 
77   int position_;
78   int last_returned_size_;  // How many bytes we returned last time Next()
79                             // was called (used for error checking only).
80 };
81 
82 // ===================================================================
83 
84 // A ZeroCopyOutputStream backed by an in-memory array of bytes.
85 class PROTOBUF_EXPORT ArrayOutputStream final : public ZeroCopyOutputStream {
86  public:
87   // Create an OutputStream that writes to the bytes pointed to by "data".
88   // "data" remains the property of the caller but must remain valid until
89   // the stream is destroyed.  If a block_size is given, calls to Next()
90   // will return data blocks no larger than the given size.  Otherwise, the
91   // first call to Next() returns the entire array.  block_size is mainly
92   // useful for testing; in production you would probably never want to set
93   // it.
94   ArrayOutputStream(void* data, int size, int block_size = -1);
95   ~ArrayOutputStream() override = default;
96 
97   // `ArrayOutputStream` is neither copiable nor assignable
98   ArrayOutputStream(const ArrayOutputStream&) = delete;
99   ArrayOutputStream& operator=(const ArrayOutputStream&) = delete;
100 
101   // implements ZeroCopyOutputStream ---------------------------------
102   bool Next(void** data, int* size) override;
103   void BackUp(int count) override;
104   int64_t ByteCount() const override;
105 
106  private:
107   uint8_t* const data_;     // The byte array.
108   const int size_;        // Total size of the array.
109   const int block_size_;  // How many bytes to return at a time.
110 
111   int position_;
112   int last_returned_size_;  // How many bytes we returned last time Next()
113                             // was called (used for error checking only).
114 };
115 
116 // ===================================================================
117 
118 // A ZeroCopyOutputStream which appends bytes to a string.
119 class PROTOBUF_EXPORT StringOutputStream final : public ZeroCopyOutputStream {
120  public:
121   // Create a StringOutputStream which appends bytes to the given string.
122   // The string remains property of the caller, but it is mutated in arbitrary
123   // ways and MUST NOT be accessed in any way until you're done with the
124   // stream. Either be sure there's no further usage, or (safest) destroy the
125   // stream before using the contents.
126   //
127   // Hint:  If you call target->reserve(n) before creating the stream,
128   //   the first call to Next() will return at least n bytes of buffer
129   //   space.
130   explicit StringOutputStream(std::string* target);
131   ~StringOutputStream() override = default;
132 
133   // `StringOutputStream` is neither copiable nor assignable
134   StringOutputStream(const StringOutputStream&) = delete;
135   StringOutputStream& operator=(const StringOutputStream&) = delete;
136 
137   // implements ZeroCopyOutputStream ---------------------------------
138   bool Next(void** data, int* size) override;
139   void BackUp(int count) override;
140   int64_t ByteCount() const override;
141 
142  private:
143   static constexpr size_t kMinimumSize = 16;
144 
145   std::string* target_;
146 };
147 
148 // Note:  There is no StringInputStream.  Instead, just create an
149 // ArrayInputStream as follows:
150 //   ArrayInputStream input(str.data(), str.size());
151 
152 // ===================================================================
153 
154 // A generic traditional input stream interface.
155 //
156 // Lots of traditional input streams (e.g. file descriptors, C stdio
157 // streams, and C++ iostreams) expose an interface where every read
158 // involves copying bytes into a buffer.  If you want to take such an
159 // interface and make a ZeroCopyInputStream based on it, simply implement
160 // CopyingInputStream and then use CopyingInputStreamAdaptor.
161 //
162 // CopyingInputStream implementations should avoid buffering if possible.
163 // CopyingInputStreamAdaptor does its own buffering and will read data
164 // in large blocks.
165 class PROTOBUF_EXPORT CopyingInputStream {
166  public:
~CopyingInputStream()167   virtual ~CopyingInputStream() {}
168 
169   // Reads up to "size" bytes into the given buffer.  Returns the number of
170   // bytes read.  Read() waits until at least one byte is available, or
171   // returns zero if no bytes will ever become available (EOF), or -1 if a
172   // permanent read error occurred.
173   virtual int Read(void* buffer, int size) = 0;
174 
175   // Skips the next "count" bytes of input.  Returns the number of bytes
176   // actually skipped.  This will always be exactly equal to "count" unless
177   // EOF was reached or a permanent read error occurred.
178   //
179   // The default implementation just repeatedly calls Read() into a scratch
180   // buffer.
181   virtual int Skip(int count);
182 };
183 
184 // A ZeroCopyInputStream which reads from a CopyingInputStream.  This is
185 // useful for implementing ZeroCopyInputStreams that read from traditional
186 // streams.  Note that this class is not really zero-copy.
187 //
188 // If you want to read from file descriptors or C++ istreams, this is
189 // already implemented for you:  use FileInputStream or IstreamInputStream
190 // respectively.
191 class PROTOBUF_EXPORT CopyingInputStreamAdaptor : public ZeroCopyInputStream {
192  public:
193   // Creates a stream that reads from the given CopyingInputStream.
194   // If a block_size is given, it specifies the number of bytes that
195   // should be read and returned with each call to Next().  Otherwise,
196   // a reasonable default is used.  The caller retains ownership of
197   // copying_stream unless SetOwnsCopyingStream(true) is called.
198   explicit CopyingInputStreamAdaptor(CopyingInputStream* copying_stream,
199                                      int block_size = -1);
200   ~CopyingInputStreamAdaptor() override;
201 
202   // `CopyingInputStreamAdaptor` is neither copiable nor assignable
203   CopyingInputStreamAdaptor(const CopyingInputStreamAdaptor&) = delete;
204   CopyingInputStreamAdaptor& operator=(const CopyingInputStreamAdaptor&) = delete;
205 
206   // Call SetOwnsCopyingStream(true) to tell the CopyingInputStreamAdaptor to
207   // delete the underlying CopyingInputStream when it is destroyed.
SetOwnsCopyingStream(bool value)208   void SetOwnsCopyingStream(bool value) { owns_copying_stream_ = value; }
209 
210   // implements ZeroCopyInputStream ----------------------------------
211   bool Next(const void** data, int* size) override;
212   void BackUp(int count) override;
213   bool Skip(int count) override;
214   int64_t ByteCount() const override;
215 
216  private:
217   // Insures that buffer_ is not NULL.
218   void AllocateBufferIfNeeded();
219   // Frees the buffer and resets buffer_used_.
220   void FreeBuffer();
221 
222   // The underlying copying stream.
223   CopyingInputStream* copying_stream_;
224   bool owns_copying_stream_;
225 
226   // True if we have seen a permanent error from the underlying stream.
227   bool failed_;
228 
229   // The current position of copying_stream_, relative to the point where
230   // we started reading.
231   int64_t position_;
232 
233   // Data is read into this buffer.  It may be NULL if no buffer is currently
234   // in use.  Otherwise, it points to an array of size buffer_size_.
235   std::unique_ptr<uint8_t[]> buffer_;
236   const int buffer_size_;
237 
238   // Number of valid bytes currently in the buffer (i.e. the size last
239   // returned by Next()).  0 <= buffer_used_ <= buffer_size_.
240   int buffer_used_;
241 
242   // Number of bytes in the buffer which were backed up over by a call to
243   // BackUp().  These need to be returned again.
244   // 0 <= backup_bytes_ <= buffer_used_
245   int backup_bytes_;
246 };
247 
248 // ===================================================================
249 
250 // A generic traditional output stream interface.
251 //
252 // Lots of traditional output streams (e.g. file descriptors, C stdio
253 // streams, and C++ iostreams) expose an interface where every write
254 // involves copying bytes from a buffer.  If you want to take such an
255 // interface and make a ZeroCopyOutputStream based on it, simply implement
256 // CopyingOutputStream and then use CopyingOutputStreamAdaptor.
257 //
258 // CopyingOutputStream implementations should avoid buffering if possible.
259 // CopyingOutputStreamAdaptor does its own buffering and will write data
260 // in large blocks.
261 class PROTOBUF_EXPORT CopyingOutputStream {
262  public:
~CopyingOutputStream()263   virtual ~CopyingOutputStream() {}
264 
265   // Writes "size" bytes from the given buffer to the output.  Returns true
266   // if successful, false on a write error.
267   virtual bool Write(const void* buffer, int size) = 0;
268 };
269 
270 // A ZeroCopyOutputStream which writes to a CopyingOutputStream.  This is
271 // useful for implementing ZeroCopyOutputStreams that write to traditional
272 // streams.  Note that this class is not really zero-copy.
273 //
274 // If you want to write to file descriptors or C++ ostreams, this is
275 // already implemented for you:  use FileOutputStream or OstreamOutputStream
276 // respectively.
277 class PROTOBUF_EXPORT CopyingOutputStreamAdaptor : public ZeroCopyOutputStream {
278  public:
279   // Creates a stream that writes to the given Unix file descriptor.
280   // If a block_size is given, it specifies the size of the buffers
281   // that should be returned by Next().  Otherwise, a reasonable default
282   // is used.
283   explicit CopyingOutputStreamAdaptor(CopyingOutputStream* copying_stream,
284                                       int block_size = -1);
285   ~CopyingOutputStreamAdaptor() override;
286 
287   // `CopyingOutputStreamAdaptor` is neither copiable nor assignable
288   CopyingOutputStreamAdaptor(const CopyingOutputStreamAdaptor&) = delete;
289   CopyingOutputStreamAdaptor& operator=(const CopyingOutputStreamAdaptor&) = delete;
290 
291   // Writes all pending data to the underlying stream.  Returns false if a
292   // write error occurred on the underlying stream.  (The underlying
293   // stream itself is not necessarily flushed.)
294   bool Flush();
295 
296   // Call SetOwnsCopyingStream(true) to tell the CopyingOutputStreamAdaptor to
297   // delete the underlying CopyingOutputStream when it is destroyed.
SetOwnsCopyingStream(bool value)298   void SetOwnsCopyingStream(bool value) { owns_copying_stream_ = value; }
299 
300   // implements ZeroCopyOutputStream ---------------------------------
301   bool Next(void** data, int* size) override;
302   void BackUp(int count) override;
303   int64_t ByteCount() const override;
304   bool WriteAliasedRaw(const void* data, int size) override;
AllowsAliasing()305   bool AllowsAliasing() const override { return true; }
306   bool WriteCord(const absl::Cord& cord) override;
307 
308  private:
309   // Write the current buffer, if it is present.
310   bool WriteBuffer();
311   // Insures that buffer_ is not NULL.
312   void AllocateBufferIfNeeded();
313   // Frees the buffer.
314   void FreeBuffer();
315 
316   // The underlying copying stream.
317   CopyingOutputStream* copying_stream_;
318   bool owns_copying_stream_;
319 
320   // True if we have seen a permanent error from the underlying stream.
321   bool failed_;
322 
323   // The current position of copying_stream_, relative to the point where
324   // we started writing.
325   int64_t position_;
326 
327   // Data is written from this buffer.  It may be NULL if no buffer is
328   // currently in use.  Otherwise, it points to an array of size buffer_size_.
329   std::unique_ptr<uint8_t[]> buffer_;
330   const int buffer_size_;
331 
332   // Number of valid bytes currently in the buffer (i.e. the size last
333   // returned by Next()).  When BackUp() is called, we just reduce this.
334   // 0 <= buffer_used_ <= buffer_size_.
335   int buffer_used_;
336 };
337 
338 // ===================================================================
339 
340 // A ZeroCopyInputStream which wraps some other stream and limits it to
341 // a particular byte count.
342 class PROTOBUF_EXPORT LimitingInputStream final : public ZeroCopyInputStream {
343  public:
344   LimitingInputStream(ZeroCopyInputStream* input, int64_t limit);
345   ~LimitingInputStream() override;
346 
347   // `LimitingInputStream` is neither copiable nor assignable
348   LimitingInputStream(const LimitingInputStream&) = delete;
349   LimitingInputStream& operator=(const LimitingInputStream&) = delete;
350 
351   // implements ZeroCopyInputStream ----------------------------------
352   bool Next(const void** data, int* size) override;
353   void BackUp(int count) override;
354   bool Skip(int count) override;
355   int64_t ByteCount() const override;
356   bool ReadCord(absl::Cord* cord, int count) override;
357 
358 
359  private:
360   ZeroCopyInputStream* input_;
361   int64_t limit_;  // Decreases as we go, becomes negative if we overshoot.
362   int64_t prior_bytes_read_;  // Bytes read on underlying stream at construction
363 };
364 
365 // ===================================================================
366 
367 // A ZeroCopyInputStream backed by a Cord.  This stream implements ReadCord()
368 // in a way that can share memory between the source and destination cords
369 // rather than copying.
370 class PROTOBUF_EXPORT CordInputStream final : public ZeroCopyInputStream {
371  public:
372   // Creates an InputStream that reads from the given Cord. `cord` must
373   // not be null and must outlive this CordInputStream instance. `cord` must
374   // not be modified while this instance is actively being used: any change
375   // to `cord` will lead to undefined behavior on any subsequent call into
376   // this instance.
377   explicit CordInputStream(
378       const absl::Cord* cord ABSL_ATTRIBUTE_LIFETIME_BOUND);
379 
380 
381   // `CordInputStream` is neither copiable nor assignable
382   CordInputStream(const CordInputStream&) = delete;
383   CordInputStream& operator=(const CordInputStream&) = delete;
384 
385   // implements ZeroCopyInputStream ----------------------------------
386   bool Next(const void** data, int* size) override;
387   void BackUp(int count) override;
388   bool Skip(int count) override;
389   int64_t ByteCount() const override;
390   bool ReadCord(absl::Cord* cord, int count) override;
391 
392 
393  private:
394   // Moves `it_` to the next available chunk skipping `skip` extra bytes
395   // and updates the chunk data pointers.
396   bool NextChunk(size_t skip);
397 
398   // Updates the current chunk data context `data_`, `size_` and `available_`.
399   // If `bytes_remaining_` is zero, sets `size_` and `available_` to zero.
400   // Returns true if more data is available, false otherwise.
401   bool LoadChunkData();
402 
403   absl::Cord::CharIterator it_;
404   size_t length_;
405   size_t bytes_remaining_;
406   const char* data_;
407   size_t size_;
408   size_t available_;
409 };
410 
411 // ===================================================================
412 
413 // A ZeroCopyOutputStream that writes to a Cord.  This stream implements
414 // WriteCord() in a way that can share memory between the source and
415 // destination cords rather than copying.
416 class PROTOBUF_EXPORT CordOutputStream final : public ZeroCopyOutputStream {
417  public:
418   // Creates an OutputStream streaming serialized data into a Cord. `size_hint`,
419   // if given, is the expected total size of the resulting Cord. This is a hint
420   // only, used for optimization. Callers can obtain the generated Cord value by
421   // invoking `Consume()`.
422   explicit CordOutputStream(size_t size_hint = 0);
423 
424   // Creates an OutputStream with an initial Cord value. This constructor can be
425   // used by applications wanting to directly append serialization data to a
426   // given cord. In such cases, donating the existing value as in:
427   //
428   //   CordOutputStream stream(std::move(cord));
429   //   message.SerializeToZeroCopyStream(&stream);
430   //   cord = std::move(stream.Consume());
431   //
432   // is more efficient then appending the serialized cord in application code:
433   //
434   //   CordOutputStream stream;
435   //   message.SerializeToZeroCopyStream(&stream);
436   //   cord.Append(stream.Consume());
437   //
438   // The former allows `CordOutputStream` to utilize pre-existing privately
439   // owned Cord buffers from the donated cord where the latter does not, which
440   // may lead to more memory usage when serialuzing data into existing cords.
441   explicit CordOutputStream(absl::Cord cord, size_t size_hint = 0);
442 
443   // Creates an OutputStream with an initial Cord value and initial buffer.
444   // This donates both the preexisting cord in `cord`, as well as any
445   // pre-existing data and additional capacity in `buffer`.
446   // This function is mainly intended to be used in internal serialization logic
447   // using eager buffer initialization in EpsCopyOutputStream.
448   // The donated buffer can be empty, partially empty or full: the outputstream
449   // will DTRT in all cases and preserve any pre-existing data.
450   explicit CordOutputStream(absl::Cord cord, absl::CordBuffer buffer,
451                             size_t size_hint = 0);
452 
453   // Creates an OutputStream with an initial buffer.
454   // This method is logically identical to, but more efficient than:
455   //   `CordOutputStream(absl::Cord(), std::move(buffer), size_hint)`
456   explicit CordOutputStream(absl::CordBuffer buffer, size_t size_hint = 0);
457 
458   // `CordOutputStream` is neither copiable nor assignable
459   CordOutputStream(const CordOutputStream&) = delete;
460   CordOutputStream& operator=(const CordOutputStream&) = delete;
461 
462   // implements `ZeroCopyOutputStream` ---------------------------------
463   bool Next(void** data, int* size) final;
464   void BackUp(int count) final;
465   int64_t ByteCount() const final;
466   bool WriteCord(const absl::Cord& cord) final;
467 
468   // Consumes the serialized data as a cord value. `Consume()` internally
469   // flushes any pending state 'as if' BackUp(0) was called. While a final call
470   // to BackUp() is generally required by the `ZeroCopyOutputStream` contract,
471   // applications using `CordOutputStream` directly can call `Consume()` without
472   // a preceding call to `BackUp()`.
473   //
474   // While it will rarely be useful in practice (and especially in the presence
475   // of size hints) an instance is safe to be used after a call to `Consume()`.
476   // The only logical change in state is that all serialized data is extracted,
477   // and any new serialization calls will serialize into new cord data.
478   absl::Cord Consume();
479 
480  private:
481   // State of `buffer_` and 'cord_. As a default CordBuffer instance always has
482   // inlined capacity, we track state explicitly to avoid returning 'existing
483   // capacity' from the default or 'moved from' CordBuffer. 'kSteal' indicates
484   // we should (attempt to) steal the next buffer from the cord.
485   enum class State { kEmpty, kFull, kPartial, kSteal };
486 
487   absl::Cord cord_;
488   size_t size_hint_;
489   State state_ = State::kEmpty;
490   absl::CordBuffer buffer_;
491 };
492 
493 
494 // ===================================================================
495 
// Returns a pointer to the mutable characters underlying the given string.
// The return value is valid until the next time the string is resized.  We
// trust the caller to treat the return value as an array of length s->size().
//
// `s` must not be null: it is dereferenced unconditionally.
inline char* mutable_string_data(std::string* s) {
  return &(*s)[0];
}
502 
503 // as_string_data(s) is equivalent to
504 //  ({ char* p = mutable_string_data(s); make_pair(p, p != NULL); })
505 // Sometimes it's faster: in some scenarios p cannot be NULL, and then the
506 // code can avoid that check.
as_string_data(std::string * s)507 inline std::pair<char*, bool> as_string_data(std::string* s) {
508   char* p = mutable_string_data(s);
509   return std::make_pair(p, true);
510 }
511 
512 }  // namespace io
513 }  // namespace protobuf
514 }  // namespace google
515 
516 #include "google/protobuf/port_undef.inc"
517 
518 #endif  // GOOGLE_PROTOBUF_IO_ZERO_COPY_STREAM_IMPL_LITE_H__
519