1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include <cstddef>
16 #include <cstdint>
17
18 #include "pw_containers/vector.h"
19 #include "pw_multisink/multisink.h"
20 #include "pw_multisink/test_thread.h"
21 #include "pw_span/span.h"
22 #include "pw_string/string_builder.h"
23 #include "pw_thread/thread.h"
24 #include "pw_thread/yield.h"
25 #include "pw_unit_test/framework.h"
26
27 namespace pw::multisink {
28 namespace {
29
30 constexpr size_t kEntryBufferSize = sizeof("message 000");
31 constexpr size_t kMaxMessageCount = 250;
32 constexpr size_t kBufferSize = kMaxMessageCount * kEntryBufferSize;
33
34 using MessageSpan = span<const StringBuffer<kEntryBufferSize>>;
35
36 // This function is unused if joining is not supported.
CompareSentAndReceivedMessages(const MessageSpan & sent_messages,const MessageSpan & received_messages)37 [[maybe_unused]] void CompareSentAndReceivedMessages(
38 const MessageSpan& sent_messages, const MessageSpan& received_messages) {
39 ASSERT_EQ(sent_messages.size(), received_messages.size());
40 for (size_t i = 0; i < sent_messages.size(); ++i) {
41 ASSERT_EQ(sent_messages[i].size(), received_messages[i].size());
42 EXPECT_EQ(std::string_view(sent_messages[i]),
43 std::string_view(received_messages[i]));
44 }
45 }
46
47 } // namespace
48
49 // Static message pool to avoid recreating messages for every test and avoids
50 // using std::string.
51 class MessagePool {
52 public:
Instance()53 static MessagePool& Instance() {
54 static MessagePool instance;
55 return instance;
56 }
57
58 MessagePool(const MessagePool&) = delete;
59 MessagePool& operator=(const MessagePool&) = delete;
60 MessagePool(MessagePool&&) = delete;
61 MessagePool& operator=(MessagePool&&) = delete;
62
GetMessages(size_t message_count) const63 MessageSpan GetMessages(size_t message_count) const {
64 PW_ASSERT(message_count <= messages_.size());
65 return MessageSpan(messages_.begin(), message_count);
66 }
67
68 private:
MessagePool()69 MessagePool() {
70 for (size_t i = 0; i < kMaxMessageCount; ++i) {
71 messages_.emplace_back();
72 messages_.back() << "message %u" << static_cast<unsigned int>(i);
73 }
74 }
75
76 Vector<StringBuffer<kEntryBufferSize>, kMaxMessageCount> messages_;
77 };
78
79 // Continuously reads logs from a multisink, using PopEntry() and stores copies
80 // of the retrieved messages for later verification. The thread stops when the
81 // the number of read messages and total drop count matches the expected count.
82 class LogPopReaderThread : public thread::ThreadCore {
83 public:
LogPopReaderThread(MultiSink & multisink,uint32_t expected_message_and_drop_count)84 LogPopReaderThread(MultiSink& multisink,
85 uint32_t expected_message_and_drop_count)
86 : multisink_(multisink),
87 total_drop_count_(0),
88 expected_message_and_drop_count_(expected_message_and_drop_count) {
89 PW_ASSERT(expected_message_and_drop_count_ <= kMaxMessageCount);
90 }
91
drop_count()92 uint32_t drop_count() { return total_drop_count_; }
93
received_messages()94 const MessageSpan received_messages() {
95 return MessageSpan(received_messages_.begin(), received_messages_.size());
96 }
97
Run()98 void Run() override {
99 multisink_.AttachDrain(drain_);
100 ReadAllEntries();
101 }
102
ReadAllEntries()103 virtual void ReadAllEntries() {
104 do {
105 uint32_t drop_count = 0;
106 uint32_t ingress_drop_count = 0;
107 const Result<ConstByteSpan> possible_entry =
108 drain_.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
109 total_drop_count_ += drop_count + ingress_drop_count;
110 if (possible_entry.status().IsOutOfRange()) {
111 pw::this_thread::yield();
112 continue;
113 }
114 ASSERT_EQ(possible_entry.status(), OkStatus());
115 if (received_messages_.full()) {
116 return;
117 }
118 received_messages_.emplace_back();
119 received_messages_.back() << std::string_view(
120 reinterpret_cast<const char*>(possible_entry.value().data()),
121 possible_entry.value().size());
122 pw::this_thread::yield();
123 } while (total_drop_count_ + received_messages_.size() <
124 expected_message_and_drop_count_);
125 }
126
127 protected:
128 MultiSink::Drain drain_;
129 MultiSink& multisink_;
130 std::array<std::byte, kEntryBufferSize> entry_buffer_;
131 uint32_t total_drop_count_;
132 const uint32_t expected_message_and_drop_count_;
133 Vector<StringBuffer<kEntryBufferSize>, kMaxMessageCount> received_messages_;
134 };
135
136 class LogPeekAndCommitReaderThread : public LogPopReaderThread {
137 public:
LogPeekAndCommitReaderThread(MultiSink & multisink,uint32_t expected_message_and_drop_count)138 LogPeekAndCommitReaderThread(MultiSink& multisink,
139 uint32_t expected_message_and_drop_count)
140 : LogPopReaderThread(multisink, expected_message_and_drop_count) {}
141
ReadAllEntries()142 void ReadAllEntries() override {
143 do {
144 uint32_t drop_count = 0;
145 uint32_t ingress_drop_count = 0;
146 const Result<MultiSink::Drain::PeekedEntry> possible_entry =
147 drain_.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
148 total_drop_count_ += drop_count + ingress_drop_count;
149 if (possible_entry.status().IsOutOfRange()) {
150 pw::this_thread::yield();
151 continue;
152 }
153 ASSERT_EQ(possible_entry.status(), OkStatus());
154 if (received_messages_.full()) {
155 return;
156 }
157 pw::this_thread::yield();
158 received_messages_.emplace_back();
159 received_messages_.back() << std::string_view(
160 reinterpret_cast<const char*>(possible_entry.value().entry().data()),
161 possible_entry.value().entry().size());
162 ASSERT_EQ(drain_.PopEntry(possible_entry.value()), OkStatus());
163 pw::this_thread::yield();
164 } while (total_drop_count_ + received_messages_.size() <
165 expected_message_and_drop_count_);
166 }
167 };
168
169 // Adds the provided messages to the shared multisink.
170 class LogWriterThread : public thread::ThreadCore {
171 public:
LogWriterThread(MultiSink & multisink,const MessageSpan & message_stack)172 LogWriterThread(MultiSink& multisink, const MessageSpan& message_stack)
173 : multisink_(multisink), message_stack_(message_stack) {}
174
Run()175 void Run() override {
176 for (const auto& message : message_stack_) {
177 multisink_.HandleEntry(as_bytes(span(std::string_view(message))));
178 pw::this_thread::yield();
179 }
180 }
181
182 private:
183 MultiSink& multisink_;
184 const MessageSpan& message_stack_;
185 };
186
187 class MultiSinkTest : public ::testing::Test {
188 protected:
MultiSinkTest()189 MultiSinkTest() : buffer_{}, multisink_(buffer_) {}
190
191 std::byte buffer_[kBufferSize];
192 MultiSink multisink_;
193 };
194
195 #if PW_THREAD_JOINING_ENABLED
196
TEST_F(MultiSinkTest,SingleWriterSingleReader)197 TEST_F(MultiSinkTest, SingleWriterSingleReader) {
198 const uint32_t log_count = 100;
199 const uint32_t drop_count = 5;
200 const uint32_t expected_message_and_drop_count = log_count + drop_count;
201 const auto message_stack = MessagePool::Instance().GetMessages(log_count);
202
203 // Start reader thread.
204 LogPopReaderThread reader_thread_core(multisink_,
205 expected_message_and_drop_count);
206 thread::Thread reader_thread(test::MultiSinkTestThreadOptions(),
207 reader_thread_core);
208 // Start writer thread.
209 LogWriterThread writer_thread_core(multisink_, message_stack);
210 thread::Thread writer_thread(test::MultiSinkTestThreadOptions(),
211 writer_thread_core);
212
213 // Wait for writer thread to end.
214 writer_thread.join();
215 multisink_.HandleDropped(drop_count);
216 reader_thread.join();
217
218 EXPECT_EQ(reader_thread_core.drop_count(), drop_count);
219 CompareSentAndReceivedMessages(message_stack,
220 reader_thread_core.received_messages());
221 }
222
TEST_F(MultiSinkTest,SingleWriterSinglePeekAndCommitReader)223 TEST_F(MultiSinkTest, SingleWriterSinglePeekAndCommitReader) {
224 const uint32_t log_count = 100;
225 const uint32_t drop_count = 5;
226 const uint32_t expected_message_and_drop_count = log_count + drop_count;
227 const auto message_stack = MessagePool::Instance().GetMessages(log_count);
228
229 // Start reader thread.
230 LogPeekAndCommitReaderThread reader_thread_core(
231 multisink_, expected_message_and_drop_count);
232 thread::Thread reader_thread(test::MultiSinkTestThreadOptions(),
233 reader_thread_core);
234 // Start writer thread.
235 LogWriterThread writer_thread_core(multisink_, message_stack);
236 thread::Thread writer_thread(test::MultiSinkTestThreadOptions(),
237 writer_thread_core);
238
239 // Wait for writer thread to end.
240 writer_thread.join();
241 multisink_.HandleDropped(drop_count);
242 reader_thread.join();
243
244 EXPECT_EQ(reader_thread_core.drop_count(), drop_count);
245 CompareSentAndReceivedMessages(message_stack,
246 reader_thread_core.received_messages());
247 }
248
TEST_F(MultiSinkTest,SingleWriterMultipleReaders)249 TEST_F(MultiSinkTest, SingleWriterMultipleReaders) {
250 const uint32_t log_count = 100;
251 const uint32_t drop_count = 5;
252 const uint32_t expected_message_and_drop_count = log_count + drop_count;
253 const auto message_stack = MessagePool::Instance().GetMessages(log_count);
254
255 // Start reader threads.
256 LogPopReaderThread reader_thread_core1(multisink_,
257 expected_message_and_drop_count);
258 thread::Thread reader_thread1(test::MultiSinkTestThreadOptions(),
259 reader_thread_core1);
260 LogPopReaderThread reader_thread_core2(multisink_,
261 expected_message_and_drop_count);
262 thread::Thread reader_thread2(test::MultiSinkTestThreadOptions(),
263 reader_thread_core2);
264 LogPeekAndCommitReaderThread reader_thread_core3(
265 multisink_, expected_message_and_drop_count);
266 thread::Thread reader_thread3(test::MultiSinkTestThreadOptions(),
267 reader_thread_core3);
268 // Start writer thread.
269 LogWriterThread writer_thread_core(multisink_, message_stack);
270 thread::Thread writer_thread(test::MultiSinkTestThreadOptions(),
271 writer_thread_core);
272
273 // Wait for writer thread to end.
274 writer_thread.join();
275 multisink_.HandleDropped(drop_count);
276 reader_thread1.join();
277 reader_thread2.join();
278 reader_thread3.join();
279
280 EXPECT_EQ(reader_thread_core1.drop_count(), drop_count);
281 CompareSentAndReceivedMessages(message_stack,
282 reader_thread_core1.received_messages());
283 EXPECT_EQ(reader_thread_core2.drop_count(), drop_count);
284 CompareSentAndReceivedMessages(message_stack,
285 reader_thread_core2.received_messages());
286 EXPECT_EQ(reader_thread_core3.drop_count(), drop_count);
287 CompareSentAndReceivedMessages(message_stack,
288 reader_thread_core3.received_messages());
289 }
290
TEST_F(MultiSinkTest,MultipleWritersMultipleReaders)291 TEST_F(MultiSinkTest, MultipleWritersMultipleReaders) {
292 const uint32_t log_count = 100;
293 const uint32_t drop_count = 7;
294 const uint32_t expected_message_and_drop_count = 2 * log_count + drop_count;
295 const auto message_stack = MessagePool::Instance().GetMessages(log_count);
296
297 // Start reader threads.
298 LogPopReaderThread reader_thread_core1(multisink_,
299 expected_message_and_drop_count);
300 thread::Thread reader_thread1(test::MultiSinkTestThreadOptions(),
301 reader_thread_core1);
302 LogPopReaderThread reader_thread_core2(multisink_,
303 expected_message_and_drop_count);
304 thread::Thread reader_thread2(test::MultiSinkTestThreadOptions(),
305 reader_thread_core2);
306 LogPeekAndCommitReaderThread reader_thread_core3(
307 multisink_, expected_message_and_drop_count);
308 thread::Thread reader_thread3(test::MultiSinkTestThreadOptions(),
309 reader_thread_core3);
310 // Start writer threads.
311 LogWriterThread writer_thread_core1(multisink_, message_stack);
312 thread::Thread writer_thread1(test::MultiSinkTestThreadOptions(),
313 writer_thread_core1);
314 LogWriterThread writer_thread_core2(multisink_, message_stack);
315 thread::Thread writer_thread2(test::MultiSinkTestThreadOptions(),
316 writer_thread_core2);
317
318 // Wait for writer thread to end.
319 writer_thread1.join();
320 writer_thread2.join();
321 multisink_.HandleDropped(drop_count);
322 reader_thread1.join();
323 reader_thread2.join();
324 reader_thread3.join();
325
326 EXPECT_EQ(reader_thread_core1.drop_count(), drop_count);
327 EXPECT_EQ(reader_thread_core2.drop_count(), drop_count);
328 EXPECT_EQ(reader_thread_core3.drop_count(), drop_count);
329 // Since we don't know the order that messages came in, we can't check them.
330 EXPECT_EQ(reader_thread_core1.received_messages().size(),
331 expected_message_and_drop_count - drop_count);
332 EXPECT_EQ(reader_thread_core2.received_messages().size(),
333 expected_message_and_drop_count - drop_count);
334 EXPECT_EQ(reader_thread_core3.received_messages().size(),
335 expected_message_and_drop_count - drop_count);
336 }
337
TEST_F(MultiSinkTest,OverflowMultisink)338 TEST_F(MultiSinkTest, OverflowMultisink) {
339 // Expect the multisink to overflow and readers to not fail when poping, or
340 // peeking and commiting entries.
341 const size_t log_count = kMaxMessageCount;
342 const size_t max_buffer_entry_count = 20;
343 std::byte small_multisink_buffer[max_buffer_entry_count * kEntryBufferSize];
344 MultiSink small_multisink(small_multisink_buffer);
345
346 const auto message_stack = MessagePool::Instance().GetMessages(log_count);
347
348 // Start reader threads.
349 LogPeekAndCommitReaderThread reader_thread_core1(small_multisink, log_count);
350 thread::Thread reader_thread1(test::MultiSinkTestThreadOptions(),
351 reader_thread_core1);
352 LogPopReaderThread reader_thread_core2(small_multisink, log_count);
353 thread::Thread reader_thread2(test::MultiSinkTestThreadOptions(),
354 reader_thread_core2);
355
356 // Start writer threads.
357 LogWriterThread writer_thread_core1(small_multisink, message_stack);
358 thread::Thread writer_thread1(test::MultiSinkTestThreadOptions(),
359 writer_thread_core1);
360 LogWriterThread writer_thread_core2(small_multisink, message_stack);
361 thread::Thread writer_thread2(test::MultiSinkTestThreadOptions(),
362 writer_thread_core2);
363
364 // Wait for writer thread to end.
365 writer_thread1.join();
366 writer_thread2.join();
367 reader_thread1.join();
368 reader_thread2.join();
369
370 // Verifying received messages and drop message counts is unreliable as we
371 // can't control the order threads will operate.
372 }
373
374 #endif // PW_THREAD_JOINING_ENABLED
375
376 } // namespace pw::multisink
377