#!/usr/bin/env python3 # Copyright (C) 2023 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import concurrent.futures import datetime import difflib import os import subprocess import sys import tempfile from binascii import unhexlify from dataclasses import dataclass from typing import List, Tuple, Optional from google.protobuf import text_format from python.generators.diff_tests.testing import Metric, MetricV2SpecTextproto, Path, TestCase, TestType, BinaryProto, TextProto from python.generators.diff_tests.utils import ( ColorFormatter, create_message_factory, get_env, get_trace_descriptor_path, read_all_tests, serialize_python_trace, serialize_textproto_trace, modify_trace) ROOT_DIR = os.path.dirname( os.path.dirname( os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) # Performance result of running the test. @dataclass class PerfResult: test: TestCase ingest_time_ns: int real_time_ns: int def __init__(self, test: TestCase, perf_lines: List[str]): self.test = test assert len(perf_lines) == 1 perf_numbers = perf_lines[0].split(',') assert len(perf_numbers) == 2 self.ingest_time_ns = int(perf_numbers[0]) self.real_time_ns = int(perf_numbers[1]) # Data gathered from running the test. @dataclass class TestResult: test: TestCase trace: str cmd: List[str] expected: str actual: str passed: bool stderr: str exit_code: int perf_result: Optional[PerfResult] def __init__( self, test: TestCase, gen_trace_path: str, cmd: List[str], expected_text: str, actual_text: str, stderr: str, exit_code: int, perf_lines: List[str], ) -> None: self.test = test self.trace = gen_trace_path self.cmd = cmd self.stderr = stderr self.exit_code = exit_code # For better string formatting we often add whitespaces, which has to now # be removed. def strip_whitespaces(text: str): no_front_new_line_text = text.lstrip('\n') return '\n'.join(s.strip() for s in no_front_new_line_text.split('\n')) self.expected = strip_whitespaces(expected_text) self.actual = strip_whitespaces(actual_text) expected_content = self.expected.replace('\r\n', '\n') actual_content = self.actual.replace('\r\n', '\n') self.passed = (expected_content == actual_content) if self.exit_code == 0: self.perf_result = PerfResult(self.test, perf_lines) else: self.perf_result = None def write_diff(self): expected_lines = self.expected.splitlines(True) actual_lines = self.actual.splitlines(True) diff = difflib.unified_diff( expected_lines, actual_lines, fromfile='expected', tofile='actual') return "".join(list(diff)) # Results of running the test suite. Mostly used for printing aggregated # results. @dataclass class TestResults: test_failures: List[str] perf_data: List[PerfResult] test_time_ms: int def str(self, no_colors: bool, tests_no: int): c = ColorFormatter(no_colors) res = ( f"[==========] {tests_no} tests ran. ({self.test_time_ms} ms total)\n" f"{c.green('[ PASSED ]')} " f"{tests_no - len(self.test_failures)} tests.\n") if len(self.test_failures) > 0: res += (f"{c.red('[ FAILED ]')} " f"{len(self.test_failures)} tests.\n") for failure in self.test_failures: res += f"{c.red('[ FAILED ]')} {failure}\n" return res # Responsible for executing singular diff test. @dataclass class TestCaseRunner: test: TestCase trace_processor_path: str trace_descriptor_path: str colors: ColorFormatter override_sql_module_paths: List[str] def __output_to_text_proto(self, actual: str, out: BinaryProto) -> str: """Deserializes a binary proto and returns its text representation. Args: actual: (string) HEX encoded serialized proto message message_type: (string) Message type Returns: Text proto """ try: protos_dir = os.path.join( ROOT_DIR, os.path.dirname(self.trace_processor_path), 'gen', 'protos', ) raw_data = unhexlify(actual.splitlines()[-1][1:-1]) descriptor_paths = [ f.path for f in os.scandir( os.path.join(protos_dir, 'perfetto', 'trace_processor')) if f.is_file() and os.path.splitext(f.name)[1] == '.descriptor' ] descriptor_paths.append( os.path.join(protos_dir, 'third_party', 'pprof', 'profile.descriptor')) proto = create_message_factory(descriptor_paths, out.message_type)() proto.ParseFromString(raw_data) try: return out.post_processing(proto) except: return '' except: return '' def __run_metrics_test(self, trace_path: str, metrics_message_factory) -> TestResult: with tempfile.NamedTemporaryFile(delete=False) as tmp_perf_file: assert isinstance(self.test.blueprint.query, Metric) is_json_output_file = self.test.blueprint.is_out_file( ) and os.path.basename(self.test.expected_path).endswith('.json.out') is_json_output = is_json_output_file or self.test.blueprint.is_out_json() cmd = [ self.trace_processor_path, '--analyze-trace-proto-content', '--crop-track-events', '--extra-checks', '--run-metrics', self.test.blueprint.query.name, '--metrics-output=%s' % ('json' if is_json_output else 'binary'), '--perf-file', tmp_perf_file.name, trace_path, ] if self.test.register_files_dir: cmd += ['--register-files-dir', self.test.register_files_dir] for sql_module_path in self.override_sql_module_paths: cmd += ['--override-sql-module', sql_module_path] tp = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=get_env(ROOT_DIR)) (stdout, stderr) = tp.communicate() if is_json_output: expected_text = self.test.expected_str actual_text = stdout.decode('utf8') else: # Expected will be in text proto format and we'll need to parse it to # a real proto. expected_message = metrics_message_factory() text_format.Merge(self.test.expected_str, expected_message) # Actual will be the raw bytes of the proto and we'll need to parse it # into a message. actual_message = metrics_message_factory() actual_message.ParseFromString(stdout) # Convert both back to text format. expected_text = text_format.MessageToString(expected_message) actual_text = text_format.MessageToString(actual_message) os.remove(tmp_perf_file.name) return TestResult( self.test, trace_path, cmd, expected_text, actual_text, stderr.decode('utf8'), tp.returncode, [line.decode('utf8') for line in tmp_perf_file.readlines()], ) def __run_metrics_v2_test( self, trace_path: str, keep_input: bool, summary_spec_message_factory, summary_message_factory, ) -> TestResult: with tempfile.NamedTemporaryFile(delete=False) as tmp_perf_file, \ tempfile.NamedTemporaryFile(delete=False) as tmp_spec_file: assert isinstance(self.test.blueprint.query, MetricV2SpecTextproto) spec_message = summary_spec_message_factory() text_format.Merge(self.test.blueprint.query.contents, spec_message.metric_spec.add()) tmp_spec_file.write(spec_message.SerializeToString()) tmp_spec_file.flush() cmd = [ self.trace_processor_path, '--analyze-trace-proto-content', '--crop-track-events', '--extra-checks', '--perf-file', tmp_perf_file.name, '--summary', '--summary-spec', tmp_spec_file.name, '--summary-metrics-v2', spec_message.metric_spec[0].id, '--summary-format', 'binary', trace_path, ] for sql_module_path in self.override_sql_module_paths: cmd += ['--override-sql-module', sql_module_path] tp = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=get_env(ROOT_DIR), ) (stdout, stderr) = tp.communicate() # Expected will be in text proto format and we'll need to parse it to # a real proto. expected_summary = summary_message_factory() text_format.Merge(self.test.expected_str, expected_summary.metric.add()) # Actual will be the raw bytes of the proto and we'll need to parse it # into a message. actual_summary = summary_message_factory() actual_summary.ParseFromString(stdout) os.remove(tmp_perf_file.name) if not keep_input: os.remove(tmp_spec_file.name) return TestResult( self.test, trace_path, cmd, text_format.MessageToString(expected_summary.metric[0]), text_format.MessageToString(actual_summary.metric[0]), stderr.decode('utf8'), tp.returncode, [line.decode('utf8') for line in tmp_perf_file.readlines()], ) # Run a query based Diff Test. def __run_query_test(self, trace_path: str) -> TestResult: with tempfile.NamedTemporaryFile(delete=False) as tmp_perf_file: cmd = [ self.trace_processor_path, '--analyze-trace-proto-content', '--crop-track-events', '--extra-checks', '--perf-file', tmp_perf_file.name, trace_path, ] if self.test.blueprint.is_query_file(): cmd += ['-q', self.test.query_path] else: assert isinstance(self.test.blueprint.query, str) cmd += ['-Q', self.test.blueprint.query] if self.test.register_files_dir: cmd += ['--register-files-dir', self.test.register_files_dir] for sql_module_path in self.override_sql_module_paths: cmd += ['--override-sql-module', sql_module_path] tp = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=get_env(ROOT_DIR)) (stdout, stderr) = tp.communicate() actual = stdout.decode('utf8') if self.test.blueprint.is_out_binaryproto(): assert isinstance(self.test.blueprint.out, BinaryProto) actual = self.__output_to_text_proto(actual, self.test.blueprint.out) os.remove(tmp_perf_file.name) return TestResult( self.test, trace_path, cmd, self.test.expected_str, actual, stderr.decode('utf8'), tp.returncode, [line.decode('utf8') for line in tmp_perf_file.readlines()], ) def __run( self, summary_descriptor_path: str, metrics_descriptor_paths: List[str], extension_descriptor_paths: List[str], keep_input, ) -> Tuple[TestResult, str]: # We can't use delete=True here. When using that on Windows, the # resulting file is opened in exclusive mode (in turn that's a subtle # side-effect of the underlying CreateFile(FILE_ATTRIBUTE_TEMPORARY)) # and TP fails to open the passed path. gen_trace_file = None if self.test.blueprint.is_trace_file(): assert self.test.trace_path if self.test.trace_path.endswith('.py'): gen_trace_file = tempfile.NamedTemporaryFile(delete=False) serialize_python_trace(ROOT_DIR, self.trace_descriptor_path, self.test.trace_path, gen_trace_file) elif self.test.trace_path.endswith('.textproto'): gen_trace_file = tempfile.NamedTemporaryFile(delete=False) serialize_textproto_trace(self.trace_descriptor_path, extension_descriptor_paths, self.test.trace_path, gen_trace_file) elif self.test.blueprint.is_trace_textproto(): gen_trace_file = tempfile.NamedTemporaryFile(delete=False) proto = create_message_factory([self.trace_descriptor_path] + extension_descriptor_paths, 'perfetto.protos.Trace')() assert isinstance(self.test.blueprint.trace, TextProto) text_format.Merge(self.test.blueprint.trace.contents, proto) gen_trace_file.write(proto.SerializeToString()) gen_trace_file.flush() else: gen_trace_file = tempfile.NamedTemporaryFile(delete=False) with open(gen_trace_file.name, 'w') as trace_file: trace_file.write(self.test.blueprint.trace.contents) if self.test.blueprint.trace_modifier is not None: if gen_trace_file: # Overwrite |gen_trace_file|. modify_trace(self.trace_descriptor_path, extension_descriptor_paths, gen_trace_file.name, gen_trace_file.name, self.test.blueprint.trace_modifier) else: # Create |gen_trace_file| to save the modified trace. gen_trace_file = tempfile.NamedTemporaryFile(delete=False) modify_trace(self.trace_descriptor_path, extension_descriptor_paths, self.test.trace_path, gen_trace_file.name, self.test.blueprint.trace_modifier) if gen_trace_file: trace_path = os.path.realpath(gen_trace_file.name) else: trace_path = self.test.trace_path assert trace_path str = f"{self.colors.yellow('[ RUN ]')} {self.test.name}\n" if self.test.type == TestType.QUERY: result = self.__run_query_test(trace_path) elif self.test.type == TestType.METRIC: result = self.__run_metrics_test( trace_path, create_message_factory(metrics_descriptor_paths, 'perfetto.protos.TraceMetrics'), ) elif self.test.type == TestType.METRIC_V2: result = self.__run_metrics_v2_test( trace_path, keep_input, create_message_factory([summary_descriptor_path], 'perfetto.protos.TraceSummarySpec'), create_message_factory([summary_descriptor_path], 'perfetto.protos.TraceSummary'), ) else: assert False if gen_trace_file: if not keep_input: gen_trace_file.close() os.remove(trace_path) def write_cmdlines(): res = "" if self.test.trace_path and (self.test.trace_path.endswith('.textproto') or self.test.trace_path.endswith('.py')): res += 'Command to generate trace:\n' res += 'tools/serialize_test_trace.py ' res += '--descriptor {} {} > {}\n'.format( os.path.relpath(self.trace_descriptor_path, ROOT_DIR), os.path.relpath(self.test.trace_path, ROOT_DIR), os.path.relpath(trace_path, ROOT_DIR)) res += f"Command line:\n{' '.join(result.cmd)}\n" return res if result.exit_code != 0 or not result.passed: result.passed = False str += result.stderr if result.exit_code == 0: str += f"Expected did not match actual for test {self.test.name}.\n" str += write_cmdlines() str += result.write_diff() else: str += write_cmdlines() str += (f"{self.colors.red('[ FAILED ]')} {self.test.name}\n") return result, str else: assert result.perf_result str += (f"{self.colors.green('[ OK ]')} {self.test.name} " f"(ingest: {result.perf_result.ingest_time_ns / 1000000:.2f} ms " f"query: {result.perf_result.real_time_ns / 1000000:.2f} ms)\n") return result, str # Run a TestCase. def execute( self, summary_descriptor_path: str, metrics_descriptor_paths: List[str], extension_descriptor_paths: List[str], keep_input: bool, ) -> Tuple[str, str, TestResult]: if not metrics_descriptor_paths: out_path = os.path.dirname(self.trace_processor_path) metrics_protos_path = os.path.join( out_path, 'gen', 'protos', 'perfetto', 'metrics', ) metrics_descriptor_paths = [ os.path.join(metrics_protos_path, 'metrics.descriptor'), os.path.join(metrics_protos_path, 'chrome', 'all_chrome_metrics.descriptor'), os.path.join(metrics_protos_path, 'webview', 'all_webview_metrics.descriptor') ] result, run_str = self.__run( summary_descriptor_path, metrics_descriptor_paths, extension_descriptor_paths, keep_input, ) return self.test.name, run_str, result # Fetches and executes all diff viable tests. @dataclass class DiffTestsRunner: tests: List[TestCase] trace_processor_path: str trace_descriptor_path: str test_runners: List[TestCaseRunner] quiet: bool def __init__( self, name_filter: str, trace_processor_path: str, trace_descriptor: str, no_colors: bool, override_sql_module_paths: List[str], test_dir: str, quiet: bool, ): self.tests = read_all_tests(name_filter, test_dir) self.trace_processor_path = trace_processor_path self.quiet = quiet out_path = os.path.dirname(self.trace_processor_path) self.trace_descriptor_path = get_trace_descriptor_path( out_path, trace_descriptor, ) self.test_runners = [] color_formatter = ColorFormatter(no_colors) for test in self.tests: self.test_runners.append( TestCaseRunner( test, self.trace_processor_path, self.trace_descriptor_path, color_formatter, override_sql_module_paths, )) def run_all_tests( self, summary_descriptor: str, metrics_descriptor_paths: List[str], chrome_extensions: str, test_extensions: str, winscope_extensions: str, keep_input: bool, ) -> TestResults: perf_results = [] failures = [] test_run_start = datetime.datetime.now() completed_tests = 0 with concurrent.futures.ProcessPoolExecutor() as e: fut = [ e.submit( test.execute, summary_descriptor, metrics_descriptor_paths, [chrome_extensions, test_extensions, winscope_extensions], keep_input, ) for test in self.test_runners ] for res in concurrent.futures.as_completed(fut): test_name, res_str, result = res.result() if self.quiet: completed_tests += 1 sys.stderr.write(f"\rRan {completed_tests} tests") if not result.passed: sys.stderr.write(f"\r") sys.stderr.write(res_str) else: sys.stderr.write(res_str) if not result or not result.passed: failures.append(test_name) else: perf_results.append(result.perf_result) test_time_ms = int( (datetime.datetime.now() - test_run_start).total_seconds() * 1000) if self.quiet: sys.stderr.write(f"\r") return TestResults(failures, perf_results, test_time_ms)