• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include <cstddef>
16 #include <cstdint>
17 
18 #include "gtest/gtest.h"
19 #include "pw_containers/vector.h"
20 #include "pw_multisink/multisink.h"
21 #include "pw_multisink/test_thread.h"
22 #include "pw_span/span.h"
23 #include "pw_string/string_builder.h"
24 #include "pw_thread/thread.h"
25 #include "pw_thread/yield.h"
26 
27 namespace pw::multisink {
28 namespace {
29 constexpr size_t kEntryBufferSize = sizeof("message 000");
30 constexpr size_t kMaxMessageCount = 250;
31 constexpr size_t kBufferSize = kMaxMessageCount * kEntryBufferSize;
32 
33 using MessageSpan = span<const StringBuffer<kEntryBufferSize>>;
34 
CompareSentAndReceivedMessages(const MessageSpan & sent_messages,const MessageSpan & received_messages)35 void CompareSentAndReceivedMessages(const MessageSpan& sent_messages,
36                                     const MessageSpan& received_messages) {
37   ASSERT_EQ(sent_messages.size(), received_messages.size());
38   for (size_t i = 0; i < sent_messages.size(); ++i) {
39     ASSERT_EQ(sent_messages[i].size(), received_messages[i].size());
40     EXPECT_EQ(std::string_view(sent_messages[i]),
41               std::string_view(received_messages[i]));
42   }
43 }
44 }  // namespace
45 
46 // Static message pool to avoid recreating messages for every test and avoids
47 // using std::string.
48 class MessagePool {
49  public:
Instance()50   static MessagePool& Instance() {
51     static MessagePool instance;
52     return instance;
53   }
54 
55   MessagePool(const MessagePool&) = delete;
56   MessagePool& operator=(const MessagePool&) = delete;
57   MessagePool(MessagePool&&) = delete;
58   MessagePool& operator=(MessagePool&&) = delete;
59 
GetMessages(size_t message_count) const60   MessageSpan GetMessages(size_t message_count) const {
61     PW_ASSERT(message_count <= messages_.size());
62     return MessageSpan(messages_.begin(), message_count);
63   }
64 
65  private:
MessagePool()66   MessagePool() {
67     for (size_t i = 0; i < kMaxMessageCount; ++i) {
68       messages_.emplace_back();
69       messages_.back() << "message %u" << static_cast<unsigned int>(i);
70     }
71   }
72 
73   Vector<StringBuffer<kEntryBufferSize>, kMaxMessageCount> messages_;
74 };
75 
76 // Continuously reads logs from a multisink, using PopEntry() and stores copies
77 // of the retrieved messages for later verification. The thread stops when the
78 // the number of read messages and total drop count matches the expected count.
79 class LogPopReaderThread : public thread::ThreadCore {
80  public:
LogPopReaderThread(MultiSink & multisink,uint32_t expected_message_and_drop_count)81   LogPopReaderThread(MultiSink& multisink,
82                      uint32_t expected_message_and_drop_count)
83       : multisink_(multisink),
84         total_drop_count_(0),
85         expected_message_and_drop_count_(expected_message_and_drop_count) {
86     PW_ASSERT(expected_message_and_drop_count_ <= kMaxMessageCount);
87   }
88 
drop_count()89   uint32_t drop_count() { return total_drop_count_; }
90 
received_messages()91   const MessageSpan received_messages() {
92     return MessageSpan(received_messages_.begin(), received_messages_.size());
93   }
94 
Run()95   void Run() override {
96     multisink_.AttachDrain(drain_);
97     ReadAllEntries();
98   }
99 
ReadAllEntries()100   virtual void ReadAllEntries() {
101     do {
102       uint32_t drop_count = 0;
103       uint32_t ingress_drop_count = 0;
104       const Result<ConstByteSpan> possible_entry =
105           drain_.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
106       total_drop_count_ += drop_count + ingress_drop_count;
107       if (possible_entry.status().IsOutOfRange()) {
108         pw::this_thread::yield();
109         continue;
110       }
111       ASSERT_EQ(possible_entry.status(), OkStatus());
112       if (received_messages_.full()) {
113         return;
114       }
115       received_messages_.emplace_back();
116       received_messages_.back() << std::string_view(
117           reinterpret_cast<const char*>(possible_entry.value().data()),
118           possible_entry.value().size());
119       pw::this_thread::yield();
120     } while (total_drop_count_ + received_messages_.size() <
121              expected_message_and_drop_count_);
122   }
123 
124  protected:
125   MultiSink::Drain drain_;
126   MultiSink& multisink_;
127   std::array<std::byte, kEntryBufferSize> entry_buffer_;
128   uint32_t total_drop_count_;
129   const uint32_t expected_message_and_drop_count_;
130   Vector<StringBuffer<kEntryBufferSize>, kMaxMessageCount> received_messages_;
131 };
132 
133 class LogPeekAndCommitReaderThread : public LogPopReaderThread {
134  public:
LogPeekAndCommitReaderThread(MultiSink & multisink,uint32_t expected_message_and_drop_count)135   LogPeekAndCommitReaderThread(MultiSink& multisink,
136                                uint32_t expected_message_and_drop_count)
137       : LogPopReaderThread(multisink, expected_message_and_drop_count) {}
138 
ReadAllEntries()139   void ReadAllEntries() override {
140     do {
141       uint32_t drop_count = 0;
142       uint32_t ingress_drop_count = 0;
143       const Result<MultiSink::Drain::PeekedEntry> possible_entry =
144           drain_.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
145       total_drop_count_ += drop_count + ingress_drop_count;
146       if (possible_entry.status().IsOutOfRange()) {
147         pw::this_thread::yield();
148         continue;
149       }
150       ASSERT_EQ(possible_entry.status(), OkStatus());
151       if (received_messages_.full()) {
152         return;
153       }
154       pw::this_thread::yield();
155       received_messages_.emplace_back();
156       received_messages_.back() << std::string_view(
157           reinterpret_cast<const char*>(possible_entry.value().entry().data()),
158           possible_entry.value().entry().size());
159       ASSERT_EQ(drain_.PopEntry(possible_entry.value()), OkStatus());
160       pw::this_thread::yield();
161     } while (total_drop_count_ + received_messages_.size() <
162              expected_message_and_drop_count_);
163   }
164 };
165 
166 // Adds the provided messages to the shared multisink.
167 class LogWriterThread : public thread::ThreadCore {
168  public:
LogWriterThread(MultiSink & multisink,const MessageSpan & message_stack)169   LogWriterThread(MultiSink& multisink, const MessageSpan& message_stack)
170       : multisink_(multisink), message_stack_(message_stack) {}
171 
Run()172   void Run() override {
173     for (const auto& message : message_stack_) {
174       multisink_.HandleEntry(as_bytes(span(std::string_view(message))));
175       pw::this_thread::yield();
176     }
177   }
178 
179  private:
180   MultiSink& multisink_;
181   const MessageSpan& message_stack_;
182 };
183 
184 class MultiSinkTest : public ::testing::Test {
185  protected:
MultiSinkTest()186   MultiSinkTest() : multisink_(buffer_) {}
187 
188   std::byte buffer_[kBufferSize];
189   MultiSink multisink_;
190 
191  private:
192 };
193 
TEST_F(MultiSinkTest,SingleWriterSingleReader)194 TEST_F(MultiSinkTest, SingleWriterSingleReader) {
195   const uint32_t log_count = 100;
196   const uint32_t drop_count = 5;
197   const uint32_t expected_message_and_drop_count = log_count + drop_count;
198   const auto message_stack = MessagePool::Instance().GetMessages(log_count);
199 
200   // Start reader thread.
201   LogPopReaderThread reader_thread_core(multisink_,
202                                         expected_message_and_drop_count);
203   thread::Thread reader_thread(test::MultiSinkTestThreadOptions(),
204                                reader_thread_core);
205   // Start writer thread.
206   LogWriterThread writer_thread_core(multisink_, message_stack);
207   thread::Thread writer_thread(test::MultiSinkTestThreadOptions(),
208                                writer_thread_core);
209 
210   // Wait for writer thread to end.
211   writer_thread.join();
212   multisink_.HandleDropped(drop_count);
213   reader_thread.join();
214 
215   EXPECT_EQ(reader_thread_core.drop_count(), drop_count);
216   CompareSentAndReceivedMessages(message_stack,
217                                  reader_thread_core.received_messages());
218 }
219 
TEST_F(MultiSinkTest,SingleWriterSinglePeekAndCommitReader)220 TEST_F(MultiSinkTest, SingleWriterSinglePeekAndCommitReader) {
221   const uint32_t log_count = 100;
222   const uint32_t drop_count = 5;
223   const uint32_t expected_message_and_drop_count = log_count + drop_count;
224   const auto message_stack = MessagePool::Instance().GetMessages(log_count);
225 
226   // Start reader thread.
227   LogPeekAndCommitReaderThread reader_thread_core(
228       multisink_, expected_message_and_drop_count);
229   thread::Thread reader_thread(test::MultiSinkTestThreadOptions(),
230                                reader_thread_core);
231   // Start writer thread.
232   LogWriterThread writer_thread_core(multisink_, message_stack);
233   thread::Thread writer_thread(test::MultiSinkTestThreadOptions(),
234                                writer_thread_core);
235 
236   // Wait for writer thread to end.
237   writer_thread.join();
238   multisink_.HandleDropped(drop_count);
239   reader_thread.join();
240 
241   EXPECT_EQ(reader_thread_core.drop_count(), drop_count);
242   CompareSentAndReceivedMessages(message_stack,
243                                  reader_thread_core.received_messages());
244 }
245 
TEST_F(MultiSinkTest,SingleWriterMultipleReaders)246 TEST_F(MultiSinkTest, SingleWriterMultipleReaders) {
247   const uint32_t log_count = 100;
248   const uint32_t drop_count = 5;
249   const uint32_t expected_message_and_drop_count = log_count + drop_count;
250   const auto message_stack = MessagePool::Instance().GetMessages(log_count);
251 
252   // Start reader threads.
253   LogPopReaderThread reader_thread_core1(multisink_,
254                                          expected_message_and_drop_count);
255   thread::Thread reader_thread1(test::MultiSinkTestThreadOptions(),
256                                 reader_thread_core1);
257   LogPopReaderThread reader_thread_core2(multisink_,
258                                          expected_message_and_drop_count);
259   thread::Thread reader_thread2(test::MultiSinkTestThreadOptions(),
260                                 reader_thread_core2);
261   LogPeekAndCommitReaderThread reader_thread_core3(
262       multisink_, expected_message_and_drop_count);
263   thread::Thread reader_thread3(test::MultiSinkTestThreadOptions(),
264                                 reader_thread_core3);
265   // Start writer thread.
266   LogWriterThread writer_thread_core(multisink_, message_stack);
267   thread::Thread writer_thread(test::MultiSinkTestThreadOptions(),
268                                writer_thread_core);
269 
270   // Wait for writer thread to end.
271   writer_thread.join();
272   multisink_.HandleDropped(drop_count);
273   reader_thread1.join();
274   reader_thread2.join();
275   reader_thread3.join();
276 
277   EXPECT_EQ(reader_thread_core1.drop_count(), drop_count);
278   CompareSentAndReceivedMessages(message_stack,
279                                  reader_thread_core1.received_messages());
280   EXPECT_EQ(reader_thread_core2.drop_count(), drop_count);
281   CompareSentAndReceivedMessages(message_stack,
282                                  reader_thread_core2.received_messages());
283   EXPECT_EQ(reader_thread_core3.drop_count(), drop_count);
284   CompareSentAndReceivedMessages(message_stack,
285                                  reader_thread_core3.received_messages());
286 }
287 
TEST_F(MultiSinkTest,MultipleWritersMultipleReaders)288 TEST_F(MultiSinkTest, MultipleWritersMultipleReaders) {
289   const uint32_t log_count = 100;
290   const uint32_t drop_count = 7;
291   const uint32_t expected_message_and_drop_count = 2 * log_count + drop_count;
292   const auto message_stack = MessagePool::Instance().GetMessages(log_count);
293 
294   // Start reader threads.
295   LogPopReaderThread reader_thread_core1(multisink_,
296                                          expected_message_and_drop_count);
297   thread::Thread reader_thread1(test::MultiSinkTestThreadOptions(),
298                                 reader_thread_core1);
299   LogPopReaderThread reader_thread_core2(multisink_,
300                                          expected_message_and_drop_count);
301   thread::Thread reader_thread2(test::MultiSinkTestThreadOptions(),
302                                 reader_thread_core2);
303   LogPeekAndCommitReaderThread reader_thread_core3(
304       multisink_, expected_message_and_drop_count);
305   thread::Thread reader_thread3(test::MultiSinkTestThreadOptions(),
306                                 reader_thread_core3);
307   // Start writer threads.
308   LogWriterThread writer_thread_core1(multisink_, message_stack);
309   thread::Thread writer_thread1(test::MultiSinkTestThreadOptions(),
310                                 writer_thread_core1);
311   LogWriterThread writer_thread_core2(multisink_, message_stack);
312   thread::Thread writer_thread2(test::MultiSinkTestThreadOptions(),
313                                 writer_thread_core2);
314 
315   // Wait for writer thread to end.
316   writer_thread1.join();
317   writer_thread2.join();
318   multisink_.HandleDropped(drop_count);
319   reader_thread1.join();
320   reader_thread2.join();
321   reader_thread3.join();
322 
323   EXPECT_EQ(reader_thread_core1.drop_count(), drop_count);
324   EXPECT_EQ(reader_thread_core2.drop_count(), drop_count);
325   EXPECT_EQ(reader_thread_core3.drop_count(), drop_count);
326   // Since we don't know the order that messages came in, we can't check them.
327   EXPECT_EQ(reader_thread_core1.received_messages().size(),
328             expected_message_and_drop_count - drop_count);
329   EXPECT_EQ(reader_thread_core2.received_messages().size(),
330             expected_message_and_drop_count - drop_count);
331   EXPECT_EQ(reader_thread_core3.received_messages().size(),
332             expected_message_and_drop_count - drop_count);
333 }
334 
TEST_F(MultiSinkTest,OverflowMultisink)335 TEST_F(MultiSinkTest, OverflowMultisink) {
336   // Expect the multisink to overflow and readers to not fail when poping, or
337   // peeking and commiting entries.
338   const size_t log_count = kMaxMessageCount;
339   const size_t max_buffer_entry_count = 20;
340   std::byte small_multisink_buffer[max_buffer_entry_count * kEntryBufferSize];
341   MultiSink small_multisink(small_multisink_buffer);
342 
343   const auto message_stack = MessagePool::Instance().GetMessages(log_count);
344 
345   // Start reader threads.
346   LogPeekAndCommitReaderThread reader_thread_core1(small_multisink, log_count);
347   thread::Thread reader_thread1(test::MultiSinkTestThreadOptions(),
348                                 reader_thread_core1);
349   LogPopReaderThread reader_thread_core2(small_multisink, log_count);
350   thread::Thread reader_thread2(test::MultiSinkTestThreadOptions(),
351                                 reader_thread_core2);
352 
353   // Start writer threads.
354   LogWriterThread writer_thread_core1(small_multisink, message_stack);
355   thread::Thread writer_thread1(test::MultiSinkTestThreadOptions(),
356                                 writer_thread_core1);
357   LogWriterThread writer_thread_core2(small_multisink, message_stack);
358   thread::Thread writer_thread2(test::MultiSinkTestThreadOptions(),
359                                 writer_thread_core2);
360 
361   // Wait for writer thread to end.
362   writer_thread1.join();
363   writer_thread2.join();
364   reader_thread1.join();
365   reader_thread2.join();
366 
367   // Verifying received messages and drop message counts is unreliable as we
368   // can't control the order threads will operate.
369 }
370 
371 }  // namespace pw::multisink
372