#!/usr/bin/env python3
#
# Copyright 2018, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unittests for atest_utils."""

# pylint: disable=line-too-long

import hashlib
import os
import subprocess
import sys
import tempfile
import unittest

from io import StringIO
from unittest import mock

import atest_error
import atest_utils
import constants
import unittest_utils

from test_finders import test_info


TEST_MODULE_NAME_A = 'ModuleNameA'
TEST_RUNNER_A = 'FakeTestRunnerA'
TEST_BUILD_TARGET_A = {'bt1', 'bt2'}
TEST_DATA_A = {'test_data_a_1': 'a1',
               'test_data_a_2': 'a2'}
TEST_SUITE_A = 'FakeSuiteA'
TEST_MODULE_CLASS_A = 'FAKE_MODULE_CLASS_A'
TEST_INSTALL_LOC_A = {'host', 'device'}
TEST_FINDER_A = 'MODULE'
TEST_INFO_A = test_info.TestInfo(TEST_MODULE_NAME_A, TEST_RUNNER_A,
                                 TEST_BUILD_TARGET_A, TEST_DATA_A,
                                 TEST_SUITE_A, TEST_MODULE_CLASS_A,
                                 TEST_INSTALL_LOC_A)
TEST_INFO_A.test_finder = TEST_FINDER_A


def _capture_stdout(func, *args, **kwargs):
    """Call func(*args, **kwargs) and return the text it wrote to sys.stdout.

    sys.stdout is swapped for a StringIO only for the duration of the call.
    The previous stream is put back even when func raises, so a failing test
    cannot leave stdout redirected for the tests that run after it.

    Args:
        func: The callable to invoke.
        *args: Positional arguments forwarded to func.
        **kwargs: Keyword arguments forwarded to func.

    Returns:
        A string of everything written to sys.stdout during the call.
    """
    with mock.patch.object(sys, 'stdout', new_callable=StringIO) as fake_out:
        func(*args, **kwargs)
    return fake_out.getvalue()


#pylint: disable=protected-access
class AtestUtilsUnittests(unittest.TestCase):
    """Unit tests for atest_utils.py"""

    def test_capture_fail_section_has_fail_section(self):
        """Test capture_fail_section when has fail section."""
        test_list = ['AAAAAA', 'FAILED: Error1', '^\n', 'Error2\n',
                     '[ 6% 191/2997] BBBBBB\n', 'CCCCC',
                     '[ 20% 322/2997] DDDDDD\n', 'EEEEE']
        want_list = ['FAILED: Error1', '^\n', 'Error2\n']
        self.assertEqual(want_list,
                         atest_utils._capture_fail_section(test_list))

    def test_capture_fail_section_no_fail_section(self):
        """Test capture_fail_section when no fail section."""
        test_list = ['[ 6% 191/2997] XXXXX', 'YYYYY: ZZZZZ']
        want_list = []
        self.assertEqual(want_list,
                         atest_utils._capture_fail_section(test_list))

    def test_is_test_mapping(self):
        """Test method is_test_mapping."""
        tm_option_attributes = [
            'test_mapping',
            'include_subdirs'
        ]

        def make_args(tests, enabled_option=None):
            """Build mock args with at most one test-mapping option set."""
            args = mock.Mock()
            for attr in tm_option_attributes:
                setattr(args, attr, attr == enabled_option)
            args.tests = tests
            return args

        # Each test-mapping option on its own turns test mapping on.
        for attr_to_test in tm_option_attributes:
            self.assertTrue(
                atest_utils.is_test_mapping(make_args([], attr_to_test)),
                'Failed to validate option %s' % attr_to_test)

        # With every option off, the result depends on the tests given.
        self.assertTrue(
            atest_utils.is_test_mapping(make_args([':group_name'])))
        self.assertFalse(
            atest_utils.is_test_mapping(make_args([':test1', 'test2'])))
        self.assertFalse(
            atest_utils.is_test_mapping(make_args(['test2'])))

    @mock.patch('curses.tigetnum')
    def test_has_colors(self, mock_curses_tigetnum):
        """Test method _has_colors."""
        # stream is file I/O. The file lives in a throwaway directory so no
        # fixed /tmp path is needed and nothing is left behind, and the
        # handle is closed even when the assertion fails.
        with tempfile.TemporaryDirectory() as tmp_dir:
            file_path = os.path.join(tmp_dir, 'test_has_colors.txt')
            with open(file_path, 'wb') as stream:
                self.assertFalse(atest_utils._has_colors(stream))

        # stream is not a tty(terminal).
        stream = mock.Mock()
        stream.isatty.return_value = False
        self.assertFalse(atest_utils._has_colors(stream))

        # stream is a tty(terminal) and colors < 2.
        stream = mock.Mock()
        stream.isatty.return_value = True
        mock_curses_tigetnum.return_value = 1
        self.assertFalse(atest_utils._has_colors(stream))

        # stream is a tty(terminal) and colors > 2.
        stream = mock.Mock()
        stream.isatty.return_value = True
        mock_curses_tigetnum.return_value = 256
        self.assertTrue(atest_utils._has_colors(stream))

    @mock.patch('atest_utils._has_colors')
    def test_colorize(self, mock_has_colors):
        """Test method colorize."""
        original_str = "test string"
        green_no = 2

        # _has_colors() return False.
        mock_has_colors.return_value = False
        converted_str = atest_utils.colorize(original_str, green_no,
                                             highlight=True)
        self.assertEqual(original_str, converted_str)

        # Green with highlight.
        mock_has_colors.return_value = True
        converted_str = atest_utils.colorize(original_str, green_no,
                                             highlight=True)
        green_highlight_string = '\x1b[1;42m%s\x1b[0m' % original_str
        self.assertEqual(green_highlight_string, converted_str)

        # Green, no highlight.
        mock_has_colors.return_value = True
        converted_str = atest_utils.colorize(original_str, green_no,
                                             highlight=False)
        green_no_highlight_string = '\x1b[1;32m%s\x1b[0m' % original_str
        self.assertEqual(green_no_highlight_string, converted_str)

    @mock.patch('atest_utils._has_colors')
    def test_colorful_print(self, mock_has_colors):
        """Test method colorful_print."""
        testing_str = "color_print_test"
        green_no = 2

        # (has_colors, highlight, auto_wrap, expected stdout)
        cases = [
            # _has_colors() return False.
            (False, True, False, testing_str),
            # Green with highlight, but no wrap.
            (True, True, False, '\x1b[1;42m%s\x1b[0m' % testing_str),
            # Green, no highlight, no wrap.
            (True, False, False, '\x1b[1;32m%s\x1b[0m' % testing_str),
            # Green with highlight and wrap.
            (True, True, True, '\x1b[1;42m%s\x1b[0m\n' % testing_str),
            # Green with wrap, but no highlight.
            (True, False, True, '\x1b[1;32m%s\x1b[0m\n' % testing_str),
        ]
        for has_colors, highlight, auto_wrap, expected in cases:
            with self.subTest(has_colors=has_colors, highlight=highlight,
                              auto_wrap=auto_wrap):
                mock_has_colors.return_value = has_colors
                printed = _capture_stdout(atest_utils.colorful_print,
                                          testing_str, green_no,
                                          highlight=highlight,
                                          auto_wrap=auto_wrap)
                self.assertEqual(printed, expected)

    @mock.patch('socket.gethostname')
    @mock.patch('subprocess.check_output')
    def test_is_external_run(self, mock_output, mock_hostname):
        """Test method is_external_run."""
        # (check_output result, hostname, expected is_external_run())
        cases = [
            ('', '', True),
            ('test@other.com', 'abc.com', True),
            ('test@other.com', 'abc.google.com', False),
            ('test@other.com', 'abc.google.def.com', True),
            ('test@google.com', 'abc.google.def.com', False),
            ('test@other.com', 'c.googlers.com', False),
            ('test@other.com', 'a.googlers.com', True),
        ]
        for output, hostname, expected in cases:
            with self.subTest(output=output, hostname=hostname):
                mock_output.return_value = output
                mock_hostname.return_value = hostname
                self.assertEqual(expected, atest_utils.is_external_run())

        # A failing check_output call is treated as an external run.
        mock_output.side_effect = OSError()
        self.assertTrue(atest_utils.is_external_run())

        mock_output.side_effect = subprocess.CalledProcessError(1, 'cmd')
        self.assertTrue(atest_utils.is_external_run())

    @mock.patch('metrics.metrics_base.get_user_type')
    def test_print_data_collection_notice(self, mock_get_user_type):
        """Test method print_data_collection_notice."""

        # get_user_type return 1(external).
        mock_get_user_type.return_value = 1
        notice_str = ('\n==================\nNotice:\n'
                      ' We collect anonymous usage statistics'
                      ' in accordance with our'
                      ' Content Licenses (https://source.android.com/setup/start/licenses),'
                      ' Contributor License Agreement (https://opensource.google.com/docs/cla/),'
                      ' Privacy Policy (https://policies.google.com/privacy) and'
                      ' Terms of Service (https://policies.google.com/terms).'
                      '\n==================\n\n')
        printed = _capture_stdout(atest_utils.print_data_collection_notice)
        self.assertEqual(printed, notice_str)

        # get_user_type return 0(internal).
        mock_get_user_type.return_value = 0
        notice_str = ('\n==================\nNotice:\n'
                      ' We collect usage statistics'
                      ' in accordance with our'
                      ' Content Licenses (https://source.android.com/setup/start/licenses),'
                      ' Contributor License Agreement (https://cla.developers.google.com/),'
                      ' Privacy Policy (https://policies.google.com/privacy) and'
                      ' Terms of Service (https://policies.google.com/terms).'
                      '\n==================\n\n')
        printed = _capture_stdout(atest_utils.print_data_collection_notice)
        self.assertEqual(printed, notice_str)

    @mock.patch('builtins.input')
    @mock.patch('json.load')
    def test_update_test_runner_cmd(self, mock_json_load_data, mock_input):
        """Test method handle_test_runner_cmd without enable do_verification."""
        former_cmd_str = 'Former cmds ='
        write_result_str = 'Save result mapping to test_result'
        input_cmd = 'atest_args'
        runner_cmds = ['cmd1', 'cmd2']

        # The context manager closes the temporary result file even when an
        # assertion below fails.
        with tempfile.NamedTemporaryFile() as tmp_file:
            # Previous data is empty. Should not enter strtobool.
            # If entered, exception will be raised cause test fail.
            mock_json_load_data.return_value = {}
            printed = _capture_stdout(atest_utils.handle_test_runner_cmd,
                                      input_cmd,
                                      runner_cmds,
                                      do_verification=False,
                                      result_path=tmp_file.name)
            self.assertEqual(printed.find(former_cmd_str), -1)

            # Previous data is the same as the new input. Should not enter
            # strtobool. If entered, exception will be raised cause test fail.
            mock_json_load_data.return_value = {input_cmd: runner_cmds}
            printed = _capture_stdout(atest_utils.handle_test_runner_cmd,
                                      input_cmd,
                                      runner_cmds,
                                      do_verification=False,
                                      result_path=tmp_file.name)
            self.assertEqual(printed.find(former_cmd_str), -1)
            self.assertEqual(printed.find(write_result_str), -1)

            # Previous data has different cmds. Should enter strtobool not
            # update, should not find write_result_str.
            prev_cmds = ['cmd1']
            mock_input.return_value = 'n'
            mock_json_load_data.return_value = {input_cmd: prev_cmds}
            printed = _capture_stdout(atest_utils.handle_test_runner_cmd,
                                      input_cmd,
                                      runner_cmds,
                                      do_verification=False,
                                      result_path=tmp_file.name)
            self.assertEqual(printed.find(write_result_str), -1)

    @mock.patch('json.load')
    def test_verify_test_runner_cmd(self, mock_json_load_data):
        """Test method handle_test_runner_cmd without enable update_result."""
        input_cmd = 'atest_args'
        runner_cmds = ['cmd1', 'cmd2']

        with tempfile.NamedTemporaryFile() as tmp_file:
            # Previous data is the same as the new input. Should not raise
            # exception.
            mock_json_load_data.return_value = {input_cmd: runner_cmds}
            atest_utils.handle_test_runner_cmd(input_cmd,
                                               runner_cmds,
                                               do_verification=True,
                                               result_path=tmp_file.name)

            # Previous data has different cmds. Should enter strtobool and
            # hit exception.
            prev_cmds = ['cmd1']
            mock_json_load_data.return_value = {input_cmd: prev_cmds}
            with self.assertRaises(atest_error.DryRunVerificationError):
                atest_utils.handle_test_runner_cmd(input_cmd,
                                                   runner_cmds,
                                                   do_verification=True,
                                                   result_path=tmp_file.name)

    def test_get_test_info_cache_path(self):
        """Test method get_test_info_cache_path."""
        input_file_name = 'mytest_name'
        cache_root = '/a/b/c'
        name_digest = hashlib.md5(str(input_file_name).encode()).hexdigest()
        expect_hashed_name = '%s.cache' % name_digest
        self.assertEqual(os.path.join(cache_root, expect_hashed_name),
                         atest_utils.get_test_info_cache_path(input_file_name,
                                                              cache_root))

    def test_get_and_load_cache(self):
        """Test method update_test_info_cache and load_test_info_cache."""
        test_reference = 'myTestRefA'
        # The cache directory is removed again when the block exits.
        with tempfile.TemporaryDirectory() as test_cache_dir:
            atest_utils.update_test_info_cache(test_reference, [TEST_INFO_A],
                                               test_cache_dir)
            unittest_utils.assert_equal_testinfo_sets(
                self, {TEST_INFO_A},
                atest_utils.load_test_info_cache(test_reference,
                                                 test_cache_dir))

    @mock.patch('os.getcwd')
    def test_get_build_cmd(self, mock_cwd):
        """Test method get_build_cmd."""
        build_top = '/home/a/b/c'
        rel_path = 'd/e'
        mock_cwd.return_value = os.path.join(build_top, rel_path)
        os_environ_mock = {constants.ANDROID_BUILD_TOP: build_top}
        with mock.patch.dict('os.environ', os_environ_mock, clear=True):
            expected_cmd = ['../../build/soong/soong_ui.bash', '--make-mode']
            self.assertEqual(expected_cmd, atest_utils.get_build_cmd())

    @mock.patch('subprocess.check_output')
    def test_get_modified_files(self, mock_co):
        """Test method get_modified_files"""
        # Each get_modified_files call consumes three check_output results.
        mock_co.side_effect = [
            x.encode('utf-8') for x in ['/a/b/',
                                        '\n',
                                        'test_fp1.java\nc/test_fp2.java']]
        self.assertEqual({'/a/b/test_fp1.java', '/a/b/c/test_fp2.java'},
                         atest_utils.get_modified_files(''))
        mock_co.side_effect = [
            x.encode('utf-8') for x in ['/a/b/',
                                        'test_fp4',
                                        '/test_fp3.java']]
        self.assertEqual({'/a/b/test_fp4', '/a/b/test_fp3.java'},
                         atest_utils.get_modified_files(''))

    def test_delimiter(self):
        """Test method delimiter"""
        self.assertEqual('\n===\n\n', atest_utils.delimiter('=', 3, 1, 2))


if __name__ == "__main__":
    unittest.main()