• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright 2018, The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Unittests for atest_utils."""
18
19# pylint: disable=invalid-name
20
21import hashlib
22from io import StringIO
23import os
24from pathlib import Path
25import subprocess
26import sys
27import tempfile
28import unittest
29from unittest import mock
30
31from atest import arg_parser
32from atest import atest_error
33from atest import atest_utils
34from atest import constants
35from atest import unittest_constants
36from atest import unittest_utils
37from atest.atest_enum import FilterType
38from atest.test_finders import test_info
39from pyfakefs import fake_filesystem_unittest
40
41TEST_MODULE_NAME_A = 'ModuleNameA'
42TEST_RUNNER_A = 'FakeTestRunnerA'
43TEST_BUILD_TARGET_A = set(['bt1', 'bt2'])
44TEST_DATA_A = {'test_data_a_1': 'a1', 'test_data_a_2': 'a2'}
45TEST_SUITE_A = 'FakeSuiteA'
46TEST_MODULE_CLASS_A = 'FAKE_MODULE_CLASS_A'
47TEST_INSTALL_LOC_A = set(['host', 'device'])
48TEST_FINDER_A = 'MODULE'
49TEST_INFO_A = test_info.TestInfo(
50    TEST_MODULE_NAME_A,
51    TEST_RUNNER_A,
52    TEST_BUILD_TARGET_A,
53    TEST_DATA_A,
54    TEST_SUITE_A,
55    TEST_MODULE_CLASS_A,
56    TEST_INSTALL_LOC_A,
57)
58TEST_INFO_A.test_finder = TEST_FINDER_A
59TEST_ZIP_DATA_DIR = 'zip_files'
60TEST_SINGLE_ZIP_NAME = 'single_file.zip'
61TEST_MULTI_ZIP_NAME = 'multi_file.zip'
62
63REPO_INFO_OUTPUT = """Manifest branch: test_branch
64Manifest merge branch: refs/heads/test_branch
65Manifest groups: all,-notdefault
66----------------------------
67"""
68
69
70class ConcatenatePathTest(unittest.TestCase):
71  """Class that tests path concatenation."""
72
73  @classmethod
74  def setUpClass(cls):
75    """Mock the environment variables for the entire test class"""
76    cls.build_top = '/src/build_top'
77    cls.prod_out = '/src/build_top/product_out'
78    cls.host_out = '/src/build_top/host_out'
79    cls.target_out_cases = '/src/build_top/product_out/testcases'
80    cls.host_out_cases = '/src/build_top/host_out/testcases'
81    cls.target_product = 'test_target_product'
82    cls.build_variant = 'test_build_variant'
83    cls.mock_getenv = mock.patch.dict(
84        os.environ,
85        {
86            'ANDROID_BUILD_TOP': cls.build_top,
87            'ANDROID_PRODUCT_OUT': cls.prod_out,
88            'ANDROID_TARGET_OUT_TESTCASES': cls.target_out_cases,
89            'ANDROID_HOST_OUT': cls.host_out,
90            'ANDROID_HOST_OUT_TESTCASES': cls.host_out_cases,
91            'TARGET_PRODUCT': cls.target_product,
92            'TARGET_BUILD_VARIANT': cls.build_variant,
93        },
94    )
95    cls.mock_getenv.start()
96
97  @classmethod
98  def tearDownClass(cls):
99    """Clean up the mocks after the test class finishes"""
100    cls.mock_getenv.stop()
101
102  def test_get_vars(self):
103    """Test the values of AndroidVariables are expected"""
104    variables = atest_utils.AndroidVariables()
105
106    self.assertEqual(variables.build_top, self.build_top)
107    self.assertEqual(variables.product_out, self.prod_out)
108    self.assertEqual(variables.target_out_cases, self.target_out_cases)
109    self.assertEqual(variables.host_out, self.host_out)
110    self.assertEqual(variables.host_out_cases, self.host_out_cases)
111    self.assertEqual(variables.target_product, self.target_product)
112    self.assertEqual(variables.build_variant, self.build_variant)
113
114  def test_atest_utils_get_build_top(self):
115    """Test concatenating strings with get_build_top()."""
116    expected_path = Path(self.build_top, 'path/to/project')
117
118    return_path = atest_utils.get_build_top('path/to/project')
119
120    self.assertEqual(expected_path, return_path)
121
122  def test_atest_utils_get_product_out(self):
123    """Test concatenating strings with get_product_out()."""
124    expected_path = Path(self.prod_out, 'module-info.json')
125
126    return_path = atest_utils.get_product_out('module-info.json')
127
128    self.assertEqual(expected_path, return_path)
129
130  def test_atest_utils_get_host_out(self):
131    """Test concatenating strings with get_host_out()."""
132    expected_path = Path(self.host_out, 'bin/adb')
133
134    return_path = atest_utils.get_host_out('bin', 'adb')
135
136    self.assertEqual(expected_path, return_path)
137
138
139class GetBuildOutDirTests(unittest.TestCase):
140  """Test get_build_out_dir() for various conditions."""
141
142  def setUp(self) -> None:
143    self.abs_OUT_DIR = '/somewhere/out'
144    self.rel_OUT_DIR = 'somewhere/out'
145    self.abs_OUT_DIR_COMMON_BASE = '/somewhere/common_out'
146    self.rel_OUT_DIR_COMMON_BASE = 'somewhere/common_out'
147
148  def test_get_build_abs_out_dir(self):
149    """Test when OUT_DIR is an absolute path."""
150    with mock.patch.dict(
151        'os.environ',
152        {
153            constants.ANDROID_BUILD_TOP: '/src/build/top',
154            'OUT_DIR': self.abs_OUT_DIR,
155        },
156    ):
157      expected_out_dir = Path(self.abs_OUT_DIR)
158
159      returned_out_dir = atest_utils.get_build_out_dir()
160
161      self.assertEqual(expected_out_dir, returned_out_dir)
162
163  def test_get_build_rel_out_dir(self):
164    """Test when OUT_DIR is a relative path."""
165    with mock.patch.dict(
166        'os.environ',
167        {
168            constants.ANDROID_BUILD_TOP: '/src/build/top',
169            'OUT_DIR': self.rel_OUT_DIR,
170        },
171    ):
172      expected_out_dir = atest_utils.get_build_top(self.rel_OUT_DIR)
173
174      returned_out_dir = atest_utils.get_build_out_dir()
175
176      self.assertEqual(expected_out_dir, returned_out_dir)
177
178  def test_get_build_abs_out_dir_common_base(self):
179    """Test whe OUT_DIR_COMMON_BASE is an absolute path."""
180    build_top_path = '/src/build/top'
181    branch_name = Path(build_top_path).name
182    with mock.patch.dict(
183        'os.environ',
184        {
185            constants.ANDROID_BUILD_TOP: build_top_path,
186            'OUT_DIR_COMMON_BASE': self.abs_OUT_DIR_COMMON_BASE,
187        },
188    ):
189      expected_out_dir = Path(self.abs_OUT_DIR_COMMON_BASE, branch_name)
190
191      returned_out_dir = atest_utils.get_build_out_dir()
192
193      self.assertEqual(expected_out_dir, returned_out_dir)
194
195  def test_get_build_rel_out_dir_common_base(self):
196    """Test whe OUT_DIR_COMMON_BASE is a relative path."""
197    build_top_path = '/src/build/top'
198    branch_name = Path(build_top_path).name
199    with mock.patch.dict(
200        'os.environ',
201        {
202            constants.ANDROID_BUILD_TOP: build_top_path,
203            'OUT_DIR_COMMON_BASE': self.rel_OUT_DIR_COMMON_BASE,
204        },
205    ):
206      expected_out_dir = Path(
207          build_top_path, self.rel_OUT_DIR_COMMON_BASE, branch_name
208      )
209
210      returned_out_dir = atest_utils.get_build_out_dir()
211
212      self.assertEqual(expected_out_dir, returned_out_dir)
213
214  def test_get_build_out_dir(self):
215    """Test when OUT_DIR and OUT_DIR_COMMON_BASE are null."""
216    with mock.patch.dict(
217        'os.environ', {constants.ANDROID_BUILD_TOP: '/src/build/top'}
218    ):
219      expected_out_dir = atest_utils.get_build_top('out')
220
221      returned_out_dir = atest_utils.get_build_out_dir()
222
223      self.assertEqual(expected_out_dir, returned_out_dir)
224
225
226# pylint: disable=protected-access
227# pylint: disable=too-many-public-methods
228class AtestUtilsUnittests(unittest.TestCase):
229  """Unit tests for atest_utils.py"""
230
231  def test_capture_fail_section_has_fail_section(self):
232    """Test capture_fail_section when has fail section."""
233    test_list = [
234        'AAAAAA',
235        'FAILED: Error1',
236        '^\n',
237        'Error2\n',
238        '[  6% 191/2997] BBBBBB\n',
239        'CCCCC',
240        '[  20% 322/2997] DDDDDD\n',
241        'EEEEE',
242    ]
243    want_list = ['FAILED: Error1', '^\n', 'Error2\n']
244    self.assertEqual(want_list, atest_utils._capture_fail_section(test_list))
245
246  def test_capture_fail_section_no_fail_section(self):
247    """Test capture_fail_section when no fail section."""
248    test_list = ['[ 6% 191/2997] XXXXX', 'YYYYY: ZZZZZ']
249    want_list = []
250    self.assertEqual(want_list, atest_utils._capture_fail_section(test_list))
251
252  def test_is_test_mapping_none_test_mapping_args(self):
253    """Test method is_test_mapping."""
254    non_tm_args = ['--host-unit-test-only']
255
256    for argument in non_tm_args:
257      args = arg_parser.create_atest_arg_parser().parse_args([argument])
258      self.assertFalse(
259          atest_utils.is_test_mapping(args),
260          'Option %s indicates NOT a test_mapping!' % argument,
261      )
262
263  def test_is_test_mapping_test_mapping_args(self):
264    """Test method is_test_mapping."""
265    tm_args = ['--test-mapping', '--include-subdirs']
266
267    for argument in tm_args:
268      args = arg_parser.create_atest_arg_parser().parse_args([argument])
269      self.assertTrue(
270          atest_utils.is_test_mapping(args),
271          'Option %s indicates a test_mapping!' % argument,
272      )
273
274  def test_is_test_mapping_implicit_test_mapping(self):
275    """Test method is_test_mapping."""
276    args = arg_parser.create_atest_arg_parser().parse_args(
277        ['--test', '--build', ':postsubmit']
278    )
279    self.assertTrue(
280        atest_utils.is_test_mapping(args),
281        'Option %s indicates a test_mapping!' % args,
282    )
283
284  def test_is_test_mapping_with_testname(self):
285    """Test method is_test_mapping."""
286    irrelevant_args = ['--test', ':postsubmit', 'testname']
287
288    args = arg_parser.create_atest_arg_parser().parse_args(irrelevant_args)
289    self.assertFalse(
290        atest_utils.is_test_mapping(args),
291        'Option %s indicates a test_mapping!' % args,
292    )
293
294  def test_is_test_mapping_false(self):
295    """Test method is_test_mapping."""
296    args = arg_parser.create_atest_arg_parser().parse_args(
297        ['--test', '--build', 'hello_atest']
298    )
299
300    self.assertFalse(atest_utils.is_test_mapping(args))
301
302  def test_has_colors(self):
303    """Test method _has_colors."""
304    # stream is file I/O
305    stream = open('/tmp/test_has_colors.txt', 'wb')
306    self.assertFalse(atest_utils._has_colors(stream))
307    stream.close()
308
309    # stream is not a tty(terminal).
310    stream = mock.Mock()
311    stream.isatty.return_value = False
312    self.assertFalse(atest_utils._has_colors(stream))
313
314    # stream is a tty(terminal).
315    stream = mock.Mock()
316    stream.isatty.return_value = True
317    self.assertTrue(atest_utils._has_colors(stream))
318
319  @mock.patch('atest.atest_utils._has_colors')
320  def test_colorize(self, mock_has_colors):
321    """Test method colorize."""
322    original_str = 'test string'
323    green_no = 2
324
325    # _has_colors() return False.
326    mock_has_colors.return_value = False
327    converted_str = atest_utils.colorize(
328        original_str, green_no, bp_color=constants.RED
329    )
330    self.assertEqual(original_str, converted_str)
331
332    # Green text with red background.
333    mock_has_colors.return_value = True
334    converted_str = atest_utils.colorize(
335        original_str, green_no, bp_color=constants.RED
336    )
337    green_highlight_string = '\x1b[1;32;41m%s\x1b[0m' % original_str
338    self.assertEqual(green_highlight_string, converted_str)
339
340    # Green text, no background.
341    mock_has_colors.return_value = True
342    converted_str = atest_utils.colorize(original_str, green_no)
343    green_no_highlight_string = '\x1b[1;32m%s\x1b[0m' % original_str
344    self.assertEqual(green_no_highlight_string, converted_str)
345
346  @mock.patch('atest.atest_utils.colorful_print')
347  @mock.patch('logging.error')
348  def test_print_and_log_error_no_format_prints_and_logs(
349      self, mocked_print, locked_error_logging
350  ):
351    atest_utils.print_and_log_error('no format')
352
353    mocked_print.assert_called_once()
354    locked_error_logging.assert_called_once()
355
356  @mock.patch('atest.atest_utils.colorful_print')
357  def test_print_and_log_error_single_non_string_prints(self, mocked_print):
358    atest_utils.print_and_log_error(123)
359
360    mocked_print.assert_called_once()
361
362  @mock.patch('atest.atest_utils.colorful_print')
363  def test_print_and_log_error_with_format_prints(self, mocked_print):
364    atest_utils.print_and_log_error('1+1=%s', 2)
365
366    mocked_print.assert_called_once()
367
368  @mock.patch('atest.atest_utils.colorful_print')
369  def test_print_and_log_error_bad_value_no_throw_no_print(self, mocked_print):
370    atest_utils.print_and_log_error('bad format %', 'format arg')
371
372    mocked_print.assert_not_called()
373
374  @mock.patch('atest.atest_utils.colorful_print')
375  def test_print_and_log_error_missing_format_arg_no_print(self, mocked_print):
376    atest_utils.print_and_log_error('bad format %s %s', 'format arg')
377
378    mocked_print.assert_not_called()
379
380  @mock.patch('atest.atest_utils.colorful_print')
381  def test_print_and_log_error_extra_format_arg_no_print(self, mocked_print):
382    atest_utils.print_and_log_error(
383        'bad format %s', 'format arg1', 'format arg2'
384    )
385
386    mocked_print.assert_not_called()
387
388  @mock.patch('atest.atest_utils._has_colors')
389  def test_colorful_print(self, mock_has_colors):
390    """Test method colorful_print."""
391    testing_str = 'color_print_test'
392    green_no = 2
393
394    # _has_colors() return False.
395    mock_has_colors.return_value = False
396    capture_output = StringIO()
397    sys.stdout = capture_output
398    atest_utils.colorful_print(
399        testing_str, green_no, bp_color=constants.RED, auto_wrap=False
400    )
401    sys.stdout = sys.__stdout__
402    uncolored_string = testing_str
403    self.assertEqual(capture_output.getvalue(), uncolored_string)
404
405    # Green text with red background, but no wrap.
406    mock_has_colors.return_value = True
407    capture_output = StringIO()
408    sys.stdout = capture_output
409    atest_utils.colorful_print(
410        testing_str, green_no, bp_color=constants.RED, auto_wrap=False
411    )
412    sys.stdout = sys.__stdout__
413    green_highlight_no_wrap_string = '\x1b[1;32;41m%s\x1b[0m' % testing_str
414    self.assertEqual(capture_output.getvalue(), green_highlight_no_wrap_string)
415
416    # Green text, no background, no wrap.
417    mock_has_colors.return_value = True
418    capture_output = StringIO()
419    sys.stdout = capture_output
420    atest_utils.colorful_print(testing_str, green_no, auto_wrap=False)
421    sys.stdout = sys.__stdout__
422    green_no_high_no_wrap_string = '\x1b[1;32m%s\x1b[0m' % testing_str
423    self.assertEqual(capture_output.getvalue(), green_no_high_no_wrap_string)
424
425    # Green text with red background and wrap.
426    mock_has_colors.return_value = True
427    capture_output = StringIO()
428    sys.stdout = capture_output
429    atest_utils.colorful_print(
430        testing_str, green_no, bp_color=constants.RED, auto_wrap=True
431    )
432    sys.stdout = sys.__stdout__
433    green_highlight_wrap_string = '\x1b[1;32;41m%s\x1b[0m\n' % testing_str
434    self.assertEqual(capture_output.getvalue(), green_highlight_wrap_string)
435
436    # Green text with wrap, but no background.
437    mock_has_colors.return_value = True
438    capture_output = StringIO()
439    sys.stdout = capture_output
440    atest_utils.colorful_print(testing_str, green_no, auto_wrap=True)
441    sys.stdout = sys.__stdout__
442    green_wrap_no_highlight_string = '\x1b[1;32m%s\x1b[0m\n' % testing_str
443    self.assertEqual(capture_output.getvalue(), green_wrap_no_highlight_string)
444
445  def test_is_supported_mainline_module(self):
446    """Test the installed artifacts are supported."""
447    self.assertTrue(atest_utils.is_supported_mainline_module('out/foo.apk'))
448    self.assertTrue(atest_utils.is_supported_mainline_module('out/foo.apks'))
449    self.assertTrue(atest_utils.is_supported_mainline_module('out/foo.apex'))
450    self.assertFalse(atest_utils.is_supported_mainline_module('out/foo.capex'))
451
452  def test_get_test_and_mainline_modules(self):
453    """Test whether the given test reference is a mainline module test."""
454    # regular test.
455    self.assertIsNone(atest_utils.get_test_and_mainline_modules('test_name'))
456    # missing trailing bracket.
457    self.assertIsNone(
458        atest_utils.get_test_and_mainline_modules('test_name[foo.apk+goo.apex')
459    )
460    # valid mainline module syntax
461    self.assertIsNotNone(
462        atest_utils.get_test_and_mainline_modules('test_name[foo.apk]')
463    )
464    self.assertIsNotNone(
465        atest_utils.get_test_and_mainline_modules('test_name[foo.apk+goo.apex]')
466    )
467
468  def test_get_test_info_cache_path(self):
469    """Test method get_test_info_cache_path."""
470    input_file_name = 'mytest_name'
471    cache_root = '/a/b/c'
472    expect_hashed_name = (
473        '%s.cache' % hashlib.md5(str(input_file_name).encode()).hexdigest()
474    )
475    self.assertEqual(
476        os.path.join(cache_root, expect_hashed_name),
477        atest_utils.get_test_info_cache_path(input_file_name, cache_root),
478    )
479
480  def test_get_and_load_cache(self):
481    """Test method update_test_info_cache and load_test_info_cache."""
482    test_reference = 'myTestRefA'
483    test_cache_dir = tempfile.mkdtemp()
484    atest_utils.update_test_info_cache(
485        test_reference, [TEST_INFO_A], test_cache_dir
486    )
487    unittest_utils.assert_equal_testinfo_sets(
488        self,
489        set([TEST_INFO_A]),
490        atest_utils.load_test_info_cache(test_reference, test_cache_dir),
491    )
492
493  @mock.patch('subprocess.check_output')
494  def test_get_modified_files(self, mock_co):
495    """Test method get_modified_files"""
496    mock_co.side_effect = [
497        x.encode('utf-8')
498        for x in ['/a/b/', '\n', 'test_fp1.java\nc/test_fp2.java']
499    ]
500    self.assertEqual(
501        {'/a/b/test_fp1.java', '/a/b/c/test_fp2.java'},
502        atest_utils.get_modified_files(''),
503    )
504    mock_co.side_effect = [
505        x.encode('utf-8') for x in ['/a/b/', 'test_fp4', '/test_fp3.java']
506    ]
507    self.assertEqual(
508        {'/a/b/test_fp4', '/a/b/test_fp3.java'},
509        atest_utils.get_modified_files(''),
510    )
511
512  def test_delimiter(self):
513    """Test method delimiter"""
514    self.assertEqual('\n===\n\n', atest_utils.delimiter('=', 3, 1, 2))
515
516  def test_has_python_module(self):
517    """Test method has_python_module"""
518    self.assertFalse(atest_utils.has_python_module('M_M'))
519    self.assertTrue(atest_utils.has_python_module('os'))
520
521  @mock.patch.object(atest_utils, 'matched_tf_error_log', return_value=True)
522  def test_read_zip_single_text(self, _matched):
523    """Test method extract_zip_text include only one text file."""
524    zip_path = os.path.join(
525        unittest_constants.TEST_DATA_DIR,
526        TEST_ZIP_DATA_DIR,
527        TEST_SINGLE_ZIP_NAME,
528    )
529    expect_content = '\nfile1_line1\nfile1_line2\n'
530    self.assertEqual(expect_content, atest_utils.extract_zip_text(zip_path))
531
532  @mock.patch.object(atest_utils, 'matched_tf_error_log', return_value=True)
533  def test_read_zip_multi_text(self, _matched):
534    """Test method extract_zip_text include multiple text files."""
535    zip_path = os.path.join(
536        unittest_constants.TEST_DATA_DIR, TEST_ZIP_DATA_DIR, TEST_MULTI_ZIP_NAME
537    )
538    expect_content = '\nfile1_line1\nfile1_line2\n\nfile2_line1\nfile2_line2\n'
539    self.assertEqual(expect_content, atest_utils.extract_zip_text(zip_path))
540
541  def test_matched_tf_error_log(self):
542    """Test method extract_zip_text include multiple text files."""
543    matched_content = '05-25 17:37:04 E/XXXXX YYYYY'
544    not_matched_content = '05-25 17:37:04 I/XXXXX YYYYY'
545    # Test matched content
546    self.assertEqual(True, atest_utils.matched_tf_error_log(matched_content))
547    # Test not matched content
548    self.assertEqual(
549        False, atest_utils.matched_tf_error_log(not_matched_content)
550    )
551
552  # pylint: disable=no-member
553  def test_read_test_record_proto(self):
554    """Test method read_test_record."""
555    test_record_file_path = os.path.join(
556        unittest_constants.TEST_DATA_DIR, 'test_record.proto.testonly'
557    )
558    test_record = atest_utils.read_test_record(test_record_file_path)
559    self.assertEqual(
560        test_record.children[0].inline_test_record.test_record_id,
561        'x86 hello_world_test',
562    )
563
564  def test_load_json_safely_file_inexistent(self):
565    """Test method load_json_safely if file does not exist."""
566    json_file_path = Path(unittest_constants.TEST_DATA_DIR).joinpath(
567        'not_exist.json'
568    )
569    self.assertEqual({}, atest_utils.load_json_safely(json_file_path))
570
571  def test_load_json_safely_valid_json_format(self):
572    """Test method load_json_safely if file exists and format is valid."""
573    json_file_path = Path(unittest_constants.TEST_DATA_DIR).joinpath(
574        'module-info.json'
575    )
576    content = atest_utils.load_json_safely(json_file_path)
577    self.assertEqual(
578        'MainModule1', content.get('MainModule1').get('module_name')
579    )
580    self.assertEqual(
581        [], content.get('MainModule2').get('test_mainline_modules')
582    )
583
584  def test_load_json_safely_invalid_json_format(self):
585    """Test method load_json_safely if file exist but content is invalid."""
586    json_file_path = Path(unittest_constants.TEST_DATA_DIR).joinpath(
587        'not-valid-module-info.json'
588    )
589    self.assertEqual({}, atest_utils.load_json_safely(json_file_path))
590
591  @mock.patch('os.getenv')
592  def test_get_manifest_branch(self, mock_env):
593    """Test method get_manifest_branch"""
594    build_top = tempfile.TemporaryDirectory()
595    mock_env.return_value = build_top.name
596    repo_dir = Path(build_top.name).joinpath('.repo')
597    portal_xml = repo_dir.joinpath('manifest.xml')
598    manifest_dir = repo_dir.joinpath('manifests')
599    target_xml = manifest_dir.joinpath('Default.xml')
600    repo_dir.mkdir()
601    manifest_dir.mkdir()
602    content_portal = '<manifest><include name="Default.xml" /></manifest>'
603    content_manifest = """<manifest>
604            <remote name="aosp" fetch=".." review="https://android-review.googlesource.com/" />
605            <default revision="MONSTER-dev" remote="aosp" sync-j="4" />
606        </manifest>"""
607
608    # 1. The manifest.xml(portal) contains 'include' directive: 'Default.xml'.
609    # Search revision in .repo/manifests/Default.xml.
610    with open(portal_xml, 'w') as cache:
611      cache.write(content_portal)
612    with open(target_xml, 'w') as cache:
613      cache.write(content_manifest)
614    self.assertEqual('MONSTER-dev', atest_utils.get_manifest_branch())
615    self.assertEqual('aosp-MONSTER-dev', atest_utils.get_manifest_branch(True))
616    os.remove(target_xml)
617    os.remove(portal_xml)
618
619    # 2. The manifest.xml contains neither 'include' nor 'revision' directive,
620    # keep searching revision in .repo/manifests/default.xml by default.
621    with open(portal_xml, 'w') as cache:
622      cache.write('<manifest></manifest>')
623    default_xml = manifest_dir.joinpath('default.xml')
624    with open(default_xml, 'w') as cache:
625      cache.write(content_manifest)
626    self.assertEqual('MONSTER-dev', atest_utils.get_manifest_branch())
627    os.remove(default_xml)
628    os.remove(portal_xml)
629
630    # 3. revision was directly defined in 'manifest.xml'.
631    with open(portal_xml, 'w') as cache:
632      cache.write(content_manifest)
633    self.assertEqual('MONSTER-dev', atest_utils.get_manifest_branch())
634    os.remove(portal_xml)
635
636    # 4. Return None if the included xml does not exist.
637    with open(portal_xml, 'w') as cache:
638      cache.write(content_portal)
639    self.assertEqual('', atest_utils.get_manifest_branch())
640    os.remove(portal_xml)
641
642  def test_has_wildcard(self):
643    """Test method of has_wildcard"""
644    self.assertFalse(atest_utils.has_wildcard('test1'))
645    self.assertFalse(atest_utils.has_wildcard(['test1']))
646    self.assertTrue(atest_utils.has_wildcard('test1?'))
647    self.assertTrue(atest_utils.has_wildcard(['test1', 'b*', 'a?b*']))
648
649  # pylint: disable=anomalous-backslash-in-string
650  def test_quote(self):
651    """Test method of quote()"""
652    target_str = r'TEST_(F|P)[0-9].*\w$'
653    expected_str = "'TEST_(F|P)[0-9].*\w$'"
654    self.assertEqual(atest_utils.quote(target_str), expected_str)
655    self.assertEqual(atest_utils.quote('TEST_P224'), 'TEST_P224')
656
657  @mock.patch('builtins.input', return_value='')
658  def test_prompt_with_yn_result(self, mock_input):
659    """Test method of prompt_with_yn_result"""
660    msg = 'Do you want to continue?'
661    mock_input.return_value = ''
662    self.assertTrue(atest_utils.prompt_with_yn_result(msg, True))
663    self.assertFalse(atest_utils.prompt_with_yn_result(msg, False))
664    mock_input.return_value = 'y'
665    self.assertTrue(atest_utils.prompt_with_yn_result(msg, True))
666    mock_input.return_value = 'nO'
667    self.assertFalse(atest_utils.prompt_with_yn_result(msg, True))
668
669  def test_get_android_junit_config_filters(self):
670    """Test method of get_android_junit_config_filters"""
671    no_filter_test_config = os.path.join(
672        unittest_constants.TEST_DATA_DIR, 'filter_configs', 'no_filter.cfg'
673    )
674    self.assertEqual(
675        {}, atest_utils.get_android_junit_config_filters(no_filter_test_config)
676    )
677
678    filtered_test_config = os.path.join(
679        unittest_constants.TEST_DATA_DIR, 'filter_configs', 'filter.cfg'
680    )
681    filter_dict = atest_utils.get_android_junit_config_filters(
682        filtered_test_config
683    )
684    include_annotations = filter_dict.get(constants.INCLUDE_ANNOTATION)
685    include_annotations.sort()
686    self.assertEqual(['include1', 'include2'], include_annotations)
687    exclude_annotation = filter_dict.get(constants.EXCLUDE_ANNOTATION)
688    exclude_annotation.sort()
689    self.assertEqual(['exclude1', 'exclude2'], exclude_annotation)
690
691  def test_md5sum_file_existent(self):
692    """Test method of md5sum for an existent file."""
693    with tempfile.NamedTemporaryFile() as tmp_file:
694      with open(tmp_file.name, 'w', encoding='utf-8') as f:
695        f.write('some context')
696      expected_md5 = '6d583707b0149c07cc19a05f5fdc320c'
697
698      actual_md5 = atest_utils.md5sum(tmp_file.name)
699
700      self.assertEqual(actual_md5, expected_md5)
701
702  def test_md5sum_file_inexistent(self):
703    """Test method of md5sum for an inexistent file."""
704    inexistent_file = os.path.join('/somewhere/does/not/exist')
705    expected_md5 = ''
706
707    actual_md5 = atest_utils.md5sum(inexistent_file)
708
709    self.assertEqual(actual_md5, expected_md5)
710
711  def test_check_md5(self):
712    """Test method of check_md5"""
713    file1 = os.path.join(
714        unittest_constants.TEST_DATA_DIR, unittest_constants.JSON_FILE
715    )
716    checksum_file = '/tmp/_tmp_module-info.json'
717    atest_utils.save_md5([file1], '/tmp/_tmp_module-info.json')
718    self.assertTrue(atest_utils.check_md5(checksum_file))
719    os.remove(checksum_file)
720    self.assertFalse(atest_utils.check_md5(checksum_file))
721    self.assertTrue(atest_utils.check_md5(checksum_file, missing_ok=True))
722
723  def test_get_config_parameter(self):
724    """Test method of get_config_parameter"""
725    parameter_config = os.path.join(
726        unittest_constants.TEST_DATA_DIR, 'parameter_config', 'parameter.cfg'
727    )
728    no_parameter_config = os.path.join(
729        unittest_constants.TEST_DATA_DIR, 'parameter_config', 'no_parameter.cfg'
730    )
731
732    # Test parameter empty value
733    self.assertEqual(
734        set(), atest_utils.get_config_parameter(no_parameter_config)
735    )
736
737    # Test parameter empty value
738    self.assertEqual(
739        {'value_1', 'value_2', 'value_3', 'value_4'},
740        atest_utils.get_config_parameter(parameter_config),
741    )
742
743  def test_get_config_device(self):
744    """Test method of get_config_device"""
745    device_config = os.path.join(
746        unittest_constants.TEST_DATA_DIR,
747        'parameter_config',
748        'multiple_device.cfg',
749    )
750    self.assertEqual(
751        {'device_1', 'device_2'}, atest_utils.get_config_device(device_config)
752    )
753
754  def test_get_mainline_param(self):
755    """Test method of get_mainline_param"""
756    mainline_param_config = os.path.join(
757        unittest_constants.TEST_DATA_DIR,
758        'parameter_config',
759        'mainline_param.cfg',
760    )
761    self.assertEqual(
762        {'foo1.apex', 'foo2.apk+foo3.apk'},
763        atest_utils.get_mainline_param(mainline_param_config),
764    )
765    no_mainline_param_config = os.path.join(
766        unittest_constants.TEST_DATA_DIR, 'parameter_config', 'parameter.cfg'
767    )
768    self.assertEqual(
769        set(), atest_utils.get_mainline_param(no_mainline_param_config)
770    )
771
772  def test_get_full_annotation_class_name(self):
773    """Test method of get_full_annotation_class_name."""
774    app_mode_full = 'android.platform.test.annotations.AppModeFull'
775    presubmit = 'android.platform.test.annotations.Presubmit'
776    module_info = {
777        'srcs': [
778            os.path.join(
779                unittest_constants.TEST_DATA_DIR,
780                'annotation_testing',
781                'Annotation.src',
782            )
783        ]
784    }
785    # get annotation class from keyword
786    self.assertEqual(
787        atest_utils.get_full_annotation_class_name(module_info, 'presubmit'),
788        presubmit,
789    )
790    # get annotation class from an accurate fqcn keyword.
791    self.assertEqual(
792        atest_utils.get_full_annotation_class_name(module_info, presubmit),
793        presubmit,
794    )
795    # accept fqcn keyword in lowercase.
796    self.assertEqual(
797        atest_utils.get_full_annotation_class_name(
798            module_info, 'android.platform.test.annotations.presubmit'
799        ),
800        presubmit,
801    )
802    # unable to get annotation class from keyword.
803    self.assertNotEqual(
804        atest_utils.get_full_annotation_class_name(
805            module_info, 'appleModefull'
806        ),
807        app_mode_full,
808    )
809    # do not support partial-correct keyword.
810    self.assertNotEqual(
811        atest_utils.get_full_annotation_class_name(
812            module_info, 'android.platform.test.annotations.pres'
813        ),
814        presubmit,
815    )
816
817  def test_has_mixed_type_filters_one_module_with_one_type_return_false(self):
818    """Test method of has_mixed_type_filters"""
819    filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD']))
820    test_data_1 = {constants.TI_FILTER: [filter_1]}
821    test_info_1 = test_info.TestInfo(
822        'MODULE', 'RUNNER', set(), test_data_1, 'SUITE', '', set()
823    )
824    self.assertFalse(atest_utils.has_mixed_type_filters([test_info_1]))
825
826  def test_has_mixed_type_filters_one_module_with_mixed_types_return_true(self):
827    """Test method of has_mixed_type_filters"""
828    filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD']))
829    filter_2 = test_info.TestFilter('CLASS', frozenset(['METHOD*']))
830    test_data_2 = {constants.TI_FILTER: [filter_1, filter_2]}
831    test_info_2 = test_info.TestInfo(
832        'MODULE', 'RUNNER', set(), test_data_2, 'SUITE', '', set()
833    )
834    self.assertTrue(atest_utils.has_mixed_type_filters([test_info_2]))
835
836  def test_has_mixed_type_filters_two_module_with_mixed_types_return_false(
837      self,
838  ):
839    """Test method of has_mixed_type_filters"""
840    filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD']))
841    test_data_1 = {constants.TI_FILTER: [filter_1]}
842    test_info_1 = test_info.TestInfo(
843        'MODULE', 'RUNNER', set(), test_data_1, 'SUITE', '', set()
844    )
845    filter_3 = test_info.TestFilter('CLASS', frozenset(['METHOD*']))
846    test_data_3 = {constants.TI_FILTER: [filter_3]}
847    test_info_3 = test_info.TestInfo(
848        'MODULE3', 'RUNNER', set(), test_data_3, 'SUITE', '', set()
849    )
850    self.assertFalse(
851        atest_utils.has_mixed_type_filters([test_info_1, test_info_3])
852    )
853
854  def test_get_filter_types(self):
855    """Test method of get_filter_types."""
856    filters = set(['CLASS#METHOD'])
857    expect_types = set([FilterType.REGULAR_FILTER.value])
858    self.assertEqual(atest_utils.get_filter_types(filters), expect_types)
859
860    filters = set(['CLASS#METHOD*'])
861    expect_types = set([FilterType.WILDCARD_FILTER.value])
862    self.assertEqual(atest_utils.get_filter_types(filters), expect_types)
863
864    filters = set(['CLASS#METHOD', 'CLASS#METHOD*'])
865    expect_types = set(
866        [FilterType.WILDCARD_FILTER.value, FilterType.REGULAR_FILTER.value]
867    )
868    self.assertEqual(atest_utils.get_filter_types(filters), expect_types)
869
870    filters = set(['CLASS#METHOD?', 'CLASS#METHOD*'])
871    expect_types = set([FilterType.WILDCARD_FILTER.value])
872    self.assertEqual(atest_utils.get_filter_types(filters), expect_types)
873
874  def test_get_bp_content(self):
875    """Method get_bp_content."""
876    # 1. "manifest" and "instrumentation_for" are defined.
877    content = """android_test    {
878                // comment
879                instrumentation_for: "AmSlam", // comment
880                manifest: "AndroidManifest-test.xml",
881                name: "AmSlamTests",
882        }"""
883    expected_result = {
884        'AmSlamTests': {
885            'target_module': 'AmSlam',
886            'manifest': 'AndroidManifest-test.xml',
887        }
888    }
889    temp_dir = tempfile.TemporaryDirectory()
890    tmpbp = Path(temp_dir.name).joinpath('Android.bp')
891    with open(tmpbp, 'w') as cache:
892      cache.write(content)
893    self.assertEqual(
894        atest_utils.get_bp_content(tmpbp, 'android_test'), expected_result
895    )
896    temp_dir.cleanup()
897
898    # 2. Only name is defined, will give default manifest and null
899    # target_module.
900    content = """android_app    {
901                // comment
902                name: "AmSlam",
903                srcs: ["src1.java", "src2.java"]
904        }"""
905    expected_result = {
906        'AmSlam': {'target_module': '', 'manifest': 'AndroidManifest.xml'}
907    }
908    temp_dir = tempfile.TemporaryDirectory()
909    tmpbp = Path(temp_dir.name).joinpath('Android.bp')
910    with open(tmpbp, 'w') as cache:
911      cache.write(content)
912    self.assertEqual(
913        atest_utils.get_bp_content(tmpbp, 'android_app'), expected_result
914    )
915    temp_dir.cleanup()
916
917    # 3. Not even an Android.bp.
918    content = """LOCAL_PATH := $(call my-dir)
919                # comment
920                include $(call all-subdir-makefiles)
921                LOCAL_MODULE := atest_foo_test
922        }"""
923    temp_dir = tempfile.TemporaryDirectory()
924    tmpbp = Path(temp_dir.name).joinpath('Android.mk')
925    with open(tmpbp, 'w') as cache:
926      cache.write(content)
927    self.assertEqual(atest_utils.get_bp_content(tmpbp, 'android_app'), {})
928    temp_dir.cleanup()
929
930  def test_get_manifest_info(self):
931    """test get_manifest_info method."""
932    # An instrumentation test:
933    test_xml = os.path.join(
934        unittest_constants.TEST_DATA_DIR,
935        'foo/bar/AmSlam/test/AndroidManifest.xml',
936    )
937    expected = {
938        'package': 'com.android.settings.tests.unit',
939        'target_package': 'c0m.andr0id.settingS',
940        'persistent': False,
941    }
942    self.assertEqual(expected, atest_utils.get_manifest_info(test_xml))
943
944    # A target module:
945    target_xml = os.path.join(
946        unittest_constants.TEST_DATA_DIR, 'foo/bar/AmSlam/AndroidManifest.xml'
947    )
948    expected = {
949        'package': 'c0m.andr0id.settingS',
950        'target_package': '',
951        'persistent': False,
952    }
953    self.assertEqual(expected, atest_utils.get_manifest_info(target_xml))
954
955
956class GetTradefedInvocationTimeTest(fake_filesystem_unittest.TestCase):
957  """Tests of get_tradefed_invocation_time for various conditions."""
958
959  def setUp(self):
960    self.setUpPyfakefs()
961    self.log_path = '/somewhere/atest/log'
962
963  def test_get_tradefed_invocation_time_second_only(self):
964    """Test the parser can handle second and millisecond properly."""
965    end_host_log_file = Path(
966        self.log_path,
967        'inv_hashed_path',
968        'inv_hashed_subpath',
969        'end_host_log_test1.txt',
970    )
971    contents = """
972=============== Consumed Time ==============
973    x86_64 HelloWorldTests: 1s
974    x86_64 hallo-welt: 768 ms
975Total aggregated tests run time: 1s
976============== Modules Preparation Times ==============
977    x86_64 HelloWorldTests => prep = 2580 ms || clean = 298 ms
978    x86_64 hallo-welt => prep = 1736 ms || clean = 243 ms
979Total preparation time: 4s  ||  Total tear down time: 541 ms
980=======================================================
981=============== Summary ===============
982Total Run time: 6s
9832/2 modules completed
984Total Tests       : 3
985PASSED            : 3
986FAILED            : 0
987============== End of Results =============="""
988    self.fs.create_file(end_host_log_file, contents=contents)
989    test = 1 * 1000 + 768
990    prep = 2580 + 1736
991    teardown = 298 + 243
992    expected_elapsed_time = (test, prep, teardown)
993
994    actual_elapsed_time = atest_utils.get_tradefed_invocation_time(
995        self.log_path
996    )
997
998    self.assertEqual(actual_elapsed_time, expected_elapsed_time)
999
1000  def test_get_tradefed_invocation_time_from_hours_to_milliseconds(self):
1001    """Test whether the parse can handle from hour to ms properly."""
1002    end_host_log_file = Path(
1003        self.log_path,
1004        'inv_hashed_path',
1005        'inv_hashed_subpath',
1006        'end_host_log_test2.txt',
1007    )
1008    contents = """
1009=============== Consumed Time ==============
1010    x86_64 HelloWorldTests: 27m 19s
1011    x86_64 hallo-welt: 3m 2s
1012Total aggregated tests run time: 31m
1013============== Modules Preparation Times ==============
1014    x86_64 HelloWorldTests => prep = 2580 ms || clean = 1298 ms
1015    x86_64 hallo-welt => prep = 1736 ms || clean = 1243 ms
1016Total preparation time: 1h 24m 17s ||  Total tear down time: 3s
1017=======================================================
1018=============== Summary ===============
1019Total Run time: 2h 5m 17s
10202/2 modules completed
1021Total Tests       : 3
1022PASSED            : 3
1023FAILED            : 0
1024============== End of Results =============="""
1025    self.fs.create_file(end_host_log_file, contents=contents)
1026    test = (27 * 60 + 19) * 1000 + (3 * 60 + 2) * 1000
1027    prep = 2580 + 1736
1028    teardown = 1298 + 1243
1029    expected_elapsed_time = (test, prep, teardown)
1030
1031    actual_elapsed_time = atest_utils.get_tradefed_invocation_time(
1032        self.log_path
1033    )
1034
1035    self.assertEqual(actual_elapsed_time, expected_elapsed_time)
1036
1037  def test_get_tradefed_invocation_time_null_result(self):
1038    """Test whether the parser returns null tuple when no keywords found."""
1039    end_host_log_file = Path(
1040        self.log_path,
1041        'inv_hashed_path',
1042        'inv_hashed_subpath',
1043        'end_host_log_test4.txt',
1044    )
1045    contents = 'some\ncontext'
1046    self.fs.create_file(end_host_log_file, contents=contents)
1047    expected_elapsed_time = (0, 0, 0)
1048
1049    actual_elapsed_time = atest_utils.get_tradefed_invocation_time(
1050        self.log_path
1051    )
1052
1053    self.assertEqual(actual_elapsed_time, expected_elapsed_time)
1054
1055
1056if __name__ == '__main__':
1057  unittest.main()
1058