• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/core/trace_writer_impl.h"
18 
19 #include <string.h>
20 
21 #include <algorithm>
22 #include <type_traits>
23 #include <utility>
24 
25 #include "perfetto/base/logging.h"
26 #include "perfetto/ext/base/thread_annotations.h"
27 #include "perfetto/protozero/proto_utils.h"
28 #include "src/tracing/core/shared_memory_arbiter_impl.h"
29 
30 #include "protos/perfetto/trace/trace_packet.pbzero.h"
31 
32 using protozero::proto_utils::kMessageLengthFieldSize;
33 using protozero::proto_utils::WriteRedundantVarInt;
34 using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader;
35 
36 namespace perfetto {
37 
38 namespace {
39 constexpr size_t kPacketHeaderSize = SharedMemoryABI::kPacketHeaderSize;
40 uint8_t g_garbage_chunk[1024];
41 }  // namespace
42 
TraceWriterImpl(SharedMemoryArbiterImpl * shmem_arbiter,WriterID id,MaybeUnboundBufferID target_buffer,BufferExhaustedPolicy buffer_exhausted_policy)43 TraceWriterImpl::TraceWriterImpl(SharedMemoryArbiterImpl* shmem_arbiter,
44                                  WriterID id,
45                                  MaybeUnboundBufferID target_buffer,
46                                  BufferExhaustedPolicy buffer_exhausted_policy)
47     : shmem_arbiter_(shmem_arbiter),
48       id_(id),
49       target_buffer_(target_buffer),
50       buffer_exhausted_policy_(buffer_exhausted_policy),
51       protobuf_stream_writer_(this),
52       process_id_(base::GetProcessId()) {
53   // TODO(primiano): we could handle the case of running out of TraceWriterID(s)
54   // more gracefully and always return a no-op TracePacket in NewTracePacket().
55   PERFETTO_CHECK(id_ != 0);
56 
57   cur_packet_.reset(new protos::pbzero::TracePacket());
58   cur_packet_->Finalize();  // To avoid the DCHECK in NewTracePacket().
59 }
60 
~TraceWriterImpl()61 TraceWriterImpl::~TraceWriterImpl() {
62   if (cur_chunk_.is_valid()) {
63     cur_packet_->Finalize();
64     Flush();
65   }
66   shmem_arbiter_->ReleaseWriterID(id_);
67 }
68 
Flush(std::function<void ()> callback)69 void TraceWriterImpl::Flush(std::function<void()> callback) {
70   // Flush() cannot be called in the middle of a TracePacket.
71   PERFETTO_CHECK(cur_packet_->is_finalized());
72 
73   if (cur_chunk_.is_valid()) {
74     shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
75                                          &patch_list_);
76   } else {
77     PERFETTO_DCHECK(patch_list_.empty());
78   }
79   // Always issue the Flush request, even if there is nothing to flush, just
80   // for the sake of getting the callback posted back.
81   shmem_arbiter_->FlushPendingCommitDataRequests(callback);
82   protobuf_stream_writer_.Reset({nullptr, nullptr});
83 
84   // |last_packet_size_field_| might have pointed into the chunk we returned.
85   last_packet_size_field_ = nullptr;
86 }
87 
NewTracePacket()88 TraceWriterImpl::TracePacketHandle TraceWriterImpl::NewTracePacket() {
89   // If we hit this, the caller is calling NewTracePacket() without having
90   // finalized the previous packet.
91   PERFETTO_CHECK(cur_packet_->is_finalized());
92   // If we hit this, this trace writer was created in a different process. This
93   // likely means that the process forked while tracing was active, and the
94   // forked child process tried to emit a trace event. This is not supported, as
95   // it would lead to two processes writing to the same tracing SMB.
96   PERFETTO_DCHECK(process_id_ == base::GetProcessId());
97 
98   fragmenting_packet_ = false;
99 
100   // Reserve space for the size of the message. Note: this call might re-enter
101   // into this class invoking GetNewBuffer() if there isn't enough space or if
102   // this is the very first call to NewTracePacket().
103   static_assert(kPacketHeaderSize == kMessageLengthFieldSize,
104                 "The packet header must match the Message header size");
105 
106   bool was_dropping_packets = drop_packets_;
107 
108   // It doesn't make sense to begin a packet that is going to fragment
109   // immediately after (8 is just an arbitrary estimation on the minimum size of
110   // a realistic packet).
111   bool chunk_too_full =
112       protobuf_stream_writer_.bytes_available() < kPacketHeaderSize + 8;
113   if (chunk_too_full || reached_max_packets_per_chunk_ ||
114       retry_new_chunk_after_packet_) {
115     protobuf_stream_writer_.Reset(GetNewBuffer());
116   }
117 
118   // Send any completed patches to the service to facilitate trace data
119   // recovery by the service. This should only happen when we're completing
120   // the first packet in a chunk which was a continuation from the previous
121   // chunk, i.e. at most once per chunk.
122   if (!patch_list_.empty() && patch_list_.front().is_patched()) {
123     shmem_arbiter_->SendPatches(id_, target_buffer_, &patch_list_);
124   }
125 
126   cur_packet_->Reset(&protobuf_stream_writer_);
127   uint8_t* header = protobuf_stream_writer_.ReserveBytes(kPacketHeaderSize);
128   memset(header, 0, kPacketHeaderSize);
129   cur_packet_->set_size_field(header);
130   last_packet_size_field_ = header;
131 
132   TracePacketHandle handle(cur_packet_.get());
133   cur_fragment_start_ = protobuf_stream_writer_.write_ptr();
134   fragmenting_packet_ = true;
135 
136   if (PERFETTO_LIKELY(!drop_packets_)) {
137     uint16_t new_packet_count = cur_chunk_.IncrementPacketCount();
138     reached_max_packets_per_chunk_ =
139         new_packet_count == ChunkHeader::Packets::kMaxCount;
140 
141     if (PERFETTO_UNLIKELY(was_dropping_packets)) {
142       // We've succeeded to get a new chunk from the SMB after we entered
143       // drop_packets_ mode. Record a marker into the new packet to indicate the
144       // data loss.
145       cur_packet_->set_previous_packet_dropped(true);
146     }
147   }
148 
149   return handle;
150 }
151 
152 // Called by the Message. We can get here in two cases:
153 // 1. In the middle of writing a Message,
154 // when |fragmenting_packet_| == true. In this case we want to update the
155 // chunk header with a partial packet and start a new partial packet in the
156 // new chunk.
157 // 2. While calling ReserveBytes() for the packet header in NewTracePacket().
158 // In this case |fragmenting_packet_| == false and we just want a new chunk
159 // without creating any fragments.
GetNewBuffer()160 protozero::ContiguousMemoryRange TraceWriterImpl::GetNewBuffer() {
161   if (fragmenting_packet_ && drop_packets_) {
162     // We can't write the remaining data of the fragmenting packet to a new
163     // chunk, because we have already lost some of its data in the garbage
164     // chunk. Thus, we will wrap around in the garbage chunk, wait until the
165     // current packet was completed, and then attempt to get a new chunk from
166     // the SMB again. Instead, if |drop_packets_| is true and
167     // |fragmenting_packet_| is false, we try to acquire a valid chunk because
168     // the SMB exhaustion might be resolved.
169     retry_new_chunk_after_packet_ = true;
170     return protozero::ContiguousMemoryRange{
171         &g_garbage_chunk[0], &g_garbage_chunk[0] + sizeof(g_garbage_chunk)};
172   }
173 
174   // Attempt to grab the next chunk before finalizing the current one, so that
175   // we know whether we need to start dropping packets before writing the
176   // current packet fragment's header.
177   ChunkHeader::Packets packets = {};
178   if (fragmenting_packet_) {
179     packets.count = 1;
180     packets.flags = ChunkHeader::kFirstPacketContinuesFromPrevChunk;
181   }
182 
183   // The memory order of the stores below doesn't really matter. This |header|
184   // is just a local temporary object. The GetNewChunk() call below will copy it
185   // into the shared buffer with the proper barriers.
186   ChunkHeader header = {};
187   header.writer_id.store(id_, std::memory_order_relaxed);
188   header.chunk_id.store(next_chunk_id_, std::memory_order_relaxed);
189   header.packets.store(packets, std::memory_order_relaxed);
190 
191   SharedMemoryABI::Chunk new_chunk =
192       shmem_arbiter_->GetNewChunk(header, buffer_exhausted_policy_);
193   if (!new_chunk.is_valid()) {
194     // Shared memory buffer exhausted, switch into |drop_packets_| mode. We'll
195     // drop data until the garbage chunk has been filled once and then retry.
196 
197     // If we started a packet in one of the previous (valid) chunks, we need to
198     // tell the service to discard it.
199     if (fragmenting_packet_) {
200       // We can only end up here if the previous chunk was a valid chunk,
201       // because we never try to acquire a new chunk in |drop_packets_| mode
202       // while fragmenting.
203       PERFETTO_DCHECK(!drop_packets_);
204 
205       // Backfill the last fragment's header with an invalid size (too large),
206       // so that the service's TraceBuffer throws out the incomplete packet.
207       // It'll restart reading from the next chunk we submit.
208       WriteRedundantVarInt(SharedMemoryABI::kPacketSizeDropPacket,
209                            cur_packet_->size_field());
210 
211       // Reset the size field, since we should not write the current packet's
212       // size anymore after this.
213       cur_packet_->set_size_field(nullptr);
214 
215       // We don't set kLastPacketContinuesOnNextChunk or kChunkNeedsPatching on
216       // the last chunk, because its last fragment will be discarded anyway.
217       // However, the current packet fragment points to a valid |cur_chunk_| and
218       // may have non-finalized nested messages which will continue in the
219       // garbage chunk and currently still point into |cur_chunk_|. As we are
220       // about to return |cur_chunk_|, we need to invalidate the size fields of
221       // those nested messages. Normally we move them in the |patch_list_| (see
222       // below) but in this case, it doesn't make sense to send patches for a
223       // fragment that will be discarded for sure. Thus, we clean up any size
224       // field references into |cur_chunk_|.
225       for (auto* nested_msg = cur_packet_->nested_message(); nested_msg;
226            nested_msg = nested_msg->nested_message()) {
227         uint8_t* const cur_hdr = nested_msg->size_field();
228 
229         // If this is false the protozero Message has already been instructed to
230         // write, upon Finalize(), its size into the patch list.
231         bool size_field_points_within_chunk =
232             cur_hdr >= cur_chunk_.payload_begin() &&
233             cur_hdr + kMessageLengthFieldSize <= cur_chunk_.end();
234 
235         if (size_field_points_within_chunk)
236           nested_msg->set_size_field(nullptr);
237       }
238     } else if (!drop_packets_ && last_packet_size_field_) {
239       // If we weren't dropping packets before, we should indicate to the
240       // service that we're about to lose data. We do this by invalidating the
241       // size of the last packet in |cur_chunk_|. The service will record
242       // statistics about packets with kPacketSizeDropPacket size.
243       PERFETTO_DCHECK(cur_packet_->is_finalized());
244       PERFETTO_DCHECK(cur_chunk_.is_valid());
245 
246       // |last_packet_size_field_| should point within |cur_chunk_|'s payload.
247       PERFETTO_DCHECK(last_packet_size_field_ >= cur_chunk_.payload_begin() &&
248                       last_packet_size_field_ + kMessageLengthFieldSize <=
249                           cur_chunk_.end());
250 
251       WriteRedundantVarInt(SharedMemoryABI::kPacketSizeDropPacket,
252                            last_packet_size_field_);
253     }
254 
255     if (cur_chunk_.is_valid()) {
256       shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_),
257                                            target_buffer_, &patch_list_);
258     }
259 
260     drop_packets_ = true;
261     cur_chunk_ = SharedMemoryABI::Chunk();  // Reset to an invalid chunk.
262     reached_max_packets_per_chunk_ = false;
263     retry_new_chunk_after_packet_ = false;
264     last_packet_size_field_ = nullptr;
265 
266     PERFETTO_ANNOTATE_BENIGN_RACE_SIZED(&g_garbage_chunk,
267                                         sizeof(g_garbage_chunk),
268                                         "nobody reads the garbage chunk")
269     return protozero::ContiguousMemoryRange{
270         &g_garbage_chunk[0], &g_garbage_chunk[0] + sizeof(g_garbage_chunk)};
271   }  // if (!new_chunk.is_valid())
272 
273   PERFETTO_DCHECK(new_chunk.is_valid());
274 
275   if (fragmenting_packet_) {
276     // We should not be fragmenting a packet after we exited drop_packets_ mode,
277     // because we only retry to get a new chunk when a fresh packet is started.
278     PERFETTO_DCHECK(!drop_packets_);
279 
280     uint8_t* const wptr = protobuf_stream_writer_.write_ptr();
281     PERFETTO_DCHECK(wptr >= cur_fragment_start_);
282     uint32_t partial_size = static_cast<uint32_t>(wptr - cur_fragment_start_);
283     PERFETTO_DCHECK(partial_size < cur_chunk_.size());
284 
285     // Backfill the packet header with the fragment size.
286     PERFETTO_DCHECK(partial_size > 0);
287     cur_packet_->inc_size_already_written(partial_size);
288     cur_chunk_.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);
289     WriteRedundantVarInt(partial_size, cur_packet_->size_field());
290 
291     // Descend in the stack of non-finalized nested submessages (if any) and
292     // detour their |size_field| into the |patch_list_|. At this point we have
293     // to release the chunk and they cannot write anymore into that.
294     // TODO(primiano): add tests to cover this logic.
295     bool chunk_needs_patching = false;
296     for (auto* nested_msg = cur_packet_->nested_message(); nested_msg;
297          nested_msg = nested_msg->nested_message()) {
298       uint8_t* const cur_hdr = nested_msg->size_field();
299 
300       // If this is false the protozero Message has already been instructed to
301       // write, upon Finalize(), its size into the patch list.
302       bool size_field_points_within_chunk =
303           cur_hdr >= cur_chunk_.payload_begin() &&
304           cur_hdr + kMessageLengthFieldSize <= cur_chunk_.end();
305 
306       if (size_field_points_within_chunk) {
307         auto offset =
308             static_cast<uint16_t>(cur_hdr - cur_chunk_.payload_begin());
309         const ChunkID cur_chunk_id =
310             cur_chunk_.header()->chunk_id.load(std::memory_order_relaxed);
311         Patch* patch = patch_list_.emplace_back(cur_chunk_id, offset);
312         nested_msg->set_size_field(&patch->size_field[0]);
313         chunk_needs_patching = true;
314       } else {
315 #if PERFETTO_DCHECK_IS_ON()
316         // Ensure that the size field of the message points to an element of the
317         // patch list.
318         auto patch_it = std::find_if(
319             patch_list_.begin(), patch_list_.end(),
320             [cur_hdr](const Patch& p) { return &p.size_field[0] == cur_hdr; });
321         PERFETTO_DCHECK(patch_it != patch_list_.end());
322 #endif
323       }
324     }  // for(nested_msg
325 
326     if (chunk_needs_patching)
327       cur_chunk_.SetFlag(ChunkHeader::kChunkNeedsPatching);
328   }  // if(fragmenting_packet)
329 
330   if (cur_chunk_.is_valid()) {
331     // ReturnCompletedChunk will consume the first patched entries from
332     // |patch_list_| and shrink it.
333     shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
334                                          &patch_list_);
335   }
336 
337   // Switch to the new chunk.
338   drop_packets_ = false;
339   reached_max_packets_per_chunk_ = false;
340   retry_new_chunk_after_packet_ = false;
341   next_chunk_id_++;
342   cur_chunk_ = std::move(new_chunk);
343   last_packet_size_field_ = nullptr;
344 
345   uint8_t* payload_begin = cur_chunk_.payload_begin();
346   if (fragmenting_packet_) {
347     cur_packet_->set_size_field(payload_begin);
348     last_packet_size_field_ = payload_begin;
349     memset(payload_begin, 0, kPacketHeaderSize);
350     payload_begin += kPacketHeaderSize;
351     cur_fragment_start_ = payload_begin;
352   }
353 
354   return protozero::ContiguousMemoryRange{payload_begin, cur_chunk_.end()};
355 }
356 
writer_id() const357 WriterID TraceWriterImpl::writer_id() const {
358   return id_;
359 }
360 
361 // Base class definitions.
362 TraceWriter::TraceWriter() = default;
363 TraceWriter::~TraceWriter() = default;
364 
365 }  // namespace perfetto
366