• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright 2018, The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Unittests for atest_utils."""
18
19# pylint: disable=invalid-name
20# pylint: disable=line-too-long
21
22import hashlib
23import os
24import subprocess
25import sys
26import tempfile
27import unittest
28
29from io import StringIO
30from unittest import mock
31
32import atest_error
33import atest_utils
34import constants
35import unittest_utils
36import unittest_constants
37
38from test_finders import test_info
39from atest_enum import FilterType
40
41
42TEST_MODULE_NAME_A = 'ModuleNameA'
43TEST_RUNNER_A = 'FakeTestRunnerA'
44TEST_BUILD_TARGET_A = set(['bt1', 'bt2'])
45TEST_DATA_A = {'test_data_a_1': 'a1',
46               'test_data_a_2': 'a2'}
47TEST_SUITE_A = 'FakeSuiteA'
48TEST_MODULE_CLASS_A = 'FAKE_MODULE_CLASS_A'
49TEST_INSTALL_LOC_A = set(['host', 'device'])
50TEST_FINDER_A = 'MODULE'
51TEST_INFO_A = test_info.TestInfo(TEST_MODULE_NAME_A, TEST_RUNNER_A,
52                                 TEST_BUILD_TARGET_A, TEST_DATA_A,
53                                 TEST_SUITE_A, TEST_MODULE_CLASS_A,
54                                 TEST_INSTALL_LOC_A)
55TEST_INFO_A.test_finder = TEST_FINDER_A
56TEST_ZIP_DATA_DIR = 'zip_files'
57TEST_SINGLE_ZIP_NAME = 'single_file.zip'
58TEST_MULTI_ZIP_NAME = 'multi_file.zip'
59
60REPO_INFO_OUTPUT = '''Manifest branch: test_branch
61Manifest merge branch: refs/heads/test_branch
62Manifest groups: all,-notdefault
63----------------------------
64'''
65
66#pylint: disable=protected-access
67class AtestUtilsUnittests(unittest.TestCase):
68    """Unit tests for atest_utils.py"""
69
70    def test_capture_fail_section_has_fail_section(self):
71        """Test capture_fail_section when has fail section."""
72        test_list = ['AAAAAA', 'FAILED: Error1', '^\n', 'Error2\n',
73                     '[  6% 191/2997] BBBBBB\n', 'CCCCC',
74                     '[  20% 322/2997] DDDDDD\n', 'EEEEE']
75        want_list = ['FAILED: Error1', '^\n', 'Error2\n']
76        self.assertEqual(want_list,
77                         atest_utils._capture_fail_section(test_list))
78
79    def test_capture_fail_section_no_fail_section(self):
80        """Test capture_fail_section when no fail section."""
81        test_list = ['[ 6% 191/2997] XXXXX', 'YYYYY: ZZZZZ']
82        want_list = []
83        self.assertEqual(want_list,
84                         atest_utils._capture_fail_section(test_list))
85
86    def test_is_test_mapping(self):
87        """Test method is_test_mapping."""
88        tm_option_attributes = [
89            'test_mapping',
90            'include_subdirs'
91        ]
92        for attr_to_test in tm_option_attributes:
93            args = mock.Mock()
94            for attr in tm_option_attributes:
95                setattr(args, attr, attr == attr_to_test)
96            args.tests = []
97            args.host_unit_test_only = False
98            self.assertTrue(
99                atest_utils.is_test_mapping(args),
100                'Failed to validate option %s' % attr_to_test)
101
102        args = mock.Mock()
103        for attr in tm_option_attributes:
104            setattr(args, attr, False)
105        args.tests = []
106        args.host_unit_test_only = True
107        self.assertFalse(atest_utils.is_test_mapping(args))
108
109        args = mock.Mock()
110        for attr in tm_option_attributes:
111            setattr(args, attr, False)
112        args.tests = [':group_name']
113        args.host_unit_test_only = False
114        self.assertTrue(atest_utils.is_test_mapping(args))
115
116        args = mock.Mock()
117        for attr in tm_option_attributes:
118            setattr(args, attr, False)
119        args.tests = [':test1', 'test2']
120        args.host_unit_test_only = False
121        self.assertFalse(atest_utils.is_test_mapping(args))
122
123        args = mock.Mock()
124        for attr in tm_option_attributes:
125            setattr(args, attr, False)
126        args.tests = ['test2']
127        args.host_unit_test_only = False
128        self.assertFalse(atest_utils.is_test_mapping(args))
129
130    def test_has_colors(self):
131        """Test method _has_colors."""
132        # stream is file I/O
133        stream = open('/tmp/test_has_colors.txt', 'wb')
134        self.assertFalse(atest_utils._has_colors(stream))
135        stream.close()
136
137        # stream is not a tty(terminal).
138        stream = mock.Mock()
139        stream.isatty.return_value = False
140        self.assertFalse(atest_utils._has_colors(stream))
141
142        # stream is a tty(terminal).
143        stream = mock.Mock()
144        stream.isatty.return_value = True
145        self.assertTrue(atest_utils._has_colors(stream))
146
147
148    @mock.patch('atest_utils._has_colors')
149    def test_colorize(self, mock_has_colors):
150        """Test method colorize."""
151        original_str = "test string"
152        green_no = 2
153
154        # _has_colors() return False.
155        mock_has_colors.return_value = False
156        converted_str = atest_utils.colorize(original_str, green_no,
157                                             highlight=True)
158        self.assertEqual(original_str, converted_str)
159
160        # Green with highlight.
161        mock_has_colors.return_value = True
162        converted_str = atest_utils.colorize(original_str, green_no,
163                                             highlight=True)
164        green_highlight_string = '\x1b[1;42m%s\x1b[0m' % original_str
165        self.assertEqual(green_highlight_string, converted_str)
166
167        # Green, no highlight.
168        mock_has_colors.return_value = True
169        converted_str = atest_utils.colorize(original_str, green_no,
170                                             highlight=False)
171        green_no_highlight_string = '\x1b[1;32m%s\x1b[0m' % original_str
172        self.assertEqual(green_no_highlight_string, converted_str)
173
174
175    @mock.patch('atest_utils._has_colors')
176    def test_colorful_print(self, mock_has_colors):
177        """Test method colorful_print."""
178        testing_str = "color_print_test"
179        green_no = 2
180
181        # _has_colors() return False.
182        mock_has_colors.return_value = False
183        capture_output = StringIO()
184        sys.stdout = capture_output
185        atest_utils.colorful_print(testing_str, green_no, highlight=True,
186                                   auto_wrap=False)
187        sys.stdout = sys.__stdout__
188        uncolored_string = testing_str
189        self.assertEqual(capture_output.getvalue(), uncolored_string)
190
191        # Green with highlight, but no wrap.
192        mock_has_colors.return_value = True
193        capture_output = StringIO()
194        sys.stdout = capture_output
195        atest_utils.colorful_print(testing_str, green_no, highlight=True,
196                                   auto_wrap=False)
197        sys.stdout = sys.__stdout__
198        green_highlight_no_wrap_string = '\x1b[1;42m%s\x1b[0m' % testing_str
199        self.assertEqual(capture_output.getvalue(),
200                         green_highlight_no_wrap_string)
201
202        # Green, no highlight, no wrap.
203        mock_has_colors.return_value = True
204        capture_output = StringIO()
205        sys.stdout = capture_output
206        atest_utils.colorful_print(testing_str, green_no, highlight=False,
207                                   auto_wrap=False)
208        sys.stdout = sys.__stdout__
209        green_no_high_no_wrap_string = '\x1b[1;32m%s\x1b[0m' % testing_str
210        self.assertEqual(capture_output.getvalue(),
211                         green_no_high_no_wrap_string)
212
213        # Green with highlight and wrap.
214        mock_has_colors.return_value = True
215        capture_output = StringIO()
216        sys.stdout = capture_output
217        atest_utils.colorful_print(testing_str, green_no, highlight=True,
218                                   auto_wrap=True)
219        sys.stdout = sys.__stdout__
220        green_highlight_wrap_string = '\x1b[1;42m%s\x1b[0m\n' % testing_str
221        self.assertEqual(capture_output.getvalue(), green_highlight_wrap_string)
222
223        # Green with wrap, but no highlight.
224        mock_has_colors.return_value = True
225        capture_output = StringIO()
226        sys.stdout = capture_output
227        atest_utils.colorful_print(testing_str, green_no, highlight=False,
228                                   auto_wrap=True)
229        sys.stdout = sys.__stdout__
230        green_wrap_no_highlight_string = '\x1b[1;32m%s\x1b[0m\n' % testing_str
231        self.assertEqual(capture_output.getvalue(),
232                         green_wrap_no_highlight_string)
233
234    @mock.patch('socket.gethostname')
235    @mock.patch('subprocess.check_output')
236    def test_is_external_run(self, mock_output, mock_hostname):
237        """Test method is_external_run."""
238        mock_output.return_value = ''
239        mock_hostname.return_value = ''
240        self.assertTrue(atest_utils.is_external_run())
241
242        mock_output.return_value = 'test@other.com'
243        mock_hostname.return_value = 'abc.com'
244        self.assertTrue(atest_utils.is_external_run())
245
246        mock_output.return_value = 'test@other.com'
247        mock_hostname.return_value = 'abc.google.com'
248        self.assertFalse(atest_utils.is_external_run())
249
250        mock_output.return_value = 'test@other.com'
251        mock_hostname.return_value = 'abc.google.def.com'
252        self.assertTrue(atest_utils.is_external_run())
253
254        mock_output.return_value = 'test@google.com'
255        self.assertFalse(atest_utils.is_external_run())
256
257        mock_output.return_value = 'test@other.com'
258        mock_hostname.return_value = 'c.googlers.com'
259        self.assertFalse(atest_utils.is_external_run())
260
261        mock_output.return_value = 'test@other.com'
262        mock_hostname.return_value = 'a.googlers.com'
263        self.assertTrue(atest_utils.is_external_run())
264
265        mock_output.side_effect = OSError()
266        self.assertTrue(atest_utils.is_external_run())
267
268        mock_output.side_effect = subprocess.CalledProcessError(1, 'cmd')
269        self.assertTrue(atest_utils.is_external_run())
270
271    @mock.patch('metrics.metrics_base.get_user_type')
272    def test_print_data_collection_notice(self, mock_get_user_type):
273        """Test method print_data_collection_notice."""
274
275        # get_user_type return 1(external).
276        mock_get_user_type.return_value = 1
277        notice_str = ('\n==================\nNotice:\n'
278                      '  We collect anonymous usage statistics'
279                      ' in accordance with our'
280                      ' Content Licenses (https://source.android.com/setup/start/licenses),'
281                      ' Contributor License Agreement (https://opensource.google.com/docs/cla/),'
282                      ' Privacy Policy (https://policies.google.com/privacy) and'
283                      ' Terms of Service (https://policies.google.com/terms).'
284                      '\n==================\n\n')
285        capture_output = StringIO()
286        sys.stdout = capture_output
287        atest_utils.print_data_collection_notice()
288        sys.stdout = sys.__stdout__
289        uncolored_string = notice_str
290        self.assertEqual(capture_output.getvalue(), uncolored_string)
291
292        # get_user_type return 0(internal).
293        mock_get_user_type.return_value = 0
294        notice_str = ('\n==================\nNotice:\n'
295                      '  We collect usage statistics'
296                      ' in accordance with our'
297                      ' Content Licenses (https://source.android.com/setup/start/licenses),'
298                      ' Contributor License Agreement (https://cla.developers.google.com/),'
299                      ' Privacy Policy (https://policies.google.com/privacy) and'
300                      ' Terms of Service (https://policies.google.com/terms).'
301                      '\n==================\n\n')
302        capture_output = StringIO()
303        sys.stdout = capture_output
304        atest_utils.print_data_collection_notice()
305        sys.stdout = sys.__stdout__
306        uncolored_string = notice_str
307        self.assertEqual(capture_output.getvalue(), uncolored_string)
308
309    @mock.patch('builtins.input')
310    @mock.patch('json.load')
311    def test_update_test_runner_cmd(self, mock_json_load_data, mock_input):
312        """Test method handle_test_runner_cmd without enable do_verification."""
313        former_cmd_str = 'Former cmds ='
314        write_result_str = 'Save result mapping to test_result'
315        tmp_file = tempfile.NamedTemporaryFile()
316        input_cmd = 'atest_args'
317        runner_cmds = ['cmd1', 'cmd2']
318        capture_output = StringIO()
319        sys.stdout = capture_output
320        # Previous data is empty. Should not enter strtobool.
321        # If entered, exception will be raised cause test fail.
322        mock_json_load_data.return_value = {}
323        atest_utils.handle_test_runner_cmd(input_cmd,
324                                           runner_cmds,
325                                           do_verification=False,
326                                           result_path=tmp_file.name)
327        sys.stdout = sys.__stdout__
328        self.assertEqual(capture_output.getvalue().find(former_cmd_str), -1)
329        # Previous data is the same as the new input. Should not enter strtobool.
330        # If entered, exception will be raised cause test fail
331        capture_output = StringIO()
332        sys.stdout = capture_output
333        mock_json_load_data.return_value = {input_cmd:runner_cmds}
334        atest_utils.handle_test_runner_cmd(input_cmd,
335                                           runner_cmds,
336                                           do_verification=False,
337                                           result_path=tmp_file.name)
338        sys.stdout = sys.__stdout__
339        self.assertEqual(capture_output.getvalue().find(former_cmd_str), -1)
340        self.assertEqual(capture_output.getvalue().find(write_result_str), -1)
341        # Previous data has different cmds. Should enter strtobool not update,
342        # should not find write_result_str.
343        prev_cmds = ['cmd1']
344        mock_input.return_value = 'n'
345        capture_output = StringIO()
346        sys.stdout = capture_output
347        mock_json_load_data.return_value = {input_cmd:prev_cmds}
348        atest_utils.handle_test_runner_cmd(input_cmd,
349                                           runner_cmds,
350                                           do_verification=False,
351                                           result_path=tmp_file.name)
352        sys.stdout = sys.__stdout__
353        self.assertEqual(capture_output.getvalue().find(write_result_str), -1)
354
355    @mock.patch('json.load')
356    def test_verify_test_runner_cmd(self, mock_json_load_data):
357        """Test method handle_test_runner_cmd without enable update_result."""
358        tmp_file = tempfile.NamedTemporaryFile()
359        input_cmd = 'atest_args'
360        runner_cmds = ['cmd1', 'cmd2']
361        # Previous data is the same as the new input. Should not raise exception.
362        mock_json_load_data.return_value = {input_cmd:runner_cmds}
363        atest_utils.handle_test_runner_cmd(input_cmd,
364                                           runner_cmds,
365                                           do_verification=True,
366                                           result_path=tmp_file.name)
367        # Previous data has different cmds. Should enter strtobool and hit
368        # exception.
369        prev_cmds = ['cmd1']
370        mock_json_load_data.return_value = {input_cmd:prev_cmds}
371        self.assertRaises(atest_error.DryRunVerificationError,
372                          atest_utils.handle_test_runner_cmd,
373                          input_cmd,
374                          runner_cmds,
375                          do_verification=True,
376                          result_path=tmp_file.name)
377
378    def test_get_test_info_cache_path(self):
379        """Test method get_test_info_cache_path."""
380        input_file_name = 'mytest_name'
381        cache_root = '/a/b/c'
382        expect_hashed_name = ('%s.cache' % hashlib.md5(str(input_file_name).
383                                                       encode()).hexdigest())
384        self.assertEqual(os.path.join(cache_root, expect_hashed_name),
385                         atest_utils.get_test_info_cache_path(input_file_name,
386                                                              cache_root))
387
388    def test_get_and_load_cache(self):
389        """Test method update_test_info_cache and load_test_info_cache."""
390        test_reference = 'myTestRefA'
391        test_cache_dir = tempfile.mkdtemp()
392        atest_utils.update_test_info_cache(test_reference, [TEST_INFO_A],
393                                           test_cache_dir)
394        unittest_utils.assert_equal_testinfo_sets(
395            self, set([TEST_INFO_A]),
396            atest_utils.load_test_info_cache(test_reference, test_cache_dir))
397
398    @mock.patch('os.getcwd')
399    def test_get_build_cmd(self, mock_cwd):
400        """Test method get_build_cmd."""
401        build_top = '/home/a/b/c'
402        rel_path = 'd/e'
403        mock_cwd.return_value = os.path.join(build_top, rel_path)
404        os_environ_mock = {constants.ANDROID_BUILD_TOP: build_top}
405        with mock.patch.dict('os.environ', os_environ_mock, clear=True):
406            expected_cmd = ['../../build/soong/soong_ui.bash', '--make-mode']
407            self.assertEqual(expected_cmd, atest_utils.get_build_cmd())
408
409    @mock.patch('subprocess.check_output')
410    def test_get_modified_files(self, mock_co):
411        """Test method get_modified_files"""
412        mock_co.side_effect = [
413            x.encode('utf-8') for x in ['/a/b/',
414                                        '\n',
415                                        'test_fp1.java\nc/test_fp2.java']]
416        self.assertEqual({'/a/b/test_fp1.java', '/a/b/c/test_fp2.java'},
417                         atest_utils.get_modified_files(''))
418        mock_co.side_effect = [
419            x.encode('utf-8') for x in ['/a/b/',
420                                        'test_fp4',
421                                        '/test_fp3.java']]
422        self.assertEqual({'/a/b/test_fp4', '/a/b/test_fp3.java'},
423                         atest_utils.get_modified_files(''))
424
425    def test_delimiter(self):
426        """Test method delimiter"""
427        self.assertEqual('\n===\n\n', atest_utils.delimiter('=', 3, 1, 2))
428
429    def test_has_python_module(self):
430        """Test method has_python_module"""
431        self.assertFalse(atest_utils.has_python_module('M_M'))
432        self.assertTrue(atest_utils.has_python_module('os'))
433
434    @mock.patch.object(atest_utils, 'matched_tf_error_log', return_value=True)
435    def test_read_zip_single_text(self, _matched):
436        """Test method extract_zip_text include only one text file."""
437        zip_path = os.path.join(unittest_constants.TEST_DATA_DIR,
438                                TEST_ZIP_DATA_DIR, TEST_SINGLE_ZIP_NAME)
439        expect_content = '\nfile1_line1\nfile1_line2\n'
440        self.assertEqual(expect_content, atest_utils.extract_zip_text(zip_path))
441
442    @mock.patch.object(atest_utils, 'matched_tf_error_log', return_value=True)
443    def test_read_zip_multi_text(self, _matched):
444        """Test method extract_zip_text include multiple text files."""
445        zip_path = os.path.join(unittest_constants.TEST_DATA_DIR,
446                                TEST_ZIP_DATA_DIR, TEST_MULTI_ZIP_NAME)
447        expect_content = ('\nfile1_line1\nfile1_line2\n\nfile2_line1\n'
448                          'file2_line2\n')
449        self.assertEqual(expect_content, atest_utils.extract_zip_text(zip_path))
450
451    def test_matched_tf_error_log(self):
452        """Test method extract_zip_text include multiple text files."""
453        matched_content = '05-25 17:37:04 E/XXXXX YYYYY'
454        not_matched_content = '05-25 17:37:04 I/XXXXX YYYYY'
455        # Test matched content
456        self.assertEqual(True,
457                         atest_utils.matched_tf_error_log(matched_content))
458        # Test not matched content
459        self.assertEqual(False,
460                         atest_utils.matched_tf_error_log(not_matched_content))
461
462    @mock.patch('os.chmod')
463    @mock.patch('shutil.copy2')
464    @mock.patch('atest_utils.has_valid_cert')
465    @mock.patch('subprocess.check_output')
466    @mock.patch('os.path.exists')
467    def test_get_flakes(self, mock_path_exists, mock_output, mock_valid_cert,
468                        _cpc, _cm):
469        """Test method get_flakes."""
470        # Test par file does not exist.
471        mock_path_exists.return_value = False
472        self.assertEqual(None, atest_utils.get_flakes())
473        # Test par file exists.
474        mock_path_exists.return_value = True
475        mock_output.return_value = (b'flake_percent:0.10001\n'
476                                    b'postsubmit_flakes_per_week:12.0')
477        mock_valid_cert.return_value = True
478        expected_flake_info = {'flake_percent':'0.10001',
479                               'postsubmit_flakes_per_week':'12.0'}
480        self.assertEqual(expected_flake_info,
481                         atest_utils.get_flakes())
482        # Test no valid cert
483        mock_valid_cert.return_value = False
484        self.assertEqual(None,
485                         atest_utils.get_flakes())
486
487    @mock.patch('subprocess.check_call')
488    def test_has_valid_cert(self, mock_call):
489        """Test method has_valid_cert."""
490        # raise subprocess.CalledProcessError
491        mock_call.raiseError.side_effect = subprocess.CalledProcessError
492        self.assertFalse(atest_utils.has_valid_cert())
493        with mock.patch("constants.CERT_STATUS_CMD", ''):
494            self.assertFalse(atest_utils.has_valid_cert())
495        with mock.patch("constants.CERT_STATUS_CMD", 'CMD'):
496            # has valid cert
497            mock_call.return_value = 0
498            self.assertTrue(atest_utils.has_valid_cert())
499            # no valid cert
500            mock_call.return_value = 4
501            self.assertFalse(atest_utils.has_valid_cert())
502
503    # pylint: disable=no-member
504    def test_read_test_record_proto(self):
505        """Test method read_test_record."""
506        test_record_file_path = os.path.join(unittest_constants.TEST_DATA_DIR,
507                                             "test_record.proto.testonly")
508        test_record = atest_utils.read_test_record(test_record_file_path)
509        self.assertEqual(test_record.children[0].inline_test_record.test_record_id,
510                         'x86 hello_world_test')
511
512    def test_is_valid_json_file_file_not_exist(self):
513        """Test method is_valid_json_file if file not exist."""
514        json_file_path = os.path.join(unittest_constants.TEST_DATA_DIR,
515                                      "not_exist.json")
516        self.assertFalse(atest_utils.is_valid_json_file(json_file_path))
517
518    def test_is_valid_json_file_content_valid(self):
519        """Test method is_valid_json_file if file exist and content is valid."""
520        json_file_path = os.path.join(unittest_constants.TEST_DATA_DIR,
521                                      "module-info.json")
522        self.assertTrue(atest_utils.is_valid_json_file(json_file_path))
523
524    def test_is_valid_json_file_content_not_valid(self):
525        """Test method is_valid_json_file if file exist but content is valid."""
526        json_file_path = os.path.join(unittest_constants.TEST_DATA_DIR,
527                                      "not-valid-module-info.json")
528        self.assertFalse(atest_utils.is_valid_json_file(json_file_path))
529
530    @mock.patch('subprocess.Popen')
531    @mock.patch('os.getenv')
532    def test_get_manifest_branch(self, mock_env, mock_popen):
533        """Test method get_manifest_branch"""
534        mock_env.return_value = 'any_path'
535        process = mock_popen.return_value
536        process.communicate.return_value = (REPO_INFO_OUTPUT, '')
537        self.assertEqual('test_branch', atest_utils.get_manifest_branch())
538
539        mock_env.return_value = 'any_path'
540        process.communicate.return_value = ('not_matched_branch_pattern.', '')
541        self.assertEqual(None, atest_utils.get_manifest_branch())
542
543        mock_env.return_value = 'any_path'
544        process.communicate.side_effect = subprocess.TimeoutExpired(
545            1,
546            'repo info')
547        self.assertEqual(None, atest_utils.get_manifest_branch())
548
549        mock_env.return_value = None
550        process.communicate.return_value = (REPO_INFO_OUTPUT, '')
551        self.assertEqual(None, atest_utils.get_manifest_branch())
552
553    def test_has_wildcard(self):
554        """Test method of has_wildcard"""
555        self.assertFalse(atest_utils.has_wildcard('test1'))
556        self.assertFalse(atest_utils.has_wildcard(['test1']))
557        self.assertTrue(atest_utils.has_wildcard('test1?'))
558        self.assertTrue(atest_utils.has_wildcard(['test1', 'b*', 'a?b*']))
559
560    # pylint: disable=anomalous-backslash-in-string
561    def test_quote(self):
562        """Test method of quote()"""
563        target_str = r'TEST_(F|P)[0-9].*\w$'
564        expected_str = '\'TEST_(F|P)[0-9].*\w$\''
565        self.assertEqual(atest_utils.quote(target_str), expected_str)
566        self.assertEqual(atest_utils.quote('TEST_P224'), 'TEST_P224')
567
568    @mock.patch('builtins.input', return_value='')
569    def test_prompt_with_yn_result(self, mock_input):
570        """Test method of prompt_with_yn_result"""
571        msg = 'Do you want to continue?'
572        mock_input.return_value = ''
573        self.assertTrue(atest_utils.prompt_with_yn_result(msg, True))
574        self.assertFalse(atest_utils.prompt_with_yn_result(msg, False))
575        mock_input.return_value = 'y'
576        self.assertTrue(atest_utils.prompt_with_yn_result(msg, True))
577        mock_input.return_value = 'nO'
578        self.assertFalse(atest_utils.prompt_with_yn_result(msg, True))
579
580    def test_get_android_junit_config_filters(self):
581        """Test method of get_android_junit_config_filters"""
582        no_filter_test_config = os.path.join(
583            unittest_constants.TEST_DATA_DIR,
584            "filter_configs", "no_filter.cfg")
585        self.assertEqual({},
586                         atest_utils.get_android_junit_config_filters(
587                             no_filter_test_config))
588
589        filtered_test_config = os.path.join(
590            unittest_constants.TEST_DATA_DIR,
591            'filter_configs', 'filter.cfg')
592        filter_dict = atest_utils.get_android_junit_config_filters(
593            filtered_test_config)
594        include_annotations = filter_dict.get(constants.INCLUDE_ANNOTATION)
595        include_annotations.sort()
596        self.assertEqual(
597            ['include1', 'include2'],
598            include_annotations)
599        exclude_annotation = filter_dict.get(constants.EXCLUDE_ANNOTATION)
600        exclude_annotation.sort()
601        self.assertEqual(
602            ['exclude1', 'exclude2'],
603            exclude_annotation)
604
605    def test_md5sum(self):
606        """Test method of md5sum"""
607        exist_string = os.path.join(unittest_constants.TEST_DATA_DIR,
608                                    unittest_constants.JSON_FILE)
609        inexist_string = os.path.join(unittest_constants.TEST_DATA_DIR,
610                                      unittest_constants.CLASS_NAME)
611        self.assertEqual(
612            atest_utils.md5sum(exist_string), 'f02c1a648f16e5e9d7035bb11486ac2b')
613        self.assertEqual(
614            atest_utils.md5sum(inexist_string), '')
615
616    def test_check_md5(self):
617        """Test method of check_md5"""
618        file1 = os.path.join(unittest_constants.TEST_DATA_DIR,
619                            unittest_constants.JSON_FILE)
620        checksum_file = '/tmp/_tmp_module-info.json'
621        atest_utils.save_md5([file1], '/tmp/_tmp_module-info.json')
622        self.assertTrue(atest_utils.check_md5(checksum_file))
623        os.remove(checksum_file)
624        self.assertFalse(atest_utils.check_md5(checksum_file))
625        self.assertTrue(atest_utils.check_md5(checksum_file, missing_ok=True))
626
627    def test_get_config_parameter(self):
628        """Test method of get_config_parameter"""
629        parameter_config = os.path.join(
630            unittest_constants.TEST_DATA_DIR,
631            "parameter_config", "parameter.cfg")
632        no_parameter_config = os.path.join(
633            unittest_constants.TEST_DATA_DIR,
634            "parameter_config", "no_parameter.cfg")
635
636        # Test parameter empty value
637        self.assertEqual(set(),
638                         atest_utils.get_config_parameter(
639                             no_parameter_config))
640
641        # Test parameter empty value
642        self.assertEqual({'value_1', 'value_2', 'value_3', 'value_4'},
643                         atest_utils.get_config_parameter(
644                             parameter_config))
645
646    def test_get_config_device(self):
647        """Test method of get_config_device"""
648        device_config = os.path.join(
649            unittest_constants.TEST_DATA_DIR,
650            "parameter_config", "multiple_device.cfg")
651        self.assertEqual({'device_1', 'device_2'},
652                         atest_utils.get_config_device(device_config))
653
654    def test_get_mainline_param(self):
655        """Test method of get_mainline_param"""
656        mainline_param_config = os.path.join(
657            unittest_constants.TEST_DATA_DIR,
658            "parameter_config", "mainline_param.cfg")
659        self.assertEqual({'foo1.apex', 'foo2.apk+foo3.apk'},
660                         atest_utils.get_mainline_param(
661                             mainline_param_config))
662        no_mainline_param_config = os.path.join(
663            unittest_constants.TEST_DATA_DIR,
664            "parameter_config", "parameter.cfg")
665        self.assertEqual(set(),
666                         atest_utils.get_mainline_param(
667                             no_mainline_param_config))
668
669    def test_get_full_annotation_class_name(self):
670        """Test method of get_full_annotation_class_name."""
671        app_mode_full = 'android.platform.test.annotations.AppModeFull'
672        presubmit = 'android.platform.test.annotations.Presubmit'
673        module_info = {'srcs': [os.path.join(unittest_constants.TEST_DATA_DIR,
674                                'annotation_testing',
675                                'Annotation.src')]}
676        # get annotation class from keyword
677        self.assertEqual(
678            atest_utils.get_full_annotation_class_name(module_info, 'presubmit'),
679            presubmit)
680        # get annotation class from an accurate fqcn keyword.
681        self.assertEqual(
682            atest_utils.get_full_annotation_class_name(module_info, presubmit),
683            presubmit)
684        # accept fqcn keyword in lowercase.
685        self.assertEqual(
686            atest_utils.get_full_annotation_class_name(module_info, 'android.platform.test.annotations.presubmit'),
687            presubmit)
688        # unable to get annotation class from keyword.
689        self.assertNotEqual(
690            atest_utils.get_full_annotation_class_name(module_info, 'appleModefull'), app_mode_full)
691        # do not support partial-correct keyword.
692        self.assertNotEqual(
693            atest_utils.get_full_annotation_class_name(module_info, 'android.platform.test.annotations.pres'), presubmit)
694
695    def test_has_mixed_type_filters_one_module_with_one_type_return_false(self):
696        """Test method of has_mixed_type_filters"""
697        filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD']))
698        test_data_1 = {constants.TI_FILTER: [filter_1]}
699        test_info_1 = test_info.TestInfo('MODULE', 'RUNNER',
700                                set(), test_data_1,
701                                'SUITE', '',
702                                set())
703        self.assertFalse(atest_utils.has_mixed_type_filters([test_info_1]))
704
705    def test_has_mixed_type_filters_one_module_with_mixed_types_return_true(self):
706        """Test method of has_mixed_type_filters"""
707        filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD']))
708        filter_2 = test_info.TestFilter('CLASS', frozenset(['METHOD*']))
709        test_data_2 = {constants.TI_FILTER: [filter_1, filter_2]}
710        test_info_2 = test_info.TestInfo('MODULE', 'RUNNER',
711                                set(), test_data_2,
712                                'SUITE', '',
713                                set())
714        self.assertTrue(atest_utils.has_mixed_type_filters([test_info_2]))
715
716    def test_has_mixed_type_filters_two_module_with_mixed_types_return_false(self):
717        """Test method of has_mixed_type_filters"""
718        filter_1 = test_info.TestFilter('CLASS', frozenset(['METHOD']))
719        test_data_1 = {constants.TI_FILTER: [filter_1]}
720        test_info_1 = test_info.TestInfo('MODULE', 'RUNNER',
721                                set(), test_data_1,
722                                'SUITE', '',
723                                set())
724        filter_3 = test_info.TestFilter('CLASS', frozenset(['METHOD*']))
725        test_data_3 = {constants.TI_FILTER: [filter_3]}
726        test_info_3 = test_info.TestInfo('MODULE3', 'RUNNER',
727                                set(), test_data_3,
728                                'SUITE', '',
729                                set())
730        self.assertFalse(atest_utils.has_mixed_type_filters(
731            [test_info_1, test_info_3]))
732
733    def test_get_filter_types(self):
734        """Test method of get_filter_types."""
735        filters = set(['CLASS#METHOD'])
736        expect_types = set([FilterType.REGULAR_FILTER.value])
737        self.assertEqual(atest_utils.get_filter_types(filters), expect_types)
738
739        filters = set(['CLASS#METHOD*'])
740        expect_types = set([FilterType.WILDCARD_FILTER.value])
741        self.assertEqual(atest_utils.get_filter_types(filters), expect_types)
742
743        filters = set(['CLASS#METHOD', 'CLASS#METHOD*'])
744        expect_types = set([FilterType.WILDCARD_FILTER.value,
745                          FilterType.REGULAR_FILTER.value])
746        self.assertEqual(atest_utils.get_filter_types(filters), expect_types)
747
748        filters = set(['CLASS#METHOD?', 'CLASS#METHOD*'])
749        expect_types = set([FilterType.WILDCARD_FILTER.value])
750        self.assertEqual(atest_utils.get_filter_types(filters), expect_types)
751
752if __name__ == "__main__":
753    unittest.main()
754