1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/trace_processor/importers/proto/proto_trace_reader.h"
18
19 #include <algorithm>
20 #include <cinttypes>
21 #include <cstddef>
22 #include <cstdint>
23 #include <map>
24 #include <numeric>
25 #include <optional>
26 #include <string>
27 #include <tuple>
28 #include <utility>
29 #include <vector>
30
31 #include "perfetto/base/logging.h"
32 #include "perfetto/base/status.h"
33 #include "perfetto/ext/base/flat_hash_map.h"
34 #include "perfetto/ext/base/status_or.h"
35 #include "perfetto/ext/base/string_view.h"
36 #include "perfetto/protozero/field.h"
37 #include "perfetto/protozero/proto_decoder.h"
38 #include "perfetto/public/compiler.h"
39 #include "src/trace_processor/importers/common/clock_tracker.h"
40 #include "src/trace_processor/importers/common/event_tracker.h"
41 #include "src/trace_processor/importers/common/metadata_tracker.h"
42 #include "src/trace_processor/importers/proto/packet_analyzer.h"
43 #include "src/trace_processor/importers/proto/proto_importer_module.h"
44 #include "src/trace_processor/sorter/trace_sorter.h"
45 #include "src/trace_processor/storage/metadata.h"
46 #include "src/trace_processor/storage/stats.h"
47 #include "src/trace_processor/storage/trace_storage.h"
48 #include "src/trace_processor/tables/metadata_tables_py.h"
49 #include "src/trace_processor/types/variadic.h"
50 #include "src/trace_processor/util/descriptors.h"
51
52 #include "protos/perfetto/common/builtin_clock.pbzero.h"
53 #include "protos/perfetto/common/trace_stats.pbzero.h"
54 #include "protos/perfetto/config/trace_config.pbzero.h"
55 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
56 #include "protos/perfetto/trace/extension_descriptor.pbzero.h"
57 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
58 #include "protos/perfetto/trace/remote_clock_sync.pbzero.h"
59 #include "protos/perfetto/trace/trace.pbzero.h"
60 #include "protos/perfetto/trace/trace_packet.pbzero.h"
61
62 namespace perfetto::trace_processor {
63
ProtoTraceReader(TraceProcessorContext * ctx)64 ProtoTraceReader::ProtoTraceReader(TraceProcessorContext* ctx)
65 : context_(ctx),
66 skipped_packet_key_id_(ctx->storage->InternString("skipped_packet")),
67 invalid_incremental_state_key_id_(
68 ctx->storage->InternString("invalid_incremental_state")) {}
69 ProtoTraceReader::~ProtoTraceReader() = default;
70
Parse(TraceBlobView blob)71 base::Status ProtoTraceReader::Parse(TraceBlobView blob) {
72 return tokenizer_.Tokenize(std::move(blob), [this](TraceBlobView packet) {
73 return ParsePacket(std::move(packet));
74 });
75 }
76
ParseExtensionDescriptor(ConstBytes descriptor)77 base::Status ProtoTraceReader::ParseExtensionDescriptor(ConstBytes descriptor) {
78 protos::pbzero::ExtensionDescriptor::Decoder decoder(descriptor.data,
79 descriptor.size);
80
81 auto extension = decoder.extension_set();
82 return context_->descriptor_pool_->AddFromFileDescriptorSet(
83 extension.data, extension.size,
84 /*skip_prefixes*/ {},
85 /*merge_existing_messages=*/true);
86 }
87
ParsePacket(TraceBlobView packet)88 base::Status ProtoTraceReader::ParsePacket(TraceBlobView packet) {
89 protos::pbzero::TracePacket::Decoder decoder(packet.data(), packet.length());
90 if (PERFETTO_UNLIKELY(decoder.bytes_left())) {
91 return base::ErrStatus(
92 "Failed to parse proto packet fully; the trace is probably corrupt.");
93 }
94
95 // Any compressed packets should have been handled by the tokenizer.
96 PERFETTO_CHECK(!decoder.has_compressed_packets());
97
98 // When the trace packet is emitted from a remote machine: parse the packet
99 // using a different ProtoTraceReader instance. The packet will be parsed
100 // in the context of the remote machine.
101 if (PERFETTO_UNLIKELY(decoder.has_machine_id())) {
102 if (!context_->machine_id()) {
103 // Default context: switch to another reader instance to parse the packet.
104 PERFETTO_DCHECK(context_->multi_machine_trace_manager);
105 auto* reader = context_->multi_machine_trace_manager->GetOrCreateReader(
106 decoder.machine_id());
107 return reader->ParsePacket(std::move(packet));
108 }
109 }
110 // Assert that the packet is parsed using the right instance of reader.
111 PERFETTO_DCHECK(decoder.has_machine_id() == !!context_->machine_id());
112
113 uint32_t seq_id = decoder.trusted_packet_sequence_id();
114 auto [scoped_state, inserted] = sequence_state_.Insert(seq_id, {});
115 if (decoder.has_trusted_packet_sequence_id()) {
116 if (!inserted && decoder.previous_packet_dropped()) {
117 ++scoped_state->previous_packet_dropped_count;
118 }
119 }
120
121 if (decoder.first_packet_on_sequence()) {
122 HandleFirstPacketOnSequence(seq_id);
123 }
124
125 uint32_t sequence_flags = decoder.sequence_flags();
126 if (decoder.incremental_state_cleared() ||
127 sequence_flags &
128 protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED) {
129 HandleIncrementalStateCleared(decoder);
130 } else if (decoder.previous_packet_dropped()) {
131 HandlePreviousPacketDropped(decoder);
132 }
133
134 // It is important that we parse defaults before parsing other fields such as
135 // the timestamp, since the defaults could affect them.
136 if (decoder.has_trace_packet_defaults()) {
137 auto field = decoder.trace_packet_defaults();
138 ParseTracePacketDefaults(decoder, packet.slice(field.data, field.size));
139 }
140
141 if (decoder.has_interned_data()) {
142 auto field = decoder.interned_data();
143 ParseInternedData(decoder, packet.slice(field.data, field.size));
144 }
145
146 if (decoder.has_clock_snapshot()) {
147 return ParseClockSnapshot(decoder.clock_snapshot(), seq_id);
148 }
149
150 if (decoder.has_trace_stats()) {
151 ParseTraceStats(decoder.trace_stats());
152 }
153
154 if (decoder.has_remote_clock_sync()) {
155 PERFETTO_DCHECK(context_->machine_id());
156 return ParseRemoteClockSync(decoder.remote_clock_sync());
157 }
158
159 if (decoder.has_service_event()) {
160 PERFETTO_DCHECK(decoder.has_timestamp());
161 int64_t ts = static_cast<int64_t>(decoder.timestamp());
162 return ParseServiceEvent(ts, decoder.service_event());
163 }
164
165 if (decoder.has_extension_descriptor()) {
166 return ParseExtensionDescriptor(decoder.extension_descriptor());
167 }
168
169 auto* state = GetIncrementalStateForPacketSequence(seq_id);
170 if (decoder.sequence_flags() &
171 protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE) {
172 if (!seq_id) {
173 return base::ErrStatus(
174 "TracePacket specified SEQ_NEEDS_INCREMENTAL_STATE but the "
175 "TraceWriter's sequence_id is zero (the service is "
176 "probably too old)");
177 }
178 scoped_state->needs_incremental_state_total++;
179
180 if (!state->IsIncrementalStateValid()) {
181 if (context_->content_analyzer) {
182 // Account for the skipped packet for trace proto content analysis,
183 // with a special annotation.
184 PacketAnalyzer::SampleAnnotation annotation;
185 annotation.emplace_back(skipped_packet_key_id_,
186 invalid_incremental_state_key_id_);
187 PacketAnalyzer::Get(context_)->ProcessPacket(packet, annotation);
188 }
189 scoped_state->needs_incremental_state_skipped++;
190 context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
191 return base::OkStatus();
192 }
193 }
194
195 if (context_->content_analyzer && !decoder.has_track_event()) {
196 PacketAnalyzer::Get(context_)->ProcessPacket(packet, {});
197 }
198
199 if (decoder.has_trace_config()) {
200 ParseTraceConfig(decoder.trace_config());
201 }
202
203 return TimestampTokenizeAndPushToSorter(std::move(packet));
204 }
205
TimestampTokenizeAndPushToSorter(TraceBlobView packet)206 base::Status ProtoTraceReader::TimestampTokenizeAndPushToSorter(
207 TraceBlobView packet) {
208 protos::pbzero::TracePacket::Decoder decoder(packet.data(), packet.length());
209
210 uint32_t seq_id = decoder.trusted_packet_sequence_id();
211 auto* state = GetIncrementalStateForPacketSequence(seq_id);
212
213 protos::pbzero::TracePacketDefaults::Decoder* defaults =
214 state->current_generation()->GetTracePacketDefaults();
215
216 int64_t timestamp;
217 if (decoder.has_timestamp()) {
218 timestamp = static_cast<int64_t>(decoder.timestamp());
219
220 uint32_t timestamp_clock_id =
221 decoder.has_timestamp_clock_id()
222 ? decoder.timestamp_clock_id()
223 : (defaults ? defaults->timestamp_clock_id() : 0);
224
225 if ((decoder.has_chrome_events() || decoder.has_chrome_metadata()) &&
226 (!timestamp_clock_id ||
227 timestamp_clock_id == protos::pbzero::BUILTIN_CLOCK_MONOTONIC)) {
228 // Chrome event timestamps are in MONOTONIC domain, but may occur in
229 // traces where (a) no clock snapshots exist or (b) no clock_id is
230 // specified for their timestamps. Adjust to trace time if we have a clock
231 // snapshot.
232 // TODO(eseckler): Set timestamp_clock_id and emit ClockSnapshots in
233 // chrome and then remove this.
234 auto trace_ts = context_->clock_tracker->ToTraceTime(
235 protos::pbzero::BUILTIN_CLOCK_MONOTONIC, timestamp);
236 if (trace_ts.ok())
237 timestamp = trace_ts.value();
238 } else if (timestamp_clock_id) {
239 // If the TracePacket specifies a non-zero clock-id, translate the
240 // timestamp into the trace-time clock domain.
241 ClockTracker::ClockId converted_clock_id = timestamp_clock_id;
242 if (ClockTracker::IsSequenceClock(converted_clock_id)) {
243 if (!seq_id) {
244 return base::ErrStatus(
245 "TracePacket specified a sequence-local clock id (%" PRIu32
246 ") but the TraceWriter's sequence_id is zero (the service is "
247 "probably too old)",
248 timestamp_clock_id);
249 }
250 converted_clock_id =
251 ClockTracker::SequenceToGlobalClock(seq_id, timestamp_clock_id);
252 }
253 // If the clock tracker is missing a path to trace time for this clock
254 // then try to save this packet for processing later when a path exists.
255 if (!context_->clock_tracker->HasPathToTraceTime(converted_clock_id)) {
256 // We need to switch to full sorting mode to ensure that packets with
257 // missing timestamp are handled correctly. Don't save the packet unless
258 // switching to full sorting mode succeeded.
259 if (!received_eof_ && context_->sorter->SetSortingMode(
260 TraceSorter::SortingMode::kFullSort)) {
261 eof_deferred_packets_.push_back(std::move(packet));
262 return base::OkStatus();
263 }
264 // Fall-through and let ToTraceTime fail below.
265 }
266 auto trace_ts =
267 context_->clock_tracker->ToTraceTime(converted_clock_id, timestamp);
268 if (!trace_ts.ok()) {
269 // ToTraceTime() will increase the |clock_sync_failure| stat on failure.
270 // We don't return an error here as it will cause the trace to stop
271 // parsing. Instead, we rely on the stat increment in ToTraceTime() to
272 // inform the user about the error.
273 return base::OkStatus();
274 }
275 timestamp = trace_ts.value();
276 }
277 } else {
278 timestamp = std::max(latest_timestamp_, context_->sorter->max_timestamp());
279 }
280 latest_timestamp_ = std::max(timestamp, latest_timestamp_);
281
282 auto& modules = context_->modules_by_field;
283 for (uint32_t field_id = 1; field_id < modules.size(); ++field_id) {
284 if (!modules[field_id].empty() && decoder.Get(field_id).valid()) {
285 for (ProtoImporterModule* global_module :
286 context_->modules_for_all_fields) {
287 ModuleResult res = global_module->TokenizePacket(
288 decoder, &packet, timestamp, state->current_generation(), field_id);
289 if (!res.ignored())
290 return res.ToStatus();
291 }
292 for (ProtoImporterModule* module : modules[field_id]) {
293 ModuleResult res = module->TokenizePacket(
294 decoder, &packet, timestamp, state->current_generation(), field_id);
295 if (!res.ignored())
296 return res.ToStatus();
297 }
298 }
299 }
300
301 // Use parent data and length because we want to parse this again
302 // later to get the exact type of the packet.
303 context_->sorter->PushTracePacket(timestamp, state->current_generation(),
304 std::move(packet), context_->machine_id());
305
306 return base::OkStatus();
307 }
308
ParseTraceConfig(protozero::ConstBytes blob)309 void ProtoTraceReader::ParseTraceConfig(protozero::ConstBytes blob) {
310 using Config = protos::pbzero::TraceConfig;
311 Config::Decoder trace_config(blob);
312 if (trace_config.write_into_file()) {
313 if (!trace_config.flush_period_ms()) {
314 context_->storage->IncrementStats(stats::config_write_into_file_no_flush);
315 }
316 int i = 0;
317 for (auto it = trace_config.buffers(); it; ++it, ++i) {
318 Config::BufferConfig::Decoder buf(*it);
319 if (buf.fill_policy() == Config::BufferConfig::FillPolicy::DISCARD) {
320 context_->storage->IncrementIndexedStats(
321 stats::config_write_into_file_discard, i);
322 }
323 }
324 }
325 }
326
HandleIncrementalStateCleared(const protos::pbzero::TracePacket::Decoder & packet_decoder)327 void ProtoTraceReader::HandleIncrementalStateCleared(
328 const protos::pbzero::TracePacket::Decoder& packet_decoder) {
329 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
330 PERFETTO_ELOG(
331 "incremental_state_cleared without trusted_packet_sequence_id");
332 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
333 return;
334 }
335 GetIncrementalStateForPacketSequence(
336 packet_decoder.trusted_packet_sequence_id())
337 ->OnIncrementalStateCleared();
338 for (auto& module : context_->modules) {
339 module->OnIncrementalStateCleared(
340 packet_decoder.trusted_packet_sequence_id());
341 }
342 }
343
HandleFirstPacketOnSequence(uint32_t packet_sequence_id)344 void ProtoTraceReader::HandleFirstPacketOnSequence(
345 uint32_t packet_sequence_id) {
346 for (auto& module : context_->modules) {
347 module->OnFirstPacketOnSequence(packet_sequence_id);
348 }
349 }
350
HandlePreviousPacketDropped(const protos::pbzero::TracePacket::Decoder & packet_decoder)351 void ProtoTraceReader::HandlePreviousPacketDropped(
352 const protos::pbzero::TracePacket::Decoder& packet_decoder) {
353 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
354 PERFETTO_ELOG("previous_packet_dropped without trusted_packet_sequence_id");
355 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
356 return;
357 }
358 GetIncrementalStateForPacketSequence(
359 packet_decoder.trusted_packet_sequence_id())
360 ->OnPacketLoss();
361 }
362
ParseTracePacketDefaults(const protos::pbzero::TracePacket_Decoder & packet_decoder,TraceBlobView trace_packet_defaults)363 void ProtoTraceReader::ParseTracePacketDefaults(
364 const protos::pbzero::TracePacket_Decoder& packet_decoder,
365 TraceBlobView trace_packet_defaults) {
366 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
367 PERFETTO_ELOG(
368 "TracePacketDefaults packet without trusted_packet_sequence_id");
369 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
370 return;
371 }
372
373 auto* state = GetIncrementalStateForPacketSequence(
374 packet_decoder.trusted_packet_sequence_id());
375 state->UpdateTracePacketDefaults(std::move(trace_packet_defaults));
376 }
377
ParseInternedData(const protos::pbzero::TracePacket::Decoder & packet_decoder,TraceBlobView interned_data)378 void ProtoTraceReader::ParseInternedData(
379 const protos::pbzero::TracePacket::Decoder& packet_decoder,
380 TraceBlobView interned_data) {
381 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
382 PERFETTO_ELOG("InternedData packet without trusted_packet_sequence_id");
383 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
384 return;
385 }
386
387 auto* state = GetIncrementalStateForPacketSequence(
388 packet_decoder.trusted_packet_sequence_id());
389
390 // Don't parse interned data entries until incremental state is valid, because
391 // they could otherwise be associated with the wrong generation in the state.
392 if (!state->IsIncrementalStateValid()) {
393 context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
394 return;
395 }
396
397 // Store references to interned data submessages into the sequence's state.
398 protozero::ProtoDecoder decoder(interned_data.data(), interned_data.length());
399 for (protozero::Field f = decoder.ReadField(); f.valid();
400 f = decoder.ReadField()) {
401 auto bytes = f.as_bytes();
402 state->InternMessage(f.id(), interned_data.slice(bytes.data, bytes.size));
403 }
404 }
405
ParseClockSnapshot(ConstBytes blob,uint32_t seq_id)406 base::Status ProtoTraceReader::ParseClockSnapshot(ConstBytes blob,
407 uint32_t seq_id) {
408 std::vector<ClockTracker::ClockTimestamp> clock_timestamps;
409 protos::pbzero::ClockSnapshot::Decoder evt(blob.data, blob.size);
410 if (evt.primary_trace_clock()) {
411 context_->clock_tracker->SetTraceTimeClock(
412 static_cast<ClockTracker::ClockId>(evt.primary_trace_clock()));
413 }
414 for (auto it = evt.clocks(); it; ++it) {
415 protos::pbzero::ClockSnapshot::Clock::Decoder clk(*it);
416 ClockTracker::ClockId clock_id = clk.clock_id();
417 if (ClockTracker::IsSequenceClock(clk.clock_id())) {
418 if (!seq_id) {
419 return base::ErrStatus(
420 "ClockSnapshot packet is specifying a sequence-scoped clock id "
421 "(%" PRId64 ") but the TracePacket sequence_id is zero",
422 clock_id);
423 }
424 clock_id = ClockTracker::SequenceToGlobalClock(seq_id, clk.clock_id());
425 }
426 int64_t unit_multiplier_ns =
427 clk.unit_multiplier_ns()
428 ? static_cast<int64_t>(clk.unit_multiplier_ns())
429 : 1;
430 clock_timestamps.emplace_back(clock_id, clk.timestamp(), unit_multiplier_ns,
431 clk.is_incremental());
432 }
433
434 base::StatusOr<uint32_t> snapshot_id =
435 context_->clock_tracker->AddSnapshot(clock_timestamps);
436 if (!snapshot_id.ok()) {
437 PERFETTO_ELOG("%s", snapshot_id.status().c_message());
438 return base::OkStatus();
439 }
440
441 std::optional<int64_t> trace_time_from_snapshot =
442 context_->clock_tracker->ToTraceTimeFromSnapshot(clock_timestamps);
443
444 // Add the all the clock snapshots to the clock snapshot table.
445 std::optional<int64_t> trace_ts_for_check;
446 for (const auto& clock_timestamp : clock_timestamps) {
447 // If the clock is incremental, we need to use 0 to map correctly to
448 // |absolute_timestamp|.
449 int64_t ts_to_convert =
450 clock_timestamp.clock.is_incremental ? 0 : clock_timestamp.timestamp;
451 // Even if we have trace time from snapshot, we still run ToTraceTime to
452 // optimise future conversions.
453 base::StatusOr<int64_t> opt_trace_ts = context_->clock_tracker->ToTraceTime(
454 clock_timestamp.clock.id, ts_to_convert);
455
456 if (!opt_trace_ts.ok()) {
457 // This can happen if |AddSnapshot| failed to resolve this clock, e.g. if
458 // clock is not monotonic. Try to fetch trace time from snapshot.
459 if (!trace_time_from_snapshot) {
460 PERFETTO_DLOG("%s", opt_trace_ts.status().c_message());
461 continue;
462 }
463 opt_trace_ts = *trace_time_from_snapshot;
464 }
465
466 // Double check that all the clocks in this snapshot resolve to the same
467 // trace timestamp value.
468 PERFETTO_DCHECK(!trace_ts_for_check ||
469 opt_trace_ts.value() == trace_ts_for_check.value());
470 trace_ts_for_check = *opt_trace_ts;
471
472 tables::ClockSnapshotTable::Row row;
473 row.ts = *opt_trace_ts;
474 row.clock_id = static_cast<int64_t>(clock_timestamp.clock.id);
475 row.clock_value =
476 clock_timestamp.timestamp * clock_timestamp.clock.unit_multiplier_ns;
477 row.clock_name = GetBuiltinClockNameOrNull(clock_timestamp.clock.id);
478 row.snapshot_id = *snapshot_id;
479 row.machine_id = context_->machine_id();
480
481 context_->storage->mutable_clock_snapshot_table()->Insert(row);
482 }
483 return base::OkStatus();
484 }
485
ParseRemoteClockSync(ConstBytes blob)486 base::Status ProtoTraceReader::ParseRemoteClockSync(ConstBytes blob) {
487 protos::pbzero::RemoteClockSync::Decoder evt(blob.data, blob.size);
488
489 std::vector<SyncClockSnapshots> sync_clock_snapshots;
490 // Decode the RemoteClockSync message into a struct for calculating offsets.
491 for (auto it = evt.synced_clocks(); it; ++it) {
492 sync_clock_snapshots.emplace_back();
493 auto& sync_clocks = sync_clock_snapshots.back();
494
495 protos::pbzero::RemoteClockSync::SyncedClocks::Decoder synced_clocks(*it);
496 protos::pbzero::ClockSnapshot::ClockSnapshot::Decoder host_clocks(
497 synced_clocks.host_clocks());
498 for (auto clock_it = host_clocks.clocks(); clock_it; clock_it++) {
499 protos::pbzero::ClockSnapshot::ClockSnapshot::Clock::Decoder clock(
500 *clock_it);
501 sync_clocks[clock.clock_id()].first = clock.timestamp();
502 }
503
504 std::vector<ClockTracker::ClockTimestamp> clock_timestamps;
505 protos::pbzero::ClockSnapshot::ClockSnapshot::Decoder client_clocks(
506 synced_clocks.client_clocks());
507 for (auto clock_it = client_clocks.clocks(); clock_it; clock_it++) {
508 protos::pbzero::ClockSnapshot::ClockSnapshot::Clock::Decoder clock(
509 *clock_it);
510 sync_clocks[clock.clock_id()].second = clock.timestamp();
511 clock_timestamps.emplace_back(clock.clock_id(), clock.timestamp(), 1,
512 false);
513 }
514
515 // In addition for calculating clock offsets, client clock snapshots are
516 // also added to clock tracker to emulate tracing service taking periodical
517 // clock snapshots. This builds a clock conversion path from a local trace
518 // time (e.g. Chrome trace time) to client builtin clock (CLOCK_MONOTONIC)
519 // which can be converted to host trace time (CLOCK_BOOTTIME).
520 context_->clock_tracker->AddSnapshot(clock_timestamps);
521 }
522
523 // Calculate clock offsets and report to the ClockTracker.
524 auto clock_offsets = CalculateClockOffsets(sync_clock_snapshots);
525 for (auto it = clock_offsets.GetIterator(); it; ++it) {
526 context_->clock_tracker->SetClockOffset(it.key(), it.value());
527 }
528
529 return base::OkStatus();
530 }
531
532 base::FlatHashMap<int64_t /*Clock Id*/, int64_t /*Offset*/>
CalculateClockOffsets(std::vector<SyncClockSnapshots> & sync_clock_snapshots)533 ProtoTraceReader::CalculateClockOffsets(
534 std::vector<SyncClockSnapshots>& sync_clock_snapshots) {
535 base::FlatHashMap<int64_t /*Clock Id*/, int64_t /*Offset*/> clock_offsets;
536
537 // The RemoteClockSync message contains a sequence of |synced_clocks|
538 // messages. Each |synced_clocks| message contains pairs of ClockSnapshots
539 // taken on both the client and host sides.
540 //
541 // The "synced_clocks" messages are emitted periodically. A single round of
542 // data collection involves four snapshots:
543 // 1. Client snapshot
544 // 2. Host snapshot (triggered by client's IPC message)
545 // 3. Client snapshot (triggered by host's IPC message)
546 // 4. Host snapshot
547 //
548 // These four snapshots are used to estimate the clock offset between the
549 // client and host for each default clock domain present in the ClockSnapshot.
550 std::map<int64_t, std::vector<int64_t>> raw_clock_offsets;
551 // Remote clock syncs happen in an interval of 30 sec. 2 adjacent clock
552 // snapshots belong to the same round if they happen within 30 secs.
553 constexpr uint64_t clock_sync_interval_ns = 30lu * 1000000000;
554 for (size_t i = 1; i < sync_clock_snapshots.size(); i++) {
555 // Synced clocks are taken by client snapshot -> host snapshot.
556 auto& ping_clocks = sync_clock_snapshots[i - 1];
557 auto& update_clocks = sync_clock_snapshots[i];
558
559 auto ping_client =
560 ping_clocks[protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME]
561 .second;
562 auto update_client =
563 update_clocks[protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME]
564 .second;
565 // |ping_clocks| and |update_clocks| belong to 2 different rounds of remote
566 // clock sync rounds.
567 if (update_client - ping_client >= clock_sync_interval_ns)
568 continue;
569
570 for (auto it = ping_clocks.GetIterator(); it; ++it) {
571 const auto clock_id = it.key();
572 const auto [t1h, t1c] = it.value();
573 const auto [t2h, t2c] = update_clocks[clock_id];
574
575 if (!t1h || !t1c || !t2h || !t2c)
576 continue;
577
578 int64_t offset1 =
579 static_cast<int64_t>(t1c + t2c) / 2 - static_cast<int64_t>(t1h);
580 int64_t offset2 =
581 static_cast<int64_t>(t2c) - static_cast<int64_t>(t1h + t2h) / 2;
582
583 // Clock values are taken in the order of t1c, t1h, t2c, t2h. Offset
584 // calculation requires at least 3 timestamps as a round trip. We have 4,
585 // which can be treated as 2 round trips:
586 // 1. t1c, t1h, t2c as the round trip initiated by the client. Offset 1
587 // = (t1c + t2c) / 2 - t1h
588 // 2. t1h, t2c, t2h as the round trip initiated by the host. Offset 2 =
589 // t2c - (t1h + t2h) / 2
590 raw_clock_offsets[clock_id].push_back(offset1);
591 raw_clock_offsets[clock_id].push_back(offset2);
592 }
593
594 // Use the average of estimated clock offsets in the clock tracker.
595 for (const auto& [clock_id, offsets] : raw_clock_offsets) {
596 int64_t avg_offset =
597 std::accumulate(offsets.begin(), offsets.end(), 0LL) /
598 static_cast<int64_t>(offsets.size());
599 clock_offsets[clock_id] = avg_offset;
600 }
601 }
602
603 return clock_offsets;
604 }
605
GetBuiltinClockNameOrNull(int64_t clock_id)606 std::optional<StringId> ProtoTraceReader::GetBuiltinClockNameOrNull(
607 int64_t clock_id) {
608 switch (clock_id) {
609 case protos::pbzero::ClockSnapshot::Clock::REALTIME:
610 return context_->storage->InternString("REALTIME");
611 case protos::pbzero::ClockSnapshot::Clock::REALTIME_COARSE:
612 return context_->storage->InternString("REALTIME_COARSE");
613 case protos::pbzero::ClockSnapshot::Clock::MONOTONIC:
614 return context_->storage->InternString("MONOTONIC");
615 case protos::pbzero::ClockSnapshot::Clock::MONOTONIC_COARSE:
616 return context_->storage->InternString("MONOTONIC_COARSE");
617 case protos::pbzero::ClockSnapshot::Clock::MONOTONIC_RAW:
618 return context_->storage->InternString("MONOTONIC_RAW");
619 case protos::pbzero::ClockSnapshot::Clock::BOOTTIME:
620 return context_->storage->InternString("BOOTTIME");
621 default:
622 return std::nullopt;
623 }
624 }
625
ParseServiceEvent(int64_t ts,ConstBytes blob)626 base::Status ProtoTraceReader::ParseServiceEvent(int64_t ts, ConstBytes blob) {
627 protos::pbzero::TracingServiceEvent::Decoder tse(blob);
628 if (tse.tracing_started()) {
629 context_->metadata_tracker->SetMetadata(metadata::tracing_started_ns,
630 Variadic::Integer(ts));
631 }
632 if (tse.tracing_disabled()) {
633 context_->metadata_tracker->SetMetadata(metadata::tracing_disabled_ns,
634 Variadic::Integer(ts));
635 }
636 if (tse.all_data_sources_started()) {
637 context_->metadata_tracker->SetMetadata(
638 metadata::all_data_source_started_ns, Variadic::Integer(ts));
639 }
640 if (tse.all_data_sources_flushed()) {
641 context_->metadata_tracker->AppendMetadata(
642 metadata::all_data_source_flushed_ns, Variadic::Integer(ts));
643 context_->sorter->NotifyFlushEvent();
644 }
645 if (tse.read_tracing_buffers_completed()) {
646 context_->sorter->NotifyReadBufferEvent();
647 }
648 if (tse.has_slow_starting_data_sources()) {
649 protos::pbzero::TracingServiceEvent::DataSources::Decoder msg(
650 tse.slow_starting_data_sources());
651 for (auto it = msg.data_source(); it; it++) {
652 protos::pbzero::TracingServiceEvent::DataSources::DataSource::Decoder
653 data_source(*it);
654 std::string formatted = data_source.producer_name().ToStdString() + " " +
655 data_source.data_source_name().ToStdString();
656 context_->metadata_tracker->AppendMetadata(
657 metadata::slow_start_data_source,
658 Variadic::String(
659 context_->storage->InternString(base::StringView(formatted))));
660 }
661 }
662 if (tse.has_clone_started()) {
663 context_->storage->SetStats(stats::traced_clone_started_timestamp_ns, ts);
664 }
665 if (tse.has_buffer_cloned()) {
666 context_->storage->SetIndexedStats(
667 stats::traced_buf_clone_done_timestamp_ns,
668 static_cast<int>(tse.buffer_cloned()), ts);
669 }
670 return base::OkStatus();
671 }
672
ParseTraceStats(ConstBytes blob)673 void ProtoTraceReader::ParseTraceStats(ConstBytes blob) {
674 protos::pbzero::TraceStats::Decoder evt(blob.data, blob.size);
675 auto* storage = context_->storage.get();
676 storage->SetStats(stats::traced_producers_connected,
677 static_cast<int64_t>(evt.producers_connected()));
678 storage->SetStats(stats::traced_producers_seen,
679 static_cast<int64_t>(evt.producers_seen()));
680 storage->SetStats(stats::traced_data_sources_registered,
681 static_cast<int64_t>(evt.data_sources_registered()));
682 storage->SetStats(stats::traced_data_sources_seen,
683 static_cast<int64_t>(evt.data_sources_seen()));
684 storage->SetStats(stats::traced_tracing_sessions,
685 static_cast<int64_t>(evt.tracing_sessions()));
686 storage->SetStats(stats::traced_total_buffers,
687 static_cast<int64_t>(evt.total_buffers()));
688 storage->SetStats(stats::traced_chunks_discarded,
689 static_cast<int64_t>(evt.chunks_discarded()));
690 storage->SetStats(stats::traced_patches_discarded,
691 static_cast<int64_t>(evt.patches_discarded()));
692 storage->SetStats(stats::traced_flushes_requested,
693 static_cast<int64_t>(evt.flushes_requested()));
694 storage->SetStats(stats::traced_flushes_succeeded,
695 static_cast<int64_t>(evt.flushes_succeeded()));
696 storage->SetStats(stats::traced_flushes_failed,
697 static_cast<int64_t>(evt.flushes_failed()));
698
699 if (evt.has_filter_stats()) {
700 protos::pbzero::TraceStats::FilterStats::Decoder fstat(evt.filter_stats());
701 storage->SetStats(stats::filter_errors,
702 static_cast<int64_t>(fstat.errors()));
703 storage->SetStats(stats::filter_input_bytes,
704 static_cast<int64_t>(fstat.input_bytes()));
705 storage->SetStats(stats::filter_input_packets,
706 static_cast<int64_t>(fstat.input_packets()));
707 storage->SetStats(stats::filter_output_bytes,
708 static_cast<int64_t>(fstat.output_bytes()));
709 storage->SetStats(stats::filter_time_taken_ns,
710 static_cast<int64_t>(fstat.time_taken_ns()));
711 for (auto [i, it] = std::tuple{0, fstat.bytes_discarded_per_buffer()}; it;
712 ++it, ++i) {
713 storage->SetIndexedStats(stats::traced_buf_bytes_filtered_out, i,
714 static_cast<int64_t>(*it));
715 }
716 }
717
718 switch (evt.final_flush_outcome()) {
719 case protos::pbzero::TraceStats::FINAL_FLUSH_SUCCEEDED:
720 storage->IncrementStats(stats::traced_final_flush_succeeded, 1);
721 break;
722 case protos::pbzero::TraceStats::FINAL_FLUSH_FAILED:
723 storage->IncrementStats(stats::traced_final_flush_failed, 1);
724 break;
725 case protos::pbzero::TraceStats::FINAL_FLUSH_UNSPECIFIED:
726 break;
727 }
728
729 int buf_num = 0;
730 for (auto it = evt.buffer_stats(); it; ++it, ++buf_num) {
731 protos::pbzero::TraceStats::BufferStats::Decoder buf(*it);
732 storage->SetIndexedStats(stats::traced_buf_buffer_size, buf_num,
733 static_cast<int64_t>(buf.buffer_size()));
734 storage->SetIndexedStats(stats::traced_buf_bytes_written, buf_num,
735 static_cast<int64_t>(buf.bytes_written()));
736 storage->SetIndexedStats(stats::traced_buf_bytes_overwritten, buf_num,
737 static_cast<int64_t>(buf.bytes_overwritten()));
738 storage->SetIndexedStats(stats::traced_buf_bytes_read, buf_num,
739 static_cast<int64_t>(buf.bytes_read()));
740 storage->SetIndexedStats(stats::traced_buf_padding_bytes_written, buf_num,
741 static_cast<int64_t>(buf.padding_bytes_written()));
742 storage->SetIndexedStats(stats::traced_buf_padding_bytes_cleared, buf_num,
743 static_cast<int64_t>(buf.padding_bytes_cleared()));
744 storage->SetIndexedStats(stats::traced_buf_chunks_written, buf_num,
745 static_cast<int64_t>(buf.chunks_written()));
746 storage->SetIndexedStats(stats::traced_buf_chunks_rewritten, buf_num,
747 static_cast<int64_t>(buf.chunks_rewritten()));
748 storage->SetIndexedStats(stats::traced_buf_chunks_overwritten, buf_num,
749 static_cast<int64_t>(buf.chunks_overwritten()));
750 storage->SetIndexedStats(stats::traced_buf_chunks_discarded, buf_num,
751 static_cast<int64_t>(buf.chunks_discarded()));
752 storage->SetIndexedStats(stats::traced_buf_chunks_read, buf_num,
753 static_cast<int64_t>(buf.chunks_read()));
754 storage->SetIndexedStats(
755 stats::traced_buf_chunks_committed_out_of_order, buf_num,
756 static_cast<int64_t>(buf.chunks_committed_out_of_order()));
757 storage->SetIndexedStats(stats::traced_buf_write_wrap_count, buf_num,
758 static_cast<int64_t>(buf.write_wrap_count()));
759 storage->SetIndexedStats(stats::traced_buf_patches_succeeded, buf_num,
760 static_cast<int64_t>(buf.patches_succeeded()));
761 storage->SetIndexedStats(stats::traced_buf_patches_failed, buf_num,
762 static_cast<int64_t>(buf.patches_failed()));
763 storage->SetIndexedStats(stats::traced_buf_readaheads_succeeded, buf_num,
764 static_cast<int64_t>(buf.readaheads_succeeded()));
765 storage->SetIndexedStats(stats::traced_buf_readaheads_failed, buf_num,
766 static_cast<int64_t>(buf.readaheads_failed()));
767 storage->SetIndexedStats(stats::traced_buf_abi_violations, buf_num,
768 static_cast<int64_t>(buf.abi_violations()));
769 storage->SetIndexedStats(
770 stats::traced_buf_trace_writer_packet_loss, buf_num,
771 static_cast<int64_t>(buf.trace_writer_packet_loss()));
772 }
773
774 struct BufStats {
775 uint32_t packet_loss = 0;
776 uint32_t incremental_sequences_dropped = 0;
777 };
778 base::FlatHashMap<int32_t, BufStats> stats_per_buffer;
779 for (auto it = evt.writer_stats(); it; ++it) {
780 protos::pbzero::TraceStats::WriterStats::Decoder w(*it);
781 auto seq_id = static_cast<uint32_t>(w.sequence_id());
782 if (auto* s = sequence_state_.Find(seq_id)) {
783 auto& stats = stats_per_buffer[static_cast<int32_t>(w.buffer())];
784 stats.packet_loss += s->previous_packet_dropped_count;
785 stats.incremental_sequences_dropped +=
786 s->needs_incremental_state_skipped > 0 &&
787 s->needs_incremental_state_skipped ==
788 s->needs_incremental_state_total;
789 }
790 }
791
792 for (auto it = stats_per_buffer.GetIterator(); it; ++it) {
793 auto& v = it.value();
794 storage->SetIndexedStats(stats::traced_buf_sequence_packet_loss, it.key(),
795 v.packet_loss);
796 storage->SetIndexedStats(stats::traced_buf_incremental_sequences_dropped,
797 it.key(), v.incremental_sequences_dropped);
798 }
799 }
800
NotifyEndOfFile()801 base::Status ProtoTraceReader::NotifyEndOfFile() {
802 received_eof_ = true;
803 for (auto& packet : eof_deferred_packets_) {
804 RETURN_IF_ERROR(TimestampTokenizeAndPushToSorter(std::move(packet)));
805 }
806 return base::OkStatus();
807 }
808
809 } // namespace perfetto::trace_processor
810