1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include <cstddef>
16 #include <cstdint>
17 #include <span>
18
19 #include "gtest/gtest.h"
20 #include "pw_containers/vector.h"
21 #include "pw_multisink/multisink.h"
22 #include "pw_multisink/test_thread.h"
23 #include "pw_string/string_builder.h"
24 #include "pw_thread/thread.h"
25 #include "pw_thread/yield.h"
26
27 namespace pw::multisink {
28 namespace {
29 constexpr size_t kEntryBufferSize = sizeof("message 000");
30 constexpr size_t kMaxMessageCount = 250;
31 constexpr size_t kBufferSize = kMaxMessageCount * kEntryBufferSize;
32
33 using MessageSpan = std::span<const StringBuffer<kEntryBufferSize>>;
34
CompareSentAndReceivedMessages(const MessageSpan & sent_messages,const MessageSpan & received_messages)35 void CompareSentAndReceivedMessages(const MessageSpan& sent_messages,
36 const MessageSpan& received_messages) {
37 ASSERT_EQ(sent_messages.size(), received_messages.size());
38 for (size_t i = 0; i < sent_messages.size(); ++i) {
39 ASSERT_EQ(sent_messages[i].size(), received_messages[i].size());
40 EXPECT_EQ(std::string_view(sent_messages[i]),
41 std::string_view(received_messages[i]));
42 }
43 }
44 } // namespace
45
46 // Static message pool to avoid recreating messages for every test and avoids
47 // using std::string.
48 class MessagePool {
49 public:
Instance()50 static MessagePool& Instance() {
51 static MessagePool instance;
52 return instance;
53 }
54
55 MessagePool(const MessagePool&) = delete;
56 MessagePool& operator=(const MessagePool&) = delete;
57 MessagePool(MessagePool&&) = delete;
58 MessagePool& operator=(MessagePool&&) = delete;
59
GetMessages(size_t message_count) const60 MessageSpan GetMessages(size_t message_count) const {
61 PW_ASSERT(message_count <= messages_.size());
62 return MessageSpan(messages_.begin(), message_count);
63 }
64
65 private:
MessagePool()66 MessagePool() {
67 for (size_t i = 0; i < kMaxMessageCount; ++i) {
68 messages_.emplace_back();
69 messages_.back() << "message %u" << static_cast<unsigned int>(i);
70 }
71 }
72
73 Vector<StringBuffer<kEntryBufferSize>, kMaxMessageCount> messages_;
74 };
75
76 // Continuously reads logs from a multisink, using PopEntry() and stores copies
77 // of the retrieved messages for later verification. The thread stops when the
78 // the number of read messages and total drop count matches the expected count.
79 class LogPopReaderThread : public thread::ThreadCore {
80 public:
LogPopReaderThread(MultiSink & multisink,uint32_t expected_message_and_drop_count)81 LogPopReaderThread(MultiSink& multisink,
82 uint32_t expected_message_and_drop_count)
83 : multisink_(multisink),
84 total_drop_count_(0),
85 expected_message_and_drop_count_(expected_message_and_drop_count) {
86 PW_ASSERT(expected_message_and_drop_count_ <= kMaxMessageCount);
87 }
88
drop_count()89 uint32_t drop_count() { return total_drop_count_; }
90
received_messages()91 const MessageSpan received_messages() {
92 return MessageSpan(received_messages_.begin(), received_messages_.size());
93 }
94
Run()95 void Run() override {
96 multisink_.AttachDrain(drain_);
97 ReadAllEntries();
98 };
99
ReadAllEntries()100 virtual void ReadAllEntries() {
101 do {
102 uint32_t drop_count = 0;
103 uint32_t ingress_drop_count = 0;
104 const Result<ConstByteSpan> possible_entry =
105 drain_.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
106 total_drop_count_ += drop_count + ingress_drop_count;
107 if (possible_entry.status().IsOutOfRange()) {
108 pw::this_thread::yield();
109 continue;
110 }
111 ASSERT_EQ(possible_entry.status(), OkStatus());
112 if (received_messages_.full()) {
113 return;
114 }
115 received_messages_.emplace_back();
116 received_messages_.back() << std::string_view(
117 reinterpret_cast<const char*>(possible_entry.value().data()),
118 possible_entry.value().size());
119 pw::this_thread::yield();
120 } while (total_drop_count_ + received_messages_.size() <
121 expected_message_and_drop_count_);
122 }
123
124 protected:
125 MultiSink::Drain drain_;
126 MultiSink& multisink_;
127 std::array<std::byte, kEntryBufferSize> entry_buffer_;
128 uint32_t total_drop_count_;
129 const uint32_t expected_message_and_drop_count_;
130 Vector<StringBuffer<kEntryBufferSize>, kMaxMessageCount> received_messages_;
131 };
132
133 class LogPeekAndCommitReaderThread : public LogPopReaderThread {
134 public:
LogPeekAndCommitReaderThread(MultiSink & multisink,uint32_t expected_message_and_drop_count)135 LogPeekAndCommitReaderThread(MultiSink& multisink,
136 uint32_t expected_message_and_drop_count)
137 : LogPopReaderThread(multisink, expected_message_and_drop_count) {}
138
ReadAllEntries()139 virtual void ReadAllEntries() {
140 do {
141 uint32_t drop_count = 0;
142 uint32_t ingress_drop_count = 0;
143 const Result<MultiSink::Drain::PeekedEntry> possible_entry =
144 drain_.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
145 total_drop_count_ += drop_count + ingress_drop_count;
146 if (possible_entry.status().IsOutOfRange()) {
147 pw::this_thread::yield();
148 continue;
149 }
150 ASSERT_EQ(possible_entry.status(), OkStatus());
151 if (received_messages_.full()) {
152 return;
153 }
154 pw::this_thread::yield();
155 received_messages_.emplace_back();
156 received_messages_.back() << std::string_view(
157 reinterpret_cast<const char*>(possible_entry.value().entry().data()),
158 possible_entry.value().entry().size());
159 ASSERT_EQ(drain_.PopEntry(possible_entry.value()), OkStatus());
160 pw::this_thread::yield();
161 } while (total_drop_count_ + received_messages_.size() <
162 expected_message_and_drop_count_);
163 }
164 };
165
166 // Adds the provided messages to the shared multisink.
167 class LogWriterThread : public thread::ThreadCore {
168 public:
LogWriterThread(MultiSink & multisink,const MessageSpan & message_stack)169 LogWriterThread(MultiSink& multisink, const MessageSpan& message_stack)
170 : multisink_(multisink), message_stack_(message_stack) {}
171
Run()172 void Run() override {
173 for (const auto& message : message_stack_) {
174 multisink_.HandleEntry(
175 std::as_bytes(std::span(std::string_view(message))));
176 pw::this_thread::yield();
177 }
178 };
179
180 private:
181 MultiSink& multisink_;
182 const MessageSpan& message_stack_;
183 };
184
185 class MultiSinkTest : public ::testing::Test {
186 protected:
MultiSinkTest()187 MultiSinkTest() : multisink_(buffer_) {}
188
189 std::byte buffer_[kBufferSize];
190 MultiSink multisink_;
191
192 private:
193 };
194
// One writer thread and one PopEntry()-based reader thread share the
// fixture's multisink. The reader must report exactly the injected drops and
// receive every message in the order it was sent.
TEST_F(MultiSinkTest, SingleWriterSingleReader) {
  constexpr uint32_t kLogCount = 100;
  constexpr uint32_t kDropCount = 5;
  constexpr uint32_t kExpectedMessageAndDropCount = kLogCount + kDropCount;
  const MessageSpan sent_messages =
      MessagePool::Instance().GetMessages(kLogCount);

  // The reader thread is started first, then the writer thread.
  LogPopReaderThread reader_core(multisink_, kExpectedMessageAndDropCount);
  thread::Thread reader(test::MultiSinkTestThreadOptions(), reader_core);
  LogWriterThread writer_core(multisink_, sent_messages);
  thread::Thread writer(test::MultiSinkTestThreadOptions(), writer_core);

  // The drops are reported only after the writer has finished, and the reader
  // is joined last.
  writer.join();
  multisink_.HandleDropped(kDropCount);
  reader.join();

  EXPECT_EQ(reader_core.drop_count(), kDropCount);
  CompareSentAndReceivedMessages(sent_messages,
                                 reader_core.received_messages());
}
220
// One writer thread and one peek-then-pop reader thread share the fixture's
// multisink. The reader must report exactly the injected drops and receive
// every message in the order it was sent.
TEST_F(MultiSinkTest, SingleWriterSinglePeekAndCommitReader) {
  constexpr uint32_t kLogCount = 100;
  constexpr uint32_t kDropCount = 5;
  constexpr uint32_t kExpectedMessageAndDropCount = kLogCount + kDropCount;
  const MessageSpan sent_messages =
      MessagePool::Instance().GetMessages(kLogCount);

  // The reader thread is started first, then the writer thread.
  LogPeekAndCommitReaderThread reader_core(multisink_,
                                           kExpectedMessageAndDropCount);
  thread::Thread reader(test::MultiSinkTestThreadOptions(), reader_core);
  LogWriterThread writer_core(multisink_, sent_messages);
  thread::Thread writer(test::MultiSinkTestThreadOptions(), writer_core);

  // The drops are reported only after the writer has finished, and the reader
  // is joined last.
  writer.join();
  multisink_.HandleDropped(kDropCount);
  reader.join();

  EXPECT_EQ(reader_core.drop_count(), kDropCount);
  CompareSentAndReceivedMessages(sent_messages,
                                 reader_core.received_messages());
}
246
// One writer thread feeds three reader threads (two PopEntry()-based, one
// peek-then-pop). Every reader must independently report the injected drops
// and receive every message in the order it was sent.
TEST_F(MultiSinkTest, SingleWriterMultipleReaders) {
  constexpr uint32_t kLogCount = 100;
  constexpr uint32_t kDropCount = 5;
  constexpr uint32_t kExpectedMessageAndDropCount = kLogCount + kDropCount;
  const MessageSpan sent_messages =
      MessagePool::Instance().GetMessages(kLogCount);

  // All reader threads are started before the writer thread.
  LogPopReaderThread pop_reader_core_a(multisink_,
                                       kExpectedMessageAndDropCount);
  thread::Thread pop_reader_a(test::MultiSinkTestThreadOptions(),
                              pop_reader_core_a);
  LogPopReaderThread pop_reader_core_b(multisink_,
                                       kExpectedMessageAndDropCount);
  thread::Thread pop_reader_b(test::MultiSinkTestThreadOptions(),
                              pop_reader_core_b);
  LogPeekAndCommitReaderThread peek_reader_core(multisink_,
                                                kExpectedMessageAndDropCount);
  thread::Thread peek_reader(test::MultiSinkTestThreadOptions(),
                             peek_reader_core);
  LogWriterThread writer_core(multisink_, sent_messages);
  thread::Thread writer(test::MultiSinkTestThreadOptions(), writer_core);

  // The drops are reported only after the writer has finished; the readers
  // are joined afterwards.
  writer.join();
  multisink_.HandleDropped(kDropCount);
  pop_reader_a.join();
  pop_reader_b.join();
  peek_reader.join();

  EXPECT_EQ(pop_reader_core_a.drop_count(), kDropCount);
  CompareSentAndReceivedMessages(sent_messages,
                                 pop_reader_core_a.received_messages());
  EXPECT_EQ(pop_reader_core_b.drop_count(), kDropCount);
  CompareSentAndReceivedMessages(sent_messages,
                                 pop_reader_core_b.received_messages());
  EXPECT_EQ(peek_reader_core.drop_count(), kDropCount);
  CompareSentAndReceivedMessages(sent_messages,
                                 peek_reader_core.received_messages());
}
288
// Two writer threads, each sending the same set of messages, feed three
// reader threads (two PopEntry()-based, one peek-then-pop). Every reader must
// report the injected drops and receive the combined number of messages.
TEST_F(MultiSinkTest, MultipleWritersMultipleReaders) {
  constexpr uint32_t kLogCount = 100;
  constexpr uint32_t kDropCount = 7;
  constexpr uint32_t kExpectedMessageAndDropCount = 2 * kLogCount + kDropCount;
  const MessageSpan sent_messages =
      MessagePool::Instance().GetMessages(kLogCount);

  // All reader threads are started before the writer threads.
  LogPopReaderThread pop_reader_core_a(multisink_,
                                       kExpectedMessageAndDropCount);
  thread::Thread pop_reader_a(test::MultiSinkTestThreadOptions(),
                              pop_reader_core_a);
  LogPopReaderThread pop_reader_core_b(multisink_,
                                       kExpectedMessageAndDropCount);
  thread::Thread pop_reader_b(test::MultiSinkTestThreadOptions(),
                              pop_reader_core_b);
  LogPeekAndCommitReaderThread peek_reader_core(multisink_,
                                                kExpectedMessageAndDropCount);
  thread::Thread peek_reader(test::MultiSinkTestThreadOptions(),
                             peek_reader_core);
  LogWriterThread writer_core_a(multisink_, sent_messages);
  thread::Thread writer_a(test::MultiSinkTestThreadOptions(), writer_core_a);
  LogWriterThread writer_core_b(multisink_, sent_messages);
  thread::Thread writer_b(test::MultiSinkTestThreadOptions(), writer_core_b);

  // The drops are reported only after both writers have finished; the readers
  // are joined afterwards.
  writer_a.join();
  writer_b.join();
  multisink_.HandleDropped(kDropCount);
  pop_reader_a.join();
  pop_reader_b.join();
  peek_reader.join();

  EXPECT_EQ(pop_reader_core_a.drop_count(), kDropCount);
  EXPECT_EQ(pop_reader_core_b.drop_count(), kDropCount);
  EXPECT_EQ(peek_reader_core.drop_count(), kDropCount);
  // The order in which the two writers' messages interleave is unknown, so
  // only the number of received messages is checked, not their contents.
  EXPECT_EQ(pop_reader_core_a.received_messages().size(),
            kExpectedMessageAndDropCount - kDropCount);
  EXPECT_EQ(pop_reader_core_b.received_messages().size(),
            kExpectedMessageAndDropCount - kDropCount);
  EXPECT_EQ(peek_reader_core.received_messages().size(),
            kExpectedMessageAndDropCount - kDropCount);
}
335
// Two writers push far more entries than a deliberately small multisink can
// hold while one peek-then-pop reader and one PopEntry()-based reader drain
// it.
TEST_F(MultiSinkTest, OverflowMultisink) {
  // Expect the multisink to overflow and readers to not fail when popping, or
  // peeking and committing, entries.
  constexpr size_t kLogCount = kMaxMessageCount;
  constexpr size_t kMaxBufferEntryCount = 20;
  std::byte small_multisink_buffer[kMaxBufferEntryCount * kEntryBufferSize];
  MultiSink small_multisink(small_multisink_buffer);

  const MessageSpan sent_messages =
      MessagePool::Instance().GetMessages(kLogCount);

  // Both reader threads are started before the writer threads.
  LogPeekAndCommitReaderThread peek_reader_core(small_multisink, kLogCount);
  thread::Thread peek_reader(test::MultiSinkTestThreadOptions(),
                             peek_reader_core);
  LogPopReaderThread pop_reader_core(small_multisink, kLogCount);
  thread::Thread pop_reader(test::MultiSinkTestThreadOptions(),
                            pop_reader_core);

  LogWriterThread writer_core_a(small_multisink, sent_messages);
  thread::Thread writer_a(test::MultiSinkTestThreadOptions(), writer_core_a);
  LogWriterThread writer_core_b(small_multisink, sent_messages);
  thread::Thread writer_b(test::MultiSinkTestThreadOptions(), writer_core_b);

  // Wait for the writer threads to end, then for the reader threads.
  writer_a.join();
  writer_b.join();
  peek_reader.join();
  pop_reader.join();

  // Verifying received messages and drop counts is unreliable here because
  // the order in which the threads run cannot be controlled.
}
371
372 } // namespace pw::multisink
373