/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/tracing/core/trace_writer_impl.h"

#include <string.h>

#include <algorithm>
#include <type_traits>
#include <utility>

#include "perfetto/base/logging.h"
#include "perfetto/ext/base/thread_annotations.h"
#include "perfetto/protozero/message.h"
#include "perfetto/protozero/proto_utils.h"
#include "perfetto/protozero/root_message.h"
#include "perfetto/protozero/static_buffer.h"
#include "src/tracing/core/shared_memory_arbiter_impl.h"

#include "protos/perfetto/trace/trace_packet.pbzero.h"

using protozero::proto_utils::kMessageLengthFieldSize;
using protozero::proto_utils::WriteRedundantVarInt;
using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader;

namespace perfetto {

namespace {
constexpr size_t kPacketHeaderSize = SharedMemoryABI::kPacketHeaderSize;
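// Scratch buffer handed out by GetNewBuffer() while in |drop_packets_| mode,
// i.e. when the shared memory buffer is exhausted. Data written here is
// deliberately discarded and never read back.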
uint8_t g_garbage_chunk[1024];
}  // namespace
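
// Usage sketch (illustrative only, not part of this file): a TraceWriterImpl
// is normally obtained from the SharedMemoryArbiter rather than constructed
// directly. Assuming an already-bound arbiter and a hypothetical
// |target_buffer_id|:
//
//   std::unique_ptr<TraceWriter> writer =
//       arbiter->CreateTraceWriter(target_buffer_id);
//   {
//     auto packet = writer->NewTracePacket();
//     packet->set_timestamp(42);
//   }  // Destroying the handle finalizes the packet.
//   writer->Flush();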
TraceWriterImpl::TraceWriterImpl(SharedMemoryArbiterImpl* shmem_arbiter,
                                 WriterID id,
                                 MaybeUnboundBufferID target_buffer,
                                 BufferExhaustedPolicy buffer_exhausted_policy)
    : shmem_arbiter_(shmem_arbiter),
      id_(id),
      target_buffer_(target_buffer),
      buffer_exhausted_policy_(buffer_exhausted_policy),
      protobuf_stream_writer_(this),
      process_id_(base::GetProcessId()) {
  // TODO(primiano): we could handle the case of running out of TraceWriterID(s)
  // more gracefully and always return a no-op TracePacket in NewTracePacket().
  PERFETTO_CHECK(id_ != 0);

  cur_packet_.reset(new protozero::RootMessage<protos::pbzero::TracePacket>());
  cur_packet_->Finalize();  // To avoid the CHECK in NewTracePacket().
}

TraceWriterImpl::~TraceWriterImpl() {
  if (cur_chunk_.is_valid()) {
    cur_packet_->Finalize();
    Flush();
  }
  // This call may cause the shared memory arbiter (and the underlying memory)
  // to get asynchronously deleted if this was the last trace writer targeting
  // the arbiter and the arbiter was marked for shutdown.
  shmem_arbiter_->ReleaseWriterID(id_);
}

void TraceWriterImpl::Flush(std::function<void()> callback) {
  // Flush() cannot be called in the middle of a TracePacket.
  PERFETTO_CHECK(cur_packet_->is_finalized());

  if (cur_chunk_.is_valid()) {
    shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
                                         &patch_list_);
  } else {
    // When in stall mode, all patches should have been returned with the last
    // chunk, since the last packet was completed. In drop_packets_ mode, this
    // may not be the case because the packet may have been fragmenting when
    // SMB exhaustion occurred and |cur_chunk_| became invalid. In this case,
    // drop_packets_ should be true.
    PERFETTO_DCHECK(patch_list_.empty() || drop_packets_);
  }

  // Always issue the Flush request, even if there is nothing to flush, just
  // for the sake of getting the callback posted back.
  shmem_arbiter_->FlushPendingCommitDataRequests(callback);
  protobuf_stream_writer_.Reset({nullptr, nullptr});

  // |last_packet_size_field_| might have pointed into the chunk we returned.
  last_packet_size_field_ = nullptr;
}

TraceWriterImpl::TracePacketHandle TraceWriterImpl::NewTracePacket() {
  // If we hit this, the caller is calling NewTracePacket() without having
  // finalized the previous packet.
  PERFETTO_CHECK(cur_packet_->is_finalized());
  // If we hit this, this trace writer was created in a different process. This
  // likely means that the process forked while tracing was active, and the
  // forked child process tried to emit a trace event. This is not supported, as
  // it would lead to two processes writing to the same tracing SMB.
  PERFETTO_DCHECK(process_id_ == base::GetProcessId());

  fragmenting_packet_ = false;

  // Reserve space for the size of the message. Note: this call might re-enter
  // into this class invoking GetNewBuffer() if there isn't enough space or if
  // this is the very first call to NewTracePacket().
  static_assert(kPacketHeaderSize == kMessageLengthFieldSize,
                "The packet header must match the Message header size");

  bool was_dropping_packets = drop_packets_;

  // It doesn't make sense to begin a packet that is going to fragment
  // immediately afterwards (8 is just an arbitrary estimate of the minimum
  // size of a realistic packet).
  bool chunk_too_full =
      protobuf_stream_writer_.bytes_available() < kPacketHeaderSize + 8;
  if (chunk_too_full || reached_max_packets_per_chunk_ ||
      retry_new_chunk_after_packet_) {
    protobuf_stream_writer_.Reset(GetNewBuffer());
  }

  // Send any completed patches to the service to facilitate trace data
  // recovery by the service. This should only happen when we're completing
  // the first packet in a chunk which was a continuation from the previous
  // chunk, i.e. at most once per chunk.
  if (!patch_list_.empty() && patch_list_.front().is_patched()) {
    shmem_arbiter_->SendPatches(id_, target_buffer_, &patch_list_);
  }

  cur_packet_->Reset(&protobuf_stream_writer_);
  uint8_t* header = protobuf_stream_writer_.ReserveBytes(kPacketHeaderSize);
  memset(header, 0, kPacketHeaderSize);
  cur_packet_->set_size_field(header);
  last_packet_size_field_ = header;

  TracePacketHandle handle(cur_packet_.get());
  cur_fragment_start_ = protobuf_stream_writer_.write_ptr();
  fragmenting_packet_ = true;

  if (PERFETTO_LIKELY(!drop_packets_)) {
    uint16_t new_packet_count = cur_chunk_.IncrementPacketCount();
    reached_max_packets_per_chunk_ =
        new_packet_count == ChunkHeader::Packets::kMaxCount;

    if (PERFETTO_UNLIKELY(was_dropping_packets)) {
      // We've succeeded in getting a new chunk from the SMB after we entered
      // drop_packets_ mode. Record a marker into the new packet to indicate
      // the data loss.
      cur_packet_->set_previous_packet_dropped(true);
    }
  }

  if (PERFETTO_UNLIKELY(first_packet_on_sequence_)) {
    cur_packet_->set_first_packet_on_sequence(true);
    first_packet_on_sequence_ = false;
  }

  return handle;
}

// Called by the Message. We can get here in two cases:
// 1. In the middle of writing a Message, when |fragmenting_packet_| == true.
//    In this case we want to update the chunk header with a partial packet
//    and start a new partial packet in the new chunk.
// 2. While calling ReserveBytes() for the packet header in NewTracePacket().
//    In this case |fragmenting_packet_| == false and we just want a new chunk
//    without creating any fragments.
protozero::ContiguousMemoryRange TraceWriterImpl::GetNewBuffer() {
  if (fragmenting_packet_ && drop_packets_) {
    // We can't write the remaining data of the fragmenting packet to a new
    // chunk, because we have already lost some of its data in the garbage
    // chunk. Thus, we will wrap around in the garbage chunk, wait until the
    // current packet is completed, and then attempt to get a new chunk from
    // the SMB again. By contrast, if |drop_packets_| is true but
    // |fragmenting_packet_| is false, we try to acquire a valid chunk because
    // the SMB exhaustion might have been resolved.
    retry_new_chunk_after_packet_ = true;
    return protozero::ContiguousMemoryRange{
        &g_garbage_chunk[0], &g_garbage_chunk[0] + sizeof(g_garbage_chunk)};
  }

  // Attempt to grab the next chunk before finalizing the current one, so that
  // we know whether we need to start dropping packets before writing the
  // current packet fragment's header.
  ChunkHeader::Packets packets = {};
  if (fragmenting_packet_) {
    packets.count = 1;
    packets.flags = ChunkHeader::kFirstPacketContinuesFromPrevChunk;
  }

  // The memory order of the stores below doesn't really matter. This |header|
  // is just a local temporary object. The GetNewChunk() call below will copy it
  // into the shared buffer with the proper barriers.
  ChunkHeader header = {};
  header.writer_id.store(id_, std::memory_order_relaxed);
  header.chunk_id.store(next_chunk_id_, std::memory_order_relaxed);
  header.packets.store(packets, std::memory_order_relaxed);

  SharedMemoryABI::Chunk new_chunk =
      shmem_arbiter_->GetNewChunk(header, buffer_exhausted_policy_);
  if (!new_chunk.is_valid()) {
    // Shared memory buffer exhausted, switch into |drop_packets_| mode. We'll
    // drop data until the garbage chunk has been filled once and then retry.

    // If we started a packet in one of the previous (valid) chunks, we need to
    // tell the service to discard it.
    if (fragmenting_packet_) {
      // We can only end up here if the previous chunk was a valid chunk,
      // because we never try to acquire a new chunk in |drop_packets_| mode
      // while fragmenting.
      PERFETTO_DCHECK(!drop_packets_);

      // Backfill the last fragment's header with an invalid size (too large),
      // so that the service's TraceBuffer throws out the incomplete packet.
      // It'll restart reading from the next chunk we submit.
      WriteRedundantVarInt(SharedMemoryABI::kPacketSizeDropPacket,
                           cur_packet_->size_field());

      // Reset the size field, since we should not write the current packet's
      // size anymore after this.
      cur_packet_->set_size_field(nullptr);

      // We don't set kLastPacketContinuesOnNextChunk or kChunkNeedsPatching on
      // the last chunk, because its last fragment will be discarded anyway.
      // However, the current packet fragment points to a valid |cur_chunk_| and
      // may have non-finalized nested messages which will continue in the
      // garbage chunk and currently still point into |cur_chunk_|. As we are
      // about to return |cur_chunk_|, we need to invalidate the size fields of
      // those nested messages. Normally we move them in the |patch_list_| (see
      // below) but in this case, it doesn't make sense to send patches for a
      // fragment that will be discarded for sure. Thus, we clean up any size
      // field references into |cur_chunk_|.
      for (auto* nested_msg = cur_packet_->nested_message(); nested_msg;
           nested_msg = nested_msg->nested_message()) {
        uint8_t* const cur_hdr = nested_msg->size_field();

        // If this is false the protozero Message has already been instructed to
        // write, upon Finalize(), its size into the patch list.
        bool size_field_points_within_chunk =
            cur_hdr >= cur_chunk_.payload_begin() &&
            cur_hdr + kMessageLengthFieldSize <= cur_chunk_.end();

        if (size_field_points_within_chunk)
          nested_msg->set_size_field(nullptr);
      }
    } else if (!drop_packets_ && last_packet_size_field_) {
      // If we weren't dropping packets before, we should indicate to the
      // service that we're about to lose data. We do this by invalidating the
      // size of the last packet in |cur_chunk_|. The service will record
      // statistics about packets with kPacketSizeDropPacket size.
      PERFETTO_DCHECK(cur_packet_->is_finalized());
      PERFETTO_DCHECK(cur_chunk_.is_valid());

      // |last_packet_size_field_| should point within |cur_chunk_|'s payload.
      PERFETTO_DCHECK(last_packet_size_field_ >= cur_chunk_.payload_begin() &&
                      last_packet_size_field_ + kMessageLengthFieldSize <=
                          cur_chunk_.end());

      WriteRedundantVarInt(SharedMemoryABI::kPacketSizeDropPacket,
                           last_packet_size_field_);
    }

    if (cur_chunk_.is_valid()) {
      shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_),
                                           target_buffer_, &patch_list_);
    }

    drop_packets_ = true;
    cur_chunk_ = SharedMemoryABI::Chunk();  // Reset to an invalid chunk.
    reached_max_packets_per_chunk_ = false;
    retry_new_chunk_after_packet_ = false;
    last_packet_size_field_ = nullptr;

    PERFETTO_ANNOTATE_BENIGN_RACE_SIZED(&g_garbage_chunk,
                                        sizeof(g_garbage_chunk),
                                        "nobody reads the garbage chunk")
    return protozero::ContiguousMemoryRange{
        &g_garbage_chunk[0], &g_garbage_chunk[0] + sizeof(g_garbage_chunk)};
  }  // if (!new_chunk.is_valid())

  PERFETTO_DCHECK(new_chunk.is_valid());

  if (fragmenting_packet_) {
    // We should not be fragmenting a packet after we exited drop_packets_ mode,
    // because we only retry to get a new chunk when a fresh packet is started.
    PERFETTO_DCHECK(!drop_packets_);

    uint8_t* const wptr = protobuf_stream_writer_.write_ptr();
    PERFETTO_DCHECK(wptr >= cur_fragment_start_);
    uint32_t partial_size = static_cast<uint32_t>(wptr - cur_fragment_start_);
    PERFETTO_DCHECK(partial_size < cur_chunk_.size());

    // Backfill the packet header with the fragment size.
    PERFETTO_DCHECK(partial_size > 0);
    cur_packet_->inc_size_already_written(partial_size);
    cur_chunk_.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);
    WriteRedundantVarInt(partial_size, cur_packet_->size_field());

    // Descend into the stack of non-finalized nested submessages (if any) and
    // detour their |size_field| into the |patch_list_|. We are about to
    // release the chunk, so they can no longer write their sizes into it.
    for (auto* nested_msg = cur_packet_->nested_message(); nested_msg;
         nested_msg = nested_msg->nested_message()) {
      uint8_t* cur_hdr = nested_msg->size_field();

      // If this is false the protozero Message has already been instructed to
      // write, upon Finalize(), its size into the patch list.
      bool size_field_points_within_chunk =
          cur_hdr >= cur_chunk_.payload_begin() &&
          cur_hdr + kMessageLengthFieldSize <= cur_chunk_.end();

      if (size_field_points_within_chunk) {
        cur_hdr = TraceWriterImpl::AnnotatePatch(cur_hdr);
        nested_msg->set_size_field(cur_hdr);
      } else {
#if PERFETTO_DCHECK_IS_ON()
        // Ensure that the size field of the message points to an element of the
        // patch list.
        auto patch_it = std::find_if(
            patch_list_.begin(), patch_list_.end(),
            [cur_hdr](const Patch& p) { return &p.size_field[0] == cur_hdr; });
        PERFETTO_DCHECK(patch_it != patch_list_.end());
#endif
      }
    }  // for(nested_msg)
  }    // if(fragmenting_packet)

  if (cur_chunk_.is_valid()) {
    // ReturnCompletedChunk will consume the first patched entries from
    // |patch_list_| and shrink it.
    shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
                                         &patch_list_);
  }

  // Switch to the new chunk.
  drop_packets_ = false;
  reached_max_packets_per_chunk_ = false;
  retry_new_chunk_after_packet_ = false;
  next_chunk_id_++;
  cur_chunk_ = std::move(new_chunk);
  last_packet_size_field_ = nullptr;

  uint8_t* payload_begin = cur_chunk_.payload_begin();
  if (fragmenting_packet_) {
    cur_packet_->set_size_field(payload_begin);
    last_packet_size_field_ = payload_begin;
    memset(payload_begin, 0, kPacketHeaderSize);
    payload_begin += kPacketHeaderSize;
    cur_fragment_start_ = payload_begin;
  }

  return protozero::ContiguousMemoryRange{payload_begin, cur_chunk_.end()};
}
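
// Backfills the size field of the current packet fragment and marks
// |cur_packet_| as finalized, so the next NewTracePacket() call passes its
// is_finalized() check. This matters when the caller writes packet contents
// through the raw stream writer, in which case |cur_packet_| itself does not
// track the bytes written.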
void TraceWriterImpl::FinishTracePacket() {
  // If we hit this, this trace writer was created in a different process. This
  // likely means that the process forked while tracing was active, and the
  // forked child process tried to emit a trace event. This is not supported, as
  // it would lead to two processes writing to the same tracing SMB.
  PERFETTO_DCHECK(process_id_ == base::GetProcessId());

  // If the caller uses TakeStreamWriter(), cur_packet_->size() is not up to
  // date; only the stream writer knows the exact size.
  // cur_packet_->size_field() is still used to store the start of the fragment.
  if (cur_packet_->size_field()) {
    uint8_t* const wptr = protobuf_stream_writer_.write_ptr();
    PERFETTO_DCHECK(wptr >= cur_fragment_start_);
    uint32_t partial_size = static_cast<uint32_t>(wptr - cur_fragment_start_);

    WriteRedundantVarInt(partial_size, last_packet_size_field_);
  }

  cur_packet_->Reset(&protobuf_stream_writer_);
  cur_packet_->Finalize();  // To avoid the CHECK in NewTracePacket().

  // Send any completed patches to the service to facilitate trace data
  // recovery by the service. This should only happen when we're completing
  // the first packet in a chunk which was a continuation from the previous
  // chunk, i.e. at most once per chunk.
  if (!patch_list_.empty() && patch_list_.front().is_patched()) {
    shmem_arbiter_->SendPatches(id_, target_buffer_, &patch_list_);
  }
}
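
// Detours the in-chunk size field at |to_patch| into the |patch_list_|:
// records a Patch entry keyed by the current chunk's ID and the field's
// offset within the chunk payload, flags the chunk as kChunkNeedsPatching,
// and returns a pointer to the patch entry's storage (or nullptr if there is
// no valid chunk). The caller redirects the nested message's size field to
// this storage so the size can be applied via a patch later.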
uint8_t* TraceWriterImpl::AnnotatePatch(uint8_t* to_patch) {
  if (!cur_chunk_.is_valid()) {
    return nullptr;
  }
  auto offset = static_cast<uint16_t>(to_patch - cur_chunk_.payload_begin());
  const ChunkID cur_chunk_id =
      cur_chunk_.header()->chunk_id.load(std::memory_order_relaxed);
  static_assert(kPatchSize == sizeof(Patch::PatchContent),
                "Patch size mismatch");
  Patch* patch = patch_list_.emplace_back(cur_chunk_id, offset);
  // Check that the flag is not already set before setting it. This is not
  // necessary, but it makes the code faster.
  if (!(cur_chunk_.GetPacketCountAndFlags().second &
        ChunkHeader::kChunkNeedsPatching)) {
    cur_chunk_.SetFlag(ChunkHeader::kChunkNeedsPatching);
  }
  return &patch->size_field[0];
}

WriterID TraceWriterImpl::writer_id() const {
  return id_;
}

// Base class definitions.
TraceWriter::TraceWriter() = default;
TraceWriter::~TraceWriter() = default;

}  // namespace perfetto