/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/tracing/core/trace_writer_impl.h"

#include <algorithm>
#include <array>
#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

#include "perfetto/ext/base/utils.h"
#include "perfetto/ext/tracing/core/commit_data_request.h"
#include "perfetto/ext/tracing/core/shared_memory_abi.h"
#include "perfetto/ext/tracing/core/trace_writer.h"
#include "perfetto/ext/tracing/core/tracing_service.h"
#include "perfetto/protozero/message.h"
#include "perfetto/protozero/proto_utils.h"
#include "perfetto/protozero/scattered_stream_writer.h"
#include "src/base/test/gtest_test_suite.h"
#include "src/base/test/test_task_runner.h"
#include "src/tracing/core/shared_memory_arbiter_impl.h"
#include "src/tracing/test/aligned_buffer_test.h"
#include "src/tracing/test/mock_producer_endpoint.h"
#include "test/gtest_and_gmock.h"

#include "protos/perfetto/trace/test_event.gen.h"
#include "protos/perfetto/trace/test_event.pbzero.h"
#include "protos/perfetto/trace/trace_packet.gen.h"
#include "protos/perfetto/trace/trace_packet.pbzero.h"

namespace perfetto {
namespace {

using ChunkHeader = SharedMemoryABI::ChunkHeader;
using ShmemMode = SharedMemoryABI::ShmemMode;
using ::protozero::ScatteredStreamWriter;
using ::testing::AllOf;
using ::testing::ElementsAre;
using ::testing::IsEmpty;
using ::testing::IsNull;
using ::testing::MockFunction;
using ::testing::Ne;
using ::testing::NiceMock;
using ::testing::Not;
using ::testing::NotNull;
using ::testing::Optional;
using ::testing::SizeIs;
using ::testing::ValuesIn;

class TraceWriterImplTest : public AlignedBufferTest {
 public:
  struct PatchKey {
    uint32_t writer_id;
    uint32_t chunk_id;
    bool operator<(const PatchKey& other) const {
      return std::tie(writer_id, chunk_id) <
             std::tie(other.writer_id, other.chunk_id);
    }
  };
  void SetUp() override {
    default_layout_ =
        SharedMemoryArbiterImpl::default_page_layout_for_testing();
    SharedMemoryArbiterImpl::set_default_layout_for_testing(
        SharedMemoryABI::PageLayout::kPageDiv4);
    AlignedBufferTest::SetUp();
    task_runner_.reset(new base::TestTaskRunner());
    arbiter_.reset(new SharedMemoryArbiterImpl(
        buf(), buf_size(), ShmemMode::kDefault, page_size(),
        &mock_producer_endpoint_, task_runner_.get()));
    ON_CALL(mock_producer_endpoint_, CommitData)
        .WillByDefault([&](const CommitDataRequest& req,
                           MockProducerEndpoint::CommitDataCallback cb) {
          last_commit_ = req;
          last_commit_callback_ = cb;
          for (const CommitDataRequest::ChunkToPatch& c :
               req.chunks_to_patch()) {
            patches_[PatchKey{c.writer_id(), c.chunk_id()}] = c.patches();
          }
        });
  }

  void TearDown() override {
    arbiter_.reset();
    task_runner_.reset();
    SharedMemoryArbiterImpl::set_default_layout_for_testing(default_layout_);
  }

  std::vector<uint8_t> CopyPayloadAndApplyPatches(
      SharedMemoryABI::Chunk& chunk) const {
    std::vector<uint8_t> copy(chunk.payload_begin(),
                              chunk.payload_begin() + chunk.payload_size());
    ChunkHeader::Packets p = chunk.header()->packets.load();

    auto it = patches_.find(PatchKey{chunk.header()->writer_id.load(),
                                     chunk.header()->chunk_id.load()});
    if (it == patches_.end()) {
      EXPECT_FALSE(p.flags & ChunkHeader::kChunkNeedsPatching);
      return copy;
    }
    EXPECT_TRUE(p.flags & ChunkHeader::kChunkNeedsPatching);

    for (const CommitDataRequest::ChunkToPatch::Patch& patch : it->second) {
      if (patch.offset() + patch.data().size() > copy.size()) {
        ADD_FAILURE() << "Patch out of bounds";
        continue;
      }
      for (size_t i = 0; i < patch.data().size(); i++) {
        copy[patch.offset() + i] =
            reinterpret_cast<const uint8_t*>(patch.data().data())[i];
      }
    }
    return copy;
  }

  // Extracts trace packets from the shared memory buffer and returns copies
  // of them (after applying the patches received via CommitData). The
  // producer that writes to the shared memory (i.e. the trace writer) must
  // have been destroyed beforehand, so that every chunk is either free or
  // complete.
  std::vector<std::string> GetPacketsFromShmemAndPatches() {
    std::vector<std::string> packets;
    SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
    bool was_fragmenting = false;
    for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
      uint32_t page_layout = abi->GetPageLayout(page_idx);
      size_t num_chunks = SharedMemoryABI::GetNumChunksForLayout(page_layout);
      for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
        SharedMemoryABI::ChunkState chunk_state =
            abi->GetChunkState(page_idx, chunk_idx);
        if (chunk_state != SharedMemoryABI::kChunkFree &&
            chunk_state != SharedMemoryABI::kChunkComplete) {
          ADD_FAILURE() << "Page " << page_idx << " chunk " << chunk_idx
                        << " unexpected state: " << chunk_state;
          continue;
        }
        SharedMemoryABI::Chunk chunk =
            abi->TryAcquireChunkForReading(page_idx, chunk_idx);
        if (!chunk.is_valid())
          continue;
        ChunkHeader::Packets p = chunk.header()->packets.load();

        EXPECT_EQ(
            was_fragmenting,
            static_cast<bool>(p.flags &
                              ChunkHeader::kFirstPacketContinuesFromPrevChunk));

        std::vector<uint8_t> payload = CopyPayloadAndApplyPatches(chunk);

        const uint8_t* read_ptr = payload.data();
        const uint8_t* const end_read_ptr = payload.data() + payload.size();

        size_t num_fragments = p.count;
        for (; num_fragments && read_ptr < end_read_ptr; num_fragments--) {
          uint64_t len;
          read_ptr =
              protozero::proto_utils::ParseVarInt(read_ptr, end_read_ptr, &len);
          if (!was_fragmenting || packets.empty()) {
            packets.push_back(std::string());
          }
          was_fragmenting = false;
          if (read_ptr + len > end_read_ptr) {
            ADD_FAILURE() << "Page " << page_idx << " chunk " << chunk_idx
                          << " malformed chunk";
            // Clamp the fragment length so the append below stays in bounds.
            len = static_cast<uint64_t>(end_read_ptr - read_ptr);
          }
          packets.back().append(reinterpret_cast<const char*>(read_ptr),
                                static_cast<size_t>(len));
          read_ptr += len;
        }
        EXPECT_EQ(num_fragments, 0u);
        was_fragmenting =
            p.flags & ChunkHeader::kLastPacketContinuesOnNextChunk;
      }
    }
    // Ignore empty packets (like the tracing service does).
    packets.erase(
        std::remove_if(packets.begin(), packets.end(),
                       [](const std::string& p) { return p.empty(); }),
        packets.end());
    return packets;
  }


  struct ChunkInABI {
    size_t page_idx;
    uint32_t page_layout;
    size_t chunk_idx;
  };
  std::optional<ChunkInABI> GetFirstChunkBeingWritten() {
    SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
    for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
      uint32_t page_layout = abi->GetPageLayout(page_idx);
      size_t num_chunks = SharedMemoryABI::GetNumChunksForLayout(page_layout);
      for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
        SharedMemoryABI::ChunkState chunk_state =
            abi->GetChunkState(page_idx, chunk_idx);
        if (chunk_state != SharedMemoryABI::kChunkBeingWritten) {
          continue;
        }
        return ChunkInABI{page_idx, page_layout, chunk_idx};
      }
    }
    return std::nullopt;
  }

  static std::optional<std::vector<std::string>> GetChunkFragments(
      size_t packets_count,
      const void* chunk_payload,
      size_t chunk_payload_size) {
    std::vector<std::string> fragments;
    const uint8_t* read_ptr = static_cast<const uint8_t*>(chunk_payload);
    const uint8_t* const end_read_ptr = read_ptr + chunk_payload_size;

    for (size_t num_fragments = packets_count;
         num_fragments && read_ptr < end_read_ptr; num_fragments--) {
      uint64_t len;
      read_ptr =
          protozero::proto_utils::ParseVarInt(read_ptr, end_read_ptr, &len);
      if (read_ptr + len > end_read_ptr) {
        return std::nullopt;
      }
      fragments.push_back(std::string(reinterpret_cast<const char*>(read_ptr),
                                      static_cast<size_t>(len)));
      read_ptr += len;
    }
    return std::make_optional(std::move(fragments));
  }

  SharedMemoryABI::PageLayout default_layout_;
  CommitDataRequest last_commit_;
  ProducerEndpoint::CommitDataCallback last_commit_callback_;
  std::map<PatchKey, std::vector<CommitDataRequest::ChunkToPatch::Patch>>
      patches_;
  NiceMock<MockProducerEndpoint> mock_producer_endpoint_;

  std::unique_ptr<base::TestTaskRunner> task_runner_;
  std::unique_ptr<SharedMemoryArbiterImpl> arbiter_;
};

size_t const kPageSizes[] = {4096, 65536};
INSTANTIATE_TEST_SUITE_P(PageSize, TraceWriterImplTest, ValuesIn(kPageSizes));
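
// Note: SetUp() forces the kPageDiv4 page layout, so each chunk in these
// tests is page_size() / 4 bytes. This is why the tests below use
// `page_size() / 4` as the string length whenever they need a packet that is
// guaranteed to spill past a single chunk.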

TEST_P(TraceWriterImplTest, NewTracePacket) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
  const size_t kNumPackets = 32;
  for (size_t i = 0; i < kNumPackets; i++) {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str(
        std::string("foobar " + std::to_string(i)));
  }

  // Destroying the TraceWriterImpl should cause the last packet to be
  // finalized and the chunk to be put back in the kChunkComplete state.
  writer.reset();

  std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
  ASSERT_THAT(packets, SizeIs(kNumPackets));
  for (size_t i = 0; i < kNumPackets; i++) {
    protos::gen::TracePacket packet;
    EXPECT_TRUE(packet.ParseFromString(packets[i]));
    EXPECT_EQ(packet.for_testing().str(), "foobar " + std::to_string(i));
    if (i == 0) {
      EXPECT_TRUE(packet.first_packet_on_sequence());
    } else {
      EXPECT_FALSE(packet.first_packet_on_sequence());
    }
  }
}

TEST_P(TraceWriterImplTest, NewTracePacketLargePackets) {
  const BufferID kBufId = 42;
  const size_t chunk_size = page_size() / 4;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str(std::string("PACKET_1") +
                                       std::string(chunk_size, 'x'));
  }
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str(std::string("PACKET_2") +
                                       std::string(chunk_size, 'x'));
  }

  // Destroying the TraceWriterImpl should cause the last packet to be
  // finalized and the chunk to be put back in the kChunkComplete state.
  writer.reset();

  std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
  ASSERT_THAT(packets, SizeIs(2));
  {
    protos::gen::TracePacket packet;
    EXPECT_TRUE(packet.ParseFromString(packets[0]));
    EXPECT_EQ(packet.for_testing().str(),
              std::string("PACKET_1") + std::string(chunk_size, 'x'));
  }
  {
    protos::gen::TracePacket packet;
    EXPECT_TRUE(packet.ParseFromString(packets[1]));
    EXPECT_EQ(packet.for_testing().str(),
              std::string("PACKET_2") + std::string(chunk_size, 'x'));
  }
}

// A prefix corresponding to first_packet_on_sequence = true in a serialized
// TracePacket proto.
constexpr char kFirstPacketOnSequenceFlagPrefix[] = {static_cast<char>(0xB8),
                                                     0x5, 0x1, 0x0};
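// The three meaningful bytes decode as follows: 0xB8 0x05 is the varint
// preamble for field 87 with wire type 0 (varint), i.e. (87 << 3) | 0 ==
// 0x2B8, and 0x01 is the boolean value `true`. The trailing 0x00 only
// NUL-terminates the array so it can be concatenated onto std::strings below.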

TEST_P(TraceWriterImplTest, NewTracePacketTakeWriter) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
  const size_t kNumPackets = 32;
  for (size_t i = 0; i < kNumPackets; i++) {
    ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
    std::string raw_proto_bytes =
        std::string("RAW_PROTO_BYTES_") + std::to_string(i);
    sw->WriteBytes(reinterpret_cast<const uint8_t*>(raw_proto_bytes.data()),
                   raw_proto_bytes.size());
    writer->FinishTracePacket();
  }

  // Destroying the TraceWriterImpl should cause the last packet to be
  // finalized and the chunk to be put back in the kChunkComplete state.
  writer.reset();

  std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
  ASSERT_THAT(packets, SizeIs(kNumPackets));
  for (size_t i = 0; i < kNumPackets; i++) {
    std::string expected = "RAW_PROTO_BYTES_" + std::to_string(i);
    if (i == 0) {
      expected = kFirstPacketOnSequenceFlagPrefix + expected;
    }
    EXPECT_EQ(packets[i], expected);
  }
}

#if defined(GTEST_HAS_DEATH_TEST)
using TraceWriterImplDeathTest = TraceWriterImplTest;
INSTANTIATE_TEST_SUITE_P(PageSize,
                         TraceWriterImplDeathTest,
                         ValuesIn(kPageSizes));

TEST_P(TraceWriterImplDeathTest, NewTracePacketTakeWriterNoFinish) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  TraceWriterImpl::TracePacketHandle handle = writer->NewTracePacket();

  // Avoid a secondary DCHECK failure from ~TraceWriterImpl() =>
  // Message::Finalize() due to the stream writer being modified behind the
  // Message's back. This turns the Finalize() call into a no-op.
  handle->set_size_field(nullptr);

  ScatteredStreamWriter* sw = handle.TakeStreamWriter();
  std::string raw_proto_bytes = std::string("RAW_PROTO_BYTES");
  sw->WriteBytes(reinterpret_cast<const uint8_t*>(raw_proto_bytes.data()),
                 raw_proto_bytes.size());

  EXPECT_DEATH({ writer->NewTracePacket(); }, "");
}
#endif  // defined(GTEST_HAS_DEATH_TEST)

TEST_P(TraceWriterImplTest, AnnotatePatch) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
  ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
  std::string raw_proto_bytes = std::string("RAW_PROTO_BYTES");
  sw->WriteBytes(reinterpret_cast<const uint8_t*>(raw_proto_bytes.data()),
                 raw_proto_bytes.size());

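  // The idiom below: reserve kPatchSize bytes in the stream, zero them, then
  // call AnnotatePatch() on the reserved location. AnnotatePatch() returns a
  // different pointer, to a producer-side patch entry, which stays writable
  // even after the underlying chunk is handed off; the patched bytes reach
  // the chunk later via CommitData (see CopyPayloadAndApplyPatches above).
  // The EXPECT_NE checks assert exactly that relocation.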
  uint8_t* patch1 =
      sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
  ASSERT_THAT(patch1, NotNull());
  patch1[0] = 0;
  patch1[1] = 0;
  patch1[2] = 0;
  patch1[3] = 0;
  const uint8_t* old_chunk_pointer = patch1;
  patch1 = sw->AnnotatePatch(patch1);
  EXPECT_NE(patch1, old_chunk_pointer);
  ASSERT_THAT(patch1, NotNull());

  sw->WriteByte('X');

  uint8_t* patch2 =
      sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
  ASSERT_THAT(patch2, NotNull());
  patch2[0] = 0;
  patch2[1] = 0;
  patch2[2] = 0;
  patch2[3] = 0;
  old_chunk_pointer = patch2;
  patch2 = sw->AnnotatePatch(patch2);
  EXPECT_NE(patch2, old_chunk_pointer);
  ASSERT_THAT(patch2, NotNull());

  const size_t chunk_size = page_size() / 4;
  std::string large_string(chunk_size, 'x');

  sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
                 large_string.size());

  uint8_t* patch3 =
      sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
  ASSERT_THAT(patch3, NotNull());
  patch3[0] = 0;
  patch3[1] = 0;
  patch3[2] = 0;
  patch3[3] = 0;
  old_chunk_pointer = patch3;
  patch3 = sw->AnnotatePatch(patch3);
  EXPECT_NE(patch3, old_chunk_pointer);
  ASSERT_THAT(patch3, NotNull());

  sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
                 large_string.size());

  patch1[0] = 0x11;
  patch1[1] = 0x11;
  patch1[2] = 0x11;
  patch1[3] = 0x11;

  patch2[0] = 0x22;
  patch2[1] = 0x22;
  patch2[2] = 0x22;
  patch2[3] = 0x22;

  patch3[0] = 0x33;
  patch3[1] = 0x33;
  patch3[2] = 0x33;
  patch3[3] = 0x33;

  writer->FinishTracePacket();

  // Destroying the TraceWriterImpl should cause the last packet to be
  // finalized and the chunk to be put back in the kChunkComplete state.
  writer.reset();

  std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
  EXPECT_THAT(
      packets,
      ElementsAre(
          kFirstPacketOnSequenceFlagPrefix + std::string("RAW_PROTO_BYTES") +
          std::string("\x11\x11\x11\x11") + std::string("X") +
          std::string("\x22\x22\x22\x22") + std::string(chunk_size, 'x') +
          std::string("\x33\x33\x33\x33") + std::string(chunk_size, 'x')));
}

TEST_P(TraceWriterImplTest, MixManualTakeAndMessage) {
  const BufferID kBufId = 42;
  const size_t chunk_size = page_size() / 4;
  const std::string large_string(chunk_size, 'x');

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  {
    ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
    std::string packet1 = std::string("PACKET_1_");
    sw->WriteBytes(reinterpret_cast<const uint8_t*>(packet1.data()),
                   packet1.size());
    uint8_t* patch =
        sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
    ASSERT_THAT(patch, NotNull());
    patch[0] = 0;
    patch[1] = 0;
    patch[2] = 0;
    patch[3] = 0;
    const uint8_t* old_chunk_pointer = patch;
    patch = sw->AnnotatePatch(patch);
    EXPECT_NE(patch, old_chunk_pointer);
    ASSERT_THAT(patch, NotNull());
    sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
                   large_string.size());
    patch[0] = 0xFF;
    patch[1] = 0xFF;
    patch[2] = 0xFF;
    patch[3] = 0xFF;
    writer->FinishTracePacket();
  }

  {
    auto msg = writer->NewTracePacket();
    std::string packet2 = std::string("PACKET_2_");
    msg->AppendRawProtoBytes(packet2.data(), packet2.size());
    auto* nested = msg->BeginNestedMessage<protozero::Message>(1);
    nested->AppendRawProtoBytes(large_string.data(), large_string.size());
  }

  {
    ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
    std::string packet3 = std::string("PACKET_3_");
    sw->WriteBytes(reinterpret_cast<const uint8_t*>(packet3.data()),
                   packet3.size());
    uint8_t* patch =
        sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
    ASSERT_THAT(patch, NotNull());
    patch[0] = 0;
    patch[1] = 0;
    patch[2] = 0;
    patch[3] = 0;
    const uint8_t* old_chunk_pointer = patch;
    patch = sw->AnnotatePatch(patch);
    EXPECT_NE(patch, old_chunk_pointer);
    ASSERT_THAT(patch, NotNull());
    sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
                   large_string.size());
    patch[0] = 0xFF;
    patch[1] = 0xFF;
    patch[2] = 0xFF;
    patch[3] = 0xFF;
    writer->FinishTracePacket();
  }

  // Destroying the TraceWriterImpl should cause the last packet to be
  // finalized and the chunk to be put back in the kChunkComplete state.
  writer.reset();

  uint8_t buf[protozero::proto_utils::kMessageLengthFieldSize];
  protozero::proto_utils::WriteRedundantVarInt(
      static_cast<uint32_t>(large_string.size()), buf,
      protozero::proto_utils::kMessageLengthFieldSize);
  std::string encoded_size(reinterpret_cast<char*>(buf), sizeof(buf));
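
  // For reference (assuming a 4 KiB page, i.e. chunk_size == 1024), the
  // redundant varint computed above is the fixed-width 4-byte encoding
  // {0x80, 0x88, 0x80, 0x00}: each byte carries 7 bits of the value,
  // little-endian, with the continuation bit set on all but the last byte.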

  std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
  EXPECT_THAT(
      packets,
      ElementsAre(kFirstPacketOnSequenceFlagPrefix + std::string("PACKET_1_") +
                      std::string("\xFF\xFF\xFF\xFF") +
                      std::string(chunk_size, 'x'),
                  std::string("PACKET_2_") + std::string("\x0A") +
                      encoded_size + std::string(chunk_size, 'x'),
                  std::string("PACKET_3_") + std::string("\xFF\xFF\xFF\xFF") +
                      std::string(chunk_size, 'x')));
}

TEST_P(TraceWriterImplTest, MessageHandleDestroyedPacketScrapable) {
  const BufferID kBufId = 42;

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  auto packet = writer->NewTracePacket();
  packet->set_for_testing()->set_str("packet1");

  std::optional<ChunkInABI> chunk_in_abi = GetFirstChunkBeingWritten();
  ASSERT_TRUE(chunk_in_abi.has_value());

  auto* abi = arbiter_->shmem_abi_for_testing();
  SharedMemoryABI::Chunk chunk =
      abi->GetChunkUnchecked(chunk_in_abi->page_idx, chunk_in_abi->page_layout,
                             chunk_in_abi->chunk_idx);
  ASSERT_TRUE(chunk.is_valid());

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);

  packet = protozero::MessageHandle<protos::pbzero::TracePacket>();

  // After destroying the message handle, the chunk header should have an
  // inflated packet count.
  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);

  writer.reset();

  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkComplete);
  EXPECT_THAT(GetChunkFragments(1, chunk.payload_begin(), chunk.payload_size()),
              Optional(ElementsAre(Not(IsEmpty()))));
}

TEST_P(TraceWriterImplTest, FinishTracePacketScrapable) {
  const BufferID kBufId = 42;

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  {
    protos::pbzero::TestEvent test_event;
    protozero::MessageArena arena;
    ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
    uint8_t data[protozero::proto_utils::kMaxTagEncodedSize];
    uint8_t* data_end = protozero::proto_utils::WriteVarInt(
        protozero::proto_utils::MakeTagLengthDelimited(
            protos::pbzero::TracePacket::kForTestingFieldNumber),
        data);
    sw->WriteBytes(data, static_cast<size_t>(data_end - data));
    test_event.Reset(sw, &arena);
    test_event.set_size_field(
        sw->ReserveBytes(protozero::proto_utils::kMessageLengthFieldSize));
    test_event.set_str("payload1");
  }

  std::optional<ChunkInABI> chunk_in_abi = GetFirstChunkBeingWritten();
  ASSERT_TRUE(chunk_in_abi.has_value());

  auto* abi = arbiter_->shmem_abi_for_testing();
  SharedMemoryABI::Chunk chunk =
      abi->GetChunkUnchecked(chunk_in_abi->page_idx, chunk_in_abi->page_layout,
                             chunk_in_abi->chunk_idx);
  ASSERT_TRUE(chunk.is_valid());

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);

  writer->FinishTracePacket();

  // After a call to FinishTracePacket, the chunk header should have an
  // inflated packet count.
  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);
  EXPECT_THAT(GetChunkFragments(1, chunk.payload_begin(), chunk.payload_size()),
              Optional(ElementsAre(Not(IsEmpty()))));

  // An extra call to FinishTracePacket should have no effect.
  writer->FinishTracePacket();

  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);
  EXPECT_THAT(GetChunkFragments(1, chunk.payload_begin(), chunk.payload_size()),
              Optional(ElementsAre(Not(IsEmpty()))));

  writer.reset();

  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkComplete);
  EXPECT_THAT(GetChunkFragments(2, chunk.payload_begin(), chunk.payload_size()),
              Optional(ElementsAre(Not(IsEmpty()), IsEmpty())));
}

TEST_P(TraceWriterImplTest,
       MessageHandleDestroyedAndFinishTracePacketScrapable) {
  const BufferID kBufId = 42;

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  auto packet = writer->NewTracePacket();
  packet->set_for_testing()->set_str("packet1");

  std::optional<ChunkInABI> chunk_in_abi = GetFirstChunkBeingWritten();
  ASSERT_TRUE(chunk_in_abi.has_value());

  auto* abi = arbiter_->shmem_abi_for_testing();
  SharedMemoryABI::Chunk chunk =
      abi->GetChunkUnchecked(chunk_in_abi->page_idx, chunk_in_abi->page_layout,
                             chunk_in_abi->chunk_idx);
  ASSERT_TRUE(chunk.is_valid());

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);
  packet = protozero::MessageHandle<protos::pbzero::TracePacket>();

  // After destroying the message handle, the chunk header should have an
  // inflated packet count.
  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);

  // An extra call to FinishTracePacket should have no effect.
  writer->FinishTracePacket();

  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);
  EXPECT_THAT(GetChunkFragments(1, chunk.payload_begin(), chunk.payload_size()),
              Optional(ElementsAre(Not(IsEmpty()))));

  writer.reset();

  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkComplete);
  EXPECT_THAT(GetChunkFragments(2, chunk.payload_begin(), chunk.payload_size()),
              Optional(ElementsAre(Not(IsEmpty()), IsEmpty())));
}

TEST_P(TraceWriterImplTest, MessageHandleDestroyedPacketFullChunk) {
  const BufferID kBufId = 42;

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  auto packet = writer->NewTracePacket();
  protos::pbzero::TestEvent* test_event = packet->set_for_testing();
  std::string chunk_filler(test_event->stream_writer()->bytes_available(),
                           '\0');
  test_event->AppendRawProtoBytes(chunk_filler.data(), chunk_filler.size());

  std::optional<ChunkInABI> chunk_in_abi = GetFirstChunkBeingWritten();
  ASSERT_TRUE(chunk_in_abi.has_value());

  auto* abi = arbiter_->shmem_abi_for_testing();
  SharedMemoryABI::Chunk chunk =
      abi->GetChunkUnchecked(chunk_in_abi->page_idx, chunk_in_abi->page_layout,
                             chunk_in_abi->chunk_idx);
  ASSERT_TRUE(chunk.is_valid());

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);
  // Finish the TracePacket: since there's no space for an empty packet, the
  // trace writer should immediately mark the chunk as completed.
  packet = protozero::MessageHandle<protos::pbzero::TracePacket>();

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkComplete);

  writer.reset();

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkComplete);
}

TEST_P(TraceWriterImplTest, FinishTracePacketFullChunk) {
  const BufferID kBufId = 42;

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  {
    protos::pbzero::TestEvent test_event;
    protozero::MessageArena arena;
    ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
    uint8_t data[protozero::proto_utils::kMaxTagEncodedSize];
    uint8_t* data_end = protozero::proto_utils::WriteVarInt(
        protozero::proto_utils::MakeTagLengthDelimited(
            protos::pbzero::TracePacket::kForTestingFieldNumber),
        data);
    sw->WriteBytes(data, static_cast<size_t>(data_end - data));
    test_event.Reset(sw, &arena);
    test_event.set_size_field(
        sw->ReserveBytes(protozero::proto_utils::kMessageLengthFieldSize));
    std::string chunk_filler(sw->bytes_available(), '\0');
    test_event.AppendRawProtoBytes(chunk_filler.data(), chunk_filler.size());
  }

  std::optional<ChunkInABI> chunk_in_abi = GetFirstChunkBeingWritten();
  ASSERT_TRUE(chunk_in_abi.has_value());

  auto* abi = arbiter_->shmem_abi_for_testing();
  SharedMemoryABI::Chunk chunk =
      abi->GetChunkUnchecked(chunk_in_abi->page_idx, chunk_in_abi->page_layout,
                             chunk_in_abi->chunk_idx);
  ASSERT_TRUE(chunk.is_valid());

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);

  // Finish the TracePacket: since there's no space for an empty packet, the
  // trace writer should immediately mark the chunk as completed, instead of
  // inflating the count.
  writer->FinishTracePacket();

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkComplete);

  writer.reset();

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkComplete);
}

TEST_P(TraceWriterImplTest, FragmentingPacketWithProducerAndServicePatching) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  // Write a packet that's guaranteed to span more than a single chunk, but
  // less than two chunks.
  auto packet = writer->NewTracePacket();
  size_t chunk_size = page_size() / 4;
  std::string large_string(chunk_size, 'x');
  packet->set_for_testing()->set_str(large_string);

  // First chunk should be committed.
  arbiter_->FlushPendingCommitDataRequests();
  ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].target_buffer(), kBufId);
  EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));

  // We will simulate a batching cycle by first setting the batching period to
  // a very large value and then force-flushing when we are done writing data.
  arbiter_->SetDirectSMBPatchingSupportedByService();
  ASSERT_TRUE(arbiter_->EnableDirectSMBPatching());
  arbiter_->SetBatchCommitsDuration(UINT32_MAX);

  // Write a second packet that's guaranteed to span more than a single chunk.
  // Starting a new trace packet should cause the patches for the first packet
  // (i.e. for the first chunk) to be queued for sending to the service. They
  // cannot be applied locally because the first chunk was already committed.
  packet->Finalize();
  auto packet2 = writer->NewTracePacket();
  packet2->set_for_testing()->set_str(large_string);

  // Starting a new packet yet again should cause the patches for the second
  // packet (i.e. for the second chunk) to be applied in the producer, because
  // the second chunk has not been committed yet.
  packet2->Finalize();
  auto packet3 = writer->NewTracePacket();

  // Simulate the end of the batching period, which should trigger a commit to
  // the service.
  arbiter_->FlushPendingCommitDataRequests();

  SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();

  // The first allocated chunk should be complete but need patching, since the
  // packet extended past the chunk and no patches for the packet size or
  // string field size were applied yet.
  ASSERT_EQ(abi->GetChunkState(0u, 0u), SharedMemoryABI::kChunkComplete);
  auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
  ASSERT_TRUE(chunk.is_valid());
  EXPECT_EQ(chunk.header()->packets.load().count, 1u);
  EXPECT_TRUE(chunk.header()->packets.load().flags &
              SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
  EXPECT_TRUE(chunk.header()->packets.load().flags &
              SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);

  // Verify that a patch for the first chunk was sent to the service.
  ASSERT_THAT(last_commit_.chunks_to_patch(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_patch()[0].writer_id(), writer->writer_id());
  EXPECT_EQ(last_commit_.chunks_to_patch()[0].target_buffer(), kBufId);
  EXPECT_EQ(last_commit_.chunks_to_patch()[0].chunk_id(),
            chunk.header()->chunk_id.load());
  EXPECT_FALSE(last_commit_.chunks_to_patch()[0].has_more_patches());
  EXPECT_THAT(last_commit_.chunks_to_patch()[0].patches(), SizeIs(1));

  // Verify that the second chunk was committed.
  ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 1u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].target_buffer(), kBufId);

  // The second chunk should be in a complete state and should not need
  // patching, as the patches to it should have been applied in the producer.
  ASSERT_EQ(abi->GetChunkState(0u, 1u), SharedMemoryABI::kChunkComplete);
  auto chunk2 = abi->TryAcquireChunkForReading(0u, 1u);
  ASSERT_TRUE(chunk2.is_valid());
  EXPECT_EQ(chunk2.header()->packets.load().count, 2);
  EXPECT_TRUE(chunk2.header()->packets.load().flags &
              SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
  EXPECT_FALSE(chunk2.header()->packets.load().flags &
               SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
}

TEST_P(TraceWriterImplTest, FragmentingPacketWithoutEnablingProducerPatching) {
  // Simulate a batching cycle by setting the batching period to a very large
  // value and then force-flushing at the point where a flush should naturally
  // happen, in this case when a patch is encountered.
  //
  // Note: direct producer-side patching should be disabled by default.
  arbiter_->SetBatchCommitsDuration(UINT32_MAX);

  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  // Write a packet that's guaranteed to span more than a single chunk.
  auto packet = writer->NewTracePacket();
  size_t chunk_size = page_size() / 4;
  std::string large_string(chunk_size, 'x');
  packet->set_for_testing()->set_str(large_string);

  // Starting a new packet should cause the first chunk and its patches to be
  // committed to the service.
  packet->Finalize();
  auto packet2 = writer->NewTracePacket();
  arbiter_->FlushPendingCommitDataRequests();

  // The first allocated chunk should be complete but need patching, since the
  // packet extended past the chunk and no patches for the packet size or
  // string field size were applied in the producer.
  SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
  ASSERT_EQ(abi->GetChunkState(0u, 0u), SharedMemoryABI::kChunkComplete);
  auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
  ASSERT_TRUE(chunk.is_valid());
  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_TRUE(chunk.header()->packets.load().flags &
              SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
  EXPECT_TRUE(chunk.header()->packets.load().flags &
              SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);

  // The first chunk was committed.
  ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].target_buffer(), kBufId);

  // The patches for the first chunk were committed.
  ASSERT_THAT(last_commit_.chunks_to_patch(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_patch()[0].writer_id(), writer->writer_id());
  EXPECT_EQ(last_commit_.chunks_to_patch()[0].target_buffer(), kBufId);
  EXPECT_EQ(last_commit_.chunks_to_patch()[0].chunk_id(),
            chunk.header()->chunk_id.load());
  EXPECT_FALSE(last_commit_.chunks_to_patch()[0].has_more_patches());
  EXPECT_THAT(last_commit_.chunks_to_patch()[0].patches(), SizeIs(1));
}

// Sets up a scenario in which the SMB is exhausted and the TraceWriter fails
// to get a new chunk while fragmenting a packet. Verifies that data is
// dropped until the SMB is freed up and the TraceWriter can get a new chunk.
TEST_P(TraceWriterImplTest, FragmentingPacketWhileBufferExhausted) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer =
      arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);

  // Write a small first packet, so that |writer| owns a chunk.
  auto packet = writer->NewTracePacket();
  EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                   ->drop_packets_for_testing());
  // 3 bytes for the first_packet_on_sequence flag.
  EXPECT_EQ(packet->Finalize(), 3u);
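  // (The 3 bytes match kFirstPacketOnSequenceFlagPrefix above: a 2-byte field
  // preamble plus a 1-byte boolean value.)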

  // Grab all the remaining chunks in the SMB in new writers.
  std::array<std::unique_ptr<TraceWriter>, kNumPages * 4 - 1> other_writers;
  for (size_t i = 0; i < other_writers.size(); i++) {
    other_writers[i] =
        arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
    auto other_writer_packet = other_writers[i]->NewTracePacket();
    EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(other_writers[i].get())
                     ->drop_packets_for_testing());
  }

  // Write a packet that's guaranteed to span more than a single chunk,
  // causing |writer| to attempt to acquire a new chunk but fail to do so.
  auto packet2 = writer->NewTracePacket();
  size_t chunk_size = page_size() / 4;
  std::string large_string(chunk_size, 'x');
  packet2->set_for_testing()->set_str(large_string);

  EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                  ->drop_packets_for_testing());

  // First chunk should be committed.
  arbiter_->FlushPendingCommitDataRequests();
  ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].target_buffer(), kBufId);
  EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));

  // It should not need patching and not have the continuation flag set.
  SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
  ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi->GetChunkState(0u, 0u));
  auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
  ASSERT_TRUE(chunk.is_valid());
  EXPECT_EQ(chunk.header()->packets.load().count, 2);
  EXPECT_FALSE(chunk.header()->packets.load().flags &
               SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
  EXPECT_FALSE(chunk.header()->packets.load().flags &
               SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);

  // Writing more data while in garbage mode succeeds. This data is dropped.
  packet2->Finalize();
  auto packet3 = writer->NewTracePacket();
  packet3->set_for_testing()->set_str(large_string);

  // Release |writer|'s first chunk as free, so that it can grab it again.
  abi->ReleaseChunkAsFree(std::move(chunk));

  // Starting a new packet should cause the TraceWriter to attempt to grab a
  // new chunk again, because we wrote enough data to wrap the garbage chunk.
  packet3->Finalize();
  auto packet4 = writer->NewTracePacket();

  // Grabbing the chunk should have succeeded.
  EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                   ->drop_packets_for_testing());

  // The first packet in the chunk should have the previous_packet_dropped
  // flag set, so it shouldn't be empty.
  EXPECT_GT(packet4->Finalize(), 0u);

  // Flushing the writer causes the chunk to be released again.
  writer->Flush();
  EXPECT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
  EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));

  // The chunk should contain only |packet4| and not have any continuation
  // flag set.
  ASSERT_EQ(abi->GetChunkState(0u, 0u), SharedMemoryABI::kChunkComplete);
  chunk = abi->TryAcquireChunkForReading(0u, 0u);
  ASSERT_TRUE(chunk.is_valid());
  ASSERT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_FALSE(chunk.header()->packets.load().flags &
               SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
  EXPECT_FALSE(
      chunk.header()->packets.load().flags &
      SharedMemoryABI::ChunkHeader::kFirstPacketContinuesFromPrevChunk);
  EXPECT_FALSE(chunk.header()->packets.load().flags &
               SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
}

// Verifies that a TraceWriter that is flushed before the SMB is full, and
// that later acquires a garbage chunk, recovers and writes a
// previous_packet_dropped marker into the trace.
TEST_P(TraceWriterImplTest, FlushBeforeBufferExhausted) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer =
      arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);

  // Write a small first packet and flush it, so that |writer| no longer owns
  // any chunk.
  auto packet = writer->NewTracePacket();
  EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                   ->drop_packets_for_testing());
  // 3 bytes for the first_packet_on_sequence flag.
  EXPECT_EQ(packet->Finalize(), 3u);

  // Flush the first chunk away.
  writer->Flush();

  // First chunk should be committed. Don't release it as free just yet.
  arbiter_->FlushPendingCommitDataRequests();
  ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);

  // Grab all the remaining chunks in the SMB in new writers.
  std::array<std::unique_ptr<TraceWriter>, kNumPages * 4 - 1> other_writers;
  for (size_t i = 0; i < other_writers.size(); i++) {
    other_writers[i] =
        arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
    auto other_writer_packet = other_writers[i]->NewTracePacket();
    EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(other_writers[i].get())
                     ->drop_packets_for_testing());
  }

  // Write another packet, causing |writer| to acquire a garbage chunk.
  auto packet2 = writer->NewTracePacket();
  EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                  ->drop_packets_for_testing());

  // Writing more data while in garbage mode succeeds. This data is dropped.
  // Make sure that we fill the garbage chunk, so that |writer| tries to
  // re-acquire a valid chunk for the next packet.
  size_t chunk_size = page_size() / 4;
  std::string large_string(chunk_size, 'x');
  packet2->set_for_testing()->set_str(large_string);
  packet2->Finalize();

  // The next packet should still be in the garbage chunk.
  auto packet3 = writer->NewTracePacket();
  EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                  ->drop_packets_for_testing());

  // Release the first chunk as free, so |writer| can acquire it again.
  SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
  ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi->GetChunkState(0u, 0u));
  auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
  ASSERT_TRUE(chunk.is_valid());
  abi->ReleaseChunkAsFree(std::move(chunk));

  // Fill the garbage chunk, so that the writer attempts to grab another chunk
  // for |packet4|.
  packet3->set_for_testing()->set_str(large_string);
  packet3->Finalize();

  // The next packet should go into the reacquired chunk we just released.
  auto packet4 = writer->NewTracePacket();
  EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                   ->drop_packets_for_testing());

  // The first packet in the chunk should have the previous_packet_dropped
  // flag set, so it shouldn't be empty.
  EXPECT_GT(packet4->Finalize(), 0u);

  // Flushing the writer causes the chunk to be released again.
  writer->Flush();
  ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
  EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
  EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));

  // The chunk should contain only |packet4| and not have any continuation
  // flag set.
  ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi->GetChunkState(0u, 0u));
  chunk = abi->TryAcquireChunkForReading(0u, 0u);
  ASSERT_TRUE(chunk.is_valid());
  ASSERT_EQ(chunk.header()->packets.load().count, 1);
  ASSERT_FALSE(chunk.header()->packets.load().flags &
               SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
  ASSERT_FALSE(
      chunk.header()->packets.load().flags &
      SharedMemoryABI::ChunkHeader::kFirstPacketContinuesFromPrevChunk);
  ASSERT_FALSE(chunk.header()->packets.load().flags &
               SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
}

// Regression test verifying that flushing a TraceWriter while a fragmented
// packet still has uncommitted patches doesn't hit a DCHECK / crash the
// writer thread.
TEST_P(TraceWriterImplTest, FlushAfterFragmentingPacketWhileBufferExhausted) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer =
      arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);

  // Write a small first packet, so that |writer| owns a chunk.
  auto packet = writer->NewTracePacket();
  EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                   ->drop_packets_for_testing());
  // 3 bytes for the first_packet_on_sequence flag.
  EXPECT_EQ(packet->Finalize(), 3u);

  // Grab all but one of the remaining chunks in the SMB in new writers.
  std::array<std::unique_ptr<TraceWriter>, kNumPages * 4 - 2> other_writers;
  for (size_t i = 0; i < other_writers.size(); i++) {
    other_writers[i] =
        arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
    auto other_writer_packet = other_writers[i]->NewTracePacket();
    EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(other_writers[i].get())
                     ->drop_packets_for_testing());
  }

  // Write a packet that's guaranteed to span more than two chunks, causing
  // |writer| to attempt to acquire two new chunks, but fail to acquire the
  // second.
  auto packet2 = writer->NewTracePacket();
  size_t chunk_size = page_size() / 4;
  std::string large_string(chunk_size * 2, 'x');
  packet2->set_for_testing()->set_str(large_string);

  EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                  ->drop_packets_for_testing());

  // The first two chunks should be committed.
  arbiter_->FlushPendingCommitDataRequests();
  ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(2));

  // Flushing should succeed, even though some patches are still in the
  // writer's patch list.
  packet2->Finalize();
  writer->Flush();
}

TEST_P(TraceWriterImplTest, GarbageChunkWrap) {
  const BufferID kBufId = 42;

  // Grab all chunks in the SMB in new writers.
  std::array<std::unique_ptr<TraceWriter>, kNumPages * 4> other_writers;
  for (size_t i = 0; i < other_writers.size(); i++) {
    other_writers[i] =
        arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
    auto other_writer_packet = other_writers[i]->NewTracePacket();
    EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(other_writers[i].get())
                     ->drop_packets_for_testing());
  }

  // `writer` will only get garbage chunks, since the SMB is exhausted.
  std::unique_ptr<TraceWriter> writer =
      arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
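  // Note: with BufferExhaustedPolicy::kDrop, a writer that cannot acquire an
  // SMB chunk falls back to a local "garbage" scratch chunk: writes succeed
  // but the data is discarded, and when the garbage chunk fills up the writer
  // wraps around to its beginning, which is what this test exercises.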

  const size_t chunk_size = page_size() / 4;
  std::string half_chunk_string(chunk_size / 2, 'x');

  // Fill the first half of the garbage chunk.
  {
    auto packet = writer->NewTracePacket();
    EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                    ->drop_packets_for_testing());
    packet->set_for_testing()->set_str(half_chunk_string);
  }

  // Fill the second half of the garbage chunk and more. This will call
  // GetNewBuffer() and restart from the beginning of the garbage chunk.
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str(half_chunk_string);
  }

  // Check that TraceWriterImpl can write at the beginning of the garbage
  // chunk without any problems.
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str("str");
  }
}

TEST_P(TraceWriterImplTest, AnnotatePatchWhileBufferExhausted) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer =
      arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);

  // Write a small first packet, so that |writer| owns a chunk.
  ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
  sw->WriteBytes(reinterpret_cast<const uint8_t*>("X"), 1);
  writer->FinishTracePacket();
  EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                   ->drop_packets_for_testing());

  // Grab all but one of the remaining chunks in the SMB in new writers.
  std::array<std::unique_ptr<TraceWriter>, kNumPages * 4 - 2> other_writers;
  for (size_t i = 0; i < other_writers.size(); i++) {
    other_writers[i] =
        arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
    auto other_writer_packet = other_writers[i]->NewTracePacket();
    EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(other_writers[i].get())
                     ->drop_packets_for_testing());
  }

  // Write a packet that's guaranteed to span more than two chunks, causing
  // |writer| to attempt to acquire two new chunks, but fail to acquire the
  // second.
  sw = writer->NewTracePacket().TakeStreamWriter();
  size_t chunk_size = page_size() / 4;
  std::string large_string(chunk_size * 2, 'x');
  sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
                 large_string.size());

  EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
                  ->drop_packets_for_testing());

  uint8_t* patch1 =
      sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
  ASSERT_THAT(patch1, NotNull());
  patch1[0] = 0;
  patch1[1] = 0;
  patch1[2] = 0;
  patch1[3] = 0;
  patch1 = sw->AnnotatePatch(patch1);
  EXPECT_THAT(patch1, IsNull());

  // The first two chunks should be committed.
  arbiter_->FlushPendingCommitDataRequests();
  ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(2));

  // Flushing should succeed, even though some patches are still in the
  // writer's patch list.
  writer->FinishTracePacket();
  writer->Flush();
}

TEST_P(TraceWriterImplTest, Flush) {
  MockFunction<void()> flush_cb;

  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str("foobar");
  }

  EXPECT_CALL(flush_cb, Call).Times(0);
  ASSERT_FALSE(last_commit_callback_);
  writer->Flush(flush_cb.AsStdFunction());
  ASSERT_TRUE(last_commit_callback_);
  EXPECT_CALL(flush_cb, Call).Times(1);
  last_commit_callback_();
}

TEST_P(TraceWriterImplTest, NestedMsgsPatches) {
  const BufferID kBufId = 42;
  const uint32_t kNestedFieldId = 1;
  const uint32_t kStringFieldId = 2;
  const uint32_t kIntFieldId = 3;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  size_t chunk_size = page_size() / 4;
  std::string large_string(chunk_size, 'x');

  auto packet = writer->NewTracePacket();
  auto* nested1 =
      packet->BeginNestedMessage<protozero::Message>(kNestedFieldId);
  auto* nested2 =
      nested1->BeginNestedMessage<protozero::Message>(kNestedFieldId);
  auto* nested3 =
      nested2->BeginNestedMessage<protozero::Message>(kNestedFieldId);
  uint8_t* const old_nested_1_size_field = nested1->size_field();
  uint8_t* const old_nested_2_size_field = nested2->size_field();
  uint8_t* const old_nested_3_size_field = nested3->size_field();
  EXPECT_THAT(old_nested_1_size_field, NotNull());
  EXPECT_THAT(old_nested_2_size_field, NotNull());
  EXPECT_THAT(old_nested_3_size_field, NotNull());

  // Append a small field, which will fit in the current chunk.
  nested3->AppendVarInt<uint64_t>(kIntFieldId, 1);

  // The `size_field`s still point to the same old locations, inside the
  // chunk.
  EXPECT_EQ(nested1->size_field(), old_nested_1_size_field);
  EXPECT_EQ(nested2->size_field(), old_nested_2_size_field);
  EXPECT_EQ(nested3->size_field(), old_nested_3_size_field);

  // Append a large string, which will not fit in the current chunk.
  nested3->AppendString(kStringFieldId, large_string);

  // The `size_field`s now point to different locations (patches).
  EXPECT_THAT(nested1->size_field(),
              AllOf(Ne(old_nested_1_size_field), NotNull()));
  EXPECT_THAT(nested2->size_field(),
              AllOf(Ne(old_nested_2_size_field), NotNull()));
  EXPECT_THAT(nested3->size_field(),
              AllOf(Ne(old_nested_3_size_field), NotNull()));

  packet->Finalize();
  writer->Flush();

  arbiter_->FlushPendingCommitDataRequests();

  ASSERT_THAT(last_commit_.chunks_to_patch(), SizeIs(1));
  EXPECT_EQ(last_commit_.chunks_to_patch()[0].writer_id(), writer->writer_id());
  EXPECT_EQ(last_commit_.chunks_to_patch()[0].target_buffer(), kBufId);
  EXPECT_FALSE(last_commit_.chunks_to_patch()[0].has_more_patches());
  EXPECT_THAT(last_commit_.chunks_to_patch()[0].patches(), SizeIs(3));
}

// TODO(primiano): add multi-writer test.

}  // namespace
}  // namespace perfetto