• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
#include "pw_multisink/multisink.h"

#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <limits>
#include <optional>
#include <span>
#include <string_view>

#include "gtest/gtest.h"
#include "pw_function/function.h"
#include "pw_status/status.h"
27 
28 namespace pw::multisink {
29 using Drain = MultiSink::Drain;
30 using Listener = MultiSink::Listener;
31 
32 class CountingListener : public Listener {
33  public:
OnNewEntryAvailable()34   void OnNewEntryAvailable() override { notification_count_++; }
35 
GetNotificationCount()36   size_t GetNotificationCount() { return notification_count_; }
37 
ResetNotificationCount()38   void ResetNotificationCount() { notification_count_ = 0; }
39 
40  private:
41   size_t notification_count_ = 0;
42 };
43 
44 class MultiSinkTest : public ::testing::Test {
45  protected:
46   static constexpr std::byte kMessage[] = {
47       (std::byte)0xDE, (std::byte)0xAD, (std::byte)0xBE, (std::byte)0xEF};
48   static constexpr std::byte kMessageOther[] = {
49       (std::byte)0x12, (std::byte)0x34, (std::byte)0x56, (std::byte)0x78};
50   static constexpr size_t kMaxDrains = 3;
51   static constexpr size_t kMaxListeners = 3;
52   static constexpr size_t kEntryBufferSize = 1024;
53   static constexpr size_t kBufferSize = 5 * kEntryBufferSize;
54 
MultiSinkTest()55   MultiSinkTest() : multisink_(buffer_) {}
56 
57   // Expects the peeked or popped message to equal the provided non-empty
58   // message, and the drop count to match. If `expected_message` is empty, the
59   // Pop call status expected is OUT_OF_RANGE.
ExpectMessageAndDropCounts(Result<ConstByteSpan> & result,uint32_t result_drop_count,uint32_t result_ingress_drop_count,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)60   void ExpectMessageAndDropCounts(Result<ConstByteSpan>& result,
61                                   uint32_t result_drop_count,
62                                   uint32_t result_ingress_drop_count,
63                                   std::optional<ConstByteSpan> expected_message,
64                                   uint32_t expected_drop_count,
65                                   uint32_t expected_ingress_drop_count) {
66     if (!expected_message.has_value()) {
67       EXPECT_EQ(Status::OutOfRange(), result.status());
68     } else {
69       ASSERT_EQ(result.status(), OkStatus());
70       if (!expected_message.value().empty()) {
71         ASSERT_FALSE(result.value().empty());
72         ASSERT_EQ(result.value().size_bytes(),
73                   expected_message.value().size_bytes());
74         EXPECT_EQ(memcmp(result.value().data(),
75                          expected_message.value().data(),
76                          expected_message.value().size_bytes()),
77                   0);
78       }
79     }
80     EXPECT_EQ(result_drop_count, expected_drop_count);
81     EXPECT_EQ(result_ingress_drop_count, expected_ingress_drop_count);
82   }
83 
VerifyPopEntry(Drain & drain,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)84   void VerifyPopEntry(Drain& drain,
85                       std::optional<ConstByteSpan> expected_message,
86                       uint32_t expected_drop_count,
87                       uint32_t expected_ingress_drop_count) {
88     uint32_t drop_count = 0;
89     uint32_t ingress_drop_count = 0;
90     Result<ConstByteSpan> result =
91         drain.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
92     ExpectMessageAndDropCounts(result,
93                                drop_count,
94                                ingress_drop_count,
95                                expected_message,
96                                expected_drop_count,
97                                expected_ingress_drop_count);
98   }
99 
VerifyPeekResult(const Result<Drain::PeekedEntry> & peek_result,uint32_t result_drop_count,uint32_t result_ingress_drop_count,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)100   void VerifyPeekResult(const Result<Drain::PeekedEntry>& peek_result,
101                         uint32_t result_drop_count,
102                         uint32_t result_ingress_drop_count,
103                         std::optional<ConstByteSpan> expected_message,
104                         uint32_t expected_drop_count,
105                         uint32_t expected_ingress_drop_count) {
106     if (peek_result.ok()) {
107       ASSERT_FALSE(peek_result.value().entry().empty());
108       Result<ConstByteSpan> verify_result(peek_result.value().entry());
109       ExpectMessageAndDropCounts(verify_result,
110                                  result_drop_count,
111                                  result_ingress_drop_count,
112                                  expected_message,
113                                  expected_drop_count,
114                                  expected_ingress_drop_count);
115       return;
116     }
117     if (expected_message.has_value()) {
118       // Fail since we expected OkStatus.
119       ASSERT_EQ(peek_result.status(), OkStatus());
120     }
121     EXPECT_EQ(Status::OutOfRange(), peek_result.status());
122   }
123 
ExpectNotificationCount(CountingListener & listener,size_t expected_notification_count)124   void ExpectNotificationCount(CountingListener& listener,
125                                size_t expected_notification_count) {
126     EXPECT_EQ(listener.GetNotificationCount(), expected_notification_count);
127     listener.ResetNotificationCount();
128   }
129 
130   std::byte buffer_[kBufferSize];
131   std::byte entry_buffer_[kEntryBufferSize];
132   CountingListener listeners_[kMaxListeners];
133   Drain drains_[kMaxDrains];
134   MultiSink multisink_;
135 };
136 
// Exercises one drain and one listener through pushes, empty entries, drops
// and the empty-sink case.
TEST_F(MultiSinkTest, SingleDrain) {
  Drain& drain = drains_[0];
  CountingListener& listener = listeners_[0];

  // Attaching the listener alone is expected to yield one notification.
  multisink_.AttachDrain(drain);
  multisink_.AttachListener(listener);
  ExpectNotificationCount(listener, 1u);

  // One entry in, one notification, the same entry out.
  multisink_.HandleEntry(kMessage);
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);

  // The same round trip with an empty entry.
  multisink_.HandleEntry(ConstByteSpan());
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, ConstByteSpan(), 0u, 0u);

  // Two entries separated by a drop: three notifications in total, and the
  // drop is reported as an ingress drop along with the second entry.
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);
  ExpectNotificationCount(listener, 3u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);
  VerifyPopEntry(drain, kMessage, 0u, 1u);

  // A drop with no entry: the pop is out of range but reports the drop.
  multisink_.HandleDropped();
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, std::nullopt, 0u, 1u);

  // Nothing new happened: no notifications, out of range, no drops.
  ExpectNotificationCount(listener, 0u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);
}
168 
// Two drains attached to the same sink must each observe the full sequence of
// entries and drops, independently of one another.
TEST_F(MultiSinkTest, MultipleDrain) {
  Drain& first_drain = drains_[0];
  Drain& second_drain = drains_[1];
  CountingListener& first_listener = listeners_[0];
  CountingListener& second_listener = listeners_[1];

  multisink_.AttachDrain(first_drain);
  multisink_.AttachDrain(second_drain);
  multisink_.AttachListener(first_listener);
  multisink_.AttachListener(second_listener);
  ExpectNotificationCount(first_listener, 1u);
  ExpectNotificationCount(second_listener, 1u);

  // Sequence pushed: entry, entry, drop, entry, drop.
  multisink_.HandleEntry(kMessage);
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped();

  // What a drain is expected to see when emptied after the sequence above:
  // three entries (the third carrying one ingress drop), then an out-of-range
  // pop reporting the trailing drop, then a clean out-of-range pop.
  const auto expect_full_sequence = [this](Drain& drain) {
    VerifyPopEntry(drain, kMessage, 0u, 0u);
    VerifyPopEntry(drain, kMessage, 0u, 0u);
    VerifyPopEntry(drain, kMessage, 0u, 1u);
    VerifyPopEntry(drain, std::nullopt, 0u, 1u);
    VerifyPopEntry(drain, std::nullopt, 0u, 0u);
  };

  // Each listener saw all five events; empty the first drain completely.
  ExpectNotificationCount(first_listener, 5u);
  ExpectNotificationCount(second_listener, 5u);
  expect_full_sequence(first_drain);

  // Popping produces no notifications, and the second drain still sees
  // everything.
  ExpectNotificationCount(first_listener, 0u);
  ExpectNotificationCount(second_listener, 0u);
  expect_full_sequence(second_drain);
}
201 
// A drain attached after an entry was pushed is still expected to observe
// that entry, provided it has not been evicted from the ring buffer.
TEST_F(MultiSinkTest, LateDrainRegistration) {
  Drain& drain = drains_[0];
  CountingListener& listener = listeners_[0];

  // Push before anything is attached.
  multisink_.HandleEntry(kMessage);

  multisink_.AttachDrain(drain);
  multisink_.AttachListener(listener);
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);

  // Entries pushed after attachment behave as usual.
  multisink_.HandleEntry(kMessage);
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);
}
218 
// Detaching and re-attaching a drain and listener part-way through reading.
TEST_F(MultiSinkTest, DynamicDrainRegistration) {
  Drain& drain = drains_[0];
  CountingListener& listener = listeners_[0];

  multisink_.AttachDrain(drain);
  multisink_.AttachListener(listener);
  ExpectNotificationCount(listener, 1u);

  // Sequence pushed: drop, entry, drop, entry.
  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);

  // Read only the first message (which carries the first drop), then detach
  // both the drain and the listener.
  ExpectNotificationCount(listener, 4u);
  VerifyPopEntry(drain, kMessage, 0u, 1u);
  multisink_.DetachDrain(drain);
  multisink_.DetachListener(listener);

  // After re-attaching, the listener is expected to be notified once and the
  // drain is expected to read both buffered messages again, including the one
  // it had already popped, each reported with one ingress drop.
  multisink_.AttachDrain(drain);
  multisink_.AttachListener(listener);
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 1u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);

  // New entries after re-attachment are delivered normally.
  multisink_.HandleEntry(kMessage);
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);
}
250 
// Popping into a buffer that cannot hold the entry.
TEST_F(MultiSinkTest, TooSmallBuffer) {
  Drain& drain = drains_[0];
  multisink_.AttachDrain(drain);

  // Record a drop followed by an entry.
  uint32_t reported_drops = 0;
  uint32_t reported_ingress_drops = 0;
  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);

  // A one-byte destination is too small for the entry: the pop is expected to
  // fail with RESOURCE_EXHAUSTED and to remove the entry anyway.
  auto undersized_buffer = std::span(entry_buffer_, 1);
  Result<ConstByteSpan> pop_result =
      drain.PopEntry(undersized_buffer, reported_drops, reported_ingress_drops);
  EXPECT_EQ(pop_result.status(), Status::ResourceExhausted());

  // The entry is gone; the next pop is out of range and reports one drain
  // drop and one ingress drop.
  VerifyPopEntry(drain, std::nullopt, 1u, 1u);
}
268 
// Entries that a drain has already popped must remain visible to
// UnsafeIteration().
TEST_F(MultiSinkTest, Iterator) {
  multisink_.AttachDrain(drains_[0]);

  // Insert entries and consume them all.
  multisink_.HandleEntry(kMessage);
  multisink_.HandleEntry(kMessage);
  multisink_.HandleEntry(kMessage);

  VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
  VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
  VerifyPopEntry(drains_[0], kMessage, 0u, 0u);

  // Confirm that the iterator still observes the messages in the ring buffer.
  size_t iterated_entries = 0;
  for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
    // Check the length first so memcmp never reads past a shorter entry.
    ASSERT_EQ(entry.size(), sizeof(kMessage));
    EXPECT_EQ(memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
    iterated_entries++;
  }
  EXPECT_EQ(iterated_entries, 3u);
}
289 
// UnsafeIteration() must walk every stored entry even when no drain has ever
// been attached.
TEST_F(MultiSinkTest, IteratorNoDrains) {
  // Insert entries with no drains attached. Even though there are no consumers,
  // iterators should still walk from the oldest entry.
  multisink_.HandleEntry(kMessage);
  multisink_.HandleEntry(kMessage);
  multisink_.HandleEntry(kMessage);

  // Confirm that the iterator still observes the messages in the ring buffer.
  size_t iterated_entries = 0;
  for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
    // Check the length first so memcmp never reads past a shorter entry.
    ASSERT_EQ(entry.size(), sizeof(kMessage));
    EXPECT_EQ(memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
    iterated_entries++;
  }
  EXPECT_EQ(iterated_entries, 3u);
}
305 
// With a drain attached but nothing pushed, iteration must be empty.
TEST_F(MultiSinkTest, IteratorNoEntries) {
  multisink_.AttachDrain(drains_[0]);

  // An empty range is one whose begin and end iterators compare equal.
  MultiSink::UnsafeIterationWrapper entries = multisink_.UnsafeIteration();
  EXPECT_EQ(entries.begin(), entries.end());
}
314 
// Peeking a sink that has never received an entry must report OUT_OF_RANGE.
TEST_F(MultiSinkTest, PeekEntryNoEntries) {
  multisink_.AttachDrain(drains_[0]);

  // Peek empty multisink.
  uint32_t drop_count = 0;
  uint32_t ingress_drop_count = 0;
  auto peek_result =
      drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  // Forward both counts reported by PeekEntry, in the order the verifier
  // expects them (drain drops first, ingress drops second).
  VerifyPeekResult(
      peek_result, drop_count, ingress_drop_count, std::nullopt, 0, 0);
}
325 
TEST_F(MultiSinkTest,PeekAndPop)326 TEST_F(MultiSinkTest, PeekAndPop) {
327   multisink_.AttachDrain(drains_[0]);
328   multisink_.AttachDrain(drains_[1]);
329 
330   // Peek entry after multisink has some entries.
331   multisink_.HandleEntry(kMessage);
332   multisink_.HandleEntry(kMessageOther);
333   uint32_t drop_count = 0;
334   uint32_t ingress_drop_count = 0;
335   auto first_peek_result =
336       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
337   VerifyPeekResult(
338       first_peek_result, drop_count, ingress_drop_count, kMessage, 0, 0);
339 
340   // Multiple peeks must return the front message.
341   auto peek_duplicate =
342       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
343   VerifyPeekResult(
344       peek_duplicate, drop_count, ingress_drop_count, kMessage, 0, 0);
345   // A second drain must peek the front message.
346   auto peek_other_drain =
347       drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
348   VerifyPeekResult(
349       peek_other_drain, drop_count, ingress_drop_count, kMessage, 0, 0);
350 
351   // After a drain pops a peeked entry, the next peek call must return the next
352   // message.
353   ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
354   auto second_peek_result =
355       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
356   VerifyPeekResult(
357       second_peek_result, drop_count, ingress_drop_count, kMessageOther, 0, 0);
358   // Slower readers must be unchanged.
359   auto peek_other_drain_duplicate =
360       drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
361   VerifyPeekResult(peek_other_drain_duplicate,
362                    drop_count,
363                    ingress_drop_count,
364                    kMessage,
365                    0,
366                    0);
367 
368   // PopEntry prior to popping the previously peeked entry.
369   VerifyPopEntry(drains_[0], kMessageOther, 0, 0);
370   // Popping an entry already handled must not trigger errors.
371   ASSERT_EQ(drains_[0].PopEntry(second_peek_result.value()), OkStatus());
372   // Popping with an old peek context must not trigger errors.
373   ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
374 
375   // Multisink is empty, pops and peeks should trigger OUT_OF_RANGE.
376   VerifyPopEntry(drains_[0], std::nullopt, 0, 0);
377   auto empty_peek_result =
378       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
379   VerifyPeekResult(
380       empty_peek_result, drop_count, ingress_drop_count, std::nullopt, 0, 0);
381 
382   // // Slower readers must be unchanged.
383   auto peek_other_drain_unchanged =
384       drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
385   VerifyPeekResult(peek_other_drain_unchanged,
386                    drop_count,
387                    ingress_drop_count,
388                    kMessage,
389                    0,
390                    0);
391 }
392 
// Ingress drops recorded after an entry must only be reported by a peek once
// the drain has moved past that entry.
TEST_F(MultiSinkTest, PeekReportsIngressDropCount) {
  Drain& drain = drains_[0];
  multisink_.AttachDrain(drain);

  // One entry, followed by a batch of ingress drops.
  constexpr uint32_t kIngressDrops = 10;
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped(kIngressDrops);

  uint32_t reported_drops = 0;
  uint32_t reported_ingress_drops = 0;

  // Peeking the entry itself reports no drops: nothing has exposed the gap in
  // sequence IDs yet.
  auto entry_peek =
      drain.PeekEntry(entry_buffer_, reported_drops, reported_ingress_drops);
  VerifyPeekResult(
      entry_peek, reported_drops, reported_ingress_drops, kMessage, 0, 0);

  // Popping the peeked entry advances the drain; the following peek finds no
  // entry but does find the gap, and reports it as ingress drops.
  ASSERT_EQ(drain.PopEntry(entry_peek.value()), OkStatus());
  auto gap_peek =
      drain.PeekEntry(entry_buffer_, reported_drops, reported_ingress_drops);
  ASSERT_EQ(gap_peek.status(), Status::OutOfRange());
  EXPECT_EQ(reported_drops, 0u);
  EXPECT_EQ(reported_ingress_drops, kIngressDrops);
}
418 
// A drain that falls behind a full buffer must have the entries it lost
// reported as drain drops by the next peek.
TEST_F(MultiSinkTest, PeekReportsSlowDrainDropCount) {
  Drain& drain = drains_[0];
  multisink_.AttachDrain(drain);

  // Size the messages so that exactly kInitialMessages of them fill the
  // buffer. The sequence ID takes 1 byte while it is less than 128, so each
  // stored entry costs the payload plus 2 bytes: 1 byte of preamble (sequence
  // ID) and 1 byte of data size.
  constexpr size_t kInitialMessages = 128;
  constexpr size_t kStoredEntrySize = kBufferSize / kInitialMessages;
  constexpr size_t kPayloadSize = kStoredEntrySize - 2;
  std::array<std::byte, kPayloadSize> message;
  message.fill(static_cast<std::byte>('a'));
  for (size_t sent = 0; sent < kInitialMessages; ++sent) {
    multisink_.HandleEntry(message);
  }

  // The buffer is now full, and from here on the sequence ID needs one more
  // byte of preamble. Adding N new entries therefore drops N + 1 old ones, so
  // only kExpectedDrops - 1 entries are added to cause kExpectedDrops drops.
  const size_t kExpectedDrops = 5;
  for (size_t added = 0; added + 1 < kExpectedDrops; ++added) {
    multisink_.HandleEntry(message);
  }

  uint32_t reported_drops = 0;
  uint32_t reported_ingress_drops = 0;
  auto peek_result =
      drain.PeekEntry(entry_buffer_, reported_drops, reported_ingress_drops);
  VerifyPeekResult(peek_result,
                   reported_drops,
                   reported_ingress_drops,
                   message,
                   kExpectedDrops,
                   0);
}
449 
// The ingress drop count reported to a drain must stay correct when the
// sink's running 32-bit drop total wraps around.
TEST_F(MultiSinkTest, IngressDropCountOverflow) {
  Drain& drain = drains_[0];
  multisink_.AttachDrain(drain);

  // Bring the sink's drop total to just below the uint32_t maximum, so that
  // the drain's last handled drop count ends up larger than the sink's total
  // once the latter overflows further down.
  constexpr uint32_t kDropsNearOverflow =
      std::numeric_limits<uint32_t>::max() - 3;
  multisink_.HandleDropped(kDropsNearOverflow);
  multisink_.HandleEntry(kMessage);

  // Let the drain catch up: peeking the entry reports all of those drops.
  uint32_t reported_drops = 0;
  uint32_t reported_ingress_drops = 0;
  auto catch_up_peek =
      drain.PeekEntry(entry_buffer_, reported_drops, reported_ingress_drops);
  VerifyPeekResult(catch_up_peek,
                   reported_drops,
                   reported_ingress_drops,
                   kMessage,
                   0,
                   kDropsNearOverflow);
  // Popping the peeked entry advances the drain past it.
  ASSERT_EQ(drain.PopEntry(catch_up_peek.value()), OkStatus());

  // Push enough further drops to overflow the sink's drop total.
  constexpr uint32_t kDropsAfterOverflow = 10;
  multisink_.HandleDropped(kDropsAfterOverflow);

  // With no entry available the peek is out of range, yet it must report
  // exactly the new drops.
  auto overflow_peek =
      drain.PeekEntry(entry_buffer_, reported_drops, reported_ingress_drops);
  ASSERT_EQ(overflow_peek.status(), Status::OutOfRange());
  EXPECT_EQ(reported_drops, 0u);
  EXPECT_EQ(reported_ingress_drops, kDropsAfterOverflow);

  // Those drops have been reported already, so the next entry carries none.
  multisink_.HandleEntry(kMessage);
  auto clean_peek =
      drain.PeekEntry(entry_buffer_, reported_drops, reported_ingress_drops);
  VerifyPeekResult(
      clean_peek, reported_drops, reported_ingress_drops, kMessage, 0, 0);
}
491 
// A drain that is detached and attached again must read the stored entry
// again, with the same ingress drop count as the first time.
TEST_F(MultiSinkTest, DetachedDrainReportsDropCount) {
  Drain& drain = drains_[0];
  multisink_.AttachDrain(drain);

  // Drops first, then the entry that will carry them.
  constexpr uint32_t kIngressDrops = 10;
  multisink_.HandleDropped(kIngressDrops);
  multisink_.HandleEntry(kMessage);
  VerifyPopEntry(drain, kMessage, 0, kIngressDrops);

  // Cycle the attachment; the same entry and drop count must come back.
  multisink_.DetachDrain(drain);
  multisink_.AttachDrain(drain);
  VerifyPopEntry(drain, kMessage, 0, kIngressDrops);
}
505 
// UnsafeForEachEntry() without a limit must visit every stored entry, in the
// order the entries were pushed.
TEST(UnsafeIteration, NoLimit) {
  constexpr std::array<std::string_view, 5> kExpectedEntries{
      "one", "two", "three", "four", "five"};
  std::array<std::byte, 32> buffer;
  MultiSink multisink(buffer);

  for (std::string_view entry : kExpectedEntries) {
    multisink.HandleEntry(std::as_bytes(std::span(entry)));
  }

  size_t entry_count = 0;
  // All callback state is bundled in one struct so the lambda captures a
  // single reference (presumably to keep the capture small enough for the
  // callback type -- TODO confirm against pw_function).
  struct {
    size_t& entry_count;
    std::span<const std::string_view> expected_results;
  } ctx{entry_count, kExpectedEntries};
  auto cb = [&ctx](ConstByteSpan data) {
    // Count the call up front so the total is right even if a check below
    // bails out early.
    const size_t index = ctx.entry_count++;
    // Guard the lookup: more callbacks than expected entries must fail the
    // test rather than index past the end of the expectations.
    ASSERT_LT(index, ctx.expected_results.size());
    const std::string_view expected_entry = ctx.expected_results[index];
    // Sizes must match before comparing, so memcmp never reads past `data`.
    ASSERT_EQ(data.size(), expected_entry.size());
    const int result =
        memcmp(data.data(), expected_entry.data(), expected_entry.size());
    EXPECT_EQ(0, result);
  };

  EXPECT_EQ(OkStatus(), multisink.UnsafeForEachEntry(cb));
  EXPECT_EQ(kExpectedEntries.size(), entry_count);
}
533 
// UnsafeForEachEntry() with an entry limit: the callback is expected to see
// only the last `kExpectedEntriesMaxEntries` entries that were pushed.
TEST(UnsafeIteration, Subset) {
  constexpr std::array<std::string_view, 5> kExpectedEntries{
      "one", "two", "three", "four", "five"};
  constexpr size_t kStartOffset = 3;
  constexpr size_t kExpectedEntriesMaxEntries =
      kExpectedEntries.size() - kStartOffset;
  std::array<std::byte, 32> buffer;
  MultiSink multisink(buffer);

  for (std::string_view entry : kExpectedEntries) {
    multisink.HandleEntry(std::as_bytes(std::span(entry)));
  }

  size_t entry_count = 0;
  // All callback state is bundled in one struct so the lambda captures a
  // single reference (presumably to keep the capture small enough for the
  // callback type -- TODO confirm against pw_function).
  struct {
    size_t& entry_count;
    std::span<const std::string_view> expected_results;
  } ctx{entry_count, kExpectedEntries};
  auto cb = [&ctx](ConstByteSpan data) {
    // Count the call up front so the total is right even if a check below
    // bails out early. The n-th callback is compared against expected entry
    // number n + kStartOffset.
    const size_t index = ctx.entry_count++ + kStartOffset;
    // Guard the lookup: more callbacks than expected entries must fail the
    // test rather than index past the end of the expectations.
    ASSERT_LT(index, ctx.expected_results.size());
    const std::string_view expected_entry = ctx.expected_results[index];
    // Sizes must match before comparing, so memcmp never reads past `data`.
    ASSERT_EQ(data.size(), expected_entry.size());
    const int result =
        memcmp(data.data(), expected_entry.data(), expected_entry.size());
    EXPECT_EQ(0, result);
  };

  EXPECT_EQ(OkStatus(),
            multisink.UnsafeForEachEntry(cb, kExpectedEntriesMaxEntries));
  EXPECT_EQ(kExpectedEntriesMaxEntries, entry_count);
}
567 
568 }  // namespace pw::multisink
569