• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/core/trace_writer_impl.h"
18 
19 #include <vector>
20 
21 #include "perfetto/ext/base/utils.h"
22 #include "perfetto/ext/tracing/core/commit_data_request.h"
23 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
24 #include "perfetto/ext/tracing/core/trace_writer.h"
25 #include "perfetto/ext/tracing/core/tracing_service.h"
26 #include "perfetto/protozero/message.h"
27 #include "perfetto/protozero/proto_utils.h"
28 #include "perfetto/protozero/scattered_stream_writer.h"
29 #include "src/base/test/gtest_test_suite.h"
30 #include "src/base/test/test_task_runner.h"
31 #include "src/tracing/core/shared_memory_arbiter_impl.h"
32 #include "src/tracing/test/aligned_buffer_test.h"
33 #include "src/tracing/test/mock_producer_endpoint.h"
34 #include "test/gtest_and_gmock.h"
35 
36 #include "protos/perfetto/trace/test_event.gen.h"
37 #include "protos/perfetto/trace/test_event.pbzero.h"
38 #include "protos/perfetto/trace/trace_packet.gen.h"
39 #include "protos/perfetto/trace/trace_packet.pbzero.h"
40 
41 namespace perfetto {
42 namespace {
43 
44 using ChunkHeader = SharedMemoryABI::ChunkHeader;
45 using ShmemMode = SharedMemoryABI::ShmemMode;
46 using ::protozero::ScatteredStreamWriter;
47 using ::testing::AllOf;
48 using ::testing::ElementsAre;
49 using ::testing::IsEmpty;
50 using ::testing::IsNull;
51 using ::testing::MockFunction;
52 using ::testing::Ne;
53 using ::testing::NiceMock;
54 using ::testing::Not;
55 using ::testing::NotNull;
56 using ::testing::Optional;
57 using ::testing::SizeIs;
58 using ::testing::ValuesIn;
59 
60 class TraceWriterImplTest : public AlignedBufferTest {
61  public:
62   struct PatchKey {
63     uint32_t writer_id;
64     uint32_t chunk_id;
operator <perfetto::__anonc7d0dc920111::TraceWriterImplTest::PatchKey65     bool operator<(const PatchKey& other) const {
66       return std::tie(writer_id, chunk_id) <
67              std::tie(other.writer_id, other.chunk_id);
68     }
69   };
SetUp()70   void SetUp() override {
71     default_layout_ =
72         SharedMemoryArbiterImpl::default_page_layout_for_testing();
73     SharedMemoryArbiterImpl::set_default_layout_for_testing(
74         SharedMemoryABI::PageLayout::kPageDiv4);
75     AlignedBufferTest::SetUp();
76     task_runner_.reset(new base::TestTaskRunner());
77     arbiter_.reset(new SharedMemoryArbiterImpl(
78         buf(), buf_size(), ShmemMode::kDefault, page_size(),
79         &mock_producer_endpoint_, task_runner_.get()));
80     ON_CALL(mock_producer_endpoint_, CommitData)
81         .WillByDefault([&](const CommitDataRequest& req,
82                            MockProducerEndpoint::CommitDataCallback cb) {
83           last_commit_ = req;
84           last_commit_callback_ = cb;
85           for (const CommitDataRequest::ChunkToPatch& c :
86                req.chunks_to_patch()) {
87             patches_[PatchKey{c.writer_id(), c.chunk_id()}] = c.patches();
88           }
89         });
90   }
91 
TearDown()92   void TearDown() override {
93     arbiter_.reset();
94     task_runner_.reset();
95     SharedMemoryArbiterImpl::set_default_layout_for_testing(default_layout_);
96   }
97 
CopyPayloadAndApplyPatches(SharedMemoryABI::Chunk & chunk) const98   std::vector<uint8_t> CopyPayloadAndApplyPatches(
99       SharedMemoryABI::Chunk& chunk) const {
100     std::vector<uint8_t> copy(chunk.payload_begin(),
101                               chunk.payload_begin() + chunk.payload_size());
102     ChunkHeader::Packets p = chunk.header()->packets.load();
103 
104     auto it = patches_.find(PatchKey{chunk.header()->writer_id.load(),
105                                      chunk.header()->chunk_id.load()});
106     if (it == patches_.end()) {
107       EXPECT_FALSE(p.flags & ChunkHeader::kChunkNeedsPatching);
108       return copy;
109     }
110     EXPECT_TRUE(p.flags & ChunkHeader::kChunkNeedsPatching);
111 
112     for (const CommitDataRequest::ChunkToPatch::Patch& patch : it->second) {
113       if (patch.offset() + patch.data().size() > copy.size()) {
114         ADD_FAILURE() << "Patch out of bounds";
115         continue;
116       }
117       for (size_t i = 0; i < patch.data().size(); i++) {
118         copy[patch.offset() + i] =
119             reinterpret_cast<const uint8_t*>(patch.data().data())[i];
120       }
121     }
122     return copy;
123   }
124 
125   // Extracts trace packets from the shared memory buffer, and returns copies of
126   // them (after applying the patches received). The producer that writes to the
127   // shared memory (i.e. the trace writer) must be destroyed.
GetPacketsFromShmemAndPatches()128   std::vector<std::string> GetPacketsFromShmemAndPatches() {
129     std::vector<std::string> packets;
130     SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
131     bool was_fragmenting = false;
132     for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
133       uint32_t page_layout = abi->GetPageLayout(page_idx);
134       size_t num_chunks = SharedMemoryABI::GetNumChunksForLayout(page_layout);
135       for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
136         SharedMemoryABI::ChunkState chunk_state =
137             abi->GetChunkState(page_idx, chunk_idx);
138         if (chunk_state != SharedMemoryABI::kChunkFree &&
139             chunk_state != SharedMemoryABI::kChunkComplete) {
140           ADD_FAILURE() << "Page " << page_idx << " chunk " << chunk_idx
141                         << " unexpected state: " << chunk_state;
142           continue;
143         }
144         SharedMemoryABI::Chunk chunk =
145             abi->TryAcquireChunkForReading(page_idx, chunk_idx);
146         if (!chunk.is_valid())
147           continue;
148         ChunkHeader::Packets p = chunk.header()->packets.load();
149 
150         EXPECT_EQ(
151             was_fragmenting,
152             static_cast<bool>(p.flags &
153                               ChunkHeader::kFirstPacketContinuesFromPrevChunk));
154 
155         std::vector<uint8_t> payload = CopyPayloadAndApplyPatches(chunk);
156 
157         const uint8_t* read_ptr = payload.data();
158         const uint8_t* const end_read_ptr = payload.data() + payload.size();
159 
160         size_t num_fragments = p.count;
161         for (; num_fragments && read_ptr < end_read_ptr; num_fragments--) {
162           uint64_t len;
163           read_ptr =
164               protozero::proto_utils::ParseVarInt(read_ptr, end_read_ptr, &len);
165           if (!was_fragmenting || packets.empty()) {
166             packets.push_back(std::string());
167           }
168           was_fragmenting = false;
169           if (read_ptr + len > end_read_ptr) {
170             ADD_FAILURE() << "Page " << page_idx << " chunk " << chunk_idx
171                           << " malformed chunk";
172           }
173           packets.back().append(reinterpret_cast<const char*>(read_ptr),
174                                 static_cast<size_t>(len));
175           read_ptr += len;
176         }
177         EXPECT_EQ(num_fragments, 0u);
178         was_fragmenting =
179             p.flags & ChunkHeader::kLastPacketContinuesOnNextChunk;
180       }
181     }
182     // Ignore empty packets (like tracing service does).
183     packets.erase(
184         std::remove_if(packets.begin(), packets.end(),
185                        [](const std::string& p) { return p.empty(); }),
186         packets.end());
187     return packets;
188   }
189 
190   struct ChunkInABI {
191     size_t page_idx;
192     uint32_t page_layout;
193     size_t chunk_idx;
194   };
GetFirstChunkBeingWritten()195   std::optional<ChunkInABI> GetFirstChunkBeingWritten() {
196     SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
197     for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
198       uint32_t page_layout = abi->GetPageLayout(page_idx);
199       size_t num_chunks = SharedMemoryABI::GetNumChunksForLayout(page_layout);
200       for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
201         SharedMemoryABI::ChunkState chunk_state =
202             abi->GetChunkState(page_idx, chunk_idx);
203         if (chunk_state != SharedMemoryABI::kChunkBeingWritten) {
204           continue;
205         }
206         return ChunkInABI{page_idx, page_layout, chunk_idx};
207       }
208     }
209     return std::nullopt;
210   }
211 
GetChunkFragments(size_t packets_count,const void * chunk_payload,size_t chunk_payload_size)212   static std::optional<std::vector<std::string>> GetChunkFragments(
213       size_t packets_count,
214       const void* chunk_payload,
215       size_t chunk_payload_size) {
216     std::vector<std::string> fragments;
217     const uint8_t* read_ptr = static_cast<const uint8_t*>(chunk_payload);
218     const uint8_t* const end_read_ptr = read_ptr + chunk_payload_size;
219 
220     for (size_t num_fragments = packets_count;
221          num_fragments && read_ptr < end_read_ptr; num_fragments--) {
222       uint64_t len;
223       read_ptr =
224           protozero::proto_utils::ParseVarInt(read_ptr, end_read_ptr, &len);
225       if (read_ptr + len > end_read_ptr) {
226         return std::nullopt;
227       }
228       fragments.push_back(std::string(reinterpret_cast<const char*>(read_ptr),
229                                       static_cast<size_t>(len)));
230       read_ptr += len;
231     }
232     return std::make_optional(std::move(fragments));
233   }
234 
235   SharedMemoryABI::PageLayout default_layout_;
236   CommitDataRequest last_commit_;
237   ProducerEndpoint::CommitDataCallback last_commit_callback_;
238   std::map<PatchKey, std::vector<CommitDataRequest::ChunkToPatch::Patch>>
239       patches_;
240   NiceMock<MockProducerEndpoint> mock_producer_endpoint_;
241 
242   std::unique_ptr<base::TestTaskRunner> task_runner_;
243   std::unique_ptr<SharedMemoryArbiterImpl> arbiter_;
244 };
245 
// Page sizes (in bytes) the fixture is instantiated with: every
// TraceWriterImplTest case runs once per value.
size_t const kPageSizes[] = {4096, 65536};
INSTANTIATE_TEST_SUITE_P(PageSize, TraceWriterImplTest, ValuesIn(kPageSizes));
248 
// Writes a batch of small packets through the handle returned by
// NewTracePacket() and checks that all of them can be read back, in order,
// from the shared memory buffer. Only the very first packet is expected to
// carry first_packet_on_sequence.
TEST_P(TraceWriterImplTest, NewTracePacket) {
  const BufferID kBufId = 42;
  const size_t kNumPackets = 32;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
  for (size_t idx = 0; idx < kNumPackets; ++idx) {
    auto handle = writer->NewTracePacket();
    handle->set_for_testing()->set_str(
        std::string("foobar " + std::to_string(idx)));
  }

  // Destroying the TraceWriterImpl should cause the last packet to be
  // finalized and the chunk to be put back in the kChunkComplete state.
  writer.reset();

  std::vector<std::string> serialized = GetPacketsFromShmemAndPatches();
  ASSERT_THAT(serialized, SizeIs(kNumPackets));
  size_t idx = 0;
  for (const std::string& raw : serialized) {
    protos::gen::TracePacket decoded;
    EXPECT_TRUE(decoded.ParseFromString(raw));
    EXPECT_EQ(decoded.for_testing().str(), "foobar " + std::to_string(idx));
    const bool is_first = (idx == 0);
    EXPECT_EQ(decoded.first_packet_on_sequence(), is_first);
    ++idx;
  }
}
276 
// Writes two packets whose string payload is longer than page_size() / 4
// bytes and checks that both are read back intact.
TEST_P(TraceWriterImplTest, NewTracePacketLargePackets) {
  const BufferID kBufId = 42;
  const size_t chunk_size = page_size() / 4;
  const std::string filler(chunk_size, 'x');
  const std::string expected_strs[] = {std::string("PACKET_1") + filler,
                                       std::string("PACKET_2") + filler};

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
  for (const std::string& str : expected_strs) {
    // The handle goes out of scope at the end of each iteration.
    auto handle = writer->NewTracePacket();
    handle->set_for_testing()->set_str(str);
  }

  // Destroying the TraceWriterImpl should cause the last packet to be
  // finalized and the chunk to be put back in the kChunkComplete state.
  writer.reset();

  std::vector<std::string> serialized = GetPacketsFromShmemAndPatches();
  ASSERT_THAT(serialized, SizeIs(2));
  for (size_t idx = 0; idx < 2; ++idx) {
    protos::gen::TracePacket decoded;
    EXPECT_TRUE(decoded.ParseFromString(serialized[idx]));
    EXPECT_EQ(decoded.for_testing().str(), expected_strs[idx]);
  }
}
311 
// A prefix corresponding to first_packet_on_sequence = true in a serialized
// TracePacket proto.
// The trailing 0x0 acts as a NUL terminator: when the array is concatenated
// with a std::string through the `const char*` overload of operator+, only
// the first three bytes become part of the resulting string.
constexpr char kFirstPacketOnSequenceFlagPrefix[] = {static_cast<char>(0xB8),
                                                     0x5, 0x1, 0x0};
316 
// Writes packets as raw bytes through the stream writer obtained with
// TakeStreamWriter(), finishing each one with FinishTracePacket(), and checks
// the bytes read back. The first packet is expected to be prefixed by
// kFirstPacketOnSequenceFlagPrefix.
TEST_P(TraceWriterImplTest, NewTracePacketTakeWriter) {
  const BufferID kBufId = 42;
  const size_t kNumPackets = 32;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
  for (size_t n = 0; n < kNumPackets; ++n) {
    ScatteredStreamWriter* stream = writer->NewTracePacket().TakeStreamWriter();
    const std::string bytes = std::string("RAW_PROTO_BYTES_") +
                              std::to_string(n);
    stream->WriteBytes(reinterpret_cast<const uint8_t*>(bytes.data()),
                       bytes.size());
    writer->FinishTracePacket();
  }

  // Destroying the TraceWriterImpl should cause the last packet to be
  // finalized and the chunk to be put back in the kChunkComplete state.
  writer.reset();

  std::vector<std::string> serialized = GetPacketsFromShmemAndPatches();
  ASSERT_THAT(serialized, SizeIs(kNumPackets));
  for (size_t n = 0; n < kNumPackets; ++n) {
    std::string expected;
    if (n == 0) {
      expected = kFirstPacketOnSequenceFlagPrefix;
    }
    expected += "RAW_PROTO_BYTES_" + std::to_string(n);
    EXPECT_EQ(serialized[n], expected);
  }
}
344 
// Death tests are only compiled when GoogleTest reports support for them.
#if defined(GTEST_HAS_DEATH_TEST)
using TraceWriterImplDeathTest = TraceWriterImplTest;
INSTANTIATE_TEST_SUITE_P(PageSize,
                         TraceWriterImplDeathTest,
                         ValuesIn(kPageSizes));

// Starting a new packet after TakeStreamWriter() without calling
// FinishTracePacket() first is expected to terminate the process.
TEST_P(TraceWriterImplDeathTest, NewTracePacketTakeWriterNoFinish) {
  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  TraceWriterImpl::TracePacketHandle handle = writer->NewTracePacket();

  // Avoid a secondary DCHECK failure from ~TraceWriterImpl() =>
  // Message::Finalize() due to the stream writer being modified behind the
  // Message's back. This turns the Finalize() call into a no-op.
  handle->set_size_field(nullptr);

  ScatteredStreamWriter* sw = handle.TakeStreamWriter();
  std::string raw_proto_bytes = std::string("RAW_PROTO_BYTES");
  sw->WriteBytes(reinterpret_cast<const uint8_t*>(raw_proto_bytes.data()),
                 raw_proto_bytes.size());

  // The empty regex accepts any death message.
  EXPECT_DEATH({ writer->NewTracePacket(); }, "");
}
#endif  // defined(GTEST_HAS_DEATH_TEST)
370 
TEST_P(TraceWriterImplTest,AnnotatePatch)371 TEST_P(TraceWriterImplTest, AnnotatePatch) {
372   const BufferID kBufId = 42;
373   std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
374   ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
375   std::string raw_proto_bytes = std::string("RAW_PROTO_BYTES");
376   sw->WriteBytes(reinterpret_cast<const uint8_t*>(raw_proto_bytes.data()),
377                  raw_proto_bytes.size());
378 
379   uint8_t* patch1 =
380       sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
381   ASSERT_THAT(patch1, NotNull());
382   patch1[0] = 0;
383   patch1[1] = 0;
384   patch1[2] = 0;
385   patch1[3] = 0;
386   const uint8_t* old_chunk_pointer = patch1;
387   patch1 = sw->AnnotatePatch(patch1);
388   EXPECT_NE(patch1, old_chunk_pointer);
389   ASSERT_THAT(patch1, NotNull());
390 
391   sw->WriteByte('X');
392 
393   uint8_t* patch2 =
394       sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
395   ASSERT_THAT(patch2, NotNull());
396   patch2[0] = 0;
397   patch2[1] = 0;
398   patch2[2] = 0;
399   patch2[3] = 0;
400   old_chunk_pointer = patch2;
401   patch2 = sw->AnnotatePatch(patch2);
402   EXPECT_NE(patch2, old_chunk_pointer);
403   ASSERT_THAT(patch2, NotNull());
404 
405   const size_t chunk_size = page_size() / 4;
406   std::string large_string(chunk_size, 'x');
407 
408   sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
409                  large_string.size());
410 
411   uint8_t* patch3 =
412       sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
413   ASSERT_THAT(patch3, NotNull());
414   patch3[0] = 0;
415   patch3[1] = 0;
416   patch3[2] = 0;
417   patch3[3] = 0;
418   old_chunk_pointer = patch3;
419   patch3 = sw->AnnotatePatch(patch3);
420   EXPECT_NE(patch3, old_chunk_pointer);
421   ASSERT_THAT(patch3, NotNull());
422 
423   sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
424                  large_string.size());
425 
426   patch1[0] = 0x11;
427   patch1[1] = 0x11;
428   patch1[2] = 0x11;
429   patch1[3] = 0x11;
430 
431   patch2[0] = 0x22;
432   patch2[1] = 0x22;
433   patch2[2] = 0x22;
434   patch2[3] = 0x22;
435 
436   patch3[0] = 0x33;
437   patch3[1] = 0x33;
438   patch3[2] = 0x33;
439   patch3[3] = 0x33;
440 
441   writer->FinishTracePacket();
442 
443   // Destroying the TraceWriteImpl should cause the last packet to be finalized
444   // and the chunk to be put back in the kChunkComplete state.
445   writer.reset();
446 
447   std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
448   EXPECT_THAT(
449       packets,
450       ElementsAre(
451           kFirstPacketOnSequenceFlagPrefix + std::string("RAW_PROTO_BYTES") +
452           std::string("\x11\x11\x11\x11") + std::string("X") +
453           std::string("\x22\x22\x22\x22") + std::string(chunk_size, 'x') +
454           std::string("\x33\x33\x33\x33") + std::string(chunk_size, 'x')));
455 }
456 
// Interleaves packets written manually through TakeStreamWriter() (each with
// one annotated patch) with a packet written through the Message API. Every
// packet carries more than page_size() / 4 bytes of data. All three packets
// must be read back with the expected bytes.
TEST_P(TraceWriterImplTest, MixManualTakeAndMessage) {
  const BufferID kBufId = 42;
  const size_t chunk_size = page_size() / 4;
  const std::string large_string(chunk_size, 'x');

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  // Packet 1: raw bytes, a patch location, then the large payload.
  {
    ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
    std::string packet1 = std::string("PACKET_1_");
    sw->WriteBytes(reinterpret_cast<const uint8_t*>(packet1.data()),
                   packet1.size());
    uint8_t* patch =
        sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
    ASSERT_THAT(patch, NotNull());
    patch[0] = 0;
    patch[1] = 0;
    patch[2] = 0;
    patch[3] = 0;
    const uint8_t* old_chunk_pointer = patch;
    patch = sw->AnnotatePatch(patch);
    EXPECT_NE(patch, old_chunk_pointer);
    ASSERT_THAT(patch, NotNull());
    sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
                   large_string.size());
    // Fill in the patch only after the large payload has been written.
    patch[0] = 0xFF;
    patch[1] = 0xFF;
    patch[2] = 0xFF;
    patch[3] = 0xFF;
    writer->FinishTracePacket();
  }

  // Packet 2: written through the Message API, with the large payload inside
  // a nested message (field 1). The handle is destroyed at the end of the
  // scope.
  {
    auto msg = writer->NewTracePacket();
    std::string packet2 = std::string("PACKET_2_");
    msg->AppendRawProtoBytes(packet2.data(), packet2.size());
    auto* nested = msg->BeginNestedMessage<protozero::Message>(1);
    nested->AppendRawProtoBytes(large_string.data(), large_string.size());
  }

  // Packet 3: same shape as packet 1.
  {
    ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
    std::string packet3 = std::string("PACKET_3_");
    sw->WriteBytes(reinterpret_cast<const uint8_t*>(packet3.data()),
                   packet3.size());
    uint8_t* patch =
        sw->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
    ASSERT_THAT(patch, NotNull());
    patch[0] = 0;
    patch[1] = 0;
    patch[2] = 0;
    patch[3] = 0;
    const uint8_t* old_chunk_pointer = patch;
    patch = sw->AnnotatePatch(patch);
    EXPECT_NE(patch, old_chunk_pointer);
    ASSERT_THAT(patch, NotNull());
    sw->WriteBytes(reinterpret_cast<const uint8_t*>(large_string.data()),
                   large_string.size());
    patch[0] = 0xFF;
    patch[1] = 0xFF;
    patch[2] = 0xFF;
    patch[3] = 0xFF;
    writer->FinishTracePacket();
  }

  // Destroying the TraceWriterImpl should cause the last packet to be
  // finalized and the chunk to be put back in the kChunkComplete state.
  writer.reset();

  // Expected size field of the nested message in packet 2: the payload length
  // as a redundant varint of kMessageLengthFieldSize bytes.
  uint8_t buf[protozero::proto_utils::kMessageLengthFieldSize];
  protozero::proto_utils::WriteRedundantVarInt(
      static_cast<uint32_t>(large_string.size()), buf,
      protozero::proto_utils::kMessageLengthFieldSize);
  std::string encoded_size(reinterpret_cast<char*>(buf), sizeof(buf));

  // In packet 2, 0x0A is the tag of field 1 with the length-delimited wire
  // type.
  std::vector<std::string> packets = GetPacketsFromShmemAndPatches();
  EXPECT_THAT(
      packets,
      ElementsAre(kFirstPacketOnSequenceFlagPrefix + std::string("PACKET_1_") +
                      std::string("\xFF\xFF\xFF\xFF") +
                      std::string(chunk_size, 'x'),
                  std::string("PACKET_2_") + std::string("\x0A") +
                      encoded_size + std::string(chunk_size, 'x'),
                  std::string("PACKET_3_") + std::string("\xFF\xFF\xFF\xFF") +
                      std::string(chunk_size, 'x')));
}
543 
// Checks the chunk header while the chunk is still being written: once the
// packet handle is destroyed, the packet count in the header is expected to
// be inflated from 1 to 2 while the chunk stays in kChunkBeingWritten.
// Destroying the writer completes the chunk.
TEST_P(TraceWriterImplTest, MessageHandleDestroyedPacketScrapable) {
  const BufferID kBufId = 42;

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  auto packet = writer->NewTracePacket();
  packet->set_for_testing()->set_str("packet1");

  const std::optional<ChunkInABI> location = GetFirstChunkBeingWritten();
  ASSERT_TRUE(location.has_value());

  SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
  SharedMemoryABI::Chunk chunk = abi->GetChunkUnchecked(
      location->page_idx, location->page_layout, location->chunk_idx);
  ASSERT_TRUE(chunk.is_valid());

  // Shorthands that re-read the live header / state on every call.
  const auto packet_count = [&chunk] {
    return chunk.header()->packets.load().count;
  };
  const auto chunk_state = [&] {
    return abi->GetChunkState(location->page_idx, location->chunk_idx);
  };

  EXPECT_EQ(packet_count(), 1);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkBeingWritten);

  // Replace the handle with an empty one, destroying the original.
  packet = protozero::MessageHandle<protos::pbzero::TracePacket>();

  // After destroying the message handle, the chunk header should have an
  // inflated packet count.
  EXPECT_EQ(packet_count(), 2);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkBeingWritten);

  writer.reset();

  EXPECT_EQ(packet_count(), 2);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkComplete);
  EXPECT_THAT(GetChunkFragments(1, chunk.payload_begin(), chunk.payload_size()),
              Optional(ElementsAre(Not(IsEmpty()))));
}
581 
TEST_P(TraceWriterImplTest,FinishTracePacketScrapable)582 TEST_P(TraceWriterImplTest, FinishTracePacketScrapable) {
583   const BufferID kBufId = 42;
584 
585   std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
586 
587   {
588     protos::pbzero::TestEvent test_event;
589     protozero::MessageArena arena;
590     ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
591     uint8_t data[protozero::proto_utils::kMaxTagEncodedSize];
592     uint8_t* data_end = protozero::proto_utils::WriteVarInt(
593         protozero::proto_utils::MakeTagLengthDelimited(
594             protos::pbzero::TracePacket::kForTestingFieldNumber),
595         data);
596     sw->WriteBytes(data, static_cast<size_t>(data_end - data));
597     test_event.Reset(sw, &arena);
598     test_event.set_size_field(
599         sw->ReserveBytes(protozero::proto_utils::kMessageLengthFieldSize));
600     test_event.set_str("payload1");
601   }
602 
603   std::optional<ChunkInABI> chunk_in_abi = GetFirstChunkBeingWritten();
604   ASSERT_TRUE(chunk_in_abi.has_value());
605 
606   auto* abi = arbiter_->shmem_abi_for_testing();
607   SharedMemoryABI::Chunk chunk =
608       abi->GetChunkUnchecked(chunk_in_abi->page_idx, chunk_in_abi->page_layout,
609                              chunk_in_abi->chunk_idx);
610   ASSERT_TRUE(chunk.is_valid());
611 
612   EXPECT_EQ(chunk.header()->packets.load().count, 1);
613   EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
614             SharedMemoryABI::ChunkState::kChunkBeingWritten);
615 
616   writer->FinishTracePacket();
617 
618   // After a call to FinishTracePacket, the chunk header should have an inflated
619   // packet count.
620   EXPECT_EQ(chunk.header()->packets.load().count, 2);
621   EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
622             SharedMemoryABI::ChunkState::kChunkBeingWritten);
623   EXPECT_THAT(GetChunkFragments(1, chunk.payload_begin(), chunk.payload_size()),
624               Optional(ElementsAre(Not(IsEmpty()))));
625 
626   // An extra call to FinishTracePacket should have no effect.
627   EXPECT_EQ(chunk.header()->packets.load().count, 2);
628   EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
629             SharedMemoryABI::ChunkState::kChunkBeingWritten);
630   EXPECT_THAT(GetChunkFragments(1, chunk.payload_begin(), chunk.payload_size()),
631               Optional(ElementsAre(Not(IsEmpty()))));
632 
633   writer.reset();
634 
635   EXPECT_EQ(chunk.header()->packets.load().count, 2);
636   EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
637             SharedMemoryABI::ChunkState::kChunkComplete);
638   EXPECT_THAT(GetChunkFragments(2, chunk.payload_begin(), chunk.payload_size()),
639               Optional(ElementsAre(Not(IsEmpty()), IsEmpty())));
640 }
641 
// Destroys the packet handle (which is expected to inflate the packet count
// in the chunk header) and then additionally calls FinishTracePacket(), which
// must not change the header any further.
TEST_P(TraceWriterImplTest,
       MessageHandleDestroyedAndFinishTracePacketScrapable) {
  const BufferID kBufId = 42;

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  auto packet = writer->NewTracePacket();
  packet->set_for_testing()->set_str("packet1");

  const std::optional<ChunkInABI> location = GetFirstChunkBeingWritten();
  ASSERT_TRUE(location.has_value());

  SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
  SharedMemoryABI::Chunk chunk = abi->GetChunkUnchecked(
      location->page_idx, location->page_layout, location->chunk_idx);
  ASSERT_TRUE(chunk.is_valid());

  // Shorthands that re-read the live header / state on every call.
  const auto packet_count = [&chunk] {
    return chunk.header()->packets.load().count;
  };
  const auto chunk_state = [&] {
    return abi->GetChunkState(location->page_idx, location->chunk_idx);
  };

  EXPECT_EQ(packet_count(), 1);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkBeingWritten);

  // Replace the handle with an empty one, destroying the original.
  packet = protozero::MessageHandle<protos::pbzero::TracePacket>();

  // After destroying the message handle, the chunk header should have an
  // inflated packet count.
  EXPECT_EQ(packet_count(), 2);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkBeingWritten);

  writer->FinishTracePacket();

  // An extra call to FinishTracePacket should have no effect.
  EXPECT_EQ(packet_count(), 2);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkBeingWritten);
  EXPECT_THAT(GetChunkFragments(1, chunk.payload_begin(), chunk.payload_size()),
              Optional(ElementsAre(Not(IsEmpty()))));

  writer.reset();

  EXPECT_EQ(packet_count(), 2);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkComplete);
  EXPECT_THAT(GetChunkFragments(2, chunk.payload_begin(), chunk.payload_size()),
              Optional(ElementsAre(Not(IsEmpty()), IsEmpty())));
}
688 
// Fills all the bytes still available in the stream writer with a single
// packet, then destroys the handle. With no room left, the chunk is expected
// to be marked complete right away with a packet count of 1.
TEST_P(TraceWriterImplTest, MessageHandleDestroyedPacketFullChunk) {
  const BufferID kBufId = 42;

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  auto packet = writer->NewTracePacket();
  protos::pbzero::TestEvent* event = packet->set_for_testing();
  // As many zero bytes as the stream writer reports available.
  const std::string filler(event->stream_writer()->bytes_available(), '\0');
  event->AppendRawProtoBytes(filler.data(), filler.size());

  const std::optional<ChunkInABI> location = GetFirstChunkBeingWritten();
  ASSERT_TRUE(location.has_value());

  SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
  SharedMemoryABI::Chunk chunk = abi->GetChunkUnchecked(
      location->page_idx, location->page_layout, location->chunk_idx);
  ASSERT_TRUE(chunk.is_valid());

  // Shorthands that re-read the live header / state on every call.
  const auto packet_count = [&chunk] {
    return chunk.header()->packets.load().count;
  };
  const auto chunk_state = [&] {
    return abi->GetChunkState(location->page_idx, location->chunk_idx);
  };

  EXPECT_EQ(packet_count(), 1);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkBeingWritten);

  // Finish the TracePacket: since there's no space for an empty packet, the
  // trace writer should immediately mark the chunk as completed.
  packet = protozero::MessageHandle<protos::pbzero::TracePacket>();

  EXPECT_EQ(packet_count(), 1);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkComplete);

  writer.reset();

  EXPECT_EQ(packet_count(), 1);
  EXPECT_EQ(chunk_state(), SharedMemoryABI::ChunkState::kChunkComplete);
}
726 
// Manually writes a packet that uses up all the bytes the stream writer
// reports as available, then calls FinishTracePacket(). With no room left,
// the chunk is expected to be marked complete right away, with the packet
// count left at 1.
TEST_P(TraceWriterImplTest, FinishTracePacketFullChunk) {
  const BufferID kBufId = 42;

  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  {
    protos::pbzero::TestEvent test_event;
    protozero::MessageArena arena;
    ScatteredStreamWriter* sw = writer->NewTracePacket().TakeStreamWriter();
    // Manually emit the tag of TracePacket.for_testing, then let |test_event|
    // write the nested message (with a reserved size field) on the same
    // stream.
    uint8_t data[protozero::proto_utils::kMaxTagEncodedSize];
    uint8_t* data_end = protozero::proto_utils::WriteVarInt(
        protozero::proto_utils::MakeTagLengthDelimited(
            protos::pbzero::TracePacket::kForTestingFieldNumber),
        data);
    sw->WriteBytes(data, static_cast<size_t>(data_end - data));
    test_event.Reset(sw, &arena);
    test_event.set_size_field(
        sw->ReserveBytes(protozero::proto_utils::kMessageLengthFieldSize));
    // Fill whatever is left after the tag and the size field.
    std::string chunk_filler(sw->bytes_available(), '\0');
    test_event.AppendRawProtoBytes(chunk_filler.data(), chunk_filler.size());
  }

  std::optional<ChunkInABI> chunk_in_abi = GetFirstChunkBeingWritten();
  ASSERT_TRUE(chunk_in_abi.has_value());

  auto* abi = arbiter_->shmem_abi_for_testing();
  SharedMemoryABI::Chunk chunk =
      abi->GetChunkUnchecked(chunk_in_abi->page_idx, chunk_in_abi->page_layout,
                             chunk_in_abi->chunk_idx);
  ASSERT_TRUE(chunk.is_valid());

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkBeingWritten);

  // Finish the TracePacket: since there's no space for an empty packet, the
  // trace writer should immediately mark the chunk as completed, instead of
  // inflating the count.
  writer->FinishTracePacket();

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkComplete);

  // Destroying the writer must leave the already completed chunk untouched.
  writer.reset();

  EXPECT_EQ(chunk.header()->packets.load().count, 1);
  EXPECT_EQ(abi->GetChunkState(chunk_in_abi->page_idx, chunk_in_abi->chunk_idx),
            SharedMemoryABI::ChunkState::kChunkComplete);
}
777 
TEST_P(TraceWriterImplTest,FragmentingPacketWithProducerAndServicePatching)778 TEST_P(TraceWriterImplTest, FragmentingPacketWithProducerAndServicePatching) {
779   const BufferID kBufId = 42;
780   std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);
781 
782   // Write a packet that's guaranteed to span more than a single chunk, but
783   // less than two chunks.
784   auto packet = writer->NewTracePacket();
785   size_t chunk_size = page_size() / 4;
786   std::string large_string(chunk_size, 'x');
787   packet->set_for_testing()->set_str(large_string);
788 
789   // First chunk should be committed.
790   arbiter_->FlushPendingCommitDataRequests();
791   ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
792   EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
793   EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
794   EXPECT_EQ(last_commit_.chunks_to_move()[0].target_buffer(), kBufId);
795   EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));
796 
797   // We will simulate a batching cycle by first setting the batching period to
798   // a very large value and then force-flushing when we are done writing data.
799   arbiter_->SetDirectSMBPatchingSupportedByService();
800   ASSERT_TRUE(arbiter_->EnableDirectSMBPatching());
801   arbiter_->SetBatchCommitsDuration(UINT32_MAX);
802 
803   // Write a second packet that's guaranteed to span more than a single chunk.
804   // Starting a new trace packet should cause the patches for the first packet
805   // (i.e. for the first chunk) to be queued for sending to the service. They
806   // cannot be applied locally because the first chunk was already committed.
807   packet->Finalize();
808   auto packet2 = writer->NewTracePacket();
809   packet2->set_for_testing()->set_str(large_string);
810 
811   // Starting a new packet yet again should cause the patches for the second
812   // packet (i.e. for the second chunk) to be applied in the producer, because
813   // the second chunk has not been committed yet.
814   packet2->Finalize();
815   auto packet3 = writer->NewTracePacket();
816 
817   // Simulate the end of the batching period, which should trigger a commit to
818   // the service.
819   arbiter_->FlushPendingCommitDataRequests();
820 
821   SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
822 
823   // The first allocated chunk should be complete but need patching, since the
824   // packet extended past the chunk and no patches for the packet size or
825   // string field size were applied yet.
826   ASSERT_EQ(abi->GetChunkState(0u, 0u), SharedMemoryABI::kChunkComplete);
827   auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
828   ASSERT_TRUE(chunk.is_valid());
829   EXPECT_EQ(chunk.header()->packets.load().count, 1u);
830   EXPECT_TRUE(chunk.header()->packets.load().flags &
831               SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
832   EXPECT_TRUE(chunk.header()->packets.load().flags &
833               SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
834 
835   // Verify that a patch for the first chunk was sent to the service.
836   ASSERT_THAT(last_commit_.chunks_to_patch(), SizeIs(1));
837   EXPECT_EQ(last_commit_.chunks_to_patch()[0].writer_id(), writer->writer_id());
838   EXPECT_EQ(last_commit_.chunks_to_patch()[0].target_buffer(), kBufId);
839   EXPECT_EQ(last_commit_.chunks_to_patch()[0].chunk_id(),
840             chunk.header()->chunk_id.load());
841   EXPECT_FALSE(last_commit_.chunks_to_patch()[0].has_more_patches());
842   EXPECT_THAT(last_commit_.chunks_to_patch()[0].patches(), SizeIs(1));
843 
844   // Verify that the second chunk was committed.
845   ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
846   EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
847   EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 1u);
848   EXPECT_EQ(last_commit_.chunks_to_move()[0].target_buffer(), kBufId);
849 
850   // The second chunk should be in a complete state and should not need
851   // patching, as the patches to it should have been applied in the producer.
852   ASSERT_EQ(abi->GetChunkState(0u, 1u), SharedMemoryABI::kChunkComplete);
853   auto chunk2 = abi->TryAcquireChunkForReading(0u, 1u);
854   ASSERT_TRUE(chunk2.is_valid());
855   EXPECT_EQ(chunk2.header()->packets.load().count, 2);
856   EXPECT_TRUE(chunk2.header()->packets.load().flags &
857               SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
858   EXPECT_FALSE(chunk2.header()->packets.load().flags &
859                SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
860 }
861 
// Writes a packet that spills over a chunk boundary without enabling direct
// producer-side patching, and verifies that both the first chunk and the patch
// that targets it are handed to the service in the commit.
TEST_P(TraceWriterImplTest, FragmentingPacketWithoutEnablingProducerPatching) {
  using Header = SharedMemoryABI::ChunkHeader;

  // Simulate a batching cycle: use a very large batching period and force a
  // flush at the point where one is expected to happen, i.e. when a patch is
  // encountered.
  //
  // Note: direct producer-side patching should be disabled by default.
  arbiter_->SetBatchCommitsDuration(UINT32_MAX);

  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  // The payload is as large as a chunk (a quarter of a page), so the packet
  // is guaranteed to span more than a single chunk.
  auto first_packet = writer->NewTracePacket();
  const size_t chunk_size = page_size() / 4;
  const std::string payload(chunk_size, 'x');
  first_packet->set_for_testing()->set_str(payload);

  // Starting a new packet should get the first chunk and its patches
  // committed to the service.
  first_packet->Finalize();
  auto second_packet = writer->NewTracePacket();
  arbiter_->FlushPendingCommitDataRequests();

  // The first allocated chunk should be complete but still need patching: the
  // packet extended past the chunk and neither the packet size nor the string
  // field size were patched in the producer.
  SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
  ASSERT_EQ(abi->GetChunkState(0u, 0u), SharedMemoryABI::kChunkComplete);
  auto first_chunk = abi->TryAcquireChunkForReading(0u, 0u);
  ASSERT_TRUE(first_chunk.is_valid());
  {
    const auto packets = first_chunk.header()->packets.load();
    EXPECT_EQ(packets.count, 1);
    EXPECT_TRUE(packets.flags & Header::kChunkNeedsPatching);
    EXPECT_TRUE(packets.flags & Header::kLastPacketContinuesOnNextChunk);
  }

  // The first chunk was committed.
  {
    const auto& moved = last_commit_.chunks_to_move();
    ASSERT_THAT(moved, SizeIs(1));
    EXPECT_EQ(moved[0].page(), 0u);
    EXPECT_EQ(moved[0].chunk(), 0u);
    EXPECT_EQ(moved[0].target_buffer(), kBufId);
  }

  // The patches for the first chunk were committed too.
  {
    const auto& patched = last_commit_.chunks_to_patch();
    ASSERT_THAT(patched, SizeIs(1));
    EXPECT_EQ(patched[0].writer_id(), writer->writer_id());
    EXPECT_EQ(patched[0].target_buffer(), kBufId);
    EXPECT_EQ(patched[0].chunk_id(), first_chunk.header()->chunk_id.load());
    EXPECT_FALSE(patched[0].has_more_patches());
    EXPECT_THAT(patched[0].patches(), SizeIs(1));
  }
}
913 
914 // Sets up a scenario in which the SMB is exhausted and TraceWriter fails to
915 // get a new chunk while fragmenting a packet. Verifies that data is dropped
916 // until the SMB is freed up and TraceWriter can get a new chunk.
TEST_P(TraceWriterImplTest,FragmentingPacketWhileBufferExhausted)917 TEST_P(TraceWriterImplTest, FragmentingPacketWhileBufferExhausted) {
918   const BufferID kBufId = 42;
919   std::unique_ptr<TraceWriter> writer =
920       arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
921 
922   // Write a small first packet, so that |writer| owns a chunk.
923   auto packet = writer->NewTracePacket();
924   EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
925                    ->drop_packets_for_testing());
926   // 3 bytes for the first_packet_on_sequence flag.
927   EXPECT_EQ(packet->Finalize(), 3u);
928 
929   // Grab all the remaining chunks in the SMB in new writers.
930   std::array<std::unique_ptr<TraceWriter>, kNumPages * 4 - 1> other_writers;
931   for (size_t i = 0; i < other_writers.size(); i++) {
932     other_writers[i] =
933         arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
934     auto other_writer_packet = other_writers[i]->NewTracePacket();
935     EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(other_writers[i].get())
936                      ->drop_packets_for_testing());
937   }
938 
939   // Write a packet that's guaranteed to span more than a single chunk,
940   // causing |writer| to attempt to acquire a new chunk but fail to do so.
941   auto packet2 = writer->NewTracePacket();
942   size_t chunk_size = page_size() / 4;
943   std::string large_string(chunk_size, 'x');
944   packet2->set_for_testing()->set_str(large_string);
945 
946   EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
947                   ->drop_packets_for_testing());
948 
949   // First chunk should be committed.
950   arbiter_->FlushPendingCommitDataRequests();
951   ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
952   EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
953   EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
954   EXPECT_EQ(last_commit_.chunks_to_move()[0].target_buffer(), kBufId);
955   EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));
956 
957   // It should not need patching and not have the continuation flag set.
958   SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
959   ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi->GetChunkState(0u, 0u));
960   auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
961   ASSERT_TRUE(chunk.is_valid());
962   EXPECT_EQ(chunk.header()->packets.load().count, 2);
963   EXPECT_FALSE(chunk.header()->packets.load().flags &
964                SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
965   EXPECT_FALSE(chunk.header()->packets.load().flags &
966                SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
967 
968   // Writing more data while in garbage mode succeeds. This data is dropped.
969   packet2->Finalize();
970   auto packet3 = writer->NewTracePacket();
971   packet3->set_for_testing()->set_str(large_string);
972 
973   // Release the |writer|'s first chunk as free, so that it can grab it again.
974   abi->ReleaseChunkAsFree(std::move(chunk));
975 
976   // Starting a new packet should cause TraceWriter to attempt to grab a new
977   // chunk again, because we wrote enough data to wrap the garbage chunk.
978   packet3->Finalize();
979   auto packet4 = writer->NewTracePacket();
980 
981   // Grabbing the chunk should have succeeded.
982   EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
983                    ->drop_packets_for_testing());
984 
985   // The first packet in the chunk should have the previous_packet_dropped
986   // flag set, so shouldn't be empty.
987   EXPECT_GT(packet4->Finalize(), 0u);
988 
989   // Flushing the writer causes the chunk to be released again.
990   writer->Flush();
991   EXPECT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
992   EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
993   EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
994   EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));
995 
996   // Chunk should contain only |packet4| and not have any continuation flag
997   // set.
998   ASSERT_EQ(abi->GetChunkState(0u, 0u), SharedMemoryABI::kChunkComplete);
999   chunk = abi->TryAcquireChunkForReading(0u, 0u);
1000   ASSERT_TRUE(chunk.is_valid());
1001   ASSERT_EQ(chunk.header()->packets.load().count, 1);
1002   EXPECT_FALSE(chunk.header()->packets.load().flags &
1003                SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
1004   EXPECT_FALSE(
1005       chunk.header()->packets.load().flags &
1006       SharedMemoryABI::ChunkHeader::kFirstPacketContinuesFromPrevChunk);
1007   EXPECT_FALSE(chunk.header()->packets.load().flags &
1008                SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
1009 }
1010 
1011 // Verifies that a TraceWriter that is flushed before the SMB is full and then
1012 // acquires a garbage chunk later recovers and writes a
1013 // previous_packet_dropped marker into the trace.
TEST_P(TraceWriterImplTest,FlushBeforeBufferExhausted)1014 TEST_P(TraceWriterImplTest, FlushBeforeBufferExhausted) {
1015   const BufferID kBufId = 42;
1016   std::unique_ptr<TraceWriter> writer =
1017       arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
1018 
1019   // Write a small first packet and flush it, so that |writer| no longer owns
1020   // any chunk.
1021   auto packet = writer->NewTracePacket();
1022   EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
1023                    ->drop_packets_for_testing());
1024   // 3 bytes for the first_packet_on_sequence flag.
1025   EXPECT_EQ(packet->Finalize(), 3u);
1026 
1027   // Flush the first chunk away.
1028   writer->Flush();
1029 
1030   // First chunk should be committed. Don't release it as free just yet.
1031   arbiter_->FlushPendingCommitDataRequests();
1032   ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
1033   EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
1034   EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
1035 
1036   // Grab all the remaining chunks in the SMB in new writers.
1037   std::array<std::unique_ptr<TraceWriter>, kNumPages * 4 - 1> other_writers;
1038   for (size_t i = 0; i < other_writers.size(); i++) {
1039     other_writers[i] =
1040         arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
1041     auto other_writer_packet = other_writers[i]->NewTracePacket();
1042     EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(other_writers[i].get())
1043                      ->drop_packets_for_testing());
1044   }
1045 
1046   // Write another packet, causing |writer| to acquire a garbage chunk.
1047   auto packet2 = writer->NewTracePacket();
1048   EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
1049                   ->drop_packets_for_testing());
1050 
1051   // Writing more data while in garbage mode succeeds. This data is dropped.
1052   // Make sure that we fill the garbage chunk, so that |writer| tries to
1053   // re-acquire a valid chunk for the next packet.
1054   size_t chunk_size = page_size() / 4;
1055   std::string large_string(chunk_size, 'x');
1056   packet2->set_for_testing()->set_str(large_string);
1057   packet2->Finalize();
1058 
1059   // Next packet should still be in the garbage chunk.
1060   auto packet3 = writer->NewTracePacket();
1061   EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
1062                   ->drop_packets_for_testing());
1063 
1064   // Release the first chunk as free, so |writer| can acquire it again.
1065   SharedMemoryABI* abi = arbiter_->shmem_abi_for_testing();
1066   ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi->GetChunkState(0u, 0u));
1067   auto chunk = abi->TryAcquireChunkForReading(0u, 0u);
1068   ASSERT_TRUE(chunk.is_valid());
1069   abi->ReleaseChunkAsFree(std::move(chunk));
1070 
1071   // Fill the garbage chunk, so that the writer attempts to grab another chunk
1072   // for |packet4|.
1073   packet3->set_for_testing()->set_str(large_string);
1074   packet3->Finalize();
1075 
1076   // Next packet should go into the reacquired chunk we just released.
1077   auto packet4 = writer->NewTracePacket();
1078   EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
1079                    ->drop_packets_for_testing());
1080 
1081   // The first packet in the chunk should have the previous_packet_dropped
1082   // flag set, so shouldn't be empty.
1083   EXPECT_GT(packet4->Finalize(), 0u);
1084 
1085   // Flushing the writer causes the chunk to be released again.
1086   writer->Flush();
1087   ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(1));
1088   EXPECT_EQ(last_commit_.chunks_to_move()[0].page(), 0u);
1089   EXPECT_EQ(last_commit_.chunks_to_move()[0].chunk(), 0u);
1090   EXPECT_THAT(last_commit_.chunks_to_patch(), SizeIs(0));
1091 
1092   // Chunk should contain only |packet4| and not have any continuation flag
1093   // set.
1094   ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi->GetChunkState(0u, 0u));
1095   chunk = abi->TryAcquireChunkForReading(0u, 0u);
1096   ASSERT_TRUE(chunk.is_valid());
1097   ASSERT_EQ(chunk.header()->packets.load().count, 1);
1098   ASSERT_FALSE(chunk.header()->packets.load().flags &
1099                SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
1100   ASSERT_FALSE(
1101       chunk.header()->packets.load().flags &
1102       SharedMemoryABI::ChunkHeader::kFirstPacketContinuesFromPrevChunk);
1103   ASSERT_FALSE(chunk.header()->packets.load().flags &
1104                SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
1105 }
1106 
1107 // Regression test that verifies that flushing a TraceWriter while a
1108 // fragmented packet still has uncommitted patches doesn't hit a DCHECK /
1109 // crash the writer thread.
TEST_P(TraceWriterImplTest,FlushAfterFragmentingPacketWhileBufferExhausted)1110 TEST_P(TraceWriterImplTest, FlushAfterFragmentingPacketWhileBufferExhausted) {
1111   const BufferID kBufId = 42;
1112   std::unique_ptr<TraceWriter> writer =
1113       arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
1114 
1115   // Write a small first packet, so that |writer| owns a chunk.
1116   auto packet = writer->NewTracePacket();
1117   EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(writer.get())
1118                    ->drop_packets_for_testing());
1119   // 3 bytes for the first_packet_on_sequence flag.
1120   EXPECT_EQ(packet->Finalize(), 3u);
1121 
1122   // Grab all but one of the remaining chunks in the SMB in new writers.
1123   std::array<std::unique_ptr<TraceWriter>, kNumPages * 4 - 2> other_writers;
1124   for (size_t i = 0; i < other_writers.size(); i++) {
1125     other_writers[i] =
1126         arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
1127     auto other_writer_packet = other_writers[i]->NewTracePacket();
1128     EXPECT_FALSE(reinterpret_cast<TraceWriterImpl*>(other_writers[i].get())
1129                      ->drop_packets_for_testing());
1130   }
1131 
1132   // Write a packet that's guaranteed to span more than a two chunks, causing
1133   // |writer| to attempt to acquire two new chunks, but fail to acquire the
1134   // second.
1135   auto packet2 = writer->NewTracePacket();
1136   size_t chunk_size = page_size() / 4;
1137   std::string large_string(chunk_size * 2, 'x');
1138   packet2->set_for_testing()->set_str(large_string);
1139 
1140   EXPECT_TRUE(reinterpret_cast<TraceWriterImpl*>(writer.get())
1141                   ->drop_packets_for_testing());
1142 
1143   // First two chunks should be committed.
1144   arbiter_->FlushPendingCommitDataRequests();
1145   ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(2));
1146 
1147   // Flushing should succeed, even though some patches are still in the
1148   // writer's patch list.
1149   packet2->Finalize();
1150   writer->Flush();
1151 }
1152 
// Exercises a writer that only ever gets garbage chunks (the SMB is fully
// taken by other writers) and checks that it keeps working after the garbage
// chunk has been filled and writing restarts from its beginning.
TEST_P(TraceWriterImplTest, GarbageChunkWrap) {
  // Tells whether the given writer is currently discarding packets.
  const auto is_dropping = [](const std::unique_ptr<TraceWriter>& w) {
    return reinterpret_cast<TraceWriterImpl*>(w.get())
        ->drop_packets_for_testing();
  };

  const BufferID kBufId = 42;

  // Have new writers grab all the chunks in the SMB.
  std::array<std::unique_ptr<TraceWriter>, kNumPages * 4> other_writers;
  for (auto& other : other_writers) {
    other = arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
    auto other_packet = other->NewTracePacket();
    EXPECT_FALSE(is_dropping(other));
  }

  // `writer` will only get garbage chunks, since the SMB is exhausted.
  std::unique_ptr<TraceWriter> writer =
      arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);

  const size_t chunk_size = page_size() / 4;
  const std::string half_chunk_payload(chunk_size / 2, 'x');

  // Fill the first half of the garbage chunk.
  {
    auto packet = writer->NewTracePacket();
    EXPECT_TRUE(is_dropping(writer));
    packet->set_for_testing()->set_str(half_chunk_payload);
  }

  // Fill the second half of the garbage chunk and more. This will call
  // GetNewBuffer() and restart from the beginning of the garbage chunk.
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str(half_chunk_payload);
  }

  // TraceWriterImpl must be able to write at the beginning of the garbage
  // chunk without any problems.
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str("str");
  }
}
1195 
// Drives the writer through its raw ScatteredStreamWriter until it starts
// dropping data, then checks that AnnotatePatch() on freshly reserved bytes
// returns null in that state and that a subsequent Flush() still succeeds.
TEST_P(TraceWriterImplTest, AnnotatePatchWhileBufferExhausted) {
  // Tells whether the given writer is currently discarding packets.
  const auto is_dropping = [](const std::unique_ptr<TraceWriter>& w) {
    return reinterpret_cast<TraceWriterImpl*>(w.get())
        ->drop_packets_for_testing();
  };

  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer =
      arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);

  // Write a small first packet, so that |writer| owns a chunk.
  ScatteredStreamWriter* stream = writer->NewTracePacket().TakeStreamWriter();
  stream->WriteBytes(reinterpret_cast<const uint8_t*>("X"), 1);
  writer->FinishTracePacket();
  EXPECT_FALSE(is_dropping(writer));

  // Have new writers grab all but one of the remaining chunks in the SMB.
  std::array<std::unique_ptr<TraceWriter>, kNumPages * 4 - 2> other_writers;
  for (auto& other : other_writers) {
    other = arbiter_->CreateTraceWriter(kBufId, BufferExhaustedPolicy::kDrop);
    auto other_packet = other->NewTracePacket();
    EXPECT_FALSE(is_dropping(other));
  }

  // Write a packet that's guaranteed to span more than two chunks, causing
  // |writer| to attempt to acquire two new chunks, but fail to acquire the
  // second one.
  stream = writer->NewTracePacket().TakeStreamWriter();
  const size_t chunk_size = page_size() / 4;
  const std::string payload(chunk_size * 2, 'x');
  stream->WriteBytes(reinterpret_cast<const uint8_t*>(payload.data()),
                     payload.size());

  EXPECT_TRUE(is_dropping(writer));

  // Reserve room for a patch, zero its first four bytes and ask for it to be
  // annotated: no patch location is expected back.
  uint8_t* patch =
      stream->ReserveBytes(ScatteredStreamWriter::Delegate::kPatchSize);
  ASSERT_THAT(patch, NotNull());
  for (size_t i = 0; i < 4; ++i)
    patch[i] = 0;
  patch = stream->AnnotatePatch(patch);
  EXPECT_THAT(patch, IsNull());

  // The first two chunks should be committed.
  arbiter_->FlushPendingCommitDataRequests();
  ASSERT_THAT(last_commit_.chunks_to_move(), SizeIs(2));

  // Flushing should succeed, even though some patches are still in the
  // writer's patch list.
  writer->FinishTracePacket();
  writer->Flush();
}
1249 
// Checks that the callback handed to Flush() is not run by Flush() itself and
// runs exactly once when the recorded commit callback is invoked.
TEST_P(TraceWriterImplTest, Flush) {
  MockFunction<void()> on_flushed;

  const BufferID kBufId = 42;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  // Give the writer something to flush.
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str("foobar");
  }

  // Requesting the flush must only record a commit callback, not fire ours.
  EXPECT_CALL(on_flushed, Call).Times(0);
  ASSERT_FALSE(last_commit_callback_);
  writer->Flush(on_flushed.AsStdFunction());
  ASSERT_TRUE(last_commit_callback_);

  // Running the recorded commit callback must fire ours exactly once.
  EXPECT_CALL(on_flushed, Call).Times(1);
  last_commit_callback_();
}
1267 
// Opens three levels of nested messages, writes a string too large for the
// current chunk into the innermost one, and verifies that the size field of
// every open message gets redirected and that three patches end up in the
// commit.
TEST_P(TraceWriterImplTest, NestedMsgsPatches) {
  const BufferID kBufId = 42;
  const uint32_t kNestedFieldId = 1;
  const uint32_t kStringFieldId = 2;
  const uint32_t kIntFieldId = 3;
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(kBufId);

  const size_t chunk_size = page_size() / 4;
  const std::string payload(chunk_size, 'x');

  auto packet = writer->NewTracePacket();
  auto* outer = packet->BeginNestedMessage<protozero::Message>(kNestedFieldId);
  auto* middle = outer->BeginNestedMessage<protozero::Message>(kNestedFieldId);
  auto* inner = middle->BeginNestedMessage<protozero::Message>(kNestedFieldId);

  // Remember where each size field lives before anything is written.
  uint8_t* const outer_size_before = outer->size_field();
  uint8_t* const middle_size_before = middle->size_field();
  uint8_t* const inner_size_before = inner->size_field();
  EXPECT_THAT(outer_size_before, NotNull());
  EXPECT_THAT(middle_size_before, NotNull());
  EXPECT_THAT(inner_size_before, NotNull());

  // Append a small field, which will fit in the current chunk.
  inner->AppendVarInt<uint64_t>(kIntFieldId, 1);

  // The `size_field`s still point to the same old location, inside the chunk.
  EXPECT_EQ(outer->size_field(), outer_size_before);
  EXPECT_EQ(middle->size_field(), middle_size_before);
  EXPECT_EQ(inner->size_field(), inner_size_before);

  // Append a large string, which will not fit in the current chunk.
  inner->AppendString(kStringFieldId, payload);

  // The `size_field`s will now point to different locations (patches).
  EXPECT_THAT(outer->size_field(), AllOf(Ne(outer_size_before), NotNull()));
  EXPECT_THAT(middle->size_field(), AllOf(Ne(middle_size_before), NotNull()));
  EXPECT_THAT(inner->size_field(), AllOf(Ne(inner_size_before), NotNull()));

  packet->Finalize();
  writer->Flush();

  arbiter_->FlushPendingCommitDataRequests();

  // One chunk needs patching, with one patch per open nested message.
  const auto& patched = last_commit_.chunks_to_patch();
  ASSERT_THAT(patched, SizeIs(1));
  EXPECT_EQ(patched[0].writer_id(), writer->writer_id());
  EXPECT_EQ(patched[0].target_buffer(), kBufId);
  EXPECT_FALSE(patched[0].has_more_patches());
  EXPECT_THAT(patched[0].patches(), SizeIs(3));
}
1322 
1323 // TODO(primiano): add multi-writer test.
1324 
1325 }  // namespace
1326 }  // namespace perfetto
1327