• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <algorithm>
18 #include <cstddef>
19 #include <cstdint>
20 #include <cstdlib>
21 #include <cstring>
22 #include <limits>
23 #include <memory>
24 #include <utility>
25 
26 #include "perfetto/base/compiler.h"
27 #include "perfetto/base/logging.h"
28 #include "perfetto/public/compiler.h"
29 #include "src/trace_processor/importers/common/parser_types.h"
30 #include "src/trace_processor/importers/common/trace_parser.h"
31 #include "src/trace_processor/importers/fuchsia/fuchsia_record.h"
32 #include "src/trace_processor/importers/perf/record.h"
33 #include "src/trace_processor/sorter/trace_sorter.h"
34 #include "src/trace_processor/sorter/trace_token_buffer.h"
35 #include "src/trace_processor/storage/stats.h"
36 #include "src/trace_processor/types/trace_processor_context.h"
37 #include "src/trace_processor/util/bump_allocator.h"
38 
39 namespace perfetto::trace_processor {
40 
TraceSorter(TraceProcessorContext * context,SortingMode sorting_mode)41 TraceSorter::TraceSorter(TraceProcessorContext* context,
42                          SortingMode sorting_mode)
43     : sorting_mode_(sorting_mode), storage_(context->storage) {
44   AddMachineContext(context);
45   const char* env = getenv("TRACE_PROCESSOR_SORT_ONLY");
46   bypass_next_stage_for_testing_ = env && !strcmp(env, "1");
47   if (bypass_next_stage_for_testing_)
48     PERFETTO_ELOG("TEST MODE: bypassing protobuf parsing stage");
49 }
50 
~TraceSorter()51 TraceSorter::~TraceSorter() {
52   // If trace processor encountered a fatal error, it's possible for some events
53   // to have been pushed without evicting them by pushing to the next stage. Do
54   // that now.
55   for (auto& sorter_data : sorter_data_by_machine_) {
56     for (auto& queue : sorter_data.queues) {
57       for (const auto& event : queue.events_) {
58         ExtractAndDiscardTokenizedObject(event);
59       }
60     }
61   }
62 }
63 
Sort()64 void TraceSorter::Queue::Sort() {
65   PERFETTO_DCHECK(needs_sorting());
66   PERFETTO_DCHECK(sort_start_idx_ < events_.size());
67 
68   // If sort_min_ts_ has been set, it will no long be max_int, and so will be
69   // smaller than max_ts_.
70   PERFETTO_DCHECK(sort_min_ts_ < max_ts_);
71 
72   // We know that all events between [0, sort_start_idx_] are sorted. Within
73   // this range, perform a bound search and find the iterator for the min
74   // timestamp that broke the monotonicity. Re-sort from there to the end.
75   auto sort_end = events_.begin() + static_cast<ssize_t>(sort_start_idx_);
76   PERFETTO_DCHECK(std::is_sorted(events_.begin(), sort_end));
77   auto sort_begin = std::lower_bound(events_.begin(), sort_end, sort_min_ts_,
78                                      &TimestampedEvent::Compare);
79   std::sort(sort_begin, events_.end());
80   sort_start_idx_ = 0;
81   sort_min_ts_ = 0;
82 
83   // At this point |events_| must be fully sorted
84   PERFETTO_DCHECK(std::is_sorted(events_.begin(), events_.end()));
85 }
86 
87 // Removes all the events in |queues_| that are earlier than the given
88 // packet index and moves them to the next parser stages, respecting global
89 // timestamp order. This function is a "extract min from N sorted queues", with
90 // some little cleverness: we know that events tend to be bursty, so events are
91 // not going to be randomly distributed on the N |queues_|.
92 // Upon each iteration this function finds the first two queues (if any) that
93 // have the oldest events, and extracts events from the 1st until hitting the
94 // min_ts of the 2nd. Imagine the queues are as follows:
95 //
96 //  q0           {min_ts: 10  max_ts: 30}
97 //  q1    {min_ts:5              max_ts: 35}
98 //  q2              {min_ts: 12    max_ts: 40}
99 //
100 // We know that we can extract all events from q1 until we hit ts=10 without
101 // looking at any other queue. After hitting ts=10, we need to re-look to all of
102 // them to figure out the next min-event.
103 // There are more suitable data structures to do this (e.g. keeping a min-heap
104 // to avoid re-scanning all the queues all the times) but doesn't seem worth it.
105 // With Android traces (that have 8 CPUs) this function accounts for ~1-3% cpu
106 // time in a profiler.
SortAndExtractEventsUntilAllocId(BumpAllocator::AllocId limit_alloc_id)107 void TraceSorter::SortAndExtractEventsUntilAllocId(
108     BumpAllocator::AllocId limit_alloc_id) {
109   constexpr int64_t kTsMax = std::numeric_limits<int64_t>::max();
110   for (;;) {
111     size_t min_machine_idx = 0;
112     size_t min_queue_idx = 0;  // The index of the queue with the min(ts).
113 
114     // The top-2 min(ts) among all queues.
115     // queues_[min_queue_idx].events.timestamp == min_queue_ts[0].
116     int64_t min_queue_ts[2]{kTsMax, kTsMax};
117 
118     // This loop identifies the queue which starts with the earliest event and
119     // also remembers the earliest event of the 2nd queue (in min_queue_ts[1]).
120     bool all_queues_empty = true;
121     for (size_t m = 0; m < sorter_data_by_machine_.size(); m++) {
122       TraceSorterData& sorter_data = sorter_data_by_machine_[m];
123       for (size_t i = 0; i < sorter_data.queues.size(); i++) {
124         auto& queue = sorter_data.queues[i];
125         if (queue.events_.empty())
126           continue;
127         PERFETTO_DCHECK(queue.max_ts_ <= append_max_ts_);
128 
129         // Checking for |all_queues_empty| is necessary here as in fuzzer cases
130         // we can end up with |int64::max()| as the value here.
131         // See https://crbug.com/oss-fuzz/69164 for an example.
132         if (all_queues_empty || queue.min_ts_ < min_queue_ts[0]) {
133           min_queue_ts[1] = min_queue_ts[0];
134           min_queue_ts[0] = queue.min_ts_;
135           min_queue_idx = i;
136           min_machine_idx = m;
137         } else if (queue.min_ts_ < min_queue_ts[1]) {
138           min_queue_ts[1] = queue.min_ts_;
139         }
140         all_queues_empty = false;
141       }
142     }
143     if (all_queues_empty)
144       break;
145 
146     auto& sorter_data = sorter_data_by_machine_[min_machine_idx];
147     auto& queue = sorter_data.queues[min_queue_idx];
148     auto& events = queue.events_;
149     if (queue.needs_sorting())
150       queue.Sort();
151     PERFETTO_DCHECK(queue.min_ts_ == events.front().ts);
152 
153     // Now that we identified the min-queue, extract all events from it until
154     // we hit either: (1) the min-ts of the 2nd queue or (2) the packet index
155     // limit, whichever comes first.
156     size_t num_extracted = 0;
157     for (auto& event : events) {
158       if (event.alloc_id() >= limit_alloc_id) {
159         break;
160       }
161 
162       if (event.ts > min_queue_ts[1]) {
163         // We should never hit this condition on the first extraction as by
164         // the algorithm above (event.ts =) min_queue_ts[0] <= min_queue[1].
165         PERFETTO_DCHECK(num_extracted > 0);
166         break;
167       }
168 
169       ++num_extracted;
170       MaybeExtractEvent(min_machine_idx, min_queue_idx, event);
171     }  // for (event: events)
172 
173     // The earliest event cannot be extracted without going past the limit.
174     if (!num_extracted)
175       break;
176 
177     // Now remove the entries from the event buffer and update the queue-local
178     // and global time bounds.
179     events.erase_front(num_extracted);
180     events.shrink_to_fit();
181 
182     // Since we likely just removed a bunch of items try to reduce the memory
183     // usage of the token buffer.
184     token_buffer_.FreeMemory();
185 
186     // Update the queue timestamps to reflect the bounds after extraction.
187     if (events.empty()) {
188       queue.min_ts_ = kTsMax;
189       queue.max_ts_ = 0;
190     } else {
191       queue.min_ts_ = queue.events_.front().ts;
192     }
193   }  // for(;;)
194 }
195 
ParseTracePacket(TraceProcessorContext & context,const TimestampedEvent & event)196 void TraceSorter::ParseTracePacket(TraceProcessorContext& context,
197                                    const TimestampedEvent& event) {
198   TraceTokenBuffer::Id id = GetTokenBufferId(event);
199   switch (static_cast<TimestampedEvent::Type>(event.event_type)) {
200     case TimestampedEvent::Type::kPerfRecord:
201       context.perf_record_parser->ParsePerfRecord(
202           event.ts, token_buffer_.Extract<perf_importer::Record>(id));
203       return;
204     case TimestampedEvent::Type::kTracePacket:
205       context.proto_trace_parser->ParseTracePacket(
206           event.ts, token_buffer_.Extract<TracePacketData>(id));
207       return;
208     case TimestampedEvent::Type::kTrackEvent:
209       context.proto_trace_parser->ParseTrackEvent(
210           event.ts, token_buffer_.Extract<TrackEventData>(id));
211       return;
212     case TimestampedEvent::Type::kFuchsiaRecord:
213       context.fuchsia_record_parser->ParseFuchsiaRecord(
214           event.ts, token_buffer_.Extract<FuchsiaRecord>(id));
215       return;
216     case TimestampedEvent::Type::kJsonValue:
217       context.json_trace_parser->ParseJsonPacket(
218           event.ts, std::move(token_buffer_.Extract<JsonEvent>(id).value));
219       return;
220     case TimestampedEvent::Type::kSystraceLine:
221       context.json_trace_parser->ParseSystraceLine(
222           event.ts, token_buffer_.Extract<SystraceLine>(id));
223       return;
224     case TimestampedEvent::Type::kInlineSchedSwitch:
225     case TimestampedEvent::Type::kInlineSchedWaking:
226     case TimestampedEvent::Type::kEtwEvent:
227     case TimestampedEvent::Type::kFtraceEvent:
228       PERFETTO_FATAL("Invalid event type");
229   }
230   PERFETTO_FATAL("For GCC");
231 }
232 
ParseEtwPacket(TraceProcessorContext & context,uint32_t cpu,const TimestampedEvent & event)233 void TraceSorter::ParseEtwPacket(TraceProcessorContext& context,
234                                  uint32_t cpu,
235                                  const TimestampedEvent& event) {
236   TraceTokenBuffer::Id id = GetTokenBufferId(event);
237   switch (static_cast<TimestampedEvent::Type>(event.event_type)) {
238     case TimestampedEvent::Type::kEtwEvent:
239       context.proto_trace_parser->ParseEtwEvent(
240           cpu, event.ts, token_buffer_.Extract<TracePacketData>(id));
241       return;
242     case TimestampedEvent::Type::kInlineSchedSwitch:
243     case TimestampedEvent::Type::kInlineSchedWaking:
244     case TimestampedEvent::Type::kFtraceEvent:
245     case TimestampedEvent::Type::kTrackEvent:
246     case TimestampedEvent::Type::kSystraceLine:
247     case TimestampedEvent::Type::kTracePacket:
248     case TimestampedEvent::Type::kPerfRecord:
249     case TimestampedEvent::Type::kJsonValue:
250     case TimestampedEvent::Type::kFuchsiaRecord:
251       PERFETTO_FATAL("Invalid event type");
252   }
253   PERFETTO_FATAL("For GCC");
254 }
255 
ParseFtracePacket(TraceProcessorContext & context,uint32_t cpu,const TimestampedEvent & event)256 void TraceSorter::ParseFtracePacket(TraceProcessorContext& context,
257                                     uint32_t cpu,
258                                     const TimestampedEvent& event) {
259   TraceTokenBuffer::Id id = GetTokenBufferId(event);
260   switch (static_cast<TimestampedEvent::Type>(event.event_type)) {
261     case TimestampedEvent::Type::kInlineSchedSwitch:
262       context.proto_trace_parser->ParseInlineSchedSwitch(
263           cpu, event.ts, token_buffer_.Extract<InlineSchedSwitch>(id));
264       return;
265     case TimestampedEvent::Type::kInlineSchedWaking:
266       context.proto_trace_parser->ParseInlineSchedWaking(
267           cpu, event.ts, token_buffer_.Extract<InlineSchedWaking>(id));
268       return;
269     case TimestampedEvent::Type::kFtraceEvent:
270       context.proto_trace_parser->ParseFtraceEvent(
271           cpu, event.ts, token_buffer_.Extract<TracePacketData>(id));
272       return;
273     case TimestampedEvent::Type::kEtwEvent:
274     case TimestampedEvent::Type::kTrackEvent:
275     case TimestampedEvent::Type::kSystraceLine:
276     case TimestampedEvent::Type::kTracePacket:
277     case TimestampedEvent::Type::kPerfRecord:
278     case TimestampedEvent::Type::kJsonValue:
279     case TimestampedEvent::Type::kFuchsiaRecord:
280       PERFETTO_FATAL("Invalid event type");
281   }
282   PERFETTO_FATAL("For GCC");
283 }
284 
ExtractAndDiscardTokenizedObject(const TimestampedEvent & event)285 void TraceSorter::ExtractAndDiscardTokenizedObject(
286     const TimestampedEvent& event) {
287   TraceTokenBuffer::Id id = GetTokenBufferId(event);
288   switch (static_cast<TimestampedEvent::Type>(event.event_type)) {
289     case TimestampedEvent::Type::kTracePacket:
290     case TimestampedEvent::Type::kFtraceEvent:
291     case TimestampedEvent::Type::kEtwEvent:
292       base::ignore_result(token_buffer_.Extract<TracePacketData>(id));
293       return;
294     case TimestampedEvent::Type::kTrackEvent:
295       base::ignore_result(token_buffer_.Extract<TrackEventData>(id));
296       return;
297     case TimestampedEvent::Type::kFuchsiaRecord:
298       base::ignore_result(token_buffer_.Extract<FuchsiaRecord>(id));
299       return;
300     case TimestampedEvent::Type::kJsonValue:
301       base::ignore_result(token_buffer_.Extract<JsonEvent>(id));
302       return;
303     case TimestampedEvent::Type::kSystraceLine:
304       base::ignore_result(token_buffer_.Extract<SystraceLine>(id));
305       return;
306     case TimestampedEvent::Type::kInlineSchedSwitch:
307       base::ignore_result(token_buffer_.Extract<InlineSchedSwitch>(id));
308       return;
309     case TimestampedEvent::Type::kInlineSchedWaking:
310       base::ignore_result(token_buffer_.Extract<InlineSchedWaking>(id));
311       return;
312     case TimestampedEvent::Type::kPerfRecord:
313       base::ignore_result(token_buffer_.Extract<perf_importer::Record>(id));
314       return;
315   }
316   PERFETTO_FATAL("For GCC");
317 }
318 
MaybeExtractEvent(size_t min_machine_idx,size_t queue_idx,const TimestampedEvent & event)319 void TraceSorter::MaybeExtractEvent(size_t min_machine_idx,
320                                     size_t queue_idx,
321                                     const TimestampedEvent& event) {
322   auto* machine_context =
323       sorter_data_by_machine_[min_machine_idx].machine_context;
324   int64_t timestamp = event.ts;
325   if (timestamp < latest_pushed_event_ts_)
326     storage_->IncrementStats(stats::sorter_push_event_out_of_order);
327 
328   latest_pushed_event_ts_ = std::max(latest_pushed_event_ts_, timestamp);
329 
330   if (PERFETTO_UNLIKELY(bypass_next_stage_for_testing_)) {
331     // Parse* would extract this event and push it to the next stage. Since we
332     // are skipping that, just extract and discard it.
333     ExtractAndDiscardTokenizedObject(event);
334     return;
335   }
336 
337   if (queue_idx == 0) {
338     ParseTracePacket(*machine_context, event);
339   } else {
340     // Ftrace queues start at offset 1. So queues_[1] = cpu[0] and so on.
341     uint32_t cpu = static_cast<uint32_t>(queue_idx - 1);
342     auto event_type = static_cast<TimestampedEvent::Type>(event.event_type);
343 
344     if (event_type == TimestampedEvent::Type::kEtwEvent) {
345       ParseEtwPacket(*machine_context, static_cast<uint32_t>(cpu), event);
346     } else {
347       ParseFtracePacket(*machine_context, cpu, event);
348     }
349   }
350 }
351 
352 }  // namespace perfetto::trace_processor
353