#!/usr/bin/env python3
#
# Copyright (C) 2021 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""test_utils.py: utils for testing.
"""

import logging
from multiprocessing.connection import Connection
import os
from pathlib import Path
import shutil
import sys
import subprocess
import time
from typing import List, Optional
import unittest

from simpleperf_utils import remove, get_script_dir, AdbHelper, is_windows, bytes_to_str

INFERNO_SCRIPT = str(Path(__file__).parents[1] / ('inferno.bat' if is_windows() else 'inferno.sh'))


class TestHelper:
    """ Keep global test options. """

    @classmethod
    def init(
            cls, test_dir: str, testdata_dir: str, use_browser: bool, ndk_path: Optional[str],
            device_serial_number: Optional[str],
            progress_conn: Optional[Connection]):
        """ Store the global test options and redirect logging/stderr to the test log.

            When device_serial_number is None, no Android device is used.
            When device_serial_number is '', use the default Android device.
            When device_serial_number is not empty, select Android device by serial number.

            test_dir: base dir in which per-test dirs and test.log are created.
            testdata_dir: dir containing the test data files.
            use_browser: when False, '--no_browser' is appended to report commands.
            ndk_path: optional ndk path forwarded to scripts accepting --ndk_path.
            progress_conn: optional connection used to report per-test progress.
        """
        cls.script_dir = Path(__file__).resolve().parents[1]
        cls.test_base_dir = Path(test_dir).resolve()
        cls.test_base_dir.mkdir(parents=True, exist_ok=True)
        cls.testdata_dir = Path(testdata_dir).resolve()
        cls.browser_option = [] if use_browser else ['--no_browser']
        cls.ndk_path = ndk_path
        cls.progress_conn = progress_conn

        # Logs can come from multiple processes. So use append mode to avoid overwrite.
        # The file handle intentionally stays open: it backs logging, stderr and log().
        cls.log_fh = open(cls.test_base_dir / 'test.log', 'a')
        root_logger = logging.getLogger()
        root_logger.handlers.clear()
        root_logger.addHandler(logging.StreamHandler(cls.log_fh))
        # Point the stderr file descriptor at the log file. os.dup2() closes the
        # target descriptor itself, so no separate os.close() is needed (and doing
        # one would leave the descriptor briefly unallocated).
        os.dup2(cls.log_fh.fileno(), sys.stderr.fileno())

        if device_serial_number is not None:
            if device_serial_number:
                os.environ['ANDROID_SERIAL'] = device_serial_number
            cls.adb = AdbHelper(enable_switch_to_root=True)
            cls.android_version = cls.adb.get_android_version()
            # Filled lazily by get_device_features().
            cls.device_features = None

    @classmethod
    def log(cls, s: str):
        """ Append one line to the test log. """
        cls.log_fh.write(s + '\n')
        # Child processes can also write to log file, so flush it immediately to keep the order.
        cls.log_fh.flush()

    @classmethod
    def testdata_path(cls, testdata_name: str) -> str:
        """ Return the path of a test data. """
        return str(cls.testdata_dir / testdata_name)

    @classmethod
    def get_test_dir(cls, test_name: str) -> Path:
        """ Return the dir to run a test. """
        return cls.test_base_dir / test_name

    @classmethod
    def script_path(cls, script_name: str) -> str:
        """ Return the path of a python script in the script dir. """
        return str(cls.script_dir / script_name)

    @classmethod
    def get_device_features(cls):
        """ Return the feature list printed by
            `run_simpleperf_on_device.py list --show-features`, cached after the first call.
        """
        if cls.device_features is None:
            args = [sys.executable, cls.script_path(
                'run_simpleperf_on_device.py'), 'list', '--show-features']
            output = subprocess.check_output(args, stderr=TestHelper.log_fh)
            cls.device_features = bytes_to_str(output).split()
        return cls.device_features

    @classmethod
    def is_trace_offcpu_supported(cls):
        """ Return True if 'trace-offcpu' is in the device feature list. """
        return 'trace-offcpu' in cls.get_device_features()

    @classmethod
    def get_32bit_abi(cls):
        """ Return the first entry of the device property ro.product.cpu.abilist32. """
        return cls.adb.get_property('ro.product.cpu.abilist32').strip().split(',')[0]

    @classmethod
    def write_progress(cls, progress: str):
        """ Send a progress message through progress_conn, if one was given to init(). """
        if cls.progress_conn:
            cls.progress_conn.send(progress)


class TestBase(unittest.TestCase):
    """ Base class of tests: runs each test in its own dir and logs its result. """

    def setUp(self):
        """ Run each test in a separate dir. """
        self.test_dir = TestHelper.get_test_dir(
            '%s.%s' % (self.__class__.__name__, self._testMethodName))
        self.test_dir.mkdir()
        os.chdir(self.test_dir)
        TestHelper.log('begin test %s.%s' % (self.__class__.__name__, self._testMethodName))

    def run(self, result=None):
        """ Run the test, then log its status and duration and report progress.

            Returns whatever unittest.TestCase.run() returns.
        """
        start_time = time.time()
        ret = super(TestBase, self).run(result)
        # When no result object is passed in, unittest creates one and returns it.
        outcome = ret if result is None else result

        status = 'OK'
        err_info = ''
        if outcome is not None:
            if outcome.errors and outcome.errors[-1][0] == self:
                status = 'FAILED'
                err_info = outcome.errors[-1][1]
            elif outcome.failures and outcome.failures[-1][0] == self:
                status = 'FAILED'
                err_info = outcome.failures[-1][1]

        time_taken = time.time() - start_time
        test_name = '%s.%s' % (self.__class__.__name__, self._testMethodName)
        TestHelper.log('end test %s %s (%.3fs)' % (test_name, status, time_taken))
        if status != 'OK':
            TestHelper.log(err_info)

        # Remove test data for passed tests to save space.
        # test_dir is only set by setUp(), which unittest does not call for tests
        # skipped via a decorator, so guard against it being absent.
        test_dir = getattr(self, 'test_dir', None)
        if status == 'OK' and test_dir is not None:
            remove(test_dir)
        TestHelper.write_progress('%s %s' % (test_name, status))
        return ret

    def run_cmd(self, args: List[str], return_output=False, drop_output=True) -> str:
        """ Run a command and assert that it exits with 0.

            args: command line; a leading '*.py' name is resolved in the script dir
                  and run with the current python interpreter.
            return_output: capture stdout and return it as a str.
            drop_output: when stdout isn't captured, send it to DEVNULL instead of
                         inheriting the parent's stdout.

            Returns the captured stdout if return_output is True, otherwise ''.
            stderr of the command always goes to the test log.
        """
        # Work on a copy, so options appended below don't leak into the caller's list.
        args = list(args)
        if args[0] == 'report_html.py' or args[0] == INFERNO_SCRIPT:
            args += TestHelper.browser_option
        if TestHelper.ndk_path:
            if args[0] in ['app_profiler.py', 'binary_cache_builder.py', 'pprof_proto_generator.py',
                           'report_html.py']:
                args += ['--ndk_path', TestHelper.ndk_path]
        if args[0].endswith('.py'):
            args = [sys.executable, TestHelper.script_path(args[0])] + args[1:]
        use_shell = args[0].endswith('.bat')

        if return_output:
            stdout_fd = subprocess.PIPE
        elif drop_output:
            stdout_fd = subprocess.DEVNULL
        else:
            stdout_fd = None

        output_data = ''
        try:
            subproc = subprocess.Popen(args, stdout=stdout_fd,
                                       stderr=TestHelper.log_fh, shell=use_shell)
            stdout_data, _ = subproc.communicate()
            output_data = bytes_to_str(stdout_data)
            returncode = subproc.returncode
        except OSError:
            # Failing to start the command is reported through the assertion below.
            returncode = None
        self.assertEqual(returncode, 0, msg="failed to run cmd: %s" % args)
        return output_data if return_output else ''

    def check_strings_in_file(self, filename, strings):
        """ Assert that the file exists and contains every string in strings. """
        self.check_exist(filename=filename)
        with open(filename, 'r') as fh:
            self.check_strings_in_content(fh.read(), strings)

    def check_exist(self, filename=None, dirname=None):
        """ Assert that filename is a file and/or dirname is a dir, for those given. """
        if filename:
            self.assertTrue(os.path.isfile(filename), filename)
        if dirname:
            self.assertTrue(os.path.isdir(dirname), dirname)

    def check_strings_in_content(self, content, strings):
        """ Fail the test, listing the strings that do not occur in content. """
        fulfilled = [s in content for s in strings]
        self.check_fulfilled_entries(fulfilled, strings)

    def check_fulfilled_entries(self, fulfilled, entries):
        """ Fail the test if any entry has a false flag in fulfilled.

            fulfilled and entries are paired up positionally; the failure message
            lists the entries that are not fulfilled.
        """
        failed_entries = [entry for ok, entry in zip(fulfilled, entries) if not ok]

        if failed_entries:
            self.fail('failed in below entries: %s' % (failed_entries,))