/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <limits>
#include <memory>
#include <utility>

#include "perfetto/base/compiler.h"
#include "src/trace_processor/importers/common/parser_types.h"
#include "src/trace_processor/importers/fuchsia/fuchsia_record.h"
#include "src/trace_processor/sorter/trace_sorter.h"
#include "src/trace_processor/storage/trace_storage.h"
#include "src/trace_processor/util/bump_allocator.h"
27
28 namespace perfetto {
29 namespace trace_processor {
30
TraceSorter(TraceProcessorContext * context,std::unique_ptr<TraceParser> parser,SortingMode sorting_mode)31 TraceSorter::TraceSorter(TraceProcessorContext* context,
32 std::unique_ptr<TraceParser> parser,
33 SortingMode sorting_mode)
34 : context_(context),
35 parser_(std::move(parser)),
36 sorting_mode_(sorting_mode) {
37 const char* env = getenv("TRACE_PROCESSOR_SORT_ONLY");
38 bypass_next_stage_for_testing_ = env && !strcmp(env, "1");
39 if (bypass_next_stage_for_testing_)
40 PERFETTO_ELOG("TEST MODE: bypassing protobuf parsing stage");
41 }
42
~TraceSorter()43 TraceSorter::~TraceSorter() {
44 // If trace processor encountered a fatal error, it's possible for some events
45 // to have been pushed without evicting them by pushing to the next stage. Do
46 // that now.
47 for (auto& queue : queues_) {
48 for (const auto& event : queue.events_) {
49 ExtractAndDiscardTokenizedObject(event);
50 }
51 }
52 }
53
Sort()54 void TraceSorter::Queue::Sort() {
55 PERFETTO_DCHECK(needs_sorting());
56 PERFETTO_DCHECK(sort_start_idx_ < events_.size());
57
58 // If sort_min_ts_ has been set, it will no long be max_int, and so will be
59 // smaller than max_ts_.
60 PERFETTO_DCHECK(sort_min_ts_ < max_ts_);
61
62 // We know that all events between [0, sort_start_idx_] are sorted. Within
63 // this range, perform a bound search and find the iterator for the min
64 // timestamp that broke the monotonicity. Re-sort from there to the end.
65 auto sort_end = events_.begin() + static_cast<ssize_t>(sort_start_idx_);
66 PERFETTO_DCHECK(std::is_sorted(events_.begin(), sort_end));
67 auto sort_begin = std::lower_bound(events_.begin(), sort_end, sort_min_ts_,
68 &TimestampedEvent::Compare);
69 std::sort(sort_begin, events_.end());
70 sort_start_idx_ = 0;
71 sort_min_ts_ = 0;
72
73 // At this point |events_| must be fully sorted
74 PERFETTO_DCHECK(std::is_sorted(events_.begin(), events_.end()));
75 }
76
// Removes all the events in |queues_| that are earlier than the given
// packet index and moves them to the next parser stages, respecting global
// timestamp order. This function is an "extract min from N sorted queues",
// with some little cleverness: we know that events tend to be bursty, so
// events are not going to be randomly distributed on the N |queues_|.
// Upon each iteration this function finds the first two queues (if any) that
// have the oldest events, and extracts events from the 1st until hitting the
// min_ts of the 2nd. Imagine the queues are as follows:
//
//  q0           {min_ts: 10  max_ts: 30}
//  q1    {min_ts: 5              max_ts: 35}
//  q2              {min_ts: 12    max_ts: 40}
//
// We know that we can extract all events from q1 until we hit ts=10 without
// looking at any other queue. After hitting ts=10, we need to re-look at all
// of them to figure out the next min-event.
// There are more suitable data structures to do this (e.g. keeping a min-heap
// to avoid re-scanning all the queues every time) but it doesn't seem worth
// it. With Android traces (that have 8 CPUs) this function accounts for
// ~1-3% cpu time in a profiler.
SortAndExtractEventsUntilAllocId(BumpAllocator::AllocId limit_alloc_id)97 void TraceSorter::SortAndExtractEventsUntilAllocId(
98 BumpAllocator::AllocId limit_alloc_id) {
99 constexpr int64_t kTsMax = std::numeric_limits<int64_t>::max();
100 for (;;) {
101 size_t min_queue_idx = 0; // The index of the queue with the min(ts).
102
103 // The top-2 min(ts) among all queues.
104 // queues_[min_queue_idx].events.timestamp == min_queue_ts[0].
105 int64_t min_queue_ts[2]{kTsMax, kTsMax};
106
107 // This loop identifies the queue which starts with the earliest event and
108 // also remembers the earliest event of the 2nd queue (in min_queue_ts[1]).
109 bool all_queues_empty = true;
110 for (size_t i = 0; i < queues_.size(); i++) {
111 auto& queue = queues_[i];
112 if (queue.events_.empty())
113 continue;
114 all_queues_empty = false;
115
116 PERFETTO_DCHECK(queue.max_ts_ <= append_max_ts_);
117 if (queue.min_ts_ < min_queue_ts[0]) {
118 min_queue_ts[1] = min_queue_ts[0];
119 min_queue_ts[0] = queue.min_ts_;
120 min_queue_idx = i;
121 } else if (queue.min_ts_ < min_queue_ts[1]) {
122 min_queue_ts[1] = queue.min_ts_;
123 }
124 }
125 if (all_queues_empty)
126 break;
127
128 Queue& queue = queues_[min_queue_idx];
129 auto& events = queue.events_;
130 if (queue.needs_sorting())
131 queue.Sort();
132 PERFETTO_DCHECK(queue.min_ts_ == events.front().ts);
133
134 // Now that we identified the min-queue, extract all events from it until
135 // we hit either: (1) the min-ts of the 2nd queue or (2) the packet index
136 // limit, whichever comes first.
137 size_t num_extracted = 0;
138 for (auto& event : events) {
139 if (event.alloc_id() >= limit_alloc_id) {
140 break;
141 }
142
143 if (event.ts > min_queue_ts[1]) {
144 // We should never hit this condition on the first extraction as by
145 // the algorithm above (event.ts =) min_queue_ts[0] <= min_queue[1].
146 PERFETTO_DCHECK(num_extracted > 0);
147 break;
148 }
149
150 ++num_extracted;
151 MaybeExtractEvent(min_queue_idx, event);
152 } // for (event: events)
153
154 // The earliest event cannot be extracted without going past the limit.
155 if (!num_extracted)
156 break;
157
158 // Now remove the entries from the event buffer and update the queue-local
159 // and global time bounds.
160 events.erase_front(num_extracted);
161 events.shrink_to_fit();
162
163 // Since we likely just removed a bunch of items try to reduce the memory
164 // usage of the token buffer.
165 token_buffer_.FreeMemory();
166
167 // Update the queue timestamps to reflect the bounds after extraction.
168 if (events.empty()) {
169 queue.min_ts_ = kTsMax;
170 queue.max_ts_ = 0;
171 } else {
172 queue.min_ts_ = queue.events_.front().ts;
173 }
174 } // for(;;)
175 }
176
ParseTracePacket(const TimestampedEvent & event)177 void TraceSorter::ParseTracePacket(const TimestampedEvent& event) {
178 TraceTokenBuffer::Id id = GetTokenBufferId(event);
179 switch (static_cast<TimestampedEvent::Type>(event.event_type)) {
180 case TimestampedEvent::Type::kTracePacket:
181 parser_->ParseTracePacket(event.ts,
182 token_buffer_.Extract<TracePacketData>(id));
183 return;
184 case TimestampedEvent::Type::kTrackEvent:
185 parser_->ParseTrackEvent(event.ts,
186 token_buffer_.Extract<TrackEventData>(id));
187 return;
188 case TimestampedEvent::Type::kFuchsiaRecord:
189 parser_->ParseFuchsiaRecord(event.ts,
190 token_buffer_.Extract<FuchsiaRecord>(id));
191 return;
192 case TimestampedEvent::Type::kJsonValue:
193 parser_->ParseJsonPacket(
194 event.ts, std::move(token_buffer_.Extract<JsonEvent>(id).value));
195 return;
196 case TimestampedEvent::Type::kSystraceLine:
197 parser_->ParseSystraceLine(event.ts,
198 token_buffer_.Extract<SystraceLine>(id));
199 return;
200 case TimestampedEvent::Type::kInlineSchedSwitch:
201 case TimestampedEvent::Type::kInlineSchedWaking:
202 case TimestampedEvent::Type::kFtraceEvent:
203 PERFETTO_FATAL("Invalid event type");
204 }
205 PERFETTO_FATAL("For GCC");
206 }
207
ParseFtracePacket(uint32_t cpu,const TimestampedEvent & event)208 void TraceSorter::ParseFtracePacket(uint32_t cpu,
209 const TimestampedEvent& event) {
210 TraceTokenBuffer::Id id = GetTokenBufferId(event);
211 switch (static_cast<TimestampedEvent::Type>(event.event_type)) {
212 case TimestampedEvent::Type::kInlineSchedSwitch:
213 parser_->ParseInlineSchedSwitch(
214 cpu, event.ts, token_buffer_.Extract<InlineSchedSwitch>(id));
215 return;
216 case TimestampedEvent::Type::kInlineSchedWaking:
217 parser_->ParseInlineSchedWaking(
218 cpu, event.ts, token_buffer_.Extract<InlineSchedWaking>(id));
219 return;
220 case TimestampedEvent::Type::kFtraceEvent:
221 parser_->ParseFtraceEvent(cpu, event.ts,
222 token_buffer_.Extract<TracePacketData>(id));
223 return;
224 case TimestampedEvent::Type::kTrackEvent:
225 case TimestampedEvent::Type::kSystraceLine:
226 case TimestampedEvent::Type::kTracePacket:
227 case TimestampedEvent::Type::kJsonValue:
228 case TimestampedEvent::Type::kFuchsiaRecord:
229 PERFETTO_FATAL("Invalid event type");
230 }
231 PERFETTO_FATAL("For GCC");
232 }
233
ExtractAndDiscardTokenizedObject(const TimestampedEvent & event)234 void TraceSorter::ExtractAndDiscardTokenizedObject(
235 const TimestampedEvent& event) {
236 TraceTokenBuffer::Id id = GetTokenBufferId(event);
237 switch (static_cast<TimestampedEvent::Type>(event.event_type)) {
238 case TimestampedEvent::Type::kTracePacket:
239 base::ignore_result(token_buffer_.Extract<TracePacketData>(id));
240 return;
241 case TimestampedEvent::Type::kTrackEvent:
242 base::ignore_result(token_buffer_.Extract<TrackEventData>(id));
243 return;
244 case TimestampedEvent::Type::kFuchsiaRecord:
245 base::ignore_result(token_buffer_.Extract<FuchsiaRecord>(id));
246 return;
247 case TimestampedEvent::Type::kJsonValue:
248 base::ignore_result(token_buffer_.Extract<JsonEvent>(id));
249 return;
250 case TimestampedEvent::Type::kSystraceLine:
251 base::ignore_result(token_buffer_.Extract<SystraceLine>(id));
252 return;
253 case TimestampedEvent::Type::kInlineSchedSwitch:
254 base::ignore_result(token_buffer_.Extract<InlineSchedSwitch>(id));
255 return;
256 case TimestampedEvent::Type::kInlineSchedWaking:
257 base::ignore_result(token_buffer_.Extract<InlineSchedWaking>(id));
258 return;
259 case TimestampedEvent::Type::kFtraceEvent:
260 base::ignore_result(token_buffer_.Extract<TracePacketData>(id));
261 return;
262 }
263 PERFETTO_FATAL("For GCC");
264 }
265
MaybeExtractEvent(size_t queue_idx,const TimestampedEvent & event)266 void TraceSorter::MaybeExtractEvent(size_t queue_idx,
267 const TimestampedEvent& event) {
268 int64_t timestamp = event.ts;
269 if (timestamp < latest_pushed_event_ts_)
270 context_->storage->IncrementStats(stats::sorter_push_event_out_of_order);
271
272 latest_pushed_event_ts_ = std::max(latest_pushed_event_ts_, timestamp);
273
274 if (PERFETTO_UNLIKELY(bypass_next_stage_for_testing_)) {
275 // Parse* would extract this event and push it to the next stage. Since we
276 // are skipping that, just extract and discard it.
277 ExtractAndDiscardTokenizedObject(event);
278 return;
279 }
280
281 if (queue_idx == 0) {
282 ParseTracePacket(event);
283 } else {
284 // Ftrace queues start at offset 1. So queues_[1] = cpu[0] and so on.
285 uint32_t cpu = static_cast<uint32_t>(queue_idx - 1);
286 ParseFtracePacket(cpu, event);
287 }
288 }
289
290 } // namespace trace_processor
291 } // namespace perfetto
292