• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_multisink/multisink.h"
16 
17 #include <array>
18 #include <cstdint>
19 #include <cstring>
20 #include <optional>
21 #include <string_view>
22 
23 #include "gtest/gtest.h"
24 #include "pw_function/function.h"
25 #include "pw_span/span.h"
26 #include "pw_status/status.h"
27 
28 namespace pw::multisink {
29 using Drain = MultiSink::Drain;
30 using Listener = MultiSink::Listener;
31 
32 class CountingListener : public Listener {
33  public:
OnNewEntryAvailable()34   void OnNewEntryAvailable() override { notification_count_++; }
35 
GetNotificationCount()36   size_t GetNotificationCount() { return notification_count_; }
37 
ResetNotificationCount()38   void ResetNotificationCount() { notification_count_ = 0; }
39 
40  private:
41   size_t notification_count_ = 0;
42 };
43 
44 class MultiSinkTest : public ::testing::Test {
45  protected:
46   static constexpr std::byte kMessage[] = {
47       (std::byte)0xDE, (std::byte)0xAD, (std::byte)0xBE, (std::byte)0xEF};
48   static constexpr std::byte kMessageOther[] = {
49       (std::byte)0x12, (std::byte)0x34, (std::byte)0x56, (std::byte)0x78};
50   static constexpr size_t kMaxDrains = 3;
51   static constexpr size_t kMaxListeners = 3;
52   static constexpr size_t kEntryBufferSize = 1024;
53   static constexpr size_t kBufferSize = 5 * kEntryBufferSize;
54 
MultiSinkTest()55   MultiSinkTest() : buffer_{}, multisink_(buffer_) {}
56 
57   // Expects the peeked or popped message to equal the provided non-empty
58   // message, and the drop count to match. If `expected_message` is empty, the
59   // Pop call status expected is OUT_OF_RANGE.
ExpectMessageAndDropCounts(Result<ConstByteSpan> & result,uint32_t result_drop_count,uint32_t result_ingress_drop_count,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)60   void ExpectMessageAndDropCounts(Result<ConstByteSpan>& result,
61                                   uint32_t result_drop_count,
62                                   uint32_t result_ingress_drop_count,
63                                   std::optional<ConstByteSpan> expected_message,
64                                   uint32_t expected_drop_count,
65                                   uint32_t expected_ingress_drop_count) {
66     if (!expected_message.has_value()) {
67       EXPECT_EQ(Status::OutOfRange(), result.status());
68     } else {
69       ASSERT_EQ(result.status(), OkStatus());
70       if (!expected_message.value().empty()) {
71         ASSERT_FALSE(result.value().empty());
72         ASSERT_EQ(result.value().size_bytes(),
73                   expected_message.value().size_bytes());
74         EXPECT_EQ(memcmp(result.value().data(),
75                          expected_message.value().data(),
76                          expected_message.value().size_bytes()),
77                   0);
78       }
79     }
80     EXPECT_EQ(result_drop_count, expected_drop_count);
81     EXPECT_EQ(result_ingress_drop_count, expected_ingress_drop_count);
82   }
83 
VerifyPopEntry(Drain & drain,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)84   void VerifyPopEntry(Drain& drain,
85                       std::optional<ConstByteSpan> expected_message,
86                       uint32_t expected_drop_count,
87                       uint32_t expected_ingress_drop_count) {
88     uint32_t drop_count = 0;
89     uint32_t ingress_drop_count = 0;
90     Result<ConstByteSpan> result =
91         drain.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
92     ExpectMessageAndDropCounts(result,
93                                drop_count,
94                                ingress_drop_count,
95                                expected_message,
96                                expected_drop_count,
97                                expected_ingress_drop_count);
98   }
99 
VerifyPeekResult(const Result<Drain::PeekedEntry> & peek_result,uint32_t result_drop_count,uint32_t result_ingress_drop_count,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)100   void VerifyPeekResult(const Result<Drain::PeekedEntry>& peek_result,
101                         uint32_t result_drop_count,
102                         uint32_t result_ingress_drop_count,
103                         std::optional<ConstByteSpan> expected_message,
104                         uint32_t expected_drop_count,
105                         uint32_t expected_ingress_drop_count) {
106     if (peek_result.ok()) {
107       ASSERT_FALSE(peek_result.value().entry().empty());
108       Result<ConstByteSpan> verify_result(peek_result.value().entry());
109       ExpectMessageAndDropCounts(verify_result,
110                                  result_drop_count,
111                                  result_ingress_drop_count,
112                                  expected_message,
113                                  expected_drop_count,
114                                  expected_ingress_drop_count);
115       return;
116     }
117     if (expected_message.has_value()) {
118       // Fail since we expected OkStatus.
119       ASSERT_EQ(peek_result.status(), OkStatus());
120     }
121     EXPECT_EQ(Status::OutOfRange(), peek_result.status());
122   }
123 
ExpectNotificationCount(CountingListener & listener,size_t expected_notification_count)124   void ExpectNotificationCount(CountingListener& listener,
125                                size_t expected_notification_count) {
126     EXPECT_EQ(listener.GetNotificationCount(), expected_notification_count);
127     listener.ResetNotificationCount();
128   }
129 
130   std::byte buffer_[kBufferSize];
131   std::byte entry_buffer_[kEntryBufferSize];
132   CountingListener listeners_[kMaxListeners];
133   Drain drains_[kMaxDrains];
134   MultiSink multisink_;
135 };
136 
TEST_F(MultiSinkTest,SingleDrain)137 TEST_F(MultiSinkTest, SingleDrain) {
138   multisink_.AttachDrain(drains_[0]);
139   multisink_.AttachListener(listeners_[0]);
140   ExpectNotificationCount(listeners_[0], 1u);
141   multisink_.HandleEntry(kMessage);
142 
143   // Single entry push and pop.
144   ExpectNotificationCount(listeners_[0], 1u);
145   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
146   // Single empty entry push and pop.
147   multisink_.HandleEntry(ConstByteSpan());
148   ExpectNotificationCount(listeners_[0], 1u);
149   VerifyPopEntry(drains_[0], ConstByteSpan(), 0u, 0u);
150 
151   // Multiple entries with intermittent drops.
152   multisink_.HandleEntry(kMessage);
153   multisink_.HandleDropped();
154   multisink_.HandleEntry(kMessage);
155   ExpectNotificationCount(listeners_[0], 3u);
156   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
157   VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
158 
159   // Send drops only.
160   multisink_.HandleDropped();
161   ExpectNotificationCount(listeners_[0], 1u);
162   VerifyPopEntry(drains_[0], std::nullopt, 0u, 1u);
163 
164   // Confirm out-of-range if no entries are expected.
165   ExpectNotificationCount(listeners_[0], 0u);
166   VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
167 }
168 
TEST_F(MultiSinkTest,MultipleDrain)169 TEST_F(MultiSinkTest, MultipleDrain) {
170   multisink_.AttachDrain(drains_[0]);
171   multisink_.AttachDrain(drains_[1]);
172   multisink_.AttachListener(listeners_[0]);
173   multisink_.AttachListener(listeners_[1]);
174   ExpectNotificationCount(listeners_[0], 1u);
175   ExpectNotificationCount(listeners_[1], 1u);
176 
177   multisink_.HandleEntry(kMessage);
178   multisink_.HandleEntry(kMessage);
179   multisink_.HandleDropped();
180   multisink_.HandleEntry(kMessage);
181   multisink_.HandleDropped();
182 
183   // Drain one drain entirely.
184   ExpectNotificationCount(listeners_[0], 5u);
185   ExpectNotificationCount(listeners_[1], 5u);
186   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
187   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
188   VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
189   VerifyPopEntry(drains_[0], std::nullopt, 0u, 1u);
190   VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
191 
192   // Confirm the other drain can be drained separately.
193   ExpectNotificationCount(listeners_[0], 0u);
194   ExpectNotificationCount(listeners_[1], 0u);
195   VerifyPopEntry(drains_[1], kMessage, 0u, 0u);
196   VerifyPopEntry(drains_[1], kMessage, 0u, 0u);
197   VerifyPopEntry(drains_[1], kMessage, 0u, 1u);
198   VerifyPopEntry(drains_[1], std::nullopt, 0u, 1u);
199   VerifyPopEntry(drains_[1], std::nullopt, 0u, 0u);
200 }
201 
TEST_F(MultiSinkTest,LateDrainRegistration)202 TEST_F(MultiSinkTest, LateDrainRegistration) {
203   // Drains attached after entries are pushed should still observe those entries
204   // if they have not been evicted from the ring buffer.
205   multisink_.HandleEntry(kMessage);
206 
207   multisink_.AttachDrain(drains_[0]);
208   multisink_.AttachListener(listeners_[0]);
209   ExpectNotificationCount(listeners_[0], 1u);
210   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
211   VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
212 
213   multisink_.HandleEntry(kMessage);
214   ExpectNotificationCount(listeners_[0], 1u);
215   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
216   VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
217 }
218 
TEST_F(MultiSinkTest,DynamicDrainRegistration)219 TEST_F(MultiSinkTest, DynamicDrainRegistration) {
220   multisink_.AttachDrain(drains_[0]);
221   multisink_.AttachListener(listeners_[0]);
222   ExpectNotificationCount(listeners_[0], 1u);
223 
224   multisink_.HandleDropped();
225   multisink_.HandleEntry(kMessage);
226   multisink_.HandleDropped();
227   multisink_.HandleEntry(kMessage);
228 
229   // Drain out one message and detach it.
230   ExpectNotificationCount(listeners_[0], 4u);
231   VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
232   multisink_.DetachDrain(drains_[0]);
233   multisink_.DetachListener(listeners_[0]);
234 
235   // Re-attaching the drain should reproduce the last observed message. Note
236   // that notifications are not expected, nor are drops observed before the
237   // first valid message in the buffer.
238   multisink_.AttachDrain(drains_[0]);
239   multisink_.AttachListener(listeners_[0]);
240   ExpectNotificationCount(listeners_[0], 1u);
241   VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
242   VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
243   VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
244 
245   multisink_.HandleEntry(kMessage);
246   ExpectNotificationCount(listeners_[0], 1u);
247   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
248   VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
249 }
250 
TEST_F(MultiSinkTest,TooSmallBuffer)251 TEST_F(MultiSinkTest, TooSmallBuffer) {
252   multisink_.AttachDrain(drains_[0]);
253 
254   // Insert an entry and a drop, then try to read into an insufficient buffer.
255   uint32_t drop_count = 0;
256   uint32_t ingress_drop_count = 0;
257   multisink_.HandleDropped();
258   multisink_.HandleEntry(kMessage);
259 
260   // Attempting to acquire an entry with a small buffer should result in
261   // RESOURCE_EXHAUSTED and remove it.
262   Result<ConstByteSpan> result = drains_[0].PopEntry(
263       span(entry_buffer_, 1), drop_count, ingress_drop_count);
264   EXPECT_EQ(result.status(), Status::ResourceExhausted());
265 
266   VerifyPopEntry(drains_[0], std::nullopt, 1u, 1u);
267 }
268 
TEST_F(MultiSinkTest,Iterator)269 TEST_F(MultiSinkTest, Iterator) {
270   multisink_.AttachDrain(drains_[0]);
271 
272   // Insert entries and consume them all.
273   multisink_.HandleEntry(kMessage);
274   multisink_.HandleEntry(kMessage);
275   multisink_.HandleEntry(kMessage);
276 
277   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
278   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
279   VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
280 
281   // Confirm that the iterator still observes the messages in the ring buffer.
282   size_t iterated_entries = 0;
283   for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
284     EXPECT_EQ(memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
285     iterated_entries++;
286   }
287   EXPECT_EQ(iterated_entries, 3u);
288 }
289 
TEST_F(MultiSinkTest,IteratorNoDrains)290 TEST_F(MultiSinkTest, IteratorNoDrains) {
291   // Insert entries with no drains attached. Even though there are no consumers,
292   // iterators should still walk from the oldest entry.
293   multisink_.HandleEntry(kMessage);
294   multisink_.HandleEntry(kMessage);
295   multisink_.HandleEntry(kMessage);
296 
297   // Confirm that the iterator still observes the messages in the ring buffer.
298   size_t iterated_entries = 0;
299   for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
300     EXPECT_EQ(memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
301     iterated_entries++;
302   }
303   EXPECT_EQ(iterated_entries, 3u);
304 }
305 
TEST_F(MultiSinkTest,IteratorNoEntries)306 TEST_F(MultiSinkTest, IteratorNoEntries) {
307   // Attach a drain, but don't add any entries.
308   multisink_.AttachDrain(drains_[0]);
309   // Confirm that the iterator has no entries.
310   MultiSink::UnsafeIterationWrapper unsafe_iterator =
311       multisink_.UnsafeIteration();
312   EXPECT_EQ(unsafe_iterator.begin(), unsafe_iterator.end());
313 }
314 
TEST_F(MultiSinkTest,PeekEntryNoEntries)315 TEST_F(MultiSinkTest, PeekEntryNoEntries) {
316   multisink_.AttachDrain(drains_[0]);
317 
318   // Peek empty multisink.
319   uint32_t drop_count = 0;
320   uint32_t ingress_drop_count = 0;
321   auto peek_result =
322       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
323   VerifyPeekResult(peek_result, 0, drop_count, std::nullopt, 0, 0);
324 }
325 
TEST_F(MultiSinkTest,PeekAndPop)326 TEST_F(MultiSinkTest, PeekAndPop) {
327   multisink_.AttachDrain(drains_[0]);
328   multisink_.AttachDrain(drains_[1]);
329 
330   // Peek entry after multisink has some entries.
331   multisink_.HandleEntry(kMessage);
332   multisink_.HandleEntry(kMessageOther);
333   uint32_t drop_count = 0;
334   uint32_t ingress_drop_count = 0;
335   auto first_peek_result =
336       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
337   VerifyPeekResult(
338       first_peek_result, drop_count, ingress_drop_count, kMessage, 0, 0);
339 
340   // Multiple peeks must return the front message.
341   auto peek_duplicate =
342       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
343   VerifyPeekResult(
344       peek_duplicate, drop_count, ingress_drop_count, kMessage, 0, 0);
345   // A second drain must peek the front message.
346   auto peek_other_drain =
347       drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
348   VerifyPeekResult(
349       peek_other_drain, drop_count, ingress_drop_count, kMessage, 0, 0);
350 
351   // After a drain pops a peeked entry, the next peek call must return the next
352   // message.
353   ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
354   auto second_peek_result =
355       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
356   VerifyPeekResult(
357       second_peek_result, drop_count, ingress_drop_count, kMessageOther, 0, 0);
358   // Slower readers must be unchanged.
359   auto peek_other_drain_duplicate =
360       drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
361   VerifyPeekResult(peek_other_drain_duplicate,
362                    drop_count,
363                    ingress_drop_count,
364                    kMessage,
365                    0,
366                    0);
367 
368   // PopEntry prior to popping the previously peeked entry.
369   VerifyPopEntry(drains_[0], kMessageOther, 0, 0);
370   // Popping an entry already handled must not trigger errors.
371   ASSERT_EQ(drains_[0].PopEntry(second_peek_result.value()), OkStatus());
372   // Popping with an old peek context must not trigger errors.
373   ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
374 
375   // Multisink is empty, pops and peeks should trigger OUT_OF_RANGE.
376   VerifyPopEntry(drains_[0], std::nullopt, 0, 0);
377   auto empty_peek_result =
378       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
379   VerifyPeekResult(
380       empty_peek_result, drop_count, ingress_drop_count, std::nullopt, 0, 0);
381 
382   // // Slower readers must be unchanged.
383   auto peek_other_drain_unchanged =
384       drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
385   VerifyPeekResult(peek_other_drain_unchanged,
386                    drop_count,
387                    ingress_drop_count,
388                    kMessage,
389                    0,
390                    0);
391 }
392 
TEST_F(MultiSinkTest,PeekReportsIngressDropCount)393 TEST_F(MultiSinkTest, PeekReportsIngressDropCount) {
394   multisink_.AttachDrain(drains_[0]);
395 
396   // Peek entry after multisink has some entries.
397   multisink_.HandleEntry(kMessage);
398   const uint32_t ingress_drops = 10;
399   multisink_.HandleDropped(ingress_drops);
400 
401   uint32_t drop_count = 0;
402   uint32_t ingress_drop_count = 0;
403   auto peek_result1 =
404       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
405   // No drops reported until the drain finds a gap in the sequence IDs.
406   VerifyPeekResult(
407       peek_result1, drop_count, ingress_drop_count, kMessage, 0, 0);
408 
409   // Popping the peeked entry advances the drain, and a new peek will find the
410   // gap in sequence IDs.
411   ASSERT_EQ(drains_[0].PopEntry(peek_result1.value()), OkStatus());
412   auto peek_result2 =
413       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
414   ASSERT_EQ(peek_result2.status(), Status::OutOfRange());
415   EXPECT_EQ(drop_count, 0u);
416   EXPECT_EQ(ingress_drop_count, ingress_drops);
417 }
418 
TEST_F(MultiSinkTest,PeekReportsSlowDrainDropCount)419 TEST_F(MultiSinkTest, PeekReportsSlowDrainDropCount) {
420   multisink_.AttachDrain(drains_[0]);
421 
422   // Add entries until buffer is full and drain has to be advanced.
423   // The sequence ID takes 1 byte when less than 128.
424   const size_t max_multisink_messages = 128;
425   const size_t buffer_entry_size = kBufferSize / max_multisink_messages;
426   // Account for 1 byte of preamble (sequnce ID) and 1 byte of data size.
427   const size_t message_size = buffer_entry_size - 2;
428   std::array<std::byte, message_size> message;
429   std::memset(message.data(), 'a', message.size());
430   for (size_t i = 0; i < max_multisink_messages; ++i) {
431     message[0] = static_cast<std::byte>(i);
432     multisink_.HandleEntry(message);
433   }
434 
435   // At this point the buffer is full, but the sequence ID will take 1 more byte
436   // in the preamble, meaning that adding N new entries, drops N + 1 entries.
437   // Account for that offset.
438   const size_t expected_drops = 5;
439   for (size_t i = 1; i < expected_drops; ++i) {
440     message[0] = static_cast<std::byte>(200 + i);
441     multisink_.HandleEntry(message);
442   }
443 
444   uint32_t drop_count = 0;
445   uint32_t ingress_drop_count = 0;
446 
447   auto peek_result =
448       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
449   // The message peeked is the 6th message added.
450   message[0] = static_cast<std::byte>(5);
451   VerifyPeekResult(
452       peek_result, drop_count, ingress_drop_count, message, expected_drops, 0);
453 
454   // Add 3 more messages since we peeked the multisink, generating 2 more drops.
455   const size_t expected_drops2 = 2;
456   for (size_t i = 0; i < expected_drops2 + 1; ++i) {
457     message[0] = static_cast<std::byte>(220 + i);
458     multisink_.HandleEntry(message);
459   }
460 
461   // Pop the 6th message now, even though it was already dropped.
462   EXPECT_EQ(drains_[0].PopEntry(peek_result.value()), OkStatus());
463 
464   // A new peek would get the 9th message because two more messages were
465   // dropped. Given that PopEntry() was called with peek_result, all the dropped
466   // messages before peek_result should be considered handled and only the two
467   // new drops should be reported here.
468   auto peek_result2 =
469       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
470   message[0] = static_cast<std::byte>(8);  // 9th message
471   VerifyPeekResult(peek_result2,
472                    drop_count,
473                    ingress_drop_count,
474                    message,
475                    expected_drops2,
476                    0);
477 }
478 
TEST_F(MultiSinkTest,IngressDropCountOverflow)479 TEST_F(MultiSinkTest, IngressDropCountOverflow) {
480   multisink_.AttachDrain(drains_[0]);
481 
482   // Make drain's last handled drop larger than multisink drop count, which
483   // overflowed.
484   const uint32_t drop_count_close_to_overflow =
485       std::numeric_limits<uint32_t>::max() - 3;
486   multisink_.HandleDropped(drop_count_close_to_overflow);
487   multisink_.HandleEntry(kMessage);
488 
489   // Catch up drain's drop count.
490   uint32_t drop_count = 0;
491   uint32_t ingress_drop_count = 0;
492   auto peek_result1 =
493       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
494   VerifyPeekResult(peek_result1,
495                    drop_count,
496                    ingress_drop_count,
497                    kMessage,
498                    0,
499                    drop_count_close_to_overflow);
500   // Popping the peeked entry advances the drain, and a new peek will find the
501   // gap in sequence IDs.
502   ASSERT_EQ(drains_[0].PopEntry(peek_result1.value()), OkStatus());
503 
504   // Overflow multisink's drop count.
505   const uint32_t expected_ingress_drop_count = 10;
506   multisink_.HandleDropped(expected_ingress_drop_count);
507 
508   auto peek_result2 =
509       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
510   ASSERT_EQ(peek_result2.status(), Status::OutOfRange());
511   EXPECT_EQ(drop_count, 0u);
512   EXPECT_EQ(ingress_drop_count, expected_ingress_drop_count);
513 
514   multisink_.HandleEntry(kMessage);
515   auto peek_result3 =
516       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
517   VerifyPeekResult(
518       peek_result3, drop_count, ingress_drop_count, kMessage, 0, 0);
519 }
520 
TEST_F(MultiSinkTest,DetachedDrainReportsDropCount)521 TEST_F(MultiSinkTest, DetachedDrainReportsDropCount) {
522   multisink_.AttachDrain(drains_[0]);
523 
524   const uint32_t ingress_drops = 10;
525   multisink_.HandleDropped(ingress_drops);
526   multisink_.HandleEntry(kMessage);
527   VerifyPopEntry(drains_[0], kMessage, 0, ingress_drops);
528 
529   // Detaching and attaching drain should report the same drops.
530   multisink_.DetachDrain(drains_[0]);
531   multisink_.AttachDrain(drains_[0]);
532   VerifyPopEntry(drains_[0], kMessage, 0, ingress_drops);
533 }
534 
TEST(UnsafeIteration,NoLimit)535 TEST(UnsafeIteration, NoLimit) {
536   constexpr std::array<std::string_view, 5> kExpectedEntries{
537       "one", "two", "three", "four", "five"};
538   std::array<std::byte, 32> buffer;
539   MultiSink multisink(buffer);
540 
541   for (std::string_view entry : kExpectedEntries) {
542     multisink.HandleEntry(as_bytes(span<const char>(entry)));
543   }
544 
545   size_t entry_count = 0;
546   struct {
547     size_t& entry_count;
548     span<const std::string_view> expected_results;
549   } ctx{entry_count, kExpectedEntries};
550   auto cb = [&ctx](ConstByteSpan data) {
551     std::string_view expected_entry = ctx.expected_results[ctx.entry_count];
552     EXPECT_EQ(data.size(), expected_entry.size());
553     const int result =
554         memcmp(data.data(), expected_entry.data(), expected_entry.size());
555     EXPECT_EQ(0, result);
556     ctx.entry_count++;
557   };
558 
559   EXPECT_EQ(OkStatus(), multisink.UnsafeForEachEntry(cb));
560   EXPECT_EQ(kExpectedEntries.size(), entry_count);
561 }
562 
TEST(UnsafeIteration,Subset)563 TEST(UnsafeIteration, Subset) {
564   constexpr std::array<std::string_view, 5> kExpectedEntries{
565       "one", "two", "three", "four", "five"};
566   constexpr size_t kStartOffset = 3;
567   constexpr size_t kExpectedEntriesMaxEntries =
568       kExpectedEntries.size() - kStartOffset;
569   std::array<std::byte, 32> buffer;
570   MultiSink multisink(buffer);
571 
572   for (std::string_view entry : kExpectedEntries) {
573     multisink.HandleEntry(as_bytes(span<const char>(entry)));
574   }
575 
576   size_t entry_count = 0;
577   struct {
578     size_t& entry_count;
579     span<const std::string_view> expected_results;
580   } ctx{entry_count, kExpectedEntries};
581   auto cb = [&ctx](ConstByteSpan data) {
582     std::string_view expected_entry =
583         ctx.expected_results[ctx.entry_count + kStartOffset];
584     EXPECT_EQ(data.size(), expected_entry.size());
585     const int result =
586         memcmp(data.data(), expected_entry.data(), expected_entry.size());
587     EXPECT_EQ(0, result);
588     ctx.entry_count++;
589   };
590 
591   EXPECT_EQ(
592       OkStatus(),
593       multisink.UnsafeForEachEntry(cb, kExpectedEntries.size() - kStartOffset));
594   EXPECT_EQ(kExpectedEntriesMaxEntries, entry_count);
595 }
596 
597 }  // namespace pw::multisink
598