1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "src/trace_processor/sorter/trace_sorter.h"
17
18 #include <cstddef>
19 #include <cstdint>
20 #include <cstdlib>
21 #include <map>
22 #include <memory>
23 #include <optional>
24 #include <random>
25 #include <tuple>
26 #include <utility>
27 #include <vector>
28
29 #include "perfetto/ext/base/string_view.h"
30 #include "perfetto/trace_processor/trace_blob.h"
31 #include "perfetto/trace_processor/trace_blob_view.h"
32 #include "src/trace_processor/importers/common/parser_types.h"
33 #include "src/trace_processor/importers/proto/packet_sequence_state_generation.h"
34 #include "src/trace_processor/importers/proto/proto_trace_parser_impl.h"
35 #include "src/trace_processor/storage/stats.h"
36 #include "src/trace_processor/storage/trace_storage.h"
37 #include "src/trace_processor/types/trace_processor_context.h"
38 #include "test/gtest_and_gmock.h"
39
40 namespace perfetto::trace_processor {
41 namespace {
42
43 using ::testing::_;
44 using ::testing::InSequence;
45 using ::testing::Invoke;
46 using ::testing::MockFunction;
47 using ::testing::NiceMock;
48
// Machine id the tests expect for events handled by the fixture's own
// (default machine) context: an empty optional.
constexpr std::optional<MachineId> kNullMachineId = std::nullopt;
50
51 class MockTraceParser : public ProtoTraceParserImpl {
52 public:
MockTraceParser(TraceProcessorContext * context)53 explicit MockTraceParser(TraceProcessorContext* context)
54 : ProtoTraceParserImpl(context), machine_id_(context->machine_id()) {}
55
56 MOCK_METHOD(void,
57 MOCK_ParseFtracePacket,
58 (uint32_t cpu,
59 int64_t timestamp,
60 const uint8_t* data,
61 size_t length,
62 std::optional<MachineId>));
63
ParseFtraceEvent(uint32_t cpu,int64_t timestamp,TracePacketData data)64 void ParseFtraceEvent(uint32_t cpu,
65 int64_t timestamp,
66 TracePacketData data) override {
67 MOCK_ParseFtracePacket(cpu, timestamp, data.packet.data(),
68 data.packet.length(), machine_id_);
69 }
70
71 MOCK_METHOD(void,
72 MOCK_ParseTracePacket,
73 (int64_t ts, const uint8_t* data, size_t length));
74
ParseTrackEvent(int64_t,TrackEventData)75 void ParseTrackEvent(int64_t, TrackEventData) override {}
76
ParseTracePacket(int64_t ts,TracePacketData data)77 void ParseTracePacket(int64_t ts, TracePacketData data) override {
78 TraceBlobView& tbv = data.packet;
79 MOCK_ParseTracePacket(ts, tbv.data(), tbv.length());
80 }
81
82 std::optional<MachineId> machine_id_;
83 };
84
85 class MockTraceStorage : public TraceStorage {
86 public:
87 MockTraceStorage() = default;
88
89 MOCK_METHOD(StringId, InternString, (base::StringView view), (override));
90 };
91
92 class TraceSorterTest : public ::testing::Test {
93 public:
TraceSorterTest()94 TraceSorterTest() : test_buffer_(TraceBlob::Allocate(8)) {
95 storage_ = new NiceMock<MockTraceStorage>();
96 context_.storage.reset(storage_);
97 CreateSorter();
98 }
99
CreateSorter(bool full_sort=true)100 void CreateSorter(bool full_sort = true) {
101 parser_ = new MockTraceParser(&context_);
102 context_.proto_trace_parser.reset(parser_);
103 auto sorting_mode = full_sort ? TraceSorter::SortingMode::kFullSort
104 : TraceSorter::SortingMode::kDefault;
105 context_.sorter.reset(new TraceSorter(&context_, sorting_mode));
106 }
107
108 protected:
109 TraceProcessorContext context_;
110 MockTraceParser* parser_;
111 NiceMock<MockTraceStorage>* storage_;
112 TraceBlobView test_buffer_;
113 };
114
// A single ftrace event pushed into the sorter must be handed to the parser
// with the same cpu, timestamp and payload once extraction is forced.
TEST_F(TraceSorterTest, TestFtrace) {
  TraceSorter& sorter = *context_.sorter;
  auto seq_state = PacketSequenceStateGeneration::CreateFirst(&context_);

  TraceBlobView payload = test_buffer_.slice_off(0, 1);
  // Grab the address up front: |payload| is moved into the sorter below.
  const uint8_t* payload_ptr = payload.data();

  EXPECT_CALL(*parser_,
              MOCK_ParseFtracePacket(0, 1000, payload_ptr, 1, kNullMachineId));

  sorter.PushFtraceEvent(/*cpu=*/0, /*timestamp=*/1000, std::move(payload),
                         seq_state);
  sorter.ExtractEventsForced();
}
124
// A single trace packet pushed into the sorter must be handed to the parser
// with the same timestamp and payload once extraction is forced.
TEST_F(TraceSorterTest, TestTracePacket) {
  TraceSorter& sorter = *context_.sorter;
  auto seq_state = PacketSequenceStateGeneration::CreateFirst(&context_);

  TraceBlobView payload = test_buffer_.slice_off(0, 1);
  // Grab the address up front: |payload| is moved into the sorter below.
  const uint8_t* payload_ptr = payload.data();

  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1000, payload_ptr, 1));

  sorter.PushTracePacket(1000, seq_state, std::move(payload));
  sorter.ExtractEventsForced();
}
132
// Events pushed in scrambled timestamp order, mixing ftrace events and trace
// packets, must reach the parser in increasing timestamp order.
TEST_F(TraceSorterTest, Ordering) {
  TraceSorter& sorter = *context_.sorter;
  auto seq_state = PacketSequenceStateGeneration::CreateFirst(&context_);

  // Each payload has a distinct length so the expectations can tell them
  // apart even though they all start at the same address.
  TraceBlobView ftrace_1000 = test_buffer_.slice_off(0, 1);
  TraceBlobView packet_1001 = test_buffer_.slice_off(0, 2);
  TraceBlobView packet_1100 = test_buffer_.slice_off(0, 3);
  TraceBlobView ftrace_1200 = test_buffer_.slice_off(0, 4);

  InSequence in_order;

  EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(0, 1000, ftrace_1000.data(), 1,
                                               kNullMachineId));
  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1001, packet_1001.data(), 2));
  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, packet_1100.data(), 3));
  EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(2, 1200, ftrace_1200.data(), 4,
                                               kNullMachineId));

  // Push latest-first / earliest-last so the sorter has real work to do.
  sorter.PushFtraceEvent(/*cpu=*/2, /*timestamp=*/1200, std::move(ftrace_1200),
                         seq_state);
  sorter.PushTracePacket(1001, seq_state, std::move(packet_1001));
  sorter.PushTracePacket(1100, seq_state, std::move(packet_1100));
  sorter.PushFtraceEvent(/*cpu=*/0, /*timestamp=*/1000, std::move(ftrace_1000),
                         seq_state);
  sorter.ExtractEventsForced();
}
157
// In the default (non full-sort) mode, packets are expected to be released to
// the parser incrementally, driven by flush and read-buffer notifications.
TEST_F(TraceSorterTest, IncrementalExtraction) {
  CreateSorter(false);
  TraceSorter& sorter = *context_.sorter;

  auto seq_state = PacketSequenceStateGeneration::CreateFirst(&context_);

  // All slices start at offset 0 of the scratch buffer, so every payload
  // shares this address and is distinguished only by its length.
  const uint8_t* buf = test_buffer_.data();
  TraceBlobView payload_len1 = test_buffer_.slice_off(0, 1);
  TraceBlobView payload_len2 = test_buffer_.slice_off(0, 2);
  TraceBlobView payload_len3 = test_buffer_.slice_off(0, 3);
  TraceBlobView payload_len4 = test_buffer_.slice_off(0, 4);
  TraceBlobView payload_len5 = test_buffer_.slice_off(0, 5);

  // Flush at the start of the packet sequence to match the behavior of the
  // service.
  sorter.NotifyFlushEvent();
  sorter.PushTracePacket(1200, seq_state, std::move(payload_len2));
  sorter.PushTracePacket(1100, seq_state, std::move(payload_len1));

  // No data should be extracted at this point because we haven't seen two
  // flushes yet.
  sorter.NotifyReadBufferEvent();

  // Now that we've seen two flushes, we should be ready to start extracting
  // data on the next read-buffer notification (after two flushes as usual).
  sorter.NotifyFlushEvent();
  sorter.NotifyReadBufferEvent();

  sorter.NotifyFlushEvent();
  sorter.NotifyFlushEvent();
  sorter.PushTracePacket(1400, seq_state, std::move(payload_len4));
  sorter.PushTracePacket(1300, seq_state, std::move(payload_len3));

  // This read-buffer notification should finally extract everything up to the
  // first read-buffer notification, in timestamp order.
  {
    InSequence in_order;
    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, buf, 1));
    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1200, buf, 2));
  }
  sorter.NotifyReadBufferEvent();

  sorter.NotifyFlushEvent();
  sorter.PushTracePacket(1500, seq_state, std::move(payload_len5));

  // Nothing should be extracted as we haven't seen the second flush.
  sorter.NotifyReadBufferEvent();

  // Now that we've seen the second flush, the next two packets should be
  // extracted.
  sorter.NotifyFlushEvent();
  {
    InSequence in_order;
    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1300, buf, 3));
    EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1400, buf, 4));
  }
  sorter.NotifyReadBufferEvent();

  // The forced extraction should deliver the last packet.
  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1500, buf, 5));
  sorter.ExtractEventsForced();
}
217
218 // Simulate a producer bug where the third packet is emitted
219 // out of order. Verify that we track the stats correctly.
TEST_F(TraceSorterTest,OutOfOrder)220 TEST_F(TraceSorterTest, OutOfOrder) {
221 CreateSorter(false);
222
223 auto state = PacketSequenceStateGeneration::CreateFirst(&context_);
224
225 TraceBlobView view_1 = test_buffer_.slice_off(0, 1);
226 TraceBlobView view_2 = test_buffer_.slice_off(0, 2);
227 TraceBlobView view_3 = test_buffer_.slice_off(0, 3);
228 TraceBlobView view_4 = test_buffer_.slice_off(0, 4);
229
230 context_.sorter->NotifyFlushEvent();
231 context_.sorter->NotifyFlushEvent();
232 context_.sorter->PushTracePacket(1200, state, std::move(view_2));
233 context_.sorter->PushTracePacket(1100, state, std::move(view_1));
234 context_.sorter->NotifyReadBufferEvent();
235
236 // Both of the packets should have been pushed through.
237 context_.sorter->NotifyFlushEvent();
238 context_.sorter->NotifyFlushEvent();
239 {
240 InSequence s;
241 EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, test_buffer_.data(), 1));
242 EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1200, test_buffer_.data(), 2));
243 }
244 context_.sorter->NotifyReadBufferEvent();
245
246 // Now, pass the third packet out of order.
247 context_.sorter->NotifyFlushEvent();
248 context_.sorter->NotifyFlushEvent();
249 context_.sorter->PushTracePacket(1150, state, std::move(view_3));
250 context_.sorter->NotifyReadBufferEvent();
251
252 // Third packet should not be pushed through.
253 context_.sorter->NotifyFlushEvent();
254 context_.sorter->NotifyFlushEvent();
255 context_.sorter->NotifyReadBufferEvent();
256
257 // We should also increment the stat that this was out of order.
258 const auto& stats = context_.storage->stats();
259 ASSERT_EQ(stats[stats::sorter_push_event_out_of_order].value, 1);
260
261 // Third packet should not be pushed through.
262 context_.sorter->NotifyFlushEvent();
263 context_.sorter->NotifyFlushEvent();
264 context_.sorter->PushTracePacket(1170, state, std::move(view_4));
265 context_.sorter->NotifyReadBufferEvent();
266 context_.sorter->ExtractEventsForced();
267
268 // We should also increment the stat that this was out of order.
269 ASSERT_EQ(stats[stats::sorter_push_event_out_of_order].value, 2);
270 }
271
272 // Simulates a random stream of ftrace events happening on random CPUs.
273 // Tests that the output of the TraceSorter matches the timestamp order
274 // (% events happening at the same time on different CPUs).
TEST_F(TraceSorterTest,MultiQueueSorting)275 TEST_F(TraceSorterTest, MultiQueueSorting) {
276 auto state = PacketSequenceStateGeneration::CreateFirst(&context_);
277 std::minstd_rand0 rnd_engine(0);
278 std::map<int64_t /*ts*/, std::vector<uint32_t /*cpu*/>> expectations;
279
280 EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(_, _, _, _, _))
281 .WillRepeatedly(Invoke([&expectations](uint32_t cpu, int64_t timestamp,
282 const uint8_t*, size_t,
283 std::optional<MachineId>) {
284 EXPECT_EQ(expectations.begin()->first, timestamp);
285 auto& cpus = expectations.begin()->second;
286 bool cpu_found = false;
287 for (auto it = cpus.begin(); it < cpus.end(); it++) {
288 if (*it != cpu)
289 continue;
290 cpu_found = true;
291 cpus.erase(it);
292 break;
293 }
294 if (cpus.empty())
295 expectations.erase(expectations.begin());
296 EXPECT_TRUE(cpu_found);
297 }));
298
299 // Allocate a 1000 byte trace blob and push one byte chunks to be sorted with
300 // random timestamps. This will stress test the sorter with worst case
301 // scenarios and will (and has many times) expose any subtle bugs hiding in
302 // the sorter logic.
303 TraceBlobView tbv(TraceBlob::Allocate(1000));
304 for (uint16_t i = 0; i < 1000; i++) {
305 int64_t ts = abs(static_cast<int64_t>(rnd_engine()));
306 uint8_t num_cpus = rnd_engine() % 3;
307 for (uint8_t j = 0; j < num_cpus; j++) {
308 uint32_t cpu = static_cast<uint32_t>(rnd_engine() % 32);
309 expectations[ts].push_back(cpu);
310 context_.sorter->PushFtraceEvent(cpu, ts, tbv.slice_off(i, 1), state);
311 }
312 }
313
314 context_.sorter->ExtractEventsForced();
315 EXPECT_TRUE(expectations.empty());
316 }
317
318 // An generalized version of MultiQueueSorting with multiple machines.
TEST_F(TraceSorterTest,MultiMachineSorting)319 TEST_F(TraceSorterTest, MultiMachineSorting) {
320 auto state = PacketSequenceStateGeneration::CreateFirst(&context_);
321 std::minstd_rand0 rnd_engine(0);
322
323 struct ExpectedMachineAndCpu {
324 std::optional<MachineId> machine_id;
325 uint32_t cpu;
326
327 bool operator==(const ExpectedMachineAndCpu& other) const {
328 return std::tie(machine_id, cpu) == std::tie(other.machine_id, other.cpu);
329 }
330 bool operator!=(const ExpectedMachineAndCpu& other) const {
331 return !operator==(other);
332 }
333 };
334 std::map<int64_t /*ts*/, std::vector<ExpectedMachineAndCpu>> expectations;
335
336 // The total number of machines (including the default one).
337 constexpr size_t num_machines = 5;
338 std::vector<MockTraceParser*> extra_parsers;
339 std::vector<std::unique_ptr<TraceProcessorContext>> extra_contexts;
340 // Set up extra machines and add to the sorter.
341 // MachineIdValue are 1..(num_machines-1).
342 for (auto i = 1u; i < num_machines; i++) {
343 TraceProcessorContext::InitArgs args{context_.config, context_.storage, i};
344 auto ctx = std::make_unique<TraceProcessorContext>(args);
345 auto parser = std::make_unique<MockTraceParser>(ctx.get());
346 extra_parsers.push_back(parser.get());
347 ctx->proto_trace_parser = std::move(parser);
348 extra_contexts.push_back(std::move(ctx));
349 context_.sorter->AddMachineContext(extra_contexts.back().get());
350 }
351
352 // Set up the expectation for the default machine.
353 EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(_, _, _, _, _))
354 .WillRepeatedly(Invoke([&expectations](uint32_t cpu, int64_t timestamp,
355 const uint8_t*, size_t,
356 std::optional<MachineId>) {
357 EXPECT_EQ(expectations.begin()->first, timestamp);
358 auto& machines_and_cpus = expectations.begin()->second;
359 bool found = false;
360 for (auto it = machines_and_cpus.begin(); it < machines_and_cpus.end();
361 it++) {
362 // The default machine is called machine ID == std::nullopt.
363 if (*it != ExpectedMachineAndCpu{kNullMachineId, cpu})
364 continue;
365 found = true;
366 machines_and_cpus.erase(it);
367 break;
368 }
369 if (machines_and_cpus.empty())
370 expectations.erase(expectations.begin());
371 EXPECT_TRUE(found);
372 }));
373 // Set up expectations for remote machines.
374 for (auto* parser : extra_parsers) {
375 EXPECT_CALL(*parser, MOCK_ParseFtracePacket(_, _, _, _, _))
376 .WillRepeatedly(Invoke(
377 [&expectations](uint32_t cpu, int64_t timestamp, const uint8_t*,
378 size_t, std::optional<MachineId> machine_id) {
379 EXPECT_TRUE(machine_id.has_value());
380 EXPECT_EQ(expectations.begin()->first, timestamp);
381 auto& machines_and_cpus = expectations.begin()->second;
382 bool found = false;
383 for (auto it = machines_and_cpus.begin();
384 it < machines_and_cpus.end(); it++) {
385 // Remote machines are called with non-null machine_id.
386 if (*it != ExpectedMachineAndCpu{machine_id, cpu})
387 continue;
388 found = true;
389 machines_and_cpus.erase(it);
390 break;
391 }
392 if (machines_and_cpus.empty())
393 expectations.erase(expectations.begin());
394 EXPECT_TRUE(found);
395 }));
396 }
397
398 // Allocate a 1000 byte trace blob (per-machine) and push one byte chunks to
399 // be sorted with random timestamps.
400 constexpr size_t alloc_size = 1000;
401 TraceBlobView tbv(TraceBlob::Allocate(alloc_size * num_machines));
402 for (size_t m = 0; m < num_machines; m++) {
403 // TraceProcessorContext::machine_id is nullopt for the default machine or a
404 // monotonic counter starting from 1. 0 is a reserved value that isn't used.
405 std::optional<MachineId> machine;
406 if (m)
407 machine = extra_contexts[m - 1]->machine_id();
408
409 for (uint16_t i = 0; i < alloc_size; i++) {
410 int64_t ts = abs(static_cast<int64_t>(rnd_engine()));
411 uint8_t num_cpus = rnd_engine() % 3;
412 for (uint8_t j = 0; j < num_cpus; j++) {
413 uint32_t cpu = static_cast<uint32_t>(rnd_engine() % 32);
414 expectations[ts].push_back(ExpectedMachineAndCpu{machine, cpu});
415 context_.sorter->PushFtraceEvent(
416 cpu, ts, tbv.slice_off(m * alloc_size + i, 1), state, machine);
417 }
418 }
419 }
420
421 context_.sorter->ExtractEventsForced();
422 EXPECT_TRUE(expectations.empty());
423 }
424
// Switching from the default mode to full sort is allowed before anything has
// been extracted; switching back to the default mode is not.
TEST_F(TraceSorterTest, SetSortingMode) {
  CreateSorter(false);
  TraceSorter& sorter = *context_.sorter;
  const auto full_sort = TraceSorter::SortingMode::kFullSort;
  const auto default_sort = TraceSorter::SortingMode::kDefault;

  auto seq_state = PacketSequenceStateGeneration::CreateFirst(&context_);

  TraceBlobView first_payload = test_buffer_.slice_off(0, 1);
  TraceBlobView second_payload = test_buffer_.slice_off(0, 2);

  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1000, first_payload.data(), 1));
  sorter.PushTracePacket(1000, seq_state, std::move(first_payload));

  // Changing to full sorting mode should succeed as no events have been
  // extracted yet.
  EXPECT_TRUE(sorter.SetSortingMode(full_sort));

  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(2000, second_payload.data(), 2));
  sorter.PushTracePacket(2000, seq_state, std::move(second_payload));

  // Changing back to default sorting mode is not allowed.
  EXPECT_FALSE(sorter.SetSortingMode(default_sort));

  // Re-selecting the mode that is already active should succeed.
  EXPECT_TRUE(sorter.SetSortingMode(full_sort));

  sorter.ExtractEventsForced();

  // ... and should still succeed after events have been extracted.
  EXPECT_TRUE(sorter.SetSortingMode(full_sort));
}
458
// Once events have been extracted in the default mode, a request to switch to
// full sort must be rejected.
TEST_F(TraceSorterTest, SetSortingModeAfterExtraction) {
  CreateSorter(false);
  TraceSorter& sorter = *context_.sorter;

  auto seq_state = PacketSequenceStateGeneration::CreateFirst(&context_);

  TraceBlobView first_payload = test_buffer_.slice_off(0, 1);
  TraceBlobView second_payload = test_buffer_.slice_off(0, 2);

  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1000, first_payload.data(), 1));
  sorter.PushTracePacket(1000, seq_state, std::move(first_payload));
  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(2000, second_payload.data(), 2));
  sorter.PushTracePacket(2000, seq_state, std::move(second_payload));
  sorter.ExtractEventsForced();

  // Changing to full sorting mode should fail as some events have already been
  // extracted.
  EXPECT_FALSE(sorter.SetSortingMode(TraceSorter::SortingMode::kFullSort));
}
478
479 } // namespace
480 } // namespace perfetto::trace_processor
481