/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_
#define SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <optional>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

#include "perfetto/base/logging.h"
#include "perfetto/ext/base/circular_queue.h"
#include "perfetto/public/compiler.h"
#include "perfetto/trace_processor/ref_counted.h"
#include "perfetto/trace_processor/trace_blob_view.h"
#include "src/trace_processor/importers/common/parser_types.h"
#include "src/trace_processor/importers/common/trace_parser.h"
#include "src/trace_processor/importers/fuchsia/fuchsia_record.h"
#include "src/trace_processor/importers/perf/record.h"
#include "src/trace_processor/importers/systrace/systrace_line.h"
#include "src/trace_processor/sorter/trace_token_buffer.h"
#include "src/trace_processor/storage/trace_storage.h"
#include "src/trace_processor/types/trace_processor_context.h"
#include "src/trace_processor/util/bump_allocator.h"

namespace perfetto::trace_processor {

// This class takes care of sorting events parsed from the trace stream in
// arbitrary order and pushing them to the next pipeline stage (parsing) in
// order. To support streaming use-cases, sorting happens within a window.
//
// Events are held in the TraceSorter staging area (events_) until either:
// 1. We can determine that it's safe to extract events, by observing
//    TracingServiceEvent Flush and ReadBuffer events.
// 2. The trace EOF is reached.
//
// Incremental extraction
//
// Incremental extraction happens by using a combination of flush and read
// buffer events from the tracing service. Note that incremental extraction
// is only applicable for write_into_file traces; ring-buffer traces are
// implicitly sorted fully in-memory because there is only a single read
// buffer call at the end.
//
// The algorithm for incremental extraction is explained in detail at
// go/trace-sorting-is-complicated.
//
// Sorting algorithm
//
// The sorting algorithm is designed around the assumption that:
// - Most events come from ftrace.
// - Ftrace events are sorted within each cpu most of the time.
//
// Because of this, this class operates as a streaming merge-sort of N+1
// queues (N = number of CPUs, plus one queue for non-ftrace events). Each
// queue in turn gets sorted (if necessary) before proceeding with the global
// merge-sort-extract.
//
// When an event is pushed through, it is just appended to the end of one of
// the N+1 queues. While appending, we keep track of whether the queue is
// still ordered or has just lost ordering. When an out-of-order event is
// detected on a queue we keep track of: (1) the offset within the queue where
// the chaos began, (2) the timestamp that broke the ordering.
//
// When we decide to extract events from the queues into the next stages of
// the trace processor, we re-sort the events in the queue. Rather than
// re-sorting everything every time, we use the above knowledge to restrict
// sorting to the (hopefully smaller) tail of the |events_| staging area.
// At any time, the first partition of |events_| [0 .. sort_start_idx_) is
// ordered, and the second partition [sort_start_idx_ .. end] is not.
// We use a binary search (std::lower_bound) to figure out the index within
// the first partition where sorting should start, and sort all events from
// there to the end.
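//
// For example (purely illustrative): if a queue receives events with
// timestamps [10, 20, 30, 25, 27], ordering breaks when 25 arrives; at that
// point sort_start_idx_ points at the entry holding 25 and sort_min_ts_ is
// 25. On extraction, the binary search for 25 in the ordered prefix
// [10, 20, 30] lands on 30, so only the tail [30, 25, 27] needs re-sorting,
// yielding [10, 20, 25, 27, 30].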
class TraceSorter {
 public:
  enum class SortingMode {
    kDefault,
    kFullSort,
  };

  TraceSorter(TraceProcessorContext* context, SortingMode sorting_mode);

  ~TraceSorter();

  SortingMode sorting_mode() const { return sorting_mode_; }

  inline void AddMachineContext(TraceProcessorContext* context) {
    sorter_data_by_machine_.emplace_back(context);
  }

  inline void PushPerfRecord(
      int64_t timestamp,
      perf_importer::Record record,
      std::optional<MachineId> machine_id = std::nullopt) {
    TraceTokenBuffer::Id id = token_buffer_.Append(std::move(record));
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kPerfRecord, id,
                         machine_id);
  }

  inline void PushTracePacket(
      int64_t timestamp,
      TracePacketData data,
      std::optional<MachineId> machine_id = std::nullopt) {
    TraceTokenBuffer::Id id = token_buffer_.Append(std::move(data));
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kTracePacket, id,
                         machine_id);
  }

  inline void PushTracePacket(
      int64_t timestamp,
      RefPtr<PacketSequenceStateGeneration> state,
      TraceBlobView tbv,
      std::optional<MachineId> machine_id = std::nullopt) {
    PushTracePacket(timestamp,
                    TracePacketData{std::move(tbv), std::move(state)},
                    machine_id);
  }

  inline void PushJsonValue(int64_t timestamp, std::string json_value) {
    TraceTokenBuffer::Id id =
        token_buffer_.Append(JsonEvent{std::move(json_value)});
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kJsonValue, id);
  }

  inline void PushFuchsiaRecord(int64_t timestamp,
                                FuchsiaRecord fuchsia_record) {
    TraceTokenBuffer::Id id = token_buffer_.Append(std::move(fuchsia_record));
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kFuchsiaRecord, id);
  }

  inline void PushSystraceLine(SystraceLine systrace_line) {
    // Grab the timestamp before the line is moved into the token buffer.
    int64_t timestamp = systrace_line.ts;
    TraceTokenBuffer::Id id = token_buffer_.Append(std::move(systrace_line));
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kSystraceLine, id);
  }

  inline void PushTrackEventPacket(
      int64_t timestamp,
      TrackEventData track_event,
      std::optional<MachineId> machine_id = std::nullopt) {
    TraceTokenBuffer::Id id = token_buffer_.Append(std::move(track_event));
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kTrackEvent, id,
                         machine_id);
  }

  inline void PushEtwEvent(uint32_t cpu,
                           int64_t timestamp,
                           TraceBlobView tbv,
                           RefPtr<PacketSequenceStateGeneration> state,
                           std::optional<MachineId> machine_id = std::nullopt) {
    TraceTokenBuffer::Id id =
        token_buffer_.Append(TracePacketData{std::move(tbv), std::move(state)});
    auto* queue = GetQueue(cpu + 1, machine_id);
    queue->Append(timestamp, TimestampedEvent::Type::kEtwEvent, id);
    UpdateAppendMaxTs(queue);
  }

  inline void PushFtraceEvent(
      uint32_t cpu,
      int64_t timestamp,
      TraceBlobView tbv,
      RefPtr<PacketSequenceStateGeneration> state,
      std::optional<MachineId> machine_id = std::nullopt) {
    TraceTokenBuffer::Id id =
        token_buffer_.Append(TracePacketData{std::move(tbv), std::move(state)});
    auto* queue = GetQueue(cpu + 1, machine_id);
    queue->Append(timestamp, TimestampedEvent::Type::kFtraceEvent, id);
    UpdateAppendMaxTs(queue);
  }

  inline void PushInlineFtraceEvent(
      uint32_t cpu,
      int64_t timestamp,
      InlineSchedSwitch inline_sched_switch,
      std::optional<MachineId> machine_id = std::nullopt) {
    // TODO(rsavitski): if a trace has a mix of normal & "compact" events
    // (being pushed through this function), the ftrace batches will no longer
    // be fully sorted by timestamp. In such situations, we will have to sort
    // at the end of the batch. We could do better, however, since both
    // sub-sequences are already sorted. Consider adding extra queues, or
    // pushing them in a merge-sort fashion instead.
    TraceTokenBuffer::Id id =
        token_buffer_.Append(std::move(inline_sched_switch));
    auto* queue = GetQueue(cpu + 1, machine_id);
    queue->Append(timestamp, TimestampedEvent::Type::kInlineSchedSwitch, id);
    UpdateAppendMaxTs(queue);
  }

  inline void PushInlineFtraceEvent(
      uint32_t cpu,
      int64_t timestamp,
      InlineSchedWaking inline_sched_waking,
      std::optional<MachineId> machine_id = std::nullopt) {
    TraceTokenBuffer::Id id =
        token_buffer_.Append(std::move(inline_sched_waking));
    auto* queue = GetQueue(cpu + 1, machine_id);
    queue->Append(timestamp, TimestampedEvent::Type::kInlineSchedWaking, id);
    UpdateAppendMaxTs(queue);
  }

  void ExtractEventsForced() {
    BumpAllocator::AllocId end_id = token_buffer_.PastTheEndAllocId();
    SortAndExtractEventsUntilAllocId(end_id);
    for (auto& sorter_data : sorter_data_by_machine_) {
      for (const auto& queue : sorter_data.queues) {
        PERFETTO_CHECK(queue.events_.empty());
      }
      sorter_data.queues.clear();
    }

    alloc_id_for_extraction_ = end_id;
    flushes_since_extraction_ = 0;
  }

  void NotifyFlushEvent() { flushes_since_extraction_++; }

  void NotifyReadBufferEvent() {
    if (sorting_mode_ == SortingMode::kFullSort ||
        flushes_since_extraction_ < 2) {
      return;
    }

    SortAndExtractEventsUntilAllocId(alloc_id_for_extraction_);
    alloc_id_for_extraction_ = token_buffer_.PastTheEndAllocId();
    flushes_since_extraction_ = 0;
  }

  int64_t max_timestamp() const { return append_max_ts_; }

 private:
  struct TimestampedEvent {
    enum class Type : uint8_t {
      kFtraceEvent,
      kPerfRecord,
      kTracePacket,
      kInlineSchedSwitch,
      kInlineSchedWaking,
      kJsonValue,
      kFuchsiaRecord,
      kTrackEvent,
      kSystraceLine,
      kEtwEvent,
      kMax = kEtwEvent,
    };

    // Number of bits required to store the max element in |Type|.
    static constexpr uint32_t kMaxTypeBits = 4;
    static_assert(static_cast<uint8_t>(Type::kMax) <= (1 << kMaxTypeBits),
                  "Max type does not fit inside storage");

    // The timestamp of this event.
    int64_t ts;

    // The fields of the BumpAllocator::AllocId for the tokenized object
    // corresponding to this event.
    uint64_t chunk_index : BumpAllocator::kChunkIndexAllocIdBits;
    uint64_t chunk_offset : BumpAllocator::kChunkOffsetAllocIdBits;

    // The type of this event. GCC7 does not like bit-field enums (see
    // https://stackoverflow.com/questions/36005063/gcc-suppress-warning-too-small-to-hold-all-values-of)
    // so use a uint64_t instead and cast to the enum type.
    uint64_t event_type : kMaxTypeBits;
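
    // Declaring all three bit-fields with the same uint64_t underlying type
    // lets them share a single 64-bit word alongside the 8-byte |ts|, which
    // is what makes the sizeof(TimestampedEvent) == 16 assertion below hold.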

    BumpAllocator::AllocId alloc_id() const {
      return BumpAllocator::AllocId{chunk_index, chunk_offset};
    }

    // For std::lower_bound().
    static inline bool Compare(const TimestampedEvent& x, int64_t ts) {
      return x.ts < ts;
    }

    // For std::sort().
    inline bool operator<(const TimestampedEvent& evt) const {
      return std::tie(ts, chunk_index, chunk_offset) <
             std::tie(evt.ts, evt.chunk_index, evt.chunk_offset);
    }
  };

  static_assert(sizeof(TimestampedEvent) == 16,
                "TimestampedEvent must be equal to 16 bytes");
  static_assert(std::is_trivially_copyable_v<TimestampedEvent>,
                "TimestampedEvent must be trivially copyable");
  static_assert(std::is_trivially_move_assignable_v<TimestampedEvent>,
                "TimestampedEvent must be trivially move assignable");
  static_assert(std::is_trivially_move_constructible_v<TimestampedEvent>,
                "TimestampedEvent must be trivially move constructible");
  static_assert(std::is_nothrow_swappable_v<TimestampedEvent>,
                "TimestampedEvent must be nothrow swappable");

  struct Queue {
    void Append(int64_t ts,
                TimestampedEvent::Type type,
                TraceTokenBuffer::Id id) {
      {
        TimestampedEvent event;
        event.ts = ts;
        event.chunk_index = id.alloc_id.chunk_index;
        event.chunk_offset = id.alloc_id.chunk_offset;
        event.event_type = static_cast<uint8_t>(type);
        events_.emplace_back(std::move(event));
      }

      // Events are often seen in order.
      if (PERFETTO_LIKELY(ts >= max_ts_)) {
        max_ts_ = ts;
      } else {
        // The event breaks the ordering. The first time this happens, keep
        // track of which index we are at: we know that everything before it
        // is sorted (because events were pushed monotonically). Everything
        // after that index, instead, will need a sorting pass before moving
        // events to the next pipeline stage.
        if (sort_start_idx_ == 0) {
          PERFETTO_DCHECK(events_.size() >= 2);
          sort_start_idx_ = events_.size() - 1;
          sort_min_ts_ = ts;
        } else {
          sort_min_ts_ = std::min(sort_min_ts_, ts);
        }
      }

      min_ts_ = std::min(min_ts_, ts);
      PERFETTO_DCHECK(min_ts_ <= max_ts_);
    }

    bool needs_sorting() const { return sort_start_idx_ != 0; }
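
    // Sorts the out-of-order tail of |events_| (defined in the .cc file).
    // Conceptually, following the class comment above, it boils down to a
    // sketch along these lines (illustrative only, not the exact
    // implementation):
    //
    //   auto sorted_end = events_.begin() + sort_start_idx_;
    //   auto it = std::lower_bound(events_.begin(), sorted_end, sort_min_ts_,
    //                              &TimestampedEvent::Compare);
    //   std::sort(it, events_.end());
    //   sort_start_idx_ = 0;
    //   sort_min_ts_ = std::numeric_limits<int64_t>::max();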
    void Sort();

    base::CircularQueue<TimestampedEvent> events_;
    int64_t min_ts_ = std::numeric_limits<int64_t>::max();
    int64_t max_ts_ = 0;
    size_t sort_start_idx_ = 0;
    int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
  };

  void SortAndExtractEventsUntilAllocId(BumpAllocator::AllocId alloc_id);

  inline Queue* GetQueue(size_t index,
                         std::optional<MachineId> machine_id = std::nullopt) {
    // sorter_data_by_machine_[0] corresponds to the default machine.
    PERFETTO_DCHECK(sorter_data_by_machine_[0].machine_id == std::nullopt);
    auto* queues = &sorter_data_by_machine_[0].queues;

    // Find the TraceSorterData instance when |machine_id| is not nullopt.
    if (PERFETTO_UNLIKELY(machine_id.has_value())) {
      auto it = std::find_if(sorter_data_by_machine_.begin() + 1,
                             sorter_data_by_machine_.end(),
                             [machine_id](const TraceSorterData& item) {
                               return item.machine_id == machine_id;
                             });
      PERFETTO_DCHECK(it != sorter_data_by_machine_.end());
      queues = &it->queues;
    }

    if (PERFETTO_UNLIKELY(index >= queues->size()))
      queues->resize(index + 1);
    return &queues->at(index);
  }

  inline void AppendNonFtraceEvent(
      int64_t ts,
      TimestampedEvent::Type event_type,
      TraceTokenBuffer::Id id,
      std::optional<MachineId> machine_id = std::nullopt) {
    Queue* queue = GetQueue(0, machine_id);
    queue->Append(ts, event_type, id);
    UpdateAppendMaxTs(queue);
  }

  inline void UpdateAppendMaxTs(Queue* queue) {
    append_max_ts_ = std::max(append_max_ts_, queue->max_ts_);
  }

  void ParseTracePacket(TraceProcessorContext& context,
                        const TimestampedEvent&);
  void ParseFtracePacket(TraceProcessorContext& context,
                         uint32_t cpu,
                         const TimestampedEvent&);
  void ParseEtwPacket(TraceProcessorContext& context,
                      uint32_t cpu,
                      const TimestampedEvent&);

  void MaybeExtractEvent(size_t machine_idx,
                         size_t queue_idx,
                         const TimestampedEvent&);
  void ExtractAndDiscardTokenizedObject(const TimestampedEvent& event);

  static TraceTokenBuffer::Id GetTokenBufferId(const TimestampedEvent& event) {
    return TraceTokenBuffer::Id{event.alloc_id()};
  }

  struct TraceSorterData {
    explicit TraceSorterData(TraceProcessorContext* _machine_context)
        : machine_id(_machine_context->machine_id()),
          machine_context(_machine_context) {}
    std::optional<MachineId> machine_id;
    // queues_[0] is the general (non-ftrace) queue.
    // queues_[1] is the ftrace queue for CPU(0).
    // queues_[x] is the ftrace queue for CPU(x - 1).
    TraceProcessorContext* machine_context;
    std::vector<Queue> queues;
  };
  std::vector<TraceSorterData> sorter_data_by_machine_;

  // Whether we should ignore incremental extraction and just wait for
  // forced extraction at the end of the trace.
  SortingMode sorting_mode_ = SortingMode::kDefault;

  std::shared_ptr<TraceStorage> storage_;

  // Buffer for storing tokenized objects while the corresponding events are
  // being sorted.
  TraceTokenBuffer token_buffer_;

  // The AllocId until which events should be extracted. Set based on the
  // AllocId in |NotifyReadBufferEvent|.
  BumpAllocator::AllocId alloc_id_for_extraction_ =
      token_buffer_.PastTheEndAllocId();

  // The number of flushes which have happened since the last incremental
  // extraction.
  uint32_t flushes_since_extraction_ = 0;

  // max(e.ts for e appended to the sorter)
  int64_t append_max_ts_ = 0;

  // Used for performance tests. True when setting
  // TRACE_PROCESSOR_SORT_ONLY=1.
  bool bypass_next_stage_for_testing_ = false;

  // max(e.ts for e pushed to next stage)
  int64_t latest_pushed_event_ts_ = std::numeric_limits<int64_t>::min();
};

}  // namespace perfetto::trace_processor

#endif  // SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_