1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <algorithm>
18 #include <utility>
19
20 #include "perfetto/ext/base/utils.h"
21 #include "src/trace_processor/importers/proto/proto_trace_parser.h"
22 #include "src/trace_processor/trace_sorter.h"
23
24 namespace perfetto {
25 namespace trace_processor {
26
TraceSorter(std::unique_ptr<TraceParser> parser,int64_t window_size_ns)27 TraceSorter::TraceSorter(std::unique_ptr<TraceParser> parser,
28 int64_t window_size_ns)
29 : parser_(std::move(parser)), window_size_ns_(window_size_ns) {
30 const char* env = getenv("TRACE_PROCESSOR_SORT_ONLY");
31 bypass_next_stage_for_testing_ = env && !strcmp(env, "1");
32 if (bypass_next_stage_for_testing_)
33 PERFETTO_ELOG("TEST MODE: bypassing protobuf parsing stage");
34 }
35
Sort()36 void TraceSorter::Queue::Sort() {
37 PERFETTO_DCHECK(needs_sorting());
38 PERFETTO_DCHECK(sort_start_idx_ < events_.size());
39
40 // If sort_min_ts_ has been set, it will no long be max_int, and so will be
41 // smaller than max_ts_.
42 PERFETTO_DCHECK(sort_min_ts_ < max_ts_);
43
44 // We know that all events between [0, sort_start_idx_] are sorted. Within
45 // this range, perform a bound search and find the iterator for the min
46 // timestamp that broke the monotonicity. Re-sort from there to the end.
47 auto sort_end = events_.begin() + static_cast<ssize_t>(sort_start_idx_);
48 PERFETTO_DCHECK(std::is_sorted(events_.begin(), sort_end));
49 auto sort_begin = std::lower_bound(events_.begin(), sort_end, sort_min_ts_,
50 &TimestampedTracePiece::Compare);
51 std::sort(sort_begin, events_.end());
52 sort_start_idx_ = 0;
53 sort_min_ts_ = 0;
54
55 // At this point |events_| must be fully sorted.
56 PERFETTO_DCHECK(std::is_sorted(events_.begin(), events_.end()));
57 }
58
59 // Removes all the events in |queues_| that are earlier than the given window
60 // size and moves them to the next parser stages, respecting global timestamp
61 // order. This function is a "extract min from N sorted queues", with some
62 // little cleverness: we know that events tend to be bursty, so events are
63 // not going to be randomly distributed on the N |queues_|.
64 // Upon each iteration this function finds the first two queues (if any) that
65 // have the oldest events, and extracts events from the 1st until hitting the
66 // min_ts of the 2nd. Imagine the queues are as follows:
67 //
68 // q0 {min_ts: 10 max_ts: 30}
69 // q1 {min_ts:5 max_ts: 35}
70 // q2 {min_ts: 12 max_ts: 40}
71 //
72 // We know that we can extract all events from q1 until we hit ts=10 without
73 // looking at any other queue. After hitting ts=10, we need to re-look to all of
74 // them to figure out the next min-event.
75 // There are more suitable data structures to do this (e.g. keeping a min-heap
76 // to avoid re-scanning all the queues all the times) but doesn't seem worth it.
77 // With Android traces (that have 8 CPUs) this function accounts for ~1-3% cpu
78 // time in a profiler.
SortAndExtractEventsBeyondWindow(int64_t window_size_ns)79 void TraceSorter::SortAndExtractEventsBeyondWindow(int64_t window_size_ns) {
80 DCHECK_ftrace_batch_cpu(kNoBatch);
81
82 constexpr int64_t kTsMax = std::numeric_limits<int64_t>::max();
83 const bool was_empty = global_min_ts_ == kTsMax && global_max_ts_ == 0;
84 int64_t extract_end_ts = global_max_ts_ - window_size_ns;
85 size_t iterations = 0;
86 for (;; iterations++) {
87 size_t min_queue_idx = 0; // The index of the queue with the min(ts).
88
89 // The top-2 min(ts) among all queues.
90 // queues_[min_queue_idx].events.timestamp == min_queue_ts[0].
91 int64_t min_queue_ts[2]{kTsMax, kTsMax};
92
93 // This loop identifies the queue which starts with the earliest event and
94 // also remembers the earliest event of the 2nd queue (in min_queue_ts[1]).
95 bool has_queues_with_expired_events = false;
96 for (size_t i = 0; i < queues_.size(); i++) {
97 auto& queue = queues_[i];
98 if (queue.events_.empty())
99 continue;
100 PERFETTO_DCHECK(queue.min_ts_ >= global_min_ts_);
101 PERFETTO_DCHECK(queue.max_ts_ <= global_max_ts_);
102 if (queue.min_ts_ < min_queue_ts[0]) {
103 min_queue_ts[1] = min_queue_ts[0];
104 min_queue_ts[0] = queue.min_ts_;
105 min_queue_idx = i;
106 has_queues_with_expired_events = true;
107 } else if (queue.min_ts_ < min_queue_ts[1]) {
108 min_queue_ts[1] = queue.min_ts_;
109 }
110 }
111 if (!has_queues_with_expired_events) {
112 // All the queues have events that start after the window (i.e. they are
113 // too recent and not eligible to be extracted given the current window).
114 break;
115 }
116
117 Queue& queue = queues_[min_queue_idx];
118 auto& events = queue.events_;
119 if (queue.needs_sorting())
120 queue.Sort();
121 PERFETTO_DCHECK(queue.min_ts_ == events.front().timestamp);
122 PERFETTO_DCHECK(queue.min_ts_ == global_min_ts_);
123
124 // Now that we identified the min-queue, extract all events from it until
125 // we hit either: (1) the min-ts of the 2nd queue or (2) the window limit,
126 // whichever comes first.
127 int64_t extract_until_ts = std::min(extract_end_ts, min_queue_ts[1]);
128 size_t num_extracted = 0;
129 for (auto& event : events) {
130 int64_t timestamp = event.timestamp;
131 if (timestamp > extract_until_ts)
132 break;
133
134 ++num_extracted;
135 if (bypass_next_stage_for_testing_)
136 continue;
137
138 if (min_queue_idx == 0) {
139 // queues_[0] is for non-ftrace packets.
140 parser_->ParseTracePacket(timestamp, std::move(event));
141 } else {
142 // Ftrace queues start at offset 1. So queues_[1] = cpu[0] and so on.
143 uint32_t cpu = static_cast<uint32_t>(min_queue_idx - 1);
144 parser_->ParseFtracePacket(cpu, timestamp, std::move(event));
145 }
146 } // for (event: events)
147
148 if (!num_extracted) {
149 // No events can be extracted from any of the queues. This means that
150 // either we hit the window or all queues are empty.
151 break;
152 }
153
154 // Now remove the entries from the event buffer and update the queue-local
155 // and global time bounds.
156 events.erase_front(num_extracted);
157
158 // Update the global_{min,max}_ts to reflect the bounds after extraction.
159 if (events.empty()) {
160 queue.min_ts_ = kTsMax;
161 queue.max_ts_ = 0;
162 global_min_ts_ = min_queue_ts[1];
163
164 // If we extraced the max entry from a queue (i.e. we emptied the queue)
165 // we need to recompute the global max, because it might have been the one
166 // just extracted.
167 global_max_ts_ = 0;
168 for (auto& q : queues_)
169 global_max_ts_ = std::max(global_max_ts_, q.max_ts_);
170 } else {
171 queue.min_ts_ = queue.events_.front().timestamp;
172 global_min_ts_ = std::min(queue.min_ts_, min_queue_ts[1]);
173 }
174 } // for(;;)
175
176 // We decide to extract events only when we know (using the global_{min,max}
177 // bounds) that there are eligible events. We should never end up in a
178 // situation where we call this function but then realize that there was
179 // nothing to extract.
180 PERFETTO_DCHECK(iterations > 0 || was_empty);
181
182 #if PERFETTO_DCHECK_IS_ON()
183 // Check that the global min/max are consistent.
184 int64_t dbg_min_ts = kTsMax;
185 int64_t dbg_max_ts = 0;
186 for (auto& q : queues_) {
187 dbg_min_ts = std::min(dbg_min_ts, q.min_ts_);
188 dbg_max_ts = std::max(dbg_max_ts, q.max_ts_);
189 }
190 PERFETTO_DCHECK(global_min_ts_ == dbg_min_ts);
191 PERFETTO_DCHECK(global_max_ts_ == dbg_max_ts);
192 #endif
193 }
194
195 } // namespace trace_processor
196 } // namespace perfetto
197