1#!/usr/bin/env python 2# 3# Copyright 2016 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16"""Tests for acloud.internal.lib.utils.""" 17 18import collections 19import errno 20import getpass 21import grp 22import os 23import shutil 24import subprocess 25import tempfile 26import time 27import webbrowser 28 29import unittest 30 31from unittest import mock 32 33from acloud import errors 34from acloud.internal.lib import driver_test_lib 35from acloud.internal.lib import utils 36 37 38GroupInfo = collections.namedtuple("GroupInfo", [ 39 "gr_name", 40 "gr_passwd", 41 "gr_gid", 42 "gr_mem"]) 43 44# Tkinter may not be supported so mock it out. 45try: 46 import Tkinter 47except ImportError: 48 Tkinter = mock.Mock() 49 50 51class FakeTkinter: 52 """Fake implementation of Tkinter.Tk()""" 53 54 def __init__(self, width=None, height=None): 55 self.width = width 56 self.height = height 57 58 # pylint: disable=invalid-name 59 def winfo_screenheight(self): 60 """Return the screen height.""" 61 return self.height 62 63 # pylint: disable=invalid-name 64 def winfo_screenwidth(self): 65 """Return the screen width.""" 66 return self.width 67 68 69# pylint: disable=too-many-public-methods 70class UtilsTest(driver_test_lib.BaseDriverTest): 71 """Test Utils.""" 72 73 def TestTempDirSuccess(self): 74 """Test create a temp dir.""" 75 self.Patch(os, "chmod") 76 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir") 77 self.Patch(shutil, "rmtree") 78 with utils.TempDir(): 79 pass 80 # Verify. 81 tempfile.mkdtemp.assert_called_once() # pylint: disable=no-member 82 shutil.rmtree.assert_called_with("/tmp/tempdir") # pylint: disable=no-member 83 84 def TestTempDirExceptionRaised(self): 85 """Test create a temp dir and exception is raised within with-clause.""" 86 self.Patch(os, "chmod") 87 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir") 88 self.Patch(shutil, "rmtree") 89 90 class ExpectedException(Exception): 91 """Expected exception.""" 92 93 def _Call(): 94 with utils.TempDir(): 95 raise ExpectedException("Expected exception.") 96 97 # Verify. ExpectedException should be raised. 98 self.assertRaises(ExpectedException, _Call) 99 tempfile.mkdtemp.assert_called_once() # pylint: disable=no-member 100 shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member 101 102 def testTempDirWhenDeleteTempDirNoLongerExist(self): # pylint: disable=invalid-name 103 """Test create a temp dir and dir no longer exists during deletion.""" 104 self.Patch(os, "chmod") 105 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir") 106 expected_error = EnvironmentError() 107 expected_error.errno = errno.ENOENT 108 self.Patch(shutil, "rmtree", side_effect=expected_error) 109 110 def _Call(): 111 with utils.TempDir(): 112 pass 113 114 # Verify no exception should be raised when rmtree raises 115 # EnvironmentError with errno.ENOENT, i.e. 116 # directory no longer exists. 117 _Call() 118 tempfile.mkdtemp.assert_called_once() #pylint: disable=no-member 119 shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member 120 121 def testTempDirWhenDeleteEncounterError(self): 122 """Test create a temp dir and encoutered error during deletion.""" 123 self.Patch(os, "chmod") 124 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir") 125 expected_error = OSError("Expected OS Error") 126 self.Patch(shutil, "rmtree", side_effect=expected_error) 127 128 def _Call(): 129 with utils.TempDir(): 130 pass 131 132 # Verify OSError should be raised. 133 self.assertRaises(OSError, _Call) 134 tempfile.mkdtemp.assert_called_once() #pylint: disable=no-member 135 shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member 136 137 def testTempDirOrininalErrorRaised(self): 138 """Test original error is raised even if tmp dir deletion failed.""" 139 self.Patch(os, "chmod") 140 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir") 141 expected_error = OSError("Expected OS Error") 142 self.Patch(shutil, "rmtree", side_effect=expected_error) 143 144 class ExpectedException(Exception): 145 """Expected exception.""" 146 147 def _Call(): 148 with utils.TempDir(): 149 raise ExpectedException("Expected Exception") 150 151 # Verify. 152 # ExpectedException should be raised, and OSError 153 # should not be raised. 154 self.assertRaises(ExpectedException, _Call) 155 tempfile.mkdtemp.assert_called_once() #pylint: disable=no-member 156 shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member 157 158 def testCreateSshKeyPairKeyAlreadyExists(self): #pylint: disable=invalid-name 159 """Test when the key pair already exists.""" 160 public_key = "/fake/public_key" 161 private_key = "/fake/private_key" 162 self.Patch(os.path, "exists", side_effect=[True, True]) 163 self.Patch(subprocess, "check_call") 164 self.Patch(os, "makedirs", return_value=True) 165 utils.CreateSshKeyPairIfNotExist(private_key, public_key) 166 self.assertEqual(subprocess.check_call.call_count, 0) #pylint: disable=no-member 167 168 def testCreateSshKeyPairKeyAreCreated(self): 169 """Test when the key pair created.""" 170 public_key = "/fake/public_key" 171 private_key = "/fake/private_key" 172 self.Patch(os.path, "exists", return_value=False) 173 self.Patch(os, "makedirs", return_value=True) 174 self.Patch(subprocess, "check_call") 175 self.Patch(os, "rename") 176 utils.CreateSshKeyPairIfNotExist(private_key, public_key) 177 self.assertEqual(subprocess.check_call.call_count, 1) #pylint: disable=no-member 178 subprocess.check_call.assert_called_with( #pylint: disable=no-member 179 utils.SSH_KEYGEN_CMD + 180 ["-C", getpass.getuser(), "-f", private_key], 181 stdout=mock.ANY, 182 stderr=mock.ANY) 183 184 def testCreatePublicKeyAreCreated(self): 185 """Test when the PublicKey created.""" 186 public_key = "/fake/public_key" 187 private_key = "/fake/private_key" 188 self.Patch(os.path, "exists", side_effect=[False, True, True]) 189 self.Patch(os, "makedirs", return_value=True) 190 mock_open = mock.mock_open(read_data=public_key) 191 self.Patch(subprocess, "check_output") 192 self.Patch(os, "rename") 193 with mock.patch("builtins.open", mock_open): 194 utils.CreateSshKeyPairIfNotExist(private_key, public_key) 195 self.assertEqual(subprocess.check_output.call_count, 1) #pylint: disable=no-member 196 subprocess.check_output.assert_called_with( #pylint: disable=no-member 197 utils.SSH_KEYGEN_PUB_CMD +["-f", private_key]) 198 199 def TestRetryOnException(self): 200 """Test Retry.""" 201 202 def _IsValueError(exc): 203 return isinstance(exc, ValueError) 204 205 num_retry = 5 206 207 @utils.RetryOnException(_IsValueError, num_retry) 208 def _RaiseAndRetry(sentinel): 209 sentinel.alert() 210 raise ValueError("Fake error.") 211 212 sentinel = mock.MagicMock() 213 self.assertRaises(ValueError, _RaiseAndRetry, sentinel) 214 self.assertEqual(1 + num_retry, sentinel.alert.call_count) 215 216 def testRetryExceptionType(self): 217 """Test RetryExceptionType function.""" 218 219 def _RaiseAndRetry(sentinel): 220 sentinel.alert() 221 raise ValueError("Fake error.") 222 223 num_retry = 5 224 sentinel = mock.MagicMock() 225 self.assertRaises( 226 ValueError, 227 utils.RetryExceptionType, (KeyError, ValueError), 228 num_retry, 229 _RaiseAndRetry, 230 0, # sleep_multiplier 231 1, # retry_backoff_factor 232 sentinel=sentinel) 233 self.assertEqual(1 + num_retry, sentinel.alert.call_count) 234 235 def testRetry(self): 236 """Test Retry.""" 237 mock_sleep = self.Patch(time, "sleep") 238 239 def _RaiseAndRetry(sentinel): 240 sentinel.alert() 241 raise ValueError("Fake error.") 242 243 num_retry = 5 244 sentinel = mock.MagicMock() 245 self.assertRaises( 246 ValueError, 247 utils.RetryExceptionType, (ValueError, KeyError), 248 num_retry, 249 _RaiseAndRetry, 250 1, # sleep_multiplier 251 2, # retry_backoff_factor 252 sentinel=sentinel) 253 254 self.assertEqual(1 + num_retry, sentinel.alert.call_count) 255 mock_sleep.assert_has_calls( 256 [ 257 mock.call(1), 258 mock.call(2), 259 mock.call(4), 260 mock.call(8), 261 mock.call(16) 262 ]) 263 264 @mock.patch("builtins.input") 265 def testGetAnswerFromList(self, mock_raw_input): 266 """Test GetAnswerFromList.""" 267 answer_list = ["image1.zip", "image2.zip", "image3.zip"] 268 mock_raw_input.return_value = 0 269 with self.assertRaises(SystemExit): 270 utils.GetAnswerFromList(answer_list) 271 mock_raw_input.side_effect = [1, 2, 3, 4] 272 self.assertEqual(utils.GetAnswerFromList(answer_list), 273 ["image1.zip"]) 274 self.assertEqual(utils.GetAnswerFromList(answer_list), 275 ["image2.zip"]) 276 self.assertEqual(utils.GetAnswerFromList(answer_list), 277 ["image3.zip"]) 278 self.assertEqual(utils.GetAnswerFromList(answer_list, 279 enable_choose_all=True), 280 answer_list) 281 282 @unittest.skipIf(isinstance(Tkinter, mock.Mock), "Tkinter mocked out, test case not needed.") 283 @mock.patch.object(Tkinter, "Tk") 284 def testCalculateVNCScreenRatio(self, mock_tk): 285 """Test Calculating the scale ratio of VNC display.""" 286 # Get scale-down ratio if screen height is smaller than AVD height. 287 mock_tk.return_value = FakeTkinter(height=800, width=1200) 288 avd_h = 1920 289 avd_w = 1080 290 self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.4) 291 292 # Get scale-down ratio if screen width is smaller than AVD width. 293 mock_tk.return_value = FakeTkinter(height=800, width=1200) 294 avd_h = 900 295 avd_w = 1920 296 self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.6) 297 298 # Scale ratio = 1 if screen is larger than AVD. 299 mock_tk.return_value = FakeTkinter(height=1080, width=1920) 300 avd_h = 800 301 avd_w = 1280 302 self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 1) 303 304 # Get the scale if ratio of width is smaller than the 305 # ratio of height. 306 mock_tk.return_value = FakeTkinter(height=1200, width=800) 307 avd_h = 1920 308 avd_w = 1080 309 self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.6) 310 311 def testCheckUserInGroups(self): 312 """Test CheckUserInGroups.""" 313 self.Patch(getpass, "getuser", return_value="user_0") 314 self.Patch(grp, "getgrall", return_value=[ 315 GroupInfo("fake_group1", "passwd_1", 0, ["user_1", "user_2"]), 316 GroupInfo("fake_group2", "passwd_2", 1, ["user_1", "user_2"])]) 317 self.Patch(grp, "getgrnam", return_value=GroupInfo( 318 "fake_group1", "passwd_1", 0, ["user_1", "user_2"])) 319 # Test Group name doesn't exist. 320 self.assertFalse(utils.CheckUserInGroups(["Non_exist_group"])) 321 322 # Test User isn't in group. 323 self.assertFalse(utils.CheckUserInGroups(["fake_group1"])) 324 325 # Test User is in group. 326 self.Patch(getpass, "getuser", return_value="user_1") 327 self.assertTrue(utils.CheckUserInGroups(["fake_group1"])) 328 329 @mock.patch.object(utils, "CheckUserInGroups") 330 def testAddUserGroupsToCmd(self, mock_user_group): 331 """Test AddUserGroupsToCmd.""" 332 command = "test_command" 333 groups = ["group1", "group2"] 334 # Don't add user group in command 335 mock_user_group.return_value = True 336 expected_value = "test_command" 337 self.assertEqual(expected_value, utils.AddUserGroupsToCmd(command, 338 groups)) 339 340 # Add user group in command 341 mock_user_group.return_value = False 342 expected_value = "sg group1 <<EOF\nsg group2\ntest_command\nEOF" 343 self.assertEqual(expected_value, utils.AddUserGroupsToCmd(command, 344 groups)) 345 346 # pylint: disable=invalid-name 347 def testTimeoutException(self): 348 """Test TimeoutException.""" 349 @utils.TimeoutException(1, "should time out") 350 def functionThatWillTimeOut(): 351 """Test decorator of @utils.TimeoutException should timeout.""" 352 time.sleep(5) 353 354 self.assertRaises(errors.FunctionTimeoutError, 355 functionThatWillTimeOut) 356 357 358 def testTimeoutExceptionNoTimeout(self): 359 """Test No TimeoutException.""" 360 @utils.TimeoutException(5, "shouldn't time out") 361 def functionThatShouldNotTimeout(): 362 """Test decorator of @utils.TimeoutException shouldn't timeout.""" 363 return None 364 try: 365 functionThatShouldNotTimeout() 366 except errors.FunctionTimeoutError: 367 self.fail("shouldn't timeout") 368 369 def testEstablishSshTunnel(self): 370 """Test EstablishSshTunnel.""" 371 ip_addr = "1.1.1.1" 372 rsa_key_file = "/tmp/rsa_file" 373 port_mapping = [(1111, 2222), (8888, 9999)] 374 ssh_user = "fake_user" 375 mock_execute_command = self.Patch(utils, "_ExecuteCommand") 376 utils.EstablishSshTunnel(ip_addr, rsa_key_file, ssh_user, 377 port_mapping, "-o command='shell %s %h'") 378 arg_list = ["-i", rsa_key_file, "-o", "UserKnownHostsFile=/dev/null", 379 "-o", "StrictHostKeyChecking=no", 380 "-L", "1111:127.0.0.1:2222", 381 "-L", "8888:127.0.0.1:9999", 382 "-N", "-f", 383 "-l", ssh_user, ip_addr, 384 "-o", "command=shell %s %h"] 385 mock_execute_command.assert_called_with("ssh", arg_list) 386 387 def testAutoConnectCreateSSHTunnelFail(self): 388 """Test auto connect.""" 389 fake_ip_addr = "1.1.1.1" 390 fake_rsa_key_file = "/tmp/rsa_file" 391 fake_target_vnc_port = 8888 392 target_adb_port = 9999 393 target_fastboot_port = 7777 394 ssh_user = "fake_user" 395 call_side_effect = subprocess.CalledProcessError(123, "fake", 396 "fake error") 397 result = utils.ForwardedPorts(vnc_port=None, adb_port=None, fastboot_port=None) 398 self.Patch(utils, "EstablishSshTunnel", side_effect=call_side_effect) 399 self.assertEqual(result, utils.AutoConnect(fake_ip_addr, 400 fake_rsa_key_file, 401 fake_target_vnc_port, 402 target_adb_port, 403 target_fastboot_port, 404 ssh_user)) 405 406 def testAutoConnectWithExtraArgs(self): 407 """Test extra args will be the same with expanded args.""" 408 fake_ip_addr = "1.1.1.1" 409 fake_rsa_key_file = "/tmp/rsa_file" 410 fake_target_vnc_port = 8888 411 target_adb_port = 9999 412 target_fastboot_port = 7777 413 ssh_user = "fake_user" 414 fake_port = 12345 415 self.Patch(utils, "PickFreePort", return_value=fake_port) 416 mock_execute_command = self.Patch(utils, "_ExecuteCommand") 417 mock_establish_ssh_tunnel = self.Patch(utils, "EstablishSshTunnel") 418 extra_args_ssh_tunnel = "-o command='shell %s %h' -o command1='ls -la'" 419 utils.AutoConnect(ip_addr=fake_ip_addr, 420 rsa_key_file=fake_rsa_key_file, 421 target_vnc_port=fake_target_vnc_port, 422 target_adb_port=target_adb_port, 423 target_fastboot_port=target_fastboot_port, 424 ssh_user=ssh_user, 425 client_adb_port=fake_port, 426 client_fastboot_port=fake_port, 427 extra_args_ssh_tunnel=extra_args_ssh_tunnel) 428 mock_establish_ssh_tunnel.assert_called_with( 429 fake_ip_addr, 430 fake_rsa_key_file, 431 ssh_user, 432 [utils.PortMapping(fake_port, target_adb_port), 433 utils.PortMapping(fake_port, target_fastboot_port), 434 utils.PortMapping(fake_port, fake_target_vnc_port)], 435 extra_args_ssh_tunnel) 436 mock_execute_command.assert_called_with( 437 "adb", ["connect", "127.0.0.1:12345"]) 438 439 def testEstablishWebRTCSshTunnel(self): 440 """Test establish WebRTC ssh tunnel.""" 441 fake_ip_addr = "1.1.1.1" 442 fake_rsa_key_file = "/tmp/rsa_file" 443 ssh_user = "fake_user" 444 fake_webrtc_local_port = 12345 445 self.Patch(utils, "GetWebRTCServerPort", return_value=8443) 446 mock_establish_ssh_tunnel = self.Patch(utils, "EstablishSshTunnel") 447 fake_port_mapping = [utils.PortMapping(port, port) for port in range(15555, 15579 + 1)] + [utils.PortMapping(12345, 8443)] 448 449 utils.EstablishWebRTCSshTunnel( 450 ip_addr=fake_ip_addr, rsa_key_file=fake_rsa_key_file, 451 ssh_user=ssh_user, webrtc_local_port=fake_webrtc_local_port) 452 mock_establish_ssh_tunnel.assert_called_with( 453 fake_ip_addr, 454 fake_rsa_key_file, 455 ssh_user, 456 fake_port_mapping, 457 None) 458 459 mock_establish_ssh_tunnel.reset_mock() 460 extra_args_ssh_tunnel = "-o command='shell %s %h'" 461 utils.EstablishWebRTCSshTunnel( 462 ip_addr=fake_ip_addr, rsa_key_file=fake_rsa_key_file, 463 ssh_user=ssh_user, extra_args_ssh_tunnel=extra_args_ssh_tunnel, 464 webrtc_local_port=fake_webrtc_local_port) 465 mock_establish_ssh_tunnel.assert_called_with( 466 fake_ip_addr, 467 fake_rsa_key_file, 468 ssh_user, 469 fake_port_mapping, 470 extra_args_ssh_tunnel) 471 472 def testGetWebRTCServerPort(self): 473 """test GetWebRTCServerPort.""" 474 fake_ip_addr = "1.1.1.1" 475 fake_rsa_key_file = "/tmp/rsa_file" 476 ssh_user = "fake_user" 477 extra_args_ssh_tunnel = "-o command='shell %s %h'" 478 fake_subprocess = mock.MagicMock() 479 fake_subprocess.returncode = 0 480 fake_subprocess.communicate = mock.MagicMock( 481 return_value=('', '')) 482 self.Patch(subprocess, "Popen", return_value=fake_subprocess) 483 self.assertEqual(1443, utils.GetWebRTCServerPort( 484 fake_ip_addr, fake_rsa_key_file,ssh_user,extra_args_ssh_tunnel)) 485 486 # Test the case that find "webrtc_operator" process. 487 webrtc_operator_process = "11:45 bin/webrtc_operator -assets_dir=assets" 488 fake_subprocess.communicate = mock.MagicMock( 489 return_value=(webrtc_operator_process, '')) 490 self.assertEqual(8443, utils.GetWebRTCServerPort( 491 fake_ip_addr, fake_rsa_key_file,ssh_user,extra_args_ssh_tunnel)) 492 493 def testGetWebrtcPortFromSSHTunnel(self): 494 """"Test Get forwarding webrtc port from ssh tunnel.""" 495 fake_ps_output = ("/fake_ps_1 --fake arg \n" 496 "/fake_ps_2 --fake arg \n" 497 "/usr/bin/ssh -i ~/.ssh/acloud_rsa " 498 "-o UserKnownHostsFile=/dev/null " 499 "-o StrictHostKeyChecking=no -L 15551:127.0.0.1:15551 " 500 "-L 12345:127.0.0.1:8443 -N -f -l user 1.1.1.1").encode() 501 self.Patch(subprocess, "check_output", return_value=fake_ps_output) 502 webrtc_ports = utils.GetWebrtcPortFromSSHTunnel("1.1.1.1") 503 self.assertEqual(12345, webrtc_ports) 504 505 @mock.patch("acloud.internal.lib.utils.subprocess") 506 def testFindRemoteFiles(self, mock_subprocess): 507 """Test FindRemoteFiles.""" 508 mock_ssh = mock.Mock() 509 510 paths = utils.FindRemoteFiles(mock_ssh, []) 511 mock_subprocess.run.assert_not_called() 512 self.assertEqual([], paths) 513 514 mock_ssh.GetBaseCmd.return_value = "mock_ssh" 515 mock_subprocess.run.return_value = mock.Mock( 516 stderr=b'stderr', stdout=b'file1\nfile2\n') 517 paths = utils.FindRemoteFiles(mock_ssh, ["dir1", "dir2"]) 518 self.assertEqual(["file1", "file2"], paths) 519 mock_subprocess.run.assert_called_with( 520 'mock_ssh find -H dir1 dir2 -type f', 521 shell=True, capture_output=True, check=False) 522 523 mock_subprocess.run.return_value = mock.Mock(stderr=None, stdout=b'') 524 paths = utils.FindRemoteFiles(mock_ssh, ["dir1", "dir2"]) 525 self.assertEqual([], paths) 526 527 # pylint: disable=protected-access, no-member 528 def testCleanupSSVncviwer(self): 529 """test cleanup ssvnc viewer.""" 530 fake_vnc_port = 9999 531 fake_ss_vncviewer_pattern = utils._SSVNC_VIEWER_PATTERN % { 532 "vnc_port": fake_vnc_port} 533 self.Patch(utils, "IsCommandRunning", return_value=True) 534 self.Patch(subprocess, "check_call", return_value=True) 535 utils.CleanupSSVncviewer(fake_vnc_port) 536 subprocess.check_call.assert_called_with(["pkill", "-9", "-f", fake_ss_vncviewer_pattern]) 537 538 subprocess.check_call.call_count = 0 539 self.Patch(utils, "IsCommandRunning", return_value=False) 540 utils.CleanupSSVncviewer(fake_vnc_port) 541 subprocess.check_call.assert_not_called() 542 543 def testLaunchBrowserFromReport(self): 544 """test launch browser from report.""" 545 self.Patch(webbrowser, "open_new_tab") 546 fake_report = mock.MagicMock(data={}) 547 548 # test remote instance 549 self.Patch(os.environ, "get", return_value=True) 550 fake_report.data = { 551 "devices": [{"instance_name": "remote_cf_instance_name", 552 "ip": "192.168.1.1",},],} 553 554 utils.LaunchBrowserFromReport(fake_report) 555 webbrowser.open_new_tab.assert_called_once_with("https://localhost:8443") 556 webbrowser.open_new_tab.call_count = 0 557 558 # test local instance 559 fake_report.data = { 560 "devices": [{"instance_name": "local-instance1", 561 "ip": "127.0.0.1:6250",},],} 562 utils.LaunchBrowserFromReport(fake_report) 563 webbrowser.open_new_tab.assert_called_once_with("https://localhost:8443") 564 webbrowser.open_new_tab.call_count = 0 565 566 # verify terminal can't support launch webbrowser. 567 self.Patch(os.environ, "get", return_value=False) 568 utils.LaunchBrowserFromReport(fake_report) 569 self.assertEqual(webbrowser.open_new_tab.call_count, 0) 570 571 def testSetExecutable(self): 572 """test setting a file to be executable.""" 573 with tempfile.NamedTemporaryFile(delete=True) as temp_file: 574 utils.SetExecutable(temp_file.name) 575 self.assertEqual(os.stat(temp_file.name).st_mode & 0o777, 0o755) 576 577 def testSetDirectoryTreeExecutable(self): 578 """test setting a file in a directory to be executable.""" 579 with tempfile.TemporaryDirectory() as temp_dir: 580 subdir = os.path.join(temp_dir, "subdir") 581 file_path = os.path.join(subdir, "file") 582 os.makedirs(subdir) 583 with open(file_path, "w"): 584 pass 585 utils.SetDirectoryTreeExecutable(temp_dir) 586 self.assertEqual(os.stat(file_path).st_mode & 0o777, 0o755) 587 588 def testSetCvdPort(self): 589 """test base_instance_num.""" 590 utils.SetCvdPorts(2) 591 self.assertEqual(utils.GetCvdPorts().adb_port, 6521) 592 self.assertEqual(utils.GetCvdPorts().fastboot_port, 7521) 593 self.assertEqual(utils.GetCvdPorts().vnc_port, 6445) 594 utils.SetCvdPorts(None) 595 596 597 @mock.patch.object(utils, "PrintColorString") 598 def testPrintDeviceSummary(self, mock_print): 599 """test PrintDeviceSummary.""" 600 fake_report = mock.MagicMock(data={}) 601 fake_report.data = { 602 "devices": [{"instance_name": "remote_cf_instance_name", 603 "ip": "192.168.1.1", 604 "device_serial": "127.0.0.1:399"},],} 605 utils.PrintDeviceSummary(fake_report) 606 self.assertEqual(mock_print.call_count, 7) 607 608 # Test for OpenWrt device case. 609 fake_report.data = { 610 "devices": [{"instance_name": "remote_cf_instance_name", 611 "ip": "192.168.1.1", 612 "ssh_command": "fake_ssh_cmd", 613 "screen_command": "fake_screen_cmd"},],} 614 mock_print.reset_mock() 615 utils.PrintDeviceSummary(fake_report) 616 self.assertEqual(mock_print.call_count, 13) 617 618 # Test for fail case 619 fake_report.data = { 620 "errors": "Fail to create devices"} 621 mock_print.reset_mock() 622 utils.PrintDeviceSummary(fake_report) 623 self.assertEqual(mock_print.call_count, 3) 624 625 # pylint: disable=protected-access 626 def testIsSupportedKvm(self): 627 """Test IsSupportedKvm.""" 628 self.Patch(os.path, "exists", return_value=True) 629 self.assertTrue(utils.IsSupportedKvm()) 630 631 self.Patch(os.path, "exists", return_value=False) 632 self.assertFalse(utils.IsSupportedKvm()) 633 634 635if __name__ == "__main__": 636 unittest.main() 637