• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env vpython3
2# Copyright 2022 The Chromium Authors
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5"""File for testing flash_device.py."""
6
7import os
8import subprocess
9import unittest
10import unittest.mock as mock
11
12import boot_device
13import flash_device
14
# Image directory handed to flash_device.update()/update_required() by the
# tests below.
_TEST_IMAGE_DIR = 'test/image/dir'
# Product and version returned by the patched flash_device.get_sdk_hash and
# embedded in the fake ffx output of the "system info matches" test.
_TEST_PRODUCT = 'test_product'
_TEST_VERSION = 'test.version'
18
19
20# pylint: disable=too-many-public-methods,protected-access
class FlashDeviceTest(unittest.TestCase):
    """Unittests for flash_device.py."""

    def setUp(self) -> None:
        """Patch ffx, the SDK hash lookup, unattended detection and sleep.

        Each patcher's own stop() is registered with addCleanup(). The object
        returned by patcher.start() is the replacement mock, so calling
        `.stop` on it would only invoke an auto-created mock attribute and
        leave the patch active after the test finished.
        """
        ffx_result = mock.Mock()
        ffx_result.returncode = 0

        ffx_patcher = mock.patch('common.run_ffx_command',
                                 return_value=ffx_result)
        self._ffx_mock = ffx_patcher.start()
        self.addCleanup(ffx_patcher.stop)

        sdk_hash_patcher = mock.patch('flash_device.get_sdk_hash',
                                      return_value=(_TEST_PRODUCT,
                                                    _TEST_VERSION))
        self._sdk_hash_mock = sdk_hash_patcher.start()
        self.addCleanup(sdk_hash_patcher.stop)

        swarming_patcher = mock.patch('flash_device.running_unattended',
                                      return_value=False)
        self._swarming_mock = swarming_patcher.start()
        self.addCleanup(swarming_patcher.stop)

        time_sleep_patcher = mock.patch('time.sleep')
        self._time_sleep = time_sleep_patcher.start()
        self.addCleanup(time_sleep_patcher.stop)

    def test_update_required_on_ignore_returns_immediately(self) -> None:
        """Test |os_check|='ignore' skips all checks."""
        result, new_image_dir = flash_device.update_required(
            'ignore', 'some-image-dir', None)

        self.assertFalse(result)
        self.assertEqual(new_image_dir, 'some-image-dir')

    def test_update_required_raises_value_error_if_no_image_dir(self) -> None:
        """Test |os_check|!='ignore' checks that image dir is non-Falsey."""
        with self.assertRaises(ValueError):
            flash_device.update_required('update', None, None)

    def test_update_required_logs_missing_image_dir(self) -> None:
        """Test |os_check|!='ignore' warns if image dir does not exist."""
        with mock.patch('os.path.exists', return_value=False), \
                mock.patch('flash_device.find_image_in_sdk'), \
                mock.patch('flash_device._get_system_info'), \
                self.assertLogs() as logger:
            flash_device.update_required('update', 'some/image/dir', None)
            self.assertIn('image directory does not exist', logger.output[0])

    def test_update_required_searches_and_returns_sdk_if_image_found(self
                                                                     ) -> None:
        """Test |os_check|!='ignore' searches for image dir in SDK."""
        with mock.patch('os.path.exists', return_value=False), \
                mock.patch('flash_device.find_image_in_sdk') as mock_find, \
                mock.patch('flash_device._get_system_info'), \
                mock.patch('common.SDK_ROOT', 'path/to/sdk/dir'), \
                self.assertLogs():
            mock_find.return_value = 'path/to/image/dir'
            update_required, new_image_dir = flash_device.update_required(
                'update', 'product-bundle', None, None)
            self.assertTrue(update_required)
            self.assertEqual(new_image_dir, 'path/to/image/dir')
            mock_find.assert_called_once_with('product-bundle')

    def test_update_required_raises_file_not_found_error(self) -> None:
        """Test |os_check|!='ignore' raises FileNotFoundError if no path."""
        with mock.patch('os.path.exists', return_value=False), \
                mock.patch('flash_device.find_image_in_sdk',
                           return_value=None), \
                mock.patch('common.SDK_ROOT', 'path/to/sdk/dir'), \
                self.assertLogs(), \
                self.assertRaises(FileNotFoundError):
            flash_device.update_required('update', 'product-bundle', None)

    def test_update_ignore(self) -> None:
        """Test setting |os_check| to 'ignore'."""

        flash_device.update(_TEST_IMAGE_DIR, 'ignore', None)
        self.assertEqual(self._ffx_mock.call_count, 0)
        self.assertEqual(self._sdk_hash_mock.call_count, 0)

    def test_dir_unspecified_value_error(self) -> None:
        """Test ValueError raised when system_image_dir unspecified."""

        with self.assertRaises(ValueError):
            flash_device.update(None, 'check', None)

    def test_update_system_info_match(self) -> None:
        """Test no update when |os_check| is 'check' and system info matches."""

        with mock.patch('os.path.exists', return_value=True):
            # Fake ffx output reporting the same version/product that the
            # patched get_sdk_hash returns.
            self._ffx_mock.return_value.stdout = \
                '[{"title": "Build", "child": [{"value": "%s"}, ' \
                '{"value": "%s"}]}]' % (_TEST_VERSION, _TEST_PRODUCT)
            flash_device.update(_TEST_IMAGE_DIR, 'check', None)
            self.assertEqual(self._ffx_mock.call_count, 1)
            self.assertEqual(self._sdk_hash_mock.call_count, 1)

    def test_update_system_info_catches_boot_failure(self) -> None:
        """Test update when |os_check=check| catches boot_device exceptions."""

        self._swarming_mock.return_value = True
        with mock.patch('os.path.exists', return_value=True), \
                mock.patch('flash_device.boot_device') as mock_boot, \
                mock.patch('flash_device.get_system_info') as mock_sys_info, \
                mock.patch('flash_device.subprocess.run'):
            mock_boot.side_effect = boot_device.StateTransitionError(
                'Incorrect state')
            self._ffx_mock.return_value.stdout = \
                '[{"title": "Build", "child": [{"value": "wrong.version"}, ' \
                '{"value": "wrong_product"}]}]'
            flash_device.update(_TEST_IMAGE_DIR,
                                'check',
                                None,
                                should_pave=False)
            # Regular boot is to check the versions.
            mock_boot.assert_called_once_with(mock.ANY,
                                              boot_device.BootMode.REGULAR,
                                              None)
            self.assertEqual(self._ffx_mock.call_count, 1)

            # get_system_info should not even be called due to early exit.
            mock_sys_info.assert_not_called()

    def test_update_system_info_mismatch(self) -> None:
        """Test update when |os_check| is 'check' and system info does not
        match."""

        self._swarming_mock.return_value = True
        with mock.patch('os.path.exists', return_value=True), \
                mock.patch('flash_device.boot_device') as mock_boot, \
                mock.patch('flash_device.subprocess.run'):
            self._ffx_mock.return_value.stdout = \
                '[{"title": "Build", "child": [{"value": "wrong.version"}, ' \
                '{"value": "wrong_product"}]}]'
            flash_device.update(_TEST_IMAGE_DIR,
                                'check',
                                None,
                                should_pave=False)
            # Regular boot is to check the versions.
            mock_boot.assert_called_once_with(mock.ANY,
                                              boot_device.BootMode.REGULAR,
                                              None)
            self.assertEqual(self._ffx_mock.call_count, 2)

    def test_incorrect_target_info(self) -> None:
        """Test update when |os_check| is 'check' and system info was not
        retrieved."""
        with mock.patch('os.path.exists', return_value=True):
            self._ffx_mock.return_value.stdout = '[{"title": "badtitle"}]'
            flash_device.update(_TEST_IMAGE_DIR,
                                'check',
                                None,
                                should_pave=False)
            self.assertEqual(self._ffx_mock.call_count, 2)

    def test_update_with_serial_num(self) -> None:
        """Test update when |serial_num| is specified."""

        with mock.patch('time.sleep'), \
                mock.patch('os.path.exists', return_value=True), \
                mock.patch('flash_device.boot_device') as mock_boot:
            flash_device.update(_TEST_IMAGE_DIR,
                                'update',
                                None,
                                'test_serial',
                                should_pave=False)
            mock_boot.assert_called_once_with(mock.ANY,
                                              boot_device.BootMode.BOOTLOADER,
                                              'test_serial')
        self.assertEqual(self._ffx_mock.call_count, 2)

    def test_reboot_failure(self) -> None:
        """Test |update_required| returns True when ffx reports a non-zero
        return code."""
        self._ffx_mock.return_value.returncode = 1
        with mock.patch('time.sleep'), \
                mock.patch('os.path.exists', return_value=True), \
                mock.patch('flash_device.running_unattended',
                           return_value=True), \
                mock.patch('flash_device.boot_device'):
            required, _ = flash_device.update_required('check',
                                                       _TEST_IMAGE_DIR, None)
            self.assertEqual(required, True)

    # pylint: disable=no-self-use
    def test_update_calls_paving_if_specified(self) -> None:
        """Test update calls pave if specified."""
        with mock.patch('time.sleep'), \
                mock.patch('os.path.exists', return_value=True), \
                mock.patch('flash_device.running_unattended',
                           return_value=True), \
                mock.patch('flash_device.boot_device') as mock_boot, \
                mock.patch('flash_device.pave') as mock_pave:
            flash_device.update(_TEST_IMAGE_DIR,
                                'update',
                                'some-target-id',
                                should_pave=True)

            mock_boot.assert_called_once_with('some-target-id',
                                              boot_device.BootMode.RECOVERY,
                                              None)
            mock_pave.assert_called_once_with(_TEST_IMAGE_DIR,
                                              'some-target-id')

    # pylint: enable=no-self-use

    def test_update_raises_error_if_unattended_with_no_target(self) -> None:
        """Test update raises error if no target specified."""

        self._swarming_mock.return_value = True
        with mock.patch('time.sleep'), \
                mock.patch('flash_device.pave'), \
                mock.patch('os.path.exists', return_value=True), \
                self.assertRaises(AssertionError):
            flash_device.update(_TEST_IMAGE_DIR,
                                'update',
                                None,
                                should_pave=True)

    def test_update_on_swarming(self) -> None:
        """Test update on swarming bots."""

        self._swarming_mock.return_value = True
        with mock.patch('time.sleep'), \
                mock.patch('os.path.exists', return_value=True), \
                mock.patch('flash_device.boot_device') as mock_boot, \
                mock.patch('subprocess.run'):
            flash_device.update(_TEST_IMAGE_DIR,
                                'update',
                                None,
                                'test_serial',
                                should_pave=False)
            mock_boot.assert_called_once_with(mock.ANY,
                                              boot_device.BootMode.BOOTLOADER,
                                              'test_serial')
        self.assertEqual(self._ffx_mock.call_count, 2)

    # pylint: disable=no-self-use
    def test_update_with_pave_timeout_defaults_to_flash(self) -> None:
        """Test update falls back to flash if pave fails."""
        with mock.patch('time.sleep'), \
                mock.patch('os.path.exists', return_value=True), \
                mock.patch('flash_device.running_unattended',
                           return_value=True), \
                mock.patch('flash_device.pave') as mock_pave, \
                mock.patch('flash_device.boot_device'), \
                mock.patch('flash_device.flash') as mock_flash:
            # Simulate paving timing out so update() has to fall back.
            mock_pave.side_effect = subprocess.TimeoutExpired(
                cmd='/some/cmd',
                timeout=0,
            )
            flash_device.update(_TEST_IMAGE_DIR,
                                'update',
                                'some-target-id',
                                should_pave=True)
            mock_pave.assert_called_once_with(_TEST_IMAGE_DIR,
                                              'some-target-id')
            mock_flash.assert_called_once_with(_TEST_IMAGE_DIR,
                                               'some-target-id', None)

    # pylint: enable=no-self-use

    def test_main(self) -> None:
        """Tests |main| function."""

        argv = ['flash_device.py', '--os-check', 'ignore', '--no-pave']
        with mock.patch('sys.argv', argv), \
                mock.patch.dict(os.environ, {}):
            flash_device.main()
        self.assertEqual(self._ffx_mock.call_count, 0)
292# pylint: enable=too-many-public-methods,protected-access
293
294
# Run this module's unit tests when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
297