• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2019 - The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests for OtaTools."""
15
16import os
17import shutil
18import tempfile
19import unittest
20
21from unittest import mock
22
23from acloud import errors
24from acloud.internal.lib import ota_tools
25
# Contents of the misc_info.txt that the BuildSuperImage test writes to disk
# and passes to OtaTools.BuildSuperImage.
_INPUT_MISC_INFO = """
mkbootimg_args=
lpmake=lpmake
dynamic_partition_list= system vendor
vendor_image=/path/to/vendor.img
super_super_device_size=3229614080
"""

# Contents the test expects in the misc info file that is handed to the
# build_super_image command. The %s placeholder is filled with the path of
# the lpmake placeholder binary created by the test.
_EXPECTED_MISC_INFO = """
mkbootimg_args=
lpmake=%s
dynamic_partition_list= system vendor
super_super_device_size=3229614080
system_image=/path/to/system.img
vendor_image=/path/to/vendor.img
"""

# Contents of the config file that the MkCombinedImg test writes to disk and
# passes to OtaTools.MkCombinedImg.
_INPUT_SYSTEM_QEMU_CONFIG = """
out/target/product/generic_x86_64/vbmeta.img vbmeta 1
out/target/product/generic_x86_64/super.img super 2
"""

# Contents the test expects in the config file that is handed to the
# mk_combined_img command; the image paths match what _GetImage returns.
_EXPECTED_SYSTEM_QEMU_CONFIG = """
/path/to/vbmeta.img vbmeta 1
/path/to/super.img super 2
"""
52
53
54def _GetImage(name):
55    """Return the image path that appears in the expected output."""
56    return "/path/to/" + name + ".img"
57
58
class CapturedFile:
    """Capture intermediate files created by OtaTools.

    Attributes:
        path: The path given to the most recent Load call, or None if Load
              has not been called.
        contents: The text read by the most recent Load call, or None if
                  that path was not a regular file.
    """

    def __init__(self):
        self.path = None
        self.contents = None

    def Load(self, path):
        """Load file contents to this object.

        Args:
            path: The path to the file to read. If it is not an existing
                  regular file, only the path is recorded and contents is
                  set to None.
        """
        self.path = path
        # Reset first so that a failed load never pairs the new path with
        # the text of a previously loaded file.
        self.contents = None
        if not os.path.isfile(path):
            return
        with open(path, "r") as f:
            self.contents = f.read()
73
74
class OtaToolsTest(unittest.TestCase):
    """Test OtaTools methods.

    Each test works in a temporary OTA tools directory populated with empty
    placeholder binaries. Tests that exercise a tool replace subprocess.Popen
    with a mock, so the placeholders are never executed.
    """

    def setUp(self):
        """Create the temporary tools directory and the OtaTools under test."""
        self._temp_dir = tempfile.mkdtemp()
        # Registered immediately so the directory is removed even when the
        # rest of setUp fails, in which case tearDown is not called.
        self.addCleanup(shutil.rmtree, self._temp_dir)
        os.mkdir(os.path.join(self._temp_dir, "bin"))
        self._ota = ota_tools.OtaTools(self._temp_dir)
        # Paths of the files passed to the mocked Popen. tearDown deletes
        # the ones that still exist.
        self._captured_files = []

    def tearDown(self):
        """Delete captured intermediate files that still exist."""
        for path in self._captured_files:
            if os.path.isfile(path):
                os.remove(path)

    @staticmethod
    def _CreateFile(path, contents):
        """Create and write to a file.

        Args:
            path: The path to the file. Missing parent directories are
                  created.
            contents: The string to write.
        """
        parent_dir = os.path.dirname(path)
        # dirname is empty for a bare file name; makedirs rejects "".
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(path, "w") as f:
            f.write(contents)

    def _CreateBinary(self, name):
        """Create an empty file in bin directory.

        Args:
            name: The file name of the placeholder binary.

        Returns:
            The path to the created file.
        """
        path = os.path.join(self._temp_dir, "bin", name)
        self._CreateFile(path, "")
        return path

    @staticmethod
    def _MockPopen(return_value):
        """Create a mock Popen object.

        Args:
            return_value: The exit code reported by returncode and poll().

        Returns:
            A mock whose communicate() returns ("stdout", "stderr").
        """
        popen = mock.Mock()
        popen.communicate.return_value = ("stdout", "stderr")
        popen.returncode = return_value
        popen.poll.return_value = return_value
        return popen

    @staticmethod
    def _MockPopenTimeout():
        """Create a mock Popen object that raises timeout error.

        Returns:
            A mock whose communicate() raises errors.FunctionTimeoutError and
            whose returncode and poll() are None.
        """
        popen = mock.Mock()
        popen.communicate.side_effect = errors.FunctionTimeoutError(
            "unit test")
        popen.returncode = None
        popen.poll.return_value = None
        return popen

    def _CallExpecting(self, expected_error, func, *args):
        """Call func and check whether it raises the expected error.

        Args:
            expected_error: The error type that func should raise, or None
                            if func should return normally.
            func: The callable to invoke.
            *args: The positional arguments passed to func.
        """
        if expected_error is None:
            func(*args)
            return
        with self.assertRaises(expected_error):
            func(*args)

    def testFindOtaTools(self):
        """Test FindOtaTools."""
        build_env = {"ANDROID_HOST_OUT": self._temp_dir,
                     "ANDROID_SOONG_HOST_OUT": self._temp_dir}

        # CVD host package contains lpmake but not all tools.
        self._CreateBinary("lpmake")
        with mock.patch.dict("acloud.internal.lib.ota_tools.os.environ",
                             build_env, clear=True):
            with self.assertRaises(errors.CheckPathError):
                ota_tools.FindOtaTools([self._temp_dir])

        # The function identifies OTA tool directory by build_super_image.
        self._CreateBinary("build_super_image")
        with mock.patch.dict("acloud.internal.lib.ota_tools.os.environ",
                             {}, clear=True):
            self.assertEqual(ota_tools.FindOtaTools([self._temp_dir]),
                             self._temp_dir)

        # ANDROID_HOST_OUT contains OTA tools in build environment.
        with mock.patch.dict("acloud.internal.lib.ota_tools.os.environ",
                             build_env, clear=True):
            self.assertEqual(ota_tools.FindOtaTools([]), self._temp_dir)

    def testGetImageForPartition(self):
        """Test GetImageForPartition."""
        image_dir = os.path.join(self._temp_dir, "images")
        vendor_path = os.path.join(image_dir, "vendor.img")
        override_system_path = os.path.join(self._temp_dir, "system.img")
        self._CreateFile(vendor_path, "")
        self._CreateFile(os.path.join(image_dir, "system.img"), "")
        self._CreateFile(override_system_path, "")

        # The keyword argument overrides the image in image_dir.
        returned_path = ota_tools.GetImageForPartition(
            "system", image_dir, system=override_system_path)
        self.assertEqual(returned_path, override_system_path)

        # Partitions without an override are found in image_dir.
        returned_path = ota_tools.GetImageForPartition(
            "vendor", image_dir, system=override_system_path)
        self.assertEqual(returned_path, vendor_path)

        with self.assertRaises(errors.GetLocalImageError):
            ota_tools.GetImageForPartition("not_exist", image_dir)

        with self.assertRaises(errors.GetLocalImageError):
            ota_tools.GetImageForPartition(
                "system", image_dir,
                system=os.path.join(self._temp_dir, "not_exist"))

    def _TestBuildSuperImage(self, mock_popen, mock_popen_object,
                             expected_error=None):
        """Test BuildSuperImage.

        Args:
            mock_popen: Mock class of subprocess.Popen.
            mock_popen_object: Mock object of subprocess.Popen.
            expected_error: The error type that BuildSuperImage should raise,
                            or None if it should succeed.
        """
        build_super_image = self._CreateBinary("build_super_image")
        lpmake = self._CreateBinary("lpmake")

        misc_info_path = os.path.join(self._temp_dir, "misc_info.txt")
        self._CreateFile(misc_info_path, _INPUT_MISC_INFO)

        rewritten_misc_info = CapturedFile()

        def _CaptureMiscInfo(cmd, **_kwargs):
            # cmd[1] is the misc info file in the command line. It is read
            # here because the test expects it to be gone after the call.
            self._captured_files.append(cmd[1])
            rewritten_misc_info.Load(cmd[1])
            return mock_popen_object

        mock_popen.side_effect = _CaptureMiscInfo

        self._CallExpecting(expected_error, self._ota.BuildSuperImage,
                            "/unit/test", misc_info_path, _GetImage)

        expected_cmd = (
            build_super_image,
            rewritten_misc_info.path,
            "/unit/test",
        )

        mock_popen.assert_called_once()
        self.assertEqual(mock_popen.call_args[0][0], expected_cmd)
        self.assertEqual(rewritten_misc_info.contents,
                         _EXPECTED_MISC_INFO % lpmake)
        self.assertFalse(os.path.exists(rewritten_misc_info.path))

    @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen")
    def testBuildSuperImageSuccess(self, mock_popen):
        """Test BuildSuperImage."""
        self._TestBuildSuperImage(mock_popen, self._MockPopen(return_value=0))

    @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen")
    def testBuildSuperImageTimeout(self, mock_popen):
        """Test BuildSuperImage with command timeout."""
        self._TestBuildSuperImage(mock_popen, self._MockPopenTimeout(),
                                  errors.FunctionTimeoutError)

    @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen")
    def testMakeDisabledVbmetaImageSuccess(self, mock_popen):
        """Test MakeDisabledVbmetaImage."""
        avbtool = self._CreateBinary("avbtool")

        mock_popen.return_value = self._MockPopen(return_value=0)

        with mock.patch.dict("acloud.internal.lib.ota_tools.os.environ",
                             {"PYTHONPATH": "/unit/test"}, clear=True):
            self._ota.MakeDisabledVbmetaImage("/unit/test")

        expected_cmd = (
            avbtool, "make_vbmeta_image",
            "--flag", "2",
            "--padding_size", "4096",
            "--output", "/unit/test",
        )

        mock_popen.assert_called_once()
        self.assertEqual(mock_popen.call_args[0][0], expected_cmd)
        # The env given to Popen must be empty even though PYTHONPATH is set
        # in the patched os.environ.
        self.assertFalse(mock_popen.call_args[1]["env"])

    def _TestMkCombinedImg(self, mock_popen, mock_popen_object,
                           expected_error=None):
        """Test MkCombinedImg.

        Args:
            mock_popen: Mock class of subprocess.Popen.
            mock_popen_object: Mock object of subprocess.Popen.
            expected_error: The error type that MkCombinedImg should raise,
                            or None if it should succeed.
        """
        mk_combined_img = self._CreateBinary("mk_combined_img")
        sgdisk = self._CreateBinary("sgdisk")
        simg2img = self._CreateBinary("simg2img")

        config_path = os.path.join(self._temp_dir, "misc_info.txt")
        self._CreateFile(config_path, _INPUT_SYSTEM_QEMU_CONFIG)

        rewritten_config = CapturedFile()

        def _CaptureSystemQemuConfig(cmd, **_kwargs):
            # cmd[2] is the config file following "-i". It is read here
            # because the test expects it to be gone after the call.
            self._captured_files.append(cmd[2])
            rewritten_config.Load(cmd[2])
            return mock_popen_object

        mock_popen.side_effect = _CaptureSystemQemuConfig

        self._CallExpecting(expected_error, self._ota.MkCombinedImg,
                            "/unit/test", config_path, _GetImage)

        expected_cmd = (
            mk_combined_img,
            "-i", rewritten_config.path,
            "-o", "/unit/test",
        )

        expected_env = {"SGDISK": sgdisk, "SIMG2IMG": simg2img}

        mock_popen.assert_called_once()
        self.assertEqual(mock_popen.call_args[0][0], expected_cmd)
        self.assertEqual(mock_popen.call_args[1].get("env"), expected_env)
        self.assertEqual(rewritten_config.contents,
                         _EXPECTED_SYSTEM_QEMU_CONFIG)
        self.assertFalse(os.path.exists(rewritten_config.path))

    @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen")
    def testMkCombinedImgSuccess(self, mock_popen):
        """Test MkCombinedImg."""
        self._TestMkCombinedImg(mock_popen, self._MockPopen(return_value=0))

    @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen")
    def testMkCombinedImgFailure(self, mock_popen):
        """Test MkCombinedImg with command failure."""
        self._TestMkCombinedImg(mock_popen, self._MockPopen(return_value=1),
                                errors.SubprocessFail)
311
312
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
315