• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/importers/proto/proto_trace_reader.h"
18 
19 #include <optional>
20 #include <string>
21 
22 #include "perfetto/base/build_config.h"
23 #include "perfetto/base/logging.h"
24 #include "perfetto/ext/base/string_view.h"
25 #include "perfetto/ext/base/utils.h"
26 #include "perfetto/protozero/proto_decoder.h"
27 #include "perfetto/protozero/proto_utils.h"
28 #include "perfetto/trace_processor/status.h"
29 #include "src/trace_processor/importers/common/clock_tracker.h"
30 #include "src/trace_processor/importers/common/event_tracker.h"
31 #include "src/trace_processor/importers/common/metadata_tracker.h"
32 #include "src/trace_processor/importers/common/track_tracker.h"
33 #include "src/trace_processor/importers/ftrace/ftrace_module.h"
34 #include "src/trace_processor/importers/proto/packet_analyzer.h"
35 #include "src/trace_processor/importers/proto/packet_sequence_state.h"
36 #include "src/trace_processor/importers/proto/proto_incremental_state.h"
37 #include "src/trace_processor/sorter/trace_sorter.h"
38 #include "src/trace_processor/storage/stats.h"
39 #include "src/trace_processor/storage/trace_storage.h"
40 #include "src/trace_processor/util/descriptors.h"
41 #include "src/trace_processor/util/gzip_utils.h"
42 
43 #include "protos/perfetto/common/builtin_clock.pbzero.h"
44 #include "protos/perfetto/config/trace_config.pbzero.h"
45 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
46 #include "protos/perfetto/trace/extension_descriptor.pbzero.h"
47 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
48 #include "protos/perfetto/trace/profiling/profile_common.pbzero.h"
49 #include "protos/perfetto/trace/trace.pbzero.h"
50 #include "protos/perfetto/trace/trace_packet.pbzero.h"
51 
52 namespace perfetto {
53 namespace trace_processor {
54 
ProtoTraceReader(TraceProcessorContext * ctx)55 ProtoTraceReader::ProtoTraceReader(TraceProcessorContext* ctx)
56     : context_(ctx),
57       skipped_packet_key_id_(ctx->storage->InternString("skipped_packet")),
58       invalid_incremental_state_key_id_(
59           ctx->storage->InternString("invalid_incremental_state")) {}
60 ProtoTraceReader::~ProtoTraceReader() = default;
61 
Parse(TraceBlobView blob)62 util::Status ProtoTraceReader::Parse(TraceBlobView blob) {
63   return tokenizer_.Tokenize(std::move(blob), [this](TraceBlobView packet) {
64     return ParsePacket(std::move(packet));
65   });
66 }
67 
ParseExtensionDescriptor(ConstBytes descriptor)68 util::Status ProtoTraceReader::ParseExtensionDescriptor(ConstBytes descriptor) {
69   protos::pbzero::ExtensionDescriptor::Decoder decoder(descriptor.data,
70                                                        descriptor.size);
71 
72   auto extension = decoder.extension_set();
73   return context_->descriptor_pool_->AddFromFileDescriptorSet(
74       extension.data, extension.size,
75       /*skip_prefixes*/ {},
76       /*merge_existing_messages=*/true);
77 }
78 
ParsePacket(TraceBlobView packet)79 util::Status ProtoTraceReader::ParsePacket(TraceBlobView packet) {
80   protos::pbzero::TracePacket::Decoder decoder(packet.data(), packet.length());
81   if (PERFETTO_UNLIKELY(decoder.bytes_left())) {
82     return util::ErrStatus(
83         "Failed to parse proto packet fully; the trace is probably corrupt.");
84   }
85 
86   // Any compressed packets should have been handled by the tokenizer.
87   PERFETTO_CHECK(!decoder.has_compressed_packets());
88 
89   const uint32_t seq_id = decoder.trusted_packet_sequence_id();
90   auto* state = GetIncrementalStateForPacketSequence(seq_id);
91 
92   if (decoder.first_packet_on_sequence()) {
93     HandleFirstPacketOnSequence(seq_id);
94   }
95 
96   uint32_t sequence_flags = decoder.sequence_flags();
97 
98   if (decoder.incremental_state_cleared() ||
99       sequence_flags &
100           protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED) {
101     HandleIncrementalStateCleared(decoder);
102   } else if (decoder.previous_packet_dropped()) {
103     HandlePreviousPacketDropped(decoder);
104   }
105 
106   // It is important that we parse defaults before parsing other fields such as
107   // the timestamp, since the defaults could affect them.
108   if (decoder.has_trace_packet_defaults()) {
109     auto field = decoder.trace_packet_defaults();
110     ParseTracePacketDefaults(decoder, packet.slice(field.data, field.size));
111   }
112 
113   if (decoder.has_interned_data()) {
114     auto field = decoder.interned_data();
115     ParseInternedData(decoder, packet.slice(field.data, field.size));
116   }
117 
118   if (decoder.has_clock_snapshot()) {
119     return ParseClockSnapshot(decoder.clock_snapshot(),
120                               decoder.trusted_packet_sequence_id());
121   }
122 
123   if (decoder.has_service_event()) {
124     PERFETTO_DCHECK(decoder.has_timestamp());
125     int64_t ts = static_cast<int64_t>(decoder.timestamp());
126     return ParseServiceEvent(ts, decoder.service_event());
127   }
128 
129   if (decoder.has_extension_descriptor()) {
130     return ParseExtensionDescriptor(decoder.extension_descriptor());
131   }
132 
133   if (decoder.sequence_flags() &
134       protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE) {
135     if (!seq_id) {
136       return util::ErrStatus(
137           "TracePacket specified SEQ_NEEDS_INCREMENTAL_STATE but the "
138           "TraceWriter's sequence_id is zero (the service is "
139           "probably too old)");
140     }
141 
142     if (!state->IsIncrementalStateValid()) {
143       if (context_->content_analyzer) {
144         // Account for the skipped packet for trace proto content analysis,
145         // with a special annotation.
146         PacketAnalyzer::SampleAnnotation annotation;
147         annotation.push_back(
148             {skipped_packet_key_id_, invalid_incremental_state_key_id_});
149         PacketAnalyzer::Get(context_)->ProcessPacket(packet, annotation);
150       }
151       context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
152       return util::OkStatus();
153     }
154   }
155 
156   protos::pbzero::TracePacketDefaults::Decoder* defaults =
157       state->current_generation()->GetTracePacketDefaults();
158 
159   int64_t timestamp;
160   if (decoder.has_timestamp()) {
161     timestamp = static_cast<int64_t>(decoder.timestamp());
162 
163     uint32_t timestamp_clock_id =
164         decoder.has_timestamp_clock_id()
165             ? decoder.timestamp_clock_id()
166             : (defaults ? defaults->timestamp_clock_id() : 0);
167 
168     if ((decoder.has_chrome_events() || decoder.has_chrome_metadata()) &&
169         (!timestamp_clock_id ||
170          timestamp_clock_id == protos::pbzero::BUILTIN_CLOCK_MONOTONIC)) {
171       // Chrome event timestamps are in MONOTONIC domain, but may occur in
172       // traces where (a) no clock snapshots exist or (b) no clock_id is
173       // specified for their timestamps. Adjust to trace time if we have a clock
174       // snapshot.
175       // TODO(eseckler): Set timestamp_clock_id and emit ClockSnapshots in
176       // chrome and then remove this.
177       auto trace_ts = context_->clock_tracker->ToTraceTime(
178           protos::pbzero::BUILTIN_CLOCK_MONOTONIC, timestamp);
179       if (trace_ts.ok())
180         timestamp = trace_ts.value();
181     } else if (timestamp_clock_id) {
182       // If the TracePacket specifies a non-zero clock-id, translate the
183       // timestamp into the trace-time clock domain.
184       ClockTracker::ClockId converted_clock_id = timestamp_clock_id;
185       if (ClockTracker::IsSequenceClock(converted_clock_id)) {
186         if (!seq_id) {
187           return util::ErrStatus(
188               "TracePacket specified a sequence-local clock id (%" PRIu32
189               ") but the TraceWriter's sequence_id is zero (the service is "
190               "probably too old)",
191               timestamp_clock_id);
192         }
193         converted_clock_id =
194             ClockTracker::SeqenceToGlobalClock(seq_id, timestamp_clock_id);
195       }
196       auto trace_ts =
197           context_->clock_tracker->ToTraceTime(converted_clock_id, timestamp);
198       if (!trace_ts.ok()) {
199         // ToTraceTime() will increase the |clock_sync_failure| stat on failure.
200         // We don't return an error here as it will cause the trace to stop
201         // parsing. Instead, we rely on the stat increment in ToTraceTime() to
202         // inform the user about the error.
203         return util::OkStatus();
204       }
205       timestamp = trace_ts.value();
206     }
207   } else {
208     timestamp = std::max(latest_timestamp_, context_->sorter->max_timestamp());
209   }
210   latest_timestamp_ = std::max(timestamp, latest_timestamp_);
211 
212   if (context_->content_analyzer && !decoder.has_track_event()) {
213     PacketAnalyzer::Get(context_)->ProcessPacket(packet, {});
214   }
215 
216   auto& modules = context_->modules_by_field;
217   for (uint32_t field_id = 1; field_id < modules.size(); ++field_id) {
218     if (!modules[field_id].empty() && decoder.Get(field_id).valid()) {
219       for (ProtoImporterModule* global_module :
220            context_->modules_for_all_fields) {
221         ModuleResult res = global_module->TokenizePacket(
222             decoder, &packet, timestamp, state, field_id);
223         if (!res.ignored())
224           return res.ToStatus();
225       }
226       for (ProtoImporterModule* module : modules[field_id]) {
227         ModuleResult res = module->TokenizePacket(decoder, &packet, timestamp,
228                                                   state, field_id);
229         if (!res.ignored())
230           return res.ToStatus();
231       }
232     }
233   }
234 
235   if (decoder.has_trace_config()) {
236     ParseTraceConfig(decoder.trace_config());
237   }
238 
239   // Use parent data and length because we want to parse this again
240   // later to get the exact type of the packet.
241   context_->sorter->PushTracePacket(timestamp, state->current_generation(),
242                                     std::move(packet));
243 
244   return util::OkStatus();
245 }
246 
ParseTraceConfig(protozero::ConstBytes blob)247 void ProtoTraceReader::ParseTraceConfig(protozero::ConstBytes blob) {
248   protos::pbzero::TraceConfig::Decoder trace_config(blob);
249   if (trace_config.write_into_file() && !trace_config.flush_period_ms()) {
250     PERFETTO_ELOG(
251         "It is strongly recommended to have flush_period_ms set when "
252         "write_into_file is turned on. This trace will be loaded fully "
253         "into memory before sorting which increases the likelihood of "
254         "OOMs.");
255   }
256 }
257 
HandleIncrementalStateCleared(const protos::pbzero::TracePacket::Decoder & packet_decoder)258 void ProtoTraceReader::HandleIncrementalStateCleared(
259     const protos::pbzero::TracePacket::Decoder& packet_decoder) {
260   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
261     PERFETTO_ELOG(
262         "incremental_state_cleared without trusted_packet_sequence_id");
263     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
264     return;
265   }
266   GetIncrementalStateForPacketSequence(
267       packet_decoder.trusted_packet_sequence_id())
268       ->OnIncrementalStateCleared();
269   for (auto& module : context_->modules) {
270     module->OnIncrementalStateCleared(
271         packet_decoder.trusted_packet_sequence_id());
272   }
273 }
274 
HandleFirstPacketOnSequence(uint32_t packet_sequence_id)275 void ProtoTraceReader::HandleFirstPacketOnSequence(
276     uint32_t packet_sequence_id) {
277   for (auto& module : context_->modules) {
278     module->OnFirstPacketOnSequence(packet_sequence_id);
279   }
280 }
281 
HandlePreviousPacketDropped(const protos::pbzero::TracePacket::Decoder & packet_decoder)282 void ProtoTraceReader::HandlePreviousPacketDropped(
283     const protos::pbzero::TracePacket::Decoder& packet_decoder) {
284   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
285     PERFETTO_ELOG("previous_packet_dropped without trusted_packet_sequence_id");
286     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
287     return;
288   }
289   GetIncrementalStateForPacketSequence(
290       packet_decoder.trusted_packet_sequence_id())
291       ->OnPacketLoss();
292 }
293 
ParseTracePacketDefaults(const protos::pbzero::TracePacket_Decoder & packet_decoder,TraceBlobView trace_packet_defaults)294 void ProtoTraceReader::ParseTracePacketDefaults(
295     const protos::pbzero::TracePacket_Decoder& packet_decoder,
296     TraceBlobView trace_packet_defaults) {
297   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
298     PERFETTO_ELOG(
299         "TracePacketDefaults packet without trusted_packet_sequence_id");
300     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
301     return;
302   }
303 
304   auto* state = GetIncrementalStateForPacketSequence(
305       packet_decoder.trusted_packet_sequence_id());
306   state->UpdateTracePacketDefaults(std::move(trace_packet_defaults));
307 }
308 
ParseInternedData(const protos::pbzero::TracePacket::Decoder & packet_decoder,TraceBlobView interned_data)309 void ProtoTraceReader::ParseInternedData(
310     const protos::pbzero::TracePacket::Decoder& packet_decoder,
311     TraceBlobView interned_data) {
312   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
313     PERFETTO_ELOG("InternedData packet without trusted_packet_sequence_id");
314     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
315     return;
316   }
317 
318   auto* state = GetIncrementalStateForPacketSequence(
319       packet_decoder.trusted_packet_sequence_id());
320 
321   // Don't parse interned data entries until incremental state is valid, because
322   // they could otherwise be associated with the wrong generation in the state.
323   if (!state->IsIncrementalStateValid()) {
324     context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
325     return;
326   }
327 
328   // Store references to interned data submessages into the sequence's state.
329   protozero::ProtoDecoder decoder(interned_data.data(), interned_data.length());
330   for (protozero::Field f = decoder.ReadField(); f.valid();
331        f = decoder.ReadField()) {
332     auto bytes = f.as_bytes();
333     state->InternMessage(f.id(), interned_data.slice(bytes.data, bytes.size));
334   }
335 }
336 
ParseClockSnapshot(ConstBytes blob,uint32_t seq_id)337 util::Status ProtoTraceReader::ParseClockSnapshot(ConstBytes blob,
338                                                   uint32_t seq_id) {
339   std::vector<ClockTracker::ClockTimestamp> clock_timestamps;
340   protos::pbzero::ClockSnapshot::Decoder evt(blob.data, blob.size);
341   if (evt.primary_trace_clock()) {
342     context_->clock_tracker->SetTraceTimeClock(
343         static_cast<ClockTracker::ClockId>(evt.primary_trace_clock()));
344   }
345   for (auto it = evt.clocks(); it; ++it) {
346     protos::pbzero::ClockSnapshot::Clock::Decoder clk(*it);
347     ClockTracker::ClockId clock_id = clk.clock_id();
348     if (ClockTracker::IsSequenceClock(clk.clock_id())) {
349       if (!seq_id) {
350         return util::ErrStatus(
351             "ClockSnapshot packet is specifying a sequence-scoped clock id "
352             "(%" PRIu64 ") but the TracePacket sequence_id is zero",
353             clock_id);
354       }
355       clock_id = ClockTracker::SeqenceToGlobalClock(seq_id, clk.clock_id());
356     }
357     int64_t unit_multiplier_ns =
358         clk.unit_multiplier_ns()
359             ? static_cast<int64_t>(clk.unit_multiplier_ns())
360             : 1;
361     clock_timestamps.emplace_back(clock_id, clk.timestamp(), unit_multiplier_ns,
362                                   clk.is_incremental());
363   }
364 
365   base::StatusOr<uint32_t> snapshot_id =
366       context_->clock_tracker->AddSnapshot(clock_timestamps);
367   if (!snapshot_id.ok()) {
368     PERFETTO_ELOG("%s", snapshot_id.status().c_message());
369     return base::OkStatus();
370   }
371 
372   std::optional<int64_t> trace_time_from_snapshot =
373       context_->clock_tracker->ToTraceTimeFromSnapshot(clock_timestamps);
374 
375   // Add the all the clock snapshots to the clock snapshot table.
376   std::optional<int64_t> trace_ts_for_check;
377   for (const auto& clock_timestamp : clock_timestamps) {
378     // If the clock is incremental, we need to use 0 to map correctly to
379     // |absolute_timestamp|.
380     int64_t ts_to_convert =
381         clock_timestamp.clock.is_incremental ? 0 : clock_timestamp.timestamp;
382     // Even if we have trace time from snapshot, we still run ToTraceTime to
383     // optimise future conversions.
384     base::StatusOr<int64_t> opt_trace_ts = context_->clock_tracker->ToTraceTime(
385         clock_timestamp.clock.id, ts_to_convert);
386 
387     if (!opt_trace_ts.ok()) {
388       // This can happen if |AddSnapshot| failed to resolve this clock, e.g. if
389       // clock is not monotonic. Try to fetch trace time from snapshot.
390       if (!trace_time_from_snapshot) {
391         PERFETTO_DLOG("%s", opt_trace_ts.status().c_message());
392         continue;
393       }
394       opt_trace_ts = *trace_time_from_snapshot;
395     }
396 
397     // Double check that all the clocks in this snapshot resolve to the same
398     // trace timestamp value.
399     PERFETTO_DCHECK(!trace_ts_for_check ||
400                     opt_trace_ts.value() == trace_ts_for_check.value());
401     trace_ts_for_check = *opt_trace_ts;
402 
403     tables::ClockSnapshotTable::Row row;
404     row.ts = *opt_trace_ts;
405     row.clock_id = static_cast<int64_t>(clock_timestamp.clock.id);
406     row.clock_value = clock_timestamp.timestamp;
407     row.clock_name = GetBuiltinClockNameOrNull(clock_timestamp.clock.id);
408     row.snapshot_id = *snapshot_id;
409 
410     context_->storage->mutable_clock_snapshot_table()->Insert(row);
411   }
412   return util::OkStatus();
413 }
414 
GetBuiltinClockNameOrNull(int64_t clock_id)415 std::optional<StringId> ProtoTraceReader::GetBuiltinClockNameOrNull(
416     int64_t clock_id) {
417   switch (clock_id) {
418     case protos::pbzero::ClockSnapshot::Clock::REALTIME:
419       return context_->storage->InternString("REALTIME");
420     case protos::pbzero::ClockSnapshot::Clock::REALTIME_COARSE:
421       return context_->storage->InternString("REALTIME_COARSE");
422     case protos::pbzero::ClockSnapshot::Clock::MONOTONIC:
423       return context_->storage->InternString("MONOTONIC");
424     case protos::pbzero::ClockSnapshot::Clock::MONOTONIC_COARSE:
425       return context_->storage->InternString("MONOTONIC_COARSE");
426     case protos::pbzero::ClockSnapshot::Clock::MONOTONIC_RAW:
427       return context_->storage->InternString("MONOTONIC_RAW");
428     case protos::pbzero::ClockSnapshot::Clock::BOOTTIME:
429       return context_->storage->InternString("BOOTTIME");
430     default:
431       return std::nullopt;
432   }
433 }
434 
ParseServiceEvent(int64_t ts,ConstBytes blob)435 util::Status ProtoTraceReader::ParseServiceEvent(int64_t ts, ConstBytes blob) {
436   protos::pbzero::TracingServiceEvent::Decoder tse(blob);
437   if (tse.tracing_started()) {
438     context_->metadata_tracker->SetMetadata(metadata::tracing_started_ns,
439                                             Variadic::Integer(ts));
440   }
441   if (tse.tracing_disabled()) {
442     context_->metadata_tracker->SetMetadata(metadata::tracing_disabled_ns,
443                                             Variadic::Integer(ts));
444   }
445   if (tse.all_data_sources_started()) {
446     context_->metadata_tracker->SetMetadata(
447         metadata::all_data_source_started_ns, Variadic::Integer(ts));
448   }
449   if (tse.all_data_sources_flushed()) {
450     context_->sorter->NotifyFlushEvent();
451   }
452   if (tse.read_tracing_buffers_completed()) {
453     context_->sorter->NotifyReadBufferEvent();
454   }
455   return util::OkStatus();
456 }
457 
NotifyEndOfFile()458 void ProtoTraceReader::NotifyEndOfFile() {}
459 
460 }  // namespace trace_processor
461 }  // namespace perfetto
462