• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "src/trace_processor/sorter/trace_sorter.h"
17 
18 #include <cstddef>
19 #include <cstdint>
20 #include <cstdlib>
21 #include <map>
22 #include <memory>
23 #include <optional>
24 #include <random>
25 #include <tuple>
26 #include <utility>
27 #include <vector>
28 
29 #include "perfetto/ext/base/string_view.h"
30 #include "perfetto/trace_processor/trace_blob.h"
31 #include "perfetto/trace_processor/trace_blob_view.h"
32 #include "src/trace_processor/importers/common/parser_types.h"
33 #include "src/trace_processor/importers/proto/packet_sequence_state_generation.h"
34 #include "src/trace_processor/importers/proto/proto_trace_parser_impl.h"
35 #include "src/trace_processor/storage/stats.h"
36 #include "src/trace_processor/storage/trace_storage.h"
37 #include "src/trace_processor/types/trace_processor_context.h"
38 #include "test/gtest_and_gmock.h"
39 
40 namespace perfetto::trace_processor {
41 namespace {
42 
43 using ::testing::_;
44 using ::testing::InSequence;
45 using ::testing::Invoke;
46 using ::testing::MockFunction;
47 using ::testing::NiceMock;
48 
49 constexpr std::optional<MachineId> kNullMachineId = std::nullopt;
50 
51 class MockTraceParser : public ProtoTraceParserImpl {
52  public:
MockTraceParser(TraceProcessorContext * context)53   explicit MockTraceParser(TraceProcessorContext* context)
54       : ProtoTraceParserImpl(context), machine_id_(context->machine_id()) {}
55 
56   MOCK_METHOD(void,
57               MOCK_ParseFtracePacket,
58               (uint32_t cpu,
59                int64_t timestamp,
60                const uint8_t* data,
61                size_t length,
62                std::optional<MachineId>));
63 
ParseFtraceEvent(uint32_t cpu,int64_t timestamp,TracePacketData data)64   void ParseFtraceEvent(uint32_t cpu,
65                         int64_t timestamp,
66                         TracePacketData data) override {
67     MOCK_ParseFtracePacket(cpu, timestamp, data.packet.data(),
68                            data.packet.length(), machine_id_);
69   }
70 
71   MOCK_METHOD(void,
72               MOCK_ParseTracePacket,
73               (int64_t ts, const uint8_t* data, size_t length));
74 
ParseTrackEvent(int64_t,TrackEventData)75   void ParseTrackEvent(int64_t, TrackEventData) override {}
76 
ParseTracePacket(int64_t ts,TracePacketData data)77   void ParseTracePacket(int64_t ts, TracePacketData data) override {
78     TraceBlobView& tbv = data.packet;
79     MOCK_ParseTracePacket(ts, tbv.data(), tbv.length());
80   }
81 
82   std::optional<MachineId> machine_id_;
83 };
84 
85 class MockTraceStorage : public TraceStorage {
86  public:
87   MockTraceStorage() = default;
88 
89   MOCK_METHOD(StringId, InternString, (base::StringView view), (override));
90 };
91 
92 class TraceSorterTest : public ::testing::Test {
93  public:
TraceSorterTest()94   TraceSorterTest() : test_buffer_(TraceBlob::Allocate(8)) {
95     storage_ = new NiceMock<MockTraceStorage>();
96     context_.storage.reset(storage_);
97     CreateSorter();
98   }
99 
CreateSorter(bool full_sort=true)100   void CreateSorter(bool full_sort = true) {
101     parser_ = new MockTraceParser(&context_);
102     context_.proto_trace_parser.reset(parser_);
103     auto sorting_mode = full_sort ? TraceSorter::SortingMode::kFullSort
104                                   : TraceSorter::SortingMode::kDefault;
105     context_.sorter.reset(new TraceSorter(&context_, sorting_mode));
106   }
107 
108  protected:
109   TraceProcessorContext context_;
110   MockTraceParser* parser_;
111   NiceMock<MockTraceStorage>* storage_;
112   TraceBlobView test_buffer_;
113 };
114 
// A single ftrace event pushed into the sorter must reach the parser with the
// same cpu, timestamp and payload once extraction is forced.
TEST_F(TraceSorterTest, TestFtrace) {
  constexpr uint32_t kCpu = 0;
  constexpr int64_t kTimestamp = 1000;

  auto seq_state = PacketSequenceStateGeneration::CreateFirst(&context_);
  TraceBlobView payload = test_buffer_.slice_off(0, 1);

  // The expectation reads the payload pointer before the view is moved away.
  EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(kCpu, kTimestamp,
                                               payload.data(), 1,
                                               kNullMachineId));

  auto* sorter = context_.sorter.get();
  sorter->PushFtraceEvent(kCpu, kTimestamp, std::move(payload), seq_state);
  sorter->ExtractEventsForced();
}
124 
// A single trace packet pushed into the sorter must reach the parser with the
// same timestamp and payload once extraction is forced.
TEST_F(TraceSorterTest, TestTracePacket) {
  constexpr int64_t kTimestamp = 1000;

  auto seq_state = PacketSequenceStateGeneration::CreateFirst(&context_);
  TraceBlobView payload = test_buffer_.slice_off(0, 1);

  // The expectation reads the payload pointer before the view is moved away.
  EXPECT_CALL(*parser_,
              MOCK_ParseTracePacket(kTimestamp, payload.data(), 1));

  auto* sorter = context_.sorter.get();
  sorter->PushTracePacket(kTimestamp, seq_state, std::move(payload));
  sorter->ExtractEventsForced();
}
132 
// Events pushed in scrambled timestamp order, mixing ftrace events and trace
// packets, must be delivered to the parser in ascending timestamp order.
TEST_F(TraceSorterTest, Ordering) {
  auto seq_state = PacketSequenceStateGeneration::CreateFirst(&context_);

  // All slices start at offset 0, so the payload length is what tells the
  // four events apart in the expectations below.
  TraceBlobView len1 = test_buffer_.slice_off(0, 1);
  TraceBlobView len2 = test_buffer_.slice_off(0, 2);
  TraceBlobView len3 = test_buffer_.slice_off(0, 3);
  TraceBlobView len4 = test_buffer_.slice_off(0, 4);

  InSequence in_order;

  // Expected delivery order: 1000 (ftrace), 1001, 1100, 1200 (ftrace).
  EXPECT_CALL(*parser_,
              MOCK_ParseFtracePacket(0, 1000, len1.data(), 1, kNullMachineId));
  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1001, len2.data(), 2));
  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, len3.data(), 3));
  EXPECT_CALL(*parser_,
              MOCK_ParseFtracePacket(2, 1200, len4.data(), 4, kNullMachineId));

  // Push order: 1200, 1001, 1100, 1000.
  auto* sorter = context_.sorter.get();
  sorter->PushFtraceEvent(/*cpu=*/2, /*timestamp=*/1200, std::move(len4),
                          seq_state);
  sorter->PushTracePacket(1001, seq_state, std::move(len2));
  sorter->PushTracePacket(1100, seq_state, std::move(len3));
  sorter->PushFtraceEvent(/*cpu=*/0, /*timestamp=*/1000, std::move(len1),
                          seq_state);
  sorter->ExtractEventsForced();
}
157 
TEST_F(TraceSorterTest,IncrementalExtraction)158 TEST_F(TraceSorterTest, IncrementalExtraction) {
159   CreateSorter(false);
160 
161   auto state = PacketSequenceStateGeneration::CreateFirst(&context_);
162 
163   TraceBlobView view_1 = test_buffer_.slice_off(0, 1);
164   TraceBlobView view_2 = test_buffer_.slice_off(0, 2);
165   TraceBlobView view_3 = test_buffer_.slice_off(0, 3);
166   TraceBlobView view_4 = test_buffer_.slice_off(0, 4);
167   TraceBlobView view_5 = test_buffer_.slice_off(0, 5);
168 
169   // Flush at the start of packet sequence to match behavior of the
170   // service.
171   context_.sorter->NotifyFlushEvent();
172   context_.sorter->PushTracePacket(1200, state, std::move(view_2));
173   context_.sorter->PushTracePacket(1100, state, std::move(view_1));
174 
175   // No data should be exttracted at this point because we haven't
176   // seen two flushes yet.
177   context_.sorter->NotifyReadBufferEvent();
178 
179   // Now that we've seen two flushes, we should be ready to start extracting
180   // data on the next OnReadBufer call (after two flushes as usual).
181   context_.sorter->NotifyFlushEvent();
182   context_.sorter->NotifyReadBufferEvent();
183 
184   context_.sorter->NotifyFlushEvent();
185   context_.sorter->NotifyFlushEvent();
186   context_.sorter->PushTracePacket(1400, state, std::move(view_4));
187   context_.sorter->PushTracePacket(1300, state, std::move(view_3));
188 
189   // This ReadBuffer call should finally extract until the first OnReadBuffer
190   // call.
191   {
192     InSequence s;
193     EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, test_buffer_.data(), 1));
194     EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1200, test_buffer_.data(), 2));
195   }
196   context_.sorter->NotifyReadBufferEvent();
197 
198   context_.sorter->NotifyFlushEvent();
199   context_.sorter->PushTracePacket(1500, state, std::move(view_5));
200 
201   // Nothing should be extracted as we haven't seen the second flush.
202   context_.sorter->NotifyReadBufferEvent();
203 
204   // Now we've seen the second flush we should extract the next two packets.
205   context_.sorter->NotifyFlushEvent();
206   {
207     InSequence s;
208     EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1300, test_buffer_.data(), 3));
209     EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1400, test_buffer_.data(), 4));
210   }
211   context_.sorter->NotifyReadBufferEvent();
212 
213   // The forced extraction should get the last packet.
214   EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1500, test_buffer_.data(), 5));
215   context_.sorter->ExtractEventsForced();
216 }
217 
218 // Simulate a producer bug where the third packet is emitted
219 // out of order. Verify that we track the stats correctly.
TEST_F(TraceSorterTest,OutOfOrder)220 TEST_F(TraceSorterTest, OutOfOrder) {
221   CreateSorter(false);
222 
223   auto state = PacketSequenceStateGeneration::CreateFirst(&context_);
224 
225   TraceBlobView view_1 = test_buffer_.slice_off(0, 1);
226   TraceBlobView view_2 = test_buffer_.slice_off(0, 2);
227   TraceBlobView view_3 = test_buffer_.slice_off(0, 3);
228   TraceBlobView view_4 = test_buffer_.slice_off(0, 4);
229 
230   context_.sorter->NotifyFlushEvent();
231   context_.sorter->NotifyFlushEvent();
232   context_.sorter->PushTracePacket(1200, state, std::move(view_2));
233   context_.sorter->PushTracePacket(1100, state, std::move(view_1));
234   context_.sorter->NotifyReadBufferEvent();
235 
236   // Both of the packets should have been pushed through.
237   context_.sorter->NotifyFlushEvent();
238   context_.sorter->NotifyFlushEvent();
239   {
240     InSequence s;
241     EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1100, test_buffer_.data(), 1));
242     EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1200, test_buffer_.data(), 2));
243   }
244   context_.sorter->NotifyReadBufferEvent();
245 
246   // Now, pass the third packet out of order.
247   context_.sorter->NotifyFlushEvent();
248   context_.sorter->NotifyFlushEvent();
249   context_.sorter->PushTracePacket(1150, state, std::move(view_3));
250   context_.sorter->NotifyReadBufferEvent();
251 
252   // Third packet should not be pushed through.
253   context_.sorter->NotifyFlushEvent();
254   context_.sorter->NotifyFlushEvent();
255   context_.sorter->NotifyReadBufferEvent();
256 
257   // We should also increment the stat that this was out of order.
258   const auto& stats = context_.storage->stats();
259   ASSERT_EQ(stats[stats::sorter_push_event_out_of_order].value, 1);
260 
261   // Third packet should not be pushed through.
262   context_.sorter->NotifyFlushEvent();
263   context_.sorter->NotifyFlushEvent();
264   context_.sorter->PushTracePacket(1170, state, std::move(view_4));
265   context_.sorter->NotifyReadBufferEvent();
266   context_.sorter->ExtractEventsForced();
267 
268   // We should also increment the stat that this was out of order.
269   ASSERT_EQ(stats[stats::sorter_push_event_out_of_order].value, 2);
270 }
271 
272 // Simulates a random stream of ftrace events happening on random CPUs.
273 // Tests that the output of the TraceSorter matches the timestamp order
274 // (% events happening at the same time on different CPUs).
TEST_F(TraceSorterTest,MultiQueueSorting)275 TEST_F(TraceSorterTest, MultiQueueSorting) {
276   auto state = PacketSequenceStateGeneration::CreateFirst(&context_);
277   std::minstd_rand0 rnd_engine(0);
278   std::map<int64_t /*ts*/, std::vector<uint32_t /*cpu*/>> expectations;
279 
280   EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(_, _, _, _, _))
281       .WillRepeatedly(Invoke([&expectations](uint32_t cpu, int64_t timestamp,
282                                              const uint8_t*, size_t,
283                                              std::optional<MachineId>) {
284         EXPECT_EQ(expectations.begin()->first, timestamp);
285         auto& cpus = expectations.begin()->second;
286         bool cpu_found = false;
287         for (auto it = cpus.begin(); it < cpus.end(); it++) {
288           if (*it != cpu)
289             continue;
290           cpu_found = true;
291           cpus.erase(it);
292           break;
293         }
294         if (cpus.empty())
295           expectations.erase(expectations.begin());
296         EXPECT_TRUE(cpu_found);
297       }));
298 
299   // Allocate a 1000 byte trace blob and push one byte chunks to be sorted with
300   // random timestamps. This will stress test the sorter with worst case
301   // scenarios and will (and has many times) expose any subtle bugs hiding in
302   // the sorter logic.
303   TraceBlobView tbv(TraceBlob::Allocate(1000));
304   for (uint16_t i = 0; i < 1000; i++) {
305     int64_t ts = abs(static_cast<int64_t>(rnd_engine()));
306     uint8_t num_cpus = rnd_engine() % 3;
307     for (uint8_t j = 0; j < num_cpus; j++) {
308       uint32_t cpu = static_cast<uint32_t>(rnd_engine() % 32);
309       expectations[ts].push_back(cpu);
310       context_.sorter->PushFtraceEvent(cpu, ts, tbv.slice_off(i, 1), state);
311     }
312   }
313 
314   context_.sorter->ExtractEventsForced();
315   EXPECT_TRUE(expectations.empty());
316 }
317 
318 // An generalized version of MultiQueueSorting with multiple machines.
TEST_F(TraceSorterTest,MultiMachineSorting)319 TEST_F(TraceSorterTest, MultiMachineSorting) {
320   auto state = PacketSequenceStateGeneration::CreateFirst(&context_);
321   std::minstd_rand0 rnd_engine(0);
322 
323   struct ExpectedMachineAndCpu {
324     std::optional<MachineId> machine_id;
325     uint32_t cpu;
326 
327     bool operator==(const ExpectedMachineAndCpu& other) const {
328       return std::tie(machine_id, cpu) == std::tie(other.machine_id, other.cpu);
329     }
330     bool operator!=(const ExpectedMachineAndCpu& other) const {
331       return !operator==(other);
332     }
333   };
334   std::map<int64_t /*ts*/, std::vector<ExpectedMachineAndCpu>> expectations;
335 
336   // The total number of machines (including the default one).
337   constexpr size_t num_machines = 5;
338   std::vector<MockTraceParser*> extra_parsers;
339   std::vector<std::unique_ptr<TraceProcessorContext>> extra_contexts;
340   // Set up extra machines and add to the sorter.
341   // MachineIdValue are 1..(num_machines-1).
342   for (auto i = 1u; i < num_machines; i++) {
343     TraceProcessorContext::InitArgs args{context_.config, context_.storage, i};
344     auto ctx = std::make_unique<TraceProcessorContext>(args);
345     auto parser = std::make_unique<MockTraceParser>(ctx.get());
346     extra_parsers.push_back(parser.get());
347     ctx->proto_trace_parser = std::move(parser);
348     extra_contexts.push_back(std::move(ctx));
349     context_.sorter->AddMachineContext(extra_contexts.back().get());
350   }
351 
352   // Set up the expectation for the default machine.
353   EXPECT_CALL(*parser_, MOCK_ParseFtracePacket(_, _, _, _, _))
354       .WillRepeatedly(Invoke([&expectations](uint32_t cpu, int64_t timestamp,
355                                              const uint8_t*, size_t,
356                                              std::optional<MachineId>) {
357         EXPECT_EQ(expectations.begin()->first, timestamp);
358         auto& machines_and_cpus = expectations.begin()->second;
359         bool found = false;
360         for (auto it = machines_and_cpus.begin(); it < machines_and_cpus.end();
361              it++) {
362           // The default machine is called machine ID == std::nullopt.
363           if (*it != ExpectedMachineAndCpu{kNullMachineId, cpu})
364             continue;
365           found = true;
366           machines_and_cpus.erase(it);
367           break;
368         }
369         if (machines_and_cpus.empty())
370           expectations.erase(expectations.begin());
371         EXPECT_TRUE(found);
372       }));
373   // Set up expectations for remote machines.
374   for (auto* parser : extra_parsers) {
375     EXPECT_CALL(*parser, MOCK_ParseFtracePacket(_, _, _, _, _))
376         .WillRepeatedly(Invoke(
377             [&expectations](uint32_t cpu, int64_t timestamp, const uint8_t*,
378                             size_t, std::optional<MachineId> machine_id) {
379               EXPECT_TRUE(machine_id.has_value());
380               EXPECT_EQ(expectations.begin()->first, timestamp);
381               auto& machines_and_cpus = expectations.begin()->second;
382               bool found = false;
383               for (auto it = machines_and_cpus.begin();
384                    it < machines_and_cpus.end(); it++) {
385                 // Remote machines are called with non-null machine_id.
386                 if (*it != ExpectedMachineAndCpu{machine_id, cpu})
387                   continue;
388                 found = true;
389                 machines_and_cpus.erase(it);
390                 break;
391               }
392               if (machines_and_cpus.empty())
393                 expectations.erase(expectations.begin());
394               EXPECT_TRUE(found);
395             }));
396   }
397 
398   // Allocate a 1000 byte trace blob (per-machine) and push one byte chunks to
399   // be sorted with random timestamps.
400   constexpr size_t alloc_size = 1000;
401   TraceBlobView tbv(TraceBlob::Allocate(alloc_size * num_machines));
402   for (size_t m = 0; m < num_machines; m++) {
403     // TraceProcessorContext::machine_id is nullopt for the default machine or a
404     // monotonic counter starting from 1. 0 is a reserved value that isn't used.
405     std::optional<MachineId> machine;
406     if (m)
407       machine = extra_contexts[m - 1]->machine_id();
408 
409     for (uint16_t i = 0; i < alloc_size; i++) {
410       int64_t ts = abs(static_cast<int64_t>(rnd_engine()));
411       uint8_t num_cpus = rnd_engine() % 3;
412       for (uint8_t j = 0; j < num_cpus; j++) {
413         uint32_t cpu = static_cast<uint32_t>(rnd_engine() % 32);
414         expectations[ts].push_back(ExpectedMachineAndCpu{machine, cpu});
415         context_.sorter->PushFtraceEvent(
416             cpu, ts, tbv.slice_off(m * alloc_size + i, 1), state, machine);
417       }
418     }
419   }
420 
421   context_.sorter->ExtractEventsForced();
422   EXPECT_TRUE(expectations.empty());
423 }
424 
// Switching from default to full sorting is allowed as long as nothing has
// been extracted; switching back to default is rejected; re-selecting the
// current mode always succeeds.
TEST_F(TraceSorterTest, SetSortingMode) {
  using SortingMode = TraceSorter::SortingMode;

  CreateSorter(false);
  auto* sorter = context_.sorter.get();

  auto seq_state = PacketSequenceStateGeneration::CreateFirst(&context_);

  TraceBlobView first = test_buffer_.slice_off(0, 1);
  TraceBlobView second = test_buffer_.slice_off(0, 2);

  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1000, first.data(), 1));
  sorter->PushTracePacket(1000, seq_state, std::move(first));

  // No events have been extracted yet, so upgrading to full sorting should
  // succeed.
  EXPECT_TRUE(sorter->SetSortingMode(SortingMode::kFullSort));

  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(2000, second.data(), 2));
  sorter->PushTracePacket(2000, seq_state, std::move(second));

  // Going back to the default sorting mode is not allowed.
  EXPECT_FALSE(sorter->SetSortingMode(SortingMode::kDefault));

  // Requesting the mode that is already active should succeed.
  EXPECT_TRUE(sorter->SetSortingMode(SortingMode::kFullSort));

  sorter->ExtractEventsForced();

  // Requesting the active mode should still succeed after extraction.
  EXPECT_TRUE(sorter->SetSortingMode(SortingMode::kFullSort));
}
458 
// Once events have been extracted in default mode, upgrading to full sorting
// must be rejected.
TEST_F(TraceSorterTest, SetSortingModeAfterExtraction) {
  CreateSorter(false);
  auto* sorter = context_.sorter.get();

  auto seq_state = PacketSequenceStateGeneration::CreateFirst(&context_);

  TraceBlobView first = test_buffer_.slice_off(0, 1);
  TraceBlobView second = test_buffer_.slice_off(0, 2);

  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(1000, first.data(), 1));
  sorter->PushTracePacket(1000, seq_state, std::move(first));
  EXPECT_CALL(*parser_, MOCK_ParseTracePacket(2000, second.data(), 2));
  sorter->PushTracePacket(2000, seq_state, std::move(second));
  sorter->ExtractEventsForced();

  // Some events have already been extracted, so changing to full sorting mode
  // should fail.
  EXPECT_FALSE(sorter->SetSortingMode(TraceSorter::SortingMode::kFullSort));
}
478 
479 }  // namespace
480 }  // namespace perfetto::trace_processor
481