• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Protocol Buffers - Google's data interchange format
2 // Copyright 2008 Google Inc.  All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style
5 // license that can be found in the LICENSE file or at
6 // https://developers.google.com/open-source/licenses/bsd
7 
8 #ifndef GOOGLE_PROTOBUF_JSON_INTERNAL_ZERO_COPY_BUFFERED_STREAM_H__
9 #define GOOGLE_PROTOBUF_JSON_INTERNAL_ZERO_COPY_BUFFERED_STREAM_H__
10 
11 #include <algorithm>
12 #include <cstdint>
13 #include <iostream>
14 #include <string>
15 #include <utility>
16 #include <vector>
17 
18 #include "absl/log/absl_check.h"
19 #include "absl/log/absl_log.h"
20 #include "absl/status/status.h"
21 #include "absl/status/statusor.h"
22 #include "absl/strings/str_format.h"
23 #include "absl/strings/string_view.h"
24 #include "google/protobuf/io/zero_copy_stream.h"
25 #include "google/protobuf/stubs/status_macros.h"
26 
27 // Must be included last.
28 #include "google/protobuf/port_def.inc"
29 
30 // Utilities for parsing contiguous buffers out of ZeroCopyInputStreams.
31 
32 namespace google {
33 namespace protobuf {
34 namespace json_internal {
35 // Forward decl. for use by helper types below.
36 class ZeroCopyBufferedStream;
37 
38 // An RAII type that represents holding a reference into the backing buffer
39 // of a ZeroCopyBufferedStream. This allows for automatic management of the
40 // backing buffer.
41 class BufferingGuard {
42  public:
43   explicit BufferingGuard(ZeroCopyBufferedStream* owner = nullptr);
44   ~BufferingGuard();
45 
BufferingGuard(const BufferingGuard & other)46   BufferingGuard(const BufferingGuard& other) : BufferingGuard(other.owner_) {}
47   BufferingGuard& operator=(const BufferingGuard& other) {
48     this->~BufferingGuard();
49     new (this) BufferingGuard(other);
50     return *this;
51   }
52 
53  private:
54   friend class Mark;
55   ZeroCopyBufferedStream* owner_ = nullptr;
56 };
57 
58 // A string that may own its contents, or live inside of a buffer owned by
59 // a ZeroCopyBufferedStream.
60 //
61 // Note that this type holds onto a reference to the owning
62 // ZeroCopyBufferedStream; this allows it to be durable against strings being
63 // moved around for buffering puroses.
64 class MaybeOwnedString {
65  public:
MaybeOwnedString(std::string value)66   explicit MaybeOwnedString(std::string value) : data_(std::move(value)) {}
MaybeOwnedString(ZeroCopyBufferedStream * stream,size_t start,size_t len,BufferingGuard token)67   MaybeOwnedString(ZeroCopyBufferedStream* stream, size_t start, size_t len,
68                    BufferingGuard token)
69       : data_(StreamOwned{stream, start, len}), token_(token) {}
70 
71   // Returns the string as a view, regardless of whether it is owned or not.
AsView()72   absl::string_view AsView() const {
73     if (auto* unowned = absl::get_if<StreamOwned>(&data_)) {
74       return unowned->AsView();
75     }
76 
77     return absl::get<std::string>(data_);
78   }
79 
string_view()80   operator absl::string_view() const { return AsView(); }  // NOLINT
81 
82   // Returns a reference to an owned string; if the wrapped string is not
83   // owned, this function will perform a copy and make it owned.
ToString()84   std::string& ToString() {
85     if (auto* unowned = absl::get_if<StreamOwned>(&data_)) {
86       data_ = std::string(unowned->AsView());
87       token_ = BufferingGuard{};
88     }
89 
90     return absl::get<std::string>(data_);
91   }
92 
93   template <typename String>
94   friend bool operator==(const MaybeOwnedString& lhs, const String& rhs) {
95     return lhs.AsView() == rhs;
96   }
97   template <typename String>
98   friend bool operator!=(const MaybeOwnedString& lhs, const String& rhs) {
99     return !(lhs == rhs);
100   }
101 
102  private:
103   struct StreamOwned {
104     ZeroCopyBufferedStream* stream;
105     size_t start, len;
106     absl::string_view AsView() const;
107   };
108   absl::variant<std::string, StreamOwned> data_;
109   BufferingGuard token_;
110 };
111 
112 // A mark in a stream. See ZeroCopyBufferedStream::Mark().
113 class Mark {
114  public:
115   // Returns a maybe-owned string up to the unread bytes boundary, except for
116   // the last `clip` bytes.
117   MaybeOwnedString UpToUnread(size_t clip = 0) const;
118 
119   // Discards this mark and its hold on the buffer.
Discard()120   void Discard() && { guard_ = BufferingGuard(); }
121 
122  private:
123   friend ZeroCopyBufferedStream;
Mark(size_t offset,BufferingGuard guard)124   Mark(size_t offset, BufferingGuard guard) : offset_(offset), guard_(guard) {}
125 
126   size_t offset_;
127   BufferingGuard guard_;
128 };
129 
130 // A wrapper over a ZeroCopyInputStream that allows doing as-needed buffer for
131 // obtaining contiguous chunks larger than those the underlying stream might
132 // provide, while minimizing the amount of actual copying.
133 class ZeroCopyBufferedStream {
134  public:
ZeroCopyBufferedStream(io::ZeroCopyInputStream * stream)135   explicit ZeroCopyBufferedStream(io::ZeroCopyInputStream* stream)
136       : stream_(stream) {}
137 
138   // Returns whether the stream is currently at eof.
139   //
140   // This function will buffer at least one character to verify whether it
141   // actually *is* at EOF.
AtEof()142   bool AtEof() {
143     (void)BufferAtLeast(1);
144     return eof_;
145   }
146 
147   // Takes exactly n characters from a string.
Take(size_t len)148   absl::StatusOr<MaybeOwnedString> Take(size_t len) {
149     auto buffering = BufferAtLeast(len);
150     RETURN_IF_ERROR(buffering.status());
151 
152     size_t start = cursor_;
153     RETURN_IF_ERROR(Advance(len));
154     return MaybeOwnedString(this, start, len, *buffering);
155   }
156 
157   // Takes characters to form a string, according to the given predicate. Stops
158   // early if an EOF is hit.
159   //
160   // The predicate must have type `(int, char) -> bool`; the first argument
161   // is the index of the character.
162   template <typename Pred>
163   absl::StatusOr<MaybeOwnedString> TakeWhile(Pred p);
164 
165   // Places a mark in the stream, ensuring that all characters consumed after
166   // the mark are buffered. This can be used to parse some characters and then
167   // recover everything that follows as a contiguous string_view so that it may
168   // be processed a second time.
169   //
170   // The returned value is an RAII type that ensure the buffer sticks around
171   // long enough.
BeginMark()172   Mark BeginMark() { return Mark(cursor_, BufferingGuard(this)); }
173 
174   // Peeks the next character in the stream.
175   //
176   // This function will not enable buffering on its own, and will read past the
177   // end of the buffer if at EOF; BufferAtLeast() should be called before
178   // calling this function.
PeekChar()179   char PeekChar() {
180     ABSL_DCHECK(!Unread().empty());
181     return Unread()[0];
182   }
183 
184   // Advances the cursor by the given number of bytes.
185   absl::Status Advance(size_t bytes);
186 
187   // Returns a view of the current buffer, which may be either the owned
188   // `buf_` or the stream-owned `last_chunk_`.
189   //
190   // The returned view is unstable: calling any function may invalidate it,
191   // because there will not be a `BufferingGuard` to guard it.
192   absl::string_view RawBuffer(size_t start,
193                               size_t len = absl::string_view::npos) const;
194 
195   // Returns a view of RawBuffer, unread bytes; this will not be the entirety
196   // of the underlying stream.
Unread()197   absl::string_view Unread() const { return RawBuffer(cursor_); }
198 
IsBuffering()199   bool IsBuffering() const { return using_buf_; }
200 
201   // Buffers at least `bytes` bytes ahead of the current cursor position,
202   // possibly enabling buffering.
203   //
204   // Returns an error if that many bytes could not be RawBuffer.
205   absl::StatusOr<BufferingGuard> BufferAtLeast(size_t bytes);
206 
207  private:
208   friend BufferingGuard;
209   friend Mark;
210   friend MaybeOwnedString;
211 
212   // Increments the buffering refcount; this will also update `buffer_start_` if
213   // necessary.
UpRefBuffer()214   void UpRefBuffer() {
215     if (outstanding_buffer_borrows_++ == 0) {
216       buffer_start_ = cursor_;
217     }
218   }
219 
220   // Decrements the buffering refcount; calling this function if the refcount is
221   // zero is undefined behavior.
222   //
223   // This function should not be called directly; it is called automatically
224   // by the destructor of `BufferingGuard`.
225   void DownRefBuffer();
226 
227   // Obtains a new chunk from the underlying stream; returns whether there is
228   // still more data to read.
229   bool ReadChunk();
230 
231   // The streamer implements a buffering stream on top of the given stream, by
232   // the following mechanism:
233   // - `cursor_` is an offset into either `last_chunk_` or `buf_`, which can
234   //   be obtained via RawBuffer() and Unread():
235   //   - If `using_buf_` is true, it is an offset into `buf_`.
236   //   - Otherwise it is an offset into `last_chunk_`.
237   // - If `outstanding_buffer_borrows_ > 0`, someone needs the buffer to stick
238   //   around. MaybeUnownedString::StreamOwned is implemented such that it does
239   //   not hold onto `last_chunk_` directly, so we can freely copy it into
240   //   `buf_` as needed arises.
241   //   - Note that we can copy only part if we update `buffer_start_`; see
242   //     RawBuffer().
243   // - If we would read more data and `outstanding_buffer_borrows_ > 0`, instead
244   //   of trashing `last_chunk_`, we copy it into `buf_` and append to `buf_`
245   //   each time we read.
246   // - If `outstanding_buffer_borrows_ == 0`, we can trash `buf_` and go back to
247   //   using `last_chunk_` directly. See `DownRefBuffer()`.
248   io::ZeroCopyInputStream* stream_;
249   absl::string_view last_chunk_;
250   std::vector<char> buf_;
251   bool using_buf_ = false;
252   size_t cursor_ = 0;
253   // Invariant: this always refers to the earliest point at which we requested
254   // buffering, since the last time outstanding_buffer_borrows_ was zero.
255   size_t buffer_start_ = 0;
256   bool eof_ = false;
257   int outstanding_buffer_borrows_ = 0;
258 };
259 
260 // These functions all rely on the definition of ZeroCopyBufferedStream, so must
261 // come after it.
BufferingGuard(ZeroCopyBufferedStream * owner)262 inline BufferingGuard::BufferingGuard(ZeroCopyBufferedStream* owner)
263     : owner_(owner) {
264   if (owner_ != nullptr) {
265     owner_->UpRefBuffer();
266   }
267 }
268 
~BufferingGuard()269 inline BufferingGuard::~BufferingGuard() {
270   if (owner_ != nullptr) {
271     owner_->DownRefBuffer();
272     owner_ = nullptr;
273   }
274 }
275 
AsView()276 inline absl::string_view MaybeOwnedString::StreamOwned::AsView() const {
277   return stream->RawBuffer(start, len);
278 }
279 
UpToUnread(size_t clip)280 inline MaybeOwnedString Mark::UpToUnread(size_t clip) const {
281   return MaybeOwnedString(guard_.owner_, offset_,
282                           guard_.owner_->cursor_ - offset_ - clip, guard_);
283 }
284 
285 template <typename Pred>
TakeWhile(Pred p)286 absl::StatusOr<MaybeOwnedString> ZeroCopyBufferedStream::TakeWhile(Pred p) {
287   size_t start = cursor_;
288   BufferingGuard guard(this);
289   while (true) {
290     if (!BufferAtLeast(1).ok()) {
291       // We treat EOF as ending the take, rather than being an error.
292       break;
293     }
294     if (!p(cursor_ - start, PeekChar())) {
295       break;
296     }
297     RETURN_IF_ERROR(Advance(1));
298   }
299 
300   return MaybeOwnedString(this, start, cursor_ - start, guard);
301 }
302 
RawBuffer(size_t start,size_t len)303 inline absl::string_view ZeroCopyBufferedStream::RawBuffer(size_t start,
304                                                            size_t len) const {
305   absl::string_view view = last_chunk_;
306   if (using_buf_) {
307     ABSL_DCHECK_LE(buffer_start_, start);
308     start -= buffer_start_;
309     view = absl::string_view(buf_.data(), buf_.size());
310   }
311 #if 0
312     // This print statement is especially useful for trouble-shooting low-level
313     // bugs in the buffering logic.
314     ABSL_LOG(INFO) << absl::StreamFormat("%s(\"%s\")[%d:%d]/%d:%d @ %p",
315                                     using_buf_ ? "buf_" : "last_chunk_",
316                                     view, start, static_cast<int>(len),
317                                     buffer_start_, cursor_, this);
318 #endif
319   ABSL_DCHECK_LE(start, view.size());
320   if (len == absl::string_view::npos) {
321     return view.substr(start);
322   }
323 
324   ABSL_DCHECK_LE(start + len, view.size());
325   return view.substr(start, len);
326 }
327 }  // namespace json_internal
328 }  // namespace protobuf
329 }  // namespace google
330 
331 #include "google/protobuf/port_undef.inc"
332 #endif  // GOOGLE_PROTOBUF_JSON_INTERNAL_ZERO_COPY_BUFFERED_STREAM_H__
333