#!/usr/bin/env python3
#
# Copyright 2018, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unittests for atest_utils."""

# pylint: disable=invalid-name
# pylint: disable=line-too-long

import hashlib
import os
import subprocess
import sys
import tempfile
import unittest

from io import StringIO
from pathlib import Path
from unittest import mock

# pylint: disable=import-error
from pyfakefs import fake_filesystem_unittest

from atest import atest_arg_parser
from atest import atest_error
from atest import atest_utils
from atest import constants
from atest import unittest_utils
from atest import unittest_constants

from atest.test_finders import test_info
from atest.atest_enum import FilterType

TEST_MODULE_NAME_A = 'ModuleNameA'
TEST_RUNNER_A = 'FakeTestRunnerA'
TEST_BUILD_TARGET_A = {'bt1', 'bt2'}
TEST_DATA_A = {'test_data_a_1': 'a1',
               'test_data_a_2': 'a2'}
TEST_SUITE_A = 'FakeSuiteA'
TEST_MODULE_CLASS_A = 'FAKE_MODULE_CLASS_A'
TEST_INSTALL_LOC_A = {'host', 'device'}
TEST_FINDER_A = 'MODULE'
TEST_INFO_A = test_info.TestInfo(TEST_MODULE_NAME_A, TEST_RUNNER_A,
                                 TEST_BUILD_TARGET_A, TEST_DATA_A,
                                 TEST_SUITE_A, TEST_MODULE_CLASS_A,
                                 TEST_INSTALL_LOC_A)
TEST_INFO_A.test_finder = TEST_FINDER_A
TEST_ZIP_DATA_DIR = 'zip_files'
TEST_SINGLE_ZIP_NAME = 'single_file.zip'
TEST_MULTI_ZIP_NAME = 'multi_file.zip'

REPO_INFO_OUTPUT = '''Manifest branch: test_branch
Manifest merge branch: refs/heads/test_branch
Manifest groups: all,-notdefault
----------------------------
'''


def _captured_stdout():
    """Return a context manager that swaps sys.stdout for a fresh StringIO.

    Entering the context yields the StringIO; leaving it (normally or through
    an exception) puts back whatever sys.stdout was before, so a failing test
    cannot leave stdout redirected for the tests that follow.
    """
    return mock.patch.object(sys, 'stdout', new_callable=StringIO)


# pylint: disable=protected-access
# pylint: disable=too-many-public-methods
class AtestUtilsUnittests(unittest.TestCase):
    """Unit tests for atest_utils.py"""

    def test_capture_fail_section_has_fail_section(self):
        """Test capture_fail_section when has fail section."""
        test_list = ['AAAAAA', 'FAILED: Error1', '^\n', 'Error2\n',
                     '[ 6% 191/2997] BBBBBB\n', 'CCCCC',
                     '[ 20% 322/2997] DDDDDD\n', 'EEEEE']
        want_list = ['FAILED: Error1', '^\n', 'Error2\n']
        self.assertEqual(want_list,
                         atest_utils._capture_fail_section(test_list))

    def test_capture_fail_section_no_fail_section(self):
        """Test capture_fail_section when no fail section."""
        test_list = ['[ 6% 191/2997] XXXXX', 'YYYYY: ZZZZZ']
        want_list = []
        self.assertEqual(want_list,
                         atest_utils._capture_fail_section(test_list))

    def test_is_test_mapping_none_test_mapping_args(self):
        """Test is_test_mapping is False for options unrelated to test mapping."""
        parser = atest_arg_parser.AtestArgParser()
        parser.add_atest_args()
        non_tm_args = ['--host-unit-test-only', '--smart-testing-local']

        for argument in non_tm_args:
            args = parser.parse_args([argument])
            self.assertFalse(
                atest_utils.is_test_mapping(args),
                'Option %s indicates NOT a test_mapping!' % argument)

    def test_is_test_mapping_test_mapping_args(self):
        """Test is_test_mapping is True for explicit test mapping options."""
        parser = atest_arg_parser.AtestArgParser()
        parser.add_atest_args()
        tm_args = ['--test-mapping', '--include-subdirs']

        for argument in tm_args:
            args = parser.parse_args([argument])
            self.assertTrue(
                atest_utils.is_test_mapping(args),
                'Option %s indicates a test_mapping!' % argument)

    def test_is_test_mapping_implicit_test_mapping(self):
        """Test is_test_mapping is True when the only test is ':postsubmit'."""
        parser = atest_arg_parser.AtestArgParser()
        parser.add_atest_args()

        args = parser.parse_args(['--test', '--build', ':postsubmit'])
        self.assertTrue(
            atest_utils.is_test_mapping(args),
            'Option %s indicates a test_mapping!' % args)

    def test_is_test_mapping_with_testname(self):
        """Test is_test_mapping is False when a test name follows ':postsubmit'."""
        parser = atest_arg_parser.AtestArgParser()
        parser.add_atest_args()
        irrelevant_args = ['--test', ':postsubmit', 'testname']

        args = parser.parse_args(irrelevant_args)
        # The message describes the expectation, matching the wording used by
        # test_is_test_mapping_none_test_mapping_args.
        self.assertFalse(
            atest_utils.is_test_mapping(args),
            'Option %s indicates NOT a test_mapping!' % args)

    def test_is_test_mapping_false(self):
        """Test is_test_mapping is False for a plain module name."""
        parser = atest_arg_parser.AtestArgParser()
        parser.add_atest_args()
        args = parser.parse_args(['--test', '--build', 'hello_atest'])

        self.assertFalse(
            atest_utils.is_test_mapping(args))

    def test_has_colors(self):
        """Test method _has_colors."""
        # stream is file I/O; an anonymous temporary file avoids leaving
        # anything behind in a shared, hard-coded location.
        with tempfile.TemporaryFile(mode='wb') as stream:
            self.assertFalse(atest_utils._has_colors(stream))

        # stream is not a tty(terminal).
        stream = mock.Mock()
        stream.isatty.return_value = False
        self.assertFalse(atest_utils._has_colors(stream))

        # stream is a tty(terminal).
        stream = mock.Mock()
        stream.isatty.return_value = True
        self.assertTrue(atest_utils._has_colors(stream))

    @mock.patch('atest.atest_utils._has_colors')
    def test_colorize(self, mock_has_colors):
        """Test method colorize."""
        original_str = "test string"
        green_no = 2

        # _has_colors() return False.
        mock_has_colors.return_value = False
        converted_str = atest_utils.colorize(original_str, green_no,
                                             bp_color=constants.RED)
        self.assertEqual(original_str, converted_str)

        # Green text with red background.
        mock_has_colors.return_value = True
        converted_str = atest_utils.colorize(original_str, green_no,
                                             bp_color=constants.RED)
        green_highlight_string = '\x1b[1;32;41m%s\x1b[0m' % original_str
        self.assertEqual(green_highlight_string, converted_str)

        # Green text, no background.
        mock_has_colors.return_value = True
        converted_str = atest_utils.colorize(original_str, green_no)
        green_no_highlight_string = '\x1b[1;32m%s\x1b[0m' % original_str
        self.assertEqual(green_no_highlight_string, converted_str)

    @mock.patch('atest.atest_utils._has_colors')
    def test_colorful_print(self, mock_has_colors):
        """Test method colorful_print."""
        testing_str = "color_print_test"
        green_no = 2

        # _has_colors() return False.
        mock_has_colors.return_value = False
        with _captured_stdout() as capture_output:
            atest_utils.colorful_print(testing_str, green_no,
                                       bp_color=constants.RED,
                                       auto_wrap=False)
        uncolored_string = testing_str
        self.assertEqual(capture_output.getvalue(), uncolored_string)

        # Green text with red background, but no wrap.
        mock_has_colors.return_value = True
        with _captured_stdout() as capture_output:
            atest_utils.colorful_print(testing_str, green_no,
                                       bp_color=constants.RED,
                                       auto_wrap=False)
        green_highlight_no_wrap_string = '\x1b[1;32;41m%s\x1b[0m' % testing_str
        self.assertEqual(capture_output.getvalue(),
                         green_highlight_no_wrap_string)

        # Green text, no background, no wrap.
        mock_has_colors.return_value = True
        with _captured_stdout() as capture_output:
            atest_utils.colorful_print(testing_str, green_no,
                                       auto_wrap=False)
        green_no_high_no_wrap_string = '\x1b[1;32m%s\x1b[0m' % testing_str
        self.assertEqual(capture_output.getvalue(),
                         green_no_high_no_wrap_string)

        # Green text with red background and wrap.
        mock_has_colors.return_value = True
        with _captured_stdout() as capture_output:
            atest_utils.colorful_print(testing_str, green_no,
                                       bp_color=constants.RED,
                                       auto_wrap=True)
        green_highlight_wrap_string = '\x1b[1;32;41m%s\x1b[0m\n' % testing_str
        self.assertEqual(capture_output.getvalue(), green_highlight_wrap_string)

        # Green text with wrap, but no background.
        mock_has_colors.return_value = True
        with _captured_stdout() as capture_output:
            atest_utils.colorful_print(testing_str, green_no,
                                       auto_wrap=True)
        green_wrap_no_highlight_string = '\x1b[1;32m%s\x1b[0m\n' % testing_str
        self.assertEqual(capture_output.getvalue(),
                         green_wrap_no_highlight_string)

    @mock.patch('builtins.input')
    @mock.patch('json.load')
    def test_update_test_runner_cmd(self, mock_json_load_data, mock_input):
        """Test method handle_test_runner_cmd without enable do_verification."""
        former_cmd_str = 'Former cmds ='
        write_result_str = 'Save result mapping to test_result'
        input_cmd = 'atest_args'
        runner_cmds = ['cmd1', 'cmd2']
        # The context manager guarantees the temporary file is closed and
        # removed even when one of the assertions below fails.
        with tempfile.NamedTemporaryFile() as tmp_file:
            # Previous data is empty. Should not enter strtobool.
            # If entered, exception will be raised cause test fail.
            mock_json_load_data.return_value = {}
            with _captured_stdout() as capture_output:
                atest_utils.handle_test_runner_cmd(input_cmd,
                                                   runner_cmds,
                                                   do_verification=False,
                                                   result_path=tmp_file.name)
            self.assertNotIn(former_cmd_str, capture_output.getvalue())

            # Previous data is the same as the new input. Should not enter
            # strtobool. If entered, exception will be raised cause test fail.
            mock_json_load_data.return_value = {input_cmd: runner_cmds}
            with _captured_stdout() as capture_output:
                atest_utils.handle_test_runner_cmd(input_cmd,
                                                   runner_cmds,
                                                   do_verification=False,
                                                   result_path=tmp_file.name)
            self.assertNotIn(former_cmd_str, capture_output.getvalue())
            self.assertNotIn(write_result_str, capture_output.getvalue())

            # Previous data has different cmds. Should enter strtobool not
            # update, should not find write_result_str.
            prev_cmds = ['cmd1']
            mock_input.return_value = 'n'
            mock_json_load_data.return_value = {input_cmd: prev_cmds}
            with _captured_stdout() as capture_output:
                atest_utils.handle_test_runner_cmd(input_cmd,
                                                   runner_cmds,
                                                   do_verification=False,
                                                   result_path=tmp_file.name)
            self.assertNotIn(write_result_str, capture_output.getvalue())

    @mock.patch('json.load')
    def test_verify_test_runner_cmd(self, mock_json_load_data):
        """Test method handle_test_runner_cmd without enable update_result."""
        input_cmd = 'atest_args'
        runner_cmds = ['cmd1', 'cmd2']
        with tempfile.NamedTemporaryFile() as tmp_file:
            # Previous data is the same as the new input. Should not raise
            # exception.
            mock_json_load_data.return_value = {input_cmd: runner_cmds}
            atest_utils.handle_test_runner_cmd(input_cmd,
                                               runner_cmds,
                                               do_verification=True,
                                               result_path=tmp_file.name)
            # Previous data has different cmds. Should enter strtobool and hit
            # exception.
            prev_cmds = ['cmd1']
            mock_json_load_data.return_value = {input_cmd: prev_cmds}
            with self.assertRaises(atest_error.DryRunVerificationError):
                atest_utils.handle_test_runner_cmd(input_cmd,
                                                   runner_cmds,
                                                   do_verification=True,
                                                   result_path=tmp_file.name)

    def test_get_test_info_cache_path(self):
        """Test method get_test_info_cache_path."""
        input_file_name = 'mytest_name'
        cache_root = '/a/b/c'
        expect_hashed_name = ('%s.cache' % hashlib.md5(str(input_file_name).
                                                       encode()).hexdigest())
        self.assertEqual(os.path.join(cache_root, expect_hashed_name),
                         atest_utils.get_test_info_cache_path(input_file_name,
                                                              cache_root))

    def test_get_and_load_cache(self):
        """Test method update_test_info_cache and load_test_info_cache."""
        test_reference = 'myTestRefA'
        # The cache directory is removed again once the round trip is checked.
        with tempfile.TemporaryDirectory() as test_cache_dir:
            atest_utils.update_test_info_cache(test_reference, [TEST_INFO_A],
                                               test_cache_dir)
            unittest_utils.assert_equal_testinfo_sets(
                self, {TEST_INFO_A},
                atest_utils.load_test_info_cache(test_reference, test_cache_dir))

    @mock.patch('os.getcwd')
    def test_get_build_cmd(self, mock_cwd):
        """Test method get_build_cmd."""
        build_top = '/home/a/b/c'
        rel_path = 'd/e'
        mock_cwd.return_value = os.path.join(build_top, rel_path)
        # TODO: (b/264015241) Stop mocking build variables.
        os_environ_mock = {constants.ANDROID_BUILD_TOP: build_top}
        with mock.patch.dict('os.environ', os_environ_mock, clear=True):
            expected_cmd = ['../../build/soong/soong_ui.bash', '--make-mode']
            self.assertEqual(expected_cmd, atest_utils.get_build_cmd())

    @mock.patch('subprocess.check_output')
    def test_get_modified_files(self, mock_co):
        """Test method get_modified_files"""
        # Each element is the output of one successive check_output() call.
        mock_co.side_effect = [
            x.encode('utf-8') for x in ['/a/b/',
                                        '\n',
                                        'test_fp1.java\nc/test_fp2.java']]
        self.assertEqual({'/a/b/test_fp1.java', '/a/b/c/test_fp2.java'},
                         atest_utils.get_modified_files(''))
        mock_co.side_effect = [
            x.encode('utf-8') for x in ['/a/b/',
                                        'test_fp4',
                                        '/test_fp3.java']]
        self.assertEqual({'/a/b/test_fp4', '/a/b/test_fp3.java'},
                         atest_utils.get_modified_files(''))

    def test_delimiter(self):
        """Test method delimiter"""
        self.assertEqual('\n===\n\n', atest_utils.delimiter('=', 3, 1, 2))

    def test_has_python_module(self):
        """Test method has_python_module"""
        self.assertFalse(atest_utils.has_python_module('M_M'))
        self.assertTrue(atest_utils.has_python_module('os'))

    @mock.patch.object(atest_utils, 'matched_tf_error_log', return_value=True)
    def test_read_zip_single_text(self, _matched):
        """Test method extract_zip_text include only one text file."""
        zip_path = os.path.join(unittest_constants.TEST_DATA_DIR,
                                TEST_ZIP_DATA_DIR, TEST_SINGLE_ZIP_NAME)
        expect_content = '\nfile1_line1\nfile1_line2\n'
        self.assertEqual(expect_content, atest_utils.extract_zip_text(zip_path))

    @mock.patch.object(atest_utils, 'matched_tf_error_log', return_value=True)
    def test_read_zip_multi_text(self, _matched):
        """Test method extract_zip_text include multiple text files."""
        zip_path = os.path.join(unittest_constants.TEST_DATA_DIR,
                                TEST_ZIP_DATA_DIR, TEST_MULTI_ZIP_NAME)
        expect_content = ('\nfile1_line1\nfile1_line2\n\nfile2_line1\n'
                          'file2_line2\n')
        self.assertEqual(expect_content, atest_utils.extract_zip_text(zip_path))

    def test_matched_tf_error_log(self):
        """Test method matched_tf_error_log."""
        matched_content = '05-25 17:37:04 E/XXXXX YYYYY'
        not_matched_content = '05-25 17:37:04 I/XXXXX YYYYY'
        # Test matched content
        self.assertEqual(True,
                         atest_utils.matched_tf_error_log(matched_content))
        # Test not matched content
        self.assertEqual(False,
                         atest_utils.matched_tf_error_log(not_matched_content))

    @mock.patch('os.chmod')
    @mock.patch('shutil.copy2')
    @mock.patch('atest.atest_utils.has_valid_cert')
    @mock.patch('subprocess.check_output')
    @mock.patch('os.path.exists')
    def test_get_flakes(self, mock_path_exists, mock_output, mock_valid_cert,
                        _cpc, _cm):
        """Test method get_flakes."""
        # Test par file does not exist.
        mock_path_exists.return_value = False
        self.assertIsNone(atest_utils.get_flakes())
        # Test par file exists.
        mock_path_exists.return_value = True
        mock_output.return_value = (b'flake_percent:0.10001\n'
                                    b'postsubmit_flakes_per_week:12.0')
        mock_valid_cert.return_value = True
        expected_flake_info = {'flake_percent': '0.10001',
                               'postsubmit_flakes_per_week': '12.0'}
        self.assertEqual(expected_flake_info,
                         atest_utils.get_flakes())
        # Test no valid cert
        mock_valid_cert.return_value = False
        self.assertIsNone(atest_utils.get_flakes())

    @mock.patch('subprocess.check_call')
    def test_has_valid_cert(self, mock_call):
        """Test method has_valid_cert."""
        # No status command configured.
        with mock.patch("atest.constants.CERT_STATUS_CMD", ''):
            self.assertFalse(atest_utils.has_valid_cert())
        with mock.patch("atest.constants.CERT_STATUS_CMD", 'CMD'):
            # raise subprocess.CalledProcessError. The side effect must sit on
            # the patched check_call itself; setting it on a child attribute
            # of the mock would never make the call raise.
            mock_call.side_effect = subprocess.CalledProcessError(1, 'CMD')
            self.assertFalse(atest_utils.has_valid_cert())
            # Clear the side effect so the return values below take effect.
            mock_call.side_effect = None
            # has valid cert
            mock_call.return_value = 0
            self.assertTrue(atest_utils.has_valid_cert())
            # no valid cert
            mock_call.return_value = 4
            self.assertFalse(atest_utils.has_valid_cert())

    # pylint: disable=no-member
    def test_read_test_record_proto(self):
        """Test method read_test_record."""
        test_record_file_path = os.path.join(
            unittest_constants.TEST_DATA_DIR,
            "test_record.proto.testonly")
        test_record = atest_utils.read_test_record(test_record_file_path)
        self.assertEqual(
            test_record.children[0].inline_test_record.test_record_id,
            'x86 hello_world_test')

    def test_load_json_safely_file_inexistent(self):
        """Test method load_json_safely if file does not exist."""
        json_file_path = Path(
            unittest_constants.TEST_DATA_DIR).joinpath("not_exist.json")
        self.assertEqual({}, atest_utils.load_json_safely(json_file_path))

    def test_load_json_safely_valid_json_format(self):
        """Test method load_json_safely if file exists and format is valid."""
        json_file_path = Path(
            unittest_constants.TEST_DATA_DIR).joinpath("module-info.json")
        content = atest_utils.load_json_safely(json_file_path)
        self.assertEqual('MainModule1', content.get('MainModule1').get('module_name'))
        self.assertEqual([], content.get('MainModule2').get('test_mainline_modules'))

    def test_load_json_safely_invalid_json_format(self):
        """Test method load_json_safely if file exist but content is invalid."""
        json_file_path = Path(
            unittest_constants.TEST_DATA_DIR).joinpath("not-valid-module-info.json")
        self.assertEqual({}, atest_utils.load_json_safely(json_file_path))

    @mock.patch('os.getenv')
    def test_get_manifest_branch(self, mock_env):
        """Test method get_manifest_branch"""
        content_portal = '<manifest><include name="Default.xml" /></manifest>'
        content_manifest = '''<manifest>
            <remote name="aosp" fetch=".." review="https://android-review.googlesource.com/" />
            <default revision="MONSTER-dev" remote="aosp" sync-j="4" />
        </manifest>'''

        # The fake build top is deleted when the block exits, including when
        # an assertion fails part-way through.
        with tempfile.TemporaryDirectory() as build_top:
            mock_env.return_value = build_top
            repo_dir = Path(build_top).joinpath('.repo')
            portal_xml = repo_dir.joinpath('manifest.xml')
            manifest_dir = repo_dir.joinpath('manifests')
            target_xml = manifest_dir.joinpath('Default.xml')
            repo_dir.mkdir()
            manifest_dir.mkdir()

            # 1. The manifest.xml(portal) contains 'include' directive: 'Default.xml'.
            # Search revision in .repo/manifests/Default.xml.
            with open(portal_xml, 'w') as cache:
                cache.write(content_portal)
            with open(target_xml, 'w') as cache:
                cache.write(content_manifest)
            self.assertEqual("MONSTER-dev", atest_utils.get_manifest_branch())
            self.assertEqual("aosp-MONSTER-dev", atest_utils.get_manifest_branch(True))
            os.remove(target_xml)
            os.remove(portal_xml)

            # 2. The manifest.xml contains neither 'include' nor 'revision' directive,
            # keep searching revision in .repo/manifests/default.xml by default.
            with open(portal_xml, 'w') as cache:
                cache.write('<manifest></manifest>')
            default_xml = manifest_dir.joinpath('default.xml')
            with open(default_xml, 'w') as cache:
                cache.write(content_manifest)
            self.assertEqual("MONSTER-dev", atest_utils.get_manifest_branch())
            os.remove(default_xml)
            os.remove(portal_xml)

            # 3. revision was directly defined in 'manifest.xml'.
            with open(portal_xml, 'w') as cache:
                cache.write(content_manifest)
            self.assertEqual("MONSTER-dev", atest_utils.get_manifest_branch())
            os.remove(portal_xml)

            # 4. Return an empty string if the included xml does not exist.
            with open(portal_xml, 'w') as cache:
                cache.write(content_portal)
            self.assertEqual('', atest_utils.get_manifest_branch())
            os.remove(portal_xml)

    def test_has_wildcard(self):
        """Test method of has_wildcard"""
        self.assertFalse(atest_utils.has_wildcard('test1'))
        self.assertFalse(atest_utils.has_wildcard(['test1']))
        self.assertTrue(atest_utils.has_wildcard('test1?'))
        self.assertTrue(atest_utils.has_wildcard(['test1', 'b*', 'a?b*']))

    def test_quote(self):
        """Test method of quote()"""
        target_str = r'TEST_(F|P)[0-9].*\w$'
        # Raw string: '\w' is not a valid escape sequence in a normal literal.
        expected_str = r"'TEST_(F|P)[0-9].*\w$'"
        self.assertEqual(atest_utils.quote(target_str), expected_str)
        self.assertEqual(atest_utils.quote('TEST_P224'), 'TEST_P224')

    @mock.patch('builtins.input', return_value='')
    def test_prompt_with_yn_result(self, mock_input):
        """Test method of prompt_with_yn_result"""
        msg = 'Do you want to continue?'
        # Empty input: the result follows the default that was passed in.
        mock_input.return_value = ''
        self.assertTrue(atest_utils.prompt_with_yn_result(msg, True))
        self.assertFalse(atest_utils.prompt_with_yn_result(msg, False))
        mock_input.return_value = 'y'
        self.assertTrue(atest_utils.prompt_with_yn_result(msg, True))
        mock_input.return_value = 'nO'
        self.assertFalse(atest_utils.prompt_with_yn_result(msg, True))

    def test_get_android_junit_config_filters(self):
        """Test method of get_android_junit_config_filters"""
        no_filter_test_config = os.path.join(
            unittest_constants.TEST_DATA_DIR,
            "filter_configs", "no_filter.cfg")
        self.assertEqual({},
                         atest_utils.get_android_junit_config_filters(
                             no_filter_test_config))

        filtered_test_config = os.path.join(
            unittest_constants.TEST_DATA_DIR,
            'filter_configs', 'filter.cfg')
        filter_dict = atest_utils.get_android_junit_config_filters(
            filtered_test_config)
        # Sorted before comparing so the assertions ignore annotation order.
        include_annotations = filter_dict.get(constants.INCLUDE_ANNOTATION)
        include_annotations.sort()
        self.assertEqual(
            ['include1', 'include2'],
            include_annotations)
        exclude_annotation = filter_dict.get(constants.EXCLUDE_ANNOTATION)
        exclude_annotation.sort()
        self.assertEqual(
            ['exclude1', 'exclude2'],
            exclude_annotation)

    def test_md5sum(self):
        """Test method of md5sum"""
        exist_string = os.path.join(unittest_constants.TEST_DATA_DIR,
                                    unittest_constants.JSON_FILE)
        inexist_string = os.path.join(unittest_constants.TEST_DATA_DIR,
                                      unittest_constants.CLASS_NAME)
        self.assertEqual(
            atest_utils.md5sum(exist_string), '062160df00c20b1ee4d916b7baf71346')
        self.assertEqual(
            atest_utils.md5sum(inexist_string), '')

    def test_check_md5(self):
        """Test method of check_md5"""
        file1 = os.path.join(unittest_constants.TEST_DATA_DIR,
                             unittest_constants.JSON_FILE)
        # A private temporary directory keeps the checksum file out of a
        # shared, hard-coded path and cleans it up on any outcome.
        with tempfile.TemporaryDirectory() as tmp_dir:
            checksum_file = os.path.join(tmp_dir, '_tmp_module-info.json')
            atest_utils.save_md5([file1], checksum_file)
            self.assertTrue(atest_utils.check_md5(checksum_file))
            os.remove(checksum_file)
            self.assertFalse(atest_utils.check_md5(checksum_file))
            self.assertTrue(atest_utils.check_md5(checksum_file, missing_ok=True))

    def test_get_config_parameter(self):
        """Test method of get_config_parameter"""
        parameter_config = os.path.join(
            unittest_constants.TEST_DATA_DIR,
            "parameter_config", "parameter.cfg")
        no_parameter_config = os.path.join(
            unittest_constants.TEST_DATA_DIR,
            "parameter_config", "no_parameter.cfg")

        # Test parameter empty value
        self.assertEqual(set(),
                         atest_utils.get_config_parameter(
                             no_parameter_config))

        # Test parameter with values
        self.assertEqual({'value_1', 'value_2', 'value_3', 'value_4'},
                         atest_utils.get_config_parameter(
                             parameter_config))

    def test_get_config_device(self):
        """Test method of get_config_device"""
        device_config = os.path.join(
            unittest_constants.TEST_DATA_DIR,
            "parameter_config", "multiple_device.cfg")
        self.assertEqual({'device_1', 'device_2'},
                         atest_utils.get_config_device(device_config))

    def test_get_mainline_param(self):
        """Test method of get_mainline_param"""
        mainline_param_config = os.path.join(
            unittest_constants.TEST_DATA_DIR,
            "parameter_config", "mainline_param.cfg")
        self.assertEqual({'foo1.apex', 'foo2.apk+foo3.apk'},
                         atest_utils.get_mainline_param(
                             mainline_param_config))
        no_mainline_param_config = os.path.join(
            unittest_constants.TEST_DATA_DIR,
            "parameter_config", "parameter.cfg")
        self.assertEqual(set(),
                         atest_utils.get_mainline_param(
                             no_mainline_param_config))

    def test_get_full_annotation_class_name(self):
        """Test method of get_full_annotation_class_name."""
        app_mode_full = 'android.platform.test.annotations.AppModeFull'
        presubmit = 'android.platform.test.annotations.Presubmit'
        module_info = {'srcs': [os.path.join(unittest_constants.TEST_DATA_DIR,
                                             'annotation_testing',
                                             'Annotation.src')]}
        # get annotation class from keyword
        self.assertEqual(
            atest_utils.get_full_annotation_class_name(module_info, 'presubmit'),
            presubmit)
        # get annotation class from an accurate fqcn keyword.
        self.assertEqual(
            atest_utils.get_full_annotation_class_name(module_info, presubmit),
            presubmit)
        # accept fqcn keyword in lowercase.
        self.assertEqual(
            atest_utils.get_full_annotation_class_name(
                module_info, 'android.platform.test.annotations.presubmit'),
            presubmit)
        # unable to get annotation class from keyword.
        self.assertNotEqual(
            atest_utils.get_full_annotation_class_name(
                module_info, 'appleModefull'),
            app_mode_full)
        # do not support partial-correct keyword.
        self.assertNotEqual(
            atest_utils.get_full_annotation_class_name(
                module_info, 'android.platform.test.annotations.pres'),
            presubmit)

    def test_has_mixed_type_filters_one_module_with_one_type_return_false(self):
        """Test method of has_mixed_type_filters"""
        filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD']))
        test_data_1 = {constants.TI_FILTER: [filter_1]}
        test_info_1 = test_info.TestInfo('MODULE', 'RUNNER',
                                         set(), test_data_1,
                                         'SUITE', '',
                                         set())
        self.assertFalse(atest_utils.has_mixed_type_filters([test_info_1]))

    def test_has_mixed_type_filters_one_module_with_mixed_types_return_true(self):
        """Test method of has_mixed_type_filters"""
        filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD']))
        filter_2 = test_info.TestFilter('CLASS', frozenset(['METHOD*']))
        test_data_2 = {constants.TI_FILTER: [filter_1, filter_2]}
        test_info_2 = test_info.TestInfo('MODULE', 'RUNNER',
                                         set(), test_data_2,
                                         'SUITE', '',
                                         set())
        self.assertTrue(atest_utils.has_mixed_type_filters([test_info_2]))

    def test_has_mixed_type_filters_two_module_with_mixed_types_return_false(self):
        """Test method of has_mixed_type_filters"""
        filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD']))
        test_data_1 = {constants.TI_FILTER: [filter_1]}
        test_info_1 = test_info.TestInfo('MODULE', 'RUNNER',
                                         set(), test_data_1,
                                         'SUITE', '',
                                         set())
        filter_3 = test_info.TestFilter('CLASS', frozenset(['METHOD*']))
        test_data_3 = {constants.TI_FILTER: [filter_3]}
        test_info_3 = test_info.TestInfo('MODULE3', 'RUNNER',
                                         set(), test_data_3,
                                         'SUITE', '',
                                         set())
        self.assertFalse(atest_utils.has_mixed_type_filters(
            [test_info_1, test_info_3]))

    def test_get_filter_types(self):
        """Test method of get_filter_types."""
        filters = {'CLASS#METHOD'}
        expect_types = {FilterType.REGULAR_FILTER.value}
        self.assertEqual(atest_utils.get_filter_types(filters), expect_types)

        filters = {'CLASS#METHOD*'}
        expect_types = {FilterType.WILDCARD_FILTER.value}
        self.assertEqual(atest_utils.get_filter_types(filters), expect_types)

        filters = {'CLASS#METHOD', 'CLASS#METHOD*'}
        expect_types = {FilterType.WILDCARD_FILTER.value,
                        FilterType.REGULAR_FILTER.value}
        self.assertEqual(atest_utils.get_filter_types(filters), expect_types)

        filters = {'CLASS#METHOD?', 'CLASS#METHOD*'}
        expect_types = {FilterType.WILDCARD_FILTER.value}
        self.assertEqual(atest_utils.get_filter_types(filters), expect_types)

    def test_get_bp_content(self):
        """Method get_bp_content."""
        # 1. "manifest" and "instrumentation_for" are defined.
        content = '''android_test {
                // comment
                instrumentation_for: "AmSlam", // comment
                manifest: "AndroidManifest-test.xml",
                name: "AmSlamTests",
        }'''
        expected_result = {"AmSlamTests":
                           {"target_module": "AmSlam", "manifest": "AndroidManifest-test.xml"}}
        with tempfile.TemporaryDirectory() as temp_dir:
            tmpbp = Path(temp_dir).joinpath('Android.bp')
            with open(tmpbp, 'w') as cache:
                cache.write(content)
            self.assertEqual(atest_utils.get_bp_content(tmpbp, 'android_test'),
                             expected_result)

        # 2. Only name is defined, will give default manifest and null target_module.
        content = '''android_app {
                // comment
                name: "AmSlam",
                srcs: ["src1.java", "src2.java"]
        }'''
        expected_result = {"AmSlam":
                           {"target_module": "", "manifest": "AndroidManifest.xml"}}
        with tempfile.TemporaryDirectory() as temp_dir:
            tmpbp = Path(temp_dir).joinpath('Android.bp')
            with open(tmpbp, 'w') as cache:
                cache.write(content)
            self.assertEqual(atest_utils.get_bp_content(tmpbp, 'android_app'),
                             expected_result)

        # 3. Not even an Android.bp.
        content = '''LOCAL_PATH := $(call my-dir)
                # comment
                include $(call all-subdir-makefiles)
                LOCAL_MODULE := atest_foo_test
        }'''
        with tempfile.TemporaryDirectory() as temp_dir:
            tmpbp = Path(temp_dir).joinpath('Android.mk')
            with open(tmpbp, 'w') as cache:
                cache.write(content)
            self.assertEqual(atest_utils.get_bp_content(tmpbp, 'android_app'), {})

    def test_get_manifest_info(self):
        """test get_manifest_info method."""
        # An instrumentation test:
        test_xml = os.path.join(unittest_constants.TEST_DATA_DIR,
                                'foo/bar/AmSlam/test/AndroidManifest.xml')
        expected = {
            'package': 'com.android.settings.tests.unit',
            'target_package': 'c0m.andr0id.settingS',
            'persistent': False
        }
        self.assertEqual(expected, atest_utils.get_manifest_info(test_xml))

        # A target module:
        target_xml = os.path.join(unittest_constants.TEST_DATA_DIR,
                                  'foo/bar/AmSlam/AndroidManifest.xml')
        expected = {
            'package': 'c0m.andr0id.settingS',
            'target_package': '',
            'persistent': False
        }
        self.assertEqual(expected, atest_utils.get_manifest_info(target_xml))


# pylint: disable=missing-function-docstring
class AutoShardUnittests(fake_filesystem_unittest.TestCase):
    """Tests for auto shard functions"""

    def setUp(self):
        # Every test in this class runs against pyfakefs' fake filesystem.
        self.setUpPyfakefs()

    def test_get_local_auto_shardable_tests(self):
        """test get local auto shardable list"""
        shardable_tests_file = Path(atest_utils.get_misc_dir()).joinpath(
            '.atest/auto_shard/local_auto_shardable_tests')

        self.fs.create_file(shardable_tests_file, contents='abc\ndef')

        long_duration_tests = atest_utils.get_local_auto_shardable_tests()

        expected_list = ['abc', 'def']
        self.assertEqual(long_duration_tests, expected_list)

    def test_update_shardable_tests_with_time_less_than_600(self):
        """test update local auto shardable list"""
        shardable_tests_file = Path(atest_utils.get_misc_dir()).joinpath(
            '.atest/auto_shard/local_auto_shardable_tests')

        self.fs.create_file(shardable_tests_file, contents='')

        atest_utils.update_shardable_tests('test1', 10)

        with open(shardable_tests_file) as f:
            self.assertEqual('', f.read())

    def test_update_shardable_tests_with_time_larger_than_600(self):
        """test update local auto shardable list"""
        shardable_tests_file = Path(atest_utils.get_misc_dir()).joinpath(
            '.atest/auto_shard/local_auto_shardable_tests')

        self.fs.create_file(shardable_tests_file, contents='')

        atest_utils.update_shardable_tests('test2', 1000)

        with open(shardable_tests_file) as f:
            self.assertEqual('test2', f.read())

    def test_update_shardable_tests_with_time_larger_than_600_twice(self):
        """test update local auto shardable list"""
        shardable_tests_file = Path(atest_utils.get_misc_dir()).joinpath(
            '.atest/auto_shard/local_auto_shardable_tests')
        # access the fake_filesystem object via fake_fs
        self.fs.create_file(shardable_tests_file, contents='')

        # Recording the same test twice must leave a single entry.
        atest_utils.update_shardable_tests('test3', 1000)
        atest_utils.update_shardable_tests('test3', 601)

        with open(shardable_tests_file) as f:
            self.assertEqual('test3', f.read())


if __name__ == "__main__":
    unittest.main()