/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_TRACE_PROCESSOR_TRACE_SORTER_H_
#define SRC_TRACE_PROCESSOR_TRACE_SORTER_H_

#include <algorithm>
#include <limits>
#include <memory>
#include <utility>
#include <vector>

#include "perfetto/base/build_config.h"
#include "perfetto/base/circular_queue.h"
#include "perfetto/base/logging.h"
#include "perfetto/trace_processor/basic_types.h"
#include "src/trace_processor/fuchsia_provider_view.h"
#include "src/trace_processor/proto_incremental_state.h"
#include "src/trace_processor/trace_blob_view.h"
#include "src/trace_processor/trace_processor_context.h"
#include "src/trace_processor/trace_storage.h"

#if PERFETTO_BUILDFLAG(PERFETTO_STANDALONE_BUILD)
#include <json/value.h>
#else
// Json traces are only supported in the standalone build.
namespace Json {
class Value {};
}  // namespace Json
#endif

namespace perfetto {
namespace trace_processor {

// This class takes care of sorting events parsed from the trace stream in
// arbitrary order and pushing them to the next pipeline stages (parsing) in
// order. To support streaming use-cases, sorting happens within a max window:
// events are held in the TraceSorter staging area (events_) until either
// (1) the (max - min) timestamp span exceeds window_size; or (2) trace EOF.
//
// This class is designed around the assumption that:
// - Most events come from ftrace.
// - Ftrace events are sorted within each CPU most of the time.
//
// Due to this, this class operates as a streaming merge-sort of N+1 queues
// (N = number of CPUs, + 1 for non-ftrace events). Each queue in turn gets
// sorted (if necessary) before proceeding with the global merge-sort-extract.
//
// When an event is pushed through, it is just appended to the end of one of
// the N queues. While appending, we keep track of whether the queue is still
// ordered or has just lost ordering. When an out-of-order event is detected
// on a queue we keep track of: (1) the offset within the queue where the
// ordering broke; (2) the timestamp that broke the ordering.
//
// When we decide to extract events from the queues into the next stages of
// the trace processor, we re-sort the events in the queue. Rather than
// re-sorting everything every time, we use the above knowledge to restrict
// sorting to the (hopefully smaller) tail of the |events_| staging area.
// At any time, the first partition of |events_| [0 .. sort_start_idx_) is
// ordered, and the second partition [sort_start_idx_ .. end] is not.
// We use a logarithmic bound search (std::lower_bound) to figure out the
// index within the first partition where sorting should start, and sort all
// events from there to the end.
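// A minimal usage sketch (illustrative only, not part of the original
// header): a hypothetical caller owning a TraceProcessorContext could drive
// the sorter as follows. The 500ms window and the variable names are
// assumptions made for illustration.
//
//   TraceSorter sorter(context, /*window_size_ns=*/500 * 1000 * 1000);
//   sorter.PushTracePacket(ts, std::move(packet));      // non-ftrace queue
//   sorter.PushFtraceEvent(cpu, ts, std::move(event));  // per-CPU queue
//   sorter.FinalizeFtraceEventBatch(cpu);               // end of a bundle
//   ...
//   sorter.ExtractEventsForced();  // at trace EOF, flush everything.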
class TraceSorter {
 public:
  struct TimestampedTracePiece {
    TimestampedTracePiece(int64_t ts, uint64_t idx, TraceBlobView tbv)
        : TimestampedTracePiece(ts,
                                /*thread_ts=*/0,
                                idx,
                                std::move(tbv),
                                /*value=*/nullptr,
                                /*fpv=*/nullptr,
                                /*sequence_state=*/nullptr) {}

    TimestampedTracePiece(int64_t ts,
                          uint64_t idx,
                          std::unique_ptr<Json::Value> value)
        : TimestampedTracePiece(ts,
                                /*thread_ts=*/0,
                                idx,
                                // TODO(dproy): Stop requiring TraceBlobView in
                                // TimestampedTracePiece.
                                TraceBlobView(nullptr, 0, 0),
                                std::move(value),
                                /*fpv=*/nullptr,
                                /*sequence_state=*/nullptr) {}

    TimestampedTracePiece(int64_t ts,
                          uint64_t idx,
                          TraceBlobView tbv,
                          std::unique_ptr<FuchsiaProviderView> fpv)
        : TimestampedTracePiece(ts,
                                /*thread_ts=*/0,
                                idx,
                                std::move(tbv),
                                /*value=*/nullptr,
                                std::move(fpv),
                                /*sequence_state=*/nullptr) {}

    TimestampedTracePiece(
        int64_t ts,
        int64_t thread_ts,
        uint64_t idx,
        TraceBlobView tbv,
        ProtoIncrementalState::PacketSequenceState* sequence_state)
        : TimestampedTracePiece(ts,
                                thread_ts,
                                idx,
                                std::move(tbv),
                                /*value=*/nullptr,
                                /*fpv=*/nullptr,
                                sequence_state) {}

    TimestampedTracePiece(
        int64_t ts,
        int64_t thread_ts,
        uint64_t idx,
        TraceBlobView tbv,
        std::unique_ptr<Json::Value> value,
        std::unique_ptr<FuchsiaProviderView> fpv,
        ProtoIncrementalState::PacketSequenceState* sequence_state)
        : json_value(std::move(value)),
          fuchsia_provider_view(std::move(fpv)),
          packet_sequence_state(sequence_state),
          timestamp(ts),
          thread_timestamp(thread_ts),
          packet_idx_(idx),
          blob_view(std::move(tbv)) {}

    TimestampedTracePiece(TimestampedTracePiece&&) noexcept = default;
    TimestampedTracePiece& operator=(TimestampedTracePiece&&) = default;

    // For std::lower_bound().
    static inline bool Compare(const TimestampedTracePiece& x, int64_t ts) {
      return x.timestamp < ts;
    }
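    // Illustrative note (not in the original source): Compare() lets the
    // sorter binary-search the ordered prefix of a queue. Assuming
    // base::CircularQueue exposes random-access iterators, Queue::Sort()
    // could locate the first element that needs re-sorting with:
    //
    //   auto begin_it = std::lower_bound(events_.begin(), events_.end(),
    //                                    sort_min_ts_,
    //                                    &TimestampedTracePiece::Compare);
    //
    // Everything before |begin_it| is already ordered and is left untouched.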
    // For std::sort().
    inline bool operator<(const TimestampedTracePiece& o) const {
      return timestamp < o.timestamp ||
             (timestamp == o.timestamp && packet_idx_ < o.packet_idx_);
    }

    std::unique_ptr<Json::Value> json_value;
    std::unique_ptr<FuchsiaProviderView> fuchsia_provider_view;
    ProtoIncrementalState::PacketSequenceState* packet_sequence_state;

    int64_t timestamp;
    int64_t thread_timestamp;
    uint64_t packet_idx_;
    TraceBlobView blob_view;
  };

  TraceSorter(TraceProcessorContext*, int64_t window_size_ns);

  inline void PushTracePacket(int64_t timestamp, TraceBlobView packet) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    auto* queue = GetQueue(0);
    queue->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(packet)));
    MaybeExtractEvents(queue);
  }

  inline void PushJsonValue(int64_t timestamp,
                            std::unique_ptr<Json::Value> json_value) {
    auto* queue = GetQueue(0);
    queue->Append(TimestampedTracePiece(timestamp, packet_idx_++,
                                        std::move(json_value)));
    MaybeExtractEvents(queue);
  }

  inline void PushFuchsiaRecord(
      int64_t timestamp,
      TraceBlobView record,
      std::unique_ptr<FuchsiaProviderView> provider_view) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    auto* queue = GetQueue(0);
    queue->Append(TimestampedTracePiece(timestamp, packet_idx_++,
                                        std::move(record),
                                        std::move(provider_view)));
    MaybeExtractEvents(queue);
  }

  inline void PushFtraceEvent(uint32_t cpu,
                              int64_t timestamp,
                              TraceBlobView event) {
    set_ftrace_batch_cpu_for_DCHECK(cpu);
    GetQueue(cpu + 1)->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(event)));

    // The caller must call FinalizeFtraceEventBatch() after having pushed a
    // batch of ftrace events. This amortizes the overhead of handling global
    // ordering, doing it in batches only after all ftrace events for a
    // bundle have been pushed.
  }

  inline void PushTrackEventPacket(
      int64_t timestamp,
      int64_t thread_time,
      ProtoIncrementalState::PacketSequenceState* state,
      TraceBlobView packet) {
    auto* queue = GetQueue(0);
    queue->Append(TimestampedTracePiece(timestamp, thread_time, packet_idx_++,
                                        std::move(packet), state));
    MaybeExtractEvents(queue);
  }

  inline void FinalizeFtraceEventBatch(uint32_t cpu) {
    DCHECK_ftrace_batch_cpu(cpu);
    set_ftrace_batch_cpu_for_DCHECK(kNoBatch);
    MaybeExtractEvents(GetQueue(cpu + 1));
  }
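  // Illustrative example (not part of the original header): a parser pushing
  // an ftrace bundle for CPU 2 would typically do:
  //
  //   sorter.PushFtraceEvent(2, ts0, std::move(ev0));
  //   sorter.PushFtraceEvent(2, ts1, std::move(ev1));
  //   sorter.FinalizeFtraceEventBatch(2);
  //
  // so that the window check in MaybeExtractEvents() runs once per bundle
  // rather than once per event.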
  // Extract all events, ignoring the window.
  void ExtractEventsForced() {
    SortAndExtractEventsBeyondWindow(/*window_size_ns=*/0);
  }

  void set_window_ns_for_testing(int64_t window_size_ns) {
    window_size_ns_ = window_size_ns;
  }

 private:
  static constexpr uint32_t kNoBatch = std::numeric_limits<uint32_t>::max();

  struct Queue {
    inline void Append(TimestampedTracePiece ttp) {
      const int64_t timestamp = ttp.timestamp;
      events_.emplace_back(std::move(ttp));
      min_ts_ = std::min(min_ts_, timestamp);

      // Events are often seen in order.
      if (PERFETTO_LIKELY(timestamp >= max_ts_)) {
        max_ts_ = timestamp;
      } else {
        // The event breaks ordering. The first time this happens, keep track
        // of the index we are at. We know that everything before that index
        // is sorted (because events were pushed monotonically). Everything
        // after that index, instead, will need a sorting pass before moving
        // events to the next pipeline stage.
        if (sort_start_idx_ == 0) {
          PERFETTO_DCHECK(events_.size() >= 2);
          sort_start_idx_ = events_.size() - 1;
          sort_min_ts_ = timestamp;
        } else {
          sort_min_ts_ = std::min(sort_min_ts_, timestamp);
        }
      }

      PERFETTO_DCHECK(min_ts_ <= max_ts_);
    }

    bool needs_sorting() const { return sort_start_idx_ != 0; }
    void Sort();

    base::CircularQueue<TimestampedTracePiece> events_;
    int64_t min_ts_ = std::numeric_limits<int64_t>::max();
    int64_t max_ts_ = 0;
    size_t sort_start_idx_ = 0;
    int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
  };

  // This method passes any events older than window_size_ns to the
  // parser to be parsed and then stored.
  void SortAndExtractEventsBeyondWindow(int64_t window_size_ns);

  inline Queue* GetQueue(size_t index) {
    if (PERFETTO_UNLIKELY(index >= queues_.size()))
      queues_.resize(index + 1);
    return &queues_[index];
  }

  inline void MaybeExtractEvents(Queue* queue) {
    DCHECK_ftrace_batch_cpu(kNoBatch);
    global_max_ts_ = std::max(global_max_ts_, queue->max_ts_);
    global_min_ts_ = std::min(global_min_ts_, queue->min_ts_);

    if (global_max_ts_ - global_min_ts_ < window_size_ns_)
      return;

    SortAndExtractEventsBeyondWindow(window_size_ns_);
  }

  TraceProcessorContext* const context_;

  // queues_[0] is the general (non-ftrace) queue.
  // queues_[1] is the ftrace queue for CPU(0).
  // queues_[x] is the ftrace queue for CPU(x - 1).
  std::vector<Queue> queues_;

  // Events are propagated to the next stage only after the (max - min)
  // timestamp span is larger than this value.
  int64_t window_size_ns_;

  // max(e.timestamp for e in queues_).
  int64_t global_max_ts_ = 0;

  // min(e.timestamp for e in queues_).
  int64_t global_min_ts_ = std::numeric_limits<int64_t>::max();

  // Monotonically increasing value used to index timestamped trace pieces.
  uint64_t packet_idx_ = 0;

  // Used for performance tests. True when setting TRACE_PROCESSOR_SORT_ONLY=1.
  bool bypass_next_stage_for_testing_ = false;
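  // Worked example (illustrative, not from the original source): with
  // window_size_ns_ = 100, pushing events with timestamps {10, 50, 120}
  // yields global_min_ts_ = 10 and global_max_ts_ = 120. Since
  // 120 - 10 >= 100, MaybeExtractEvents() calls
  // SortAndExtractEventsBeyondWindow(100), which passes to the parser the
  // events falling outside the [global_max_ts_ - 100, global_max_ts_]
  // window (here, the event at ts = 10).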
#if PERFETTO_DCHECK_IS_ON()
  // Used only for DCHECK-ing that FinalizeFtraceEventBatch() is called.
  uint32_t ftrace_batch_cpu_ = kNoBatch;

  inline void DCHECK_ftrace_batch_cpu(uint32_t cpu) {
    PERFETTO_DCHECK(ftrace_batch_cpu_ == kNoBatch || ftrace_batch_cpu_ == cpu);
  }

  inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t cpu) {
    PERFETTO_DCHECK(ftrace_batch_cpu_ == cpu || ftrace_batch_cpu_ == kNoBatch ||
                    cpu == kNoBatch);
    ftrace_batch_cpu_ = cpu;
  }
#else
  inline void DCHECK_ftrace_batch_cpu(uint32_t) {}
  inline void set_ftrace_batch_cpu_for_DCHECK(uint32_t) {}
#endif
};

}  // namespace trace_processor
}  // namespace perfetto

#endif  // SRC_TRACE_PROCESSOR_TRACE_SORTER_H_