1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_log_rpc/rpc_log_drain.h"
16
17 #include <array>
18 #include <cstdint>
19 #include <string_view>
20
21 #include "gtest/gtest.h"
22 #include "pw_bytes/array.h"
23 #include "pw_bytes/span.h"
24 #include "pw_log/proto/log.pwpb.h"
25 #include "pw_log/proto_utils.h"
26 #include "pw_log_rpc/log_filter.h"
27 #include "pw_log_rpc/log_service.h"
28 #include "pw_log_rpc/rpc_log_drain_map.h"
29 #include "pw_log_rpc_private/test_utils.h"
30 #include "pw_log_tokenized/metadata.h"
31 #include "pw_multisink/multisink.h"
32 #include "pw_protobuf/decoder.h"
33 #include "pw_protobuf/serialized_size.h"
34 #include "pw_rpc/channel.h"
35 #include "pw_rpc/raw/fake_channel_output.h"
36 #include "pw_rpc/raw/server_reader_writer.h"
37 #include "pw_span/span.h"
38 #include "pw_status/status.h"
39 #include "pw_sync/mutex.h"
40
41 namespace pw::log_rpc {
42 namespace {
43 static constexpr size_t kBufferSize =
44 RpcLogDrain::kMinEntrySizeWithoutPayload + 32;
45
TEST(RpcLogDrain,TryFlushDrainWithClosedWriter)46 TEST(RpcLogDrain, TryFlushDrainWithClosedWriter) {
47 // Drain without a writer.
48 const uint32_t drain_id = 1;
49 std::array<std::byte, kBufferSize> buffer;
50 sync::Mutex mutex;
51 RpcLogDrain drain(
52 drain_id,
53 buffer,
54 mutex,
55 RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
56 nullptr);
57 EXPECT_EQ(drain.channel_id(), drain_id);
58
59 std::byte encoding_buffer[128] = {};
60
61 // Attach drain to a MultiSink.
62 std::array<std::byte, kBufferSize * 2> multisink_buffer;
63 multisink::MultiSink multisink(multisink_buffer);
64 multisink.AttachDrain(drain);
65 EXPECT_EQ(drain.Flush(encoding_buffer), Status::Unavailable());
66
67 rpc::RawServerWriter writer;
68 ASSERT_FALSE(writer.active());
69 EXPECT_EQ(drain.Open(writer), Status::FailedPrecondition());
70 EXPECT_EQ(drain.Flush(encoding_buffer), Status::Unavailable());
71 }
72
TEST(RpcLogDrainMap,GetDrainsByIdFromDrainMap)73 TEST(RpcLogDrainMap, GetDrainsByIdFromDrainMap) {
74 static constexpr size_t kMaxDrains = 3;
75 sync::Mutex mutex;
76 std::array<std::array<std::byte, kBufferSize>, kMaxDrains> buffers;
77 std::array<RpcLogDrain, kMaxDrains> drains{
78 RpcLogDrain(0,
79 buffers[0],
80 mutex,
81 RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
82 nullptr),
83 RpcLogDrain(1,
84 buffers[1],
85 mutex,
86 RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
87 nullptr),
88 RpcLogDrain(2,
89 buffers[2],
90 mutex,
91 RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors,
92 nullptr),
93 };
94
95 RpcLogDrainMap drain_map(drains);
96 for (uint32_t channel_id = 0; channel_id < kMaxDrains; ++channel_id) {
97 auto drain_result = drain_map.GetDrainFromChannelId(channel_id);
98 ASSERT_TRUE(drain_result.ok());
99 EXPECT_EQ(drain_result.value(), &drains[channel_id]);
100 }
101 const span<RpcLogDrain> mapped_drains = drain_map.drains();
102 ASSERT_EQ(mapped_drains.size(), kMaxDrains);
103 for (uint32_t channel_id = 0; channel_id < kMaxDrains; ++channel_id) {
104 EXPECT_EQ(&mapped_drains[channel_id], &drains[channel_id]);
105 }
106 }
107
TEST(RpcLogDrain,FlushingDrainWithOpenWriter)108 TEST(RpcLogDrain, FlushingDrainWithOpenWriter) {
109 const uint32_t drain_id = 1;
110 std::array<std::byte, kBufferSize> buffer;
111 sync::Mutex mutex;
112 std::array<RpcLogDrain, 1> drains{
113 RpcLogDrain(drain_id,
114 buffer,
115 mutex,
116 RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
117 nullptr),
118 };
119 RpcLogDrainMap drain_map(drains);
120 LogService log_service(drain_map);
121
122 std::byte encoding_buffer[128] = {};
123
124 rpc::RawFakeChannelOutput<3> output;
125 rpc::Channel channel(rpc::Channel::Create<drain_id>(&output));
126 rpc::Server server(span(&channel, 1));
127
128 // Attach drain to a MultiSink.
129 RpcLogDrain& drain = drains[0];
130 std::array<std::byte, kBufferSize * 2> multisink_buffer;
131 multisink::MultiSink multisink(multisink_buffer);
132 multisink.AttachDrain(drain);
133 EXPECT_EQ(drain.Flush(encoding_buffer), Status::Unavailable());
134
135 rpc::RawServerWriter writer =
136 rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
137 server, drain_id, log_service);
138 ASSERT_TRUE(writer.active());
139 EXPECT_EQ(drain.Open(writer), OkStatus());
140 EXPECT_EQ(drain.Flush(encoding_buffer), OkStatus());
141 // Can call multliple times until closed on error.
142 EXPECT_EQ(drain.Flush(encoding_buffer), OkStatus());
143 EXPECT_EQ(drain.Close(), OkStatus());
144 rpc::RawServerWriter& writer_ref = writer;
145 ASSERT_FALSE(writer_ref.active());
146 EXPECT_EQ(drain.Flush(encoding_buffer), Status::Unavailable());
147 }
148
TEST(RpcLogDrain,TryReopenOpenedDrain)149 TEST(RpcLogDrain, TryReopenOpenedDrain) {
150 const uint32_t drain_id = 1;
151 std::array<std::byte, kBufferSize> buffer;
152 sync::Mutex mutex;
153 std::array<RpcLogDrain, 1> drains{
154 RpcLogDrain(drain_id,
155 buffer,
156 mutex,
157 RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
158 nullptr),
159 };
160 RpcLogDrainMap drain_map(drains);
161 LogService log_service(drain_map);
162
163 rpc::RawFakeChannelOutput<1> output;
164 rpc::Channel channel(rpc::Channel::Create<drain_id>(&output));
165 rpc::Server server(span(&channel, 1));
166
167 // Open Drain and try to open with a new writer.
168 rpc::RawServerWriter writer =
169 rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
170 server, drain_id, log_service);
171 ASSERT_TRUE(writer.active());
172 RpcLogDrain& drain = drains[0];
173 EXPECT_EQ(drain.Open(writer), OkStatus());
174 rpc::RawServerWriter second_writer =
175 rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
176 server, drain_id, log_service);
177 ASSERT_FALSE(writer.active());
178 ASSERT_TRUE(second_writer.active());
179 EXPECT_EQ(drain.Open(second_writer), OkStatus());
180 }
181
182 class TrickleTest : public ::testing::Test {
183 protected:
TrickleTest()184 TrickleTest()
185 : log_message_encode_buffer_(),
186 drain_encode_buffer_(),
187 channel_encode_buffer_(),
188 mutex_(),
189 drains_{
190 RpcLogDrain(
191 kDrainChannelId,
192 drain_encode_buffer_,
193 mutex_,
194 RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
195 nullptr),
196 },
197 multisink_buffer_(),
198 multisink_(multisink_buffer_),
199 drain_map_(drains_),
200 log_service_(drain_map_),
201 output_(),
202 channel_(rpc::Channel::Create<kDrainChannelId>(&output_)),
203 server_(span(&channel_, 1)) {}
204
BasicLog(std::string_view message)205 TestLogEntry BasicLog(std::string_view message) {
206 return {.metadata = kSampleMetadata,
207 .timestamp = kSampleTimestamp,
208 .dropped = 0,
209 .tokenized_data = as_bytes(span<const char>(message)),
210 .thread = as_bytes(span(kSampleThreadName))};
211 }
212
AttachDrain()213 void AttachDrain() { multisink_.AttachDrain(drains_[0]); }
OpenWriter()214 void OpenWriter() {
215 writer_ = rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
216 server_, kDrainChannelId, log_service_);
217 }
218
AddLogEntry(const TestLogEntry & entry)219 void AddLogEntry(const TestLogEntry& entry) {
220 Result<ConstByteSpan> encoded_log_result =
221 log::EncodeTokenizedLog(entry.metadata,
222 entry.tokenized_data,
223 entry.timestamp,
224 entry.thread,
225 log_message_encode_buffer_);
226 ASSERT_EQ(encoded_log_result.status(), OkStatus());
227 EXPECT_LE(encoded_log_result.value().size(), kMaxMessageSize);
228 multisink_.HandleEntry(encoded_log_result.value());
229 }
230
AddLogEntries(const Vector<TestLogEntry> & entries)231 void AddLogEntries(const Vector<TestLogEntry>& entries) {
232 for (const TestLogEntry& entry : entries) {
233 AddLogEntry(entry);
234 }
235 }
236
237 static constexpr uint32_t kDrainChannelId = 1;
238 static constexpr size_t kMaxMessageSize = 60;
239
240 // Use the size of the encoded BasicLog entry to calculate buffer sizes and
241 // better control the number of entries in each sent bulk.
242 static constexpr log_tokenized::Metadata kSampleMetadata =
243 log_tokenized::Metadata::Set<PW_LOG_LEVEL_INFO, 123, 0x03, 300>();
244 static constexpr uint64_t kSampleTimestamp = 9000;
245 static constexpr std::string_view kSampleThreadName = "thread";
246 static constexpr size_t kBasicLogSizeWithoutPayload =
247 protobuf::SizeOfFieldBytes(log::pwpb::LogEntry::Fields::kMessage, 0) +
248 protobuf::SizeOfFieldUint32(
249 log::pwpb::LogEntry::Fields::kLineLevel,
250 log::PackLineLevel(kSampleMetadata.line_number(),
251 kSampleMetadata.level())) +
252 protobuf::SizeOfFieldUint32(log::pwpb::LogEntry::Fields::kFlags,
253 kSampleMetadata.flags()) +
254 protobuf::SizeOfFieldInt64(log::pwpb::LogEntry::Fields::kTimestamp,
255 kSampleTimestamp) +
256 protobuf::SizeOfFieldBytes(log::pwpb::LogEntry::Fields::kModule,
257 sizeof(kSampleMetadata.module())) +
258 protobuf::SizeOfFieldBytes(log::pwpb::LogEntry::Fields::kThread,
259 kSampleThreadName.size());
260 static constexpr size_t kDrainEncodeBufferSize =
261 kBasicLogSizeWithoutPayload + kMaxMessageSize;
262 static constexpr size_t kChannelEncodeBufferSize = kDrainEncodeBufferSize * 2;
263 std::array<std::byte, kMaxMessageSize> log_message_encode_buffer_;
264 std::array<std::byte, kDrainEncodeBufferSize> drain_encode_buffer_;
265 // Make actual encode buffer slightly smaller to account for RPC overhead.
266 std::array<std::byte, kChannelEncodeBufferSize - 8> channel_encode_buffer_;
267 sync::Mutex mutex_;
268 std::array<RpcLogDrain, 1> drains_;
269
270 std::array<std::byte, kDrainEncodeBufferSize * 12> multisink_buffer_;
271 multisink::MultiSink multisink_;
272
273 RpcLogDrainMap drain_map_;
274 LogService log_service_;
275
276 // TODO(amontanez): Why do we need 4 packets? Three should work, but seemingly
277 // on destruction a 14-byte payload is sent out, forcing us to use max
278 // expected packet count plus one.
279 rpc::RawFakeChannelOutput<4, kDrainEncodeBufferSize * 6> output_;
280 rpc::Channel channel_;
281 rpc::Server server_;
282 rpc::RawServerWriter writer_;
283 };
284
TEST_F(TrickleTest,EntriesAreFlushedToSinglePayload)285 TEST_F(TrickleTest, EntriesAreFlushedToSinglePayload) {
286 AttachDrain();
287 OpenWriter();
288
289 Vector<TestLogEntry, 3> kExpectedEntries{
290 BasicLog(":D"), BasicLog("A useful log"), BasicLog("blink")};
291 AddLogEntries(kExpectedEntries);
292
293 ASSERT_TRUE(writer_.active());
294 EXPECT_EQ(drains_[0].Open(writer_), OkStatus());
295
296 std::optional<chrono::SystemClock::duration> min_delay =
297 drains_[0].Trickle(channel_encode_buffer_);
298 EXPECT_EQ(min_delay.has_value(), false);
299
300 rpc::PayloadsView payloads =
301 output_.payloads<log::pw_rpc::raw::Logs::Listen>(kDrainChannelId);
302 EXPECT_EQ(payloads.size(), 1u);
303
304 uint32_t drop_count = 0;
305 size_t entries_count = 0;
306 protobuf::Decoder payload_decoder(payloads[0]);
307 payload_decoder.Reset(payloads[0]);
308 VerifyLogEntries(
309 payload_decoder, kExpectedEntries, 0, entries_count, drop_count);
310 EXPECT_EQ(drop_count, 0u);
311 EXPECT_EQ(entries_count, 3u);
312 }
313
TEST_F(TrickleTest,ManyLogsOverflowToNextPayload)314 TEST_F(TrickleTest, ManyLogsOverflowToNextPayload) {
315 AttachDrain();
316 OpenWriter();
317
318 Vector<TestLogEntry, 3> kFirstFlushedBundle{
319 BasicLog("Use longer logs in this test"),
320 BasicLog("My feet are cold"),
321 BasicLog("I'm hungry, what's for dinner?")};
322 Vector<TestLogEntry, 3> kSecondFlushedBundle{
323 BasicLog("Add a few longer logs"),
324 BasicLog("Eventually the logs will"),
325 BasicLog("Overflow into another payload")};
326
327 AddLogEntries(kFirstFlushedBundle);
328 AddLogEntries(kSecondFlushedBundle);
329
330 ASSERT_TRUE(writer_.active());
331 EXPECT_EQ(drains_[0].Open(writer_), OkStatus());
332
333 // A single flush should produce two payloads.
334 std::optional<chrono::SystemClock::duration> min_delay =
335 drains_[0].Trickle(channel_encode_buffer_);
336 EXPECT_EQ(min_delay.has_value(), false);
337
338 rpc::PayloadsView payloads =
339 output_.payloads<log::pw_rpc::raw::Logs::Listen>(kDrainChannelId);
340 ASSERT_EQ(payloads.size(), 2u);
341
342 uint32_t drop_count = 0;
343 size_t entries_count = 0;
344 protobuf::Decoder payload_decoder(payloads[0]);
345 payload_decoder.Reset(payloads[0]);
346 VerifyLogEntries(
347 payload_decoder, kFirstFlushedBundle, 0, entries_count, drop_count);
348 EXPECT_EQ(drop_count, 0u);
349 EXPECT_EQ(entries_count, 3u);
350
351 entries_count = 0;
352 payload_decoder.Reset(payloads[1]);
353 VerifyLogEntries(
354 payload_decoder, kSecondFlushedBundle, 3, entries_count, drop_count);
355 EXPECT_EQ(drop_count, 0u);
356 EXPECT_EQ(entries_count, 3u);
357 }
358
TEST_F(TrickleTest,LimitedFlushOverflowsToNextPayload)359 TEST_F(TrickleTest, LimitedFlushOverflowsToNextPayload) {
360 AttachDrain();
361 OpenWriter();
362
363 Vector<TestLogEntry, 3> kFirstFlushedBundle{
364 BasicLog("Use longer logs in this test"),
365 BasicLog("My feet are cold"),
366 BasicLog("I'm hungry, what's for dinner?")};
367 Vector<TestLogEntry, 3> kSecondFlushedBundle{
368 BasicLog("Add a few longer logs"),
369 BasicLog("Eventually the logs will"),
370 BasicLog("Overflow into another payload")};
371
372 AddLogEntries(kFirstFlushedBundle);
373
374 // These logs will get pushed into the next payload due to overflowing max
375 // payload size.
376 AddLogEntries(kSecondFlushedBundle);
377
378 ASSERT_TRUE(writer_.active());
379 EXPECT_EQ(drains_[0].Open(writer_), OkStatus());
380 drains_[0].set_max_bundles_per_trickle(1);
381
382 // A single flush should produce two payloads.
383 std::optional<chrono::SystemClock::duration> min_delay =
384 drains_[0].Trickle(channel_encode_buffer_);
385 EXPECT_EQ(min_delay.has_value(), true);
386 EXPECT_EQ(min_delay.value(), chrono::SystemClock::duration::zero());
387
388 rpc::PayloadsView first_flush_payloads =
389 output_.payloads<log::pw_rpc::raw::Logs::Listen>(kDrainChannelId);
390 ASSERT_EQ(first_flush_payloads.size(), 1u);
391 uint32_t drop_count = 0;
392 size_t entries_count = 0;
393 protobuf::Decoder payload_decoder(first_flush_payloads[0]);
394 payload_decoder.Reset(first_flush_payloads[0]);
395 VerifyLogEntries(
396 payload_decoder, kFirstFlushedBundle, 0, entries_count, drop_count);
397 EXPECT_EQ(entries_count, 3u);
398
399 // An additional flush should produce another payload.
400 min_delay = drains_[0].Trickle(channel_encode_buffer_);
401 EXPECT_EQ(min_delay.has_value(), false);
402 drop_count = 0;
403 entries_count = 0;
404
405 rpc::PayloadsView second_flush_payloads =
406 output_.payloads<log::pw_rpc::raw::Logs::Listen>(kDrainChannelId);
407 ASSERT_EQ(second_flush_payloads.size(), 2u);
408 payload_decoder.Reset(second_flush_payloads[1]);
409 VerifyLogEntries(
410 payload_decoder, kSecondFlushedBundle, 3, entries_count, drop_count);
411 EXPECT_EQ(drop_count, 0u);
412 EXPECT_EQ(entries_count, 3u);
413 }
414
TEST(RpcLogDrain,OnOpenCallbackCalled)415 TEST(RpcLogDrain, OnOpenCallbackCalled) {
416 // Create drain and log components.
417 const uint32_t drain_id = 1;
418 std::array<std::byte, kBufferSize> buffer;
419 sync::Mutex mutex;
420 RpcLogDrain drain(
421 drain_id,
422 buffer,
423 mutex,
424 RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
425 nullptr);
426 RpcLogDrainMap drain_map(span(&drain, 1));
427 LogService log_service(drain_map);
428 std::array<std::byte, kBufferSize * 2> multisink_buffer;
429 multisink::MultiSink multisink(multisink_buffer);
430 multisink.AttachDrain(drain);
431
432 // Create server writer.
433 rpc::RawFakeChannelOutput<3> output;
434 rpc::Channel channel(rpc::Channel::Create<drain_id>(&output));
435 rpc::Server server(span(&channel, 1));
436 rpc::RawServerWriter writer =
437 rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
438 server, drain_id, log_service);
439
440 int callback_call_times = 0;
441 Function<void()> callback = [&callback_call_times]() {
442 ++callback_call_times;
443 };
444
445 // Callback not called when not set.
446 ASSERT_TRUE(writer.active());
447 ASSERT_EQ(drain.Open(writer), OkStatus());
448 EXPECT_EQ(callback_call_times, 0);
449
450 drain.set_on_open_callback(std::move(callback));
451
452 // Callback called when writer is open.
453 writer = rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
454 server, drain_id, log_service);
455 ASSERT_TRUE(writer.active());
456 ASSERT_EQ(drain.Open(writer), OkStatus());
457 EXPECT_EQ(callback_call_times, 1);
458
459 // Callback not called when writer is closed.
460 rpc::RawServerWriter closed_writer;
461 ASSERT_FALSE(closed_writer.active());
462 ASSERT_EQ(drain.Open(closed_writer), Status::FailedPrecondition());
463 EXPECT_EQ(callback_call_times, 1);
464 }
465
466 } // namespace
467 } // namespace pw::log_rpc
468