// Copyright 2023 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
14 
15 #include "pw_stream/mpsc_stream.h"
16 
17 #include "pw_containers/vector.h"
18 #include "pw_fuzzer/fuzztest.h"
19 #include "pw_random/xor_shift.h"
20 #include "pw_thread/test_thread_context.h"
21 #include "pw_thread/thread.h"
22 #include "pw_unit_test/framework.h"
23 
24 namespace pw::stream {
25 namespace {
26 
27 using namespace std::chrono_literals;
28 using namespace pw::fuzzer;
29 
30 ////////////////////////////////////////////////////////////////////////////////
31 // Test fixtures.
32 
/// Size, in bytes, of every fixed data buffer used by these tests.
constexpr size_t kBufSize{512};
35 
36 /// Fills a byte span with random data.
Fill(std::byte * buf,size_t len)37 void Fill(std::byte* buf, size_t len) {
38   ByteSpan data(buf, len);
39   random::XorShiftStarRng64 rng(1);
40   rng.Get(data);
41 }
42 
/// Starting hash value (offset basis) for 64-bit FNV-1a.
constexpr uint64_t kOffsetBasis{0xcbf29ce484222325ULL};

/// Per-byte multiplier (prime) for 64-bit FNV-1a.
constexpr uint64_t kPrimeValue{0x100000001b3ULL};
48 
49 /// Quick implementation of public-domain Fowler-Noll-Vo hashing algorithm.
50 ///
51 /// This is used in the tests below to verify equality of two sequences of bytes
52 /// that are too large to compare directly.
53 ///
54 /// See http://www.isthe.com/chongo/tech/comp/fnv/index.html
fnv1a(ConstByteSpan bytes,uint64_t & hash)55 void fnv1a(ConstByteSpan bytes, uint64_t& hash) {
56   for (const auto& b : bytes) {
57     hash = (hash ^ static_cast<uint8_t>(b)) * kPrimeValue;
58   }
59 }
60 
61 /// MpscStream test context that uses a generic reader.
62 ///
63 /// This struct associates a reader and writer with their parameters and return
64 /// values. This is useful for communicating with threads spawned to call a
65 /// blocking method.
66 struct MpscTestContext {
67   MpscWriter writer;
68   MpscReader reader;
69 
70   ConstByteSpan data;
71   std::byte write_buffer[kBufSize];
72   uint64_t write_hash = kOffsetBasis;
73   Status write_status;
74 
75   ByteSpan destination;
76   std::byte read_buffer[kBufSize];
77   Result<ByteSpan> read_result;
78   uint64_t read_hash = kOffsetBasis;
79   size_t total_read = 0;
80 
MpscTestContextpw::stream::__anonc89951cb0111::MpscTestContext81   MpscTestContext() {
82     data = ConstByteSpan(write_buffer);
83     destination = ByteSpan(read_buffer);
84   }
85 
Connectpw::stream::__anonc89951cb0111::MpscTestContext86   void Connect() { CreateMpscStream(reader, writer); }
87 
88   // Fills a byte span with random data.
Fillpw::stream::__anonc89951cb0111::MpscTestContext89   void Fill() { pw::stream::Fill(write_buffer, sizeof(write_buffer)); }
90 
91   // Writes data using the writer.
Writepw::stream::__anonc89951cb0111::MpscTestContext92   void Write() {
93     fnv1a(data, write_hash);
94     write_status = writer.Write(data);
95   }
96 
97   // Writes data repeatedly up to the writer's limit.
WriteAllpw::stream::__anonc89951cb0111::MpscTestContext98   void WriteAll() {
99     size_t limit = writer.ConservativeWriteLimit();
100     ASSERT_NE(limit, 0U);
101     ASSERT_NE(limit, Stream::kUnlimited);
102     while (limit != 0) {
103       if (limit < kBufSize) {
104         data = data.subspan(0, limit);
105       }
106       Fill();
107       Write();
108       if (!write_status.ok()) {
109         break;
110       }
111       limit = writer.ConservativeWriteLimit();
112     }
113   }
114 
115   // Reads data using the reader.
Readpw::stream::__anonc89951cb0111::MpscTestContext116   void Read() {
117     read_result = reader.Read(destination);
118     if (read_result.ok()) {
119       fnv1a(*read_result, write_hash);
120       total_read += read_result->size();
121     }
122   }
123 
124   // Run the given function on a dedicated thread.
125   using ThreadBody = Function<void(MpscTestContext* ctx)>;
Spawnpw::stream::__anonc89951cb0111::MpscTestContext126   void Spawn(ThreadBody func) {
127     body_ = std::move(func);
128     thread_ = thread::Thread(context_.options(), [this]() { body_(this); });
129   }
130 
131   // Waits for the spawned thread to complete.
Joinpw::stream::__anonc89951cb0111::MpscTestContext132   void Join() { thread_.join(); }
133 
134  private:
135   thread::Thread thread_;
136   thread::test::TestThreadContext context_;
137   ThreadBody body_;
138 };
139 
140 ////////////////////////////////////////////////////////////////////////////////
141 // Unit tests.
142 
// Copying a writer yields another connected writer on the same stream, and
// the reader stays connected as long as at least one copy remains open.
TEST(MpscStreamTest, CopyWriters) {
  MpscTestContext ctx;
  ctx.Connect();
  EXPECT_TRUE(ctx.reader.connected());
  EXPECT_TRUE(ctx.writer.connected());

  // Copy-construct from the original writer.
  MpscWriter second(ctx.writer);
  EXPECT_TRUE(ctx.reader.connected());
  EXPECT_TRUE(ctx.writer.connected());
  EXPECT_TRUE(second.connected());

  // Copy-initialize from the copy.
  MpscWriter third = second;
  EXPECT_TRUE(ctx.reader.connected());
  EXPECT_TRUE(ctx.writer.connected());
  EXPECT_TRUE(second.connected());
  EXPECT_TRUE(third.connected());

  // Close two of the three writers; the last keeps the stream alive.
  ctx.writer.Close();
  second.Close();
  EXPECT_TRUE(ctx.reader.connected());
  EXPECT_FALSE(ctx.writer.connected());
  EXPECT_FALSE(second.connected());
  EXPECT_TRUE(third.connected());
}
167 
// Moving a writer transfers its connection; closing the final owner
// disconnects the reader as well.
TEST(MpscStreamTest, MoveWriters) {
  MpscTestContext ctx;
  ctx.Connect();
  EXPECT_TRUE(ctx.reader.connected());
  EXPECT_TRUE(ctx.writer.connected());

  // Move-construct from the original writer.
  MpscWriter second(std::move(ctx.writer));
  EXPECT_TRUE(ctx.reader.connected());
  EXPECT_TRUE(second.connected());

  // Move-initialize from the moved-to writer.
  MpscWriter third = std::move(second);
  EXPECT_TRUE(ctx.reader.connected());
  EXPECT_TRUE(third.connected());

  // `third` is now the only connected writer, so closing it ends the stream.
  third.Close();
  EXPECT_FALSE(third.connected());
  EXPECT_FALSE(ctx.reader.connected());
}
187 
// After the only writer closes without writing anything, a read reports
// OUT_OF_RANGE.
TEST(MpscStreamTest, ReadFailsIfDisconnected) {
  MpscTestContext ctx;
  ctx.Connect();
  ctx.writer.Close();

  ctx.Read();

  EXPECT_EQ(ctx.read_result.status(), Status::OutOfRange());
}
196 
// With a connected writer but no data, a read waits for the reader's timeout
// and then reports RESOURCE_EXHAUSTED.
TEST(MpscStreamTest, ReadBlocksWhenEmpty) {
  MpscTestContext ctx;
  ctx.Connect();
  ctx.reader.SetTimeout(10ms);

  // Time the blocking call.
  const auto began = chrono::SystemClock::now();
  ctx.Read();
  const auto waited = chrono::SystemClock::now() - began;

  EXPECT_EQ(ctx.read_result.status(), Status::ResourceExhausted());
  EXPECT_GE(waited, 10ms);
}
209 
// A read issued on another thread returns OUT_OF_RANGE once the reader is
// closed, rather than blocking forever.
TEST(MpscStreamTest, ReadReturnsAfterReaderClose) {
  MpscTestContext ctx;
  ctx.Connect();

  // Issue the read on a dedicated thread, then close the reader from here.
  ctx.Spawn([](MpscTestContext* self) { self->Read(); });
  ctx.reader.Close();
  ctx.Join();

  EXPECT_EQ(ctx.read_result.status(), Status::OutOfRange());
}
220 
// With nobody reading, a write waits for the writer's timeout and then
// reports RESOURCE_EXHAUSTED.
TEST(MpscStreamTest, WriteBlocksUntilTimeout) {
  MpscTestContext ctx;
  ctx.Connect();
  ctx.writer.SetTimeout(10ms);
  ctx.Fill();

  // Time the blocking call.
  const auto began = chrono::SystemClock::now();
  ctx.Write();
  const auto waited = chrono::SystemClock::now() - began;

  EXPECT_EQ(ctx.write_status, Status::ResourceExhausted());
  EXPECT_GE(waited, 10ms);
}
234 
// A write issued on another thread returns OUT_OF_RANGE once the reader is
// closed, rather than blocking forever.
TEST(MpscStreamTest, WriteReturnsAfterClose) {
  MpscTestContext ctx;
  ctx.Connect();
  ctx.Fill();

  // Issue the write on a dedicated thread, then close the reader from here.
  ctx.Spawn([](MpscTestContext* self) { self->Write(); });
  ctx.reader.Close();
  ctx.Join();

  EXPECT_EQ(ctx.write_status, Status::OutOfRange());
}
246 
VerifyRoundtripImpl(const Vector<std::byte> & data,ByteSpan buffer)247 void VerifyRoundtripImpl(const Vector<std::byte>& data, ByteSpan buffer) {
248   MpscTestContext ctx;
249   ctx.Connect();
250 
251   ctx.reader.SetBuffer(buffer);
252   ctx.data = ConstByteSpan(data.data(), data.size());
253   ctx.Spawn([](MpscTestContext* inner) { inner->Write(); });
254   size_t offset = 0;
255   while (offset < data.size()) {
256     ctx.Read();
257     ASSERT_EQ(ctx.read_result.status(), OkStatus());
258     size_t num_read = ctx.read_result->size();
259     EXPECT_EQ(memcmp(ctx.read_buffer, &data[offset], num_read), 0);
260     offset += num_read;
261   }
262   ctx.Join();
263 }
264 
265 template <size_t kCapacity>
FillAndVerifyRoundtripImpl(ByteSpan buffer)266 void FillAndVerifyRoundtripImpl(ByteSpan buffer) {
267   Vector<std::byte, kCapacity> data;
268   Fill(data.data(), data.size());
269   VerifyRoundtripImpl(data, buffer);
270 }
271 
// Round trip with a payload capacity of half `kBufSize` and an empty reader
// buffer.
TEST(MpscStreamTest, VerifyRoundtripWithoutBufferSmall) {
  ByteSpan no_buffer;
  FillAndVerifyRoundtripImpl<kBufSize / 2>(no_buffer);
}
275 
// Round trip with a payload capacity of twice `kBufSize` and an empty reader
// buffer.
TEST(MpscStreamTest, VerifyRoundtripWithoutBufferLarge) {
  ByteSpan no_buffer;
  FillAndVerifyRoundtripImpl<kBufSize * 2>(no_buffer);
}
279 
VerifyRoundtripWithoutBuffer(const Vector<std::byte> & data)280 void VerifyRoundtripWithoutBuffer(const Vector<std::byte>& data) {
281   VerifyRoundtripImpl(data, ByteSpan());
282 }
283 FUZZ_TEST(MpscStreamTest, VerifyRoundtripWithoutBuffer)
284     .WithDomains(VectorOf<kBufSize * 2>(Arbitrary<std::byte>()).WithMinSize(1));
285 
// Round trip with a payload capacity of half `kBufSize` and a `kBufSize`-byte
// reader buffer.
TEST(MpscStreamTest, VerifyRoundtripWithBufferSmall) {
  std::byte storage[kBufSize];
  FillAndVerifyRoundtripImpl<kBufSize / 2>(ByteSpan(storage));
}
290 
// Round trip with a payload capacity of twice `kBufSize` and a
// `kBufSize`-byte reader buffer.
TEST(MpscStreamTest, VerifyRoundtripWithBufferLarge) {
  std::byte storage[kBufSize];
  FillAndVerifyRoundtripImpl<kBufSize * 2>(ByteSpan(storage));
}
295 
VerifyRoundtripWithBuffer(const Vector<std::byte> & data)296 void VerifyRoundtripWithBuffer(const Vector<std::byte>& data) {
297   std::byte buffer[kBufSize];
298   VerifyRoundtripImpl(data, buffer);
299 }
300 FUZZ_TEST(MpscStreamTest, VerifyRoundtripWithBuffer)
301     .WithDomains(VectorOf<kBufSize * 2>(Arbitrary<std::byte>()).WithMinSize(1));
302 
// A write that times out after delivering only part of its data reports how
// much was delivered via `last_write()`, and the remainder can be sent by a
// follow-up write.
TEST(MpscStreamTest, CanRetryAfterPartialWrite) {
  constexpr size_t kFirstPart = kBufSize - 4;
  constexpr size_t kRemainder = kBufSize - kFirstPart;
  MpscTestContext ctx;
  ctx.Connect();
  ctx.writer.SetTimeout(10ms);
  const ByteSpan whole_destination = ctx.destination;

  // Round 1: the writer offers the full buffer but the reader only accepts
  // `kFirstPart` bytes, so the write eventually times out.
  ctx.Spawn([](MpscTestContext* self) {
    self->Fill();
    self->Write();
  });
  ctx.destination = whole_destination.subspan(0, kFirstPart);
  ctx.Read();
  ctx.Join();
  EXPECT_EQ(ctx.read_result.status(), OkStatus());
  EXPECT_EQ(ctx.read_result->size(), kFirstPart);
  EXPECT_EQ(ctx.write_status, Status::ResourceExhausted());
  EXPECT_EQ(ctx.writer.last_write(), kFirstPart);

  // Round 2: resend only the unsent tail, reading it into the rest of the
  // destination buffer.
  ctx.Spawn([](MpscTestContext* self) {
    self->data = self->data.subspan(kFirstPart);
    self->Write();
  });
  ctx.destination = whole_destination.subspan(kFirstPart);
  ctx.Read();
  ctx.Join();
  EXPECT_EQ(ctx.read_result.status(), OkStatus());
  EXPECT_EQ(ctx.read_result->size(), kRemainder);
  EXPECT_EQ(ctx.write_status, OkStatus());
  EXPECT_EQ(ctx.writer.last_write(), kRemainder);

  // Both rounds together must reproduce the original buffer.
  EXPECT_EQ(memcmp(ctx.write_buffer, ctx.read_buffer, kBufSize), 0);
}
336 
// Reading from a reader that has already been closed reports OUT_OF_RANGE.
TEST(MpscStreamTest, CannotReadAfterReaderClose) {
  MpscTestContext ctx;
  ctx.Connect();
  ctx.reader.Close();

  ctx.Read();

  EXPECT_EQ(ctx.read_result.status(), Status::OutOfRange());
}
344 
// Data written while the reader has a buffer set remains readable after the
// writer has closed.
TEST(MpscStreamTest, CanReadAfterWriterCloses) {
  MpscTestContext ctx;
  ctx.Connect();
  std::byte storage[kBufSize];
  ctx.reader.SetBuffer(storage);

  // Write a full buffer, then close the writer before anything is read.
  ctx.Fill();
  ctx.Write();
  EXPECT_EQ(ctx.write_status, OkStatus());
  ctx.writer.Close();

  // The read must still return every byte that was written.
  ctx.Read();
  ASSERT_EQ(ctx.read_result.status(), OkStatus());
  ASSERT_EQ(ctx.read_result->size(), kBufSize);
  EXPECT_EQ(memcmp(ctx.write_buffer, ctx.read_buffer, kBufSize), 0);
}
360 
// Writing through a writer that has already been closed reports OUT_OF_RANGE.
TEST(MpscStreamTest, CannotWriteAfterWriterClose) {
  MpscTestContext ctx;
  ctx.Connect();
  ctx.Fill();
  ctx.writer.Close();

  ctx.Write();

  EXPECT_EQ(ctx.write_status, Status::OutOfRange());
}
369 
// Writing after the reader has been closed reports OUT_OF_RANGE.
TEST(MpscStreamTest, CannotWriteAfterReaderClose) {
  MpscTestContext ctx;
  ctx.Connect();
  ctx.Fill();
  ctx.reader.Close();

  ctx.Write();

  EXPECT_EQ(ctx.write_status, Status::OutOfRange());
}
378 
// Three writers on one stream each send a payload made of a single repeated
// byte value (1, 2 or 3). The reader must see each payload as one unbroken
// run, i.e. writes are neither split nor interleaved.
TEST(MpscStreamTest, MultipleWriters) {
  MpscTestContext ctx1;
  ctx1.Connect();
  Vector<std::byte, kBufSize + 1> data1(kBufSize + 1, std::byte(1));
  ctx1.data = ByteSpan(data1.data(), data1.size());

  // The second and third contexts only contribute writers (copies of the
  // first) and a thread each; all reading goes through `ctx1`.
  MpscTestContext ctx2;
  ctx2.writer = ctx1.writer;
  Vector<std::byte, kBufSize / 2> data2(kBufSize / 2, std::byte(2));
  ctx2.data = ByteSpan(data2.data(), data2.size());

  MpscTestContext ctx3;
  ctx3.writer = ctx1.writer;
  Vector<std::byte, kBufSize * 3> data3(kBufSize * 3, std::byte(3));
  ctx3.data = ByteSpan(data3.data(), data3.size());

  // Start all threads.
  ctx1.Spawn([](MpscTestContext* self) { self->Write(); });
  ctx2.Spawn([](MpscTestContext* self) { self->Write(); });
  ctx3.Spawn([](MpscTestContext* self) { self->Write(); });

  // Indexed by byte value: the length of the most recent run of that value.
  // Slot 0 is the initial "no data yet" value and must stay at zero.
  const size_t expected[4] = {0, data1.size(), data2.size(), data3.size()};
  size_t actual[4] = {0};
  const size_t total_expected = data1.size() + data2.size() + data3.size();

  size_t total_read = 0;
  std::byte run_value{0};
  size_t run_length = 0;
  while (total_read < total_expected) {
    ctx1.Read();
    if (!ctx1.read_result.ok()) {
      break;
    }
    const size_t num_read = ctx1.read_result->size();
    for (size_t i = 0; i < num_read; ++i) {
      const std::byte next = ctx1.read_buffer[i];
      if (next != run_value) {
        // The value changed: record the finished run and start a new one.
        actual[static_cast<size_t>(run_value)] = run_length;
        run_value = next;
        run_length = 1;
      } else {
        ++run_length;
      }
    }
    // Record the run in progress so the final run is captured too.
    actual[static_cast<size_t>(run_value)] = run_length;
    total_read += num_read;
  }

  // Close the reader before joining so no writer stays blocked.
  ctx1.reader.Close();
  ctx1.Join();
  ctx2.Join();
  ctx3.Join();
  ASSERT_EQ(ctx1.read_result.status(), OkStatus());
  for (size_t i = 0; i < 4; ++i) {
    EXPECT_EQ(actual[i], expected[i]);
  }
}
435 
// Conservative limits are zero while unconnected, unlimited once connected,
// follow the writer's explicit limit, and drop back to zero after close.
TEST(MpscStreamTest, GetAndSetLimits) {
  // Unconnected endpoints cannot transfer anything.
  MpscReader reader;
  EXPECT_EQ(reader.ConservativeReadLimit(), 0U);

  MpscWriter writer;
  EXPECT_EQ(writer.ConservativeWriteLimit(), 0U);

  // Freshly connected: no limit on either side.
  CreateMpscStream(reader, writer);
  EXPECT_EQ(reader.ConservativeReadLimit(), Stream::kUnlimited);
  EXPECT_EQ(writer.ConservativeWriteLimit(), Stream::kUnlimited);

  // A writer limit is visible from both ends.
  writer.SetLimit(10);
  EXPECT_EQ(reader.ConservativeReadLimit(), 10U);
  EXPECT_EQ(writer.ConservativeWriteLimit(), 10U);

  // Closing the writer leaves nothing to read or write.
  writer.Close();
  EXPECT_EQ(reader.ConservativeReadLimit(), 0U);
  EXPECT_EQ(writer.ConservativeWriteLimit(), 0U);
}
455 
// The reader's limit is the sum of its writers' limits, and is unlimited
// whenever any single writer is unlimited.
TEST(MpscStreamTest, ReaderAggregatesLimit) {
  MpscTestContext ctx;
  ctx.Connect();
  ctx.writer.SetLimit(10);

  MpscWriter second = ctx.writer;
  second.SetLimit(20);

  // 10 + 20.
  EXPECT_EQ(ctx.reader.ConservativeReadLimit(), 30U);

  // One unlimited writer makes the total unlimited...
  ctx.writer.SetLimit(Stream::kUnlimited);
  EXPECT_EQ(ctx.reader.ConservativeReadLimit(), Stream::kUnlimited);

  // ...regardless of what the other writer's limit is changed to.
  second.SetLimit(40);
  EXPECT_EQ(ctx.reader.ConservativeReadLimit(), Stream::kUnlimited);

  // 0 + 40.
  ctx.writer.SetLimit(0);
  EXPECT_EQ(ctx.reader.ConservativeReadLimit(), 40U);
}
475 
// After a partial read, the reader's limit equals the number of written bytes
// that have not been read yet.
TEST(MpscStreamTest, ReadingUpdatesLimit) {
  MpscTestContext ctx;
  ctx.Connect();

  constexpr size_t kPartial = kBufSize - 4;
  std::byte storage[kBufSize];
  ctx.reader.SetBuffer(storage);

  // Write exactly as many bytes as the writer's limit allows.
  ctx.Fill();
  ctx.writer.SetLimit(kBufSize);
  ctx.Write();
  EXPECT_EQ(ctx.write_status, OkStatus());

  // Read all but the last few bytes.
  ctx.destination = ByteSpan(ctx.read_buffer, kPartial);
  ctx.Read();
  EXPECT_EQ(ctx.read_result.status(), OkStatus());
  EXPECT_EQ(ctx.read_result->size(), kPartial);
  EXPECT_EQ(ctx.reader.ConservativeReadLimit(), kBufSize - kPartial);
}
494 
// A write larger than the writer's limit is rejected with RESOURCE_EXHAUSTED.
TEST(MpscStreamTest, CannotWriteMoreThanLimit) {
  MpscTestContext ctx;
  ctx.Connect();

  std::byte storage[kBufSize];
  ctx.reader.SetBuffer(storage);

  // The limit is one byte short of the `kBufSize` bytes about to be written.
  ctx.writer.SetLimit(kBufSize - 1);
  ctx.Fill();
  ctx.Write();

  EXPECT_EQ(ctx.write_status, Status::ResourceExhausted());
}
506 
// Writers whose limit equals the amount they write end up disconnected without
// an explicit `Close`, and the reader disconnects after draining their data.
TEST(MpscStreamTest, WritersCanCloseAutomatically) {
  // Each writer's limit is set to exactly the size of its payload.
  MpscTestContext ctx1;
  ctx1.Connect();
  Vector<std::byte, kBufSize + 1> data1(kBufSize + 1, std::byte(1));
  ctx1.writer.SetLimit(data1.size());
  ctx1.data = ByteSpan(data1.data(), data1.size());

  MpscTestContext ctx2;
  ctx2.writer = ctx1.writer;
  Vector<std::byte, kBufSize / 2> data2(kBufSize / 2, std::byte(2));
  ctx2.writer.SetLimit(data2.size());
  ctx2.data = ByteSpan(data2.data(), data2.size());

  // Everything is connected before any data moves.
  EXPECT_TRUE(ctx1.reader.connected());
  EXPECT_TRUE(ctx1.writer.connected());
  EXPECT_TRUE(ctx2.writer.connected());

  // Start all threads.
  ctx1.Spawn([](MpscTestContext* self) { self->Write(); });
  ctx2.Spawn([](MpscTestContext* self) { self->Write(); });

  // Drain until the reader reports nothing left to read.
  size_t received = 0;
  while (ctx1.reader.ConservativeReadLimit() != 0) {
    ctx1.Read();
    EXPECT_EQ(ctx1.read_result.status(), OkStatus());
    if (!ctx1.read_result.ok()) {
      // Close the reader so the writer threads cannot stay blocked.
      ctx1.reader.Close();
      break;
    }
    received += ctx1.read_result->size();
  }
  EXPECT_EQ(received, data1.size() + data2.size());
  ctx1.Join();
  ctx2.Join();

  // Nothing was closed explicitly on the success path.
  EXPECT_FALSE(ctx1.reader.connected());
  EXPECT_FALSE(ctx1.writer.connected());
  EXPECT_FALSE(ctx2.writer.connected());
}
545 
// `ReadAll` on a reader that was never connected or given a buffer reports
// FAILED_PRECONDITION.
TEST(MpscStreamTest, ReadAllWithoutBuffer) {
  MpscTestContext ctx;
  const Status outcome =
      ctx.reader.ReadAll([](ConstByteSpan) { return OkStatus(); });
  EXPECT_EQ(outcome, Status::FailedPrecondition());
}
551 
// `ReadAll` delivers everything a limited writer sends, then returns OK and
// leaves the reader disconnected.
TEST(MpscStreamTest, ReadAll) {
  MpscTestContext ctx;
  ctx.Connect();

  std::byte storage[kBufSize];
  ctx.reader.SetBuffer(storage);

  // The writer thread sends chunks until its limit is used up.
  ctx.writer.SetLimit(kBufSize * 100);
  ctx.Spawn([](MpscTestContext* self) { self->WriteAll(); });

  // Count and hash every chunk handed to the callback.
  const Status outcome = ctx.reader.ReadAll([&ctx](ConstByteSpan chunk) {
    ctx.total_read += chunk.size();
    fnv1a(chunk, ctx.read_hash);
    return OkStatus();
  });
  ctx.Join();

  EXPECT_EQ(outcome, OkStatus());
  EXPECT_FALSE(ctx.reader.connected());
  EXPECT_EQ(ctx.total_read, kBufSize * 100);
  EXPECT_EQ(ctx.read_hash, ctx.write_hash);
}
573 
// A `BufferedMpscReader<kBufSize>` accepts `kBufSize` single-byte writes with
// no concurrent reader, then returns them all, in order, from one read.
TEST(MpscStreamTest, BufferedMpscReader) {
  BufferedMpscReader<kBufSize> reader;
  MpscWriter writer;
  CreateMpscStream(reader, writer);

  // `kBufSize` writes of 1 byte each should fit without blocking.
  for (size_t index = 0; index < kBufSize; ++index) {
    // Each byte holds the low 8 bits of its position in the sequence.
    const std::byte value{static_cast<uint8_t>(index)};
    EXPECT_EQ(writer.Write(ConstByteSpan(&value, 1)), OkStatus());
  }

  std::byte received[kBufSize];
  auto result = reader.Read(ByteSpan(received));
  ASSERT_EQ(result.status(), OkStatus());
  ASSERT_EQ(result->size(), kBufSize);
  for (size_t index = 0; index < kBufSize; ++index) {
    EXPECT_EQ(received[index], std::byte(index));
  }
}
593 
594 }  // namespace
595 }  // namespace pw::stream
596