1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include <cstddef>
16 #include <cstdint>
17
18 #include "gtest/gtest.h"
19 #include "pw_containers/vector.h"
20 #include "pw_multisink/multisink.h"
21 #include "pw_multisink/test_thread.h"
22 #include "pw_span/span.h"
23 #include "pw_string/string_builder.h"
24 #include "pw_thread/thread.h"
25 #include "pw_thread/yield.h"
26
27 namespace pw::multisink {
28 namespace {
29 constexpr size_t kEntryBufferSize = sizeof("message 000");
30 constexpr size_t kMaxMessageCount = 250;
31 constexpr size_t kBufferSize = kMaxMessageCount * kEntryBufferSize;
32
33 using MessageSpan = span<const StringBuffer<kEntryBufferSize>>;
34
CompareSentAndReceivedMessages(const MessageSpan & sent_messages,const MessageSpan & received_messages)35 void CompareSentAndReceivedMessages(const MessageSpan& sent_messages,
36 const MessageSpan& received_messages) {
37 ASSERT_EQ(sent_messages.size(), received_messages.size());
38 for (size_t i = 0; i < sent_messages.size(); ++i) {
39 ASSERT_EQ(sent_messages[i].size(), received_messages[i].size());
40 EXPECT_EQ(std::string_view(sent_messages[i]),
41 std::string_view(received_messages[i]));
42 }
43 }
44 } // namespace
45
46 // Static message pool to avoid recreating messages for every test and avoids
47 // using std::string.
48 class MessagePool {
49 public:
Instance()50 static MessagePool& Instance() {
51 static MessagePool instance;
52 return instance;
53 }
54
55 MessagePool(const MessagePool&) = delete;
56 MessagePool& operator=(const MessagePool&) = delete;
57 MessagePool(MessagePool&&) = delete;
58 MessagePool& operator=(MessagePool&&) = delete;
59
GetMessages(size_t message_count) const60 MessageSpan GetMessages(size_t message_count) const {
61 PW_ASSERT(message_count <= messages_.size());
62 return MessageSpan(messages_.begin(), message_count);
63 }
64
65 private:
MessagePool()66 MessagePool() {
67 for (size_t i = 0; i < kMaxMessageCount; ++i) {
68 messages_.emplace_back();
69 messages_.back() << "message %u" << static_cast<unsigned int>(i);
70 }
71 }
72
73 Vector<StringBuffer<kEntryBufferSize>, kMaxMessageCount> messages_;
74 };
75
76 // Continuously reads logs from a multisink, using PopEntry() and stores copies
77 // of the retrieved messages for later verification. The thread stops when the
78 // the number of read messages and total drop count matches the expected count.
79 class LogPopReaderThread : public thread::ThreadCore {
80 public:
LogPopReaderThread(MultiSink & multisink,uint32_t expected_message_and_drop_count)81 LogPopReaderThread(MultiSink& multisink,
82 uint32_t expected_message_and_drop_count)
83 : multisink_(multisink),
84 total_drop_count_(0),
85 expected_message_and_drop_count_(expected_message_and_drop_count) {
86 PW_ASSERT(expected_message_and_drop_count_ <= kMaxMessageCount);
87 }
88
drop_count()89 uint32_t drop_count() { return total_drop_count_; }
90
received_messages()91 const MessageSpan received_messages() {
92 return MessageSpan(received_messages_.begin(), received_messages_.size());
93 }
94
Run()95 void Run() override {
96 multisink_.AttachDrain(drain_);
97 ReadAllEntries();
98 }
99
ReadAllEntries()100 virtual void ReadAllEntries() {
101 do {
102 uint32_t drop_count = 0;
103 uint32_t ingress_drop_count = 0;
104 const Result<ConstByteSpan> possible_entry =
105 drain_.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
106 total_drop_count_ += drop_count + ingress_drop_count;
107 if (possible_entry.status().IsOutOfRange()) {
108 pw::this_thread::yield();
109 continue;
110 }
111 ASSERT_EQ(possible_entry.status(), OkStatus());
112 if (received_messages_.full()) {
113 return;
114 }
115 received_messages_.emplace_back();
116 received_messages_.back() << std::string_view(
117 reinterpret_cast<const char*>(possible_entry.value().data()),
118 possible_entry.value().size());
119 pw::this_thread::yield();
120 } while (total_drop_count_ + received_messages_.size() <
121 expected_message_and_drop_count_);
122 }
123
124 protected:
125 MultiSink::Drain drain_;
126 MultiSink& multisink_;
127 std::array<std::byte, kEntryBufferSize> entry_buffer_;
128 uint32_t total_drop_count_;
129 const uint32_t expected_message_and_drop_count_;
130 Vector<StringBuffer<kEntryBufferSize>, kMaxMessageCount> received_messages_;
131 };
132
133 class LogPeekAndCommitReaderThread : public LogPopReaderThread {
134 public:
LogPeekAndCommitReaderThread(MultiSink & multisink,uint32_t expected_message_and_drop_count)135 LogPeekAndCommitReaderThread(MultiSink& multisink,
136 uint32_t expected_message_and_drop_count)
137 : LogPopReaderThread(multisink, expected_message_and_drop_count) {}
138
ReadAllEntries()139 void ReadAllEntries() override {
140 do {
141 uint32_t drop_count = 0;
142 uint32_t ingress_drop_count = 0;
143 const Result<MultiSink::Drain::PeekedEntry> possible_entry =
144 drain_.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
145 total_drop_count_ += drop_count + ingress_drop_count;
146 if (possible_entry.status().IsOutOfRange()) {
147 pw::this_thread::yield();
148 continue;
149 }
150 ASSERT_EQ(possible_entry.status(), OkStatus());
151 if (received_messages_.full()) {
152 return;
153 }
154 pw::this_thread::yield();
155 received_messages_.emplace_back();
156 received_messages_.back() << std::string_view(
157 reinterpret_cast<const char*>(possible_entry.value().entry().data()),
158 possible_entry.value().entry().size());
159 ASSERT_EQ(drain_.PopEntry(possible_entry.value()), OkStatus());
160 pw::this_thread::yield();
161 } while (total_drop_count_ + received_messages_.size() <
162 expected_message_and_drop_count_);
163 }
164 };
165
166 // Adds the provided messages to the shared multisink.
167 class LogWriterThread : public thread::ThreadCore {
168 public:
LogWriterThread(MultiSink & multisink,const MessageSpan & message_stack)169 LogWriterThread(MultiSink& multisink, const MessageSpan& message_stack)
170 : multisink_(multisink), message_stack_(message_stack) {}
171
Run()172 void Run() override {
173 for (const auto& message : message_stack_) {
174 multisink_.HandleEntry(as_bytes(span(std::string_view(message))));
175 pw::this_thread::yield();
176 }
177 }
178
179 private:
180 MultiSink& multisink_;
181 const MessageSpan& message_stack_;
182 };
183
184 class MultiSinkTest : public ::testing::Test {
185 protected:
MultiSinkTest()186 MultiSinkTest() : multisink_(buffer_) {}
187
188 std::byte buffer_[kBufferSize];
189 MultiSink multisink_;
190
191 private:
192 };
193
TEST_F(MultiSinkTest,SingleWriterSingleReader)194 TEST_F(MultiSinkTest, SingleWriterSingleReader) {
195 const uint32_t log_count = 100;
196 const uint32_t drop_count = 5;
197 const uint32_t expected_message_and_drop_count = log_count + drop_count;
198 const auto message_stack = MessagePool::Instance().GetMessages(log_count);
199
200 // Start reader thread.
201 LogPopReaderThread reader_thread_core(multisink_,
202 expected_message_and_drop_count);
203 thread::Thread reader_thread(test::MultiSinkTestThreadOptions(),
204 reader_thread_core);
205 // Start writer thread.
206 LogWriterThread writer_thread_core(multisink_, message_stack);
207 thread::Thread writer_thread(test::MultiSinkTestThreadOptions(),
208 writer_thread_core);
209
210 // Wait for writer thread to end.
211 writer_thread.join();
212 multisink_.HandleDropped(drop_count);
213 reader_thread.join();
214
215 EXPECT_EQ(reader_thread_core.drop_count(), drop_count);
216 CompareSentAndReceivedMessages(message_stack,
217 reader_thread_core.received_messages());
218 }
219
TEST_F(MultiSinkTest,SingleWriterSinglePeekAndCommitReader)220 TEST_F(MultiSinkTest, SingleWriterSinglePeekAndCommitReader) {
221 const uint32_t log_count = 100;
222 const uint32_t drop_count = 5;
223 const uint32_t expected_message_and_drop_count = log_count + drop_count;
224 const auto message_stack = MessagePool::Instance().GetMessages(log_count);
225
226 // Start reader thread.
227 LogPeekAndCommitReaderThread reader_thread_core(
228 multisink_, expected_message_and_drop_count);
229 thread::Thread reader_thread(test::MultiSinkTestThreadOptions(),
230 reader_thread_core);
231 // Start writer thread.
232 LogWriterThread writer_thread_core(multisink_, message_stack);
233 thread::Thread writer_thread(test::MultiSinkTestThreadOptions(),
234 writer_thread_core);
235
236 // Wait for writer thread to end.
237 writer_thread.join();
238 multisink_.HandleDropped(drop_count);
239 reader_thread.join();
240
241 EXPECT_EQ(reader_thread_core.drop_count(), drop_count);
242 CompareSentAndReceivedMessages(message_stack,
243 reader_thread_core.received_messages());
244 }
245
TEST_F(MultiSinkTest,SingleWriterMultipleReaders)246 TEST_F(MultiSinkTest, SingleWriterMultipleReaders) {
247 const uint32_t log_count = 100;
248 const uint32_t drop_count = 5;
249 const uint32_t expected_message_and_drop_count = log_count + drop_count;
250 const auto message_stack = MessagePool::Instance().GetMessages(log_count);
251
252 // Start reader threads.
253 LogPopReaderThread reader_thread_core1(multisink_,
254 expected_message_and_drop_count);
255 thread::Thread reader_thread1(test::MultiSinkTestThreadOptions(),
256 reader_thread_core1);
257 LogPopReaderThread reader_thread_core2(multisink_,
258 expected_message_and_drop_count);
259 thread::Thread reader_thread2(test::MultiSinkTestThreadOptions(),
260 reader_thread_core2);
261 LogPeekAndCommitReaderThread reader_thread_core3(
262 multisink_, expected_message_and_drop_count);
263 thread::Thread reader_thread3(test::MultiSinkTestThreadOptions(),
264 reader_thread_core3);
265 // Start writer thread.
266 LogWriterThread writer_thread_core(multisink_, message_stack);
267 thread::Thread writer_thread(test::MultiSinkTestThreadOptions(),
268 writer_thread_core);
269
270 // Wait for writer thread to end.
271 writer_thread.join();
272 multisink_.HandleDropped(drop_count);
273 reader_thread1.join();
274 reader_thread2.join();
275 reader_thread3.join();
276
277 EXPECT_EQ(reader_thread_core1.drop_count(), drop_count);
278 CompareSentAndReceivedMessages(message_stack,
279 reader_thread_core1.received_messages());
280 EXPECT_EQ(reader_thread_core2.drop_count(), drop_count);
281 CompareSentAndReceivedMessages(message_stack,
282 reader_thread_core2.received_messages());
283 EXPECT_EQ(reader_thread_core3.drop_count(), drop_count);
284 CompareSentAndReceivedMessages(message_stack,
285 reader_thread_core3.received_messages());
286 }
287
TEST_F(MultiSinkTest,MultipleWritersMultipleReaders)288 TEST_F(MultiSinkTest, MultipleWritersMultipleReaders) {
289 const uint32_t log_count = 100;
290 const uint32_t drop_count = 7;
291 const uint32_t expected_message_and_drop_count = 2 * log_count + drop_count;
292 const auto message_stack = MessagePool::Instance().GetMessages(log_count);
293
294 // Start reader threads.
295 LogPopReaderThread reader_thread_core1(multisink_,
296 expected_message_and_drop_count);
297 thread::Thread reader_thread1(test::MultiSinkTestThreadOptions(),
298 reader_thread_core1);
299 LogPopReaderThread reader_thread_core2(multisink_,
300 expected_message_and_drop_count);
301 thread::Thread reader_thread2(test::MultiSinkTestThreadOptions(),
302 reader_thread_core2);
303 LogPeekAndCommitReaderThread reader_thread_core3(
304 multisink_, expected_message_and_drop_count);
305 thread::Thread reader_thread3(test::MultiSinkTestThreadOptions(),
306 reader_thread_core3);
307 // Start writer threads.
308 LogWriterThread writer_thread_core1(multisink_, message_stack);
309 thread::Thread writer_thread1(test::MultiSinkTestThreadOptions(),
310 writer_thread_core1);
311 LogWriterThread writer_thread_core2(multisink_, message_stack);
312 thread::Thread writer_thread2(test::MultiSinkTestThreadOptions(),
313 writer_thread_core2);
314
315 // Wait for writer thread to end.
316 writer_thread1.join();
317 writer_thread2.join();
318 multisink_.HandleDropped(drop_count);
319 reader_thread1.join();
320 reader_thread2.join();
321 reader_thread3.join();
322
323 EXPECT_EQ(reader_thread_core1.drop_count(), drop_count);
324 EXPECT_EQ(reader_thread_core2.drop_count(), drop_count);
325 EXPECT_EQ(reader_thread_core3.drop_count(), drop_count);
326 // Since we don't know the order that messages came in, we can't check them.
327 EXPECT_EQ(reader_thread_core1.received_messages().size(),
328 expected_message_and_drop_count - drop_count);
329 EXPECT_EQ(reader_thread_core2.received_messages().size(),
330 expected_message_and_drop_count - drop_count);
331 EXPECT_EQ(reader_thread_core3.received_messages().size(),
332 expected_message_and_drop_count - drop_count);
333 }
334
TEST_F(MultiSinkTest,OverflowMultisink)335 TEST_F(MultiSinkTest, OverflowMultisink) {
336 // Expect the multisink to overflow and readers to not fail when poping, or
337 // peeking and commiting entries.
338 const size_t log_count = kMaxMessageCount;
339 const size_t max_buffer_entry_count = 20;
340 std::byte small_multisink_buffer[max_buffer_entry_count * kEntryBufferSize];
341 MultiSink small_multisink(small_multisink_buffer);
342
343 const auto message_stack = MessagePool::Instance().GetMessages(log_count);
344
345 // Start reader threads.
346 LogPeekAndCommitReaderThread reader_thread_core1(small_multisink, log_count);
347 thread::Thread reader_thread1(test::MultiSinkTestThreadOptions(),
348 reader_thread_core1);
349 LogPopReaderThread reader_thread_core2(small_multisink, log_count);
350 thread::Thread reader_thread2(test::MultiSinkTestThreadOptions(),
351 reader_thread_core2);
352
353 // Start writer threads.
354 LogWriterThread writer_thread_core1(small_multisink, message_stack);
355 thread::Thread writer_thread1(test::MultiSinkTestThreadOptions(),
356 writer_thread_core1);
357 LogWriterThread writer_thread_core2(small_multisink, message_stack);
358 thread::Thread writer_thread2(test::MultiSinkTestThreadOptions(),
359 writer_thread_core2);
360
361 // Wait for writer thread to end.
362 writer_thread1.join();
363 writer_thread2.join();
364 reader_thread1.join();
365 reader_thread2.join();
366
367 // Verifying received messages and drop message counts is unreliable as we
368 // can't control the order threads will operate.
369 }
370
371 } // namespace pw::multisink
372