1 // Protocol Buffers - Google's data interchange format
2 // Copyright 2008 Google Inc. All rights reserved.
3 //
4 // Use of this source code is governed by a BSD-style
5 // license that can be found in the LICENSE file or at
6 // https://developers.google.com/open-source/licenses/bsd
7
8 // Author: kenton@google.com (Kenton Varda)
9 // Based on original Protocol Buffers design by
10 // Sanjay Ghemawat, Jeff Dean, and others.
11 //
12 // Testing strategy: For each type of I/O (array, string, file, etc.) we
13 // create an output stream and write some data to it, then create a
14 // corresponding input stream to read the same data back and expect it to
15 // match. When the data is written, it is written in several small chunks
16 // of varying sizes, with a BackUp() after each chunk. It is read back
17 // similarly, but with chunks separated at different points. The whole
18 // process is run with a variety of block sizes for both the input and
19 // the output.
20 //
// TODO: Rewrite this test to bring it up to the standards of all
// the other proto2 tests. May want to wait for gTest to implement
// "parameterized tests" so that one set of tests can be used on all the
// implementations.
25
26 #ifndef _WIN32
27 #include <sys/socket.h>
28 #include <unistd.h>
29 #endif
30 #include <errno.h>
31 #include <fcntl.h>
32 #include <stdlib.h>
33 #include <sys/stat.h>
34 #include <sys/types.h>
35
36 #include <algorithm>
37 #include <chrono>
38 #include <iterator>
39 #include <memory>
40 #include <sstream>
41 #include <thread>
42 #include <utility>
43 #include <vector>
44
45 #include "google/protobuf/stubs/common.h"
46 #include "google/protobuf/testing/file.h"
47 #include "google/protobuf/testing/file.h"
48 #include <gtest/gtest.h>
49 #include "absl/log/absl_check.h"
50 #include "absl/log/absl_log.h"
51 #include "absl/status/status.h"
52 #include "absl/strings/cord.h"
53 #include "absl/strings/cord_buffer.h"
54 #include "absl/strings/str_cat.h"
55 #include "absl/strings/string_view.h"
56 #include "google/protobuf/io/coded_stream.h"
57 #include "google/protobuf/io/io_win32.h"
58 #include "google/protobuf/io/zero_copy_stream_impl.h"
59 #include "google/protobuf/test_util2.h"
60
61 #if HAVE_ZLIB
62 #include "google/protobuf/io/gzip_stream.h"
63 #endif
64
65 #include "google/protobuf/test_util.h"
66
67 // Must be included last.
68 #include "google/protobuf/port_def.inc"
69
70 namespace google {
71 namespace protobuf {
72 namespace io {
73 namespace {
74
75 #ifdef _WIN32
76 #define pipe(fds) _pipe(fds, 4096, O_BINARY)
77 // DO NOT include <io.h>, instead create functions in io_win32.{h,cc} and import
78 // them like we do below.
79 using google::protobuf::io::win32::access;
80 using google::protobuf::io::win32::close;
81 using google::protobuf::io::win32::mkdir;
82 using google::protobuf::io::win32::open;
83 #endif
84
85 #ifndef O_BINARY
86 #ifdef _O_BINARY
87 #define O_BINARY _O_BINARY
88 #else
89 #define O_BINARY 0 // If this isn't defined, the platform doesn't need it.
90 #endif
91 #endif
92
93 class IoTest : public testing::Test {
94 protected:
95 // Test helpers.
96
97 // Helper to write an array of data to an output stream.
98 bool WriteToOutput(ZeroCopyOutputStream* output, const void* data, int size);
99 // Helper to read a fixed-length array of data from an input stream.
100 int ReadFromInput(ZeroCopyInputStream* input, void* data, int size);
101 // Write a string to the output stream.
102 void WriteString(ZeroCopyOutputStream* output, const std::string& str);
103 // Read a number of bytes equal to the size of the given string and checks
104 // that it matches the string.
105 void ReadString(ZeroCopyInputStream* input, const std::string& str);
106 // Writes some text to the output stream in a particular order. Returns
107 // the number of bytes written, in case the caller needs that to set up an
108 // input stream.
109 int WriteStuff(ZeroCopyOutputStream* output);
110 // Reads text from an input stream and expects it to match what
111 // WriteStuff() writes.
112 void ReadStuff(ZeroCopyInputStream* input, bool read_eof = true);
113
114 // Similar to WriteStuff, but performs more sophisticated testing.
115 int WriteStuffLarge(ZeroCopyOutputStream* output);
116 // Reads and tests a stream that should have been written to
117 // via WriteStuffLarge().
118 void ReadStuffLarge(ZeroCopyInputStream* input);
119
120 #if HAVE_ZLIB
121 std::string Compress(const std::string& data,
122 const GzipOutputStream::Options& options);
123 std::string Uncompress(const std::string& data);
124 #endif
125
126 static const int kBlockSizes[];
127 static const int kBlockSizeCount;
128 };
129
// Block sizes that the tests below cycle through on both the writing and the
// reading side.
// NOTE(review): -1 presumably selects the stream's default block size --
// confirm against the stream constructors.
const int IoTest::kBlockSizes[] = {-1, 1, 2, 5, 7, 10, 23, 64};
// Number of entries in kBlockSizes.
const int IoTest::kBlockSizeCount = ABSL_ARRAYSIZE(IoTest::kBlockSizes);
132
WriteToOutput(ZeroCopyOutputStream * output,const void * data,int size)133 bool IoTest::WriteToOutput(ZeroCopyOutputStream* output, const void* data,
134 int size) {
135 const uint8_t* in = reinterpret_cast<const uint8_t*>(data);
136 int in_size = size;
137
138 void* out;
139 int out_size;
140
141 while (true) {
142 if (!output->Next(&out, &out_size)) {
143 return false;
144 }
145 EXPECT_GT(out_size, 0);
146
147 if (in_size <= out_size) {
148 memcpy(out, in, in_size);
149 output->BackUp(out_size - in_size);
150 return true;
151 }
152
153 memcpy(out, in, out_size);
154 in += out_size;
155 in_size -= out_size;
156 }
157 }
158
159 #define MAX_REPEATED_ZEROS 100
160
ReadFromInput(ZeroCopyInputStream * input,void * data,int size)161 int IoTest::ReadFromInput(ZeroCopyInputStream* input, void* data, int size) {
162 uint8_t* out = reinterpret_cast<uint8_t*>(data);
163 int out_size = size;
164
165 const void* in;
166 int in_size = 0;
167
168 int repeated_zeros = 0;
169
170 while (true) {
171 if (!input->Next(&in, &in_size)) {
172 return size - out_size;
173 }
174 EXPECT_GT(in_size, -1);
175 if (in_size == 0) {
176 repeated_zeros++;
177 } else {
178 repeated_zeros = 0;
179 }
180 EXPECT_LT(repeated_zeros, MAX_REPEATED_ZEROS);
181
182 if (out_size <= in_size) {
183 memcpy(out, in, out_size);
184 if (in_size > out_size) {
185 input->BackUp(in_size - out_size);
186 }
187 return size; // Copied all of it.
188 }
189
190 memcpy(out, in, in_size);
191 out += in_size;
192 out_size -= in_size;
193 }
194 }
195
WriteString(ZeroCopyOutputStream * output,const std::string & str)196 void IoTest::WriteString(ZeroCopyOutputStream* output, const std::string& str) {
197 EXPECT_TRUE(WriteToOutput(output, str.c_str(), str.size()));
198 }
199
ReadString(ZeroCopyInputStream * input,const std::string & str)200 void IoTest::ReadString(ZeroCopyInputStream* input, const std::string& str) {
201 std::unique_ptr<char[]> buffer(new char[str.size() + 1]);
202 buffer[str.size()] = '\0';
203 EXPECT_EQ(ReadFromInput(input, buffer.get(), str.size()), str.size());
204 EXPECT_STREQ(str.c_str(), buffer.get());
205 }
206
WriteStuff(ZeroCopyOutputStream * output)207 int IoTest::WriteStuff(ZeroCopyOutputStream* output) {
208 WriteString(output, "Hello world!\n");
209 WriteString(output, "Some te");
210 WriteString(output, "xt. Blah blah.");
211 WriteString(output, "abcdefg");
212 WriteString(output, "01234567890123456789");
213 WriteString(output, "foobar");
214
215 EXPECT_EQ(output->ByteCount(), 68);
216
217 int result = output->ByteCount();
218 return result;
219 }
220
221 // Reads text from an input stream and expects it to match what WriteStuff()
222 // writes.
ReadStuff(ZeroCopyInputStream * input,bool read_eof)223 void IoTest::ReadStuff(ZeroCopyInputStream* input, bool read_eof) {
224 ReadString(input, "Hello world!\n");
225 ReadString(input, "Some text. ");
226 ReadString(input, "Blah ");
227 ReadString(input, "blah.");
228 ReadString(input, "abcdefg");
229 EXPECT_TRUE(input->Skip(20));
230 ReadString(input, "foo");
231 ReadString(input, "bar");
232
233 EXPECT_EQ(input->ByteCount(), 68);
234
235 if (read_eof) {
236 uint8_t byte;
237 EXPECT_EQ(ReadFromInput(input, &byte, 1), 0);
238 }
239 }
240
WriteStuffLarge(ZeroCopyOutputStream * output)241 int IoTest::WriteStuffLarge(ZeroCopyOutputStream* output) {
242 WriteString(output, "Hello world!\n");
243 WriteString(output, "Some te");
244 WriteString(output, "xt. Blah blah.");
245 WriteString(output, std::string(100000, 'x')); // A very long string
246 WriteString(output, std::string(100000, 'y')); // A very long string
247 WriteString(output, "01234567890123456789");
248
249 EXPECT_EQ(output->ByteCount(), 200055);
250
251 int result = output->ByteCount();
252 return result;
253 }
254
// Reads text from an input stream and expects it to match what
// WriteStuffLarge() writes.
ReadStuffLarge(ZeroCopyInputStream * input)257 void IoTest::ReadStuffLarge(ZeroCopyInputStream* input) {
258 ReadString(input, "Hello world!\nSome text. ");
259 EXPECT_TRUE(input->Skip(5));
260 ReadString(input, "blah.");
261 EXPECT_TRUE(input->Skip(100000 - 10));
262 ReadString(input, std::string(10, 'x') + std::string(100000 - 20000, 'y'));
263 EXPECT_TRUE(input->Skip(20000 - 10));
264 ReadString(input, "yyyyyyyyyy01234567890123456789");
265
266 EXPECT_EQ(input->ByteCount(), 200055);
267
268 uint8_t byte;
269 EXPECT_EQ(ReadFromInput(input, &byte, 1), 0);
270 }
271
272 // ===================================================================
273
// Round-trips the canned payload through ArrayOutputStream/ArrayInputStream
// for every pairing of output and input block size.
TEST_F(IoTest, ArrayIo) {
  const int kBufferSize = 256;
  uint8_t buffer[kBufferSize];

  for (int out_idx = 0; out_idx < kBlockSizeCount; ++out_idx) {
    const int write_block = kBlockSizes[out_idx];
    for (int in_idx = 0; in_idx < kBlockSizeCount; ++in_idx) {
      const int read_block = kBlockSizes[in_idx];

      int written;
      {
        ArrayOutputStream output(buffer, kBufferSize, write_block);
        written = WriteStuff(&output);
      }
      {
        ArrayInputStream input(buffer, written, read_block);
        ReadStuff(&input);
      }
    }
  }
}
292
// Verifies that two write sessions placed back to back in one buffer can be
// read as a single stream, for every pairing of output and input block size.
TEST_F(IoTest, TwoSessionWrite) {
  static const char* strA = "0123456789";
  static const char* strB = "WhirledPeas";
  const int kBufferSize = 2 * 1024;
  const uint32_t kTempBufferSize = 40;
  // Automatic storage instead of new[]/delete[]: nothing to release by hand,
  // also when an ASSERT_* below leaves the test early.
  std::vector<uint8_t> buffer(kBufferSize);
  char temp_buffer[kTempBufferSize];

  for (int i = 0; i < kBlockSizeCount; i++) {
    for (int j = 0; j < kBlockSizeCount; j++) {
      // First session, written at the start of the buffer.
      int64_t pos;
      {
        ArrayOutputStream output(buffer.data(), kBufferSize, kBlockSizes[i]);
        {
          CodedOutputStream coded_output(&output);
          coded_output.WriteVarint32(strlen(strA));
          coded_output.WriteRaw(strA, strlen(strA));
        }  // Destroying the coded stream flushes it.
        pos = output.ByteCount();
      }

      // Second session, written directly behind the first one.
      int64_t size;
      {
        ArrayOutputStream output(buffer.data() + pos, kBufferSize - pos,
                                 kBlockSizes[i]);
        {
          CodedOutputStream coded_output(&output);
          coded_output.WriteVarint32(strlen(strB));
          coded_output.WriteRaw(strB, strlen(strB));
        }  // Destroying the coded stream flushes it.
        size = pos + output.ByteCount();
      }

      // Read both sessions back through one input stream.
      ArrayInputStream input(buffer.data(), size, kBlockSizes[j]);
      CodedInputStream coded_input(&input);
      uint32_t insize = 0;
      EXPECT_TRUE(coded_input.ReadVarint32(&insize));
      EXPECT_EQ(strlen(strA), insize);
      // Never let a bogus length overrun the scratch buffer.
      ASSERT_LE(insize, kTempBufferSize);
      EXPECT_TRUE(coded_input.ReadRaw(temp_buffer, insize));
      EXPECT_EQ(0, memcmp(temp_buffer, strA, insize));

      EXPECT_TRUE(coded_input.ReadVarint32(&insize));
      EXPECT_EQ(strlen(strB), insize);
      ASSERT_LE(insize, kTempBufferSize);
      EXPECT_TRUE(coded_input.ReadRaw(temp_buffer, insize));
      EXPECT_EQ(0, memcmp(temp_buffer, strB, insize));
    }
  }
}
343
344 #if HAVE_ZLIB
// Round-trips the canned payload through GZIP-format compression for every
// combination of output block size, input block size and gzip buffer size.
TEST_F(IoTest, GzipIo) {
  const int kBufferSize = 2 * 1024;
  // std::vector<uint8_t> replaces the manually managed array of the legacy
  // `uint8` alias, matching the uint8_t usage elsewhere in this file.
  std::vector<uint8_t> buffer(kBufferSize);
  for (int i = 0; i < kBlockSizeCount; i++) {
    for (int j = 0; j < kBlockSizeCount; j++) {
      for (int z = 0; z < kBlockSizeCount; z++) {
        int gzip_buffer_size = kBlockSizes[z];
        int size;
        {
          ArrayOutputStream output(buffer.data(), kBufferSize, kBlockSizes[i]);
          GzipOutputStream::Options options;
          options.format = GzipOutputStream::GZIP;
          // -1 means: keep the buffer size the options default to.
          if (gzip_buffer_size != -1) {
            options.buffer_size = gzip_buffer_size;
          }
          GzipOutputStream gzout(&output, options);
          WriteStuff(&gzout);
          gzout.Close();
          size = output.ByteCount();
        }
        {
          ArrayInputStream input(buffer.data(), size, kBlockSizes[j]);
          GzipInputStream gzin(&input, GzipInputStream::GZIP, gzip_buffer_size);
          ReadStuff(&gzin);
        }
      }
    }
  }
}
375
// Same round trip as the plain GZIP test, but with an explicit Flush() before
// the compressed stream is closed.
TEST_F(IoTest, GzipIoWithFlush) {
  const int kBufferSize = 2 * 1024;
  // std::vector<uint8_t> replaces the manually managed array of the legacy
  // `uint8` alias, matching the uint8_t usage elsewhere in this file.
  std::vector<uint8_t> buffer(kBufferSize);
  // We start with i = 4 as we want a block size > 6. With block size <= 6
  // Flush() fills up the entire 2K buffer with flush markers and the test
  // fails. See documentation for Flush() for more detail.
  for (int i = 4; i < kBlockSizeCount; i++) {
    for (int j = 0; j < kBlockSizeCount; j++) {
      for (int z = 0; z < kBlockSizeCount; z++) {
        int gzip_buffer_size = kBlockSizes[z];
        int size;
        {
          ArrayOutputStream output(buffer.data(), kBufferSize, kBlockSizes[i]);
          GzipOutputStream::Options options;
          options.format = GzipOutputStream::GZIP;
          // -1 means: keep the buffer size the options default to.
          if (gzip_buffer_size != -1) {
            options.buffer_size = gzip_buffer_size;
          }
          GzipOutputStream gzout(&output, options);
          WriteStuff(&gzout);
          EXPECT_TRUE(gzout.Flush());
          gzout.Close();
          size = output.ByteCount();
        }
        {
          ArrayInputStream input(buffer.data(), size, kBlockSizes[j]);
          GzipInputStream gzin(&input, GzipInputStream::GZIP, gzip_buffer_size);
          ReadStuff(&gzin);
        }
      }
    }
  }
}
410
// Checks that two Flush() calls in a row still leave a stream that
// decompresses to the canned payload.
TEST_F(IoTest, GzipIoContiguousFlushes) {
  const int kBufferSize = 2 * 1024;
  // Declared first so that it is destroyed last, i.e. after every stream that
  // points into it. (The original freed the array while the streams were
  // still alive.) Also replaces the legacy `uint8` alias with uint8_t.
  std::vector<uint8_t> buffer(kBufferSize);

  int block_size = kBlockSizes[4];
  int gzip_buffer_size = block_size;
  int size;

  ArrayOutputStream output(buffer.data(), kBufferSize, block_size);
  GzipOutputStream::Options options;
  options.format = GzipOutputStream::GZIP;
  if (gzip_buffer_size != -1) {
    options.buffer_size = gzip_buffer_size;
  }
  GzipOutputStream gzout(&output, options);
  WriteStuff(&gzout);
  EXPECT_TRUE(gzout.Flush());
  EXPECT_TRUE(gzout.Flush());
  gzout.Close();
  size = output.ByteCount();

  ArrayInputStream input(buffer.data(), size, block_size);
  GzipInputStream gzin(&input, GzipInputStream::GZIP, gzip_buffer_size);
  ReadStuff(&gzin);
}
438
// Checks that everything written before a Flush() can already be read back
// while the compressing stream is still open.
TEST_F(IoTest, GzipIoReadAfterFlush) {
  const int kBufferSize = 2 * 1024;
  // Declared first so that it is destroyed last, i.e. after every stream that
  // points into it. (The original freed the array while the streams were
  // still alive.) Also replaces the legacy `uint8` alias with uint8_t.
  std::vector<uint8_t> buffer(kBufferSize);

  int block_size = kBlockSizes[4];
  int gzip_buffer_size = block_size;
  int size;
  ArrayOutputStream output(buffer.data(), kBufferSize, block_size);
  GzipOutputStream::Options options;
  options.format = GzipOutputStream::GZIP;
  if (gzip_buffer_size != -1) {
    options.buffer_size = gzip_buffer_size;
  }

  GzipOutputStream gzout(&output, options);
  WriteStuff(&gzout);
  EXPECT_TRUE(gzout.Flush());
  size = output.ByteCount();

  // Read while `gzout` has been flushed but not yet closed.
  ArrayInputStream input(buffer.data(), size, block_size);
  GzipInputStream gzin(&input, GzipInputStream::GZIP, gzip_buffer_size);
  ReadStuff(&gzin);

  gzout.Close();
}
466
// Round-trips the canned payload through ZLIB-format compression for every
// combination of output block size, input block size and gzip buffer size.
TEST_F(IoTest, ZlibIo) {
  const int kBufferSize = 2 * 1024;
  // std::vector<uint8_t> replaces the manually managed array of the legacy
  // `uint8` alias, matching the uint8_t usage elsewhere in this file.
  std::vector<uint8_t> buffer(kBufferSize);
  for (int i = 0; i < kBlockSizeCount; i++) {
    for (int j = 0; j < kBlockSizeCount; j++) {
      for (int z = 0; z < kBlockSizeCount; z++) {
        int gzip_buffer_size = kBlockSizes[z];
        int size;
        {
          ArrayOutputStream output(buffer.data(), kBufferSize, kBlockSizes[i]);
          GzipOutputStream::Options options;
          options.format = GzipOutputStream::ZLIB;
          // -1 means: keep the buffer size the options default to.
          if (gzip_buffer_size != -1) {
            options.buffer_size = gzip_buffer_size;
          }
          GzipOutputStream gzout(&output, options);
          WriteStuff(&gzout);
          gzout.Close();
          size = output.ByteCount();
        }
        {
          ArrayInputStream input(buffer.data(), size, kBlockSizes[j]);
          GzipInputStream gzin(&input, GzipInputStream::ZLIB, gzip_buffer_size);
          ReadStuff(&gzin);
        }
      }
    }
  }
}
497
// Checks that a GzipInputStream in AUTO mode reads both ZLIB-format and
// GZIP-format data.
TEST_F(IoTest, ZlibIoInputAutodetect) {
  const int kBufferSize = 2 * 1024;
  // std::vector<uint8_t> replaces the manually managed array of the legacy
  // `uint8` alias, matching the uint8_t usage elsewhere in this file.
  std::vector<uint8_t> buffer(kBufferSize);
  int size;
  // Pass 1: ZLIB-format data.
  {
    ArrayOutputStream output(buffer.data(), kBufferSize);
    GzipOutputStream::Options options;
    options.format = GzipOutputStream::ZLIB;
    GzipOutputStream gzout(&output, options);
    WriteStuff(&gzout);
    gzout.Close();
    size = output.ByteCount();
  }
  {
    ArrayInputStream input(buffer.data(), size);
    GzipInputStream gzin(&input, GzipInputStream::AUTO);
    ReadStuff(&gzin);
  }
  // Pass 2: GZIP-format data.
  {
    ArrayOutputStream output(buffer.data(), kBufferSize);
    GzipOutputStream::Options options;
    options.format = GzipOutputStream::GZIP;
    GzipOutputStream gzout(&output, options);
    WriteStuff(&gzout);
    gzout.Close();
    size = output.ByteCount();
  }
  {
    ArrayInputStream input(buffer.data(), size);
    GzipInputStream gzin(&input, GzipInputStream::AUTO);
    ReadStuff(&gzin);
  }
}
532
Compress(const std::string & data,const GzipOutputStream::Options & options)533 std::string IoTest::Compress(const std::string& data,
534 const GzipOutputStream::Options& options) {
535 std::string result;
536 {
537 StringOutputStream output(&result);
538 GzipOutputStream gzout(&output, options);
539 WriteToOutput(&gzout, data.data(), data.size());
540 }
541 return result;
542 }
543
Uncompress(const std::string & data)544 std::string IoTest::Uncompress(const std::string& data) {
545 std::string result;
546 {
547 ArrayInputStream input(data.data(), data.size());
548 GzipInputStream gzin(&input);
549 const void* buffer;
550 int size;
551 while (gzin.Next(&buffer, &size)) {
552 result.append(reinterpret_cast<const char*>(buffer), size);
553 }
554 }
555 return result;
556 }
557
// Ad-hoc checks of a few compression option combinations.
TEST_F(IoTest, CompressionOptions) {
  protobuf_unittest::TestAllTypes message;
  TestUtil::SetAllFields(&message);
  const std::string golden = message.SerializeAsString();

  // Default options.
  GzipOutputStream::Options default_options;
  const std::string gzip_compressed = Compress(golden, default_options);

  // Default options with compression switched off.
  GzipOutputStream::Options level_zero_options;
  level_zero_options.compression_level = 0;
  const std::string not_compressed = Compress(golden, level_zero_options);

  // Default options, but ZLIB format.
  GzipOutputStream::Options zlib_options;
  zlib_options.format = GzipOutputStream::ZLIB;
  const std::string zlib_compressed = Compress(golden, zlib_options);

  // Level 0 output carries some sort of header, so it has to be larger than
  // the input.
  EXPECT_GT(not_compressed.size(), golden.size());

  // Real compression should beat level 0.
  EXPECT_LT(zlib_compressed.size(), not_compressed.size());

  // The two formats must not produce identical bytes.
  EXPECT_TRUE(zlib_compressed != gzip_compressed);

  // Every variant has to decompress back to the original.
  for (const std::string* packed :
       {&not_compressed, &gzip_compressed, &zlib_compressed}) {
    EXPECT_TRUE(Uncompress(*packed) == golden);
  }
}
591
// Verifies that two gzip streams written back to back into one buffer can be
// read as a single stream, for every pairing of output and input block size.
TEST_F(IoTest, TwoSessionWriteGzip) {
  static const char* strA = "0123456789";
  static const char* strB = "QuickBrownFox";
  const int kBufferSize = 2 * 1024;
  const uint32_t kTempBufferSize = 40;
  // Automatic storage and <cstdint> types replace new[]/delete[] and the
  // legacy int32/int64/uint8/uint32 aliases, matching the rest of this file.
  std::vector<uint8_t> buffer(kBufferSize);
  char temp_buffer[kTempBufferSize];

  for (int i = 0; i < kBlockSizeCount; i++) {
    for (int j = 0; j < kBlockSizeCount; j++) {
      // First gzip session, written at the start of the buffer. The
      // terminating NUL is written too, so the payload prints as a C string.
      int64_t pos;
      {
        ArrayOutputStream output(buffer.data(), kBufferSize, kBlockSizes[i]);
        {
          GzipOutputStream gzout(&output);
          CodedOutputStream coded_output(&gzout);
          int32_t outlen = strlen(strA) + 1;
          coded_output.WriteVarint32(outlen);
          coded_output.WriteRaw(strA, outlen);
        }  // Destroys (flushes) coded_output first, then gzout.
        pos = output.ByteCount();
      }

      // Second gzip session, written directly behind the first one.
      int64_t size;
      {
        ArrayOutputStream output(buffer.data() + pos, kBufferSize - pos,
                                 kBlockSizes[i]);
        {
          GzipOutputStream gzout(&output);
          CodedOutputStream coded_output(&gzout);
          int32_t outlen = strlen(strB) + 1;
          coded_output.WriteVarint32(outlen);
          coded_output.WriteRaw(strB, outlen);
        }  // Destroys (flushes) coded_output first, then gzout.
        size = pos + output.ByteCount();
      }

      // Read both sessions back through one decompressing stream.
      ArrayInputStream input(buffer.data(), size, kBlockSizes[j]);
      GzipInputStream gzin(&input);
      CodedInputStream coded_input(&gzin);
      uint32_t insize = 0;
      EXPECT_TRUE(coded_input.ReadVarint32(&insize));
      EXPECT_EQ(strlen(strA) + 1, insize);
      // Never let a bogus length overrun the scratch buffer.
      ASSERT_LE(insize, kTempBufferSize);
      EXPECT_TRUE(coded_input.ReadRaw(temp_buffer, insize));
      EXPECT_EQ(0, memcmp(temp_buffer, strA, insize))
          << "strA=" << strA << " in=" << temp_buffer;

      EXPECT_TRUE(coded_input.ReadVarint32(&insize));
      EXPECT_EQ(strlen(strB) + 1, insize);
      ASSERT_LE(insize, kTempBufferSize);
      EXPECT_TRUE(coded_input.ReadRaw(temp_buffer, insize));
      EXPECT_EQ(0, memcmp(temp_buffer, strB, insize))
          << " out_block_size=" << kBlockSizes[i]
          << " in_block_size=" << kBlockSizes[j] << " pos=" << pos
          << " size=" << size << " strB=" << strB << " in=" << temp_buffer;
    }
  }
}
654
// ByteCount() of a GzipInputStream must never exceed the uncompressed size
// while reading, and must equal it once the stream is exhausted.
TEST_F(IoTest, GzipInputByteCountAfterClosed) {
  const std::string golden = "abcdefghijklmnopqrstuvwxyz";
  const std::string compressed = Compress(golden, GzipOutputStream::Options());

  for (int idx = 0; idx < kBlockSizeCount; ++idx) {
    ArrayInputStream arr_input(compressed.data(), compressed.size(),
                               kBlockSizes[idx]);
    GzipInputStream gz_input(&arr_input);

    const void* chunk;
    int chunk_size;
    // Drain the stream, checking the running count after every buffer.
    for (;;) {
      if (!gz_input.Next(&chunk, &chunk_size)) {
        break;
      }
      EXPECT_LE(gz_input.ByteCount(), golden.size());
    }
    EXPECT_EQ(golden.size(), gz_input.ByteCount());
  }
}
671
// Same ByteCount() check, for input made of two concatenated compressed
// streams: the count has to cover the uncompressed size of both.
TEST_F(IoTest, GzipInputByteCountAfterClosedConcatenatedStreams) {
  const std::string golden1 = "abcdefghijklmnopqrstuvwxyz";
  const std::string golden2 = "the quick brown fox jumps over the lazy dog";
  const size_t total_size = golden1.size() + golden2.size();

  std::string compressed = Compress(golden1, GzipOutputStream::Options());
  compressed += Compress(golden2, GzipOutputStream::Options());

  for (int idx = 0; idx < kBlockSizeCount; ++idx) {
    ArrayInputStream arr_input(compressed.data(), compressed.size(),
                               kBlockSizes[idx]);
    GzipInputStream gz_input(&arr_input);

    const void* chunk;
    int chunk_size;
    // Drain the stream, checking the running count after every buffer.
    for (;;) {
      if (!gz_input.Next(&chunk, &chunk_size)) {
        break;
      }
      EXPECT_LE(gz_input.ByteCount(), total_size);
    }
    EXPECT_EQ(total_size, gz_input.ByteCount());
  }
}
691 #endif
692
693 // There is no string input, only string output. Also, it doesn't support
694 // explicit block sizes. So, we'll only run one test and we'll use
695 // ArrayInput to read back the results.
TEST_F(IoTest, StringIo) {
  std::string written;

  // Write phase; the output stream is gone before the string is read.
  {
    StringOutputStream sink(&written);
    WriteStuff(&sink);
  }

  // Read phase, through an array stream over the string's bytes.
  {
    ArrayInputStream source(written.data(), written.size());
    ReadStuff(&source);
  }
}
707
708
// Skips the first byte, then reads everything except the last byte into a
// Cord.
TEST(DefaultReadCordTest, ReadSmallCord) {
  const std::string text = "abcdefghijk";
  ArrayInputStream input(text.data(), text.size());

  EXPECT_TRUE(input.Skip(1));

  absl::Cord result;
  const size_t wanted = text.size() - 2;
  EXPECT_TRUE(input.ReadCord(&result, wanted));

  EXPECT_EQ(result, "bcdefghij");
}
719
// Like ReadSmallCord, except that the first byte is consumed with Next()
// followed by BackUp() instead of Skip().
TEST(DefaultReadCordTest, ReadSmallCordAfterBackUp) {
  const std::string text = "abcdefghijk";
  ArrayInputStream input(text.data(), text.size());

  // Take a buffer and give all of it back except for one byte.
  const void* chunk;
  int chunk_size;
  EXPECT_TRUE(input.Next(&chunk, &chunk_size));
  input.BackUp(chunk_size - 1);

  absl::Cord result;
  EXPECT_TRUE(input.ReadCord(&result, text.size() - 2));

  EXPECT_EQ(result, "bcdefghij");
}
734
// Reads all but the first and last byte of a large input into a Cord.
TEST(DefaultReadCordTest, ReadLargeCord) {
  // 1025 repetitions of the same 11-character pattern.
  std::string text;
  for (int rep = 0; rep < 1025; ++rep) {
    text.append("abcdefghijk");
  }

  ArrayInputStream input(text.data(), text.size());
  absl::Cord result;
  EXPECT_TRUE(input.Skip(1));
  EXPECT_TRUE(input.ReadCord(&result, text.size() - 2));

  // Expected: the input with one byte trimmed from each end.
  absl::Cord expected(text);
  expected.RemoveSuffix(1);
  expected.RemovePrefix(1);

  EXPECT_EQ(expected, result);
}
752
// Like ReadLargeCord, except that the first byte is consumed with Next()
// followed by BackUp(); also checks that the byte left over after ReadCord()
// is still readable.
TEST(DefaultReadCordTest, ReadLargeCordAfterBackup) {
  // 1025 repetitions of the same 11-character pattern.
  std::string text;
  for (int rep = 0; rep < 1025; ++rep) {
    text.append("abcdefghijk");
  }

  ArrayInputStream input(text.data(), text.size());

  // Take a buffer and give all of it back except for one byte.
  const void* chunk;
  int chunk_size;
  EXPECT_TRUE(input.Next(&chunk, &chunk_size));
  input.BackUp(chunk_size - 1);

  absl::Cord result;
  EXPECT_TRUE(input.ReadCord(&result, text.size() - 2));

  // Expected: the input with one byte trimmed from each end.
  absl::Cord expected(text);
  expected.RemoveSuffix(1);
  expected.RemovePrefix(1);

  EXPECT_EQ(expected, result);

  // Exactly the final character must remain in the stream.
  EXPECT_TRUE(input.Next(&chunk, &chunk_size));
  const std::string leftover(reinterpret_cast<const char*>(chunk), chunk_size);
  EXPECT_EQ("k", leftover);
}
778
// Asking ReadCord() for more bytes than remain must fail, while still
// delivering the bytes that were available.
TEST(DefaultReadCordTest, ReadCordEof) {
  std::string source = "abcdefghijk";

  absl::Cord dest;
  ArrayInputStream input(source.data(), source.size());
  // Check the setup step as the sibling tests do: a failed Skip() would make
  // the expectations below meaningless.
  EXPECT_TRUE(input.Skip(1));
  // One byte was skipped, so a read of source.size() bytes cannot be
  // satisfied.
  EXPECT_FALSE(input.ReadCord(&dest, source.size()));

  absl::Cord expected(source);
  expected.RemovePrefix(1);
  EXPECT_EQ(expected, dest);
}
791
// Writing an empty Cord succeeds, advances nothing and leaves the target
// untouched.
TEST(DefaultWriteCordTest, WriteEmptyCordToArray) {
  std::string target = "abc";
  ArrayOutputStream output(&target[0], static_cast<int>(target.length()));

  absl::Cord empty;
  EXPECT_TRUE(output.WriteCord(empty));

  EXPECT_EQ(output.ByteCount(), empty.size());
  EXPECT_EQ(target, "abc");
}
800
// A small Cord written into an exactly-sized array arrives byte for byte.
TEST(DefaultWriteCordTest, WriteSmallCord) {
  const absl::Cord payload("foo bar");

  // Pre-fill the target with 'z' so that unwritten bytes would show up.
  std::string target(payload.size(), 'z');
  ArrayOutputStream output(&target[0], static_cast<int>(target.length()));

  EXPECT_TRUE(output.WriteCord(payload));
  EXPECT_EQ(output.ByteCount(), payload.size());
  EXPECT_EQ(target, payload);
}
810
// A Cord consisting of more than one chunk written into an exactly-sized
// array arrives byte for byte.
TEST(DefaultWriteCordTest, WriteLargeCord) {
  absl::Cord payload;
  for (int rep = 0; rep < 1024; ++rep) {
    payload.Append("foo bar");
  }
  // The test is only meaningful if the Cord really has several chunks.
  const auto chunk_count =
      std::distance(payload.chunk_begin(), payload.chunk_end());
  ASSERT_GT(chunk_count, 1);

  // Pre-fill the target with 'z' so that unwritten bytes would show up.
  std::string target(payload.size(), 'z');
  ArrayOutputStream output(&target[0], static_cast<int>(target.length()));

  EXPECT_TRUE(output.WriteCord(payload));
  EXPECT_EQ(output.ByteCount(), payload.size());
  EXPECT_EQ(target, payload);
}
825
// Writing a Cord that is one byte larger than the target must fail, after
// filling the target with the leading part of the Cord.
TEST(DefaultWriteCordTest, WriteTooLargeCord) {
  absl::Cord payload;
  for (int rep = 0; rep < 1024; ++rep) {
    payload.Append("foo bar");
  }

  // One byte short of what the payload needs.
  const size_t capacity = payload.size() - 1;
  std::string target(capacity, 'z');
  ArrayOutputStream output(&target[0], static_cast<int>(target.length()));

  EXPECT_FALSE(output.WriteCord(payload));
  EXPECT_EQ(output.ByteCount(), target.size());
  EXPECT_EQ(target, payload.Subcord(0, output.ByteCount()));
}
838
// Skipping exactly as many bytes as the Cord holds succeeds and is reflected
// in ByteCount().
TEST(CordInputStreamTest, SkipToEnd) {
  const std::string filler(10000, 'z');
  absl::Cord cord(filler);
  CordInputStream input(&cord);

  EXPECT_TRUE(input.Skip(10000));
  EXPECT_EQ(input.ByteCount(), 10000);
}
845
// Round-trips the canned payload through CordOutputStream and
// CordInputStream.
TEST_F(IoTest, CordIo) {
  CordOutputStream sink;
  const int written = WriteStuff(&sink);

  // Take the Cord out of the output stream; it must hold every byte written.
  absl::Cord produced = sink.Consume();
  EXPECT_EQ(written, produced.size());

  {
    CordInputStream source(&produced);
    ReadStuff(&source);
  }
}
857
858 template <typename Container>
MakeFragmentedCord(const Container & c)859 absl::Cord MakeFragmentedCord(const Container& c) {
860 absl::Cord result;
861 for (const auto& s : c) {
862 absl::string_view sv(s);
863 auto buffer = absl::CordBuffer::CreateWithDefaultLimit(sv.size());
864 absl::Span<char> out = buffer.available_up_to(sv.size());
865 memcpy(out.data(), sv.data(), out.size());
866 buffer.SetLength(out.size());
867 result.Append(std::move(buffer));
868 }
869 return result;
870 }
871
872 // Test that we can read correctly from a fragmented Cord.
TEST_F(IoTest, FragmentedCordInput) {
  // Produce the reference bytes once.
  std::string str;
  {
    StringOutputStream output(&str);
    WriteStuff(&output);
  }

  for (int idx = 0; idx < kBlockSizeCount; ++idx) {
    const int block_size = kBlockSizes[idx];
    // The -1 entry cannot serve as a fragment length; only the non-negative
    // sizes are exercised.
    if (block_size >= 0) {
      // Cut the reference bytes into pieces of at most block_size bytes and
      // build a fragmented cord from those pieces.
      std::vector<absl::string_view> fragments;
      for (absl::string_view rest = str; !rest.empty();) {
        const size_t n = std::min<size_t>(rest.size(), block_size);
        fragments.push_back(rest.substr(0, n));
        rest.remove_prefix(n);
      }
      absl::Cord fragmented_cord = MakeFragmentedCord(fragments);

      CordInputStream input(&fragmented_cord);
      ReadStuff(&input);
    }
  }
}
902
// Skips the first byte of a small Cord, then reads everything except the
// last byte into another Cord.
TEST_F(IoTest, ReadSmallCord) {
  absl::Cord original;
  original.Append("foo bar");

  CordInputStream input(&original);
  EXPECT_TRUE(input.Skip(1));

  absl::Cord result;
  const size_t wanted = original.size() - 2;
  EXPECT_TRUE(input.ReadCord(&result, wanted));

  EXPECT_EQ(absl::Cord("oo ba"), result);
}
914
// Like ReadSmallCord, except that the first byte is consumed with Next()
// followed by BackUp() instead of Skip().
TEST_F(IoTest, ReadSmallCordAfterBackUp) {
  absl::Cord original;
  original.Append("foo bar");

  CordInputStream input(&original);

  // Take a buffer and give all of it back except for one byte.
  const void* chunk;
  int chunk_size;
  EXPECT_TRUE(input.Next(&chunk, &chunk_size));
  input.BackUp(chunk_size - 1);

  absl::Cord result;
  EXPECT_TRUE(input.ReadCord(&result, original.size() - 2));

  EXPECT_EQ(absl::Cord("oo ba"), result);
}
931
// Reads all but the first and last byte of a large Cord into another Cord.
TEST_F(IoTest, ReadLargeCord) {
  absl::Cord original;
  for (int rep = 0; rep < 1024; ++rep) {
    original.Append("foo bar");
  }

  CordInputStream input(&original);
  absl::Cord result;
  EXPECT_TRUE(input.Skip(1));
  EXPECT_TRUE(input.ReadCord(&result, original.size() - 2));

  // Expected: the original with one byte trimmed from each end.
  absl::Cord expected = original;
  expected.RemoveSuffix(1);
  expected.RemovePrefix(1);

  EXPECT_EQ(expected, result);
}
949
// Like ReadLargeCord, except that the first byte is consumed with Next()
// followed by BackUp(); also checks that the byte left over after ReadCord()
// is still readable.
TEST_F(IoTest, ReadLargeCordAfterBackUp) {
  absl::Cord original;
  for (int rep = 0; rep < 1024; ++rep) {
    original.Append("foo bar");
  }

  CordInputStream input(&original);

  // Take a buffer and give all of it back except for one byte.
  const void* chunk;
  int chunk_size;
  EXPECT_TRUE(input.Next(&chunk, &chunk_size));
  input.BackUp(chunk_size - 1);

  absl::Cord result;
  EXPECT_TRUE(input.ReadCord(&result, original.size() - 2));

  // Expected: the original with one byte trimmed from each end.
  absl::Cord expected = original;
  expected.RemoveSuffix(1);
  expected.RemovePrefix(1);

  EXPECT_EQ(expected, result);

  // Exactly the final character must remain in the stream.
  EXPECT_TRUE(input.Next(&chunk, &chunk_size));
  const std::string leftover(reinterpret_cast<const char*>(chunk), chunk_size);
  EXPECT_EQ("r", leftover);
}
975
// Asking ReadCord() for more bytes than the Cord has left must fail, while
// still delivering the bytes that were available.
TEST_F(IoTest, ReadCordEof) {
  absl::Cord source;
  source.Append("foo bar");

  absl::Cord dest;
  CordInputStream input(&source);
  // Check the setup step as the sibling tests do: a failed Skip() would make
  // the expectations below meaningless.
  EXPECT_TRUE(input.Skip(1));
  // One byte was skipped, so a read of source.size() bytes cannot be
  // satisfied.
  EXPECT_FALSE(input.ReadCord(&dest, source.size()));

  absl::Cord expected = source;
  expected.RemovePrefix(1);
  EXPECT_EQ(expected, dest);
}
989
// A default-constructed stream that was never written to yields an empty cord.
TEST(CordOutputStreamTest, Empty) {
  CordOutputStream stream;
  const absl::Cord consumed = stream.Consume();
  EXPECT_TRUE(consumed.empty());
}
994
// Consume() returns the donated contents once; a second Consume() is empty.
TEST(CordOutputStreamTest, ConsumesCordClearingState) {
  absl::Cord donated("abcdef");
  CordOutputStream stream(std::move(donated));

  EXPECT_EQ(stream.Consume(), "abcdef");
  EXPECT_TRUE(stream.Consume().empty());
}
1000
// Donating an empty CordBuffer: the first Next() must hand out the buffer's
// own storage, and the consumed cord must be flat and backed by that storage.
TEST(CordOutputStreamTest, DonateEmptyCordBuffer) {
  absl::CordBuffer donated = absl::CordBuffer::CreateWithDefaultLimit(500);
  absl::Span<char> capacity = donated.available();
  void* storage = capacity.data();
  CordOutputStream stream(std::move(donated));

  void* chunk;
  int chunk_size;
  EXPECT_TRUE(stream.Next(&chunk, &chunk_size));
  EXPECT_EQ(chunk, storage);
  EXPECT_EQ(chunk_size, static_cast<int>(capacity.size()));
  const size_t written = static_cast<size_t>(chunk_size);
  memset(chunk, 'a', written);

  absl::Cord result = stream.Consume();
  const auto flat = result.TryFlat();
  ASSERT_TRUE(flat.has_value());
  EXPECT_EQ(*flat, std::string(written, 'a'));
  EXPECT_EQ(flat->data(), storage);
}
1019
// Donating a CordBuffer that already holds 100 bytes: consuming without any
// further writes must return exactly those bytes, flat, in the same storage.
TEST(CordOutputStreamTest, DonatePartialCordBuffer) {
  constexpr size_t kPrefilled = 100;

  absl::CordBuffer donated = absl::CordBuffer::CreateWithDefaultLimit(500);
  absl::Span<char> capacity = donated.available();
  void* storage = capacity.data();
  memset(capacity.data(), 'a', kPrefilled);
  donated.IncreaseLengthBy(kPrefilled);
  CordOutputStream stream(std::move(donated));

  absl::Cord result = stream.Consume();
  const auto flat = result.TryFlat();
  ASSERT_TRUE(flat.has_value());
  EXPECT_EQ(*flat, std::string(kPrefilled, 'a'));
  EXPECT_EQ(flat->data(), storage);
}
1034
// Donating a CordBuffer that already holds 100 bytes: Next() must expose the
// unused remainder of that same buffer, and the consumed cord must stay flat.
TEST(CordOutputStreamTest, DonatePartialCordBufferAndUseExtraCapacity) {
  absl::CordBuffer donated = absl::CordBuffer::CreateWithDefaultLimit(500);
  absl::Span<char> capacity = donated.available();
  void* storage = capacity.data();
  // Next() is expected to resume right after the pre-filled prefix.
  void* storage_after_prefix = capacity.data() + 100;
  memset(capacity.data(), 'a', 100);
  donated.IncreaseLengthBy(100);
  CordOutputStream stream(std::move(donated));

  void* chunk;
  int chunk_size;
  EXPECT_TRUE(stream.Next(&chunk, &chunk_size));
  EXPECT_EQ(chunk, storage_after_prefix);
  EXPECT_EQ(chunk_size, static_cast<int>(capacity.size() - 100));
  const size_t written = static_cast<size_t>(chunk_size);
  memset(chunk, 'b', written);

  std::string expected(100, 'a');
  expected.append(written, 'b');

  absl::Cord result = stream.Consume();
  const auto flat = result.TryFlat();
  ASSERT_TRUE(flat.has_value());
  EXPECT_EQ(*flat, expected);
  EXPECT_EQ(flat->data(), storage);
}
1057
// Donating both a 400-byte cord and a partially filled CordBuffer: Next() must
// continue in the buffer's spare capacity, and the result is the cord, the
// buffer prefix and the new bytes in order (no longer a single flat).
TEST(CordOutputStreamTest, DonateCordAndPartialCordBufferAndUseExtraCapacity) {
  absl::Cord prefix(std::string(400, 'a'));
  absl::CordBuffer donated = absl::CordBuffer::CreateWithDefaultLimit(500);
  absl::Span<char> capacity = donated.available();
  void* storage_after_prefix = capacity.data() + 100;
  memset(capacity.data(), 'b', 100);
  donated.IncreaseLengthBy(100);
  CordOutputStream stream(std::move(prefix), std::move(donated));

  void* chunk;
  int chunk_size;
  EXPECT_TRUE(stream.Next(&chunk, &chunk_size));
  EXPECT_EQ(chunk, storage_after_prefix);
  EXPECT_EQ(chunk_size, static_cast<int>(capacity.size() - 100));
  const size_t written = static_cast<size_t>(chunk_size);
  memset(chunk, 'c', written);

  std::string expected(400, 'a');
  expected.append(100, 'b');
  expected.append(written, 'c');

  absl::Cord result = stream.Consume();
  EXPECT_FALSE(result.TryFlat().has_value());
  EXPECT_EQ(result, expected);
}
1078
// Donating a completely filled CordBuffer: Next() must still succeed, and the
// consumed cord holds the donated bytes followed by the newly written ones,
// which the test expects not to be a single flat.
TEST(CordOutputStreamTest, DonateFullCordBuffer) {
  absl::CordBuffer donated = absl::CordBuffer::CreateWithDefaultLimit(500);
  absl::Span<char> capacity = donated.available();
  const size_t donated_size = capacity.size();
  memset(capacity.data(), 'a', donated_size);
  donated.IncreaseLengthBy(donated_size);
  CordOutputStream stream(std::move(donated));

  void* chunk;
  int chunk_size;
  EXPECT_TRUE(stream.Next(&chunk, &chunk_size));
  const size_t written = static_cast<size_t>(chunk_size);
  memset(chunk, 'b', written);

  std::string expected(donated_size, 'a');
  expected.append(written, 'b');

  absl::Cord result = stream.Consume();
  EXPECT_FALSE(result.TryFlat().has_value());
  EXPECT_EQ(result, expected);
}
1095
// Donating a 400-byte cord together with a completely filled CordBuffer: the
// consumed cord is the cord, the buffer contents and the newly written bytes,
// in that order.
TEST(CordOutputStreamTest, DonateFullCordBufferAndCord) {
  absl::Cord prefix(std::string(400, 'a'));
  absl::CordBuffer donated = absl::CordBuffer::CreateWithDefaultLimit(500);
  absl::Span<char> capacity = donated.available();
  const size_t donated_size = capacity.size();
  memset(capacity.data(), 'b', donated_size);
  donated.IncreaseLengthBy(donated_size);
  CordOutputStream stream(std::move(prefix), std::move(donated));

  void* chunk;
  int chunk_size;
  EXPECT_TRUE(stream.Next(&chunk, &chunk_size));
  const size_t written = static_cast<size_t>(chunk_size);
  memset(chunk, 'c', written);

  std::string expected(400, 'a');
  expected.append(donated_size, 'b');
  expected.append(written, 'c');

  absl::Cord result = stream.Consume();
  EXPECT_FALSE(result.TryFlat().has_value());
  EXPECT_EQ(result, expected);
}
1114
// Donating a completely filled CordBuffer and backing up by 100 bytes before
// the first Next(): Next() must return exactly the last 100 bytes of the
// donated buffer, and the consumed cord stays flat in the original storage.
TEST(CordOutputStreamTest, DonateFullCordBufferAndBackup) {
  absl::CordBuffer donated = absl::CordBuffer::CreateWithDefaultLimit(500);
  absl::Span<char> capacity = donated.available();
  const size_t donated_size = capacity.size();
  memset(capacity.data(), 'a', donated_size);
  donated.IncreaseLengthBy(donated_size);

  void* storage = capacity.data();
  // Where writing is expected to resume once 100 bytes were backed up.
  void* storage_after_backup = capacity.data() + donated_size - 100;

  // Back up by 100 before calling Next().
  CordOutputStream stream(std::move(donated));
  stream.BackUp(100);

  void* chunk;
  int chunk_size;
  EXPECT_TRUE(stream.Next(&chunk, &chunk_size));
  EXPECT_EQ(chunk, storage_after_backup);
  EXPECT_EQ(chunk_size, 100);
  memset(chunk, 'b', 100);

  std::string expected(donated_size - 100, 'a');
  expected.append(100, 'b');

  absl::Cord result = stream.Consume();
  const auto flat = result.TryFlat();
  ASSERT_TRUE(flat.has_value());
  EXPECT_EQ(*flat, expected);
  EXPECT_EQ(flat->data(), storage);
}
1141
// Donating a 400-byte cord plus a completely filled CordBuffer and backing up
// by 100 bytes before the first Next(): Next() must return the last 100 bytes
// of the donated buffer; the result is cord + shortened buffer + new bytes.
TEST(CordOutputStreamTest, DonateCordAndFullCordBufferAndBackup) {
  absl::Cord prefix(std::string(400, 'a'));
  absl::CordBuffer donated = absl::CordBuffer::CreateWithDefaultLimit(500);
  absl::Span<char> capacity = donated.available();
  const size_t donated_size = capacity.size();
  memset(capacity.data(), 'b', donated_size);
  donated.IncreaseLengthBy(donated_size);

  // Where writing is expected to resume once 100 bytes were backed up.
  void* storage_after_backup = capacity.data() + donated_size - 100;

  // Back up by 100 before calling Next().
  CordOutputStream stream(std::move(prefix), std::move(donated));
  stream.BackUp(100);

  void* chunk;
  int chunk_size;
  EXPECT_TRUE(stream.Next(&chunk, &chunk_size));
  EXPECT_EQ(chunk, storage_after_backup);
  EXPECT_EQ(chunk_size, 100);
  memset(chunk, 'c', 100);

  std::string expected(400, 'a');
  expected.append(donated_size - 100, 'b');
  expected.append(100, 'c');

  absl::Cord result = stream.Consume();
  EXPECT_FALSE(result.TryFlat().has_value());
  EXPECT_EQ(result, expected);
}
1167
// With a size hint of 2000, the first Next() must offer exactly 2000 bytes and
// filling them must produce a single flat cord.
TEST(CordOutputStreamTest, ProperHintCreatesSingleFlatCord) {
  constexpr int kHint = 2000;
  CordOutputStream stream(kHint);

  void* chunk;
  int chunk_size;
  ASSERT_TRUE(stream.Next(&chunk, &chunk_size));
  ASSERT_EQ(chunk_size, kHint);
  memset(chunk, 'a', kHint);

  absl::Cord result = stream.Consume();
  const auto flat = result.TryFlat();
  ASSERT_TRUE(flat.has_value());
  EXPECT_EQ(*flat, std::string(kHint, 'a'));
}
1181
// With a 500-byte donated cord and a size hint of 2000, successive Next()
// calls must never offer more than what is left of the remaining 1500 bytes
// and must end exactly on the hint.
TEST(CordOutputStreamTest, SizeHintDictatesTotalSize) {
  absl::Cord donated(std::string(500, 'a'));
  CordOutputStream stream(std::move(donated), 2000);

  int remaining = 1500;
  while (remaining > 0) {
    void* chunk;
    int chunk_size;
    ASSERT_TRUE(stream.Next(&chunk, &chunk_size));
    // No chunk may overshoot the hinted total.
    ASSERT_LE(chunk_size, remaining);
    memset(chunk, 'b', static_cast<size_t>(chunk_size));
    remaining -= chunk_size;
  }
  ASSERT_EQ(remaining, 0);

  absl::Cord result = stream.Consume();
  EXPECT_EQ(result, std::string(500, 'a') + std::string(1500, 'b'));
}
1200
// Repeatedly writes part of the buffer returned by Next() and backs up the
// rest: every following Next() must offer exactly the backed-up remainder,
// and the final cord must be one flat holding all written bytes in order.
TEST(CordOutputStreamTest, BackUpReusesPartialBuffer) {
  struct Step {
    int expected_size;  // Size Next() must report.
    char fill;          // Character written into the buffer.
    int fill_count;     // Number of bytes actually written.
  };
  const Step kSteps[] = {
      {2000, '1', 100},
      {1900, '2', 200},
      {1700, '3', 400},
      {1300, '4', 1300},
  };

  CordOutputStream stream(2000);
  for (const Step& step : kSteps) {
    void* chunk;
    int chunk_size;
    ASSERT_TRUE(stream.Next(&chunk, &chunk_size));
    ASSERT_EQ(chunk_size, step.expected_size);
    memset(chunk, step.fill, static_cast<size_t>(step.fill_count));

    // Hand back whatever was not written; the last step fills its buffer
    // completely and therefore backs up nothing.
    const int unused = step.expected_size - step.fill_count;
    if (unused > 0) {
      stream.BackUp(unused);
    }
  }

  absl::Cord result = stream.Consume();
  const auto flat = result.TryFlat();
  ASSERT_TRUE(flat.has_value());
  EXPECT_EQ(*flat, absl::StrCat(std::string(100, '1'), std::string(200, '2'),
                                std::string(400, '3'), std::string(1300, '4')));
}
1231
// A donated cord built from a 2000-capacity CordBuffer holding 500 bytes:
// with a size hint of 2000, Next() must expose the 1500 spare bytes and the
// consumed cord must remain a single flat.
TEST(CordOutputStreamTest, UsesPrivateCapacityInDonatedCord) {
  absl::CordBuffer backing = absl::CordBuffer::CreateWithDefaultLimit(2000);
  memset(backing.data(), 'a', 500);
  backing.SetLength(500);

  absl::Cord donated;
  donated.Append(std::move(backing));

  CordOutputStream stream(std::move(donated), 2000);

  void* chunk;
  int chunk_size;
  ASSERT_TRUE(stream.Next(&chunk, &chunk_size));
  ASSERT_EQ(chunk_size, 1500);
  memset(chunk, 'b', 1500);

  absl::Cord result = stream.Consume();
  const auto flat = result.TryFlat();
  ASSERT_TRUE(flat.has_value());
  EXPECT_EQ(*flat, absl::StrCat(std::string(500, 'a'), std::string(1500, 'b')));
}
1252
// A cord built from a 2000-capacity CordBuffer holding 500 bytes is added via
// WriteCord(): Next() must then expose the 1500 spare bytes of that cord, and
// the consumed cord must remain a single flat.
TEST(CordOutputStreamTest, UsesPrivateCapacityInAppendedCord) {
  absl::Cord cord;
  absl::CordBuffer buffer = absl::CordBuffer::CreateWithDefaultLimit(2000);
  memset(buffer.data(), 'a', 500);
  buffer.SetLength(500);
  cord.Append(std::move(buffer));

  CordOutputStream output(2000);
  void* data;
  int size;

  // Add the cord. Clearing it afterwards makes it privately owned by 'output':
  // its non-trivial size guarantees it is ref counted, not deep copied.
  // The write result is asserted because every later check depends on it.
  ASSERT_TRUE(output.WriteCord(cord));
  cord.Clear();

  ASSERT_TRUE(output.Next(&data, &size));
  ASSERT_EQ(size, 1500);
  memset(data, 'b', 1500);

  cord = output.Consume();
  ASSERT_TRUE(cord.TryFlat());
  absl::string_view flat = *cord.TryFlat();
  EXPECT_EQ(flat, absl::StrCat(std::string(500, 'a'), std::string(1500, 'b')));
}
1278
// Verifies the interaction between the size hint and the real buffer capacity.
TEST(CordOutputStreamTest, CapsSizeAtHintButUsesCapacityBeyondHint) {
  // This test verifies that when we provide a hint of 'x' bytes, the size
  // returned from Next() is capped at 'size_hint', but that if we exceed
  // size_hint, the stream uses the capacity in any internal buffer beyond the
  // size hint. We test this by providing a hint that is too large to be
  // inlined, but so small that we have a guarantee it is smaller than the
  // minimum flat size, so we will have a 'capped' larger buffer as state.
  const size_t size_hint = sizeof(absl::Cord) + 1;
  CordOutputStream output(size_hint);
  void* data;
  int size;

  ASSERT_TRUE(output.Next(&data, &size));
  // Compare as size_t: 'size' is an int while the hint is unsigned, and a
  // mixed signed/unsigned comparison triggers sign-compare diagnostics.
  ASSERT_EQ(static_cast<size_t>(size), size_hint);
  memset(data, 'a', static_cast<size_t>(size));

  ASSERT_TRUE(output.Next(&data, &size));
  memset(data, 'b', static_cast<size_t>(size));

  // We should have received the same buffer on each Next() call, so the
  // result must still be a single flat.
  absl::Cord cord = output.Consume();
  ASSERT_TRUE(cord.TryFlat());
  absl::string_view flat = *cord.TryFlat();
  EXPECT_EQ(flat, absl::StrCat(std::string(size_hint, 'a'),
                               std::string(static_cast<size_t>(size), 'b')));
}
1305
// Without a size hint, the buffers returned by successive Next() calls must
// grow: the lower bound checked here doubles per call up to a fixed ceiling.
TEST(CordOutputStreamTest, SizeDoublesWithoutHint) {
  CordOutputStream stream;

  // Whitebox: we are guaranteed at least 128 bytes initially. We also assume
  // that the maximum size is roughly 4KiB - overhead without being precise.
  constexpr int kCeiling = 4000;
  int lower_bound = 128;

  // Seven calls in total. The first two are both held to the initial bound;
  // the bound starts doubling only after the second call.
  for (int call = 0; call < 7; ++call) {
    void* chunk;
    int chunk_size;
    ASSERT_TRUE(stream.Next(&chunk, &chunk_size));
    memset(chunk, 0, static_cast<size_t>(chunk_size));
    ASSERT_GE(chunk_size, lower_bound);
    if (call > 0) {
      lower_bound = (std::min)(lower_bound * 2, kCeiling);
    }
  }
}
1326
// WriteCord() with a small cord appends it to the stream's existing contents.
TEST_F(IoTest, WriteSmallCord) {
  absl::Cord payload;
  payload.Append("foo bar");

  absl::Cord initial("existing:");
  CordOutputStream stream(std::move(initial));
  EXPECT_TRUE(stream.WriteCord(payload));

  const absl::Cord result = stream.Consume();
  EXPECT_EQ(absl::Cord("existing:foo bar"), result);
}
1336
// WriteCord() with a cord assembled from 1024 pieces appends it to the
// stream's existing contents.
TEST_F(IoTest, WriteLargeCord) {
  absl::Cord payload;
  for (int piece = 0; piece != 1024; ++piece) {
    payload.Append("foo bar");
  }

  // Expected result: the pre-existing prefix followed by the payload.
  absl::Cord expected = payload;
  expected.Prepend("existing:");

  absl::Cord initial("existing:");
  CordOutputStream stream(std::move(initial));
  EXPECT_TRUE(stream.WriteCord(payload));

  const absl::Cord result = stream.Consume();
  EXPECT_EQ(expected, result);
}
1351
1352 // Test that large size hints lead to large block sizes.
TEST_F(IoTest,CordOutputSizeHint)1353 TEST_F(IoTest, CordOutputSizeHint) {
1354 CordOutputStream output1;
1355 CordOutputStream output2(12345);
1356
1357 void* data1;
1358 void* data2;
1359 int size1, size2;
1360 ASSERT_TRUE(output1.Next(&data1, &size1));
1361 ASSERT_TRUE(output2.Next(&data2, &size2));
1362
1363 // Prevent 'unflushed output' debug checks and warnings
1364 output1.BackUp(size1);
1365 output2.BackUp(size2);
1366
1367 EXPECT_GT(size2, size1);
1368
1369 // Prevent any warnings on unused or unflushed data
1370 output1.Consume();
1371 output2.Consume();
1372 }
1373
1374 // Test that when we use a size hint, we get a buffer boundary exactly on that
1375 // byte.
TEST_F(IoTest,CordOutputBufferEndsAtSizeHint)1376 TEST_F(IoTest, CordOutputBufferEndsAtSizeHint) {
1377 static const int kSizeHint = 12345;
1378
1379 CordOutputStream output(kSizeHint);
1380
1381 void* data;
1382 int size;
1383 int total_read = 0;
1384
1385 while (total_read < kSizeHint) {
1386 ASSERT_TRUE(output.Next(&data, &size));
1387 memset(data, 0, static_cast<size_t>(size)); // Avoid uninitialized data UB
1388 total_read += size;
1389 }
1390
1391 EXPECT_EQ(kSizeHint, total_read);
1392
1393 // We should be able to keep going past the size hint.
1394 ASSERT_TRUE(output.Next(&data, &size));
1395 EXPECT_GT(size, 0);
1396
1397 // Prevent any warnings on unused or unflushed data
1398 output.Consume();
1399 }
1400
1401
1402 // To test files, we create a temporary file, write, read, truncate, repeat.
TEST_F(IoTest,FileIo)1403 TEST_F(IoTest, FileIo) {
1404 std::string filename =
1405 absl::StrCat(::testing::TempDir(), "/zero_copy_stream_test_file");
1406
1407 for (int i = 0; i < kBlockSizeCount; i++) {
1408 for (int j = 0; j < kBlockSizeCount; j++) {
1409 // Make a temporary file.
1410 int file =
1411 open(filename.c_str(), O_RDWR | O_CREAT | O_TRUNC | O_BINARY, 0777);
1412 ASSERT_GE(file, 0);
1413
1414 {
1415 FileOutputStream output(file, kBlockSizes[i]);
1416 WriteStuff(&output);
1417 EXPECT_EQ(0, output.GetErrno());
1418 }
1419
1420 // Rewind.
1421 ASSERT_NE(lseek(file, 0, SEEK_SET), (off_t)-1);
1422
1423 {
1424 FileInputStream input(file, kBlockSizes[j]);
1425 ReadStuff(&input);
1426 EXPECT_EQ(0, input.GetErrno());
1427 }
1428
1429 close(file);
1430 }
1431 }
1432 }
1433
1434 #ifndef _WIN32
1435 // This tests the FileInputStream with a non blocking file. It opens a pipe in
1436 // non blocking mode, then starts reading it. The writing thread starts writing
1437 // 100ms after that.
TEST_F(IoTest,NonBlockingFileIo)1438 TEST_F(IoTest, NonBlockingFileIo) {
1439 for (int i = 0; i < kBlockSizeCount; i++) {
1440 for (int j = 0; j < kBlockSizeCount; j++) {
1441 int fd[2];
1442 // On Linux we could use pipe2 to make the pipe non-blocking in one step,
1443 // but we instead use pipe and fcntl because pipe2 is not available on
1444 // Mac OS.
1445 ASSERT_EQ(pipe(fd), 0);
1446 ASSERT_EQ(fcntl(fd[0], F_SETFL, O_NONBLOCK), 0);
1447 ASSERT_EQ(fcntl(fd[1], F_SETFL, O_NONBLOCK), 0);
1448
1449 std::mutex go_write;
1450 go_write.lock();
1451
1452 bool done_reading = false;
1453
1454 std::thread write_thread([this, fd, &go_write, i]() {
1455 go_write.lock();
1456 go_write.unlock();
1457 FileOutputStream output(fd[1], kBlockSizes[i]);
1458 WriteStuff(&output);
1459 EXPECT_EQ(0, output.GetErrno());
1460 });
1461
1462 std::thread read_thread([this, fd, &done_reading, j]() {
1463 FileInputStream input(fd[0], kBlockSizes[j]);
1464 ReadStuff(&input, false /* read_eof */);
1465 done_reading = true;
1466 close(fd[0]);
1467 close(fd[1]);
1468 EXPECT_EQ(0, input.GetErrno());
1469 });
1470
1471 // Sleeping is not necessary but makes the next expectation relevant: the
1472 // reading thread waits for the data to be available before returning.
1473 std::this_thread::sleep_for(std::chrono::milliseconds(100));
1474 EXPECT_FALSE(done_reading);
1475 go_write.unlock();
1476 write_thread.join();
1477 read_thread.join();
1478 EXPECT_TRUE(done_reading);
1479 }
1480 }
1481 }
1482
// Reads from a blocking socket that never receives data and has a receive
// timeout configured. For every block size the read is expected to come back
// with zero bytes (per ReadFromInput()'s return value) and errno EAGAIN.
TEST_F(IoTest, BlockingFileIoWithTimeout) {
  for (int i = 0; i < kBlockSizeCount; i++) {
    int fd[2];
    ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, fd), 0);

    // Nothing is ever written to fd[1], so a read on fd[0] can only finish by
    // running into this 5ms receive timeout.
    struct timeval tv {
      .tv_sec = 0, .tv_usec = 5000
    };
    const int sockopt_result =
        setsockopt(fd[0], SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));

    // Only read when the timeout is actually in place; without it the read
    // would block forever. The stream is scoped so that it is gone before the
    // descriptors are closed.
    if (sockopt_result == 0) {
      FileInputStream input(fd[0], kBlockSizes[i]);
      uint8_t byte;
      EXPECT_EQ(ReadFromInput(&input, &byte, 1), 0);
      EXPECT_EQ(EAGAIN, input.GetErrno());
    }

    // Release both ends of the socket pair on every iteration; previously
    // they were leaked, two descriptors per block size.
    close(fd[0]);
    close(fd[1]);

    // Checked after the cleanup so a failure here does not leak either.
    ASSERT_EQ(sockopt_result, 0);
  }
}
1498 #endif
1499
1500 #if HAVE_ZLIB
// Round-trips the large golden data through a gzip-compressed temporary file
// for every combination of output and input block size.
TEST_F(IoTest, GzipFileIo) {
  const std::string path =
      absl::StrCat(::testing::TempDir(), "/zero_copy_stream_test_file");

  for (int write_idx = 0; write_idx < kBlockSizeCount; write_idx++) {
    for (int read_idx = 0; read_idx < kBlockSizeCount; read_idx++) {
      // (Re)create the temporary file, discarding earlier contents.
      const int descriptor =
          open(path.c_str(), O_RDWR | O_CREAT | O_TRUNC | O_BINARY, 0777);
      ASSERT_GE(descriptor, 0);

      {
        FileOutputStream file_out(descriptor, kBlockSizes[write_idx]);
        GzipOutputStream gzip_out(&file_out);
        WriteStuffLarge(&gzip_out);
        // Finish the gzip stream first, then push the file stream's buffered
        // bytes out before the file is rewound.
        gzip_out.Close();
        file_out.Flush();
        EXPECT_EQ(0, file_out.GetErrno());
      }

      // Rewind.
      ASSERT_NE(lseek(descriptor, 0, SEEK_SET), static_cast<off_t>(-1));

      {
        FileInputStream file_in(descriptor, kBlockSizes[read_idx]);
        GzipInputStream gzip_in(&file_in);
        ReadStuffLarge(&gzip_in);
        EXPECT_EQ(0, file_in.GetErrno());
      }

      close(descriptor);
    }
  }
}
1534 #endif
1535
// MSVC raises various debugging exceptions if we try to use a file
// descriptor of -1, defeating our tests below.  This scope guard disables
// these debug assertions while in scope and restores the previous settings
// when it is destroyed.
class MsvcDebugDisabler {
 public:
#if defined(_MSC_VER) && _MSC_VER >= 1400
  // Installs a do-nothing invalid-parameter handler and turns off CRT
  // assertion reports, remembering the previous handler and report mode.
  MsvcDebugDisabler() {
    old_handler_ = _set_invalid_parameter_handler(MyHandler);
    old_mode_ = _CrtSetReportMode(_CRT_ASSERT, 0);
  }
  // Puts back the handler and report mode saved by the constructor.
  ~MsvcDebugDisabler() {
    _set_invalid_parameter_handler(old_handler_);
    _CrtSetReportMode(_CRT_ASSERT, old_mode_);
  }

  // Invalid-parameter handler that deliberately ignores the report.
  static void MyHandler(const wchar_t* expr, const wchar_t* func,
                        const wchar_t* file, unsigned int line,
                        uintptr_t pReserved) {
    // do nothing
  }

  _invalid_parameter_handler old_handler_;  // Handler active before this guard.
  int old_mode_;  // _CRT_ASSERT report mode active before this guard.
#else
  // Dummy constructor and destructor to ensure that GCC doesn't complain
  // that debug_disabler is an unused variable.  They must stay user-provided
  // (not "= default") to have that effect.
  MsvcDebugDisabler() {}
  ~MsvcDebugDisabler() {}
#endif

  // Not copyable: a copy would restore the saved settings a second time when
  // it goes out of scope.
  MsvcDebugDisabler(const MsvcDebugDisabler&) = delete;
  MsvcDebugDisabler& operator=(const MsvcDebugDisabler&) = delete;
};
1566
1567 // Test that FileInputStreams report errors correctly.
TEST_F(IoTest,FileReadError)1568 TEST_F(IoTest, FileReadError) {
1569 MsvcDebugDisabler debug_disabler;
1570
1571 // -1 = invalid file descriptor.
1572 FileInputStream input(-1);
1573
1574 const void* buffer;
1575 int size;
1576 EXPECT_FALSE(input.Next(&buffer, &size));
1577 EXPECT_EQ(EBADF, input.GetErrno());
1578 }
1579
1580 // Test that FileOutputStreams report errors correctly.
TEST_F(IoTest,FileWriteError)1581 TEST_F(IoTest, FileWriteError) {
1582 MsvcDebugDisabler debug_disabler;
1583
1584 // -1 = invalid file descriptor.
1585 FileOutputStream input(-1);
1586
1587 void* buffer;
1588 int size;
1589
1590 // The first call to Next() succeeds because it doesn't have anything to
1591 // write yet.
1592 EXPECT_TRUE(input.Next(&buffer, &size));
1593
1594 // Second call fails.
1595 EXPECT_FALSE(input.Next(&buffer, &size));
1596
1597 EXPECT_EQ(EBADF, input.GetErrno());
1598 }
1599
1600 // Pipes are not seekable, so File{Input,Output}Stream ends up doing some
1601 // different things to handle them. We'll test by writing to a pipe and
1602 // reading back from it.
TEST_F(IoTest,PipeIo)1603 TEST_F(IoTest, PipeIo) {
1604 int files[2];
1605
1606 for (int i = 0; i < kBlockSizeCount; i++) {
1607 for (int j = 0; j < kBlockSizeCount; j++) {
1608 // Need to create a new pipe each time because ReadStuff() expects
1609 // to see EOF at the end.
1610 ASSERT_EQ(pipe(files), 0);
1611
1612 {
1613 FileOutputStream output(files[1], kBlockSizes[i]);
1614 WriteStuff(&output);
1615 EXPECT_EQ(0, output.GetErrno());
1616 }
1617 close(files[1]); // Send EOF.
1618
1619 {
1620 FileInputStream input(files[0], kBlockSizes[j]);
1621 ReadStuff(&input);
1622 EXPECT_EQ(0, input.GetErrno());
1623 }
1624 close(files[0]);
1625 }
1626 }
1627 }
1628
1629 // Test using C++ iostreams.
TEST_F(IoTest,IostreamIo)1630 TEST_F(IoTest, IostreamIo) {
1631 for (int i = 0; i < kBlockSizeCount; i++) {
1632 for (int j = 0; j < kBlockSizeCount; j++) {
1633 {
1634 std::stringstream stream;
1635
1636 {
1637 OstreamOutputStream output(&stream, kBlockSizes[i]);
1638 WriteStuff(&output);
1639 EXPECT_FALSE(stream.fail());
1640 }
1641
1642 {
1643 IstreamInputStream input(&stream, kBlockSizes[j]);
1644 ReadStuff(&input);
1645 EXPECT_TRUE(stream.eof());
1646 }
1647 }
1648
1649 {
1650 std::stringstream stream;
1651
1652 {
1653 OstreamOutputStream output(&stream, kBlockSizes[i]);
1654 WriteStuffLarge(&output);
1655 EXPECT_FALSE(stream.fail());
1656 }
1657
1658 {
1659 IstreamInputStream input(&stream, kBlockSizes[j]);
1660 ReadStuffLarge(&input);
1661 EXPECT_TRUE(stream.eof());
1662 }
1663 }
1664 }
1665 }
1666 }
1667
1668 // To test ConcatenatingInputStream, we create several ArrayInputStreams
1669 // covering a buffer and then concatenate them.
TEST_F(IoTest,ConcatenatingInputStream)1670 TEST_F(IoTest, ConcatenatingInputStream) {
1671 const int kBufferSize = 256;
1672 uint8_t buffer[kBufferSize];
1673
1674 // Fill the buffer.
1675 ArrayOutputStream output(buffer, kBufferSize);
1676 WriteStuff(&output);
1677
1678 // Now split it up into multiple streams of varying sizes.
1679 ASSERT_EQ(68, output.ByteCount()); // Test depends on this.
1680 ArrayInputStream input1(buffer, 12);
1681 ArrayInputStream input2(buffer + 12, 7);
1682 ArrayInputStream input3(buffer + 19, 6);
1683 ArrayInputStream input4(buffer + 25, 15);
1684 ArrayInputStream input5(buffer + 40, 0);
1685 // Note: We want to make sure we have a stream boundary somewhere between
1686 // bytes 42 and 62, which is the range that it Skip()ed by ReadStuff(). This
1687 // tests that a bug that existed in the original code for Skip() is fixed.
1688 ArrayInputStream input6(buffer + 40, 10);
1689 ArrayInputStream input7(buffer + 50, 18); // Total = 68 bytes.
1690
1691 ZeroCopyInputStream* streams[] = {&input1, &input2, &input3, &input4,
1692 &input5, &input6, &input7};
1693
1694 // Create the concatenating stream and read.
1695 ConcatenatingInputStream input(streams, ABSL_ARRAYSIZE(streams));
1696 ReadStuff(&input);
1697 }
1698
1699 // To test LimitingInputStream, we write our golden text to a buffer, then
1700 // create an ArrayInputStream that contains the whole buffer (not just the
1701 // bytes written), then use a LimitingInputStream to limit it just to the
1702 // bytes written.
TEST_F(IoTest,LimitingInputStream)1703 TEST_F(IoTest, LimitingInputStream) {
1704 const int kBufferSize = 256;
1705 uint8_t buffer[kBufferSize];
1706
1707 // Fill the buffer.
1708 ArrayOutputStream output(buffer, kBufferSize);
1709 WriteStuff(&output);
1710
1711 // Set up input.
1712 ArrayInputStream array_input(buffer, kBufferSize);
1713 LimitingInputStream input(&array_input, output.ByteCount());
1714
1715 ReadStuff(&input);
1716 }
1717
1718 // Checks that ByteCount works correctly for LimitingInputStreams where the
1719 // underlying stream has already been read.
TEST_F(IoTest,LimitingInputStreamByteCount)1720 TEST_F(IoTest, LimitingInputStreamByteCount) {
1721 const int kHalfBufferSize = 128;
1722 const int kBufferSize = kHalfBufferSize * 2;
1723 uint8_t buffer[kBufferSize] = {};
1724
1725 // Set up input. Only allow half to be read at once.
1726 ArrayInputStream array_input(buffer, kBufferSize, kHalfBufferSize);
1727 const void* data;
1728 int size;
1729 EXPECT_TRUE(array_input.Next(&data, &size));
1730 EXPECT_EQ(kHalfBufferSize, array_input.ByteCount());
1731 // kHalfBufferSize - 1 to test limiting logic as well.
1732 LimitingInputStream input(&array_input, kHalfBufferSize - 1);
1733 EXPECT_EQ(0, input.ByteCount());
1734 EXPECT_TRUE(input.Next(&data, &size));
1735 EXPECT_EQ(kHalfBufferSize - 1, input.ByteCount());
1736 }
1737
1738 // Check that a zero-size array doesn't confuse the code.
TEST(ZeroSizeArray,Input)1739 TEST(ZeroSizeArray, Input) {
1740 ArrayInputStream input(nullptr, 0);
1741 const void* data;
1742 int size;
1743 EXPECT_FALSE(input.Next(&data, &size));
1744 }
1745
// Next() on an output stream over a null, empty array must report that no
// buffer space is available.
TEST(ZeroSizeArray, Output) {
  ArrayOutputStream empty_output(nullptr, 0);

  void* chunk;
  int chunk_size;
  const bool got_space = empty_output.Next(&chunk, &chunk_size);
  EXPECT_FALSE(got_space);
}
1752
1753 } // namespace
1754 } // namespace io
1755 } // namespace protobuf
1756 } // namespace google
1757
1758 #include "google/protobuf/port_undef.inc"
1759