• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2021 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17"""test_utils.py: utils for testing.
18"""
19
20import logging
21from multiprocessing.connection import Connection
22import os
23from pathlib import Path
24import re
25import shutil
26import sys
27import subprocess
28import time
29from typing import List, Optional, Tuple, Union
30import unittest
31
32from simpleperf_utils import remove, get_script_dir, AdbHelper, is_windows, bytes_to_str
33
34INFERNO_SCRIPT = str(Path(__file__).parents[1] / ('inferno.bat' if is_windows() else 'inferno.sh'))
35
36
37class TestHelper:
38    """ Keep global test options. """
39
40    @classmethod
41    def init(
42            cls, test_dir: str, testdata_dir: str, use_browser: bool, ndk_path: Optional[str],
43            device_serial_number: Optional[str],
44            progress_conn: Optional[Connection]):
45        """
46            When device_serial_number is None, no Android device is used.
47            When device_serial_number is '', use the default Android device.
48            When device_serial_number is not empty, select Android device by serial number.
49        """
50        cls.script_dir = Path(__file__).resolve().parents[1]
51        cls.test_base_dir = Path(test_dir).resolve()
52        cls.test_base_dir.mkdir(parents=True, exist_ok=True)
53        cls.testdata_dir = Path(testdata_dir).resolve()
54        cls.browser_option = [] if use_browser else ['--no_browser']
55        cls.ndk_path = ndk_path
56        cls.progress_conn = progress_conn
57
58        # Logs can come from multiple processes. So use append mode to avoid overwrite.
59        cls.log_fh = open(cls.test_base_dir / 'test.log', 'a')
60        logging.getLogger().handlers.clear()
61        logging.getLogger().addHandler(logging.StreamHandler(cls.log_fh))
62        os.close(sys.stderr.fileno())
63        os.dup2(cls.log_fh.fileno(), sys.stderr.fileno())
64
65        if device_serial_number is not None:
66            if device_serial_number:
67                os.environ['ANDROID_SERIAL'] = device_serial_number
68            cls.adb = AdbHelper(enable_switch_to_root=True)
69            cls.android_version = cls.adb.get_android_version()
70            cls.device_features = None
71
72    @classmethod
73    def log(cls, s: str):
74        cls.log_fh.write(s + '\n')
75        # Child processes can also write to log file, so flush it immediately to keep the order.
76        cls.log_fh.flush()
77
78    @classmethod
79    def testdata_path(cls, testdata_name: str) -> str:
80        """ Return the path of a test data. """
81        return str(cls.testdata_dir / testdata_name)
82
83    @classmethod
84    def get_test_dir(cls, test_name: str) -> Path:
85        """ Return the dir to run a test. """
86        return cls.test_base_dir / test_name
87
88    @classmethod
89    def script_path(cls, script_name: str) -> str:
90        """ Return the dir of python scripts. """
91        return str(cls.script_dir / script_name)
92
93    @classmethod
94    def get_device_features(cls):
95        if cls.device_features is None:
96            args = [sys.executable, cls.script_path(
97                'run_simpleperf_on_device.py'), 'list', '--show-features']
98            output = subprocess.check_output(args, stderr=TestHelper.log_fh)
99            output = bytes_to_str(output)
100            cls.device_features = output.split()
101        return cls.device_features
102
103    @classmethod
104    def is_trace_offcpu_supported(cls):
105        return 'trace-offcpu' in cls.get_device_features()
106
107    @classmethod
108    def get_32bit_abi(cls):
109        return cls.adb.get_property('ro.product.cpu.abilist32').strip().split(',')[0]
110
111    @classmethod
112    def get_kernel_version(cls) -> Tuple[int]:
113        output = cls.adb.check_run_and_return_output(['shell', 'uname', '-r'])
114        m = re.search(r'^(\d+)\.(\d+)', output)
115        assert m
116        return (int(m.group(1)), int(m.group(2)))
117
118    @classmethod
119    def write_progress(cls, progress: str):
120        if cls.progress_conn:
121            cls.progress_conn.send(progress)
122
123
124class TestBase(unittest.TestCase):
125    def setUp(self):
126        """ Run each test in a separate dir. """
127        self.test_dir = TestHelper.get_test_dir(
128            '%s.%s' % (self.__class__.__name__, self._testMethodName))
129        self.test_dir.mkdir()
130        os.chdir(self.test_dir)
131        TestHelper.log('begin test %s.%s' % (self.__class__.__name__, self._testMethodName))
132
133    def run(self, result=None):
134        start_time = time.time()
135        ret = super(TestBase, self).run(result)
136        if result.errors and result.errors[-1][0] == self:
137            status = 'FAILED'
138            err_info = result.errors[-1][1]
139        elif result.failures and result.failures[-1][0] == self:
140            status = 'FAILED'
141            err_info = result.failures[-1][1]
142        else:
143            status = 'OK'
144
145        time_taken = time.time() - start_time
146        TestHelper.log(
147            'end test %s.%s %s (%.3fs)' %
148            (self.__class__.__name__, self._testMethodName, status, time_taken))
149        if status != 'OK':
150            TestHelper.log(err_info)
151
152        # Remove test data for passed tests to save space.
153        if status == 'OK':
154            remove(self.test_dir)
155        TestHelper.write_progress(
156            '%s.%s  %s  %.3fs' %
157            (self.__class__.__name__, self._testMethodName, status, time_taken))
158        return ret
159
160    def run_cmd(self, args: List[str], return_output=False, drop_output=True) -> str:
161        if args[0] == 'report_html.py' or args[0] == INFERNO_SCRIPT:
162            args += TestHelper.browser_option
163        if TestHelper.ndk_path:
164            if args[0] in ['app_profiler.py', 'binary_cache_builder.py', 'pprof_proto_generator.py',
165                           'report_html.py']:
166                args += ['--ndk_path', TestHelper.ndk_path]
167        if args[0].endswith('.py'):
168            args = [sys.executable, TestHelper.script_path(args[0])] + args[1:]
169        use_shell = args[0].endswith('.bat')
170        try:
171            if return_output:
172                stdout_fd = subprocess.PIPE
173                drop_output = False
174            elif drop_output:
175                stdout_fd = subprocess.DEVNULL
176            else:
177                stdout_fd = None
178
179            subproc = subprocess.Popen(args, stdout=stdout_fd,
180                                       stderr=TestHelper.log_fh, shell=use_shell)
181            stdout_data, _ = subproc.communicate()
182            output_data = bytes_to_str(stdout_data)
183            returncode = subproc.returncode
184
185        except OSError:
186            returncode = None
187        self.assertEqual(returncode, 0, msg="failed to run cmd: %s" % args)
188        if return_output:
189            return output_data
190        return ''
191
192    def check_strings_in_file(self, filename, strings: List[Union[str, re.Pattern]]):
193        self.check_exist(filename=filename)
194        with open(filename, 'r') as fh:
195            self.check_strings_in_content(fh.read(), strings)
196
197    def check_exist(self, filename=None, dirname=None):
198        if filename:
199            self.assertTrue(os.path.isfile(filename), filename)
200        if dirname:
201            self.assertTrue(os.path.isdir(dirname), dirname)
202
203    def check_strings_in_content(self, content: str, strings: List[Union[str, re.Pattern]]):
204        fulfilled = []
205        for s in strings:
206            if isinstance(s, re.Pattern):
207                fulfilled.append(s.search(content))
208            else:
209                fulfilled.append(s in content)
210        self.check_fulfilled_entries(fulfilled, strings)
211
212    def check_fulfilled_entries(self, fulfilled, entries):
213        failed_entries = []
214        for ok, entry in zip(fulfilled, entries):
215            if not ok:
216                failed_entries.append(entry)
217
218        if failed_entries:
219            self.fail('failed in below entries: %s' % (failed_entries,))
220