1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/trace_processor/importers/proto/proto_trace_reader.h"
18
19 #include <string>
20
21 #include "perfetto/base/build_config.h"
22 #include "perfetto/base/logging.h"
23 #include "perfetto/ext/base/optional.h"
24 #include "perfetto/ext/base/string_view.h"
25 #include "perfetto/ext/base/utils.h"
26 #include "perfetto/protozero/proto_decoder.h"
27 #include "perfetto/protozero/proto_utils.h"
28 #include "perfetto/trace_processor/status.h"
29 #include "src/trace_processor/importers/common/clock_tracker.h"
30 #include "src/trace_processor/importers/common/event_tracker.h"
31 #include "src/trace_processor/importers/common/track_tracker.h"
32 #include "src/trace_processor/importers/ftrace/ftrace_module.h"
33 #include "src/trace_processor/importers/gzip/gzip_utils.h"
34 #include "src/trace_processor/importers/proto/metadata_tracker.h"
35 #include "src/trace_processor/importers/proto/packet_sequence_state.h"
36 #include "src/trace_processor/importers/proto/proto_incremental_state.h"
37 #include "src/trace_processor/storage/stats.h"
38 #include "src/trace_processor/storage/trace_storage.h"
39 #include "src/trace_processor/trace_sorter.h"
40 #include "src/trace_processor/util/descriptors.h"
41
42 #include "protos/perfetto/common/builtin_clock.pbzero.h"
43 #include "protos/perfetto/config/trace_config.pbzero.h"
44 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
45 #include "protos/perfetto/trace/extension_descriptor.pbzero.h"
46 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
47 #include "protos/perfetto/trace/profiling/profile_common.pbzero.h"
48 #include "protos/perfetto/trace/trace.pbzero.h"
49 #include "protos/perfetto/trace/trace_packet.pbzero.h"
50
51 namespace perfetto {
52 namespace trace_processor {
53
ProtoTraceReader(TraceProcessorContext * ctx)54 ProtoTraceReader::ProtoTraceReader(TraceProcessorContext* ctx)
55 : context_(ctx) {}
56 ProtoTraceReader::~ProtoTraceReader() = default;
57
Parse(std::unique_ptr<uint8_t[]> owned_buf,size_t size)58 util::Status ProtoTraceReader::Parse(std::unique_ptr<uint8_t[]> owned_buf,
59 size_t size) {
60 return tokenizer_.Tokenize(
61 std::move(owned_buf), size,
62 [this](TraceBlobView packet) { return ParsePacket(std::move(packet)); });
63 }
64
ParseExtensionDescriptor(ConstBytes descriptor)65 util::Status ProtoTraceReader::ParseExtensionDescriptor(ConstBytes descriptor) {
66 protos::pbzero::ExtensionDescriptor::Decoder decoder(descriptor.data,
67 descriptor.size);
68
69 auto extension = decoder.extension_set();
70 return context_->descriptor_pool_->AddFromFileDescriptorSet(
71 extension.data, extension.size,
72 /*merge_existing_messages=*/true);
73 }
74
ParsePacket(TraceBlobView packet)75 util::Status ProtoTraceReader::ParsePacket(TraceBlobView packet) {
76 protos::pbzero::TracePacket::Decoder decoder(packet.data(), packet.length());
77 if (PERFETTO_UNLIKELY(decoder.bytes_left())) {
78 return util::ErrStatus(
79 "Failed to parse proto packet fully; the trace is probably corrupt.");
80 }
81
82 // Any compressed packets should have been handled by the tokenizer.
83 PERFETTO_CHECK(!decoder.has_compressed_packets());
84
85 const uint32_t seq_id = decoder.trusted_packet_sequence_id();
86 auto* state = GetIncrementalStateForPacketSequence(seq_id);
87
88 uint32_t sequence_flags = decoder.sequence_flags();
89
90 if (decoder.incremental_state_cleared() ||
91 sequence_flags &
92 protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED) {
93 HandleIncrementalStateCleared(decoder);
94 } else if (decoder.previous_packet_dropped()) {
95 HandlePreviousPacketDropped(decoder);
96 }
97
98 // It is important that we parse defaults before parsing other fields such as
99 // the timestamp, since the defaults could affect them.
100 if (decoder.has_trace_packet_defaults()) {
101 auto field = decoder.trace_packet_defaults();
102 const size_t offset = packet.offset_of(field.data);
103 ParseTracePacketDefaults(decoder, packet.slice(offset, field.size));
104 }
105
106 if (decoder.has_interned_data()) {
107 auto field = decoder.interned_data();
108 const size_t offset = packet.offset_of(field.data);
109 ParseInternedData(decoder, packet.slice(offset, field.size));
110 }
111
112 if (decoder.has_clock_snapshot()) {
113 return ParseClockSnapshot(decoder.clock_snapshot(),
114 decoder.trusted_packet_sequence_id());
115 }
116
117 if (decoder.has_service_event()) {
118 PERFETTO_DCHECK(decoder.has_timestamp());
119 int64_t ts = static_cast<int64_t>(decoder.timestamp());
120 return ParseServiceEvent(ts, decoder.service_event());
121 }
122
123 if (decoder.has_extension_descriptor()) {
124 return ParseExtensionDescriptor(decoder.extension_descriptor());
125 }
126
127 if (decoder.sequence_flags() &
128 protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE) {
129 if (!seq_id) {
130 return util::ErrStatus(
131 "TracePacket specified SEQ_NEEDS_INCREMENTAL_STATE but the "
132 "TraceWriter's sequence_id is zero (the service is "
133 "probably too old)");
134 }
135
136 if (!state->IsIncrementalStateValid()) {
137 context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
138 return util::OkStatus();
139 }
140 }
141
142 // Workaround a bug in the frame timeline traces which is emitting packets
143 // with zero timestamp (b/179905685).
144 // TODO(primiano): around mid-2021 there should be no traces that have this
145 // bug and we should be able to remove this workaround.
146 if (decoder.has_frame_timeline_event() && decoder.timestamp() == 0) {
147 context_->storage->IncrementStats(
148 stats::frame_timeline_event_parser_errors);
149 return util::OkStatus();
150 }
151
152 protos::pbzero::TracePacketDefaults::Decoder* defaults =
153 state->current_generation()->GetTracePacketDefaults();
154
155 int64_t timestamp;
156 if (decoder.has_timestamp()) {
157 timestamp = static_cast<int64_t>(decoder.timestamp());
158
159 uint32_t timestamp_clock_id =
160 decoder.has_timestamp_clock_id()
161 ? decoder.timestamp_clock_id()
162 : (defaults ? defaults->timestamp_clock_id() : 0);
163
164 if ((decoder.has_chrome_events() || decoder.has_chrome_metadata()) &&
165 (!timestamp_clock_id ||
166 timestamp_clock_id == protos::pbzero::BUILTIN_CLOCK_MONOTONIC)) {
167 // Chrome event timestamps are in MONOTONIC domain, but may occur in
168 // traces where (a) no clock snapshots exist or (b) no clock_id is
169 // specified for their timestamps. Adjust to trace time if we have a clock
170 // snapshot.
171 // TODO(eseckler): Set timestamp_clock_id and emit ClockSnapshots in
172 // chrome and then remove this.
173 auto trace_ts = context_->clock_tracker->ToTraceTime(
174 protos::pbzero::BUILTIN_CLOCK_MONOTONIC, timestamp);
175 if (trace_ts.has_value())
176 timestamp = trace_ts.value();
177 } else if (timestamp_clock_id) {
178 // If the TracePacket specifies a non-zero clock-id, translate the
179 // timestamp into the trace-time clock domain.
180 ClockTracker::ClockId converted_clock_id = timestamp_clock_id;
181 bool is_seq_scoped =
182 ClockTracker::IsReservedSeqScopedClockId(converted_clock_id);
183 if (is_seq_scoped) {
184 if (!seq_id) {
185 return util::ErrStatus(
186 "TracePacket specified a sequence-local clock id (%" PRIu32
187 ") but the TraceWriter's sequence_id is zero (the service is "
188 "probably too old)",
189 timestamp_clock_id);
190 }
191 converted_clock_id =
192 ClockTracker::SeqScopedClockIdToGlobal(seq_id, timestamp_clock_id);
193 }
194 auto trace_ts =
195 context_->clock_tracker->ToTraceTime(converted_clock_id, timestamp);
196 if (!trace_ts.has_value()) {
197 // ToTraceTime() will increase the |clock_sync_failure| stat on failure.
198 // We don't return an error here as it will cause the trace to stop
199 // parsing. Instead, we rely on the stat increment in ToTraceTime() to
200 // inform the user about the error.
201 return util::OkStatus();
202 }
203 timestamp = trace_ts.value();
204 }
205 } else {
206 timestamp = std::max(latest_timestamp_, context_->sorter->max_timestamp());
207 }
208 latest_timestamp_ = std::max(timestamp, latest_timestamp_);
209
210 auto& modules = context_->modules_by_field;
211 for (uint32_t field_id = 1; field_id < modules.size(); ++field_id) {
212 if (!modules[field_id].empty() && decoder.Get(field_id).valid()) {
213 for (ProtoImporterModule* module : modules[field_id]) {
214 ModuleResult res = module->TokenizePacket(decoder, &packet, timestamp,
215 state, field_id);
216 if (!res.ignored())
217 return res.ToStatus();
218 }
219 }
220 }
221
222 if (decoder.has_trace_config()) {
223 ParseTraceConfig(decoder.trace_config());
224 }
225
226 // Use parent data and length because we want to parse this again
227 // later to get the exact type of the packet.
228 context_->sorter->PushTracePacket(timestamp, state, std::move(packet));
229
230 return util::OkStatus();
231 }
232
ParseTraceConfig(protozero::ConstBytes blob)233 void ProtoTraceReader::ParseTraceConfig(protozero::ConstBytes blob) {
234 protos::pbzero::TraceConfig::Decoder trace_config(blob);
235 // If we're forcing a full sort, we can keep using the INT_MAX duration set
236 // when we created the sorter.
237 const auto& cfg = context_->config;
238 if (cfg.sorting_mode == SortingMode::kForceFullSort) {
239 return;
240 }
241
242 base::Optional<int64_t> flush_period_window_size_ns;
243 if (trace_config.has_flush_period_ms() &&
244 trace_config.flush_period_ms() > 0) {
245 flush_period_window_size_ns =
246 static_cast<int64_t>(trace_config.flush_period_ms()) * 2 * 1000 * 1000;
247 }
248
249 // If we're trying to force flush period based sorting, use that as the
250 // window size if it exists.
251 if (cfg.sorting_mode == SortingMode::kForceFlushPeriodWindowedSort &&
252 flush_period_window_size_ns.has_value()) {
253 context_->sorter->SetWindowSizeNs(flush_period_window_size_ns.value());
254 return;
255 }
256
257 // If we end up here, we should use heuristics because either the sorting mode
258 // was set as such or we don't have a flush period to force the window size
259 // to.
260
261 // If we're not forcing anything and this is a write_into_file trace, then
262 // use flush_period_ms as an indiciator for how big the sliding window for the
263 // sorter should be.
264 if (trace_config.write_into_file()) {
265 int64_t window_size_ns;
266 if (flush_period_window_size_ns.has_value()) {
267 window_size_ns = flush_period_window_size_ns.value();
268 } else {
269 constexpr uint64_t kDefaultWindowNs =
270 180 * 1000 * 1000 * 1000ULL; // 3 minutes.
271 PERFETTO_ELOG(
272 "It is strongly recommended to have flush_period_ms set when "
273 "write_into_file is turned on. You will likely have many dropped "
274 "events because of inability to sort the events correctly.");
275 window_size_ns = static_cast<int64_t>(kDefaultWindowNs);
276 }
277 context_->sorter->SetWindowSizeNs(window_size_ns);
278 }
279 }
280
HandleIncrementalStateCleared(const protos::pbzero::TracePacket::Decoder & packet_decoder)281 void ProtoTraceReader::HandleIncrementalStateCleared(
282 const protos::pbzero::TracePacket::Decoder& packet_decoder) {
283 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
284 PERFETTO_ELOG(
285 "incremental_state_cleared without trusted_packet_sequence_id");
286 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
287 return;
288 }
289 GetIncrementalStateForPacketSequence(
290 packet_decoder.trusted_packet_sequence_id())
291 ->OnIncrementalStateCleared();
292 for (auto& module : context_->modules) {
293 module->OnIncrementalStateCleared(
294 packet_decoder.trusted_packet_sequence_id());
295 }
296 }
297
HandlePreviousPacketDropped(const protos::pbzero::TracePacket::Decoder & packet_decoder)298 void ProtoTraceReader::HandlePreviousPacketDropped(
299 const protos::pbzero::TracePacket::Decoder& packet_decoder) {
300 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
301 PERFETTO_ELOG("previous_packet_dropped without trusted_packet_sequence_id");
302 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
303 return;
304 }
305 GetIncrementalStateForPacketSequence(
306 packet_decoder.trusted_packet_sequence_id())
307 ->OnPacketLoss();
308 }
309
ParseTracePacketDefaults(const protos::pbzero::TracePacket_Decoder & packet_decoder,TraceBlobView trace_packet_defaults)310 void ProtoTraceReader::ParseTracePacketDefaults(
311 const protos::pbzero::TracePacket_Decoder& packet_decoder,
312 TraceBlobView trace_packet_defaults) {
313 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
314 PERFETTO_ELOG(
315 "TracePacketDefaults packet without trusted_packet_sequence_id");
316 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
317 return;
318 }
319
320 auto* state = GetIncrementalStateForPacketSequence(
321 packet_decoder.trusted_packet_sequence_id());
322 state->UpdateTracePacketDefaults(std::move(trace_packet_defaults));
323 }
324
ParseInternedData(const protos::pbzero::TracePacket::Decoder & packet_decoder,TraceBlobView interned_data)325 void ProtoTraceReader::ParseInternedData(
326 const protos::pbzero::TracePacket::Decoder& packet_decoder,
327 TraceBlobView interned_data) {
328 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
329 PERFETTO_ELOG("InternedData packet without trusted_packet_sequence_id");
330 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
331 return;
332 }
333
334 auto* state = GetIncrementalStateForPacketSequence(
335 packet_decoder.trusted_packet_sequence_id());
336
337 // Don't parse interned data entries until incremental state is valid, because
338 // they could otherwise be associated with the wrong generation in the state.
339 if (!state->IsIncrementalStateValid()) {
340 context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
341 return;
342 }
343
344 // Store references to interned data submessages into the sequence's state.
345 protozero::ProtoDecoder decoder(interned_data.data(), interned_data.length());
346 for (protozero::Field f = decoder.ReadField(); f.valid();
347 f = decoder.ReadField()) {
348 auto bytes = f.as_bytes();
349 auto offset = interned_data.offset_of(bytes.data);
350 state->InternMessage(f.id(), interned_data.slice(offset, bytes.size));
351 }
352 }
353
ParseClockSnapshot(ConstBytes blob,uint32_t seq_id)354 util::Status ProtoTraceReader::ParseClockSnapshot(ConstBytes blob,
355 uint32_t seq_id) {
356 std::vector<ClockTracker::ClockValue> clocks;
357 protos::pbzero::ClockSnapshot::Decoder evt(blob.data, blob.size);
358 if (evt.primary_trace_clock()) {
359 context_->clock_tracker->SetTraceTimeClock(
360 static_cast<ClockTracker::ClockId>(evt.primary_trace_clock()));
361 }
362 for (auto it = evt.clocks(); it; ++it) {
363 protos::pbzero::ClockSnapshot::Clock::Decoder clk(*it);
364 ClockTracker::ClockId clock_id = clk.clock_id();
365 if (ClockTracker::IsReservedSeqScopedClockId(clk.clock_id())) {
366 if (!seq_id) {
367 return util::ErrStatus(
368 "ClockSnapshot packet is specifying a sequence-scoped clock id "
369 "(%" PRIu64 ") but the TracePacket sequence_id is zero",
370 clock_id);
371 }
372 clock_id = ClockTracker::SeqScopedClockIdToGlobal(seq_id, clk.clock_id());
373 }
374 int64_t unit_multiplier_ns =
375 clk.unit_multiplier_ns()
376 ? static_cast<int64_t>(clk.unit_multiplier_ns())
377 : 1;
378 clocks.emplace_back(clock_id, clk.timestamp(), unit_multiplier_ns,
379 clk.is_incremental());
380 }
381
382 uint32_t snapshot_id = context_->clock_tracker->AddSnapshot(clocks);
383
384 // Add the all the clock values to the clock snapshot table.
385 base::Optional<int64_t> trace_ts_for_check;
386 for (const auto& clock : clocks) {
387 // If the clock is incremental, we need to use 0 to map correctly to
388 // |absolute_timestamp|.
389 int64_t ts_to_convert = clock.is_incremental ? 0 : clock.absolute_timestamp;
390 base::Optional<int64_t> opt_trace_ts =
391 context_->clock_tracker->ToTraceTime(clock.clock_id, ts_to_convert);
392 if (!opt_trace_ts) {
393 // This can happen if |AddSnapshot| failed to resolve this clock. Just
394 // ignore this and move on.
395 continue;
396 }
397
398 // Double check that all the clocks in this snapshot resolve to the same
399 // trace timestamp value.
400 PERFETTO_DCHECK(!trace_ts_for_check || opt_trace_ts == trace_ts_for_check);
401 trace_ts_for_check = *opt_trace_ts;
402
403 tables::ClockSnapshotTable::Row row;
404 row.ts = *opt_trace_ts;
405 row.clock_id = static_cast<int64_t>(clock.clock_id);
406 row.clock_value = clock.absolute_timestamp;
407 row.clock_name = GetBuiltinClockNameOrNull(clock.clock_id);
408 row.snapshot_id = snapshot_id;
409
410 auto* snapshot_table = context_->storage->mutable_clock_snapshot_table();
411 snapshot_table->Insert(row);
412 }
413 return util::OkStatus();
414 }
415
GetBuiltinClockNameOrNull(uint64_t clock_id)416 base::Optional<StringId> ProtoTraceReader::GetBuiltinClockNameOrNull(
417 uint64_t clock_id) {
418 switch (clock_id) {
419 case protos::pbzero::ClockSnapshot::Clock::REALTIME:
420 return context_->storage->InternString("REALTIME");
421 case protos::pbzero::ClockSnapshot::Clock::REALTIME_COARSE:
422 return context_->storage->InternString("REALTIME_COARSE");
423 case protos::pbzero::ClockSnapshot::Clock::MONOTONIC:
424 return context_->storage->InternString("MONOTONIC");
425 case protos::pbzero::ClockSnapshot::Clock::MONOTONIC_COARSE:
426 return context_->storage->InternString("MONOTONIC_COARSE");
427 case protos::pbzero::ClockSnapshot::Clock::MONOTONIC_RAW:
428 return context_->storage->InternString("MONOTONIC_RAW");
429 case protos::pbzero::ClockSnapshot::Clock::BOOTTIME:
430 return context_->storage->InternString("BOOTTIME");
431 default:
432 return base::nullopt;
433 }
434 }
435
ParseServiceEvent(int64_t ts,ConstBytes blob)436 util::Status ProtoTraceReader::ParseServiceEvent(int64_t ts, ConstBytes blob) {
437 protos::pbzero::TracingServiceEvent::Decoder tse(blob);
438 if (tse.tracing_started()) {
439 context_->metadata_tracker->SetMetadata(metadata::tracing_started_ns,
440 Variadic::Integer(ts));
441 }
442 if (tse.tracing_disabled()) {
443 context_->metadata_tracker->SetMetadata(metadata::tracing_disabled_ns,
444 Variadic::Integer(ts));
445 }
446 if (tse.all_data_sources_started()) {
447 context_->metadata_tracker->SetMetadata(
448 metadata::all_data_source_started_ns, Variadic::Integer(ts));
449 }
450 return util::OkStatus();
451 }
452
NotifyEndOfFile()453 void ProtoTraceReader::NotifyEndOfFile() {}
454
455 } // namespace trace_processor
456 } // namespace perfetto
457