• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# Copyright 2016 - The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16"""Tests for acloud.internal.lib.utils."""
17
18import collections
19import errno
20import getpass
21import grp
22import os
23import shutil
24import subprocess
25import tempfile
26import time
27import webbrowser
28
29import unittest
30
31from unittest import mock
32
33from acloud import errors
34from acloud.internal.lib import driver_test_lib
35from acloud.internal.lib import utils
36
37
38GroupInfo = collections.namedtuple("GroupInfo", [
39    "gr_name",
40    "gr_passwd",
41    "gr_gid",
42    "gr_mem"])
43
44# Tkinter may not be supported so mock it out.
45try:
46    import Tkinter
47except ImportError:
48    Tkinter = mock.Mock()
49
50
51class FakeTkinter:
52    """Fake implementation of Tkinter.Tk()"""
53
54    def __init__(self, width=None, height=None):
55        self.width = width
56        self.height = height
57
58    # pylint: disable=invalid-name
59    def winfo_screenheight(self):
60        """Return the screen height."""
61        return self.height
62
63    # pylint: disable=invalid-name
64    def winfo_screenwidth(self):
65        """Return the screen width."""
66        return self.width
67
68
69# pylint: disable=too-many-public-methods
70class UtilsTest(driver_test_lib.BaseDriverTest):
71    """Test Utils."""
72
73    def TestTempDirSuccess(self):
74        """Test create a temp dir."""
75        self.Patch(os, "chmod")
76        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
77        self.Patch(shutil, "rmtree")
78        with utils.TempDir():
79            pass
80        # Verify.
81        tempfile.mkdtemp.assert_called_once()  # pylint: disable=no-member
82        shutil.rmtree.assert_called_with("/tmp/tempdir")  # pylint: disable=no-member
83
84    def TestTempDirExceptionRaised(self):
85        """Test create a temp dir and exception is raised within with-clause."""
86        self.Patch(os, "chmod")
87        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
88        self.Patch(shutil, "rmtree")
89
90        class ExpectedException(Exception):
91            """Expected exception."""
92
93        def _Call():
94            with utils.TempDir():
95                raise ExpectedException("Expected exception.")
96
97        # Verify. ExpectedException should be raised.
98        self.assertRaises(ExpectedException, _Call)
99        tempfile.mkdtemp.assert_called_once()  # pylint: disable=no-member
100        shutil.rmtree.assert_called_with("/tmp/tempdir")  #pylint: disable=no-member
101
102    def testTempDirWhenDeleteTempDirNoLongerExist(self):  # pylint: disable=invalid-name
103        """Test create a temp dir and dir no longer exists during deletion."""
104        self.Patch(os, "chmod")
105        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
106        expected_error = EnvironmentError()
107        expected_error.errno = errno.ENOENT
108        self.Patch(shutil, "rmtree", side_effect=expected_error)
109
110        def _Call():
111            with utils.TempDir():
112                pass
113
114        # Verify no exception should be raised when rmtree raises
115        # EnvironmentError with errno.ENOENT, i.e.
116        # directory no longer exists.
117        _Call()
118        tempfile.mkdtemp.assert_called_once()  #pylint: disable=no-member
119        shutil.rmtree.assert_called_with("/tmp/tempdir")  #pylint: disable=no-member
120
121    def testTempDirWhenDeleteEncounterError(self):
122        """Test create a temp dir and encoutered error during deletion."""
123        self.Patch(os, "chmod")
124        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
125        expected_error = OSError("Expected OS Error")
126        self.Patch(shutil, "rmtree", side_effect=expected_error)
127
128        def _Call():
129            with utils.TempDir():
130                pass
131
132        # Verify OSError should be raised.
133        self.assertRaises(OSError, _Call)
134        tempfile.mkdtemp.assert_called_once()  #pylint: disable=no-member
135        shutil.rmtree.assert_called_with("/tmp/tempdir")  #pylint: disable=no-member
136
137    def testTempDirOrininalErrorRaised(self):
138        """Test original error is raised even if tmp dir deletion failed."""
139        self.Patch(os, "chmod")
140        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
141        expected_error = OSError("Expected OS Error")
142        self.Patch(shutil, "rmtree", side_effect=expected_error)
143
144        class ExpectedException(Exception):
145            """Expected exception."""
146
147        def _Call():
148            with utils.TempDir():
149                raise ExpectedException("Expected Exception")
150
151        # Verify.
152        # ExpectedException should be raised, and OSError
153        # should not be raised.
154        self.assertRaises(ExpectedException, _Call)
155        tempfile.mkdtemp.assert_called_once()  #pylint: disable=no-member
156        shutil.rmtree.assert_called_with("/tmp/tempdir")  #pylint: disable=no-member
157
158    def testCreateSshKeyPairKeyAlreadyExists(self):  #pylint: disable=invalid-name
159        """Test when the key pair already exists."""
160        public_key = "/fake/public_key"
161        private_key = "/fake/private_key"
162        self.Patch(os.path, "exists", side_effect=[True, True])
163        self.Patch(subprocess, "check_call")
164        self.Patch(os, "makedirs", return_value=True)
165        utils.CreateSshKeyPairIfNotExist(private_key, public_key)
166        self.assertEqual(subprocess.check_call.call_count, 0)  #pylint: disable=no-member
167
168    def testCreateSshKeyPairKeyAreCreated(self):
169        """Test when the key pair created."""
170        public_key = "/fake/public_key"
171        private_key = "/fake/private_key"
172        self.Patch(os.path, "exists", return_value=False)
173        self.Patch(os, "makedirs", return_value=True)
174        self.Patch(subprocess, "check_call")
175        self.Patch(os, "rename")
176        utils.CreateSshKeyPairIfNotExist(private_key, public_key)
177        self.assertEqual(subprocess.check_call.call_count, 1)  #pylint: disable=no-member
178        subprocess.check_call.assert_called_with(  #pylint: disable=no-member
179            utils.SSH_KEYGEN_CMD +
180            ["-C", getpass.getuser(), "-f", private_key],
181            stdout=mock.ANY,
182            stderr=mock.ANY)
183
184    def testCreatePublicKeyAreCreated(self):
185        """Test when the PublicKey created."""
186        public_key = "/fake/public_key"
187        private_key = "/fake/private_key"
188        self.Patch(os.path, "exists", side_effect=[False, True, True])
189        self.Patch(os, "makedirs", return_value=True)
190        mock_open = mock.mock_open(read_data=public_key)
191        self.Patch(subprocess, "check_output")
192        self.Patch(os, "rename")
193        with mock.patch("builtins.open", mock_open):
194            utils.CreateSshKeyPairIfNotExist(private_key, public_key)
195        self.assertEqual(subprocess.check_output.call_count, 1)  #pylint: disable=no-member
196        subprocess.check_output.assert_called_with(  #pylint: disable=no-member
197            utils.SSH_KEYGEN_PUB_CMD +["-f", private_key])
198
199    def TestRetryOnException(self):
200        """Test Retry."""
201
202        def _IsValueError(exc):
203            return isinstance(exc, ValueError)
204
205        num_retry = 5
206
207        @utils.RetryOnException(_IsValueError, num_retry)
208        def _RaiseAndRetry(sentinel):
209            sentinel.alert()
210            raise ValueError("Fake error.")
211
212        sentinel = mock.MagicMock()
213        self.assertRaises(ValueError, _RaiseAndRetry, sentinel)
214        self.assertEqual(1 + num_retry, sentinel.alert.call_count)
215
216    def testRetryExceptionType(self):
217        """Test RetryExceptionType function."""
218
219        def _RaiseAndRetry(sentinel):
220            sentinel.alert()
221            raise ValueError("Fake error.")
222
223        num_retry = 5
224        sentinel = mock.MagicMock()
225        self.assertRaises(
226            ValueError,
227            utils.RetryExceptionType, (KeyError, ValueError),
228            num_retry,
229            _RaiseAndRetry,
230            0, # sleep_multiplier
231            1, # retry_backoff_factor
232            sentinel=sentinel)
233        self.assertEqual(1 + num_retry, sentinel.alert.call_count)
234
235    def testRetry(self):
236        """Test Retry."""
237        mock_sleep = self.Patch(time, "sleep")
238
239        def _RaiseAndRetry(sentinel):
240            sentinel.alert()
241            raise ValueError("Fake error.")
242
243        num_retry = 5
244        sentinel = mock.MagicMock()
245        self.assertRaises(
246            ValueError,
247            utils.RetryExceptionType, (ValueError, KeyError),
248            num_retry,
249            _RaiseAndRetry,
250            1, # sleep_multiplier
251            2, # retry_backoff_factor
252            sentinel=sentinel)
253
254        self.assertEqual(1 + num_retry, sentinel.alert.call_count)
255        mock_sleep.assert_has_calls(
256            [
257                mock.call(1),
258                mock.call(2),
259                mock.call(4),
260                mock.call(8),
261                mock.call(16)
262            ])
263
264    @mock.patch("builtins.input")
265    def testGetAnswerFromList(self, mock_raw_input):
266        """Test GetAnswerFromList."""
267        answer_list = ["image1.zip", "image2.zip", "image3.zip"]
268        mock_raw_input.return_value = 0
269        with self.assertRaises(SystemExit):
270            utils.GetAnswerFromList(answer_list)
271        mock_raw_input.side_effect = [1, 2, 3, 4]
272        self.assertEqual(utils.GetAnswerFromList(answer_list),
273                         ["image1.zip"])
274        self.assertEqual(utils.GetAnswerFromList(answer_list),
275                         ["image2.zip"])
276        self.assertEqual(utils.GetAnswerFromList(answer_list),
277                         ["image3.zip"])
278        self.assertEqual(utils.GetAnswerFromList(answer_list,
279                                                 enable_choose_all=True),
280                         answer_list)
281
282    @unittest.skipIf(isinstance(Tkinter, mock.Mock), "Tkinter mocked out, test case not needed.")
283    @mock.patch.object(Tkinter, "Tk")
284    def testCalculateVNCScreenRatio(self, mock_tk):
285        """Test Calculating the scale ratio of VNC display."""
286        # Get scale-down ratio if screen height is smaller than AVD height.
287        mock_tk.return_value = FakeTkinter(height=800, width=1200)
288        avd_h = 1920
289        avd_w = 1080
290        self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.4)
291
292        # Get scale-down ratio if screen width is smaller than AVD width.
293        mock_tk.return_value = FakeTkinter(height=800, width=1200)
294        avd_h = 900
295        avd_w = 1920
296        self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.6)
297
298        # Scale ratio = 1 if screen is larger than AVD.
299        mock_tk.return_value = FakeTkinter(height=1080, width=1920)
300        avd_h = 800
301        avd_w = 1280
302        self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 1)
303
304        # Get the scale if ratio of width is smaller than the
305        # ratio of height.
306        mock_tk.return_value = FakeTkinter(height=1200, width=800)
307        avd_h = 1920
308        avd_w = 1080
309        self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.6)
310
311    def testCheckUserInGroups(self):
312        """Test CheckUserInGroups."""
313        self.Patch(getpass, "getuser", return_value="user_0")
314        self.Patch(grp, "getgrall", return_value=[
315            GroupInfo("fake_group1", "passwd_1", 0, ["user_1", "user_2"]),
316            GroupInfo("fake_group2", "passwd_2", 1, ["user_1", "user_2"])])
317        self.Patch(grp, "getgrnam", return_value=GroupInfo(
318            "fake_group1", "passwd_1", 0, ["user_1", "user_2"]))
319        # Test Group name doesn't exist.
320        self.assertFalse(utils.CheckUserInGroups(["Non_exist_group"]))
321
322        # Test User isn't in group.
323        self.assertFalse(utils.CheckUserInGroups(["fake_group1"]))
324
325        # Test User is in group.
326        self.Patch(getpass, "getuser", return_value="user_1")
327        self.assertTrue(utils.CheckUserInGroups(["fake_group1"]))
328
329    @mock.patch.object(utils, "CheckUserInGroups")
330    def testAddUserGroupsToCmd(self, mock_user_group):
331        """Test AddUserGroupsToCmd."""
332        command = "test_command"
333        groups = ["group1", "group2"]
334        # Don't add user group in command
335        mock_user_group.return_value = True
336        expected_value = "test_command"
337        self.assertEqual(expected_value, utils.AddUserGroupsToCmd(command,
338                                                                  groups))
339
340        # Add user group in command
341        mock_user_group.return_value = False
342        expected_value = "sg group1 <<EOF\nsg group2\ntest_command\nEOF"
343        self.assertEqual(expected_value, utils.AddUserGroupsToCmd(command,
344                                                                  groups))
345
346    # pylint: disable=invalid-name
347    def testTimeoutException(self):
348        """Test TimeoutException."""
349        @utils.TimeoutException(1, "should time out")
350        def functionThatWillTimeOut():
351            """Test decorator of @utils.TimeoutException should timeout."""
352            time.sleep(5)
353
354        self.assertRaises(errors.FunctionTimeoutError,
355                          functionThatWillTimeOut)
356
357
358    def testTimeoutExceptionNoTimeout(self):
359        """Test No TimeoutException."""
360        @utils.TimeoutException(5, "shouldn't time out")
361        def functionThatShouldNotTimeout():
362            """Test decorator of @utils.TimeoutException shouldn't timeout."""
363            return None
364        try:
365            functionThatShouldNotTimeout()
366        except errors.FunctionTimeoutError:
367            self.fail("shouldn't timeout")
368
369    def testEstablishSshTunnel(self):
370        """Test EstablishSshTunnel."""
371        ip_addr = "1.1.1.1"
372        rsa_key_file = "/tmp/rsa_file"
373        port_mapping = [(1111, 2222), (8888, 9999)]
374        ssh_user = "fake_user"
375        mock_execute_command = self.Patch(utils, "_ExecuteCommand")
376        utils.EstablishSshTunnel(ip_addr, rsa_key_file, ssh_user,
377                                 port_mapping, "-o command='shell %s %h'")
378        arg_list = ["-i", rsa_key_file, "-o", "UserKnownHostsFile=/dev/null",
379                    "-o", "StrictHostKeyChecking=no",
380                    "-L", "1111:127.0.0.1:2222",
381                    "-L", "8888:127.0.0.1:9999",
382                    "-N", "-f",
383                    "-l", ssh_user, ip_addr,
384                    "-o", "command=shell %s %h"]
385        mock_execute_command.assert_called_with("ssh", arg_list)
386
387    def testAutoConnectCreateSSHTunnelFail(self):
388        """Test auto connect."""
389        fake_ip_addr = "1.1.1.1"
390        fake_rsa_key_file = "/tmp/rsa_file"
391        fake_target_vnc_port = 8888
392        target_adb_port = 9999
393        target_fastboot_port = 7777
394        ssh_user = "fake_user"
395        call_side_effect = subprocess.CalledProcessError(123, "fake",
396                                                         "fake error")
397        result = utils.ForwardedPorts(vnc_port=None, adb_port=None, fastboot_port=None)
398        self.Patch(utils, "EstablishSshTunnel", side_effect=call_side_effect)
399        self.assertEqual(result, utils.AutoConnect(fake_ip_addr,
400                                                   fake_rsa_key_file,
401                                                   fake_target_vnc_port,
402                                                   target_adb_port,
403                                                   target_fastboot_port,
404                                                   ssh_user))
405
406    def testAutoConnectWithExtraArgs(self):
407        """Test extra args will be the same with expanded args."""
408        fake_ip_addr = "1.1.1.1"
409        fake_rsa_key_file = "/tmp/rsa_file"
410        fake_target_vnc_port = 8888
411        target_adb_port = 9999
412        target_fastboot_port = 7777
413        ssh_user = "fake_user"
414        fake_port = 12345
415        self.Patch(utils, "PickFreePort", return_value=fake_port)
416        mock_execute_command = self.Patch(utils, "_ExecuteCommand")
417        mock_establish_ssh_tunnel = self.Patch(utils, "EstablishSshTunnel")
418        extra_args_ssh_tunnel = "-o command='shell %s %h' -o command1='ls -la'"
419        utils.AutoConnect(ip_addr=fake_ip_addr,
420                          rsa_key_file=fake_rsa_key_file,
421                          target_vnc_port=fake_target_vnc_port,
422                          target_adb_port=target_adb_port,
423                          target_fastboot_port=target_fastboot_port,
424                          ssh_user=ssh_user,
425                          client_adb_port=fake_port,
426                          client_fastboot_port=fake_port,
427                          extra_args_ssh_tunnel=extra_args_ssh_tunnel)
428        mock_establish_ssh_tunnel.assert_called_with(
429            fake_ip_addr,
430            fake_rsa_key_file,
431            ssh_user,
432            [utils.PortMapping(fake_port, target_adb_port),
433             utils.PortMapping(fake_port, target_fastboot_port),
434             utils.PortMapping(fake_port, fake_target_vnc_port)],
435            extra_args_ssh_tunnel)
436        mock_execute_command.assert_called_with(
437            "adb", ["connect", "127.0.0.1:12345"])
438
439    def testEstablishWebRTCSshTunnel(self):
440        """Test establish WebRTC ssh tunnel."""
441        fake_ip_addr = "1.1.1.1"
442        fake_rsa_key_file = "/tmp/rsa_file"
443        ssh_user = "fake_user"
444        fake_webrtc_local_port = 12345
445        self.Patch(utils, "GetWebRTCServerPort", return_value=8443)
446        mock_establish_ssh_tunnel = self.Patch(utils, "EstablishSshTunnel")
447        fake_port_mapping = [utils.PortMapping(port, port) for port in range(15555, 15579 + 1)] + [utils.PortMapping(12345, 8443)]
448
449        utils.EstablishWebRTCSshTunnel(
450            ip_addr=fake_ip_addr, rsa_key_file=fake_rsa_key_file,
451            ssh_user=ssh_user, webrtc_local_port=fake_webrtc_local_port)
452        mock_establish_ssh_tunnel.assert_called_with(
453            fake_ip_addr,
454            fake_rsa_key_file,
455            ssh_user,
456            fake_port_mapping,
457            None)
458
459        mock_establish_ssh_tunnel.reset_mock()
460        extra_args_ssh_tunnel = "-o command='shell %s %h'"
461        utils.EstablishWebRTCSshTunnel(
462            ip_addr=fake_ip_addr, rsa_key_file=fake_rsa_key_file,
463            ssh_user=ssh_user, extra_args_ssh_tunnel=extra_args_ssh_tunnel,
464            webrtc_local_port=fake_webrtc_local_port)
465        mock_establish_ssh_tunnel.assert_called_with(
466            fake_ip_addr,
467            fake_rsa_key_file,
468            ssh_user,
469            fake_port_mapping,
470            extra_args_ssh_tunnel)
471
472    def testGetWebRTCServerPort(self):
473        """test GetWebRTCServerPort."""
474        fake_ip_addr = "1.1.1.1"
475        fake_rsa_key_file = "/tmp/rsa_file"
476        ssh_user = "fake_user"
477        extra_args_ssh_tunnel = "-o command='shell %s %h'"
478        fake_subprocess = mock.MagicMock()
479        fake_subprocess.returncode = 0
480        fake_subprocess.communicate = mock.MagicMock(
481            return_value=('', ''))
482        self.Patch(subprocess, "Popen", return_value=fake_subprocess)
483        self.assertEqual(1443, utils.GetWebRTCServerPort(
484            fake_ip_addr, fake_rsa_key_file,ssh_user,extra_args_ssh_tunnel))
485
486        # Test the case that find "webrtc_operator" process.
487        webrtc_operator_process = "11:45 bin/webrtc_operator -assets_dir=assets"
488        fake_subprocess.communicate = mock.MagicMock(
489            return_value=(webrtc_operator_process, ''))
490        self.assertEqual(8443, utils.GetWebRTCServerPort(
491            fake_ip_addr, fake_rsa_key_file,ssh_user,extra_args_ssh_tunnel))
492
493    def testGetWebrtcPortFromSSHTunnel(self):
494        """"Test Get forwarding webrtc port from ssh tunnel."""
495        fake_ps_output = ("/fake_ps_1 --fake arg \n"
496                          "/fake_ps_2 --fake arg \n"
497                          "/usr/bin/ssh -i ~/.ssh/acloud_rsa "
498                          "-o UserKnownHostsFile=/dev/null "
499                          "-o StrictHostKeyChecking=no -L 15551:127.0.0.1:15551 "
500                          "-L 12345:127.0.0.1:8443 -N -f -l user 1.1.1.1").encode()
501        self.Patch(subprocess, "check_output", return_value=fake_ps_output)
502        webrtc_ports = utils.GetWebrtcPortFromSSHTunnel("1.1.1.1")
503        self.assertEqual(12345, webrtc_ports)
504
505    @mock.patch("acloud.internal.lib.utils.subprocess")
506    def testFindRemoteFiles(self, mock_subprocess):
507        """Test FindRemoteFiles."""
508        mock_ssh = mock.Mock()
509
510        paths = utils.FindRemoteFiles(mock_ssh, [])
511        mock_subprocess.run.assert_not_called()
512        self.assertEqual([], paths)
513
514        mock_ssh.GetBaseCmd.return_value = "mock_ssh"
515        mock_subprocess.run.return_value = mock.Mock(
516            stderr=b'stderr', stdout=b'file1\nfile2\n')
517        paths = utils.FindRemoteFiles(mock_ssh, ["dir1", "dir2"])
518        self.assertEqual(["file1", "file2"], paths)
519        mock_subprocess.run.assert_called_with(
520            'mock_ssh find -H dir1 dir2 -type f',
521            shell=True, capture_output=True, check=False)
522
523        mock_subprocess.run.return_value = mock.Mock(stderr=None, stdout=b'')
524        paths = utils.FindRemoteFiles(mock_ssh, ["dir1", "dir2"])
525        self.assertEqual([], paths)
526
527    # pylint: disable=protected-access, no-member
528    def testCleanupSSVncviwer(self):
529        """test cleanup ssvnc viewer."""
530        fake_vnc_port = 9999
531        fake_ss_vncviewer_pattern = utils._SSVNC_VIEWER_PATTERN % {
532            "vnc_port": fake_vnc_port}
533        self.Patch(utils, "IsCommandRunning", return_value=True)
534        self.Patch(subprocess, "check_call", return_value=True)
535        utils.CleanupSSVncviewer(fake_vnc_port)
536        subprocess.check_call.assert_called_with(["pkill", "-9", "-f", fake_ss_vncviewer_pattern])
537
538        subprocess.check_call.call_count = 0
539        self.Patch(utils, "IsCommandRunning", return_value=False)
540        utils.CleanupSSVncviewer(fake_vnc_port)
541        subprocess.check_call.assert_not_called()
542
543    def testLaunchBrowserFromReport(self):
544        """test launch browser from report."""
545        self.Patch(webbrowser, "open_new_tab")
546        fake_report = mock.MagicMock(data={})
547
548        # test remote instance
549        self.Patch(os.environ, "get", return_value=True)
550        fake_report.data = {
551            "devices": [{"instance_name": "remote_cf_instance_name",
552                         "ip": "192.168.1.1",},],}
553
554        utils.LaunchBrowserFromReport(fake_report)
555        webbrowser.open_new_tab.assert_called_once_with("https://localhost:8443")
556        webbrowser.open_new_tab.call_count = 0
557
558        # test local instance
559        fake_report.data = {
560            "devices": [{"instance_name": "local-instance1",
561                         "ip": "127.0.0.1:6250",},],}
562        utils.LaunchBrowserFromReport(fake_report)
563        webbrowser.open_new_tab.assert_called_once_with("https://localhost:8443")
564        webbrowser.open_new_tab.call_count = 0
565
566        # verify terminal can't support launch webbrowser.
567        self.Patch(os.environ, "get", return_value=False)
568        utils.LaunchBrowserFromReport(fake_report)
569        self.assertEqual(webbrowser.open_new_tab.call_count, 0)
570
571    def testSetExecutable(self):
572        """test setting a file to be executable."""
573        with tempfile.NamedTemporaryFile(delete=True) as temp_file:
574            utils.SetExecutable(temp_file.name)
575            self.assertEqual(os.stat(temp_file.name).st_mode & 0o777, 0o755)
576
577    def testSetDirectoryTreeExecutable(self):
578        """test setting a file in a directory to be executable."""
579        with tempfile.TemporaryDirectory() as temp_dir:
580            subdir = os.path.join(temp_dir, "subdir")
581            file_path = os.path.join(subdir, "file")
582            os.makedirs(subdir)
583            with open(file_path, "w"):
584                pass
585            utils.SetDirectoryTreeExecutable(temp_dir)
586            self.assertEqual(os.stat(file_path).st_mode & 0o777, 0o755)
587
588    def testSetCvdPort(self):
589        """test base_instance_num."""
590        utils.SetCvdPorts(2)
591        self.assertEqual(utils.GetCvdPorts().adb_port, 6521)
592        self.assertEqual(utils.GetCvdPorts().fastboot_port, 7521)
593        self.assertEqual(utils.GetCvdPorts().vnc_port, 6445)
594        utils.SetCvdPorts(None)
595
596
597    @mock.patch.object(utils, "PrintColorString")
598    def testPrintDeviceSummary(self, mock_print):
599        """test PrintDeviceSummary."""
600        fake_report = mock.MagicMock(data={})
601        fake_report.data = {
602            "devices": [{"instance_name": "remote_cf_instance_name",
603                         "ip": "192.168.1.1",
604                         "device_serial": "127.0.0.1:399"},],}
605        utils.PrintDeviceSummary(fake_report)
606        self.assertEqual(mock_print.call_count, 7)
607
608        # Test for OpenWrt device case.
609        fake_report.data = {
610            "devices": [{"instance_name": "remote_cf_instance_name",
611                         "ip": "192.168.1.1",
612                         "ssh_command": "fake_ssh_cmd",
613                         "screen_command": "fake_screen_cmd"},],}
614        mock_print.reset_mock()
615        utils.PrintDeviceSummary(fake_report)
616        self.assertEqual(mock_print.call_count, 13)
617
618        # Test for fail case
619        fake_report.data = {
620            "errors": "Fail to create devices"}
621        mock_print.reset_mock()
622        utils.PrintDeviceSummary(fake_report)
623        self.assertEqual(mock_print.call_count, 3)
624
625    # pylint: disable=protected-access
626    def testIsSupportedKvm(self):
627        """Test IsSupportedKvm."""
628        self.Patch(os.path, "exists", return_value=True)
629        self.assertTrue(utils.IsSupportedKvm())
630
631        self.Patch(os.path, "exists", return_value=False)
632        self.assertFalse(utils.IsSupportedKvm())
633
634
635if __name__ == "__main__":
636    unittest.main()
637