/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_TRACE_PROCESSOR_TRACE_SORTER_H_
#define SRC_TRACE_PROCESSOR_TRACE_SORTER_H_

#include <algorithm>
#include <limits>
#include <memory>
#include <vector>

#include "perfetto/ext/base/circular_queue.h"
#include "perfetto/trace_processor/basic_types.h"
#include "src/trace_processor/storage/trace_storage.h"
#include "src/trace_processor/timestamped_trace_piece.h"
#include "src/trace_processor/trace_blob_view.h"

namespace Json {
class Value;
}  // namespace Json

namespace perfetto {
namespace trace_processor {

class FuchsiaProviderView;
class PacketSequenceState;
struct SystraceLine;

// This class takes care of sorting events parsed from the trace stream in
// arbitrary order and pushing them to the next pipeline stages (parsing) in
// order. To support streaming use-cases, sorting happens within a max window:
// events are held in the TraceSorter staging area (events_) until either
// (1) the (max - min) timestamp delta exceeds window_size, or (2) the trace
// hits EOF.
//
// This class is designed around the assumptions that:
// - Most events come from ftrace.
// - Ftrace events are sorted within each cpu most of the time.
//
// Because of this, this class operates as a streaming merge-sort of N+1 queues
// (N = num cpus + 1 for non-ftrace events). Each queue in turn gets sorted (if
// necessary) before proceeding with the global merge-sort-extract.
// When an event is pushed through, it is just appended to the end of one of
// the N queues. While appending, we keep track of whether the queue is still
// ordered or has just lost ordering. When an out-of-order event is detected
// on a queue we keep track of: (1) the offset within the queue where the
// chaos began, (2) the timestamp that broke the ordering.
// When we decide to extract events from the queues into the next stages of
// the trace processor, we re-sort the events in the queue. Rather than
// re-sorting everything every time, we use the above knowledge to restrict
// sorting to the (hopefully smaller) tail of the |events_| staging area.
// At any time, the first partition of |events_| [0 .. sort_start_idx_) is
// ordered, and the second partition [sort_start_idx_ .. end] is not.
// We use a logarithmic bound search operation to figure out the index within
// the first partition where sorting should start, and sort all events from
// there to the end.
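//
// A minimal usage sketch (illustrative only, not taken from real call sites;
// |parser|, |packet_ts|, |sequence_state| and |packet| are placeholders):
//
//   TraceSorter sorter(std::move(parser), /*window_size_ns=*/100000000);
//   sorter.PushTracePacket(packet_ts, sequence_state, std::move(packet));
//   ...  // Push more events; ordered events flush as the window slides.
//   sorter.ExtractEventsForced();  // At EOF, flush everything still staged.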
class TraceSorter {
 public:
  TraceSorter(std::unique_ptr<TraceParser> parser, int64_t window_size_ns);

  inline void PushTracePacket(int64_t timestamp,
                              PacketSequenceState* state,
                              TraceBlobView packet) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    auto* queue = GetQueue(0);
    queue->Append(TimestampedTracePiece(timestamp, packet_idx_++,
                                        std::move(packet),
                                        state->current_generation()));
    MaybeExtractEvents(queue);
  }

  inline void PushJsonValue(int64_t timestamp,
                            std::unique_ptr<Json::Value> json_value) {
    auto* queue = GetQueue(0);
    queue->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(json_value)));
    MaybeExtractEvents(queue);
  }

  inline void PushFuchsiaRecord(int64_t timestamp,
                                std::unique_ptr<FuchsiaRecord> record) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    auto* queue = GetQueue(0);
    queue->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(record)));
    MaybeExtractEvents(queue);
  }

  inline void PushSystraceLine(std::unique_ptr<SystraceLine> systrace_line) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    auto* queue = GetQueue(0);
    int64_t timestamp = systrace_line->ts;
    queue->Append(TimestampedTracePiece(timestamp, packet_idx_++,
                                        std::move(systrace_line)));
    MaybeExtractEvents(queue);
  }

  inline void PushFtraceEvent(uint32_t cpu,
                              int64_t timestamp,
                              TraceBlobView event) {
    set_ftrace_batch_cpu_for_DCHECK(cpu);
    GetQueue(cpu + 1)->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(event)));

    // The caller must call FinalizeFtraceEventBatch() after having pushed a
    // batch of ftrace events. This is to amortize the overhead of handling
    // global ordering and doing that in batches only after all ftrace events
    // for a bundle are pushed.
  }
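
  // Typical ftrace batch pattern (a sketch; |bundle| and its fields are
  // placeholders, not an actual decoder API):
  //   for (auto& ev : bundle)
  //     sorter.PushFtraceEvent(cpu, ev.timestamp, std::move(ev.data));
  //   sorter.FinalizeFtraceEventBatch(cpu);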

  // As with |PushFtraceEvent|, doesn't immediately sort the affected queues.
  // TODO(rsavitski): if a trace has a mix of normal & "compact" events (being
  // pushed through this function), the ftrace batches will no longer be fully
  // sorted by timestamp. In such situations, we will have to sort at the end
  // of the batch. We could do better, however, since both sub-sequences are
  // sorted. Consider adding extra queues, or pushing them in a merge-sort
  // fashion instead.
  inline void PushInlineFtraceEvent(uint32_t cpu,
                                    int64_t timestamp,
                                    InlineSchedSwitch inline_sched_switch) {
    set_ftrace_batch_cpu_for_DCHECK(cpu);
    GetQueue(cpu + 1)->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_switch));
  }
  inline void PushInlineFtraceEvent(uint32_t cpu,
                                    int64_t timestamp,
                                    InlineSchedWaking inline_sched_waking) {
    set_ftrace_batch_cpu_for_DCHECK(cpu);
    GetQueue(cpu + 1)->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_waking));
  }

  inline void PushTrackEventPacket(int64_t timestamp,
                                   std::unique_ptr<TrackEventData> data) {
    auto* queue = GetQueue(0);
    queue->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(data)));
    MaybeExtractEvents(queue);
  }

  inline void FinalizeFtraceEventBatch(uint32_t cpu) {
    DCHECK_ftrace_batch_cpu(cpu);
    set_ftrace_batch_cpu_for_DCHECK(kNoBatch);
    MaybeExtractEvents(GetQueue(cpu + 1));
  }

  // Extract all events ignoring the window.
  void ExtractEventsForced() {
    SortAndExtractEventsBeyondWindow(/*window_size_ns=*/0);
    queues_.resize(0);
  }

  // Sets the window size to the specified value (which must not be greater
  // than any previously specified window size) and flushes any data that
  // falls beyond the new window. Calling this function with a window size
  // greater than the current one is undefined behavior.
  void SetWindowSizeNs(int64_t window_size_ns) {
    PERFETTO_DCHECK(window_size_ns <= window_size_ns_);

    PERFETTO_DLOG("Setting window size to be %" PRId64 " ns", window_size_ns);
    window_size_ns_ = window_size_ns;

    // Fast path: if, globally, we are within the window size, then just exit.
    if (global_max_ts_ - global_min_ts_ < window_size_ns)
      return;
    SortAndExtractEventsBeyondWindow(window_size_ns_);
  }

  int64_t max_timestamp() const { return global_max_ts_; }

 private:
  static constexpr uint32_t kNoBatch = std::numeric_limits<uint32_t>::max();

  struct Queue {
    inline void Append(TimestampedTracePiece ttp) {
      const int64_t timestamp = ttp.timestamp;
      events_.emplace_back(std::move(ttp));
      min_ts_ = std::min(min_ts_, timestamp);

      // Events are often seen in order.
      if (PERFETTO_LIKELY(timestamp >= max_ts_)) {
        max_ts_ = timestamp;
      } else {
        // The event is breaking ordering. The first time it happens, keep
        // track of which index we are at. We know that everything before that
        // is sorted (because events were pushed monotonically). Everything
        // after that index, instead, will need a sorting pass before moving
        // events to the next pipeline stage.
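        // Example (illustrative): after pushing timestamps {10, 20, 15}, the
        // prefix [10, 20] remains the sorted partition and we record
        // sort_start_idx_ = 2 and sort_min_ts_ = 15.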
        if (sort_start_idx_ == 0) {
          PERFETTO_DCHECK(events_.size() >= 2);
          sort_start_idx_ = events_.size() - 1;
          sort_min_ts_ = timestamp;
        } else {
          sort_min_ts_ = std::min(sort_min_ts_, timestamp);
        }
      }

      PERFETTO_DCHECK(min_ts_ <= max_ts_);
    }

    bool needs_sorting() const { return sort_start_idx_ != 0; }
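
    // Re-sorts the unsorted tail of |events_|. A sketch of the intended
    // approach (an assumption based on the class comment above, not the
    // verbatim .cc definition): binary-search (std::lower_bound) the sorted
    // prefix [0, sort_start_idx_) for the first element with timestamp >=
    // sort_min_ts_, std::sort from that point to the end, then reset
    // sort_start_idx_ and sort_min_ts_.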
    void Sort();

    base::CircularQueue<TimestampedTracePiece> events_;
    int64_t min_ts_ = std::numeric_limits<int64_t>::max();
    int64_t max_ts_ = 0;
    size_t sort_start_idx_ = 0;
    int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
  };

  // This method passes any events older than window_size_ns to the
  // parser to be parsed and then stored.
  void SortAndExtractEventsBeyondWindow(int64_t window_size_ns);
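  // (Illustration of the intended semantics: with a 1s window, once
  // global_max_ts_ - global_min_ts_ exceeds 1s, events with timestamps up to
  // global_max_ts_ - 1s are sorted and extracted; see the .cc file for the
  // actual definition.)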

  inline Queue* GetQueue(size_t index) {
    if (PERFETTO_UNLIKELY(index >= queues_.size()))
      queues_.resize(index + 1);
    return &queues_[index];
  }

  inline void MaybeExtractEvents(Queue* queue) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    global_max_ts_ = std::max(global_max_ts_, queue->max_ts_);
    global_min_ts_ = std::min(global_min_ts_, queue->min_ts_);

    // Fast path: if, globally, we are within the window size, then just exit.
    if (global_max_ts_ - global_min_ts_ < window_size_ns_)
      return;
    SortAndExtractEventsBeyondWindow(window_size_ns_);
  }

  std::unique_ptr<TraceParser> parser_;

  // queues_[0] is the general (non-ftrace) queue.
  // queues_[1] is the ftrace queue for CPU(0).
  // queues_[x] is the ftrace queue for CPU(x - 1).
  std::vector<Queue> queues_;

  // Events are propagated to the next stage only after (max - min) timestamp
  // is larger than this value.
  int64_t window_size_ns_;

  // max(e.timestamp for e in queues_).
  int64_t global_max_ts_ = 0;

  // min(e.timestamp for e in queues_).
  int64_t global_min_ts_ = std::numeric_limits<int64_t>::max();

  // Monotonically increasing value used to index timestamped trace pieces.
  uint64_t packet_idx_ = 0;

  // Used for performance tests. True when setting TRACE_PROCESSOR_SORT_ONLY=1.
  bool bypass_next_stage_for_testing_ = false;

#if PERFETTO_DCHECK_IS_ON()
  // Used only for DCHECK-ing that FinalizeFtraceEventBatch() is called.
  uint32_t ftrace_batch_cpu_ = kNoBatch;

  inline void DCHECK_ftrace_batch_cpu(uint32_t cpu) {
    PERFETTO_DCHECK(ftrace_batch_cpu_ == kNoBatch || ftrace_batch_cpu_ == cpu);
  }

  inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t cpu) {
    PERFETTO_DCHECK(ftrace_batch_cpu_ == cpu || ftrace_batch_cpu_ == kNoBatch ||
                    cpu == kNoBatch);
    ftrace_batch_cpu_ = cpu;
  }
#else
  inline void DCHECK_ftrace_batch_cpu(uint32_t) {}
  inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t) {}
#endif
};

}  // namespace trace_processor
}  // namespace perfetto

#endif  // SRC_TRACE_PROCESSOR_TRACE_SORTER_H_