• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include <cstddef>
16 #include <cstdint>
17 #include <span>
18 
19 #include "gtest/gtest.h"
20 #include "pw_containers/vector.h"
21 #include "pw_multisink/multisink.h"
22 #include "pw_multisink/test_thread.h"
23 #include "pw_string/string_builder.h"
24 #include "pw_thread/thread.h"
25 #include "pw_thread/yield.h"
26 
27 namespace pw::multisink {
28 namespace {
29 constexpr size_t kEntryBufferSize = sizeof("message 000");
30 constexpr size_t kMaxMessageCount = 250;
31 constexpr size_t kBufferSize = kMaxMessageCount * kEntryBufferSize;
32 
33 using MessageSpan = std::span<const StringBuffer<kEntryBufferSize>>;
34 
CompareSentAndReceivedMessages(const MessageSpan & sent_messages,const MessageSpan & received_messages)35 void CompareSentAndReceivedMessages(const MessageSpan& sent_messages,
36                                     const MessageSpan& received_messages) {
37   ASSERT_EQ(sent_messages.size(), received_messages.size());
38   for (size_t i = 0; i < sent_messages.size(); ++i) {
39     ASSERT_EQ(sent_messages[i].size(), received_messages[i].size());
40     EXPECT_EQ(std::string_view(sent_messages[i]),
41               std::string_view(received_messages[i]));
42   }
43 }
44 }  // namespace
45 
46 // Static message pool to avoid recreating messages for every test and avoids
47 // using std::string.
48 class MessagePool {
49  public:
Instance()50   static MessagePool& Instance() {
51     static MessagePool instance;
52     return instance;
53   }
54 
55   MessagePool(const MessagePool&) = delete;
56   MessagePool& operator=(const MessagePool&) = delete;
57   MessagePool(MessagePool&&) = delete;
58   MessagePool& operator=(MessagePool&&) = delete;
59 
GetMessages(size_t message_count) const60   MessageSpan GetMessages(size_t message_count) const {
61     PW_ASSERT(message_count <= messages_.size());
62     return MessageSpan(messages_.begin(), message_count);
63   }
64 
65  private:
MessagePool()66   MessagePool() {
67     for (size_t i = 0; i < kMaxMessageCount; ++i) {
68       messages_.emplace_back();
69       messages_.back() << "message %u" << static_cast<unsigned int>(i);
70     }
71   }
72 
73   Vector<StringBuffer<kEntryBufferSize>, kMaxMessageCount> messages_;
74 };
75 
76 // Continuously reads logs from a multisink, using PopEntry() and stores copies
77 // of the retrieved messages for later verification. The thread stops when the
78 // the number of read messages and total drop count matches the expected count.
79 class LogPopReaderThread : public thread::ThreadCore {
80  public:
LogPopReaderThread(MultiSink & multisink,uint32_t expected_message_and_drop_count)81   LogPopReaderThread(MultiSink& multisink,
82                      uint32_t expected_message_and_drop_count)
83       : multisink_(multisink),
84         total_drop_count_(0),
85         expected_message_and_drop_count_(expected_message_and_drop_count) {
86     PW_ASSERT(expected_message_and_drop_count_ <= kMaxMessageCount);
87   }
88 
drop_count()89   uint32_t drop_count() { return total_drop_count_; }
90 
received_messages()91   const MessageSpan received_messages() {
92     return MessageSpan(received_messages_.begin(), received_messages_.size());
93   }
94 
Run()95   void Run() override {
96     multisink_.AttachDrain(drain_);
97     ReadAllEntries();
98   };
99 
ReadAllEntries()100   virtual void ReadAllEntries() {
101     do {
102       uint32_t drop_count = 0;
103       uint32_t ingress_drop_count = 0;
104       const Result<ConstByteSpan> possible_entry =
105           drain_.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
106       total_drop_count_ += drop_count + ingress_drop_count;
107       if (possible_entry.status().IsOutOfRange()) {
108         pw::this_thread::yield();
109         continue;
110       }
111       ASSERT_EQ(possible_entry.status(), OkStatus());
112       if (received_messages_.full()) {
113         return;
114       }
115       received_messages_.emplace_back();
116       received_messages_.back() << std::string_view(
117           reinterpret_cast<const char*>(possible_entry.value().data()),
118           possible_entry.value().size());
119       pw::this_thread::yield();
120     } while (total_drop_count_ + received_messages_.size() <
121              expected_message_and_drop_count_);
122   }
123 
124  protected:
125   MultiSink::Drain drain_;
126   MultiSink& multisink_;
127   std::array<std::byte, kEntryBufferSize> entry_buffer_;
128   uint32_t total_drop_count_;
129   const uint32_t expected_message_and_drop_count_;
130   Vector<StringBuffer<kEntryBufferSize>, kMaxMessageCount> received_messages_;
131 };
132 
133 class LogPeekAndCommitReaderThread : public LogPopReaderThread {
134  public:
LogPeekAndCommitReaderThread(MultiSink & multisink,uint32_t expected_message_and_drop_count)135   LogPeekAndCommitReaderThread(MultiSink& multisink,
136                                uint32_t expected_message_and_drop_count)
137       : LogPopReaderThread(multisink, expected_message_and_drop_count) {}
138 
ReadAllEntries()139   virtual void ReadAllEntries() {
140     do {
141       uint32_t drop_count = 0;
142       uint32_t ingress_drop_count = 0;
143       const Result<MultiSink::Drain::PeekedEntry> possible_entry =
144           drain_.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
145       total_drop_count_ += drop_count + ingress_drop_count;
146       if (possible_entry.status().IsOutOfRange()) {
147         pw::this_thread::yield();
148         continue;
149       }
150       ASSERT_EQ(possible_entry.status(), OkStatus());
151       if (received_messages_.full()) {
152         return;
153       }
154       pw::this_thread::yield();
155       received_messages_.emplace_back();
156       received_messages_.back() << std::string_view(
157           reinterpret_cast<const char*>(possible_entry.value().entry().data()),
158           possible_entry.value().entry().size());
159       ASSERT_EQ(drain_.PopEntry(possible_entry.value()), OkStatus());
160       pw::this_thread::yield();
161     } while (total_drop_count_ + received_messages_.size() <
162              expected_message_and_drop_count_);
163   }
164 };
165 
166 // Adds the provided messages to the shared multisink.
167 class LogWriterThread : public thread::ThreadCore {
168  public:
LogWriterThread(MultiSink & multisink,const MessageSpan & message_stack)169   LogWriterThread(MultiSink& multisink, const MessageSpan& message_stack)
170       : multisink_(multisink), message_stack_(message_stack) {}
171 
Run()172   void Run() override {
173     for (const auto& message : message_stack_) {
174       multisink_.HandleEntry(
175           std::as_bytes(std::span(std::string_view(message))));
176       pw::this_thread::yield();
177     }
178   };
179 
180  private:
181   MultiSink& multisink_;
182   const MessageSpan& message_stack_;
183 };
184 
185 class MultiSinkTest : public ::testing::Test {
186  protected:
MultiSinkTest()187   MultiSinkTest() : multisink_(buffer_) {}
188 
189   std::byte buffer_[kBufferSize];
190   MultiSink multisink_;
191 
192  private:
193 };
194 
// One writer and one PopEntry()-based reader: the reader must observe every
// sent message in order, plus the drops reported afterwards.
TEST_F(MultiSinkTest, SingleWriterSingleReader) {
  constexpr uint32_t kLogCount = 100;
  constexpr uint32_t kDropCount = 5;
  constexpr uint32_t kExpectedTotal = kLogCount + kDropCount;
  const MessageSpan sent = MessagePool::Instance().GetMessages(kLogCount);

  // The reader is launched before the writer.
  LogPopReaderThread reader_core(multisink_, kExpectedTotal);
  thread::Thread reader(test::MultiSinkTestThreadOptions(), reader_core);
  LogWriterThread writer_core(multisink_, sent);
  thread::Thread writer(test::MultiSinkTestThreadOptions(), writer_core);

  // Once the writer is done, report the drops; the reader finishes after it
  // has accounted for all messages and drops.
  writer.join();
  multisink_.HandleDropped(kDropCount);
  reader.join();

  EXPECT_EQ(reader_core.drop_count(), kDropCount);
  CompareSentAndReceivedMessages(sent, reader_core.received_messages());
}
220 
// One writer and one peek-then-pop reader: the reader must observe every sent
// message in order, plus the drops reported afterwards.
TEST_F(MultiSinkTest, SingleWriterSinglePeekAndCommitReader) {
  constexpr uint32_t kLogCount = 100;
  constexpr uint32_t kDropCount = 5;
  constexpr uint32_t kExpectedTotal = kLogCount + kDropCount;
  const MessageSpan sent = MessagePool::Instance().GetMessages(kLogCount);

  // The reader is launched before the writer.
  LogPeekAndCommitReaderThread reader_core(multisink_, kExpectedTotal);
  thread::Thread reader(test::MultiSinkTestThreadOptions(), reader_core);
  LogWriterThread writer_core(multisink_, sent);
  thread::Thread writer(test::MultiSinkTestThreadOptions(), writer_core);

  // Once the writer is done, report the drops; the reader finishes after it
  // has accounted for all messages and drops.
  writer.join();
  multisink_.HandleDropped(kDropCount);
  reader.join();

  EXPECT_EQ(reader_core.drop_count(), kDropCount);
  CompareSentAndReceivedMessages(sent, reader_core.received_messages());
}
246 
// One writer and three readers (two popping, one peeking and popping): each
// reader must independently observe every sent message in order, plus the
// drops reported afterwards.
TEST_F(MultiSinkTest, SingleWriterMultipleReaders) {
  constexpr uint32_t kLogCount = 100;
  constexpr uint32_t kDropCount = 5;
  constexpr uint32_t kExpectedTotal = kLogCount + kDropCount;
  const MessageSpan sent = MessagePool::Instance().GetMessages(kLogCount);

  // Readers are launched first, in this order.
  LogPopReaderThread pop_reader_core_a(multisink_, kExpectedTotal);
  thread::Thread pop_reader_a(test::MultiSinkTestThreadOptions(),
                              pop_reader_core_a);
  LogPopReaderThread pop_reader_core_b(multisink_, kExpectedTotal);
  thread::Thread pop_reader_b(test::MultiSinkTestThreadOptions(),
                              pop_reader_core_b);
  LogPeekAndCommitReaderThread peek_reader_core(multisink_, kExpectedTotal);
  thread::Thread peek_reader(test::MultiSinkTestThreadOptions(),
                             peek_reader_core);
  // Then the single writer.
  LogWriterThread writer_core(multisink_, sent);
  thread::Thread writer(test::MultiSinkTestThreadOptions(), writer_core);

  // Once the writer is done, report the drops and wait for every reader.
  writer.join();
  multisink_.HandleDropped(kDropCount);
  pop_reader_a.join();
  pop_reader_b.join();
  peek_reader.join();

  EXPECT_EQ(pop_reader_core_a.drop_count(), kDropCount);
  CompareSentAndReceivedMessages(sent, pop_reader_core_a.received_messages());
  EXPECT_EQ(pop_reader_core_b.drop_count(), kDropCount);
  CompareSentAndReceivedMessages(sent, pop_reader_core_b.received_messages());
  EXPECT_EQ(peek_reader_core.drop_count(), kDropCount);
  CompareSentAndReceivedMessages(sent, peek_reader_core.received_messages());
}
288 
// Two writers sending the same messages and three readers (two popping, one
// peeking and popping): each reader must account for every message from both
// writers, plus the drops reported afterwards.
TEST_F(MultiSinkTest, MultipleWritersMultipleReaders) {
  constexpr uint32_t kLogCount = 100;
  constexpr uint32_t kDropCount = 7;
  constexpr uint32_t kExpectedTotal = 2 * kLogCount + kDropCount;
  const MessageSpan sent = MessagePool::Instance().GetMessages(kLogCount);

  // Readers are launched first, in this order.
  LogPopReaderThread pop_reader_core_a(multisink_, kExpectedTotal);
  thread::Thread pop_reader_a(test::MultiSinkTestThreadOptions(),
                              pop_reader_core_a);
  LogPopReaderThread pop_reader_core_b(multisink_, kExpectedTotal);
  thread::Thread pop_reader_b(test::MultiSinkTestThreadOptions(),
                              pop_reader_core_b);
  LogPeekAndCommitReaderThread peek_reader_core(multisink_, kExpectedTotal);
  thread::Thread peek_reader(test::MultiSinkTestThreadOptions(),
                             peek_reader_core);
  // Then both writers.
  LogWriterThread writer_core_a(multisink_, sent);
  thread::Thread writer_a(test::MultiSinkTestThreadOptions(), writer_core_a);
  LogWriterThread writer_core_b(multisink_, sent);
  thread::Thread writer_b(test::MultiSinkTestThreadOptions(), writer_core_b);

  // Once both writers are done, report the drops and wait for every reader.
  writer_a.join();
  writer_b.join();
  multisink_.HandleDropped(kDropCount);
  pop_reader_a.join();
  pop_reader_b.join();
  peek_reader.join();

  EXPECT_EQ(pop_reader_core_a.drop_count(), kDropCount);
  EXPECT_EQ(pop_reader_core_b.drop_count(), kDropCount);
  EXPECT_EQ(peek_reader_core.drop_count(), kDropCount);
  // The interleaving of the two writers is unknown, so only the number of
  // received messages is verified, not their contents.
  EXPECT_EQ(pop_reader_core_a.received_messages().size(),
            kExpectedTotal - kDropCount);
  EXPECT_EQ(pop_reader_core_b.received_messages().size(),
            kExpectedTotal - kDropCount);
  EXPECT_EQ(peek_reader_core.received_messages().size(),
            kExpectedTotal - kDropCount);
}
335 
// Two writers send more messages than a deliberately small multisink buffer
// can hold. The multisink is expected to overflow, and the readers are
// expected not to fail while popping, or peeking and committing, entries.
TEST_F(MultiSinkTest, OverflowMultisink) {
  constexpr size_t kLogCount = kMaxMessageCount;
  constexpr size_t kMaxBufferEntryCount = 20;
  std::byte small_buffer[kMaxBufferEntryCount * kEntryBufferSize];
  MultiSink small_multisink(small_buffer);

  const MessageSpan sent = MessagePool::Instance().GetMessages(kLogCount);

  // Readers are launched first: one peeking and popping, one popping.
  LogPeekAndCommitReaderThread peek_reader_core(small_multisink, kLogCount);
  thread::Thread peek_reader(test::MultiSinkTestThreadOptions(),
                             peek_reader_core);
  LogPopReaderThread pop_reader_core(small_multisink, kLogCount);
  thread::Thread pop_reader(test::MultiSinkTestThreadOptions(),
                            pop_reader_core);

  // Then both writers.
  LogWriterThread writer_core_a(small_multisink, sent);
  thread::Thread writer_a(test::MultiSinkTestThreadOptions(), writer_core_a);
  LogWriterThread writer_core_b(small_multisink, sent);
  thread::Thread writer_b(test::MultiSinkTestThreadOptions(), writer_core_b);

  // Wait for the writers, then for the readers.
  writer_a.join();
  writer_b.join();
  peek_reader.join();
  pop_reader.join();

  // Received messages and drop counts are not verified: the order in which
  // the threads operate cannot be controlled, so those values are unreliable.
}
371 
372 }  // namespace pw::multisink
373