/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_TRACE_PROCESSOR_TRACE_SORTER_H_
#define SRC_TRACE_PROCESSOR_TRACE_SORTER_H_

#include <algorithm>
#include <cinttypes>
#include <limits>
#include <memory>
#include <utility>
#include <vector>

#include "perfetto/ext/base/circular_queue.h"
#include "perfetto/trace_processor/basic_types.h"
#include "src/trace_processor/storage/trace_storage.h"
#include "src/trace_processor/timestamped_trace_piece.h"
#include "src/trace_processor/trace_blob_view.h"

namespace Json {
class Value;
}  // namespace Json

namespace perfetto {
namespace trace_processor {

class FuchsiaProviderView;
class PacketSequenceState;
struct SystraceLine;

// This class takes care of sorting events parsed from the trace stream in
// arbitrary order and pushing them to the next pipeline stages (parsing) in
// order. In order to support streaming use-cases, sorting happens within a
// max window. Events are held in the TraceSorter staging area (events_) until
// either (1) the (max - min) timestamp spread exceeds window_size, or (2) the
// trace reaches EOF.
//
// This class is designed around the following assumptions:
// - Most events come from ftrace.
// - Ftrace events are sorted within each cpu most of the time.
//
// Due to this, this class operates as a streaming merge-sort of N+1 queues
// (N = number of cpus, plus one queue for non-ftrace events). Each queue is
// in turn sorted (if necessary) before proceeding with the global
// merge-sort-extract.
// When an event is pushed through, it is just appended to the end of one of
// the N+1 queues. While appending, we keep track of whether the queue is
// still ordered or has just lost ordering. When an out-of-order event is
// detected on a queue we keep track of: (1) the offset within the queue where
// the ordering broke, (2) the timestamp that broke the ordering.
// When we decide to extract events from the queues into the next stages of
// the trace processor, we re-sort the events in the queue. Rather than
// re-sorting everything every time, we use the above knowledge to restrict
// sorting to the (hopefully smaller) tail of the |events_| staging area.
// At any time, the first partition of |events_| [0 .. sort_start_idx_) is
// ordered, and the second partition [sort_start_idx_ .. end] is not.
// We use a logarithmic bound search (std::lower_bound) to figure out the
// index within the first partition where sorting should start, and sort all
// events from there to the end.
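//
// To make the partial re-sort concrete, the body of Queue::Sort() (defined in
// trace_sorter.cc) can be sketched roughly as follows. This is an
// illustrative sketch only, assuming TimestampedTracePiece orders by
// timestamp (then packet index) via operator<; the actual implementation may
// differ in its details:
//
//   void TraceSorter::Queue::Sort() {
//     PERFETTO_DCHECK(needs_sorting());
//     // [0 .. sort_start_idx_) is known to be sorted. Binary-search it for
//     // the first element that can be affected by the out-of-order tail,
//     // i.e. the first element with timestamp >= sort_min_ts_.
//     auto sorted_end =
//         events_.begin() + static_cast<ptrdiff_t>(sort_start_idx_);
//     auto sort_begin = std::lower_bound(
//         events_.begin(), sorted_end, sort_min_ts_,
//         [](const TimestampedTracePiece& ttp, int64_t ts) {
//           return ttp.timestamp < ts;
//         });
//     // Re-sort only from that point to the end of the staging area.
//     std::sort(sort_begin, events_.end());
//     sort_start_idx_ = 0;
//     sort_min_ts_ = std::numeric_limits<int64_t>::max();
//   }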
class TraceSorter {
 public:
  TraceSorter(std::unique_ptr<TraceParser> parser, int64_t window_size_ns);

  inline void PushTracePacket(int64_t timestamp,
                              PacketSequenceState* state,
                              TraceBlobView packet) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    auto* queue = GetQueue(0);
    queue->Append(TimestampedTracePiece(timestamp, packet_idx_++,
                                        std::move(packet),
                                        state->current_generation()));
    MaybeExtractEvents(queue);
  }

  inline void PushJsonValue(int64_t timestamp,
                            std::unique_ptr<Json::Value> json_value) {
    auto* queue = GetQueue(0);
    queue->Append(TimestampedTracePiece(timestamp, packet_idx_++,
                                        std::move(json_value)));
    MaybeExtractEvents(queue);
  }

  inline void PushFuchsiaRecord(int64_t timestamp,
                                std::unique_ptr<FuchsiaRecord> record) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    auto* queue = GetQueue(0);
    queue->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(record)));
    MaybeExtractEvents(queue);
  }

  inline void PushSystraceLine(std::unique_ptr<SystraceLine> systrace_line) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    auto* queue = GetQueue(0);
    int64_t timestamp = systrace_line->ts;
    queue->Append(TimestampedTracePiece(timestamp, packet_idx_++,
                                        std::move(systrace_line)));
    MaybeExtractEvents(queue);
  }

  inline void PushFtraceEvent(uint32_t cpu,
                              int64_t timestamp,
                              TraceBlobView event) {
    set_ftrace_batch_cpu_for_DCHECK(cpu);
    GetQueue(cpu + 1)->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(event)));

    // The caller must call FinalizeFtraceEventBatch() after having pushed a
    // batch of ftrace events. This amortizes the cost of maintaining global
    // ordering: it is handled in batches, only after all ftrace events for a
    // bundle have been pushed.
  }
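
  // For example, a caller tokenizing one ftrace bundle for a given |cpu| is
  // expected to follow this pattern (illustrative call site only; |sorter|,
  // |timestamps| and |blobs| are placeholder names, not part of this API):
  //
  //   for (size_t i = 0; i < timestamps.size(); i++)
  //     sorter->PushFtraceEvent(cpu, timestamps[i], std::move(blobs[i]));
  //   // Global ordering is re-evaluated only once, for the whole bundle.
  //   sorter->FinalizeFtraceEventBatch(cpu);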
  // As with |PushFtraceEvent|, doesn't immediately sort the affected queues.
  // TODO(rsavitski): if a trace has a mix of normal & "compact" events (being
  // pushed through this function), the ftrace batches will no longer be fully
  // sorted by timestamp. In such situations, we will have to sort at the end
  // of the batch. We could do better though, since both sub-sequences are
  // already sorted: consider adding extra queues, or pushing events in a
  // merge-sort fashion instead.
  inline void PushInlineFtraceEvent(uint32_t cpu,
                                    int64_t timestamp,
                                    InlineSchedSwitch inline_sched_switch) {
    set_ftrace_batch_cpu_for_DCHECK(cpu);
    GetQueue(cpu + 1)->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_switch));
  }

  inline void PushInlineFtraceEvent(uint32_t cpu,
                                    int64_t timestamp,
                                    InlineSchedWaking inline_sched_waking) {
    set_ftrace_batch_cpu_for_DCHECK(cpu);
    GetQueue(cpu + 1)->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_waking));
  }

  inline void PushTrackEventPacket(int64_t timestamp,
                                   std::unique_ptr<TrackEventData> data) {
    auto* queue = GetQueue(0);
    queue->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(data)));
    MaybeExtractEvents(queue);
  }

  inline void FinalizeFtraceEventBatch(uint32_t cpu) {
    DCHECK_ftrace_batch_cpu(cpu);
    set_ftrace_batch_cpu_for_DCHECK(kNoBatch);
    MaybeExtractEvents(GetQueue(cpu + 1));
  }

  // Extract all events ignoring the window.
  void ExtractEventsForced() {
    SortAndExtractEventsBeyondWindow(/*window_size_ns=*/0);
    queues_.resize(0);
  }

  // Sets the window size to the specified value (which should be lower than
  // any previously specified window size) and flushes any data beyond this
  // window size. Calling this function with a window size greater than the
  // current one is undefined behaviour.
  void SetWindowSizeNs(int64_t window_size_ns) {
    PERFETTO_DCHECK(window_size_ns <= window_size_ns_);

    PERFETTO_DLOG("Setting window size to be %" PRId64 " ns", window_size_ns);
    window_size_ns_ = window_size_ns;

    // Fast path: if, globally, we are within the window size, then just exit.
    if (global_max_ts_ - global_min_ts_ < window_size_ns)
      return;
    SortAndExtractEventsBeyondWindow(window_size_ns_);
  }
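
  // To make the windowing arithmetic concrete (numbers purely illustrative):
  // with window_size_ns_ = 500,000,000 (0.5s), if the oldest staged event has
  // timestamp 1.0s (global_min_ts_) and a newly pushed event has timestamp
  // 1.6s (global_max_ts_), the spread of 0.6s exceeds the window, so
  // SortAndExtractEventsBeyondWindow() kicks in: events older than roughly
  // global_max_ts_ - window_size_ns_ (i.e. 1.1s) are sorted and handed to the
  // parser, while the most recent 0.5s worth of events stays staged in case
  // late events for that range still arrive.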
  int64_t max_timestamp() const { return global_max_ts_; }

 private:
  static constexpr uint32_t kNoBatch = std::numeric_limits<uint32_t>::max();

  struct Queue {
    inline void Append(TimestampedTracePiece ttp) {
      const int64_t timestamp = ttp.timestamp;
      events_.emplace_back(std::move(ttp));
      min_ts_ = std::min(min_ts_, timestamp);

      // Events are often seen in order.
      if (PERFETTO_LIKELY(timestamp >= max_ts_)) {
        max_ts_ = timestamp;
      } else {
        // The event breaks the ordering. The first time this happens, keep
        // track of which index we are at. We know that everything before
        // that index is sorted (because events were pushed monotonically).
        // Everything after that index, instead, will need a sorting pass
        // before moving events to the next pipeline stage.
        if (sort_start_idx_ == 0) {
          PERFETTO_DCHECK(events_.size() >= 2);
          sort_start_idx_ = events_.size() - 1;
          sort_min_ts_ = timestamp;
        } else {
          sort_min_ts_ = std::min(sort_min_ts_, timestamp);
        }
      }

      PERFETTO_DCHECK(min_ts_ <= max_ts_);
    }

    bool needs_sorting() const { return sort_start_idx_ != 0; }
    void Sort();

    base::CircularQueue<TimestampedTracePiece> events_;
    int64_t min_ts_ = std::numeric_limits<int64_t>::max();
    int64_t max_ts_ = 0;
    size_t sort_start_idx_ = 0;
    int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
  };

  // This method passes any events older than window_size_ns to the parser to
  // be parsed and then stored.
  void SortAndExtractEventsBeyondWindow(int64_t window_size_ns);

  inline Queue* GetQueue(size_t index) {
    if (PERFETTO_UNLIKELY(index >= queues_.size()))
      queues_.resize(index + 1);
    return &queues_[index];
  }

  inline void MaybeExtractEvents(Queue* queue) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    global_max_ts_ = std::max(global_max_ts_, queue->max_ts_);
    global_min_ts_ = std::min(global_min_ts_, queue->min_ts_);

    // Fast path: if, globally, we are within the window size, then just exit.
    if (global_max_ts_ - global_min_ts_ < window_size_ns_)
      return;
    SortAndExtractEventsBeyondWindow(window_size_ns_);
  }

  std::unique_ptr<TraceParser> parser_;

  // queues_[0] is the general (non-ftrace) queue.
  // queues_[1] is the ftrace queue for CPU(0).
  // queues_[x] is the ftrace queue for CPU(x - 1).
  std::vector<Queue> queues_;

  // Events are propagated to the next stage only after the (max - min)
  // timestamp spread is larger than this value.
  int64_t window_size_ns_;

  // max(e.timestamp for e in queues_).
  int64_t global_max_ts_ = 0;

  // min(e.timestamp for e in queues_).
  int64_t global_min_ts_ = std::numeric_limits<int64_t>::max();

  // Monotonically increasing value used to index timestamped trace pieces.
  uint64_t packet_idx_ = 0;

  // Used for performance tests. True when setting TRACE_PROCESSOR_SORT_ONLY=1.
  bool bypass_next_stage_for_testing_ = false;

#if PERFETTO_DCHECK_IS_ON()
  // Used only for DCHECK-ing that FinalizeFtraceEventBatch() is called.
  uint32_t ftrace_batch_cpu_ = kNoBatch;

  inline void DCHECK_ftrace_batch_cpu(uint32_t cpu) {
    PERFETTO_DCHECK(ftrace_batch_cpu_ == kNoBatch || ftrace_batch_cpu_ == cpu);
  }

  inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t cpu) {
    PERFETTO_DCHECK(ftrace_batch_cpu_ == cpu || ftrace_batch_cpu_ == kNoBatch ||
                    cpu == kNoBatch);
    ftrace_batch_cpu_ = cpu;
  }
#else
  inline void DCHECK_ftrace_batch_cpu(uint32_t) {}
  inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t) {}
#endif
};

}  // namespace trace_processor
}  // namespace perfetto

#endif  // SRC_TRACE_PROCESSOR_TRACE_SORTER_H_