1# Copyright 2019 - The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Tests for OtaTools.""" 15 16import os 17import shutil 18import tempfile 19import unittest 20 21from unittest import mock 22 23from acloud import errors 24from acloud.internal.lib import ota_tools 25 26_INPUT_MISC_INFO = """ 27mkbootimg_args= 28lpmake=lpmake 29dynamic_partition_list= system vendor 30vendor_image=/path/to/vendor.img 31super_super_device_size=3229614080 32""" 33 34_EXPECTED_MISC_INFO = """ 35mkbootimg_args= 36lpmake=%s 37dynamic_partition_list= system vendor 38super_super_device_size=3229614080 39system_image=/path/to/system.img 40vendor_image=/path/to/vendor.img 41""" 42 43_INPUT_SYSTEM_QEMU_CONFIG = """ 44out/target/product/generic_x86_64/vbmeta.img vbmeta 1 45out/target/product/generic_x86_64/super.img super 2 46""" 47 48_EXPECTED_SYSTEM_QEMU_CONFIG = """ 49/path/to/vbmeta.img vbmeta 1 50/path/to/super.img super 2 51""" 52 53 54def _GetImage(name): 55 """Return the image path that appears in the expected output.""" 56 return "/path/to/" + name + ".img" 57 58 59class CapturedFile: 60 """Capture intermediate files created by OtaTools.""" 61 62 def __init__(self): 63 self.path = None 64 self.contents = None 65 66 def Load(self, path): 67 """Load file contents to this object.""" 68 self.path = path 69 if not os.path.isfile(path): 70 return 71 with open(path, "r") as f: 72 self.contents = f.read() 73 74 75class OtaToolsTest(unittest.TestCase): 76 """Test OtaToolsTest methods.""" 77 78 def setUp(self): 79 self._temp_dir = tempfile.mkdtemp() 80 os.mkdir(os.path.join(self._temp_dir, "bin")) 81 self._ota = ota_tools.OtaTools(self._temp_dir) 82 self._captured_files = [] 83 84 def tearDown(self): 85 shutil.rmtree(self._temp_dir) 86 for path in self._captured_files: 87 if os.path.isfile(path): 88 os.remove(path) 89 90 @staticmethod 91 def _CreateFile(path, contents): 92 """Create and write to a file.""" 93 parent_dir = os.path.dirname(path) 94 if not os.path.exists(parent_dir): 95 os.makedirs(parent_dir) 96 with open(path, "w") as f: 97 f.write(contents) 98 99 def _CreateBinary(self, name): 100 """Create an empty file in bin directory.""" 101 path = os.path.join(self._temp_dir, "bin", name) 102 self._CreateFile(path, "") 103 return path 104 105 @staticmethod 106 def _MockPopen(return_value): 107 """Create a mock Popen object.""" 108 popen = mock.Mock() 109 popen.communicate.return_value = ("stdout", "stderr") 110 popen.returncode = return_value 111 popen.poll.return_value = return_value 112 return popen 113 114 @staticmethod 115 def _MockPopenTimeout(): 116 """Create a mock Popen object that raises timeout error.""" 117 popen = mock.Mock() 118 popen.communicate.side_effect = errors.FunctionTimeoutError( 119 "unit test") 120 popen.returncode = None 121 popen.poll.return_value = None 122 return popen 123 124 def testFindOtaTools(self): 125 """Test FindOtaTools.""" 126 # CVD host package contains lpmake but not all tools. 127 self._CreateBinary("lpmake") 128 with mock.patch.dict("acloud.internal.lib.ota_tools.os.environ", 129 {"ANDROID_HOST_OUT": self._temp_dir, 130 "ANDROID_SOONG_HOST_OUT": self._temp_dir}, clear=True): 131 with self.assertRaises(errors.CheckPathError): 132 ota_tools.FindOtaTools([self._temp_dir]) 133 134 # The function identifies OTA tool directory by build_super_image. 135 self._CreateBinary("build_super_image") 136 with mock.patch.dict("acloud.internal.lib.ota_tools.os.environ", 137 dict(), clear=True): 138 self.assertEqual(ota_tools.FindOtaTools([self._temp_dir]), 139 self._temp_dir) 140 141 # ANDROID_HOST_OUT contains OTA tools in build environment. 142 with mock.patch.dict("acloud.internal.lib.ota_tools.os.environ", 143 {"ANDROID_HOST_OUT": self._temp_dir, 144 "ANDROID_SOONG_HOST_OUT": self._temp_dir}, clear=True): 145 self.assertEqual(ota_tools.FindOtaTools([]), self._temp_dir) 146 147 def testGetImageForPartition(self): 148 """Test GetImageForPartition.""" 149 image_dir = os.path.join(self._temp_dir, "images") 150 vendor_path = os.path.join(image_dir, "vendor.img") 151 override_system_path = os.path.join(self._temp_dir, "system.img") 152 self._CreateFile(vendor_path, "") 153 self._CreateFile(os.path.join(image_dir, "system.img"), "") 154 self._CreateFile(override_system_path, "") 155 156 returned_path = ota_tools.GetImageForPartition( 157 "system", image_dir, system=override_system_path) 158 self.assertEqual(returned_path, override_system_path) 159 160 returned_path = ota_tools.GetImageForPartition( 161 "vendor", image_dir, system=override_system_path) 162 self.assertEqual(returned_path, vendor_path) 163 164 with self.assertRaises(errors.GetLocalImageError): 165 ota_tools.GetImageForPartition("not_exist", image_dir) 166 167 with self.assertRaises(errors.GetLocalImageError): 168 ota_tools.GetImageForPartition( 169 "system", image_dir, 170 system=os.path.join(self._temp_dir, "not_exist")) 171 172 # pylint: disable=broad-except 173 def _TestBuildSuperImage(self, mock_popen, mock_popen_object, 174 expected_error=None): 175 """Test BuildSuperImage. 176 177 Args: 178 mock_popen: Mock class of subprocess.Popen. 179 popen_return_value: Mock object of subprocess.Popen. 180 expected_error: The error type that BuildSuperImage should raise. 181 """ 182 build_super_image = self._CreateBinary("build_super_image") 183 lpmake = self._CreateBinary("lpmake") 184 185 misc_info_path = os.path.join(self._temp_dir, "misc_info.txt") 186 self._CreateFile(misc_info_path, _INPUT_MISC_INFO) 187 188 rewritten_misc_info = CapturedFile() 189 190 def _CaptureMiscInfo(cmd, **_kwargs): 191 self._captured_files.append(cmd[1]) 192 rewritten_misc_info.Load(cmd[1]) 193 return mock_popen_object 194 195 mock_popen.side_effect = _CaptureMiscInfo 196 197 try: 198 self._ota.BuildSuperImage("/unit/test", misc_info_path, _GetImage) 199 if expected_error: 200 self.fail(expected_error.__name__ + " is not raised.") 201 except Exception as e: 202 if not expected_error or not isinstance(e, expected_error): 203 raise 204 205 expected_cmd = ( 206 build_super_image, 207 rewritten_misc_info.path, 208 "/unit/test", 209 ) 210 211 mock_popen.assert_called_once() 212 self.assertEqual(mock_popen.call_args[0][0], expected_cmd) 213 self.assertEqual(rewritten_misc_info.contents, 214 _EXPECTED_MISC_INFO % lpmake) 215 self.assertFalse(os.path.exists(rewritten_misc_info.path)) 216 217 @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen") 218 def testBuildSuperImageSuccess(self, mock_popen): 219 """Test BuildSuperImage.""" 220 self._TestBuildSuperImage(mock_popen, self._MockPopen(return_value=0)) 221 222 @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen") 223 def testBuildSuperImageTimeout(self, mock_popen): 224 """Test BuildSuperImage with command timeout.""" 225 self._TestBuildSuperImage(mock_popen, self._MockPopenTimeout(), 226 errors.FunctionTimeoutError) 227 228 @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen") 229 def testMakeDisabledVbmetaImageSuccess(self, mock_popen): 230 """Test MakeDisabledVbmetaImage.""" 231 avbtool = self._CreateBinary("avbtool") 232 233 mock_popen.return_value = self._MockPopen(return_value=0) 234 235 with mock.patch.dict("acloud.internal.lib.ota_tools.os.environ", 236 {"PYTHONPATH": "/unit/test"}, clear=True): 237 self._ota.MakeDisabledVbmetaImage("/unit/test") 238 239 expected_cmd = ( 240 avbtool, "make_vbmeta_image", 241 "--flag", "2", 242 "--padding_size", "4096", 243 "--output", "/unit/test", 244 ) 245 246 mock_popen.assert_called_once() 247 self.assertEqual(mock_popen.call_args[0][0], expected_cmd) 248 self.assertFalse(mock_popen.call_args[1]["env"]) 249 250 # pylint: disable=broad-except 251 def _TestMkCombinedImg(self, mock_popen, mock_popen_object, 252 expected_error=None): 253 """Test MkCombinedImg. 254 255 Args: 256 mock_popen: Mock class of subprocess.Popen. 257 mock_popen_object: Mock object of subprocess.Popen. 258 expected_error: The error type that MkCombinedImg should raise. 259 """ 260 mk_combined_img = self._CreateBinary("mk_combined_img") 261 sgdisk = self._CreateBinary("sgdisk") 262 simg2img = self._CreateBinary("simg2img") 263 264 config_path = os.path.join(self._temp_dir, "misc_info.txt") 265 self._CreateFile(config_path, _INPUT_SYSTEM_QEMU_CONFIG) 266 267 rewritten_config = CapturedFile() 268 269 def _CaptureSystemQemuConfig(cmd, **_kwargs): 270 self._captured_files.append(cmd[2]) 271 rewritten_config.Load(cmd[2]) 272 return mock_popen_object 273 274 mock_popen.side_effect = _CaptureSystemQemuConfig 275 276 try: 277 self._ota.MkCombinedImg("/unit/test", config_path, _GetImage) 278 if expected_error: 279 self.fail(expected_error.__name__ + " is not raised.") 280 except Exception as e: 281 if not expected_error or not isinstance(e, expected_error): 282 raise 283 284 expected_cmd = ( 285 mk_combined_img, 286 "-i", rewritten_config.path, 287 "-o", "/unit/test", 288 ) 289 290 expected_env = {"SGDISK": sgdisk, "SIMG2IMG": simg2img} 291 292 mock_popen.assert_called_once() 293 self.assertEqual(mock_popen.call_args[0][0], expected_cmd) 294 self.assertEqual(mock_popen.call_args[1].get("env"), expected_env) 295 self.assertEqual(rewritten_config.contents, 296 _EXPECTED_SYSTEM_QEMU_CONFIG) 297 self.assertFalse(os.path.exists(rewritten_config.path)) 298 299 @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen") 300 def testMkCombinedImgSuccess(self, mock_popen): 301 """Test MkCombinedImg.""" 302 return self._TestMkCombinedImg(mock_popen, 303 self._MockPopen(return_value=0)) 304 305 @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen") 306 def testMkCombinedImgFailure(self, mock_popen): 307 """Test MkCombinedImg with command failure.""" 308 return self._TestMkCombinedImg(mock_popen, 309 self._MockPopen(return_value=1), 310 errors.SubprocessFail) 311 312 313if __name__ == "__main__": 314 unittest.main() 315