• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
#include "pw_multisink/multisink.h"

#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <limits>
#include <optional>
#include <string_view>

#include "pw_function/function.h"
#include "pw_span/span.h"
#include "pw_status/status.h"
#include "pw_unit_test/framework.h"
27 
28 namespace pw::multisink {
29 using Drain = MultiSink::Drain;
30 using Listener = MultiSink::Listener;
31 
32 class CountingListener : public Listener {
33  public:
OnNewEntryAvailable()34   void OnNewEntryAvailable() override { notification_count_++; }
35 
GetNotificationCount()36   size_t GetNotificationCount() { return notification_count_; }
37 
ResetNotificationCount()38   void ResetNotificationCount() { notification_count_ = 0; }
39 
40  private:
41   size_t notification_count_ = 0;
42 };
43 
44 class EntriesSizeMonitorListener : public Listener {
45  public:
EntriesSizeMonitorListener(pw::multisink::MultiSink::Drain & drain)46   EntriesSizeMonitorListener(pw::multisink::MultiSink::Drain& drain)
47       : last_seen_unread_size_(0u), drain_(drain) {}
OnNewEntryAvailable()48   void OnNewEntryAvailable() override {
49     last_seen_unread_size_ = drain_.UnsafeGetUnreadEntriesSize();
50   }
51 
GetLastSeenUnreadEntriesSize() const52   size_t GetLastSeenUnreadEntriesSize() const { return last_seen_unread_size_; }
53 
54  private:
55   size_t last_seen_unread_size_;
56   pw::multisink::MultiSink::Drain& drain_;
57 };
58 
59 class MultiSinkTest : public ::testing::Test {
60  protected:
61   static constexpr std::byte kMessage[] = {
62       (std::byte)0xDE, (std::byte)0xAD, (std::byte)0xBE, (std::byte)0xEF};
63   static constexpr std::byte kMessageOther[] = {
64       (std::byte)0x12, (std::byte)0x34, (std::byte)0x56, (std::byte)0x78};
65   static constexpr size_t kMaxDrains = 3;
66   static constexpr size_t kMaxListeners = 3;
67   static constexpr size_t kEntryBufferSize = 1024;
68   static constexpr size_t kBufferSize = 5 * kEntryBufferSize;
69 
MultiSinkTest()70   MultiSinkTest() : buffer_{}, multisink_(buffer_) {}
71 
72   // Expects the peeked or popped message to equal the provided non-empty
73   // message, and the drop count to match. If `expected_message` is empty, the
74   // Pop call status expected is OUT_OF_RANGE.
ExpectMessageAndDropCounts(Result<ConstByteSpan> & result,uint32_t result_drop_count,uint32_t result_ingress_drop_count,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)75   void ExpectMessageAndDropCounts(Result<ConstByteSpan>& result,
76                                   uint32_t result_drop_count,
77                                   uint32_t result_ingress_drop_count,
78                                   std::optional<ConstByteSpan> expected_message,
79                                   uint32_t expected_drop_count,
80                                   uint32_t expected_ingress_drop_count) {
81     if (!expected_message.has_value()) {
82       EXPECT_EQ(Status::OutOfRange(), result.status());
83     } else {
84       ASSERT_EQ(result.status(), OkStatus());
85       if (!expected_message.value().empty()) {
86         ASSERT_FALSE(result.value().empty());
87         ASSERT_EQ(result.value().size_bytes(),
88                   expected_message.value().size_bytes());
89         EXPECT_EQ(memcmp(result.value().data(),
90                          expected_message.value().data(),
91                          expected_message.value().size_bytes()),
92                   0);
93       }
94     }
95     EXPECT_EQ(result_drop_count, expected_drop_count);
96     EXPECT_EQ(result_ingress_drop_count, expected_ingress_drop_count);
97   }
98 
VerifyPopEntry(Drain & drain,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)99   void VerifyPopEntry(Drain& drain,
100                       std::optional<ConstByteSpan> expected_message,
101                       uint32_t expected_drop_count,
102                       uint32_t expected_ingress_drop_count) {
103     uint32_t drop_count = 0;
104     uint32_t ingress_drop_count = 0;
105     Result<ConstByteSpan> result =
106         drain.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
107     ExpectMessageAndDropCounts(result,
108                                drop_count,
109                                ingress_drop_count,
110                                expected_message,
111                                expected_drop_count,
112                                expected_ingress_drop_count);
113   }
114 
VerifyPeekResult(const Result<Drain::PeekedEntry> & peek_result,uint32_t result_drop_count,uint32_t result_ingress_drop_count,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)115   void VerifyPeekResult(const Result<Drain::PeekedEntry>& peek_result,
116                         uint32_t result_drop_count,
117                         uint32_t result_ingress_drop_count,
118                         std::optional<ConstByteSpan> expected_message,
119                         uint32_t expected_drop_count,
120                         uint32_t expected_ingress_drop_count) {
121     if (peek_result.ok()) {
122       ASSERT_FALSE(peek_result.value().entry().empty());
123       Result<ConstByteSpan> verify_result(peek_result.value().entry());
124       ExpectMessageAndDropCounts(verify_result,
125                                  result_drop_count,
126                                  result_ingress_drop_count,
127                                  expected_message,
128                                  expected_drop_count,
129                                  expected_ingress_drop_count);
130       return;
131     }
132     if (expected_message.has_value()) {
133       // Fail since we expected OkStatus.
134       ASSERT_EQ(peek_result.status(), OkStatus());
135     }
136     EXPECT_EQ(Status::OutOfRange(), peek_result.status());
137   }
138 
ExpectNotificationCount(CountingListener & listener,size_t expected_notification_count)139   void ExpectNotificationCount(CountingListener& listener,
140                                size_t expected_notification_count) {
141     EXPECT_EQ(listener.GetNotificationCount(), expected_notification_count);
142     listener.ResetNotificationCount();
143   }
144 
145   std::byte buffer_[kBufferSize];
146   std::byte entry_buffer_[kEntryBufferSize];
147   CountingListener listeners_[kMaxListeners];
148   Drain drains_[kMaxDrains];
149   MultiSink multisink_;
150 };
151 
// Drives one drain and one listener through entries, an empty entry, and
// ingress drops, checking the notification count after each step.
TEST_F(MultiSinkTest, SingleDrain) {
  Drain& drain = drains_[0];
  CountingListener& listener = listeners_[0];

  multisink_.AttachDrain(drain);
  multisink_.AttachListener(listener);
  // One notification is expected right after attaching the listener.
  ExpectNotificationCount(listener, 1u);
  multisink_.HandleEntry(kMessage);

  // Push and pop of a single entry.
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);

  // Push and pop of a single empty entry.
  multisink_.HandleEntry(ConstByteSpan());
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, ConstByteSpan(), 0u, 0u);

  // Two entries with an ingress drop in between: the drop is reported along
  // with the entry that follows it.
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);
  ExpectNotificationCount(listener, 3u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);
  VerifyPopEntry(drain, kMessage, 0u, 1u);

  // A drop with no entry after it is reported with OUT_OF_RANGE.
  multisink_.HandleDropped();
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, std::nullopt, 0u, 1u);

  // With nothing left to read, the pop is OUT_OF_RANGE with no drops.
  ExpectNotificationCount(listener, 0u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);
}
183 
// Checks that two drains attached to the same multisink each observe the full
// sequence of entries and ingress drops independently of one another.
TEST_F(MultiSinkTest, MultipleDrain) {
  multisink_.AttachDrain(drains_[0]);
  multisink_.AttachDrain(drains_[1]);
  multisink_.AttachListener(listeners_[0]);
  multisink_.AttachListener(listeners_[1]);
  ExpectNotificationCount(listeners_[0], 1u);
  ExpectNotificationCount(listeners_[1], 1u);

  multisink_.HandleEntry(kMessage);
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped();

  // Reads everything pushed above from `drain`: three entries (the third one
  // reporting the first ingress drop), then the trailing drop, then nothing.
  const auto expect_full_sequence = [this](Drain& drain) {
    VerifyPopEntry(drain, kMessage, 0u, 0u);
    VerifyPopEntry(drain, kMessage, 0u, 0u);
    VerifyPopEntry(drain, kMessage, 0u, 1u);
    VerifyPopEntry(drain, std::nullopt, 0u, 1u);
    VerifyPopEntry(drain, std::nullopt, 0u, 0u);
  };

  // Empty the first drain completely.
  ExpectNotificationCount(listeners_[0], 5u);
  ExpectNotificationCount(listeners_[1], 5u);
  expect_full_sequence(drains_[0]);

  // The second drain still sees the whole sequence, and reading from the
  // first drain produced no further notifications.
  ExpectNotificationCount(listeners_[0], 0u);
  ExpectNotificationCount(listeners_[1], 0u);
  expect_full_sequence(drains_[1]);
}
216 
// A drain attached after entries were pushed should still observe those
// entries, provided they have not been evicted from the ring buffer.
TEST_F(MultiSinkTest, LateDrainRegistration) {
  Drain& drain = drains_[0];
  CountingListener& listener = listeners_[0];

  // Push before anything is attached.
  multisink_.HandleEntry(kMessage);

  multisink_.AttachDrain(drain);
  multisink_.AttachListener(listener);
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);

  // Entries pushed after attachment are delivered as usual.
  multisink_.HandleEntry(kMessage);
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);
}
233 
// Detaches a drain and its listener partway through reading, re-attaches
// them, and checks what the re-attached drain observes.
TEST_F(MultiSinkTest, DynamicDrainRegistration) {
  Drain& drain = drains_[0];
  CountingListener& listener = listeners_[0];

  multisink_.AttachDrain(drain);
  multisink_.AttachListener(listener);
  ExpectNotificationCount(listener, 1u);

  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);

  // Four notifications are expected (two drops and two entries). Pop a single
  // message, then detach both the drain and the listener.
  ExpectNotificationCount(listener, 4u);
  VerifyPopEntry(drain, kMessage, 0u, 1u);
  multisink_.DetachDrain(drain);
  multisink_.DetachListener(listener);

  // After re-attaching, the drain is expected to read both buffered messages
  // again, each reporting one ingress drop. Re-attaching the listener is
  // expected to produce exactly one notification.
  multisink_.AttachDrain(drain);
  multisink_.AttachListener(listener);
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 1u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);

  // A new entry pushed after re-attachment is delivered as usual.
  multisink_.HandleEntry(kMessage);
  ExpectNotificationCount(listener, 1u);
  VerifyPopEntry(drain, kMessage, 0u, 0u);
  VerifyPopEntry(drain, std::nullopt, 0u, 0u);
}
265 
// Popping into a buffer that cannot hold the entry must fail with
// RESOURCE_EXHAUSTED; the following pop then reports the entry as dropped.
TEST_F(MultiSinkTest, TooSmallBuffer) {
  Drain& drain = drains_[0];
  multisink_.AttachDrain(drain);

  // Queue one ingress drop followed by one entry.
  multisink_.HandleDropped();
  multisink_.HandleEntry(kMessage);

  // Try to read the entry into a one-byte buffer. This should yield
  // RESOURCE_EXHAUSTED and remove the entry.
  uint32_t drop_count = 0;
  uint32_t ingress_drop_count = 0;
  Result<ConstByteSpan> undersized_pop =
      drain.PopEntry(span(entry_buffer_, 1), drop_count, ingress_drop_count);
  EXPECT_EQ(undersized_pop.status(), Status::ResourceExhausted());

  // Nothing is left to read; one drop and one ingress drop are reported.
  VerifyPopEntry(drain, std::nullopt, 1u, 1u);
}
283 
// Entries already consumed by a drain must still be visible through the
// multisink's unsafe iteration.
TEST_F(MultiSinkTest, Iterator) {
  constexpr size_t kEntryCount = 3;
  Drain& drain = drains_[0];
  multisink_.AttachDrain(drain);

  // Push the entries, then consume every one of them through the drain.
  for (size_t i = 0; i < kEntryCount; ++i) {
    multisink_.HandleEntry(kMessage);
  }
  for (size_t i = 0; i < kEntryCount; ++i) {
    VerifyPopEntry(drain, kMessage, 0u, 0u);
  }

  // The iterator should still walk over all of the entries in the buffer.
  size_t seen = 0;
  for (ConstByteSpan bytes : multisink_.UnsafeIteration()) {
    EXPECT_EQ(memcmp(bytes.data(), kMessage, sizeof(kMessage)), 0);
    ++seen;
  }
  EXPECT_EQ(seen, 3u);
}
304 
// With no drains attached at all, unsafe iteration should still walk the
// buffered entries starting from the oldest one.
TEST_F(MultiSinkTest, IteratorNoDrains) {
  constexpr size_t kEntryCount = 3;
  for (size_t i = 0; i < kEntryCount; ++i) {
    multisink_.HandleEntry(kMessage);
  }

  // Every pushed entry should be visited and carry the expected payload.
  size_t seen = 0;
  for (ConstByteSpan bytes : multisink_.UnsafeIteration()) {
    EXPECT_EQ(memcmp(bytes.data(), kMessage, sizeof(kMessage)), 0);
    ++seen;
  }
  EXPECT_EQ(seen, 3u);
}
320 
// Unsafe iteration over a multisink that never received an entry must yield
// an empty range.
TEST_F(MultiSinkTest, IteratorNoEntries) {
  // A drain is attached, but nothing is ever pushed.
  multisink_.AttachDrain(drains_[0]);

  MultiSink::UnsafeIterationWrapper entries = multisink_.UnsafeIteration();
  // begin() == end() means there is nothing to visit.
  EXPECT_EQ(entries.begin(), entries.end());
}
329 
// Peeking a drain whose multisink holds no entries must report OUT_OF_RANGE.
TEST_F(MultiSinkTest, PeekEntryNoEntries) {
  multisink_.AttachDrain(drains_[0]);

  // Peek the empty multisink.
  uint32_t drop_count = 0;
  uint32_t ingress_drop_count = 0;
  auto peek_result =
      drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  // Forward the counts that PeekEntry() actually wrote, in the order the
  // helper expects: drop count first, ingress drop count second.
  VerifyPeekResult(
      peek_result, drop_count, ingress_drop_count, std::nullopt, 0, 0);
}
340 
TEST_F(MultiSinkTest,PeekAndPop)341 TEST_F(MultiSinkTest, PeekAndPop) {
342   multisink_.AttachDrain(drains_[0]);
343   multisink_.AttachDrain(drains_[1]);
344 
345   // Peek entry after multisink has some entries.
346   multisink_.HandleEntry(kMessage);
347   multisink_.HandleEntry(kMessageOther);
348   uint32_t drop_count = 0;
349   uint32_t ingress_drop_count = 0;
350   auto first_peek_result =
351       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
352   VerifyPeekResult(
353       first_peek_result, drop_count, ingress_drop_count, kMessage, 0, 0);
354 
355   // Multiple peeks must return the front message.
356   auto peek_duplicate =
357       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
358   VerifyPeekResult(
359       peek_duplicate, drop_count, ingress_drop_count, kMessage, 0, 0);
360   // A second drain must peek the front message.
361   auto peek_other_drain =
362       drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
363   VerifyPeekResult(
364       peek_other_drain, drop_count, ingress_drop_count, kMessage, 0, 0);
365 
366   // After a drain pops a peeked entry, the next peek call must return the next
367   // message.
368   ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
369   auto second_peek_result =
370       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
371   VerifyPeekResult(
372       second_peek_result, drop_count, ingress_drop_count, kMessageOther, 0, 0);
373   // Slower readers must be unchanged.
374   auto peek_other_drain_duplicate =
375       drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
376   VerifyPeekResult(peek_other_drain_duplicate,
377                    drop_count,
378                    ingress_drop_count,
379                    kMessage,
380                    0,
381                    0);
382 
383   // PopEntry prior to popping the previously peeked entry.
384   VerifyPopEntry(drains_[0], kMessageOther, 0, 0);
385   // Popping an entry already handled must not trigger errors.
386   ASSERT_EQ(drains_[0].PopEntry(second_peek_result.value()), OkStatus());
387   // Popping with an old peek context must not trigger errors.
388   ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
389 
390   // Multisink is empty, pops and peeks should trigger OUT_OF_RANGE.
391   VerifyPopEntry(drains_[0], std::nullopt, 0, 0);
392   auto empty_peek_result =
393       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
394   VerifyPeekResult(
395       empty_peek_result, drop_count, ingress_drop_count, std::nullopt, 0, 0);
396 
397   // // Slower readers must be unchanged.
398   auto peek_other_drain_unchanged =
399       drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
400   VerifyPeekResult(peek_other_drain_unchanged,
401                    drop_count,
402                    ingress_drop_count,
403                    kMessage,
404                    0,
405                    0);
406 }
407 
// Ingress drops recorded after an entry are only reported by a peek once the
// drain has moved past that entry.
TEST_F(MultiSinkTest, PeekReportsIngressDropCount) {
  Drain& drain = drains_[0];
  multisink_.AttachDrain(drain);

  // One entry followed by a batch of ingress drops.
  constexpr uint32_t kIngressDrops = 10;
  multisink_.HandleEntry(kMessage);
  multisink_.HandleDropped(kIngressDrops);

  uint32_t drop_count = 0;
  uint32_t ingress_drop_count = 0;
  auto entry_peek =
      drain.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  // No drops are reported until the drain finds a gap in the sequence IDs.
  VerifyPeekResult(entry_peek, drop_count, ingress_drop_count, kMessage, 0, 0);

  // Popping the peeked entry advances the drain, so the next peek runs into
  // the gap in sequence IDs and reports the ingress drops.
  ASSERT_EQ(drain.PopEntry(entry_peek.value()), OkStatus());
  auto gap_peek =
      drain.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  ASSERT_EQ(gap_peek.status(), Status::OutOfRange());
  EXPECT_EQ(drop_count, 0u);
  EXPECT_EQ(ingress_drop_count, kIngressDrops);
}
433 
TEST_F(MultiSinkTest,PeekReportsSlowDrainDropCount)434 TEST_F(MultiSinkTest, PeekReportsSlowDrainDropCount) {
435   multisink_.AttachDrain(drains_[0]);
436 
437   // Add entries until buffer is full and drain has to be advanced.
438   // The sequence ID takes 1 byte when less than 128.
439   const size_t max_multisink_messages = 128;
440   const size_t buffer_entry_size = kBufferSize / max_multisink_messages;
441   // Account for 1 byte of preamble (sequnce ID) and 1 byte of data size.
442   const size_t message_size = buffer_entry_size - 2;
443   std::array<std::byte, message_size> message;
444   std::memset(message.data(), 'a', message.size());
445   for (size_t i = 0; i < max_multisink_messages; ++i) {
446     message[0] = static_cast<std::byte>(i);
447     multisink_.HandleEntry(message);
448   }
449 
450   // At this point the buffer is full, but the sequence ID will take 1 more byte
451   // in the preamble, meaning that adding N new entries, drops N + 1 entries.
452   // Account for that offset.
453   const size_t expected_drops = 5;
454   for (size_t i = 1; i < expected_drops; ++i) {
455     message[0] = static_cast<std::byte>(200 + i);
456     multisink_.HandleEntry(message);
457   }
458 
459   uint32_t drop_count = 0;
460   uint32_t ingress_drop_count = 0;
461 
462   auto peek_result =
463       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
464   // The message peeked is the 6th message added.
465   message[0] = static_cast<std::byte>(5);
466   VerifyPeekResult(
467       peek_result, drop_count, ingress_drop_count, message, expected_drops, 0);
468 
469   // Add 3 more messages since we peeked the multisink, generating 2 more drops.
470   const size_t expected_drops2 = 2;
471   for (size_t i = 0; i < expected_drops2 + 1; ++i) {
472     message[0] = static_cast<std::byte>(220 + i);
473     multisink_.HandleEntry(message);
474   }
475 
476   // Pop the 6th message now, even though it was already dropped.
477   EXPECT_EQ(drains_[0].PopEntry(peek_result.value()), OkStatus());
478 
479   // A new peek would get the 9th message because two more messages were
480   // dropped. Given that PopEntry() was called with peek_result, all the dropped
481   // messages before peek_result should be considered handled and only the two
482   // new drops should be reported here.
483   auto peek_result2 =
484       drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
485   message[0] = static_cast<std::byte>(8);  // 9th message
486   VerifyPeekResult(peek_result2,
487                    drop_count,
488                    ingress_drop_count,
489                    message,
490                    expected_drops2,
491                    0);
492 }
493 
// Checks that the reported ingress drop count stays correct when the
// multisink's own drop counter wraps around its uint32_t range.
TEST_F(MultiSinkTest, IngressDropCountOverflow) {
  Drain& drain = drains_[0];
  multisink_.AttachDrain(drain);

  // Bring the multisink's drop count close to the uint32_t limit so that the
  // drain's last handled drop count ends up larger than the multisink's count
  // once the latter overflows.
  constexpr uint32_t kDropsCloseToOverflow =
      std::numeric_limits<uint32_t>::max() - 3;
  multisink_.HandleDropped(kDropsCloseToOverflow);
  multisink_.HandleEntry(kMessage);

  // Let the drain catch up with the drop count.
  uint32_t drop_count = 0;
  uint32_t ingress_drop_count = 0;
  auto catch_up_peek =
      drain.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  VerifyPeekResult(catch_up_peek,
                   drop_count,
                   ingress_drop_count,
                   kMessage,
                   0,
                   kDropsCloseToOverflow);
  // Popping the peeked entry advances the drain, so a new peek will find the
  // gap in sequence IDs.
  ASSERT_EQ(drain.PopEntry(catch_up_peek.value()), OkStatus());

  // Push the multisink's drop count past the overflow point.
  constexpr uint32_t kDropsAfterOverflow = 10;
  multisink_.HandleDropped(kDropsAfterOverflow);

  auto gap_peek =
      drain.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  ASSERT_EQ(gap_peek.status(), Status::OutOfRange());
  EXPECT_EQ(drop_count, 0u);
  EXPECT_EQ(ingress_drop_count, kDropsAfterOverflow);

  // A later entry is peeked with no drops reported.
  multisink_.HandleEntry(kMessage);
  auto final_peek =
      drain.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
  VerifyPeekResult(final_peek, drop_count, ingress_drop_count, kMessage, 0, 0);
}
535 
// A drain that is detached and attached again must report the same ingress
// drops for the buffered entry as it did the first time.
TEST_F(MultiSinkTest, DetachedDrainReportsDropCount) {
  Drain& drain = drains_[0];
  multisink_.AttachDrain(drain);

  constexpr uint32_t kIngressDrops = 10;
  multisink_.HandleDropped(kIngressDrops);
  multisink_.HandleEntry(kMessage);
  VerifyPopEntry(drain, kMessage, 0, kIngressDrops);

  // Cycle the drain through detach and attach; the same entry is read again
  // and the same drops are reported.
  multisink_.DetachDrain(drain);
  multisink_.AttachDrain(drain);
  VerifyPopEntry(drain, kMessage, 0, kIngressDrops);
}
549 
// GetUnreadEntriesSize() must follow each drain's own read position.
TEST_F(MultiSinkTest, DrainUnreadEntriesSize) {
  Drain& first_drain = drains_[0];
  Drain& second_drain = drains_[1];
  multisink_.AttachDrain(first_drain);

  // Nothing has been pushed yet.
  EXPECT_EQ(first_drain.GetUnreadEntriesSize(), 0u);
  multisink_.HandleEntry(kMessage);
  multisink_.HandleEntry(kMessage);
  const size_t size_with_two_entries = first_drain.GetUnreadEntriesSize();
  EXPECT_GT(size_with_two_entries, 0u);

  // A second drain attached now sees the same unread entries size as the
  // first drain.
  multisink_.AttachDrain(second_drain);
  EXPECT_EQ(second_drain.GetUnreadEntriesSize(), size_with_two_entries);

  // Read both entries through the first drain only.
  for (int pops = 0; pops < 2; ++pops) {
    VerifyPopEntry(first_drain,
                   /*expected_message=*/kMessage,
                   /*expected_drop_count=*/0,
                   /*expected_ingress_drop_count=*/0);
  }
  // The first drain has caught up; the second drain has read nothing.
  EXPECT_EQ(first_drain.GetUnreadEntriesSize(), 0u);
  EXPECT_EQ(second_drain.GetUnreadEntriesSize(), size_with_two_entries);
}
576 
// The unread size that a listener samples from inside its notification
// callback must match what the drain reports afterwards.
TEST(UnsafeGetUnreadEntriesSize, ReadFromListener) {
  std::array<std::byte, 32> storage;
  MultiSink sink(storage);

  MultiSink::Drain drain;
  sink.AttachDrain(drain);

  EntriesSizeMonitorListener size_monitor(drain);
  sink.AttachListener(size_monitor);

  // Nothing has been pushed, so the monitor must still report zero.
  ASSERT_EQ(size_monitor.GetLastSeenUnreadEntriesSize(), 0u);

  constexpr std::string_view kEntryToPush = "one";
  sink.HandleEntry(as_bytes(span<const char>(kEntryToPush)));

  const size_t unread_size_from_drain = drain.GetUnreadEntriesSize();
  EXPECT_EQ(size_monitor.GetLastSeenUnreadEntriesSize(),
            unread_size_from_drain);
}
595 
// UnsafeForEachEntry() without an entry limit must visit every pushed entry
// in the order it was pushed.
TEST(UnsafeIteration, NoLimit) {
  constexpr std::array<std::string_view, 5> kExpectedEntries{
      "one", "two", "three", "four", "five"};
  std::array<std::byte, 32> storage;
  MultiSink sink(storage);

  for (std::string_view text : kExpectedEntries) {
    sink.HandleEntry(as_bytes(span<const char>(text)));
  }

  size_t visited = 0;
  // The callback's state is bundled into one struct so that the lambda
  // captures a single reference.
  // NOTE(review): presumably this keeps the callable small enough for the
  // callback type accepted by UnsafeForEachEntry() - confirm before changing
  // the capture list.
  struct {
    size_t& visited;
    span<const std::string_view> expected;
  } state{visited, kExpectedEntries};
  auto check_entry = [&state](ConstByteSpan bytes) {
    std::string_view want = state.expected[state.visited];
    EXPECT_EQ(bytes.size(), want.size());
    const int comparison = memcmp(bytes.data(), want.data(), want.size());
    EXPECT_EQ(0, comparison);
    state.visited++;
  };

  EXPECT_EQ(OkStatus(), sink.UnsafeForEachEntry(check_entry));
  EXPECT_EQ(kExpectedEntries.size(), visited);
}
623 
// UnsafeForEachEntry() with an entry limit must visit only the last
// `limit` entries, in the order they were pushed.
TEST(UnsafeIteration, Subset) {
  constexpr std::array<std::string_view, 5> kExpectedEntries{
      "one", "two", "three", "four", "five"};
  // Index of the first entry that the limited iteration is expected to visit.
  constexpr size_t kStartOffset = 3;
  constexpr size_t kExpectedEntriesMaxEntries =
      kExpectedEntries.size() - kStartOffset;
  std::array<std::byte, 32> storage;
  MultiSink sink(storage);

  for (std::string_view text : kExpectedEntries) {
    sink.HandleEntry(as_bytes(span<const char>(text)));
  }

  size_t visited = 0;
  // The callback's state is bundled into one struct so that the lambda
  // captures a single reference.
  // NOTE(review): presumably this keeps the callable small enough for the
  // callback type accepted by UnsafeForEachEntry() - confirm before changing
  // the capture list.
  struct {
    size_t& visited;
    span<const std::string_view> expected;
  } state{visited, kExpectedEntries};
  auto check_entry = [&state](ConstByteSpan bytes) {
    // The n-th visited entry corresponds to expected entry n + kStartOffset.
    std::string_view want = state.expected[state.visited + kStartOffset];
    EXPECT_EQ(bytes.size(), want.size());
    const int comparison = memcmp(bytes.data(), want.data(), want.size());
    EXPECT_EQ(0, comparison);
    state.visited++;
  };

  EXPECT_EQ(OkStatus(),
            sink.UnsafeForEachEntry(check_entry, kExpectedEntriesMaxEntries));
  EXPECT_EQ(kExpectedEntriesMaxEntries, visited);
}
657 
658 }  // namespace pw::multisink
659