1 // Protocol Buffers - Google's data interchange format
2 // Copyright 2008 Google Inc. All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style
5 // license that can be found in the LICENSE file or at
6 // https://developers.google.com/open-source/licenses/bsd
7
8 #ifndef GOOGLE_PROTOBUF_JSON_INTERNAL_ZERO_COPY_BUFFERED_STREAM_H__
9 #define GOOGLE_PROTOBUF_JSON_INTERNAL_ZERO_COPY_BUFFERED_STREAM_H__
10
11 #include <algorithm>
12 #include <cstdint>
13 #include <iostream>
14 #include <string>
15 #include <utility>
16 #include <vector>
17
18 #include "absl/log/absl_check.h"
19 #include "absl/log/absl_log.h"
20 #include "absl/status/status.h"
21 #include "absl/status/statusor.h"
22 #include "absl/strings/str_format.h"
23 #include "absl/strings/string_view.h"
24 #include "google/protobuf/io/zero_copy_stream.h"
25 #include "google/protobuf/stubs/status_macros.h"
26
27 // Must be included last.
28 #include "google/protobuf/port_def.inc"
29
30 // Utilities for parsing contiguous buffers out of ZeroCopyInputStreams.
31
32 namespace google {
33 namespace protobuf {
34 namespace json_internal {
35 // Forward decl. for use by helper types below.
36 class ZeroCopyBufferedStream;
37
38 // An RAII type that represents holding a reference into the backing buffer
39 // of a ZeroCopyBufferedStream. This allows for automatic management of the
40 // backing buffer.
41 class BufferingGuard {
42 public:
43 explicit BufferingGuard(ZeroCopyBufferedStream* owner = nullptr);
44 ~BufferingGuard();
45
BufferingGuard(const BufferingGuard & other)46 BufferingGuard(const BufferingGuard& other) : BufferingGuard(other.owner_) {}
47 BufferingGuard& operator=(const BufferingGuard& other) {
48 this->~BufferingGuard();
49 new (this) BufferingGuard(other);
50 return *this;
51 }
52
53 private:
54 friend class Mark;
55 ZeroCopyBufferedStream* owner_ = nullptr;
56 };
57
58 // A string that may own its contents, or live inside of a buffer owned by
59 // a ZeroCopyBufferedStream.
60 //
61 // Note that this type holds onto a reference to the owning
62 // ZeroCopyBufferedStream; this allows it to be durable against strings being
// moved around for buffering purposes.
64 class MaybeOwnedString {
65 public:
MaybeOwnedString(std::string value)66 explicit MaybeOwnedString(std::string value) : data_(std::move(value)) {}
MaybeOwnedString(ZeroCopyBufferedStream * stream,size_t start,size_t len,BufferingGuard token)67 MaybeOwnedString(ZeroCopyBufferedStream* stream, size_t start, size_t len,
68 BufferingGuard token)
69 : data_(StreamOwned{stream, start, len}), token_(token) {}
70
71 // Returns the string as a view, regardless of whether it is owned or not.
AsView()72 absl::string_view AsView() const {
73 if (auto* unowned = absl::get_if<StreamOwned>(&data_)) {
74 return unowned->AsView();
75 }
76
77 return absl::get<std::string>(data_);
78 }
79
string_view()80 operator absl::string_view() const { return AsView(); } // NOLINT
81
82 // Returns a reference to an owned string; if the wrapped string is not
83 // owned, this function will perform a copy and make it owned.
ToString()84 std::string& ToString() {
85 if (auto* unowned = absl::get_if<StreamOwned>(&data_)) {
86 data_ = std::string(unowned->AsView());
87 token_ = BufferingGuard{};
88 }
89
90 return absl::get<std::string>(data_);
91 }
92
93 template <typename String>
94 friend bool operator==(const MaybeOwnedString& lhs, const String& rhs) {
95 return lhs.AsView() == rhs;
96 }
97 template <typename String>
98 friend bool operator!=(const MaybeOwnedString& lhs, const String& rhs) {
99 return !(lhs == rhs);
100 }
101
102 private:
103 struct StreamOwned {
104 ZeroCopyBufferedStream* stream;
105 size_t start, len;
106 absl::string_view AsView() const;
107 };
108 absl::variant<std::string, StreamOwned> data_;
109 BufferingGuard token_;
110 };
111
112 // A mark in a stream. See ZeroCopyBufferedStream::Mark().
113 class Mark {
114 public:
115 // Returns a maybe-owned string up to the unread bytes boundary, except for
116 // the last `clip` bytes.
117 MaybeOwnedString UpToUnread(size_t clip = 0) const;
118
119 // Discards this mark and its hold on the buffer.
Discard()120 void Discard() && { guard_ = BufferingGuard(); }
121
122 private:
123 friend ZeroCopyBufferedStream;
Mark(size_t offset,BufferingGuard guard)124 Mark(size_t offset, BufferingGuard guard) : offset_(offset), guard_(guard) {}
125
126 size_t offset_;
127 BufferingGuard guard_;
128 };
129
// A wrapper over a ZeroCopyInputStream that performs as-needed buffering to
// obtain contiguous chunks larger than those the underlying stream might
// provide, while minimizing the amount of actual copying.
133 class ZeroCopyBufferedStream {
134 public:
ZeroCopyBufferedStream(io::ZeroCopyInputStream * stream)135 explicit ZeroCopyBufferedStream(io::ZeroCopyInputStream* stream)
136 : stream_(stream) {}
137
138 // Returns whether the stream is currently at eof.
139 //
140 // This function will buffer at least one character to verify whether it
141 // actually *is* at EOF.
AtEof()142 bool AtEof() {
143 (void)BufferAtLeast(1);
144 return eof_;
145 }
146
147 // Takes exactly n characters from a string.
Take(size_t len)148 absl::StatusOr<MaybeOwnedString> Take(size_t len) {
149 auto buffering = BufferAtLeast(len);
150 RETURN_IF_ERROR(buffering.status());
151
152 size_t start = cursor_;
153 RETURN_IF_ERROR(Advance(len));
154 return MaybeOwnedString(this, start, len, *buffering);
155 }
156
157 // Takes characters to form a string, according to the given predicate. Stops
158 // early if an EOF is hit.
159 //
160 // The predicate must have type `(int, char) -> bool`; the first argument
161 // is the index of the character.
162 template <typename Pred>
163 absl::StatusOr<MaybeOwnedString> TakeWhile(Pred p);
164
165 // Places a mark in the stream, ensuring that all characters consumed after
166 // the mark are buffered. This can be used to parse some characters and then
167 // recover everything that follows as a contiguous string_view so that it may
168 // be processed a second time.
169 //
170 // The returned value is an RAII type that ensure the buffer sticks around
171 // long enough.
BeginMark()172 Mark BeginMark() { return Mark(cursor_, BufferingGuard(this)); }
173
174 // Peeks the next character in the stream.
175 //
176 // This function will not enable buffering on its own, and will read past the
177 // end of the buffer if at EOF; BufferAtLeast() should be called before
178 // calling this function.
PeekChar()179 char PeekChar() {
180 ABSL_DCHECK(!Unread().empty());
181 return Unread()[0];
182 }
183
184 // Advances the cursor by the given number of bytes.
185 absl::Status Advance(size_t bytes);
186
187 // Returns a view of the current buffer, which may be either the owned
188 // `buf_` or the stream-owned `last_chunk_`.
189 //
190 // The returned view is unstable: calling any function may invalidate it,
191 // because there will not be a `BufferingGuard` to guard it.
192 absl::string_view RawBuffer(size_t start,
193 size_t len = absl::string_view::npos) const;
194
195 // Returns a view of RawBuffer, unread bytes; this will not be the entirety
196 // of the underlying stream.
Unread()197 absl::string_view Unread() const { return RawBuffer(cursor_); }
198
IsBuffering()199 bool IsBuffering() const { return using_buf_; }
200
201 // Buffers at least `bytes` bytes ahead of the current cursor position,
202 // possibly enabling buffering.
203 //
204 // Returns an error if that many bytes could not be RawBuffer.
205 absl::StatusOr<BufferingGuard> BufferAtLeast(size_t bytes);
206
207 private:
208 friend BufferingGuard;
209 friend Mark;
210 friend MaybeOwnedString;
211
212 // Increments the buffering refcount; this will also update `buffer_start_` if
213 // necessary.
UpRefBuffer()214 void UpRefBuffer() {
215 if (outstanding_buffer_borrows_++ == 0) {
216 buffer_start_ = cursor_;
217 }
218 }
219
220 // Decrements the buffering refcount; calling this function if the refcount is
221 // zero is undefined behavior.
222 //
223 // This function should not be called directly; it is called automatically
224 // by the destructor of `BufferingGuard`.
225 void DownRefBuffer();
226
227 // Obtains a new chunk from the underlying stream; returns whether there is
228 // still more data to read.
229 bool ReadChunk();
230
231 // The streamer implements a buffering stream on top of the given stream, by
232 // the following mechanism:
233 // - `cursor_` is an offset into either `last_chunk_` or `buf_`, which can
234 // be obtained via RawBuffer() and Unread():
235 // - If `using_buf_` is true, it is an offset into `buf_`.
236 // - Otherwise it is an offset into `last_chunk_`.
237 // - If `outstanding_buffer_borrows_ > 0`, someone needs the buffer to stick
238 // around. MaybeUnownedString::StreamOwned is implemented such that it does
239 // not hold onto `last_chunk_` directly, so we can freely copy it into
240 // `buf_` as needed arises.
241 // - Note that we can copy only part if we update `buffer_start_`; see
242 // RawBuffer().
243 // - If we would read more data and `outstanding_buffer_borrows_ > 0`, instead
244 // of trashing `last_chunk_`, we copy it into `buf_` and append to `buf_`
245 // each time we read.
246 // - If `outstanding_buffer_borrows_ == 0`, we can trash `buf_` and go back to
247 // using `last_chunk_` directly. See `DownRefBuffer()`.
248 io::ZeroCopyInputStream* stream_;
249 absl::string_view last_chunk_;
250 std::vector<char> buf_;
251 bool using_buf_ = false;
252 size_t cursor_ = 0;
253 // Invariant: this always refers to the earliest point at which we requested
254 // buffering, since the last time outstanding_buffer_borrows_ was zero.
255 size_t buffer_start_ = 0;
256 bool eof_ = false;
257 int outstanding_buffer_borrows_ = 0;
258 };
259
260 // These functions all rely on the definition of ZeroCopyBufferedStream, so must
261 // come after it.
BufferingGuard(ZeroCopyBufferedStream * owner)262 inline BufferingGuard::BufferingGuard(ZeroCopyBufferedStream* owner)
263 : owner_(owner) {
264 if (owner_ != nullptr) {
265 owner_->UpRefBuffer();
266 }
267 }
268
~BufferingGuard()269 inline BufferingGuard::~BufferingGuard() {
270 if (owner_ != nullptr) {
271 owner_->DownRefBuffer();
272 owner_ = nullptr;
273 }
274 }
275
AsView()276 inline absl::string_view MaybeOwnedString::StreamOwned::AsView() const {
277 return stream->RawBuffer(start, len);
278 }
279
UpToUnread(size_t clip)280 inline MaybeOwnedString Mark::UpToUnread(size_t clip) const {
281 return MaybeOwnedString(guard_.owner_, offset_,
282 guard_.owner_->cursor_ - offset_ - clip, guard_);
283 }
284
285 template <typename Pred>
TakeWhile(Pred p)286 absl::StatusOr<MaybeOwnedString> ZeroCopyBufferedStream::TakeWhile(Pred p) {
287 size_t start = cursor_;
288 BufferingGuard guard(this);
289 while (true) {
290 if (!BufferAtLeast(1).ok()) {
291 // We treat EOF as ending the take, rather than being an error.
292 break;
293 }
294 if (!p(cursor_ - start, PeekChar())) {
295 break;
296 }
297 RETURN_IF_ERROR(Advance(1));
298 }
299
300 return MaybeOwnedString(this, start, cursor_ - start, guard);
301 }
302
RawBuffer(size_t start,size_t len)303 inline absl::string_view ZeroCopyBufferedStream::RawBuffer(size_t start,
304 size_t len) const {
305 absl::string_view view = last_chunk_;
306 if (using_buf_) {
307 ABSL_DCHECK_LE(buffer_start_, start);
308 start -= buffer_start_;
309 view = absl::string_view(buf_.data(), buf_.size());
310 }
311 #if 0
312 // This print statement is especially useful for trouble-shooting low-level
313 // bugs in the buffering logic.
314 ABSL_LOG(INFO) << absl::StreamFormat("%s(\"%s\")[%d:%d]/%d:%d @ %p",
315 using_buf_ ? "buf_" : "last_chunk_",
316 view, start, static_cast<int>(len),
317 buffer_start_, cursor_, this);
318 #endif
319 ABSL_DCHECK_LE(start, view.size());
320 if (len == absl::string_view::npos) {
321 return view.substr(start);
322 }
323
324 ABSL_DCHECK_LE(start + len, view.size());
325 return view.substr(start, len);
326 }
327 } // namespace json_internal
328 } // namespace protobuf
329 } // namespace google
330
331 #include "google/protobuf/port_undef.inc"
332 #endif // GOOGLE_PROTOBUF_JSON_INTERNAL_ZERO_COPY_BUFFERED_STREAM_H__
333