/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_
#define SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <optional>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>
#include <variant>
#include <vector>

#include "perfetto/base/compiler.h"
#include "perfetto/base/logging.h"
#include "perfetto/ext/base/circular_queue.h"
#include "perfetto/public/compiler.h"
#include "perfetto/trace_processor/ref_counted.h"
#include "perfetto/trace_processor/trace_blob_view.h"
#include "src/trace_processor/importers/android_bugreport/android_dumpstate_event.h"
#include "src/trace_processor/importers/android_bugreport/android_log_event.h"
#include "src/trace_processor/importers/art_method/art_method_event.h"
#include "src/trace_processor/importers/common/parser_types.h"
#include "src/trace_processor/importers/common/trace_parser.h"
#include "src/trace_processor/importers/fuchsia/fuchsia_record.h"
#include "src/trace_processor/importers/gecko/gecko_event.h"
#include "src/trace_processor/importers/instruments/row.h"
#include "src/trace_processor/importers/perf/record.h"
#include "src/trace_processor/importers/perf_text/perf_text_event.h"
#include "src/trace_processor/importers/systrace/systrace_line.h"
#include "src/trace_processor/sorter/trace_token_buffer.h"
#include "src/trace_processor/storage/trace_storage.h"
#include "src/trace_processor/types/trace_processor_context.h"
#include "src/trace_processor/util/bump_allocator.h"

namespace perfetto::trace_processor {

// This class takes care of sorting events parsed from the trace stream in
// arbitrary order and pushing them to the next pipeline stage (parsing) in
// order. To support streaming use-cases, sorting happens within a window.
//
// Events are held in the TraceSorter staging area (events_) until either:
// 1. We can determine that it's safe to extract events by observing
//    TracingServiceEvent Flush and ReadBuffer events.
// 2. The trace EOF is reached.
//
// Incremental extraction
//
// Incremental extraction happens by using a combination of flush and read
// buffer events from the tracing service. Note that incremental extraction
// is only applicable for write_into_file traces; ring-buffer traces will
// implicitly be sorted fully in-memory because there is only a single read
// buffer call at the end.
//
// The algorithm for incremental extraction is explained in detail at
// go/trace-sorting-is-complicated.
//
// Sorting algorithm
//
// The sorting algorithm is designed around the assumption that:
// - Most events come from ftrace.
// - Ftrace events are sorted within each cpu most of the time.
//
// Because of this, this class operates as a streaming merge-sort of N+1
// queues (N = number of cpus; the extra queue holds non-ftrace events). Each
// queue in turn gets sorted (if necessary) before proceeding with the global
// merge-sort-extract.
//
// When an event is pushed through, it is just appended to the end of one of
// these queues. While appending, we keep track of whether the queue is still
// ordered or has just lost ordering. When an out-of-order event is detected
// on a queue we keep track of: (1) the offset within the queue where the
// chaos began, (2) the timestamp that broke the ordering.
//
// When we decide to extract events from the queues into the next stages of
// the trace processor, we re-sort the events in the queue. Rather than
// re-sorting everything every time, we use the above knowledge to restrict
// sorting to the (hopefully smaller) tail of the |events_| staging area.
// At any time, the first partition of |events_| [0 .. sort_start_idx_) is
// ordered, and the second partition [sort_start_idx_ .. end] is not.
// We use a binary search (std::lower_bound) to figure out the index within
// the first partition where sorting should start, and sort all events from
// there to the end.
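//
// Illustrative usage sketch (placeholder names; tokenizer/parser wiring and
// the surrounding TraceProcessorContext setup are omitted):
//
//   TraceSorter sorter(&context, TraceSorter::SortingMode::kDefault);
//   // Tokenizers push events as they are decoded (timestamps may arrive
//   // out of order):
//   sorter.PushFtraceEvent(cpu, ts, std::move(ftrace_tbv), state);
//   sorter.PushTracePacket(ts2, state, std::move(packet_tbv));
//   // Tracing service events drive incremental extraction:
//   sorter.NotifyFlushEvent();
//   sorter.NotifyReadBufferEvent();
//   // At the end of the trace, sort and push out everything left:
//   sorter.ExtractEventsForced();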
class TraceSorter {
 public:
  enum class SortingMode {
    kDefault,
    kFullSort,
  };
  enum class EventHandling {
    // Indicates that events should be sorted and pushed to the parsing stage.
    kSortAndPush,

    // Indicates that events should be sorted but then dropped before pushing
    // to the parsing stage.
    // Used for performance analysis of the sorter.
    kSortAndDrop,

    // Indicates that the events should be dropped as soon as they enter the
    // sorter.
    // Used in cases where we only want to perform tokenization: dropping data
    // when it hits the sorter is much cleaner than trying to handle this
    // at every different tokenizer.
    kDrop,
  };

  TraceSorter(TraceProcessorContext*,
              SortingMode,
              EventHandling = EventHandling::kSortAndPush);

  ~TraceSorter();

  SortingMode sorting_mode() const { return sorting_mode_; }
  bool SetSortingMode(SortingMode sorting_mode);

  void AddMachineContext(TraceProcessorContext* context) {
    sorter_data_by_machine_.emplace_back(context);
  }

  void PushAndroidDumpstateEvent(
      int64_t timestamp,
      const AndroidDumpstateEvent& event,
      std::optional<MachineId> machine_id = std::nullopt) {
    AppendNonFtraceEvent(timestamp,
                         TimestampedEvent::Type::kAndroidDumpstateEvent, event,
                         machine_id);
  }

  void PushAndroidLogEvent(int64_t timestamp,
                           const AndroidLogEvent& event,
                           std::optional<MachineId> machine_id = std::nullopt) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kAndroidLogEvent,
                         event, machine_id);
  }

  void PushPerfRecord(int64_t timestamp,
                      perf_importer::Record record,
                      std::optional<MachineId> machine_id = std::nullopt) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kPerfRecord,
                         std::move(record), machine_id);
  }

  void PushSpeRecord(int64_t timestamp,
                     TraceBlobView record,
                     std::optional<MachineId> machine_id = std::nullopt) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kSpeRecord,
                         std::move(record), machine_id);
  }

  void PushInstrumentsRow(int64_t timestamp,
                          const instruments_importer::Row& row,
                          std::optional<MachineId> machine_id = std::nullopt) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kInstrumentsRow,
                         row, machine_id);
  }

  void PushTracePacket(int64_t timestamp,
                       TracePacketData data,
                       std::optional<MachineId> machine_id = std::nullopt) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kTracePacket,
                         std::move(data), machine_id);
  }

  void PushTracePacket(int64_t timestamp,
                       RefPtr<PacketSequenceStateGeneration> state,
                       TraceBlobView tbv,
                       std::optional<MachineId> machine_id = std::nullopt) {
    PushTracePacket(timestamp,
                    TracePacketData{std::move(tbv), std::move(state)},
                    machine_id);
  }

  void PushJsonValue(int64_t timestamp,
                     std::string json_value,
                     const JsonEvent::Type& type) {
    if (const auto* scoped = std::get_if<JsonEvent::Scoped>(&type); scoped) {
      // We need to account for slices with duration by sorting them specially:
      // this requires us to use the slower comparator which takes this into
      // account.
      use_slow_sorting_ = true;
    }
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kJsonValue,
                         JsonEvent{std::move(json_value), type});
  }

  void PushFuchsiaRecord(int64_t timestamp, FuchsiaRecord fuchsia_record) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kFuchsiaRecord,
                         std::move(fuchsia_record));
  }

  void PushSystraceLine(SystraceLine systrace_line) {
    AppendNonFtraceEvent(systrace_line.ts,
                         TimestampedEvent::Type::kSystraceLine,
                         std::move(systrace_line));
  }

  void PushTrackEventPacket(
      int64_t timestamp,
      TrackEventData track_event,
      std::optional<MachineId> machine_id = std::nullopt) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kTrackEvent,
                         std::move(track_event), machine_id);
  }

  void PushLegacyV8CpuProfileEvent(int64_t timestamp,
                                   uint64_t session_id,
                                   uint32_t pid,
                                   uint32_t tid,
                                   uint32_t callsite_id) {
    AppendNonFtraceEvent(
        timestamp, TimestampedEvent::Type::kLegacyV8CpuProfileEvent,
        LegacyV8CpuProfileEvent{session_id, pid, tid, callsite_id});
  }

  void PushGeckoEvent(int64_t timestamp,
                      const gecko_importer::GeckoEvent& event) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kGeckoEvent, event);
  }

  void PushArtMethodEvent(int64_t timestamp,
                          const art_method::ArtMethodEvent& event) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kArtMethodEvent,
                         event);
  }

  void PushPerfTextEvent(int64_t timestamp,
                         const perf_text_importer::PerfTextEvent& event) {
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kPerfTextEvent,
                         event);
  }

  void PushEtwEvent(uint32_t cpu,
                    int64_t timestamp,
                    TraceBlobView tbv,
                    RefPtr<PacketSequenceStateGeneration> state,
                    std::optional<MachineId> machine_id = std::nullopt) {
    if (PERFETTO_UNLIKELY(event_handling_ == EventHandling::kDrop)) {
      return;
    }
    TraceTokenBuffer::Id id =
        token_buffer_.Append(TracePacketData{std::move(tbv), std::move(state)});
    auto* queue = GetQueue(cpu + 1, machine_id);
    queue->Append(timestamp, TimestampedEvent::Type::kEtwEvent, id,
                  use_slow_sorting_);
    UpdateAppendMaxTs(queue);
  }

  void PushFtraceEvent(uint32_t cpu,
                       int64_t timestamp,
                       TraceBlobView tbv,
                       RefPtr<PacketSequenceStateGeneration> state,
                       std::optional<MachineId> machine_id = std::nullopt) {
    if (PERFETTO_UNLIKELY(event_handling_ == EventHandling::kDrop)) {
      return;
    }
    TraceTokenBuffer::Id id =
        token_buffer_.Append(TracePacketData{std::move(tbv), std::move(state)});
    auto* queue = GetQueue(cpu + 1, machine_id);
    queue->Append(timestamp, TimestampedEvent::Type::kFtraceEvent, id,
                  use_slow_sorting_);
    UpdateAppendMaxTs(queue);
  }

  void PushInlineFtraceEvent(
      uint32_t cpu,
      int64_t timestamp,
      const InlineSchedSwitch& inline_sched_switch,
      std::optional<MachineId> machine_id = std::nullopt) {
    if (PERFETTO_UNLIKELY(event_handling_ == EventHandling::kDrop)) {
      return;
    }
    // TODO(rsavitski): if a trace has a mix of normal & "compact" events
    // (being pushed through this function), the ftrace batches will no longer
    // be fully sorted by timestamp. In such situations, we will have to sort
    // at the end of the batch. We could do better, however, as both
    // sub-sequences are sorted. Consider adding extra queues, or pushing them
    // in a merge-sort fashion instead.
    TraceTokenBuffer::Id id = token_buffer_.Append(inline_sched_switch);
    auto* queue = GetQueue(cpu + 1, machine_id);
    queue->Append(timestamp, TimestampedEvent::Type::kInlineSchedSwitch, id,
                  use_slow_sorting_);
    UpdateAppendMaxTs(queue);
  }

  void PushInlineFtraceEvent(
      uint32_t cpu,
      int64_t timestamp,
      const InlineSchedWaking& inline_sched_waking,
      std::optional<MachineId> machine_id = std::nullopt) {
    if (PERFETTO_UNLIKELY(event_handling_ == EventHandling::kDrop)) {
      return;
    }
    TraceTokenBuffer::Id id = token_buffer_.Append(inline_sched_waking);
    auto* queue = GetQueue(cpu + 1, machine_id);
    queue->Append(timestamp, TimestampedEvent::Type::kInlineSchedWaking, id,
                  use_slow_sorting_);
    UpdateAppendMaxTs(queue);
  }

  void ExtractEventsForced() {
    BumpAllocator::AllocId end_id = token_buffer_.PastTheEndAllocId();
    SortAndExtractEventsUntilAllocId(end_id);
    for (auto& sorter_data : sorter_data_by_machine_) {
      for (const auto& queue : sorter_data.queues) {
        PERFETTO_CHECK(queue.events_.empty());
      }
      sorter_data.queues.clear();
    }

    alloc_id_for_extraction_ = end_id;
    flushes_since_extraction_ = 0;
  }

  void NotifyFlushEvent() { flushes_since_extraction_++; }

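  // Called when the tokenizer sees a ReadBuffer service event. Outside of
  // kFullSort mode, this triggers incremental extraction once at least two
  // flushes have been observed since the last extraction.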
  void NotifyReadBufferEvent() {
    if (sorting_mode_ == SortingMode::kFullSort ||
        flushes_since_extraction_ < 2) {
      return;
    }

    SortAndExtractEventsUntilAllocId(alloc_id_for_extraction_);
    alloc_id_for_extraction_ = token_buffer_.PastTheEndAllocId();
    flushes_since_extraction_ = 0;
  }

  int64_t max_timestamp() const { return append_max_ts_; }

 private:
  struct TimestampedEvent {
    enum class Type : uint8_t {
      kAndroidDumpstateEvent,
      kAndroidLogEvent,
      kEtwEvent,
      kFtraceEvent,
      kFuchsiaRecord,
      kInlineSchedSwitch,
      kInlineSchedWaking,
      kInstrumentsRow,
      kJsonValue,
      kLegacyV8CpuProfileEvent,
      kPerfRecord,
      kSpeRecord,
      kSystraceLine,
      kTracePacket,
      kTrackEvent,
      kGeckoEvent,
      kArtMethodEvent,
      kPerfTextEvent,
      kMax = kPerfTextEvent,
    };

    // Number of bits required to store the max element in |Type|.
    static constexpr uint32_t kMaxTypeBits = 6;
    static_assert(static_cast<uint8_t>(Type::kMax) <= (1 << kMaxTypeBits),
                  "Max type does not fit inside storage");

    // The timestamp of this event.
    int64_t ts;

    // The fields of the BumpAllocator::AllocId of the tokenized object
    // corresponding to this event.
    uint64_t chunk_index : BumpAllocator::kChunkIndexAllocIdBits;
    uint64_t chunk_offset : BumpAllocator::kChunkOffsetAllocIdBits;

    // The type of this event. GCC7 does not like bit-field enums (see
    // https://stackoverflow.com/questions/36005063/gcc-suppress-warning-too-small-to-hold-all-values-of)
    // so use a uint64_t instead and cast to the enum type.
    uint64_t event_type : kMaxTypeBits;

    BumpAllocator::AllocId alloc_id() const {
      return BumpAllocator::AllocId{chunk_index, chunk_offset};
    }

    Type type() const { return static_cast<Type>(event_type); }

    // For std::lower_bound().
    static bool Compare(const TimestampedEvent& x, int64_t ts) {
      return x.ts < ts;
    }

    // For std::sort().
    bool operator<(const TimestampedEvent& evt) const {
      return std::tie(ts, chunk_index, chunk_offset) <
             std::tie(evt.ts, evt.chunk_index, evt.chunk_offset);
    }

    struct SlowOperatorLess {
      // For std::sort() in slow mode.
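      // For events with the same ts, KeyForType() below makes End events sort
      // first, then Scoped events in order of decreasing duration, and all
      // other JSON event types last. Illustrative example, at ts == 100:
      //   End < Scoped{dur=50} < Scoped{dur=10} < any other type.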
      bool operator()(const TimestampedEvent& a,
                      const TimestampedEvent& b) const {
        int64_t a_key =
            KeyForType(buffer.Get<JsonEvent>(GetTokenBufferId(a))->type);
        int64_t b_key =
            KeyForType(buffer.Get<JsonEvent>(GetTokenBufferId(b))->type);
        return std::tie(a.ts, a_key, a.chunk_index, a.chunk_offset) <
               std::tie(b.ts, b_key, b.chunk_index, b.chunk_offset);
      }

      static int64_t KeyForType(const JsonEvent::Type& type) {
        switch (type.index()) {
          case base::variant_index<JsonEvent::Type, JsonEvent::End>():
            return std::numeric_limits<int64_t>::min();
          case base::variant_index<JsonEvent::Type, JsonEvent::Scoped>():
            return std::numeric_limits<int64_t>::max() -
                   std::get<JsonEvent::Scoped>(type).dur;
          default:
            return std::numeric_limits<int64_t>::max();
        }
        PERFETTO_FATAL("For GCC");
      }

      TraceTokenBuffer& buffer;
    };
  };

  static_assert(sizeof(TimestampedEvent) == 16,
                "TimestampedEvent must be equal to 16 bytes");
  static_assert(std::is_trivially_copyable_v<TimestampedEvent>,
                "TimestampedEvent must be trivially copyable");
  static_assert(std::is_trivially_move_assignable_v<TimestampedEvent>,
                "TimestampedEvent must be trivially move assignable");
  static_assert(std::is_trivially_move_constructible_v<TimestampedEvent>,
                "TimestampedEvent must be trivially move constructible");
  static_assert(std::is_nothrow_swappable_v<TimestampedEvent>,
                "TimestampedEvent must be nothrow swappable");

  struct Queue {
    void Append(int64_t ts,
                TimestampedEvent::Type type,
                TraceTokenBuffer::Id id,
                bool use_slow_sorting) {
      {
        TimestampedEvent event;
        event.ts = ts;
        event.chunk_index = id.alloc_id.chunk_index;
        event.chunk_offset = id.alloc_id.chunk_offset;
        event.event_type = static_cast<uint8_t>(type);
        events_.emplace_back(std::move(event));
      }

      // Events are often seen in order.
      if (PERFETTO_LIKELY(ts > max_ts_ ||
                          (!use_slow_sorting && ts == max_ts_))) {
        max_ts_ = ts;
      } else {
        // The event is breaking ordering. The first time it happens, keep
        // track of which index we are at. We know that everything before that
        // is sorted (because events were pushed monotonically). Everything
        // after that index, instead, will need a sorting pass before moving
        // events to the next pipeline stage.
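        // Illustrative example: appending timestamps 10, 20, 30, 15, 25
        // leaves max_ts_ == 30, sort_start_idx_ == 3 (the index of the first
        // out-of-order event) and sort_min_ts_ == 15, so the next sorting
        // pass only needs to re-sort the tail starting from the first event
        // with ts >= 15.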
        if (sort_start_idx_ == 0) {
          sort_start_idx_ = events_.size() - 1;
          sort_min_ts_ = ts;
        } else {
          sort_min_ts_ = std::min(sort_min_ts_, ts);
        }
      }

      min_ts_ = std::min(min_ts_, ts);
      PERFETTO_DCHECK(min_ts_ <= max_ts_);
    }

    bool needs_sorting() const { return sort_start_idx_ != 0; }
    void Sort(TraceTokenBuffer&, bool use_slow_sorting);

    base::CircularQueue<TimestampedEvent> events_;
    int64_t min_ts_ = std::numeric_limits<int64_t>::max();
    int64_t max_ts_ = 0;
    size_t sort_start_idx_ = 0;
    int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
  };

  void SortAndExtractEventsUntilAllocId(BumpAllocator::AllocId alloc_id);

  Queue* GetQueue(size_t index,
                  std::optional<MachineId> machine_id = std::nullopt) {
    // sorter_data_by_machine_[0] corresponds to the default machine.
    PERFETTO_DCHECK(sorter_data_by_machine_[0].machine_id == std::nullopt);
    auto* queues = &sorter_data_by_machine_[0].queues;

    // Find the TraceSorterData instance when |machine_id| is not nullopt.
    if (PERFETTO_UNLIKELY(machine_id.has_value())) {
      auto it = std::find_if(sorter_data_by_machine_.begin() + 1,
                             sorter_data_by_machine_.end(),
                             [machine_id](const TraceSorterData& item) {
                               return item.machine_id == machine_id;
                             });
      PERFETTO_DCHECK(it != sorter_data_by_machine_.end());
      queues = &it->queues;
    }

    if (PERFETTO_UNLIKELY(index >= queues->size()))
      queues->resize(index + 1);
    return &queues->at(index);
  }

  template <typename E>
  void AppendNonFtraceEvent(
      int64_t ts,
      TimestampedEvent::Type event_type,
      E&& evt,
      std::optional<MachineId> machine_id = std::nullopt) {
    if (PERFETTO_UNLIKELY(event_handling_ == EventHandling::kDrop)) {
      return;
    }
    TraceTokenBuffer::Id id = token_buffer_.Append(std::forward<E>(evt));
    AppendNonFtraceEventWithId(ts, event_type, id, machine_id);
  }

  void AppendNonFtraceEventWithId(int64_t ts,
                                  TimestampedEvent::Type event_type,
                                  TraceTokenBuffer::Id id,
                                  std::optional<MachineId> machine_id) {
    Queue* queue = GetQueue(0, machine_id);
    queue->Append(ts, event_type, id, use_slow_sorting_);
    UpdateAppendMaxTs(queue);
  }

  void UpdateAppendMaxTs(Queue* queue) {
    append_max_ts_ = std::max(append_max_ts_, queue->max_ts_);
  }

  void ParseTracePacket(TraceProcessorContext& context,
                        const TimestampedEvent&);
  void ParseFtracePacket(TraceProcessorContext& context,
                         uint32_t cpu,
                         const TimestampedEvent&);
  void ParseEtwPacket(TraceProcessorContext& context,
                      uint32_t cpu,
                      const TimestampedEvent&);

  void MaybeExtractEvent(size_t machine_idx,
                         size_t queue_idx,
                         const TimestampedEvent&);
  void ExtractAndDiscardTokenizedObject(const TimestampedEvent& event);

  static TraceTokenBuffer::Id GetTokenBufferId(const TimestampedEvent& event) {
    return TraceTokenBuffer::Id{event.alloc_id()};
  }

  struct TraceSorterData {
    explicit TraceSorterData(TraceProcessorContext* _machine_context)
        : machine_id(_machine_context->machine_id()),
          machine_context(_machine_context) {}
    std::optional<MachineId> machine_id;
    // queues_[0] is the general (non-ftrace) queue.
    // queues_[1] is the ftrace queue for CPU(0).
    // queues_[x] is the ftrace queue for CPU(x - 1).
    TraceProcessorContext* machine_context;
    std::vector<Queue> queues;
  };
  std::vector<TraceSorterData> sorter_data_by_machine_;

  // Whether we should ignore incremental extraction and just wait for the
  // forced extraction at the end of the trace.
  SortingMode sorting_mode_ = SortingMode::kDefault;

  std::shared_ptr<TraceStorage> storage_;

  // Buffer for storing tokenized objects while the corresponding events are
  // being sorted.
  TraceTokenBuffer token_buffer_;

  // The AllocId until which events should be extracted. Set based
  // on the AllocId in |OnReadBuffer|.
  BumpAllocator::AllocId alloc_id_for_extraction_ =
      token_buffer_.PastTheEndAllocId();

  // The number of flushes which have happened since the last incremental
  // extraction.
  uint32_t flushes_since_extraction_ = 0;

  // max(e.ts for e appended to the sorter)
  int64_t append_max_ts_ = 0;

  // How events pushed into the sorter should be handled.
  EventHandling event_handling_ = EventHandling::kSortAndPush;

  // max(e.ts for e pushed to next stage)
  int64_t latest_pushed_event_ts_ = std::numeric_limits<int64_t>::min();

  // Whether we should use the slow sorting algorithm when std::sort-ing the
  // queues.
  bool use_slow_sorting_ = false;
};

}  // namespace perfetto::trace_processor

#endif  // SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_