• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include <cstddef>
16 #include <cstdint>
17 
18 #include "pw_containers/vector.h"
19 #include "pw_multisink/multisink.h"
20 #include "pw_multisink/test_thread.h"
21 #include "pw_span/span.h"
22 #include "pw_string/string_builder.h"
23 #include "pw_thread/thread.h"
24 #include "pw_thread/yield.h"
25 #include "pw_unit_test/framework.h"
26 
27 namespace pw::multisink {
28 namespace {
29 
30 constexpr size_t kEntryBufferSize = sizeof("message 000");
31 constexpr size_t kMaxMessageCount = 250;
32 constexpr size_t kBufferSize = kMaxMessageCount * kEntryBufferSize;
33 
34 using MessageSpan = span<const StringBuffer<kEntryBufferSize>>;
35 
36 // This function is unused if joining is not supported.
CompareSentAndReceivedMessages(const MessageSpan & sent_messages,const MessageSpan & received_messages)37 [[maybe_unused]] void CompareSentAndReceivedMessages(
38     const MessageSpan& sent_messages, const MessageSpan& received_messages) {
39   ASSERT_EQ(sent_messages.size(), received_messages.size());
40   for (size_t i = 0; i < sent_messages.size(); ++i) {
41     ASSERT_EQ(sent_messages[i].size(), received_messages[i].size());
42     EXPECT_EQ(std::string_view(sent_messages[i]),
43               std::string_view(received_messages[i]));
44   }
45 }
46 
47 }  // namespace
48 
49 // Static message pool to avoid recreating messages for every test and avoids
50 // using std::string.
51 class MessagePool {
52  public:
Instance()53   static MessagePool& Instance() {
54     static MessagePool instance;
55     return instance;
56   }
57 
58   MessagePool(const MessagePool&) = delete;
59   MessagePool& operator=(const MessagePool&) = delete;
60   MessagePool(MessagePool&&) = delete;
61   MessagePool& operator=(MessagePool&&) = delete;
62 
GetMessages(size_t message_count) const63   MessageSpan GetMessages(size_t message_count) const {
64     PW_ASSERT(message_count <= messages_.size());
65     return MessageSpan(messages_.begin(), message_count);
66   }
67 
68  private:
MessagePool()69   MessagePool() {
70     for (size_t i = 0; i < kMaxMessageCount; ++i) {
71       messages_.emplace_back();
72       messages_.back() << "message %u" << static_cast<unsigned int>(i);
73     }
74   }
75 
76   Vector<StringBuffer<kEntryBufferSize>, kMaxMessageCount> messages_;
77 };
78 
79 // Continuously reads logs from a multisink, using PopEntry() and stores copies
80 // of the retrieved messages for later verification. The thread stops when the
81 // the number of read messages and total drop count matches the expected count.
82 class LogPopReaderThread : public thread::ThreadCore {
83  public:
LogPopReaderThread(MultiSink & multisink,uint32_t expected_message_and_drop_count)84   LogPopReaderThread(MultiSink& multisink,
85                      uint32_t expected_message_and_drop_count)
86       : multisink_(multisink),
87         total_drop_count_(0),
88         expected_message_and_drop_count_(expected_message_and_drop_count) {
89     PW_ASSERT(expected_message_and_drop_count_ <= kMaxMessageCount);
90   }
91 
drop_count()92   uint32_t drop_count() { return total_drop_count_; }
93 
received_messages()94   const MessageSpan received_messages() {
95     return MessageSpan(received_messages_.begin(), received_messages_.size());
96   }
97 
Run()98   void Run() override {
99     multisink_.AttachDrain(drain_);
100     ReadAllEntries();
101   }
102 
ReadAllEntries()103   virtual void ReadAllEntries() {
104     do {
105       uint32_t drop_count = 0;
106       uint32_t ingress_drop_count = 0;
107       const Result<ConstByteSpan> possible_entry =
108           drain_.PopEntry(entry_buffer_, drop_count, ingress_drop_count);
109       total_drop_count_ += drop_count + ingress_drop_count;
110       if (possible_entry.status().IsOutOfRange()) {
111         pw::this_thread::yield();
112         continue;
113       }
114       ASSERT_EQ(possible_entry.status(), OkStatus());
115       if (received_messages_.full()) {
116         return;
117       }
118       received_messages_.emplace_back();
119       received_messages_.back() << std::string_view(
120           reinterpret_cast<const char*>(possible_entry.value().data()),
121           possible_entry.value().size());
122       pw::this_thread::yield();
123     } while (total_drop_count_ + received_messages_.size() <
124              expected_message_and_drop_count_);
125   }
126 
127  protected:
128   MultiSink::Drain drain_;
129   MultiSink& multisink_;
130   std::array<std::byte, kEntryBufferSize> entry_buffer_;
131   uint32_t total_drop_count_;
132   const uint32_t expected_message_and_drop_count_;
133   Vector<StringBuffer<kEntryBufferSize>, kMaxMessageCount> received_messages_;
134 };
135 
136 class LogPeekAndCommitReaderThread : public LogPopReaderThread {
137  public:
LogPeekAndCommitReaderThread(MultiSink & multisink,uint32_t expected_message_and_drop_count)138   LogPeekAndCommitReaderThread(MultiSink& multisink,
139                                uint32_t expected_message_and_drop_count)
140       : LogPopReaderThread(multisink, expected_message_and_drop_count) {}
141 
ReadAllEntries()142   void ReadAllEntries() override {
143     do {
144       uint32_t drop_count = 0;
145       uint32_t ingress_drop_count = 0;
146       const Result<MultiSink::Drain::PeekedEntry> possible_entry =
147           drain_.PeekEntry(entry_buffer_, drop_count, ingress_drop_count);
148       total_drop_count_ += drop_count + ingress_drop_count;
149       if (possible_entry.status().IsOutOfRange()) {
150         pw::this_thread::yield();
151         continue;
152       }
153       ASSERT_EQ(possible_entry.status(), OkStatus());
154       if (received_messages_.full()) {
155         return;
156       }
157       pw::this_thread::yield();
158       received_messages_.emplace_back();
159       received_messages_.back() << std::string_view(
160           reinterpret_cast<const char*>(possible_entry.value().entry().data()),
161           possible_entry.value().entry().size());
162       ASSERT_EQ(drain_.PopEntry(possible_entry.value()), OkStatus());
163       pw::this_thread::yield();
164     } while (total_drop_count_ + received_messages_.size() <
165              expected_message_and_drop_count_);
166   }
167 };
168 
169 // Adds the provided messages to the shared multisink.
170 class LogWriterThread : public thread::ThreadCore {
171  public:
LogWriterThread(MultiSink & multisink,const MessageSpan & message_stack)172   LogWriterThread(MultiSink& multisink, const MessageSpan& message_stack)
173       : multisink_(multisink), message_stack_(message_stack) {}
174 
Run()175   void Run() override {
176     for (const auto& message : message_stack_) {
177       multisink_.HandleEntry(as_bytes(span(std::string_view(message))));
178       pw::this_thread::yield();
179     }
180   }
181 
182  private:
183   MultiSink& multisink_;
184   const MessageSpan& message_stack_;
185 };
186 
187 class MultiSinkTest : public ::testing::Test {
188  protected:
MultiSinkTest()189   MultiSinkTest() : buffer_{}, multisink_(buffer_) {}
190 
191   std::byte buffer_[kBufferSize];
192   MultiSink multisink_;
193 };
194 
195 #if PW_THREAD_JOINING_ENABLED
196 
TEST_F(MultiSinkTest,SingleWriterSingleReader)197 TEST_F(MultiSinkTest, SingleWriterSingleReader) {
198   const uint32_t log_count = 100;
199   const uint32_t drop_count = 5;
200   const uint32_t expected_message_and_drop_count = log_count + drop_count;
201   const auto message_stack = MessagePool::Instance().GetMessages(log_count);
202 
203   // Start reader thread.
204   LogPopReaderThread reader_thread_core(multisink_,
205                                         expected_message_and_drop_count);
206   thread::Thread reader_thread(test::MultiSinkTestThreadOptions(),
207                                reader_thread_core);
208   // Start writer thread.
209   LogWriterThread writer_thread_core(multisink_, message_stack);
210   thread::Thread writer_thread(test::MultiSinkTestThreadOptions(),
211                                writer_thread_core);
212 
213   // Wait for writer thread to end.
214   writer_thread.join();
215   multisink_.HandleDropped(drop_count);
216   reader_thread.join();
217 
218   EXPECT_EQ(reader_thread_core.drop_count(), drop_count);
219   CompareSentAndReceivedMessages(message_stack,
220                                  reader_thread_core.received_messages());
221 }
222 
TEST_F(MultiSinkTest,SingleWriterSinglePeekAndCommitReader)223 TEST_F(MultiSinkTest, SingleWriterSinglePeekAndCommitReader) {
224   const uint32_t log_count = 100;
225   const uint32_t drop_count = 5;
226   const uint32_t expected_message_and_drop_count = log_count + drop_count;
227   const auto message_stack = MessagePool::Instance().GetMessages(log_count);
228 
229   // Start reader thread.
230   LogPeekAndCommitReaderThread reader_thread_core(
231       multisink_, expected_message_and_drop_count);
232   thread::Thread reader_thread(test::MultiSinkTestThreadOptions(),
233                                reader_thread_core);
234   // Start writer thread.
235   LogWriterThread writer_thread_core(multisink_, message_stack);
236   thread::Thread writer_thread(test::MultiSinkTestThreadOptions(),
237                                writer_thread_core);
238 
239   // Wait for writer thread to end.
240   writer_thread.join();
241   multisink_.HandleDropped(drop_count);
242   reader_thread.join();
243 
244   EXPECT_EQ(reader_thread_core.drop_count(), drop_count);
245   CompareSentAndReceivedMessages(message_stack,
246                                  reader_thread_core.received_messages());
247 }
248 
TEST_F(MultiSinkTest,SingleWriterMultipleReaders)249 TEST_F(MultiSinkTest, SingleWriterMultipleReaders) {
250   const uint32_t log_count = 100;
251   const uint32_t drop_count = 5;
252   const uint32_t expected_message_and_drop_count = log_count + drop_count;
253   const auto message_stack = MessagePool::Instance().GetMessages(log_count);
254 
255   // Start reader threads.
256   LogPopReaderThread reader_thread_core1(multisink_,
257                                          expected_message_and_drop_count);
258   thread::Thread reader_thread1(test::MultiSinkTestThreadOptions(),
259                                 reader_thread_core1);
260   LogPopReaderThread reader_thread_core2(multisink_,
261                                          expected_message_and_drop_count);
262   thread::Thread reader_thread2(test::MultiSinkTestThreadOptions(),
263                                 reader_thread_core2);
264   LogPeekAndCommitReaderThread reader_thread_core3(
265       multisink_, expected_message_and_drop_count);
266   thread::Thread reader_thread3(test::MultiSinkTestThreadOptions(),
267                                 reader_thread_core3);
268   // Start writer thread.
269   LogWriterThread writer_thread_core(multisink_, message_stack);
270   thread::Thread writer_thread(test::MultiSinkTestThreadOptions(),
271                                writer_thread_core);
272 
273   // Wait for writer thread to end.
274   writer_thread.join();
275   multisink_.HandleDropped(drop_count);
276   reader_thread1.join();
277   reader_thread2.join();
278   reader_thread3.join();
279 
280   EXPECT_EQ(reader_thread_core1.drop_count(), drop_count);
281   CompareSentAndReceivedMessages(message_stack,
282                                  reader_thread_core1.received_messages());
283   EXPECT_EQ(reader_thread_core2.drop_count(), drop_count);
284   CompareSentAndReceivedMessages(message_stack,
285                                  reader_thread_core2.received_messages());
286   EXPECT_EQ(reader_thread_core3.drop_count(), drop_count);
287   CompareSentAndReceivedMessages(message_stack,
288                                  reader_thread_core3.received_messages());
289 }
290 
TEST_F(MultiSinkTest,MultipleWritersMultipleReaders)291 TEST_F(MultiSinkTest, MultipleWritersMultipleReaders) {
292   const uint32_t log_count = 100;
293   const uint32_t drop_count = 7;
294   const uint32_t expected_message_and_drop_count = 2 * log_count + drop_count;
295   const auto message_stack = MessagePool::Instance().GetMessages(log_count);
296 
297   // Start reader threads.
298   LogPopReaderThread reader_thread_core1(multisink_,
299                                          expected_message_and_drop_count);
300   thread::Thread reader_thread1(test::MultiSinkTestThreadOptions(),
301                                 reader_thread_core1);
302   LogPopReaderThread reader_thread_core2(multisink_,
303                                          expected_message_and_drop_count);
304   thread::Thread reader_thread2(test::MultiSinkTestThreadOptions(),
305                                 reader_thread_core2);
306   LogPeekAndCommitReaderThread reader_thread_core3(
307       multisink_, expected_message_and_drop_count);
308   thread::Thread reader_thread3(test::MultiSinkTestThreadOptions(),
309                                 reader_thread_core3);
310   // Start writer threads.
311   LogWriterThread writer_thread_core1(multisink_, message_stack);
312   thread::Thread writer_thread1(test::MultiSinkTestThreadOptions(),
313                                 writer_thread_core1);
314   LogWriterThread writer_thread_core2(multisink_, message_stack);
315   thread::Thread writer_thread2(test::MultiSinkTestThreadOptions(),
316                                 writer_thread_core2);
317 
318   // Wait for writer thread to end.
319   writer_thread1.join();
320   writer_thread2.join();
321   multisink_.HandleDropped(drop_count);
322   reader_thread1.join();
323   reader_thread2.join();
324   reader_thread3.join();
325 
326   EXPECT_EQ(reader_thread_core1.drop_count(), drop_count);
327   EXPECT_EQ(reader_thread_core2.drop_count(), drop_count);
328   EXPECT_EQ(reader_thread_core3.drop_count(), drop_count);
329   // Since we don't know the order that messages came in, we can't check them.
330   EXPECT_EQ(reader_thread_core1.received_messages().size(),
331             expected_message_and_drop_count - drop_count);
332   EXPECT_EQ(reader_thread_core2.received_messages().size(),
333             expected_message_and_drop_count - drop_count);
334   EXPECT_EQ(reader_thread_core3.received_messages().size(),
335             expected_message_and_drop_count - drop_count);
336 }
337 
TEST_F(MultiSinkTest,OverflowMultisink)338 TEST_F(MultiSinkTest, OverflowMultisink) {
339   // Expect the multisink to overflow and readers to not fail when poping, or
340   // peeking and commiting entries.
341   const size_t log_count = kMaxMessageCount;
342   const size_t max_buffer_entry_count = 20;
343   std::byte small_multisink_buffer[max_buffer_entry_count * kEntryBufferSize];
344   MultiSink small_multisink(small_multisink_buffer);
345 
346   const auto message_stack = MessagePool::Instance().GetMessages(log_count);
347 
348   // Start reader threads.
349   LogPeekAndCommitReaderThread reader_thread_core1(small_multisink, log_count);
350   thread::Thread reader_thread1(test::MultiSinkTestThreadOptions(),
351                                 reader_thread_core1);
352   LogPopReaderThread reader_thread_core2(small_multisink, log_count);
353   thread::Thread reader_thread2(test::MultiSinkTestThreadOptions(),
354                                 reader_thread_core2);
355 
356   // Start writer threads.
357   LogWriterThread writer_thread_core1(small_multisink, message_stack);
358   thread::Thread writer_thread1(test::MultiSinkTestThreadOptions(),
359                                 writer_thread_core1);
360   LogWriterThread writer_thread_core2(small_multisink, message_stack);
361   thread::Thread writer_thread2(test::MultiSinkTestThreadOptions(),
362                                 writer_thread_core2);
363 
364   // Wait for writer thread to end.
365   writer_thread1.join();
366   writer_thread2.join();
367   reader_thread1.join();
368   reader_thread2.join();
369 
370   // Verifying received messages and drop message counts is unreliable as we
371   // can't control the order threads will operate.
372 }
373 
374 #endif  // PW_THREAD_JOINING_ENABLED
375 
376 }  // namespace pw::multisink
377