/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_
#define SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_

#include <algorithm>
#include <cstdint>
#include <limits>
#include <memory>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

#include "perfetto/ext/base/circular_queue.h"
#include "perfetto/ext/base/utils.h"
#include "perfetto/trace_processor/basic_types.h"
#include "perfetto/trace_processor/trace_blob_view.h"
#include "src/trace_processor/importers/common/parser_types.h"
#include "src/trace_processor/importers/common/trace_parser.h"
#include "src/trace_processor/importers/fuchsia/fuchsia_record.h"
#include "src/trace_processor/importers/systrace/systrace_line.h"
#include "src/trace_processor/sorter/trace_token_buffer.h"
#include "src/trace_processor/types/trace_processor_context.h"
#include "src/trace_processor/util/bump_allocator.h"

namespace perfetto {
namespace trace_processor {

// This class takes care of sorting events parsed from the trace stream in
// arbitrary order and pushing them to the next pipeline stage (parsing) in
// order. To support streaming use-cases, sorting happens within a window.
//
// Events are held in the TraceSorter staging area (events_) until either:
// 1. We can determine that it's safe to extract events by observing
//    TracingServiceEvent Flush and ReadBuffer events
// 2. The trace EOF is reached
//
// Incremental extraction
//
// Incremental extraction happens by using a combination of flush and read
// buffer events from the tracing service. Note that incremental extraction
// is only applicable for write_into_file traces; ring-buffer traces will
// be sorted fully in-memory implicitly because there is only a single read
// buffer call at the end.
//
// The algorithm for incremental extraction is explained in detail at
// go/trace-sorting-is-complicated.
//
// Sorting algorithm
//
// The sorting algorithm is designed around the assumption that:
// - Most events come from ftrace.
// - Ftrace events are sorted within each cpu most of the time.
//
// Because of this, this class operates as a streaming merge-sort of N+1
// queues (N = num cpus; the extra queue holds non-ftrace events). Each queue
// in turn gets sorted (if necessary) before proceeding with the global
// merge-sort-extract.
//
// When an event is pushed through, it is just appended to the end of one of
// the N queues. While appending, we keep track of whether the queue is still
// ordered or has just lost ordering. When an out-of-order event is detected
// on a queue, we keep track of: (1) the offset within the queue where the
// chaos began, (2) the timestamp that broke the ordering.
//
// When we decide to extract events from the queues into the next stages of
// the trace processor, we re-sort the events in the queue. Rather than
// re-sorting everything every time, we use the above knowledge to restrict
// sorting to the (hopefully smaller) tail of the |events_| staging area.
// At any time, the first partition of |events_| [0 .. sort_start_idx_) is
// ordered, and the second partition [sort_start_idx_ .. end] is not.
// We use a logarithmic bound search to figure out the index within the first
// partition where sorting should start, and sort all events from there to
// the end.
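//
// A minimal usage sketch (illustrative only; |context|, |parser|, |state| and
// the TraceBlobView values are stand-ins produced elsewhere, not part of this
// header): tokenizers push events as they are decoded, tracing service events
// drive incremental extraction, and a forced extraction drains everything at
// EOF.
//
//   TraceSorter sorter(context, std::move(parser),
//                      TraceSorter::SortingMode::kDefault);
//   sorter.PushFtraceEvent(/*cpu=*/0, /*timestamp=*/1000, std::move(tbv),
//                          state);
//   sorter.PushTracePacket(/*timestamp=*/1005, state, std::move(other_tbv));
//   sorter.NotifyFlushEvent();       // TracingServiceEvent: Flush.
//   sorter.NotifyFlushEvent();
//   sorter.NotifyReadBufferEvent();  // May incrementally extract sorted
//                                    // events.
//   ...
//   sorter.ExtractEventsForced();    // At EOF: sort and drain what is left.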
class TraceSorter {
 public:
  enum class SortingMode {
    kDefault,
    kFullSort,
  };

  TraceSorter(TraceProcessorContext* context,
              std::unique_ptr<TraceParser> parser,
              SortingMode);
  ~TraceSorter();

  inline void PushTracePacket(int64_t timestamp,
                              RefPtr<PacketSequenceStateGeneration> state,
                              TraceBlobView tbv) {
    TraceTokenBuffer::Id id = token_buffer_.Append(
        TracePacketData{std::move(tbv), std::move(state)});
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kTracePacket, id);
  }

  inline void PushJsonValue(int64_t timestamp, std::string json_value) {
    TraceTokenBuffer::Id id =
        token_buffer_.Append(JsonEvent{std::move(json_value)});
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kJsonValue, id);
  }

  inline void PushFuchsiaRecord(int64_t timestamp,
                                FuchsiaRecord fuchsia_record) {
    TraceTokenBuffer::Id id = token_buffer_.Append(std::move(fuchsia_record));
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kFuchsiaRecord, id);
  }

  inline void PushSystraceLine(SystraceLine systrace_line) {
    // Capture the timestamp before the line is moved into the token buffer.
    int64_t timestamp = systrace_line.ts;
    TraceTokenBuffer::Id id = token_buffer_.Append(std::move(systrace_line));
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kSystraceLine, id);
  }

  inline void PushTrackEventPacket(int64_t timestamp,
                                   TrackEventData track_event) {
    TraceTokenBuffer::Id id = token_buffer_.Append(std::move(track_event));
    AppendNonFtraceEvent(timestamp, TimestampedEvent::Type::kTrackEvent, id);
  }

  inline void PushFtraceEvent(uint32_t cpu,
                              int64_t timestamp,
                              TraceBlobView tbv,
                              RefPtr<PacketSequenceStateGeneration> state) {
    TraceTokenBuffer::Id id = token_buffer_.Append(
        TracePacketData{std::move(tbv), std::move(state)});
    auto* queue = GetQueue(cpu + 1);
    queue->Append(timestamp, TimestampedEvent::Type::kFtraceEvent, id);
    UpdateAppendMaxTs(queue);
  }
  inline void PushInlineFtraceEvent(uint32_t cpu,
                                    int64_t timestamp,
                                    InlineSchedSwitch inline_sched_switch) {
    // TODO(rsavitski): if a trace has a mix of normal & "compact" events
    // (being pushed through this function), the ftrace batches will no longer
    // be fully sorted by timestamp. In such situations, we will have to sort
    // at the end of the batch. We can do better as both sub-sequences are
    // sorted however. Consider adding extra queues, or pushing them in a
    // merge-sort fashion instead.
    TraceTokenBuffer::Id id =
        token_buffer_.Append(std::move(inline_sched_switch));
    auto* queue = GetQueue(cpu + 1);
    queue->Append(timestamp, TimestampedEvent::Type::kInlineSchedSwitch, id);
    UpdateAppendMaxTs(queue);
  }

  inline void PushInlineFtraceEvent(uint32_t cpu,
                                    int64_t timestamp,
                                    InlineSchedWaking inline_sched_waking) {
    TraceTokenBuffer::Id id =
        token_buffer_.Append(std::move(inline_sched_waking));
    auto* queue = GetQueue(cpu + 1);
    queue->Append(timestamp, TimestampedEvent::Type::kInlineSchedWaking, id);
    UpdateAppendMaxTs(queue);
  }

  void ExtractEventsForced() {
    BumpAllocator::AllocId end_id = token_buffer_.PastTheEndAllocId();
    SortAndExtractEventsUntilAllocId(end_id);
    for (const auto& queue : queues_) {
      PERFETTO_DCHECK(queue.events_.empty());
    }
    queues_.clear();

    alloc_id_for_extraction_ = end_id;
    flushes_since_extraction_ = 0;
  }

  void NotifyFlushEvent() { flushes_since_extraction_++; }

  void NotifyReadBufferEvent() {
    // Incremental extraction is only attempted after at least two flushes
    // have been observed since the last extraction; in full-sort mode
    // everything is deferred to the forced extraction at the end of the
    // trace.
    if (sorting_mode_ == SortingMode::kFullSort ||
        flushes_since_extraction_ < 2) {
      return;
    }

    SortAndExtractEventsUntilAllocId(alloc_id_for_extraction_);
    alloc_id_for_extraction_ = token_buffer_.PastTheEndAllocId();
    flushes_since_extraction_ = 0;
  }

  int64_t max_timestamp() const { return append_max_ts_; }

 private:
  struct TimestampedEvent {
    enum class Type : uint8_t {
      kFtraceEvent,
      kTracePacket,
      kInlineSchedSwitch,
      kInlineSchedWaking,
      kJsonValue,
      kFuchsiaRecord,
      kTrackEvent,
      kSystraceLine,
      kMax = kSystraceLine,
    };

    // Number of bits required to store the max element in |Type|.
    static constexpr uint32_t kMaxTypeBits = 4;
    static_assert(static_cast<uint8_t>(Type::kMax) <= (1 << kMaxTypeBits),
                  "Max type does not fit inside storage");

    // The timestamp of this event.
    int64_t ts;

    // The fields of the BumpAllocator::AllocId for the tokenized object
    // corresponding to this event.
    uint64_t chunk_index : BumpAllocator::kChunkIndexAllocIdBits;
    uint64_t chunk_offset : BumpAllocator::kChunkOffsetAllocIdBits;

    // The type of this event. GCC7 does not like bit-field enums (see
    // https://stackoverflow.com/questions/36005063/gcc-suppress-warning-too-small-to-hold-all-values-of)
    // so use a uint64_t instead and cast to the enum type.
    uint64_t event_type : kMaxTypeBits;

    BumpAllocator::AllocId alloc_id() const {
      return BumpAllocator::AllocId{chunk_index, chunk_offset};
    }

    // For std::lower_bound().
    static inline bool Compare(const TimestampedEvent& x, int64_t ts) {
      return x.ts < ts;
    }

    // For std::sort().
    inline bool operator<(const TimestampedEvent& evt) const {
      return std::tie(ts, chunk_index, chunk_offset) <
             std::tie(evt.ts, evt.chunk_index, evt.chunk_offset);
    }
  };
  static_assert(sizeof(TimestampedEvent) == 16,
                "TimestampedEvent must be equal to 16 bytes");
  static_assert(std::is_trivially_copyable<TimestampedEvent>::value,
                "TimestampedEvent must be trivially copyable");
  static_assert(std::is_trivially_move_assignable<TimestampedEvent>::value,
                "TimestampedEvent must be trivially move assignable");
  static_assert(std::is_trivially_move_constructible<TimestampedEvent>::value,
                "TimestampedEvent must be trivially move constructible");
  static_assert(std::is_nothrow_swappable<TimestampedEvent>::value,
                "TimestampedEvent must be nothrow swappable");
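  // Illustrative example (timestamps made up, not from any real trace) of the
  // bookkeeping performed by Queue::Append() below: appending timestamps
  // 10, 20, 30, 15, 12, 40 leaves events_ as [10, 20, 30, 15, 12, 40]. The
  // first out-of-order event (15) sets sort_start_idx_ = 3 and
  // sort_min_ts_ = 15; the later out-of-order event (12) only lowers
  // sort_min_ts_ to 12. Sort() (defined in the .cc file) can therefore limit
  // re-sorting to the tail starting at, roughly, the first ordered entry with
  // ts >= sort_min_ts_ (found with a logarithmic bound search), instead of
  // re-sorting the whole queue.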
  struct Queue {
    void Append(int64_t ts,
                TimestampedEvent::Type type,
                TraceTokenBuffer::Id id) {
      {
        TimestampedEvent event;
        event.ts = ts;
        event.chunk_index = id.alloc_id.chunk_index;
        event.chunk_offset = id.alloc_id.chunk_offset;
        event.event_type = static_cast<uint8_t>(type);
        events_.emplace_back(std::move(event));
      }

      // Events are often seen in order.
      if (PERFETTO_LIKELY(ts >= max_ts_)) {
        max_ts_ = ts;
      } else {
        // The event is breaking ordering. The first time it happens, keep
        // track of which index we are at. We know that everything before that
        // is sorted (because events were pushed monotonically). Everything
        // after that index, instead, will need a sorting pass before moving
        // events to the next pipeline stage.
        if (sort_start_idx_ == 0) {
          PERFETTO_DCHECK(events_.size() >= 2);
          sort_start_idx_ = events_.size() - 1;
          sort_min_ts_ = ts;
        } else {
          sort_min_ts_ = std::min(sort_min_ts_, ts);
        }
      }

      min_ts_ = std::min(min_ts_, ts);
      PERFETTO_DCHECK(min_ts_ <= max_ts_);
    }

    bool needs_sorting() const { return sort_start_idx_ != 0; }
    void Sort();

    base::CircularQueue<TimestampedEvent> events_;
    int64_t min_ts_ = std::numeric_limits<int64_t>::max();
    int64_t max_ts_ = 0;
    size_t sort_start_idx_ = 0;
    int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
  };

  void SortAndExtractEventsUntilAllocId(BumpAllocator::AllocId alloc_id);

  inline Queue* GetQueue(size_t index) {
    if (PERFETTO_UNLIKELY(index >= queues_.size()))
      queues_.resize(index + 1);
    return &queues_[index];
  }

  inline void AppendNonFtraceEvent(int64_t ts,
                                   TimestampedEvent::Type event_type,
                                   TraceTokenBuffer::Id id) {
    Queue* queue = GetQueue(0);
    queue->Append(ts, event_type, id);
    UpdateAppendMaxTs(queue);
  }

  inline void UpdateAppendMaxTs(Queue* queue) {
    append_max_ts_ = std::max(append_max_ts_, queue->max_ts_);
  }

  void ParseTracePacket(const TimestampedEvent&);
  void ParseFtracePacket(uint32_t cpu, const TimestampedEvent&);

  void MaybeExtractEvent(size_t queue_idx, const TimestampedEvent&);
  void ExtractAndDiscardTokenizedObject(const TimestampedEvent& event);

  TraceTokenBuffer::Id GetTokenBufferId(const TimestampedEvent& event) {
    return TraceTokenBuffer::Id{event.alloc_id()};
  }

  TraceProcessorContext* context_ = nullptr;
  std::unique_ptr<TraceParser> parser_;

  // Whether we should ignore incremental extraction and just wait for
  // forced extraction at the end of the trace.
  SortingMode sorting_mode_ = SortingMode::kDefault;

  // Buffer for storing tokenized objects while the corresponding events are
  // being sorted.
  TraceTokenBuffer token_buffer_;

  // The AllocId until which events should be extracted. Set based
  // on the AllocId in |NotifyReadBufferEvent|.
  BumpAllocator::AllocId alloc_id_for_extraction_ =
      token_buffer_.PastTheEndAllocId();

  // The number of flushes which have happened since the last incremental
  // extraction.
  uint32_t flushes_since_extraction_ = 0;

  // queues_[0] is the general (non-ftrace) queue.
  // queues_[1] is the ftrace queue for CPU(0).
  // queues_[x] is the ftrace queue for CPU(x - 1).
  std::vector<Queue> queues_;

  // max(e.ts for e appended to the sorter)
  int64_t append_max_ts_ = 0;

  // Used for performance tests. True when setting
  // TRACE_PROCESSOR_SORT_ONLY=1.
  bool bypass_next_stage_for_testing_ = false;

  // max(e.ts for e pushed to next stage)
  int64_t latest_pushed_event_ts_ = std::numeric_limits<int64_t>::min();
};

}  // namespace trace_processor
}  // namespace perfetto

#endif  // SRC_TRACE_PROCESSOR_SORTER_TRACE_SORTER_H_