1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_multisink/multisink.h"
16
17 #include <array>
18 #include <cstdint>
19 #include <cstring>
20 #include <optional>
21 #include <string_view>
22
23 #include "gtest/gtest.h"
24 #include "pw_function/function.h"
25 #include "pw_span/span.h"
26 #include "pw_status/status.h"
27
28 namespace pw::multisink {
29 using Drain = MultiSink::Drain;
30 using Listener = MultiSink::Listener;
31
32 class CountingListener : public Listener {
33 public:
OnNewEntryAvailable()34 void OnNewEntryAvailable() override { notification_count_++; }
35
GetNotificationCount()36 size_t GetNotificationCount() { return notification_count_; }
37
ResetNotificationCount()38 void ResetNotificationCount() { notification_count_ = 0; }
39
40 private:
41 size_t notification_count_ = 0;
42 };
43
44 class MultiSinkTest : public ::testing::Test {
45 protected:
46 static constexpr std::byte kMessage[] = {
47 (std::byte)0xDE, (std::byte)0xAD, (std::byte)0xBE, (std::byte)0xEF};
48 static constexpr std::byte kMessageOther[] = {
49 (std::byte)0x12, (std::byte)0x34, (std::byte)0x56, (std::byte)0x78};
50 static constexpr size_t kMaxDrains = 3;
51 static constexpr size_t kMaxListeners = 3;
52 static constexpr size_t kEntryBufferSize = 1024;
53 static constexpr size_t kBufferSize = 5 * kEntryBufferSize;
54
MultiSinkTest()55 MultiSinkTest() : buffer_{}, multisink_(buffer_) {}
56
57 // Expects the peeked or popped message to equal the provided non-empty
58 // message, and the drop count to match. If `expected_message` is empty, the
59 // Pop call status expected is OUT_OF_RANGE.
ExpectMessageAndDropCounts(Result<ConstByteSpan> & result,uint32_t result_drop_count,uint32_t result_ingress_drop_count,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)60 void ExpectMessageAndDropCounts(Result<ConstByteSpan>& result,
61 uint32_t result_drop_count,
62 uint32_t result_ingress_drop_count,
63 std::optional<ConstByteSpan> expected_message,
64 uint32_t expected_drop_count,
65 uint32_t expected_ingress_drop_count) {
66 if (!expected_message.has_value()) {
67 EXPECT_EQ(Status::OutOfRange(), result.status());
68 } else {
69 ASSERT_EQ(result.status(), OkStatus());
70 if (!expected_message.value().empty()) {
71 ASSERT_FALSE(result.value().empty());
72 ASSERT_EQ(result.value().size_bytes(),
73 expected_message.value().size_bytes());
74 EXPECT_EQ(memcmp(result.value().data(),
75 expected_message.value().data(),
76 expected_message.value().size_bytes()),
77 0);
78 }
79 }
80 EXPECT_EQ(result_drop_count, expected_drop_count);
81 EXPECT_EQ(result_ingress_drop_count, expected_ingress_drop_count);
82 }
83
VerifyPopEntry(Drain & drain,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)84 void VerifyPopEntry(Drain& drain,
85 std::optional<ConstByteSpan> expected_message,
86 uint32_t expected_drop_count,
87 uint32_t expected_ingress_drop_count) {
88 uint32_t drop_count = 0;
89 uint32_t ingress_drop_count = 0;
90 Result<ConstByteSpan> result =
91 drain.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
92 ExpectMessageAndDropCounts(result,
93 drop_count,
94 ingress_drop_count,
95 expected_message,
96 expected_drop_count,
97 expected_ingress_drop_count);
98 }
99
VerifyPeekResult(const Result<Drain::PeekedEntry> & peek_result,uint32_t result_drop_count,uint32_t result_ingress_drop_count,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)100 void VerifyPeekResult(const Result<Drain::PeekedEntry>& peek_result,
101 uint32_t result_drop_count,
102 uint32_t result_ingress_drop_count,
103 std::optional<ConstByteSpan> expected_message,
104 uint32_t expected_drop_count,
105 uint32_t expected_ingress_drop_count) {
106 if (peek_result.ok()) {
107 ASSERT_FALSE(peek_result.value().entry().empty());
108 Result<ConstByteSpan> verify_result(peek_result.value().entry());
109 ExpectMessageAndDropCounts(verify_result,
110 result_drop_count,
111 result_ingress_drop_count,
112 expected_message,
113 expected_drop_count,
114 expected_ingress_drop_count);
115 return;
116 }
117 if (expected_message.has_value()) {
118 // Fail since we expected OkStatus.
119 ASSERT_EQ(peek_result.status(), OkStatus());
120 }
121 EXPECT_EQ(Status::OutOfRange(), peek_result.status());
122 }
123
ExpectNotificationCount(CountingListener & listener,size_t expected_notification_count)124 void ExpectNotificationCount(CountingListener& listener,
125 size_t expected_notification_count) {
126 EXPECT_EQ(listener.GetNotificationCount(), expected_notification_count);
127 listener.ResetNotificationCount();
128 }
129
130 std::byte buffer_[kBufferSize];
131 std::byte entry_buffer_[kEntryBufferSize];
132 CountingListener listeners_[kMaxListeners];
133 Drain drains_[kMaxDrains];
134 MultiSink multisink_;
135 };
136
TEST_F(MultiSinkTest,SingleDrain)137 TEST_F(MultiSinkTest, SingleDrain) {
138 multisink_.AttachDrain(drains_[0]);
139 multisink_.AttachListener(listeners_[0]);
140 ExpectNotificationCount(listeners_[0], 1u);
141 multisink_.HandleEntry(kMessage);
142
143 // Single entry push and pop.
144 ExpectNotificationCount(listeners_[0], 1u);
145 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
146 // Single empty entry push and pop.
147 multisink_.HandleEntry(ConstByteSpan());
148 ExpectNotificationCount(listeners_[0], 1u);
149 VerifyPopEntry(drains_[0], ConstByteSpan(), 0u, 0u);
150
151 // Multiple entries with intermittent drops.
152 multisink_.HandleEntry(kMessage);
153 multisink_.HandleDropped();
154 multisink_.HandleEntry(kMessage);
155 ExpectNotificationCount(listeners_[0], 3u);
156 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
157 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
158
159 // Send drops only.
160 multisink_.HandleDropped();
161 ExpectNotificationCount(listeners_[0], 1u);
162 VerifyPopEntry(drains_[0], std::nullopt, 0u, 1u);
163
164 // Confirm out-of-range if no entries are expected.
165 ExpectNotificationCount(listeners_[0], 0u);
166 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
167 }
168
TEST_F(MultiSinkTest,MultipleDrain)169 TEST_F(MultiSinkTest, MultipleDrain) {
170 multisink_.AttachDrain(drains_[0]);
171 multisink_.AttachDrain(drains_[1]);
172 multisink_.AttachListener(listeners_[0]);
173 multisink_.AttachListener(listeners_[1]);
174 ExpectNotificationCount(listeners_[0], 1u);
175 ExpectNotificationCount(listeners_[1], 1u);
176
177 multisink_.HandleEntry(kMessage);
178 multisink_.HandleEntry(kMessage);
179 multisink_.HandleDropped();
180 multisink_.HandleEntry(kMessage);
181 multisink_.HandleDropped();
182
183 // Drain one drain entirely.
184 ExpectNotificationCount(listeners_[0], 5u);
185 ExpectNotificationCount(listeners_[1], 5u);
186 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
187 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
188 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
189 VerifyPopEntry(drains_[0], std::nullopt, 0u, 1u);
190 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
191
192 // Confirm the other drain can be drained separately.
193 ExpectNotificationCount(listeners_[0], 0u);
194 ExpectNotificationCount(listeners_[1], 0u);
195 VerifyPopEntry(drains_[1], kMessage, 0u, 0u);
196 VerifyPopEntry(drains_[1], kMessage, 0u, 0u);
197 VerifyPopEntry(drains_[1], kMessage, 0u, 1u);
198 VerifyPopEntry(drains_[1], std::nullopt, 0u, 1u);
199 VerifyPopEntry(drains_[1], std::nullopt, 0u, 0u);
200 }
201
TEST_F(MultiSinkTest,LateDrainRegistration)202 TEST_F(MultiSinkTest, LateDrainRegistration) {
203 // Drains attached after entries are pushed should still observe those entries
204 // if they have not been evicted from the ring buffer.
205 multisink_.HandleEntry(kMessage);
206
207 multisink_.AttachDrain(drains_[0]);
208 multisink_.AttachListener(listeners_[0]);
209 ExpectNotificationCount(listeners_[0], 1u);
210 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
211 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
212
213 multisink_.HandleEntry(kMessage);
214 ExpectNotificationCount(listeners_[0], 1u);
215 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
216 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
217 }
218
TEST_F(MultiSinkTest,DynamicDrainRegistration)219 TEST_F(MultiSinkTest, DynamicDrainRegistration) {
220 multisink_.AttachDrain(drains_[0]);
221 multisink_.AttachListener(listeners_[0]);
222 ExpectNotificationCount(listeners_[0], 1u);
223
224 multisink_.HandleDropped();
225 multisink_.HandleEntry(kMessage);
226 multisink_.HandleDropped();
227 multisink_.HandleEntry(kMessage);
228
229 // Drain out one message and detach it.
230 ExpectNotificationCount(listeners_[0], 4u);
231 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
232 multisink_.DetachDrain(drains_[0]);
233 multisink_.DetachListener(listeners_[0]);
234
235 // Re-attaching the drain should reproduce the last observed message. Note
236 // that notifications are not expected, nor are drops observed before the
237 // first valid message in the buffer.
238 multisink_.AttachDrain(drains_[0]);
239 multisink_.AttachListener(listeners_[0]);
240 ExpectNotificationCount(listeners_[0], 1u);
241 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
242 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
243 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
244
245 multisink_.HandleEntry(kMessage);
246 ExpectNotificationCount(listeners_[0], 1u);
247 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
248 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
249 }
250
TEST_F(MultiSinkTest,TooSmallBuffer)251 TEST_F(MultiSinkTest, TooSmallBuffer) {
252 multisink_.AttachDrain(drains_[0]);
253
254 // Insert an entry and a drop, then try to read into an insufficient buffer.
255 uint32_t drop_count = 0;
256 uint32_t ingress_drop_count = 0;
257 multisink_.HandleDropped();
258 multisink_.HandleEntry(kMessage);
259
260 // Attempting to acquire an entry with a small buffer should result in
261 // RESOURCE_EXHAUSTED and remove it.
262 Result<ConstByteSpan> result = drains_[0].PopEntry(
263 span(entry_buffer_, 1), drop_count, ingress_drop_count);
264 EXPECT_EQ(result.status(), Status::ResourceExhausted());
265
266 VerifyPopEntry(drains_[0], std::nullopt, 1u, 1u);
267 }
268
TEST_F(MultiSinkTest,Iterator)269 TEST_F(MultiSinkTest, Iterator) {
270 multisink_.AttachDrain(drains_[0]);
271
272 // Insert entries and consume them all.
273 multisink_.HandleEntry(kMessage);
274 multisink_.HandleEntry(kMessage);
275 multisink_.HandleEntry(kMessage);
276
277 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
278 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
279 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
280
281 // Confirm that the iterator still observes the messages in the ring buffer.
282 size_t iterated_entries = 0;
283 for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
284 EXPECT_EQ(memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
285 iterated_entries++;
286 }
287 EXPECT_EQ(iterated_entries, 3u);
288 }
289
TEST_F(MultiSinkTest,IteratorNoDrains)290 TEST_F(MultiSinkTest, IteratorNoDrains) {
291 // Insert entries with no drains attached. Even though there are no consumers,
292 // iterators should still walk from the oldest entry.
293 multisink_.HandleEntry(kMessage);
294 multisink_.HandleEntry(kMessage);
295 multisink_.HandleEntry(kMessage);
296
297 // Confirm that the iterator still observes the messages in the ring buffer.
298 size_t iterated_entries = 0;
299 for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
300 EXPECT_EQ(memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
301 iterated_entries++;
302 }
303 EXPECT_EQ(iterated_entries, 3u);
304 }
305
TEST_F(MultiSinkTest,IteratorNoEntries)306 TEST_F(MultiSinkTest, IteratorNoEntries) {
307 // Attach a drain, but don't add any entries.
308 multisink_.AttachDrain(drains_[0]);
309 // Confirm that the iterator has no entries.
310 MultiSink::UnsafeIterationWrapper unsafe_iterator =
311 multisink_.UnsafeIteration();
312 EXPECT_EQ(unsafe_iterator.begin(), unsafe_iterator.end());
313 }
314
TEST_F(MultiSinkTest,PeekEntryNoEntries)315 TEST_F(MultiSinkTest, PeekEntryNoEntries) {
316 multisink_.AttachDrain(drains_[0]);
317
318 // Peek empty multisink.
319 uint32_t drop_count = 0;
320 uint32_t ingress_drop_count = 0;
321 auto peek_result =
322 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
323 VerifyPeekResult(peek_result, 0, drop_count, std::nullopt, 0, 0);
324 }
325
TEST_F(MultiSinkTest,PeekAndPop)326 TEST_F(MultiSinkTest, PeekAndPop) {
327 multisink_.AttachDrain(drains_[0]);
328 multisink_.AttachDrain(drains_[1]);
329
330 // Peek entry after multisink has some entries.
331 multisink_.HandleEntry(kMessage);
332 multisink_.HandleEntry(kMessageOther);
333 uint32_t drop_count = 0;
334 uint32_t ingress_drop_count = 0;
335 auto first_peek_result =
336 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
337 VerifyPeekResult(
338 first_peek_result, drop_count, ingress_drop_count, kMessage, 0, 0);
339
340 // Multiple peeks must return the front message.
341 auto peek_duplicate =
342 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
343 VerifyPeekResult(
344 peek_duplicate, drop_count, ingress_drop_count, kMessage, 0, 0);
345 // A second drain must peek the front message.
346 auto peek_other_drain =
347 drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
348 VerifyPeekResult(
349 peek_other_drain, drop_count, ingress_drop_count, kMessage, 0, 0);
350
351 // After a drain pops a peeked entry, the next peek call must return the next
352 // message.
353 ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
354 auto second_peek_result =
355 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
356 VerifyPeekResult(
357 second_peek_result, drop_count, ingress_drop_count, kMessageOther, 0, 0);
358 // Slower readers must be unchanged.
359 auto peek_other_drain_duplicate =
360 drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
361 VerifyPeekResult(peek_other_drain_duplicate,
362 drop_count,
363 ingress_drop_count,
364 kMessage,
365 0,
366 0);
367
368 // PopEntry prior to popping the previously peeked entry.
369 VerifyPopEntry(drains_[0], kMessageOther, 0, 0);
370 // Popping an entry already handled must not trigger errors.
371 ASSERT_EQ(drains_[0].PopEntry(second_peek_result.value()), OkStatus());
372 // Popping with an old peek context must not trigger errors.
373 ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
374
375 // Multisink is empty, pops and peeks should trigger OUT_OF_RANGE.
376 VerifyPopEntry(drains_[0], std::nullopt, 0, 0);
377 auto empty_peek_result =
378 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
379 VerifyPeekResult(
380 empty_peek_result, drop_count, ingress_drop_count, std::nullopt, 0, 0);
381
382 // // Slower readers must be unchanged.
383 auto peek_other_drain_unchanged =
384 drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
385 VerifyPeekResult(peek_other_drain_unchanged,
386 drop_count,
387 ingress_drop_count,
388 kMessage,
389 0,
390 0);
391 }
392
TEST_F(MultiSinkTest,PeekReportsIngressDropCount)393 TEST_F(MultiSinkTest, PeekReportsIngressDropCount) {
394 multisink_.AttachDrain(drains_[0]);
395
396 // Peek entry after multisink has some entries.
397 multisink_.HandleEntry(kMessage);
398 const uint32_t ingress_drops = 10;
399 multisink_.HandleDropped(ingress_drops);
400
401 uint32_t drop_count = 0;
402 uint32_t ingress_drop_count = 0;
403 auto peek_result1 =
404 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
405 // No drops reported until the drain finds a gap in the sequence IDs.
406 VerifyPeekResult(
407 peek_result1, drop_count, ingress_drop_count, kMessage, 0, 0);
408
409 // Popping the peeked entry advances the drain, and a new peek will find the
410 // gap in sequence IDs.
411 ASSERT_EQ(drains_[0].PopEntry(peek_result1.value()), OkStatus());
412 auto peek_result2 =
413 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
414 ASSERT_EQ(peek_result2.status(), Status::OutOfRange());
415 EXPECT_EQ(drop_count, 0u);
416 EXPECT_EQ(ingress_drop_count, ingress_drops);
417 }
418
TEST_F(MultiSinkTest,PeekReportsSlowDrainDropCount)419 TEST_F(MultiSinkTest, PeekReportsSlowDrainDropCount) {
420 multisink_.AttachDrain(drains_[0]);
421
422 // Add entries until buffer is full and drain has to be advanced.
423 // The sequence ID takes 1 byte when less than 128.
424 const size_t max_multisink_messages = 128;
425 const size_t buffer_entry_size = kBufferSize / max_multisink_messages;
426 // Account for 1 byte of preamble (sequnce ID) and 1 byte of data size.
427 const size_t message_size = buffer_entry_size - 2;
428 std::array<std::byte, message_size> message;
429 std::memset(message.data(), 'a', message.size());
430 for (size_t i = 0; i < max_multisink_messages; ++i) {
431 message[0] = static_cast<std::byte>(i);
432 multisink_.HandleEntry(message);
433 }
434
435 // At this point the buffer is full, but the sequence ID will take 1 more byte
436 // in the preamble, meaning that adding N new entries, drops N + 1 entries.
437 // Account for that offset.
438 const size_t expected_drops = 5;
439 for (size_t i = 1; i < expected_drops; ++i) {
440 message[0] = static_cast<std::byte>(200 + i);
441 multisink_.HandleEntry(message);
442 }
443
444 uint32_t drop_count = 0;
445 uint32_t ingress_drop_count = 0;
446
447 auto peek_result =
448 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
449 // The message peeked is the 6th message added.
450 message[0] = static_cast<std::byte>(5);
451 VerifyPeekResult(
452 peek_result, drop_count, ingress_drop_count, message, expected_drops, 0);
453
454 // Add 3 more messages since we peeked the multisink, generating 2 more drops.
455 const size_t expected_drops2 = 2;
456 for (size_t i = 0; i < expected_drops2 + 1; ++i) {
457 message[0] = static_cast<std::byte>(220 + i);
458 multisink_.HandleEntry(message);
459 }
460
461 // Pop the 6th message now, even though it was already dropped.
462 EXPECT_EQ(drains_[0].PopEntry(peek_result.value()), OkStatus());
463
464 // A new peek would get the 9th message because two more messages were
465 // dropped. Given that PopEntry() was called with peek_result, all the dropped
466 // messages before peek_result should be considered handled and only the two
467 // new drops should be reported here.
468 auto peek_result2 =
469 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
470 message[0] = static_cast<std::byte>(8); // 9th message
471 VerifyPeekResult(peek_result2,
472 drop_count,
473 ingress_drop_count,
474 message,
475 expected_drops2,
476 0);
477 }
478
TEST_F(MultiSinkTest,IngressDropCountOverflow)479 TEST_F(MultiSinkTest, IngressDropCountOverflow) {
480 multisink_.AttachDrain(drains_[0]);
481
482 // Make drain's last handled drop larger than multisink drop count, which
483 // overflowed.
484 const uint32_t drop_count_close_to_overflow =
485 std::numeric_limits<uint32_t>::max() - 3;
486 multisink_.HandleDropped(drop_count_close_to_overflow);
487 multisink_.HandleEntry(kMessage);
488
489 // Catch up drain's drop count.
490 uint32_t drop_count = 0;
491 uint32_t ingress_drop_count = 0;
492 auto peek_result1 =
493 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
494 VerifyPeekResult(peek_result1,
495 drop_count,
496 ingress_drop_count,
497 kMessage,
498 0,
499 drop_count_close_to_overflow);
500 // Popping the peeked entry advances the drain, and a new peek will find the
501 // gap in sequence IDs.
502 ASSERT_EQ(drains_[0].PopEntry(peek_result1.value()), OkStatus());
503
504 // Overflow multisink's drop count.
505 const uint32_t expected_ingress_drop_count = 10;
506 multisink_.HandleDropped(expected_ingress_drop_count);
507
508 auto peek_result2 =
509 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
510 ASSERT_EQ(peek_result2.status(), Status::OutOfRange());
511 EXPECT_EQ(drop_count, 0u);
512 EXPECT_EQ(ingress_drop_count, expected_ingress_drop_count);
513
514 multisink_.HandleEntry(kMessage);
515 auto peek_result3 =
516 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
517 VerifyPeekResult(
518 peek_result3, drop_count, ingress_drop_count, kMessage, 0, 0);
519 }
520
TEST_F(MultiSinkTest,DetachedDrainReportsDropCount)521 TEST_F(MultiSinkTest, DetachedDrainReportsDropCount) {
522 multisink_.AttachDrain(drains_[0]);
523
524 const uint32_t ingress_drops = 10;
525 multisink_.HandleDropped(ingress_drops);
526 multisink_.HandleEntry(kMessage);
527 VerifyPopEntry(drains_[0], kMessage, 0, ingress_drops);
528
529 // Detaching and attaching drain should report the same drops.
530 multisink_.DetachDrain(drains_[0]);
531 multisink_.AttachDrain(drains_[0]);
532 VerifyPopEntry(drains_[0], kMessage, 0, ingress_drops);
533 }
534
TEST(UnsafeIteration,NoLimit)535 TEST(UnsafeIteration, NoLimit) {
536 constexpr std::array<std::string_view, 5> kExpectedEntries{
537 "one", "two", "three", "four", "five"};
538 std::array<std::byte, 32> buffer;
539 MultiSink multisink(buffer);
540
541 for (std::string_view entry : kExpectedEntries) {
542 multisink.HandleEntry(as_bytes(span<const char>(entry)));
543 }
544
545 size_t entry_count = 0;
546 struct {
547 size_t& entry_count;
548 span<const std::string_view> expected_results;
549 } ctx{entry_count, kExpectedEntries};
550 auto cb = [&ctx](ConstByteSpan data) {
551 std::string_view expected_entry = ctx.expected_results[ctx.entry_count];
552 EXPECT_EQ(data.size(), expected_entry.size());
553 const int result =
554 memcmp(data.data(), expected_entry.data(), expected_entry.size());
555 EXPECT_EQ(0, result);
556 ctx.entry_count++;
557 };
558
559 EXPECT_EQ(OkStatus(), multisink.UnsafeForEachEntry(cb));
560 EXPECT_EQ(kExpectedEntries.size(), entry_count);
561 }
562
TEST(UnsafeIteration,Subset)563 TEST(UnsafeIteration, Subset) {
564 constexpr std::array<std::string_view, 5> kExpectedEntries{
565 "one", "two", "three", "four", "five"};
566 constexpr size_t kStartOffset = 3;
567 constexpr size_t kExpectedEntriesMaxEntries =
568 kExpectedEntries.size() - kStartOffset;
569 std::array<std::byte, 32> buffer;
570 MultiSink multisink(buffer);
571
572 for (std::string_view entry : kExpectedEntries) {
573 multisink.HandleEntry(as_bytes(span<const char>(entry)));
574 }
575
576 size_t entry_count = 0;
577 struct {
578 size_t& entry_count;
579 span<const std::string_view> expected_results;
580 } ctx{entry_count, kExpectedEntries};
581 auto cb = [&ctx](ConstByteSpan data) {
582 std::string_view expected_entry =
583 ctx.expected_results[ctx.entry_count + kStartOffset];
584 EXPECT_EQ(data.size(), expected_entry.size());
585 const int result =
586 memcmp(data.data(), expected_entry.data(), expected_entry.size());
587 EXPECT_EQ(0, result);
588 ctx.entry_count++;
589 };
590
591 EXPECT_EQ(
592 OkStatus(),
593 multisink.UnsafeForEachEntry(cb, kExpectedEntries.size() - kStartOffset));
594 EXPECT_EQ(kExpectedEntriesMaxEntries, entry_count);
595 }
596
597 } // namespace pw::multisink
598