• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/importers/proto/proto_trace_reader.h"
18 
19 #include <optional>
20 #include <string>
21 
22 #include "perfetto/base/build_config.h"
23 #include "perfetto/base/logging.h"
24 #include "perfetto/ext/base/flat_hash_map.h"
25 #include "perfetto/ext/base/string_view.h"
26 #include "perfetto/ext/base/utils.h"
27 #include "perfetto/protozero/proto_decoder.h"
28 #include "perfetto/protozero/proto_utils.h"
29 #include "perfetto/public/compiler.h"
30 #include "perfetto/trace_processor/status.h"
31 #include "src/trace_processor/importers/common/clock_tracker.h"
32 #include "src/trace_processor/importers/common/event_tracker.h"
33 #include "src/trace_processor/importers/common/machine_tracker.h"
34 #include "src/trace_processor/importers/common/metadata_tracker.h"
35 #include "src/trace_processor/importers/common/track_tracker.h"
36 #include "src/trace_processor/importers/ftrace/ftrace_module.h"
37 #include "src/trace_processor/importers/proto/packet_analyzer.h"
38 #include "src/trace_processor/sorter/trace_sorter.h"
39 #include "src/trace_processor/storage/stats.h"
40 #include "src/trace_processor/storage/trace_storage.h"
41 #include "src/trace_processor/util/descriptors.h"
42 #include "src/trace_processor/util/gzip_utils.h"
43 
44 #include "protos/perfetto/common/builtin_clock.pbzero.h"
45 #include "protos/perfetto/common/trace_stats.pbzero.h"
46 #include "protos/perfetto/config/trace_config.pbzero.h"
47 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
48 #include "protos/perfetto/trace/extension_descriptor.pbzero.h"
49 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
50 #include "protos/perfetto/trace/profiling/profile_common.pbzero.h"
51 #include "protos/perfetto/trace/trace.pbzero.h"
52 #include "protos/perfetto/trace/trace_packet.pbzero.h"
53 
54 namespace perfetto {
55 namespace trace_processor {
56 
ProtoTraceReader(TraceProcessorContext * ctx)57 ProtoTraceReader::ProtoTraceReader(TraceProcessorContext* ctx)
58     : context_(ctx),
59       skipped_packet_key_id_(ctx->storage->InternString("skipped_packet")),
60       invalid_incremental_state_key_id_(
61           ctx->storage->InternString("invalid_incremental_state")) {}
62 ProtoTraceReader::~ProtoTraceReader() = default;
63 
Parse(TraceBlobView blob)64 util::Status ProtoTraceReader::Parse(TraceBlobView blob) {
65   return tokenizer_.Tokenize(std::move(blob), [this](TraceBlobView packet) {
66     return ParsePacket(std::move(packet));
67   });
68 }
69 
ParseExtensionDescriptor(ConstBytes descriptor)70 util::Status ProtoTraceReader::ParseExtensionDescriptor(ConstBytes descriptor) {
71   protos::pbzero::ExtensionDescriptor::Decoder decoder(descriptor.data,
72                                                        descriptor.size);
73 
74   auto extension = decoder.extension_set();
75   return context_->descriptor_pool_->AddFromFileDescriptorSet(
76       extension.data, extension.size,
77       /*skip_prefixes*/ {},
78       /*merge_existing_messages=*/true);
79 }
80 
ParsePacket(TraceBlobView packet)81 util::Status ProtoTraceReader::ParsePacket(TraceBlobView packet) {
82   protos::pbzero::TracePacket::Decoder decoder(packet.data(), packet.length());
83   if (PERFETTO_UNLIKELY(decoder.bytes_left())) {
84     return util::ErrStatus(
85         "Failed to parse proto packet fully; the trace is probably corrupt.");
86   }
87 
88   // Any compressed packets should have been handled by the tokenizer.
89   PERFETTO_CHECK(!decoder.has_compressed_packets());
90 
91   // When the trace packet is emitted from a remote machine: parse the packet
92   // using a different ProtoTraceReader instance. The packet will be parsed
93   // in the context of the remote machine.
94   if (PERFETTO_UNLIKELY(decoder.has_machine_id())) {
95     if (!context_->machine_id()) {
96       // Default context: switch to another reader instance to parse the packet.
97       PERFETTO_DCHECK(context_->multi_machine_trace_manager);
98       auto* reader = context_->multi_machine_trace_manager->GetOrCreateReader(
99           decoder.machine_id());
100       return reader->ParsePacket(std::move(packet));
101     }
102   }
103   // Assert that the packet is parsed using the right instance of reader.
104   PERFETTO_DCHECK(decoder.has_machine_id() == !!context_->machine_id());
105 
106   const uint32_t seq_id = decoder.trusted_packet_sequence_id();
107   auto* state = GetIncrementalStateForPacketSequence(seq_id);
108 
109   if (decoder.first_packet_on_sequence()) {
110     HandleFirstPacketOnSequence(seq_id);
111   }
112 
113   uint32_t sequence_flags = decoder.sequence_flags();
114 
115   if (decoder.incremental_state_cleared() ||
116       sequence_flags &
117           protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED) {
118     HandleIncrementalStateCleared(decoder);
119   } else if (decoder.previous_packet_dropped()) {
120     HandlePreviousPacketDropped(decoder);
121   }
122 
123   uint32_t sequence_id = decoder.trusted_packet_sequence_id();
124   if (sequence_id) {
125     auto [data_loss, inserted] =
126         packet_sequence_data_loss_.Insert(sequence_id, 0);
127 
128     if (!inserted && decoder.previous_packet_dropped()) {
129       *data_loss += 1;
130     }
131   }
132 
133   // It is important that we parse defaults before parsing other fields such as
134   // the timestamp, since the defaults could affect them.
135   if (decoder.has_trace_packet_defaults()) {
136     auto field = decoder.trace_packet_defaults();
137     ParseTracePacketDefaults(decoder, packet.slice(field.data, field.size));
138   }
139 
140   if (decoder.has_interned_data()) {
141     auto field = decoder.interned_data();
142     ParseInternedData(decoder, packet.slice(field.data, field.size));
143   }
144 
145   if (decoder.has_clock_snapshot()) {
146     return ParseClockSnapshot(decoder.clock_snapshot(), sequence_id);
147   }
148 
149   if (decoder.has_trace_stats()) {
150     ParseTraceStats(decoder.trace_stats());
151   }
152 
153   if (decoder.has_service_event()) {
154     PERFETTO_DCHECK(decoder.has_timestamp());
155     int64_t ts = static_cast<int64_t>(decoder.timestamp());
156     return ParseServiceEvent(ts, decoder.service_event());
157   }
158 
159   if (decoder.has_extension_descriptor()) {
160     return ParseExtensionDescriptor(decoder.extension_descriptor());
161   }
162 
163   if (decoder.sequence_flags() &
164       protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE) {
165     if (!seq_id) {
166       return util::ErrStatus(
167           "TracePacket specified SEQ_NEEDS_INCREMENTAL_STATE but the "
168           "TraceWriter's sequence_id is zero (the service is "
169           "probably too old)");
170     }
171 
172     if (!state->IsIncrementalStateValid()) {
173       if (context_->content_analyzer) {
174         // Account for the skipped packet for trace proto content analysis,
175         // with a special annotation.
176         PacketAnalyzer::SampleAnnotation annotation;
177         annotation.push_back(
178             {skipped_packet_key_id_, invalid_incremental_state_key_id_});
179         PacketAnalyzer::Get(context_)->ProcessPacket(packet, annotation);
180       }
181       context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
182       return util::OkStatus();
183     }
184   }
185 
186   protos::pbzero::TracePacketDefaults::Decoder* defaults =
187       state->current_generation()->GetTracePacketDefaults();
188 
189   int64_t timestamp;
190   if (decoder.has_timestamp()) {
191     timestamp = static_cast<int64_t>(decoder.timestamp());
192 
193     uint32_t timestamp_clock_id =
194         decoder.has_timestamp_clock_id()
195             ? decoder.timestamp_clock_id()
196             : (defaults ? defaults->timestamp_clock_id() : 0);
197 
198     if ((decoder.has_chrome_events() || decoder.has_chrome_metadata()) &&
199         (!timestamp_clock_id ||
200          timestamp_clock_id == protos::pbzero::BUILTIN_CLOCK_MONOTONIC)) {
201       // Chrome event timestamps are in MONOTONIC domain, but may occur in
202       // traces where (a) no clock snapshots exist or (b) no clock_id is
203       // specified for their timestamps. Adjust to trace time if we have a clock
204       // snapshot.
205       // TODO(eseckler): Set timestamp_clock_id and emit ClockSnapshots in
206       // chrome and then remove this.
207       auto trace_ts = context_->clock_tracker->ToTraceTime(
208           protos::pbzero::BUILTIN_CLOCK_MONOTONIC, timestamp);
209       if (trace_ts.ok())
210         timestamp = trace_ts.value();
211     } else if (timestamp_clock_id) {
212       // If the TracePacket specifies a non-zero clock-id, translate the
213       // timestamp into the trace-time clock domain.
214       ClockTracker::ClockId converted_clock_id = timestamp_clock_id;
215       if (ClockTracker::IsSequenceClock(converted_clock_id)) {
216         if (!seq_id) {
217           return util::ErrStatus(
218               "TracePacket specified a sequence-local clock id (%" PRIu32
219               ") but the TraceWriter's sequence_id is zero (the service is "
220               "probably too old)",
221               timestamp_clock_id);
222         }
223         converted_clock_id =
224             ClockTracker::SequenceToGlobalClock(seq_id, timestamp_clock_id);
225       }
226       auto trace_ts =
227           context_->clock_tracker->ToTraceTime(converted_clock_id, timestamp);
228       if (!trace_ts.ok()) {
229         // ToTraceTime() will increase the |clock_sync_failure| stat on failure.
230         // We don't return an error here as it will cause the trace to stop
231         // parsing. Instead, we rely on the stat increment in ToTraceTime() to
232         // inform the user about the error.
233         return util::OkStatus();
234       }
235       timestamp = trace_ts.value();
236     }
237   } else {
238     timestamp = std::max(latest_timestamp_, context_->sorter->max_timestamp());
239   }
240   latest_timestamp_ = std::max(timestamp, latest_timestamp_);
241 
242   if (context_->content_analyzer && !decoder.has_track_event()) {
243     PacketAnalyzer::Get(context_)->ProcessPacket(packet, {});
244   }
245 
246   auto& modules = context_->modules_by_field;
247   for (uint32_t field_id = 1; field_id < modules.size(); ++field_id) {
248     if (!modules[field_id].empty() && decoder.Get(field_id).valid()) {
249       for (ProtoImporterModule* global_module :
250            context_->modules_for_all_fields) {
251         ModuleResult res = global_module->TokenizePacket(
252             decoder, &packet, timestamp, state->current_generation(), field_id);
253         if (!res.ignored())
254           return res.ToStatus();
255       }
256       for (ProtoImporterModule* module : modules[field_id]) {
257         ModuleResult res = module->TokenizePacket(
258             decoder, &packet, timestamp, state->current_generation(), field_id);
259         if (!res.ignored())
260           return res.ToStatus();
261       }
262     }
263   }
264 
265   if (decoder.has_trace_config()) {
266     ParseTraceConfig(decoder.trace_config());
267   }
268 
269   // Use parent data and length because we want to parse this again
270   // later to get the exact type of the packet.
271   context_->sorter->PushTracePacket(timestamp, state->current_generation(),
272                                     std::move(packet), context_->machine_id());
273 
274   return util::OkStatus();
275 }
276 
ParseTraceConfig(protozero::ConstBytes blob)277 void ProtoTraceReader::ParseTraceConfig(protozero::ConstBytes blob) {
278   protos::pbzero::TraceConfig::Decoder trace_config(blob);
279   if (trace_config.write_into_file() && !trace_config.flush_period_ms()) {
280     PERFETTO_ELOG(
281         "It is strongly recommended to have flush_period_ms set when "
282         "write_into_file is turned on. This trace will be loaded fully "
283         "into memory before sorting which increases the likelihood of "
284         "OOMs.");
285   }
286 }
287 
HandleIncrementalStateCleared(const protos::pbzero::TracePacket::Decoder & packet_decoder)288 void ProtoTraceReader::HandleIncrementalStateCleared(
289     const protos::pbzero::TracePacket::Decoder& packet_decoder) {
290   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
291     PERFETTO_ELOG(
292         "incremental_state_cleared without trusted_packet_sequence_id");
293     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
294     return;
295   }
296   GetIncrementalStateForPacketSequence(
297       packet_decoder.trusted_packet_sequence_id())
298       ->OnIncrementalStateCleared();
299   for (auto& module : context_->modules) {
300     module->OnIncrementalStateCleared(
301         packet_decoder.trusted_packet_sequence_id());
302   }
303 }
304 
HandleFirstPacketOnSequence(uint32_t packet_sequence_id)305 void ProtoTraceReader::HandleFirstPacketOnSequence(
306     uint32_t packet_sequence_id) {
307   for (auto& module : context_->modules) {
308     module->OnFirstPacketOnSequence(packet_sequence_id);
309   }
310 }
311 
HandlePreviousPacketDropped(const protos::pbzero::TracePacket::Decoder & packet_decoder)312 void ProtoTraceReader::HandlePreviousPacketDropped(
313     const protos::pbzero::TracePacket::Decoder& packet_decoder) {
314   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
315     PERFETTO_ELOG("previous_packet_dropped without trusted_packet_sequence_id");
316     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
317     return;
318   }
319   GetIncrementalStateForPacketSequence(
320       packet_decoder.trusted_packet_sequence_id())
321       ->OnPacketLoss();
322 }
323 
ParseTracePacketDefaults(const protos::pbzero::TracePacket_Decoder & packet_decoder,TraceBlobView trace_packet_defaults)324 void ProtoTraceReader::ParseTracePacketDefaults(
325     const protos::pbzero::TracePacket_Decoder& packet_decoder,
326     TraceBlobView trace_packet_defaults) {
327   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
328     PERFETTO_ELOG(
329         "TracePacketDefaults packet without trusted_packet_sequence_id");
330     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
331     return;
332   }
333 
334   auto* state = GetIncrementalStateForPacketSequence(
335       packet_decoder.trusted_packet_sequence_id());
336   state->UpdateTracePacketDefaults(std::move(trace_packet_defaults));
337 }
338 
ParseInternedData(const protos::pbzero::TracePacket::Decoder & packet_decoder,TraceBlobView interned_data)339 void ProtoTraceReader::ParseInternedData(
340     const protos::pbzero::TracePacket::Decoder& packet_decoder,
341     TraceBlobView interned_data) {
342   if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
343     PERFETTO_ELOG("InternedData packet without trusted_packet_sequence_id");
344     context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
345     return;
346   }
347 
348   auto* state = GetIncrementalStateForPacketSequence(
349       packet_decoder.trusted_packet_sequence_id());
350 
351   // Don't parse interned data entries until incremental state is valid, because
352   // they could otherwise be associated with the wrong generation in the state.
353   if (!state->IsIncrementalStateValid()) {
354     context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
355     return;
356   }
357 
358   // Store references to interned data submessages into the sequence's state.
359   protozero::ProtoDecoder decoder(interned_data.data(), interned_data.length());
360   for (protozero::Field f = decoder.ReadField(); f.valid();
361        f = decoder.ReadField()) {
362     auto bytes = f.as_bytes();
363     state->InternMessage(f.id(), interned_data.slice(bytes.data, bytes.size));
364   }
365 }
366 
ParseClockSnapshot(ConstBytes blob,uint32_t seq_id)367 util::Status ProtoTraceReader::ParseClockSnapshot(ConstBytes blob,
368                                                   uint32_t seq_id) {
369   std::vector<ClockTracker::ClockTimestamp> clock_timestamps;
370   protos::pbzero::ClockSnapshot::Decoder evt(blob.data, blob.size);
371   if (evt.primary_trace_clock()) {
372     context_->clock_tracker->SetTraceTimeClock(
373         static_cast<ClockTracker::ClockId>(evt.primary_trace_clock()));
374   }
375   for (auto it = evt.clocks(); it; ++it) {
376     protos::pbzero::ClockSnapshot::Clock::Decoder clk(*it);
377     ClockTracker::ClockId clock_id = clk.clock_id();
378     if (ClockTracker::IsSequenceClock(clk.clock_id())) {
379       if (!seq_id) {
380         return util::ErrStatus(
381             "ClockSnapshot packet is specifying a sequence-scoped clock id "
382             "(%" PRIu64 ") but the TracePacket sequence_id is zero",
383             clock_id);
384       }
385       clock_id = ClockTracker::SequenceToGlobalClock(seq_id, clk.clock_id());
386     }
387     int64_t unit_multiplier_ns =
388         clk.unit_multiplier_ns()
389             ? static_cast<int64_t>(clk.unit_multiplier_ns())
390             : 1;
391     clock_timestamps.emplace_back(clock_id, clk.timestamp(), unit_multiplier_ns,
392                                   clk.is_incremental());
393   }
394 
395   base::StatusOr<uint32_t> snapshot_id =
396       context_->clock_tracker->AddSnapshot(clock_timestamps);
397   if (!snapshot_id.ok()) {
398     PERFETTO_ELOG("%s", snapshot_id.status().c_message());
399     return base::OkStatus();
400   }
401 
402   std::optional<int64_t> trace_time_from_snapshot =
403       context_->clock_tracker->ToTraceTimeFromSnapshot(clock_timestamps);
404 
405   // Add the all the clock snapshots to the clock snapshot table.
406   std::optional<int64_t> trace_ts_for_check;
407   for (const auto& clock_timestamp : clock_timestamps) {
408     // If the clock is incremental, we need to use 0 to map correctly to
409     // |absolute_timestamp|.
410     int64_t ts_to_convert =
411         clock_timestamp.clock.is_incremental ? 0 : clock_timestamp.timestamp;
412     // Even if we have trace time from snapshot, we still run ToTraceTime to
413     // optimise future conversions.
414     base::StatusOr<int64_t> opt_trace_ts = context_->clock_tracker->ToTraceTime(
415         clock_timestamp.clock.id, ts_to_convert);
416 
417     if (!opt_trace_ts.ok()) {
418       // This can happen if |AddSnapshot| failed to resolve this clock, e.g. if
419       // clock is not monotonic. Try to fetch trace time from snapshot.
420       if (!trace_time_from_snapshot) {
421         PERFETTO_DLOG("%s", opt_trace_ts.status().c_message());
422         continue;
423       }
424       opt_trace_ts = *trace_time_from_snapshot;
425     }
426 
427     // Double check that all the clocks in this snapshot resolve to the same
428     // trace timestamp value.
429     PERFETTO_DCHECK(!trace_ts_for_check ||
430                     opt_trace_ts.value() == trace_ts_for_check.value());
431     trace_ts_for_check = *opt_trace_ts;
432 
433     tables::ClockSnapshotTable::Row row;
434     row.ts = *opt_trace_ts;
435     row.clock_id = static_cast<int64_t>(clock_timestamp.clock.id);
436     row.clock_value =
437         clock_timestamp.timestamp * clock_timestamp.clock.unit_multiplier_ns;
438     row.clock_name = GetBuiltinClockNameOrNull(clock_timestamp.clock.id);
439     row.snapshot_id = *snapshot_id;
440     row.machine_id = context_->machine_id();
441 
442     context_->storage->mutable_clock_snapshot_table()->Insert(row);
443   }
444   return util::OkStatus();
445 }
446 
GetBuiltinClockNameOrNull(int64_t clock_id)447 std::optional<StringId> ProtoTraceReader::GetBuiltinClockNameOrNull(
448     int64_t clock_id) {
449   switch (clock_id) {
450     case protos::pbzero::ClockSnapshot::Clock::REALTIME:
451       return context_->storage->InternString("REALTIME");
452     case protos::pbzero::ClockSnapshot::Clock::REALTIME_COARSE:
453       return context_->storage->InternString("REALTIME_COARSE");
454     case protos::pbzero::ClockSnapshot::Clock::MONOTONIC:
455       return context_->storage->InternString("MONOTONIC");
456     case protos::pbzero::ClockSnapshot::Clock::MONOTONIC_COARSE:
457       return context_->storage->InternString("MONOTONIC_COARSE");
458     case protos::pbzero::ClockSnapshot::Clock::MONOTONIC_RAW:
459       return context_->storage->InternString("MONOTONIC_RAW");
460     case protos::pbzero::ClockSnapshot::Clock::BOOTTIME:
461       return context_->storage->InternString("BOOTTIME");
462     default:
463       return std::nullopt;
464   }
465 }
466 
ParseServiceEvent(int64_t ts,ConstBytes blob)467 util::Status ProtoTraceReader::ParseServiceEvent(int64_t ts, ConstBytes blob) {
468   protos::pbzero::TracingServiceEvent::Decoder tse(blob);
469   if (tse.tracing_started()) {
470     context_->metadata_tracker->SetMetadata(metadata::tracing_started_ns,
471                                             Variadic::Integer(ts));
472   }
473   if (tse.tracing_disabled()) {
474     context_->metadata_tracker->SetMetadata(metadata::tracing_disabled_ns,
475                                             Variadic::Integer(ts));
476   }
477   if (tse.all_data_sources_started()) {
478     context_->metadata_tracker->SetMetadata(
479         metadata::all_data_source_started_ns, Variadic::Integer(ts));
480   }
481   if (tse.all_data_sources_flushed()) {
482     context_->metadata_tracker->AppendMetadata(
483         metadata::all_data_source_flushed_ns, Variadic::Integer(ts));
484     context_->sorter->NotifyFlushEvent();
485   }
486   if (tse.read_tracing_buffers_completed()) {
487     context_->sorter->NotifyReadBufferEvent();
488   }
489   return util::OkStatus();
490 }
491 
ParseTraceStats(ConstBytes blob)492 void ProtoTraceReader::ParseTraceStats(ConstBytes blob) {
493   protos::pbzero::TraceStats::Decoder evt(blob.data, blob.size);
494   auto* storage = context_->storage.get();
495   storage->SetStats(stats::traced_producers_connected,
496                     static_cast<int64_t>(evt.producers_connected()));
497   storage->SetStats(stats::traced_producers_seen,
498                     static_cast<int64_t>(evt.producers_seen()));
499   storage->SetStats(stats::traced_data_sources_registered,
500                     static_cast<int64_t>(evt.data_sources_registered()));
501   storage->SetStats(stats::traced_data_sources_seen,
502                     static_cast<int64_t>(evt.data_sources_seen()));
503   storage->SetStats(stats::traced_tracing_sessions,
504                     static_cast<int64_t>(evt.tracing_sessions()));
505   storage->SetStats(stats::traced_total_buffers,
506                     static_cast<int64_t>(evt.total_buffers()));
507   storage->SetStats(stats::traced_chunks_discarded,
508                     static_cast<int64_t>(evt.chunks_discarded()));
509   storage->SetStats(stats::traced_patches_discarded,
510                     static_cast<int64_t>(evt.patches_discarded()));
511   storage->SetStats(stats::traced_flushes_requested,
512                     static_cast<int64_t>(evt.flushes_requested()));
513   storage->SetStats(stats::traced_flushes_succeeded,
514                     static_cast<int64_t>(evt.flushes_succeeded()));
515   storage->SetStats(stats::traced_flushes_failed,
516                     static_cast<int64_t>(evt.flushes_failed()));
517 
518   if (evt.has_filter_stats()) {
519     protos::pbzero::TraceStats::FilterStats::Decoder fstat(evt.filter_stats());
520     storage->SetStats(stats::filter_errors,
521                       static_cast<int64_t>(fstat.errors()));
522     storage->SetStats(stats::filter_input_bytes,
523                       static_cast<int64_t>(fstat.input_bytes()));
524     storage->SetStats(stats::filter_input_packets,
525                       static_cast<int64_t>(fstat.input_packets()));
526     storage->SetStats(stats::filter_output_bytes,
527                       static_cast<int64_t>(fstat.output_bytes()));
528     storage->SetStats(stats::filter_time_taken_ns,
529                       static_cast<int64_t>(fstat.time_taken_ns()));
530     for (auto [i, it] = std::tuple{0, fstat.bytes_discarded_per_buffer()}; it;
531          ++it, ++i) {
532       storage->SetIndexedStats(stats::traced_buf_bytes_filtered_out, i,
533                                static_cast<int64_t>(*it));
534     }
535   }
536 
537   switch (evt.final_flush_outcome()) {
538     case protos::pbzero::TraceStats::FINAL_FLUSH_SUCCEEDED:
539       storage->IncrementStats(stats::traced_final_flush_succeeded, 1);
540       break;
541     case protos::pbzero::TraceStats::FINAL_FLUSH_FAILED:
542       storage->IncrementStats(stats::traced_final_flush_failed, 1);
543       break;
544     case protos::pbzero::TraceStats::FINAL_FLUSH_UNSPECIFIED:
545       break;
546   }
547 
548   int buf_num = 0;
549   for (auto it = evt.buffer_stats(); it; ++it, ++buf_num) {
550     protos::pbzero::TraceStats::BufferStats::Decoder buf(*it);
551     storage->SetIndexedStats(stats::traced_buf_buffer_size, buf_num,
552                              static_cast<int64_t>(buf.buffer_size()));
553     storage->SetIndexedStats(stats::traced_buf_bytes_written, buf_num,
554                              static_cast<int64_t>(buf.bytes_written()));
555     storage->SetIndexedStats(stats::traced_buf_bytes_overwritten, buf_num,
556                              static_cast<int64_t>(buf.bytes_overwritten()));
557     storage->SetIndexedStats(stats::traced_buf_bytes_read, buf_num,
558                              static_cast<int64_t>(buf.bytes_read()));
559     storage->SetIndexedStats(stats::traced_buf_padding_bytes_written, buf_num,
560                              static_cast<int64_t>(buf.padding_bytes_written()));
561     storage->SetIndexedStats(stats::traced_buf_padding_bytes_cleared, buf_num,
562                              static_cast<int64_t>(buf.padding_bytes_cleared()));
563     storage->SetIndexedStats(stats::traced_buf_chunks_written, buf_num,
564                              static_cast<int64_t>(buf.chunks_written()));
565     storage->SetIndexedStats(stats::traced_buf_chunks_rewritten, buf_num,
566                              static_cast<int64_t>(buf.chunks_rewritten()));
567     storage->SetIndexedStats(stats::traced_buf_chunks_overwritten, buf_num,
568                              static_cast<int64_t>(buf.chunks_overwritten()));
569     storage->SetIndexedStats(stats::traced_buf_chunks_discarded, buf_num,
570                              static_cast<int64_t>(buf.chunks_discarded()));
571     storage->SetIndexedStats(stats::traced_buf_chunks_read, buf_num,
572                              static_cast<int64_t>(buf.chunks_read()));
573     storage->SetIndexedStats(
574         stats::traced_buf_chunks_committed_out_of_order, buf_num,
575         static_cast<int64_t>(buf.chunks_committed_out_of_order()));
576     storage->SetIndexedStats(stats::traced_buf_write_wrap_count, buf_num,
577                              static_cast<int64_t>(buf.write_wrap_count()));
578     storage->SetIndexedStats(stats::traced_buf_patches_succeeded, buf_num,
579                              static_cast<int64_t>(buf.patches_succeeded()));
580     storage->SetIndexedStats(stats::traced_buf_patches_failed, buf_num,
581                              static_cast<int64_t>(buf.patches_failed()));
582     storage->SetIndexedStats(stats::traced_buf_readaheads_succeeded, buf_num,
583                              static_cast<int64_t>(buf.readaheads_succeeded()));
584     storage->SetIndexedStats(stats::traced_buf_readaheads_failed, buf_num,
585                              static_cast<int64_t>(buf.readaheads_failed()));
586     storage->SetIndexedStats(stats::traced_buf_abi_violations, buf_num,
587                              static_cast<int64_t>(buf.abi_violations()));
588     storage->SetIndexedStats(
589         stats::traced_buf_trace_writer_packet_loss, buf_num,
590         static_cast<int64_t>(buf.trace_writer_packet_loss()));
591   }
592 
593   base::FlatHashMap<int32_t, int64_t> data_loss_per_buffer;
594 
595   for (auto it = evt.writer_stats(); it; ++it) {
596     protos::pbzero::TraceStats::WriterStats::Decoder writer(*it);
597     auto* data_loss = packet_sequence_data_loss_.Find(
598         static_cast<uint32_t>(writer.sequence_id()));
599     if (data_loss) {
600       data_loss_per_buffer[static_cast<int32_t>(writer.buffer())] +=
601           static_cast<int64_t>(*data_loss);
602     }
603   }
604 
605   for (auto it = data_loss_per_buffer.GetIterator(); it; ++it) {
606     storage->SetIndexedStats(stats::traced_buf_sequence_packet_loss, it.key(),
607                              it.value());
608   }
609 }
610 
NotifyEndOfFile()611 void ProtoTraceReader::NotifyEndOfFile() {}
612 
613 }  // namespace trace_processor
614 }  // namespace perfetto
615