/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_
#define SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <optional>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

#include "perfetto/base/logging.h"
#include "perfetto/ext/base/circular_queue.h"
#include "perfetto/public/compiler.h"
#include "perfetto/trace_processor/ref_counted.h"
#include "perfetto/trace_processor/trace_blob_view.h"
#include "src/trace_processor/importers/common/parser_types.h"
#include "src/trace_processor/importers/common/trace_parser.h"
#include "src/trace_processor/importers/fuchsia/fuchsia_record.h"
#include "src/trace_processor/importers/perf/record.h"
#include "src/trace_processor/importers/systrace/systrace_line.h"
#include "src/trace_processor/sorter/trace_token_buffer.h"
#include "src/trace_processor/storage/trace_storage.h"
#include "src/trace_processor/types/trace_processor_context.h"
#include "src/trace_processor/util/bump_allocator.h"

namespace perfetto::trace_processor {

// This class takes care of sorting events parsed from the trace stream in
// arbitrary order and pushing them to the next pipeline stages (parsing) in
// order. To support streaming use-cases, sorting happens within a window.
//
// Events are held in the TraceSorter staging area (events_) until either:
// 1. We can determine that it's safe to extract events, by observing
//    TracingServiceEvent Flush and ReadBuffer events.
// 2. The trace EOF is reached.
//
// Incremental extraction
//
// Incremental extraction happens by using a combination of flush and read
// buffer events from the tracing service. Note that incremental extraction
// is only applicable for write_into_file traces; ring-buffer traces will
// be sorted fully in-memory implicitly because there is only a single read
// buffer call at the end.
//
// The algorithm for incremental extraction is explained in detail at
// go/trace-sorting-is-complicated.
//
// Sorting algorithm
//
// The sorting algorithm is designed around the assumption that:
// - Most events come from ftrace.
// - Ftrace events are sorted within each cpu most of the time.
//
// Because of this, this class operates as a streaming merge-sort of N+1
// queues (N = number of CPUs; the +1 is for non-ftrace events). Each queue
// in turn gets sorted (if necessary) before proceeding with the global
// merge-sort-extract.
//
// When an event is pushed through, it is just appended to the end of one of
// the N+1 queues. While appending, we keep track of whether the queue is
// still ordered or has just lost ordering. When an out-of-order event is
// detected on a queue we keep track of: (1) the offset within the queue where
// the chaos began, (2) the timestamp that broke the ordering.
//
// When we decide to extract events from the queues into the next stages of
// the trace processor, we re-sort the events in the queue. Rather than
// re-sorting everything every time, we use the above knowledge to restrict
// sorting to the (hopefully smaller) tail of the |events_| staging area.
// At any time, the first partition of |events_| [0 .. sort_start_idx_) is
// ordered, and the second partition [sort_start_idx_ .. end] is not.
// We use a binary lower-bound search to figure out the index within the
// first partition where sorting should start, and sort all events from
// there to the end.
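//
// For intuition, the tail re-sort described above is conceptually equivalent
// to the sketch below (illustrative only; the actual implementation lives in
// Queue::Sort(), declared further down):
//
//   auto sorted_end = events_.begin() + static_cast<ptrdiff_t>(sort_start_idx_);
//   auto it = std::lower_bound(events_.begin(), sorted_end, sort_min_ts_,
//                              &TimestampedEvent::Compare);
//   std::sort(it, events_.end());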
class TraceSorter {
 public:
  enum class SortingMode {
    kDefault,
    kFullSort,
  };

  TraceSorter(TraceProcessorContext* context, SortingMode sorting_mode);

  ~TraceSorter();

  SortingMode sorting_mode() const { return sorting_mode_; }

  inline void AddMachineContext(TraceProcessorContext* context) {
    sorter_data_by_machine_.emplace_back(context);
  }
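  // The Push* methods below are called by the tokenization stage to hand
  // events over to the sorter. A typical end-to-end flow looks roughly like
  // the following (an illustrative sketch; |sorter| stands for whatever owns
  // the TraceSorter instance, which is an assumption of the example):
  //
  //   sorter.PushFtraceEvent(cpu, ts, std::move(blob), state);
  //   sorter.PushTracePacket(ts, state, std::move(packet_blob));
  //   ...
  //   sorter.ExtractEventsForced();  // At the end of the trace (EOF).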
  inline void PushPerfRecord(
      int64_t timestamp,
      perf_importer::Record record,
      std::optional<MachineId> machine_id = std::nullopt) {
    TraceTokenBuffer::Id id = token_buffer_.Append(std::move(record));
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kPerfRecord, id,
                         machine_id);
  }

  inline void PushTracePacket(
      int64_t timestamp,
      TracePacketData data,
      std::optional<MachineId> machine_id = std::nullopt) {
    TraceTokenBuffer::Id id = token_buffer_.Append(std::move(data));
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kTracePacket, id,
                         machine_id);
  }

  inline void PushTracePacket(
      int64_t timestamp,
      RefPtr<PacketSequenceStateGeneration> state,
      TraceBlobView tbv,
      std::optional<MachineId> machine_id = std::nullopt) {
    PushTracePacket(timestamp,
                    TracePacketData{std::move(tbv), std::move(state)},
                    machine_id);
  }

  inline void PushJsonValue(int64_t timestamp, std::string json_value) {
    TraceTokenBuffer::Id id =
        token_buffer_.Append(JsonEvent{std::move(json_value)});
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kJsonValue, id);
  }

  inline void PushFuchsiaRecord(int64_t timestamp,
                                FuchsiaRecord fuchsia_record) {
    TraceTokenBuffer::Id id = token_buffer_.Append(std::move(fuchsia_record));
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kFuchsiaRecord, id);
  }

  inline void PushSystraceLine(SystraceLine systrace_line) {
    // Read the timestamp before the line is moved into the token buffer.
    int64_t ts = systrace_line.ts;
    TraceTokenBuffer::Id id = token_buffer_.Append(std::move(systrace_line));
    AppendNonFtraceEvent(ts, TimestampedEvent::Type::kSystraceLine, id);
  }

  inline void PushTrackEventPacket(
      int64_t timestamp,
      TrackEventData track_event,
      std::optional<MachineId> machine_id = std::nullopt) {
    TraceTokenBuffer::Id id = token_buffer_.Append(std::move(track_event));
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kTrackEvent, id,
                         machine_id);
  }

  inline void PushEtwEvent(uint32_t cpu,
                           int64_t timestamp,
                           TraceBlobView tbv,
                           RefPtr<PacketSequenceStateGeneration> state,
                           std::optional<MachineId> machine_id = std::nullopt) {
    TraceTokenBuffer::Id id =
        token_buffer_.Append(TracePacketData{std::move(tbv), std::move(state)});
    auto* queue = GetQueue(cpu + 1, machine_id);
    queue->Append(timestamp, TimestampedEvent::Type::kEtwEvent, id);
    UpdateAppendMaxTs(queue);
  }

  inline void PushFtraceEvent(
      uint32_t cpu,
      int64_t timestamp,
      TraceBlobView tbv,
      RefPtr<PacketSequenceStateGeneration> state,
      std::optional<MachineId> machine_id = std::nullopt) {
    TraceTokenBuffer::Id id =
        token_buffer_.Append(TracePacketData{std::move(tbv), std::move(state)});
    auto* queue = GetQueue(cpu + 1, machine_id);
    queue->Append(timestamp, TimestampedEvent::Type::kFtraceEvent, id);
    UpdateAppendMaxTs(queue);
  }

  inline void PushInlineFtraceEvent(
      uint32_t cpu,
      int64_t timestamp,
      InlineSchedSwitch inline_sched_switch,
      std::optional<MachineId> machine_id = std::nullopt) {
    // TODO(rsavitski): if a trace has a mix of normal & "compact" events
    // (being pushed through this function), the ftrace batches will no longer
    // be fully sorted by timestamp. In such situations, we will have to sort
    // at the end of the batch. We can do better, as both sub-sequences are
    // already sorted: consider adding extra queues, or pushing them in a
    // merge-sort fashion instead.
    TraceTokenBuffer::Id id =
        token_buffer_.Append(std::move(inline_sched_switch));
    auto* queue = GetQueue(cpu + 1, machine_id);
    queue->Append(timestamp, TimestampedEvent::Type::kInlineSchedSwitch, id);
    UpdateAppendMaxTs(queue);
  }

  inline void PushInlineFtraceEvent(
      uint32_t cpu,
      int64_t timestamp,
      InlineSchedWaking inline_sched_waking,
      std::optional<MachineId> machine_id = std::nullopt) {
    TraceTokenBuffer::Id id =
        token_buffer_.Append(std::move(inline_sched_waking));
    auto* queue = GetQueue(cpu + 1, machine_id);
    queue->Append(timestamp, TimestampedEvent::Type::kInlineSchedWaking, id);
    UpdateAppendMaxTs(queue);
  }

  void ExtractEventsForced() {
    BumpAllocator::AllocId end_id = token_buffer_.PastTheEndAllocId();
    SortAndExtractEventsUntilAllocId(end_id);
    for (auto& sorter_data : sorter_data_by_machine_) {
      for (const auto& queue : sorter_data.queues) {
        PERFETTO_CHECK(queue.events_.empty());
      }
      sorter_data.queues.clear();
    }

    alloc_id_for_extraction_ = end_id;
    flushes_since_extraction_ = 0;
  }

  void NotifyFlushEvent() { flushes_since_extraction_++; }

  void NotifyReadBufferEvent() {
    if (sorting_mode_ == SortingMode::kFullSort ||
        flushes_since_extraction_ < 2) {
      return;
    }

    SortAndExtractEventsUntilAllocId(alloc_id_for_extraction_);
    alloc_id_for_extraction_ = token_buffer_.PastTheEndAllocId();
    flushes_since_extraction_ = 0;
  }

  int64_t max_timestamp() const { return append_max_ts_; }
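  // Illustrative call sequence for incremental extraction (a sketch; the real
  // notifications are derived from TracingServiceEvent packets observed by
  // the tokenizer). With SortingMode::kDefault, a ReadBuffer event only
  // triggers extraction if at least two flushes happened since the previous
  // extraction:
  //
  //   sorter.NotifyFlushEvent();       // 1st flush: just bumps the counter.
  //   sorter.NotifyReadBufferEvent();  // No-op: flushes_since_extraction_ < 2.
  //   sorter.NotifyFlushEvent();       // 2nd flush.
  //   sorter.NotifyReadBufferEvent();  // Sorts and extracts all events up to
  //                                    // |alloc_id_for_extraction_|.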
 private:
  struct TimestampedEvent {
    enum class Type : uint8_t {
      kFtraceEvent,
      kPerfRecord,
      kTracePacket,
      kInlineSchedSwitch,
      kInlineSchedWaking,
      kJsonValue,
      kFuchsiaRecord,
      kTrackEvent,
      kSystraceLine,
      kEtwEvent,
      kMax = kEtwEvent,
    };

    // Number of bits required to store the max element in |Type|.
    static constexpr uint32_t kMaxTypeBits = 4;
    static_assert(static_cast<uint8_t>(Type::kMax) <= (1 << kMaxTypeBits),
                  "Max type does not fit inside storage");

    // The timestamp of this event.
    int64_t ts;

    // The fields inside BumpAllocator::AllocId of the tokenized object
    // corresponding to this event.
    uint64_t chunk_index : BumpAllocator::kChunkIndexAllocIdBits;
    uint64_t chunk_offset : BumpAllocator::kChunkOffsetAllocIdBits;

    // The type of this event. GCC7 does not like bit-field enums (see
    // https://stackoverflow.com/questions/36005063/gcc-suppress-warning-too-small-to-hold-all-values-of)
    // so use a uint64_t instead and cast to the enum type.
    uint64_t event_type : kMaxTypeBits;

    BumpAllocator::AllocId alloc_id() const {
      return BumpAllocator::AllocId{chunk_index, chunk_offset};
    }

    // For std::lower_bound().
    static inline bool Compare(const TimestampedEvent& x, int64_t ts) {
      return x.ts < ts;
    }

    // For std::sort().
    inline bool operator<(const TimestampedEvent& evt) const {
      return std::tie(ts, chunk_index, chunk_offset) <
             std::tie(evt.ts, evt.chunk_index, evt.chunk_offset);
    }
  };

  static_assert(sizeof(TimestampedEvent) == 16,
                "TimestampedEvent must be equal to 16 bytes");
  static_assert(std::is_trivially_copyable_v<TimestampedEvent>,
                "TimestampedEvent must be trivially copyable");
  static_assert(std::is_trivially_move_assignable_v<TimestampedEvent>,
                "TimestampedEvent must be trivially move assignable");
  static_assert(std::is_trivially_move_constructible_v<TimestampedEvent>,
                "TimestampedEvent must be trivially move constructible");
  static_assert(std::is_nothrow_swappable_v<TimestampedEvent>,
                "TimestampedEvent must be nothrow swappable");
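  // Note on the 16-byte layout: |ts| occupies the first 8 bytes, so the three
  // bit-fields (chunk_index + chunk_offset + event_type) must together pack
  // into a single 64-bit word for the sizeof static_assert above to hold.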
  struct Queue {
    void Append(int64_t ts,
                TimestampedEvent::Type type,
                TraceTokenBuffer::Id id) {
      {
        TimestampedEvent event;
        event.ts = ts;
        event.chunk_index = id.alloc_id.chunk_index;
        event.chunk_offset = id.alloc_id.chunk_offset;
        event.event_type = static_cast<uint8_t>(type);
        events_.emplace_back(std::move(event));
      }

      // Events are often seen in order.
      if (PERFETTO_LIKELY(ts >= max_ts_)) {
        max_ts_ = ts;
      } else {
        // The event breaks the ordering. The first time this happens, keep
        // track of which index we are at. We know that everything before
        // that index is sorted (because events were pushed monotonically).
        // Everything after that index, instead, will need a sorting pass
        // before moving events to the next pipeline stage.
        if (sort_start_idx_ == 0) {
          PERFETTO_DCHECK(events_.size() >= 2);
          sort_start_idx_ = events_.size() - 1;
          sort_min_ts_ = ts;
        } else {
          sort_min_ts_ = std::min(sort_min_ts_, ts);
        }
      }

      min_ts_ = std::min(min_ts_, ts);
      PERFETTO_DCHECK(min_ts_ <= max_ts_);
    }

    bool needs_sorting() const { return sort_start_idx_ != 0; }
    void Sort();

    base::CircularQueue<TimestampedEvent> events_;
    int64_t min_ts_ = std::numeric_limits<int64_t>::max();
    int64_t max_ts_ = 0;
    size_t sort_start_idx_ = 0;
    int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
  };

  void SortAndExtractEventsUntilAllocId(BumpAllocator::AllocId alloc_id);

  inline Queue* GetQueue(size_t index,
                         std::optional<MachineId> machine_id = std::nullopt) {
    // sorter_data_by_machine_[0] corresponds to the default machine.
    PERFETTO_DCHECK(sorter_data_by_machine_[0].machine_id == std::nullopt);
    auto* queues = &sorter_data_by_machine_[0].queues;

    // Find the TraceSorterData instance when |machine_id| is not nullopt.
    if (PERFETTO_UNLIKELY(machine_id.has_value())) {
      auto it = std::find_if(sorter_data_by_machine_.begin() + 1,
                             sorter_data_by_machine_.end(),
                             [machine_id](const TraceSorterData& item) {
                               return item.machine_id == machine_id;
                             });
      PERFETTO_DCHECK(it != sorter_data_by_machine_.end());
      queues = &it->queues;
    }

    if (PERFETTO_UNLIKELY(index >= queues->size()))
      queues->resize(index + 1);
    return &queues->at(index);
  }

  inline void AppendNonFtraceEvent(
      int64_t ts,
      TimestampedEvent::Type event_type,
      TraceTokenBuffer::Id id,
      std::optional<MachineId> machine_id = std::nullopt) {
    Queue* queue = GetQueue(0, machine_id);
    queue->Append(ts, event_type, id);
    UpdateAppendMaxTs(queue);
  }

  inline void UpdateAppendMaxTs(Queue* queue) {
    append_max_ts_ = std::max(append_max_ts_, queue->max_ts_);
  }

  void ParseTracePacket(TraceProcessorContext& context,
                        const TimestampedEvent&);
  void ParseFtracePacket(TraceProcessorContext& context,
                         uint32_t cpu,
                         const TimestampedEvent&);
  void ParseEtwPacket(TraceProcessorContext& context,
                      uint32_t cpu,
                      const TimestampedEvent&);

  void MaybeExtractEvent(size_t machine_idx,
                         size_t queue_idx,
                         const TimestampedEvent&);
  void ExtractAndDiscardTokenizedObject(const TimestampedEvent& event);

  static TraceTokenBuffer::Id GetTokenBufferId(const TimestampedEvent& event) {
    return TraceTokenBuffer::Id{event.alloc_id()};
  }

  struct TraceSorterData {
    explicit TraceSorterData(TraceProcessorContext* _machine_context)
        : machine_id(_machine_context->machine_id()),
          machine_context(_machine_context) {}
    std::optional<MachineId> machine_id;
    // queues[0] is the general (non-ftrace) queue.
    // queues[1] is the ftrace queue for CPU(0).
    // queues[x] is the ftrace queue for CPU(x - 1).
    TraceProcessorContext* machine_context;
    std::vector<Queue> queues;
  };
  std::vector<TraceSorterData> sorter_data_by_machine_;

  // Whether we should ignore incremental extraction and just wait for
  // forced extraction at the end of the trace.
  SortingMode sorting_mode_ = SortingMode::kDefault;

  std::shared_ptr<TraceStorage> storage_;

  // Buffer for storing tokenized objects while the corresponding events are
  // being sorted.
  TraceTokenBuffer token_buffer_;

  // The AllocId until which events should be extracted. Set based on the
  // AllocId observed in |NotifyReadBufferEvent|.
  BumpAllocator::AllocId alloc_id_for_extraction_ =
      token_buffer_.PastTheEndAllocId();

  // The number of flushes which have happened since the last incremental
  // extraction.
  uint32_t flushes_since_extraction_ = 0;

  // max(e.ts for e appended to the sorter)
  int64_t append_max_ts_ = 0;

  // Used for performance tests. True when setting
  // TRACE_PROCESSOR_SORT_ONLY=1.
  bool bypass_next_stage_for_testing_ = false;

  // max(e.ts for e pushed to next stage)
  int64_t latest_pushed_event_ts_ = std::numeric_limits<int64_t>::min();
};

}  // namespace perfetto::trace_processor

#endif  // SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_