1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include <inttypes.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <list>
#include <map>
#include <random>
#include <regex>
#include <string>
#include <thread>
#include <vector>
27 
28 #include "perfetto/base/build_config.h"
29 #include "perfetto/base/compiler.h"
30 #include "perfetto/ext/base/ctrl_c_handler.h"
31 #include "perfetto/ext/base/file_utils.h"
32 #include "perfetto/ext/base/scoped_file.h"
33 #include "perfetto/ext/base/subprocess.h"
34 #include "perfetto/ext/base/temp_file.h"
35 #include "perfetto/ext/base/utils.h"
36 #include "perfetto/protozero/proto_utils.h"
37 #include "perfetto/tracing.h"
38 #include "perfetto/tracing/core/forward_decls.h"
39 #include "perfetto/tracing/core/trace_config.h"
40 #include "src/base/test/utils.h"
41 
42 #include "protos/perfetto/config/stress_test_config.gen.h"
43 #include "protos/perfetto/trace/test_event.pbzero.h"
44 #include "protos/perfetto/trace/trace_packet.pbzero.h"
45 
46 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
47 #include <Windows.h>
48 #else
49 #include <signal.h>
50 #endif
51 
52 // Generated by gen_configs_blob.py. It defines the kStressTestConfigs array,
53 // which contains a proto-encoded StressTestConfig message for each .cfg file
54 // listed in /test/stress_test/configs/BUILD.gn.
55 #include "test/stress_test/configs/stress_test_config_blobs.h"
56 
57 namespace perfetto {
58 namespace {
59 
60 // TODO(primiano): We need a base::File to get around the awkwardness of
61 // files on Windows being a mix of int and HANDLE (and open() vs CreateFile())
62 // in our codebase.
OpenLogFile(const std::string & path)63 base::ScopedPlatformHandle OpenLogFile(const std::string& path) {
64 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
65   return base::ScopedPlatformHandle(::CreateFileA(
66       path.c_str(), GENERIC_READ | GENERIC_WRITE,
67       FILE_SHARE_DELETE | FILE_SHARE_READ, nullptr, CREATE_ALWAYS, 0, nullptr));
68 #else
69   return base::OpenFile(path, O_RDWR | O_CREAT | O_TRUNC, 0644);
70 #endif
71 }
72 
73 using StressTestConfig = protos::gen::StressTestConfig;
74 
75 struct SigHandlerCtx {
76   std::atomic<bool> aborted{};
77   std::vector<int> pids_to_kill;
78 };
79 SigHandlerCtx* g_sig;
80 
81 struct TestResult {
82   const char* cfg_name = nullptr;
83   StressTestConfig cfg;
84   uint32_t run_time_ms = 0;
85   uint32_t trace_size_kb = 0;
86   uint32_t num_packets = 0;
87   uint32_t num_threads = 0;
88   uint32_t num_errors = 0;
89   base::Subprocess::ResourceUsage svc_rusage;
90   base::Subprocess::ResourceUsage prod_rusage;
91 };
92 
// Per-sequence bookkeeping accumulated while reading back a trace.
struct ParsedTraceStats {
  struct WriterThread {
    uint64_t packets_seen{0};
    // Set once a TestEvent with |is_last| has been seen on the sequence.
    bool last_seen{false};
    uint32_t last_seq{0};
    // Per-sequence error counters, summarized by RunConfig().
    uint64_t seq_errors{0};
    uint64_t counter_errors{0};
    // Predicts the next expected |seq_value| of the sequence.
    std::minstd_rand0 rnd_engine;
  };

  // Keyed by trusted_packet_sequence_id.
  std::map<uint32_t, WriterThread> threads;
};
106 
107 class TestHarness {
108  public:
109   TestHarness();
110   void RunConfig(const char* cfg_name, const StressTestConfig&, bool verbose);
test_results() const111   const std::list<TestResult>& test_results() const { return test_results_; }
112 
113  private:
114   void ReadbackTrace(const std::string&, ParsedTraceStats*);
115   void ParseTracePacket(const uint8_t*, size_t, ParsedTraceStats* ctx);
116   void AddFailure(const char* fmt, ...) PERFETTO_PRINTF_FORMAT(2, 3);
117 
118   std::vector<std::string> env_;
119   std::list<TestResult> test_results_;
120   std::string results_dir_;
121   base::ScopedFile error_log_;
122 };
123 
TestHarness()124 TestHarness::TestHarness() {
125   results_dir_ = base::GetSysTempDir() + "/perfetto-stress-test";
126 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
127   base::ignore_result(system(("rmdir \"" + results_dir_ + "\" /s /q").c_str()));
128 #else
129   base::ignore_result(system(("rm -r -- \"" + results_dir_ + "\"").c_str()));
130 #endif
131   PERFETTO_CHECK(base::Mkdir(results_dir_));
132   PERFETTO_LOG("Saving test results in %s", results_dir_.c_str());
133 }
134 
AddFailure(const char * fmt,...)135 void TestHarness::AddFailure(const char* fmt, ...) {
136   ++test_results_.back().num_errors;
137 
138   char log_msg[512];
139   va_list args;
140   va_start(args, fmt);
141   int res = vsnprintf(log_msg, sizeof(log_msg), fmt, args);
142   va_end(args);
143 
144   PERFETTO_ELOG("FAIL: %s", log_msg);
145 
146   if (res > 0 && static_cast<size_t>(res) < sizeof(log_msg) - 2) {
147     log_msg[res++] = '\n';
148     log_msg[res++] = '\0';
149   }
150 
151   base::WriteAll(*error_log_, log_msg, static_cast<size_t>(res));
152 }
153 
RunConfig(const char * cfg_name,const StressTestConfig & cfg,bool verbose)154 void TestHarness::RunConfig(const char* cfg_name,
155                             const StressTestConfig& cfg,
156                             bool verbose) {
157   test_results_.emplace_back();
158   TestResult& test_result = test_results_.back();
159   test_result.cfg_name = cfg_name;
160   test_result.cfg = cfg;
161   g_sig->pids_to_kill.clear();
162 
163   auto result_dir = results_dir_ + "/" + cfg_name;
164   PERFETTO_CHECK(base::Mkdir(result_dir));
165   error_log_ = base::OpenFile(result_dir + "/errors.log",
166                               O_RDWR | O_CREAT | O_TRUNC, 0644);
167 
168   PERFETTO_ILOG("Starting \"%s\" - %s", cfg_name, result_dir.c_str());
169 
170   env_.emplace_back("PERFETTO_PRODUCER_SOCK_NAME=" + result_dir +
171                     "/producer.sock");
172   env_.emplace_back("PERFETTO_CONSUMER_SOCK_NAME=" + result_dir +
173                     "/consumer.sock");
174   std::string bin_dir = base::GetCurExecutableDir();
175 
176   // Start the service.
177   base::Subprocess traced({bin_dir + "/traced"});
178   traced.args.env = env_;
179   if (!verbose) {
180     traced.args.out_fd = OpenLogFile(result_dir + "/traced.log");
181     traced.args.stderr_mode = traced.args.stdout_mode =
182         base::Subprocess::OutputMode::kFd;
183   }
184   traced.Start();
185   g_sig->pids_to_kill.emplace_back(traced.pid());
186   std::this_thread::sleep_for(std::chrono::milliseconds(100));
187   PERFETTO_CHECK(traced.Poll() == base::Subprocess::kRunning);
188 
189   // Start the producer processes.
190   std::list<base::Subprocess> producers;
191   for (uint32_t i = 0; i < cfg.num_processes(); ++i) {
192     producers.emplace_back(base::Subprocess({bin_dir + "/stress_producer"}));
193     auto& producer = producers.back();
194     producer.args.input = cfg.SerializeAsString();
195     if (!verbose) {
196       producer.args.out_fd =
197           OpenLogFile(result_dir + "/producer." + std::to_string(i) + ".log");
198       producer.args.stderr_mode = producer.args.stdout_mode =
199           base::Subprocess::OutputMode::kFd;
200     }
201     producer.args.env = env_;
202     producer.Start();
203     g_sig->pids_to_kill.emplace_back(producer.pid());
204   }
205   std::this_thread::sleep_for(std::chrono::milliseconds(100));
206   for (auto& producer : producers)
207     PERFETTO_CHECK(producer.Poll() == base::Subprocess::kRunning);
208 
209   auto trace_file_path = result_dir + "/trace";
210   base::Subprocess consumer(
211       {bin_dir + "/perfetto", "-c", "-", "-o", trace_file_path.c_str()});
212   consumer.args.env = env_;
213   consumer.args.input = cfg.trace_config().SerializeAsString();
214   if (!verbose) {
215     consumer.args.out_fd = OpenLogFile(result_dir + "/perfetto.log");
216     consumer.args.stderr_mode = consumer.args.stdout_mode =
217         base::Subprocess::OutputMode::kFd;
218   }
219   remove(trace_file_path.c_str());
220   consumer.Start();
221   int64_t t_start = base::GetBootTimeNs().count();
222   g_sig->pids_to_kill.emplace_back(consumer.pid());
223 
224   std::this_thread::sleep_for(std::chrono::milliseconds(100));
225   PERFETTO_CHECK(consumer.Poll() == base::Subprocess::kRunning);
226 
227   if (!consumer.Wait(
228           static_cast<int>(cfg.trace_config().duration_ms() + 30000))) {
229     AddFailure("Consumer didn't quit in time");
230     consumer.KillAndWaitForTermination();
231   }
232 
233   int64_t t_end = base::GetBootTimeNs().count();
234 
235   for (auto& producer : producers) {
236     producer.KillAndWaitForTermination();
237     test_result.prod_rusage = producer.posix_rusage();  // Only keep last one
238   }
239   producers.clear();
240   traced.KillAndWaitForTermination();
241 
242   test_result.svc_rusage = traced.posix_rusage();
243   test_result.run_time_ms = static_cast<uint32_t>((t_end - t_start) / 1000000);
244 
245   // Verify
246   // TODO(primiano): read back the TraceStats and check them as well.
247   ParsedTraceStats ctx;
248   ReadbackTrace(trace_file_path, &ctx);
249   auto exp_thd = cfg.num_processes() * cfg.num_threads();
250   if (ctx.threads.size() != exp_thd) {
251     AddFailure("Trace threads mismatch. Expected %u threads, got %zu", exp_thd,
252                ctx.threads.size());
253   }
254   for (const auto& it : ctx.threads) {
255     uint32_t seq_id = it.first;
256     const auto& thd = it.second;
257     if (!thd.last_seen) {
258       AddFailure("Last packet not seen for sequence %u", seq_id);
259     }
260     if (thd.seq_errors > 0) {
261       AddFailure("Sequence %u had %" PRIu64 " packets out of sync", seq_id,
262                  thd.seq_errors);
263     }
264     if (thd.counter_errors > 0) {
265       AddFailure("Sequence %u had %" PRIu64 " packets counter errors", seq_id,
266                  thd.counter_errors);
267     }
268   }
269 
270   error_log_.reset();
271   PERFETTO_ILOG("Completed \"%s\"", cfg_name);
272 }
273 
ReadbackTrace(const std::string & trace_file_path,ParsedTraceStats * ctx)274 void TestHarness::ReadbackTrace(const std::string& trace_file_path,
275                                 ParsedTraceStats* ctx) {
276   TestResult& test_result = test_results_.back();
277   using namespace protozero::proto_utils;
278   auto fd = base::OpenFile(trace_file_path.c_str(), O_RDONLY);
279   if (!fd)
280     return AddFailure("Trace file does not exist");
281   const off_t file_size = lseek(*fd, 0, SEEK_END);
282   lseek(*fd, 0, SEEK_SET);
283   if (file_size <= 0)
284     return AddFailure("Trace file is empty");
285 
286   test_result.trace_size_kb = static_cast<uint32_t>(file_size / 1000);
287   std::string trace_data;
288   PERFETTO_CHECK(base::ReadFileDescriptor(*fd, &trace_data));
289   const auto* const start = reinterpret_cast<const uint8_t*>(trace_data.data());
290   const uint8_t* const end = start + file_size;
291 
292   constexpr uint8_t kTracePacketTag = MakeTagLengthDelimited(1);
293 
294   for (auto* ptr = start; (end - ptr) > 2;) {
295     const uint8_t* tokenizer_start = ptr;
296     if (*(ptr++) != kTracePacketTag) {
297       return AddFailure("Tokenizer failure at offset %zd", ptr - start);
298     }
299     uint64_t packet_size = 0;
300     ptr = ParseVarInt(ptr, end, &packet_size);
301     const uint8_t* const packet_start = ptr;
302     ptr += packet_size;
303     if ((ptr - tokenizer_start) < 2 || ptr > end)
304       return AddFailure("Got invalid packet size %" PRIu64 " at offset %zd",
305                         packet_size,
306                         static_cast<ssize_t>(packet_start - start));
307     ParseTracePacket(packet_start, static_cast<size_t>(packet_size), ctx);
308   }
309   test_result.num_threads = static_cast<uint32_t>(ctx->threads.size());
310 }
311 
ParseTracePacket(const uint8_t * start,size_t size,ParsedTraceStats * ctx)312 void TestHarness::ParseTracePacket(const uint8_t* start,
313                                    size_t size,
314                                    ParsedTraceStats* ctx) {
315   TestResult& test_result = test_results_.back();
316   protos::pbzero::TracePacket::Decoder packet(start, size);
317   if (!packet.has_for_testing())
318     return;
319 
320   ++test_result.num_packets;
321   const uint32_t seq_id = packet.trusted_packet_sequence_id();
322 
323   protos::pbzero::TestEvent::Decoder te(packet.for_testing());
324   auto t_it = ctx->threads.find(seq_id);
325   bool is_first_packet = false;
326   if (t_it == ctx->threads.end()) {
327     is_first_packet = true;
328     t_it = ctx->threads.emplace(seq_id, ParsedTraceStats::WriterThread()).first;
329   }
330   ParsedTraceStats::WriterThread& thd = t_it->second;
331 
332   ++thd.packets_seen;
333   if (te.is_last()) {
334     if (thd.last_seen) {
335       return AddFailure(
336           "last_seen=true happened more than once for sequence %u", seq_id);
337     } else {
338       thd.last_seen = true;
339     }
340   }
341   if (is_first_packet) {
342     thd.rnd_engine = std::minstd_rand0(te.seq_value());
343   } else {
344     const uint32_t expected = static_cast<uint32_t>(thd.rnd_engine());
345     if (te.seq_value() != expected) {
346       thd.rnd_engine = std::minstd_rand0(te.seq_value());  // Resync the engine.
347       ++thd.seq_errors;
348       return AddFailure(
349           "TestEvent seq mismatch for sequence %u. Expected %u got %u", seq_id,
350           expected, te.seq_value());
351     }
352     if (te.counter() != thd.packets_seen) {
353       return AddFailure(
354           "TestEvent counter mismatch for sequence %u. Expected %" PRIu64
355           " got %" PRIu64,
356           seq_id, thd.packets_seen, te.counter());
357     }
358   }
359 
360   if (!te.has_payload()) {
361     return AddFailure("TestEvent %u for sequence %u has no payload",
362                       te.seq_value(), seq_id);
363   }
364 
365   // Check the validity of the payload. The payload might be nested. If that is
366   // the case, we need to check all levels.
367   protozero::ConstBytes payload_bounds = te.payload();
368   for (uint32_t depth = 0, last_depth = 0;; depth++) {
369     if (depth > 100) {
370       return AddFailure("Unexpectedly deep depth for event %u, sequence %u",
371                         te.seq_value(), seq_id);
372     }
373     protos::pbzero::TestEvent::TestPayload::Decoder payload(payload_bounds);
374     const uint32_t rem_depth = payload.remaining_nesting_depth();
375 
376     // The payload is a repeated field and must have exactly two instances.
377     // The writer splits it always in two halves of identical size.
378     int num_payload_pieces = 0;
379     size_t last_size = 0;
380     for (auto it = payload.str(); it; ++it, ++num_payload_pieces) {
381       protozero::ConstChars payload_str = *it;
382       last_size = last_size ? last_size : payload_str.size;
383       if (payload_str.size != last_size) {
384         return AddFailure(
385             "Asymmetrical payload at depth %u, event id %u, sequence %u. "
386             "%zu != %zu",
387             depth, te.seq_value(), seq_id, last_size, payload_str.size);
388       }
389       // Check that the payload content matches the expected sequence.
390       for (size_t i = 0; i < payload_str.size; i++) {
391         char exp = static_cast<char>(33 + ((te.seq_value() + i) % 64));
392         if (payload_str.data[i] != exp) {
393           return AddFailure(
394               "Payload mismatch at %zu, depth %u, event id %u, sequence %u. "
395               "Expected: 0x%x, Actual: 0x%x",
396               i, depth, te.seq_value(), seq_id, exp, payload_str.data[i]);
397         }
398       }
399     }
400     if (num_payload_pieces != 2) {
401       return AddFailure(
402           "Broken payload at depth %u, event id %u, sequence %u. "
403           "Expecting 2 repeated str fields, got %d",
404           depth, te.seq_value(), seq_id, num_payload_pieces);
405     }
406 
407     if (depth > 0 && rem_depth != last_depth - 1) {
408       return AddFailure(
409           "Unexpected nesting level (expected: %u, actual: %u) at depth %u, "
410           "event id %u, sequence %u",
411           rem_depth, last_depth - 1, depth, te.seq_value(), seq_id);
412     }
413 
414     last_depth = rem_depth;
415     if (rem_depth == 0)
416       break;
417     if (payload.has_nested()) {
418       payload_bounds = *payload.nested();
419     } else {
420       payload_bounds = {nullptr, 0};
421     }
422   }
423 }
424 
CtrlCHandler()425 void CtrlCHandler() {
426   g_sig->aborted.store(true);
427   for (auto it = g_sig->pids_to_kill.rbegin(); it != g_sig->pids_to_kill.rend();
428        it++) {
429 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
430     base::ScopedPlatformHandle proc_handle(
431         ::OpenProcess(PROCESS_TERMINATE, false, *it));
432     ::TerminateProcess(*proc_handle, STATUS_CONTROL_C_EXIT);
433 #else
434     kill(*it, SIGKILL);
435 #endif
436   }
437 }
438 
StressTestMain(int argc,char ** argv)439 void StressTestMain(int argc, char** argv) {
440   TestHarness th;
441   std::regex filter;
442   bool has_filter = false;
443 
444   bool verbose = false;
445   for (int i = 1; i < argc; ++i) {
446     if (!strcmp(argv[i], "-v")) {
447       verbose = true;
448     } else {
449       filter = std::regex(argv[i], std::regex::ECMAScript | std::regex::icase);
450       has_filter = true;
451     }
452   }
453 
454   g_sig = new SigHandlerCtx();
455   base::InstallCtrCHandler(&CtrlCHandler);
456 
457   for (size_t i = 0; i < base::ArraySize(kStressTestConfigs) && !g_sig->aborted;
458        ++i) {
459     const auto& cfg_blob = kStressTestConfigs[i];
460     StressTestConfig cfg;
461     std::cmatch ignored;
462     if (has_filter && !std::regex_search(cfg_blob.name, ignored, filter)) {
463       continue;
464     }
465     PERFETTO_CHECK(cfg.ParseFromArray(cfg_blob.data, cfg_blob.size));
466     th.RunConfig(cfg_blob.name, cfg, verbose);
467   }
468 
469   for (const auto& tres : th.test_results()) {
470     const auto& cfg = tres.cfg;
471     printf("===============================================================\n");
472     printf("Config: %s\n", tres.cfg_name);
473     printf("===============================================================\n");
474     printf("%-20s %-10s %-10s\n", "Metric", "Expected", "Actual");
475     printf("%-20s %-10s %-10s\n", "------", "--------", "------");
476     printf("%-20s %-10d %-10d\n", "#Errors", 0, tres.num_errors);
477     printf("%-20s %-10d %-10d \n", "Duration [ms]",
478            cfg.trace_config().duration_ms(), tres.run_time_ms);
479 
480     uint32_t exp_threads = cfg.num_processes() * cfg.num_threads();
481     printf("%-20s %-10u %-10u\n", "Num threads", exp_threads, tres.num_threads);
482 
483     double dur_s = cfg.trace_config().duration_ms() / 1e3;
484     double exp_per_thread = cfg.steady_state_timings().rate_mean() * dur_s;
485     if (cfg.burst_period_ms()) {
486       double burst_rate = 1.0 * cfg.burst_duration_ms() / cfg.burst_period_ms();
487       exp_per_thread *= 1.0 - burst_rate;
488       exp_per_thread += burst_rate * cfg.burst_timings().rate_mean() * dur_s;
489     }
490     if (cfg.max_events())
491       exp_per_thread = std::min(exp_per_thread, 1.0 * cfg.max_events());
492     double exp_packets = std::round(exp_per_thread * exp_threads);
493     printf("%-20s %-10.0f %-10d\n", "Num packets", exp_packets,
494            tres.num_packets);
495 
496     double exp_size_kb = exp_packets * (cfg.nesting() + 1) *
497                          (cfg.steady_state_timings().payload_mean() + 40) /
498                          1000;
499     printf("%-20s ~%-9.0f %-10d\n", "Trace size [KB]", exp_size_kb,
500            tres.trace_size_kb);
501 
502     double exp_rss_mb = cfg.trace_config().buffers()[0].size_kb() / 1000;
503     printf("%-20s (max) %-4.0f %-10d\n", "Svc RSS [MB]", exp_rss_mb,
504            tres.svc_rusage.max_rss_kb / 1000);
505     printf("%-20s %-10s %-10d\n", "Svc CPU [ms]", "---",
506            tres.svc_rusage.cpu_time_ms());
507     printf("%-20s %-10s %d / %d\n", "Svc #ctxswitch", "---",
508            tres.svc_rusage.invol_ctx_switch, tres.svc_rusage.vol_ctx_switch);
509 
510     printf("%-20s %-10s %-10d\n", "Prod RSS [MB]", "---",
511            tres.prod_rusage.max_rss_kb / 1000);
512     printf("%-20s %-10s %-10d\n", "Prod CPU [ms]", "---",
513            tres.prod_rusage.cpu_time_ms());
514     printf("%-20s %-10s %d / %d\n", "Prod #ctxswitch", "---",
515            tres.prod_rusage.invol_ctx_switch, tres.prod_rusage.vol_ctx_switch);
516     printf("\n");
517   }
518 }
519 
520 }  // namespace
521 }  // namespace perfetto
522 
main(int argc,char ** argv)523 int main(int argc, char** argv) {
524   perfetto::StressTestMain(argc, argv);
525 }
526