#!/usr/bin/env python
#
# Copyright 2016 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for acloud.internal.lib.utils."""

import collections
import errno
import getpass
import grp
import os
import shutil
import subprocess
import tempfile
import time
import webbrowser

import unittest

from unittest import mock

from acloud import errors
from acloud.internal.lib import driver_test_lib
from acloud.internal.lib import utils


# Stand-in for the records returned by grp.getgrall()/grp.getgrnam(); it
# carries the same four field names the tests below populate.
GroupInfo = collections.namedtuple("GroupInfo", [
    "gr_name",
    "gr_passwd",
    "gr_gid",
    "gr_mem"])

# Tkinter may not be supported so mock it out.
# NOTE(review): on Python 3 the stdlib module is named `tkinter`, so this
# import is expected to fail there, which leaves testCalculateVNCScreenRatio
# skipped -- confirm this is intended.
try:
    import Tkinter
except ImportError:
    Tkinter = mock.Mock()


class FakeTkinter:
    """Fake implementation of Tkinter.Tk().

    Only exposes the two screen-size queries that the VNC ratio test needs.
    """

    def __init__(self, width=None, height=None):
        """Remember the fake screen dimensions.

        Args:
            width: Value to report from winfo_screenwidth().
            height: Value to report from winfo_screenheight().
        """
        self.width = width
        self.height = height

    # pylint: disable=invalid-name
    def winfo_screenheight(self):
        """Return the screen height."""
        return self.height

    # pylint: disable=invalid-name
    def winfo_screenwidth(self):
        """Return the screen width."""
        return self.width


# pylint: disable=too-many-public-methods
class UtilsTest(driver_test_lib.BaseDriverTest):
    """Test Utils."""

    # NOTE: unittest only collects methods whose names start with lowercase
    # "test"; a "Test..." method is silently never executed.
    def testTempDirSuccess(self):
        """Test create a temp dir."""
        self.Patch(os, "chmod")
        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
        self.Patch(shutil, "rmtree")
        with utils.TempDir():
            pass
        # Verify.
        tempfile.mkdtemp.assert_called_once()  # pylint: disable=no-member
        shutil.rmtree.assert_called_with("/tmp/tempdir")  # pylint: disable=no-member

    def testTempDirExceptionRaised(self):
        """Test create a temp dir and exception is raised within with-clause."""
        self.Patch(os, "chmod")
        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
        self.Patch(shutil, "rmtree")

        class ExpectedException(Exception):
            """Expected exception."""

        def _Call():
            with utils.TempDir():
                raise ExpectedException("Expected exception.")

        # Verify. ExpectedException should be raised.
        self.assertRaises(ExpectedException, _Call)
        tempfile.mkdtemp.assert_called_once()  # pylint: disable=no-member
        shutil.rmtree.assert_called_with("/tmp/tempdir")  # pylint: disable=no-member

    def testTempDirWhenDeleteTempDirNoLongerExist(self):  # pylint: disable=invalid-name
        """Test create a temp dir and dir no longer exists during deletion."""
        self.Patch(os, "chmod")
        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
        expected_error = EnvironmentError()
        expected_error.errno = errno.ENOENT
        self.Patch(shutil, "rmtree", side_effect=expected_error)

        def _Call():
            with utils.TempDir():
                pass

        # Verify no exception should be raised when rmtree raises
        # EnvironmentError with errno.ENOENT, i.e.
        # directory no longer exists.
        _Call()
        tempfile.mkdtemp.assert_called_once()  # pylint: disable=no-member
        shutil.rmtree.assert_called_with("/tmp/tempdir")  # pylint: disable=no-member

    def testTempDirWhenDeleteEncounterError(self):
        """Test create a temp dir and encountered error during deletion."""
        self.Patch(os, "chmod")
        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
        expected_error = OSError("Expected OS Error")
        self.Patch(shutil, "rmtree", side_effect=expected_error)

        def _Call():
            with utils.TempDir():
                pass

        # Verify OSError should be raised.
        self.assertRaises(OSError, _Call)
        tempfile.mkdtemp.assert_called_once()  # pylint: disable=no-member
        shutil.rmtree.assert_called_with("/tmp/tempdir")  # pylint: disable=no-member

    def testTempDirOrininalErrorRaised(self):
        """Test original error is raised even if tmp dir deletion failed."""
        self.Patch(os, "chmod")
        self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
        expected_error = OSError("Expected OS Error")
        self.Patch(shutil, "rmtree", side_effect=expected_error)

        class ExpectedException(Exception):
            """Expected exception."""

        def _Call():
            with utils.TempDir():
                raise ExpectedException("Expected Exception")

        # Verify.
        # ExpectedException should be raised, and OSError
        # should not be raised.
        self.assertRaises(ExpectedException, _Call)
        tempfile.mkdtemp.assert_called_once()  # pylint: disable=no-member
        shutil.rmtree.assert_called_with("/tmp/tempdir")  # pylint: disable=no-member

    def testCreateSshKeyPairKeyAlreadyExists(self):  # pylint: disable=invalid-name
        """Test when the key pair already exists."""
        public_key = "/fake/public_key"
        private_key = "/fake/private_key"
        self.Patch(os.path, "exists", side_effect=[True, True])
        self.Patch(subprocess, "check_call")
        self.Patch(os, "makedirs", return_value=True)
        utils.CreateSshKeyPairIfNotExist(private_key, public_key)
        self.assertEqual(subprocess.check_call.call_count, 0)  # pylint: disable=no-member

    def testCreateSshKeyPairKeyAreCreated(self):
        """Test when the key pair created."""
        public_key = "/fake/public_key"
        private_key = "/fake/private_key"
        self.Patch(os.path, "exists", return_value=False)
        self.Patch(os, "makedirs", return_value=True)
        self.Patch(subprocess, "check_call")
        self.Patch(os, "rename")
        utils.CreateSshKeyPairIfNotExist(private_key, public_key)
        self.assertEqual(subprocess.check_call.call_count, 1)  # pylint: disable=no-member
        subprocess.check_call.assert_called_with(  # pylint: disable=no-member
            utils.SSH_KEYGEN_CMD +
            ["-C", getpass.getuser(), "-f", private_key],
            stdout=mock.ANY,
            stderr=mock.ANY)

    def testCreatePublicKeyAreCreated(self):
        """Test when the PublicKey created."""
        public_key = "/fake/public_key"
        private_key = "/fake/private_key"
        self.Patch(os.path, "exists", side_effect=[False, True, True])
        self.Patch(os, "makedirs", return_value=True)
        mock_open = mock.mock_open(read_data=public_key)
        self.Patch(subprocess, "check_output")
        self.Patch(os, "rename")
        with mock.patch("builtins.open", mock_open):
            utils.CreateSshKeyPairIfNotExist(private_key, public_key)
        self.assertEqual(subprocess.check_output.call_count, 1)  # pylint: disable=no-member
        subprocess.check_output.assert_called_with(  # pylint: disable=no-member
            utils.SSH_KEYGEN_PUB_CMD + ["-f", private_key])

    def testRetryOnException(self):
        """Test the RetryOnException decorator retries num_retry times."""

        def _IsValueError(exc):
            return isinstance(exc, ValueError)

        num_retry = 5

        @utils.RetryOnException(_IsValueError, num_retry)
        def _RaiseAndRetry(sentinel):
            sentinel.alert()
            raise ValueError("Fake error.")

        sentinel = mock.MagicMock()
        self.assertRaises(ValueError, _RaiseAndRetry, sentinel)
        # One initial attempt plus num_retry retries.
        self.assertEqual(1 + num_retry, sentinel.alert.call_count)

    def testRetryExceptionType(self):
        """Test RetryExceptionType function."""

        def _RaiseAndRetry(sentinel):
            sentinel.alert()
            raise ValueError("Fake error.")

        num_retry = 5
        sentinel = mock.MagicMock()
        self.assertRaises(
            ValueError,
            utils.RetryExceptionType, (KeyError, ValueError),
            num_retry,
            _RaiseAndRetry,
            0,  # sleep_multiplier
            1,  # retry_backoff_factor
            sentinel=sentinel)
        self.assertEqual(1 + num_retry, sentinel.alert.call_count)

    def testRetry(self):
        """Test RetryExceptionType sleeps with a growing backoff between tries."""
        mock_sleep = self.Patch(time, "sleep")

        def _RaiseAndRetry(sentinel):
            sentinel.alert()
            raise ValueError("Fake error.")

        num_retry = 5
        sentinel = mock.MagicMock()
        self.assertRaises(
            ValueError,
            utils.RetryExceptionType, (ValueError, KeyError),
            num_retry,
            _RaiseAndRetry,
            1,  # sleep_multiplier
            2,  # retry_backoff_factor
            sentinel=sentinel)

        self.assertEqual(1 + num_retry, sentinel.alert.call_count)
        # With multiplier 1 and backoff factor 2 the waits double each retry.
        mock_sleep.assert_has_calls(
            [
                mock.call(1),
                mock.call(2),
                mock.call(4),
                mock.call(8),
                mock.call(16)
            ])

    @mock.patch("builtins.input")
    def testGetAnswerFromList(self, mock_raw_input):
        """Test GetAnswerFromList."""
        answer_list = ["image1.zip", "image2.zip", "image3.zip"]
        mock_raw_input.return_value = 0
        with self.assertRaises(SystemExit):
            utils.GetAnswerFromList(answer_list)
        mock_raw_input.side_effect = [1, 2, 3, 4]
        self.assertEqual(utils.GetAnswerFromList(answer_list),
                         ["image1.zip"])
        self.assertEqual(utils.GetAnswerFromList(answer_list),
                         ["image2.zip"])
        self.assertEqual(utils.GetAnswerFromList(answer_list),
                         ["image3.zip"])
        self.assertEqual(utils.GetAnswerFromList(answer_list,
                                                 enable_choose_all=True),
                         answer_list)

    @unittest.skipIf(isinstance(Tkinter, mock.Mock),
                     "Tkinter mocked out, test case not needed.")
    @mock.patch.object(Tkinter, "Tk")
    def testCalculateVNCScreenRatio(self, mock_tk):
        """Test Calculating the scale ratio of VNC display."""
        # Get scale-down ratio if screen height is smaller than AVD height.
        mock_tk.return_value = FakeTkinter(height=800, width=1200)
        avd_h = 1920
        avd_w = 1080
        self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.4)

        # Get scale-down ratio if screen width is smaller than AVD width.
        mock_tk.return_value = FakeTkinter(height=800, width=1200)
        avd_h = 900
        avd_w = 1920
        self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.6)

        # Scale ratio = 1 if screen is larger than AVD.
        mock_tk.return_value = FakeTkinter(height=1080, width=1920)
        avd_h = 800
        avd_w = 1280
        self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 1)

        # Get the scale if ratio of width is smaller than the
        # ratio of height.
        mock_tk.return_value = FakeTkinter(height=1200, width=800)
        avd_h = 1920
        avd_w = 1080
        self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.6)

    def testCheckUserInGroups(self):
        """Test CheckUserInGroups."""
        self.Patch(getpass, "getuser", return_value="user_0")
        self.Patch(grp, "getgrall", return_value=[
            GroupInfo("fake_group1", "passwd_1", 0, ["user_1", "user_2"]),
            GroupInfo("fake_group2", "passwd_2", 1, ["user_1", "user_2"])])
        self.Patch(grp, "getgrnam", return_value=GroupInfo(
            "fake_group1", "passwd_1", 0, ["user_1", "user_2"]))
        # Test Group name doesn't exist.
        self.assertFalse(utils.CheckUserInGroups(["Non_exist_group"]))

        # Test User isn't in group.
        self.assertFalse(utils.CheckUserInGroups(["fake_group1"]))

        # Test User is in group.
        self.Patch(getpass, "getuser", return_value="user_1")
        self.assertTrue(utils.CheckUserInGroups(["fake_group1"]))

    @mock.patch.object(utils, "CheckUserInGroups")
    def testAddUserGroupsToCmd(self, mock_user_group):
        """Test AddUserGroupsToCmd."""
        command = "test_command"
        groups = ["group1", "group2"]
        # Don't add user group in command
        mock_user_group.return_value = True
        expected_value = "test_command"
        self.assertEqual(expected_value, utils.AddUserGroupsToCmd(command,
                                                                  groups))

        # Add user group in command
        mock_user_group.return_value = False
        expected_value = "sg group1 <<EOF\nsg group2\ntest_command\nEOF"
        self.assertEqual(expected_value, utils.AddUserGroupsToCmd(command,
                                                                  groups))

    # pylint: disable=invalid-name
    def testTimeoutException(self):
        """Test TimeoutException."""
        @utils.TimeoutException(1, "should time out")
        def functionThatWillTimeOut():
            """Test decorator of @utils.TimeoutException should timeout."""
            time.sleep(5)

        self.assertRaises(errors.FunctionTimeoutError,
                          functionThatWillTimeOut)

    def testTimeoutExceptionNoTimeout(self):
        """Test No TimeoutException."""
        @utils.TimeoutException(5, "shouldn't time out")
        def functionThatShouldNotTimeout():
            """Test decorator of @utils.TimeoutException shouldn't timeout."""
            return None
        try:
            functionThatShouldNotTimeout()
        except errors.FunctionTimeoutError:
            self.fail("shouldn't timeout")

    def testEstablishSshTunnel(self):
        """Test EstablishSshTunnel."""
        ip_addr = "1.1.1.1"
        rsa_key_file = "/tmp/rsa_file"
        port_mapping = [(1111, 2222), (8888, 9999)]
        ssh_user = "fake_user"
        mock_execute_command = self.Patch(utils, "_ExecuteCommand")
        utils.EstablishSshTunnel(ip_addr, rsa_key_file, ssh_user,
                                 port_mapping, "-o command='shell %s %h'")
        arg_list = ["-i", rsa_key_file, "-o", "UserKnownHostsFile=/dev/null",
                    "-o", "StrictHostKeyChecking=no",
                    "-L", "1111:127.0.0.1:2222",
                    "-L", "8888:127.0.0.1:9999",
                    "-N", "-f",
                    "-l", ssh_user, ip_addr,
                    "-o", "command=shell %s %h"]
        mock_execute_command.assert_called_with("ssh", arg_list)

    def testAutoConnectCreateSSHTunnelFail(self):
        """Test auto connect returns empty ports when the ssh tunnel fails."""
        fake_ip_addr = "1.1.1.1"
        fake_rsa_key_file = "/tmp/rsa_file"
        fake_target_vnc_port = 8888
        target_adb_port = 9999
        ssh_user = "fake_user"
        call_side_effect = subprocess.CalledProcessError(123, "fake",
                                                         "fake error")
        result = utils.ForwardedPorts(vnc_port=None, adb_port=None)
        self.Patch(utils, "EstablishSshTunnel", side_effect=call_side_effect)
        self.assertEqual(result, utils.AutoConnect(fake_ip_addr,
                                                   fake_rsa_key_file,
                                                   fake_target_vnc_port,
                                                   target_adb_port,
                                                   ssh_user))

    def testAutoConnectWithExtraArgs(self):
        """Test extra args will be the same with expanded args."""
        fake_ip_addr = "1.1.1.1"
        fake_rsa_key_file = "/tmp/rsa_file"
        fake_target_vnc_port = 8888
        target_adb_port = 9999
        ssh_user = "fake_user"
        fake_port = 12345
        self.Patch(utils, "PickFreePort", return_value=fake_port)
        mock_execute_command = self.Patch(utils, "_ExecuteCommand")
        mock_establish_ssh_tunnel = self.Patch(utils, "EstablishSshTunnel")
        extra_args_ssh_tunnel = "-o command='shell %s %h' -o command1='ls -la'"
        utils.AutoConnect(ip_addr=fake_ip_addr,
                          rsa_key_file=fake_rsa_key_file,
                          target_vnc_port=fake_target_vnc_port,
                          target_adb_port=target_adb_port,
                          ssh_user=ssh_user,
                          client_adb_port=fake_port,
                          extra_args_ssh_tunnel=extra_args_ssh_tunnel)
        mock_establish_ssh_tunnel.assert_called_with(
            fake_ip_addr,
            fake_rsa_key_file,
            ssh_user,
            [utils.PortMapping(fake_port, target_adb_port),
             utils.PortMapping(fake_port, fake_target_vnc_port)],
            extra_args_ssh_tunnel)
        mock_execute_command.assert_called_with(
            "adb", ["connect", "127.0.0.1:12345"])

    def testEstablishWebRTCSshTunnel(self):
        """Test establish WebRTC ssh tunnel."""
        fake_ip_addr = "1.1.1.1"
        fake_rsa_key_file = "/tmp/rsa_file"
        ssh_user = "fake_user"
        fake_webrtc_local_port = 12345
        self.Patch(utils, "GetWebRTCServerPort", return_value=8443)
        mock_establish_ssh_tunnel = self.Patch(utils, "EstablishSshTunnel")
        fake_port_mapping = [utils.PortMapping(15550, 15550),
                             utils.PortMapping(15551, 15551),
                             utils.PortMapping(15552, 15552),
                             utils.PortMapping(12345, 8443)]

        # Without extra ssh args, None is forwarded to EstablishSshTunnel.
        utils.EstablishWebRTCSshTunnel(
            ip_addr=fake_ip_addr, rsa_key_file=fake_rsa_key_file,
            ssh_user=ssh_user, webrtc_local_port=fake_webrtc_local_port)
        mock_establish_ssh_tunnel.assert_called_with(
            fake_ip_addr,
            fake_rsa_key_file,
            ssh_user,
            fake_port_mapping,
            None)

        # With extra ssh args, they are forwarded unchanged.
        mock_establish_ssh_tunnel.reset_mock()
        extra_args_ssh_tunnel = "-o command='shell %s %h'"
        utils.EstablishWebRTCSshTunnel(
            ip_addr=fake_ip_addr, rsa_key_file=fake_rsa_key_file,
            ssh_user=ssh_user, extra_args_ssh_tunnel=extra_args_ssh_tunnel,
            webrtc_local_port=fake_webrtc_local_port)
        mock_establish_ssh_tunnel.assert_called_with(
            fake_ip_addr,
            fake_rsa_key_file,
            ssh_user,
            fake_port_mapping,
            extra_args_ssh_tunnel)

    def testGetWebRTCServerPort(self):
        """test GetWebRTCServerPort."""
        fake_ip_addr = "1.1.1.1"
        fake_rsa_key_file = "/tmp/rsa_file"
        ssh_user = "fake_user"
        extra_args_ssh_tunnel = "-o command='shell %s %h'"
        fake_subprocess = mock.MagicMock()
        fake_subprocess.returncode = 0
        fake_subprocess.communicate = mock.MagicMock(
            return_value=('', ''))
        self.Patch(subprocess, "Popen", return_value=fake_subprocess)
        # Empty process output: port 1443 is expected.
        self.assertEqual(1443, utils.GetWebRTCServerPort(
            fake_ip_addr, fake_rsa_key_file, ssh_user, extra_args_ssh_tunnel))

        # Test the case that find "webrtc_operator" process.
        webrtc_operator_process = "11:45 bin/webrtc_operator -assets_dir=assets"
        fake_subprocess.communicate = mock.MagicMock(
            return_value=(webrtc_operator_process, ''))
        self.assertEqual(8443, utils.GetWebRTCServerPort(
            fake_ip_addr, fake_rsa_key_file, ssh_user, extra_args_ssh_tunnel))

    def testGetWebrtcPortFromSSHTunnel(self):
        """Test Get forwarding webrtc port from ssh tunnel."""
        fake_ps_output = ("/fake_ps_1 --fake arg \n"
                          "/fake_ps_2 --fake arg \n"
                          "/usr/bin/ssh -i ~/.ssh/acloud_rsa "
                          "-o UserKnownHostsFile=/dev/null "
                          "-o StrictHostKeyChecking=no -L 15551:127.0.0.1:15551 "
                          "-L 12345:127.0.0.1:8443 -N -f -l user 1.1.1.1").encode()
        self.Patch(subprocess, "check_output", return_value=fake_ps_output)
        webrtc_ports = utils.GetWebrtcPortFromSSHTunnel("1.1.1.1")
        self.assertEqual(12345, webrtc_ports)

    # pylint: disable=protected-access, no-member
    def testCleanupSSVncviwer(self):
        """test cleanup ssvnc viewer."""
        fake_vnc_port = 9999
        fake_ss_vncviewer_pattern = utils._SSVNC_VIEWER_PATTERN % {
            "vnc_port": fake_vnc_port}
        self.Patch(utils, "IsCommandRunning", return_value=True)
        self.Patch(subprocess, "check_call", return_value=True)
        utils.CleanupSSVncviewer(fake_vnc_port)
        subprocess.check_call.assert_called_with(
            ["pkill", "-9", "-f", fake_ss_vncviewer_pattern])

        # reset_mock() clears call_count together with call_args/mock_calls,
        # unlike assigning call_count = 0 by hand.
        subprocess.check_call.reset_mock()
        self.Patch(utils, "IsCommandRunning", return_value=False)
        utils.CleanupSSVncviewer(fake_vnc_port)
        subprocess.check_call.assert_not_called()

    # pylint: disable=no-member
    def testLaunchBrowserFromReport(self):
        """test launch browser from report."""
        self.Patch(webbrowser, "open_new_tab")
        fake_report = mock.MagicMock(data={})

        # test remote instance
        self.Patch(os.environ, "get", return_value=True)
        fake_report.data = {
            "devices": [{"instance_name": "remote_cf_instance_name",
                         "ip": "192.168.1.1",},],}

        utils.LaunchBrowserFromReport(fake_report)
        webbrowser.open_new_tab.assert_called_once_with("https://localhost:8443")
        webbrowser.open_new_tab.reset_mock()

        # test local instance
        fake_report.data = {
            "devices": [{"instance_name": "local-instance1",
                         "ip": "127.0.0.1:6250",},],}
        utils.LaunchBrowserFromReport(fake_report)
        webbrowser.open_new_tab.assert_called_once_with("https://localhost:8443")
        webbrowser.open_new_tab.reset_mock()

        # verify terminal can't support launch webbrowser.
        self.Patch(os.environ, "get", return_value=False)
        utils.LaunchBrowserFromReport(fake_report)
        webbrowser.open_new_tab.assert_not_called()

    def testSetExecutable(self):
        """test setting a file to be executable."""
        with tempfile.NamedTemporaryFile(delete=True) as temp_file:
            utils.SetExecutable(temp_file.name)
            self.assertEqual(os.stat(temp_file.name).st_mode & 0o777, 0o755)

    def testSetDirectoryTreeExecutable(self):
        """test setting a file in a directory to be executable."""
        with tempfile.TemporaryDirectory() as temp_dir:
            subdir = os.path.join(temp_dir, "subdir")
            file_path = os.path.join(subdir, "file")
            os.makedirs(subdir)
            with open(file_path, "w"):
                pass
            utils.SetDirectoryTreeExecutable(temp_dir)
            self.assertEqual(os.stat(file_path).st_mode & 0o777, 0o755)

    def testSetCvdPort(self):
        """test base_instance_num."""
        # Undo the port override even when an assertion below fails, so the
        # changed setting cannot leak into other tests.
        self.addCleanup(utils.SetCvdPorts, None)
        utils.SetCvdPorts(2)
        self.assertEqual(utils.GetCvdPorts().adb_port, 6521)
        self.assertEqual(utils.GetCvdPorts().vnc_port, 6445)

    @mock.patch.object(utils, "PrintColorString")
    def testPrintDeviceSummary(self, mock_print):
        """test PrintDeviceSummary."""
        fake_report = mock.MagicMock(data={})
        fake_report.data = {
            "devices": [{"instance_name": "remote_cf_instance_name",
                         "ip": "192.168.1.1",
                         "device_serial": "127.0.0.1:399"},],}
        utils.PrintDeviceSummary(fake_report)
        self.assertEqual(mock_print.call_count, 7)

        # Test for OpenWrt device case.
        fake_report.data = {
            "devices": [{"instance_name": "remote_cf_instance_name",
                         "ip": "192.168.1.1",
                         "ssh_command": "fake_ssh_cmd",
                         "screen_command": "fake_screen_cmd"},],}
        mock_print.reset_mock()
        utils.PrintDeviceSummary(fake_report)
        self.assertEqual(mock_print.call_count, 13)

        # Test for fail case
        fake_report.data = {
            "errors": "Fail to create devices"}
        mock_print.reset_mock()
        utils.PrintDeviceSummary(fake_report)
        self.assertEqual(mock_print.call_count, 3)

    # pylint: disable=protected-access
    def testIsSupportedKvm(self):
        """Test IsSupportedKvm."""
        self.Patch(os.path, "exists", return_value=True)
        self.assertTrue(utils.IsSupportedKvm())

        self.Patch(os.path, "exists", return_value=False)
        self.assertFalse(utils.IsSupportedKvm())


if __name__ == "__main__":
    unittest.main()