1#!/usr/bin/env python3 2# 3# Copyright 2018, The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17"""Unittests for atest_utils.""" 18 19# pylint: disable=invalid-name 20# pylint: disable=line-too-long 21 22import hashlib 23import os 24import subprocess 25import sys 26import tempfile 27import unittest 28 29from io import StringIO 30from unittest import mock 31 32import atest_error 33import atest_utils 34import constants 35import unittest_utils 36import unittest_constants 37 38from test_finders import test_info 39from atest_enum import FilterType 40 41 42TEST_MODULE_NAME_A = 'ModuleNameA' 43TEST_RUNNER_A = 'FakeTestRunnerA' 44TEST_BUILD_TARGET_A = set(['bt1', 'bt2']) 45TEST_DATA_A = {'test_data_a_1': 'a1', 46 'test_data_a_2': 'a2'} 47TEST_SUITE_A = 'FakeSuiteA' 48TEST_MODULE_CLASS_A = 'FAKE_MODULE_CLASS_A' 49TEST_INSTALL_LOC_A = set(['host', 'device']) 50TEST_FINDER_A = 'MODULE' 51TEST_INFO_A = test_info.TestInfo(TEST_MODULE_NAME_A, TEST_RUNNER_A, 52 TEST_BUILD_TARGET_A, TEST_DATA_A, 53 TEST_SUITE_A, TEST_MODULE_CLASS_A, 54 TEST_INSTALL_LOC_A) 55TEST_INFO_A.test_finder = TEST_FINDER_A 56TEST_ZIP_DATA_DIR = 'zip_files' 57TEST_SINGLE_ZIP_NAME = 'single_file.zip' 58TEST_MULTI_ZIP_NAME = 'multi_file.zip' 59 60REPO_INFO_OUTPUT = '''Manifest branch: test_branch 61Manifest merge branch: refs/heads/test_branch 62Manifest groups: all,-notdefault 63---------------------------- 64''' 65 66#pylint: disable=protected-access 67class AtestUtilsUnittests(unittest.TestCase): 68 """Unit tests for atest_utils.py""" 69 70 def test_capture_fail_section_has_fail_section(self): 71 """Test capture_fail_section when has fail section.""" 72 test_list = ['AAAAAA', 'FAILED: Error1', '^\n', 'Error2\n', 73 '[ 6% 191/2997] BBBBBB\n', 'CCCCC', 74 '[ 20% 322/2997] DDDDDD\n', 'EEEEE'] 75 want_list = ['FAILED: Error1', '^\n', 'Error2\n'] 76 self.assertEqual(want_list, 77 atest_utils._capture_fail_section(test_list)) 78 79 def test_capture_fail_section_no_fail_section(self): 80 """Test capture_fail_section when no fail section.""" 81 test_list = ['[ 6% 191/2997] XXXXX', 'YYYYY: ZZZZZ'] 82 want_list = [] 83 self.assertEqual(want_list, 84 atest_utils._capture_fail_section(test_list)) 85 86 def test_is_test_mapping(self): 87 """Test method is_test_mapping.""" 88 tm_option_attributes = [ 89 'test_mapping', 90 'include_subdirs' 91 ] 92 for attr_to_test in tm_option_attributes: 93 args = mock.Mock() 94 for attr in tm_option_attributes: 95 setattr(args, attr, attr == attr_to_test) 96 args.tests = [] 97 args.host_unit_test_only = False 98 self.assertTrue( 99 atest_utils.is_test_mapping(args), 100 'Failed to validate option %s' % attr_to_test) 101 102 args = mock.Mock() 103 for attr in tm_option_attributes: 104 setattr(args, attr, False) 105 args.tests = [] 106 args.host_unit_test_only = True 107 self.assertFalse(atest_utils.is_test_mapping(args)) 108 109 args = mock.Mock() 110 for attr in tm_option_attributes: 111 setattr(args, attr, False) 112 args.tests = [':group_name'] 113 args.host_unit_test_only = False 114 self.assertTrue(atest_utils.is_test_mapping(args)) 115 116 args = mock.Mock() 117 for attr in tm_option_attributes: 118 setattr(args, attr, False) 119 args.tests = [':test1', 'test2'] 120 args.host_unit_test_only = False 121 self.assertFalse(atest_utils.is_test_mapping(args)) 122 123 args = mock.Mock() 124 for attr in tm_option_attributes: 125 setattr(args, attr, False) 126 args.tests = ['test2'] 127 args.host_unit_test_only = False 128 self.assertFalse(atest_utils.is_test_mapping(args)) 129 130 def test_has_colors(self): 131 """Test method _has_colors.""" 132 # stream is file I/O 133 stream = open('/tmp/test_has_colors.txt', 'wb') 134 self.assertFalse(atest_utils._has_colors(stream)) 135 stream.close() 136 137 # stream is not a tty(terminal). 138 stream = mock.Mock() 139 stream.isatty.return_value = False 140 self.assertFalse(atest_utils._has_colors(stream)) 141 142 # stream is a tty(terminal). 143 stream = mock.Mock() 144 stream.isatty.return_value = True 145 self.assertTrue(atest_utils._has_colors(stream)) 146 147 148 @mock.patch('atest_utils._has_colors') 149 def test_colorize(self, mock_has_colors): 150 """Test method colorize.""" 151 original_str = "test string" 152 green_no = 2 153 154 # _has_colors() return False. 155 mock_has_colors.return_value = False 156 converted_str = atest_utils.colorize(original_str, green_no, 157 highlight=True) 158 self.assertEqual(original_str, converted_str) 159 160 # Green with highlight. 161 mock_has_colors.return_value = True 162 converted_str = atest_utils.colorize(original_str, green_no, 163 highlight=True) 164 green_highlight_string = '\x1b[1;42m%s\x1b[0m' % original_str 165 self.assertEqual(green_highlight_string, converted_str) 166 167 # Green, no highlight. 168 mock_has_colors.return_value = True 169 converted_str = atest_utils.colorize(original_str, green_no, 170 highlight=False) 171 green_no_highlight_string = '\x1b[1;32m%s\x1b[0m' % original_str 172 self.assertEqual(green_no_highlight_string, converted_str) 173 174 175 @mock.patch('atest_utils._has_colors') 176 def test_colorful_print(self, mock_has_colors): 177 """Test method colorful_print.""" 178 testing_str = "color_print_test" 179 green_no = 2 180 181 # _has_colors() return False. 182 mock_has_colors.return_value = False 183 capture_output = StringIO() 184 sys.stdout = capture_output 185 atest_utils.colorful_print(testing_str, green_no, highlight=True, 186 auto_wrap=False) 187 sys.stdout = sys.__stdout__ 188 uncolored_string = testing_str 189 self.assertEqual(capture_output.getvalue(), uncolored_string) 190 191 # Green with highlight, but no wrap. 192 mock_has_colors.return_value = True 193 capture_output = StringIO() 194 sys.stdout = capture_output 195 atest_utils.colorful_print(testing_str, green_no, highlight=True, 196 auto_wrap=False) 197 sys.stdout = sys.__stdout__ 198 green_highlight_no_wrap_string = '\x1b[1;42m%s\x1b[0m' % testing_str 199 self.assertEqual(capture_output.getvalue(), 200 green_highlight_no_wrap_string) 201 202 # Green, no highlight, no wrap. 203 mock_has_colors.return_value = True 204 capture_output = StringIO() 205 sys.stdout = capture_output 206 atest_utils.colorful_print(testing_str, green_no, highlight=False, 207 auto_wrap=False) 208 sys.stdout = sys.__stdout__ 209 green_no_high_no_wrap_string = '\x1b[1;32m%s\x1b[0m' % testing_str 210 self.assertEqual(capture_output.getvalue(), 211 green_no_high_no_wrap_string) 212 213 # Green with highlight and wrap. 214 mock_has_colors.return_value = True 215 capture_output = StringIO() 216 sys.stdout = capture_output 217 atest_utils.colorful_print(testing_str, green_no, highlight=True, 218 auto_wrap=True) 219 sys.stdout = sys.__stdout__ 220 green_highlight_wrap_string = '\x1b[1;42m%s\x1b[0m\n' % testing_str 221 self.assertEqual(capture_output.getvalue(), green_highlight_wrap_string) 222 223 # Green with wrap, but no highlight. 224 mock_has_colors.return_value = True 225 capture_output = StringIO() 226 sys.stdout = capture_output 227 atest_utils.colorful_print(testing_str, green_no, highlight=False, 228 auto_wrap=True) 229 sys.stdout = sys.__stdout__ 230 green_wrap_no_highlight_string = '\x1b[1;32m%s\x1b[0m\n' % testing_str 231 self.assertEqual(capture_output.getvalue(), 232 green_wrap_no_highlight_string) 233 234 @mock.patch('socket.gethostname') 235 @mock.patch('subprocess.check_output') 236 def test_is_external_run(self, mock_output, mock_hostname): 237 """Test method is_external_run.""" 238 mock_output.return_value = '' 239 mock_hostname.return_value = '' 240 self.assertTrue(atest_utils.is_external_run()) 241 242 mock_output.return_value = 'test@other.com' 243 mock_hostname.return_value = 'abc.com' 244 self.assertTrue(atest_utils.is_external_run()) 245 246 mock_output.return_value = 'test@other.com' 247 mock_hostname.return_value = 'abc.google.com' 248 self.assertFalse(atest_utils.is_external_run()) 249 250 mock_output.return_value = 'test@other.com' 251 mock_hostname.return_value = 'abc.google.def.com' 252 self.assertTrue(atest_utils.is_external_run()) 253 254 mock_output.return_value = 'test@google.com' 255 self.assertFalse(atest_utils.is_external_run()) 256 257 mock_output.return_value = 'test@other.com' 258 mock_hostname.return_value = 'c.googlers.com' 259 self.assertFalse(atest_utils.is_external_run()) 260 261 mock_output.return_value = 'test@other.com' 262 mock_hostname.return_value = 'a.googlers.com' 263 self.assertTrue(atest_utils.is_external_run()) 264 265 mock_output.side_effect = OSError() 266 self.assertTrue(atest_utils.is_external_run()) 267 268 mock_output.side_effect = subprocess.CalledProcessError(1, 'cmd') 269 self.assertTrue(atest_utils.is_external_run()) 270 271 @mock.patch('metrics.metrics_base.get_user_type') 272 def test_print_data_collection_notice(self, mock_get_user_type): 273 """Test method print_data_collection_notice.""" 274 275 # get_user_type return 1(external). 276 mock_get_user_type.return_value = 1 277 notice_str = ('\n==================\nNotice:\n' 278 ' We collect anonymous usage statistics' 279 ' in accordance with our' 280 ' Content Licenses (https://source.android.com/setup/start/licenses),' 281 ' Contributor License Agreement (https://opensource.google.com/docs/cla/),' 282 ' Privacy Policy (https://policies.google.com/privacy) and' 283 ' Terms of Service (https://policies.google.com/terms).' 284 '\n==================\n\n') 285 capture_output = StringIO() 286 sys.stdout = capture_output 287 atest_utils.print_data_collection_notice() 288 sys.stdout = sys.__stdout__ 289 uncolored_string = notice_str 290 self.assertEqual(capture_output.getvalue(), uncolored_string) 291 292 # get_user_type return 0(internal). 293 mock_get_user_type.return_value = 0 294 notice_str = ('\n==================\nNotice:\n' 295 ' We collect usage statistics' 296 ' in accordance with our' 297 ' Content Licenses (https://source.android.com/setup/start/licenses),' 298 ' Contributor License Agreement (https://cla.developers.google.com/),' 299 ' Privacy Policy (https://policies.google.com/privacy) and' 300 ' Terms of Service (https://policies.google.com/terms).' 301 '\n==================\n\n') 302 capture_output = StringIO() 303 sys.stdout = capture_output 304 atest_utils.print_data_collection_notice() 305 sys.stdout = sys.__stdout__ 306 uncolored_string = notice_str 307 self.assertEqual(capture_output.getvalue(), uncolored_string) 308 309 @mock.patch('builtins.input') 310 @mock.patch('json.load') 311 def test_update_test_runner_cmd(self, mock_json_load_data, mock_input): 312 """Test method handle_test_runner_cmd without enable do_verification.""" 313 former_cmd_str = 'Former cmds =' 314 write_result_str = 'Save result mapping to test_result' 315 tmp_file = tempfile.NamedTemporaryFile() 316 input_cmd = 'atest_args' 317 runner_cmds = ['cmd1', 'cmd2'] 318 capture_output = StringIO() 319 sys.stdout = capture_output 320 # Previous data is empty. Should not enter strtobool. 321 # If entered, exception will be raised cause test fail. 322 mock_json_load_data.return_value = {} 323 atest_utils.handle_test_runner_cmd(input_cmd, 324 runner_cmds, 325 do_verification=False, 326 result_path=tmp_file.name) 327 sys.stdout = sys.__stdout__ 328 self.assertEqual(capture_output.getvalue().find(former_cmd_str), -1) 329 # Previous data is the same as the new input. Should not enter strtobool. 330 # If entered, exception will be raised cause test fail 331 capture_output = StringIO() 332 sys.stdout = capture_output 333 mock_json_load_data.return_value = {input_cmd:runner_cmds} 334 atest_utils.handle_test_runner_cmd(input_cmd, 335 runner_cmds, 336 do_verification=False, 337 result_path=tmp_file.name) 338 sys.stdout = sys.__stdout__ 339 self.assertEqual(capture_output.getvalue().find(former_cmd_str), -1) 340 self.assertEqual(capture_output.getvalue().find(write_result_str), -1) 341 # Previous data has different cmds. Should enter strtobool not update, 342 # should not find write_result_str. 343 prev_cmds = ['cmd1'] 344 mock_input.return_value = 'n' 345 capture_output = StringIO() 346 sys.stdout = capture_output 347 mock_json_load_data.return_value = {input_cmd:prev_cmds} 348 atest_utils.handle_test_runner_cmd(input_cmd, 349 runner_cmds, 350 do_verification=False, 351 result_path=tmp_file.name) 352 sys.stdout = sys.__stdout__ 353 self.assertEqual(capture_output.getvalue().find(write_result_str), -1) 354 355 @mock.patch('json.load') 356 def test_verify_test_runner_cmd(self, mock_json_load_data): 357 """Test method handle_test_runner_cmd without enable update_result.""" 358 tmp_file = tempfile.NamedTemporaryFile() 359 input_cmd = 'atest_args' 360 runner_cmds = ['cmd1', 'cmd2'] 361 # Previous data is the same as the new input. Should not raise exception. 362 mock_json_load_data.return_value = {input_cmd:runner_cmds} 363 atest_utils.handle_test_runner_cmd(input_cmd, 364 runner_cmds, 365 do_verification=True, 366 result_path=tmp_file.name) 367 # Previous data has different cmds. Should enter strtobool and hit 368 # exception. 369 prev_cmds = ['cmd1'] 370 mock_json_load_data.return_value = {input_cmd:prev_cmds} 371 self.assertRaises(atest_error.DryRunVerificationError, 372 atest_utils.handle_test_runner_cmd, 373 input_cmd, 374 runner_cmds, 375 do_verification=True, 376 result_path=tmp_file.name) 377 378 def test_get_test_info_cache_path(self): 379 """Test method get_test_info_cache_path.""" 380 input_file_name = 'mytest_name' 381 cache_root = '/a/b/c' 382 expect_hashed_name = ('%s.cache' % hashlib.md5(str(input_file_name). 383 encode()).hexdigest()) 384 self.assertEqual(os.path.join(cache_root, expect_hashed_name), 385 atest_utils.get_test_info_cache_path(input_file_name, 386 cache_root)) 387 388 def test_get_and_load_cache(self): 389 """Test method update_test_info_cache and load_test_info_cache.""" 390 test_reference = 'myTestRefA' 391 test_cache_dir = tempfile.mkdtemp() 392 atest_utils.update_test_info_cache(test_reference, [TEST_INFO_A], 393 test_cache_dir) 394 unittest_utils.assert_equal_testinfo_sets( 395 self, set([TEST_INFO_A]), 396 atest_utils.load_test_info_cache(test_reference, test_cache_dir)) 397 398 @mock.patch('os.getcwd') 399 def test_get_build_cmd(self, mock_cwd): 400 """Test method get_build_cmd.""" 401 build_top = '/home/a/b/c' 402 rel_path = 'd/e' 403 mock_cwd.return_value = os.path.join(build_top, rel_path) 404 os_environ_mock = {constants.ANDROID_BUILD_TOP: build_top} 405 with mock.patch.dict('os.environ', os_environ_mock, clear=True): 406 expected_cmd = ['../../build/soong/soong_ui.bash', '--make-mode'] 407 self.assertEqual(expected_cmd, atest_utils.get_build_cmd()) 408 409 @mock.patch('subprocess.check_output') 410 def test_get_modified_files(self, mock_co): 411 """Test method get_modified_files""" 412 mock_co.side_effect = [ 413 x.encode('utf-8') for x in ['/a/b/', 414 '\n', 415 'test_fp1.java\nc/test_fp2.java']] 416 self.assertEqual({'/a/b/test_fp1.java', '/a/b/c/test_fp2.java'}, 417 atest_utils.get_modified_files('')) 418 mock_co.side_effect = [ 419 x.encode('utf-8') for x in ['/a/b/', 420 'test_fp4', 421 '/test_fp3.java']] 422 self.assertEqual({'/a/b/test_fp4', '/a/b/test_fp3.java'}, 423 atest_utils.get_modified_files('')) 424 425 def test_delimiter(self): 426 """Test method delimiter""" 427 self.assertEqual('\n===\n\n', atest_utils.delimiter('=', 3, 1, 2)) 428 429 def test_has_python_module(self): 430 """Test method has_python_module""" 431 self.assertFalse(atest_utils.has_python_module('M_M')) 432 self.assertTrue(atest_utils.has_python_module('os')) 433 434 @mock.patch.object(atest_utils, 'matched_tf_error_log', return_value=True) 435 def test_read_zip_single_text(self, _matched): 436 """Test method extract_zip_text include only one text file.""" 437 zip_path = os.path.join(unittest_constants.TEST_DATA_DIR, 438 TEST_ZIP_DATA_DIR, TEST_SINGLE_ZIP_NAME) 439 expect_content = '\nfile1_line1\nfile1_line2\n' 440 self.assertEqual(expect_content, atest_utils.extract_zip_text(zip_path)) 441 442 @mock.patch.object(atest_utils, 'matched_tf_error_log', return_value=True) 443 def test_read_zip_multi_text(self, _matched): 444 """Test method extract_zip_text include multiple text files.""" 445 zip_path = os.path.join(unittest_constants.TEST_DATA_DIR, 446 TEST_ZIP_DATA_DIR, TEST_MULTI_ZIP_NAME) 447 expect_content = ('\nfile1_line1\nfile1_line2\n\nfile2_line1\n' 448 'file2_line2\n') 449 self.assertEqual(expect_content, atest_utils.extract_zip_text(zip_path)) 450 451 def test_matched_tf_error_log(self): 452 """Test method extract_zip_text include multiple text files.""" 453 matched_content = '05-25 17:37:04 E/XXXXX YYYYY' 454 not_matched_content = '05-25 17:37:04 I/XXXXX YYYYY' 455 # Test matched content 456 self.assertEqual(True, 457 atest_utils.matched_tf_error_log(matched_content)) 458 # Test not matched content 459 self.assertEqual(False, 460 atest_utils.matched_tf_error_log(not_matched_content)) 461 462 @mock.patch('os.chmod') 463 @mock.patch('shutil.copy2') 464 @mock.patch('atest_utils.has_valid_cert') 465 @mock.patch('subprocess.check_output') 466 @mock.patch('os.path.exists') 467 def test_get_flakes(self, mock_path_exists, mock_output, mock_valid_cert, 468 _cpc, _cm): 469 """Test method get_flakes.""" 470 # Test par file does not exist. 471 mock_path_exists.return_value = False 472 self.assertEqual(None, atest_utils.get_flakes()) 473 # Test par file exists. 474 mock_path_exists.return_value = True 475 mock_output.return_value = (b'flake_percent:0.10001\n' 476 b'postsubmit_flakes_per_week:12.0') 477 mock_valid_cert.return_value = True 478 expected_flake_info = {'flake_percent':'0.10001', 479 'postsubmit_flakes_per_week':'12.0'} 480 self.assertEqual(expected_flake_info, 481 atest_utils.get_flakes()) 482 # Test no valid cert 483 mock_valid_cert.return_value = False 484 self.assertEqual(None, 485 atest_utils.get_flakes()) 486 487 @mock.patch('subprocess.check_call') 488 def test_has_valid_cert(self, mock_call): 489 """Test method has_valid_cert.""" 490 # raise subprocess.CalledProcessError 491 mock_call.raiseError.side_effect = subprocess.CalledProcessError 492 self.assertFalse(atest_utils.has_valid_cert()) 493 with mock.patch("constants.CERT_STATUS_CMD", ''): 494 self.assertFalse(atest_utils.has_valid_cert()) 495 with mock.patch("constants.CERT_STATUS_CMD", 'CMD'): 496 # has valid cert 497 mock_call.return_value = 0 498 self.assertTrue(atest_utils.has_valid_cert()) 499 # no valid cert 500 mock_call.return_value = 4 501 self.assertFalse(atest_utils.has_valid_cert()) 502 503 # pylint: disable=no-member 504 def test_read_test_record_proto(self): 505 """Test method read_test_record.""" 506 test_record_file_path = os.path.join(unittest_constants.TEST_DATA_DIR, 507 "test_record.proto.testonly") 508 test_record = atest_utils.read_test_record(test_record_file_path) 509 self.assertEqual(test_record.children[0].inline_test_record.test_record_id, 510 'x86 hello_world_test') 511 512 def test_is_valid_json_file_file_not_exist(self): 513 """Test method is_valid_json_file if file not exist.""" 514 json_file_path = os.path.join(unittest_constants.TEST_DATA_DIR, 515 "not_exist.json") 516 self.assertFalse(atest_utils.is_valid_json_file(json_file_path)) 517 518 def test_is_valid_json_file_content_valid(self): 519 """Test method is_valid_json_file if file exist and content is valid.""" 520 json_file_path = os.path.join(unittest_constants.TEST_DATA_DIR, 521 "module-info.json") 522 self.assertTrue(atest_utils.is_valid_json_file(json_file_path)) 523 524 def test_is_valid_json_file_content_not_valid(self): 525 """Test method is_valid_json_file if file exist but content is valid.""" 526 json_file_path = os.path.join(unittest_constants.TEST_DATA_DIR, 527 "not-valid-module-info.json") 528 self.assertFalse(atest_utils.is_valid_json_file(json_file_path)) 529 530 @mock.patch('subprocess.Popen') 531 @mock.patch('os.getenv') 532 def test_get_manifest_branch(self, mock_env, mock_popen): 533 """Test method get_manifest_branch""" 534 mock_env.return_value = 'any_path' 535 process = mock_popen.return_value 536 process.communicate.return_value = (REPO_INFO_OUTPUT, '') 537 self.assertEqual('test_branch', atest_utils.get_manifest_branch()) 538 539 mock_env.return_value = 'any_path' 540 process.communicate.return_value = ('not_matched_branch_pattern.', '') 541 self.assertEqual(None, atest_utils.get_manifest_branch()) 542 543 mock_env.return_value = 'any_path' 544 process.communicate.side_effect = subprocess.TimeoutExpired( 545 1, 546 'repo info') 547 self.assertEqual(None, atest_utils.get_manifest_branch()) 548 549 mock_env.return_value = None 550 process.communicate.return_value = (REPO_INFO_OUTPUT, '') 551 self.assertEqual(None, atest_utils.get_manifest_branch()) 552 553 def test_has_wildcard(self): 554 """Test method of has_wildcard""" 555 self.assertFalse(atest_utils.has_wildcard('test1')) 556 self.assertFalse(atest_utils.has_wildcard(['test1'])) 557 self.assertTrue(atest_utils.has_wildcard('test1?')) 558 self.assertTrue(atest_utils.has_wildcard(['test1', 'b*', 'a?b*'])) 559 560 # pylint: disable=anomalous-backslash-in-string 561 def test_quote(self): 562 """Test method of quote()""" 563 target_str = r'TEST_(F|P)[0-9].*\w$' 564 expected_str = '\'TEST_(F|P)[0-9].*\w$\'' 565 self.assertEqual(atest_utils.quote(target_str), expected_str) 566 self.assertEqual(atest_utils.quote('TEST_P224'), 'TEST_P224') 567 568 @mock.patch('builtins.input', return_value='') 569 def test_prompt_with_yn_result(self, mock_input): 570 """Test method of prompt_with_yn_result""" 571 msg = 'Do you want to continue?' 572 mock_input.return_value = '' 573 self.assertTrue(atest_utils.prompt_with_yn_result(msg, True)) 574 self.assertFalse(atest_utils.prompt_with_yn_result(msg, False)) 575 mock_input.return_value = 'y' 576 self.assertTrue(atest_utils.prompt_with_yn_result(msg, True)) 577 mock_input.return_value = 'nO' 578 self.assertFalse(atest_utils.prompt_with_yn_result(msg, True)) 579 580 def test_get_android_junit_config_filters(self): 581 """Test method of get_android_junit_config_filters""" 582 no_filter_test_config = os.path.join( 583 unittest_constants.TEST_DATA_DIR, 584 "filter_configs", "no_filter.cfg") 585 self.assertEqual({}, 586 atest_utils.get_android_junit_config_filters( 587 no_filter_test_config)) 588 589 filtered_test_config = os.path.join( 590 unittest_constants.TEST_DATA_DIR, 591 'filter_configs', 'filter.cfg') 592 filter_dict = atest_utils.get_android_junit_config_filters( 593 filtered_test_config) 594 include_annotations = filter_dict.get(constants.INCLUDE_ANNOTATION) 595 include_annotations.sort() 596 self.assertEqual( 597 ['include1', 'include2'], 598 include_annotations) 599 exclude_annotation = filter_dict.get(constants.EXCLUDE_ANNOTATION) 600 exclude_annotation.sort() 601 self.assertEqual( 602 ['exclude1', 'exclude2'], 603 exclude_annotation) 604 605 def test_md5sum(self): 606 """Test method of md5sum""" 607 exist_string = os.path.join(unittest_constants.TEST_DATA_DIR, 608 unittest_constants.JSON_FILE) 609 inexist_string = os.path.join(unittest_constants.TEST_DATA_DIR, 610 unittest_constants.CLASS_NAME) 611 self.assertEqual( 612 atest_utils.md5sum(exist_string), 'f02c1a648f16e5e9d7035bb11486ac2b') 613 self.assertEqual( 614 atest_utils.md5sum(inexist_string), '') 615 616 def test_check_md5(self): 617 """Test method of check_md5""" 618 file1 = os.path.join(unittest_constants.TEST_DATA_DIR, 619 unittest_constants.JSON_FILE) 620 checksum_file = '/tmp/_tmp_module-info.json' 621 atest_utils.save_md5([file1], '/tmp/_tmp_module-info.json') 622 self.assertTrue(atest_utils.check_md5(checksum_file)) 623 os.remove(checksum_file) 624 self.assertFalse(atest_utils.check_md5(checksum_file)) 625 self.assertTrue(atest_utils.check_md5(checksum_file, missing_ok=True)) 626 627 def test_get_config_parameter(self): 628 """Test method of get_config_parameter""" 629 parameter_config = os.path.join( 630 unittest_constants.TEST_DATA_DIR, 631 "parameter_config", "parameter.cfg") 632 no_parameter_config = os.path.join( 633 unittest_constants.TEST_DATA_DIR, 634 "parameter_config", "no_parameter.cfg") 635 636 # Test parameter empty value 637 self.assertEqual(set(), 638 atest_utils.get_config_parameter( 639 no_parameter_config)) 640 641 # Test parameter empty value 642 self.assertEqual({'value_1', 'value_2', 'value_3', 'value_4'}, 643 atest_utils.get_config_parameter( 644 parameter_config)) 645 646 def test_get_config_device(self): 647 """Test method of get_config_device""" 648 device_config = os.path.join( 649 unittest_constants.TEST_DATA_DIR, 650 "parameter_config", "multiple_device.cfg") 651 self.assertEqual({'device_1', 'device_2'}, 652 atest_utils.get_config_device(device_config)) 653 654 def test_get_mainline_param(self): 655 """Test method of get_mainline_param""" 656 mainline_param_config = os.path.join( 657 unittest_constants.TEST_DATA_DIR, 658 "parameter_config", "mainline_param.cfg") 659 self.assertEqual({'foo1.apex', 'foo2.apk+foo3.apk'}, 660 atest_utils.get_mainline_param( 661 mainline_param_config)) 662 no_mainline_param_config = os.path.join( 663 unittest_constants.TEST_DATA_DIR, 664 "parameter_config", "parameter.cfg") 665 self.assertEqual(set(), 666 atest_utils.get_mainline_param( 667 no_mainline_param_config)) 668 669 def test_get_full_annotation_class_name(self): 670 """Test method of get_full_annotation_class_name.""" 671 app_mode_full = 'android.platform.test.annotations.AppModeFull' 672 presubmit = 'android.platform.test.annotations.Presubmit' 673 module_info = {'srcs': [os.path.join(unittest_constants.TEST_DATA_DIR, 674 'annotation_testing', 675 'Annotation.src')]} 676 # get annotation class from keyword 677 self.assertEqual( 678 atest_utils.get_full_annotation_class_name(module_info, 'presubmit'), 679 presubmit) 680 # get annotation class from an accurate fqcn keyword. 681 self.assertEqual( 682 atest_utils.get_full_annotation_class_name(module_info, presubmit), 683 presubmit) 684 # accept fqcn keyword in lowercase. 685 self.assertEqual( 686 atest_utils.get_full_annotation_class_name(module_info, 'android.platform.test.annotations.presubmit'), 687 presubmit) 688 # unable to get annotation class from keyword. 689 self.assertNotEqual( 690 atest_utils.get_full_annotation_class_name(module_info, 'appleModefull'), app_mode_full) 691 # do not support partial-correct keyword. 692 self.assertNotEqual( 693 atest_utils.get_full_annotation_class_name(module_info, 'android.platform.test.annotations.pres'), presubmit) 694 695 def test_has_mixed_type_filters_one_module_with_one_type_return_false(self): 696 """Test method of has_mixed_type_filters""" 697 filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD'])) 698 test_data_1 = {constants.TI_FILTER: [filter_1]} 699 test_info_1 = test_info.TestInfo('MODULE', 'RUNNER', 700 set(), test_data_1, 701 'SUITE', '', 702 set()) 703 self.assertFalse(atest_utils.has_mixed_type_filters([test_info_1])) 704 705 def test_has_mixed_type_filters_one_module_with_mixed_types_return_true(self): 706 """Test method of has_mixed_type_filters""" 707 filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD'])) 708 filter_2 = test_info.TestFilter('CLASS', frozenset(['METHOD*'])) 709 test_data_2 = {constants.TI_FILTER: [filter_1, filter_2]} 710 test_info_2 = test_info.TestInfo('MODULE', 'RUNNER', 711 set(), test_data_2, 712 'SUITE', '', 713 set()) 714 self.assertTrue(atest_utils.has_mixed_type_filters([test_info_2])) 715 716 def test_has_mixed_type_filters_two_module_with_mixed_types_return_false(self): 717 """Test method of has_mixed_type_filters""" 718 filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD'])) 719 test_data_1 = {constants.TI_FILTER: [filter_1]} 720 test_info_1 = test_info.TestInfo('MODULE', 'RUNNER', 721 set(), test_data_1, 722 'SUITE', '', 723 set()) 724 filter_3 = test_info.TestFilter('CLASS', frozenset(['METHOD*'])) 725 test_data_3 = {constants.TI_FILTER: [filter_3]} 726 test_info_3 = test_info.TestInfo('MODULE3', 'RUNNER', 727 set(), test_data_3, 728 'SUITE', '', 729 set()) 730 self.assertFalse(atest_utils.has_mixed_type_filters( 731 [test_info_1, test_info_3])) 732 733 def test_get_filter_types(self): 734 """Test method of get_filter_types.""" 735 filters = set(['CLASS#METHOD']) 736 expect_types = set([FilterType.REGULAR_FILTER.value]) 737 self.assertEqual(atest_utils.get_filter_types(filters), expect_types) 738 739 filters = set(['CLASS#METHOD*']) 740 expect_types = set([FilterType.WILDCARD_FILTER.value]) 741 self.assertEqual(atest_utils.get_filter_types(filters), expect_types) 742 743 filters = set(['CLASS#METHOD', 'CLASS#METHOD*']) 744 expect_types = set([FilterType.WILDCARD_FILTER.value, 745 FilterType.REGULAR_FILTER.value]) 746 self.assertEqual(atest_utils.get_filter_types(filters), expect_types) 747 748 filters = set(['CLASS#METHOD?', 'CLASS#METHOD*']) 749 expect_types = set([FilterType.WILDCARD_FILTER.value]) 750 self.assertEqual(atest_utils.get_filter_types(filters), expect_types) 751 752if __name__ == "__main__": 753 unittest.main() 754