• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# Copyright 2016 - The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16"""Tests for acloud.internal.lib.utils."""
17
18import collections
19import errno
20import getpass
21import grp
22import os
23import shutil
24import subprocess
25import tempfile
26import time
27import webbrowser
28
29import unittest
30
31from unittest import mock
32
33from acloud import errors
34from acloud.internal.lib import driver_test_lib
35from acloud.internal.lib import utils
36
37
38GroupInfo = collections.namedtuple("GroupInfo", [
39    "gr_name",
40    "gr_passwd",
41    "gr_gid",
42    "gr_mem"])
43
44# Tkinter may not be supported so mock it out.
45try:
46    import Tkinter
47except ImportError:
48    Tkinter = mock.Mock()
49
50
51class FakeTkinter:
52    """Fake implementation of Tkinter.Tk()"""
53
54    def __init__(self, width=None, height=None):
55        self.width = width
56        self.height = height
57
58    # pylint: disable=invalid-name
59    def winfo_screenheight(self):
60        """Return the screen height."""
61        return self.height
62
63    # pylint: disable=invalid-name
64    def winfo_screenwidth(self):
65        """Return the screen width."""
66        return self.width
67
68
69# pylint: disable=too-many-public-methods
70class UtilsTest(driver_test_lib.BaseDriverTest):
71    """Test Utils."""
72
73    def TestTempDirSuccess(self):
74        """Test create a temp dir."""
75        self.Patch(os, "chmod")
76        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
77        self.Patch(shutil, "rmtree")
78        with utils.TempDir():
79            pass
80        # Verify.
81        tempfile.mkdtemp.assert_called_once()  # pylint: disable=no-member
82        shutil.rmtree.assert_called_with("/tmp/tempdir")  # pylint: disable=no-member
83
84    def TestTempDirExceptionRaised(self):
85        """Test create a temp dir and exception is raised within with-clause."""
86        self.Patch(os, "chmod")
87        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
88        self.Patch(shutil, "rmtree")
89
90        class ExpectedException(Exception):
91            """Expected exception."""
92
93        def _Call():
94            with utils.TempDir():
95                raise ExpectedException("Expected exception.")
96
97        # Verify. ExpectedException should be raised.
98        self.assertRaises(ExpectedException, _Call)
99        tempfile.mkdtemp.assert_called_once()  # pylint: disable=no-member
100        shutil.rmtree.assert_called_with("/tmp/tempdir")  #pylint: disable=no-member
101
102    def testTempDirWhenDeleteTempDirNoLongerExist(self):  # pylint: disable=invalid-name
103        """Test create a temp dir and dir no longer exists during deletion."""
104        self.Patch(os, "chmod")
105        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
106        expected_error = EnvironmentError()
107        expected_error.errno = errno.ENOENT
108        self.Patch(shutil, "rmtree", side_effect=expected_error)
109
110        def _Call():
111            with utils.TempDir():
112                pass
113
114        # Verify no exception should be raised when rmtree raises
115        # EnvironmentError with errno.ENOENT, i.e.
116        # directory no longer exists.
117        _Call()
118        tempfile.mkdtemp.assert_called_once()  #pylint: disable=no-member
119        shutil.rmtree.assert_called_with("/tmp/tempdir")  #pylint: disable=no-member
120
121    def testTempDirWhenDeleteEncounterError(self):
122        """Test create a temp dir and encoutered error during deletion."""
123        self.Patch(os, "chmod")
124        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
125        expected_error = OSError("Expected OS Error")
126        self.Patch(shutil, "rmtree", side_effect=expected_error)
127
128        def _Call():
129            with utils.TempDir():
130                pass
131
132        # Verify OSError should be raised.
133        self.assertRaises(OSError, _Call)
134        tempfile.mkdtemp.assert_called_once()  #pylint: disable=no-member
135        shutil.rmtree.assert_called_with("/tmp/tempdir")  #pylint: disable=no-member
136
137    def testTempDirOrininalErrorRaised(self):
138        """Test original error is raised even if tmp dir deletion failed."""
139        self.Patch(os, "chmod")
140        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
141        expected_error = OSError("Expected OS Error")
142        self.Patch(shutil, "rmtree", side_effect=expected_error)
143
144        class ExpectedException(Exception):
145            """Expected exception."""
146
147        def _Call():
148            with utils.TempDir():
149                raise ExpectedException("Expected Exception")
150
151        # Verify.
152        # ExpectedException should be raised, and OSError
153        # should not be raised.
154        self.assertRaises(ExpectedException, _Call)
155        tempfile.mkdtemp.assert_called_once()  #pylint: disable=no-member
156        shutil.rmtree.assert_called_with("/tmp/tempdir")  #pylint: disable=no-member
157
158    def testCreateSshKeyPairKeyAlreadyExists(self):  #pylint: disable=invalid-name
159        """Test when the key pair already exists."""
160        public_key = "/fake/public_key"
161        private_key = "/fake/private_key"
162        self.Patch(os.path, "exists", side_effect=[True, True])
163        self.Patch(subprocess, "check_call")
164        self.Patch(os, "makedirs", return_value=True)
165        utils.CreateSshKeyPairIfNotExist(private_key, public_key)
166        self.assertEqual(subprocess.check_call.call_count, 0)  #pylint: disable=no-member
167
168    def testCreateSshKeyPairKeyAreCreated(self):
169        """Test when the key pair created."""
170        public_key = "/fake/public_key"
171        private_key = "/fake/private_key"
172        self.Patch(os.path, "exists", return_value=False)
173        self.Patch(os, "makedirs", return_value=True)
174        self.Patch(subprocess, "check_call")
175        self.Patch(os, "rename")
176        utils.CreateSshKeyPairIfNotExist(private_key, public_key)
177        self.assertEqual(subprocess.check_call.call_count, 1)  #pylint: disable=no-member
178        subprocess.check_call.assert_called_with(  #pylint: disable=no-member
179            utils.SSH_KEYGEN_CMD +
180            ["-C", getpass.getuser(), "-f", private_key],
181            stdout=mock.ANY,
182            stderr=mock.ANY)
183
184    def testCreatePublicKeyAreCreated(self):
185        """Test when the PublicKey created."""
186        public_key = "/fake/public_key"
187        private_key = "/fake/private_key"
188        self.Patch(os.path, "exists", side_effect=[False, True, True])
189        self.Patch(os, "makedirs", return_value=True)
190        mock_open = mock.mock_open(read_data=public_key)
191        self.Patch(subprocess, "check_output")
192        self.Patch(os, "rename")
193        with mock.patch("builtins.open", mock_open):
194            utils.CreateSshKeyPairIfNotExist(private_key, public_key)
195        self.assertEqual(subprocess.check_output.call_count, 1)  #pylint: disable=no-member
196        subprocess.check_output.assert_called_with(  #pylint: disable=no-member
197            utils.SSH_KEYGEN_PUB_CMD +["-f", private_key])
198
199    def TestRetryOnException(self):
200        """Test Retry."""
201
202        def _IsValueError(exc):
203            return isinstance(exc, ValueError)
204
205        num_retry = 5
206
207        @utils.RetryOnException(_IsValueError, num_retry)
208        def _RaiseAndRetry(sentinel):
209            sentinel.alert()
210            raise ValueError("Fake error.")
211
212        sentinel = mock.MagicMock()
213        self.assertRaises(ValueError, _RaiseAndRetry, sentinel)
214        self.assertEqual(1 + num_retry, sentinel.alert.call_count)
215
216    def testRetryExceptionType(self):
217        """Test RetryExceptionType function."""
218
219        def _RaiseAndRetry(sentinel):
220            sentinel.alert()
221            raise ValueError("Fake error.")
222
223        num_retry = 5
224        sentinel = mock.MagicMock()
225        self.assertRaises(
226            ValueError,
227            utils.RetryExceptionType, (KeyError, ValueError),
228            num_retry,
229            _RaiseAndRetry,
230            0, # sleep_multiplier
231            1, # retry_backoff_factor
232            sentinel=sentinel)
233        self.assertEqual(1 + num_retry, sentinel.alert.call_count)
234
235    def testRetry(self):
236        """Test Retry."""
237        mock_sleep = self.Patch(time, "sleep")
238
239        def _RaiseAndRetry(sentinel):
240            sentinel.alert()
241            raise ValueError("Fake error.")
242
243        num_retry = 5
244        sentinel = mock.MagicMock()
245        self.assertRaises(
246            ValueError,
247            utils.RetryExceptionType, (ValueError, KeyError),
248            num_retry,
249            _RaiseAndRetry,
250            1, # sleep_multiplier
251            2, # retry_backoff_factor
252            sentinel=sentinel)
253
254        self.assertEqual(1 + num_retry, sentinel.alert.call_count)
255        mock_sleep.assert_has_calls(
256            [
257                mock.call(1),
258                mock.call(2),
259                mock.call(4),
260                mock.call(8),
261                mock.call(16)
262            ])
263
264    @mock.patch("builtins.input")
265    def testGetAnswerFromList(self, mock_raw_input):
266        """Test GetAnswerFromList."""
267        answer_list = ["image1.zip", "image2.zip", "image3.zip"]
268        mock_raw_input.return_value = 0
269        with self.assertRaises(SystemExit):
270            utils.GetAnswerFromList(answer_list)
271        mock_raw_input.side_effect = [1, 2, 3, 4]
272        self.assertEqual(utils.GetAnswerFromList(answer_list),
273                         ["image1.zip"])
274        self.assertEqual(utils.GetAnswerFromList(answer_list),
275                         ["image2.zip"])
276        self.assertEqual(utils.GetAnswerFromList(answer_list),
277                         ["image3.zip"])
278        self.assertEqual(utils.GetAnswerFromList(answer_list,
279                                                 enable_choose_all=True),
280                         answer_list)
281
282    @unittest.skipIf(isinstance(Tkinter, mock.Mock), "Tkinter mocked out, test case not needed.")
283    @mock.patch.object(Tkinter, "Tk")
284    def testCalculateVNCScreenRatio(self, mock_tk):
285        """Test Calculating the scale ratio of VNC display."""
286        # Get scale-down ratio if screen height is smaller than AVD height.
287        mock_tk.return_value = FakeTkinter(height=800, width=1200)
288        avd_h = 1920
289        avd_w = 1080
290        self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.4)
291
292        # Get scale-down ratio if screen width is smaller than AVD width.
293        mock_tk.return_value = FakeTkinter(height=800, width=1200)
294        avd_h = 900
295        avd_w = 1920
296        self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.6)
297
298        # Scale ratio = 1 if screen is larger than AVD.
299        mock_tk.return_value = FakeTkinter(height=1080, width=1920)
300        avd_h = 800
301        avd_w = 1280
302        self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 1)
303
304        # Get the scale if ratio of width is smaller than the
305        # ratio of height.
306        mock_tk.return_value = FakeTkinter(height=1200, width=800)
307        avd_h = 1920
308        avd_w = 1080
309        self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.6)
310
311    def testCheckUserInGroups(self):
312        """Test CheckUserInGroups."""
313        self.Patch(getpass, "getuser", return_value="user_0")
314        self.Patch(grp, "getgrall", return_value=[
315            GroupInfo("fake_group1", "passwd_1", 0, ["user_1", "user_2"]),
316            GroupInfo("fake_group2", "passwd_2", 1, ["user_1", "user_2"])])
317        self.Patch(grp, "getgrnam", return_value=GroupInfo(
318            "fake_group1", "passwd_1", 0, ["user_1", "user_2"]))
319        # Test Group name doesn't exist.
320        self.assertFalse(utils.CheckUserInGroups(["Non_exist_group"]))
321
322        # Test User isn't in group.
323        self.assertFalse(utils.CheckUserInGroups(["fake_group1"]))
324
325        # Test User is in group.
326        self.Patch(getpass, "getuser", return_value="user_1")
327        self.assertTrue(utils.CheckUserInGroups(["fake_group1"]))
328
329    @mock.patch.object(utils, "CheckUserInGroups")
330    def testAddUserGroupsToCmd(self, mock_user_group):
331        """Test AddUserGroupsToCmd."""
332        command = "test_command"
333        groups = ["group1", "group2"]
334        # Don't add user group in command
335        mock_user_group.return_value = True
336        expected_value = "test_command"
337        self.assertEqual(expected_value, utils.AddUserGroupsToCmd(command,
338                                                                  groups))
339
340        # Add user group in command
341        mock_user_group.return_value = False
342        expected_value = "sg group1 <<EOF\nsg group2\ntest_command\nEOF"
343        self.assertEqual(expected_value, utils.AddUserGroupsToCmd(command,
344                                                                  groups))
345
346    # pylint: disable=invalid-name
347    def testTimeoutException(self):
348        """Test TimeoutException."""
349        @utils.TimeoutException(1, "should time out")
350        def functionThatWillTimeOut():
351            """Test decorator of @utils.TimeoutException should timeout."""
352            time.sleep(5)
353
354        self.assertRaises(errors.FunctionTimeoutError,
355                          functionThatWillTimeOut)
356
357
358    def testTimeoutExceptionNoTimeout(self):
359        """Test No TimeoutException."""
360        @utils.TimeoutException(5, "shouldn't time out")
361        def functionThatShouldNotTimeout():
362            """Test decorator of @utils.TimeoutException shouldn't timeout."""
363            return None
364        try:
365            functionThatShouldNotTimeout()
366        except errors.FunctionTimeoutError:
367            self.fail("shouldn't timeout")
368
369    def testEstablishSshTunnel(self):
370        """Test EstablishSshTunnel."""
371        ip_addr = "1.1.1.1"
372        rsa_key_file = "/tmp/rsa_file"
373        port_mapping = [(1111, 2222), (8888, 9999)]
374        ssh_user = "fake_user"
375        mock_execute_command = self.Patch(utils, "_ExecuteCommand")
376        utils.EstablishSshTunnel(ip_addr, rsa_key_file, ssh_user,
377                                 port_mapping, "-o command='shell %s %h'")
378        arg_list = ["-i", rsa_key_file, "-o", "UserKnownHostsFile=/dev/null",
379                    "-o", "StrictHostKeyChecking=no",
380                    "-L", "1111:127.0.0.1:2222",
381                    "-L", "8888:127.0.0.1:9999",
382                    "-N", "-f",
383                    "-l", ssh_user, ip_addr,
384                    "-o", "command=shell %s %h"]
385        mock_execute_command.assert_called_with("ssh", arg_list)
386
387    def testAutoConnectCreateSSHTunnelFail(self):
388        """Test auto connect."""
389        fake_ip_addr = "1.1.1.1"
390        fake_rsa_key_file = "/tmp/rsa_file"
391        fake_target_vnc_port = 8888
392        target_adb_port = 9999
393        ssh_user = "fake_user"
394        call_side_effect = subprocess.CalledProcessError(123, "fake",
395                                                         "fake error")
396        result = utils.ForwardedPorts(vnc_port=None, adb_port=None)
397        self.Patch(utils, "EstablishSshTunnel", side_effect=call_side_effect)
398        self.assertEqual(result, utils.AutoConnect(fake_ip_addr,
399                                                   fake_rsa_key_file,
400                                                   fake_target_vnc_port,
401                                                   target_adb_port,
402                                                   ssh_user))
403
404    def testAutoConnectWithExtraArgs(self):
405        """Test extra args will be the same with expanded args."""
406        fake_ip_addr = "1.1.1.1"
407        fake_rsa_key_file = "/tmp/rsa_file"
408        fake_target_vnc_port = 8888
409        target_adb_port = 9999
410        ssh_user = "fake_user"
411        fake_port = 12345
412        self.Patch(utils, "PickFreePort", return_value=fake_port)
413        mock_execute_command = self.Patch(utils, "_ExecuteCommand")
414        mock_establish_ssh_tunnel = self.Patch(utils, "EstablishSshTunnel")
415        extra_args_ssh_tunnel = "-o command='shell %s %h' -o command1='ls -la'"
416        utils.AutoConnect(ip_addr=fake_ip_addr,
417                          rsa_key_file=fake_rsa_key_file,
418                          target_vnc_port=fake_target_vnc_port,
419                          target_adb_port=target_adb_port,
420                          ssh_user=ssh_user,
421                          client_adb_port=fake_port,
422                          extra_args_ssh_tunnel=extra_args_ssh_tunnel)
423        mock_establish_ssh_tunnel.assert_called_with(
424            fake_ip_addr,
425            fake_rsa_key_file,
426            ssh_user,
427            [utils.PortMapping(fake_port, target_adb_port),
428             utils.PortMapping(fake_port, fake_target_vnc_port)],
429            extra_args_ssh_tunnel)
430        mock_execute_command.assert_called_with(
431            "adb", ["connect", "127.0.0.1:12345"])
432
433    def testEstablishWebRTCSshTunnel(self):
434        """Test establish WebRTC ssh tunnel."""
435        fake_ip_addr = "1.1.1.1"
436        fake_rsa_key_file = "/tmp/rsa_file"
437        ssh_user = "fake_user"
438        fake_webrtc_local_port = 12345
439        self.Patch(utils, "GetWebRTCServerPort", return_value=8443)
440        mock_establish_ssh_tunnel = self.Patch(utils, "EstablishSshTunnel")
441        fake_port_mapping = [utils.PortMapping(15550, 15550),
442                             utils.PortMapping(15551, 15551),
443                             utils.PortMapping(15552, 15552),
444                             utils.PortMapping(12345, 8443)]
445
446        utils.EstablishWebRTCSshTunnel(
447            ip_addr=fake_ip_addr, rsa_key_file=fake_rsa_key_file,
448            ssh_user=ssh_user, webrtc_local_port=fake_webrtc_local_port)
449        mock_establish_ssh_tunnel.assert_called_with(
450            fake_ip_addr,
451            fake_rsa_key_file,
452            ssh_user,
453            fake_port_mapping,
454            None)
455
456        mock_establish_ssh_tunnel.reset_mock()
457        extra_args_ssh_tunnel = "-o command='shell %s %h'"
458        utils.EstablishWebRTCSshTunnel(
459            ip_addr=fake_ip_addr, rsa_key_file=fake_rsa_key_file,
460            ssh_user=ssh_user, extra_args_ssh_tunnel=extra_args_ssh_tunnel,
461            webrtc_local_port=fake_webrtc_local_port)
462        mock_establish_ssh_tunnel.assert_called_with(
463            fake_ip_addr,
464            fake_rsa_key_file,
465            ssh_user,
466            fake_port_mapping,
467            extra_args_ssh_tunnel)
468
469    def testGetWebRTCServerPort(self):
470        """test GetWebRTCServerPort."""
471        fake_ip_addr = "1.1.1.1"
472        fake_rsa_key_file = "/tmp/rsa_file"
473        ssh_user = "fake_user"
474        extra_args_ssh_tunnel = "-o command='shell %s %h'"
475        fake_subprocess = mock.MagicMock()
476        fake_subprocess.returncode = 0
477        fake_subprocess.communicate = mock.MagicMock(
478            return_value=('', ''))
479        self.Patch(subprocess, "Popen", return_value=fake_subprocess)
480        self.assertEqual(1443, utils.GetWebRTCServerPort(
481            fake_ip_addr, fake_rsa_key_file,ssh_user,extra_args_ssh_tunnel))
482
483        # Test the case that find "webrtc_operator" process.
484        webrtc_operator_process = "11:45 bin/webrtc_operator -assets_dir=assets"
485        fake_subprocess.communicate = mock.MagicMock(
486            return_value=(webrtc_operator_process, ''))
487        self.assertEqual(8443, utils.GetWebRTCServerPort(
488            fake_ip_addr, fake_rsa_key_file,ssh_user,extra_args_ssh_tunnel))
489
490    def testGetWebrtcPortFromSSHTunnel(self):
491        """"Test Get forwarding webrtc port from ssh tunnel."""
492        fake_ps_output = ("/fake_ps_1 --fake arg \n"
493                          "/fake_ps_2 --fake arg \n"
494                          "/usr/bin/ssh -i ~/.ssh/acloud_rsa "
495                          "-o UserKnownHostsFile=/dev/null "
496                          "-o StrictHostKeyChecking=no -L 15551:127.0.0.1:15551 "
497                          "-L 12345:127.0.0.1:8443 -N -f -l user 1.1.1.1").encode()
498        self.Patch(subprocess, "check_output", return_value=fake_ps_output)
499        webrtc_ports = utils.GetWebrtcPortFromSSHTunnel("1.1.1.1")
500        self.assertEqual(12345, webrtc_ports)
501
502    # pylint: disable=protected-access, no-member
503    def testCleanupSSVncviwer(self):
504        """test cleanup ssvnc viewer."""
505        fake_vnc_port = 9999
506        fake_ss_vncviewer_pattern = utils._SSVNC_VIEWER_PATTERN % {
507            "vnc_port": fake_vnc_port}
508        self.Patch(utils, "IsCommandRunning", return_value=True)
509        self.Patch(subprocess, "check_call", return_value=True)
510        utils.CleanupSSVncviewer(fake_vnc_port)
511        subprocess.check_call.assert_called_with(["pkill", "-9", "-f", fake_ss_vncviewer_pattern])
512
513        subprocess.check_call.call_count = 0
514        self.Patch(utils, "IsCommandRunning", return_value=False)
515        utils.CleanupSSVncviewer(fake_vnc_port)
516        subprocess.check_call.assert_not_called()
517
518    def testLaunchBrowserFromReport(self):
519        """test launch browser from report."""
520        self.Patch(webbrowser, "open_new_tab")
521        fake_report = mock.MagicMock(data={})
522
523        # test remote instance
524        self.Patch(os.environ, "get", return_value=True)
525        fake_report.data = {
526            "devices": [{"instance_name": "remote_cf_instance_name",
527                         "ip": "192.168.1.1",},],}
528
529        utils.LaunchBrowserFromReport(fake_report)
530        webbrowser.open_new_tab.assert_called_once_with("https://localhost:8443")
531        webbrowser.open_new_tab.call_count = 0
532
533        # test local instance
534        fake_report.data = {
535            "devices": [{"instance_name": "local-instance1",
536                         "ip": "127.0.0.1:6250",},],}
537        utils.LaunchBrowserFromReport(fake_report)
538        webbrowser.open_new_tab.assert_called_once_with("https://localhost:8443")
539        webbrowser.open_new_tab.call_count = 0
540
541        # verify terminal can't support launch webbrowser.
542        self.Patch(os.environ, "get", return_value=False)
543        utils.LaunchBrowserFromReport(fake_report)
544        self.assertEqual(webbrowser.open_new_tab.call_count, 0)
545
546    def testSetExecutable(self):
547        """test setting a file to be executable."""
548        with tempfile.NamedTemporaryFile(delete=True) as temp_file:
549            utils.SetExecutable(temp_file.name)
550            self.assertEqual(os.stat(temp_file.name).st_mode & 0o777, 0o755)
551
552    def testSetDirectoryTreeExecutable(self):
553        """test setting a file in a directory to be executable."""
554        with tempfile.TemporaryDirectory() as temp_dir:
555            subdir = os.path.join(temp_dir, "subdir")
556            file_path = os.path.join(subdir, "file")
557            os.makedirs(subdir)
558            with open(file_path, "w"):
559                pass
560            utils.SetDirectoryTreeExecutable(temp_dir)
561            self.assertEqual(os.stat(file_path).st_mode & 0o777, 0o755)
562
563    def testSetCvdPort(self):
564        """test base_instance_num."""
565        utils.SetCvdPorts(2)
566        self.assertEqual(utils.GetCvdPorts().adb_port, 6521)
567        self.assertEqual(utils.GetCvdPorts().vnc_port, 6445)
568        utils.SetCvdPorts(None)
569
570
571    @mock.patch.object(utils, "PrintColorString")
572    def testPrintDeviceSummary(self, mock_print):
573        """test PrintDeviceSummary."""
574        fake_report = mock.MagicMock(data={})
575        fake_report.data = {
576            "devices": [{"instance_name": "remote_cf_instance_name",
577                         "ip": "192.168.1.1",
578                         "device_serial": "127.0.0.1:399"},],}
579        utils.PrintDeviceSummary(fake_report)
580        self.assertEqual(mock_print.call_count, 7)
581
582        # Test for OpenWrt device case.
583        fake_report.data = {
584            "devices": [{"instance_name": "remote_cf_instance_name",
585                         "ip": "192.168.1.1",
586                         "ssh_command": "fake_ssh_cmd",
587                         "screen_command": "fake_screen_cmd"},],}
588        mock_print.reset_mock()
589        utils.PrintDeviceSummary(fake_report)
590        self.assertEqual(mock_print.call_count, 13)
591
592        # Test for fail case
593        fake_report.data = {
594            "errors": "Fail to create devices"}
595        mock_print.reset_mock()
596        utils.PrintDeviceSummary(fake_report)
597        self.assertEqual(mock_print.call_count, 3)
598
599    # pylint: disable=protected-access
600    def testIsSupportedKvm(self):
601        """Test IsSupportedKvm."""
602        self.Patch(os.path, "exists", return_value=True)
603        self.assertTrue(utils.IsSupportedKvm())
604
605        self.Patch(os.path, "exists", return_value=False)
606        self.assertFalse(utils.IsSupportedKvm())
607
608
609if __name__ == "__main__":
610    unittest.main()
611