• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2018 - The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests for create_common."""
15
16import collections
17import os
18import shutil
19import tempfile
20import unittest
21
22from unittest import mock
23
24from acloud import errors
25from acloud.create import create_common
26from acloud.internal.lib import android_build_client
27from acloud.internal.lib import auth
28from acloud.internal.lib import driver_test_lib
29from acloud.internal.lib import utils
30
31
32ExtraFile = collections.namedtuple("ExtraFile", ["source", "target"])
33
34
35class FakeZipFile:
36    """Fake implementation of ZipFile()"""
37
38    # pylint: disable=invalid-name,unused-argument,no-self-use
39    def write(self, filename, arcname=None, compress_type=None):
40        """Fake write method."""
41        return
42
43    # pylint: disable=invalid-name,no-self-use
44    def close(self):
45        """Fake close method."""
46        return
47
48
49# pylint: disable=invalid-name,protected-access
50class CreateCommonTest(driver_test_lib.BaseDriverTest):
51    """Test create_common functions."""
52
53    # pylint: disable=protected-access
54    def testProcessHWPropertyWithInvalidArgs(self):
55        """Test ParseKeyValuePairArgs with invalid args."""
56        # Checking wrong property value.
57        args_str = "cpu:3,disk:"
58        with self.assertRaises(errors.MalformedDictStringError):
59            create_common.ParseKeyValuePairArgs(args_str)
60
61        # Checking wrong property format.
62        args_str = "cpu:3,disk"
63        with self.assertRaises(errors.MalformedDictStringError):
64            create_common.ParseKeyValuePairArgs(args_str)
65
66    def testParseHWPropertyStr(self):
67        """Test ParseKeyValuePairArgs."""
68        expected_dict = {"cpu": "2", "resolution": "1080x1920", "dpi": "240",
69                         "memory": "4g", "disk": "4g"}
70        args_str = "cpu:2,resolution:1080x1920,dpi:240,memory:4g,disk:4g"
71        result_dict = create_common.ParseKeyValuePairArgs(args_str)
72        self.assertTrue(expected_dict == result_dict)
73
74    def testGetNonEmptyEnvVars(self):
75        """Test GetNonEmptyEnvVars."""
76        with mock.patch.dict("acloud.internal.lib.utils.os.environ",
77                             {"A": "", "B": "b"},
78                             clear=True):
79            self.assertEqual(
80                ["b"], create_common.GetNonEmptyEnvVars("A", "B", "C"))
81
82    def testParseExtraFilesArgs(self):
83        """Test ParseExtraFilesArgs."""
84        expected_result = [ExtraFile(source="local_path", target="gce_path")]
85        files_info = ["local_path,gce_path"]
86        self.assertEqual(expected_result,
87                         create_common.ParseExtraFilesArgs(files_info))
88
89        # Test multiple files
90        expected_result = [ExtraFile(source="local_path1", target="gce_path1"),
91                           ExtraFile(source="local_path2", target="gce_path2")]
92        files_info = ["local_path1,gce_path1",
93                      "local_path2,gce_path2"]
94        self.assertEqual(expected_result,
95                         create_common.ParseExtraFilesArgs(files_info))
96
97        # Test wrong file info format.
98        files_info = ["local_path"]
99        with self.assertRaises(errors.MalformedDictStringError):
100            create_common.ParseExtraFilesArgs(files_info)
101
102    def testGetCvdHostPackage(self):
103        """test GetCvdHostPackage."""
104        # Can't find the cvd host package
105        with mock.patch("os.path.exists") as exists:
106            exists.return_value = False
107            self.assertRaises(
108                errors.GetCvdLocalHostPackageError,
109                create_common.GetCvdHostPackage)
110
111        self.Patch(os.environ, "get", return_value="/fake_dir2")
112        self.Patch(utils, "GetDistDir", return_value="/fake_dir1")
113        self.Patch(os.path, "exists",
114                   side_effect=lambda path:
115                       path == "/fake_dir1/cvd-host_package.tar.gz")
116
117        # Find cvd host in dist dir.
118        self.assertEqual(
119            create_common.GetCvdHostPackage(),
120            "/fake_dir1/cvd-host_package.tar.gz")
121
122        # Find cvd host in host out dir.
123        self.Patch(os.environ, "get", return_value="/fake_dir2")
124        self.Patch(utils, "GetDistDir", return_value=None)
125        with mock.patch("os.path.exists") as exists:
126            exists.side_effect = lambda path: \
127                path == "/fake_dir2/cvd-host_package.tar.gz"
128            self.assertEqual(
129                create_common.GetCvdHostPackage(),
130                "/fake_dir2/cvd-host_package.tar.gz")
131        with mock.patch("os.path.exists") as exists:
132            exists.side_effect = lambda path: \
133                path == "/fake_dir2/cvd-host_package"
134            self.assertEqual(
135                create_common.GetCvdHostPackage(),
136                "/fake_dir2/cvd-host_package")
137
138        # Find cvd host in specified path.
139        package_path = "/tool_dir/cvd-host_package.tar.gz"
140        self.Patch(utils, "GetDistDir", return_value=None)
141        with mock.patch("os.path.exists") as exists:
142            exists.return_value = True
143            self.assertEqual(
144                create_common.GetCvdHostPackage(package_path),
145                "/tool_dir/cvd-host_package.tar.gz")
146
147    @mock.patch("acloud.create.create_common.os.path.isfile",
148                side_effect=lambda path: path == "/dir/name")
149    @mock.patch("acloud.create.create_common.os.path.isdir",
150                side_effect=lambda path: path == "/dir")
151    @mock.patch("acloud.create.create_common.os.listdir",
152                return_value=["name", "name2"])
153    def testFindLocalImage(self, _mock_listdir, _mock_isdir, _mock_isfile):
154        """Test FindLocalImage."""
155        self.assertEqual(
156            "/dir/name",
157            create_common.FindLocalImage("/test/../dir/name", "not_exist"))
158
159        self.assertEqual("/dir/name",
160                         create_common.FindLocalImage("/dir/", "name"))
161
162
163        self.assertIsNone(create_common.FindLocalImage("/dir", "not_exist",
164                                                       raise_error=False))
165        with self.assertRaises(errors.GetLocalImageError):
166            create_common.FindLocalImage("/dir", "not_exist")
167
168        with self.assertRaises(errors.GetLocalImageError):
169            create_common.FindLocalImage("/dir", "name.?", raise_error=False)
170
171    def testFindBootImage(self):
172        """Test FindBootImage."""
173        with tempfile.TemporaryDirectory() as temp_dir:
174            with self.assertRaises(errors.GetLocalImageError):
175                create_common.FindBootImage(temp_dir)
176
177            boot_image_path = os.path.join(temp_dir, "boot.img")
178            self.CreateFile(boot_image_path, b"invalid")
179            with self.assertRaises(errors.GetLocalImageError):
180                create_common.FindBootImage(temp_dir)
181            os.remove(boot_image_path)
182
183            boot_image_path = os.path.join(temp_dir, "boot.img")
184            self.CreateFile(boot_image_path, b"ANDROID!")
185            self.assertEqual(boot_image_path,
186                             create_common.FindBootImage(temp_dir))
187            os.remove(boot_image_path)
188
189            boot_image_path = os.path.join(temp_dir, "boot-5.10.img")
190            self.CreateFile(boot_image_path, b"ANDROID!")
191            self.assertEqual(boot_image_path,
192                             create_common.FindBootImage(temp_dir))
193
194    def testFindSystemImages(self):
195        """Test FindSystemImages."""
196        with tempfile.TemporaryDirectory() as temp_dir:
197            with self.assertRaises(errors.GetLocalImageError):
198                create_common.FindSystemImages(temp_dir)
199
200            image_dir = os.path.join(temp_dir, "IMAGES")
201            system_image_path = os.path.join(image_dir, "system.img")
202            self.CreateFile(system_image_path)
203            self.assertEqual((system_image_path, None, None),
204                             create_common.FindSystemImages(temp_dir))
205            self.assertEqual((system_image_path, None, None),
206                             create_common.FindSystemImages(image_dir))
207            self.assertEqual((system_image_path, None, None),
208                             create_common.FindSystemImages(system_image_path))
209
210            system_ext_image_path = os.path.join(image_dir, "system_ext.img")
211            self.CreateFile(system_ext_image_path)
212            product_image_path = os.path.join(image_dir, "product.img")
213            self.CreateFile(product_image_path)
214            self.assertEqual(
215                (system_image_path, system_ext_image_path, product_image_path),
216                create_common.FindSystemImages(temp_dir))
217            self.assertEqual(
218                (system_image_path, system_ext_image_path, product_image_path),
219                create_common.FindSystemImages(image_dir))
220
221
222    def testFindVendorBootImage(self):
223        """Test FindVendorBootImage."""
224        with tempfile.TemporaryDirectory() as temp_dir:
225            with self.assertRaises(errors.GetLocalImageError):
226                create_common.FindVendorBootImage(temp_dir)
227
228            boot_image_path = os.path.join(temp_dir, "vendor_boot.img")
229            self.CreateFile(boot_image_path)
230            self.assertEqual(boot_image_path,
231                             create_common.FindVendorBootImage(boot_image_path))
232            self.assertEqual(boot_image_path,
233                             create_common.FindVendorBootImage(temp_dir))
234
235
236    @mock.patch.object(utils, "Decompress")
237    def testDownloadRemoteArtifact(self, mock_decompress):
238        """Test Download cuttlefish package."""
239        mock_build_client = mock.MagicMock()
240        self.Patch(
241            android_build_client,
242            "AndroidBuildClient",
243            return_value=mock_build_client)
244        self.Patch(auth, "CreateCredentials", return_value=mock.MagicMock())
245        avd_spec = mock.MagicMock()
246        avd_spec.cfg = mock.MagicMock()
247        avd_spec.remote_image = {"build_target" : "aosp_cf_x86_64_phone-userdebug",
248                                 "build_id": "1234"}
249        build_id = "1234"
250        build_target = "aosp_cf_x86_64_phone-userdebug"
251        checkfile1 = "aosp_cf_x86_phone-img-1234.zip"
252        checkfile2 = "cvd-host_package.tar.gz"
253        extract_path = "/tmp/1234"
254
255        create_common.DownloadRemoteArtifact(
256            avd_spec.cfg,
257            avd_spec.remote_image["build_target"],
258            avd_spec.remote_image["build_id"],
259            checkfile1,
260            extract_path,
261            decompress=True)
262
263        self.assertEqual(mock_build_client.DownloadArtifact.call_count, 1)
264        mock_build_client.DownloadArtifact.assert_called_once_with(
265            build_target,
266            build_id,
267            checkfile1,
268            "%s/%s" % (extract_path, checkfile1))
269        self.assertEqual(mock_decompress.call_count, 1)
270
271        mock_decompress.call_count = 0
272        mock_build_client.DownloadArtifact.call_count = 0
273        create_common.DownloadRemoteArtifact(
274            avd_spec.cfg,
275            avd_spec.remote_image["build_target"],
276            avd_spec.remote_image["build_id"],
277            checkfile2,
278            extract_path)
279
280        self.assertEqual(mock_build_client.DownloadArtifact.call_count, 1)
281        mock_build_client.DownloadArtifact.assert_called_once_with(
282            build_target,
283            build_id,
284            checkfile2,
285            "%s/%s" % (extract_path, checkfile2))
286        self.assertEqual(mock_decompress.call_count, 0)
287
288    def testPrepareLocalInstanceDir(self):
289        """test PrepareLocalInstanceDir."""
290        temp_dir = tempfile.mkdtemp()
291        try:
292            cvd_home_dir = os.path.join(temp_dir, "local-instance-1")
293            mock_avd_spec = mock.Mock(local_instance_dir=None)
294            create_common.PrepareLocalInstanceDir(cvd_home_dir, mock_avd_spec)
295            self.assertTrue(os.path.isdir(cvd_home_dir) and
296                            not os.path.islink(cvd_home_dir))
297
298            link_target_dir = os.path.join(temp_dir, "cvd_home")
299            os.mkdir(link_target_dir)
300            mock_avd_spec.local_instance_dir = link_target_dir
301            create_common.PrepareLocalInstanceDir(cvd_home_dir, mock_avd_spec)
302            self.assertTrue(os.path.islink(cvd_home_dir) and
303                            os.path.samefile(cvd_home_dir, link_target_dir))
304        finally:
305            shutil.rmtree(temp_dir)
306
307
308if __name__ == "__main__":
309    unittest.main()
310