• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/importers/proto/proto_trace_reader.h"
18 
19 #include <algorithm>
20 #include <cinttypes>
21 #include <cstddef>
22 #include <cstdint>
23 #include <map>
24 #include <numeric>
25 #include <optional>
26 #include <string>
27 #include <tuple>
28 #include <utility>
29 #include <vector>
30 
31 #include "perfetto/base/logging.h"
32 #include "perfetto/base/status.h"
33 #include "perfetto/ext/base/flat_hash_map.h"
34 #include "perfetto/ext/base/status_or.h"
35 #include "perfetto/ext/base/string_view.h"
36 #include "perfetto/protozero/field.h"
37 #include "perfetto/protozero/proto_decoder.h"
38 #include "perfetto/public/compiler.h"
39 #include "src/trace_processor/importers/common/clock_tracker.h"
40 #include "src/trace_processor/importers/common/event_tracker.h"
41 #include "src/trace_processor/importers/common/metadata_tracker.h"
42 #include "src/trace_processor/importers/proto/packet_analyzer.h"
43 #include "src/trace_processor/importers/proto/proto_importer_module.h"
44 #include "src/trace_processor/sorter/trace_sorter.h"
45 #include "src/trace_processor/storage/metadata.h"
46 #include "src/trace_processor/storage/stats.h"
47 #include "src/trace_processor/storage/trace_storage.h"
48 #include "src/trace_processor/tables/metadata_tables_py.h"
49 #include "src/trace_processor/types/variadic.h"
50 #include "src/trace_processor/util/descriptors.h"
51 
52 #include "protos/perfetto/common/builtin_clock.pbzero.h"
53 #include "protos/perfetto/common/trace_stats.pbzero.h"
54 #include "protos/perfetto/config/trace_config.pbzero.h"
55 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
56 #include "protos/perfetto/trace/extension_descriptor.pbzero.h"
57 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
58 #include "protos/perfetto/trace/remote_clock_sync.pbzero.h"
59 #include "protos/perfetto/trace/trace.pbzero.h"
60 #include "protos/perfetto/trace/trace_packet.pbzero.h"
61 
62 namespace perfetto::trace_processor {
63 
ProtoTraceReader(TraceProcessorContext * ctx)64 ProtoTraceReader::ProtoTraceReader(TraceProcessorContext* ctx)
65     : context_(ctx),
66       skipped_packet_key_id_(ctx->storage->InternString("skipped_packet")),
67       invalid_incremental_state_key_id_(
68           ctx->storage->InternString("invalid_incremental_state")) {}
69 ProtoTraceReader::~ProtoTraceReader() = default;
70 
Parse(TraceBlobView blob)71 base::Status ProtoTraceReader::Parse(TraceBlobView blob) {
72   return tokenizer_.Tokenize(std::move(blob), [this](TraceBlobView packet) {
73     return ParsePacket(std::move(packet));
74   });
75 }
76 
ParseExtensionDescriptor(ConstBytes descriptor)77 base::Status ProtoTraceReader::ParseExtensionDescriptor(ConstBytes descriptor) {
78   protos::pbzero::ExtensionDescriptor::Decoder decoder(descriptor.data,
79                                                        descriptor.size);
80 
81   auto extension = decoder.extension_set();
82   return context_->descriptor_pool_->AddFromFileDescriptorSet(
83       extension.data, extension.size,
84       /*skip_prefixes*/ {},
85       /*merge_existing_messages=*/true);
86 }
87 
ParsePacket(TraceBlobView packet)88 base::Status ProtoTraceReader::ParsePacket(TraceBlobView packet) {
89   protos::pbzero::TracePacket::Decoder decoder(packet.data(), packet.length());
90   if (PERFETTO_UNLIKELY(decoder.bytes_left())) {
91     return base::ErrStatus(
92         "Failed to parse proto packet fully; the trace is probably corrupt.");
93   }
94 
95   // Any compressed packets should have been handled by the tokenizer.
96   PERFETTO_CHECK(!decoder.has_compressed_packets());
97 
98   // When the trace packet is emitted from a remote machine: parse the packet
99   // using a different ProtoTraceReader instance. The packet will be parsed
100   // in the context of the remote machine.
101   if (PERFETTO_UNLIKELY(decoder.has_machine_id())) {
102     if (!context_->machine_id()) {
103       // Default context: switch to another reader instance to parse the packet.
104       PERFETTO_DCHECK(context_->multi_machine_trace_manager);
105       auto* reader = context_->multi_machine_trace_manager->GetOrCreateReader(
106           decoder.machine_id());
107       return reader->ParsePacket(std::move(packet));
108     }
109   }
110   // Assert that the packet is parsed using the right instance of reader.
111   PERFETTO_DCHECK(decoder.has_machine_id() == !!context_->machine_id());
112 
113   uint32_t seq_id = decoder.trusted_packet_sequence_id();
114   auto [scoped_state, inserted] = sequence_state_.Insert(seq_id, {});
115   if (decoder.has_trusted_packet_sequence_id()) {
116     if (!inserted && decoder.previous_packet_dropped()) {
117       ++scoped_state->previous_packet_dropped_count;
118     }
119   }
120 
121   if (decoder.first_packet_on_sequence()) {
122     HandleFirstPacketOnSequence(seq_id);
123   }
124 
125   uint32_t sequence_flags = decoder.sequence_flags();
126   if (decoder.incremental_state_cleared() ||
127       sequence_flags &
128           protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED) {
129     HandleIncrementalStateCleared(decoder);
130   } else if (decoder.previous_packet_dropped()) {
131     HandlePreviousPacketDropped(decoder);
132   }
133 
134   // It is important that we parse defaults before parsing other fields such as
135   // the timestamp, since the defaults could affect them.
136   if (decoder.has_trace_packet_defaults()) {
137     auto field = decoder.trace_packet_defaults();
138     ParseTracePacketDefaults(decoder, packet.slice(field.data, field.size));
139   }
140 
141   if (decoder.has_interned_data()) {
142     auto field = decoder.interned_data();
143     ParseInternedData(decoder, packet.slice(field.data, field.size));
144   }
145 
146   if (decoder.has_clock_snapshot()) {
147     return ParseClockSnapshot(decoder.clock_snapshot(), seq_id);
148   }
149 
150   if (decoder.has_trace_stats()) {
151     ParseTraceStats(decoder.trace_stats());
152   }
153 
154   if (decoder.has_remote_clock_sync()) {
155     PERFETTO_DCHECK(context_->machine_id());
156     return ParseRemoteClockSync(decoder.remote_clock_sync());
157   }
158 
159   if (decoder.has_service_event()) {
160     PERFETTO_DCHECK(decoder.has_timestamp());
161     int64_t ts = static_cast<int64_t>(decoder.timestamp());
162     return ParseServiceEvent(ts, decoder.service_event());
163   }
164 
165   if (decoder.has_extension_descriptor()) {
166     return ParseExtensionDescriptor(decoder.extension_descriptor());
167   }
168 
169   auto* state = GetIncrementalStateForPacketSequence(seq_id);
170   if (decoder.sequence_flags() &
171       protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE) {
172     if (!seq_id) {
173       return base::ErrStatus(
174           "TracePacket specified SEQ_NEEDS_INCREMENTAL_STATE but the "
175           "TraceWriter's sequence_id is zero (the service is "
176           "probably too old)");
177     }
178     scoped_state->needs_incremental_state_total++;
179 
180     if (!state->IsIncrementalStateValid()) {
181       if (context_->content_analyzer) {
182         // Account for the skipped packet for trace proto content analysis,
183         // with a special annotation.
184         PacketAnalyzer::SampleAnnotation annotation;
185         annotation.emplace_back(skipped_packet_key_id_,
186                                 invalid_incremental_state_key_id_);
187         PacketAnalyzer::Get(context_)->ProcessPacket(packet, annotation);
188       }
189       scoped_state->needs_incremental_state_skipped++;
190       context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
191       return base::OkStatus();
192     }
193   }
194 
195   if (context_->content_analyzer && !decoder.has_track_event()) {
196     PacketAnalyzer::Get(context_)->ProcessPacket(packet, {});
197   }
198 
199   if (decoder.has_trace_config()) {
200     ParseTraceConfig(decoder.trace_config());
201   }
202 
203   return TimestampTokenizeAndPushToSorter(std::move(packet));
204 }
205 
TimestampTokenizeAndPushToSorter(TraceBlobView packet)206 base::Status ProtoTraceReader::TimestampTokenizeAndPushToSorter(
207     TraceBlobView packet) {
208   protos::pbzero::TracePacket::Decoder decoder(packet.data(), packet.length());
209 
210   uint32_t seq_id = decoder.trusted_packet_sequence_id();
211   auto* state = GetIncrementalStateForPacketSequence(seq_id);
212 
213   protos::pbzero::TracePacketDefaults::Decoder* defaults =
214       state->current_generation()->GetTracePacketDefaults();
215 
216   int64_t timestamp;
217   if (decoder.has_timestamp()) {
218     timestamp = static_cast<int64_t>(decoder.timestamp());
219 
220     uint32_t timestamp_clock_id =
221         decoder.has_timestamp_clock_id()
222             ? decoder.timestamp_clock_id()
223             : (defaults ? defaults->timestamp_clock_id() : 0);
224 
225     if ((decoder.has_chrome_events() || decoder.has_chrome_metadata()) &&
226         (!timestamp_clock_id ||
227          timestamp_clock_id == protos::pbzero::BUILTIN_CLOCK_MONOTONIC)) {
228       // Chrome event timestamps are in MONOTONIC domain, but may occur in
229       // traces where (a) no clock snapshots exist or (b) no clock_id is
230       // specified for their timestamps. Adjust to trace time if we have a clock
231       // snapshot.
232       // TODO(eseckler): Set timestamp_clock_id and emit ClockSnapshots in
233       // chrome and then remove this.
234       auto trace_ts = context_->clock_tracker->ToTraceTime(
235           protos::pbzero::BUILTIN_CLOCK_MONOTONIC, timestamp);
236       if (trace_ts.ok())
237         timestamp = trace_ts.value();
238     } else if (timestamp_clock_id) {
239       // If the TracePacket specifies a non-zero clock-id, translate the
240       // timestamp into the trace-time clock domain.
241       ClockTracker::ClockId converted_clock_id = timestamp_clock_id;
242       if (ClockTracker::IsSequenceClock(converted_clock_id)) {
243         if (!seq_id) {
244           return base::ErrStatus(
245               "TracePacket specified a sequence-local clock id (%" PRIu32
246               ") but the TraceWriter's sequence_id is zero (the service is "
247               "probably too old)",
248               timestamp_clock_id);
249         }
250         converted_clock_id =
251             ClockTracker::SequenceToGlobalClock(seq_id, timestamp_clock_id);
252       }
253       // If the clock tracker is missing a path to trace time for this clock
254       // then try to save this packet for processing later when a path exists.
255       if (!context_->clock_tracker->HasPathToTraceTime(converted_clock_id)) {
256         // We need to switch to full sorting mode to ensure that packets with
257         // missing timestamp are handled correctly. Don't save the packet unless
258         // switching to full sorting mode succeeded.
259         if (!received_eof_ && context_->sorter->SetSortingMode(
260                                   TraceSorter::SortingMode::kFullSort)) {
261           eof_deferred_packets_.push_back(std::move(packet));
262           return base::OkStatus();
263         }
264         // Fall-through and let ToTraceTime fail below.
265       }
266       auto trace_ts =
267           context_->clock_tracker->ToTraceTime(converted_clock_id, timestamp);
268       if (!trace_ts.ok()) {
269         // ToTraceTime() will increase the |clock_sync_failure| stat on failure.
270         // We don't return an error here as it will cause the trace to stop
271         // parsing. Instead, we rely on the stat increment in ToTraceTime() to
272         // inform the user about the error.
273         return base::OkStatus();
274       }
275       timestamp = trace_ts.value();
276     }
277   } else {
278     timestamp = std::max(latest_timestamp_, context_->sorter->max_timestamp());
279   }
280   latest_timestamp_ = std::max(timestamp, latest_timestamp_);
281 
282   auto& modules = context_->modules_by_field;
283   for (uint32_t field_id = 1; field_id < modules.size(); ++field_id) {
284     if (!modules[field_id].empty() && decoder.Get(field_id).valid()) {
285       for (ProtoImporterModule* global_module :
286            context_->modules_for_all_fields) {
287         ModuleResult res = global_module->TokenizePacket(
288             decoder, &packet, timestamp, state->current_generation(), field_id);
289         if (!res.ignored())
290           return res.ToStatus();
291       }
292       for (ProtoImporterModule* module : modules[field_id]) {
293         ModuleResult res = module->TokenizePacket(
294             decoder, &packet, timestamp, state->current_generation(), field_id);
295         if (!res.ignored())
296           return res.ToStatus();
297       }
298     }
299   }
300 
301   // Use parent data and length because we want to parse this again
302   // later to get the exact type of the packet.
303   context_->sorter->PushTracePacket(timestamp, state->current_generation(),
304                                     std::move(packet), context_->machine_id());
305 
306   return base::OkStatus();
307 }
308 
ParseTraceConfig(protozero::ConstBytes blob)309 void ProtoTraceReader::ParseTraceConfig(protozero::ConstBytes blob) {
310   using Config = protos::pbzero::TraceConfig;
311   Config::Decoder trace_config(blob);
312   if (trace_config.write_into_file()) {
313     if (!trace_config.flush_period_ms()) {
314       context_->storage->IncrementStats(stats::config_write_into_file_no_flush);
315     }
316     int i = 0;
317     for (auto it = trace_config.buffers(); it; ++it, ++i) {
318       Config::BufferConfig::Decoder buf(*it);
319       if (buf.fill_policy() == Config::BufferConfig::FillPolicy::DISCARD) {
320         context_->storage->IncrementIndexedStats(
321             stats::config_write_into_file_discard, i);
322       }
323     }
324   }
325 }
326 
HandleIncrementalStateCleared(const protos::pbzero::TracePacket::Decoder & packet_decoder)327 void ProtoTraceReader::HandleIncrementalStateCleared(
328     const protos::pbzero::TracePacket::Decoder& packet_decoder) {
329   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
330     PERFETTO_ELOG(
331         "incremental_state_cleared without trusted_packet_sequence_id");
332     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
333     return;
334   }
335   GetIncrementalStateForPacketSequence(
336       packet_decoder.trusted_packet_sequence_id())
337       ->OnIncrementalStateCleared();
338   for (auto& module : context_->modules) {
339     module->OnIncrementalStateCleared(
340         packet_decoder.trusted_packet_sequence_id());
341   }
342 }
343 
HandleFirstPacketOnSequence(uint32_t packet_sequence_id)344 void ProtoTraceReader::HandleFirstPacketOnSequence(
345     uint32_t packet_sequence_id) {
346   for (auto& module : context_->modules) {
347     module->OnFirstPacketOnSequence(packet_sequence_id);
348   }
349 }
350 
HandlePreviousPacketDropped(const protos::pbzero::TracePacket::Decoder & packet_decoder)351 void ProtoTraceReader::HandlePreviousPacketDropped(
352     const protos::pbzero::TracePacket::Decoder& packet_decoder) {
353   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
354     PERFETTO_ELOG("previous_packet_dropped without trusted_packet_sequence_id");
355     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
356     return;
357   }
358   GetIncrementalStateForPacketSequence(
359       packet_decoder.trusted_packet_sequence_id())
360       ->OnPacketLoss();
361 }
362 
ParseTracePacketDefaults(const protos::pbzero::TracePacket_Decoder & packet_decoder,TraceBlobView trace_packet_defaults)363 void ProtoTraceReader::ParseTracePacketDefaults(
364     const protos::pbzero::TracePacket_Decoder& packet_decoder,
365     TraceBlobView trace_packet_defaults) {
366   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
367     PERFETTO_ELOG(
368         "TracePacketDefaults packet without trusted_packet_sequence_id");
369     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
370     return;
371   }
372 
373   auto* state = GetIncrementalStateForPacketSequence(
374       packet_decoder.trusted_packet_sequence_id());
375   state->UpdateTracePacketDefaults(std::move(trace_packet_defaults));
376 }
377 
ParseInternedData(const protos::pbzero::TracePacket::Decoder & packet_decoder,TraceBlobView interned_data)378 void ProtoTraceReader::ParseInternedData(
379     const protos::pbzero::TracePacket::Decoder& packet_decoder,
380     TraceBlobView interned_data) {
381   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
382     PERFETTO_ELOG("InternedData packet without trusted_packet_sequence_id");
383     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
384     return;
385   }
386 
387   auto* state = GetIncrementalStateForPacketSequence(
388       packet_decoder.trusted_packet_sequence_id());
389 
390   // Don't parse interned data entries until incremental state is valid, because
391   // they could otherwise be associated with the wrong generation in the state.
392   if (!state->IsIncrementalStateValid()) {
393     context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
394     return;
395   }
396 
397   // Store references to interned data submessages into the sequence's state.
398   protozero::ProtoDecoder decoder(interned_data.data(), interned_data.length());
399   for (protozero::Field f = decoder.ReadField(); f.valid();
400        f = decoder.ReadField()) {
401     auto bytes = f.as_bytes();
402     state->InternMessage(f.id(), interned_data.slice(bytes.data, bytes.size));
403   }
404 }
405 
ParseClockSnapshot(ConstBytes blob,uint32_t seq_id)406 base::Status ProtoTraceReader::ParseClockSnapshot(ConstBytes blob,
407                                                   uint32_t seq_id) {
408   std::vector<ClockTracker::ClockTimestamp> clock_timestamps;
409   protos::pbzero::ClockSnapshot::Decoder evt(blob.data, blob.size);
410   if (evt.primary_trace_clock()) {
411     context_->clock_tracker->SetTraceTimeClock(
412         static_cast<ClockTracker::ClockId>(evt.primary_trace_clock()));
413   }
414   for (auto it = evt.clocks(); it; ++it) {
415     protos::pbzero::ClockSnapshot::Clock::Decoder clk(*it);
416     ClockTracker::ClockId clock_id = clk.clock_id();
417     if (ClockTracker::IsSequenceClock(clk.clock_id())) {
418       if (!seq_id) {
419         return base::ErrStatus(
420             "ClockSnapshot packet is specifying a sequence-scoped clock id "
421             "(%" PRId64 ") but the TracePacket sequence_id is zero",
422             clock_id);
423       }
424       clock_id = ClockTracker::SequenceToGlobalClock(seq_id, clk.clock_id());
425     }
426     int64_t unit_multiplier_ns =
427         clk.unit_multiplier_ns()
428             ? static_cast<int64_t>(clk.unit_multiplier_ns())
429             : 1;
430     clock_timestamps.emplace_back(clock_id, clk.timestamp(), unit_multiplier_ns,
431                                   clk.is_incremental());
432   }
433 
434   base::StatusOr<uint32_t> snapshot_id =
435       context_->clock_tracker->AddSnapshot(clock_timestamps);
436   if (!snapshot_id.ok()) {
437     PERFETTO_ELOG("%s", snapshot_id.status().c_message());
438     return base::OkStatus();
439   }
440 
441   std::optional<int64_t> trace_time_from_snapshot =
442       context_->clock_tracker->ToTraceTimeFromSnapshot(clock_timestamps);
443 
444   // Add the all the clock snapshots to the clock snapshot table.
445   std::optional<int64_t> trace_ts_for_check;
446   for (const auto& clock_timestamp : clock_timestamps) {
447     // If the clock is incremental, we need to use 0 to map correctly to
448     // |absolute_timestamp|.
449     int64_t ts_to_convert =
450         clock_timestamp.clock.is_incremental ? 0 : clock_timestamp.timestamp;
451     // Even if we have trace time from snapshot, we still run ToTraceTime to
452     // optimise future conversions.
453     base::StatusOr<int64_t> opt_trace_ts = context_->clock_tracker->ToTraceTime(
454         clock_timestamp.clock.id, ts_to_convert);
455 
456     if (!opt_trace_ts.ok()) {
457       // This can happen if |AddSnapshot| failed to resolve this clock, e.g. if
458       // clock is not monotonic. Try to fetch trace time from snapshot.
459       if (!trace_time_from_snapshot) {
460         PERFETTO_DLOG("%s", opt_trace_ts.status().c_message());
461         continue;
462       }
463       opt_trace_ts = *trace_time_from_snapshot;
464     }
465 
466     // Double check that all the clocks in this snapshot resolve to the same
467     // trace timestamp value.
468     PERFETTO_DCHECK(!trace_ts_for_check ||
469                     opt_trace_ts.value() == trace_ts_for_check.value());
470     trace_ts_for_check = *opt_trace_ts;
471 
472     tables::ClockSnapshotTable::Row row;
473     row.ts = *opt_trace_ts;
474     row.clock_id = static_cast<int64_t>(clock_timestamp.clock.id);
475     row.clock_value =
476         clock_timestamp.timestamp * clock_timestamp.clock.unit_multiplier_ns;
477     row.clock_name = GetBuiltinClockNameOrNull(clock_timestamp.clock.id);
478     row.snapshot_id = *snapshot_id;
479     row.machine_id = context_->machine_id();
480 
481     context_->storage->mutable_clock_snapshot_table()->Insert(row);
482   }
483   return base::OkStatus();
484 }
485 
ParseRemoteClockSync(ConstBytes blob)486 base::Status ProtoTraceReader::ParseRemoteClockSync(ConstBytes blob) {
487   protos::pbzero::RemoteClockSync::Decoder evt(blob.data, blob.size);
488 
489   std::vector<SyncClockSnapshots> sync_clock_snapshots;
490   // Decode the RemoteClockSync message into a struct for calculating offsets.
491   for (auto it = evt.synced_clocks(); it; ++it) {
492     sync_clock_snapshots.emplace_back();
493     auto& sync_clocks = sync_clock_snapshots.back();
494 
495     protos::pbzero::RemoteClockSync::SyncedClocks::Decoder synced_clocks(*it);
496     protos::pbzero::ClockSnapshot::ClockSnapshot::Decoder host_clocks(
497         synced_clocks.host_clocks());
498     for (auto clock_it = host_clocks.clocks(); clock_it; clock_it++) {
499       protos::pbzero::ClockSnapshot::ClockSnapshot::Clock::Decoder clock(
500           *clock_it);
501       sync_clocks[clock.clock_id()].first = clock.timestamp();
502     }
503 
504     std::vector<ClockTracker::ClockTimestamp> clock_timestamps;
505     protos::pbzero::ClockSnapshot::ClockSnapshot::Decoder client_clocks(
506         synced_clocks.client_clocks());
507     for (auto clock_it = client_clocks.clocks(); clock_it; clock_it++) {
508       protos::pbzero::ClockSnapshot::ClockSnapshot::Clock::Decoder clock(
509           *clock_it);
510       sync_clocks[clock.clock_id()].second = clock.timestamp();
511       clock_timestamps.emplace_back(clock.clock_id(), clock.timestamp(), 1,
512                                     false);
513     }
514 
515     // In addition for calculating clock offsets, client clock snapshots are
516     // also added to clock tracker to emulate tracing service taking periodical
517     // clock snapshots. This builds a clock conversion path from a local trace
518     // time (e.g. Chrome trace time) to client builtin clock (CLOCK_MONOTONIC)
519     // which can be converted to host trace time (CLOCK_BOOTTIME).
520     context_->clock_tracker->AddSnapshot(clock_timestamps);
521   }
522 
523   // Calculate clock offsets and report to the ClockTracker.
524   auto clock_offsets = CalculateClockOffsets(sync_clock_snapshots);
525   for (auto it = clock_offsets.GetIterator(); it; ++it) {
526     context_->clock_tracker->SetClockOffset(it.key(), it.value());
527   }
528 
529   return base::OkStatus();
530 }
531 
532 base::FlatHashMap<int64_t /*Clock Id*/, int64_t /*Offset*/>
CalculateClockOffsets(std::vector<SyncClockSnapshots> & sync_clock_snapshots)533 ProtoTraceReader::CalculateClockOffsets(
534     std::vector<SyncClockSnapshots>& sync_clock_snapshots) {
535   base::FlatHashMap<int64_t /*Clock Id*/, int64_t /*Offset*/> clock_offsets;
536 
537   // The RemoteClockSync message contains a sequence of |synced_clocks|
538   // messages. Each |synced_clocks| message contains pairs of ClockSnapshots
539   // taken on both the client and host sides.
540   //
541   // The "synced_clocks" messages are emitted periodically. A single round of
542   // data collection involves four snapshots:
543   //   1. Client snapshot
544   //   2. Host snapshot (triggered by client's IPC message)
545   //   3. Client snapshot (triggered by host's IPC message)
546   //   4. Host snapshot
547   //
548   // These four snapshots are used to estimate the clock offset between the
549   // client and host for each default clock domain present in the ClockSnapshot.
550   std::map<int64_t, std::vector<int64_t>> raw_clock_offsets;
551   // Remote clock syncs happen in an interval of 30 sec. 2 adjacent clock
552   // snapshots belong to the same round if they happen within 30 secs.
553   constexpr uint64_t clock_sync_interval_ns = 30lu * 1000000000;
554   for (size_t i = 1; i < sync_clock_snapshots.size(); i++) {
555     // Synced clocks are taken by client snapshot -> host snapshot.
556     auto& ping_clocks = sync_clock_snapshots[i - 1];
557     auto& update_clocks = sync_clock_snapshots[i];
558 
559     auto ping_client =
560         ping_clocks[protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME]
561             .second;
562     auto update_client =
563         update_clocks[protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME]
564             .second;
565     // |ping_clocks| and |update_clocks| belong to 2 different rounds of remote
566     // clock sync rounds.
567     if (update_client - ping_client >= clock_sync_interval_ns)
568       continue;
569 
570     for (auto it = ping_clocks.GetIterator(); it; ++it) {
571       const auto clock_id = it.key();
572       const auto [t1h, t1c] = it.value();
573       const auto [t2h, t2c] = update_clocks[clock_id];
574 
575       if (!t1h || !t1c || !t2h || !t2c)
576         continue;
577 
578       int64_t offset1 =
579           static_cast<int64_t>(t1c + t2c) / 2 - static_cast<int64_t>(t1h);
580       int64_t offset2 =
581           static_cast<int64_t>(t2c) - static_cast<int64_t>(t1h + t2h) / 2;
582 
583       // Clock values are taken in the order of t1c, t1h, t2c, t2h. Offset
584       // calculation requires at least 3 timestamps as a round trip. We have 4,
585       // which can be treated as 2 round trips:
586       //   1. t1c, t1h, t2c as the round trip initiated by the client. Offset 1
587       //      = (t1c + t2c) / 2 - t1h
588       //   2. t1h, t2c, t2h as the round trip initiated by the host. Offset 2 =
589       //      t2c - (t1h + t2h) / 2
590       raw_clock_offsets[clock_id].push_back(offset1);
591       raw_clock_offsets[clock_id].push_back(offset2);
592     }
593 
594     // Use the average of estimated clock offsets in the clock tracker.
595     for (const auto& [clock_id, offsets] : raw_clock_offsets) {
596       int64_t avg_offset =
597           std::accumulate(offsets.begin(), offsets.end(), 0LL) /
598           static_cast<int64_t>(offsets.size());
599       clock_offsets[clock_id] = avg_offset;
600     }
601   }
602 
603   return clock_offsets;
604 }
605 
GetBuiltinClockNameOrNull(int64_t clock_id)606 std::optional<StringId> ProtoTraceReader::GetBuiltinClockNameOrNull(
607     int64_t clock_id) {
608   switch (clock_id) {
609     case protos::pbzero::ClockSnapshot::Clock::REALTIME:
610       return context_->storage->InternString("REALTIME");
611     case protos::pbzero::ClockSnapshot::Clock::REALTIME_COARSE:
612       return context_->storage->InternString("REALTIME_COARSE");
613     case protos::pbzero::ClockSnapshot::Clock::MONOTONIC:
614       return context_->storage->InternString("MONOTONIC");
615     case protos::pbzero::ClockSnapshot::Clock::MONOTONIC_COARSE:
616       return context_->storage->InternString("MONOTONIC_COARSE");
617     case protos::pbzero::ClockSnapshot::Clock::MONOTONIC_RAW:
618       return context_->storage->InternString("MONOTONIC_RAW");
619     case protos::pbzero::ClockSnapshot::Clock::BOOTTIME:
620       return context_->storage->InternString("BOOTTIME");
621     default:
622       return std::nullopt;
623   }
624 }
625 
ParseServiceEvent(int64_t ts,ConstBytes blob)626 base::Status ProtoTraceReader::ParseServiceEvent(int64_t ts, ConstBytes blob) {
627   protos::pbzero::TracingServiceEvent::Decoder tse(blob);
628   if (tse.tracing_started()) {
629     context_->metadata_tracker->SetMetadata(metadata::tracing_started_ns,
630                                             Variadic::Integer(ts));
631   }
632   if (tse.tracing_disabled()) {
633     context_->metadata_tracker->SetMetadata(metadata::tracing_disabled_ns,
634                                             Variadic::Integer(ts));
635   }
636   if (tse.all_data_sources_started()) {
637     context_->metadata_tracker->SetMetadata(
638         metadata::all_data_source_started_ns, Variadic::Integer(ts));
639   }
640   if (tse.all_data_sources_flushed()) {
641     context_->metadata_tracker->AppendMetadata(
642         metadata::all_data_source_flushed_ns, Variadic::Integer(ts));
643     context_->sorter->NotifyFlushEvent();
644   }
645   if (tse.read_tracing_buffers_completed()) {
646     context_->sorter->NotifyReadBufferEvent();
647   }
648   if (tse.has_slow_starting_data_sources()) {
649     protos::pbzero::TracingServiceEvent::DataSources::Decoder msg(
650         tse.slow_starting_data_sources());
651     for (auto it = msg.data_source(); it; it++) {
652       protos::pbzero::TracingServiceEvent::DataSources::DataSource::Decoder
653           data_source(*it);
654       std::string formatted = data_source.producer_name().ToStdString() + " " +
655                               data_source.data_source_name().ToStdString();
656       context_->metadata_tracker->AppendMetadata(
657           metadata::slow_start_data_source,
658           Variadic::String(
659               context_->storage->InternString(base::StringView(formatted))));
660     }
661   }
662   if (tse.has_clone_started()) {
663     context_->storage->SetStats(stats::traced_clone_started_timestamp_ns, ts);
664   }
665   if (tse.has_buffer_cloned()) {
666     context_->storage->SetIndexedStats(
667         stats::traced_buf_clone_done_timestamp_ns,
668         static_cast<int>(tse.buffer_cloned()), ts);
669   }
670   return base::OkStatus();
671 }
672 
ParseTraceStats(ConstBytes blob)673 void ProtoTraceReader::ParseTraceStats(ConstBytes blob) {
674   protos::pbzero::TraceStats::Decoder evt(blob.data, blob.size);
675   auto* storage = context_->storage.get();
676   storage->SetStats(stats::traced_producers_connected,
677                     static_cast<int64_t>(evt.producers_connected()));
678   storage->SetStats(stats::traced_producers_seen,
679                     static_cast<int64_t>(evt.producers_seen()));
680   storage->SetStats(stats::traced_data_sources_registered,
681                     static_cast<int64_t>(evt.data_sources_registered()));
682   storage->SetStats(stats::traced_data_sources_seen,
683                     static_cast<int64_t>(evt.data_sources_seen()));
684   storage->SetStats(stats::traced_tracing_sessions,
685                     static_cast<int64_t>(evt.tracing_sessions()));
686   storage->SetStats(stats::traced_total_buffers,
687                     static_cast<int64_t>(evt.total_buffers()));
688   storage->SetStats(stats::traced_chunks_discarded,
689                     static_cast<int64_t>(evt.chunks_discarded()));
690   storage->SetStats(stats::traced_patches_discarded,
691                     static_cast<int64_t>(evt.patches_discarded()));
692   storage->SetStats(stats::traced_flushes_requested,
693                     static_cast<int64_t>(evt.flushes_requested()));
694   storage->SetStats(stats::traced_flushes_succeeded,
695                     static_cast<int64_t>(evt.flushes_succeeded()));
696   storage->SetStats(stats::traced_flushes_failed,
697                     static_cast<int64_t>(evt.flushes_failed()));
698 
699   if (evt.has_filter_stats()) {
700     protos::pbzero::TraceStats::FilterStats::Decoder fstat(evt.filter_stats());
701     storage->SetStats(stats::filter_errors,
702                       static_cast<int64_t>(fstat.errors()));
703     storage->SetStats(stats::filter_input_bytes,
704                       static_cast<int64_t>(fstat.input_bytes()));
705     storage->SetStats(stats::filter_input_packets,
706                       static_cast<int64_t>(fstat.input_packets()));
707     storage->SetStats(stats::filter_output_bytes,
708                       static_cast<int64_t>(fstat.output_bytes()));
709     storage->SetStats(stats::filter_time_taken_ns,
710                       static_cast<int64_t>(fstat.time_taken_ns()));
711     for (auto [i, it] = std::tuple{0, fstat.bytes_discarded_per_buffer()}; it;
712          ++it, ++i) {
713       storage->SetIndexedStats(stats::traced_buf_bytes_filtered_out, i,
714                                static_cast<int64_t>(*it));
715     }
716   }
717 
718   switch (evt.final_flush_outcome()) {
719     case protos::pbzero::TraceStats::FINAL_FLUSH_SUCCEEDED:
720       storage->IncrementStats(stats::traced_final_flush_succeeded, 1);
721       break;
722     case protos::pbzero::TraceStats::FINAL_FLUSH_FAILED:
723       storage->IncrementStats(stats::traced_final_flush_failed, 1);
724       break;
725     case protos::pbzero::TraceStats::FINAL_FLUSH_UNSPECIFIED:
726       break;
727   }
728 
729   int buf_num = 0;
730   for (auto it = evt.buffer_stats(); it; ++it, ++buf_num) {
731     protos::pbzero::TraceStats::BufferStats::Decoder buf(*it);
732     storage->SetIndexedStats(stats::traced_buf_buffer_size, buf_num,
733                              static_cast<int64_t>(buf.buffer_size()));
734     storage->SetIndexedStats(stats::traced_buf_bytes_written, buf_num,
735                              static_cast<int64_t>(buf.bytes_written()));
736     storage->SetIndexedStats(stats::traced_buf_bytes_overwritten, buf_num,
737                              static_cast<int64_t>(buf.bytes_overwritten()));
738     storage->SetIndexedStats(stats::traced_buf_bytes_read, buf_num,
739                              static_cast<int64_t>(buf.bytes_read()));
740     storage->SetIndexedStats(stats::traced_buf_padding_bytes_written, buf_num,
741                              static_cast<int64_t>(buf.padding_bytes_written()));
742     storage->SetIndexedStats(stats::traced_buf_padding_bytes_cleared, buf_num,
743                              static_cast<int64_t>(buf.padding_bytes_cleared()));
744     storage->SetIndexedStats(stats::traced_buf_chunks_written, buf_num,
745                              static_cast<int64_t>(buf.chunks_written()));
746     storage->SetIndexedStats(stats::traced_buf_chunks_rewritten, buf_num,
747                              static_cast<int64_t>(buf.chunks_rewritten()));
748     storage->SetIndexedStats(stats::traced_buf_chunks_overwritten, buf_num,
749                              static_cast<int64_t>(buf.chunks_overwritten()));
750     storage->SetIndexedStats(stats::traced_buf_chunks_discarded, buf_num,
751                              static_cast<int64_t>(buf.chunks_discarded()));
752     storage->SetIndexedStats(stats::traced_buf_chunks_read, buf_num,
753                              static_cast<int64_t>(buf.chunks_read()));
754     storage->SetIndexedStats(
755         stats::traced_buf_chunks_committed_out_of_order, buf_num,
756         static_cast<int64_t>(buf.chunks_committed_out_of_order()));
757     storage->SetIndexedStats(stats::traced_buf_write_wrap_count, buf_num,
758                              static_cast<int64_t>(buf.write_wrap_count()));
759     storage->SetIndexedStats(stats::traced_buf_patches_succeeded, buf_num,
760                              static_cast<int64_t>(buf.patches_succeeded()));
761     storage->SetIndexedStats(stats::traced_buf_patches_failed, buf_num,
762                              static_cast<int64_t>(buf.patches_failed()));
763     storage->SetIndexedStats(stats::traced_buf_readaheads_succeeded, buf_num,
764                              static_cast<int64_t>(buf.readaheads_succeeded()));
765     storage->SetIndexedStats(stats::traced_buf_readaheads_failed, buf_num,
766                              static_cast<int64_t>(buf.readaheads_failed()));
767     storage->SetIndexedStats(stats::traced_buf_abi_violations, buf_num,
768                              static_cast<int64_t>(buf.abi_violations()));
769     storage->SetIndexedStats(
770         stats::traced_buf_trace_writer_packet_loss, buf_num,
771         static_cast<int64_t>(buf.trace_writer_packet_loss()));
772   }
773 
774   struct BufStats {
775     uint32_t packet_loss = 0;
776     uint32_t incremental_sequences_dropped = 0;
777   };
778   base::FlatHashMap<int32_t, BufStats> stats_per_buffer;
779   for (auto it = evt.writer_stats(); it; ++it) {
780     protos::pbzero::TraceStats::WriterStats::Decoder w(*it);
781     auto seq_id = static_cast<uint32_t>(w.sequence_id());
782     if (auto* s = sequence_state_.Find(seq_id)) {
783       auto& stats = stats_per_buffer[static_cast<int32_t>(w.buffer())];
784       stats.packet_loss += s->previous_packet_dropped_count;
785       stats.incremental_sequences_dropped +=
786           s->needs_incremental_state_skipped > 0 &&
787           s->needs_incremental_state_skipped ==
788               s->needs_incremental_state_total;
789     }
790   }
791 
792   for (auto it = stats_per_buffer.GetIterator(); it; ++it) {
793     auto& v = it.value();
794     storage->SetIndexedStats(stats::traced_buf_sequence_packet_loss, it.key(),
795                              v.packet_loss);
796     storage->SetIndexedStats(stats::traced_buf_incremental_sequences_dropped,
797                              it.key(), v.incremental_sequences_dropped);
798   }
799 }
800 
NotifyEndOfFile()801 base::Status ProtoTraceReader::NotifyEndOfFile() {
802   received_eof_ = true;
803   for (auto& packet : eof_deferred_packets_) {
804     RETURN_IF_ERROR(TimestampTokenizeAndPushToSorter(std::move(packet)));
805   }
806   return base::OkStatus();
807 }
808 
809 }  // namespace perfetto::trace_processor
810