/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_TRACE_PROCESSOR_TRACE_SORTER_H_
#define SRC_TRACE_PROCESSOR_TRACE_SORTER_H_

#include <algorithm>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "perfetto/ext/base/circular_queue.h"
#include "perfetto/trace_processor/basic_types.h"
#include "perfetto/trace_processor/trace_blob_view.h"
#include "src/trace_processor/timestamped_trace_piece.h"

namespace perfetto {
namespace trace_processor {

class PacketSequenceState;
struct SystraceLine;
// This class takes care of sorting events parsed from the trace stream in
// arbitrary order and pushing them to the next pipeline stages (parsing) in
// order. To support streaming use-cases, sorting happens within a window.
//
// Events are held in the TraceSorter staging area (events_) until either:
// 1. We can determine that it's safe to extract events by observing
//    TracingServiceEvent Flush and ReadBuffer events
// 2. The trace EOF is reached
//
// Incremental extraction
//
// Incremental extraction happens by using a combination of flush and read
// buffer events from the tracing service. Note that incremental extraction
// is only applicable for write_into_file traces; ring-buffer traces will
// be sorted fully in-memory implicitly because there is only a single read
// buffer call at the end.
//
// The algorithm for incremental extraction is explained in detail at
// go/trace-sorting-is-complicated.
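//
// A rough illustration of the hooks involved (sketch only; the authoritative
// sequencing is in the doc above): a tokenizer that sees the corresponding
// TracingServiceEvent packets drives the sorter like this:
//
//   sorter->NotifyFlushEvent();       // Flush observed in the stream.
//   sorter->NotifyFlushEvent();       // Second flush since last extraction.
//   sorter->NotifyReadBufferEvent();  // ReadBuffer observed: since at least
//                                     // two flushes happened since the last
//                                     // extraction (see the implementation
//                                     // below), events up to the recorded
//                                     // packet index are sorted and pushed
//                                     // to the parser.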
//
// Sorting algorithm
//
// The sorting algorithm is designed around the assumption that:
// - Most events come from ftrace.
// - Ftrace events are sorted within each cpu most of the time.
//
// Due to this, this class operates as a streaming merge-sort of N+1 queues
// (N = num cpus + 1 for non-ftrace events). Each queue in turn gets sorted
// (if necessary) before proceeding with the global merge-sort-extract.
//
// When an event is pushed through, it is just appended to the end of one of
// the N queues. While appending, we keep track of whether the queue is still
// ordered or has just lost ordering. When an out-of-order event is detected
// on a queue we keep track of: (1) the offset within the queue where the
// chaos began, (2) the timestamp that broke the ordering.
//
// When we decide to extract events from the queues into the next stages of
// the trace processor, we re-sort the events in the queue. Rather than
// re-sorting everything every time, we use the above knowledge to restrict
// sorting to the (hopefully smaller) tail of the |events_| staging area.
// At any time, the first partition of |events_| [0 .. sort_start_idx_) is
// ordered, and the second partition [sort_start_idx_ .. end] is not.
// We use a logarithmic bound search operation to figure out what is the index
// within the first partition where sorting should start, and sort all events
// from there to the end.
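//
// For example (an illustrative sketch, not data from a real trace): if a
// queue receives timestamps [10, 20, 30, 25, 27], the first out-of-order
// event (25) records sort_start_idx_ = 3 and sort_min_ts_ = 25. On
// extraction, a logarithmic search over the sorted prefix (in the spirit of
// std::lower_bound) finds the first event with timestamp >= 25 (here 30, at
// index 2), so only the range [2 .. 5) needs re-sorting, yielding
// [10, 20, 25, 27, 30].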
class TraceSorter {
 public:
  enum class SortingMode {
    kDefault,
    kFullSort,
  };

  TraceSorter(TraceProcessorContext* context,
              std::unique_ptr<TraceParser> parser,
              SortingMode);
  inline void PushTracePacket(int64_t timestamp,
                              PacketSequenceState* state,
                              TraceBlobView packet) {
    AppendNonFtraceEvent(TimestampedTracePiece(timestamp, packet_idx_++,
                                               std::move(packet),
                                               state->current_generation()));
  }

  inline void PushJsonValue(int64_t timestamp, std::string json_value) {
    AppendNonFtraceEvent(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(json_value)));
  }

  inline void PushFuchsiaRecord(int64_t timestamp,
                                std::unique_ptr<FuchsiaRecord> record) {
    AppendNonFtraceEvent(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(record)));
  }

  inline void PushSystraceLine(std::unique_ptr<SystraceLine> systrace_line) {
    int64_t timestamp = systrace_line->ts;
    AppendNonFtraceEvent(TimestampedTracePiece(timestamp, packet_idx_++,
                                               std::move(systrace_line)));
  }

  inline void PushTrackEventPacket(int64_t timestamp,
                                   std::unique_ptr<TrackEventData> data) {
    AppendNonFtraceEvent(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(data)));
  }

  inline void PushFtraceEvent(uint32_t cpu,
                              int64_t timestamp,
                              TraceBlobView event,
                              PacketSequenceState* state) {
    auto* queue = GetQueue(cpu + 1);
    queue->Append(TimestampedTracePiece(
        timestamp, packet_idx_++,
        FtraceEventData{std::move(event), state->current_generation()}));
    UpdateGlobalTs(queue);
  }
  inline void PushInlineFtraceEvent(uint32_t cpu,
                                    int64_t timestamp,
                                    InlineSchedSwitch inline_sched_switch) {
    // TODO(rsavitski): if a trace has a mix of normal & "compact" events
    // (being pushed through this function), the ftrace batches will no longer
    // be fully sorted by timestamp. In such situations, we will have to sort
    // at the end of the batch. However, we can do better, as both
    // sub-sequences are already sorted: consider adding extra queues, or
    // pushing them in a merge-sort fashion instead.
    auto* queue = GetQueue(cpu + 1);
    queue->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_switch));
    UpdateGlobalTs(queue);
  }
  inline void PushInlineFtraceEvent(uint32_t cpu,
                                    int64_t timestamp,
                                    InlineSchedWaking inline_sched_waking) {
    auto* queue = GetQueue(cpu + 1);
    queue->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_waking));
    UpdateGlobalTs(queue);
  }

  void ExtractEventsForced() {
    SortAndExtractEventsUntilPacket(packet_idx_);
    queues_.resize(0);

    packet_idx_for_extraction_ = packet_idx_;
    flushes_since_extraction_ = 0;
  }

  void NotifyFlushEvent() { flushes_since_extraction_++; }

  void NotifyReadBufferEvent() {
    if (sorting_mode_ == SortingMode::kFullSort ||
        flushes_since_extraction_ < 2) {
      return;
    }

    SortAndExtractEventsUntilPacket(packet_idx_for_extraction_);
    packet_idx_for_extraction_ = packet_idx_;
    flushes_since_extraction_ = 0;
  }

  int64_t max_timestamp() const { return global_max_ts_; }

 private:
  static constexpr uint32_t kNoBatch = std::numeric_limits<uint32_t>::max();

  struct Queue {
    inline void Append(TimestampedTracePiece ttp) {
      const int64_t timestamp = ttp.timestamp;
      events_.emplace_back(std::move(ttp));
      min_ts_ = std::min(min_ts_, timestamp);

      // Events are often seen in order.
      if (PERFETTO_LIKELY(timestamp >= max_ts_)) {
        max_ts_ = timestamp;
      } else {
        // The event breaks ordering. The first time this happens, keep track
        // of which index we are at. We know that everything before that
        // index is sorted (because events were pushed monotonically up to
        // this point). Everything after that index, instead, will need a
        // sorting pass before moving events to the next pipeline stage.
        if (sort_start_idx_ == 0) {
          PERFETTO_DCHECK(events_.size() >= 2);
          sort_start_idx_ = events_.size() - 1;
          sort_min_ts_ = timestamp;
        } else {
          sort_min_ts_ = std::min(sort_min_ts_, timestamp);
        }
      }

      PERFETTO_DCHECK(min_ts_ <= max_ts_);
    }
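
    // Worked example (illustrative, not from a real trace): appending
    // timestamps 5, 7, 6, 8 leaves min_ts_ = 5 and max_ts_ = 8. The append
    // of 6 (below the max_ts_ of 7 at that point) sets sort_start_idx_ = 2
    // and sort_min_ts_ = 6. The later append of 8 takes the fast path but
    // does not clear that state, so needs_sorting() stays true until Sort()
    // runs.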

    bool needs_sorting() const { return sort_start_idx_ != 0; }
    void Sort();

    base::CircularQueue<TimestampedTracePiece> events_;
    int64_t min_ts_ = std::numeric_limits<int64_t>::max();
    int64_t max_ts_ = 0;
    size_t sort_start_idx_ = 0;
    int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
  };

  void SortAndExtractEventsUntilPacket(uint64_t limit_packet_idx);

  inline Queue* GetQueue(size_t index) {
    if (PERFETTO_UNLIKELY(index >= queues_.size()))
      queues_.resize(index + 1);
    return &queues_[index];
  }

  inline void AppendNonFtraceEvent(TimestampedTracePiece ttp) {
    Queue* queue = GetQueue(0);
    queue->Append(std::move(ttp));
    UpdateGlobalTs(queue);
  }

  inline void UpdateGlobalTs(Queue* queue) {
    global_min_ts_ = std::min(global_min_ts_, queue->min_ts_);
    global_max_ts_ = std::max(global_max_ts_, queue->max_ts_);
  }

  void MaybePushEvent(size_t queue_idx,
                      TimestampedTracePiece ttp) PERFETTO_ALWAYS_INLINE;

  TraceProcessorContext* context_;
  std::unique_ptr<TraceParser> parser_;

  // Whether we should ignore incremental extraction and just wait for
  // forced extraction at the end of the trace.
  SortingMode sorting_mode_ = SortingMode::kDefault;

  // The packet index until which events should be extracted. Set based
  // on the packet index in |OnReadBuffer|.
  uint64_t packet_idx_for_extraction_ = 0;

  // The number of flushes which have happened since the last incremental
  // extraction.
  uint32_t flushes_since_extraction_ = 0;

  // queues_[0] is the general (non-ftrace) queue.
  // queues_[1] is the ftrace queue for CPU(0).
  // queues_[x] is the ftrace queue for CPU(x - 1).
  std::vector<Queue> queues_;

  // max(e.timestamp for e in queues_).
  int64_t global_max_ts_ = 0;

  // min(e.timestamp for e in queues_).
  int64_t global_min_ts_ = std::numeric_limits<int64_t>::max();

  // Monotonically increasing value used to index timestamped trace pieces.
  uint64_t packet_idx_ = 0;

  // Used for performance tests. True when setting TRACE_PROCESSOR_SORT_ONLY=1.
  bool bypass_next_stage_for_testing_ = false;

  // max(e.ts for e pushed to next stage).
  int64_t latest_pushed_event_ts_ = std::numeric_limits<int64_t>::min();
};
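
// Usage sketch (illustrative; the surrounding setup, e.g. how |context|,
// |parser| and |sequence_state| are obtained, is assumed here rather than
// defined in this header):
//
//   auto sorter = std::make_unique<TraceSorter>(
//       context, std::move(parser), TraceSorter::SortingMode::kDefault);
//   sorter->PushFtraceEvent(/*cpu=*/0, /*timestamp=*/1000,
//                           std::move(ftrace_blob), sequence_state);
//   sorter->PushTracePacket(/*timestamp=*/1005, sequence_state,
//                           std::move(packet_blob));
//   ...
//   sorter->ExtractEventsForced();  // At EOF: sort and push all remaining
//                                   // events to the parser.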

}  // namespace trace_processor
}  // namespace perfetto

#endif  // SRC_TRACE_PROCESSOR_TRACE_SORTER_H_