1#!/usr/bin/env python3 2# 3# Copyright (C) 2021 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17"""test_utils.py: utils for testing. 18""" 19 20import logging 21from multiprocessing.connection import Connection 22import os 23from pathlib import Path 24import re 25import shutil 26import sys 27import subprocess 28import time 29from typing import List, Optional, Tuple, Union 30import unittest 31 32from simpleperf_utils import remove, get_script_dir, AdbHelper, is_windows, bytes_to_str 33 34INFERNO_SCRIPT = str(Path(__file__).parents[1] / ('inferno.bat' if is_windows() else 'inferno.sh')) 35 36 37class TestHelper: 38 """ Keep global test options. """ 39 40 @classmethod 41 def init( 42 cls, test_dir: str, testdata_dir: str, use_browser: bool, ndk_path: Optional[str], 43 device_serial_number: Optional[str], 44 progress_conn: Optional[Connection]): 45 """ 46 When device_serial_number is None, no Android device is used. 47 When device_serial_number is '', use the default Android device. 48 When device_serial_number is not empty, select Android device by serial number. 49 """ 50 cls.script_dir = Path(__file__).resolve().parents[1] 51 cls.test_base_dir = Path(test_dir).resolve() 52 cls.test_base_dir.mkdir(parents=True, exist_ok=True) 53 cls.testdata_dir = Path(testdata_dir).resolve() 54 cls.browser_option = [] if use_browser else ['--no_browser'] 55 cls.ndk_path = ndk_path 56 cls.progress_conn = progress_conn 57 58 # Logs can come from multiple processes. So use append mode to avoid overwrite. 59 cls.log_fh = open(cls.test_base_dir / 'test.log', 'a') 60 logging.getLogger().handlers.clear() 61 logging.getLogger().addHandler(logging.StreamHandler(cls.log_fh)) 62 os.close(sys.stderr.fileno()) 63 os.dup2(cls.log_fh.fileno(), sys.stderr.fileno()) 64 65 if device_serial_number is not None: 66 if device_serial_number: 67 os.environ['ANDROID_SERIAL'] = device_serial_number 68 cls.adb = AdbHelper(enable_switch_to_root=True) 69 cls.android_version = cls.adb.get_android_version() 70 cls.device_features = None 71 72 @classmethod 73 def log(cls, s: str): 74 cls.log_fh.write(s + '\n') 75 # Child processes can also write to log file, so flush it immediately to keep the order. 76 cls.log_fh.flush() 77 78 @classmethod 79 def testdata_path(cls, testdata_name: str) -> str: 80 """ Return the path of a test data. """ 81 return str(cls.testdata_dir / testdata_name) 82 83 @classmethod 84 def get_test_dir(cls, test_name: str) -> Path: 85 """ Return the dir to run a test. """ 86 return cls.test_base_dir / test_name 87 88 @classmethod 89 def script_path(cls, script_name: str) -> str: 90 """ Return the dir of python scripts. """ 91 return str(cls.script_dir / script_name) 92 93 @classmethod 94 def get_device_features(cls): 95 if cls.device_features is None: 96 args = [sys.executable, cls.script_path( 97 'run_simpleperf_on_device.py'), 'list', '--show-features'] 98 output = subprocess.check_output(args, stderr=TestHelper.log_fh) 99 output = bytes_to_str(output) 100 cls.device_features = output.split() 101 return cls.device_features 102 103 @classmethod 104 def is_trace_offcpu_supported(cls): 105 return 'trace-offcpu' in cls.get_device_features() 106 107 @classmethod 108 def get_32bit_abi(cls): 109 return cls.adb.get_property('ro.product.cpu.abilist32').strip().split(',')[0] 110 111 @classmethod 112 def get_kernel_version(cls) -> Tuple[int]: 113 output = cls.adb.check_run_and_return_output(['shell', 'uname', '-r']) 114 m = re.search(r'^(\d+)\.(\d+)', output) 115 assert m 116 return (int(m.group(1)), int(m.group(2))) 117 118 @classmethod 119 def write_progress(cls, progress: str): 120 if cls.progress_conn: 121 cls.progress_conn.send(progress) 122 123 124class TestBase(unittest.TestCase): 125 def setUp(self): 126 """ Run each test in a separate dir. """ 127 self.test_dir = TestHelper.get_test_dir( 128 '%s.%s' % (self.__class__.__name__, self._testMethodName)) 129 self.test_dir.mkdir() 130 os.chdir(self.test_dir) 131 TestHelper.log('begin test %s.%s' % (self.__class__.__name__, self._testMethodName)) 132 133 def run(self, result=None): 134 start_time = time.time() 135 ret = super(TestBase, self).run(result) 136 if result.errors and result.errors[-1][0] == self: 137 status = 'FAILED' 138 err_info = result.errors[-1][1] 139 elif result.failures and result.failures[-1][0] == self: 140 status = 'FAILED' 141 err_info = result.failures[-1][1] 142 else: 143 status = 'OK' 144 145 time_taken = time.time() - start_time 146 TestHelper.log( 147 'end test %s.%s %s (%.3fs)' % 148 (self.__class__.__name__, self._testMethodName, status, time_taken)) 149 if status != 'OK': 150 TestHelper.log(err_info) 151 152 # Remove test data for passed tests to save space. 153 if status == 'OK': 154 remove(self.test_dir) 155 TestHelper.write_progress( 156 '%s.%s %s %.3fs' % 157 (self.__class__.__name__, self._testMethodName, status, time_taken)) 158 return ret 159 160 def run_cmd(self, args: List[str], return_output=False, drop_output=True) -> str: 161 if args[0] == 'report_html.py' or args[0] == INFERNO_SCRIPT: 162 args += TestHelper.browser_option 163 if TestHelper.ndk_path: 164 if args[0] in ['app_profiler.py', 'binary_cache_builder.py', 'pprof_proto_generator.py', 165 'report_html.py']: 166 args += ['--ndk_path', TestHelper.ndk_path] 167 if args[0].endswith('.py'): 168 args = [sys.executable, TestHelper.script_path(args[0])] + args[1:] 169 use_shell = args[0].endswith('.bat') 170 try: 171 if return_output: 172 stdout_fd = subprocess.PIPE 173 drop_output = False 174 elif drop_output: 175 stdout_fd = subprocess.DEVNULL 176 else: 177 stdout_fd = None 178 179 subproc = subprocess.Popen(args, stdout=stdout_fd, 180 stderr=TestHelper.log_fh, shell=use_shell) 181 stdout_data, _ = subproc.communicate() 182 output_data = bytes_to_str(stdout_data) 183 returncode = subproc.returncode 184 185 except OSError: 186 returncode = None 187 self.assertEqual(returncode, 0, msg="failed to run cmd: %s" % args) 188 if return_output: 189 return output_data 190 return '' 191 192 def check_strings_in_file(self, filename, strings: List[Union[str, re.Pattern]]): 193 self.check_exist(filename=filename) 194 with open(filename, 'r') as fh: 195 self.check_strings_in_content(fh.read(), strings) 196 197 def check_exist(self, filename=None, dirname=None): 198 if filename: 199 self.assertTrue(os.path.isfile(filename), filename) 200 if dirname: 201 self.assertTrue(os.path.isdir(dirname), dirname) 202 203 def check_strings_in_content(self, content: str, strings: List[Union[str, re.Pattern]]): 204 fulfilled = [] 205 for s in strings: 206 if isinstance(s, re.Pattern): 207 fulfilled.append(s.search(content)) 208 else: 209 fulfilled.append(s in content) 210 self.check_fulfilled_entries(fulfilled, strings) 211 212 def check_fulfilled_entries(self, fulfilled, entries): 213 failed_entries = [] 214 for ok, entry in zip(fulfilled, entries): 215 if not ok: 216 failed_entries.append(entry) 217 218 if failed_entries: 219 self.fail('failed in below entries: %s' % (failed_entries,)) 220