1 /* 2 * Copyright (C) 2018 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_ 18 #define SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_ 19 20 #include <algorithm> 21 #include <cstddef> 22 #include <cstdint> 23 #include <limits> 24 #include <memory> 25 #include <optional> 26 #include <string> 27 #include <tuple> 28 #include <type_traits> 29 #include <utility> 30 #include <variant> 31 #include <vector> 32 33 #include "perfetto/base/compiler.h" 34 #include "perfetto/base/logging.h" 35 #include "perfetto/ext/base/circular_queue.h" 36 #include "perfetto/public/compiler.h" 37 #include "perfetto/trace_processor/ref_counted.h" 38 #include "perfetto/trace_processor/trace_blob_view.h" 39 #include "src/trace_processor/importers/android_bugreport/android_dumpstate_event.h" 40 #include "src/trace_processor/importers/android_bugreport/android_log_event.h" 41 #include "src/trace_processor/importers/art_method/art_method_event.h" 42 #include "src/trace_processor/importers/common/parser_types.h" 43 #include "src/trace_processor/importers/common/trace_parser.h" 44 #include "src/trace_processor/importers/fuchsia/fuchsia_record.h" 45 #include "src/trace_processor/importers/gecko/gecko_event.h" 46 #include "src/trace_processor/importers/instruments/row.h" 47 #include "src/trace_processor/importers/perf/record.h" 48 #include "src/trace_processor/importers/perf_text/perf_text_event.h" 49 #include "src/trace_processor/importers/systrace/systrace_line.h" 50 #include "src/trace_processor/sorter/trace_token_buffer.h" 51 #include "src/trace_processor/storage/trace_storage.h" 52 #include "src/trace_processor/types/trace_processor_context.h" 53 #include "src/trace_processor/util/bump_allocator.h" 54 55 namespace perfetto::trace_processor { 56 57 // This class takes care of sorting events parsed from the trace stream in 58 // arbitrary order and pushing them to the next pipeline stages (parsing) in 59 // order. In order to support streaming use-cases, sorting happens within a 60 // window. 61 // 62 // Events are held in the TraceSorter staging area (events_) until either: 63 // 1. We can determine that it's safe to extract events by observing 64 // TracingServiceEvent Flush and ReadBuffer events 65 // 2. The trace EOF is reached 66 // 67 // Incremental extraction 68 // 69 // Incremental extraction happens by using a combination of flush and read 70 // buffer events from the tracing service. Note that incremental extraction 71 // is only applicable for write_into_file traces; ring-buffer traces will 72 // be sorted fully in-memory implicitly because there is only a single read 73 // buffer call at the end. 74 // 75 // The algorithm for incremental extraction is explained in detail at 76 // go/trace-sorting-is-complicated. 77 // 78 // Sorting algorithm 79 // 80 // The sorting algorithm is designed around the assumption that: 81 // - Most events come from ftrace. 82 // - Ftrace events are sorted within each cpu most of the times. 83 // 84 // Due to this, this class is oprerates as a streaming merge-sort of N+1 queues 85 // (N = num cpus + 1 for non-ftrace events). Each queue in turn gets sorted (if 86 // necessary) before proceeding with the global merge-sort-extract. 87 // 88 // When an event is pushed through, it is just appended to the end of one of 89 // the N queues. While appending, we keep track of the fact that the queue 90 // is still ordered or just lost ordering. When an out-of-order event is 91 // detected on a queue we keep track of: (1) the offset within the queue where 92 // the chaos begun, (2) the timestamp that broke the ordering. 93 // 94 // When we decide to extract events from the queues into the next stages of 95 // the trace processor, we re-sort the events in the queue. Rather than 96 // re-sorting everything all the times, we use the above knowledge to restrict 97 // sorting to the (hopefully smaller) tail of the |events_| staging area. 98 // At any time, the first partition of |events_| [0 .. sort_start_idx_) is 99 // ordered, and the second partition [sort_start_idx_.. end] is not. 100 // We use a logarithmic bound search operation to figure out what is the index 101 // within the first partition where sorting should start, and sort all events 102 // from there to the end. 103 class TraceSorter { 104 public: 105 enum class SortingMode { 106 kDefault, 107 kFullSort, 108 }; 109 enum class EventHandling { 110 // Indicates that events should be sorted and pushed to the parsing stage. 111 kSortAndPush, 112 113 // Indicates that events should be sorted but then dropped before pushing 114 // to the parsing stage. 115 // Used for performance analysis of the sorter. 116 kSortAndDrop, 117 118 // Indicates that the events should be dropped as soon as they enter the 119 // sorter. 120 // Used in cases where we only want to perform tokenization: dropping data 121 // when it hits the sorter is much cleaner than trying to handle this 122 // at every different tokenizer. 123 kDrop, 124 }; 125 126 TraceSorter(TraceProcessorContext*, 127 SortingMode, 128 EventHandling = EventHandling::kSortAndPush); 129 130 ~TraceSorter(); 131 sorting_mode()132 SortingMode sorting_mode() const { return sorting_mode_; } 133 bool SetSortingMode(SortingMode sorting_mode); 134 AddMachineContext(TraceProcessorContext * context)135 void AddMachineContext(TraceProcessorContext* context) { 136 sorter_data_by_machine_.emplace_back(context); 137 } 138 139 void PushAndroidDumpstateEvent( 140 int64_t timestamp, 141 const AndroidDumpstateEvent& event, 142 std::optional<MachineId> machine_id = std::nullopt) { 143 AppendNonFtraceEvent(timestamp, 144 TimestampedEvent::Type::kAndroidDumpstateEvent, event, 145 machine_id); 146 } 147 148 void PushAndroidLogEvent(int64_t timestamp, 149 const AndroidLogEvent& event, 150 std::optional<MachineId> machine_id = std::nullopt) { 151 AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kAndroidLogEvent, 152 event, machine_id); 153 } 154 155 void PushPerfRecord(int64_t timestamp, 156 perf_importer::Record record, 157 std::optional<MachineId> machine_id = std::nullopt) { 158 AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kPerfRecord, 159 std::move(record), machine_id); 160 } 161 162 void PushSpeRecord(int64_t timestamp, 163 TraceBlobView record, 164 std::optional<MachineId> machine_id = std::nullopt) { 165 AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kSpeRecord, 166 std::move(record), machine_id); 167 } 168 169 void PushInstrumentsRow(int64_t timestamp, 170 const instruments_importer::Row& row, 171 std::optional<MachineId> machine_id = std::nullopt) { 172 AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kInstrumentsRow, 173 row, machine_id); 174 } 175 176 void PushTracePacket(int64_t timestamp, 177 TracePacketData data, 178 std::optional<MachineId> machine_id = std::nullopt) { 179 AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kTracePacket, 180 std::move(data), machine_id); 181 } 182 183 void PushTracePacket(int64_t timestamp, 184 RefPtr<PacketSequenceStateGeneration> state, 185 TraceBlobView tbv, 186 std::optional<MachineId> machine_id = std::nullopt) { 187 PushTracePacket(timestamp, 188 TracePacketData{std::move(tbv), std::move(state)}, 189 machine_id); 190 } 191 PushJsonValue(int64_t timestamp,std::string json_value,const JsonEvent::Type & type)192 void PushJsonValue(int64_t timestamp, 193 std::string json_value, 194 const JsonEvent::Type& type) { 195 if (const auto* scoped = std::get_if<JsonEvent::Scoped>(&type); scoped) { 196 // We need to account for slices with duration by sorting them specially: 197 // this requires us to use the slower comparator which takes this into 198 // account. 199 use_slow_sorting_ = true; 200 } 201 AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kJsonValue, 202 JsonEvent{std::move(json_value), type}); 203 } 204 PushFuchsiaRecord(int64_t timestamp,FuchsiaRecord fuchsia_record)205 void PushFuchsiaRecord(int64_t timestamp, FuchsiaRecord fuchsia_record) { 206 AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kFuchsiaRecord, 207 std::move(fuchsia_record)); 208 } 209 PushSystraceLine(SystraceLine systrace_line)210 void PushSystraceLine(SystraceLine systrace_line) { 211 AppendNonFtraceEvent(systrace_line.ts, 212 TimestampedEvent::Type::kSystraceLine, 213 std::move(systrace_line)); 214 } 215 216 void PushTrackEventPacket( 217 int64_t timestamp, 218 TrackEventData track_event, 219 std::optional<MachineId> machine_id = std::nullopt) { 220 AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kTrackEvent, 221 std::move(track_event), machine_id); 222 } 223 PushLegacyV8CpuProfileEvent(int64_t timestamp,uint64_t session_id,uint32_t pid,uint32_t tid,uint32_t callsite_id)224 void PushLegacyV8CpuProfileEvent(int64_t timestamp, 225 uint64_t session_id, 226 uint32_t pid, 227 uint32_t tid, 228 uint32_t callsite_id) { 229 AppendNonFtraceEvent( 230 timestamp, TimestampedEvent::Type::kLegacyV8CpuProfileEvent, 231 LegacyV8CpuProfileEvent{session_id, pid, tid, callsite_id}); 232 } 233 PushGeckoEvent(int64_t timestamp,const gecko_importer::GeckoEvent & event)234 void PushGeckoEvent(int64_t timestamp, 235 const gecko_importer::GeckoEvent& event) { 236 AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kGeckoEvent, event); 237 } 238 PushArtMethodEvent(int64_t timestamp,const art_method::ArtMethodEvent & event)239 void PushArtMethodEvent(int64_t timestamp, 240 const art_method::ArtMethodEvent& event) { 241 AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kArtMethodEvent, 242 event); 243 } 244 PushPerfTextEvent(int64_t timestamp,const perf_text_importer::PerfTextEvent & event)245 void PushPerfTextEvent(int64_t timestamp, 246 const perf_text_importer::PerfTextEvent& event) { 247 AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kPerfTextEvent, 248 event); 249 } 250 251 void PushEtwEvent(uint32_t cpu, 252 int64_t timestamp, 253 TraceBlobView tbv, 254 RefPtr<PacketSequenceStateGeneration> state, 255 std::optional<MachineId> machine_id = std::nullopt) { 256 if (PERFETTO_UNLIKELY(event_handling_ == EventHandling::kDrop)) { 257 return; 258 } 259 TraceTokenBuffer::Id id = 260 token_buffer_.Append(TracePacketData{std::move(tbv), std::move(state)}); 261 auto* queue = GetQueue(cpu + 1, machine_id); 262 queue->Append(timestamp, TimestampedEvent::Type::kEtwEvent, id, 263 use_slow_sorting_); 264 UpdateAppendMaxTs(queue); 265 } 266 267 void PushFtraceEvent(uint32_t cpu, 268 int64_t timestamp, 269 TraceBlobView tbv, 270 RefPtr<PacketSequenceStateGeneration> state, 271 std::optional<MachineId> machine_id = std::nullopt) { 272 if (PERFETTO_UNLIKELY(event_handling_ == EventHandling::kDrop)) { 273 return; 274 } 275 TraceTokenBuffer::Id id = 276 token_buffer_.Append(TracePacketData{std::move(tbv), std::move(state)}); 277 auto* queue = GetQueue(cpu + 1, machine_id); 278 queue->Append(timestamp, TimestampedEvent::Type::kFtraceEvent, id, 279 use_slow_sorting_); 280 UpdateAppendMaxTs(queue); 281 } 282 283 void PushInlineFtraceEvent( 284 uint32_t cpu, 285 int64_t timestamp, 286 const InlineSchedSwitch& inline_sched_switch, 287 std::optional<MachineId> machine_id = std::nullopt) { 288 if (PERFETTO_UNLIKELY(event_handling_ == EventHandling::kDrop)) { 289 return; 290 } 291 // TODO(rsavitski): if a trace has a mix of normal & "compact" events 292 // (being pushed through this function), the ftrace batches will no longer 293 // be fully sorted by timestamp. In such situations, we will have to sort 294 // at the end of the batch. We can do better as both sub-sequences are 295 // sorted however. Consider adding extra queues, or pushing them in a 296 // merge-sort fashion 297 // // instead. 298 TraceTokenBuffer::Id id = token_buffer_.Append(inline_sched_switch); 299 auto* queue = GetQueue(cpu + 1, machine_id); 300 queue->Append(timestamp, TimestampedEvent::Type::kInlineSchedSwitch, id, 301 use_slow_sorting_); 302 UpdateAppendMaxTs(queue); 303 } 304 305 void PushInlineFtraceEvent( 306 uint32_t cpu, 307 int64_t timestamp, 308 const InlineSchedWaking& inline_sched_waking, 309 std::optional<MachineId> machine_id = std::nullopt) { 310 if (PERFETTO_UNLIKELY(event_handling_ == EventHandling::kDrop)) { 311 return; 312 } 313 TraceTokenBuffer::Id id = token_buffer_.Append(inline_sched_waking); 314 auto* queue = GetQueue(cpu + 1, machine_id); 315 queue->Append(timestamp, TimestampedEvent::Type::kInlineSchedWaking, id, 316 use_slow_sorting_); 317 UpdateAppendMaxTs(queue); 318 } 319 ExtractEventsForced()320 void ExtractEventsForced() { 321 BumpAllocator::AllocId end_id = token_buffer_.PastTheEndAllocId(); 322 SortAndExtractEventsUntilAllocId(end_id); 323 for (auto& sorter_data : sorter_data_by_machine_) { 324 for (const auto& queue : sorter_data.queues) { 325 PERFETTO_CHECK(queue.events_.empty()); 326 } 327 sorter_data.queues.clear(); 328 } 329 330 alloc_id_for_extraction_ = end_id; 331 flushes_since_extraction_ = 0; 332 } 333 NotifyFlushEvent()334 void NotifyFlushEvent() { flushes_since_extraction_++; } 335 NotifyReadBufferEvent()336 void NotifyReadBufferEvent() { 337 if (sorting_mode_ == SortingMode::kFullSort || 338 flushes_since_extraction_ < 2) { 339 return; 340 } 341 342 SortAndExtractEventsUntilAllocId(alloc_id_for_extraction_); 343 alloc_id_for_extraction_ = token_buffer_.PastTheEndAllocId(); 344 flushes_since_extraction_ = 0; 345 } 346 max_timestamp()347 int64_t max_timestamp() const { return append_max_ts_; } 348 349 private: 350 struct TimestampedEvent { 351 enum class Type : uint8_t { 352 kAndroidDumpstateEvent, 353 kAndroidLogEvent, 354 kEtwEvent, 355 kFtraceEvent, 356 kFuchsiaRecord, 357 kInlineSchedSwitch, 358 kInlineSchedWaking, 359 kInstrumentsRow, 360 kJsonValue, 361 kLegacyV8CpuProfileEvent, 362 kPerfRecord, 363 kSpeRecord, 364 kSystraceLine, 365 kTracePacket, 366 kTrackEvent, 367 kGeckoEvent, 368 kArtMethodEvent, 369 kPerfTextEvent, 370 kMax = kPerfTextEvent, 371 }; 372 373 // Number of bits required to store the max element in |Type|. 374 static constexpr uint32_t kMaxTypeBits = 6; 375 static_assert(static_cast<uint8_t>(Type::kMax) <= (1 << kMaxTypeBits), 376 "Max type does not fit inside storage"); 377 378 // The timestamp of this event. 379 int64_t ts; 380 381 // The fields inside BumpAllocator::AllocId of this tokenized object 382 // corresponding to this event. 383 uint64_t chunk_index : BumpAllocator::kChunkIndexAllocIdBits; 384 uint64_t chunk_offset : BumpAllocator::kChunkOffsetAllocIdBits; 385 386 // The type of this event. GCC7 does not like bit-field enums (see 387 // https://stackoverflow.com/questions/36005063/gcc-suppress-warning-too-small-to-hold-all-values-of) 388 // so use an uint64_t instead and cast to the enum type. 389 uint64_t event_type : kMaxTypeBits; 390 alloc_idTimestampedEvent391 BumpAllocator::AllocId alloc_id() const { 392 return BumpAllocator::AllocId{chunk_index, chunk_offset}; 393 } 394 typeTimestampedEvent395 Type type() const { return static_cast<Type>(event_type); } 396 397 // For std::lower_bound(). CompareTimestampedEvent398 static bool Compare(const TimestampedEvent& x, int64_t ts) { 399 return x.ts < ts; 400 } 401 402 // For std::sort(). 403 bool operator<(const TimestampedEvent& evt) const { 404 return std::tie(ts, chunk_index, chunk_offset) < 405 std::tie(evt.ts, evt.chunk_index, evt.chunk_offset); 406 } 407 408 struct SlowOperatorLess { 409 // For std::sort() in slow mode. operatorTimestampedEvent::SlowOperatorLess410 bool operator()(const TimestampedEvent& a, 411 const TimestampedEvent& b) const { 412 int64_t a_key = 413 KeyForType(buffer.Get<JsonEvent>(GetTokenBufferId(a))->type); 414 int64_t b_key = 415 KeyForType(buffer.Get<JsonEvent>(GetTokenBufferId(b))->type); 416 return std::tie(a.ts, a_key, a.chunk_index, a.chunk_offset) < 417 std::tie(b.ts, b_key, b.chunk_index, b.chunk_offset); 418 } 419 KeyForTypeTimestampedEvent::SlowOperatorLess420 static int64_t KeyForType(const JsonEvent::Type& type) { 421 switch (type.index()) { 422 case base::variant_index<JsonEvent::Type, JsonEvent::End>(): 423 return std::numeric_limits<int64_t>::min(); 424 case base::variant_index<JsonEvent::Type, JsonEvent::Scoped>(): 425 return std::numeric_limits<int64_t>::max() - 426 std::get<JsonEvent::Scoped>(type).dur; 427 default: 428 return std::numeric_limits<int64_t>::max(); 429 } 430 PERFETTO_FATAL("For GCC"); 431 } 432 433 TraceTokenBuffer& buffer; 434 }; 435 }; 436 437 static_assert(sizeof(TimestampedEvent) == 16, 438 "TimestampedEvent must be equal to 16 bytes"); 439 static_assert(std::is_trivially_copyable_v<TimestampedEvent>, 440 "TimestampedEvent must be trivially copyable"); 441 static_assert(std::is_trivially_move_assignable_v<TimestampedEvent>, 442 "TimestampedEvent must be trivially move assignable"); 443 static_assert(std::is_trivially_move_constructible_v<TimestampedEvent>, 444 "TimestampedEvent must be trivially move constructible"); 445 static_assert(std::is_nothrow_swappable_v<TimestampedEvent>, 446 "TimestampedEvent must be trivially swappable"); 447 448 struct Queue { AppendQueue449 void Append(int64_t ts, 450 TimestampedEvent::Type type, 451 TraceTokenBuffer::Id id, 452 bool use_slow_sorting) { 453 { 454 TimestampedEvent event; 455 event.ts = ts; 456 event.chunk_index = id.alloc_id.chunk_index; 457 event.chunk_offset = id.alloc_id.chunk_offset; 458 event.event_type = static_cast<uint8_t>(type); 459 events_.emplace_back(std::move(event)); 460 } 461 462 // Events are often seen in order. 463 if (PERFETTO_LIKELY(ts > max_ts_ || 464 (!use_slow_sorting && ts == max_ts_))) { 465 max_ts_ = ts; 466 } else { 467 // The event is breaking ordering. The first time it happens, keep 468 // track of which index we are at. We know that everything before that 469 // is sorted (because events were pushed monotonically). Everything 470 // after that index, instead, will need a sorting pass before moving 471 // events to the next pipeline stage. 472 if (sort_start_idx_ == 0) { 473 sort_start_idx_ = events_.size() - 1; 474 sort_min_ts_ = ts; 475 } else { 476 sort_min_ts_ = std::min(sort_min_ts_, ts); 477 } 478 } 479 480 min_ts_ = std::min(min_ts_, ts); 481 PERFETTO_DCHECK(min_ts_ <= max_ts_); 482 } 483 needs_sortingQueue484 bool needs_sorting() const { return sort_start_idx_ != 0; } 485 void Sort(TraceTokenBuffer&, bool use_slow_sorting); 486 487 base::CircularQueue<TimestampedEvent> events_; 488 int64_t min_ts_ = std::numeric_limits<int64_t>::max(); 489 int64_t max_ts_ = 0; 490 size_t sort_start_idx_ = 0; 491 int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max(); 492 }; 493 494 void SortAndExtractEventsUntilAllocId(BumpAllocator::AllocId alloc_id); 495 496 Queue* GetQueue(size_t index, 497 std::optional<MachineId> machine_id = std::nullopt) { 498 // sorter_data_by_machine_[0] corresponds to the default machine. 499 PERFETTO_DCHECK(sorter_data_by_machine_[0].machine_id == std::nullopt); 500 auto* queues = &sorter_data_by_machine_[0].queues; 501 502 // Find the TraceSorterData instance when |machine_id| is not nullopt. 503 if (PERFETTO_UNLIKELY(machine_id.has_value())) { 504 auto it = std::find_if(sorter_data_by_machine_.begin() + 1, 505 sorter_data_by_machine_.end(), 506 [machine_id](const TraceSorterData& item) { 507 return item.machine_id == machine_id; 508 }); 509 PERFETTO_DCHECK(it != sorter_data_by_machine_.end()); 510 queues = &it->queues; 511 } 512 513 if (PERFETTO_UNLIKELY(index >= queues->size())) 514 queues->resize(index + 1); 515 return &queues->at(index); 516 } 517 518 template <typename E> 519 void AppendNonFtraceEvent( 520 int64_t ts, 521 TimestampedEvent::Type event_type, 522 E&& evt, 523 std::optional<MachineId> machine_id = std::nullopt) { 524 if (PERFETTO_UNLIKELY(event_handling_ == EventHandling::kDrop)) { 525 return; 526 } 527 TraceTokenBuffer::Id id = token_buffer_.Append(std::forward<E>(evt)); 528 AppendNonFtraceEventWithId(ts, event_type, id, machine_id); 529 } 530 AppendNonFtraceEventWithId(int64_t ts,TimestampedEvent::Type event_type,TraceTokenBuffer::Id id,std::optional<MachineId> machine_id)531 void AppendNonFtraceEventWithId(int64_t ts, 532 TimestampedEvent::Type event_type, 533 TraceTokenBuffer::Id id, 534 std::optional<MachineId> machine_id) { 535 Queue* queue = GetQueue(0, machine_id); 536 queue->Append(ts, event_type, id, use_slow_sorting_); 537 UpdateAppendMaxTs(queue); 538 } 539 UpdateAppendMaxTs(Queue * queue)540 void UpdateAppendMaxTs(Queue* queue) { 541 append_max_ts_ = std::max(append_max_ts_, queue->max_ts_); 542 } 543 544 void ParseTracePacket(TraceProcessorContext& context, 545 const TimestampedEvent&); 546 void ParseFtracePacket(TraceProcessorContext& context, 547 uint32_t cpu, 548 const TimestampedEvent&); 549 void ParseEtwPacket(TraceProcessorContext& context, 550 uint32_t cpu, 551 const TimestampedEvent&); 552 553 void MaybeExtractEvent(size_t machine_idx, 554 size_t queue_idx, 555 const TimestampedEvent&); 556 void ExtractAndDiscardTokenizedObject(const TimestampedEvent& event); 557 GetTokenBufferId(const TimestampedEvent & event)558 static TraceTokenBuffer::Id GetTokenBufferId(const TimestampedEvent& event) { 559 return TraceTokenBuffer::Id{event.alloc_id()}; 560 } 561 562 struct TraceSorterData { TraceSorterDataTraceSorterData563 explicit TraceSorterData(TraceProcessorContext* _machine_context) 564 : machine_id(_machine_context->machine_id()), 565 machine_context(_machine_context) {} 566 std::optional<MachineId> machine_id; 567 // queues_[0] is the general (non-ftrace) queue. 568 // queues_[1] is the ftrace queue for CPU(0). 569 // queues_[x] is the ftrace queue for CPU(x - 1). 570 TraceProcessorContext* machine_context; 571 std::vector<Queue> queues; 572 }; 573 std::vector<TraceSorterData> sorter_data_by_machine_; 574 575 // Whether we should ignore incremental extraction and just wait for 576 // forced extractionn at the end of the trace. 577 SortingMode sorting_mode_ = SortingMode::kDefault; 578 579 std::shared_ptr<TraceStorage> storage_; 580 581 // Buffer for storing tokenized objects while the corresponding events are 582 // being sorted. 583 TraceTokenBuffer token_buffer_; 584 585 // The AllocId until which events should be extracted. Set based 586 // on the AllocId in |OnReadBuffer|. 587 BumpAllocator::AllocId alloc_id_for_extraction_ = 588 token_buffer_.PastTheEndAllocId(); 589 590 // The number of flushes which have happened since the last incremental 591 // extraction. 592 uint32_t flushes_since_extraction_ = 0; 593 594 // max(e.ts for e appended to the sorter) 595 int64_t append_max_ts_ = 0; 596 597 // How events pushed into the sorter should be handled. 598 EventHandling event_handling_ = EventHandling::kSortAndPush; 599 600 // max(e.ts for e pushed to next stage) 601 int64_t latest_pushed_event_ts_ = std::numeric_limits<int64_t>::min(); 602 603 // Whether when std::sorting the queues, we should use the slow 604 // sorting algorithm 605 bool use_slow_sorting_ = false; 606 }; 607 608 } // namespace perfetto::trace_processor 609 610 #endif // SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_ 611