1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_stream/mpsc_stream.h"
16
17 #include "pw_containers/vector.h"
18 #include "pw_fuzzer/fuzztest.h"
19 #include "pw_random/xor_shift.h"
20 #include "pw_thread/test_thread_context.h"
21 #include "pw_thread/thread.h"
22 #include "pw_unit_test/framework.h"
23
24 namespace pw::stream {
25 namespace {
26
27 using namespace std::chrono_literals;
28 using namespace pw::fuzzer;
29
30 ////////////////////////////////////////////////////////////////////////////////
31 // Test fixtures.
32
33 /// Capacity in bytes for data buffers.
34 constexpr size_t kBufSize = 512;
35
36 /// Fills a byte span with random data.
Fill(std::byte * buf,size_t len)37 void Fill(std::byte* buf, size_t len) {
38 ByteSpan data(buf, len);
39 random::XorShiftStarRng64 rng(1);
40 rng.Get(data);
41 }
42
43 /// FNV-1a offset basis.
44 constexpr uint64_t kOffsetBasis = 0xcbf29ce484222325ULL;
45
46 /// FNV-1a prime value.
47 constexpr uint64_t kPrimeValue = 0x100000001b3ULL;
48
49 /// Quick implementation of public-domain Fowler-Noll-Vo hashing algorithm.
50 ///
51 /// This is used in the tests below to verify equality of two sequences of bytes
52 /// that are too large to compare directly.
53 ///
54 /// See http://www.isthe.com/chongo/tech/comp/fnv/index.html
fnv1a(ConstByteSpan bytes,uint64_t & hash)55 void fnv1a(ConstByteSpan bytes, uint64_t& hash) {
56 for (const auto& b : bytes) {
57 hash = (hash ^ static_cast<uint8_t>(b)) * kPrimeValue;
58 }
59 }
60
61 /// MpscStream test context that uses a generic reader.
62 ///
63 /// This struct associates a reader and writer with their parameters and return
64 /// values. This is useful for communicating with threads spawned to call a
65 /// blocking method.
66 struct MpscTestContext {
67 MpscWriter writer;
68 MpscReader reader;
69
70 ConstByteSpan data;
71 std::byte write_buffer[kBufSize];
72 uint64_t write_hash = kOffsetBasis;
73 Status write_status;
74
75 ByteSpan destination;
76 std::byte read_buffer[kBufSize];
77 Result<ByteSpan> read_result;
78 uint64_t read_hash = kOffsetBasis;
79 size_t total_read = 0;
80
MpscTestContextpw::stream::__anonc89951cb0111::MpscTestContext81 MpscTestContext() {
82 data = ConstByteSpan(write_buffer);
83 destination = ByteSpan(read_buffer);
84 }
85
Connectpw::stream::__anonc89951cb0111::MpscTestContext86 void Connect() { CreateMpscStream(reader, writer); }
87
88 // Fills a byte span with random data.
Fillpw::stream::__anonc89951cb0111::MpscTestContext89 void Fill() { pw::stream::Fill(write_buffer, sizeof(write_buffer)); }
90
91 // Writes data using the writer.
Writepw::stream::__anonc89951cb0111::MpscTestContext92 void Write() {
93 fnv1a(data, write_hash);
94 write_status = writer.Write(data);
95 }
96
97 // Writes data repeatedly up to the writer's limit.
WriteAllpw::stream::__anonc89951cb0111::MpscTestContext98 void WriteAll() {
99 size_t limit = writer.ConservativeWriteLimit();
100 ASSERT_NE(limit, 0U);
101 ASSERT_NE(limit, Stream::kUnlimited);
102 while (limit != 0) {
103 if (limit < kBufSize) {
104 data = data.subspan(0, limit);
105 }
106 Fill();
107 Write();
108 if (!write_status.ok()) {
109 break;
110 }
111 limit = writer.ConservativeWriteLimit();
112 }
113 }
114
115 // Reads data using the reader.
Readpw::stream::__anonc89951cb0111::MpscTestContext116 void Read() {
117 read_result = reader.Read(destination);
118 if (read_result.ok()) {
119 fnv1a(*read_result, write_hash);
120 total_read += read_result->size();
121 }
122 }
123
124 // Run the given function on a dedicated thread.
125 using ThreadBody = Function<void(MpscTestContext* ctx)>;
Spawnpw::stream::__anonc89951cb0111::MpscTestContext126 void Spawn(ThreadBody func) {
127 body_ = std::move(func);
128 thread_ = thread::Thread(context_.options(), [this]() { body_(this); });
129 }
130
131 // Waits for the spawned thread to complete.
Joinpw::stream::__anonc89951cb0111::MpscTestContext132 void Join() { thread_.join(); }
133
134 private:
135 thread::Thread thread_;
136 thread::test::TestThreadContext context_;
137 ThreadBody body_;
138 };
139
140 ////////////////////////////////////////////////////////////////////////////////
141 // Unit tests.
142
TEST(MpscStreamTest,CopyWriters)143 TEST(MpscStreamTest, CopyWriters) {
144 MpscTestContext ctx;
145 ctx.Connect();
146 EXPECT_TRUE(ctx.reader.connected());
147 EXPECT_TRUE(ctx.writer.connected());
148
149 MpscWriter writer2(ctx.writer);
150 EXPECT_TRUE(ctx.reader.connected());
151 EXPECT_TRUE(ctx.writer.connected());
152 EXPECT_TRUE(writer2.connected());
153
154 MpscWriter writer3 = writer2;
155 EXPECT_TRUE(ctx.reader.connected());
156 EXPECT_TRUE(ctx.writer.connected());
157 EXPECT_TRUE(writer2.connected());
158 EXPECT_TRUE(writer3.connected());
159
160 ctx.writer.Close();
161 writer2.Close();
162 EXPECT_TRUE(ctx.reader.connected());
163 EXPECT_FALSE(ctx.writer.connected());
164 EXPECT_FALSE(writer2.connected());
165 EXPECT_TRUE(writer3.connected());
166 }
167
TEST(MpscStreamTest,MoveWriters)168 TEST(MpscStreamTest, MoveWriters) {
169 MpscTestContext ctx;
170 ctx.Connect();
171 EXPECT_TRUE(ctx.reader.connected());
172 EXPECT_TRUE(ctx.writer.connected());
173
174 MpscWriter writer2(std::move(ctx.writer));
175 EXPECT_TRUE(ctx.reader.connected());
176 EXPECT_TRUE(writer2.connected());
177
178 MpscWriter writer3 = std::move(writer2);
179 EXPECT_TRUE(ctx.reader.connected());
180 EXPECT_TRUE(writer3.connected());
181
182 // Only writer3 should be connected.
183 writer3.Close();
184 EXPECT_FALSE(writer3.connected());
185 EXPECT_FALSE(ctx.reader.connected());
186 }
187
TEST(MpscStreamTest,ReadFailsIfDisconnected)188 TEST(MpscStreamTest, ReadFailsIfDisconnected) {
189 MpscTestContext ctx;
190 ctx.Connect();
191
192 ctx.writer.Close();
193 ctx.Read();
194 EXPECT_EQ(ctx.read_result.status(), Status::OutOfRange());
195 }
196
TEST(MpscStreamTest,ReadBlocksWhenEmpty)197 TEST(MpscStreamTest, ReadBlocksWhenEmpty) {
198 MpscTestContext ctx;
199 ctx.Connect();
200 ctx.reader.SetTimeout(10ms);
201
202 auto start = chrono::SystemClock::now();
203 ctx.Read();
204 auto elapsed = chrono::SystemClock::now() - start;
205
206 EXPECT_EQ(ctx.read_result.status(), Status::ResourceExhausted());
207 EXPECT_GE(elapsed, 10ms);
208 }
209
TEST(MpscStreamTest,ReadReturnsAfterReaderClose)210 TEST(MpscStreamTest, ReadReturnsAfterReaderClose) {
211 MpscTestContext ctx;
212 ctx.Connect();
213
214 ctx.Spawn([](MpscTestContext* inner) { inner->Read(); });
215 ctx.reader.Close();
216 ctx.Join();
217
218 EXPECT_EQ(ctx.read_result.status(), Status::OutOfRange());
219 }
220
TEST(MpscStreamTest,WriteBlocksUntilTimeout)221 TEST(MpscStreamTest, WriteBlocksUntilTimeout) {
222 MpscTestContext ctx;
223 ctx.Connect();
224 ctx.writer.SetTimeout(10ms);
225 ctx.Fill();
226
227 auto start = chrono::SystemClock::now();
228 ctx.Write();
229 auto elapsed = chrono::SystemClock::now() - start;
230
231 EXPECT_EQ(ctx.write_status, Status::ResourceExhausted());
232 EXPECT_GE(elapsed, 10ms);
233 }
234
TEST(MpscStreamTest,WriteReturnsAfterClose)235 TEST(MpscStreamTest, WriteReturnsAfterClose) {
236 MpscTestContext ctx;
237 ctx.Connect();
238
239 ctx.Fill();
240 ctx.Spawn([](MpscTestContext* inner) { inner->Write(); });
241 ctx.reader.Close();
242 ctx.Join();
243
244 EXPECT_EQ(ctx.write_status, Status::OutOfRange());
245 }
246
VerifyRoundtripImpl(const Vector<std::byte> & data,ByteSpan buffer)247 void VerifyRoundtripImpl(const Vector<std::byte>& data, ByteSpan buffer) {
248 MpscTestContext ctx;
249 ctx.Connect();
250
251 ctx.reader.SetBuffer(buffer);
252 ctx.data = ConstByteSpan(data.data(), data.size());
253 ctx.Spawn([](MpscTestContext* inner) { inner->Write(); });
254 size_t offset = 0;
255 while (offset < data.size()) {
256 ctx.Read();
257 ASSERT_EQ(ctx.read_result.status(), OkStatus());
258 size_t num_read = ctx.read_result->size();
259 EXPECT_EQ(memcmp(ctx.read_buffer, &data[offset], num_read), 0);
260 offset += num_read;
261 }
262 ctx.Join();
263 }
264
265 template <size_t kCapacity>
FillAndVerifyRoundtripImpl(ByteSpan buffer)266 void FillAndVerifyRoundtripImpl(ByteSpan buffer) {
267 Vector<std::byte, kCapacity> data;
268 Fill(data.data(), data.size());
269 VerifyRoundtripImpl(data, buffer);
270 }
271
TEST(MpscStreamTest,VerifyRoundtripWithoutBufferSmall)272 TEST(MpscStreamTest, VerifyRoundtripWithoutBufferSmall) {
273 FillAndVerifyRoundtripImpl<kBufSize / 2>(ByteSpan());
274 }
275
TEST(MpscStreamTest,VerifyRoundtripWithoutBufferLarge)276 TEST(MpscStreamTest, VerifyRoundtripWithoutBufferLarge) {
277 FillAndVerifyRoundtripImpl<kBufSize * 2>(ByteSpan());
278 }
279
VerifyRoundtripWithoutBuffer(const Vector<std::byte> & data)280 void VerifyRoundtripWithoutBuffer(const Vector<std::byte>& data) {
281 VerifyRoundtripImpl(data, ByteSpan());
282 }
283 FUZZ_TEST(MpscStreamTest, VerifyRoundtripWithoutBuffer)
284 .WithDomains(VectorOf<kBufSize * 2>(Arbitrary<std::byte>()).WithMinSize(1));
285
TEST(MpscStreamTest,VerifyRoundtripWithBufferSmall)286 TEST(MpscStreamTest, VerifyRoundtripWithBufferSmall) {
287 std::byte buffer[kBufSize];
288 FillAndVerifyRoundtripImpl<kBufSize / 2>(buffer);
289 }
290
TEST(MpscStreamTest,VerifyRoundtripWithBufferLarge)291 TEST(MpscStreamTest, VerifyRoundtripWithBufferLarge) {
292 std::byte buffer[kBufSize];
293 FillAndVerifyRoundtripImpl<kBufSize * 2>(buffer);
294 }
295
VerifyRoundtripWithBuffer(const Vector<std::byte> & data)296 void VerifyRoundtripWithBuffer(const Vector<std::byte>& data) {
297 std::byte buffer[kBufSize];
298 VerifyRoundtripImpl(data, buffer);
299 }
300 FUZZ_TEST(MpscStreamTest, VerifyRoundtripWithBuffer)
301 .WithDomains(VectorOf<kBufSize * 2>(Arbitrary<std::byte>()).WithMinSize(1));
302
TEST(MpscStreamTest,CanRetryAfterPartialWrite)303 TEST(MpscStreamTest, CanRetryAfterPartialWrite) {
304 constexpr size_t kChunk = kBufSize - 4;
305 MpscTestContext ctx;
306 ctx.Connect();
307 ctx.writer.SetTimeout(10ms);
308 ByteSpan destination = ctx.destination;
309
310 ctx.Spawn([](MpscTestContext* inner) {
311 inner->Fill();
312 inner->Write();
313 });
314 ctx.destination = destination.subspan(0, kChunk);
315 ctx.Read();
316 ctx.Join();
317 EXPECT_EQ(ctx.read_result.status(), OkStatus());
318 EXPECT_EQ(ctx.read_result->size(), kChunk);
319 EXPECT_EQ(ctx.write_status, Status::ResourceExhausted());
320 EXPECT_EQ(ctx.writer.last_write(), kChunk);
321
322 ctx.Spawn([](MpscTestContext* inner) {
323 inner->data = inner->data.subspan(kChunk);
324 inner->Write();
325 });
326 ctx.destination = destination.subspan(kChunk);
327 ctx.Read();
328 ctx.Join();
329 EXPECT_EQ(ctx.read_result.status(), OkStatus());
330 EXPECT_EQ(ctx.read_result->size(), 4U);
331 EXPECT_EQ(ctx.write_status, OkStatus());
332 EXPECT_EQ(ctx.writer.last_write(), 4U);
333
334 EXPECT_EQ(memcmp(ctx.write_buffer, ctx.read_buffer, kBufSize), 0);
335 }
336
TEST(MpscStreamTest,CannotReadAfterReaderClose)337 TEST(MpscStreamTest, CannotReadAfterReaderClose) {
338 MpscTestContext ctx;
339 ctx.Connect();
340 ctx.reader.Close();
341 ctx.Read();
342 EXPECT_EQ(ctx.read_result.status(), Status::OutOfRange());
343 }
344
TEST(MpscStreamTest,CanReadAfterWriterCloses)345 TEST(MpscStreamTest, CanReadAfterWriterCloses) {
346 MpscTestContext ctx;
347 ctx.Connect();
348 std::byte buffer[kBufSize];
349 ctx.reader.SetBuffer(buffer);
350 ctx.Fill();
351 ctx.Write();
352 EXPECT_EQ(ctx.write_status, OkStatus());
353 ctx.writer.Close();
354
355 ctx.Read();
356 ASSERT_EQ(ctx.read_result.status(), OkStatus());
357 ASSERT_EQ(ctx.read_result->size(), kBufSize);
358 EXPECT_EQ(memcmp(ctx.write_buffer, ctx.read_buffer, kBufSize), 0);
359 }
360
TEST(MpscStreamTest,CannotWriteAfterWriterClose)361 TEST(MpscStreamTest, CannotWriteAfterWriterClose) {
362 MpscTestContext ctx;
363 ctx.Connect();
364 ctx.Fill();
365 ctx.writer.Close();
366 ctx.Write();
367 EXPECT_EQ(ctx.write_status, Status::OutOfRange());
368 }
369
TEST(MpscStreamTest,CannotWriteAfterReaderClose)370 TEST(MpscStreamTest, CannotWriteAfterReaderClose) {
371 MpscTestContext ctx;
372 ctx.Connect();
373 ctx.Fill();
374 ctx.reader.Close();
375 ctx.Write();
376 EXPECT_EQ(ctx.write_status, Status::OutOfRange());
377 }
378
TEST(MpscStreamTest,MultipleWriters)379 TEST(MpscStreamTest, MultipleWriters) {
380 MpscTestContext ctx1;
381 ctx1.Connect();
382 Vector<std::byte, kBufSize + 1> data1(kBufSize + 1, std::byte(1));
383 ctx1.data = ByteSpan(data1.data(), data1.size());
384
385 MpscTestContext ctx2;
386 ctx2.writer = ctx1.writer;
387 Vector<std::byte, kBufSize / 2> data2(kBufSize / 2, std::byte(2));
388 ctx2.data = ByteSpan(data2.data(), data2.size());
389
390 MpscTestContext ctx3;
391 ctx3.writer = ctx1.writer;
392 Vector<std::byte, kBufSize * 3> data3(kBufSize * 3, std::byte(3));
393 ctx3.data = ByteSpan(data3.data(), data3.size());
394
395 // Start all threads.
396 ctx1.Spawn([](MpscTestContext* ctx) { ctx->Write(); });
397 ctx2.Spawn([](MpscTestContext* ctx) { ctx->Write(); });
398 ctx3.Spawn([](MpscTestContext* ctx) { ctx->Write(); });
399
400 // The loop below keeps track of how many contiguous values are read, in order
401 // to verify that writes are not split or interleaved.
402 size_t expected[4] = {0, data1.size(), data2.size(), data3.size()};
403 size_t actual[4] = {0};
404
405 size_t total_read = 0;
406 auto current = std::byte(0);
407 size_t num_current = 0;
408 while (total_read < data1.size() + data2.size() + data3.size()) {
409 ctx1.Read();
410 if (!ctx1.read_result.ok()) {
411 break;
412 }
413 size_t num_read = ctx1.read_result->size();
414 for (size_t i = 0; i < num_read; ++i) {
415 if (current == ctx1.read_buffer[i]) {
416 ++num_current;
417 continue;
418 }
419 actual[size_t(current)] = num_current;
420 current = ctx1.read_buffer[i];
421 num_current = 1;
422 }
423 actual[size_t(current)] = num_current;
424 total_read += num_read;
425 }
426 ctx1.reader.Close();
427 ctx1.Join();
428 ctx2.Join();
429 ctx3.Join();
430 ASSERT_EQ(ctx1.read_result.status(), OkStatus());
431 for (size_t i = 0; i < 4; ++i) {
432 EXPECT_EQ(actual[i], expected[i]);
433 }
434 }
435
TEST(MpscStreamTest,GetAndSetLimits)436 TEST(MpscStreamTest, GetAndSetLimits) {
437 MpscReader reader;
438 EXPECT_EQ(reader.ConservativeReadLimit(), 0U);
439
440 MpscWriter writer;
441 EXPECT_EQ(writer.ConservativeWriteLimit(), 0U);
442
443 CreateMpscStream(reader, writer);
444 EXPECT_EQ(reader.ConservativeReadLimit(), Stream::kUnlimited);
445 EXPECT_EQ(writer.ConservativeWriteLimit(), Stream::kUnlimited);
446
447 writer.SetLimit(10);
448 EXPECT_EQ(reader.ConservativeReadLimit(), 10U);
449 EXPECT_EQ(writer.ConservativeWriteLimit(), 10U);
450
451 writer.Close();
452 EXPECT_EQ(reader.ConservativeReadLimit(), 0U);
453 EXPECT_EQ(writer.ConservativeWriteLimit(), 0U);
454 }
455
TEST(MpscStreamTest,ReaderAggregatesLimit)456 TEST(MpscStreamTest, ReaderAggregatesLimit) {
457 MpscTestContext ctx;
458 ctx.Connect();
459 ctx.writer.SetLimit(10);
460
461 MpscWriter writer2 = ctx.writer;
462 writer2.SetLimit(20);
463
464 EXPECT_EQ(ctx.reader.ConservativeReadLimit(), 30U);
465
466 ctx.writer.SetLimit(Stream::kUnlimited);
467 EXPECT_EQ(ctx.reader.ConservativeReadLimit(), Stream::kUnlimited);
468
469 writer2.SetLimit(40);
470 EXPECT_EQ(ctx.reader.ConservativeReadLimit(), Stream::kUnlimited);
471
472 ctx.writer.SetLimit(0);
473 EXPECT_EQ(ctx.reader.ConservativeReadLimit(), 40U);
474 }
475
TEST(MpscStreamTest,ReadingUpdatesLimit)476 TEST(MpscStreamTest, ReadingUpdatesLimit) {
477 MpscTestContext ctx;
478 ctx.Connect();
479
480 constexpr size_t kChunk = kBufSize - 4;
481 std::byte buffer[kBufSize];
482 ctx.reader.SetBuffer(buffer);
483 ctx.Fill();
484 ctx.writer.SetLimit(kBufSize);
485 ctx.Write();
486 EXPECT_EQ(ctx.write_status, OkStatus());
487
488 ctx.destination = ByteSpan(ctx.read_buffer, kChunk);
489 ctx.Read();
490 EXPECT_EQ(ctx.read_result.status(), OkStatus());
491 EXPECT_EQ(ctx.read_result->size(), kChunk);
492 EXPECT_EQ(ctx.reader.ConservativeReadLimit(), kBufSize - kChunk);
493 }
494
TEST(MpscStreamTest,CannotWriteMoreThanLimit)495 TEST(MpscStreamTest, CannotWriteMoreThanLimit) {
496 MpscTestContext ctx;
497 ctx.Connect();
498
499 std::byte buffer[kBufSize];
500 ctx.reader.SetBuffer(buffer);
501 ctx.writer.SetLimit(kBufSize - 1);
502 ctx.Fill();
503 ctx.Write();
504 EXPECT_EQ(ctx.write_status, Status::ResourceExhausted());
505 }
506
TEST(MpscStreamTest,WritersCanCloseAutomatically)507 TEST(MpscStreamTest, WritersCanCloseAutomatically) {
508 MpscTestContext ctx1;
509 ctx1.Connect();
510 Vector<std::byte, kBufSize + 1> data1(kBufSize + 1, std::byte(1));
511 ctx1.writer.SetLimit(data1.size());
512 ctx1.data = ByteSpan(data1.data(), data1.size());
513
514 MpscTestContext ctx2;
515 ctx2.writer = ctx1.writer;
516 Vector<std::byte, kBufSize / 2> data2(kBufSize / 2, std::byte(2));
517 ctx2.writer.SetLimit(data2.size());
518 ctx2.data = ByteSpan(data2.data(), data2.size());
519
520 // Start all threads.
521 EXPECT_TRUE(ctx1.reader.connected());
522 EXPECT_TRUE(ctx1.writer.connected());
523 EXPECT_TRUE(ctx2.writer.connected());
524
525 ctx1.Spawn([](MpscTestContext* ctx) { ctx->Write(); });
526 ctx2.Spawn([](MpscTestContext* ctx) { ctx->Write(); });
527
528 size_t total = 0;
529 while (ctx1.reader.ConservativeReadLimit() != 0) {
530 ctx1.Read();
531 EXPECT_EQ(ctx1.read_result.status(), OkStatus());
532 if (!ctx1.read_result.ok()) {
533 ctx1.reader.Close();
534 break;
535 }
536 total += ctx1.read_result->size();
537 }
538 EXPECT_EQ(total, data1.size() + data2.size());
539 ctx1.Join();
540 ctx2.Join();
541 EXPECT_FALSE(ctx1.reader.connected());
542 EXPECT_FALSE(ctx1.writer.connected());
543 EXPECT_FALSE(ctx2.writer.connected());
544 }
545
TEST(MpscStreamTest,ReadAllWithoutBuffer)546 TEST(MpscStreamTest, ReadAllWithoutBuffer) {
547 MpscTestContext ctx;
548 Status status = ctx.reader.ReadAll([](ConstByteSpan) { return OkStatus(); });
549 EXPECT_EQ(status, Status::FailedPrecondition());
550 }
551
TEST(MpscStreamTest,ReadAll)552 TEST(MpscStreamTest, ReadAll) {
553 MpscTestContext ctx;
554 ctx.Connect();
555
556 std::byte buffer[kBufSize];
557 ctx.reader.SetBuffer(buffer);
558 ctx.writer.SetLimit(kBufSize * 100);
559 ctx.Spawn([](MpscTestContext* inner) { inner->WriteAll(); });
560
561 Status status = ctx.reader.ReadAll([&ctx](ConstByteSpan data) {
562 ctx.total_read += data.size();
563 fnv1a(data, ctx.read_hash);
564 return OkStatus();
565 });
566 ctx.Join();
567
568 EXPECT_EQ(status, OkStatus());
569 EXPECT_FALSE(ctx.reader.connected());
570 EXPECT_EQ(ctx.total_read, kBufSize * 100);
571 EXPECT_EQ(ctx.read_hash, ctx.write_hash);
572 }
573
TEST(MpscStreamTest,BufferedMpscReader)574 TEST(MpscStreamTest, BufferedMpscReader) {
575 BufferedMpscReader<kBufSize> reader;
576 MpscWriter writer;
577 CreateMpscStream(reader, writer);
578
579 // `kBufSize` writes of 1 byte each should fit without blocking.
580 for (size_t i = 0; i < kBufSize; ++i) {
581 std::byte b{static_cast<uint8_t>(i)};
582 EXPECT_EQ(writer.Write(ConstByteSpan(&b, 1)), OkStatus());
583 }
584
585 std::byte rx_buffer[kBufSize];
586 auto result = reader.Read(ByteSpan(rx_buffer));
587 ASSERT_EQ(result.status(), OkStatus());
588 ASSERT_EQ(result->size(), kBufSize);
589 for (size_t i = 0; i < kBufSize; ++i) {
590 EXPECT_EQ(rx_buffer[i], std::byte(i));
591 }
592 }
593
594 } // namespace
595 } // namespace pw::stream
596