#!/usr/bin/env python3
# Copyright (C) 2023 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import concurrent.futures
import datetime
import difflib
import os
import subprocess
import sys
import tempfile
from binascii import unhexlify
from dataclasses import dataclass
from typing import List, Tuple, Optional

from google.protobuf import text_format, message_factory, descriptor_pool

from python.generators.diff_tests.testing import TestCase, TestType, BinaryProto
from python.generators.diff_tests.utils import (
    ColorFormatter, create_message_factory, get_env, get_trace_descriptor_path,
    read_all_tests, serialize_python_trace, serialize_textproto_trace)

ROOT_DIR = os.path.dirname(
    os.path.dirname(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))


# Performance result of running the test.
@dataclass
class PerfResult:
  test: TestCase
  ingest_time_ns: int
  real_time_ns: int

  def __init__(self, test: TestCase, perf_lines: List[str]):
    self.test = test

    # The perf file is expected to contain a single CSV line:
    # "<ingest_time_ns>,<real_time_ns>".
    assert len(perf_lines) == 1
    perf_numbers = perf_lines[0].split(',')
    assert len(perf_numbers) == 2

    self.ingest_time_ns = int(perf_numbers[0])
    self.real_time_ns = int(perf_numbers[1])


# Data gathered from running the test.
@dataclass
class TestResult:
  test: TestCase
  trace: str
  cmd: List[str]
  expected: str
  actual: str
  passed: bool
  stderr: str
  exit_code: int
  perf_result: Optional[PerfResult]

  def __init__(self, test: TestCase, gen_trace_path: str, cmd: List[str],
               expected_text: str, actual_text: str, stderr: str,
               exit_code: int, perf_lines: List[str]) -> None:
    self.test = test
    self.trace = gen_trace_path
    self.cmd = cmd
    self.stderr = stderr
    self.exit_code = exit_code

    # For better string formatting we often add whitespace, which now has to
    # be removed before comparing.
    def strip_whitespaces(text: str):
      no_front_new_line_text = text.lstrip('\n')
      return '\n'.join(s.strip() for s in no_front_new_line_text.split('\n'))

    self.expected = strip_whitespaces(expected_text)
    self.actual = strip_whitespaces(actual_text)

    # Normalize line endings so the comparison is stable across platforms.
    expected_content = self.expected.replace('\r\n', '\n')
    actual_content = self.actual.replace('\r\n', '\n')
    self.passed = (expected_content == actual_content)

    # Perf numbers are only meaningful if the run succeeded.
    if self.exit_code == 0:
      self.perf_result = PerfResult(self.test, perf_lines)
    else:
      self.perf_result = None

  def write_diff(self):
    expected_lines = self.expected.splitlines(True)
    actual_lines = self.actual.splitlines(True)
    diff = difflib.unified_diff(
        expected_lines, actual_lines, fromfile='expected', tofile='actual')
    return "".join(list(diff))

  def rebase(self, rebase) -> str:
    if not rebase or self.passed:
      return ""
    if not self.test.blueprint.is_out_file():
      return "Can't rebase expected results passed as strings.\n"
    if self.exit_code != 0:
      return f"Rebase failed for {self.test.name} as query failed\n"

    with open(self.test.expected_path, 'w') as f:
      f.write(self.actual)
    return f"Rebasing {self.test.name}\n"


# Results of running the test suite. Mostly used for printing aggregated
# results.
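# A sketch of the summary str() below produces (the counts shown here are
# hypothetical):
#   [==========] 5 tests ran. (1200 ms total)
#   [  PASSED  ] 3 tests.
#   [  FAILED  ] 2 tests.
#   [  FAILED  ] SomeTestName
#   [  FAILED  ] OtherTestName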
@dataclass
class TestResults:
  test_failures: List[str]
  perf_data: List[PerfResult]
  rebased: List[str]
  test_time_ms: int

  def str(self, no_colors: bool, tests_no: int):
    c = ColorFormatter(no_colors)
    res = (
        f"[==========] {tests_no} tests ran. ({self.test_time_ms} ms total)\n"
        f"{c.green('[  PASSED  ]')} "
        f"{tests_no - len(self.test_failures)} tests.\n")
    if len(self.test_failures) > 0:
      res += (f"{c.red('[  FAILED  ]')} "
              f"{len(self.test_failures)} tests.\n")
      for failure in self.test_failures:
        res += f"{c.red('[  FAILED  ]')} {failure}\n"
    return res

  def rebase_str(self):
    res = f"\n[  REBASED  ] {len(self.rebased)} tests.\n"
    for name in self.rebased:
      res += f"[  REBASED  ] {name}\n"
    return res


# Responsible for executing a single diff test.
@dataclass
class TestCaseRunner:
  test: TestCase
  trace_processor_path: str
  trace_descriptor_path: str
  colors: ColorFormatter

  def __output_to_text_proto(self, actual: str, out: BinaryProto) -> str:
    """Deserializes a binary proto and returns its text representation.

    Args:
      actual: HEX encoded serialized proto message.
      out: BinaryProto describing the expected message type and an optional
        post-processing step.

    Returns:
      Text proto, or '' if deserialization or post-processing failed.
    """
    try:
      raw_data = unhexlify(actual.splitlines()[-1][1:-1])
      out_path = os.path.dirname(self.trace_processor_path)

      # Collect all .descriptor files generated next to trace_processor, plus
      # the pprof profile descriptor.
      descriptor_paths = [
          f.path
          for f in os.scandir(
              os.path.join(ROOT_DIR, out_path, 'gen', 'protos', 'perfetto',
                           'trace_processor'))
          if f.is_file() and os.path.splitext(f.name)[1] == '.descriptor'
      ]
      descriptor_paths.append(
          os.path.join(ROOT_DIR, out_path, 'gen', 'protos', 'third_party',
                       'pprof', 'profile.descriptor'))
      proto = create_message_factory(descriptor_paths, out.message_type)()
      proto.ParseFromString(raw_data)

      # post_processing is responsible for converting the proto to its final
      # text representation.
      try:
        return out.post_processing(proto)
      except:
        return ''
    except:
      return ''

  def __run_metrics_test(self, trace_path: str,
                         metrics_message_factory) -> TestResult:
    if self.test.blueprint.is_out_file():
      with open(self.test.expected_path, 'r') as expected_file:
        expected = expected_file.read()
    else:
      expected = self.test.blueprint.out.contents

    tmp_perf_file = tempfile.NamedTemporaryFile(delete=False)
    is_json_output_file = (
        self.test.blueprint.is_out_file() and
        os.path.basename(self.test.expected_path).endswith('.json.out'))
    is_json_output = is_json_output_file or self.test.blueprint.is_out_json()
    cmd = [
        self.trace_processor_path,
        '--analyze-trace-proto-content',
        '--crop-track-events',
        '--run-metrics',
        self.test.blueprint.query.name,
        '--metrics-output=%s' % ('json' if is_json_output else 'binary'),
        '--perf-file',
        tmp_perf_file.name,
        trace_path,
    ]
    tp = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=get_env(ROOT_DIR))
    (stdout, stderr) = tp.communicate()

    if is_json_output:
      expected_text = expected
      actual_text = stdout.decode('utf8')
    else:
      # Expected will be in text proto format and we'll need to parse it to
      # a real proto.
      expected_message = metrics_message_factory()
      text_format.Merge(expected, expected_message)

      # Actual will be the raw bytes of the proto and we'll need to parse it
      # into a message.
      actual_message = metrics_message_factory()
      actual_message.ParseFromString(stdout)

      # Convert both back to text format.
      expected_text = text_format.MessageToString(expected_message)
      actual_text = text_format.MessageToString(actual_message)

    perf_lines = [line.decode('utf8') for line in tmp_perf_file.readlines()]
    tmp_perf_file.close()
    os.remove(tmp_perf_file.name)

    return TestResult(self.test, trace_path, cmd, expected_text, actual_text,
                      stderr.decode('utf8'), tp.returncode, perf_lines)

  # Run a query-based diff test.
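  # The query is handed to trace_processor_shell roughly as (a sketch of the
  # `cmd` list assembled below; the paths are placeholders):
  #   trace_processor_shell --analyze-trace-proto-content --crop-track-events \
  #     -q <query.sql> --perf-file <tmp file> <trace file>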
  def __run_query_test(self, trace_path: str) -> TestResult:
    # Fetch expected text.
    if self.test.expected_path:
      with open(self.test.expected_path, 'r') as expected_file:
        expected = expected_file.read()
    else:
      expected = self.test.blueprint.out.contents

    # Fetch query. Inline queries are written to a temporary file first.
    if self.test.blueprint.is_query_file():
      query = self.test.query_path
    else:
      tmp_query_file = tempfile.NamedTemporaryFile(delete=False)
      with open(tmp_query_file.name, 'w') as query_file:
        query_file.write(self.test.blueprint.query)
      query = tmp_query_file.name

    tmp_perf_file = tempfile.NamedTemporaryFile(delete=False)
    cmd = [
        self.trace_processor_path,
        '--analyze-trace-proto-content',
        '--crop-track-events',
        '-q',
        query,
        '--perf-file',
        tmp_perf_file.name,
        trace_path,
    ]
    tp = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=get_env(ROOT_DIR))
    (stdout, stderr) = tp.communicate()

    if not self.test.blueprint.is_query_file():
      tmp_query_file.close()
      os.remove(tmp_query_file.name)

    perf_lines = [line.decode('utf8') for line in tmp_perf_file.readlines()]
    tmp_perf_file.close()
    os.remove(tmp_perf_file.name)

    actual = stdout.decode('utf8')
    if self.test.blueprint.is_out_binaryproto():
      actual = self.__output_to_text_proto(actual, self.test.blueprint.out)

    return TestResult(self.test, trace_path, cmd, expected, actual,
                      stderr.decode('utf8'), tp.returncode, perf_lines)

  def __run(self, metrics_descriptor_paths: List[str],
            extension_descriptor_paths: List[str], keep_input,
            rebase) -> Tuple[TestResult, str]:
    # We can't use delete=True here. When using that on Windows, the
    # resulting file is opened in exclusive mode (in turn that's a subtle
    # side-effect of the underlying CreateFile(FILE_ATTRIBUTE_TEMPORARY))
    # and TP fails to open the passed path.
    gen_trace_file = None
    if self.test.blueprint.is_trace_file():
      if self.test.trace_path.endswith('.py'):
        gen_trace_file = tempfile.NamedTemporaryFile(delete=False)
        serialize_python_trace(ROOT_DIR, self.trace_descriptor_path,
                               self.test.trace_path, gen_trace_file)
      elif self.test.trace_path.endswith('.textproto'):
        gen_trace_file = tempfile.NamedTemporaryFile(delete=False)
        serialize_textproto_trace(self.trace_descriptor_path,
                                  extension_descriptor_paths,
                                  self.test.trace_path, gen_trace_file)
    elif self.test.blueprint.is_trace_textproto():
      gen_trace_file = tempfile.NamedTemporaryFile(delete=False)
      proto = create_message_factory([self.trace_descriptor_path] +
                                     extension_descriptor_paths,
                                     'perfetto.protos.Trace')()
      text_format.Merge(self.test.blueprint.trace.contents, proto)
      gen_trace_file.write(proto.SerializeToString())
      gen_trace_file.flush()
    else:
      gen_trace_file = tempfile.NamedTemporaryFile(delete=False)
      with open(gen_trace_file.name, 'w') as trace_file:
        trace_file.write(self.test.blueprint.trace.contents)

    if gen_trace_file:
      trace_path = os.path.realpath(gen_trace_file.name)
    else:
      trace_path = self.test.trace_path

    run_str = f"{self.colors.yellow('[ RUN      ]')} {self.test.name}\n"
    if self.test.type == TestType.QUERY:
      result = self.__run_query_test(trace_path)
    elif self.test.type == TestType.METRIC:
      result = self.__run_metrics_test(
          trace_path,
          create_message_factory(metrics_descriptor_paths,
                                 'perfetto.protos.TraceMetrics'))
    else:
      assert False, f"Unsupported test type {self.test.type}"

    if gen_trace_file:
      if keep_input:
        run_str += f"Saving generated input trace: {trace_path}\n"
      else:
        gen_trace_file.close()
        os.remove(trace_path)

    def write_cmdlines():
      res = ""
      # Only traces serialized from a .py/.textproto source have a
      # "command to generate trace"; pre-built trace files do not.
      if gen_trace_file and self.test.blueprint.is_trace_file():
        res += 'Command to generate trace:\n'
        res += 'tools/serialize_test_trace.py '
        res += '--descriptor {} {} > {}\n'.format(
            os.path.relpath(self.trace_descriptor_path, ROOT_DIR),
            os.path.relpath(self.test.trace_path, ROOT_DIR),
            os.path.relpath(trace_path, ROOT_DIR))
      res += f"Command line:\n{' '.join(result.cmd)}\n"
      return res

    if result.exit_code != 0 or not result.passed:
      run_str += result.stderr
      if result.exit_code == 0:
        run_str += f"Expected did not match actual for test {self.test.name}.\n"
        run_str += write_cmdlines()
        run_str += result.write_diff()
      else:
        run_str += write_cmdlines()
      run_str += f"{self.colors.red('[  FAILED  ]')} {self.test.name}\n"
      run_str += result.rebase(rebase)
      return result, run_str
    else:
      run_str += (
          f"{self.colors.green('[       OK ]')} {self.test.name} "
          f"(ingest: {result.perf_result.ingest_time_ns / 1000000:.2f} ms "
          f"query: {result.perf_result.real_time_ns / 1000000:.2f} ms)\n")
      return result, run_str

  # Run a TestCase.
  def execute(self, extension_descriptor_paths: List[str],
              metrics_descriptor: str, keep_input: bool,
              rebase: bool) -> Tuple[str, str, Optional[TestResult]]:
    if metrics_descriptor:
      metrics_descriptor_paths = [metrics_descriptor]
    else:
      out_path = os.path.dirname(self.trace_processor_path)
      metrics_protos_path = os.path.join(out_path, 'gen', 'protos', 'perfetto',
                                         'metrics')
      metrics_descriptor_paths = [
          os.path.join(metrics_protos_path, 'metrics.descriptor'),
          os.path.join(metrics_protos_path, 'chrome',
                       'all_chrome_metrics.descriptor'),
          os.path.join(metrics_protos_path, 'webview',
                       'all_webview_metrics.descriptor')
      ]

    result_str = ""
    result, run_str = self.__run(metrics_descriptor_paths,
                                 extension_descriptor_paths, keep_input, rebase)
    result_str += run_str
    if not result:
      return self.test.name, result_str, None

    return self.test.name, result_str, result


# Fetches and executes all viable diff tests.
@dataclass
class DiffTestsRunner:
  tests: List[TestCase]
  trace_processor_path: str
  trace_descriptor_path: str
  test_runners: List[TestCaseRunner]

  def __init__(self, name_filter: str, trace_processor_path: str,
               trace_descriptor: str, no_colors: bool):
    self.tests = read_all_tests(name_filter, ROOT_DIR)
    self.trace_processor_path = trace_processor_path

    out_path = os.path.dirname(self.trace_processor_path)
    self.trace_descriptor_path = get_trace_descriptor_path(
        out_path, trace_descriptor)
    self.test_runners = []
    color_formatter = ColorFormatter(no_colors)
    for test in self.tests:
      self.test_runners.append(
          TestCaseRunner(test, self.trace_processor_path,
                         self.trace_descriptor_path, color_formatter))

  def run_all_tests(self, metrics_descriptor: str, keep_input: bool,
                    rebase: bool) -> TestResults:
    perf_results = []
    failures = []
    rebased = []
    test_run_start = datetime.datetime.now()

    out_path = os.path.dirname(self.trace_processor_path)
    chrome_extensions = os.path.join(out_path, 'gen', 'protos', 'third_party',
                                     'chromium',
                                     'chrome_track_event.descriptor')
    test_extensions = os.path.join(out_path, 'gen', 'protos', 'perfetto',
                                   'trace', 'test_extensions.descriptor')

    # Run tests in parallel, one worker process per test.
    with concurrent.futures.ProcessPoolExecutor() as e:
      fut = [
          e.submit(test.execute, [chrome_extensions, test_extensions],
                   metrics_descriptor, keep_input, rebase)
          for test in self.test_runners
      ]
      for res in concurrent.futures.as_completed(fut):
        test_name, res_str, result = res.result()
        sys.stderr.write(res_str)
        if not result or not result.passed:
          if rebase:
            rebased.append(test_name)
          failures.append(test_name)
        else:
          perf_results.append(result.perf_result)

    test_time_ms = int(
        (datetime.datetime.now() - test_run_start).total_seconds() * 1000)
    return TestResults(failures, perf_results, rebased, test_time_ms)
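

# Illustrative usage (a sketch, not the real entry point; in practice the
# runner is driven by a wrapper script, and the argument values below are
# hypothetical):
#
#   runner = DiffTestsRunner(
#       name_filter='.*',
#       trace_processor_path='out/linux/trace_processor_shell',
#       trace_descriptor=None,
#       no_colors=False)
#   results = runner.run_all_tests(
#       metrics_descriptor=None, keep_input=False, rebase=False)
#   sys.stderr.write(results.str(no_colors=False, tests_no=len(runner.tests)))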