1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/trace_processor/importers/proto/proto_trace_reader.h"
18
19 #include <optional>
20 #include <string>
21
22 #include "perfetto/base/build_config.h"
23 #include "perfetto/base/logging.h"
24 #include "perfetto/ext/base/string_view.h"
25 #include "perfetto/ext/base/utils.h"
26 #include "perfetto/protozero/proto_decoder.h"
27 #include "perfetto/protozero/proto_utils.h"
28 #include "perfetto/trace_processor/status.h"
29 #include "src/trace_processor/importers/common/clock_tracker.h"
30 #include "src/trace_processor/importers/common/event_tracker.h"
31 #include "src/trace_processor/importers/common/metadata_tracker.h"
32 #include "src/trace_processor/importers/common/track_tracker.h"
33 #include "src/trace_processor/importers/ftrace/ftrace_module.h"
34 #include "src/trace_processor/importers/proto/packet_analyzer.h"
35 #include "src/trace_processor/importers/proto/packet_sequence_state.h"
36 #include "src/trace_processor/importers/proto/proto_incremental_state.h"
37 #include "src/trace_processor/sorter/trace_sorter.h"
38 #include "src/trace_processor/storage/stats.h"
39 #include "src/trace_processor/storage/trace_storage.h"
40 #include "src/trace_processor/util/descriptors.h"
41 #include "src/trace_processor/util/gzip_utils.h"
42
43 #include "protos/perfetto/common/builtin_clock.pbzero.h"
44 #include "protos/perfetto/config/trace_config.pbzero.h"
45 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
46 #include "protos/perfetto/trace/extension_descriptor.pbzero.h"
47 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
48 #include "protos/perfetto/trace/profiling/profile_common.pbzero.h"
49 #include "protos/perfetto/trace/trace.pbzero.h"
50 #include "protos/perfetto/trace/trace_packet.pbzero.h"
51
52 namespace perfetto {
53 namespace trace_processor {
54
ProtoTraceReader(TraceProcessorContext * ctx)55 ProtoTraceReader::ProtoTraceReader(TraceProcessorContext* ctx)
56 : context_(ctx),
57 skipped_packet_key_id_(ctx->storage->InternString("skipped_packet")),
58 invalid_incremental_state_key_id_(
59 ctx->storage->InternString("invalid_incremental_state")) {}
60 ProtoTraceReader::~ProtoTraceReader() = default;
61
Parse(TraceBlobView blob)62 util::Status ProtoTraceReader::Parse(TraceBlobView blob) {
63 return tokenizer_.Tokenize(std::move(blob), [this](TraceBlobView packet) {
64 return ParsePacket(std::move(packet));
65 });
66 }
67
ParseExtensionDescriptor(ConstBytes descriptor)68 util::Status ProtoTraceReader::ParseExtensionDescriptor(ConstBytes descriptor) {
69 protos::pbzero::ExtensionDescriptor::Decoder decoder(descriptor.data,
70 descriptor.size);
71
72 auto extension = decoder.extension_set();
73 return context_->descriptor_pool_->AddFromFileDescriptorSet(
74 extension.data, extension.size,
75 /*skip_prefixes*/ {},
76 /*merge_existing_messages=*/true);
77 }
78
ParsePacket(TraceBlobView packet)79 util::Status ProtoTraceReader::ParsePacket(TraceBlobView packet) {
80 protos::pbzero::TracePacket::Decoder decoder(packet.data(), packet.length());
81 if (PERFETTO_UNLIKELY(decoder.bytes_left())) {
82 return util::ErrStatus(
83 "Failed to parse proto packet fully; the trace is probably corrupt.");
84 }
85
86 // Any compressed packets should have been handled by the tokenizer.
87 PERFETTO_CHECK(!decoder.has_compressed_packets());
88
89 const uint32_t seq_id = decoder.trusted_packet_sequence_id();
90 auto* state = GetIncrementalStateForPacketSequence(seq_id);
91
92 if (decoder.first_packet_on_sequence()) {
93 HandleFirstPacketOnSequence(seq_id);
94 }
95
96 uint32_t sequence_flags = decoder.sequence_flags();
97
98 if (decoder.incremental_state_cleared() ||
99 sequence_flags &
100 protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED) {
101 HandleIncrementalStateCleared(decoder);
102 } else if (decoder.previous_packet_dropped()) {
103 HandlePreviousPacketDropped(decoder);
104 }
105
106 // It is important that we parse defaults before parsing other fields such as
107 // the timestamp, since the defaults could affect them.
108 if (decoder.has_trace_packet_defaults()) {
109 auto field = decoder.trace_packet_defaults();
110 ParseTracePacketDefaults(decoder, packet.slice(field.data, field.size));
111 }
112
113 if (decoder.has_interned_data()) {
114 auto field = decoder.interned_data();
115 ParseInternedData(decoder, packet.slice(field.data, field.size));
116 }
117
118 if (decoder.has_clock_snapshot()) {
119 return ParseClockSnapshot(decoder.clock_snapshot(),
120 decoder.trusted_packet_sequence_id());
121 }
122
123 if (decoder.has_service_event()) {
124 PERFETTO_DCHECK(decoder.has_timestamp());
125 int64_t ts = static_cast<int64_t>(decoder.timestamp());
126 return ParseServiceEvent(ts, decoder.service_event());
127 }
128
129 if (decoder.has_extension_descriptor()) {
130 return ParseExtensionDescriptor(decoder.extension_descriptor());
131 }
132
133 if (decoder.sequence_flags() &
134 protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE) {
135 if (!seq_id) {
136 return util::ErrStatus(
137 "TracePacket specified SEQ_NEEDS_INCREMENTAL_STATE but the "
138 "TraceWriter's sequence_id is zero (the service is "
139 "probably too old)");
140 }
141
142 if (!state->IsIncrementalStateValid()) {
143 if (context_->content_analyzer) {
144 // Account for the skipped packet for trace proto content analysis,
145 // with a special annotation.
146 PacketAnalyzer::SampleAnnotation annotation;
147 annotation.push_back(
148 {skipped_packet_key_id_, invalid_incremental_state_key_id_});
149 PacketAnalyzer::Get(context_)->ProcessPacket(packet, annotation);
150 }
151 context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
152 return util::OkStatus();
153 }
154 }
155
156 protos::pbzero::TracePacketDefaults::Decoder* defaults =
157 state->current_generation()->GetTracePacketDefaults();
158
159 int64_t timestamp;
160 if (decoder.has_timestamp()) {
161 timestamp = static_cast<int64_t>(decoder.timestamp());
162
163 uint32_t timestamp_clock_id =
164 decoder.has_timestamp_clock_id()
165 ? decoder.timestamp_clock_id()
166 : (defaults ? defaults->timestamp_clock_id() : 0);
167
168 if ((decoder.has_chrome_events() || decoder.has_chrome_metadata()) &&
169 (!timestamp_clock_id ||
170 timestamp_clock_id == protos::pbzero::BUILTIN_CLOCK_MONOTONIC)) {
171 // Chrome event timestamps are in MONOTONIC domain, but may occur in
172 // traces where (a) no clock snapshots exist or (b) no clock_id is
173 // specified for their timestamps. Adjust to trace time if we have a clock
174 // snapshot.
175 // TODO(eseckler): Set timestamp_clock_id and emit ClockSnapshots in
176 // chrome and then remove this.
177 auto trace_ts = context_->clock_tracker->ToTraceTime(
178 protos::pbzero::BUILTIN_CLOCK_MONOTONIC, timestamp);
179 if (trace_ts.ok())
180 timestamp = trace_ts.value();
181 } else if (timestamp_clock_id) {
182 // If the TracePacket specifies a non-zero clock-id, translate the
183 // timestamp into the trace-time clock domain.
184 ClockTracker::ClockId converted_clock_id = timestamp_clock_id;
185 if (ClockTracker::IsSequenceClock(converted_clock_id)) {
186 if (!seq_id) {
187 return util::ErrStatus(
188 "TracePacket specified a sequence-local clock id (%" PRIu32
189 ") but the TraceWriter's sequence_id is zero (the service is "
190 "probably too old)",
191 timestamp_clock_id);
192 }
193 converted_clock_id =
194 ClockTracker::SeqenceToGlobalClock(seq_id, timestamp_clock_id);
195 }
196 auto trace_ts =
197 context_->clock_tracker->ToTraceTime(converted_clock_id, timestamp);
198 if (!trace_ts.ok()) {
199 // ToTraceTime() will increase the |clock_sync_failure| stat on failure.
200 // We don't return an error here as it will cause the trace to stop
201 // parsing. Instead, we rely on the stat increment in ToTraceTime() to
202 // inform the user about the error.
203 return util::OkStatus();
204 }
205 timestamp = trace_ts.value();
206 }
207 } else {
208 timestamp = std::max(latest_timestamp_, context_->sorter->max_timestamp());
209 }
210 latest_timestamp_ = std::max(timestamp, latest_timestamp_);
211
212 if (context_->content_analyzer && !decoder.has_track_event()) {
213 PacketAnalyzer::Get(context_)->ProcessPacket(packet, {});
214 }
215
216 auto& modules = context_->modules_by_field;
217 for (uint32_t field_id = 1; field_id < modules.size(); ++field_id) {
218 if (!modules[field_id].empty() && decoder.Get(field_id).valid()) {
219 for (ProtoImporterModule* global_module :
220 context_->modules_for_all_fields) {
221 ModuleResult res = global_module->TokenizePacket(
222 decoder, &packet, timestamp, state, field_id);
223 if (!res.ignored())
224 return res.ToStatus();
225 }
226 for (ProtoImporterModule* module : modules[field_id]) {
227 ModuleResult res = module->TokenizePacket(decoder, &packet, timestamp,
228 state, field_id);
229 if (!res.ignored())
230 return res.ToStatus();
231 }
232 }
233 }
234
235 if (decoder.has_trace_config()) {
236 ParseTraceConfig(decoder.trace_config());
237 }
238
239 // Use parent data and length because we want to parse this again
240 // later to get the exact type of the packet.
241 context_->sorter->PushTracePacket(timestamp, state->current_generation(),
242 std::move(packet));
243
244 return util::OkStatus();
245 }
246
ParseTraceConfig(protozero::ConstBytes blob)247 void ProtoTraceReader::ParseTraceConfig(protozero::ConstBytes blob) {
248 protos::pbzero::TraceConfig::Decoder trace_config(blob);
249 if (trace_config.write_into_file() && !trace_config.flush_period_ms()) {
250 PERFETTO_ELOG(
251 "It is strongly recommended to have flush_period_ms set when "
252 "write_into_file is turned on. This trace will be loaded fully "
253 "into memory before sorting which increases the likelihood of "
254 "OOMs.");
255 }
256 }
257
HandleIncrementalStateCleared(const protos::pbzero::TracePacket::Decoder & packet_decoder)258 void ProtoTraceReader::HandleIncrementalStateCleared(
259 const protos::pbzero::TracePacket::Decoder& packet_decoder) {
260 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
261 PERFETTO_ELOG(
262 "incremental_state_cleared without trusted_packet_sequence_id");
263 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
264 return;
265 }
266 GetIncrementalStateForPacketSequence(
267 packet_decoder.trusted_packet_sequence_id())
268 ->OnIncrementalStateCleared();
269 for (auto& module : context_->modules) {
270 module->OnIncrementalStateCleared(
271 packet_decoder.trusted_packet_sequence_id());
272 }
273 }
274
HandleFirstPacketOnSequence(uint32_t packet_sequence_id)275 void ProtoTraceReader::HandleFirstPacketOnSequence(
276 uint32_t packet_sequence_id) {
277 for (auto& module : context_->modules) {
278 module->OnFirstPacketOnSequence(packet_sequence_id);
279 }
280 }
281
HandlePreviousPacketDropped(const protos::pbzero::TracePacket::Decoder & packet_decoder)282 void ProtoTraceReader::HandlePreviousPacketDropped(
283 const protos::pbzero::TracePacket::Decoder& packet_decoder) {
284 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
285 PERFETTO_ELOG("previous_packet_dropped without trusted_packet_sequence_id");
286 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
287 return;
288 }
289 GetIncrementalStateForPacketSequence(
290 packet_decoder.trusted_packet_sequence_id())
291 ->OnPacketLoss();
292 }
293
ParseTracePacketDefaults(const protos::pbzero::TracePacket_Decoder & packet_decoder,TraceBlobView trace_packet_defaults)294 void ProtoTraceReader::ParseTracePacketDefaults(
295 const protos::pbzero::TracePacket_Decoder& packet_decoder,
296 TraceBlobView trace_packet_defaults) {
297 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
298 PERFETTO_ELOG(
299 "TracePacketDefaults packet without trusted_packet_sequence_id");
300 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
301 return;
302 }
303
304 auto* state = GetIncrementalStateForPacketSequence(
305 packet_decoder.trusted_packet_sequence_id());
306 state->UpdateTracePacketDefaults(std::move(trace_packet_defaults));
307 }
308
ParseInternedData(const protos::pbzero::TracePacket::Decoder & packet_decoder,TraceBlobView interned_data)309 void ProtoTraceReader::ParseInternedData(
310 const protos::pbzero::TracePacket::Decoder& packet_decoder,
311 TraceBlobView interned_data) {
312 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
313 PERFETTO_ELOG("InternedData packet without trusted_packet_sequence_id");
314 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
315 return;
316 }
317
318 auto* state = GetIncrementalStateForPacketSequence(
319 packet_decoder.trusted_packet_sequence_id());
320
321 // Don't parse interned data entries until incremental state is valid, because
322 // they could otherwise be associated with the wrong generation in the state.
323 if (!state->IsIncrementalStateValid()) {
324 context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
325 return;
326 }
327
328 // Store references to interned data submessages into the sequence's state.
329 protozero::ProtoDecoder decoder(interned_data.data(), interned_data.length());
330 for (protozero::Field f = decoder.ReadField(); f.valid();
331 f = decoder.ReadField()) {
332 auto bytes = f.as_bytes();
333 state->InternMessage(f.id(), interned_data.slice(bytes.data, bytes.size));
334 }
335 }
336
ParseClockSnapshot(ConstBytes blob,uint32_t seq_id)337 util::Status ProtoTraceReader::ParseClockSnapshot(ConstBytes blob,
338 uint32_t seq_id) {
339 std::vector<ClockTracker::ClockTimestamp> clock_timestamps;
340 protos::pbzero::ClockSnapshot::Decoder evt(blob.data, blob.size);
341 if (evt.primary_trace_clock()) {
342 context_->clock_tracker->SetTraceTimeClock(
343 static_cast<ClockTracker::ClockId>(evt.primary_trace_clock()));
344 }
345 for (auto it = evt.clocks(); it; ++it) {
346 protos::pbzero::ClockSnapshot::Clock::Decoder clk(*it);
347 ClockTracker::ClockId clock_id = clk.clock_id();
348 if (ClockTracker::IsSequenceClock(clk.clock_id())) {
349 if (!seq_id) {
350 return util::ErrStatus(
351 "ClockSnapshot packet is specifying a sequence-scoped clock id "
352 "(%" PRIu64 ") but the TracePacket sequence_id is zero",
353 clock_id);
354 }
355 clock_id = ClockTracker::SeqenceToGlobalClock(seq_id, clk.clock_id());
356 }
357 int64_t unit_multiplier_ns =
358 clk.unit_multiplier_ns()
359 ? static_cast<int64_t>(clk.unit_multiplier_ns())
360 : 1;
361 clock_timestamps.emplace_back(clock_id, clk.timestamp(), unit_multiplier_ns,
362 clk.is_incremental());
363 }
364
365 base::StatusOr<uint32_t> snapshot_id =
366 context_->clock_tracker->AddSnapshot(clock_timestamps);
367 if (!snapshot_id.ok()) {
368 PERFETTO_ELOG("%s", snapshot_id.status().c_message());
369 return base::OkStatus();
370 }
371
372 std::optional<int64_t> trace_time_from_snapshot =
373 context_->clock_tracker->ToTraceTimeFromSnapshot(clock_timestamps);
374
375 // Add the all the clock snapshots to the clock snapshot table.
376 std::optional<int64_t> trace_ts_for_check;
377 for (const auto& clock_timestamp : clock_timestamps) {
378 // If the clock is incremental, we need to use 0 to map correctly to
379 // |absolute_timestamp|.
380 int64_t ts_to_convert =
381 clock_timestamp.clock.is_incremental ? 0 : clock_timestamp.timestamp;
382 // Even if we have trace time from snapshot, we still run ToTraceTime to
383 // optimise future conversions.
384 base::StatusOr<int64_t> opt_trace_ts = context_->clock_tracker->ToTraceTime(
385 clock_timestamp.clock.id, ts_to_convert);
386
387 if (!opt_trace_ts.ok()) {
388 // This can happen if |AddSnapshot| failed to resolve this clock, e.g. if
389 // clock is not monotonic. Try to fetch trace time from snapshot.
390 if (!trace_time_from_snapshot) {
391 PERFETTO_DLOG("%s", opt_trace_ts.status().c_message());
392 continue;
393 }
394 opt_trace_ts = *trace_time_from_snapshot;
395 }
396
397 // Double check that all the clocks in this snapshot resolve to the same
398 // trace timestamp value.
399 PERFETTO_DCHECK(!trace_ts_for_check ||
400 opt_trace_ts.value() == trace_ts_for_check.value());
401 trace_ts_for_check = *opt_trace_ts;
402
403 tables::ClockSnapshotTable::Row row;
404 row.ts = *opt_trace_ts;
405 row.clock_id = static_cast<int64_t>(clock_timestamp.clock.id);
406 row.clock_value = clock_timestamp.timestamp;
407 row.clock_name = GetBuiltinClockNameOrNull(clock_timestamp.clock.id);
408 row.snapshot_id = *snapshot_id;
409
410 context_->storage->mutable_clock_snapshot_table()->Insert(row);
411 }
412 return util::OkStatus();
413 }
414
GetBuiltinClockNameOrNull(int64_t clock_id)415 std::optional<StringId> ProtoTraceReader::GetBuiltinClockNameOrNull(
416 int64_t clock_id) {
417 switch (clock_id) {
418 case protos::pbzero::ClockSnapshot::Clock::REALTIME:
419 return context_->storage->InternString("REALTIME");
420 case protos::pbzero::ClockSnapshot::Clock::REALTIME_COARSE:
421 return context_->storage->InternString("REALTIME_COARSE");
422 case protos::pbzero::ClockSnapshot::Clock::MONOTONIC:
423 return context_->storage->InternString("MONOTONIC");
424 case protos::pbzero::ClockSnapshot::Clock::MONOTONIC_COARSE:
425 return context_->storage->InternString("MONOTONIC_COARSE");
426 case protos::pbzero::ClockSnapshot::Clock::MONOTONIC_RAW:
427 return context_->storage->InternString("MONOTONIC_RAW");
428 case protos::pbzero::ClockSnapshot::Clock::BOOTTIME:
429 return context_->storage->InternString("BOOTTIME");
430 default:
431 return std::nullopt;
432 }
433 }
434
ParseServiceEvent(int64_t ts,ConstBytes blob)435 util::Status ProtoTraceReader::ParseServiceEvent(int64_t ts, ConstBytes blob) {
436 protos::pbzero::TracingServiceEvent::Decoder tse(blob);
437 if (tse.tracing_started()) {
438 context_->metadata_tracker->SetMetadata(metadata::tracing_started_ns,
439 Variadic::Integer(ts));
440 }
441 if (tse.tracing_disabled()) {
442 context_->metadata_tracker->SetMetadata(metadata::tracing_disabled_ns,
443 Variadic::Integer(ts));
444 }
445 if (tse.all_data_sources_started()) {
446 context_->metadata_tracker->SetMetadata(
447 metadata::all_data_source_started_ns, Variadic::Integer(ts));
448 }
449 if (tse.all_data_sources_flushed()) {
450 context_->sorter->NotifyFlushEvent();
451 }
452 if (tse.read_tracing_buffers_completed()) {
453 context_->sorter->NotifyReadBufferEvent();
454 }
455 return util::OkStatus();
456 }
457
NotifyEndOfFile()458 void ProtoTraceReader::NotifyEndOfFile() {}
459
460 } // namespace trace_processor
461 } // namespace perfetto
462