# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for OtaTools."""

import os
import shutil
import tempfile
import unittest

from unittest import mock

from acloud import errors
from acloud.internal.lib import ota_tools

# Misc info file handed to BuildSuperImage.
_INPUT_MISC_INFO = """
mkbootimg_args=
lpmake=lpmake
dynamic_partition_list= system vendor
vendor_image=/path/to/vendor.img
super_super_device_size=3229614080
"""

# Misc info expected in the rewritten file; %s is the path of the lpmake
# binary created in the temporary bin directory.
_EXPECTED_MISC_INFO = """
mkbootimg_args=
lpmake=%s
dynamic_partition_list= system vendor
super_super_device_size=3229614080
system_image=/path/to/system.img
vendor_image=/path/to/vendor.img
"""

# System qemu config handed to MkCombinedImg.
_INPUT_SYSTEM_QEMU_CONFIG = """
out/target/product/generic_x86_64/vbmeta.img vbmeta 1
out/target/product/generic_x86_64/super.img super 2
"""

# System qemu config expected in the rewritten file; the image paths match
# what _GetImage returns.
_EXPECTED_SYSTEM_QEMU_CONFIG = """
/path/to/vbmeta.img vbmeta 1
/path/to/super.img super 2
"""


def _GetImage(name):
    """Return the image path that appears in the expected output."""
    return "/path/to/" + name + ".img"


class CapturedFile:
    """Capture intermediate files created by OtaTools."""

    def __init__(self):
        # Path passed to the most recent Load call, or None before any call.
        self.path = None
        # Text read from that path; stays None if it was not a regular file.
        self.contents = None

    def Load(self, path):
        """Load file contents to this object.

        The path is always recorded. The contents are only read if the path
        refers to an existing regular file.

        Args:
            path: The path to the file to be captured.
        """
        self.path = path
        if not os.path.isfile(path):
            return
        with open(path, "r") as f:
            self.contents = f.read()


class OtaToolsTest(unittest.TestCase):
    """Test OtaTools methods."""

    def setUp(self):
        """Create a temporary OTA tools directory containing bin/."""
        self._temp_dir = tempfile.mkdtemp()
        os.mkdir(os.path.join(self._temp_dir, "bin"))
        self._ota = ota_tools.OtaTools(self._temp_dir)
        # Paths of intermediate files seen by the mocked Popen; removed in
        # tearDown in case the code under test leaves them behind.
        self._captured_files = []

    def tearDown(self):
        """Remove the captured files and the temporary directory."""
        try:
            for path in self._captured_files:
                if os.path.isfile(path):
                    os.remove(path)
        finally:
            # Always remove the temporary directory, even if deleting a
            # captured file fails.
            shutil.rmtree(self._temp_dir)

    @staticmethod
    def _CreateFile(path, contents):
        """Create and write to a file.

        Args:
            path: The path to the file. Missing parent directories are
                  created.
            contents: The string to be written to the file.
        """
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w") as f:
            f.write(contents)

    def _CreateBinary(self, name):
        """Create an empty file in bin directory.

        Args:
            name: The file name of the binary.

        Returns:
            The path to the created file.
        """
        path = os.path.join(self._temp_dir, "bin", name)
        self._CreateFile(path, "")
        return path

    @staticmethod
    def _MockPopen(return_value):
        """Create a mock Popen object.

        Args:
            return_value: The value of returncode and the result of poll().

        Returns:
            A mock object whose communicate() returns ("stdout", "stderr").
        """
        popen = mock.Mock()
        popen.communicate.return_value = ("stdout", "stderr")
        popen.returncode = return_value
        popen.poll.return_value = return_value
        return popen

    @staticmethod
    def _MockPopenTimeout():
        """Create a mock Popen object that raises timeout error.

        Returns:
            A mock object whose communicate() raises FunctionTimeoutError and
            whose returncode and poll() result are None.
        """
        popen = mock.Mock()
        popen.communicate.side_effect = errors.FunctionTimeoutError(
            "unit test")
        popen.returncode = None
        popen.poll.return_value = None
        return popen

    def _CallAndCheckError(self, expected_error, func, *args):
        """Call a function and check whether it raises the expected error.

        Args:
            expected_error: The error type that func should raise, or None
                            if func should return normally.
            func: The callable to be tested.
            *args: The positional arguments passed to func.
        """
        if expected_error:
            with self.assertRaises(expected_error):
                func(*args)
        else:
            func(*args)

    # pylint: disable=protected-access
    def testFindOtaTools(self):
        """Test FindOtaToolsDir and FindOtaTools."""
        # CVD host package contains lpmake but not all tools.
        self._CreateBinary("lpmake")
        with self.assertRaises(errors.CheckPathError):
            ota_tools.FindOtaToolsDir([self._temp_dir])

        # The function identifies OTA tool directory by build_super_image.
        self._CreateBinary("build_super_image")
        self.assertEqual(ota_tools.FindOtaToolsDir([self._temp_dir]),
                         self._temp_dir)
        self.assertEqual(
            ota_tools.FindOtaTools([self._temp_dir])._ota_tools_dir,
            self._temp_dir)

    def testGetImageForPartition(self):
        """Test GetImageForPartition."""
        image_dir = os.path.join(self._temp_dir, "images")
        vendor_path = os.path.join(image_dir, "vendor.img")
        override_system_path = os.path.join(self._temp_dir, "system.img")
        self._CreateFile(vendor_path, "")
        self._CreateFile(os.path.join(image_dir, "system.img"), "")
        self._CreateFile(override_system_path, "")

        # The keyword argument overrides the image in image_dir.
        returned_path = ota_tools.GetImageForPartition(
            "system", image_dir, system=override_system_path)
        self.assertEqual(returned_path, override_system_path)

        # Partitions without an override are looked up in image_dir.
        returned_path = ota_tools.GetImageForPartition(
            "vendor", image_dir, system=override_system_path)
        self.assertEqual(returned_path, vendor_path)

        with self.assertRaises(errors.GetLocalImageError):
            ota_tools.GetImageForPartition("not_exist", image_dir)

        with self.assertRaises(errors.GetLocalImageError):
            ota_tools.GetImageForPartition(
                "system", image_dir,
                system=os.path.join(self._temp_dir, "not_exist"))

    def _TestBuildSuperImage(self, mock_popen, mock_popen_object,
                             expected_error=None):
        """Test BuildSuperImage.

        Args:
            mock_popen: Mock class of subprocess.Popen.
            mock_popen_object: Mock object of subprocess.Popen.
            expected_error: The error type that BuildSuperImage should raise.
        """
        build_super_image = self._CreateBinary("build_super_image")
        lpmake = self._CreateBinary("lpmake")

        misc_info_path = os.path.join(self._temp_dir, "misc_info.txt")
        self._CreateFile(misc_info_path, _INPUT_MISC_INFO)

        rewritten_misc_info = CapturedFile()

        def _CaptureMiscInfo(cmd, **_kwargs):
            # The rewritten misc info is read while the command "runs"
            # because it is expected to be gone after BuildSuperImage
            # returns.
            self._captured_files.append(cmd[1])
            rewritten_misc_info.Load(cmd[1])
            return mock_popen_object

        mock_popen.side_effect = _CaptureMiscInfo

        self._CallAndCheckError(
            expected_error, self._ota.BuildSuperImage,
            "/unit/test", misc_info_path, _GetImage)

        expected_cmd = (
            build_super_image,
            rewritten_misc_info.path,
            "/unit/test",
        )

        mock_popen.assert_called_once()
        self.assertEqual(mock_popen.call_args[0][0], expected_cmd)
        self.assertEqual(rewritten_misc_info.contents,
                         _EXPECTED_MISC_INFO % lpmake)
        # The intermediate file must be deleted on success and on error.
        self.assertFalse(os.path.exists(rewritten_misc_info.path))

    @mock.patch("acloud.internal.lib.utils.subprocess.Popen")
    def testBuildSuperImageSuccess(self, mock_popen):
        """Test BuildSuperImage."""
        self._TestBuildSuperImage(mock_popen, self._MockPopen(return_value=0))

    @mock.patch("acloud.internal.lib.utils.subprocess.Popen")
    def testBuildSuperImageTimeout(self, mock_popen):
        """Test BuildSuperImage with command timeout."""
        self._TestBuildSuperImage(mock_popen, self._MockPopenTimeout(),
                                  errors.FunctionTimeoutError)

    @mock.patch("acloud.internal.lib.utils.subprocess.Popen")
    def testMakeDisabledVbmetaImageSuccess(self, mock_popen):
        """Test MakeDisabledVbmetaImage."""
        avbtool = self._CreateBinary("avbtool")

        mock_popen.return_value = self._MockPopen(return_value=0)

        # PYTHONPATH is the only variable in the patched environment, so the
        # empty env asserted below means it is not passed to the command.
        with mock.patch.dict("acloud.internal.lib.utils.os.environ",
                             {"PYTHONPATH": "/unit/test"}, clear=True):
            self._ota.MakeDisabledVbmetaImage("/unit/test")

        expected_cmd = (
            avbtool, "make_vbmeta_image",
            "--flag", "2",
            "--padding_size", "4096",
            "--output", "/unit/test",
        )

        mock_popen.assert_called_once()
        self.assertEqual(mock_popen.call_args[0][0], expected_cmd)
        self.assertFalse(mock_popen.call_args[1]["env"])

    def _TestMkCombinedImg(self, mock_popen, mock_popen_object,
                           expected_error=None):
        """Test MkCombinedImg.

        Args:
            mock_popen: Mock class of subprocess.Popen.
            mock_popen_object: Mock object of subprocess.Popen.
            expected_error: The error type that MkCombinedImg should raise.
        """
        mk_combined_img = self._CreateBinary("mk_combined_img")
        sgdisk = self._CreateBinary("sgdisk")
        simg2img = self._CreateBinary("simg2img")

        config_path = os.path.join(self._temp_dir, "misc_info.txt")
        self._CreateFile(config_path, _INPUT_SYSTEM_QEMU_CONFIG)

        rewritten_config = CapturedFile()

        def _CaptureSystemQemuConfig(cmd, **_kwargs):
            # The rewritten config is read while the command "runs" because
            # it is expected to be gone after MkCombinedImg returns.
            self._captured_files.append(cmd[2])
            rewritten_config.Load(cmd[2])
            return mock_popen_object

        mock_popen.side_effect = _CaptureSystemQemuConfig

        self._CallAndCheckError(
            expected_error, self._ota.MkCombinedImg,
            "/unit/test", config_path, _GetImage)

        expected_cmd = (
            mk_combined_img,
            "-i", rewritten_config.path,
            "-o", "/unit/test",
        )

        expected_env = {"SGDISK": sgdisk, "SIMG2IMG": simg2img}

        mock_popen.assert_called_once()
        self.assertEqual(mock_popen.call_args[0][0], expected_cmd)
        self.assertEqual(mock_popen.call_args[1].get("env"), expected_env)
        self.assertEqual(rewritten_config.contents,
                         _EXPECTED_SYSTEM_QEMU_CONFIG)
        # The intermediate file must be deleted on success and on error.
        self.assertFalse(os.path.exists(rewritten_config.path))

    @mock.patch("acloud.internal.lib.utils.subprocess.Popen")
    def testMkCombinedImgSuccess(self, mock_popen):
        """Test MkCombinedImg."""
        self._TestMkCombinedImg(mock_popen, self._MockPopen(return_value=0))

    @mock.patch("acloud.internal.lib.utils.subprocess.Popen")
    def testMkCombinedImgFailure(self, mock_popen):
        """Test MkCombinedImg with command failure."""
        self._TestMkCombinedImg(mock_popen, self._MockPopen(return_value=1),
                                errors.SubprocessFail)


if __name__ == "__main__":
    unittest.main()