• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_log_rpc/rpc_log_drain.h"
16 
17 #include <array>
18 #include <cstdint>
19 #include <string_view>
20 
21 #include "gtest/gtest.h"
22 #include "pw_bytes/array.h"
23 #include "pw_bytes/span.h"
24 #include "pw_log/proto/log.pwpb.h"
25 #include "pw_log/proto_utils.h"
26 #include "pw_log_rpc/log_filter.h"
27 #include "pw_log_rpc/log_service.h"
28 #include "pw_log_rpc/rpc_log_drain_map.h"
29 #include "pw_log_rpc_private/test_utils.h"
30 #include "pw_log_tokenized/metadata.h"
31 #include "pw_multisink/multisink.h"
32 #include "pw_protobuf/decoder.h"
33 #include "pw_protobuf/serialized_size.h"
34 #include "pw_rpc/channel.h"
35 #include "pw_rpc/raw/fake_channel_output.h"
36 #include "pw_rpc/raw/server_reader_writer.h"
37 #include "pw_span/span.h"
38 #include "pw_status/status.h"
39 #include "pw_sync/mutex.h"
40 
41 namespace pw::log_rpc {
42 namespace {
43 static constexpr size_t kBufferSize =
44     RpcLogDrain::kMinEntrySizeWithoutPayload + 32;
45 
// Expects Flush() to report Unavailable while the drain has no active writer,
// and expects Open() to fail with FailedPrecondition when handed an inactive
// writer.
TEST(RpcLogDrain, TryFlushDrainWithClosedWriter) {
  constexpr uint32_t kDrainId = 1;
  std::byte flush_buffer[128] = {};

  // The drain starts out without a writer.
  std::array<std::byte, kBufferSize> drain_buffer;
  sync::Mutex drain_mutex;
  RpcLogDrain closed_drain(
      kDrainId,
      drain_buffer,
      drain_mutex,
      RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
      nullptr);
  EXPECT_EQ(closed_drain.channel_id(), kDrainId);

  // Hook the drain up to a MultiSink; flushing is still expected to fail.
  std::array<std::byte, kBufferSize * 2> sink_storage;
  multisink::MultiSink sink(sink_storage);
  sink.AttachDrain(closed_drain);
  EXPECT_EQ(closed_drain.Flush(flush_buffer), Status::Unavailable());

  // A default-constructed writer is inactive, so opening with it must fail and
  // the drain must remain unavailable.
  rpc::RawServerWriter inactive_writer;
  ASSERT_FALSE(inactive_writer.active());
  EXPECT_EQ(closed_drain.Open(inactive_writer), Status::FailedPrecondition());
  EXPECT_EQ(closed_drain.Flush(flush_buffer), Status::Unavailable());
}
72 
// Checks that RpcLogDrainMap hands back the exact drain objects it was built
// from, both through lookup by channel ID and through the drains() view.
TEST(RpcLogDrainMap, GetDrainsByIdFromDrainMap) {
  static constexpr size_t kMaxDrains = 3;
  using ErrorHandling = RpcLogDrain::LogDrainErrorHandling;

  sync::Mutex shared_mutex;
  std::array<std::array<std::byte, kBufferSize>, kMaxDrains> drain_buffers;
  // Channel IDs 0..2 deliberately match each drain's index in the array.
  std::array<RpcLogDrain, kMaxDrains> drains{
      RpcLogDrain(0,
                  drain_buffers[0],
                  shared_mutex,
                  ErrorHandling::kCloseStreamOnWriterError,
                  nullptr),
      RpcLogDrain(1,
                  drain_buffers[1],
                  shared_mutex,
                  ErrorHandling::kCloseStreamOnWriterError,
                  nullptr),
      RpcLogDrain(2,
                  drain_buffers[2],
                  shared_mutex,
                  ErrorHandling::kIgnoreWriterErrors,
                  nullptr),
  };

  RpcLogDrainMap drain_map(drains);

  // Lookup by channel ID must yield a pointer to the original drain.
  for (uint32_t id = 0; id < kMaxDrains; ++id) {
    auto lookup = drain_map.GetDrainFromChannelId(id);
    ASSERT_TRUE(lookup.ok());
    EXPECT_EQ(lookup.value(), &drains[id]);
  }

  // The span exposed by the map must alias the original array elements.
  const span<RpcLogDrain> all_drains = drain_map.drains();
  ASSERT_EQ(all_drains.size(), kMaxDrains);
  for (uint32_t id = 0; id < kMaxDrains; ++id) {
    EXPECT_EQ(&all_drains[id], &drains[id]);
  }
}
107 
// Walks a drain through its lifecycle: Flush() is expected to be Unavailable
// before a writer is opened, OK (repeatedly) while the writer is open, and
// Unavailable again once the drain has been closed.
TEST(RpcLogDrain, FlushingDrainWithOpenWriter) {
  constexpr uint32_t kDrainId = 1;
  std::byte flush_buffer[128] = {};

  std::array<std::byte, kBufferSize> drain_buffer;
  sync::Mutex drain_mutex;
  std::array<RpcLogDrain, 1> drains{
      RpcLogDrain(kDrainId,
                  drain_buffer,
                  drain_mutex,
                  RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
                  nullptr),
  };
  RpcLogDrainMap drain_map(drains);
  LogService log_service(drain_map);

  rpc::RawFakeChannelOutput<3> fake_output;
  rpc::Channel channel(rpc::Channel::Create<kDrainId>(&fake_output));
  rpc::Server server(span(&channel, 1));

  // Attach the drain to a MultiSink. No writer yet, so Flush() cannot succeed.
  RpcLogDrain& drain = drains[0];
  std::array<std::byte, kBufferSize * 2> sink_storage;
  multisink::MultiSink sink(sink_storage);
  sink.AttachDrain(drain);
  EXPECT_EQ(drain.Flush(flush_buffer), Status::Unavailable());

  // Open a live writer on the Listen RPC and hand it to the drain.
  rpc::RawServerWriter writer =
      rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
          server, kDrainId, log_service);
  ASSERT_TRUE(writer.active());
  EXPECT_EQ(drain.Open(writer), OkStatus());

  // Flush() may be called multiple times until the drain is closed on error.
  EXPECT_EQ(drain.Flush(flush_buffer), OkStatus());
  EXPECT_EQ(drain.Flush(flush_buffer), OkStatus());

  // After Close() the local writer is inactive and flushing is unavailable.
  // The writer is inspected through a reference, as in the original test.
  EXPECT_EQ(drain.Close(), OkStatus());
  rpc::RawServerWriter& writer_after_close = writer;
  ASSERT_FALSE(writer_after_close.active());
  EXPECT_EQ(drain.Flush(flush_buffer), Status::Unavailable());
}
148 
// Expects that a drain which is already open accepts a second, newly opened
// writer, and that the first local writer is no longer active at that point.
TEST(RpcLogDrain, TryReopenOpenedDrain) {
  constexpr uint32_t kDrainId = 1;

  std::array<std::byte, kBufferSize> drain_buffer;
  sync::Mutex drain_mutex;
  std::array<RpcLogDrain, 1> drains{
      RpcLogDrain(kDrainId,
                  drain_buffer,
                  drain_mutex,
                  RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
                  nullptr),
  };
  RpcLogDrainMap drain_map(drains);
  LogService log_service(drain_map);

  rpc::RawFakeChannelOutput<1> fake_output;
  rpc::Channel channel(rpc::Channel::Create<kDrainId>(&fake_output));
  rpc::Server server(span(&channel, 1));

  // First open: hand an active writer to the drain.
  rpc::RawServerWriter first_writer =
      rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
          server, kDrainId, log_service);
  ASSERT_TRUE(first_writer.active());
  RpcLogDrain& drain = drains[0];
  EXPECT_EQ(drain.Open(first_writer), OkStatus());

  // Second open on the same channel and method: only the new writer should be
  // active, and the drain should accept it.
  rpc::RawServerWriter replacement_writer =
      rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
          server, kDrainId, log_service);
  ASSERT_FALSE(first_writer.active());
  ASSERT_TRUE(replacement_writer.active());
  EXPECT_EQ(drain.Open(replacement_writer), OkStatus());
}
181 
182 class TrickleTest : public ::testing::Test {
183  protected:
TrickleTest()184   TrickleTest()
185       : log_message_encode_buffer_(),
186         drain_encode_buffer_(),
187         channel_encode_buffer_(),
188         mutex_(),
189         drains_{
190             RpcLogDrain(
191                 kDrainChannelId,
192                 drain_encode_buffer_,
193                 mutex_,
194                 RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
195                 nullptr),
196         },
197         multisink_buffer_(),
198         multisink_(multisink_buffer_),
199         drain_map_(drains_),
200         log_service_(drain_map_),
201         output_(),
202         channel_(rpc::Channel::Create<kDrainChannelId>(&output_)),
203         server_(span(&channel_, 1)) {}
204 
BasicLog(std::string_view message)205   TestLogEntry BasicLog(std::string_view message) {
206     return {.metadata = kSampleMetadata,
207             .timestamp = kSampleTimestamp,
208             .dropped = 0,
209             .tokenized_data = as_bytes(span<const char>(message)),
210             .thread = as_bytes(span(kSampleThreadName))};
211   }
212 
AttachDrain()213   void AttachDrain() { multisink_.AttachDrain(drains_[0]); }
OpenWriter()214   void OpenWriter() {
215     writer_ = rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
216         server_, kDrainChannelId, log_service_);
217   }
218 
AddLogEntry(const TestLogEntry & entry)219   void AddLogEntry(const TestLogEntry& entry) {
220     Result<ConstByteSpan> encoded_log_result =
221         log::EncodeTokenizedLog(entry.metadata,
222                                 entry.tokenized_data,
223                                 entry.timestamp,
224                                 entry.thread,
225                                 log_message_encode_buffer_);
226     ASSERT_EQ(encoded_log_result.status(), OkStatus());
227     EXPECT_LE(encoded_log_result.value().size(), kMaxMessageSize);
228     multisink_.HandleEntry(encoded_log_result.value());
229   }
230 
AddLogEntries(const Vector<TestLogEntry> & entries)231   void AddLogEntries(const Vector<TestLogEntry>& entries) {
232     for (const TestLogEntry& entry : entries) {
233       AddLogEntry(entry);
234     }
235   }
236 
237   static constexpr uint32_t kDrainChannelId = 1;
238   static constexpr size_t kMaxMessageSize = 60;
239 
240   // Use the size of the encoded BasicLog entry to calculate buffer sizes and
241   // better control the number of entries in each sent bulk.
242   static constexpr log_tokenized::Metadata kSampleMetadata =
243       log_tokenized::Metadata::Set<PW_LOG_LEVEL_INFO, 123, 0x03, 300>();
244   static constexpr uint64_t kSampleTimestamp = 9000;
245   static constexpr std::string_view kSampleThreadName = "thread";
246   static constexpr size_t kBasicLogSizeWithoutPayload =
247       protobuf::SizeOfFieldBytes(log::pwpb::LogEntry::Fields::kMessage, 0) +
248       protobuf::SizeOfFieldUint32(
249           log::pwpb::LogEntry::Fields::kLineLevel,
250           log::PackLineLevel(kSampleMetadata.line_number(),
251                              kSampleMetadata.level())) +
252       protobuf::SizeOfFieldUint32(log::pwpb::LogEntry::Fields::kFlags,
253                                   kSampleMetadata.flags()) +
254       protobuf::SizeOfFieldInt64(log::pwpb::LogEntry::Fields::kTimestamp,
255                                  kSampleTimestamp) +
256       protobuf::SizeOfFieldBytes(log::pwpb::LogEntry::Fields::kModule,
257                                  sizeof(kSampleMetadata.module())) +
258       protobuf::SizeOfFieldBytes(log::pwpb::LogEntry::Fields::kThread,
259                                  kSampleThreadName.size());
260   static constexpr size_t kDrainEncodeBufferSize =
261       kBasicLogSizeWithoutPayload + kMaxMessageSize;
262   static constexpr size_t kChannelEncodeBufferSize = kDrainEncodeBufferSize * 2;
263   std::array<std::byte, kMaxMessageSize> log_message_encode_buffer_;
264   std::array<std::byte, kDrainEncodeBufferSize> drain_encode_buffer_;
265   // Make actual encode buffer slightly smaller to account for RPC overhead.
266   std::array<std::byte, kChannelEncodeBufferSize - 8> channel_encode_buffer_;
267   sync::Mutex mutex_;
268   std::array<RpcLogDrain, 1> drains_;
269 
270   std::array<std::byte, kDrainEncodeBufferSize * 12> multisink_buffer_;
271   multisink::MultiSink multisink_;
272 
273   RpcLogDrainMap drain_map_;
274   LogService log_service_;
275 
276   // TODO(amontanez): Why do we need 4 packets? Three should work, but seemingly
277   // on destruction a 14-byte payload is sent out, forcing us to use max
278   // expected packet count plus one.
279   rpc::RawFakeChannelOutput<4, kDrainEncodeBufferSize * 6> output_;
280   rpc::Channel channel_;
281   rpc::Server server_;
282   rpc::RawServerWriter writer_;
283 };
284 
// Queues three short log entries and expects a single Trickle() call to send
// all of them in exactly one payload, with no further delay requested.
TEST_F(TrickleTest, EntriesAreFlushedToSinglePayload) {
  AttachDrain();
  OpenWriter();

  Vector<TestLogEntry, 3> kExpectedEntries{
      BasicLog(":D"), BasicLog("A useful log"), BasicLog("blink")};
  AddLogEntries(kExpectedEntries);

  ASSERT_TRUE(writer_.active());
  EXPECT_EQ(drains_[0].Open(writer_), OkStatus());

  // Everything fits into one bundle, so no follow-up delay is expected.
  std::optional<chrono::SystemClock::duration> min_delay =
      drains_[0].Trickle(channel_encode_buffer_);
  EXPECT_FALSE(min_delay.has_value());

  rpc::PayloadsView payloads =
      output_.payloads<log::pw_rpc::raw::Logs::Listen>(kDrainChannelId);
  // Fatal assertion: payloads[0] is indexed below, so the test must stop here
  // instead of reading out of range when no payload was sent.
  ASSERT_EQ(payloads.size(), 1u);

  uint32_t drop_count = 0;
  size_t entries_count = 0;
  protobuf::Decoder payload_decoder(payloads[0]);
  payload_decoder.Reset(payloads[0]);
  VerifyLogEntries(
      payload_decoder, kExpectedEntries, 0, entries_count, drop_count);
  EXPECT_EQ(drop_count, 0u);
  EXPECT_EQ(entries_count, 3u);
}
313 
// Queues six longer log entries and expects one Trickle() call to emit them as
// two payloads of three entries each, with no further delay requested.
TEST_F(TrickleTest, ManyLogsOverflowToNextPayload) {
  AttachDrain();
  OpenWriter();

  Vector<TestLogEntry, 3> kFirstFlushedBundle{
      BasicLog("Use longer logs in this test"),
      BasicLog("My feet are cold"),
      BasicLog("I'm hungry, what's for dinner?")};
  Vector<TestLogEntry, 3> kSecondFlushedBundle{
      BasicLog("Add a few longer logs"),
      BasicLog("Eventually the logs will"),
      BasicLog("Overflow into another payload")};

  AddLogEntries(kFirstFlushedBundle);
  AddLogEntries(kSecondFlushedBundle);

  ASSERT_TRUE(writer_.active());
  EXPECT_EQ(drains_[0].Open(writer_), OkStatus());

  // One unrestricted trickle is expected to drain both bundles.
  std::optional<chrono::SystemClock::duration> remaining_delay =
      drains_[0].Trickle(channel_encode_buffer_);
  EXPECT_EQ(remaining_delay.has_value(), false);

  rpc::PayloadsView sent_payloads =
      output_.payloads<log::pw_rpc::raw::Logs::Listen>(kDrainChannelId);
  ASSERT_EQ(sent_payloads.size(), 2u);

  uint32_t dropped = 0;
  size_t decoded_entries = 0;

  // First payload: entries with sequence IDs starting at 0.
  protobuf::Decoder decoder(sent_payloads[0]);
  decoder.Reset(sent_payloads[0]);
  VerifyLogEntries(decoder, kFirstFlushedBundle, 0, decoded_entries, dropped);
  EXPECT_EQ(dropped, 0u);
  EXPECT_EQ(decoded_entries, 3u);

  // Second payload: entries with sequence IDs starting at 3.
  decoded_entries = 0;
  decoder.Reset(sent_payloads[1]);
  VerifyLogEntries(decoder, kSecondFlushedBundle, 3, decoded_entries, dropped);
  EXPECT_EQ(dropped, 0u);
  EXPECT_EQ(decoded_entries, 3u);
}
358 
// Queues six longer log entries, limits the drain to one bundle per trickle,
// and expects the first Trickle() call to send only the first three entries
// (returning a zero delay to request another call) and the second Trickle()
// call to send the remaining three.
TEST_F(TrickleTest, LimitedFlushOverflowsToNextPayload) {
  AttachDrain();
  OpenWriter();

  Vector<TestLogEntry, 3> kFirstFlushedBundle{
      BasicLog("Use longer logs in this test"),
      BasicLog("My feet are cold"),
      BasicLog("I'm hungry, what's for dinner?")};
  Vector<TestLogEntry, 3> kSecondFlushedBundle{
      BasicLog("Add a few longer logs"),
      BasicLog("Eventually the logs will"),
      BasicLog("Overflow into another payload")};

  AddLogEntries(kFirstFlushedBundle);

  // These logs will get pushed into the next payload due to overflowing max
  // payload size.
  AddLogEntries(kSecondFlushedBundle);

  ASSERT_TRUE(writer_.active());
  EXPECT_EQ(drains_[0].Open(writer_), OkStatus());
  drains_[0].set_max_bundles_per_trickle(1);

  // With the one-bundle limit, a single trickle should produce only one
  // payload and report a zero delay because entries are still pending.
  std::optional<chrono::SystemClock::duration> min_delay =
      drains_[0].Trickle(channel_encode_buffer_);
  // Fatal assertion: value() is read on the next line and must not be reached
  // when the optional is empty.
  ASSERT_TRUE(min_delay.has_value());
  EXPECT_EQ(min_delay.value(), chrono::SystemClock::duration::zero());

  rpc::PayloadsView first_flush_payloads =
      output_.payloads<log::pw_rpc::raw::Logs::Listen>(kDrainChannelId);
  ASSERT_EQ(first_flush_payloads.size(), 1u);
  uint32_t drop_count = 0;
  size_t entries_count = 0;
  protobuf::Decoder payload_decoder(first_flush_payloads[0]);
  payload_decoder.Reset(first_flush_payloads[0]);
  VerifyLogEntries(
      payload_decoder, kFirstFlushedBundle, 0, entries_count, drop_count);
  // Checked here as well, mirroring the checks made on the second payload.
  EXPECT_EQ(drop_count, 0u);
  EXPECT_EQ(entries_count, 3u);

  // An additional flush should produce another payload.
  min_delay = drains_[0].Trickle(channel_encode_buffer_);
  EXPECT_FALSE(min_delay.has_value());
  drop_count = 0;
  entries_count = 0;

  rpc::PayloadsView second_flush_payloads =
      output_.payloads<log::pw_rpc::raw::Logs::Listen>(kDrainChannelId);
  ASSERT_EQ(second_flush_payloads.size(), 2u);
  payload_decoder.Reset(second_flush_payloads[1]);
  VerifyLogEntries(
      payload_decoder, kSecondFlushedBundle, 3, entries_count, drop_count);
  EXPECT_EQ(drop_count, 0u);
  EXPECT_EQ(entries_count, 3u);
}
414 
// Expects the drain's on-open callback to run exactly once per successful
// Open() after it has been set, and not at all before it is set or when Open()
// fails.
TEST(RpcLogDrain, OnOpenCallbackCalled) {
  // Drain, drain map, log service and MultiSink.
  constexpr uint32_t kDrainId = 1;
  std::array<std::byte, kBufferSize> drain_buffer;
  sync::Mutex drain_mutex;
  RpcLogDrain drain(
      kDrainId,
      drain_buffer,
      drain_mutex,
      RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
      nullptr);
  RpcLogDrainMap drain_map(span(&drain, 1));
  LogService log_service(drain_map);
  std::array<std::byte, kBufferSize * 2> sink_storage;
  multisink::MultiSink sink(sink_storage);
  sink.AttachDrain(drain);

  // Fake channel, server and an initially active server writer.
  rpc::RawFakeChannelOutput<3> fake_output;
  rpc::Channel channel(rpc::Channel::Create<kDrainId>(&fake_output));
  rpc::Server server(span(&channel, 1));
  rpc::RawServerWriter writer =
      rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
          server, kDrainId, log_service);

  // The callback only counts how many times it was invoked.
  int open_notifications = 0;
  Function<void()> count_open = [&open_notifications]() {
    ++open_notifications;
  };

  // No callback registered yet: a successful Open() must not bump the counter.
  ASSERT_TRUE(writer.active());
  ASSERT_EQ(drain.Open(writer), OkStatus());
  EXPECT_EQ(open_notifications, 0);

  drain.set_on_open_callback(std::move(count_open));

  // Callback registered: a successful Open() with a fresh writer invokes it.
  writer = rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
      server, kDrainId, log_service);
  ASSERT_TRUE(writer.active());
  ASSERT_EQ(drain.Open(writer), OkStatus());
  EXPECT_EQ(open_notifications, 1);

  // A failed Open() with an inactive writer leaves the counter unchanged.
  rpc::RawServerWriter inactive_writer;
  ASSERT_FALSE(inactive_writer.active());
  ASSERT_EQ(drain.Open(inactive_writer), Status::FailedPrecondition());
  EXPECT_EQ(open_notifications, 1);
}
465 
466 }  // namespace
467 }  // namespace pw::log_rpc
468