• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# Copyright 2016 - The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16"""Tests for acloud.internal.lib.utils."""
17
18import collections
19import errno
20import getpass
21import grp
22import os
23import shutil
24import subprocess
25import tempfile
26import time
27import webbrowser
28
29import unittest
30
31from unittest import mock
32
33from acloud import errors
34from acloud.internal.lib import driver_test_lib
35from acloud.internal.lib import utils
36
37
# Lightweight stand-in for the group records that the tests below feed into
# the patched grp.getgrall / grp.getgrnam calls.
GroupInfo = collections.namedtuple(
    "GroupInfo",
    ["gr_name", "gr_passwd", "gr_gid", "gr_mem"],
)
43
# Tkinter may not be supported so mock it out.
# When the import fails, Tkinter is bound to a mock.Mock instance; the
# skipIf guard on testCalculateVNCScreenRatio checks for exactly that.
# NOTE(review): on Python 3 the module is named "tkinter", so this import
# presumably always fails and the VNC ratio test is always skipped --
# confirm against how acloud.internal.lib.utils imports the module before
# changing it.
try:
    import Tkinter
except ImportError:
    Tkinter = mock.Mock()
49
50
class FakeTkinter:
    """Minimal substitute for a Tkinter.Tk() object.

    Only the two screen-size queries are provided; each one simply reports
    the value handed to the constructor.
    """

    def __init__(self, width=None, height=None):
        self.width, self.height = width, height

    # pylint: disable=invalid-name
    def winfo_screenwidth(self):
        """Report the configured screen width."""
        return self.width

    # pylint: disable=invalid-name
    def winfo_screenheight(self):
        """Report the configured screen height."""
        return self.height
67
68
69# pylint: disable=too-many-public-methods
class UtilsTest(driver_test_lib.BaseDriverTest):
    """Unit tests for acloud.internal.lib.utils.

    External dependencies (filesystem, subprocess, time, webbrowser, ...)
    are replaced with mocks through self.Patch, which this class gets from
    its driver_test_lib.BaseDriverTest base class.
    """

    # The next test carries a lowercase "test" prefix so that the default
    # unittest loader (which matches on "test") actually collects it.
    def testTempDirSuccess(self):
        """Test create a temp dir."""
        self.Patch(os, "chmod")
        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
        self.Patch(shutil, "rmtree")
        with utils.TempDir():
            pass
        # Verify.
        tempfile.mkdtemp.assert_called_once()  # pylint: disable=no-member
        shutil.rmtree.assert_called_with("/tmp/tempdir")  # pylint: disable=no-member

    # Previous (uncollected) name, kept so explicit references still resolve.
    TestTempDirSuccess = testTempDirSuccess

    def testTempDirExceptionRaised(self):
        """Test create a temp dir and exception is raised within with-clause."""
        self.Patch(os, "chmod")
        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
        self.Patch(shutil, "rmtree")

        class ExpectedException(Exception):
            """Expected exception."""

        # Verify. ExpectedException should propagate out of the with-clause.
        with self.assertRaises(ExpectedException):
            with utils.TempDir():
                raise ExpectedException("Expected exception.")
        tempfile.mkdtemp.assert_called_once()  # pylint: disable=no-member
        shutil.rmtree.assert_called_with("/tmp/tempdir")  # pylint: disable=no-member

    # Previous (uncollected) name, kept so explicit references still resolve.
    TestTempDirExceptionRaised = testTempDirExceptionRaised

    def testTempDirWhenDeleteTempDirNoLongerExist(self):  # pylint: disable=invalid-name
        """Test create a temp dir and dir no longer exists during deletion."""
        self.Patch(os, "chmod")
        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
        expected_error = EnvironmentError()
        expected_error.errno = errno.ENOENT
        self.Patch(shutil, "rmtree", side_effect=expected_error)

        # Verify no exception should be raised when rmtree raises
        # EnvironmentError with errno.ENOENT, i.e.
        # directory no longer exists.
        with utils.TempDir():
            pass
        tempfile.mkdtemp.assert_called_once()  # pylint: disable=no-member
        shutil.rmtree.assert_called_with("/tmp/tempdir")  # pylint: disable=no-member

    def testTempDirWhenDeleteEncounterError(self):
        """Test create a temp dir and encountered error during deletion."""
        self.Patch(os, "chmod")
        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
        expected_error = OSError("Expected OS Error")
        self.Patch(shutil, "rmtree", side_effect=expected_error)

        # Verify OSError should be raised.
        with self.assertRaises(OSError):
            with utils.TempDir():
                pass
        tempfile.mkdtemp.assert_called_once()  # pylint: disable=no-member
        shutil.rmtree.assert_called_with("/tmp/tempdir")  # pylint: disable=no-member

    def testTempDirOrininalErrorRaised(self):
        """Test original error is raised even if tmp dir deletion failed."""
        self.Patch(os, "chmod")
        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
        expected_error = OSError("Expected OS Error")
        self.Patch(shutil, "rmtree", side_effect=expected_error)

        class ExpectedException(Exception):
            """Expected exception."""

        # Verify.
        # ExpectedException should be raised, and OSError
        # should not be raised.
        with self.assertRaises(ExpectedException):
            with utils.TempDir():
                raise ExpectedException("Expected Exception")
        tempfile.mkdtemp.assert_called_once()  # pylint: disable=no-member
        shutil.rmtree.assert_called_with("/tmp/tempdir")  # pylint: disable=no-member

    def testCreateSshKeyPairKeyAlreadyExists(self):  # pylint: disable=invalid-name
        """Test when the key pair already exists."""
        public_key = "/fake/public_key"
        private_key = "/fake/private_key"
        self.Patch(os.path, "exists", side_effect=[True, True])
        self.Patch(subprocess, "check_call")
        self.Patch(os, "makedirs", return_value=True)
        utils.CreateSshKeyPairIfNotExist(private_key, public_key)
        # No key generation command may be issued.
        self.assertEqual(subprocess.check_call.call_count, 0)  # pylint: disable=no-member

    def testCreateSshKeyPairKeyAreCreated(self):
        """Test when the key pair created."""
        public_key = "/fake/public_key"
        private_key = "/fake/private_key"
        self.Patch(os.path, "exists", return_value=False)
        self.Patch(os, "makedirs", return_value=True)
        self.Patch(subprocess, "check_call")
        self.Patch(os, "rename")
        utils.CreateSshKeyPairIfNotExist(private_key, public_key)
        self.assertEqual(subprocess.check_call.call_count, 1)  # pylint: disable=no-member
        subprocess.check_call.assert_called_with(  # pylint: disable=no-member
            utils.SSH_KEYGEN_CMD +
            ["-C", getpass.getuser(), "-f", private_key],
            stdout=mock.ANY,
            stderr=mock.ANY)

    def testCreatePublicKeyAreCreated(self):
        """Test when the PublicKey created."""
        public_key = "/fake/public_key"
        private_key = "/fake/private_key"
        self.Patch(os.path, "exists", side_effect=[False, True, True])
        self.Patch(os, "makedirs", return_value=True)
        mock_open = mock.mock_open(read_data=public_key)
        self.Patch(subprocess, "check_output")
        self.Patch(os, "rename")
        with mock.patch("builtins.open", mock_open):
            utils.CreateSshKeyPairIfNotExist(private_key, public_key)
        self.assertEqual(subprocess.check_output.call_count, 1)  # pylint: disable=no-member
        subprocess.check_output.assert_called_with(  # pylint: disable=no-member
            utils.SSH_KEYGEN_PUB_CMD + ["-f", private_key])

    # Lowercase "test" prefix so the default unittest loader collects it.
    def testRetryOnException(self):
        """Test Retry."""

        def _IsValueError(exc):
            return isinstance(exc, ValueError)

        num_retry = 5

        @utils.RetryOnException(_IsValueError, num_retry)
        def _RaiseAndRetry(sentinel):
            sentinel.alert()
            raise ValueError("Fake error.")

        sentinel = mock.MagicMock()
        self.assertRaises(ValueError, _RaiseAndRetry, sentinel)
        # One initial attempt plus num_retry retries.
        self.assertEqual(1 + num_retry, sentinel.alert.call_count)

    # Previous (uncollected) name, kept so explicit references still resolve.
    TestRetryOnException = testRetryOnException

    def testRetryExceptionType(self):
        """Test RetryExceptionType function."""

        def _RaiseAndRetry(sentinel):
            sentinel.alert()
            raise ValueError("Fake error.")

        num_retry = 5
        sentinel = mock.MagicMock()
        with self.assertRaises(ValueError):
            utils.RetryExceptionType(
                (KeyError, ValueError),
                num_retry,
                _RaiseAndRetry,
                0,  # sleep_multiplier
                1,  # retry_backoff_factor
                sentinel=sentinel)
        # One initial attempt plus num_retry retries.
        self.assertEqual(1 + num_retry, sentinel.alert.call_count)

    def testRetry(self):
        """Test Retry."""
        mock_sleep = self.Patch(time, "sleep")

        def _RaiseAndRetry(sentinel):
            sentinel.alert()
            raise ValueError("Fake error.")

        num_retry = 5
        sentinel = mock.MagicMock()
        with self.assertRaises(ValueError):
            utils.RetryExceptionType(
                (ValueError, KeyError),
                num_retry,
                _RaiseAndRetry,
                1,  # sleep_multiplier
                2,  # retry_backoff_factor
                sentinel=sentinel)

        self.assertEqual(1 + num_retry, sentinel.alert.call_count)
        # The sleep before each retry doubles every time.
        mock_sleep.assert_has_calls(
            [
                mock.call(1),
                mock.call(2),
                mock.call(4),
                mock.call(8),
                mock.call(16)
            ])

    @mock.patch("builtins.input")
    def testGetAnswerFromList(self, mock_raw_input):
        """Test GetAnswerFromList."""
        answer_list = ["image1.zip", "image2.zip", "image3.zip"]
        # Answering 0 is expected to exit.
        mock_raw_input.return_value = 0
        with self.assertRaises(SystemExit):
            utils.GetAnswerFromList(answer_list)
        # Each following call consumes the next queued answer.
        mock_raw_input.side_effect = [1, 2, 3, 4]
        self.assertEqual(utils.GetAnswerFromList(answer_list),
                         ["image1.zip"])
        self.assertEqual(utils.GetAnswerFromList(answer_list),
                         ["image2.zip"])
        self.assertEqual(utils.GetAnswerFromList(answer_list),
                         ["image3.zip"])
        self.assertEqual(utils.GetAnswerFromList(answer_list,
                                                 enable_choose_all=True),
                         answer_list)

    @unittest.skipIf(isinstance(Tkinter, mock.Mock),
                     "Tkinter mocked out, test case not needed.")
    @mock.patch.object(Tkinter, "Tk")
    def testCalculateVNCScreenRatio(self, mock_tk):
        """Test Calculating the scale ratio of VNC display."""
        # Get scale-down ratio if screen height is smaller than AVD height.
        mock_tk.return_value = FakeTkinter(height=800, width=1200)
        avd_h = 1920
        avd_w = 1080
        self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.4)

        # Get scale-down ratio if screen width is smaller than AVD width.
        mock_tk.return_value = FakeTkinter(height=800, width=1200)
        avd_h = 900
        avd_w = 1920
        self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.6)

        # Scale ratio = 1 if screen is larger than AVD.
        mock_tk.return_value = FakeTkinter(height=1080, width=1920)
        avd_h = 800
        avd_w = 1280
        self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 1)

        # Get the scale if ratio of width is smaller than the
        # ratio of height.
        mock_tk.return_value = FakeTkinter(height=1200, width=800)
        avd_h = 1920
        avd_w = 1080
        self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.6)

    def testCheckUserInGroups(self):
        """Test CheckUserInGroups."""
        self.Patch(getpass, "getuser", return_value="user_0")
        self.Patch(grp, "getgrall", return_value=[
            GroupInfo("fake_group1", "passwd_1", 0, ["user_1", "user_2"]),
            GroupInfo("fake_group2", "passwd_2", 1, ["user_1", "user_2"])])
        self.Patch(grp, "getgrnam", return_value=GroupInfo(
            "fake_group1", "passwd_1", 0, ["user_1", "user_2"]))
        # Test Group name doesn't exist.
        self.assertFalse(utils.CheckUserInGroups(["Non_exist_group"]))

        # Test User isn't in group.
        self.assertFalse(utils.CheckUserInGroups(["fake_group1"]))

        # Test User is in group.
        self.Patch(getpass, "getuser", return_value="user_1")
        self.assertTrue(utils.CheckUserInGroups(["fake_group1"]))

    @mock.patch.object(utils, "CheckUserInGroups")
    def testAddUserGroupsToCmd(self, mock_user_group):
        """Test AddUserGroupsToCmd."""
        command = "test_command"
        groups = ["group1", "group2"]
        # Don't add user group in command
        mock_user_group.return_value = True
        expected_value = "test_command"
        self.assertEqual(expected_value, utils.AddUserGroupsToCmd(command,
                                                                  groups))

        # Add user group in command
        mock_user_group.return_value = False
        expected_value = "sg group1 <<EOF\nsg group2\ntest_command\nEOF"
        self.assertEqual(expected_value, utils.AddUserGroupsToCmd(command,
                                                                  groups))

    # pylint: disable=invalid-name
    def testTimeoutException(self):
        """Test TimeoutException."""
        @utils.TimeoutException(1, "should time out")
        def functionThatWillTimeOut():
            """Test decorator of @utils.TimeoutException should timeout."""
            # Sleeps longer than the 1 second limit given to the decorator.
            time.sleep(5)

        self.assertRaises(errors.FunctionTimeoutError,
                          functionThatWillTimeOut)

    def testTimeoutExceptionNoTimeout(self):
        """Test No TimeoutException."""
        @utils.TimeoutException(5, "shouldn't time out")
        def functionThatShouldNotTimeout():
            """Test decorator of @utils.TimeoutException shouldn't timeout."""
            return None
        try:
            functionThatShouldNotTimeout()
        except errors.FunctionTimeoutError:
            self.fail("shouldn't timeout")

    def testEstablishSshTunnel(self):
        """Test EstablishSshTunnel."""
        ip_addr = "1.1.1.1"
        rsa_key_file = "/tmp/rsa_file"
        port_mapping = [(1111, 2222), (8888, 9999)]
        ssh_user = "fake_user"
        mock_execute_command = self.Patch(utils, "_ExecuteCommand")
        utils.EstablishSshTunnel(ip_addr, rsa_key_file, ssh_user,
                                 port_mapping, "-o command='shell %s %h'")
        # The quoted extra argument is expected to arrive as a single,
        # unquoted list element at the end of the ssh argument list.
        arg_list = ["-i", rsa_key_file, "-o", "ControlPath=none",
                    "-o", "UserKnownHostsFile=/dev/null",
                    "-o", "StrictHostKeyChecking=no",
                    "-L", "1111:127.0.0.1:2222",
                    "-L", "8888:127.0.0.1:9999",
                    "-N", "-f",
                    "-l", ssh_user, ip_addr,
                    "-o", "command=shell %s %h"]
        mock_execute_command.assert_called_with("ssh", arg_list)

    def testAutoConnectCreateSSHTunnelFail(self):
        """Test auto connect when establishing the ssh tunnel fails."""
        fake_ip_addr = "1.1.1.1"
        fake_rsa_key_file = "/tmp/rsa_file"
        fake_target_vnc_port = 8888
        target_adb_port = 9999
        ssh_user = "fake_user"
        call_side_effect = subprocess.CalledProcessError(123, "fake",
                                                         "fake error")
        # On failure both forwarded ports are expected to be None.
        result = utils.ForwardedPorts(vnc_port=None, adb_port=None)
        self.Patch(utils, "EstablishSshTunnel", side_effect=call_side_effect)
        self.assertEqual(result, utils.AutoConnect(fake_ip_addr,
                                                   fake_rsa_key_file,
                                                   fake_target_vnc_port,
                                                   target_adb_port,
                                                   ssh_user))

    def testAutoConnectWithExtraArgs(self):
        """Test extra args will be the same with expanded args."""
        fake_ip_addr = "1.1.1.1"
        fake_rsa_key_file = "/tmp/rsa_file"
        fake_target_vnc_port = 8888
        target_adb_port = 9999
        ssh_user = "fake_user"
        fake_port = 12345
        self.Patch(utils, "PickFreePort", return_value=fake_port)
        mock_execute_command = self.Patch(utils, "_ExecuteCommand")
        mock_establish_ssh_tunnel = self.Patch(utils, "EstablishSshTunnel")
        extra_args_ssh_tunnel = "-o command='shell %s %h' -o command1='ls -la'"
        utils.AutoConnect(ip_addr=fake_ip_addr,
                          rsa_key_file=fake_rsa_key_file,
                          target_vnc_port=fake_target_vnc_port,
                          target_adb_port=target_adb_port,
                          ssh_user=ssh_user,
                          client_adb_port=fake_port,
                          extra_args_ssh_tunnel=extra_args_ssh_tunnel)
        mock_establish_ssh_tunnel.assert_called_with(
            fake_ip_addr,
            fake_rsa_key_file,
            ssh_user,
            [utils.PortMapping(fake_port, target_adb_port),
             utils.PortMapping(fake_port, fake_target_vnc_port)],
            extra_args_ssh_tunnel)
        mock_execute_command.assert_called_with(
            "adb", ["connect", "127.0.0.1:12345"])

    def testEstablishWebRTCSshTunnel(self):
        """Test establish WebRTC ssh tunnel."""
        fake_ip_addr = "1.1.1.1"
        fake_rsa_key_file = "/tmp/rsa_file"
        ssh_user = "fake_user"
        fake_webrtc_local_port = 12345
        self.Patch(utils, "GetWebRTCServerPort", return_value=8443)
        mock_establish_ssh_tunnel = self.Patch(utils, "EstablishSshTunnel")
        # Ports 15555..15579 mapped one-to-one, followed by the local WebRTC
        # port mapped onto the mocked server port.
        fake_port_mapping = [
            utils.PortMapping(port, port) for port in range(15555, 15579 + 1)
        ] + [utils.PortMapping(12345, 8443)]

        utils.EstablishWebRTCSshTunnel(
            ip_addr=fake_ip_addr, rsa_key_file=fake_rsa_key_file,
            ssh_user=ssh_user, webrtc_local_port=fake_webrtc_local_port)
        mock_establish_ssh_tunnel.assert_called_with(
            fake_ip_addr,
            fake_rsa_key_file,
            ssh_user,
            fake_port_mapping,
            None)

        mock_establish_ssh_tunnel.reset_mock()
        extra_args_ssh_tunnel = "-o command='shell %s %h'"
        utils.EstablishWebRTCSshTunnel(
            ip_addr=fake_ip_addr, rsa_key_file=fake_rsa_key_file,
            ssh_user=ssh_user, extra_args_ssh_tunnel=extra_args_ssh_tunnel,
            webrtc_local_port=fake_webrtc_local_port)
        mock_establish_ssh_tunnel.assert_called_with(
            fake_ip_addr,
            fake_rsa_key_file,
            ssh_user,
            fake_port_mapping,
            extra_args_ssh_tunnel)

    def testGetWebRTCServerPort(self):
        """Test GetWebRTCServerPort."""
        fake_ip_addr = "1.1.1.1"
        fake_rsa_key_file = "/tmp/rsa_file"
        ssh_user = "fake_user"
        extra_args_ssh_tunnel = "-o command='shell %s %h'"
        fake_subprocess = mock.MagicMock()
        fake_subprocess.returncode = 0
        # Test the case that no "webrtc_operator" process is reported.
        fake_subprocess.communicate = mock.MagicMock(
            return_value=('', ''))
        self.Patch(subprocess, "Popen", return_value=fake_subprocess)
        self.assertEqual(1443, utils.GetWebRTCServerPort(
            fake_ip_addr, fake_rsa_key_file, ssh_user, extra_args_ssh_tunnel))

        # Test the case that find "webrtc_operator" process.
        webrtc_operator_process = "11:45 bin/webrtc_operator -assets_dir=assets"
        fake_subprocess.communicate = mock.MagicMock(
            return_value=(webrtc_operator_process, ''))
        self.assertEqual(8443, utils.GetWebRTCServerPort(
            fake_ip_addr, fake_rsa_key_file, ssh_user, extra_args_ssh_tunnel))

    def testGetWebrtcPortFromSSHTunnel(self):
        """Test Get forwarding webrtc port from ssh tunnel."""
        fake_ps_output = ("/fake_ps_1 --fake arg \n"
                          "/fake_ps_2 --fake arg \n"
                          "/usr/bin/ssh -i ~/.ssh/acloud_rsa -o ControlPath=none "
                          "-o UserKnownHostsFile=/dev/null "
                          "-o StrictHostKeyChecking=no -L 15551:127.0.0.1:15551 "
                          "-L 12345:127.0.0.1:8443 -N -f -l user 1.1.1.1").encode()
        self.Patch(subprocess, "check_output", return_value=fake_ps_output)
        webrtc_ports = utils.GetWebrtcPortFromSSHTunnel("1.1.1.1")
        self.assertEqual(12345, webrtc_ports)

    @mock.patch("acloud.internal.lib.utils.subprocess")
    def testFindRemoteFiles(self, mock_subprocess):
        """Test FindRemoteFiles."""
        mock_ssh = mock.Mock()

        # No directories to search: nothing is run.
        paths = utils.FindRemoteFiles(mock_ssh, [])
        mock_subprocess.run.assert_not_called()
        self.assertEqual([], paths)

        mock_ssh.GetBaseCmd.return_value = "mock_ssh"
        mock_subprocess.run.return_value = mock.Mock(
            returncode=0, stderr=b'stderr', stdout=b'file1\nfile2\n')
        paths = utils.FindRemoteFiles(mock_ssh, ["dir1", "dir2"])
        self.assertEqual(["file1", "file2"], paths)
        mock_subprocess.run.assert_called_with(
            'mock_ssh find -H dir1 dir2 -type f',
            shell=True, capture_output=True, check=False)

        # Successful run with empty output yields no paths.
        mock_subprocess.run.return_value = mock.Mock(
            returncode=0, stderr=b'', stdout=b'')
        paths = utils.FindRemoteFiles(mock_ssh, ["dir1", "dir2"])
        self.assertEqual([], paths)

        # Non-zero return code is reported as SubprocessFail.
        mock_subprocess.run.return_value = mock.Mock(
            returncode=1, stderr=b'', stdout=b'')
        with self.assertRaises(errors.SubprocessFail):
            utils.FindRemoteFiles(mock_ssh, ["dir1", "dir2"])

    # pylint: disable=protected-access, no-member
    def testCleanupSSVncviwer(self):
        """Test cleanup ssvnc viewer."""
        fake_vnc_port = 9999
        fake_ss_vncviewer_pattern = utils._SSVNC_VIEWER_PATTERN % {
            "vnc_port": fake_vnc_port}
        self.Patch(utils, "IsCommandRunning", return_value=True)
        self.Patch(subprocess, "check_call", return_value=True)
        utils.CleanupSSVncviewer(fake_vnc_port)
        subprocess.check_call.assert_called_with(
            ["pkill", "-9", "-f", fake_ss_vncviewer_pattern])

        # reset_mock() clears the recorded calls as well as the counter, so
        # nothing from the first half can leak into the check below.
        subprocess.check_call.reset_mock()
        self.Patch(utils, "IsCommandRunning", return_value=False)
        utils.CleanupSSVncviewer(fake_vnc_port)
        subprocess.check_call.assert_not_called()

    def testLaunchBrowserFromReport(self):
        """Test launch browser from report."""
        self.Patch(webbrowser, "open_new_tab")
        fake_report = mock.MagicMock(data={})

        # test remote instance
        self.Patch(os.environ, "get", return_value=True)
        fake_report.data = {
            "devices": [{"instance_name": "remote_cf_instance_name",
                         "ip": "192.168.1.1",},],}

        utils.LaunchBrowserFromReport(fake_report)
        webbrowser.open_new_tab.assert_called_once_with("https://localhost:8443")
        # reset_mock() also drops the stored call arguments, so the next
        # assertion cannot be satisfied by this earlier call.
        webbrowser.open_new_tab.reset_mock()

        # test local instance
        fake_report.data = {
            "devices": [{"instance_name": "local-instance1",
                         "ip": "127.0.0.1:6250",},],}
        utils.LaunchBrowserFromReport(fake_report)
        webbrowser.open_new_tab.assert_called_once_with("https://localhost:8443")
        webbrowser.open_new_tab.reset_mock()

        # verify terminal can't support launch webbrowser.
        self.Patch(os.environ, "get", return_value=False)
        utils.LaunchBrowserFromReport(fake_report)
        self.assertEqual(webbrowser.open_new_tab.call_count, 0)

    def testSetExecutable(self):
        """Test setting a file to be executable."""
        with tempfile.NamedTemporaryFile(delete=True) as temp_file:
            utils.SetExecutable(temp_file.name)
            # Only the permission bits are compared.
            self.assertEqual(os.stat(temp_file.name).st_mode & 0o777, 0o755)

    def testSetDirectoryTreeExecutable(self):
        """Test setting a file in a directory to be executable."""
        with tempfile.TemporaryDirectory() as temp_dir:
            subdir = os.path.join(temp_dir, "subdir")
            file_path = os.path.join(subdir, "file")
            os.makedirs(subdir)
            # Create an empty file nested one level below the root.
            with open(file_path, "w"):
                pass
            utils.SetDirectoryTreeExecutable(temp_dir)
            self.assertEqual(os.stat(file_path).st_mode & 0o777, 0o755)

    def testSetCvdPort(self):
        """Test base_instance_num."""
        utils.SetCvdPorts(2)
        try:
            self.assertEqual(utils.GetCvdPorts().adb_port, 6521)
            self.assertEqual(utils.GetCvdPorts().vnc_port, 6445)
        finally:
            # Always undo the port override, even when an assertion fails,
            # so the setting cannot leak into other tests.
            utils.SetCvdPorts(None)

    @mock.patch.object(utils, "PrintColorString")
    def testPrintDeviceSummary(self, mock_print):
        """Test PrintDeviceSummary."""
        fake_report = mock.MagicMock(data={})
        fake_report.data = {
            "devices": [{"instance_name": "remote_cf_instance_name",
                         "ip": "192.168.1.1",
                         "device_serial": "127.0.0.1:399"},],}
        utils.PrintDeviceSummary(fake_report)
        self.assertEqual(mock_print.call_count, 7)

        # Test for OpenWrt device case.
        fake_report.data = {
            "devices": [{"instance_name": "remote_cf_instance_name",
                         "ip": "192.168.1.1",
                         "ssh_command": "fake_ssh_cmd",
                         "screen_command": "fake_screen_cmd"},],}
        mock_print.reset_mock()
        utils.PrintDeviceSummary(fake_report)
        self.assertEqual(mock_print.call_count, 13)

        # Test for fail case
        fake_report.data = {
            "errors": "Fail to create devices"}
        mock_print.reset_mock()
        utils.PrintDeviceSummary(fake_report)
        self.assertEqual(mock_print.call_count, 3)

    # pylint: disable=protected-access
    def testIsSupportedKvm(self):
        """Test IsSupportedKvm."""
        self.Patch(os.path, "exists", return_value=True)
        self.assertTrue(utils.IsSupportedKvm())

        self.Patch(os.path, "exists", return_value=False)
        self.assertFalse(utils.IsSupportedKvm())
633
634
# Run the tests in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()
637