• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright 2018, The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Unittests for atest_utils."""
18
19# pylint: disable=invalid-name
20# pylint: disable=line-too-long
21
22import hashlib
23import os
24import subprocess
25import sys
26import tempfile
27import unittest
28
29from io import StringIO
30from pathlib import Path
31from unittest import mock
32
33# pylint: disable=import-error
34from pyfakefs import fake_filesystem_unittest
35
36from atest import atest_arg_parser
37from atest import atest_error
38from atest import atest_utils
39from atest import constants
40from atest import unittest_utils
41from atest import unittest_constants
42
43from atest.test_finders import test_info
44from atest.atest_enum import FilterType
45
46TEST_MODULE_NAME_A = 'ModuleNameA'
47TEST_RUNNER_A = 'FakeTestRunnerA'
48TEST_BUILD_TARGET_A = set(['bt1', 'bt2'])
49TEST_DATA_A = {'test_data_a_1': 'a1',
50               'test_data_a_2': 'a2'}
51TEST_SUITE_A = 'FakeSuiteA'
52TEST_MODULE_CLASS_A = 'FAKE_MODULE_CLASS_A'
53TEST_INSTALL_LOC_A = set(['host', 'device'])
54TEST_FINDER_A = 'MODULE'
55TEST_INFO_A = test_info.TestInfo(TEST_MODULE_NAME_A, TEST_RUNNER_A,
56                                 TEST_BUILD_TARGET_A, TEST_DATA_A,
57                                 TEST_SUITE_A, TEST_MODULE_CLASS_A,
58                                 TEST_INSTALL_LOC_A)
59TEST_INFO_A.test_finder = TEST_FINDER_A
60TEST_ZIP_DATA_DIR = 'zip_files'
61TEST_SINGLE_ZIP_NAME = 'single_file.zip'
62TEST_MULTI_ZIP_NAME = 'multi_file.zip'
63
64REPO_INFO_OUTPUT = '''Manifest branch: test_branch
65Manifest merge branch: refs/heads/test_branch
66Manifest groups: all,-notdefault
67----------------------------
68'''
69
70# pylint: disable=protected-access
71# pylint: disable=too-many-public-methods
72class AtestUtilsUnittests(unittest.TestCase):
73    """Unit tests for atest_utils.py"""
74
75    def test_capture_fail_section_has_fail_section(self):
76        """Test capture_fail_section when has fail section."""
77        test_list = ['AAAAAA', 'FAILED: Error1', '^\n', 'Error2\n',
78                     '[  6% 191/2997] BBBBBB\n', 'CCCCC',
79                     '[  20% 322/2997] DDDDDD\n', 'EEEEE']
80        want_list = ['FAILED: Error1', '^\n', 'Error2\n']
81        self.assertEqual(want_list,
82                         atest_utils._capture_fail_section(test_list))
83
84    def test_capture_fail_section_no_fail_section(self):
85        """Test capture_fail_section when no fail section."""
86        test_list = ['[ 6% 191/2997] XXXXX', 'YYYYY: ZZZZZ']
87        want_list = []
88        self.assertEqual(want_list,
89                         atest_utils._capture_fail_section(test_list))
90
91    def test_is_test_mapping_none_test_mapping_args(self):
92        """Test method is_test_mapping."""
93        parser = atest_arg_parser.AtestArgParser()
94        parser.add_atest_args()
95        non_tm_args = ['--host-unit-test-only', '--smart-testing-local']
96
97        for argument in non_tm_args:
98            args = parser.parse_args([argument])
99            self.assertFalse(
100                atest_utils.is_test_mapping(args),
101                'Option %s indicates NOT a test_mapping!' % argument)
102
103    def test_is_test_mapping_test_mapping_args(self):
104        """Test method is_test_mapping."""
105        parser = atest_arg_parser.AtestArgParser()
106        parser.add_atest_args()
107        tm_args = ['--test-mapping', '--include-subdirs']
108
109        for argument in tm_args:
110            args = parser.parse_args([argument])
111            self.assertTrue(
112                atest_utils.is_test_mapping(args),
113                'Option %s indicates a test_mapping!' % argument)
114
115    def test_is_test_mapping_implicit_test_mapping(self):
116        """Test method is_test_mapping."""
117        parser = atest_arg_parser.AtestArgParser()
118        parser.add_atest_args()
119
120        args = parser.parse_args(['--test', '--build', ':postsubmit'])
121        self.assertTrue(
122            atest_utils.is_test_mapping(args),
123            'Option %s indicates a test_mapping!' % args)
124
125    def test_is_test_mapping_with_testname(self):
126        """Test method is_test_mapping."""
127        parser = atest_arg_parser.AtestArgParser()
128        parser.add_atest_args()
129        irrelevant_args = ['--test', ':postsubmit', 'testname']
130
131        args = parser.parse_args(irrelevant_args)
132        self.assertFalse(
133            atest_utils.is_test_mapping(args),
134            'Option %s indicates a test_mapping!' % args)
135
136    def test_is_test_mapping_false(self):
137        """Test method is_test_mapping."""
138        parser = atest_arg_parser.AtestArgParser()
139        parser.add_atest_args()
140        args = parser.parse_args(['--test', '--build', 'hello_atest'])
141
142        self.assertFalse(
143            atest_utils.is_test_mapping(args))
144
145    def test_has_colors(self):
146        """Test method _has_colors."""
147        # stream is file I/O
148        stream = open('/tmp/test_has_colors.txt', 'wb')
149        self.assertFalse(atest_utils._has_colors(stream))
150        stream.close()
151
152        # stream is not a tty(terminal).
153        stream = mock.Mock()
154        stream.isatty.return_value = False
155        self.assertFalse(atest_utils._has_colors(stream))
156
157        # stream is a tty(terminal).
158        stream = mock.Mock()
159        stream.isatty.return_value = True
160        self.assertTrue(atest_utils._has_colors(stream))
161
162
163    @mock.patch('atest.atest_utils._has_colors')
164    def test_colorize(self, mock_has_colors):
165        """Test method colorize."""
166        original_str = "test string"
167        green_no = 2
168
169        # _has_colors() return False.
170        mock_has_colors.return_value = False
171        converted_str = atest_utils.colorize(original_str, green_no,
172                                             bp_color=constants.RED)
173        self.assertEqual(original_str, converted_str)
174
175        # Green text with red background.
176        mock_has_colors.return_value = True
177        converted_str = atest_utils.colorize(original_str, green_no,
178                                             bp_color=constants.RED)
179        green_highlight_string = '\x1b[1;32;41m%s\x1b[0m' % original_str
180        self.assertEqual(green_highlight_string, converted_str)
181
182        # Green text, no background.
183        mock_has_colors.return_value = True
184        converted_str = atest_utils.colorize(original_str, green_no)
185        green_no_highlight_string = '\x1b[1;32m%s\x1b[0m' % original_str
186        self.assertEqual(green_no_highlight_string, converted_str)
187
188
189    @mock.patch('atest.atest_utils._has_colors')
190    def test_colorful_print(self, mock_has_colors):
191        """Test method colorful_print."""
192        testing_str = "color_print_test"
193        green_no = 2
194
195        # _has_colors() return False.
196        mock_has_colors.return_value = False
197        capture_output = StringIO()
198        sys.stdout = capture_output
199        atest_utils.colorful_print(testing_str, green_no,
200                                   bp_color=constants.RED,
201                                   auto_wrap=False)
202        sys.stdout = sys.__stdout__
203        uncolored_string = testing_str
204        self.assertEqual(capture_output.getvalue(), uncolored_string)
205
206        # Green text with red background, but no wrap.
207        mock_has_colors.return_value = True
208        capture_output = StringIO()
209        sys.stdout = capture_output
210        atest_utils.colorful_print(testing_str, green_no,
211                                   bp_color=constants.RED,
212                                   auto_wrap=False)
213        sys.stdout = sys.__stdout__
214        green_highlight_no_wrap_string = '\x1b[1;32;41m%s\x1b[0m' % testing_str
215        self.assertEqual(capture_output.getvalue(),
216                         green_highlight_no_wrap_string)
217
218        # Green text, no background, no wrap.
219        mock_has_colors.return_value = True
220        capture_output = StringIO()
221        sys.stdout = capture_output
222        atest_utils.colorful_print(testing_str, green_no,
223                                   auto_wrap=False)
224        sys.stdout = sys.__stdout__
225        green_no_high_no_wrap_string = '\x1b[1;32m%s\x1b[0m' % testing_str
226        self.assertEqual(capture_output.getvalue(),
227                         green_no_high_no_wrap_string)
228
229        # Green text with red background and wrap.
230        mock_has_colors.return_value = True
231        capture_output = StringIO()
232        sys.stdout = capture_output
233        atest_utils.colorful_print(testing_str, green_no,
234                                   bp_color=constants.RED,
235                                   auto_wrap=True)
236        sys.stdout = sys.__stdout__
237        green_highlight_wrap_string = '\x1b[1;32;41m%s\x1b[0m\n' % testing_str
238        self.assertEqual(capture_output.getvalue(), green_highlight_wrap_string)
239
240        # Green text with wrap, but no background.
241        mock_has_colors.return_value = True
242        capture_output = StringIO()
243        sys.stdout = capture_output
244        atest_utils.colorful_print(testing_str, green_no,
245                                   auto_wrap=True)
246        sys.stdout = sys.__stdout__
247        green_wrap_no_highlight_string = '\x1b[1;32m%s\x1b[0m\n' % testing_str
248        self.assertEqual(capture_output.getvalue(),
249                         green_wrap_no_highlight_string)
250
251    @mock.patch('builtins.input')
252    @mock.patch('json.load')
253    def test_update_test_runner_cmd(self, mock_json_load_data, mock_input):
254        """Test method handle_test_runner_cmd without enable do_verification."""
255        former_cmd_str = 'Former cmds ='
256        write_result_str = 'Save result mapping to test_result'
257        tmp_file = tempfile.NamedTemporaryFile()
258        input_cmd = 'atest_args'
259        runner_cmds = ['cmd1', 'cmd2']
260        capture_output = StringIO()
261        sys.stdout = capture_output
262        # Previous data is empty. Should not enter strtobool.
263        # If entered, exception will be raised cause test fail.
264        mock_json_load_data.return_value = {}
265        atest_utils.handle_test_runner_cmd(input_cmd,
266                                           runner_cmds,
267                                           do_verification=False,
268                                           result_path=tmp_file.name)
269        sys.stdout = sys.__stdout__
270        self.assertEqual(capture_output.getvalue().find(former_cmd_str), -1)
271        # Previous data is the same as the new input. Should not enter strtobool.
272        # If entered, exception will be raised cause test fail
273        capture_output = StringIO()
274        sys.stdout = capture_output
275        mock_json_load_data.return_value = {input_cmd:runner_cmds}
276        atest_utils.handle_test_runner_cmd(input_cmd,
277                                           runner_cmds,
278                                           do_verification=False,
279                                           result_path=tmp_file.name)
280        sys.stdout = sys.__stdout__
281        self.assertEqual(capture_output.getvalue().find(former_cmd_str), -1)
282        self.assertEqual(capture_output.getvalue().find(write_result_str), -1)
283        # Previous data has different cmds. Should enter strtobool not update,
284        # should not find write_result_str.
285        prev_cmds = ['cmd1']
286        mock_input.return_value = 'n'
287        capture_output = StringIO()
288        sys.stdout = capture_output
289        mock_json_load_data.return_value = {input_cmd:prev_cmds}
290        atest_utils.handle_test_runner_cmd(input_cmd,
291                                           runner_cmds,
292                                           do_verification=False,
293                                           result_path=tmp_file.name)
294        sys.stdout = sys.__stdout__
295        self.assertEqual(capture_output.getvalue().find(write_result_str), -1)
296
297    @mock.patch('json.load')
298    def test_verify_test_runner_cmd(self, mock_json_load_data):
299        """Test method handle_test_runner_cmd without enable update_result."""
300        tmp_file = tempfile.NamedTemporaryFile()
301        input_cmd = 'atest_args'
302        runner_cmds = ['cmd1', 'cmd2']
303        # Previous data is the same as the new input. Should not raise exception.
304        mock_json_load_data.return_value = {input_cmd:runner_cmds}
305        atest_utils.handle_test_runner_cmd(input_cmd,
306                                           runner_cmds,
307                                           do_verification=True,
308                                           result_path=tmp_file.name)
309        # Previous data has different cmds. Should enter strtobool and hit
310        # exception.
311        prev_cmds = ['cmd1']
312        mock_json_load_data.return_value = {input_cmd:prev_cmds}
313        self.assertRaises(atest_error.DryRunVerificationError,
314                          atest_utils.handle_test_runner_cmd,
315                          input_cmd,
316                          runner_cmds,
317                          do_verification=True,
318                          result_path=tmp_file.name)
319
320    def test_get_test_info_cache_path(self):
321        """Test method get_test_info_cache_path."""
322        input_file_name = 'mytest_name'
323        cache_root = '/a/b/c'
324        expect_hashed_name = ('%s.cache' % hashlib.md5(str(input_file_name).
325                                                       encode()).hexdigest())
326        self.assertEqual(os.path.join(cache_root, expect_hashed_name),
327                         atest_utils.get_test_info_cache_path(input_file_name,
328                                                              cache_root))
329
330    def test_get_and_load_cache(self):
331        """Test method update_test_info_cache and load_test_info_cache."""
332        test_reference = 'myTestRefA'
333        test_cache_dir = tempfile.mkdtemp()
334        atest_utils.update_test_info_cache(test_reference, [TEST_INFO_A],
335                                           test_cache_dir)
336        unittest_utils.assert_equal_testinfo_sets(
337            self, set([TEST_INFO_A]),
338            atest_utils.load_test_info_cache(test_reference, test_cache_dir))
339
340    @mock.patch('os.getcwd')
341    def test_get_build_cmd(self, mock_cwd):
342        """Test method get_build_cmd."""
343        build_top = '/home/a/b/c'
344        rel_path = 'd/e'
345        mock_cwd.return_value = os.path.join(build_top, rel_path)
346        # TODO: (b/264015241) Stop mocking build variables.
347        os_environ_mock = {constants.ANDROID_BUILD_TOP: build_top}
348        with mock.patch.dict('os.environ', os_environ_mock, clear=True):
349            expected_cmd = ['../../build/soong/soong_ui.bash', '--make-mode']
350            self.assertEqual(expected_cmd, atest_utils.get_build_cmd())
351
352    @mock.patch('subprocess.check_output')
353    def test_get_modified_files(self, mock_co):
354        """Test method get_modified_files"""
355        mock_co.side_effect = [
356            x.encode('utf-8') for x in ['/a/b/',
357                                        '\n',
358                                        'test_fp1.java\nc/test_fp2.java']]
359        self.assertEqual({'/a/b/test_fp1.java', '/a/b/c/test_fp2.java'},
360                         atest_utils.get_modified_files(''))
361        mock_co.side_effect = [
362            x.encode('utf-8') for x in ['/a/b/',
363                                        'test_fp4',
364                                        '/test_fp3.java']]
365        self.assertEqual({'/a/b/test_fp4', '/a/b/test_fp3.java'},
366                         atest_utils.get_modified_files(''))
367
368    def test_delimiter(self):
369        """Test method delimiter"""
370        self.assertEqual('\n===\n\n', atest_utils.delimiter('=', 3, 1, 2))
371
372    def test_has_python_module(self):
373        """Test method has_python_module"""
374        self.assertFalse(atest_utils.has_python_module('M_M'))
375        self.assertTrue(atest_utils.has_python_module('os'))
376
377    @mock.patch.object(atest_utils, 'matched_tf_error_log', return_value=True)
378    def test_read_zip_single_text(self, _matched):
379        """Test method extract_zip_text include only one text file."""
380        zip_path = os.path.join(unittest_constants.TEST_DATA_DIR,
381                                TEST_ZIP_DATA_DIR, TEST_SINGLE_ZIP_NAME)
382        expect_content = '\nfile1_line1\nfile1_line2\n'
383        self.assertEqual(expect_content, atest_utils.extract_zip_text(zip_path))
384
385    @mock.patch.object(atest_utils, 'matched_tf_error_log', return_value=True)
386    def test_read_zip_multi_text(self, _matched):
387        """Test method extract_zip_text include multiple text files."""
388        zip_path = os.path.join(unittest_constants.TEST_DATA_DIR,
389                                TEST_ZIP_DATA_DIR, TEST_MULTI_ZIP_NAME)
390        expect_content = ('\nfile1_line1\nfile1_line2\n\nfile2_line1\n'
391                          'file2_line2\n')
392        self.assertEqual(expect_content, atest_utils.extract_zip_text(zip_path))
393
394    def test_matched_tf_error_log(self):
395        """Test method extract_zip_text include multiple text files."""
396        matched_content = '05-25 17:37:04 E/XXXXX YYYYY'
397        not_matched_content = '05-25 17:37:04 I/XXXXX YYYYY'
398        # Test matched content
399        self.assertEqual(True,
400                         atest_utils.matched_tf_error_log(matched_content))
401        # Test not matched content
402        self.assertEqual(False,
403                         atest_utils.matched_tf_error_log(not_matched_content))
404
405    @mock.patch('os.chmod')
406    @mock.patch('shutil.copy2')
407    @mock.patch('atest.atest_utils.has_valid_cert')
408    @mock.patch('subprocess.check_output')
409    @mock.patch('os.path.exists')
410    def test_get_flakes(self, mock_path_exists, mock_output, mock_valid_cert,
411                        _cpc, _cm):
412        """Test method get_flakes."""
413        # Test par file does not exist.
414        mock_path_exists.return_value = False
415        self.assertEqual(None, atest_utils.get_flakes())
416        # Test par file exists.
417        mock_path_exists.return_value = True
418        mock_output.return_value = (b'flake_percent:0.10001\n'
419                                    b'postsubmit_flakes_per_week:12.0')
420        mock_valid_cert.return_value = True
421        expected_flake_info = {'flake_percent':'0.10001',
422                               'postsubmit_flakes_per_week':'12.0'}
423        self.assertEqual(expected_flake_info,
424                         atest_utils.get_flakes())
425        # Test no valid cert
426        mock_valid_cert.return_value = False
427        self.assertEqual(None,
428                         atest_utils.get_flakes())
429
430    @mock.patch('subprocess.check_call')
431    def test_has_valid_cert(self, mock_call):
432        """Test method has_valid_cert."""
433        # raise subprocess.CalledProcessError
434        mock_call.raiseError.side_effect = subprocess.CalledProcessError
435        self.assertFalse(atest_utils.has_valid_cert())
436        with mock.patch("atest.constants.CERT_STATUS_CMD", ''):
437            self.assertFalse(atest_utils.has_valid_cert())
438        with mock.patch("atest.constants.CERT_STATUS_CMD", 'CMD'):
439            # has valid cert
440            mock_call.return_value = 0
441            self.assertTrue(atest_utils.has_valid_cert())
442            # no valid cert
443            mock_call.return_value = 4
444            self.assertFalse(atest_utils.has_valid_cert())
445
446    # pylint: disable=no-member
447    def test_read_test_record_proto(self):
448        """Test method read_test_record."""
449        test_record_file_path = os.path.join(
450            unittest_constants.TEST_DATA_DIR,
451            "test_record.proto.testonly")
452        test_record = atest_utils.read_test_record(test_record_file_path)
453        self.assertEqual(
454            test_record.children[0].inline_test_record.test_record_id,
455            'x86 hello_world_test')
456
457    def test_load_json_safely_file_inexistent(self):
458        """Test method load_json_safely if file does not exist."""
459        json_file_path = Path(
460            unittest_constants.TEST_DATA_DIR).joinpath("not_exist.json")
461        self.assertEqual({}, atest_utils.load_json_safely(json_file_path))
462
463    def test_load_json_safely_valid_json_format(self):
464        """Test method load_json_safely if file exists and format is valid."""
465        json_file_path = Path(
466            unittest_constants.TEST_DATA_DIR).joinpath("module-info.json")
467        content = atest_utils.load_json_safely(json_file_path)
468        self.assertEqual('MainModule1', content.get('MainModule1').get('module_name'))
469        self.assertEqual([], content.get('MainModule2').get('test_mainline_modules'))
470
471    def test_load_json_safely_invalid_json_format(self):
472        """Test method load_json_safely if file exist but content is invalid."""
473        json_file_path = Path(
474            unittest_constants.TEST_DATA_DIR).joinpath("not-valid-module-info.json")
475        self.assertEqual({}, atest_utils.load_json_safely(json_file_path))
476
477    @mock.patch('os.getenv')
478    def test_get_manifest_branch(self, mock_env):
479        """Test method get_manifest_branch"""
480        build_top = tempfile.TemporaryDirectory()
481        mock_env.return_value = build_top.name
482        repo_dir = Path(build_top.name).joinpath('.repo')
483        portal_xml = repo_dir.joinpath('manifest.xml')
484        manifest_dir = repo_dir.joinpath('manifests')
485        target_xml = manifest_dir.joinpath('Default.xml')
486        repo_dir.mkdir()
487        manifest_dir.mkdir()
488        content_portal = '<manifest><include name="Default.xml" /></manifest>'
489        content_manifest = '''<manifest>
490            <remote name="aosp" fetch=".." review="https://android-review.googlesource.com/" />
491            <default revision="MONSTER-dev" remote="aosp" sync-j="4" />
492        </manifest>'''
493
494        # 1. The manifest.xml(portal) contains 'include' directive: 'Default.xml'.
495        # Search revision in .repo/manifests/Default.xml.
496        with open(portal_xml, 'w') as cache:
497            cache.write(content_portal)
498        with open(target_xml, 'w') as cache:
499            cache.write(content_manifest)
500        self.assertEqual("MONSTER-dev", atest_utils.get_manifest_branch())
501        self.assertEqual("aosp-MONSTER-dev", atest_utils.get_manifest_branch(True))
502        os.remove(target_xml)
503        os.remove(portal_xml)
504
505        # 2. The manifest.xml contains neither 'include' nor 'revision' directive,
506        # keep searching revision in .repo/manifests/default.xml by default.
507        with open(portal_xml, 'w') as cache:
508            cache.write('<manifest></manifest>')
509        default_xml = manifest_dir.joinpath('default.xml')
510        with open(default_xml, 'w') as cache:
511            cache.write(content_manifest)
512        self.assertEqual("MONSTER-dev", atest_utils.get_manifest_branch())
513        os.remove(default_xml)
514        os.remove(portal_xml)
515
516        # 3. revision was directly defined in 'manifest.xml'.
517        with open(portal_xml, 'w') as cache:
518            cache.write(content_manifest)
519        self.assertEqual("MONSTER-dev", atest_utils.get_manifest_branch())
520        os.remove(portal_xml)
521
522        # 4. Return None if the included xml does not exist.
523        with open(portal_xml, 'w') as cache:
524            cache.write(content_portal)
525        self.assertEqual('', atest_utils.get_manifest_branch())
526        os.remove(portal_xml)
527
528    def test_has_wildcard(self):
529        """Test method of has_wildcard"""
530        self.assertFalse(atest_utils.has_wildcard('test1'))
531        self.assertFalse(atest_utils.has_wildcard(['test1']))
532        self.assertTrue(atest_utils.has_wildcard('test1?'))
533        self.assertTrue(atest_utils.has_wildcard(['test1', 'b*', 'a?b*']))
534
535    # pylint: disable=anomalous-backslash-in-string
536    def test_quote(self):
537        """Test method of quote()"""
538        target_str = r'TEST_(F|P)[0-9].*\w$'
539        expected_str = '\'TEST_(F|P)[0-9].*\w$\''
540        self.assertEqual(atest_utils.quote(target_str), expected_str)
541        self.assertEqual(atest_utils.quote('TEST_P224'), 'TEST_P224')
542
543    @mock.patch('builtins.input', return_value='')
544    def test_prompt_with_yn_result(self, mock_input):
545        """Test method of prompt_with_yn_result"""
546        msg = 'Do you want to continue?'
547        mock_input.return_value = ''
548        self.assertTrue(atest_utils.prompt_with_yn_result(msg, True))
549        self.assertFalse(atest_utils.prompt_with_yn_result(msg, False))
550        mock_input.return_value = 'y'
551        self.assertTrue(atest_utils.prompt_with_yn_result(msg, True))
552        mock_input.return_value = 'nO'
553        self.assertFalse(atest_utils.prompt_with_yn_result(msg, True))
554
555    def test_get_android_junit_config_filters(self):
556        """Test method of get_android_junit_config_filters"""
557        no_filter_test_config = os.path.join(
558            unittest_constants.TEST_DATA_DIR,
559            "filter_configs", "no_filter.cfg")
560        self.assertEqual({},
561                         atest_utils.get_android_junit_config_filters(
562                             no_filter_test_config))
563
564        filtered_test_config = os.path.join(
565            unittest_constants.TEST_DATA_DIR,
566            'filter_configs', 'filter.cfg')
567        filter_dict = atest_utils.get_android_junit_config_filters(
568            filtered_test_config)
569        include_annotations = filter_dict.get(constants.INCLUDE_ANNOTATION)
570        include_annotations.sort()
571        self.assertEqual(
572            ['include1', 'include2'],
573            include_annotations)
574        exclude_annotation = filter_dict.get(constants.EXCLUDE_ANNOTATION)
575        exclude_annotation.sort()
576        self.assertEqual(
577            ['exclude1', 'exclude2'],
578            exclude_annotation)
579
580    def test_md5sum(self):
581        """Test method of md5sum"""
582        exist_string = os.path.join(unittest_constants.TEST_DATA_DIR,
583                                    unittest_constants.JSON_FILE)
584        inexist_string = os.path.join(unittest_constants.TEST_DATA_DIR,
585                                      unittest_constants.CLASS_NAME)
586        self.assertEqual(
587            atest_utils.md5sum(exist_string), '062160df00c20b1ee4d916b7baf71346')
588        self.assertEqual(
589            atest_utils.md5sum(inexist_string), '')
590
591    def test_check_md5(self):
592        """Test method of check_md5"""
593        file1 = os.path.join(unittest_constants.TEST_DATA_DIR,
594                            unittest_constants.JSON_FILE)
595        checksum_file = '/tmp/_tmp_module-info.json'
596        atest_utils.save_md5([file1], '/tmp/_tmp_module-info.json')
597        self.assertTrue(atest_utils.check_md5(checksum_file))
598        os.remove(checksum_file)
599        self.assertFalse(atest_utils.check_md5(checksum_file))
600        self.assertTrue(atest_utils.check_md5(checksum_file, missing_ok=True))
601
602    def test_get_config_parameter(self):
603        """Test method of get_config_parameter"""
604        parameter_config = os.path.join(
605            unittest_constants.TEST_DATA_DIR,
606            "parameter_config", "parameter.cfg")
607        no_parameter_config = os.path.join(
608            unittest_constants.TEST_DATA_DIR,
609            "parameter_config", "no_parameter.cfg")
610
611        # Test parameter empty value
612        self.assertEqual(set(),
613                         atest_utils.get_config_parameter(
614                             no_parameter_config))
615
616        # Test parameter empty value
617        self.assertEqual({'value_1', 'value_2', 'value_3', 'value_4'},
618                         atest_utils.get_config_parameter(
619                             parameter_config))
620
621    def test_get_config_device(self):
622        """Test method of get_config_device"""
623        device_config = os.path.join(
624            unittest_constants.TEST_DATA_DIR,
625            "parameter_config", "multiple_device.cfg")
626        self.assertEqual({'device_1', 'device_2'},
627                         atest_utils.get_config_device(device_config))
628
629    def test_get_mainline_param(self):
630        """Test method of get_mainline_param"""
631        mainline_param_config = os.path.join(
632            unittest_constants.TEST_DATA_DIR,
633            "parameter_config", "mainline_param.cfg")
634        self.assertEqual({'foo1.apex', 'foo2.apk+foo3.apk'},
635                         atest_utils.get_mainline_param(
636                             mainline_param_config))
637        no_mainline_param_config = os.path.join(
638            unittest_constants.TEST_DATA_DIR,
639            "parameter_config", "parameter.cfg")
640        self.assertEqual(set(),
641                         atest_utils.get_mainline_param(
642                             no_mainline_param_config))
643
644    def test_get_full_annotation_class_name(self):
645        """Test method of get_full_annotation_class_name."""
646        app_mode_full = 'android.platform.test.annotations.AppModeFull'
647        presubmit = 'android.platform.test.annotations.Presubmit'
648        module_info = {'srcs': [os.path.join(unittest_constants.TEST_DATA_DIR,
649                                'annotation_testing',
650                                'Annotation.src')]}
651        # get annotation class from keyword
652        self.assertEqual(
653            atest_utils.get_full_annotation_class_name(module_info, 'presubmit'),
654            presubmit)
655        # get annotation class from an accurate fqcn keyword.
656        self.assertEqual(
657            atest_utils.get_full_annotation_class_name(module_info, presubmit),
658            presubmit)
659        # accept fqcn keyword in lowercase.
660        self.assertEqual(
661            atest_utils.get_full_annotation_class_name(module_info, 'android.platform.test.annotations.presubmit'),
662            presubmit)
663        # unable to get annotation class from keyword.
664        self.assertNotEqual(
665            atest_utils.get_full_annotation_class_name(module_info, 'appleModefull'), app_mode_full)
666        # do not support partial-correct keyword.
667        self.assertNotEqual(
668            atest_utils.get_full_annotation_class_name(module_info, 'android.platform.test.annotations.pres'), presubmit)
669
670    def test_has_mixed_type_filters_one_module_with_one_type_return_false(self):
671        """Test method of has_mixed_type_filters"""
672        filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD']))
673        test_data_1 = {constants.TI_FILTER: [filter_1]}
674        test_info_1 = test_info.TestInfo('MODULE', 'RUNNER',
675                                set(), test_data_1,
676                                'SUITE', '',
677                                set())
678        self.assertFalse(atest_utils.has_mixed_type_filters([test_info_1]))
679
680    def test_has_mixed_type_filters_one_module_with_mixed_types_return_true(self):
681        """Test method of has_mixed_type_filters"""
682        filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD']))
683        filter_2 = test_info.TestFilter('CLASS', frozenset(['METHOD*']))
684        test_data_2 = {constants.TI_FILTER: [filter_1, filter_2]}
685        test_info_2 = test_info.TestInfo('MODULE', 'RUNNER',
686                                set(), test_data_2,
687                                'SUITE', '',
688                                set())
689        self.assertTrue(atest_utils.has_mixed_type_filters([test_info_2]))
690
691    def test_has_mixed_type_filters_two_module_with_mixed_types_return_false(self):
692        """Test method of has_mixed_type_filters"""
693        filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD']))
694        test_data_1 = {constants.TI_FILTER: [filter_1]}
695        test_info_1 = test_info.TestInfo('MODULE', 'RUNNER',
696                                set(), test_data_1,
697                                'SUITE', '',
698                                set())
699        filter_3 = test_info.TestFilter('CLASS', frozenset(['METHOD*']))
700        test_data_3 = {constants.TI_FILTER: [filter_3]}
701        test_info_3 = test_info.TestInfo('MODULE3', 'RUNNER',
702                                set(), test_data_3,
703                                'SUITE', '',
704                                set())
705        self.assertFalse(atest_utils.has_mixed_type_filters(
706            [test_info_1, test_info_3]))
707
708    def test_get_filter_types(self):
709        """Test method of get_filter_types."""
710        filters = set(['CLASS#METHOD'])
711        expect_types = set([FilterType.REGULAR_FILTER.value])
712        self.assertEqual(atest_utils.get_filter_types(filters), expect_types)
713
714        filters = set(['CLASS#METHOD*'])
715        expect_types = set([FilterType.WILDCARD_FILTER.value])
716        self.assertEqual(atest_utils.get_filter_types(filters), expect_types)
717
718        filters = set(['CLASS#METHOD', 'CLASS#METHOD*'])
719        expect_types = set([FilterType.WILDCARD_FILTER.value,
720                          FilterType.REGULAR_FILTER.value])
721        self.assertEqual(atest_utils.get_filter_types(filters), expect_types)
722
723        filters = set(['CLASS#METHOD?', 'CLASS#METHOD*'])
724        expect_types = set([FilterType.WILDCARD_FILTER.value])
725        self.assertEqual(atest_utils.get_filter_types(filters), expect_types)
726
727    def test_get_bp_content(self):
728        """Method get_bp_content."""
729        # 1. "manifest" and "instrumentation_for" are defined.
730        content = '''android_test    {
731                // comment
732                instrumentation_for: "AmSlam", // comment
733                manifest: "AndroidManifest-test.xml",
734                name: "AmSlamTests",
735        }'''
736        expected_result = {"AmSlamTests":
737                           {"target_module": "AmSlam", "manifest": "AndroidManifest-test.xml"}}
738        temp_dir = tempfile.TemporaryDirectory()
739        tmpbp = Path(temp_dir.name).joinpath('Android.bp')
740        with open(tmpbp, 'w') as cache:
741            cache.write(content)
742        self.assertEqual(atest_utils.get_bp_content(tmpbp, 'android_test'),
743                         expected_result)
744        temp_dir.cleanup()
745
746        # 2. Only name is defined, will give default manifest and null target_module.
747        content = '''android_app    {
748                // comment
749                name: "AmSlam",
750                srcs: ["src1.java", "src2.java"]
751        }'''
752        expected_result = {"AmSlam":
753                           {"target_module": "", "manifest": "AndroidManifest.xml"}}
754        temp_dir = tempfile.TemporaryDirectory()
755        tmpbp = Path(temp_dir.name).joinpath('Android.bp')
756        with open(tmpbp, 'w') as cache:
757            cache.write(content)
758        self.assertEqual(atest_utils.get_bp_content(tmpbp, 'android_app'),
759                         expected_result)
760        temp_dir.cleanup()
761
762        # 3. Not even an Android.bp.
763        content = '''LOCAL_PATH := $(call my-dir)
764                # comment
765                include $(call all-subdir-makefiles)
766                LOCAL_MODULE := atest_foo_test
767        }'''
768        temp_dir = tempfile.TemporaryDirectory()
769        tmpbp = Path(temp_dir.name).joinpath('Android.mk')
770        with open(tmpbp, 'w') as cache:
771            cache.write(content)
772        self.assertEqual(atest_utils.get_bp_content(tmpbp, 'android_app'), {})
773        temp_dir.cleanup()
774
775    def test_get_manifest_info(self):
776        """test get_manifest_info method."""
777        # An instrumentation test:
778        test_xml = os.path.join(unittest_constants.TEST_DATA_DIR,
779                                'foo/bar/AmSlam/test/AndroidManifest.xml')
780        expected = {
781            'package': 'com.android.settings.tests.unit',
782            'target_package': 'c0m.andr0id.settingS',
783            'persistent': False
784        }
785        self.assertEqual(expected, atest_utils.get_manifest_info(test_xml))
786
787        # A target module:
788        target_xml = os.path.join(unittest_constants.TEST_DATA_DIR,
789                                  'foo/bar/AmSlam/AndroidManifest.xml')
790        expected = {
791            'package': 'c0m.andr0id.settingS',
792            'target_package': '',
793            'persistent': False
794        }
795        self.assertEqual(expected, atest_utils.get_manifest_info(target_xml))
796
797# pylint: disable=missing-function-docstring
798class AutoShardUnittests(fake_filesystem_unittest.TestCase):
799    """Tests for auto shard functions"""
800    def setUp(self):
801        self.setUpPyfakefs()
802
803    def test_get_local_auto_shardable_tests(self):
804        """test get local auto shardable list"""
805        shardable_tests_file = Path(atest_utils.get_misc_dir()).joinpath(
806        '.atest/auto_shard/local_auto_shardable_tests')
807
808        self.fs.create_file(shardable_tests_file, contents='abc\ndef')
809
810        long_duration_tests = atest_utils.get_local_auto_shardable_tests()
811
812        expected_list = ['abc', 'def']
813        self.assertEqual(long_duration_tests , expected_list)
814
815    def test_update_shardable_tests_with_time_less_than_600(self):
816        """test update local auto shardable list"""
817        shardable_tests_file = Path(atest_utils.get_misc_dir()).joinpath(
818        '.atest/auto_shard/local_auto_shardable_tests')
819
820        self.fs.create_file(shardable_tests_file, contents='')
821
822        atest_utils.update_shardable_tests('test1', 10)
823
824        with open(shardable_tests_file) as f:
825            self.assertEqual('', f.read())
826
827    def test_update_shardable_tests_with_time_larger_than_600(self):
828        """test update local auto shardable list"""
829        shardable_tests_file = Path(atest_utils.get_misc_dir()).joinpath(
830        '.atest/auto_shard/local_auto_shardable_tests')
831
832        self.fs.create_file(shardable_tests_file, contents='')
833
834        atest_utils.update_shardable_tests('test2', 1000)
835
836        with open(shardable_tests_file) as f:
837            self.assertEqual('test2', f.read())
838
839    def test_update_shardable_tests_with_time_larger_than_600_twice(self):
840        """test update local auto shardable list"""
841        shardable_tests_file = Path(atest_utils.get_misc_dir()).joinpath(
842        '.atest/auto_shard/local_auto_shardable_tests')
843        # access the fake_filesystem object via fake_fs
844        self.fs.create_file(shardable_tests_file, contents='')
845
846        atest_utils.update_shardable_tests('test3', 1000)
847        atest_utils.update_shardable_tests('test3', 601)
848
849        with open(shardable_tests_file) as f:
850            self.assertEqual('test3', f.read())
851
852
853if __name__ == "__main__":
854    unittest.main()
855