1 // Protocol Buffers - Google's data interchange format
2 // Copyright 2008 Google Inc. All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style
5 // license that can be found in the LICENSE file or at
6 // https://developers.google.com/open-source/licenses/bsd
7
8 // Author: kenton@google.com (Kenton Varda)
9 // Based on original Protocol Buffers design by
10 // Sanjay Ghemawat, Jeff Dean, and others.
11
12 #include "google/protobuf/io/zero_copy_stream_impl_lite.h"
13
14 #include <algorithm>
15 #include <limits>
16 #include <utility>
17
18 #include "google/protobuf/stubs/common.h"
19 #include "absl/base/casts.h"
20 #include "absl/log/absl_check.h"
21 #include "absl/strings/cord.h"
22 #include "absl/strings/internal/resize_uninitialized.h"
23
24 // Must be included last
25 #include "google/protobuf/port_def.inc"
26
27 namespace google {
28 namespace protobuf {
29 namespace io {
30
namespace {

// Buffer size the Copying{In,Out}putStreamAdaptor constructors fall back to
// when they are given a non-positive block_size.
constexpr int kDefaultBlockSize = 8192;

}  // namespace
37
38 // ===================================================================
39
ArrayInputStream(const void * data,int size,int block_size)40 ArrayInputStream::ArrayInputStream(const void* data, int size, int block_size)
41 : data_(reinterpret_cast<const uint8_t*>(data)),
42 size_(size),
43 block_size_(block_size > 0 ? block_size : size),
44 position_(0),
45 last_returned_size_(0) {}
46
Next(const void ** data,int * size)47 bool ArrayInputStream::Next(const void** data, int* size) {
48 if (position_ < size_) {
49 last_returned_size_ = std::min(block_size_, size_ - position_);
50 *data = data_ + position_;
51 *size = last_returned_size_;
52 position_ += last_returned_size_;
53 return true;
54 } else {
55 // We're at the end of the array.
56 last_returned_size_ = 0; // Don't let caller back up.
57 return false;
58 }
59 }
60
BackUp(int count)61 void ArrayInputStream::BackUp(int count) {
62 ABSL_CHECK_GT(last_returned_size_, 0)
63 << "BackUp() can only be called after a successful Next().";
64 ABSL_CHECK_LE(count, last_returned_size_);
65 ABSL_CHECK_GE(count, 0);
66 position_ -= count;
67 last_returned_size_ = 0; // Don't let caller back up further.
68 }
69
Skip(int count)70 bool ArrayInputStream::Skip(int count) {
71 ABSL_CHECK_GE(count, 0);
72 last_returned_size_ = 0; // Don't let caller back up.
73 if (count > size_ - position_) {
74 position_ = size_;
75 return false;
76 } else {
77 position_ += count;
78 return true;
79 }
80 }
81
ByteCount() const82 int64_t ArrayInputStream::ByteCount() const { return position_; }
83
84
85 // ===================================================================
86
ArrayOutputStream(void * data,int size,int block_size)87 ArrayOutputStream::ArrayOutputStream(void* data, int size, int block_size)
88 : data_(reinterpret_cast<uint8_t*>(data)),
89 size_(size),
90 block_size_(block_size > 0 ? block_size : size),
91 position_(0),
92 last_returned_size_(0) {}
93
Next(void ** data,int * size)94 bool ArrayOutputStream::Next(void** data, int* size) {
95 if (position_ < size_) {
96 last_returned_size_ = std::min(block_size_, size_ - position_);
97 *data = data_ + position_;
98 *size = last_returned_size_;
99 position_ += last_returned_size_;
100 return true;
101 } else {
102 // We're at the end of the array.
103 last_returned_size_ = 0; // Don't let caller back up.
104 return false;
105 }
106 }
107
BackUp(int count)108 void ArrayOutputStream::BackUp(int count) {
109 ABSL_CHECK_LE(count, last_returned_size_)
110 << "BackUp() can not exceed the size of the last Next() call.";
111 ABSL_CHECK_GE(count, 0);
112 position_ -= count;
113 last_returned_size_ -= count;
114 }
115
ByteCount() const116 int64_t ArrayOutputStream::ByteCount() const { return position_; }
117
118 // ===================================================================
119
StringOutputStream(std::string * target)120 StringOutputStream::StringOutputStream(std::string* target) : target_(target) {}
121
Next(void ** data,int * size)122 bool StringOutputStream::Next(void** data, int* size) {
123 ABSL_CHECK(target_ != nullptr);
124 size_t old_size = target_->size();
125
126 // Grow the string.
127 size_t new_size;
128 if (old_size < target_->capacity()) {
129 // Resize the string to match its capacity, since we can get away
130 // without a memory allocation this way.
131 new_size = target_->capacity();
132 } else {
133 // Size has reached capacity, try to double it.
134 new_size = old_size * 2;
135 }
136 // Avoid integer overflow in returned '*size'.
137 new_size = std::min(new_size, old_size + std::numeric_limits<int>::max());
138 // Increase the size, also make sure that it is at least kMinimumSize.
139 absl::strings_internal::STLStringResizeUninitialized(
140 target_,
141 std::max(new_size,
142 kMinimumSize + 0)); // "+ 0" works around GCC4 weirdness.
143
144 *data = mutable_string_data(target_) + old_size;
145 *size = target_->size() - old_size;
146 return true;
147 }
148
BackUp(int count)149 void StringOutputStream::BackUp(int count) {
150 ABSL_CHECK_GE(count, 0);
151 ABSL_CHECK(target_ != nullptr);
152 ABSL_CHECK_LE(static_cast<size_t>(count), target_->size());
153 target_->resize(target_->size() - count);
154 }
155
ByteCount() const156 int64_t StringOutputStream::ByteCount() const {
157 ABSL_CHECK(target_ != nullptr);
158 return target_->size();
159 }
160
161 // ===================================================================
162
Skip(int count)163 int CopyingInputStream::Skip(int count) {
164 char junk[4096];
165 int skipped = 0;
166 while (skipped < count) {
167 int bytes = Read(junk, std::min(count - skipped,
168 absl::implicit_cast<int>(sizeof(junk))));
169 if (bytes <= 0) {
170 // EOF or read error.
171 return skipped;
172 }
173 skipped += bytes;
174 }
175 return skipped;
176 }
177
CopyingInputStreamAdaptor(CopyingInputStream * copying_stream,int block_size)178 CopyingInputStreamAdaptor::CopyingInputStreamAdaptor(
179 CopyingInputStream* copying_stream, int block_size)
180 : copying_stream_(copying_stream),
181 owns_copying_stream_(false),
182 failed_(false),
183 position_(0),
184 buffer_size_(block_size > 0 ? block_size : kDefaultBlockSize),
185 buffer_used_(0),
186 backup_bytes_(0) {}
187
~CopyingInputStreamAdaptor()188 CopyingInputStreamAdaptor::~CopyingInputStreamAdaptor() {
189 if (owns_copying_stream_) {
190 delete copying_stream_;
191 }
192 }
193
Next(const void ** data,int * size)194 bool CopyingInputStreamAdaptor::Next(const void** data, int* size) {
195 if (failed_) {
196 // Already failed on a previous read.
197 return false;
198 }
199
200 AllocateBufferIfNeeded();
201
202 if (backup_bytes_ > 0) {
203 // We have data left over from a previous BackUp(), so just return that.
204 *data = buffer_.get() + buffer_used_ - backup_bytes_;
205 *size = backup_bytes_;
206 backup_bytes_ = 0;
207 return true;
208 }
209
210 // Read new data into the buffer.
211 buffer_used_ = copying_stream_->Read(buffer_.get(), buffer_size_);
212 if (buffer_used_ <= 0) {
213 // EOF or read error. We don't need the buffer anymore.
214 if (buffer_used_ < 0) {
215 // Read error (not EOF).
216 failed_ = true;
217 }
218 FreeBuffer();
219 return false;
220 }
221 position_ += buffer_used_;
222
223 *size = buffer_used_;
224 *data = buffer_.get();
225 return true;
226 }
227
BackUp(int count)228 void CopyingInputStreamAdaptor::BackUp(int count) {
229 ABSL_CHECK(backup_bytes_ == 0 && buffer_.get() != NULL)
230 << " BackUp() can only be called after Next().";
231 ABSL_CHECK_LE(count, buffer_used_)
232 << " Can't back up over more bytes than were returned by the last call"
233 " to Next().";
234 ABSL_CHECK_GE(count, 0) << " Parameter to BackUp() can't be negative.";
235
236 backup_bytes_ = count;
237 }
238
Skip(int count)239 bool CopyingInputStreamAdaptor::Skip(int count) {
240 ABSL_CHECK_GE(count, 0);
241
242 if (failed_) {
243 // Already failed on a previous read.
244 return false;
245 }
246
247 // First skip any bytes left over from a previous BackUp().
248 if (backup_bytes_ >= count) {
249 // We have more data left over than we're trying to skip. Just chop it.
250 backup_bytes_ -= count;
251 return true;
252 }
253
254 count -= backup_bytes_;
255 backup_bytes_ = 0;
256
257 int skipped = copying_stream_->Skip(count);
258 position_ += skipped;
259 return skipped == count;
260 }
261
ByteCount() const262 int64_t CopyingInputStreamAdaptor::ByteCount() const {
263 return position_ - backup_bytes_;
264 }
265
AllocateBufferIfNeeded()266 void CopyingInputStreamAdaptor::AllocateBufferIfNeeded() {
267 if (buffer_.get() == NULL) {
268 buffer_.reset(new uint8_t[buffer_size_]);
269 }
270 }
271
FreeBuffer()272 void CopyingInputStreamAdaptor::FreeBuffer() {
273 ABSL_CHECK_EQ(backup_bytes_, 0);
274 buffer_used_ = 0;
275 buffer_.reset();
276 }
277
278 // ===================================================================
279
CopyingOutputStreamAdaptor(CopyingOutputStream * copying_stream,int block_size)280 CopyingOutputStreamAdaptor::CopyingOutputStreamAdaptor(
281 CopyingOutputStream* copying_stream, int block_size)
282 : copying_stream_(copying_stream),
283 owns_copying_stream_(false),
284 failed_(false),
285 position_(0),
286 buffer_size_(block_size > 0 ? block_size : kDefaultBlockSize),
287 buffer_used_(0) {}
288
~CopyingOutputStreamAdaptor()289 CopyingOutputStreamAdaptor::~CopyingOutputStreamAdaptor() {
290 WriteBuffer();
291 if (owns_copying_stream_) {
292 delete copying_stream_;
293 }
294 }
295
Flush()296 bool CopyingOutputStreamAdaptor::Flush() { return WriteBuffer(); }
297
Next(void ** data,int * size)298 bool CopyingOutputStreamAdaptor::Next(void** data, int* size) {
299 if (buffer_used_ == buffer_size_) {
300 if (!WriteBuffer()) return false;
301 }
302
303 AllocateBufferIfNeeded();
304
305 *data = buffer_.get() + buffer_used_;
306 *size = buffer_size_ - buffer_used_;
307 buffer_used_ = buffer_size_;
308 return true;
309 }
310
BackUp(int count)311 void CopyingOutputStreamAdaptor::BackUp(int count) {
312 if (count == 0) {
313 Flush();
314 return;
315 }
316 ABSL_CHECK_GE(count, 0);
317 ABSL_CHECK_EQ(buffer_used_, buffer_size_)
318 << " BackUp() can only be called after Next().";
319 ABSL_CHECK_LE(count, buffer_used_)
320 << " Can't back up over more bytes than were returned by the last call"
321 " to Next().";
322
323 buffer_used_ -= count;
324 }
325
ByteCount() const326 int64_t CopyingOutputStreamAdaptor::ByteCount() const {
327 return position_ + buffer_used_;
328 }
329
WriteAliasedRaw(const void * data,int size)330 bool CopyingOutputStreamAdaptor::WriteAliasedRaw(const void* data, int size) {
331 if (size >= buffer_size_) {
332 if (!Flush() || !copying_stream_->Write(data, size)) {
333 return false;
334 }
335 ABSL_DCHECK_EQ(buffer_used_, 0);
336 position_ += size;
337 return true;
338 }
339
340 void* out;
341 int out_size;
342 while (true) {
343 if (!Next(&out, &out_size)) {
344 return false;
345 }
346
347 if (size <= out_size) {
348 std::memcpy(out, data, size);
349 BackUp(out_size - size);
350 return true;
351 }
352
353 std::memcpy(out, data, out_size);
354 data = static_cast<const char*>(data) + out_size;
355 size -= out_size;
356 }
357 return true;
358 }
359
WriteCord(const absl::Cord & cord)360 bool CopyingOutputStreamAdaptor::WriteCord(const absl::Cord& cord) {
361 for (absl::string_view chunk : cord.Chunks()) {
362 if (!WriteAliasedRaw(chunk.data(), chunk.size())) {
363 return false;
364 }
365 }
366 return true;
367 }
368
WriteBuffer()369 bool CopyingOutputStreamAdaptor::WriteBuffer() {
370 if (failed_) {
371 // Already failed on a previous write.
372 return false;
373 }
374
375 if (buffer_used_ == 0) return true;
376
377 if (copying_stream_->Write(buffer_.get(), buffer_used_)) {
378 position_ += buffer_used_;
379 buffer_used_ = 0;
380 return true;
381 } else {
382 failed_ = true;
383 FreeBuffer();
384 return false;
385 }
386 }
387
AllocateBufferIfNeeded()388 void CopyingOutputStreamAdaptor::AllocateBufferIfNeeded() {
389 if (buffer_ == NULL) {
390 buffer_.reset(new uint8_t[buffer_size_]);
391 }
392 }
393
FreeBuffer()394 void CopyingOutputStreamAdaptor::FreeBuffer() {
395 buffer_used_ = 0;
396 buffer_.reset();
397 }
398
399 // ===================================================================
400
LimitingInputStream(ZeroCopyInputStream * input,int64_t limit)401 LimitingInputStream::LimitingInputStream(ZeroCopyInputStream* input,
402 int64_t limit)
403 : input_(input), limit_(limit) {
404 prior_bytes_read_ = input_->ByteCount();
405 }
406
~LimitingInputStream()407 LimitingInputStream::~LimitingInputStream() {
408 // If we overshot the limit, back up.
409 if (limit_ < 0) input_->BackUp(-limit_);
410 }
411
Next(const void ** data,int * size)412 bool LimitingInputStream::Next(const void** data, int* size) {
413 if (limit_ <= 0) return false;
414 if (!input_->Next(data, size)) return false;
415
416 limit_ -= *size;
417 if (limit_ < 0) {
418 // We overshot the limit. Reduce *size to hide the rest of the buffer.
419 *size += limit_;
420 }
421 return true;
422 }
423
BackUp(int count)424 void LimitingInputStream::BackUp(int count) {
425 if (limit_ < 0) {
426 input_->BackUp(count - limit_);
427 limit_ = count;
428 } else {
429 input_->BackUp(count);
430 limit_ += count;
431 }
432 }
433
Skip(int count)434 bool LimitingInputStream::Skip(int count) {
435 if (count > limit_) {
436 if (limit_ < 0) return false;
437 input_->Skip(limit_);
438 limit_ = 0;
439 return false;
440 } else {
441 if (!input_->Skip(count)) return false;
442 limit_ -= count;
443 return true;
444 }
445 }
446
ByteCount() const447 int64_t LimitingInputStream::ByteCount() const {
448 if (limit_ < 0) {
449 return input_->ByteCount() + limit_ - prior_bytes_read_;
450 } else {
451 return input_->ByteCount() - prior_bytes_read_;
452 }
453 }
454
ReadCord(absl::Cord * cord,int count)455 bool LimitingInputStream::ReadCord(absl::Cord* cord, int count) {
456 if (count <= 0) return true;
457 if (count <= limit_) {
458 if (!input_->ReadCord(cord, count)) return false;
459 limit_ -= count;
460 return true;
461 }
462 input_->ReadCord(cord, limit_);
463 limit_ = 0;
464 return false;
465 }
466
467
468 // ===================================================================
CordInputStream(const absl::Cord * cord)469 CordInputStream::CordInputStream(const absl::Cord* cord)
470 : it_(cord->char_begin()),
471 length_(cord->size()),
472 bytes_remaining_(length_) {
473 LoadChunkData();
474 }
475
LoadChunkData()476 bool CordInputStream::LoadChunkData() {
477 if (bytes_remaining_ != 0) {
478 absl::string_view sv = absl::Cord::ChunkRemaining(it_);
479 data_ = sv.data();
480 size_ = available_ = sv.size();
481 return true;
482 }
483 size_ = available_ = 0;
484 return false;
485 }
486
NextChunk(size_t skip)487 bool CordInputStream::NextChunk(size_t skip) {
488 // `size_ == 0` indicates we're at EOF.
489 if (size_ == 0) return false;
490
491 // The caller consumed 'size_ - available_' bytes that are not yet accounted
492 // for in the iterator position to get to the start of the next chunk.
493 const size_t distance = size_ - available_ + skip;
494 absl::Cord::Advance(&it_, distance);
495 bytes_remaining_ -= skip;
496
497 return LoadChunkData();
498 }
499
Next(const void ** data,int * size)500 bool CordInputStream::Next(const void** data, int* size) {
501 if (available_ > 0 || NextChunk(0)) {
502 *data = data_ + size_ - available_;
503 *size = available_;
504 bytes_remaining_ -= available_;
505 available_ = 0;
506 return true;
507 }
508 return false;
509 }
510
BackUp(int count)511 void CordInputStream::BackUp(int count) {
512 // Backup is only allowed on last returned chunk from `Next()`.
513 ABSL_CHECK_LE(static_cast<size_t>(count), size_ - available_);
514
515 available_ += count;
516 bytes_remaining_ += count;
517 }
518
Skip(int count)519 bool CordInputStream::Skip(int count) {
520 // Short circuit if we stay inside the current chunk.
521 if (static_cast<size_t>(count) <= available_) {
522 available_ -= count;
523 bytes_remaining_ -= count;
524 return true;
525 }
526
527 // Sanity check the skip count.
528 if (static_cast<size_t>(count) <= bytes_remaining_) {
529 // Skip to end: do not return EOF condition: skipping into EOF is ok.
530 NextChunk(count);
531 return true;
532 }
533 NextChunk(bytes_remaining_);
534 return false;
535 }
536
ByteCount() const537 int64_t CordInputStream::ByteCount() const {
538 return length_ - bytes_remaining_;
539 }
540
ReadCord(absl::Cord * cord,int count)541 bool CordInputStream::ReadCord(absl::Cord* cord, int count) {
542 // Advance the iterator to the current position
543 const size_t used = size_ - available_;
544 absl::Cord::Advance(&it_, used);
545
546 // Read the cord, adjusting the iterator position.
547 // Make sure to cap at available bytes to avoid hard crashes.
548 const size_t n = std::min(static_cast<size_t>(count), bytes_remaining_);
549 cord->Append(absl::Cord::AdvanceAndRead(&it_, n));
550
551 // Update current chunk data.
552 bytes_remaining_ -= n;
553 LoadChunkData();
554
555 return n == static_cast<size_t>(count);
556 }
557
558
CordOutputStream(size_t size_hint)559 CordOutputStream::CordOutputStream(size_t size_hint) : size_hint_(size_hint) {}
560
CordOutputStream(absl::Cord cord,size_t size_hint)561 CordOutputStream::CordOutputStream(absl::Cord cord, size_t size_hint)
562 : cord_(std::move(cord)),
563 size_hint_(size_hint),
564 state_(cord_.empty() ? State::kEmpty : State::kSteal) {}
565
CordOutputStream(absl::CordBuffer buffer,size_t size_hint)566 CordOutputStream::CordOutputStream(absl::CordBuffer buffer, size_t size_hint)
567 : size_hint_(size_hint),
568 state_(buffer.length() < buffer.capacity() ? State::kPartial
569 : State::kFull),
570 buffer_(std::move(buffer)) {}
571
CordOutputStream(absl::Cord cord,absl::CordBuffer buffer,size_t size_hint)572 CordOutputStream::CordOutputStream(absl::Cord cord, absl::CordBuffer buffer,
573 size_t size_hint)
574 : cord_(std::move(cord)),
575 size_hint_(size_hint),
576 state_(buffer.length() < buffer.capacity() ? State::kPartial
577 : State::kFull),
578 buffer_(std::move(buffer)) {}
579
Next(void ** data,int * size)580 bool CordOutputStream::Next(void** data, int* size) {
581 // Use 128 bytes as a minimum buffer size if we don't have any application
582 // provided size hints. This number is picked somewhat arbitrary as 'small
583 // enough to avoid excessive waste on small data, and large enough to not
584 // waste CPU and memory on tiny buffer overhead'.
585 // It is worth noting that absent size hints, we pick 'current size' as
586 // the default buffer size (capped at max flat size), which means we quickly
587 // double the buffer size. This is in contrast to `Cord::Append()` functions
588 // accepting strings which use a conservative 10% growth.
589 static const size_t kMinBlockSize = 128;
590
591 size_t desired_size, max_size;
592 const size_t cord_size = cord_.size() + buffer_.length();
593 if (size_hint_ > cord_size) {
594 // Try to hit size_hint_ exactly so the caller doesn't receive a larger
595 // buffer than indicated, requiring a non-zero call to BackUp() to undo
596 // the buffer capacity we returned beyond the indicated size hint.
597 desired_size = size_hint_ - cord_size;
598 max_size = desired_size;
599 } else {
600 // We're past the size hint or don't have a size hint. Try to allocate a
601 // block as large as what we have so far, or at least kMinBlockSize bytes.
602 // CordBuffer will truncate this to an appropriate size if it is too large.
603 desired_size = std::max(cord_size, kMinBlockSize);
604 max_size = std::numeric_limits<size_t>::max();
605 }
606
607 switch (state_) {
608 case State::kSteal:
609 // Steal last buffer from Cord if available.
610 assert(buffer_.length() == 0);
611 buffer_ = cord_.GetAppendBuffer(desired_size);
612 break;
613 case State::kPartial:
614 // Use existing capacity in 'buffer_`
615 assert(buffer_.length() < buffer_.capacity());
616 break;
617 case State::kFull:
618 assert(buffer_.length() > 0);
619 cord_.Append(std::move(buffer_));
620 ABSL_FALLTHROUGH_INTENDED;
621 case State::kEmpty:
622 assert(buffer_.length() == 0);
623 buffer_ = absl::CordBuffer::CreateWithDefaultLimit(desired_size);
624 break;
625 }
626
627 // Get all available capacity from the buffer.
628 absl::Span<char> span = buffer_.available();
629 assert(!span.empty());
630 *data = span.data();
631
632 // Only hand out up to 'max_size', which is limited if there is a size hint
633 // specified, and we have more available than the size hint.
634 if (span.size() > max_size) {
635 *size = static_cast<int>(max_size);
636 buffer_.IncreaseLengthBy(max_size);
637 state_ = State::kPartial;
638 } else {
639 *size = static_cast<int>(span.size());
640 buffer_.IncreaseLengthBy(span.size());
641 state_ = State::kFull;
642 }
643
644 return true;
645 }
646
BackUp(int count)647 void CordOutputStream::BackUp(int count) {
648 // Check if something to do, else state remains unchanged.
649 assert(0 <= count && count <= ByteCount());
650 if (count == 0) return;
651
652 // Backup() is not supposed to backup beyond last Next() call
653 const int buffer_length = static_cast<int>(buffer_.length());
654 assert(count <= buffer_length);
655 if (count <= buffer_length) {
656 buffer_.SetLength(static_cast<size_t>(buffer_length - count));
657 state_ = State::kPartial;
658 } else {
659 buffer_ = {};
660 cord_.RemoveSuffix(static_cast<size_t>(count));
661 state_ = State::kSteal;
662 }
663 }
664
ByteCount() const665 int64_t CordOutputStream::ByteCount() const {
666 return static_cast<int64_t>(cord_.size() + buffer_.length());
667 }
668
WriteCord(const absl::Cord & cord)669 bool CordOutputStream::WriteCord(const absl::Cord& cord) {
670 cord_.Append(std::move(buffer_));
671 cord_.Append(cord);
672 state_ = State::kSteal; // Attempt to utilize existing capacity in `cord'
673 return true;
674 }
675
Consume()676 absl::Cord CordOutputStream::Consume() {
677 cord_.Append(std::move(buffer_));
678 state_ = State::kEmpty;
679 return std::move(cord_);
680 }
681
682
683 } // namespace io
684 } // namespace protobuf
685 } // namespace google
686
687 #include "google/protobuf/port_undef.inc"
688