1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_log_rpc/log_service.h"
16
17 #include <array>
18 #include <cstdint>
19 #include <limits>
20
21 #include "gtest/gtest.h"
22 #include "pw_assert/check.h"
23 #include "pw_bytes/endian.h"
24 #include "pw_containers/vector.h"
25 #include "pw_log/log.h"
26 #include "pw_log/proto/log.pwpb.h"
27 #include "pw_log/proto_utils.h"
28 #include "pw_log_rpc/log_filter.h"
29 #include "pw_log_rpc_private/test_utils.h"
30 #include "pw_log_tokenized/metadata.h"
31 #include "pw_protobuf/bytes_utils.h"
32 #include "pw_protobuf/decoder.h"
33 #include "pw_result/result.h"
34 #include "pw_rpc/channel.h"
35 #include "pw_rpc/raw/fake_channel_output.h"
36 #include "pw_rpc/raw/test_method_context.h"
37 #include "pw_sync/mutex.h"
38
39 namespace pw::log_rpc {
40 namespace {
41
// Raw pw_rpc `Logs` service alias. The tests below use `Logs::Listen` to open
// server writers and to read the payloads captured by fake channel outputs.
using log::pw_rpc::raw::Logs;

// Test method context type used to invoke `LogService::Listen` directly.
// NOTE(review): the third argument (10) is presumably the maximum number of
// responses the context can record -- confirm against the pw_rpc
// PW_RAW_TEST_METHOD_CONTEXT documentation.
#define LOG_SERVICE_METHOD_CONTEXT \
  PW_RAW_TEST_METHOD_CONTEXT(LogService, Listen, 10)
46
// Message payload budget added on top of RpcLogDrain::kMinEntryBufferSize when
// sizing buffers that must hold a complete log entry.
constexpr size_t kMaxMessageSize = 50;
// Size of the fixture's entry encode buffer and of the regular drain buffers.
constexpr size_t kMaxLogEntrySize =
    RpcLogDrain::kMinEntryBufferSize + kMaxMessageSize;
static_assert(RpcLogDrain::kMinEntryBufferSize < kMaxLogEntrySize);
// Backing storage size for the fixture's MultiSink: ten times
// kMaxLogEntrySize.
constexpr size_t kMultiSinkBufferSize = kMaxLogEntrySize * 10;
// Number of drains (and filters) owned by the fixture.
constexpr size_t kMaxDrains = 3;
// Short message used by most tests.
constexpr char kMessage[] = "message";
// A message small enough to fit encoded in
// LogServiceTest::entry_encode_buffer_ but large enough to not fit in
// LogServiceTest::small_buffer_.
constexpr char kLongMessage[] =
    "This is a long log message that will be dropped.";
static_assert(sizeof(kLongMessage) < kMaxMessageSize);
static_assert(sizeof(kLongMessage) + RpcLogDrain::kMinEntrySizeWithoutPayload >
              RpcLogDrain::kMinEntryBufferSize);
// Request payload handed to the Listen call by the tests below.
std::array<std::byte, 1> rpc_request_buffer;
// Metadata attached to sample entries (level, module, flags, line). The line
// field comes from __LINE__, so its value depends on where this declaration
// sits in the file.
constexpr auto kSampleMetadata =
    log_tokenized::Metadata::Set<PW_LOG_LEVEL_INFO, 123, 0x03, __LINE__>();
// All-zero metadata that the tests expect on drop-count messages.
constexpr auto kDropMessageMetadata =
    log_tokenized::Metadata::Set<0, 0, 0, 0>();
// Timestamp attached to sample entries.
constexpr int64_t kSampleTimestamp = 1000;
68
69 // `LogServiceTest` sets up a logging environment for testing with a
70 // `MultiSink` for log entries, and multiple `RpcLogDrain`s for consuming such
71 // log entries. It includes methods to add log entries to the `MultiSink`, and
72 // buffers for encoding and retrieving log entries. Tests can choose how many
73 // entries to add to the multisink, and which drain to use.
74 class LogServiceTest : public ::testing::Test {
75 public:
LogServiceTest()76 LogServiceTest() : multisink_(multisink_buffer_), drain_map_(drains_) {
77 for (auto& drain : drain_map_.drains()) {
78 multisink_.AttachDrain(drain);
79 }
80 }
81
AddLogEntries(size_t log_count,std::string_view message,log_tokenized::Metadata metadata,int64_t timestamp)82 void AddLogEntries(size_t log_count,
83 std::string_view message,
84 log_tokenized::Metadata metadata,
85 int64_t timestamp) {
86 for (size_t i = 0; i < log_count; ++i) {
87 ASSERT_TRUE(AddLogEntry(message, metadata, timestamp).ok());
88 }
89 }
90
AddLogEntry(std::string_view message,log_tokenized::Metadata metadata,int64_t timestamp)91 StatusWithSize AddLogEntry(std::string_view message,
92 log_tokenized::Metadata metadata,
93 int64_t timestamp) {
94 Result<ConstByteSpan> encoded_log_result =
95 log::EncodeTokenizedLog(metadata,
96 std::as_bytes(std::span(message)),
97 timestamp,
98 /*thread_name=*/{},
99 entry_encode_buffer_);
100 PW_TRY_WITH_SIZE(encoded_log_result.status());
101 multisink_.HandleEntry(encoded_log_result.value());
102 return StatusWithSize(encoded_log_result.value().size());
103 }
104
105 protected:
106 std::array<std::byte, kMultiSinkBufferSize> multisink_buffer_;
107 multisink::MultiSink multisink_;
108 RpcLogDrainMap drain_map_;
109 std::array<std::byte, kMaxLogEntrySize> entry_encode_buffer_;
110 static constexpr size_t kMaxFilterRules = 3;
111 std::array<Filter::Rule, kMaxFilterRules> rules1_;
112 std::array<Filter::Rule, kMaxFilterRules> rules2_;
113 std::array<Filter::Rule, kMaxFilterRules> rules3_;
114 static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id1_{
115 std::byte(65), std::byte(66), std::byte(67), std::byte(0)};
116 static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id2_{
117 std::byte(68), std::byte(69), std::byte(70), std::byte(0)};
118 static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id3_{
119 std::byte(71), std::byte(72), std::byte(73), std::byte(0)};
120 std::array<Filter, kMaxDrains> filters_ = {
121 Filter(filter_id1_, rules1_),
122 Filter(filter_id2_, rules2_),
123 Filter(filter_id3_, rules3_),
124 };
125
126 // Drain Buffers
127 std::array<std::byte, kMaxLogEntrySize> drain_buffer1_;
128 std::array<std::byte, kMaxLogEntrySize> drain_buffer2_;
129 std::array<std::byte, RpcLogDrain::kMinEntryBufferSize> small_buffer_;
130 static constexpr uint32_t kIgnoreWriterErrorsDrainId = 1;
131 static constexpr uint32_t kCloseWriterOnErrorDrainId = 2;
132 static constexpr uint32_t kSmallBufferDrainId = 3;
133 sync::Mutex shared_mutex_;
134 std::array<RpcLogDrain, kMaxDrains> drains_{
135 RpcLogDrain(kIgnoreWriterErrorsDrainId,
136 drain_buffer1_,
137 shared_mutex_,
138 RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors,
139 &filters_[0]),
140 RpcLogDrain(kCloseWriterOnErrorDrainId,
141 drain_buffer2_,
142 shared_mutex_,
143 RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
144 &filters_[1]),
145 RpcLogDrain(kSmallBufferDrainId,
146 small_buffer_,
147 shared_mutex_,
148 RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors,
149 &filters_[2]),
150 };
151
152 std::array<std::byte, 128> encoding_buffer_ = {};
153 };
154
// Checks how calling Listen assigns a writer to the drain that matches the
// call's channel ID, and what happens on repeated calls for the same drain.
TEST_F(LogServiceTest, AssignWriter) {
  // No RPC has been invoked yet, so flushing any drain reports Unavailable.
  for (auto& drain : drain_map_.drains()) {
    EXPECT_EQ(drain.Flush(encoding_buffer_), Status::Unavailable());
  }

  // Target the first drain (ID 1) through its channel ID.
  RpcLogDrain& target_drain = drains_[0];
  const uint32_t target_channel_id = target_drain.channel_id();
  LOG_SERVICE_METHOD_CONTEXT first_call(drain_map_);
  first_call.set_channel_id(target_channel_id);

  // Invoking the RPC gives the targeted drain a writer.
  first_call.call(rpc_request_buffer);
  EXPECT_EQ(target_drain.Flush(encoding_buffer_), OkStatus());

  // Every other drain is still without a writer.
  for (auto& drain : drain_map_.drains()) {
    if (drain.channel_id() == target_channel_id) {
      continue;
    }
    EXPECT_EQ(drain.Flush(encoding_buffer_), Status::Unavailable());
  }

  // A second call while the stream is ongoing must leave the active drain's
  // writer untouched; the new call completes and receives no responses.
  LOG_SERVICE_METHOD_CONTEXT second_call(drain_map_);
  second_call.set_channel_id(target_channel_id);
  second_call.call(rpc_request_buffer);
  EXPECT_EQ(target_drain.Flush(encoding_buffer_), OkStatus());
  ASSERT_TRUE(second_call.done());
  EXPECT_EQ(second_call.responses().size(), 0u);

  // Once the stream has been closed, a new call may install a new writer.
  ASSERT_EQ(target_drain.Close(), OkStatus());
  LOG_SERVICE_METHOD_CONTEXT third_call(drain_map_);
  third_call.set_channel_id(target_channel_id);
  third_call.call(rpc_request_buffer);
  EXPECT_EQ(target_drain.Flush(encoding_buffer_), OkStatus());
  ASSERT_FALSE(third_call.done());
  EXPECT_EQ(third_call.responses().size(), 0u);
  EXPECT_EQ(target_drain.Close(), OkStatus());
}
197
// Streams a batch of sample entries through drains_[2] and checks that the
// call only completes once the drain is closed, with every entry delivered
// and no drops reported.
TEST_F(LogServiceTest, StartAndEndStream) {
  constexpr size_t kEntryCount = 10;

  RpcLogDrain& drain = drains_[2];
  const uint32_t channel_id = drain.channel_id();
  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
  context.set_channel_id(channel_id);

  // Populate the multisink before the stream is requested.
  AddLogEntries(kEntryCount, kMessage, kSampleMetadata, kSampleTimestamp);

  // Start the stream and push the pending entries out.
  context.call(rpc_request_buffer);
  EXPECT_EQ(drain.Flush(encoding_buffer_), OkStatus());

  // The call stays open until the drain closes the stream.
  ASSERT_FALSE(context.done());
  EXPECT_EQ(OkStatus(), drain.Close());
  ASSERT_TRUE(context.done());

  EXPECT_EQ(context.status(), OkStatus());
  // Entries may be packed several to a response, so only require one or more.
  EXPECT_GE(context.responses().size(), 1u);

  // Build the expected entry list: kEntryCount copies of the sample entry.
  const TestLogEntry sample_entry{
      .metadata = kSampleMetadata,
      .timestamp = kSampleTimestamp,
      .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))};
  Vector<TestLogEntry, kEntryCount> expected_entries;
  for (size_t remaining = kEntryCount; remaining > 0; --remaining) {
    expected_entries.push_back(sample_entry);
  }

  // Walk every response and compare its entries against the expectations.
  size_t received_entries = 0;
  uint32_t reported_drops = 0;
  for (auto& payload : context.responses()) {
    protobuf::Decoder payload_decoder(payload);
    VerifyLogEntries(payload_decoder,
                     expected_entries,
                     received_entries,
                     received_entries,
                     reported_drops);
  }
  EXPECT_EQ(received_entries, kEntryCount);
  EXPECT_EQ(reported_drops, 0u);
}
242
// Injects a multisink drop notification between sample entries and checks
// that the stream carries a drop-count message in that position.
TEST_F(LogServiceTest, HandleDropped) {
  constexpr size_t kEntryCount = 5;
  constexpr size_t kEntriesBeforeDrop = 1;
  constexpr uint32_t kDropCount = 2;

  RpcLogDrain& drain = drains_[0];
  const uint32_t channel_id = drain.channel_id();
  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
  context.set_channel_id(channel_id);

  // Entries, then a forced drop, then the rest of the entries.
  AddLogEntries(
      kEntriesBeforeDrop, kMessage, kSampleMetadata, kSampleTimestamp);
  multisink_.HandleDropped(kDropCount);
  AddLogEntries(kEntryCount - kEntriesBeforeDrop,
                kMessage,
                kSampleMetadata,
                kSampleTimestamp);

  // Start the stream, flush it and close it.
  context.call(rpc_request_buffer);
  EXPECT_EQ(drain.Flush(encoding_buffer_), OkStatus());
  EXPECT_EQ(OkStatus(), drain.Close());
  ASSERT_EQ(context.status(), OkStatus());
  // Entries may be packed several to a response, so only require one or more.
  ASSERT_GE(context.responses().size(), 1u);

  // Expected sequence: leading entries, the drop message, trailing entries.
  const TestLogEntry sample_entry{
      .metadata = kSampleMetadata,
      .timestamp = kSampleTimestamp,
      .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))};
  const TestLogEntry drop_entry{
      .metadata = kDropMessageMetadata,
      .dropped = kDropCount,
      .tokenized_data = std::as_bytes(
          std::span(std::string_view(RpcLogDrain::kIngressErrorMessage)))};

  Vector<TestLogEntry, kEntryCount + 1> expected_entries;
  for (size_t n = 0; n < kEntriesBeforeDrop; ++n) {
    expected_entries.push_back(sample_entry);
  }
  expected_entries.push_back(drop_entry);
  for (size_t n = kEntriesBeforeDrop; n < kEntryCount; ++n) {
    expected_entries.push_back(sample_entry);
  }

  // Compare every response against the expectations.
  size_t received_entries = 0;
  uint32_t reported_drops = 0;
  for (auto& payload : context.responses()) {
    protobuf::Decoder payload_decoder(payload);
    VerifyLogEntries(payload_decoder,
                     expected_entries,
                     received_entries,
                     received_entries,
                     reported_drops);
  }
  EXPECT_EQ(received_entries, kEntryCount);
  EXPECT_EQ(reported_drops, kDropCount);
}
305
// Interleaves drop notifications with entries that the drain's filter
// rejects, followed by one entry that passes, and checks that a single
// drop-count message with the accumulated count precedes the kept entry.
TEST_F(LogServiceTest, HandleDroppedBetweenFilteredOutLogs) {
  constexpr size_t kEntryCount = 5;
  constexpr uint32_t kDropCount = kEntryCount - 1;

  RpcLogDrain& drain = drains_[0];
  const uint32_t channel_id = drain.channel_id();
  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
  context.set_channel_id(channel_id);

  // Configure the first drain's filter to drop INFO and above, which leaves
  // DEBUG logs untouched.
  Filter::Rule& drop_rule = rules1_[0];
  drop_rule.action = Filter::Rule::Action::kDrop;
  drop_rule.level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL;

  // Each filtered-out (INFO) entry is followed by a one-entry drop report.
  for (uint32_t n = 0; n < kDropCount; ++n) {
    ASSERT_EQ(
        OkStatus(),
        AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp).status());
    multisink_.HandleDropped(1);
  }

  // Finish with a DEBUG entry, which the filter lets through.
  constexpr auto metadata =
      log_tokenized::Metadata::Set<PW_LOG_LEVEL_DEBUG, 0, 0, __LINE__>();
  ASSERT_EQ(OkStatus(),
            AddLogEntry(kMessage, metadata, kSampleTimestamp).status());

  // Start the stream, flush it and close it.
  context.call(rpc_request_buffer);
  EXPECT_EQ(drain.Flush(encoding_buffer_), OkStatus());
  EXPECT_EQ(OkStatus(), drain.Close());
  ASSERT_EQ(context.status(), OkStatus());
  // Entries may be packed several to a response, so only require one or more.
  ASSERT_GE(context.responses().size(), 1u);

  // Expected: the drop message first, then the single kept entry.
  const TestLogEntry drop_entry{
      .metadata = kDropMessageMetadata,
      .dropped = kDropCount,
      .tokenized_data = std::as_bytes(
          std::span(std::string_view(RpcLogDrain::kIngressErrorMessage)))};
  const TestLogEntry kept_entry{
      .metadata = metadata,
      .timestamp = kSampleTimestamp,
      .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))};
  Vector<TestLogEntry, 2> expected_entries;
  expected_entries.push_back(drop_entry);
  expected_entries.push_back(kept_entry);

  // Compare every response against the expectations.
  size_t received_entries = 0;
  uint32_t reported_drops = 0;
  for (auto& payload : context.responses()) {
    protobuf::Decoder payload_decoder(payload);
    VerifyLogEntries(payload_decoder,
                     expected_entries,
                     received_entries,
                     received_entries,
                     reported_drops);
  }
  EXPECT_EQ(received_entries, 1u);
  EXPECT_EQ(reported_drops, kDropCount);
}
365
// Uses the drain backed by `small_buffer_`: entries carrying kLongMessage do
// not fit its entry buffer, so the stream is expected to carry one drop-count
// message followed by the only entry that fits.
TEST_F(LogServiceTest, HandleSmallLogEntryBuffer) {
  constexpr size_t kEntryCount = 5;
  constexpr uint32_t kDropCount = kEntryCount - 1;

  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
  context.set_channel_id(kSmallBufferDrainId);
  auto drain_lookup = drain_map_.GetDrainFromChannelId(kSmallBufferDrainId);
  ASSERT_TRUE(drain_lookup.ok());

  // All but one entry are too long for the drain's log entry buffer. The
  // final short entry is needed because drop count messages are only sent
  // when a log entry can be sent.
  AddLogEntries(kDropCount, kLongMessage, kSampleMetadata, kSampleTimestamp);
  EXPECT_EQ(OkStatus(),
            AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp).status());

  // Start the stream, flush it and close it.
  context.call(rpc_request_buffer);
  EXPECT_EQ(drain_lookup.value()->Flush(encoding_buffer_), OkStatus());
  EXPECT_EQ(drain_lookup.value()->Close(), OkStatus());
  ASSERT_EQ(context.status(), OkStatus());
  ASSERT_EQ(context.responses().size(), 1u);

  // Expected: the drop message first, then the single entry that fits.
  const TestLogEntry drop_entry{
      .metadata = kDropMessageMetadata,
      .dropped = kDropCount,
      .tokenized_data = std::as_bytes(std::span(
          std::string_view(RpcLogDrain::kSmallStackBufferErrorMessage)))};
  const TestLogEntry fitting_entry{
      .metadata = kSampleMetadata,
      .timestamp = kSampleTimestamp,
      .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))};
  Vector<TestLogEntry, kEntryCount + 1> expected_entries;
  expected_entries.push_back(drop_entry);
  expected_entries.push_back(fitting_entry);

  // One drop message carrying the total drop count, plus the one entry that
  // fits the buffer.
  size_t received_entries = 0;
  uint32_t reported_drops = 0;
  for (auto& payload : context.responses()) {
    protobuf::Decoder payload_decoder(payload);
    VerifyLogEntries(payload_decoder,
                     expected_entries,
                     received_entries,
                     received_entries,
                     reported_drops);
  }
  EXPECT_EQ(received_entries, 1u);
  EXPECT_EQ(reported_drops, kDropCount);
}
415
// Detaches drains_[0] from the multisink, then opens and closes a stream on
// it. The stream must end with OK and without delivering any response.
TEST_F(LogServiceTest, FlushDrainWithoutMultisink) {
  constexpr size_t kEntryCount = 5;

  RpcLogDrain& orphan_drain = drains_[0];
  multisink_.DetachDrain(orphan_drain);

  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
  context.set_channel_id(orphan_drain.channel_id());

  // These entries go to the multisink after the drain has been detached.
  AddLogEntries(kEntryCount, kMessage, kSampleMetadata, kSampleTimestamp);

  // Open the stream and close it right away (the drain is not flushed here).
  context.call(rpc_request_buffer);
  EXPECT_EQ(orphan_drain.Close(), OkStatus());
  ASSERT_EQ(context.status(), OkStatus());
  EXPECT_EQ(context.responses().size(), 0u);
}
431
// Sends a single entry whose metadata fields and timestamp are set to their
// largest values and checks that it arrives intact in one response.
TEST_F(LogServiceTest, LargeLogEntry) {
  // Module, flags and line all use every bit available to them; the
  // timestamp is the largest int64_t.
  constexpr auto kMaxedMetadata =
      log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN,
                                   (1 << PW_LOG_TOKENIZED_MODULE_BITS) - 1,
                                   (1 << PW_LOG_TOKENIZED_FLAG_BITS) - 1,
                                   (1 << PW_LOG_TOKENIZED_LINE_BITS) - 1>();
  const TestLogEntry expected_entry{
      .metadata = kMaxedMetadata,
      .timestamp = std::numeric_limits<int64_t>::max(),
      .tokenized_data = std::as_bytes(std::span(kMessage)),
  };

  // Encode the entry by hand, field by field, and hand it to the multisink.
  log::LogEntry::MemoryEncoder entry_encoder(entry_encode_buffer_);
  ASSERT_EQ(entry_encoder.WriteMessage(expected_entry.tokenized_data),
            OkStatus());
  // The level occupies the low PW_LOG_LEVEL_BITS bits; the line number is
  // packed into the bits above it.
  const auto packed_line_level =
      (expected_entry.metadata.level() & PW_LOG_LEVEL_BITMASK) |
      ((expected_entry.metadata.line_number() << PW_LOG_LEVEL_BITS) &
       ~PW_LOG_LEVEL_BITMASK);
  ASSERT_EQ(entry_encoder.WriteLineLevel(packed_line_level), OkStatus());
  ASSERT_EQ(entry_encoder.WriteFlags(expected_entry.metadata.flags()),
            OkStatus());
  ASSERT_EQ(entry_encoder.WriteTimestamp(expected_entry.timestamp),
            OkStatus());
  // The module is written as the little-endian bytes of its 32-bit value.
  const uint32_t module_le = bytes::ConvertOrderTo(
      std::endian::little, expected_entry.metadata.module());
  ASSERT_EQ(entry_encoder.WriteModule(std::as_bytes(std::span(&module_le, 1))),
            OkStatus());
  ASSERT_EQ(entry_encoder.status(), OkStatus());
  multisink_.HandleEntry(entry_encoder);

  // Open a stream on the first drain, flush it and close it.
  RpcLogDrain& drain = drains_[0];
  const uint32_t channel_id = drain.channel_id();
  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
  context.set_channel_id(channel_id);
  context.call(rpc_request_buffer);
  ASSERT_EQ(drain.Flush(encoding_buffer_), OkStatus());
  EXPECT_EQ(OkStatus(), drain.Close());
  ASSERT_EQ(context.status(), OkStatus());
  ASSERT_EQ(context.responses().size(), 1u);

  // Pull the first field out of the response and verify it as a log entry.
  protobuf::Decoder response_decoder(context.responses()[0]);
  ASSERT_TRUE(response_decoder.Next().ok());
  ConstByteSpan entry_bytes;
  EXPECT_TRUE(response_decoder.ReadBytes(&entry_bytes).ok());
  protobuf::Decoder entry_decoder(entry_bytes);
  uint32_t reported_drops = 0;
  VerifyLogEntry(entry_decoder, expected_entry, reported_drops);
  EXPECT_EQ(reported_drops, 0u);
}
482
// Interrupts a log stream on the drain that closes its stream on writer
// errors, then reopens the stream with a new writer and checks that the drop
// count for the lost packet is reported on the resumed stream.
TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) {
  const uint32_t drain_channel_id = kCloseWriterOnErrorDrainId;
  auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id);
  ASSERT_TRUE(drain.ok());

  LogService log_service(drain_map_);
  const size_t max_packets = 10;
  // The output's first template argument is taken from max_packets so that it
  // cannot drift from the value used to derive packets_sent below.
  rpc::RawFakeChannelOutput<max_packets, 512> output;
  rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
  rpc::Server server(std::span(&channel, 1));

  // Add as many entries as needed to have multiple packets sent. The first
  // entry is added on its own to learn the encoded size of one entry.
  StatusWithSize status =
      AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp);
  ASSERT_TRUE(status.ok());

  const uint32_t max_messages_per_response =
      encoding_buffer_.size() / status.size();
  // Send fewer packets than the max to avoid crashes.
  const uint32_t packets_sent = max_packets / 2;
  const size_t total_entries = packets_sent * max_messages_per_response;
  const size_t max_entries = 50;
  // Check we can test all these entries.
  ASSERT_GE(max_entries, total_entries);
  // One entry was already added above.
  AddLogEntries(total_entries - 1, kMessage, kSampleMetadata, kSampleTimestamp);

  // Interrupt log stream with an error.
  const uint32_t successful_packets_sent = packets_sent / 2;
  output.set_send_status(Status::Unavailable(), successful_packets_sent);

  // Request logs.
  rpc::RawServerWriter writer = rpc::RawServerWriter::Open<Logs::Listen>(
      server, drain_channel_id, log_service);
  EXPECT_EQ(drain.value()->Open(writer), OkStatus());
  // This drain closes on errors.
  EXPECT_EQ(drain.value()->Flush(encoding_buffer_), Status::Aborted());
  EXPECT_TRUE(output.done());

  // Make sure not all packets were sent.
  ASSERT_EQ(output.payloads<Logs::Listen>().size(), successful_packets_sent);

  // Verify data in responses.
  Vector<TestLogEntry, max_entries> expected_messages;
  for (size_t i = 0; i < total_entries; ++i) {
    expected_messages.push_back({.metadata = kSampleMetadata,
                                 .timestamp = kSampleTimestamp,
                                 .tokenized_data = std::as_bytes(
                                     std::span(std::string_view(kMessage)))});
  }
  size_t entries_found = 0;
  uint32_t drop_count_found = 0;
  for (auto& response : output.payloads<Logs::Listen>()) {
    protobuf::Decoder entry_decoder(response);
    VerifyLogEntries(entry_decoder,
                     expected_messages,
                     entries_found,
                     entries_found,
                     drop_count_found);
  }

  // Verify that not all the entries were sent.
  EXPECT_LT(entries_found, total_entries);
  // The drain closes on errors, thus the drop count is reported on the next
  // call to Flush.
  EXPECT_EQ(drop_count_found, 0u);

  // Reset channel output and resume log stream with a new writer.
  output.clear();
  writer = rpc::RawServerWriter::Open<Logs::Listen>(
      server, drain_channel_id, log_service);
  EXPECT_EQ(drain.value()->Open(writer), OkStatus());
  EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());

  // One full packet was dropped. Since all messages are the same length,
  // there are entries_found / successful_packets_sent per packet.
  const uint32_t total_drop_count = entries_found / successful_packets_sent;
  Vector<TestLogEntry, max_entries> expected_messages_after_reset;
  expected_messages_after_reset.push_back(
      {.metadata = kDropMessageMetadata,
       .dropped = total_drop_count,
       .tokenized_data = std::as_bytes(
           std::span(std::string_view(RpcLogDrain::kWriterErrorMessage)))});

  const uint32_t remaining_entries = total_entries - total_drop_count;
  for (size_t i = 0; i < remaining_entries; ++i) {
    expected_messages_after_reset.push_back(
        {.metadata = kSampleMetadata,
         .timestamp = kSampleTimestamp,
         .tokenized_data =
             std::as_bytes(std::span(std::string_view(kMessage)))});
  }

  size_t entries_found_after_reset = 0;
  for (auto& response : output.payloads<Logs::Listen>()) {
    protobuf::Decoder entry_decoder(response);
    // Sequence IDs continue past the entries already delivered and the ones
    // counted as dropped.
    uint32_t expected_sequence_id =
        entries_found + entries_found_after_reset + total_drop_count;
    VerifyLogEntries(entry_decoder,
                     expected_messages_after_reset,
                     expected_sequence_id,
                     entries_found_after_reset,
                     drop_count_found);
  }
  EXPECT_EQ(entries_found + entries_found_after_reset, remaining_entries);
  EXPECT_EQ(drop_count_found, total_drop_count);
}
589
// Interrupts a log stream on the drain that ignores writer errors and checks
// that flushing keeps succeeding, the stream stays open, and no drop count is
// reported by the drain itself.
TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) {
  constexpr uint32_t kChannelId = kIgnoreWriterErrorsDrainId;
  constexpr size_t kMaxPackets = 20;
  constexpr size_t kMaxEntries = 50;
  // Send fewer packets than the max to avoid crashes.
  constexpr uint32_t kPacketsSent = 4;
  // The send error is injected once this many packets have gone out.
  constexpr uint32_t kErrorOnPacketCount = kPacketsSent / 2;

  auto drain_lookup = drain_map_.GetDrainFromChannelId(kChannelId);
  ASSERT_TRUE(drain_lookup.ok());

  LogService log_service(drain_map_);
  rpc::RawFakeChannelOutput<kMaxPackets, 512> output;
  rpc::Channel channel(rpc::Channel::Create<kChannelId>(&output));
  rpc::Server server(std::span(&channel, 1));

  // Add enough entries for the stream to span several packets. The first
  // entry is added on its own to learn the encoded size of one entry.
  StatusWithSize first_entry =
      AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp);
  ASSERT_TRUE(first_entry.ok());

  const uint32_t entries_per_response =
      encoding_buffer_.size() / first_entry.size();
  const size_t entry_total = kPacketsSent * entries_per_response;
  // Check we can test all these entries.
  ASSERT_GT(kMaxEntries, entry_total);
  AddLogEntries(entry_total - 1, kMessage, kSampleMetadata, kSampleTimestamp);

  // Interrupt the log stream with an error.
  output.set_send_status(Status::Unavailable(), kErrorOnPacketCount);

  // Request logs.
  rpc::RawServerWriter writer = rpc::RawServerWriter::Open<Logs::Listen>(
      server, kChannelId, log_service);
  EXPECT_EQ(drain_lookup.value()->Open(writer), OkStatus());
  // This drain ignores errors, so the flush still succeeds and the stream
  // stays open.
  EXPECT_EQ(drain_lookup.value()->Flush(encoding_buffer_), OkStatus());
  EXPECT_FALSE(output.done());

  // Make sure some packets were sent.
  ASSERT_GT(output.payloads<Logs::Listen>().size(), 0u);

  // Count the delivered entries; some must be missing.
  size_t delivered_entries = 0;
  for (auto& payload : output.payloads<Logs::Listen>()) {
    protobuf::Decoder payload_decoder(payload);
    delivered_entries += CountLogEntries(payload_decoder);
  }
  ASSERT_LT(delivered_entries, entry_total);

  // Expect one sample entry per delivered entry.
  const uint32_t missing_entries = entry_total - delivered_entries;
  const TestLogEntry sample_entry{
      .metadata = kSampleMetadata,
      .timestamp = kSampleTimestamp,
      .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))};
  Vector<TestLogEntry, kMaxEntries> expected_entries;
  for (size_t n = 0; n < delivered_entries; ++n) {
    expected_entries.push_back(sample_entry);
  }

  // Re-walk the payloads, this time verifying their contents.
  delivered_entries = 0;
  uint32_t reported_drops = 0;
  uint32_t packet_index = 0;
  // Packets sent before the injected error.
  while (packet_index < kErrorOnPacketCount) {
    protobuf::Decoder payload_decoder(
        output.payloads<Logs::Listen>()[packet_index]);
    VerifyLogEntries(payload_decoder,
                     expected_entries,
                     delivered_entries,
                     delivered_entries,
                     reported_drops);
    ++packet_index;
  }
  // Packets sent after it: the expected first sequence ID is offset by the
  // number of missing entries.
  while (packet_index < output.payloads<Logs::Listen>().size()) {
    protobuf::Decoder payload_decoder(
        output.payloads<Logs::Listen>()[packet_index]);
    VerifyLogEntries(payload_decoder,
                     expected_entries,
                     delivered_entries + missing_entries,
                     delivered_entries,
                     reported_drops);
    ++packet_index;
  }
  // This drain ignores errors and thus doesn't report drops on its own.
  EXPECT_EQ(reported_drops, 0u);

  // More calls to flush with errors will not affect this stubborn drain.
  const size_t packets_before_retry = output.payloads<Logs::Listen>().size();
  output.set_send_status(Status::Unavailable());
  EXPECT_EQ(drain_lookup.value()->Flush(encoding_buffer_), OkStatus());
  EXPECT_FALSE(output.done());
  ASSERT_EQ(output.payloads<Logs::Listen>().size(), packets_before_retry);

  // Closing the drain finishes the stream.
  output.clear();
  EXPECT_EQ(drain_lookup.value()->Close(), OkStatus());
  EXPECT_TRUE(output.done());
}
684
// Adds entries that differ in level, flags and module, installs keep/drop
// rules on the filter of drains_[1], and checks that only the INFO, WARN and
// ERROR entries with the matching module and flags are streamed.
TEST_F(LogServiceTest, FilterLogs) {
  constexpr uint32_t kModule = 0xcafe;
  constexpr uint32_t kFlags = 0x02;
  constexpr uint32_t kLine = 100;

  // Four entries share module and flags and differ only in level.
  const auto debug_metadata =
      log_tokenized::Metadata::Set<PW_LOG_LEVEL_DEBUG, kModule, kFlags, kLine>();
  ASSERT_TRUE(AddLogEntry(kMessage, debug_metadata, kSampleTimestamp).ok());
  const auto info_metadata =
      log_tokenized::Metadata::Set<PW_LOG_LEVEL_INFO, kModule, kFlags, kLine>();
  ASSERT_TRUE(AddLogEntry(kMessage, info_metadata, kSampleTimestamp).ok());
  const auto warn_metadata =
      log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN, kModule, kFlags, kLine>();
  ASSERT_TRUE(AddLogEntry(kMessage, warn_metadata, kSampleTimestamp).ok());
  const auto error_metadata =
      log_tokenized::Metadata::Set<PW_LOG_LEVEL_ERROR, kModule, kFlags, kLine>();
  ASSERT_TRUE(AddLogEntry(kMessage, error_metadata, kSampleTimestamp).ok());
  // Two ERROR entries that differ in flags and in module, respectively.
  const auto other_flags_metadata =
      log_tokenized::Metadata::Set<PW_LOG_LEVEL_ERROR, kModule, 0x01, kLine>();
  ASSERT_TRUE(
      AddLogEntry(kMessage, other_flags_metadata, kSampleTimestamp).ok());
  const auto other_module_metadata =
      log_tokenized::Metadata::Set<PW_LOG_LEVEL_ERROR, 0xabcd, kFlags, kLine>();
  ASSERT_TRUE(
      AddLogEntry(kMessage, other_module_metadata, kSampleTimestamp).ok());

  // Only the INFO, WARN and ERROR entries with kModule/kFlags are expected.
  Vector<TestLogEntry, 3> expected_entries{
      {.metadata = info_metadata,
       .timestamp = kSampleTimestamp,
       .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))},
      {.metadata = warn_metadata,
       .timestamp = kSampleTimestamp,
       .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))},
      {.metadata = error_metadata,
       .timestamp = kSampleTimestamp,
       .tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))},
  };

  // Reset the rules backing the filter of drains_[1], then install a keep
  // rule (INFO and above, kFlags, kModule) followed by a drop rule with
  // ANY_LEVEL, no flags and an empty module.
  RpcLogDrain& filtered_drain = drains_[1];
  for (auto& stale_rule : rules2_) {
    stale_rule = {};
  }
  const auto module_bytes =
      bytes::CopyInOrder<uint32_t>(std::endian::little, kModule);
  rules2_[0] = {
      .action = Filter::Rule::Action::kKeep,
      .level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
      .any_flags_set = kFlags,
      .module_equals{module_bytes.begin(), module_bytes.end()}};
  rules2_[1] = {
      .action = Filter::Rule::Action::kDrop,
      .level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
      .any_flags_set = 0,
      .module_equals{},
  };

  // Start the stream on the filtered drain and flush it.
  LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
  context.set_channel_id(filtered_drain.channel_id());
  context.call({});
  ASSERT_EQ(filtered_drain.Flush(encoding_buffer_), OkStatus());

  // Compare every response against the expectations.
  size_t received_entries = 0;
  uint32_t reported_drops = 0;
  for (auto& payload : context.responses()) {
    protobuf::Decoder payload_decoder(payload);
    VerifyLogEntries(payload_decoder,
                     expected_entries,
                     received_entries,
                     received_entries,
                     reported_drops);
  }
  EXPECT_EQ(received_entries, 3u);
  EXPECT_EQ(reported_drops, 0u);
}
761
// Opens a stream on the close-on-error drain, flushes it, then opens it again
// with a fresh writer; both the second Open() and Flush() must succeed.
TEST_F(LogServiceTest, ReopenClosedLogStreamWithAcquiredBuffer) {
  constexpr uint32_t kChannelId = kCloseWriterOnErrorDrainId;
  auto drain_lookup = drain_map_.GetDrainFromChannelId(kChannelId);
  ASSERT_TRUE(drain_lookup.ok());

  LogService log_service(drain_map_);
  rpc::RawFakeChannelOutput<10, 512> output;
  rpc::Channel channel(rpc::Channel::Create<kChannelId>(&output));
  rpc::Server server(std::span(&channel, 1));

  // First request: open a writer, give it to the drain and flush.
  rpc::RawServerWriter writer = rpc::RawServerWriter::Open<Logs::Listen>(
      server, kChannelId, log_service);
  EXPECT_EQ(drain_lookup.value()->Open(writer), OkStatus());
  // This drain closes on errors.
  EXPECT_EQ(drain_lookup.value()->Flush(encoding_buffer_), OkStatus());

  // Second request: replace the writer with a newly opened one and repeat.
  writer = rpc::RawServerWriter::Open<Logs::Listen>(
      server, kChannelId, log_service);
  EXPECT_EQ(drain_lookup.value()->Open(writer), OkStatus());
  EXPECT_EQ(drain_lookup.value()->Flush(encoding_buffer_), OkStatus());
}
785
786 } // namespace
787 } // namespace pw::log_rpc
788