/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_TRACE_PROCESSOR_TRACE_SORTER_H_
#define SRC_TRACE_PROCESSOR_TRACE_SORTER_H_

#include <algorithm>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "perfetto/ext/base/circular_queue.h"
#include "perfetto/trace_processor/basic_types.h"
#include "perfetto/trace_processor/trace_blob_view.h"
#include "src/trace_processor/timestamped_trace_piece.h"

namespace perfetto {
namespace trace_processor {

class PacketSequenceState;
class TraceParser;
class TraceProcessorContext;
struct SystraceLine;

// This class takes care of sorting events parsed from the trace stream in
// arbitrary order and pushing them to the next pipeline stage (parsing) in
// order. To support streaming use-cases, sorting happens within a window.
//
// Events are held in the TraceSorter staging area (events_) until either:
// 1. We can determine that it's safe to extract events, by observing
//    TracingServiceEvent Flush and ReadBuffer events.
// 2. The trace EOF is reached.
//
// Incremental extraction
//
// Incremental extraction happens by using a combination of flush and read
// buffer events from the tracing service (see the usage sketch at the end of
// this file). Note that incremental extraction is only applicable for
// write_into_file traces; ring-buffer traces are implicitly sorted fully
// in-memory because there is only a single read buffer call at the end.
//
// The algorithm for incremental extraction is explained in detail at
// go/trace-sorting-is-complicated.
//
// Sorting algorithm
//
// The sorting algorithm is designed around the assumption that:
// - Most events come from ftrace.
// - Ftrace events are sorted within each cpu most of the time.
//
// Because of this, this class operates as a streaming merge-sort of N+1
// queues (N = num cpus + 1 for non-ftrace events). Each queue in turn gets
// sorted (if necessary) before proceeding with the global merge-sort-extract.
//
// When an event is pushed through, it is just appended to the end of one of
// the N queues. While appending, we keep track of whether the queue is still
// ordered or has just lost ordering. When an out-of-order event is detected
// on a queue we keep track of: (1) the offset within the queue where the
// chaos began, (2) the timestamp that broke the ordering.
//
// When we decide to extract events from the queues into the next stages of
// the trace processor, we re-sort the events in the queue. Rather than
// re-sorting everything every time, we use the above knowledge to restrict
// sorting to the (hopefully smaller) tail of the |events_| staging area.
// At any time, the first partition of |events_| [0 .. sort_start_idx_) is
// ordered, and the second partition [sort_start_idx_ .. end] is not.
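//
// As a purely illustrative example (not taken from any real trace), suppose a
// queue receives events with timestamps [10, 20, 30, 25, 40, 35]:
// - 10, 20 and 30 arrive in order, so the queue stays fully sorted.
// - 25 breaks the ordering: sort_start_idx_ is set to its index (3) and
//   sort_min_ts_ to 25.
// - 40 arrives in order again (it only raises max_ts_), while 35 is another
//   out-of-order event and keeps sort_min_ts_ at min(25, 35) = 25.
// When extracting, only the tail starting at the first event whose timestamp
// is not below sort_min_ts_ (here 30, at index 2) has to be re-sorted;
// [10, 20] is left untouched.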
// We use a logarithmic bound search operation to figure out what is the
// index within the first partition where sorting should start, and sort all
// events from there to the end.
class TraceSorter {
 public:
  enum class SortingMode {
    kDefault,
    kFullSort,
  };

  TraceSorter(TraceProcessorContext* context,
              std::unique_ptr<TraceParser> parser,
              SortingMode);

  inline void PushTracePacket(int64_t timestamp,
                              PacketSequenceState* state,
                              TraceBlobView packet) {
    AppendNonFtraceEvent(TimestampedTracePiece(timestamp, packet_idx_++,
                                               std::move(packet),
                                               state->current_generation()));
  }

  inline void PushJsonValue(int64_t timestamp, std::string json_value) {
    AppendNonFtraceEvent(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(json_value)));
  }

  inline void PushFuchsiaRecord(int64_t timestamp,
                                std::unique_ptr<FuchsiaRecord> record) {
    AppendNonFtraceEvent(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(record)));
  }

  inline void PushSystraceLine(std::unique_ptr<SystraceLine> systrace_line) {
    int64_t timestamp = systrace_line->ts;
    AppendNonFtraceEvent(TimestampedTracePiece(timestamp, packet_idx_++,
                                               std::move(systrace_line)));
  }

  inline void PushTrackEventPacket(int64_t timestamp,
                                   std::unique_ptr<TrackEventData> data) {
    AppendNonFtraceEvent(
        TimestampedTracePiece(timestamp, packet_idx_++, std::move(data)));
  }

  inline void PushFtraceEvent(uint32_t cpu,
                              int64_t timestamp,
                              TraceBlobView event,
                              PacketSequenceState* state) {
    auto* queue = GetQueue(cpu + 1);
    queue->Append(TimestampedTracePiece(
        timestamp, packet_idx_++,
        FtraceEventData{std::move(event), state->current_generation()}));
    UpdateGlobalTs(queue);
  }

  inline void PushInlineFtraceEvent(uint32_t cpu,
                                    int64_t timestamp,
                                    InlineSchedSwitch inline_sched_switch) {
    // TODO(rsavitski): if a trace has a mix of normal & "compact" events
    // (being pushed through this function), the ftrace batches will no longer
    // be fully sorted by timestamp. In such situations, we will have to sort
    // at the end of the batch. We can do better, however, as both
    // sub-sequences are sorted. Consider adding extra queues, or pushing them
    // in a merge-sort fashion instead.
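    // Inline events are appended to the same per-CPU queue as regular
    // (TraceBlobView-backed) ftrace events pushed via PushFtraceEvent above,
    // rather than getting a queue of their own.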
    auto* queue = GetQueue(cpu + 1);
    queue->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_switch));
    UpdateGlobalTs(queue);
  }

  inline void PushInlineFtraceEvent(uint32_t cpu,
                                    int64_t timestamp,
                                    InlineSchedWaking inline_sched_waking) {
    auto* queue = GetQueue(cpu + 1);
    queue->Append(
        TimestampedTracePiece(timestamp, packet_idx_++, inline_sched_waking));
    UpdateGlobalTs(queue);
  }

  void ExtractEventsForced() {
    SortAndExtractEventsUntilPacket(packet_idx_);
    queues_.resize(0);

    packet_idx_for_extraction_ = packet_idx_;
    flushes_since_extraction_ = 0;
  }

  void NotifyFlushEvent() { flushes_since_extraction_++; }

  void NotifyReadBufferEvent() {
    if (sorting_mode_ == SortingMode::kFullSort ||
        flushes_since_extraction_ < 2) {
      return;
    }

    SortAndExtractEventsUntilPacket(packet_idx_for_extraction_);
    packet_idx_for_extraction_ = packet_idx_;
    flushes_since_extraction_ = 0;
  }

  int64_t max_timestamp() const { return global_max_ts_; }

 private:
  static constexpr uint32_t kNoBatch = std::numeric_limits<uint32_t>::max();

  struct Queue {
    inline void Append(TimestampedTracePiece ttp) {
      const int64_t timestamp = ttp.timestamp;
      events_.emplace_back(std::move(ttp));
      min_ts_ = std::min(min_ts_, timestamp);

      // Events are often seen in order.
      if (PERFETTO_LIKELY(timestamp >= max_ts_)) {
        max_ts_ = timestamp;
      } else {
        // The event is breaking ordering. The first time it happens, keep
        // track of which index we are at. We know that everything before that
        // is sorted (because events were pushed monotonically). Everything
        // after that index, instead, will need a sorting pass before moving
        // events to the next pipeline stage.
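        // A sort_start_idx_ of 0 doubles as "no out-of-order event seen yet":
        // the queue only records a sort point once at least two events exist,
        // which is what the DCHECK below asserts.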
        if (sort_start_idx_ == 0) {
          PERFETTO_DCHECK(events_.size() >= 2);
          sort_start_idx_ = events_.size() - 1;
          sort_min_ts_ = timestamp;
        } else {
          sort_min_ts_ = std::min(sort_min_ts_, timestamp);
        }
      }

      PERFETTO_DCHECK(min_ts_ <= max_ts_);
    }

    bool needs_sorting() const { return sort_start_idx_ != 0; }
    void Sort();

    base::CircularQueue<TimestampedTracePiece> events_;
    int64_t min_ts_ = std::numeric_limits<int64_t>::max();
    int64_t max_ts_ = 0;
    size_t sort_start_idx_ = 0;
    int64_t sort_min_ts_ = std::numeric_limits<int64_t>::max();
  };

  void SortAndExtractEventsUntilPacket(uint64_t limit_packet_idx);

  inline Queue* GetQueue(size_t index) {
    if (PERFETTO_UNLIKELY(index >= queues_.size()))
      queues_.resize(index + 1);
    return &queues_[index];
  }

  inline void AppendNonFtraceEvent(TimestampedTracePiece ttp) {
    Queue* queue = GetQueue(0);
    queue->Append(std::move(ttp));
    UpdateGlobalTs(queue);
  }

  inline void UpdateGlobalTs(Queue* queue) {
    global_min_ts_ = std::min(global_min_ts_, queue->min_ts_);
    global_max_ts_ = std::max(global_max_ts_, queue->max_ts_);
  }

  void MaybePushEvent(size_t queue_idx,
                      TimestampedTracePiece ttp) PERFETTO_ALWAYS_INLINE;

  TraceProcessorContext* context_;
  std::unique_ptr<TraceParser> parser_;

  // Whether we should ignore incremental extraction and just wait for
  // forced extraction at the end of the trace.
  SortingMode sorting_mode_ = SortingMode::kDefault;

  // The packet index until which events should be extracted. Set based
  // on the packet index in |OnReadBuffer|.
  uint64_t packet_idx_for_extraction_ = 0;

  // The number of flushes which have happened since the last incremental
  // extraction.
  uint32_t flushes_since_extraction_ = 0;

  // queues_[0] is the general (non-ftrace) queue.
  // queues_[1] is the ftrace queue for CPU(0).
  // queues_[x] is the ftrace queue for CPU(x - 1).
  std::vector<Queue> queues_;

  // max(e.timestamp for e in queues_).
  int64_t global_max_ts_ = 0;

  // min(e.timestamp for e in queues_).
  int64_t global_min_ts_ = std::numeric_limits<int64_t>::max();

  // Monotonically increasing value used to index timestamped trace pieces.
  uint64_t packet_idx_ = 0;

  // Used for performance tests. True when setting TRACE_PROCESSOR_SORT_ONLY=1.
  bool bypass_next_stage_for_testing_ = false;

  // max(e.ts for e pushed to next stage).
  int64_t latest_pushed_event_ts_ = std::numeric_limits<int64_t>::min();
};

}  // namespace trace_processor
}  // namespace perfetto

#endif  // SRC_TRACE_PROCESSOR_TRACE_SORTER_H_
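// -----------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of the API above). This is a
// hypothetical outline of how a tokenizer owning a TraceSorter, here called
// |sorter|, might drive it; the surrounding variables (cpu, ts, state, event,
// packet) are assumed to exist in the caller.
//
//   // While tokenizing the trace, push events into the staging area:
//   sorter->PushFtraceEvent(cpu, ts, std::move(event), state);
//   sorter->PushTracePacket(ts, state, std::move(packet));
//
//   // On TracingServiceEvent Flush / ReadBuffer markers (write_into_file
//   // traces only), give the sorter a chance to extract incrementally:
//   sorter->NotifyFlushEvent();
//   sorter->NotifyReadBufferEvent();  // No-op in kFullSort mode or until two
//                                     // flushes have been observed.
//
//   // At the end of the trace, force out whatever is left:
//   sorter->ExtractEventsForced();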