1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_multisink/multisink.h"
16
17 #include <array>
18 #include <cstdint>
19 #include <cstring>
20 #include <optional>
21 #include <string_view>
22
23 #include "pw_function/function.h"
24 #include "pw_span/span.h"
25 #include "pw_status/status.h"
26 #include "pw_unit_test/framework.h"
27
28 namespace pw::multisink {
29 using Drain = MultiSink::Drain;
30 using Listener = MultiSink::Listener;
31
32 class CountingListener : public Listener {
33 public:
OnNewEntryAvailable()34 void OnNewEntryAvailable() override { notification_count_++; }
35
GetNotificationCount()36 size_t GetNotificationCount() { return notification_count_; }
37
ResetNotificationCount()38 void ResetNotificationCount() { notification_count_ = 0; }
39
40 private:
41 size_t notification_count_ = 0;
42 };
43
44 class EntriesSizeMonitorListener : public Listener {
45 public:
EntriesSizeMonitorListener(pw::multisink::MultiSink::Drain & drain)46 EntriesSizeMonitorListener(pw::multisink::MultiSink::Drain& drain)
47 : last_seen_unread_size_(0u), drain_(drain) {}
OnNewEntryAvailable()48 void OnNewEntryAvailable() override {
49 last_seen_unread_size_ = drain_.UnsafeGetUnreadEntriesSize();
50 }
51
GetLastSeenUnreadEntriesSize() const52 size_t GetLastSeenUnreadEntriesSize() const { return last_seen_unread_size_; }
53
54 private:
55 size_t last_seen_unread_size_;
56 pw::multisink::MultiSink::Drain& drain_;
57 };
58
59 class MultiSinkTest : public ::testing::Test {
60 protected:
61 static constexpr std::byte kMessage[] = {
62 (std::byte)0xDE, (std::byte)0xAD, (std::byte)0xBE, (std::byte)0xEF};
63 static constexpr std::byte kMessageOther[] = {
64 (std::byte)0x12, (std::byte)0x34, (std::byte)0x56, (std::byte)0x78};
65 static constexpr size_t kMaxDrains = 3;
66 static constexpr size_t kMaxListeners = 3;
67 static constexpr size_t kEntryBufferSize = 1024;
68 static constexpr size_t kBufferSize = 5 * kEntryBufferSize;
69
MultiSinkTest()70 MultiSinkTest() : buffer_{}, multisink_(buffer_) {}
71
72 // Expects the peeked or popped message to equal the provided non-empty
73 // message, and the drop count to match. If `expected_message` is empty, the
74 // Pop call status expected is OUT_OF_RANGE.
ExpectMessageAndDropCounts(Result<ConstByteSpan> & result,uint32_t result_drop_count,uint32_t result_ingress_drop_count,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)75 void ExpectMessageAndDropCounts(Result<ConstByteSpan>& result,
76 uint32_t result_drop_count,
77 uint32_t result_ingress_drop_count,
78 std::optional<ConstByteSpan> expected_message,
79 uint32_t expected_drop_count,
80 uint32_t expected_ingress_drop_count) {
81 if (!expected_message.has_value()) {
82 EXPECT_EQ(Status::OutOfRange(), result.status());
83 } else {
84 ASSERT_EQ(result.status(), OkStatus());
85 if (!expected_message.value().empty()) {
86 ASSERT_FALSE(result.value().empty());
87 ASSERT_EQ(result.value().size_bytes(),
88 expected_message.value().size_bytes());
89 EXPECT_EQ(memcmp(result.value().data(),
90 expected_message.value().data(),
91 expected_message.value().size_bytes()),
92 0);
93 }
94 }
95 EXPECT_EQ(result_drop_count, expected_drop_count);
96 EXPECT_EQ(result_ingress_drop_count, expected_ingress_drop_count);
97 }
98
VerifyPopEntry(Drain & drain,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)99 void VerifyPopEntry(Drain& drain,
100 std::optional<ConstByteSpan> expected_message,
101 uint32_t expected_drop_count,
102 uint32_t expected_ingress_drop_count) {
103 uint32_t drop_count = 0;
104 uint32_t ingress_drop_count = 0;
105 Result<ConstByteSpan> result =
106 drain.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
107 ExpectMessageAndDropCounts(result,
108 drop_count,
109 ingress_drop_count,
110 expected_message,
111 expected_drop_count,
112 expected_ingress_drop_count);
113 }
114
VerifyPeekResult(const Result<Drain::PeekedEntry> & peek_result,uint32_t result_drop_count,uint32_t result_ingress_drop_count,std::optional<ConstByteSpan> expected_message,uint32_t expected_drop_count,uint32_t expected_ingress_drop_count)115 void VerifyPeekResult(const Result<Drain::PeekedEntry>& peek_result,
116 uint32_t result_drop_count,
117 uint32_t result_ingress_drop_count,
118 std::optional<ConstByteSpan> expected_message,
119 uint32_t expected_drop_count,
120 uint32_t expected_ingress_drop_count) {
121 if (peek_result.ok()) {
122 ASSERT_FALSE(peek_result.value().entry().empty());
123 Result<ConstByteSpan> verify_result(peek_result.value().entry());
124 ExpectMessageAndDropCounts(verify_result,
125 result_drop_count,
126 result_ingress_drop_count,
127 expected_message,
128 expected_drop_count,
129 expected_ingress_drop_count);
130 return;
131 }
132 if (expected_message.has_value()) {
133 // Fail since we expected OkStatus.
134 ASSERT_EQ(peek_result.status(), OkStatus());
135 }
136 EXPECT_EQ(Status::OutOfRange(), peek_result.status());
137 }
138
ExpectNotificationCount(CountingListener & listener,size_t expected_notification_count)139 void ExpectNotificationCount(CountingListener& listener,
140 size_t expected_notification_count) {
141 EXPECT_EQ(listener.GetNotificationCount(), expected_notification_count);
142 listener.ResetNotificationCount();
143 }
144
145 std::byte buffer_[kBufferSize];
146 std::byte entry_buffer_[kEntryBufferSize];
147 CountingListener listeners_[kMaxListeners];
148 Drain drains_[kMaxDrains];
149 MultiSink multisink_;
150 };
151
TEST_F(MultiSinkTest,SingleDrain)152 TEST_F(MultiSinkTest, SingleDrain) {
153 multisink_.AttachDrain(drains_[0]);
154 multisink_.AttachListener(listeners_[0]);
155 ExpectNotificationCount(listeners_[0], 1u);
156 multisink_.HandleEntry(kMessage);
157
158 // Single entry push and pop.
159 ExpectNotificationCount(listeners_[0], 1u);
160 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
161 // Single empty entry push and pop.
162 multisink_.HandleEntry(ConstByteSpan());
163 ExpectNotificationCount(listeners_[0], 1u);
164 VerifyPopEntry(drains_[0], ConstByteSpan(), 0u, 0u);
165
166 // Multiple entries with intermittent drops.
167 multisink_.HandleEntry(kMessage);
168 multisink_.HandleDropped();
169 multisink_.HandleEntry(kMessage);
170 ExpectNotificationCount(listeners_[0], 3u);
171 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
172 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
173
174 // Send drops only.
175 multisink_.HandleDropped();
176 ExpectNotificationCount(listeners_[0], 1u);
177 VerifyPopEntry(drains_[0], std::nullopt, 0u, 1u);
178
179 // Confirm out-of-range if no entries are expected.
180 ExpectNotificationCount(listeners_[0], 0u);
181 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
182 }
183
TEST_F(MultiSinkTest,MultipleDrain)184 TEST_F(MultiSinkTest, MultipleDrain) {
185 multisink_.AttachDrain(drains_[0]);
186 multisink_.AttachDrain(drains_[1]);
187 multisink_.AttachListener(listeners_[0]);
188 multisink_.AttachListener(listeners_[1]);
189 ExpectNotificationCount(listeners_[0], 1u);
190 ExpectNotificationCount(listeners_[1], 1u);
191
192 multisink_.HandleEntry(kMessage);
193 multisink_.HandleEntry(kMessage);
194 multisink_.HandleDropped();
195 multisink_.HandleEntry(kMessage);
196 multisink_.HandleDropped();
197
198 // Drain one drain entirely.
199 ExpectNotificationCount(listeners_[0], 5u);
200 ExpectNotificationCount(listeners_[1], 5u);
201 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
202 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
203 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
204 VerifyPopEntry(drains_[0], std::nullopt, 0u, 1u);
205 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
206
207 // Confirm the other drain can be drained separately.
208 ExpectNotificationCount(listeners_[0], 0u);
209 ExpectNotificationCount(listeners_[1], 0u);
210 VerifyPopEntry(drains_[1], kMessage, 0u, 0u);
211 VerifyPopEntry(drains_[1], kMessage, 0u, 0u);
212 VerifyPopEntry(drains_[1], kMessage, 0u, 1u);
213 VerifyPopEntry(drains_[1], std::nullopt, 0u, 1u);
214 VerifyPopEntry(drains_[1], std::nullopt, 0u, 0u);
215 }
216
TEST_F(MultiSinkTest,LateDrainRegistration)217 TEST_F(MultiSinkTest, LateDrainRegistration) {
218 // Drains attached after entries are pushed should still observe those entries
219 // if they have not been evicted from the ring buffer.
220 multisink_.HandleEntry(kMessage);
221
222 multisink_.AttachDrain(drains_[0]);
223 multisink_.AttachListener(listeners_[0]);
224 ExpectNotificationCount(listeners_[0], 1u);
225 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
226 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
227
228 multisink_.HandleEntry(kMessage);
229 ExpectNotificationCount(listeners_[0], 1u);
230 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
231 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
232 }
233
TEST_F(MultiSinkTest,DynamicDrainRegistration)234 TEST_F(MultiSinkTest, DynamicDrainRegistration) {
235 multisink_.AttachDrain(drains_[0]);
236 multisink_.AttachListener(listeners_[0]);
237 ExpectNotificationCount(listeners_[0], 1u);
238
239 multisink_.HandleDropped();
240 multisink_.HandleEntry(kMessage);
241 multisink_.HandleDropped();
242 multisink_.HandleEntry(kMessage);
243
244 // Drain out one message and detach it.
245 ExpectNotificationCount(listeners_[0], 4u);
246 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
247 multisink_.DetachDrain(drains_[0]);
248 multisink_.DetachListener(listeners_[0]);
249
250 // Re-attaching the drain should reproduce the last observed message. Note
251 // that notifications are not expected, nor are drops observed before the
252 // first valid message in the buffer.
253 multisink_.AttachDrain(drains_[0]);
254 multisink_.AttachListener(listeners_[0]);
255 ExpectNotificationCount(listeners_[0], 1u);
256 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
257 VerifyPopEntry(drains_[0], kMessage, 0u, 1u);
258 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
259
260 multisink_.HandleEntry(kMessage);
261 ExpectNotificationCount(listeners_[0], 1u);
262 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
263 VerifyPopEntry(drains_[0], std::nullopt, 0u, 0u);
264 }
265
TEST_F(MultiSinkTest,TooSmallBuffer)266 TEST_F(MultiSinkTest, TooSmallBuffer) {
267 multisink_.AttachDrain(drains_[0]);
268
269 // Insert an entry and a drop, then try to read into an insufficient buffer.
270 uint32_t drop_count = 0;
271 uint32_t ingress_drop_count = 0;
272 multisink_.HandleDropped();
273 multisink_.HandleEntry(kMessage);
274
275 // Attempting to acquire an entry with a small buffer should result in
276 // RESOURCE_EXHAUSTED and remove it.
277 Result<ConstByteSpan> result = drains_[0].PopEntry(
278 span(entry_buffer_, 1), drop_count, ingress_drop_count);
279 EXPECT_EQ(result.status(), Status::ResourceExhausted());
280
281 VerifyPopEntry(drains_[0], std::nullopt, 1u, 1u);
282 }
283
TEST_F(MultiSinkTest,Iterator)284 TEST_F(MultiSinkTest, Iterator) {
285 multisink_.AttachDrain(drains_[0]);
286
287 // Insert entries and consume them all.
288 multisink_.HandleEntry(kMessage);
289 multisink_.HandleEntry(kMessage);
290 multisink_.HandleEntry(kMessage);
291
292 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
293 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
294 VerifyPopEntry(drains_[0], kMessage, 0u, 0u);
295
296 // Confirm that the iterator still observes the messages in the ring buffer.
297 size_t iterated_entries = 0;
298 for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
299 EXPECT_EQ(memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
300 iterated_entries++;
301 }
302 EXPECT_EQ(iterated_entries, 3u);
303 }
304
TEST_F(MultiSinkTest,IteratorNoDrains)305 TEST_F(MultiSinkTest, IteratorNoDrains) {
306 // Insert entries with no drains attached. Even though there are no consumers,
307 // iterators should still walk from the oldest entry.
308 multisink_.HandleEntry(kMessage);
309 multisink_.HandleEntry(kMessage);
310 multisink_.HandleEntry(kMessage);
311
312 // Confirm that the iterator still observes the messages in the ring buffer.
313 size_t iterated_entries = 0;
314 for (ConstByteSpan entry : multisink_.UnsafeIteration()) {
315 EXPECT_EQ(memcmp(entry.data(), kMessage, sizeof(kMessage)), 0);
316 iterated_entries++;
317 }
318 EXPECT_EQ(iterated_entries, 3u);
319 }
320
TEST_F(MultiSinkTest,IteratorNoEntries)321 TEST_F(MultiSinkTest, IteratorNoEntries) {
322 // Attach a drain, but don't add any entries.
323 multisink_.AttachDrain(drains_[0]);
324 // Confirm that the iterator has no entries.
325 MultiSink::UnsafeIterationWrapper unsafe_iterator =
326 multisink_.UnsafeIteration();
327 EXPECT_EQ(unsafe_iterator.begin(), unsafe_iterator.end());
328 }
329
TEST_F(MultiSinkTest,PeekEntryNoEntries)330 TEST_F(MultiSinkTest, PeekEntryNoEntries) {
331 multisink_.AttachDrain(drains_[0]);
332
333 // Peek empty multisink.
334 uint32_t drop_count = 0;
335 uint32_t ingress_drop_count = 0;
336 auto peek_result =
337 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
338 VerifyPeekResult(peek_result, 0, drop_count, std::nullopt, 0, 0);
339 }
340
TEST_F(MultiSinkTest,PeekAndPop)341 TEST_F(MultiSinkTest, PeekAndPop) {
342 multisink_.AttachDrain(drains_[0]);
343 multisink_.AttachDrain(drains_[1]);
344
345 // Peek entry after multisink has some entries.
346 multisink_.HandleEntry(kMessage);
347 multisink_.HandleEntry(kMessageOther);
348 uint32_t drop_count = 0;
349 uint32_t ingress_drop_count = 0;
350 auto first_peek_result =
351 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
352 VerifyPeekResult(
353 first_peek_result, drop_count, ingress_drop_count, kMessage, 0, 0);
354
355 // Multiple peeks must return the front message.
356 auto peek_duplicate =
357 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
358 VerifyPeekResult(
359 peek_duplicate, drop_count, ingress_drop_count, kMessage, 0, 0);
360 // A second drain must peek the front message.
361 auto peek_other_drain =
362 drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
363 VerifyPeekResult(
364 peek_other_drain, drop_count, ingress_drop_count, kMessage, 0, 0);
365
366 // After a drain pops a peeked entry, the next peek call must return the next
367 // message.
368 ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
369 auto second_peek_result =
370 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
371 VerifyPeekResult(
372 second_peek_result, drop_count, ingress_drop_count, kMessageOther, 0, 0);
373 // Slower readers must be unchanged.
374 auto peek_other_drain_duplicate =
375 drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
376 VerifyPeekResult(peek_other_drain_duplicate,
377 drop_count,
378 ingress_drop_count,
379 kMessage,
380 0,
381 0);
382
383 // PopEntry prior to popping the previously peeked entry.
384 VerifyPopEntry(drains_[0], kMessageOther, 0, 0);
385 // Popping an entry already handled must not trigger errors.
386 ASSERT_EQ(drains_[0].PopEntry(second_peek_result.value()), OkStatus());
387 // Popping with an old peek context must not trigger errors.
388 ASSERT_EQ(drains_[0].PopEntry(first_peek_result.value()), OkStatus());
389
390 // Multisink is empty, pops and peeks should trigger OUT_OF_RANGE.
391 VerifyPopEntry(drains_[0], std::nullopt, 0, 0);
392 auto empty_peek_result =
393 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
394 VerifyPeekResult(
395 empty_peek_result, drop_count, ingress_drop_count, std::nullopt, 0, 0);
396
397 // // Slower readers must be unchanged.
398 auto peek_other_drain_unchanged =
399 drains_[1].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
400 VerifyPeekResult(peek_other_drain_unchanged,
401 drop_count,
402 ingress_drop_count,
403 kMessage,
404 0,
405 0);
406 }
407
TEST_F(MultiSinkTest,PeekReportsIngressDropCount)408 TEST_F(MultiSinkTest, PeekReportsIngressDropCount) {
409 multisink_.AttachDrain(drains_[0]);
410
411 // Peek entry after multisink has some entries.
412 multisink_.HandleEntry(kMessage);
413 const uint32_t ingress_drops = 10;
414 multisink_.HandleDropped(ingress_drops);
415
416 uint32_t drop_count = 0;
417 uint32_t ingress_drop_count = 0;
418 auto peek_result1 =
419 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
420 // No drops reported until the drain finds a gap in the sequence IDs.
421 VerifyPeekResult(
422 peek_result1, drop_count, ingress_drop_count, kMessage, 0, 0);
423
424 // Popping the peeked entry advances the drain, and a new peek will find the
425 // gap in sequence IDs.
426 ASSERT_EQ(drains_[0].PopEntry(peek_result1.value()), OkStatus());
427 auto peek_result2 =
428 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
429 ASSERT_EQ(peek_result2.status(), Status::OutOfRange());
430 EXPECT_EQ(drop_count, 0u);
431 EXPECT_EQ(ingress_drop_count, ingress_drops);
432 }
433
TEST_F(MultiSinkTest,PeekReportsSlowDrainDropCount)434 TEST_F(MultiSinkTest, PeekReportsSlowDrainDropCount) {
435 multisink_.AttachDrain(drains_[0]);
436
437 // Add entries until buffer is full and drain has to be advanced.
438 // The sequence ID takes 1 byte when less than 128.
439 const size_t max_multisink_messages = 128;
440 const size_t buffer_entry_size = kBufferSize / max_multisink_messages;
441 // Account for 1 byte of preamble (sequnce ID) and 1 byte of data size.
442 const size_t message_size = buffer_entry_size - 2;
443 std::array<std::byte, message_size> message;
444 std::memset(message.data(), 'a', message.size());
445 for (size_t i = 0; i < max_multisink_messages; ++i) {
446 message[0] = static_cast<std::byte>(i);
447 multisink_.HandleEntry(message);
448 }
449
450 // At this point the buffer is full, but the sequence ID will take 1 more byte
451 // in the preamble, meaning that adding N new entries, drops N + 1 entries.
452 // Account for that offset.
453 const size_t expected_drops = 5;
454 for (size_t i = 1; i < expected_drops; ++i) {
455 message[0] = static_cast<std::byte>(200 + i);
456 multisink_.HandleEntry(message);
457 }
458
459 uint32_t drop_count = 0;
460 uint32_t ingress_drop_count = 0;
461
462 auto peek_result =
463 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
464 // The message peeked is the 6th message added.
465 message[0] = static_cast<std::byte>(5);
466 VerifyPeekResult(
467 peek_result, drop_count, ingress_drop_count, message, expected_drops, 0);
468
469 // Add 3 more messages since we peeked the multisink, generating 2 more drops.
470 const size_t expected_drops2 = 2;
471 for (size_t i = 0; i < expected_drops2 + 1; ++i) {
472 message[0] = static_cast<std::byte>(220 + i);
473 multisink_.HandleEntry(message);
474 }
475
476 // Pop the 6th message now, even though it was already dropped.
477 EXPECT_EQ(drains_[0].PopEntry(peek_result.value()), OkStatus());
478
479 // A new peek would get the 9th message because two more messages were
480 // dropped. Given that PopEntry() was called with peek_result, all the dropped
481 // messages before peek_result should be considered handled and only the two
482 // new drops should be reported here.
483 auto peek_result2 =
484 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
485 message[0] = static_cast<std::byte>(8); // 9th message
486 VerifyPeekResult(peek_result2,
487 drop_count,
488 ingress_drop_count,
489 message,
490 expected_drops2,
491 0);
492 }
493
TEST_F(MultiSinkTest,IngressDropCountOverflow)494 TEST_F(MultiSinkTest, IngressDropCountOverflow) {
495 multisink_.AttachDrain(drains_[0]);
496
497 // Make drain's last handled drop larger than multisink drop count, which
498 // overflowed.
499 const uint32_t drop_count_close_to_overflow =
500 std::numeric_limits<uint32_t>::max() - 3;
501 multisink_.HandleDropped(drop_count_close_to_overflow);
502 multisink_.HandleEntry(kMessage);
503
504 // Catch up drain's drop count.
505 uint32_t drop_count = 0;
506 uint32_t ingress_drop_count = 0;
507 auto peek_result1 =
508 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
509 VerifyPeekResult(peek_result1,
510 drop_count,
511 ingress_drop_count,
512 kMessage,
513 0,
514 drop_count_close_to_overflow);
515 // Popping the peeked entry advances the drain, and a new peek will find the
516 // gap in sequence IDs.
517 ASSERT_EQ(drains_[0].PopEntry(peek_result1.value()), OkStatus());
518
519 // Overflow multisink's drop count.
520 const uint32_t expected_ingress_drop_count = 10;
521 multisink_.HandleDropped(expected_ingress_drop_count);
522
523 auto peek_result2 =
524 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
525 ASSERT_EQ(peek_result2.status(), Status::OutOfRange());
526 EXPECT_EQ(drop_count, 0u);
527 EXPECT_EQ(ingress_drop_count, expected_ingress_drop_count);
528
529 multisink_.HandleEntry(kMessage);
530 auto peek_result3 =
531 drains_[0].PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
532 VerifyPeekResult(
533 peek_result3, drop_count, ingress_drop_count, kMessage, 0, 0);
534 }
535
TEST_F(MultiSinkTest,DetachedDrainReportsDropCount)536 TEST_F(MultiSinkTest, DetachedDrainReportsDropCount) {
537 multisink_.AttachDrain(drains_[0]);
538
539 const uint32_t ingress_drops = 10;
540 multisink_.HandleDropped(ingress_drops);
541 multisink_.HandleEntry(kMessage);
542 VerifyPopEntry(drains_[0], kMessage, 0, ingress_drops);
543
544 // Detaching and attaching drain should report the same drops.
545 multisink_.DetachDrain(drains_[0]);
546 multisink_.AttachDrain(drains_[0]);
547 VerifyPopEntry(drains_[0], kMessage, 0, ingress_drops);
548 }
549
TEST_F(MultiSinkTest,DrainUnreadEntriesSize)550 TEST_F(MultiSinkTest, DrainUnreadEntriesSize) {
551 multisink_.AttachDrain(drains_[0]);
552
553 EXPECT_EQ(drains_[0].GetUnreadEntriesSize(), 0u);
554 multisink_.HandleEntry(kMessage);
555 multisink_.HandleEntry(kMessage);
556 const size_t unread_entries_size = drains_[0].GetUnreadEntriesSize();
557 EXPECT_GT(unread_entries_size, 0u);
558
559 // Attach another drain, it sees the same unread entriess size as the first
560 // drain.
561 multisink_.AttachDrain(drains_[1]);
562 EXPECT_EQ(drains_[1].GetUnreadEntriesSize(), unread_entries_size);
563
564 // Pop entries from the first drain.
565 VerifyPopEntry(drains_[0],
566 /*expected_message=*/kMessage,
567 /*expected_drop_count=*/0,
568 /*expected_ingress_drop_count=*/0);
569 VerifyPopEntry(drains_[0],
570 /*expected_message=*/kMessage,
571 /*expected_drop_count=*/0,
572 /*expected_ingress_drop_count=*/0);
573 EXPECT_EQ(drains_[0].GetUnreadEntriesSize(), 0u);
574 EXPECT_EQ(drains_[1].GetUnreadEntriesSize(), unread_entries_size);
575 }
576
TEST(UnsafeGetUnreadEntriesSize,ReadFromListener)577 TEST(UnsafeGetUnreadEntriesSize, ReadFromListener) {
578 std::array<std::byte, 32> buffer;
579 MultiSink multisink(buffer);
580
581 pw::multisink::MultiSink::Drain drain;
582 multisink.AttachDrain(drain);
583
584 EntriesSizeMonitorListener listener(drain);
585 multisink.AttachListener(listener);
586
587 ASSERT_EQ(listener.GetLastSeenUnreadEntriesSize(), 0u);
588
589 constexpr std::string_view kEntryToPush = "one";
590 multisink.HandleEntry(as_bytes(span<const char>(kEntryToPush)));
591
592 EXPECT_EQ(listener.GetLastSeenUnreadEntriesSize(),
593 drain.GetUnreadEntriesSize());
594 }
595
TEST(UnsafeIteration,NoLimit)596 TEST(UnsafeIteration, NoLimit) {
597 constexpr std::array<std::string_view, 5> kExpectedEntries{
598 "one", "two", "three", "four", "five"};
599 std::array<std::byte, 32> buffer;
600 MultiSink multisink(buffer);
601
602 for (std::string_view entry : kExpectedEntries) {
603 multisink.HandleEntry(as_bytes(span<const char>(entry)));
604 }
605
606 size_t entry_count = 0;
607 struct {
608 size_t& entry_count;
609 span<const std::string_view> expected_results;
610 } ctx{entry_count, kExpectedEntries};
611 auto cb = [&ctx](ConstByteSpan data) {
612 std::string_view expected_entry = ctx.expected_results[ctx.entry_count];
613 EXPECT_EQ(data.size(), expected_entry.size());
614 const int result =
615 memcmp(data.data(), expected_entry.data(), expected_entry.size());
616 EXPECT_EQ(0, result);
617 ctx.entry_count++;
618 };
619
620 EXPECT_EQ(OkStatus(), multisink.UnsafeForEachEntry(cb));
621 EXPECT_EQ(kExpectedEntries.size(), entry_count);
622 }
623
TEST(UnsafeIteration,Subset)624 TEST(UnsafeIteration, Subset) {
625 constexpr std::array<std::string_view, 5> kExpectedEntries{
626 "one", "two", "three", "four", "five"};
627 constexpr size_t kStartOffset = 3;
628 constexpr size_t kExpectedEntriesMaxEntries =
629 kExpectedEntries.size() - kStartOffset;
630 std::array<std::byte, 32> buffer;
631 MultiSink multisink(buffer);
632
633 for (std::string_view entry : kExpectedEntries) {
634 multisink.HandleEntry(as_bytes(span<const char>(entry)));
635 }
636
637 size_t entry_count = 0;
638 struct {
639 size_t& entry_count;
640 span<const std::string_view> expected_results;
641 } ctx{entry_count, kExpectedEntries};
642 auto cb = [&ctx](ConstByteSpan data) {
643 std::string_view expected_entry =
644 ctx.expected_results[ctx.entry_count + kStartOffset];
645 EXPECT_EQ(data.size(), expected_entry.size());
646 const int result =
647 memcmp(data.data(), expected_entry.data(), expected_entry.size());
648 EXPECT_EQ(0, result);
649 ctx.entry_count++;
650 };
651
652 EXPECT_EQ(
653 OkStatus(),
654 multisink.UnsafeForEachEntry(cb, kExpectedEntries.size() - kStartOffset));
655 EXPECT_EQ(kExpectedEntriesMaxEntries, entry_count);
656 }
657
658 } // namespace pw::multisink
659