1# Copyright 2018 - The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Tests for create_common.""" 15 16import collections 17import os 18import shutil 19import tempfile 20import unittest 21 22from unittest import mock 23 24from acloud import errors 25from acloud.create import create_common 26from acloud.internal.lib import android_build_client 27from acloud.internal.lib import auth 28from acloud.internal.lib import driver_test_lib 29from acloud.internal.lib import utils 30 31 32ExtraFile = collections.namedtuple("ExtraFile", ["source", "target"]) 33 34 35class FakeZipFile: 36 """Fake implementation of ZipFile()""" 37 38 # pylint: disable=invalid-name,unused-argument,no-self-use 39 def write(self, filename, arcname=None, compress_type=None): 40 """Fake write method.""" 41 return 42 43 # pylint: disable=invalid-name,no-self-use 44 def close(self): 45 """Fake close method.""" 46 return 47 48 49# pylint: disable=invalid-name,protected-access 50class CreateCommonTest(driver_test_lib.BaseDriverTest): 51 """Test create_common functions.""" 52 53 # pylint: disable=protected-access 54 def testProcessHWPropertyWithInvalidArgs(self): 55 """Test ParseKeyValuePairArgs with invalid args.""" 56 # Checking wrong property value. 57 args_str = "cpu:3,disk:" 58 with self.assertRaises(errors.MalformedDictStringError): 59 create_common.ParseKeyValuePairArgs(args_str) 60 61 # Checking wrong property format. 62 args_str = "cpu:3,disk" 63 with self.assertRaises(errors.MalformedDictStringError): 64 create_common.ParseKeyValuePairArgs(args_str) 65 66 def testParseHWPropertyStr(self): 67 """Test ParseKeyValuePairArgs.""" 68 expected_dict = {"cpu": "2", "resolution": "1080x1920", "dpi": "240", 69 "memory": "4g", "disk": "4g"} 70 args_str = "cpu:2,resolution:1080x1920,dpi:240,memory:4g,disk:4g" 71 result_dict = create_common.ParseKeyValuePairArgs(args_str) 72 self.assertTrue(expected_dict == result_dict) 73 74 def testGetNonEmptyEnvVars(self): 75 """Test GetNonEmptyEnvVars.""" 76 with mock.patch.dict("acloud.internal.lib.utils.os.environ", 77 {"A": "", "B": "b"}, 78 clear=True): 79 self.assertEqual( 80 ["b"], create_common.GetNonEmptyEnvVars("A", "B", "C")) 81 82 def testParseExtraFilesArgs(self): 83 """Test ParseExtraFilesArgs.""" 84 expected_result = [ExtraFile(source="local_path", target="gce_path")] 85 files_info = ["local_path,gce_path"] 86 self.assertEqual(expected_result, 87 create_common.ParseExtraFilesArgs(files_info)) 88 89 # Test multiple files 90 expected_result = [ExtraFile(source="local_path1", target="gce_path1"), 91 ExtraFile(source="local_path2", target="gce_path2")] 92 files_info = ["local_path1,gce_path1", 93 "local_path2,gce_path2"] 94 self.assertEqual(expected_result, 95 create_common.ParseExtraFilesArgs(files_info)) 96 97 # Test wrong file info format. 98 files_info = ["local_path"] 99 with self.assertRaises(errors.MalformedDictStringError): 100 create_common.ParseExtraFilesArgs(files_info) 101 102 def testGetCvdHostPackage(self): 103 """test GetCvdHostPackage.""" 104 # Can't find the cvd host package 105 with mock.patch("os.path.exists") as exists: 106 exists.return_value = False 107 self.assertRaises( 108 errors.GetCvdLocalHostPackageError, 109 create_common.GetCvdHostPackage) 110 111 self.Patch(os.environ, "get", return_value="/fake_dir2") 112 self.Patch(utils, "GetDistDir", return_value="/fake_dir1") 113 self.Patch(os.path, "exists", 114 side_effect=lambda path: 115 path == "/fake_dir1/cvd-host_package.tar.gz") 116 117 # Find cvd host in dist dir. 118 self.assertEqual( 119 create_common.GetCvdHostPackage(), 120 "/fake_dir1/cvd-host_package.tar.gz") 121 122 # Find cvd host in host out dir. 123 self.Patch(os.environ, "get", return_value="/fake_dir2") 124 self.Patch(utils, "GetDistDir", return_value=None) 125 with mock.patch("os.path.exists") as exists: 126 exists.side_effect = lambda path: \ 127 path == "/fake_dir2/cvd-host_package.tar.gz" 128 self.assertEqual( 129 create_common.GetCvdHostPackage(), 130 "/fake_dir2/cvd-host_package.tar.gz") 131 with mock.patch("os.path.exists") as exists: 132 exists.side_effect = lambda path: \ 133 path == "/fake_dir2/cvd-host_package" 134 self.assertEqual( 135 create_common.GetCvdHostPackage(), 136 "/fake_dir2/cvd-host_package") 137 138 # Find cvd host in specified path. 139 package_path = "/tool_dir/cvd-host_package.tar.gz" 140 self.Patch(utils, "GetDistDir", return_value=None) 141 with mock.patch("os.path.exists") as exists: 142 exists.return_value = True 143 self.assertEqual( 144 create_common.GetCvdHostPackage(package_path), 145 "/tool_dir/cvd-host_package.tar.gz") 146 147 @mock.patch("acloud.create.create_common.os.path.isfile", 148 side_effect=lambda path: path == "/dir/name") 149 @mock.patch("acloud.create.create_common.os.path.isdir", 150 side_effect=lambda path: path == "/dir") 151 @mock.patch("acloud.create.create_common.os.listdir", 152 return_value=["name", "name2"]) 153 def testFindLocalImage(self, _mock_listdir, _mock_isdir, _mock_isfile): 154 """Test FindLocalImage.""" 155 self.assertEqual( 156 "/dir/name", 157 create_common.FindLocalImage("/test/../dir/name", "not_exist")) 158 159 self.assertEqual("/dir/name", 160 create_common.FindLocalImage("/dir/", "name")) 161 162 163 self.assertIsNone(create_common.FindLocalImage("/dir", "not_exist", 164 raise_error=False)) 165 with self.assertRaises(errors.GetLocalImageError): 166 create_common.FindLocalImage("/dir", "not_exist") 167 168 with self.assertRaises(errors.GetLocalImageError): 169 create_common.FindLocalImage("/dir", "name.?", raise_error=False) 170 171 def testFindBootImage(self): 172 """Test FindBootImage.""" 173 with tempfile.TemporaryDirectory() as temp_dir: 174 with self.assertRaises(errors.GetLocalImageError): 175 create_common.FindBootImage(temp_dir) 176 177 boot_image_path = os.path.join(temp_dir, "boot.img") 178 self.CreateFile(boot_image_path, b"invalid") 179 with self.assertRaises(errors.GetLocalImageError): 180 create_common.FindBootImage(temp_dir) 181 os.remove(boot_image_path) 182 183 boot_image_path = os.path.join(temp_dir, "boot.img") 184 self.CreateFile(boot_image_path, b"ANDROID!") 185 self.assertEqual(boot_image_path, 186 create_common.FindBootImage(temp_dir)) 187 os.remove(boot_image_path) 188 189 boot_image_path = os.path.join(temp_dir, "boot-5.10.img") 190 self.CreateFile(boot_image_path, b"ANDROID!") 191 self.assertEqual(boot_image_path, 192 create_common.FindBootImage(temp_dir)) 193 194 def testFindSystemImages(self): 195 """Test FindSystemImages.""" 196 with tempfile.TemporaryDirectory() as temp_dir: 197 with self.assertRaises(errors.GetLocalImageError): 198 create_common.FindSystemImages(temp_dir) 199 200 image_dir = os.path.join(temp_dir, "IMAGES") 201 system_image_path = os.path.join(image_dir, "system.img") 202 self.CreateFile(system_image_path) 203 self.assertEqual((system_image_path, None, None), 204 create_common.FindSystemImages(temp_dir)) 205 self.assertEqual((system_image_path, None, None), 206 create_common.FindSystemImages(image_dir)) 207 self.assertEqual((system_image_path, None, None), 208 create_common.FindSystemImages(system_image_path)) 209 210 system_ext_image_path = os.path.join(image_dir, "system_ext.img") 211 self.CreateFile(system_ext_image_path) 212 product_image_path = os.path.join(image_dir, "product.img") 213 self.CreateFile(product_image_path) 214 self.assertEqual( 215 (system_image_path, system_ext_image_path, product_image_path), 216 create_common.FindSystemImages(temp_dir)) 217 self.assertEqual( 218 (system_image_path, system_ext_image_path, product_image_path), 219 create_common.FindSystemImages(image_dir)) 220 221 222 def testFindVendorBootImage(self): 223 """Test FindVendorBootImage.""" 224 with tempfile.TemporaryDirectory() as temp_dir: 225 with self.assertRaises(errors.GetLocalImageError): 226 create_common.FindVendorBootImage(temp_dir) 227 228 boot_image_path = os.path.join(temp_dir, "vendor_boot.img") 229 self.CreateFile(boot_image_path) 230 self.assertEqual(boot_image_path, 231 create_common.FindVendorBootImage(boot_image_path)) 232 self.assertEqual(boot_image_path, 233 create_common.FindVendorBootImage(temp_dir)) 234 235 236 @mock.patch.object(utils, "Decompress") 237 def testDownloadRemoteArtifact(self, mock_decompress): 238 """Test Download cuttlefish package.""" 239 mock_build_client = mock.MagicMock() 240 self.Patch( 241 android_build_client, 242 "AndroidBuildClient", 243 return_value=mock_build_client) 244 self.Patch(auth, "CreateCredentials", return_value=mock.MagicMock()) 245 avd_spec = mock.MagicMock() 246 avd_spec.cfg = mock.MagicMock() 247 avd_spec.remote_image = {"build_target" : "aosp_cf_x86_64_phone-userdebug", 248 "build_id": "1234"} 249 build_id = "1234" 250 build_target = "aosp_cf_x86_64_phone-userdebug" 251 checkfile1 = "aosp_cf_x86_phone-img-1234.zip" 252 checkfile2 = "cvd-host_package.tar.gz" 253 extract_path = "/tmp/1234" 254 255 create_common.DownloadRemoteArtifact( 256 avd_spec.cfg, 257 avd_spec.remote_image["build_target"], 258 avd_spec.remote_image["build_id"], 259 checkfile1, 260 extract_path, 261 decompress=True) 262 263 self.assertEqual(mock_build_client.DownloadArtifact.call_count, 1) 264 mock_build_client.DownloadArtifact.assert_called_once_with( 265 build_target, 266 build_id, 267 checkfile1, 268 "%s/%s" % (extract_path, checkfile1)) 269 self.assertEqual(mock_decompress.call_count, 1) 270 271 mock_decompress.call_count = 0 272 mock_build_client.DownloadArtifact.call_count = 0 273 create_common.DownloadRemoteArtifact( 274 avd_spec.cfg, 275 avd_spec.remote_image["build_target"], 276 avd_spec.remote_image["build_id"], 277 checkfile2, 278 extract_path) 279 280 self.assertEqual(mock_build_client.DownloadArtifact.call_count, 1) 281 mock_build_client.DownloadArtifact.assert_called_once_with( 282 build_target, 283 build_id, 284 checkfile2, 285 "%s/%s" % (extract_path, checkfile2)) 286 self.assertEqual(mock_decompress.call_count, 0) 287 288 def testPrepareLocalInstanceDir(self): 289 """test PrepareLocalInstanceDir.""" 290 temp_dir = tempfile.mkdtemp() 291 try: 292 cvd_home_dir = os.path.join(temp_dir, "local-instance-1") 293 mock_avd_spec = mock.Mock(local_instance_dir=None) 294 create_common.PrepareLocalInstanceDir(cvd_home_dir, mock_avd_spec) 295 self.assertTrue(os.path.isdir(cvd_home_dir) and 296 not os.path.islink(cvd_home_dir)) 297 298 link_target_dir = os.path.join(temp_dir, "cvd_home") 299 os.mkdir(link_target_dir) 300 mock_avd_spec.local_instance_dir = link_target_dir 301 create_common.PrepareLocalInstanceDir(cvd_home_dir, mock_avd_spec) 302 self.assertTrue(os.path.islink(cvd_home_dir) and 303 os.path.samefile(cvd_home_dir, link_target_dir)) 304 finally: 305 shutil.rmtree(temp_dir) 306 307 308if __name__ == "__main__": 309 unittest.main() 310