• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <algorithm>
18 #include <utility>
19 
20 #include "perfetto/ext/base/utils.h"
21 #include "src/trace_processor/importers/proto/proto_trace_parser.h"
22 #include "src/trace_processor/trace_sorter.h"
23 
24 namespace perfetto {
25 namespace trace_processor {
26 
TraceSorter(TraceProcessorContext * context,std::unique_ptr<TraceParser> parser,SortingMode sorting_mode)27 TraceSorter::TraceSorter(TraceProcessorContext* context,
28                          std::unique_ptr<TraceParser> parser,
29                          SortingMode sorting_mode)
30     : context_(context),
31       parser_(std::move(parser)),
32       sorting_mode_(sorting_mode) {
33   const char* env = getenv("TRACE_PROCESSOR_SORT_ONLY");
34   bypass_next_stage_for_testing_ = env && !strcmp(env, "1");
35   if (bypass_next_stage_for_testing_)
36     PERFETTO_ELOG("TEST MODE: bypassing protobuf parsing stage");
37 }
38 
Sort()39 void TraceSorter::Queue::Sort() {
40   PERFETTO_DCHECK(needs_sorting());
41   PERFETTO_DCHECK(sort_start_idx_ < events_.size());
42 
43   // If sort_min_ts_ has been set, it will no long be max_int, and so will be
44   // smaller than max_ts_.
45   PERFETTO_DCHECK(sort_min_ts_ < max_ts_);
46 
47   // We know that all events between [0, sort_start_idx_] are sorted. Within
48   // this range, perform a bound search and find the iterator for the min
49   // timestamp that broke the monotonicity. Re-sort from there to the end.
50   auto sort_end = events_.begin() + static_cast<ssize_t>(sort_start_idx_);
51   PERFETTO_DCHECK(std::is_sorted(events_.begin(), sort_end));
52   auto sort_begin = std::lower_bound(events_.begin(), sort_end, sort_min_ts_,
53                                      &TimestampedTracePiece::Compare);
54   std::sort(sort_begin, events_.end());
55   sort_start_idx_ = 0;
56   sort_min_ts_ = 0;
57 
58   // At this point |events_| must be fully sorted.
59   PERFETTO_DCHECK(std::is_sorted(events_.begin(), events_.end()));
60 }
61 
62 // Removes all the events in |queues_| that are earlier than the given
63 // packet index and moves them to the next parser stages, respecting global
64 // timestamp order. This function is a "extract min from N sorted queues", with
65 // some little cleverness: we know that events tend to be bursty, so events are
66 // not going to be randomly distributed on the N |queues_|.
67 // Upon each iteration this function finds the first two queues (if any) that
68 // have the oldest events, and extracts events from the 1st until hitting the
69 // min_ts of the 2nd. Imagine the queues are as follows:
70 //
71 //  q0           {min_ts: 10  max_ts: 30}
72 //  q1    {min_ts:5              max_ts: 35}
73 //  q2              {min_ts: 12    max_ts: 40}
74 //
75 // We know that we can extract all events from q1 until we hit ts=10 without
76 // looking at any other queue. After hitting ts=10, we need to re-look to all of
77 // them to figure out the next min-event.
78 // There are more suitable data structures to do this (e.g. keeping a min-heap
79 // to avoid re-scanning all the queues all the times) but doesn't seem worth it.
80 // With Android traces (that have 8 CPUs) this function accounts for ~1-3% cpu
81 // time in a profiler.
SortAndExtractEventsUntilPacket(uint64_t limit_packet_idx)82 void TraceSorter::SortAndExtractEventsUntilPacket(uint64_t limit_packet_idx) {
83   constexpr int64_t kTsMax = std::numeric_limits<int64_t>::max();
84   for (;;) {
85     size_t min_queue_idx = 0;  // The index of the queue with the min(ts).
86 
87     // The top-2 min(ts) among all queues.
88     // queues_[min_queue_idx].events.timestamp == min_queue_ts[0].
89     int64_t min_queue_ts[2]{kTsMax, kTsMax};
90 
91     // This loop identifies the queue which starts with the earliest event and
92     // also remembers the earliest event of the 2nd queue (in min_queue_ts[1]).
93     bool has_queues_with_expired_events = false;
94     for (size_t i = 0; i < queues_.size(); i++) {
95       auto& queue = queues_[i];
96       if (queue.events_.empty())
97         continue;
98       PERFETTO_DCHECK(queue.min_ts_ >= global_min_ts_);
99       PERFETTO_DCHECK(queue.max_ts_ <= global_max_ts_);
100       if (queue.min_ts_ < min_queue_ts[0]) {
101         min_queue_ts[1] = min_queue_ts[0];
102         min_queue_ts[0] = queue.min_ts_;
103         min_queue_idx = i;
104         has_queues_with_expired_events = true;
105       } else if (queue.min_ts_ < min_queue_ts[1]) {
106         min_queue_ts[1] = queue.min_ts_;
107       }
108     }
109     if (!has_queues_with_expired_events) {
110       // All the queues have events that start after the window (i.e. they are
111       // too recent and not eligible to be extracted given the current window).
112       break;
113     }
114 
115     Queue& queue = queues_[min_queue_idx];
116     auto& events = queue.events_;
117     if (queue.needs_sorting())
118       queue.Sort();
119     PERFETTO_DCHECK(queue.min_ts_ == events.front().timestamp);
120     PERFETTO_DCHECK(queue.min_ts_ == global_min_ts_);
121 
122     // Now that we identified the min-queue, extract all events from it until
123     // we hit either: (1) the min-ts of the 2nd queue or (2) the packet index
124     // limit, whichever comes first.
125     size_t num_extracted = 0;
126     for (auto& event : events) {
127       if (event.packet_idx >= limit_packet_idx ||
128           event.timestamp > min_queue_ts[1]) {
129         break;
130       }
131 
132       ++num_extracted;
133       MaybePushEvent(min_queue_idx, std::move(event));
134     }  // for (event: events)
135 
136     if (!num_extracted) {
137       // No events can be extracted from any of the queues. This means that
138       // either we hit the window or all queues are empty.
139       break;
140     }
141 
142     // Now remove the entries from the event buffer and update the queue-local
143     // and global time bounds.
144     events.erase_front(num_extracted);
145 
146     // Update the global_{min,max}_ts to reflect the bounds after extraction.
147     if (events.empty()) {
148       queue.min_ts_ = kTsMax;
149       queue.max_ts_ = 0;
150       global_min_ts_ = min_queue_ts[1];
151 
152       // If we extraced the max entry from a queue (i.e. we emptied the queue)
153       // we need to recompute the global max, because it might have been the one
154       // just extracted.
155       global_max_ts_ = 0;
156       for (auto& q : queues_)
157         global_max_ts_ = std::max(global_max_ts_, q.max_ts_);
158     } else {
159       queue.min_ts_ = queue.events_.front().timestamp;
160       global_min_ts_ = std::min(queue.min_ts_, min_queue_ts[1]);
161     }
162   }  // for(;;)
163 
164 #if PERFETTO_DCHECK_IS_ON()
165   // Check that the global min/max are consistent.
166   int64_t dbg_min_ts = kTsMax;
167   int64_t dbg_max_ts = 0;
168   for (auto& q : queues_) {
169     dbg_min_ts = std::min(dbg_min_ts, q.min_ts_);
170     dbg_max_ts = std::max(dbg_max_ts, q.max_ts_);
171   }
172   PERFETTO_DCHECK(global_min_ts_ == dbg_min_ts);
173   PERFETTO_DCHECK(global_max_ts_ == dbg_max_ts);
174 #endif
175 }
176 
MaybePushEvent(size_t queue_idx,TimestampedTracePiece ttp)177 void TraceSorter::MaybePushEvent(size_t queue_idx, TimestampedTracePiece ttp) {
178   int64_t timestamp = ttp.timestamp;
179   if (timestamp < latest_pushed_event_ts_)
180     context_->storage->IncrementStats(stats::sorter_push_event_out_of_order);
181 
182   latest_pushed_event_ts_ = std::max(latest_pushed_event_ts_, timestamp);
183 
184   if (PERFETTO_UNLIKELY(bypass_next_stage_for_testing_))
185     return;
186 
187   if (queue_idx == 0) {
188     // queues_[0] is for non-ftrace packets.
189     parser_->ParseTracePacket(timestamp, std::move(ttp));
190   } else {
191     // Ftrace queues start at offset 1. So queues_[1] = cpu[0] and so on.
192     uint32_t cpu = static_cast<uint32_t>(queue_idx - 1);
193     parser_->ParseFtracePacket(cpu, timestamp, std::move(ttp));
194   }
195 }
196 
197 }  // namespace trace_processor
198 }  // namespace perfetto
199