• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <utility>

#include "perfetto/base/compiler.h"
#include "src/trace_processor/importers/common/parser_types.h"
#include "src/trace_processor/importers/fuchsia/fuchsia_record.h"
#include "src/trace_processor/sorter/trace_sorter.h"
#include "src/trace_processor/storage/trace_storage.h"
#include "src/trace_processor/util/bump_allocator.h"
27 
28 namespace perfetto {
29 namespace trace_processor {
30 
TraceSorter(TraceProcessorContext * context,std::unique_ptr<TraceParser> parser,SortingMode sorting_mode)31 TraceSorter::TraceSorter(TraceProcessorContext* context,
32                          std::unique_ptr<TraceParser> parser,
33                          SortingMode sorting_mode)
34     : context_(context),
35       parser_(std::move(parser)),
36       sorting_mode_(sorting_mode) {
37   const char* env = getenv("TRACE_PROCESSOR_SORT_ONLY");
38   bypass_next_stage_for_testing_ = env && !strcmp(env, "1");
39   if (bypass_next_stage_for_testing_)
40     PERFETTO_ELOG("TEST MODE: bypassing protobuf parsing stage");
41 }
42 
~TraceSorter()43 TraceSorter::~TraceSorter() {
44   // If trace processor encountered a fatal error, it's possible for some events
45   // to have been pushed without evicting them by pushing to the next stage. Do
46   // that now.
47   for (auto& queue : queues_) {
48     for (const auto& event : queue.events_) {
49       ExtractAndDiscardTokenizedObject(event);
50     }
51   }
52 }
53 
Sort()54 void TraceSorter::Queue::Sort() {
55   PERFETTO_DCHECK(needs_sorting());
56   PERFETTO_DCHECK(sort_start_idx_ < events_.size());
57 
58   // If sort_min_ts_ has been set, it will no long be max_int, and so will be
59   // smaller than max_ts_.
60   PERFETTO_DCHECK(sort_min_ts_ < max_ts_);
61 
62   // We know that all events between [0, sort_start_idx_] are sorted. Within
63   // this range, perform a bound search and find the iterator for the min
64   // timestamp that broke the monotonicity. Re-sort from there to the end.
65   auto sort_end = events_.begin() + static_cast<ssize_t>(sort_start_idx_);
66   PERFETTO_DCHECK(std::is_sorted(events_.begin(), sort_end));
67   auto sort_begin = std::lower_bound(events_.begin(), sort_end, sort_min_ts_,
68                                      &TimestampedEvent::Compare);
69   std::sort(sort_begin, events_.end());
70   sort_start_idx_ = 0;
71   sort_min_ts_ = 0;
72 
73   // At this point |events_| must be fully sorted
74   PERFETTO_DCHECK(std::is_sorted(events_.begin(), events_.end()));
75 }
76 
// Removes all the events in |queues_| that are earlier than the given
// packet index and moves them to the next parser stages, respecting global
// timestamp order. This function is an "extract min from N sorted queues",
// with a little cleverness: we know that events tend to be bursty, so events
// are not going to be randomly distributed on the N |queues_|.
// Upon each iteration this function finds the first two queues (if any) that
// have the oldest events, and extracts events from the 1st until hitting the
// min_ts of the 2nd. Imagine the queues are as follows:
//
//  q0           {min_ts: 10  max_ts: 30}
//  q1    {min_ts:5              max_ts: 35}
//  q2              {min_ts: 12    max_ts: 40}
//
// We know that we can extract all events from q1 until we hit ts=10 without
// looking at any other queue. After hitting ts=10, we need to re-examine all
// of them to figure out the next min-event.
// There are more suitable data structures to do this (e.g. keeping a min-heap
// to avoid re-scanning all the queues every time), but it doesn't seem worth
// it. With Android traces (that have 8 CPUs) this function accounts for
// ~1-3% cpu time in a profiler.
// See the comment block above for the overall algorithm: repeatedly find the
// queue with the smallest min-ts and drain it until either the 2nd-smallest
// queue's min-ts or |limit_alloc_id| is reached.
void TraceSorter::SortAndExtractEventsUntilAllocId(
    BumpAllocator::AllocId limit_alloc_id) {
  constexpr int64_t kTsMax = std::numeric_limits<int64_t>::max();
  for (;;) {
    size_t min_queue_idx = 0;  // The index of the queue with the min(ts).

    // The top-2 min(ts) among all queues.
    // queues_[min_queue_idx].events.timestamp == min_queue_ts[0].
    int64_t min_queue_ts[2]{kTsMax, kTsMax};

    // This loop identifies the queue which starts with the earliest event and
    // also remembers the earliest event of the 2nd queue (in min_queue_ts[1]).
    bool all_queues_empty = true;
    for (size_t i = 0; i < queues_.size(); i++) {
      auto& queue = queues_[i];
      if (queue.events_.empty())
        continue;
      all_queues_empty = false;

      PERFETTO_DCHECK(queue.max_ts_ <= append_max_ts_);
      if (queue.min_ts_ < min_queue_ts[0]) {
        // New overall minimum: demote the previous minimum to 2nd place.
        min_queue_ts[1] = min_queue_ts[0];
        min_queue_ts[0] = queue.min_ts_;
        min_queue_idx = i;
      } else if (queue.min_ts_ < min_queue_ts[1]) {
        min_queue_ts[1] = queue.min_ts_;
      }
    }
    if (all_queues_empty)
      break;

    Queue& queue = queues_[min_queue_idx];
    auto& events = queue.events_;
    // Lazily sort the chosen queue only when its events arrived out of order.
    if (queue.needs_sorting())
      queue.Sort();
    PERFETTO_DCHECK(queue.min_ts_ == events.front().ts);

    // Now that we identified the min-queue, extract all events from it until
    // we hit either: (1) the min-ts of the 2nd queue or (2) the packet index
    // limit, whichever comes first.
    size_t num_extracted = 0;
    for (auto& event : events) {
      if (event.alloc_id() >= limit_alloc_id) {
        break;
      }

      if (event.ts > min_queue_ts[1]) {
        // We should never hit this condition on the first extraction as by
        // the algorithm above (event.ts =) min_queue_ts[0] <= min_queue[1].
        PERFETTO_DCHECK(num_extracted > 0);
        break;
      }

      ++num_extracted;
      MaybeExtractEvent(min_queue_idx, event);
    }  // for (event: events)

    // The earliest event cannot be extracted without going past the limit.
    if (!num_extracted)
      break;

    // Now remove the entries from the event buffer and update the queue-local
    // and global time bounds.
    events.erase_front(num_extracted);
    events.shrink_to_fit();

    // Since we likely just removed a bunch of items try to reduce the memory
    // usage of the token buffer.
    token_buffer_.FreeMemory();

    // Update the queue timestamps to reflect the bounds after extraction.
    // kTsMax acts as the "empty queue" sentinel so the scan above skips it.
    if (events.empty()) {
      queue.min_ts_ = kTsMax;
      queue.max_ts_ = 0;
    } else {
      queue.min_ts_ = queue.events_.front().ts;
    }
  }  // for(;;)
}
176 
ParseTracePacket(const TimestampedEvent & event)177 void TraceSorter::ParseTracePacket(const TimestampedEvent& event) {
178   TraceTokenBuffer::Id id = GetTokenBufferId(event);
179   switch (static_cast<TimestampedEvent::Type>(event.event_type)) {
180     case TimestampedEvent::Type::kTracePacket:
181       parser_->ParseTracePacket(event.ts,
182                                 token_buffer_.Extract<TracePacketData>(id));
183       return;
184     case TimestampedEvent::Type::kTrackEvent:
185       parser_->ParseTrackEvent(event.ts,
186                                token_buffer_.Extract<TrackEventData>(id));
187       return;
188     case TimestampedEvent::Type::kFuchsiaRecord:
189       parser_->ParseFuchsiaRecord(event.ts,
190                                   token_buffer_.Extract<FuchsiaRecord>(id));
191       return;
192     case TimestampedEvent::Type::kJsonValue:
193       parser_->ParseJsonPacket(
194           event.ts, std::move(token_buffer_.Extract<JsonEvent>(id).value));
195       return;
196     case TimestampedEvent::Type::kSystraceLine:
197       parser_->ParseSystraceLine(event.ts,
198                                  token_buffer_.Extract<SystraceLine>(id));
199       return;
200     case TimestampedEvent::Type::kInlineSchedSwitch:
201     case TimestampedEvent::Type::kInlineSchedWaking:
202     case TimestampedEvent::Type::kFtraceEvent:
203       PERFETTO_FATAL("Invalid event type");
204   }
205   PERFETTO_FATAL("For GCC");
206 }
207 
ParseFtracePacket(uint32_t cpu,const TimestampedEvent & event)208 void TraceSorter::ParseFtracePacket(uint32_t cpu,
209                                     const TimestampedEvent& event) {
210   TraceTokenBuffer::Id id = GetTokenBufferId(event);
211   switch (static_cast<TimestampedEvent::Type>(event.event_type)) {
212     case TimestampedEvent::Type::kInlineSchedSwitch:
213       parser_->ParseInlineSchedSwitch(
214           cpu, event.ts, token_buffer_.Extract<InlineSchedSwitch>(id));
215       return;
216     case TimestampedEvent::Type::kInlineSchedWaking:
217       parser_->ParseInlineSchedWaking(
218           cpu, event.ts, token_buffer_.Extract<InlineSchedWaking>(id));
219       return;
220     case TimestampedEvent::Type::kFtraceEvent:
221       parser_->ParseFtraceEvent(cpu, event.ts,
222                                 token_buffer_.Extract<TracePacketData>(id));
223       return;
224     case TimestampedEvent::Type::kTrackEvent:
225     case TimestampedEvent::Type::kSystraceLine:
226     case TimestampedEvent::Type::kTracePacket:
227     case TimestampedEvent::Type::kJsonValue:
228     case TimestampedEvent::Type::kFuchsiaRecord:
229       PERFETTO_FATAL("Invalid event type");
230   }
231   PERFETTO_FATAL("For GCC");
232 }
233 
ExtractAndDiscardTokenizedObject(const TimestampedEvent & event)234 void TraceSorter::ExtractAndDiscardTokenizedObject(
235     const TimestampedEvent& event) {
236   TraceTokenBuffer::Id id = GetTokenBufferId(event);
237   switch (static_cast<TimestampedEvent::Type>(event.event_type)) {
238     case TimestampedEvent::Type::kTracePacket:
239       base::ignore_result(token_buffer_.Extract<TracePacketData>(id));
240       return;
241     case TimestampedEvent::Type::kTrackEvent:
242       base::ignore_result(token_buffer_.Extract<TrackEventData>(id));
243       return;
244     case TimestampedEvent::Type::kFuchsiaRecord:
245       base::ignore_result(token_buffer_.Extract<FuchsiaRecord>(id));
246       return;
247     case TimestampedEvent::Type::kJsonValue:
248       base::ignore_result(token_buffer_.Extract<JsonEvent>(id));
249       return;
250     case TimestampedEvent::Type::kSystraceLine:
251       base::ignore_result(token_buffer_.Extract<SystraceLine>(id));
252       return;
253     case TimestampedEvent::Type::kInlineSchedSwitch:
254       base::ignore_result(token_buffer_.Extract<InlineSchedSwitch>(id));
255       return;
256     case TimestampedEvent::Type::kInlineSchedWaking:
257       base::ignore_result(token_buffer_.Extract<InlineSchedWaking>(id));
258       return;
259     case TimestampedEvent::Type::kFtraceEvent:
260       base::ignore_result(token_buffer_.Extract<TracePacketData>(id));
261       return;
262   }
263   PERFETTO_FATAL("For GCC");
264 }
265 
MaybeExtractEvent(size_t queue_idx,const TimestampedEvent & event)266 void TraceSorter::MaybeExtractEvent(size_t queue_idx,
267                                     const TimestampedEvent& event) {
268   int64_t timestamp = event.ts;
269   if (timestamp < latest_pushed_event_ts_)
270     context_->storage->IncrementStats(stats::sorter_push_event_out_of_order);
271 
272   latest_pushed_event_ts_ = std::max(latest_pushed_event_ts_, timestamp);
273 
274   if (PERFETTO_UNLIKELY(bypass_next_stage_for_testing_)) {
275     // Parse* would extract this event and push it to the next stage. Since we
276     // are skipping that, just extract and discard it.
277     ExtractAndDiscardTokenizedObject(event);
278     return;
279   }
280 
281   if (queue_idx == 0) {
282     ParseTracePacket(event);
283   } else {
284     // Ftrace queues start at offset 1. So queues_[1] = cpu[0] and so on.
285     uint32_t cpu = static_cast<uint32_t>(queue_idx - 1);
286     ParseFtracePacket(cpu, event);
287   }
288 }
289 
290 }  // namespace trace_processor
291 }  // namespace perfetto
292