• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/importers/proto/proto_trace_tokenizer.h"
18 
19 #include <string>
20 
21 #include "perfetto/base/build_config.h"
22 #include "perfetto/base/logging.h"
23 #include "perfetto/ext/base/optional.h"
24 #include "perfetto/ext/base/string_view.h"
25 #include "perfetto/ext/base/utils.h"
26 #include "perfetto/protozero/proto_decoder.h"
27 #include "perfetto/protozero/proto_utils.h"
28 #include "perfetto/trace_processor/status.h"
29 #include "src/trace_processor/importers/common/clock_tracker.h"
30 #include "src/trace_processor/importers/common/event_tracker.h"
31 #include "src/trace_processor/importers/common/track_tracker.h"
32 #include "src/trace_processor/importers/ftrace/ftrace_module.h"
33 #include "src/trace_processor/importers/gzip/gzip_utils.h"
34 #include "src/trace_processor/importers/proto/packet_sequence_state.h"
35 #include "src/trace_processor/importers/proto/proto_incremental_state.h"
36 #include "src/trace_processor/storage/stats.h"
37 #include "src/trace_processor/storage/trace_storage.h"
38 #include "src/trace_processor/trace_sorter.h"
39 
40 #include "protos/perfetto/common/builtin_clock.pbzero.h"
41 #include "protos/perfetto/config/trace_config.pbzero.h"
42 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
43 #include "protos/perfetto/trace/profiling/profile_common.pbzero.h"
44 #include "protos/perfetto/trace/trace.pbzero.h"
45 #include "protos/perfetto/trace/trace_packet.pbzero.h"
46 
47 namespace perfetto {
48 namespace trace_processor {
49 
50 using protozero::proto_utils::MakeTagLengthDelimited;
51 using protozero::proto_utils::ParseVarInt;
52 
53 namespace {
54 
// Length-delimited field tag built from Trace::kPacketFieldNumber. The
// tokenizer compares the first byte of each serialized packet against this
// value, both when resuming from the partial buffer and when walking
// decompressed packets.
constexpr uint8_t kTracePacketTag =
    MakeTagLengthDelimited(protos::pbzero::Trace::kPacketFieldNumber);
57 
Decompress(GzipDecompressor * decompressor,TraceBlobView input)58 TraceBlobView Decompress(GzipDecompressor* decompressor, TraceBlobView input) {
59   PERFETTO_DCHECK(gzip::IsGzipSupported());
60 
61   uint8_t out[4096];
62 
63   std::vector<uint8_t> data;
64   data.reserve(input.length());
65 
66   // Ensure that the decompressor is able to cope with a new stream of data.
67   decompressor->Reset();
68   decompressor->SetInput(input.data(), input.length());
69 
70   using ResultCode = GzipDecompressor::ResultCode;
71   for (auto ret = ResultCode::kOk; ret != ResultCode::kEof;) {
72     auto res = decompressor->Decompress(out, base::ArraySize(out));
73     ret = res.ret;
74     if (ret == ResultCode::kError || ret == ResultCode::kNoProgress ||
75         ret == ResultCode::kNeedsMoreInput)
76       return TraceBlobView(nullptr, 0, 0);
77 
78     data.insert(data.end(), out, out + res.bytes_written);
79   }
80 
81   std::unique_ptr<uint8_t[]> output(new uint8_t[data.size()]);
82   memcpy(output.get(), data.data(), data.size());
83   return TraceBlobView(std::move(output), 0, data.size());
84 }
85 
86 }  // namespace
87 
// Stores |ctx| in |context_|; no other work is done at construction time.
ProtoTraceTokenizer::ProtoTraceTokenizer(TraceProcessorContext* ctx)
    : context_(ctx) {}
// Defaulted destructor, defined out of line in this translation unit.
ProtoTraceTokenizer::~ProtoTraceTokenizer() = default;
91 
Parse(std::unique_ptr<uint8_t[]> owned_buf,size_t size)92 util::Status ProtoTraceTokenizer::Parse(std::unique_ptr<uint8_t[]> owned_buf,
93                                         size_t size) {
94   uint8_t* data = &owned_buf[0];
95   if (!partial_buf_.empty()) {
96     // It takes ~5 bytes for a proto preamble + the varint size.
97     const size_t kHeaderBytes = 5;
98     if (PERFETTO_UNLIKELY(partial_buf_.size() < kHeaderBytes)) {
99       size_t missing_len = std::min(kHeaderBytes - partial_buf_.size(), size);
100       partial_buf_.insert(partial_buf_.end(), &data[0], &data[missing_len]);
101       if (partial_buf_.size() < kHeaderBytes)
102         return util::OkStatus();
103       data += missing_len;
104       size -= missing_len;
105     }
106 
107     // At this point we have enough data in |partial_buf_| to read at least the
108     // field header and know the size of the next TracePacket.
109     const uint8_t* pos = &partial_buf_[0];
110     uint8_t proto_field_tag = *pos;
111     uint64_t field_size = 0;
112     const uint8_t* next = ParseVarInt(++pos, &*partial_buf_.end(), &field_size);
113     bool parse_failed = next == pos;
114     pos = next;
115     if (proto_field_tag != kTracePacketTag || field_size == 0 || parse_failed) {
116       return util::ErrStatus(
117           "Failed parsing a TracePacket from the partial buffer");
118     }
119 
120     // At this point we know how big the TracePacket is.
121     size_t hdr_size = static_cast<size_t>(pos - &partial_buf_[0]);
122     size_t size_incl_header = static_cast<size_t>(field_size + hdr_size);
123     PERFETTO_DCHECK(size_incl_header > partial_buf_.size());
124 
125     // There is a good chance that between the |partial_buf_| and the new |data|
126     // of the current call we have enough bytes to parse a TracePacket.
127     if (partial_buf_.size() + size >= size_incl_header) {
128       // Create a new buffer for the whole TracePacket and copy into that:
129       // 1) The beginning of the TracePacket (including the proto header) from
130       //    the partial buffer.
131       // 2) The rest of the TracePacket from the current |data| buffer (note
132       //    that we might have consumed already a few bytes form |data| earlier
133       //    in this function, hence we need to keep |off| into account).
134       std::unique_ptr<uint8_t[]> buf(new uint8_t[size_incl_header]);
135       memcpy(&buf[0], partial_buf_.data(), partial_buf_.size());
136       // |size_missing| is the number of bytes for the rest of the TracePacket
137       // in |data|.
138       size_t size_missing = size_incl_header - partial_buf_.size();
139       memcpy(&buf[partial_buf_.size()], &data[0], size_missing);
140       data += size_missing;
141       size -= size_missing;
142       partial_buf_.clear();
143       uint8_t* buf_start = &buf[0];  // Note that buf is std::moved below.
144       util::Status status =
145           ParseInternal(std::move(buf), buf_start, size_incl_header);
146       if (PERFETTO_UNLIKELY(!status.ok()))
147         return status;
148     } else {
149       partial_buf_.insert(partial_buf_.end(), data, &data[size]);
150       return util::OkStatus();
151     }
152   }
153   return ParseInternal(std::move(owned_buf), data, size);
154 }
155 
ParseInternal(std::unique_ptr<uint8_t[]> owned_buf,uint8_t * data,size_t size)156 util::Status ProtoTraceTokenizer::ParseInternal(
157     std::unique_ptr<uint8_t[]> owned_buf,
158     uint8_t* data,
159     size_t size) {
160   PERFETTO_DCHECK(data >= &owned_buf[0]);
161   const uint8_t* start = &owned_buf[0];
162   const size_t data_off = static_cast<size_t>(data - start);
163   TraceBlobView whole_buf(std::move(owned_buf), data_off, size);
164 
165   protos::pbzero::Trace::Decoder decoder(data, size);
166   for (auto it = decoder.packet(); it; ++it) {
167     protozero::ConstBytes packet = *it;
168     size_t field_offset = whole_buf.offset_of(packet.data);
169     util::Status status =
170         ParsePacket(whole_buf.slice(field_offset, packet.size));
171     if (PERFETTO_UNLIKELY(!status.ok()))
172       return status;
173   }
174 
175   const size_t bytes_left = decoder.bytes_left();
176   if (bytes_left > 0) {
177     PERFETTO_DCHECK(partial_buf_.empty());
178     partial_buf_.insert(partial_buf_.end(), &data[decoder.read_offset()],
179                         &data[decoder.read_offset() + bytes_left]);
180   }
181   return util::OkStatus();
182 }
183 
ParsePacket(TraceBlobView packet)184 util::Status ProtoTraceTokenizer::ParsePacket(TraceBlobView packet) {
185   protos::pbzero::TracePacket::Decoder decoder(packet.data(), packet.length());
186   if (PERFETTO_UNLIKELY(decoder.bytes_left()))
187     return util::ErrStatus(
188         "Failed to parse proto packet fully; the trace is probably corrupt.");
189 
190   const uint32_t seq_id = decoder.trusted_packet_sequence_id();
191   auto* state = GetIncrementalStateForPacketSequence(seq_id);
192 
193   uint32_t sequence_flags = decoder.sequence_flags();
194 
195   if (decoder.incremental_state_cleared() ||
196       sequence_flags &
197           protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED) {
198     HandleIncrementalStateCleared(decoder);
199   } else if (decoder.previous_packet_dropped()) {
200     HandlePreviousPacketDropped(decoder);
201   }
202 
203   // It is important that we parse defaults before parsing other fields such as
204   // the timestamp, since the defaults could affect them.
205   if (decoder.has_trace_packet_defaults()) {
206     auto field = decoder.trace_packet_defaults();
207     const size_t offset = packet.offset_of(field.data);
208     ParseTracePacketDefaults(decoder, packet.slice(offset, field.size));
209   }
210 
211   if (decoder.has_interned_data()) {
212     auto field = decoder.interned_data();
213     const size_t offset = packet.offset_of(field.data);
214     ParseInternedData(decoder, packet.slice(offset, field.size));
215   }
216 
217   if (decoder.has_clock_snapshot()) {
218     return ParseClockSnapshot(decoder.clock_snapshot(),
219                               decoder.trusted_packet_sequence_id());
220   }
221 
222   if (decoder.sequence_flags() &
223       protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE) {
224     if (!seq_id) {
225       return util::ErrStatus(
226           "TracePacket specified SEQ_NEEDS_INCREMENTAL_STATE but the "
227           "TraceWriter's sequence_id is zero (the service is "
228           "probably too old)");
229     }
230 
231     if (!state->IsIncrementalStateValid()) {
232       context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
233       return util::OkStatus();
234     }
235   }
236 
237   protos::pbzero::TracePacketDefaults::Decoder* defaults =
238       state->current_generation()->GetTracePacketDefaults();
239 
240   int64_t timestamp;
241   if (decoder.has_timestamp()) {
242     timestamp = static_cast<int64_t>(decoder.timestamp());
243 
244     uint32_t timestamp_clock_id =
245         decoder.has_timestamp_clock_id()
246             ? decoder.timestamp_clock_id()
247             : (defaults ? defaults->timestamp_clock_id() : 0);
248 
249     if ((decoder.has_chrome_events() || decoder.has_chrome_metadata()) &&
250         (!timestamp_clock_id ||
251          timestamp_clock_id == protos::pbzero::BUILTIN_CLOCK_MONOTONIC)) {
252       // Chrome event timestamps are in MONOTONIC domain, but may occur in
253       // traces where (a) no clock snapshots exist or (b) no clock_id is
254       // specified for their timestamps. Adjust to trace time if we have a clock
255       // snapshot.
256       // TODO(eseckler): Set timestamp_clock_id and emit ClockSnapshots in
257       // chrome and then remove this.
258       auto trace_ts = context_->clock_tracker->ToTraceTime(
259           protos::pbzero::BUILTIN_CLOCK_MONOTONIC, timestamp);
260       if (trace_ts.has_value())
261         timestamp = trace_ts.value();
262     } else if (timestamp_clock_id) {
263       // If the TracePacket specifies a non-zero clock-id, translate the
264       // timestamp into the trace-time clock domain.
265       ClockTracker::ClockId converted_clock_id = timestamp_clock_id;
266       bool is_seq_scoped =
267           ClockTracker::IsReservedSeqScopedClockId(converted_clock_id);
268       if (is_seq_scoped) {
269         if (!seq_id) {
270           return util::ErrStatus(
271               "TracePacket specified a sequence-local clock id (%" PRIu32
272               ") but the TraceWriter's sequence_id is zero (the service is "
273               "probably too old)",
274               timestamp_clock_id);
275         }
276         converted_clock_id =
277             ClockTracker::SeqScopedClockIdToGlobal(seq_id, timestamp_clock_id);
278       }
279       auto trace_ts =
280           context_->clock_tracker->ToTraceTime(converted_clock_id, timestamp);
281       if (!trace_ts.has_value()) {
282         // ToTraceTime() will increase the |clock_sync_failure| stat on failure.
283         static const char seq_extra_err[] =
284             " Because the clock id is sequence-scoped, the ClockSnapshot must "
285             "be emitted on the same TraceWriter sequence of the packet that "
286             "refers to that clock id.";
287         return util::ErrStatus(
288             "Failed to convert TracePacket's timestamp from clock_id=%" PRIu32
289             " seq_id=%" PRIu32
290             ". This is usually due to the lack of a prior ClockSnapshot "
291             "proto.%s",
292             timestamp_clock_id, seq_id, is_seq_scoped ? seq_extra_err : "");
293       }
294       timestamp = trace_ts.value();
295     }
296   } else {
297     timestamp = std::max(latest_timestamp_, context_->sorter->max_timestamp());
298   }
299   latest_timestamp_ = std::max(timestamp, latest_timestamp_);
300 
301   auto& modules = context_->modules_by_field;
302   for (uint32_t field_id = 1; field_id < modules.size(); ++field_id) {
303     if (modules[field_id] && decoder.Get(field_id).valid()) {
304       ModuleResult res = modules[field_id]->TokenizePacket(
305           decoder, &packet, timestamp, state, field_id);
306       if (!res.ignored())
307         return res.ToStatus();
308     }
309   }
310 
311   if (decoder.has_compressed_packets()) {
312     if (!gzip::IsGzipSupported())
313       return util::Status("Cannot decode compressed packets. Zlib not enabled");
314 
315     protozero::ConstBytes field = decoder.compressed_packets();
316     const size_t field_off = packet.offset_of(field.data);
317     TraceBlobView compressed_packets = packet.slice(field_off, field.size);
318     TraceBlobView packets =
319         Decompress(&decompressor_, std::move(compressed_packets));
320 
321     const uint8_t* start = packets.data();
322     const uint8_t* end = packets.data() + packets.length();
323     const uint8_t* ptr = start;
324     while ((end - ptr) > 2) {
325       const uint8_t* packet_start = ptr;
326       if (PERFETTO_UNLIKELY(*ptr != kTracePacketTag))
327         return util::ErrStatus("Expected TracePacket tag");
328       uint64_t packet_size = 0;
329       ptr = ParseVarInt(++ptr, end, &packet_size);
330       size_t packet_offset = static_cast<size_t>(ptr - start);
331       ptr += packet_size;
332       if (PERFETTO_UNLIKELY((ptr - packet_start) < 2 || ptr > end))
333         return util::ErrStatus("Invalid packet size");
334       util::Status status = ParsePacket(
335           packets.slice(packet_offset, static_cast<size_t>(packet_size)));
336       if (PERFETTO_UNLIKELY(!status.ok()))
337         return status;
338     }
339     return util::OkStatus();
340   }
341 
342   // If we're not forcing a full sort and this is a write_into_file trace, then
343   // use flush_period_ms as an indiciator for how big the sliding window for the
344   // sorter should be.
345   if (!context_->config.force_full_sort && decoder.has_trace_config()) {
346     auto config = decoder.trace_config();
347     protos::pbzero::TraceConfig::Decoder trace_config(config.data, config.size);
348 
349     if (trace_config.write_into_file()) {
350       int64_t window_size_ns;
351       if (trace_config.has_flush_period_ms() &&
352           trace_config.flush_period_ms() > 0) {
353         // We use 2x the flush period as a margin of error to allow for any
354         // late flush responses to still be sorted correctly.
355         window_size_ns = static_cast<int64_t>(trace_config.flush_period_ms()) *
356                          2 * 1000 * 1000;
357       } else {
358         constexpr uint64_t kDefaultWindowNs =
359             180 * 1000 * 1000 * 1000ULL;  // 3 minutes.
360         PERFETTO_ELOG(
361             "It is strongly recommended to have flush_period_ms set when "
362             "write_into_file is turned on. You will likely have many dropped "
363             "events because of inability to sort the events correctly.");
364         window_size_ns = static_cast<int64_t>(kDefaultWindowNs);
365       }
366       context_->sorter->SetWindowSizeNs(window_size_ns);
367     }
368   }
369 
370   // Use parent data and length because we want to parse this again
371   // later to get the exact type of the packet.
372   context_->sorter->PushTracePacket(timestamp, state, std::move(packet));
373 
374   return util::OkStatus();
375 }
376 
HandleIncrementalStateCleared(const protos::pbzero::TracePacket::Decoder & packet_decoder)377 void ProtoTraceTokenizer::HandleIncrementalStateCleared(
378     const protos::pbzero::TracePacket::Decoder& packet_decoder) {
379   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
380     PERFETTO_ELOG(
381         "incremental_state_cleared without trusted_packet_sequence_id");
382     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
383     return;
384   }
385   GetIncrementalStateForPacketSequence(
386       packet_decoder.trusted_packet_sequence_id())
387       ->OnIncrementalStateCleared();
388   context_->track_tracker->OnIncrementalStateCleared(
389       packet_decoder.trusted_packet_sequence_id());
390 }
391 
HandlePreviousPacketDropped(const protos::pbzero::TracePacket::Decoder & packet_decoder)392 void ProtoTraceTokenizer::HandlePreviousPacketDropped(
393     const protos::pbzero::TracePacket::Decoder& packet_decoder) {
394   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
395     PERFETTO_ELOG("previous_packet_dropped without trusted_packet_sequence_id");
396     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
397     return;
398   }
399   GetIncrementalStateForPacketSequence(
400       packet_decoder.trusted_packet_sequence_id())
401       ->OnPacketLoss();
402 }
403 
ParseTracePacketDefaults(const protos::pbzero::TracePacket_Decoder & packet_decoder,TraceBlobView trace_packet_defaults)404 void ProtoTraceTokenizer::ParseTracePacketDefaults(
405     const protos::pbzero::TracePacket_Decoder& packet_decoder,
406     TraceBlobView trace_packet_defaults) {
407   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
408     PERFETTO_ELOG(
409         "TracePacketDefaults packet without trusted_packet_sequence_id");
410     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
411     return;
412   }
413 
414   auto* state = GetIncrementalStateForPacketSequence(
415       packet_decoder.trusted_packet_sequence_id());
416   state->UpdateTracePacketDefaults(std::move(trace_packet_defaults));
417 }
418 
ParseInternedData(const protos::pbzero::TracePacket::Decoder & packet_decoder,TraceBlobView interned_data)419 void ProtoTraceTokenizer::ParseInternedData(
420     const protos::pbzero::TracePacket::Decoder& packet_decoder,
421     TraceBlobView interned_data) {
422   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
423     PERFETTO_ELOG("InternedData packet without trusted_packet_sequence_id");
424     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
425     return;
426   }
427 
428   auto* state = GetIncrementalStateForPacketSequence(
429       packet_decoder.trusted_packet_sequence_id());
430 
431   // Don't parse interned data entries until incremental state is valid, because
432   // they could otherwise be associated with the wrong generation in the state.
433   if (!state->IsIncrementalStateValid()) {
434     context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
435     return;
436   }
437 
438   // Store references to interned data submessages into the sequence's state.
439   protozero::ProtoDecoder decoder(interned_data.data(), interned_data.length());
440   for (protozero::Field f = decoder.ReadField(); f.valid();
441        f = decoder.ReadField()) {
442     auto bytes = f.as_bytes();
443     auto offset = interned_data.offset_of(bytes.data);
444     state->InternMessage(f.id(), interned_data.slice(offset, bytes.size));
445   }
446 }
447 
ParseClockSnapshot(ConstBytes blob,uint32_t seq_id)448 util::Status ProtoTraceTokenizer::ParseClockSnapshot(ConstBytes blob,
449                                                      uint32_t seq_id) {
450   std::vector<ClockTracker::ClockValue> clocks;
451   protos::pbzero::ClockSnapshot::Decoder evt(blob.data, blob.size);
452   if (evt.primary_trace_clock()) {
453     context_->clock_tracker->SetTraceTimeClock(
454         static_cast<ClockTracker::ClockId>(evt.primary_trace_clock()));
455   }
456   for (auto it = evt.clocks(); it; ++it) {
457     protos::pbzero::ClockSnapshot::Clock::Decoder clk(*it);
458     ClockTracker::ClockId clock_id = clk.clock_id();
459     if (ClockTracker::IsReservedSeqScopedClockId(clk.clock_id())) {
460       if (!seq_id) {
461         return util::ErrStatus(
462             "ClockSnapshot packet is specifying a sequence-scoped clock id "
463             "(%" PRIu64 ") but the TracePacket sequence_id is zero",
464             clock_id);
465       }
466       clock_id = ClockTracker::SeqScopedClockIdToGlobal(seq_id, clk.clock_id());
467     }
468     int64_t unit_multiplier_ns =
469         clk.unit_multiplier_ns()
470             ? static_cast<int64_t>(clk.unit_multiplier_ns())
471             : 1;
472     clocks.emplace_back(clock_id, clk.timestamp(), unit_multiplier_ns,
473                         clk.is_incremental());
474   }
475   context_->clock_tracker->AddSnapshot(clocks);
476   return util::OkStatus();
477 }
478 
// Intentionally empty: no work is performed at end of file. In particular,
// any bytes still held in |partial_buf_| are neither parsed nor reported here.
void ProtoTraceTokenizer::NotifyEndOfFile() {}
480 
481 }  // namespace trace_processor
482 }  // namespace perfetto
483