/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_TRACE_PROCESSOR_TRACE_SORTER_H_
#define SRC_TRACE_PROCESSOR_TRACE_SORTER_H_

#include <algorithm>
#include <cinttypes>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "perfetto/ext/base/circular_queue.h"
#include "perfetto/trace_processor/basic_types.h"
#include "src/trace_processor/importers/common/trace_blob_view.h"
#include "src/trace_processor/storage/trace_storage.h"
#include "src/trace_processor/timestamped_trace_piece.h"

namespace Json {
class Value;
}  // namespace Json

namespace perfetto {
namespace trace_processor {

class FuchsiaProviderView;
class PacketSequenceState;
class TraceParser;
struct SystraceLine;

// This class takes care of sorting events parsed from the trace stream in
// arbitrary order and pushing them to the next pipeline stages (parsing) in
// order. In order to support streaming use-cases, sorting happens within a
// max window. Events are held in the TraceSorter staging area (events_) until
// either: (1) the (max - min) timestamp delta exceeds window_size; or
// (2) trace EOF.
//
// This class is designed around the following assumptions:
// - Most events come from ftrace.
// - Ftrace events are sorted within each cpu most of the time.
//
// Due to this, this class operates as a streaming merge-sort of N+1 queues
// (N = num cpus + 1 for non-ftrace events). Each queue in turn gets sorted
// (if necessary) before proceeding with the global merge-sort-extract.
//
// When an event is pushed through, it is just appended to the end of one of
// the N queues. While appending, we keep track of whether the queue is still
// ordered or has just lost ordering. When an out-of-order event is detected
// on a queue, we keep track of: (1) the offset within the queue where the
// out-of-order run began, (2) the timestamp that broke the ordering.
//
// When we decide to extract events from the queues into the next stages of
// the trace processor, we re-sort the events in the queue. Rather than
// re-sorting everything every time, we use the above knowledge to restrict
// sorting to the (hopefully smaller) tail of the |events_| staging area.
// At any time, the first partition of |events_| [0 .. sort_start_idx_) is
// ordered, and the second partition [sort_start_idx_ .. end] is not.
// We use a logarithmic bound search operation to figure out the index within
// the first partition where sorting should start, and sort all events from
// there to the end.
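//
// A minimal usage sketch (illustrative only, not taken from this header; the
// names |parser|, |sequence_state|, |packet|, |event|, |cpu| and |ts| are
// placeholders, and in practice the sorter is driven by the trace tokenizers
// rather than called directly like this):
//
//   TraceSorter sorter(std::move(parser), /*window_size_ns=*/100000000);
//
//   // Ftrace events are appended per-cpu; extraction is only re-evaluated
//   // once the whole batch for that cpu has been finalized.
//   sorter.PushFtraceEvent(cpu, ts, std::move(event), sequence_state);
//   sorter.FinalizeFtraceEventBatch(cpu);
//
//   // Non-ftrace events go to queue 0 and may trigger extraction directly.
//   sorter.PushTracePacket(ts, sequence_state, std::move(packet));
//
//   // At trace EOF, flush all remaining events regardless of the window.
//   sorter.ExtractEventsForced();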
class TraceSorter {
 public:
  TraceSorter(std::unique_ptr<TraceParser> parser, int64_t window_size_ns);

  inline void PushTracePacket(int64_t timestamp,
                              PacketSequenceState* state,
                              TraceBlobView packet) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    auto* queue = GetQueue(0);
    queue->Append(TimestampedTracePiece(timestamp, packet_idx_++,
                                        std::move(packet),
                                        state->current_generation()));
    MaybeExtractEvents(queue);
  }

  inline void PushJsonValue(int64_t timestamp, std::string json_value) {
    auto* queue = GetQueue(0);
    queue->Append(TimestampedTracePiece(timestamp, packet_idx_++,
                                        std::move(json_value)));
    MaybeExtractEvents(queue);
  }

  inline void PushFuchsiaRecord(int64_t timestamp,
                                std::unique_ptr<FuchsiaRecord> record) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    auto* queue = GetQueue(0);
    queue->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(record)));
    MaybeExtractEvents(queue);
  }

  inline void PushSystraceLine(std::unique_ptr<SystraceLine> systrace_line) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    auto* queue = GetQueue(0);
    int64_t timestamp = systrace_line->ts;
    queue->Append(TimestampedTracePiece(timestamp, packet_idx_++,
                                        std::move(systrace_line)));
    MaybeExtractEvents(queue);
  }

  inline void PushFtraceEvent(uint32_t cpu,
                              int64_t timestamp,
                              TraceBlobView event,
                              PacketSequenceState* state) {
    set_ftrace_batch_cpu_for_DCHECK(cpu);
    GetQueue(cpu + 1)->Append(TimestampedTracePiece(
        timestamp, packet_idx_++,
        FtraceEventData{std::move(event), state->current_generation()}));

    // The caller must call FinalizeFtraceEventBatch() after having pushed a
    // batch of ftrace events. This amortizes the overhead of handling global
    // ordering: it is paid only once per batch, after all ftrace events for a
    // bundle have been pushed.
  }

  // As with |PushFtraceEvent|, this doesn't immediately sort the affected
  // queues.
  // TODO(rsavitski): if a trace has a mix of normal & "compact" events (the
  // latter being pushed through this function), the ftrace batches will no
  // longer be fully sorted by timestamp. In such situations, we will have to
  // sort at the end of the batch. However, we could do better, as both
  // sub-sequences are already sorted. Consider adding extra queues, or
  // pushing them in a merge-sort fashion instead.
  inline void PushInlineFtraceEvent(uint32_t cpu,
                                    int64_t timestamp,
                                    InlineSchedSwitch inline_sched_switch) {
    set_ftrace_batch_cpu_for_DCHECK(cpu);
    GetQueue(cpu + 1)->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_switch));
  }

  inline void PushInlineFtraceEvent(uint32_t cpu,
                                    int64_t timestamp,
                                    InlineSchedWaking inline_sched_waking) {
    set_ftrace_batch_cpu_for_DCHECK(cpu);
    GetQueue(cpu + 1)->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_waking));
  }

  inline void PushTrackEventPacket(int64_t timestamp,
                                   std::unique_ptr<TrackEventData> data) {
    auto* queue = GetQueue(0);
    queue->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(data)));
    MaybeExtractEvents(queue);
  }

  inline void FinalizeFtraceEventBatch(uint32_t cpu) {
    DCHECK_ftrace_batch_cpu(cpu);
    set_ftrace_batch_cpu_for_DCHECK(kNoBatch);
    MaybeExtractEvents(GetQueue(cpu + 1));
  }

  // Extract all events ignoring the window.
  void ExtractEventsForced() {
    SortAndExtractEventsBeyondWindow(/*window_size_ns=*/0);
    queues_.resize(0);
  }

  // Sets the window size to the specified value (which should be lower than
  // any previously specified window size) and flushes any data beyond this
  // window size.
  // It is undefined behaviour to call this function with a window size
  // greater than the current one.
  void SetWindowSizeNs(int64_t window_size_ns) {
    PERFETTO_DCHECK(window_size_ns <= window_size_ns_);

    PERFETTO_DLOG("Setting window size to be %" PRId64 " ns", window_size_ns);
    window_size_ns_ = window_size_ns;

    // Fast path: if, globally, we are within the window size, then just exit.
    if (global_max_ts_ - global_min_ts_ < window_size_ns)
      return;
    SortAndExtractEventsBeyondWindow(window_size_ns_);
  }

  int64_t max_timestamp() const { return global_max_ts_; }

 private:
  static constexpr uint32_t kNoBatch = std::numeric_limits<uint32_t>::max();

  struct Queue {
    inline void Append(TimestampedTracePiece ttp) {
      const int64_t timestamp = ttp.timestamp;
      events_.emplace_back(std::move(ttp));
      min_ts_ = std::min(min_ts_, timestamp);

      // Events are often seen in order.
      if (PERFETTO_LIKELY(timestamp >= max_ts_)) {
        max_ts_ = timestamp;
      } else {
        // The event breaks the ordering. The first time this happens, keep
        // track of which index we are at. We know that everything before
        // that index is sorted (because events were pushed monotonically).
        // Everything after that index, instead, will need a sorting pass
        // before moving events to the next pipeline stage.
        if (sort_start_idx_ == 0) {
          PERFETTO_DCHECK(events_.size() >= 2);
          sort_start_idx_ = events_.size() - 1;
          sort_min_ts_ = timestamp;
        } else {
          sort_min_ts_ = std::min(sort_min_ts_, timestamp);
        }
      }

      PERFETTO_DCHECK(min_ts_ <= max_ts_);
    }

    bool needs_sorting() const { return sort_start_idx_ != 0; }
    void Sort();

    base::CircularQueue<TimestampedTracePiece> events_;
    int64_t min_ts_ = std::numeric_limits<int64_t>::max();
    int64_t max_ts_ = 0;
    size_t sort_start_idx_ = 0;
    int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
  };

  // This method passes any events older than window_size_ns to the parser to
  // be parsed and then stored.
  void SortAndExtractEventsBeyondWindow(int64_t window_size_ns);

  inline Queue* GetQueue(size_t index) {
    if (PERFETTO_UNLIKELY(index >= queues_.size()))
      queues_.resize(index + 1);
    return &queues_[index];
  }

  inline void MaybeExtractEvents(Queue* queue) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    global_max_ts_ = std::max(global_max_ts_, queue->max_ts_);
    global_min_ts_ = std::min(global_min_ts_, queue->min_ts_);

    // Fast path: if, globally, we are within the window size, then just exit.
    if (global_max_ts_ - global_min_ts_ < window_size_ns_)
      return;
    SortAndExtractEventsBeyondWindow(window_size_ns_);
  }

  std::unique_ptr<TraceParser> parser_;

  // queues_[0] is the general (non-ftrace) queue.
  // queues_[1] is the ftrace queue for CPU(0).
  // queues_[x] is the ftrace queue for CPU(x - 1).
  std::vector<Queue> queues_;

  // Events are propagated to the next stage only after the (max - min)
  // timestamp delta is larger than this value.
  int64_t window_size_ns_;

  // max(e.timestamp for e in queues_).
  int64_t global_max_ts_ = 0;

  // min(e.timestamp for e in queues_).
  int64_t global_min_ts_ = std::numeric_limits<int64_t>::max();

  // Monotonically increasing value used to index timestamped trace pieces.
  uint64_t packet_idx_ = 0;

  // Used for performance tests. True when setting TRACE_PROCESSOR_SORT_ONLY=1.
  bool bypass_next_stage_for_testing_ = false;

#if PERFETTO_DCHECK_IS_ON()
  // Used only for DCHECK-ing that FinalizeFtraceEventBatch() is called.
  uint32_t ftrace_batch_cpu_ = kNoBatch;

  inline void DCHECK_ftrace_batch_cpu(uint32_t cpu) {
    PERFETTO_DCHECK(ftrace_batch_cpu_ == kNoBatch || ftrace_batch_cpu_ == cpu);
  }

  inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t cpu) {
    PERFETTO_DCHECK(ftrace_batch_cpu_ == cpu || ftrace_batch_cpu_ == kNoBatch ||
                    cpu == kNoBatch);
    ftrace_batch_cpu_ = cpu;
  }
#else
  inline void DCHECK_ftrace_batch_cpu(uint32_t) {}
  inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t) {}
#endif
};

}  // namespace trace_processor
}  // namespace perfetto

#endif  // SRC_TRACE_PROCESSOR_TRACE_SORTER_H_