• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright 2018, The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Unittests for atest_utils."""
18
19# pylint: disable=line-too-long
20
21import hashlib
22import os
23import subprocess
24import sys
25import tempfile
26import unittest
27
28from io import StringIO
29from unittest import mock
30
31import atest_error
32import atest_utils
33import constants
34import unittest_utils
35
36from test_finders import test_info
37
38
39TEST_MODULE_NAME_A = 'ModuleNameA'
40TEST_RUNNER_A = 'FakeTestRunnerA'
41TEST_BUILD_TARGET_A = set(['bt1', 'bt2'])
42TEST_DATA_A = {'test_data_a_1': 'a1',
43               'test_data_a_2': 'a2'}
44TEST_SUITE_A = 'FakeSuiteA'
45TEST_MODULE_CLASS_A = 'FAKE_MODULE_CLASS_A'
46TEST_INSTALL_LOC_A = set(['host', 'device'])
47TEST_FINDER_A = 'MODULE'
48TEST_INFO_A = test_info.TestInfo(TEST_MODULE_NAME_A, TEST_RUNNER_A,
49                                 TEST_BUILD_TARGET_A, TEST_DATA_A,
50                                 TEST_SUITE_A, TEST_MODULE_CLASS_A,
51                                 TEST_INSTALL_LOC_A)
52TEST_INFO_A.test_finder = TEST_FINDER_A
53
54#pylint: disable=protected-access
55class AtestUtilsUnittests(unittest.TestCase):
56    """Unit tests for atest_utils.py"""
57
58    def test_capture_fail_section_has_fail_section(self):
59        """Test capture_fail_section when has fail section."""
60        test_list = ['AAAAAA', 'FAILED: Error1', '^\n', 'Error2\n',
61                     '[  6% 191/2997] BBBBBB\n', 'CCCCC',
62                     '[  20% 322/2997] DDDDDD\n', 'EEEEE']
63        want_list = ['FAILED: Error1', '^\n', 'Error2\n']
64        self.assertEqual(want_list,
65                         atest_utils._capture_fail_section(test_list))
66
67    def test_capture_fail_section_no_fail_section(self):
68        """Test capture_fail_section when no fail section."""
69        test_list = ['[ 6% 191/2997] XXXXX', 'YYYYY: ZZZZZ']
70        want_list = []
71        self.assertEqual(want_list,
72                         atest_utils._capture_fail_section(test_list))
73
74    def test_is_test_mapping(self):
75        """Test method is_test_mapping."""
76        tm_option_attributes = [
77            'test_mapping',
78            'include_subdirs'
79        ]
80        for attr_to_test in tm_option_attributes:
81            args = mock.Mock()
82            for attr in tm_option_attributes:
83                setattr(args, attr, attr == attr_to_test)
84            args.tests = []
85            self.assertTrue(
86                atest_utils.is_test_mapping(args),
87                'Failed to validate option %s' % attr_to_test)
88
89        args = mock.Mock()
90        for attr in tm_option_attributes:
91            setattr(args, attr, False)
92        args.tests = [':group_name']
93        self.assertTrue(atest_utils.is_test_mapping(args))
94
95        args = mock.Mock()
96        for attr in tm_option_attributes:
97            setattr(args, attr, False)
98        args.tests = [':test1', 'test2']
99        self.assertFalse(atest_utils.is_test_mapping(args))
100
101        args = mock.Mock()
102        for attr in tm_option_attributes:
103            setattr(args, attr, False)
104        args.tests = ['test2']
105        self.assertFalse(atest_utils.is_test_mapping(args))
106
107    @mock.patch('curses.tigetnum')
108    def test_has_colors(self, mock_curses_tigetnum):
109        """Test method _has_colors."""
110        # stream is file I/O
111        stream = open('/tmp/test_has_colors.txt', 'wb')
112        self.assertFalse(atest_utils._has_colors(stream))
113        stream.close()
114
115        # stream is not a tty(terminal).
116        stream = mock.Mock()
117        stream.isatty.return_value = False
118        self.assertFalse(atest_utils._has_colors(stream))
119
120        # stream is a tty(terminal) and colors < 2.
121        stream = mock.Mock()
122        stream.isatty.return_value = True
123        mock_curses_tigetnum.return_value = 1
124        self.assertFalse(atest_utils._has_colors(stream))
125
126        # stream is a tty(terminal) and colors > 2.
127        stream = mock.Mock()
128        stream.isatty.return_value = True
129        mock_curses_tigetnum.return_value = 256
130        self.assertTrue(atest_utils._has_colors(stream))
131
132
133    @mock.patch('atest_utils._has_colors')
134    def test_colorize(self, mock_has_colors):
135        """Test method colorize."""
136        original_str = "test string"
137        green_no = 2
138
139        # _has_colors() return False.
140        mock_has_colors.return_value = False
141        converted_str = atest_utils.colorize(original_str, green_no,
142                                             highlight=True)
143        self.assertEqual(original_str, converted_str)
144
145        # Green with highlight.
146        mock_has_colors.return_value = True
147        converted_str = atest_utils.colorize(original_str, green_no,
148                                             highlight=True)
149        green_highlight_string = '\x1b[1;42m%s\x1b[0m' % original_str
150        self.assertEqual(green_highlight_string, converted_str)
151
152        # Green, no highlight.
153        mock_has_colors.return_value = True
154        converted_str = atest_utils.colorize(original_str, green_no,
155                                             highlight=False)
156        green_no_highlight_string = '\x1b[1;32m%s\x1b[0m' % original_str
157        self.assertEqual(green_no_highlight_string, converted_str)
158
159
160    @mock.patch('atest_utils._has_colors')
161    def test_colorful_print(self, mock_has_colors):
162        """Test method colorful_print."""
163        testing_str = "color_print_test"
164        green_no = 2
165
166        # _has_colors() return False.
167        mock_has_colors.return_value = False
168        capture_output = StringIO()
169        sys.stdout = capture_output
170        atest_utils.colorful_print(testing_str, green_no, highlight=True,
171                                   auto_wrap=False)
172        sys.stdout = sys.__stdout__
173        uncolored_string = testing_str
174        self.assertEqual(capture_output.getvalue(), uncolored_string)
175
176        # Green with highlight, but no wrap.
177        mock_has_colors.return_value = True
178        capture_output = StringIO()
179        sys.stdout = capture_output
180        atest_utils.colorful_print(testing_str, green_no, highlight=True,
181                                   auto_wrap=False)
182        sys.stdout = sys.__stdout__
183        green_highlight_no_wrap_string = '\x1b[1;42m%s\x1b[0m' % testing_str
184        self.assertEqual(capture_output.getvalue(),
185                         green_highlight_no_wrap_string)
186
187        # Green, no highlight, no wrap.
188        mock_has_colors.return_value = True
189        capture_output = StringIO()
190        sys.stdout = capture_output
191        atest_utils.colorful_print(testing_str, green_no, highlight=False,
192                                   auto_wrap=False)
193        sys.stdout = sys.__stdout__
194        green_no_high_no_wrap_string = '\x1b[1;32m%s\x1b[0m' % testing_str
195        self.assertEqual(capture_output.getvalue(),
196                         green_no_high_no_wrap_string)
197
198        # Green with highlight and wrap.
199        mock_has_colors.return_value = True
200        capture_output = StringIO()
201        sys.stdout = capture_output
202        atest_utils.colorful_print(testing_str, green_no, highlight=True,
203                                   auto_wrap=True)
204        sys.stdout = sys.__stdout__
205        green_highlight_wrap_string = '\x1b[1;42m%s\x1b[0m\n' % testing_str
206        self.assertEqual(capture_output.getvalue(), green_highlight_wrap_string)
207
208        # Green with wrap, but no highlight.
209        mock_has_colors.return_value = True
210        capture_output = StringIO()
211        sys.stdout = capture_output
212        atest_utils.colorful_print(testing_str, green_no, highlight=False,
213                                   auto_wrap=True)
214        sys.stdout = sys.__stdout__
215        green_wrap_no_highlight_string = '\x1b[1;32m%s\x1b[0m\n' % testing_str
216        self.assertEqual(capture_output.getvalue(),
217                         green_wrap_no_highlight_string)
218
219    @mock.patch('socket.gethostname')
220    @mock.patch('subprocess.check_output')
221    def test_is_external_run(self, mock_output, mock_hostname):
222        """Test method is_external_run."""
223        mock_output.return_value = ''
224        mock_hostname.return_value = ''
225        self.assertTrue(atest_utils.is_external_run())
226
227        mock_output.return_value = 'test@other.com'
228        mock_hostname.return_value = 'abc.com'
229        self.assertTrue(atest_utils.is_external_run())
230
231        mock_output.return_value = 'test@other.com'
232        mock_hostname.return_value = 'abc.google.com'
233        self.assertFalse(atest_utils.is_external_run())
234
235        mock_output.return_value = 'test@other.com'
236        mock_hostname.return_value = 'abc.google.def.com'
237        self.assertTrue(atest_utils.is_external_run())
238
239        mock_output.return_value = 'test@google.com'
240        self.assertFalse(atest_utils.is_external_run())
241
242        mock_output.return_value = 'test@other.com'
243        mock_hostname.return_value = 'c.googlers.com'
244        self.assertFalse(atest_utils.is_external_run())
245
246        mock_output.return_value = 'test@other.com'
247        mock_hostname.return_value = 'a.googlers.com'
248        self.assertTrue(atest_utils.is_external_run())
249
250        mock_output.side_effect = OSError()
251        self.assertTrue(atest_utils.is_external_run())
252
253        mock_output.side_effect = subprocess.CalledProcessError(1, 'cmd')
254        self.assertTrue(atest_utils.is_external_run())
255
256    @mock.patch('metrics.metrics_base.get_user_type')
257    def test_print_data_collection_notice(self, mock_get_user_type):
258        """Test method print_data_collection_notice."""
259
260        # get_user_type return 1(external).
261        mock_get_user_type.return_value = 1
262        notice_str = ('\n==================\nNotice:\n'
263                      '  We collect anonymous usage statistics'
264                      ' in accordance with our'
265                      ' Content Licenses (https://source.android.com/setup/start/licenses),'
266                      ' Contributor License Agreement (https://opensource.google.com/docs/cla/),'
267                      ' Privacy Policy (https://policies.google.com/privacy) and'
268                      ' Terms of Service (https://policies.google.com/terms).'
269                      '\n==================\n\n')
270        capture_output = StringIO()
271        sys.stdout = capture_output
272        atest_utils.print_data_collection_notice()
273        sys.stdout = sys.__stdout__
274        uncolored_string = notice_str
275        self.assertEqual(capture_output.getvalue(), uncolored_string)
276
277        # get_user_type return 0(internal).
278        mock_get_user_type.return_value = 0
279        notice_str = ('\n==================\nNotice:\n'
280                      '  We collect usage statistics'
281                      ' in accordance with our'
282                      ' Content Licenses (https://source.android.com/setup/start/licenses),'
283                      ' Contributor License Agreement (https://cla.developers.google.com/),'
284                      ' Privacy Policy (https://policies.google.com/privacy) and'
285                      ' Terms of Service (https://policies.google.com/terms).'
286                      '\n==================\n\n')
287        capture_output = StringIO()
288        sys.stdout = capture_output
289        atest_utils.print_data_collection_notice()
290        sys.stdout = sys.__stdout__
291        uncolored_string = notice_str
292        self.assertEqual(capture_output.getvalue(), uncolored_string)
293
294    @mock.patch('builtins.input')
295    @mock.patch('json.load')
296    def test_update_test_runner_cmd(self, mock_json_load_data, mock_input):
297        """Test method handle_test_runner_cmd without enable do_verification."""
298        former_cmd_str = 'Former cmds ='
299        write_result_str = 'Save result mapping to test_result'
300        tmp_file = tempfile.NamedTemporaryFile()
301        input_cmd = 'atest_args'
302        runner_cmds = ['cmd1', 'cmd2']
303        capture_output = StringIO()
304        sys.stdout = capture_output
305        # Previous data is empty. Should not enter strtobool.
306        # If entered, exception will be raised cause test fail.
307        mock_json_load_data.return_value = {}
308        atest_utils.handle_test_runner_cmd(input_cmd,
309                                           runner_cmds,
310                                           do_verification=False,
311                                           result_path=tmp_file.name)
312        sys.stdout = sys.__stdout__
313        self.assertEqual(capture_output.getvalue().find(former_cmd_str), -1)
314        # Previous data is the same as the new input. Should not enter strtobool.
315        # If entered, exception will be raised cause test fail
316        capture_output = StringIO()
317        sys.stdout = capture_output
318        mock_json_load_data.return_value = {input_cmd:runner_cmds}
319        atest_utils.handle_test_runner_cmd(input_cmd,
320                                           runner_cmds,
321                                           do_verification=False,
322                                           result_path=tmp_file.name)
323        sys.stdout = sys.__stdout__
324        self.assertEqual(capture_output.getvalue().find(former_cmd_str), -1)
325        self.assertEqual(capture_output.getvalue().find(write_result_str), -1)
326        # Previous data has different cmds. Should enter strtobool not update,
327        # should not find write_result_str.
328        prev_cmds = ['cmd1']
329        mock_input.return_value = 'n'
330        capture_output = StringIO()
331        sys.stdout = capture_output
332        mock_json_load_data.return_value = {input_cmd:prev_cmds}
333        atest_utils.handle_test_runner_cmd(input_cmd,
334                                           runner_cmds,
335                                           do_verification=False,
336                                           result_path=tmp_file.name)
337        sys.stdout = sys.__stdout__
338        self.assertEqual(capture_output.getvalue().find(write_result_str), -1)
339
340    @mock.patch('json.load')
341    def test_verify_test_runner_cmd(self, mock_json_load_data):
342        """Test method handle_test_runner_cmd without enable update_result."""
343        tmp_file = tempfile.NamedTemporaryFile()
344        input_cmd = 'atest_args'
345        runner_cmds = ['cmd1', 'cmd2']
346        # Previous data is the same as the new input. Should not raise exception.
347        mock_json_load_data.return_value = {input_cmd:runner_cmds}
348        atest_utils.handle_test_runner_cmd(input_cmd,
349                                           runner_cmds,
350                                           do_verification=True,
351                                           result_path=tmp_file.name)
352        # Previous data has different cmds. Should enter strtobool and hit
353        # exception.
354        prev_cmds = ['cmd1']
355        mock_json_load_data.return_value = {input_cmd:prev_cmds}
356        self.assertRaises(atest_error.DryRunVerificationError,
357                          atest_utils.handle_test_runner_cmd,
358                          input_cmd,
359                          runner_cmds,
360                          do_verification=True,
361                          result_path=tmp_file.name)
362
363    def test_get_test_info_cache_path(self):
364        """Test method get_test_info_cache_path."""
365        input_file_name = 'mytest_name'
366        cache_root = '/a/b/c'
367        expect_hashed_name = ('%s.cache' % hashlib.md5(str(input_file_name).
368                                                       encode()).hexdigest())
369        self.assertEqual(os.path.join(cache_root, expect_hashed_name),
370                         atest_utils.get_test_info_cache_path(input_file_name,
371                                                              cache_root))
372
373    def test_get_and_load_cache(self):
374        """Test method update_test_info_cache and load_test_info_cache."""
375        test_reference = 'myTestRefA'
376        test_cache_dir = tempfile.mkdtemp()
377        atest_utils.update_test_info_cache(test_reference, [TEST_INFO_A],
378                                           test_cache_dir)
379        unittest_utils.assert_equal_testinfo_sets(
380            self, set([TEST_INFO_A]),
381            atest_utils.load_test_info_cache(test_reference, test_cache_dir))
382
383    @mock.patch('os.getcwd')
384    def test_get_build_cmd(self, mock_cwd):
385        """Test method get_build_cmd."""
386        build_top = '/home/a/b/c'
387        rel_path = 'd/e'
388        mock_cwd.return_value = os.path.join(build_top, rel_path)
389        os_environ_mock = {constants.ANDROID_BUILD_TOP: build_top}
390        with mock.patch.dict('os.environ', os_environ_mock, clear=True):
391            expected_cmd = ['../../build/soong/soong_ui.bash', '--make-mode']
392            self.assertEqual(expected_cmd, atest_utils.get_build_cmd())
393
394    @mock.patch('subprocess.check_output')
395    def test_get_modified_files(self, mock_co):
396        """Test method get_modified_files"""
397        mock_co.side_effect = [
398            x.encode('utf-8') for x in ['/a/b/',
399                                        '\n',
400                                        'test_fp1.java\nc/test_fp2.java']]
401        self.assertEqual({'/a/b/test_fp1.java', '/a/b/c/test_fp2.java'},
402                         atest_utils.get_modified_files(''))
403        mock_co.side_effect = [
404            x.encode('utf-8') for x in ['/a/b/',
405                                        'test_fp4',
406                                        '/test_fp3.java']]
407        self.assertEqual({'/a/b/test_fp4', '/a/b/test_fp3.java'},
408                         atest_utils.get_modified_files(''))
409
410    def test_delimiter(self):
411        """Test method delimiter"""
412        self.assertEqual('\n===\n\n', atest_utils.delimiter('=', 3, 1, 2))
413
414if __name__ == "__main__":
415    unittest.main()
416