• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_log_rpc/log_service.h"
16 
17 #include <array>
18 #include <cstdint>
19 #include <limits>
20 
21 #include "gtest/gtest.h"
22 #include "pw_assert/check.h"
23 #include "pw_bytes/endian.h"
24 #include "pw_containers/vector.h"
25 #include "pw_log/log.h"
26 #include "pw_log/proto/log.pwpb.h"
27 #include "pw_log/proto_utils.h"
28 #include "pw_log_rpc/log_filter.h"
29 #include "pw_log_rpc_private/test_utils.h"
30 #include "pw_log_tokenized/metadata.h"
31 #include "pw_protobuf/bytes_utils.h"
32 #include "pw_protobuf/decoder.h"
33 #include "pw_result/result.h"
34 #include "pw_rpc/channel.h"
35 #include "pw_rpc/raw/fake_channel_output.h"
36 #include "pw_rpc/raw/test_method_context.h"
37 #include "pw_sync/mutex.h"
38 
39 namespace pw::log_rpc {
40 namespace {
41 
42 using log::pw_rpc::raw::Logs;
43 
// Raw test method context type for the LogService::Listen RPC. Tests
// instantiate it with the drain map, e.g.
// `LOG_SERVICE_METHOD_CONTEXT context(drain_map_);`.
// NOTE(review): the trailing 10 is presumably the maximum number of responses
// the context can record -- confirm against PW_RAW_TEST_METHOD_CONTEXT.
#define LOG_SERVICE_METHOD_CONTEXT \
  PW_RAW_TEST_METHOD_CONTEXT(LogService, Listen, 10)

// Payload budget added on top of the drain's minimum entry buffer size.
constexpr size_t kMaxMessageSize = 50;
// Size of the buffers used to encode entries and of the regular drain buffers.
constexpr size_t kMaxLogEntrySize =
    RpcLogDrain::kMinEntryBufferSize + kMaxMessageSize;
static_assert(RpcLogDrain::kMinEntryBufferSize < kMaxLogEntrySize);
// The multisink buffer is sized as ten maximum-size entries.
constexpr size_t kMultiSinkBufferSize = kMaxLogEntrySize * 10;
// Number of drains (and filters) owned by the test fixture.
constexpr size_t kMaxDrains = 3;
// Short payload used by most tests.
constexpr char kMessage[] = "message";
// A message small enough to fit encoded in
// LogServiceTest::entry_encode_buffer_ but large enough to not fit in
// LogServiceTest::small_buffer_.
constexpr char kLongMessage[] =
    "This is a long log message that will be dropped.";
static_assert(sizeof(kLongMessage) < kMaxMessageSize);
static_assert(sizeof(kLongMessage) + RpcLogDrain::kMinEntrySizeWithoutPayload >
              RpcLogDrain::kMinEntryBufferSize);
// Request payload handed to `context.call()` by the tests in this file.
std::array<std::byte, 1> rpc_request_buffer;
// Metadata attached to ordinary test entries: INFO level, module 123,
// flags 0x03, and the source line of this declaration.
constexpr auto kSampleMetadata =
    log_tokenized::Metadata::Set<PW_LOG_LEVEL_INFO, 123, 0x03, __LINE__>();
// Metadata the tests expect on drop-count messages: every field is zero.
constexpr auto kDropMessageMetadata =
    log_tokenized::Metadata::Set<0, 0, 0, 0>();
// Timestamp attached to ordinary test entries.
constexpr int64_t kSampleTimestamp = 1000;
68 
69 // `LogServiceTest` sets up a logging environment for testing with a
70 // `MultiSink` for log entries, and multiple `RpcLogDrain`s for consuming such
71 // log entries. It includes methods to add log entries to the `MultiSink`, and
72 // buffers for encoding and retrieving log entries. Tests can choose how many
73 // entries to add to the multisink, and which drain to use.
74 class LogServiceTest : public ::testing::Test {
75  public:
LogServiceTest()76   LogServiceTest() : multisink_(multisink_buffer_), drain_map_(drains_) {
77     for (auto& drain : drain_map_.drains()) {
78       multisink_.AttachDrain(drain);
79     }
80   }
81 
AddLogEntries(size_t log_count,std::string_view message,log_tokenized::Metadata metadata,int64_t timestamp)82   void AddLogEntries(size_t log_count,
83                      std::string_view message,
84                      log_tokenized::Metadata metadata,
85                      int64_t timestamp) {
86     for (size_t i = 0; i < log_count; ++i) {
87       ASSERT_TRUE(AddLogEntry(message, metadata, timestamp).ok());
88     }
89   }
90 
AddLogEntry(std::string_view message,log_tokenized::Metadata metadata,int64_t timestamp)91   StatusWithSize AddLogEntry(std::string_view message,
92                              log_tokenized::Metadata metadata,
93                              int64_t timestamp) {
94     Result<ConstByteSpan> encoded_log_result =
95         log::EncodeTokenizedLog(metadata,
96                                 std::as_bytes(std::span(message)),
97                                 timestamp,
98                                 /*thread_name=*/{},
99                                 entry_encode_buffer_);
100     PW_TRY_WITH_SIZE(encoded_log_result.status());
101     multisink_.HandleEntry(encoded_log_result.value());
102     return StatusWithSize(encoded_log_result.value().size());
103   }
104 
105  protected:
106   std::array<std::byte, kMultiSinkBufferSize> multisink_buffer_;
107   multisink::MultiSink multisink_;
108   RpcLogDrainMap drain_map_;
109   std::array<std::byte, kMaxLogEntrySize> entry_encode_buffer_;
110   static constexpr size_t kMaxFilterRules = 3;
111   std::array<Filter::Rule, kMaxFilterRules> rules1_;
112   std::array<Filter::Rule, kMaxFilterRules> rules2_;
113   std::array<Filter::Rule, kMaxFilterRules> rules3_;
114   static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id1_{
115       std::byte(65), std::byte(66), std::byte(67), std::byte(0)};
116   static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id2_{
117       std::byte(68), std::byte(69), std::byte(70), std::byte(0)};
118   static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id3_{
119       std::byte(71), std::byte(72), std::byte(73), std::byte(0)};
120   std::array<Filter, kMaxDrains> filters_ = {
121       Filter(filter_id1_, rules1_),
122       Filter(filter_id2_, rules2_),
123       Filter(filter_id3_, rules3_),
124   };
125 
126   // Drain Buffers
127   std::array<std::byte, kMaxLogEntrySize> drain_buffer1_;
128   std::array<std::byte, kMaxLogEntrySize> drain_buffer2_;
129   std::array<std::byte, RpcLogDrain::kMinEntryBufferSize> small_buffer_;
130   static constexpr uint32_t kIgnoreWriterErrorsDrainId = 1;
131   static constexpr uint32_t kCloseWriterOnErrorDrainId = 2;
132   static constexpr uint32_t kSmallBufferDrainId = 3;
133   sync::Mutex shared_mutex_;
134   std::array<RpcLogDrain, kMaxDrains> drains_{
135       RpcLogDrain(kIgnoreWriterErrorsDrainId,
136                   drain_buffer1_,
137                   shared_mutex_,
138                   RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors,
139                   &filters_[0]),
140       RpcLogDrain(kCloseWriterOnErrorDrainId,
141                   drain_buffer2_,
142                   shared_mutex_,
143                   RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
144                   &filters_[1]),
145       RpcLogDrain(kSmallBufferDrainId,
146                   small_buffer_,
147                   shared_mutex_,
148                   RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors,
149                   &filters_[2]),
150   };
151 
152   std::array<std::byte, 128> encoding_buffer_ = {};
153 };
154 
// Checks how writers get assigned to drains: only the drain addressed by the
// RPC receives one, a second call on an open stream is rejected, and a closed
// stream accepts a new writer.
TEST_F(LogServiceTest, AssignWriter) {
  // No RPC has been made yet, so no drain can be flushed.
  for (auto& unopened_drain : drain_map_.drains()) {
    EXPECT_EQ(unopened_drain.Flush(encoding_buffer_), Status::Unavailable());
  }

  // Address the first call to the channel of the first drain.
  RpcLogDrain& target_drain = drains_[0];
  const uint32_t target_channel_id = target_drain.channel_id();
  LOG_SERVICE_METHOD_CONTEXT first_call(drain_map_);
  first_call.set_channel_id(target_channel_id);

  // The RPC hands the drain its writer, which makes flushing possible.
  first_call.call(rpc_request_buffer);
  EXPECT_EQ(target_drain.Flush(encoding_buffer_), OkStatus());

  // Every other drain is still without a writer.
  for (auto& other_drain : drain_map_.drains()) {
    if (other_drain.channel_id() == target_channel_id) {
      continue;
    }
    EXPECT_EQ(other_drain.Flush(encoding_buffer_), Status::Unavailable());
  }

  // A second call on the already open stream must leave the drain's writer
  // untouched; the new call finishes right away without any response.
  LOG_SERVICE_METHOD_CONTEXT second_call(drain_map_);
  second_call.set_channel_id(target_channel_id);
  second_call.call(rpc_request_buffer);
  EXPECT_EQ(target_drain.Flush(encoding_buffer_), OkStatus());
  ASSERT_TRUE(second_call.done());
  EXPECT_EQ(second_call.responses().size(), 0u);

  // Once the stream is closed, a new call may install its writer.
  ASSERT_EQ(target_drain.Close(), OkStatus());
  LOG_SERVICE_METHOD_CONTEXT third_call(drain_map_);
  third_call.set_channel_id(target_channel_id);
  third_call.call(rpc_request_buffer);
  EXPECT_EQ(target_drain.Flush(encoding_buffer_), OkStatus());
  ASSERT_FALSE(third_call.done());
  EXPECT_EQ(third_call.responses().size(), 0u);
  EXPECT_EQ(target_drain.Close(), OkStatus());
}
197 
// Streams a batch of entries through the third drain and checks that the call
// only completes once the drain is closed and that every entry arrives intact.
TEST_F(LogServiceTest, StartAndEndStream) {
  RpcLogDrain& stream_drain = drains_[2];
  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
  context.set_channel_id(stream_drain.channel_id());

  // Fill the multisink before the stream is requested.
  constexpr size_t kEntryCount = 10;
  AddLogEntries(kEntryCount, kMessage, kSampleMetadata, kSampleTimestamp);

  // Open the stream and push out everything that is pending.
  context.call(rpc_request_buffer);
  EXPECT_EQ(stream_drain.Flush(encoding_buffer_), OkStatus());

  // The call stays open until the drain closes the stream.
  ASSERT_FALSE(context.done());
  EXPECT_EQ(OkStatus(), stream_drain.Close());
  ASSERT_TRUE(context.done());

  EXPECT_EQ(context.status(), OkStatus());
  // Entries are packed, so a single response may carry several of them.
  EXPECT_GE(context.responses().size(), 1u);

  // Every entry is expected to look the same.
  const TestLogEntry sample_entry{
      .metadata = kSampleMetadata,
      .timestamp = kSampleTimestamp,
      .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))};
  Vector<TestLogEntry, kEntryCount> expected_entries;
  for (size_t added = 0; added < kEntryCount; ++added) {
    expected_entries.push_back(sample_entry);
  }

  // Walk through all responses, accumulating entry and drop counts.
  size_t received_entries = 0;
  uint32_t received_drops = 0;
  for (auto& payload : context.responses()) {
    protobuf::Decoder payload_decoder(payload);
    VerifyLogEntries(payload_decoder,
                     expected_entries,
                     received_entries,
                     received_entries,
                     received_drops);
  }
  EXPECT_EQ(received_entries, kEntryCount);
  EXPECT_EQ(received_drops, 0u);
}
242 
TEST_F(LogServiceTest,HandleDropped)243 TEST_F(LogServiceTest, HandleDropped) {
244   RpcLogDrain& active_drain = drains_[0];
245   const uint32_t drain_channel_id = active_drain.channel_id();
246   LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
247   context.set_channel_id(drain_channel_id);
248 
249   // Add log entries.
250   const size_t total_entries = 5;
251   const size_t entries_before_drop = 1;
252   const uint32_t total_drop_count = 2;
253 
254   // Force a drop entry in between entries.
255   AddLogEntries(
256       entries_before_drop, kMessage, kSampleMetadata, kSampleTimestamp);
257   multisink_.HandleDropped(total_drop_count);
258   AddLogEntries(total_entries - entries_before_drop,
259                 kMessage,
260                 kSampleMetadata,
261                 kSampleTimestamp);
262 
263   // Request logs.
264   context.call(rpc_request_buffer);
265   EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
266   EXPECT_EQ(OkStatus(), active_drain.Close());
267   ASSERT_EQ(context.status(), OkStatus());
268   // There is at least 1 response with multiple log entries packed.
269   ASSERT_GE(context.responses().size(), 1u);
270 
271   Vector<TestLogEntry, total_entries + 1> expected_messages;
272   size_t i = 0;
273   for (; i < entries_before_drop; ++i) {
274     expected_messages.push_back({.metadata = kSampleMetadata,
275                                  .timestamp = kSampleTimestamp,
276                                  .tokenized_data = std::as_bytes(
277                                      std::span(std::string_view(kMessage)))});
278   }
279   expected_messages.push_back(
280       {.metadata = kDropMessageMetadata,
281        .dropped = total_drop_count,
282        .tokenized_data = std::as_bytes(
283            std::span(std::string_view(RpcLogDrain::kIngressErrorMessage)))});
284   for (; i < total_entries; ++i) {
285     expected_messages.push_back({.metadata = kSampleMetadata,
286                                  .timestamp = kSampleTimestamp,
287                                  .tokenized_data = std::as_bytes(
288                                      std::span(std::string_view(kMessage)))});
289   }
290 
291   // Verify data in responses.
292   size_t entries_found = 0;
293   uint32_t drop_count_found = 0;
294   for (auto& response : context.responses()) {
295     protobuf::Decoder entry_decoder(response);
296     VerifyLogEntries(entry_decoder,
297                      expected_messages,
298                      entries_found,
299                      entries_found,
300                      drop_count_found);
301   }
302   EXPECT_EQ(entries_found, total_entries);
303   EXPECT_EQ(drop_count_found, total_drop_count);
304 }
305 
TEST_F(LogServiceTest,HandleDroppedBetweenFilteredOutLogs)306 TEST_F(LogServiceTest, HandleDroppedBetweenFilteredOutLogs) {
307   RpcLogDrain& active_drain = drains_[0];
308   const uint32_t drain_channel_id = active_drain.channel_id();
309   LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
310   context.set_channel_id(drain_channel_id);
311   // Set filter to drop INFO+ and keep DEBUG logs
312   rules1_[0].action = Filter::Rule::Action::kDrop;
313   rules1_[0].level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL;
314 
315   // Add log entries.
316   const size_t total_entries = 5;
317   const uint32_t total_drop_count = total_entries - 1;
318 
319   // Force a drop entry in between entries that will be filtered out.
320   for (size_t i = 1; i < total_entries; ++i) {
321     ASSERT_EQ(
322         OkStatus(),
323         AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp).status());
324     multisink_.HandleDropped(1);
325   }
326   // Add message that won't be filtered out.
327   constexpr auto metadata =
328       log_tokenized::Metadata::Set<PW_LOG_LEVEL_DEBUG, 0, 0, __LINE__>();
329   ASSERT_EQ(OkStatus(),
330             AddLogEntry(kMessage, metadata, kSampleTimestamp).status());
331 
332   // Request logs.
333   context.call(rpc_request_buffer);
334   EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
335   EXPECT_EQ(OkStatus(), active_drain.Close());
336   ASSERT_EQ(context.status(), OkStatus());
337   // There is at least 1 response with multiple log entries packed.
338   ASSERT_GE(context.responses().size(), 1u);
339 
340   Vector<TestLogEntry, 2> expected_messages;
341   expected_messages.push_back(
342       {.metadata = kDropMessageMetadata,
343        .dropped = total_drop_count,
344        .tokenized_data = std::as_bytes(
345            std::span(std::string_view(RpcLogDrain::kIngressErrorMessage)))});
346   expected_messages.push_back(
347       {.metadata = metadata,
348        .timestamp = kSampleTimestamp,
349        .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))});
350 
351   // Verify data in responses.
352   size_t entries_found = 0;
353   uint32_t drop_count_found = 0;
354   for (auto& response : context.responses()) {
355     protobuf::Decoder entry_decoder(response);
356     VerifyLogEntries(entry_decoder,
357                      expected_messages,
358                      entries_found,
359                      entries_found,
360                      drop_count_found);
361   }
362   EXPECT_EQ(entries_found, 1u);
363   EXPECT_EQ(drop_count_found, total_drop_count);
364 }
365 
// Uses the drain with the minimum-size entry buffer: entries too long for that
// buffer must be counted as drops and reported together with the one entry
// that does fit.
TEST_F(LogServiceTest, HandleSmallLogEntryBuffer) {
  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
  context.set_channel_id(kSmallBufferDrainId);
  auto drain_lookup = drain_map_.GetDrainFromChannelId(kSmallBufferDrainId);
  ASSERT_TRUE(drain_lookup.ok());
  RpcLogDrain& small_drain = *drain_lookup.value();

  // All entries but the last one are too long for the drain's entry buffer.
  // The short one is required because drop count messages are only sent when
  // a log entry can be sent.
  constexpr size_t kEntryCount = 5;
  constexpr uint32_t kDropCount = kEntryCount - 1;
  AddLogEntries(kDropCount, kLongMessage, kSampleMetadata, kSampleTimestamp);
  EXPECT_EQ(OkStatus(),
            AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp).status());

  // Open the stream, flush it and close it.
  context.call(rpc_request_buffer);
  EXPECT_EQ(small_drain.Flush(encoding_buffer_), OkStatus());
  EXPECT_EQ(small_drain.Close(), OkStatus());
  ASSERT_EQ(context.status(), OkStatus());
  ASSERT_EQ(context.responses().size(), 1u);

  // Expected stream: a single drop message carrying the total drop count,
  // followed by the only entry that fits the buffer.
  Vector<TestLogEntry, kEntryCount + 1> expected_entries;
  expected_entries.push_back(
      {.metadata = kDropMessageMetadata,
       .dropped = kDropCount,
       .tokenized_data = std::as_bytes(std::span(
           std::string_view(RpcLogDrain::kSmallStackBufferErrorMessage)))});
  expected_entries.push_back(
      {.metadata = kSampleMetadata,
       .timestamp = kSampleTimestamp,
       .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))});

  // Walk through all responses, accumulating entry and drop counts.
  size_t received_entries = 0;
  uint32_t received_drops = 0;
  for (auto& payload : context.responses()) {
    protobuf::Decoder payload_decoder(payload);
    VerifyLogEntries(payload_decoder,
                     expected_entries,
                     received_entries,
                     received_entries,
                     received_drops);
  }
  EXPECT_EQ(received_entries, 1u);
  EXPECT_EQ(received_drops, kDropCount);
}
415 
// Opens a stream on a drain that has been detached from the multisink and
// checks that closing it succeeds without any response having been sent.
TEST_F(LogServiceTest, FlushDrainWithoutMultisink) {
  RpcLogDrain& orphan_drain = drains_[0];
  multisink_.DetachDrain(orphan_drain);
  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
  context.set_channel_id(orphan_drain.channel_id());

  // These entries go to the multisink the drain no longer belongs to.
  constexpr size_t kEntryCount = 5;
  AddLogEntries(kEntryCount, kMessage, kSampleMetadata, kSampleTimestamp);

  // Open the stream and close it right away.
  context.call(rpc_request_buffer);
  EXPECT_EQ(orphan_drain.Close(), OkStatus());
  ASSERT_EQ(context.status(), OkStatus());
  EXPECT_EQ(context.responses().size(), 0u);
}
431 
// Sends one hand-encoded entry whose metadata fields and timestamp hold their
// maximum values and checks that it is streamed back unchanged.
TEST_F(LogServiceTest, LargeLogEntry) {
  // Module, flags and line use every bit available to them; the timestamp is
  // the largest int64_t.
  const TestLogEntry large_entry{
      .metadata =
          log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN,
                                       (1 << PW_LOG_TOKENIZED_MODULE_BITS) - 1,
                                       (1 << PW_LOG_TOKENIZED_FLAG_BITS) - 1,
                                       (1 << PW_LOG_TOKENIZED_LINE_BITS) - 1>(),
      .timestamp = std::numeric_limits<int64_t>::max(),
      .tokenized_data = std::as_bytes(std::span(kMessage)),
  };

  // Encode the entry field by field and hand it to the multisink.
  log::LogEntry::MemoryEncoder entry_encoder(entry_encode_buffer_);
  ASSERT_EQ(entry_encoder.WriteMessage(large_entry.tokenized_data),
            OkStatus());
  // The level occupies the low bits and the line number the bits above it.
  const auto packed_line_level =
      (large_entry.metadata.level() & PW_LOG_LEVEL_BITMASK) |
      ((large_entry.metadata.line_number() << PW_LOG_LEVEL_BITS) &
       ~PW_LOG_LEVEL_BITMASK);
  ASSERT_EQ(entry_encoder.WriteLineLevel(packed_line_level), OkStatus());
  ASSERT_EQ(entry_encoder.WriteFlags(large_entry.metadata.flags()),
            OkStatus());
  ASSERT_EQ(entry_encoder.WriteTimestamp(large_entry.timestamp), OkStatus());
  // The module is written as the little-endian bytes of its value.
  const uint32_t module_le =
      bytes::ConvertOrderTo(std::endian::little, large_entry.metadata.module());
  ASSERT_EQ(entry_encoder.WriteModule(std::as_bytes(std::span(&module_le, 1))),
            OkStatus());
  ASSERT_EQ(entry_encoder.status(), OkStatus());
  multisink_.HandleEntry(entry_encoder);

  // Open the stream on the first drain, flush it and close it.
  RpcLogDrain& stream_drain = drains_[0];
  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
  context.set_channel_id(stream_drain.channel_id());
  context.call(rpc_request_buffer);
  ASSERT_EQ(stream_drain.Flush(encoding_buffer_), OkStatus());
  EXPECT_EQ(OkStatus(), stream_drain.Close());
  ASSERT_EQ(context.status(), OkStatus());
  ASSERT_EQ(context.responses().size(), 1u);

  // Unpack the single entry of the single response and compare it.
  protobuf::Decoder response_decoder(context.responses()[0]);
  ASSERT_TRUE(response_decoder.Next().ok());
  ConstByteSpan encoded_entry;
  EXPECT_TRUE(response_decoder.ReadBytes(&encoded_entry).ok());
  protobuf::Decoder entry_decoder(encoded_entry);
  uint32_t received_drops = 0;
  VerifyLogEntry(entry_decoder, large_entry, received_drops);
  EXPECT_EQ(received_drops, 0u);
}
482 
TEST_F(LogServiceTest,InterruptedLogStreamSendsDropCount)483 TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) {
484   const uint32_t drain_channel_id = kCloseWriterOnErrorDrainId;
485   auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id);
486   ASSERT_TRUE(drain.ok());
487 
488   LogService log_service(drain_map_);
489   const size_t max_packets = 10;
490   rpc::RawFakeChannelOutput<10, 512> output;
491   rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
492   rpc::Server server(std::span(&channel, 1));
493 
494   // Add as many entries needed to have multiple packets send.
495   StatusWithSize status =
496       AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp);
497   ASSERT_TRUE(status.ok());
498 
499   const uint32_t max_messages_per_response =
500       encoding_buffer_.size() / status.size();
501   // Send less packets than the max to avoid crashes.
502   const uint32_t packets_sent = max_packets / 2;
503   const size_t total_entries = packets_sent * max_messages_per_response;
504   const size_t max_entries = 50;
505   // Check we can test all these entries.
506   ASSERT_GE(max_entries, total_entries);
507   AddLogEntries(total_entries - 1, kMessage, kSampleMetadata, kSampleTimestamp);
508 
509   // Interrupt log stream with an error.
510   const uint32_t successful_packets_sent = packets_sent / 2;
511   output.set_send_status(Status::Unavailable(), successful_packets_sent);
512 
513   // Request logs.
514   rpc::RawServerWriter writer = rpc::RawServerWriter::Open<Logs::Listen>(
515       server, drain_channel_id, log_service);
516   EXPECT_EQ(drain.value()->Open(writer), OkStatus());
517   // This drain closes on errors.
518   EXPECT_EQ(drain.value()->Flush(encoding_buffer_), Status::Aborted());
519   EXPECT_TRUE(output.done());
520 
521   // Make sure not all packets were sent.
522   ASSERT_EQ(output.payloads<Logs::Listen>().size(), successful_packets_sent);
523 
524   // Verify data in responses.
525   Vector<TestLogEntry, max_entries> expected_messages;
526   for (size_t i = 0; i < total_entries; ++i) {
527     expected_messages.push_back({.metadata = kSampleMetadata,
528                                  .timestamp = kSampleTimestamp,
529                                  .tokenized_data = std::as_bytes(
530                                      std::span(std::string_view(kMessage)))});
531   }
532   size_t entries_found = 0;
533   uint32_t drop_count_found = 0;
534   for (auto& response : output.payloads<Logs::Listen>()) {
535     protobuf::Decoder entry_decoder(response);
536     VerifyLogEntries(entry_decoder,
537                      expected_messages,
538                      entries_found,
539                      entries_found,
540                      drop_count_found);
541   }
542 
543   // Verify that not all the entries were sent.
544   EXPECT_LT(entries_found, total_entries);
545   // The drain closes on errors, thus the drop count is reported on the next
546   // call to Flush.
547   EXPECT_EQ(drop_count_found, 0u);
548 
549   // Reset channel output and resume log stream with a new writer.
550   output.clear();
551   writer = rpc::RawServerWriter::Open<Logs::Listen>(
552       server, drain_channel_id, log_service);
553   EXPECT_EQ(drain.value()->Open(writer), OkStatus());
554   EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
555 
556   // One full packet was dropped. Since all messages are the same length,
557   // there are entries_found / successful_packets_sent per packet.
558   const uint32_t total_drop_count = entries_found / successful_packets_sent;
559   Vector<TestLogEntry, max_entries> expected_messages_after_reset;
560   expected_messages_after_reset.push_back(
561       {.metadata = kDropMessageMetadata,
562        .dropped = total_drop_count,
563        .tokenized_data = std::as_bytes(
564            std::span(std::string_view(RpcLogDrain::kWriterErrorMessage)))});
565 
566   const uint32_t remaining_entries = total_entries - total_drop_count;
567   for (size_t i = 0; i < remaining_entries; ++i) {
568     expected_messages_after_reset.push_back(
569         {.metadata = kSampleMetadata,
570          .timestamp = kSampleTimestamp,
571          .tokenized_data =
572              std::as_bytes(std::span(std::string_view(kMessage)))});
573   }
574 
575   size_t entries_found_after_reset = 0;
576   for (auto& response : output.payloads<Logs::Listen>()) {
577     protobuf::Decoder entry_decoder(response);
578     uint32_t expected_sequence_id =
579         entries_found + entries_found_after_reset + total_drop_count;
580     VerifyLogEntries(entry_decoder,
581                      expected_messages_after_reset,
582                      expected_sequence_id,
583                      entries_found_after_reset,
584                      drop_count_found);
585   }
586   EXPECT_EQ(entries_found + entries_found_after_reset, remaining_entries);
587   EXPECT_EQ(drop_count_found, total_drop_count);
588 }
589 
TEST_F(LogServiceTest,InterruptedLogStreamIgnoresErrors)590 TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) {
591   const uint32_t drain_channel_id = kIgnoreWriterErrorsDrainId;
592   auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id);
593   ASSERT_TRUE(drain.ok());
594 
595   LogService log_service(drain_map_);
596   const size_t max_packets = 20;
597   rpc::RawFakeChannelOutput<max_packets, 512> output;
598   rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
599   rpc::Server server(std::span(&channel, 1));
600 
601   // Add as many entries needed to have multiple packets send.
602   StatusWithSize status =
603       AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp);
604   ASSERT_TRUE(status.ok());
605 
606   const uint32_t max_messages_per_response =
607       encoding_buffer_.size() / status.size();
608   // Send less packets than the max to avoid crashes.
609   const uint32_t packets_sent = 4;
610   const size_t total_entries = packets_sent * max_messages_per_response;
611   const size_t max_entries = 50;
612   // Check we can test all these entries.
613   ASSERT_GT(max_entries, total_entries);
614   AddLogEntries(total_entries - 1, kMessage, kSampleMetadata, kSampleTimestamp);
615 
616   // Interrupt log stream with an error.
617   const uint32_t error_on_packet_count = packets_sent / 2;
618   output.set_send_status(Status::Unavailable(), error_on_packet_count);
619 
620   // Request logs.
621   rpc::RawServerWriter writer = rpc::RawServerWriter::Open<Logs::Listen>(
622       server, drain_channel_id, log_service);
623   EXPECT_EQ(drain.value()->Open(writer), OkStatus());
624   // This drain ignores errors.
625   EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
626   EXPECT_FALSE(output.done());
627 
628   // Make sure some packets were sent.
629   ASSERT_GT(output.payloads<Logs::Listen>().size(), 0u);
630 
631   // Verify that not all the entries were sent.
632   size_t entries_found = 0;
633   for (auto& response : output.payloads<Logs::Listen>()) {
634     protobuf::Decoder entry_decoder(response);
635     entries_found += CountLogEntries(entry_decoder);
636   }
637   ASSERT_LT(entries_found, total_entries);
638 
639   // Verify that all messages were sent.
640   const uint32_t total_drop_count = total_entries - entries_found;
641   Vector<TestLogEntry, max_entries> expected_messages;
642   for (size_t i = 0; i < entries_found; ++i) {
643     expected_messages.push_back({.metadata = kSampleMetadata,
644                                  .timestamp = kSampleTimestamp,
645                                  .tokenized_data = std::as_bytes(
646                                      std::span(std::string_view(kMessage)))});
647   }
648 
649   entries_found = 0;
650   uint32_t drop_count_found = 0;
651   uint32_t i = 0;
652   for (; i < error_on_packet_count; ++i) {
653     protobuf::Decoder entry_decoder(output.payloads<Logs::Listen>()[i]);
654     VerifyLogEntries(entry_decoder,
655                      expected_messages,
656                      entries_found,
657                      entries_found,
658                      drop_count_found);
659   }
660   for (; i < output.payloads<Logs::Listen>().size(); ++i) {
661     protobuf::Decoder entry_decoder(output.payloads<Logs::Listen>()[i]);
662     VerifyLogEntries(entry_decoder,
663                      expected_messages,
664                      entries_found + total_drop_count,
665                      entries_found,
666                      drop_count_found);
667   }
668   // This drain ignores errors and thus doesn't report drops on its own.
669   EXPECT_EQ(drop_count_found, 0u);
670 
671   // More calls to flush with errors will not affect this stubborn drain.
672   const size_t previous_stream_packet_count =
673       output.payloads<Logs::Listen>().size();
674   output.set_send_status(Status::Unavailable());
675   EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
676   EXPECT_FALSE(output.done());
677   ASSERT_EQ(output.payloads<Logs::Listen>().size(),
678             previous_stream_packet_count);
679 
680   output.clear();
681   EXPECT_EQ(drain.value()->Close(), OkStatus());
682   EXPECT_TRUE(output.done());
683 }
684 
TEST_F(LogServiceTest,FilterLogs)685 TEST_F(LogServiceTest, FilterLogs) {
686   // Add a variety of logs.
687   const uint32_t module = 0xcafe;
688   const uint32_t flags = 0x02;
689   const uint32_t line_number = 100;
690   const auto debug_metadata = log_tokenized::Metadata::
691       Set<PW_LOG_LEVEL_DEBUG, module, flags, line_number>();
692   ASSERT_TRUE(AddLogEntry(kMessage, debug_metadata, kSampleTimestamp).ok());
693   const auto info_metadata = log_tokenized::Metadata::
694       Set<PW_LOG_LEVEL_INFO, module, flags, line_number>();
695   ASSERT_TRUE(AddLogEntry(kMessage, info_metadata, kSampleTimestamp).ok());
696   const auto warn_metadata = log_tokenized::Metadata::
697       Set<PW_LOG_LEVEL_WARN, module, flags, line_number>();
698   ASSERT_TRUE(AddLogEntry(kMessage, warn_metadata, kSampleTimestamp).ok());
699   const auto error_metadata = log_tokenized::Metadata::
700       Set<PW_LOG_LEVEL_ERROR, module, flags, line_number>();
701   ASSERT_TRUE(AddLogEntry(kMessage, error_metadata, kSampleTimestamp).ok());
702   const auto different_flags_metadata = log_tokenized::Metadata::
703       Set<PW_LOG_LEVEL_ERROR, module, 0x01, line_number>();
704   ASSERT_TRUE(
705       AddLogEntry(kMessage, different_flags_metadata, kSampleTimestamp).ok());
706   const auto different_module_metadata = log_tokenized::Metadata::
707       Set<PW_LOG_LEVEL_ERROR, 0xabcd, flags, line_number>();
708   ASSERT_TRUE(
709       AddLogEntry(kMessage, different_module_metadata, kSampleTimestamp).ok());
710 
711   Vector<TestLogEntry, 3> expected_messages{
712       {.metadata = info_metadata,
713        .timestamp = kSampleTimestamp,
714        .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))},
715       {.metadata = warn_metadata,
716        .timestamp = kSampleTimestamp,
717        .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))},
718       {.metadata = error_metadata,
719        .timestamp = kSampleTimestamp,
720        .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))},
721   };
722 
723   // Set up filter rules for drain at drains_[1].
724   RpcLogDrain& drain = drains_[1];
725   for (auto& rule : rules2_) {
726     rule = {};
727   }
728   const auto module_little_endian =
729       bytes::CopyInOrder<uint32_t>(std::endian::little, module);
730   rules2_[0] = {
731       .action = Filter::Rule::Action::kKeep,
732       .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
733       .any_flags_set = flags,
734       .module_equals{module_little_endian.begin(), module_little_endian.end()}};
735   rules2_[1] = {
736       .action = Filter::Rule::Action::kDrop,
737       .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
738       .any_flags_set = 0,
739       .module_equals{},
740   };
741 
742   // Request logs.
743   LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
744   context.set_channel_id(drain.channel_id());
745   context.call({});
746   ASSERT_EQ(drain.Flush(encoding_buffer_), OkStatus());
747 
748   size_t entries_found = 0;
749   uint32_t drop_count_found = 0;
750   for (auto& response : context.responses()) {
751     protobuf::Decoder entry_decoder(response);
752     VerifyLogEntries(entry_decoder,
753                      expected_messages,
754                      entries_found,
755                      entries_found,
756                      drop_count_found);
757   }
758   EXPECT_EQ(entries_found, 3u);
759   EXPECT_EQ(drop_count_found, 0u);
760 }
761 
// Opens a stream on the close-on-error drain, flushes it, then opens it again
// with a fresh writer and checks that flushing still succeeds.
TEST_F(LogServiceTest, ReopenClosedLogStreamWithAcquiredBuffer) {
  constexpr uint32_t kChannelId = kCloseWriterOnErrorDrainId;
  auto drain_lookup = drain_map_.GetDrainFromChannelId(kChannelId);
  ASSERT_TRUE(drain_lookup.ok());

  LogService log_service(drain_map_);
  rpc::RawFakeChannelOutput<10, 512> output;
  rpc::Channel channel(rpc::Channel::Create<kChannelId>(&output));
  rpc::Server server(std::span(&channel, 1));

  // First stream: open a writer by hand, give it to the drain and flush.
  rpc::RawServerWriter stream_writer =
      rpc::RawServerWriter::Open<Logs::Listen>(
          server, kChannelId, log_service);
  EXPECT_EQ(drain_lookup.value()->Open(stream_writer), OkStatus());
  // This drain is configured to close the stream on writer errors.
  EXPECT_EQ(drain_lookup.value()->Flush(encoding_buffer_), OkStatus());

  // Second stream: replace the writer and repeat.
  stream_writer = rpc::RawServerWriter::Open<Logs::Listen>(
      server, kChannelId, log_service);
  EXPECT_EQ(drain_lookup.value()->Open(stream_writer), OkStatus());
  EXPECT_EQ(drain_lookup.value()->Flush(encoding_buffer_), OkStatus());
}
785 
786 }  // namespace
787 }  // namespace pw::log_rpc
788