/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_
#define SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_

#include <algorithm>
#include <cstdint>
#include <limits>
#include <memory>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

#include "perfetto/ext/base/circular_queue.h"
#include "perfetto/ext/base/utils.h"
#include "perfetto/trace_processor/basic_types.h"
#include "perfetto/trace_processor/trace_blob_view.h"
#include "src/trace_processor/importers/common/parser_types.h"
#include "src/trace_processor/importers/common/trace_parser.h"
#include "src/trace_processor/importers/fuchsia/fuchsia_record.h"
#include "src/trace_processor/importers/systrace/systrace_line.h"
#include "src/trace_processor/sorter/trace_token_buffer.h"
#include "src/trace_processor/types/trace_processor_context.h"
#include "src/trace_processor/util/bump_allocator.h"

namespace perfetto {
namespace trace_processor {
// This class takes care of sorting events parsed from the trace stream in
// arbitrary order and pushing them to the next pipeline stage (parsing) in
// order. To support streaming use-cases, sorting happens within a window.
//
// Events are held in the TraceSorter staging area (events_) until either:
// 1. We can determine that it's safe to extract events, by observing
//    TracingServiceEvent Flush and ReadBuffer events.
// 2. The trace EOF is reached.
//
// Incremental extraction
//
// Incremental extraction happens by using a combination of flush and read
// buffer events from the tracing service. Note that incremental extraction
// is only applicable for write_into_file traces; ring-buffer traces are
// implicitly sorted fully in-memory because there is only a single read
// buffer call at the end.
//
// The algorithm for incremental extraction is explained in detail at
// go/trace-sorting-is-complicated.
//
// Sorting algorithm
//
// The sorting algorithm is designed around the assumption that:
// - Most events come from ftrace.
// - Ftrace events are sorted within each cpu most of the time.
//
// Due to this, this class operates as a streaming merge-sort of N+1 queues
// (N = num cpus; the extra queue is for non-ftrace events). Each queue in
// turn gets sorted (if necessary) before proceeding with the global
// merge-sort-extract.
//
// When an event is pushed through, it is just appended to the end of one of
// the N queues. While appending, we keep track of whether the queue is
// still ordered or has just lost ordering. When an out-of-order event is
// detected on a queue, we keep track of: (1) the offset within the queue
// where the ordering broke, (2) the timestamp that broke the ordering.
//
// When we decide to extract events from the queues into the next stages of
// the trace processor, we re-sort the events in the queue. Rather than
// re-sorting everything every time, we use the above knowledge to restrict
// sorting to the (hopefully smaller) tail of the |events_| staging area.
// At any time, the first partition of |events_| [0 .. sort_start_idx_) is
// ordered, and the second partition [sort_start_idx_ .. end] is not.
// We use a binary search (std::lower_bound) to figure out the index within
// the first partition where sorting should start, and sort all events
// from there to the end.
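//
// Typical usage by a tokenizer (a sketch, not prescriptive):
//
//   TraceSorter sorter(context, std::move(parser),
//                      TraceSorter::SortingMode::kDefault);
//   sorter.PushFtraceEvent(cpu, ts, std::move(tbv), state);
//   ...
//   sorter.NotifyFlushEvent();       // On a tracing service Flush event.
//   sorter.NotifyReadBufferEvent();  // May incrementally sort + extract.
//   ...
//   sorter.ExtractEventsForced();    // At EOF: drain everything left.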
class TraceSorter {
 public:
  enum class SortingMode {
    kDefault,
    kFullSort,
  };

  TraceSorter(TraceProcessorContext* context,
              std::unique_ptr<TraceParser> parser,
              SortingMode);
  ~TraceSorter();

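  // Each Push* method below tokenizes the event payload into |token_buffer_|
  // and appends a small (16 byte) TimestampedEvent referencing it to the
  // appropriate queue. Non-ftrace events all share queue 0; ftrace events
  // get a per-cpu queue.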
  inline void PushTracePacket(int64_t timestamp,
                              RefPtr<PacketSequenceStateGeneration> state,
                              TraceBlobView tbv) {
    TraceTokenBuffer::Id id =
        token_buffer_.Append(TracePacketData{std::move(tbv), std::move(state)});
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kTracePacket, id);
  }

  inline void PushJsonValue(int64_t timestamp, std::string json_value) {
    TraceTokenBuffer::Id id =
        token_buffer_.Append(JsonEvent{std::move(json_value)});
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kJsonValue, id);
  }

  inline void PushFuchsiaRecord(int64_t timestamp,
                                FuchsiaRecord fuchsia_record) {
    TraceTokenBuffer::Id id = token_buffer_.Append(std::move(fuchsia_record));
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kFuchsiaRecord, id);
  }

  inline void PushSystraceLine(SystraceLine systrace_line) {
    // Read the timestamp before the line is moved into the token buffer.
    int64_t ts = systrace_line.ts;
    TraceTokenBuffer::Id id = token_buffer_.Append(std::move(systrace_line));
    AppendNonFtraceEvent(ts, TimestampedEvent::Type::kSystraceLine, id);
  }

  inline void PushTrackEventPacket(int64_t timestamp,
                                   TrackEventData track_event) {
    TraceTokenBuffer::Id id = token_buffer_.Append(std::move(track_event));
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kTrackEvent, id);
  }

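  // Ftrace events go to a per-cpu queue: queue 0 is reserved for non-ftrace
  // events, so CPU N maps to queue N + 1.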
  inline void PushFtraceEvent(uint32_t cpu,
                              int64_t timestamp,
                              TraceBlobView tbv,
                              RefPtr<PacketSequenceStateGeneration> state) {
    TraceTokenBuffer::Id id =
        token_buffer_.Append(TracePacketData{std::move(tbv), std::move(state)});
    auto* queue = GetQueue(cpu + 1);
    queue->Append(timestamp, TimestampedEvent::Type::kFtraceEvent, id);
    UpdateAppendMaxTs(queue);
  }

  inline void PushInlineFtraceEvent(uint32_t cpu,
                                    int64_t timestamp,
                                    InlineSchedSwitch inline_sched_switch) {
    // TODO(rsavitski): if a trace has a mix of normal & "compact" events
    // (being pushed through this function), the ftrace batches will no
    // longer be fully sorted by timestamp. In such situations, we will have
    // to sort at the end of the batch. We could do better, however, since
    // both sub-sequences are already sorted: consider adding extra queues,
    // or pushing events in a merge-sort fashion instead.
    TraceTokenBuffer::Id id =
        token_buffer_.Append(std::move(inline_sched_switch));
    auto* queue = GetQueue(cpu + 1);
    queue->Append(timestamp, TimestampedEvent::Type::kInlineSchedSwitch, id);
    UpdateAppendMaxTs(queue);
  }

  inline void PushInlineFtraceEvent(uint32_t cpu,
                                    int64_t timestamp,
                                    InlineSchedWaking inline_sched_waking) {
    TraceTokenBuffer::Id id =
        token_buffer_.Append(std::move(inline_sched_waking));
    auto* queue = GetQueue(cpu + 1);
    queue->Append(timestamp, TimestampedEvent::Type::kInlineSchedWaking, id);
    UpdateAppendMaxTs(queue);
  }

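  // Sorts and extracts everything that is still buffered, regardless of the
  // incremental-extraction bookkeeping. Used when the trace EOF is reached.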
  void ExtractEventsForced() {
    BumpAllocator::AllocId end_id = token_buffer_.PastTheEndAllocId();
    SortAndExtractEventsUntilAllocId(end_id);
    for (const auto& queue : queues_) {
      PERFETTO_DCHECK(queue.events_.empty());
    }
    queues_.clear();

    alloc_id_for_extraction_ = end_id;
    flushes_since_extraction_ = 0;
  }

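  // Called when a TracingServiceEvent Flush event is observed in the trace.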
  void NotifyFlushEvent() { flushes_since_extraction_++; }

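  // Called when a TracingServiceEvent ReadBuffer event is observed. Unless
  // we are in full-sort mode, or fewer than two flushes have happened since
  // the last extraction, sorts and extracts all events tokenized up to the
  // previous extraction point.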
  void NotifyReadBufferEvent() {
    if (sorting_mode_ == SortingMode::kFullSort ||
        flushes_since_extraction_ < 2) {
      return;
    }

    SortAndExtractEventsUntilAllocId(alloc_id_for_extraction_);
    alloc_id_for_extraction_ = token_buffer_.PastTheEndAllocId();
    flushes_since_extraction_ = 0;
  }

  int64_t max_timestamp() const { return append_max_ts_; }

 private:
  struct TimestampedEvent {
    enum class Type : uint8_t {
      kFtraceEvent,
      kTracePacket,
      kInlineSchedSwitch,
      kInlineSchedWaking,
      kJsonValue,
      kFuchsiaRecord,
      kTrackEvent,
      kSystraceLine,
      kMax = kSystraceLine,
    };

    // Number of bits required to store the max element in |Type|.
    static constexpr uint32_t kMaxTypeBits = 4;
    static_assert(static_cast<uint8_t>(Type::kMax) <= (1 << kMaxTypeBits),
                  "Max type does not fit inside storage");

    // The timestamp of this event.
    int64_t ts;

    // The fields of the BumpAllocator::AllocId for the tokenized object
    // corresponding to this event.
    uint64_t chunk_index : BumpAllocator::kChunkIndexAllocIdBits;
    uint64_t chunk_offset : BumpAllocator::kChunkOffsetAllocIdBits;

    // The type of this event. GCC7 does not like bit-field enums (see
    // https://stackoverflow.com/questions/36005063/gcc-suppress-warning-too-small-to-hold-all-values-of)
    // so use a uint64_t instead and cast to the enum type.
    uint64_t event_type : kMaxTypeBits;

    BumpAllocator::AllocId alloc_id() const {
      return BumpAllocator::AllocId{chunk_index, chunk_offset};
    }

    // For std::lower_bound().
    static inline bool Compare(const TimestampedEvent& x, int64_t ts) {
      return x.ts < ts;
    }

    // For std::sort().
    inline bool operator<(const TimestampedEvent& evt) const {
      return std::tie(ts, chunk_index, chunk_offset) <
             std::tie(evt.ts, evt.chunk_index, evt.chunk_offset);
    }
  };
  static_assert(sizeof(TimestampedEvent) == 16,
                "TimestampedEvent must be equal to 16 bytes");
  static_assert(std::is_trivially_copyable<TimestampedEvent>::value,
                "TimestampedEvent must be trivially copyable");
  static_assert(std::is_trivially_move_assignable<TimestampedEvent>::value,
                "TimestampedEvent must be trivially move assignable");
  static_assert(std::is_trivially_move_constructible<TimestampedEvent>::value,
                "TimestampedEvent must be trivially move constructible");
  static_assert(std::is_nothrow_swappable<TimestampedEvent>::value,
                "TimestampedEvent must be nothrow swappable");

  struct Queue {
    void Append(int64_t ts,
                TimestampedEvent::Type type,
                TraceTokenBuffer::Id id) {
      {
        TimestampedEvent event;
        event.ts = ts;
        event.chunk_index = id.alloc_id.chunk_index;
        event.chunk_offset = id.alloc_id.chunk_offset;
        event.event_type = static_cast<uint8_t>(type);
        events_.emplace_back(std::move(event));
      }

      // Events are often seen in order.
      if (PERFETTO_LIKELY(ts >= max_ts_)) {
        max_ts_ = ts;
      } else {
        // The event breaks the ordering. The first time this happens, keep
        // track of the index we are at. We know that everything before that
        // index is sorted (because events were pushed monotonically).
        // Everything after that index, instead, will need a sorting pass
        // before moving events to the next pipeline stage.
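        // E.g. after appending events with ts = 10, 20, 30, 15, 18, we get
        // sort_start_idx_ == 3 (the index of "15") and sort_min_ts_ == 15;
        // only [3 .. end] has to be re-sorted at extraction time.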
        if (sort_start_idx_ == 0) {
          PERFETTO_DCHECK(events_.size() >= 2);
          sort_start_idx_ = events_.size() - 1;
          sort_min_ts_ = ts;
        } else {
          sort_min_ts_ = std::min(sort_min_ts_, ts);
        }
      }

      min_ts_ = std::min(min_ts_, ts);
      PERFETTO_DCHECK(min_ts_ <= max_ts_);
    }

    bool needs_sorting() const { return sort_start_idx_ != 0; }
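    // Re-sorts the unsorted tail of |events_| (see the class comment) and
    // resets |sort_start_idx_| and |sort_min_ts_|.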
    void Sort();

    base::CircularQueue<TimestampedEvent> events_;
    int64_t min_ts_ = std::numeric_limits<int64_t>::max();
    int64_t max_ts_ = 0;
    size_t sort_start_idx_ = 0;
    int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
  };

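  // Sorts the queues (if needed) and merge-extracts events whose tokenized
  // object was allocated before |alloc_id| into the next pipeline stage.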
  void SortAndExtractEventsUntilAllocId(BumpAllocator::AllocId alloc_id);

  inline Queue* GetQueue(size_t index) {
    if (PERFETTO_UNLIKELY(index >= queues_.size()))
      queues_.resize(index + 1);
    return &queues_[index];
  }

  inline void AppendNonFtraceEvent(int64_t ts,
                                   TimestampedEvent::Type event_type,
                                   TraceTokenBuffer::Id id) {
    Queue* queue = GetQueue(0);
    queue->Append(ts, event_type, id);
    UpdateAppendMaxTs(queue);
  }

  inline void UpdateAppendMaxTs(Queue* queue) {
    append_max_ts_ = std::max(append_max_ts_, queue->max_ts_);
  }

  void ParseTracePacket(const TimestampedEvent&);
  void ParseFtracePacket(uint32_t cpu, const TimestampedEvent&);

  void MaybeExtractEvent(size_t queue_idx, const TimestampedEvent&);
  void ExtractAndDiscardTokenizedObject(const TimestampedEvent& event);

  TraceTokenBuffer::Id GetTokenBufferId(const TimestampedEvent& event) {
    return TraceTokenBuffer::Id{event.alloc_id()};
  }

  TraceProcessorContext* context_ = nullptr;
  std::unique_ptr<TraceParser> parser_;

  // Whether we should ignore incremental extraction and just wait for
  // forced extraction at the end of the trace.
  SortingMode sorting_mode_ = SortingMode::kDefault;

  // Buffer for storing tokenized objects while the corresponding events are
  // being sorted.
  TraceTokenBuffer token_buffer_;

  // The AllocId until which events should be extracted. Set based on the
  // AllocId in |NotifyReadBufferEvent|.
  BumpAllocator::AllocId alloc_id_for_extraction_ =
      token_buffer_.PastTheEndAllocId();

  // The number of flushes which have happened since the last incremental
  // extraction.
  uint32_t flushes_since_extraction_ = 0;

  // queues_[0] is the general (non-ftrace) queue.
  // queues_[1] is the ftrace queue for CPU(0).
  // queues_[x] is the ftrace queue for CPU(x - 1).
  std::vector<Queue> queues_;

  // max(e.ts for e appended to the sorter)
  int64_t append_max_ts_ = 0;

  // Used for performance tests. True when setting
  // TRACE_PROCESSOR_SORT_ONLY=1.
  bool bypass_next_stage_for_testing_ = false;

  // max(e.ts for e pushed to next stage)
  int64_t latest_pushed_event_ts_ = std::numeric_limits<int64_t>::min();
};

}  // namespace trace_processor
}  // namespace perfetto

#endif  // SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_