1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <stdarg.h>
18 
19 #include <chrono>
20 #include <list>
21 #include <map>
22 #include <random>
23 #include <regex>
24 #include <string>
25 #include <thread>
26 #include <vector>
27 
28 #include "perfetto/base/build_config.h"
29 #include "perfetto/base/compiler.h"
30 #include "perfetto/base/time.h"
31 #include "perfetto/ext/base/ctrl_c_handler.h"
32 #include "perfetto/ext/base/file_utils.h"
33 #include "perfetto/ext/base/scoped_file.h"
34 #include "perfetto/ext/base/subprocess.h"
35 #include "perfetto/ext/base/temp_file.h"
36 #include "perfetto/ext/base/utils.h"
37 #include "perfetto/protozero/proto_utils.h"
38 #include "perfetto/tracing.h"
39 #include "perfetto/tracing/core/forward_decls.h"
40 #include "perfetto/tracing/core/trace_config.h"
41 #include "src/base/test/utils.h"
42 
43 #include "protos/perfetto/config/stress_test_config.gen.h"
44 #include "protos/perfetto/trace/test_event.pbzero.h"
45 #include "protos/perfetto/trace/trace_packet.pbzero.h"
46 
47 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
48 #include <Windows.h>
49 #else
50 #include <signal.h>
51 #endif
52 
53 // Generated by gen_configs_blob.py. It defines the kStressTestConfigs array,
54 // which contains a proto-encoded StressTestConfig message for each .cfg file
55 // listed in /test/stress_test/configs/BUILD.gn.
56 #include "test/stress_test/configs/stress_test_config_blobs.h"
57 
58 namespace perfetto {
59 namespace {
60 
61 // TODO(primiano): We need a base::File to get around the awkwardness of
62 // files on Windows being a mix of int and HANDLE (and open() vs CreateFile())
63 // in our codebase.
OpenLogFile(const std::string & path)64 base::ScopedPlatformHandle OpenLogFile(const std::string& path) {
65 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
66   return base::ScopedPlatformHandle(::CreateFileA(
67       path.c_str(), GENERIC_READ | GENERIC_WRITE,
68       FILE_SHARE_DELETE | FILE_SHARE_READ, nullptr, CREATE_ALWAYS, 0, nullptr));
69 #else
70   return base::OpenFile(path, O_RDWR | O_CREAT | O_TRUNC, 0644);
71 #endif
72 }
73 
// Shorthand for the protos::gen::StressTestConfig message class, which is used
// throughout this file to describe a single stress-test configuration.
using StressTestConfig = protos::gen::StressTestConfig;
75 
76 struct SigHandlerCtx {
77   std::atomic<bool> aborted{};
78   std::vector<base::PlatformProcessId> pids_to_kill;
79 };
80 SigHandlerCtx* g_sig;
81 
82 struct TestResult {
83   const char* cfg_name = nullptr;
84   StressTestConfig cfg;
85   uint32_t run_time_ms = 0;
86   uint32_t trace_size_kb = 0;
87   uint32_t num_packets = 0;
88   uint32_t num_threads = 0;
89   uint32_t num_errors = 0;
90   base::Subprocess::ResourceUsage svc_rusage;
91   base::Subprocess::ResourceUsage prod_rusage;
92 };
93 
// Accumulates per-writer state while the trace file is being read back.
struct ParsedTraceStats {
  // Book-keeping for one writer sequence found in the trace.
  struct WriterThread {
    uint64_t packets_seen{0};
    bool last_seen{false};
    uint32_t last_seq{0};
    uint64_t seq_errors{0};
    uint64_t counter_errors{0};
    // Replays the writer's pseudo-random sequence; it is (re)seeded from the
    // seq_value of a packet when parsing starts or has to resync.
    std::minstd_rand0 rnd_engine{};
  };

  // Keyed by trusted_packet_sequence_id: one entry for each sequence seen.
  std::map<uint32_t, WriterThread> threads{};
};
107 
108 class TestHarness {
109  public:
110   TestHarness();
111   void RunConfig(const char* cfg_name, const StressTestConfig&, bool verbose);
test_results() const112   const std::list<TestResult>& test_results() const { return test_results_; }
113 
114  private:
115   void ReadbackTrace(const std::string&, ParsedTraceStats*);
116   void ParseTracePacket(const uint8_t*, size_t, ParsedTraceStats* ctx);
117   void AddFailure(const char* fmt, ...) PERFETTO_PRINTF_FORMAT(2, 3);
118 
119   std::vector<std::string> env_;
120   std::list<TestResult> test_results_;
121   std::string results_dir_;
122   base::ScopedFile error_log_;
123 };
124 
TestHarness()125 TestHarness::TestHarness() {
126   results_dir_ = base::GetSysTempDir() + "/perfetto-stress-test";
127 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
128   base::ignore_result(system(("rmdir \"" + results_dir_ + "\" /s /q").c_str()));
129 #else
130   base::ignore_result(system(("rm -r -- \"" + results_dir_ + "\"").c_str()));
131 #endif
132   PERFETTO_CHECK(base::Mkdir(results_dir_));
133   PERFETTO_LOG("Saving test results in %s", results_dir_.c_str());
134 }
135 
AddFailure(const char * fmt,...)136 void TestHarness::AddFailure(const char* fmt, ...) {
137   ++test_results_.back().num_errors;
138 
139   char log_msg[512];
140   va_list args;
141   va_start(args, fmt);
142   int res = vsnprintf(log_msg, sizeof(log_msg), fmt, args);
143   va_end(args);
144 
145   PERFETTO_ELOG("FAIL: %s", log_msg);
146 
147   if (res > 0 && static_cast<size_t>(res) < sizeof(log_msg) - 2) {
148     log_msg[res++] = '\n';
149     log_msg[res++] = '\0';
150   }
151 
152   base::WriteAll(*error_log_, log_msg, static_cast<size_t>(res));
153 }
154 
RunConfig(const char * cfg_name,const StressTestConfig & cfg,bool verbose)155 void TestHarness::RunConfig(const char* cfg_name,
156                             const StressTestConfig& cfg,
157                             bool verbose) {
158   test_results_.emplace_back();
159   TestResult& test_result = test_results_.back();
160   test_result.cfg_name = cfg_name;
161   test_result.cfg = cfg;
162   g_sig->pids_to_kill.clear();
163 
164   auto result_dir = results_dir_ + "/" + cfg_name;
165   PERFETTO_CHECK(base::Mkdir(result_dir));
166   error_log_ = base::OpenFile(result_dir + "/errors.log",
167                               O_RDWR | O_CREAT | O_TRUNC, 0644);
168 
169   PERFETTO_ILOG("Starting \"%s\" - %s", cfg_name, result_dir.c_str());
170 
171   env_.emplace_back("PERFETTO_PRODUCER_SOCK_NAME=" + result_dir +
172                     "/producer.sock");
173   env_.emplace_back("PERFETTO_CONSUMER_SOCK_NAME=" + result_dir +
174                     "/consumer.sock");
175   std::string bin_dir = base::GetCurExecutableDir();
176 
177   // Start the service.
178   base::Subprocess traced({bin_dir + "/traced"});
179   traced.args.env = env_;
180   if (!verbose) {
181     traced.args.out_fd = OpenLogFile(result_dir + "/traced.log");
182     traced.args.stderr_mode = traced.args.stdout_mode =
183         base::Subprocess::OutputMode::kFd;
184   }
185   traced.Start();
186   g_sig->pids_to_kill.emplace_back(traced.pid());
187   std::this_thread::sleep_for(std::chrono::milliseconds(100));
188   PERFETTO_CHECK(traced.Poll() == base::Subprocess::kRunning);
189 
190   // Start the producer processes.
191   std::list<base::Subprocess> producers;
192   for (uint32_t i = 0; i < cfg.num_processes(); ++i) {
193     producers.emplace_back(base::Subprocess({bin_dir + "/stress_producer"}));
194     auto& producer = producers.back();
195     producer.args.input = cfg.SerializeAsString();
196     if (!verbose) {
197       producer.args.out_fd =
198           OpenLogFile(result_dir + "/producer." + std::to_string(i) + ".log");
199       producer.args.stderr_mode = producer.args.stdout_mode =
200           base::Subprocess::OutputMode::kFd;
201     }
202     producer.args.env = env_;
203     producer.Start();
204     g_sig->pids_to_kill.emplace_back(producer.pid());
205   }
206   std::this_thread::sleep_for(std::chrono::milliseconds(100));
207   for (auto& producer : producers)
208     PERFETTO_CHECK(producer.Poll() == base::Subprocess::kRunning);
209 
210   auto trace_file_path = result_dir + "/trace";
211   base::Subprocess consumer(
212       {bin_dir + "/perfetto", "-c", "-", "-o", trace_file_path.c_str()});
213   consumer.args.env = env_;
214   consumer.args.input = cfg.trace_config().SerializeAsString();
215   if (!verbose) {
216     consumer.args.out_fd = OpenLogFile(result_dir + "/perfetto.log");
217     consumer.args.stderr_mode = consumer.args.stdout_mode =
218         base::Subprocess::OutputMode::kFd;
219   }
220   remove(trace_file_path.c_str());
221   consumer.Start();
222   int64_t t_start = base::GetBootTimeNs().count();
223   g_sig->pids_to_kill.emplace_back(consumer.pid());
224 
225   std::this_thread::sleep_for(std::chrono::milliseconds(100));
226   PERFETTO_CHECK(consumer.Poll() == base::Subprocess::kRunning);
227 
228   if (!consumer.Wait(
229           static_cast<int>(cfg.trace_config().duration_ms() + 30000))) {
230     AddFailure("Consumer didn't quit in time");
231     consumer.KillAndWaitForTermination();
232   }
233 
234   int64_t t_end = base::GetBootTimeNs().count();
235 
236   for (auto& producer : producers) {
237     producer.KillAndWaitForTermination();
238     test_result.prod_rusage = producer.posix_rusage();  // Only keep last one
239   }
240   producers.clear();
241   traced.KillAndWaitForTermination();
242 
243   test_result.svc_rusage = traced.posix_rusage();
244   test_result.run_time_ms = static_cast<uint32_t>((t_end - t_start) / 1000000);
245 
246   // Verify
247   // TODO(primiano): read back the TraceStats and check them as well.
248   ParsedTraceStats ctx;
249   ReadbackTrace(trace_file_path, &ctx);
250   auto exp_thd = cfg.num_processes() * cfg.num_threads();
251   if (ctx.threads.size() != exp_thd) {
252     AddFailure("Trace threads mismatch. Expected %u threads, got %zu", exp_thd,
253                ctx.threads.size());
254   }
255   for (const auto& it : ctx.threads) {
256     uint32_t seq_id = it.first;
257     const auto& thd = it.second;
258     if (!thd.last_seen) {
259       AddFailure("Last packet not seen for sequence %u", seq_id);
260     }
261     if (thd.seq_errors > 0) {
262       AddFailure("Sequence %u had %" PRIu64 " packets out of sync", seq_id,
263                  thd.seq_errors);
264     }
265     if (thd.counter_errors > 0) {
266       AddFailure("Sequence %u had %" PRIu64 " packets counter errors", seq_id,
267                  thd.counter_errors);
268     }
269   }
270 
271   error_log_.reset();
272   PERFETTO_ILOG("Completed \"%s\"", cfg_name);
273 }
274 
ReadbackTrace(const std::string & trace_file_path,ParsedTraceStats * ctx)275 void TestHarness::ReadbackTrace(const std::string& trace_file_path,
276                                 ParsedTraceStats* ctx) {
277   TestResult& test_result = test_results_.back();
278   using namespace protozero::proto_utils;
279   auto fd = base::OpenFile(trace_file_path.c_str(), O_RDONLY);
280   if (!fd)
281     return AddFailure("Trace file does not exist");
282   const off_t file_size = lseek(*fd, 0, SEEK_END);
283   lseek(*fd, 0, SEEK_SET);
284   if (file_size <= 0)
285     return AddFailure("Trace file is empty");
286 
287   test_result.trace_size_kb = static_cast<uint32_t>(file_size / 1000);
288   std::string trace_data;
289   PERFETTO_CHECK(base::ReadFileDescriptor(*fd, &trace_data));
290   const auto* const start = reinterpret_cast<const uint8_t*>(trace_data.data());
291   const uint8_t* const end = start + file_size;
292 
293   constexpr uint8_t kTracePacketTag = MakeTagLengthDelimited(1);
294 
295   for (auto* ptr = start; (end - ptr) > 2;) {
296     const uint8_t* tokenizer_start = ptr;
297     if (*(ptr++) != kTracePacketTag) {
298       return AddFailure("Tokenizer failure at offset %zd", ptr - start);
299     }
300     uint64_t packet_size = 0;
301     ptr = ParseVarInt(ptr, end, &packet_size);
302     const uint8_t* const packet_start = ptr;
303     ptr += packet_size;
304     if ((ptr - tokenizer_start) < 2 || ptr > end)
305       return AddFailure("Got invalid packet size %" PRIu64 " at offset %zd",
306                         packet_size,
307                         static_cast<ssize_t>(packet_start - start));
308     ParseTracePacket(packet_start, static_cast<size_t>(packet_size), ctx);
309   }
310   test_result.num_threads = static_cast<uint32_t>(ctx->threads.size());
311 }
312 
ParseTracePacket(const uint8_t * start,size_t size,ParsedTraceStats * ctx)313 void TestHarness::ParseTracePacket(const uint8_t* start,
314                                    size_t size,
315                                    ParsedTraceStats* ctx) {
316   TestResult& test_result = test_results_.back();
317   protos::pbzero::TracePacket::Decoder packet(start, size);
318   if (!packet.has_for_testing())
319     return;
320 
321   ++test_result.num_packets;
322   const uint32_t seq_id = packet.trusted_packet_sequence_id();
323 
324   protos::pbzero::TestEvent::Decoder te(packet.for_testing());
325   auto t_it = ctx->threads.find(seq_id);
326   bool is_first_packet = false;
327   if (t_it == ctx->threads.end()) {
328     is_first_packet = true;
329     t_it = ctx->threads.emplace(seq_id, ParsedTraceStats::WriterThread()).first;
330   }
331   ParsedTraceStats::WriterThread& thd = t_it->second;
332 
333   ++thd.packets_seen;
334   if (te.is_last()) {
335     if (thd.last_seen) {
336       return AddFailure(
337           "last_seen=true happened more than once for sequence %u", seq_id);
338     } else {
339       thd.last_seen = true;
340     }
341   }
342   if (is_first_packet) {
343     thd.rnd_engine = std::minstd_rand0(te.seq_value());
344   } else {
345     const uint32_t expected = static_cast<uint32_t>(thd.rnd_engine());
346     if (te.seq_value() != expected) {
347       thd.rnd_engine = std::minstd_rand0(te.seq_value());  // Resync the engine.
348       ++thd.seq_errors;
349       return AddFailure(
350           "TestEvent seq mismatch for sequence %u. Expected %u got %u", seq_id,
351           expected, te.seq_value());
352     }
353     if (te.counter() != thd.packets_seen) {
354       return AddFailure(
355           "TestEvent counter mismatch for sequence %u. Expected %" PRIu64
356           " got %" PRIu64,
357           seq_id, thd.packets_seen, te.counter());
358     }
359   }
360 
361   if (!te.has_payload()) {
362     return AddFailure("TestEvent %u for sequence %u has no payload",
363                       te.seq_value(), seq_id);
364   }
365 
366   // Check the validity of the payload. The payload might be nested. If that is
367   // the case, we need to check all levels.
368   protozero::ConstBytes payload_bounds = te.payload();
369   for (uint32_t depth = 0, last_depth = 0;; depth++) {
370     if (depth > 100) {
371       return AddFailure("Unexpectedly deep depth for event %u, sequence %u",
372                         te.seq_value(), seq_id);
373     }
374     protos::pbzero::TestEvent::TestPayload::Decoder payload(payload_bounds);
375     const uint32_t rem_depth = payload.remaining_nesting_depth();
376 
377     // The payload is a repeated field and must have exactly two instances.
378     // The writer splits it always in two halves of identical size.
379     int num_payload_pieces = 0;
380     size_t last_size = 0;
381     for (auto it = payload.str(); it; ++it, ++num_payload_pieces) {
382       protozero::ConstChars payload_str = *it;
383       last_size = last_size ? last_size : payload_str.size;
384       if (payload_str.size != last_size) {
385         return AddFailure(
386             "Asymmetrical payload at depth %u, event id %u, sequence %u. "
387             "%zu != %zu",
388             depth, te.seq_value(), seq_id, last_size, payload_str.size);
389       }
390       // Check that the payload content matches the expected sequence.
391       for (size_t i = 0; i < payload_str.size; i++) {
392         char exp = static_cast<char>(33 + ((te.seq_value() + i) % 64));
393         if (payload_str.data[i] != exp) {
394           return AddFailure(
395               "Payload mismatch at %zu, depth %u, event id %u, sequence %u. "
396               "Expected: 0x%x, Actual: 0x%x",
397               i, depth, te.seq_value(), seq_id, exp, payload_str.data[i]);
398         }
399       }
400     }
401     if (num_payload_pieces != 2) {
402       return AddFailure(
403           "Broken payload at depth %u, event id %u, sequence %u. "
404           "Expecting 2 repeated str fields, got %d",
405           depth, te.seq_value(), seq_id, num_payload_pieces);
406     }
407 
408     if (depth > 0 && rem_depth != last_depth - 1) {
409       return AddFailure(
410           "Unexpected nesting level (expected: %u, actual: %u) at depth %u, "
411           "event id %u, sequence %u",
412           rem_depth, last_depth - 1, depth, te.seq_value(), seq_id);
413     }
414 
415     last_depth = rem_depth;
416     if (rem_depth == 0)
417       break;
418     if (payload.has_nested()) {
419       payload_bounds = *payload.nested();
420     } else {
421       payload_bounds = {nullptr, 0};
422     }
423   }
424 }
425 
CtrlCHandler()426 void CtrlCHandler() {
427   g_sig->aborted.store(true);
428   for (auto it = g_sig->pids_to_kill.rbegin(); it != g_sig->pids_to_kill.rend();
429        it++) {
430 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
431     base::ScopedPlatformHandle proc_handle(
432         ::OpenProcess(PROCESS_TERMINATE, false, static_cast<DWORD>(*it)));
433     ::TerminateProcess(*proc_handle, STATUS_CONTROL_C_EXIT);
434 #else
435     kill(*it, SIGKILL);
436 #endif
437   }
438 }
439 
StressTestMain(int argc,char ** argv)440 void StressTestMain(int argc, char** argv) {
441   TestHarness th;
442   std::regex filter;
443   bool has_filter = false;
444 
445   bool verbose = false;
446   for (int i = 1; i < argc; ++i) {
447     if (!strcmp(argv[i], "-v")) {
448       verbose = true;
449     } else {
450       filter = std::regex(argv[i], std::regex::ECMAScript | std::regex::icase);
451       has_filter = true;
452     }
453   }
454 
455   g_sig = new SigHandlerCtx();
456   base::InstallCtrlCHandler(&CtrlCHandler);
457 
458   for (size_t i = 0; i < base::ArraySize(kStressTestConfigs) && !g_sig->aborted;
459        ++i) {
460     const auto& cfg_blob = kStressTestConfigs[i];
461     StressTestConfig cfg;
462     std::cmatch ignored;
463     if (has_filter && !std::regex_search(cfg_blob.name, ignored, filter)) {
464       continue;
465     }
466     PERFETTO_CHECK(cfg.ParseFromArray(cfg_blob.data, cfg_blob.size));
467     th.RunConfig(cfg_blob.name, cfg, verbose);
468   }
469 
470   for (const auto& tres : th.test_results()) {
471     const auto& cfg = tres.cfg;
472     printf("===============================================================\n");
473     printf("Config: %s\n", tres.cfg_name);
474     printf("===============================================================\n");
475     printf("%-20s %-10s %-10s\n", "Metric", "Expected", "Actual");
476     printf("%-20s %-10s %-10s\n", "------", "--------", "------");
477     printf("%-20s %-10d %-10u\n", "#Errors", 0, tres.num_errors);
478     printf("%-20s %-10u %-10u \n", "Duration [ms]",
479            cfg.trace_config().duration_ms(), tres.run_time_ms);
480 
481     uint32_t exp_threads = cfg.num_processes() * cfg.num_threads();
482     printf("%-20s %-10u %-10u\n", "Num threads", exp_threads, tres.num_threads);
483 
484     double dur_s = cfg.trace_config().duration_ms() / 1e3;
485     double exp_per_thread = cfg.steady_state_timings().rate_mean() * dur_s;
486     if (cfg.burst_period_ms()) {
487       double burst_rate = 1.0 * cfg.burst_duration_ms() / cfg.burst_period_ms();
488       exp_per_thread *= 1.0 - burst_rate;
489       exp_per_thread += burst_rate * cfg.burst_timings().rate_mean() * dur_s;
490     }
491     if (cfg.max_events())
492       exp_per_thread = std::min(exp_per_thread, 1.0 * cfg.max_events());
493     double exp_packets = std::round(exp_per_thread * exp_threads);
494     printf("%-20s %-10.0f %-10u\n", "Num packets", exp_packets,
495            tres.num_packets);
496 
497     double exp_size_kb = exp_packets * (cfg.nesting() + 1) *
498                          (cfg.steady_state_timings().payload_mean() + 40) /
499                          1000;
500     printf("%-20s ~%-9.0f %-10u\n", "Trace size [KB]", exp_size_kb,
501            tres.trace_size_kb);
502 
503     double exp_rss_mb = cfg.trace_config().buffers()[0].size_kb() / 1000;
504     printf("%-20s (max) %-4.0f %-10u\n", "Svc RSS [MB]", exp_rss_mb,
505            tres.svc_rusage.max_rss_kb / 1000);
506     printf("%-20s %-10s %-10u\n", "Svc CPU [ms]", "---",
507            tres.svc_rusage.cpu_time_ms());
508     printf("%-20s %-10s %u / %u\n", "Svc #ctxswitch", "---",
509            tres.svc_rusage.invol_ctx_switch, tres.svc_rusage.vol_ctx_switch);
510 
511     printf("%-20s %-10s %-10u\n", "Prod RSS [MB]", "---",
512            tres.prod_rusage.max_rss_kb / 1000);
513     printf("%-20s %-10s %-10u\n", "Prod CPU [ms]", "---",
514            tres.prod_rusage.cpu_time_ms());
515     printf("%-20s %-10s %u / %u\n", "Prod #ctxswitch", "---",
516            tres.prod_rusage.invol_ctx_switch, tres.prod_rusage.vol_ctx_switch);
517     printf("\n");
518   }
519 }
520 
521 }  // namespace
522 }  // namespace perfetto
523 
main(int argc,char ** argv)524 int main(int argc, char** argv) {
525   perfetto::StressTestMain(argc, argv);
526 }
527