1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/core/trace_writer_impl.h"
18 
#include <string.h>

#include <algorithm>
#include <memory>
#include <type_traits>
#include <utility>
24 
25 #include "perfetto/base/logging.h"
26 #include "perfetto/ext/base/thread_annotations.h"
27 #include "perfetto/protozero/message.h"
28 #include "perfetto/protozero/proto_utils.h"
29 #include "perfetto/protozero/root_message.h"
30 #include "perfetto/protozero/static_buffer.h"
31 #include "src/tracing/core/shared_memory_arbiter_impl.h"
32 
33 #include "protos/perfetto/trace/trace_packet.pbzero.h"
34 
35 using protozero::proto_utils::kMessageLengthFieldSize;
36 using protozero::proto_utils::WriteRedundantVarInt;
37 using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader;
38 
39 namespace perfetto {
40 
41 namespace {
42 constexpr size_t kPacketHeaderSize = SharedMemoryABI::kPacketHeaderSize;
43 // The -1 is because we want to leave extra room to inflate the counter.
44 constexpr size_t kMaxPacketsPerChunk = ChunkHeader::Packets::kMaxCount - 1;
45 // When the packet count in a chunk is inflated, TraceWriter is always going to
46 // leave this kExtraRoomForInflatedPacket bytes to write an empty trace packet
47 // if it needs to.
48 constexpr size_t kExtraRoomForInflatedPacket = 1;
49 uint8_t g_garbage_chunk[1024];
50 }  // namespace
51 
TraceWriterImpl(SharedMemoryArbiterImpl * shmem_arbiter,WriterID id,MaybeUnboundBufferID target_buffer,BufferExhaustedPolicy buffer_exhausted_policy)52 TraceWriterImpl::TraceWriterImpl(SharedMemoryArbiterImpl* shmem_arbiter,
53                                  WriterID id,
54                                  MaybeUnboundBufferID target_buffer,
55                                  BufferExhaustedPolicy buffer_exhausted_policy)
56     : shmem_arbiter_(shmem_arbiter),
57       id_(id),
58       target_buffer_(target_buffer),
59       buffer_exhausted_policy_(buffer_exhausted_policy),
60       protobuf_stream_writer_(this),
61       process_id_(base::GetProcessId()) {
62   // TODO(primiano): we could handle the case of running out of TraceWriterID(s)
63   // more gracefully and always return a no-op TracePacket in NewTracePacket().
64   PERFETTO_CHECK(id_ != 0);
65 
66   cur_packet_.reset(new protozero::RootMessage<protos::pbzero::TracePacket>());
67   cur_packet_->Finalize();  // To avoid the CHECK in NewTracePacket().
68 }
69 
~TraceWriterImpl()70 TraceWriterImpl::~TraceWriterImpl() {
71   if (cur_chunk_.is_valid()) {
72     cur_packet_->Finalize();
73     Flush();
74   }
75   // This call may cause the shared memory arbiter (and the underlying memory)
76   // to get asynchronously deleted if this was the last trace writer targeting
77   // the arbiter and the arbiter was marked for shutdown.
78   shmem_arbiter_->ReleaseWriterID(id_);
79 }
80 
ReturnCompletedChunk()81 void TraceWriterImpl::ReturnCompletedChunk() {
82   PERFETTO_DCHECK(cur_chunk_.is_valid());
83   if (cur_chunk_packet_count_inflated_) {
84     uint8_t zero_size = 0;
85     static_assert(sizeof zero_size == kExtraRoomForInflatedPacket);
86     PERFETTO_CHECK(protobuf_stream_writer_.bytes_available() != 0);
87     protobuf_stream_writer_.WriteBytesUnsafe(&zero_size, sizeof zero_size);
88     cur_chunk_packet_count_inflated_ = false;
89   }
90   shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
91                                        &patch_list_);
92 }
93 
Flush(std::function<void ()> callback)94 void TraceWriterImpl::Flush(std::function<void()> callback) {
95   // Flush() cannot be called in the middle of a TracePacket.
96   PERFETTO_CHECK(cur_packet_->is_finalized());
97   // cur_packet_ is finalized: that means that the size is correct for all the
98   // nested submessages. The root fragment size however is not handled by
99   // protozero::Message::Finalize() and must be filled here.
100   FinalizeFragmentIfRequired();
101 
102   if (cur_chunk_.is_valid()) {
103     ReturnCompletedChunk();
104   } else {
105     // When in stall mode, all patches should have been returned with the last
106     // chunk, since the last packet was completed. In drop_packets_ mode, this
107     // may not be the case because the packet may have been fragmenting when
108     // SMB exhaustion occurred and |cur_chunk_| became invalid. In this case,
109     // drop_packets_ should be true.
110     PERFETTO_DCHECK(patch_list_.empty() || drop_packets_);
111   }
112 
113   // Always issue the Flush request, even if there is nothing to flush, just
114   // for the sake of getting the callback posted back.
115   shmem_arbiter_->FlushPendingCommitDataRequests(callback);
116   protobuf_stream_writer_.Reset({nullptr, nullptr});
117 }
118 
// Starts a new TracePacket and returns a handle to it. Acquires a new chunk
// first when the current one is too full, has hit its packet limit, or a retry
// after dropping data is pending. While the writer is in |drop_packets_| mode,
// the packet is written into the garbage chunk and is not counted.
TraceWriterImpl::TracePacketHandle TraceWriterImpl::NewTracePacket() {
  // If we hit this, the caller is calling NewTracePacket() without having
  // finalized the previous packet.
  PERFETTO_CHECK(cur_packet_->is_finalized());
  // If we hit this, this trace writer was created in a different process. This
  // likely means that the process forked while tracing was active, and the
  // forked child process tried to emit a trace event. This is not supported, as
  // it would lead to two processes writing to the same tracing SMB.
  PERFETTO_DCHECK(process_id_ == base::GetProcessId());

  // Before starting a new packet, make sure that the last fragment size has
  // been written correctly. The root fragment size is not written by
  // protozero::Message::Finalize().
  FinalizeFragmentIfRequired();

  // No packet is open yet, so a GetNewBuffer() call below must only switch
  // chunks and must not create a continuation fragment.
  fragmenting_packet_ = false;

  // The packet header reserved below doubles as the root message's length
  // field, so both sizes must agree.
  static_assert(kPacketHeaderSize == kMessageLengthFieldSize,
                "The packet header must match the Message header size");

  // Remembered so that, if GetNewBuffer() below obtains a real chunk again,
  // the new packet can be flagged as following dropped data.
  bool was_dropping_packets = drop_packets_;

  // It doesn't make sense to begin a packet that is going to fragment
  // immediately after (8 is just an arbitrary estimation on the minimum size of
  // a realistic packet). A new chunk is also requested once the per-chunk
  // packet limit has been reached, or when GetNewBuffer() asked to retry
  // acquiring a real chunk after the previous packet was dropped.
  bool chunk_too_full =
      protobuf_stream_writer_.bytes_available() < kPacketHeaderSize + 8;
  if (chunk_too_full || reached_max_packets_per_chunk_ ||
      retry_new_chunk_after_packet_) {
    protobuf_stream_writer_.Reset(GetNewBuffer());
  }

  // Send any completed patches to the service to facilitate trace data
  // recovery by the service. This should only happen when we're completing
  // the first packet in a chunk which was a continuation from the previous
  // chunk, i.e. at most once per chunk.
  if (!patch_list_.empty() && patch_list_.front().is_patched()) {
    shmem_arbiter_->SendPatches(id_, target_buffer_, &patch_list_);
  }

  cur_packet_->Reset(&protobuf_stream_writer_);
  // Reserve space for the size of the message. It is zero-filled now and
  // backfilled later with the fragment size (see |cur_fragment_size_field_|).
  // Note: this call might re-enter into this class invoking GetNewBuffer() if
  // the stream writer runs out of space.
  uint8_t* header = protobuf_stream_writer_.ReserveBytes(kPacketHeaderSize);
  memset(header, 0, kPacketHeaderSize);
  cur_fragment_size_field_ = header;

  TracePacketHandle handle(cur_packet_.get());
  cur_fragment_start_ = protobuf_stream_writer_.write_ptr();
  // From here on, running out of space splits this packet across chunks.
  fragmenting_packet_ = true;

  // Packet accounting only applies to real chunks; nothing is counted while
  // writing into the garbage chunk.
  if (PERFETTO_LIKELY(!drop_packets_)) {
    uint16_t new_packet_count;
    if (cur_chunk_packet_count_inflated_) {
      // FinishTracePacket() already counted this packet ahead of time; reuse
      // that count instead of incrementing again.
      new_packet_count =
          cur_chunk_.header()->packets.load(std::memory_order_relaxed).count;
      cur_chunk_packet_count_inflated_ = false;
    } else {
      new_packet_count = cur_chunk_.IncrementPacketCount();
    }
    reached_max_packets_per_chunk_ = new_packet_count == kMaxPacketsPerChunk;

    if (PERFETTO_UNLIKELY(was_dropping_packets)) {
      // We've succeeded to get a new chunk from the SMB after we entered
      // drop_packets_ mode. Record a marker into the new packet to indicate the
      // data loss.
      cur_packet_->set_previous_packet_dropped(true);
    }
  }

  // Only the very first packet emitted by this writer carries this marker.
  if (PERFETTO_UNLIKELY(first_packet_on_sequence_)) {
    cur_packet_->set_first_packet_on_sequence(true);
    first_packet_on_sequence_ = false;
  }

  // Routes finalization of the returned handle to OnMessageFinalized().
  handle.set_finalization_listener(this);

  return handle;
}
199 
// Called by the Message (through |protobuf_stream_writer_|) when it runs out of
// space, and directly by NewTracePacket(). We can get here in two cases:
// 1. In the middle of writing a Message, when |fragmenting_packet_| == true.
// In this case we want to update the chunk header with a partial packet and
// start a new partial packet in the new chunk.
// 2. From NewTracePacket(), before a new packet is started, when the current
// chunk is too full, has reached its packet limit, or a retry is pending. In
// this case |fragmenting_packet_| == false and we just want a new chunk
// without creating any fragments.
protozero::ContiguousMemoryRange TraceWriterImpl::GetNewBuffer() {
  // Returns the range the stream writer should continue writing into. This is
  // either the payload of a freshly acquired chunk, or the garbage chunk when
  // no chunk can be obtained or the current packet is already being dropped.
  if (fragmenting_packet_ && drop_packets_) {
    // We can't write the remaining data of the fragmenting packet to a new
    // chunk, because we have already lost some of its data in the garbage
    // chunk. Thus, we will wrap around in the garbage chunk, wait until the
    // current packet has been completed, and then attempt to get a new chunk
    // from the SMB again. Instead, if |drop_packets_| is true and
    // |fragmenting_packet_| is false, we try to acquire a valid chunk because
    // the SMB exhaustion might be resolved.
    retry_new_chunk_after_packet_ = true;
    cur_fragment_size_field_ = nullptr;
    cur_fragment_start_ = &g_garbage_chunk[0];
    return protozero::ContiguousMemoryRange{
        &g_garbage_chunk[0], &g_garbage_chunk[0] + sizeof(g_garbage_chunk)};
  }

  // Attempt to grab the next chunk before finalizing the current one, so that
  // we know whether we need to start dropping packets before writing the
  // current packet fragment's header.
  // A continuation fragment counts as the new chunk's first packet and is
  // flagged as continuing from the previous chunk.
  ChunkHeader::Packets packets = {};
  if (fragmenting_packet_) {
    packets.count = 1;
    packets.flags = ChunkHeader::kFirstPacketContinuesFromPrevChunk;
  }

  // The memory order of the stores below doesn't really matter. This |header|
  // is just a local temporary object. The GetNewChunk() call below will copy it
  // into the shared buffer with the proper barriers.
  ChunkHeader header = {};
  header.writer_id.store(id_, std::memory_order_relaxed);
  header.chunk_id.store(next_chunk_id_, std::memory_order_relaxed);
  header.packets.store(packets, std::memory_order_relaxed);

  SharedMemoryABI::Chunk new_chunk =
      shmem_arbiter_->GetNewChunk(header, buffer_exhausted_policy_);
  if (!new_chunk.is_valid()) {
    // Shared memory buffer exhausted, switch into |drop_packets_| mode. We'll
    // drop data until the garbage chunk has been filled once and then retry.

    // If we started a packet in one of the previous (valid) chunks, we need to
    // tell the service to discard it.
    if (fragmenting_packet_) {
      // We can only end up here if the previous chunk was a valid chunk,
      // because we never try to acquire a new chunk in |drop_packets_| mode
      // while fragmenting.
      PERFETTO_DCHECK(!drop_packets_);

      // Backfill the last fragment's header with an invalid size (too large),
      // so that the service's TraceBuffer throws out the incomplete packet.
      // It'll restart reading from the next chunk we submit.
      WriteRedundantVarInt(SharedMemoryABI::kPacketSizeDropPacket,
                           cur_fragment_size_field_);

      // Reset the size field, since we should not write the current packet's
      // size anymore after this.
      cur_fragment_size_field_ = nullptr;

      // We don't set kLastPacketContinuesOnNextChunk or kChunkNeedsPatching on
      // the last chunk, because its last fragment will be discarded anyway.
      // However, the current packet fragment points to a valid |cur_chunk_| and
      // may have non-finalized nested messages which will continue in the
      // garbage chunk and currently still point into |cur_chunk_|. As we are
      // about to return |cur_chunk_|, we need to invalidate the size fields of
      // those nested messages. Normally we move them in the |patch_list_| (see
      // below) but in this case, it doesn't make sense to send patches for a
      // fragment that will be discarded for sure. Thus, we clean up any size
      // field references into |cur_chunk_|.
      for (auto* nested_msg = cur_packet_->nested_message(); nested_msg;
           nested_msg = nested_msg->nested_message()) {
        uint8_t* const cur_hdr = nested_msg->size_field();

        // If this is false the protozero Message has already been instructed to
        // write, upon Finalize(), its size into the patch list.
        bool size_field_points_within_chunk =
            cur_hdr >= cur_chunk_.payload_begin() &&
            cur_hdr + kMessageLengthFieldSize <= cur_chunk_.end();

        if (size_field_points_within_chunk)
          nested_msg->set_size_field(nullptr);
      }
    } else if (!drop_packets_ && cur_fragment_size_field_) {
      // If we weren't dropping packets before, we should indicate to the
      // service that we're about to lose data. We do this by invalidating the
      // size of the last packet in |cur_chunk_|. The service will record
      // statistics about packets with kPacketSizeDropPacket size.
      PERFETTO_DCHECK(cur_packet_->is_finalized());
      PERFETTO_DCHECK(cur_chunk_.is_valid());

      // |cur_fragment_size_field_| should point within |cur_chunk_|'s payload.
      PERFETTO_DCHECK(cur_fragment_size_field_ >= cur_chunk_.payload_begin() &&
                      cur_fragment_size_field_ + kMessageLengthFieldSize <=
                          cur_chunk_.end());

      WriteRedundantVarInt(SharedMemoryABI::kPacketSizeDropPacket,
                           cur_fragment_size_field_);
    }

    if (cur_chunk_.is_valid()) {
      ReturnCompletedChunk();
    }

    // Only increment the count if we are newly entering this state not
    // otherwise.
    drop_count_ += !drop_packets_;
    drop_packets_ = true;
    // From here on, all writes go to the garbage chunk; clear every piece of
    // per-chunk state that referred to the real chunk.
    cur_chunk_ = SharedMemoryABI::Chunk();  // Reset to an invalid chunk.
    cur_chunk_packet_count_inflated_ = false;
    reached_max_packets_per_chunk_ = false;
    retry_new_chunk_after_packet_ = false;
    cur_fragment_size_field_ = nullptr;
    cur_fragment_start_ = &g_garbage_chunk[0];

    PERFETTO_ANNOTATE_BENIGN_RACE_SIZED(&g_garbage_chunk,
                                        sizeof(g_garbage_chunk),
                                        "nobody reads the garbage chunk")
    return protozero::ContiguousMemoryRange{
        &g_garbage_chunk[0], &g_garbage_chunk[0] + sizeof(g_garbage_chunk)};
  }  // if (!new_chunk.is_valid())

  PERFETTO_DCHECK(new_chunk.is_valid());

  if (fragmenting_packet_) {
    // We should not be fragmenting a packet after we exited drop_packets_ mode,
    // because we only retry to get a new chunk when a fresh packet is started.
    PERFETTO_DCHECK(!drop_packets_);

    uint8_t* const wptr = protobuf_stream_writer_.write_ptr();
    PERFETTO_DCHECK(wptr >= cur_fragment_start_);
    uint32_t partial_size = static_cast<uint32_t>(wptr - cur_fragment_start_);
    PERFETTO_DCHECK(partial_size < cur_chunk_.size());

    // Backfill the packet header with the fragment size.
    PERFETTO_DCHECK(partial_size > 0);
    cur_chunk_.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);
    WriteRedundantVarInt(partial_size, cur_fragment_size_field_);

    // Descend in the stack of non-finalized nested submessages (if any) and
    // detour their |size_field| into the |patch_list_|. At this point we have
    // to release the chunk and they cannot write anymore into that.
    for (auto* nested_msg = cur_packet_->nested_message(); nested_msg;
         nested_msg = nested_msg->nested_message()) {
      uint8_t* cur_hdr = nested_msg->size_field();

      // If this is false the protozero Message has already been instructed to
      // write, upon Finalize(), its size into the patch list.
      bool size_field_points_within_chunk =
          cur_hdr >= cur_chunk_.payload_begin() &&
          cur_hdr + kMessageLengthFieldSize <= cur_chunk_.end();

      if (size_field_points_within_chunk) {
        cur_hdr = TraceWriterImpl::AnnotatePatch(cur_hdr);
        nested_msg->set_size_field(cur_hdr);
      } else {
#if PERFETTO_DCHECK_IS_ON()
        // Ensure that the size field of the message points to an element of the
        // patch list.
        auto patch_it = std::find_if(
            patch_list_.begin(), patch_list_.end(),
            [cur_hdr](const Patch& p) { return &p.size_field[0] == cur_hdr; });
        PERFETTO_DCHECK(patch_it != patch_list_.end());
#endif
      }
    }  // for(nested_msg)
  }    // if(fragmenting_packet)

  if (cur_chunk_.is_valid()) {
    // ReturnCompletedChunk will consume the first patched entries from
    // |patch_list_| and shrink it.
    ReturnCompletedChunk();
  }

  // Switch to the new chunk.
  drop_packets_ = false;
  reached_max_packets_per_chunk_ = false;
  retry_new_chunk_after_packet_ = false;
  next_chunk_id_++;
  cur_chunk_ = std::move(new_chunk);
  cur_chunk_packet_count_inflated_ = false;
  cur_fragment_size_field_ = nullptr;

  uint8_t* payload_begin = cur_chunk_.payload_begin();
  if (fragmenting_packet_) {
    // The continuation fragment starts the new chunk with its own zeroed size
    // header, which is backfilled once the fragment is complete.
    cur_fragment_size_field_ = payload_begin;
    memset(payload_begin, 0, kPacketHeaderSize);
    payload_begin += kPacketHeaderSize;
    cur_fragment_start_ = payload_begin;
  }

  return protozero::ContiguousMemoryRange{payload_begin, cur_chunk_.end()};
}
398 
FinishTracePacket()399 void TraceWriterImpl::FinishTracePacket() {
400   // If we hit this, this trace writer was created in a different process. This
401   // likely means that the process forked while tracing was active, and the
402   // forked child process tried to emit a trace event. This is not supported, as
403   // it would lead to two processes writing to the same tracing SMB.
404   PERFETTO_DCHECK(process_id_ == base::GetProcessId());
405 
406   FinalizeFragmentIfRequired();
407 
408   cur_packet_->Reset(&protobuf_stream_writer_);
409   cur_packet_->Finalize();  // To avoid the CHECK in NewTracePacket().
410 
411   // cur_chunk_packet_count_inflated_ can be true if FinishTracePacket() is
412   // called multiple times.
413   if (cur_chunk_.is_valid() && !cur_chunk_packet_count_inflated_) {
414     if (protobuf_stream_writer_.bytes_available() <
415         kExtraRoomForInflatedPacket) {
416       ReturnCompletedChunk();
417     } else {
418       cur_chunk_packet_count_inflated_ = true;
419       cur_chunk_.IncrementPacketCount();
420     }
421   }
422 
423   // Send any completed patches to the service to facilitate trace data
424   // recovery by the service. This should only happen when we're completing
425   // the first packet in a chunk which was a continuation from the previous
426   // chunk, i.e. at most once per chunk.
427   if (!patch_list_.empty() && patch_list_.front().is_patched()) {
428     shmem_arbiter_->SendPatches(id_, target_buffer_, &patch_list_);
429   }
430 }
431 
FinalizeFragmentIfRequired()432 void TraceWriterImpl::FinalizeFragmentIfRequired() {
433   if (!cur_fragment_size_field_) {
434     return;
435   }
436   uint8_t* const wptr = protobuf_stream_writer_.write_ptr();
437   PERFETTO_DCHECK(wptr >= cur_fragment_start_);
438   uint32_t partial_size = static_cast<uint32_t>(wptr - cur_fragment_start_);
439 
440   // cur_fragment_size_field_, if not nullptr, is always inside or immediately
441   // before protobuf_stream_writer_.cur_range().
442   if (partial_size < protozero::proto_utils::kMaxOneByteMessageLength &&
443       cur_fragment_size_field_ >= protobuf_stream_writer_.cur_range().begin) {
444     // This handles compaction of the root message. For nested messages, the
445     // compaction is handled by protozero::Message::Finalize().
446     protobuf_stream_writer_.Rewind(
447         partial_size, protozero::proto_utils::kMessageLengthFieldSize - 1u);
448     *cur_fragment_size_field_ = static_cast<uint8_t>(partial_size);
449   } else {
450     WriteRedundantVarInt(partial_size, cur_fragment_size_field_);
451   }
452   cur_fragment_size_field_ = nullptr;
453 }
454 
AnnotatePatch(uint8_t * to_patch)455 uint8_t* TraceWriterImpl::AnnotatePatch(uint8_t* to_patch) {
456   if (!cur_chunk_.is_valid()) {
457     return nullptr;
458   }
459   auto offset = static_cast<uint16_t>(to_patch - cur_chunk_.payload_begin());
460   const ChunkID cur_chunk_id =
461       cur_chunk_.header()->chunk_id.load(std::memory_order_relaxed);
462   static_assert(kPatchSize == sizeof(Patch::PatchContent),
463                 "Patch size mismatch");
464   Patch* patch = patch_list_.emplace_back(cur_chunk_id, offset);
465   // Check that the flag is not already set before setting it. This is not
466   // necessary, but it makes the code faster.
467   if (!(cur_chunk_.GetPacketCountAndFlags().second &
468         ChunkHeader::kChunkNeedsPatching)) {
469     cur_chunk_.SetFlag(ChunkHeader::kChunkNeedsPatching);
470   }
471   return &patch->size_field[0];
472 }
473 
OnMessageFinalized(protozero::Message *)474 void TraceWriterImpl::OnMessageFinalized(protozero::Message*) {
475   TraceWriterImpl::FinishTracePacket();
476 }
477 
writer_id() const478 WriterID TraceWriterImpl::writer_id() const {
479   return id_;
480 }
481 
// Base class definitions. These are defaulted, but defined out of line in this
// translation unit rather than in the header.
TraceWriter::TraceWriter() = default;
TraceWriter::~TraceWriter() = default;
485 
486 }  // namespace perfetto
487