1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/trace_processor/importers/proto/proto_trace_tokenizer.h"
18
19 #include <string>
20
21 #include "perfetto/base/build_config.h"
22 #include "perfetto/base/logging.h"
23 #include "perfetto/ext/base/optional.h"
24 #include "perfetto/ext/base/string_view.h"
25 #include "perfetto/ext/base/utils.h"
26 #include "perfetto/protozero/proto_decoder.h"
27 #include "perfetto/protozero/proto_utils.h"
28 #include "perfetto/trace_processor/status.h"
29 #include "src/trace_processor/importers/common/clock_tracker.h"
30 #include "src/trace_processor/importers/common/event_tracker.h"
31 #include "src/trace_processor/importers/common/track_tracker.h"
32 #include "src/trace_processor/importers/ftrace/ftrace_module.h"
33 #include "src/trace_processor/importers/gzip/gzip_utils.h"
34 #include "src/trace_processor/importers/proto/packet_sequence_state.h"
35 #include "src/trace_processor/importers/proto/proto_incremental_state.h"
36 #include "src/trace_processor/storage/stats.h"
37 #include "src/trace_processor/storage/trace_storage.h"
38 #include "src/trace_processor/trace_sorter.h"
39
40 #include "protos/perfetto/common/builtin_clock.pbzero.h"
41 #include "protos/perfetto/config/trace_config.pbzero.h"
42 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
43 #include "protos/perfetto/trace/profiling/profile_common.pbzero.h"
44 #include "protos/perfetto/trace/trace.pbzero.h"
45 #include "protos/perfetto/trace/trace_packet.pbzero.h"
46
47 namespace perfetto {
48 namespace trace_processor {
49
50 using protozero::proto_utils::MakeTagLengthDelimited;
51 using protozero::proto_utils::ParseVarInt;
52
53 namespace {
54
// Preamble byte the tokenizer expects in front of every serialized
// TracePacket: the length-delimited tag of the Trace.packet field.
constexpr uint8_t kTracePacketTag =
    MakeTagLengthDelimited(protos::pbzero::Trace::kPacketFieldNumber);
57
Decompress(GzipDecompressor * decompressor,TraceBlobView input)58 TraceBlobView Decompress(GzipDecompressor* decompressor, TraceBlobView input) {
59 PERFETTO_DCHECK(gzip::IsGzipSupported());
60
61 uint8_t out[4096];
62
63 std::vector<uint8_t> data;
64 data.reserve(input.length());
65
66 // Ensure that the decompressor is able to cope with a new stream of data.
67 decompressor->Reset();
68 decompressor->SetInput(input.data(), input.length());
69
70 using ResultCode = GzipDecompressor::ResultCode;
71 for (auto ret = ResultCode::kOk; ret != ResultCode::kEof;) {
72 auto res = decompressor->Decompress(out, base::ArraySize(out));
73 ret = res.ret;
74 if (ret == ResultCode::kError || ret == ResultCode::kNoProgress ||
75 ret == ResultCode::kNeedsMoreInput)
76 return TraceBlobView(nullptr, 0, 0);
77
78 data.insert(data.end(), out, out + res.bytes_written);
79 }
80
81 std::unique_ptr<uint8_t[]> output(new uint8_t[data.size()]);
82 memcpy(output.get(), data.data(), data.size());
83 return TraceBlobView(std::move(output), 0, data.size());
84 }
85
86 } // namespace
87
ProtoTraceTokenizer(TraceProcessorContext * ctx)88 ProtoTraceTokenizer::ProtoTraceTokenizer(TraceProcessorContext* ctx)
89 : context_(ctx) {}
90 ProtoTraceTokenizer::~ProtoTraceTokenizer() = default;
91
Parse(std::unique_ptr<uint8_t[]> owned_buf,size_t size)92 util::Status ProtoTraceTokenizer::Parse(std::unique_ptr<uint8_t[]> owned_buf,
93 size_t size) {
94 uint8_t* data = &owned_buf[0];
95 if (!partial_buf_.empty()) {
96 // It takes ~5 bytes for a proto preamble + the varint size.
97 const size_t kHeaderBytes = 5;
98 if (PERFETTO_UNLIKELY(partial_buf_.size() < kHeaderBytes)) {
99 size_t missing_len = std::min(kHeaderBytes - partial_buf_.size(), size);
100 partial_buf_.insert(partial_buf_.end(), &data[0], &data[missing_len]);
101 if (partial_buf_.size() < kHeaderBytes)
102 return util::OkStatus();
103 data += missing_len;
104 size -= missing_len;
105 }
106
107 // At this point we have enough data in |partial_buf_| to read at least the
108 // field header and know the size of the next TracePacket.
109 const uint8_t* pos = &partial_buf_[0];
110 uint8_t proto_field_tag = *pos;
111 uint64_t field_size = 0;
112 const uint8_t* next = ParseVarInt(++pos, &*partial_buf_.end(), &field_size);
113 bool parse_failed = next == pos;
114 pos = next;
115 if (proto_field_tag != kTracePacketTag || field_size == 0 || parse_failed) {
116 return util::ErrStatus(
117 "Failed parsing a TracePacket from the partial buffer");
118 }
119
120 // At this point we know how big the TracePacket is.
121 size_t hdr_size = static_cast<size_t>(pos - &partial_buf_[0]);
122 size_t size_incl_header = static_cast<size_t>(field_size + hdr_size);
123 PERFETTO_DCHECK(size_incl_header > partial_buf_.size());
124
125 // There is a good chance that between the |partial_buf_| and the new |data|
126 // of the current call we have enough bytes to parse a TracePacket.
127 if (partial_buf_.size() + size >= size_incl_header) {
128 // Create a new buffer for the whole TracePacket and copy into that:
129 // 1) The beginning of the TracePacket (including the proto header) from
130 // the partial buffer.
131 // 2) The rest of the TracePacket from the current |data| buffer (note
132 // that we might have consumed already a few bytes form |data| earlier
133 // in this function, hence we need to keep |off| into account).
134 std::unique_ptr<uint8_t[]> buf(new uint8_t[size_incl_header]);
135 memcpy(&buf[0], partial_buf_.data(), partial_buf_.size());
136 // |size_missing| is the number of bytes for the rest of the TracePacket
137 // in |data|.
138 size_t size_missing = size_incl_header - partial_buf_.size();
139 memcpy(&buf[partial_buf_.size()], &data[0], size_missing);
140 data += size_missing;
141 size -= size_missing;
142 partial_buf_.clear();
143 uint8_t* buf_start = &buf[0]; // Note that buf is std::moved below.
144 util::Status status =
145 ParseInternal(std::move(buf), buf_start, size_incl_header);
146 if (PERFETTO_UNLIKELY(!status.ok()))
147 return status;
148 } else {
149 partial_buf_.insert(partial_buf_.end(), data, &data[size]);
150 return util::OkStatus();
151 }
152 }
153 return ParseInternal(std::move(owned_buf), data, size);
154 }
155
ParseInternal(std::unique_ptr<uint8_t[]> owned_buf,uint8_t * data,size_t size)156 util::Status ProtoTraceTokenizer::ParseInternal(
157 std::unique_ptr<uint8_t[]> owned_buf,
158 uint8_t* data,
159 size_t size) {
160 PERFETTO_DCHECK(data >= &owned_buf[0]);
161 const uint8_t* start = &owned_buf[0];
162 const size_t data_off = static_cast<size_t>(data - start);
163 TraceBlobView whole_buf(std::move(owned_buf), data_off, size);
164
165 protos::pbzero::Trace::Decoder decoder(data, size);
166 for (auto it = decoder.packet(); it; ++it) {
167 protozero::ConstBytes packet = *it;
168 size_t field_offset = whole_buf.offset_of(packet.data);
169 util::Status status =
170 ParsePacket(whole_buf.slice(field_offset, packet.size));
171 if (PERFETTO_UNLIKELY(!status.ok()))
172 return status;
173 }
174
175 const size_t bytes_left = decoder.bytes_left();
176 if (bytes_left > 0) {
177 PERFETTO_DCHECK(partial_buf_.empty());
178 partial_buf_.insert(partial_buf_.end(), &data[decoder.read_offset()],
179 &data[decoder.read_offset() + bytes_left]);
180 }
181 return util::OkStatus();
182 }
183
ParsePacket(TraceBlobView packet)184 util::Status ProtoTraceTokenizer::ParsePacket(TraceBlobView packet) {
185 protos::pbzero::TracePacket::Decoder decoder(packet.data(), packet.length());
186 if (PERFETTO_UNLIKELY(decoder.bytes_left()))
187 return util::ErrStatus(
188 "Failed to parse proto packet fully; the trace is probably corrupt.");
189
190 const uint32_t seq_id = decoder.trusted_packet_sequence_id();
191 auto* state = GetIncrementalStateForPacketSequence(seq_id);
192
193 uint32_t sequence_flags = decoder.sequence_flags();
194
195 if (decoder.incremental_state_cleared() ||
196 sequence_flags &
197 protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED) {
198 HandleIncrementalStateCleared(decoder);
199 } else if (decoder.previous_packet_dropped()) {
200 HandlePreviousPacketDropped(decoder);
201 }
202
203 // It is important that we parse defaults before parsing other fields such as
204 // the timestamp, since the defaults could affect them.
205 if (decoder.has_trace_packet_defaults()) {
206 auto field = decoder.trace_packet_defaults();
207 const size_t offset = packet.offset_of(field.data);
208 ParseTracePacketDefaults(decoder, packet.slice(offset, field.size));
209 }
210
211 if (decoder.has_interned_data()) {
212 auto field = decoder.interned_data();
213 const size_t offset = packet.offset_of(field.data);
214 ParseInternedData(decoder, packet.slice(offset, field.size));
215 }
216
217 if (decoder.has_clock_snapshot()) {
218 return ParseClockSnapshot(decoder.clock_snapshot(),
219 decoder.trusted_packet_sequence_id());
220 }
221
222 if (decoder.sequence_flags() &
223 protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE) {
224 if (!seq_id) {
225 return util::ErrStatus(
226 "TracePacket specified SEQ_NEEDS_INCREMENTAL_STATE but the "
227 "TraceWriter's sequence_id is zero (the service is "
228 "probably too old)");
229 }
230
231 if (!state->IsIncrementalStateValid()) {
232 context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
233 return util::OkStatus();
234 }
235 }
236
237 protos::pbzero::TracePacketDefaults::Decoder* defaults =
238 state->current_generation()->GetTracePacketDefaults();
239
240 int64_t timestamp;
241 if (decoder.has_timestamp()) {
242 timestamp = static_cast<int64_t>(decoder.timestamp());
243
244 uint32_t timestamp_clock_id =
245 decoder.has_timestamp_clock_id()
246 ? decoder.timestamp_clock_id()
247 : (defaults ? defaults->timestamp_clock_id() : 0);
248
249 if ((decoder.has_chrome_events() || decoder.has_chrome_metadata()) &&
250 (!timestamp_clock_id ||
251 timestamp_clock_id == protos::pbzero::BUILTIN_CLOCK_MONOTONIC)) {
252 // Chrome event timestamps are in MONOTONIC domain, but may occur in
253 // traces where (a) no clock snapshots exist or (b) no clock_id is
254 // specified for their timestamps. Adjust to trace time if we have a clock
255 // snapshot.
256 // TODO(eseckler): Set timestamp_clock_id and emit ClockSnapshots in
257 // chrome and then remove this.
258 auto trace_ts = context_->clock_tracker->ToTraceTime(
259 protos::pbzero::BUILTIN_CLOCK_MONOTONIC, timestamp);
260 if (trace_ts.has_value())
261 timestamp = trace_ts.value();
262 } else if (timestamp_clock_id) {
263 // If the TracePacket specifies a non-zero clock-id, translate the
264 // timestamp into the trace-time clock domain.
265 ClockTracker::ClockId converted_clock_id = timestamp_clock_id;
266 bool is_seq_scoped =
267 ClockTracker::IsReservedSeqScopedClockId(converted_clock_id);
268 if (is_seq_scoped) {
269 if (!seq_id) {
270 return util::ErrStatus(
271 "TracePacket specified a sequence-local clock id (%" PRIu32
272 ") but the TraceWriter's sequence_id is zero (the service is "
273 "probably too old)",
274 timestamp_clock_id);
275 }
276 converted_clock_id =
277 ClockTracker::SeqScopedClockIdToGlobal(seq_id, timestamp_clock_id);
278 }
279 auto trace_ts =
280 context_->clock_tracker->ToTraceTime(converted_clock_id, timestamp);
281 if (!trace_ts.has_value()) {
282 // ToTraceTime() will increase the |clock_sync_failure| stat on failure.
283 static const char seq_extra_err[] =
284 " Because the clock id is sequence-scoped, the ClockSnapshot must "
285 "be emitted on the same TraceWriter sequence of the packet that "
286 "refers to that clock id.";
287 return util::ErrStatus(
288 "Failed to convert TracePacket's timestamp from clock_id=%" PRIu32
289 " seq_id=%" PRIu32
290 ". This is usually due to the lack of a prior ClockSnapshot "
291 "proto.%s",
292 timestamp_clock_id, seq_id, is_seq_scoped ? seq_extra_err : "");
293 }
294 timestamp = trace_ts.value();
295 }
296 } else {
297 timestamp = std::max(latest_timestamp_, context_->sorter->max_timestamp());
298 }
299 latest_timestamp_ = std::max(timestamp, latest_timestamp_);
300
301 auto& modules = context_->modules_by_field;
302 for (uint32_t field_id = 1; field_id < modules.size(); ++field_id) {
303 if (modules[field_id] && decoder.Get(field_id).valid()) {
304 ModuleResult res = modules[field_id]->TokenizePacket(
305 decoder, &packet, timestamp, state, field_id);
306 if (!res.ignored())
307 return res.ToStatus();
308 }
309 }
310
311 if (decoder.has_compressed_packets()) {
312 if (!gzip::IsGzipSupported())
313 return util::Status("Cannot decode compressed packets. Zlib not enabled");
314
315 protozero::ConstBytes field = decoder.compressed_packets();
316 const size_t field_off = packet.offset_of(field.data);
317 TraceBlobView compressed_packets = packet.slice(field_off, field.size);
318 TraceBlobView packets =
319 Decompress(&decompressor_, std::move(compressed_packets));
320
321 const uint8_t* start = packets.data();
322 const uint8_t* end = packets.data() + packets.length();
323 const uint8_t* ptr = start;
324 while ((end - ptr) > 2) {
325 const uint8_t* packet_start = ptr;
326 if (PERFETTO_UNLIKELY(*ptr != kTracePacketTag))
327 return util::ErrStatus("Expected TracePacket tag");
328 uint64_t packet_size = 0;
329 ptr = ParseVarInt(++ptr, end, &packet_size);
330 size_t packet_offset = static_cast<size_t>(ptr - start);
331 ptr += packet_size;
332 if (PERFETTO_UNLIKELY((ptr - packet_start) < 2 || ptr > end))
333 return util::ErrStatus("Invalid packet size");
334 util::Status status = ParsePacket(
335 packets.slice(packet_offset, static_cast<size_t>(packet_size)));
336 if (PERFETTO_UNLIKELY(!status.ok()))
337 return status;
338 }
339 return util::OkStatus();
340 }
341
342 // If we're not forcing a full sort and this is a write_into_file trace, then
343 // use flush_period_ms as an indiciator for how big the sliding window for the
344 // sorter should be.
345 if (!context_->config.force_full_sort && decoder.has_trace_config()) {
346 auto config = decoder.trace_config();
347 protos::pbzero::TraceConfig::Decoder trace_config(config.data, config.size);
348
349 if (trace_config.write_into_file()) {
350 int64_t window_size_ns;
351 if (trace_config.has_flush_period_ms() &&
352 trace_config.flush_period_ms() > 0) {
353 // We use 2x the flush period as a margin of error to allow for any
354 // late flush responses to still be sorted correctly.
355 window_size_ns = static_cast<int64_t>(trace_config.flush_period_ms()) *
356 2 * 1000 * 1000;
357 } else {
358 constexpr uint64_t kDefaultWindowNs =
359 180 * 1000 * 1000 * 1000ULL; // 3 minutes.
360 PERFETTO_ELOG(
361 "It is strongly recommended to have flush_period_ms set when "
362 "write_into_file is turned on. You will likely have many dropped "
363 "events because of inability to sort the events correctly.");
364 window_size_ns = static_cast<int64_t>(kDefaultWindowNs);
365 }
366 context_->sorter->SetWindowSizeNs(window_size_ns);
367 }
368 }
369
370 // Use parent data and length because we want to parse this again
371 // later to get the exact type of the packet.
372 context_->sorter->PushTracePacket(timestamp, state, std::move(packet));
373
374 return util::OkStatus();
375 }
376
HandleIncrementalStateCleared(const protos::pbzero::TracePacket::Decoder & packet_decoder)377 void ProtoTraceTokenizer::HandleIncrementalStateCleared(
378 const protos::pbzero::TracePacket::Decoder& packet_decoder) {
379 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
380 PERFETTO_ELOG(
381 "incremental_state_cleared without trusted_packet_sequence_id");
382 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
383 return;
384 }
385 GetIncrementalStateForPacketSequence(
386 packet_decoder.trusted_packet_sequence_id())
387 ->OnIncrementalStateCleared();
388 context_->track_tracker->OnIncrementalStateCleared(
389 packet_decoder.trusted_packet_sequence_id());
390 }
391
HandlePreviousPacketDropped(const protos::pbzero::TracePacket::Decoder & packet_decoder)392 void ProtoTraceTokenizer::HandlePreviousPacketDropped(
393 const protos::pbzero::TracePacket::Decoder& packet_decoder) {
394 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
395 PERFETTO_ELOG("previous_packet_dropped without trusted_packet_sequence_id");
396 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
397 return;
398 }
399 GetIncrementalStateForPacketSequence(
400 packet_decoder.trusted_packet_sequence_id())
401 ->OnPacketLoss();
402 }
403
ParseTracePacketDefaults(const protos::pbzero::TracePacket_Decoder & packet_decoder,TraceBlobView trace_packet_defaults)404 void ProtoTraceTokenizer::ParseTracePacketDefaults(
405 const protos::pbzero::TracePacket_Decoder& packet_decoder,
406 TraceBlobView trace_packet_defaults) {
407 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
408 PERFETTO_ELOG(
409 "TracePacketDefaults packet without trusted_packet_sequence_id");
410 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
411 return;
412 }
413
414 auto* state = GetIncrementalStateForPacketSequence(
415 packet_decoder.trusted_packet_sequence_id());
416 state->UpdateTracePacketDefaults(std::move(trace_packet_defaults));
417 }
418
ParseInternedData(const protos::pbzero::TracePacket::Decoder & packet_decoder,TraceBlobView interned_data)419 void ProtoTraceTokenizer::ParseInternedData(
420 const protos::pbzero::TracePacket::Decoder& packet_decoder,
421 TraceBlobView interned_data) {
422 if (PERFETTO_UNLIKELY(!packet_decoder.has_trusted_packet_sequence_id())) {
423 PERFETTO_ELOG("InternedData packet without trusted_packet_sequence_id");
424 context_->storage->IncrementStats(stats::interned_data_tokenizer_errors);
425 return;
426 }
427
428 auto* state = GetIncrementalStateForPacketSequence(
429 packet_decoder.trusted_packet_sequence_id());
430
431 // Don't parse interned data entries until incremental state is valid, because
432 // they could otherwise be associated with the wrong generation in the state.
433 if (!state->IsIncrementalStateValid()) {
434 context_->storage->IncrementStats(stats::tokenizer_skipped_packets);
435 return;
436 }
437
438 // Store references to interned data submessages into the sequence's state.
439 protozero::ProtoDecoder decoder(interned_data.data(), interned_data.length());
440 for (protozero::Field f = decoder.ReadField(); f.valid();
441 f = decoder.ReadField()) {
442 auto bytes = f.as_bytes();
443 auto offset = interned_data.offset_of(bytes.data);
444 state->InternMessage(f.id(), interned_data.slice(offset, bytes.size));
445 }
446 }
447
ParseClockSnapshot(ConstBytes blob,uint32_t seq_id)448 util::Status ProtoTraceTokenizer::ParseClockSnapshot(ConstBytes blob,
449 uint32_t seq_id) {
450 std::vector<ClockTracker::ClockValue> clocks;
451 protos::pbzero::ClockSnapshot::Decoder evt(blob.data, blob.size);
452 if (evt.primary_trace_clock()) {
453 context_->clock_tracker->SetTraceTimeClock(
454 static_cast<ClockTracker::ClockId>(evt.primary_trace_clock()));
455 }
456 for (auto it = evt.clocks(); it; ++it) {
457 protos::pbzero::ClockSnapshot::Clock::Decoder clk(*it);
458 ClockTracker::ClockId clock_id = clk.clock_id();
459 if (ClockTracker::IsReservedSeqScopedClockId(clk.clock_id())) {
460 if (!seq_id) {
461 return util::ErrStatus(
462 "ClockSnapshot packet is specifying a sequence-scoped clock id "
463 "(%" PRIu64 ") but the TracePacket sequence_id is zero",
464 clock_id);
465 }
466 clock_id = ClockTracker::SeqScopedClockIdToGlobal(seq_id, clk.clock_id());
467 }
468 int64_t unit_multiplier_ns =
469 clk.unit_multiplier_ns()
470 ? static_cast<int64_t>(clk.unit_multiplier_ns())
471 : 1;
472 clocks.emplace_back(clock_id, clk.timestamp(), unit_multiplier_ns,
473 clk.is_incremental());
474 }
475 context_->clock_tracker->AddSnapshot(clocks);
476 return util::OkStatus();
477 }
478
NotifyEndOfFile()479 void ProtoTraceTokenizer::NotifyEndOfFile() {}
480
481 } // namespace trace_processor
482 } // namespace perfetto
483