/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <limits>
#include <memory>
#include <utility>

#include "perfetto/base/compiler.h"
#include "perfetto/base/logging.h"
#include "perfetto/public/compiler.h"
#include "src/trace_processor/importers/common/parser_types.h"
#include "src/trace_processor/importers/common/trace_parser.h"
#include "src/trace_processor/importers/fuchsia/fuchsia_record.h"
#include "src/trace_processor/importers/perf/record.h"
#include "src/trace_processor/sorter/trace_sorter.h"
#include "src/trace_processor/sorter/trace_token_buffer.h"
#include "src/trace_processor/storage/stats.h"
#include "src/trace_processor/types/trace_processor_context.h"
#include "src/trace_processor/util/bump_allocator.h"

namespace perfetto::trace_processor {

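// Reads the TRACE_PROCESSOR_SORT_ONLY environment variable so that tests can
// exercise the sorting stage in isolation: when set to "1", sorted events are
// extracted and discarded instead of being forwarded to the parsers.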
TraceSorter::TraceSorter(TraceProcessorContext* context,
                         SortingMode sorting_mode)
    : sorting_mode_(sorting_mode), storage_(context->storage) {
  AddMachineContext(context);
  const char* env = getenv("TRACE_PROCESSOR_SORT_ONLY");
  bypass_next_stage_for_testing_ = env && !strcmp(env, "1");
  if (bypass_next_stage_for_testing_)
    PERFETTO_ELOG("TEST MODE: bypassing protobuf parsing stage");
}

TraceSorter::~TraceSorter() {
  // If trace processor encountered a fatal error, it's possible that some
  // events were pushed without ever being extracted and forwarded to the next
  // stage. Extract and discard them now.
  for (auto& sorter_data : sorter_data_by_machine_) {
    for (auto& queue : sorter_data.queues) {
      for (const auto& event : queue.events_) {
        ExtractAndDiscardTokenizedObject(event);
      }
    }
  }
}

void TraceSorter::Queue::Sort() {
  PERFETTO_DCHECK(needs_sorting());
  PERFETTO_DCHECK(sort_start_idx_ < events_.size());

  // If sort_min_ts_ has been set, it will no longer be max_int, and so will be
  // smaller than max_ts_.
  PERFETTO_DCHECK(sort_min_ts_ < max_ts_);

  // We know that all events before sort_start_idx_ are sorted. Within this
  // range, perform a lower-bound search to find the first event whose
  // timestamp is >= the min timestamp that broke the monotonicity, and
  // re-sort from there to the end.
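  // For example (hypothetical values): if the event timestamps are
  // [5, 9, 12, 7, 15] with sort_start_idx_ == 3 and sort_min_ts_ == 7, the
  // lower-bound search over the sorted prefix [5, 9, 12] lands on 9, so only
  // [9, 12, 7, 15] gets re-sorted, yielding [5, 7, 9, 12, 15].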
  auto sort_end = events_.begin() + static_cast<ssize_t>(sort_start_idx_);
  PERFETTO_DCHECK(std::is_sorted(events_.begin(), sort_end));
  auto sort_begin = std::lower_bound(events_.begin(), sort_end, sort_min_ts_,
                                     &TimestampedEvent::Compare);
  std::sort(sort_begin, events_.end());
  sort_start_idx_ = 0;
  sort_min_ts_ = 0;

  // At this point |events_| must be fully sorted.
  PERFETTO_DCHECK(std::is_sorted(events_.begin(), events_.end()));
}

// Removes all the events in |queues_| that are earlier than the given
// packet index and moves them to the next parser stages, respecting global
// timestamp order. This function is an "extract min from N sorted queues",
// with some little cleverness: we know that events tend to be bursty, so
// events are not going to be randomly distributed on the N |queues_|.
// Upon each iteration this function finds the first two queues (if any) that
// have the oldest events, and extracts events from the 1st until hitting the
// min_ts of the 2nd. Imagine the queues are as follows:
//
//   q0 {min_ts: 10, max_ts: 30}
//   q1 {min_ts:  5, max_ts: 35}
//   q2 {min_ts: 12, max_ts: 40}
//
// We know that we can extract all events from q1 until we hit ts=10 without
// looking at any other queue. After hitting ts=10, we need to re-examine all
// of the queues to figure out the next min-event.
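// Continuing the (purely illustrative) example above: once q1 has been
// drained up to ts=10, the next scan picks q0 (now the smallest min_ts) and
// extracts its events until hitting ts=12, the min_ts of q2, and so on until
// every queue is empty or the allocation-id limit is reached.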
// There are more suitable data structures for this (e.g. keeping a min-heap
// to avoid re-scanning all the queues every time), but it doesn't seem worth
// it. With Android traces (that have 8 CPUs) this function accounts for
// ~1-3% cpu time in a profiler.
void TraceSorter::SortAndExtractEventsUntilAllocId(
    BumpAllocator::AllocId limit_alloc_id) {
  constexpr int64_t kTsMax = std::numeric_limits<int64_t>::max();
  for (;;) {
    size_t min_machine_idx = 0;
    size_t min_queue_idx = 0;  // The index of the queue with the min(ts).

    // The top-2 min(ts) among all queues.
    // queues_[min_queue_idx].events.timestamp == min_queue_ts[0].
    int64_t min_queue_ts[2]{kTsMax, kTsMax};

    // This loop identifies the queue which starts with the earliest event and
    // also remembers the earliest event of the 2nd queue (in min_queue_ts[1]).
    bool all_queues_empty = true;
    for (size_t m = 0; m < sorter_data_by_machine_.size(); m++) {
      TraceSorterData& sorter_data = sorter_data_by_machine_[m];
      for (size_t i = 0; i < sorter_data.queues.size(); i++) {
        auto& queue = sorter_data.queues[i];
        if (queue.events_.empty())
          continue;
        PERFETTO_DCHECK(queue.max_ts_ <= append_max_ts_);

        // Checking for |all_queues_empty| is necessary here as, in fuzzer
        // cases, we can end up with |int64::max()| as the min_ts value.
        // See https://crbug.com/oss-fuzz/69164 for an example.
        if (all_queues_empty || queue.min_ts_ < min_queue_ts[0]) {
          min_queue_ts[1] = min_queue_ts[0];
          min_queue_ts[0] = queue.min_ts_;
          min_queue_idx = i;
          min_machine_idx = m;
        } else if (queue.min_ts_ < min_queue_ts[1]) {
          min_queue_ts[1] = queue.min_ts_;
        }
        all_queues_empty = false;
      }
    }
    if (all_queues_empty)
      break;

    auto& sorter_data = sorter_data_by_machine_[min_machine_idx];
    auto& queue = sorter_data.queues[min_queue_idx];
    auto& events = queue.events_;
    if (queue.needs_sorting())
      queue.Sort();
    PERFETTO_DCHECK(queue.min_ts_ == events.front().ts);

    // Now that we identified the min-queue, extract all events from it until
    // we hit either: (1) the min-ts of the 2nd queue or (2) the packet index
    // limit, whichever comes first.
    size_t num_extracted = 0;
    for (auto& event : events) {
      if (event.alloc_id() >= limit_alloc_id) {
        break;
      }

      if (event.ts > min_queue_ts[1]) {
        // We should never hit this condition on the first extraction as, by
        // the algorithm above, (event.ts ==) min_queue_ts[0] <= min_queue_ts[1].
        PERFETTO_DCHECK(num_extracted > 0);
        break;
      }

      ++num_extracted;
      MaybeExtractEvent(min_machine_idx, min_queue_idx, event);
    }  // for (event: events)

    // If nothing was extracted, the earliest event cannot be extracted without
    // going past the limit.
    if (!num_extracted)
      break;

    // Now remove the entries from the event buffer and update the queue-local
    // and global time bounds.
    events.erase_front(num_extracted);
    events.shrink_to_fit();

    // Since we likely just removed a bunch of items, try to reduce the memory
    // usage of the token buffer.
    token_buffer_.FreeMemory();

    // Update the queue timestamps to reflect the bounds after extraction.
    if (events.empty()) {
      queue.min_ts_ = kTsMax;
      queue.max_ts_ = 0;
    } else {
      queue.min_ts_ = queue.events_.front().ts;
    }
  }  // for(;;)
}

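// Routes a non-ftrace, non-ETW event to the parser matching its tokenized
// type, extracting the payload from the token buffer.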
void TraceSorter::ParseTracePacket(TraceProcessorContext& context,
                                   const TimestampedEvent& event) {
  TraceTokenBuffer::Id id = GetTokenBufferId(event);
  switch (static_cast<TimestampedEvent::Type>(event.event_type)) {
    case TimestampedEvent::Type::kPerfRecord:
      context.perf_record_parser->ParsePerfRecord(
          event.ts, token_buffer_.Extract<perf_importer::Record>(id));
      return;
    case TimestampedEvent::Type::kTracePacket:
      context.proto_trace_parser->ParseTracePacket(
          event.ts, token_buffer_.Extract<TracePacketData>(id));
      return;
    case TimestampedEvent::Type::kTrackEvent:
      context.proto_trace_parser->ParseTrackEvent(
          event.ts, token_buffer_.Extract<TrackEventData>(id));
      return;
    case TimestampedEvent::Type::kFuchsiaRecord:
      context.fuchsia_record_parser->ParseFuchsiaRecord(
          event.ts, token_buffer_.Extract<FuchsiaRecord>(id));
      return;
    case TimestampedEvent::Type::kJsonValue:
      context.json_trace_parser->ParseJsonPacket(
          event.ts, std::move(token_buffer_.Extract<JsonEvent>(id).value));
      return;
    case TimestampedEvent::Type::kSystraceLine:
      context.json_trace_parser->ParseSystraceLine(
          event.ts, token_buffer_.Extract<SystraceLine>(id));
      return;
    case TimestampedEvent::Type::kInlineSchedSwitch:
    case TimestampedEvent::Type::kInlineSchedWaking:
    case TimestampedEvent::Type::kEtwEvent:
    case TimestampedEvent::Type::kFtraceEvent:
      PERFETTO_FATAL("Invalid event type");
  }
  PERFETTO_FATAL("For GCC");
}

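// Routes a per-CPU ETW event to the proto trace parser; any other tokenized
// type reaching this path is a programming error.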
void TraceSorter::ParseEtwPacket(TraceProcessorContext& context,
                                 uint32_t cpu,
                                 const TimestampedEvent& event) {
  TraceTokenBuffer::Id id = GetTokenBufferId(event);
  switch (static_cast<TimestampedEvent::Type>(event.event_type)) {
    case TimestampedEvent::Type::kEtwEvent:
      context.proto_trace_parser->ParseEtwEvent(
          cpu, event.ts, token_buffer_.Extract<TracePacketData>(id));
      return;
    case TimestampedEvent::Type::kInlineSchedSwitch:
    case TimestampedEvent::Type::kInlineSchedWaking:
    case TimestampedEvent::Type::kFtraceEvent:
    case TimestampedEvent::Type::kTrackEvent:
    case TimestampedEvent::Type::kSystraceLine:
    case TimestampedEvent::Type::kTracePacket:
    case TimestampedEvent::Type::kPerfRecord:
    case TimestampedEvent::Type::kJsonValue:
    case TimestampedEvent::Type::kFuchsiaRecord:
      PERFETTO_FATAL("Invalid event type");
  }
  PERFETTO_FATAL("For GCC");
}

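// Routes a per-CPU ftrace event (raw ftrace, inline sched_switch or inline
// sched_waking) to the proto trace parser.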
void TraceSorter::ParseFtracePacket(TraceProcessorContext& context,
                                    uint32_t cpu,
                                    const TimestampedEvent& event) {
  TraceTokenBuffer::Id id = GetTokenBufferId(event);
  switch (static_cast<TimestampedEvent::Type>(event.event_type)) {
    case TimestampedEvent::Type::kInlineSchedSwitch:
      context.proto_trace_parser->ParseInlineSchedSwitch(
          cpu, event.ts, token_buffer_.Extract<InlineSchedSwitch>(id));
      return;
    case TimestampedEvent::Type::kInlineSchedWaking:
      context.proto_trace_parser->ParseInlineSchedWaking(
          cpu, event.ts, token_buffer_.Extract<InlineSchedWaking>(id));
      return;
    case TimestampedEvent::Type::kFtraceEvent:
      context.proto_trace_parser->ParseFtraceEvent(
          cpu, event.ts, token_buffer_.Extract<TracePacketData>(id));
      return;
    case TimestampedEvent::Type::kEtwEvent:
    case TimestampedEvent::Type::kTrackEvent:
    case TimestampedEvent::Type::kSystraceLine:
    case TimestampedEvent::Type::kTracePacket:
    case TimestampedEvent::Type::kPerfRecord:
    case TimestampedEvent::Type::kJsonValue:
    case TimestampedEvent::Type::kFuchsiaRecord:
      PERFETTO_FATAL("Invalid event type");
  }
  PERFETTO_FATAL("For GCC");
}

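// Extracts the tokenized payload backing |event| from the token buffer and
// drops it without parsing. Used when destroying the sorter with events still
// queued and when the test-only bypass flag is set.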
void TraceSorter::ExtractAndDiscardTokenizedObject(
    const TimestampedEvent& event) {
  TraceTokenBuffer::Id id = GetTokenBufferId(event);
  switch (static_cast<TimestampedEvent::Type>(event.event_type)) {
    case TimestampedEvent::Type::kTracePacket:
    case TimestampedEvent::Type::kFtraceEvent:
    case TimestampedEvent::Type::kEtwEvent:
      base::ignore_result(token_buffer_.Extract<TracePacketData>(id));
      return;
    case TimestampedEvent::Type::kTrackEvent:
      base::ignore_result(token_buffer_.Extract<TrackEventData>(id));
      return;
    case TimestampedEvent::Type::kFuchsiaRecord:
      base::ignore_result(token_buffer_.Extract<FuchsiaRecord>(id));
      return;
    case TimestampedEvent::Type::kJsonValue:
      base::ignore_result(token_buffer_.Extract<JsonEvent>(id));
      return;
    case TimestampedEvent::Type::kSystraceLine:
      base::ignore_result(token_buffer_.Extract<SystraceLine>(id));
      return;
    case TimestampedEvent::Type::kInlineSchedSwitch:
      base::ignore_result(token_buffer_.Extract<InlineSchedSwitch>(id));
      return;
    case TimestampedEvent::Type::kInlineSchedWaking:
      base::ignore_result(token_buffer_.Extract<InlineSchedWaking>(id));
      return;
    case TimestampedEvent::Type::kPerfRecord:
      base::ignore_result(token_buffer_.Extract<perf_importer::Record>(id));
      return;
  }
  PERFETTO_FATAL("For GCC");
}

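// Pushes a single event to the next parser stage (or discards it in the
// test-only bypass mode), tracking out-of-order pushes via stats. Queue 0
// holds per-machine trace packets; queues at index >= 1 hold per-CPU
// ftrace/ETW events, so queues_[1] maps to cpu 0.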
void TraceSorter::MaybeExtractEvent(size_t min_machine_idx,
                                    size_t queue_idx,
                                    const TimestampedEvent& event) {
  auto* machine_context =
      sorter_data_by_machine_[min_machine_idx].machine_context;
  int64_t timestamp = event.ts;
  if (timestamp < latest_pushed_event_ts_)
    storage_->IncrementStats(stats::sorter_push_event_out_of_order);

  latest_pushed_event_ts_ = std::max(latest_pushed_event_ts_, timestamp);

  if (PERFETTO_UNLIKELY(bypass_next_stage_for_testing_)) {
    // Parse* would extract this event and push it to the next stage. Since we
    // are skipping that, just extract and discard it.
    ExtractAndDiscardTokenizedObject(event);
    return;
  }

  if (queue_idx == 0) {
    ParseTracePacket(*machine_context, event);
  } else {
    // Ftrace queues start at offset 1. So queues_[1] = cpu[0] and so on.
    uint32_t cpu = static_cast<uint32_t>(queue_idx - 1);
    auto event_type = static_cast<TimestampedEvent::Type>(event.event_type);

    if (event_type == TimestampedEvent::Type::kEtwEvent) {
      ParseEtwPacket(*machine_context, cpu, event);
    } else {
      ParseFtracePacket(*machine_context, cpu, event);
    }
  }
}

}  // namespace perfetto::trace_processor