1#!/usr/bin/env python 2# 3# Copyright 2016 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16"""Tests for acloud.internal.lib.utils.""" 17 18import collections 19import errno 20import getpass 21import grp 22import os 23import shutil 24import subprocess 25import tempfile 26import time 27import webbrowser 28 29import unittest 30 31from unittest import mock 32 33from acloud import errors 34from acloud.internal.lib import driver_test_lib 35from acloud.internal.lib import utils 36 37 38GroupInfo = collections.namedtuple("GroupInfo", [ 39 "gr_name", 40 "gr_passwd", 41 "gr_gid", 42 "gr_mem"]) 43 44# Tkinter may not be supported so mock it out. 45try: 46 import Tkinter 47except ImportError: 48 Tkinter = mock.Mock() 49 50 51class FakeTkinter: 52 """Fake implementation of Tkinter.Tk()""" 53 54 def __init__(self, width=None, height=None): 55 self.width = width 56 self.height = height 57 58 # pylint: disable=invalid-name 59 def winfo_screenheight(self): 60 """Return the screen height.""" 61 return self.height 62 63 # pylint: disable=invalid-name 64 def winfo_screenwidth(self): 65 """Return the screen width.""" 66 return self.width 67 68 69# pylint: disable=too-many-public-methods 70class UtilsTest(driver_test_lib.BaseDriverTest): 71 """Test Utils.""" 72 73 def TestTempDirSuccess(self): 74 """Test create a temp dir.""" 75 self.Patch(os, "chmod") 76 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir") 77 self.Patch(shutil, "rmtree") 78 with utils.TempDir(): 79 pass 80 # Verify. 81 tempfile.mkdtemp.assert_called_once() # pylint: disable=no-member 82 shutil.rmtree.assert_called_with("/tmp/tempdir") # pylint: disable=no-member 83 84 def TestTempDirExceptionRaised(self): 85 """Test create a temp dir and exception is raised within with-clause.""" 86 self.Patch(os, "chmod") 87 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir") 88 self.Patch(shutil, "rmtree") 89 90 class ExpectedException(Exception): 91 """Expected exception.""" 92 93 def _Call(): 94 with utils.TempDir(): 95 raise ExpectedException("Expected exception.") 96 97 # Verify. ExpectedException should be raised. 98 self.assertRaises(ExpectedException, _Call) 99 tempfile.mkdtemp.assert_called_once() # pylint: disable=no-member 100 shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member 101 102 def testTempDirWhenDeleteTempDirNoLongerExist(self): # pylint: disable=invalid-name 103 """Test create a temp dir and dir no longer exists during deletion.""" 104 self.Patch(os, "chmod") 105 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir") 106 expected_error = EnvironmentError() 107 expected_error.errno = errno.ENOENT 108 self.Patch(shutil, "rmtree", side_effect=expected_error) 109 110 def _Call(): 111 with utils.TempDir(): 112 pass 113 114 # Verify no exception should be raised when rmtree raises 115 # EnvironmentError with errno.ENOENT, i.e. 116 # directory no longer exists. 117 _Call() 118 tempfile.mkdtemp.assert_called_once() #pylint: disable=no-member 119 shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member 120 121 def testTempDirWhenDeleteEncounterError(self): 122 """Test create a temp dir and encoutered error during deletion.""" 123 self.Patch(os, "chmod") 124 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir") 125 expected_error = OSError("Expected OS Error") 126 self.Patch(shutil, "rmtree", side_effect=expected_error) 127 128 def _Call(): 129 with utils.TempDir(): 130 pass 131 132 # Verify OSError should be raised. 133 self.assertRaises(OSError, _Call) 134 tempfile.mkdtemp.assert_called_once() #pylint: disable=no-member 135 shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member 136 137 def testTempDirOrininalErrorRaised(self): 138 """Test original error is raised even if tmp dir deletion failed.""" 139 self.Patch(os, "chmod") 140 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir") 141 expected_error = OSError("Expected OS Error") 142 self.Patch(shutil, "rmtree", side_effect=expected_error) 143 144 class ExpectedException(Exception): 145 """Expected exception.""" 146 147 def _Call(): 148 with utils.TempDir(): 149 raise ExpectedException("Expected Exception") 150 151 # Verify. 152 # ExpectedException should be raised, and OSError 153 # should not be raised. 154 self.assertRaises(ExpectedException, _Call) 155 tempfile.mkdtemp.assert_called_once() #pylint: disable=no-member 156 shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member 157 158 def testCreateSshKeyPairKeyAlreadyExists(self): #pylint: disable=invalid-name 159 """Test when the key pair already exists.""" 160 public_key = "/fake/public_key" 161 private_key = "/fake/private_key" 162 self.Patch(os.path, "exists", side_effect=[True, True]) 163 self.Patch(subprocess, "check_call") 164 self.Patch(os, "makedirs", return_value=True) 165 utils.CreateSshKeyPairIfNotExist(private_key, public_key) 166 self.assertEqual(subprocess.check_call.call_count, 0) #pylint: disable=no-member 167 168 def testCreateSshKeyPairKeyAreCreated(self): 169 """Test when the key pair created.""" 170 public_key = "/fake/public_key" 171 private_key = "/fake/private_key" 172 self.Patch(os.path, "exists", return_value=False) 173 self.Patch(os, "makedirs", return_value=True) 174 self.Patch(subprocess, "check_call") 175 self.Patch(os, "rename") 176 utils.CreateSshKeyPairIfNotExist(private_key, public_key) 177 self.assertEqual(subprocess.check_call.call_count, 1) #pylint: disable=no-member 178 subprocess.check_call.assert_called_with( #pylint: disable=no-member 179 utils.SSH_KEYGEN_CMD + 180 ["-C", getpass.getuser(), "-f", private_key], 181 stdout=mock.ANY, 182 stderr=mock.ANY) 183 184 def testCreatePublicKeyAreCreated(self): 185 """Test when the PublicKey created.""" 186 public_key = "/fake/public_key" 187 private_key = "/fake/private_key" 188 self.Patch(os.path, "exists", side_effect=[False, True, True]) 189 self.Patch(os, "makedirs", return_value=True) 190 mock_open = mock.mock_open(read_data=public_key) 191 self.Patch(subprocess, "check_output") 192 self.Patch(os, "rename") 193 with mock.patch("builtins.open", mock_open): 194 utils.CreateSshKeyPairIfNotExist(private_key, public_key) 195 self.assertEqual(subprocess.check_output.call_count, 1) #pylint: disable=no-member 196 subprocess.check_output.assert_called_with( #pylint: disable=no-member 197 utils.SSH_KEYGEN_PUB_CMD +["-f", private_key]) 198 199 def TestRetryOnException(self): 200 """Test Retry.""" 201 202 def _IsValueError(exc): 203 return isinstance(exc, ValueError) 204 205 num_retry = 5 206 207 @utils.RetryOnException(_IsValueError, num_retry) 208 def _RaiseAndRetry(sentinel): 209 sentinel.alert() 210 raise ValueError("Fake error.") 211 212 sentinel = mock.MagicMock() 213 self.assertRaises(ValueError, _RaiseAndRetry, sentinel) 214 self.assertEqual(1 + num_retry, sentinel.alert.call_count) 215 216 def testRetryExceptionType(self): 217 """Test RetryExceptionType function.""" 218 219 def _RaiseAndRetry(sentinel): 220 sentinel.alert() 221 raise ValueError("Fake error.") 222 223 num_retry = 5 224 sentinel = mock.MagicMock() 225 self.assertRaises( 226 ValueError, 227 utils.RetryExceptionType, (KeyError, ValueError), 228 num_retry, 229 _RaiseAndRetry, 230 0, # sleep_multiplier 231 1, # retry_backoff_factor 232 sentinel=sentinel) 233 self.assertEqual(1 + num_retry, sentinel.alert.call_count) 234 235 def testRetry(self): 236 """Test Retry.""" 237 mock_sleep = self.Patch(time, "sleep") 238 239 def _RaiseAndRetry(sentinel): 240 sentinel.alert() 241 raise ValueError("Fake error.") 242 243 num_retry = 5 244 sentinel = mock.MagicMock() 245 self.assertRaises( 246 ValueError, 247 utils.RetryExceptionType, (ValueError, KeyError), 248 num_retry, 249 _RaiseAndRetry, 250 1, # sleep_multiplier 251 2, # retry_backoff_factor 252 sentinel=sentinel) 253 254 self.assertEqual(1 + num_retry, sentinel.alert.call_count) 255 mock_sleep.assert_has_calls( 256 [ 257 mock.call(1), 258 mock.call(2), 259 mock.call(4), 260 mock.call(8), 261 mock.call(16) 262 ]) 263 264 @mock.patch("builtins.input") 265 def testGetAnswerFromList(self, mock_raw_input): 266 """Test GetAnswerFromList.""" 267 answer_list = ["image1.zip", "image2.zip", "image3.zip"] 268 mock_raw_input.return_value = 0 269 with self.assertRaises(SystemExit): 270 utils.GetAnswerFromList(answer_list) 271 mock_raw_input.side_effect = [1, 2, 3, 4] 272 self.assertEqual(utils.GetAnswerFromList(answer_list), 273 ["image1.zip"]) 274 self.assertEqual(utils.GetAnswerFromList(answer_list), 275 ["image2.zip"]) 276 self.assertEqual(utils.GetAnswerFromList(answer_list), 277 ["image3.zip"]) 278 self.assertEqual(utils.GetAnswerFromList(answer_list, 279 enable_choose_all=True), 280 answer_list) 281 282 @unittest.skipIf(isinstance(Tkinter, mock.Mock), "Tkinter mocked out, test case not needed.") 283 @mock.patch.object(Tkinter, "Tk") 284 def testCalculateVNCScreenRatio(self, mock_tk): 285 """Test Calculating the scale ratio of VNC display.""" 286 # Get scale-down ratio if screen height is smaller than AVD height. 287 mock_tk.return_value = FakeTkinter(height=800, width=1200) 288 avd_h = 1920 289 avd_w = 1080 290 self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.4) 291 292 # Get scale-down ratio if screen width is smaller than AVD width. 293 mock_tk.return_value = FakeTkinter(height=800, width=1200) 294 avd_h = 900 295 avd_w = 1920 296 self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.6) 297 298 # Scale ratio = 1 if screen is larger than AVD. 299 mock_tk.return_value = FakeTkinter(height=1080, width=1920) 300 avd_h = 800 301 avd_w = 1280 302 self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 1) 303 304 # Get the scale if ratio of width is smaller than the 305 # ratio of height. 306 mock_tk.return_value = FakeTkinter(height=1200, width=800) 307 avd_h = 1920 308 avd_w = 1080 309 self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.6) 310 311 def testCheckUserInGroups(self): 312 """Test CheckUserInGroups.""" 313 self.Patch(getpass, "getuser", return_value="user_0") 314 self.Patch(grp, "getgrall", return_value=[ 315 GroupInfo("fake_group1", "passwd_1", 0, ["user_1", "user_2"]), 316 GroupInfo("fake_group2", "passwd_2", 1, ["user_1", "user_2"])]) 317 self.Patch(grp, "getgrnam", return_value=GroupInfo( 318 "fake_group1", "passwd_1", 0, ["user_1", "user_2"])) 319 # Test Group name doesn't exist. 320 self.assertFalse(utils.CheckUserInGroups(["Non_exist_group"])) 321 322 # Test User isn't in group. 323 self.assertFalse(utils.CheckUserInGroups(["fake_group1"])) 324 325 # Test User is in group. 326 self.Patch(getpass, "getuser", return_value="user_1") 327 self.assertTrue(utils.CheckUserInGroups(["fake_group1"])) 328 329 @mock.patch.object(utils, "CheckUserInGroups") 330 def testAddUserGroupsToCmd(self, mock_user_group): 331 """Test AddUserGroupsToCmd.""" 332 command = "test_command" 333 groups = ["group1", "group2"] 334 # Don't add user group in command 335 mock_user_group.return_value = True 336 expected_value = "test_command" 337 self.assertEqual(expected_value, utils.AddUserGroupsToCmd(command, 338 groups)) 339 340 # Add user group in command 341 mock_user_group.return_value = False 342 expected_value = "sg group1 <<EOF\nsg group2\ntest_command\nEOF" 343 self.assertEqual(expected_value, utils.AddUserGroupsToCmd(command, 344 groups)) 345 346 # pylint: disable=invalid-name 347 def testTimeoutException(self): 348 """Test TimeoutException.""" 349 @utils.TimeoutException(1, "should time out") 350 def functionThatWillTimeOut(): 351 """Test decorator of @utils.TimeoutException should timeout.""" 352 time.sleep(5) 353 354 self.assertRaises(errors.FunctionTimeoutError, 355 functionThatWillTimeOut) 356 357 358 def testTimeoutExceptionNoTimeout(self): 359 """Test No TimeoutException.""" 360 @utils.TimeoutException(5, "shouldn't time out") 361 def functionThatShouldNotTimeout(): 362 """Test decorator of @utils.TimeoutException shouldn't timeout.""" 363 return None 364 try: 365 functionThatShouldNotTimeout() 366 except errors.FunctionTimeoutError: 367 self.fail("shouldn't timeout") 368 369 def testEstablishSshTunnel(self): 370 """Test EstablishSshTunnel.""" 371 ip_addr = "1.1.1.1" 372 rsa_key_file = "/tmp/rsa_file" 373 port_mapping = [(1111, 2222), (8888, 9999)] 374 ssh_user = "fake_user" 375 mock_execute_command = self.Patch(utils, "_ExecuteCommand") 376 utils.EstablishSshTunnel(ip_addr, rsa_key_file, ssh_user, 377 port_mapping, "-o command='shell %s %h'") 378 arg_list = ["-i", rsa_key_file, "-o", "ControlPath=none", 379 "-o", "UserKnownHostsFile=/dev/null", 380 "-o", "StrictHostKeyChecking=no", 381 "-L", "1111:127.0.0.1:2222", 382 "-L", "8888:127.0.0.1:9999", 383 "-N", "-f", 384 "-l", ssh_user, ip_addr, 385 "-o", "command=shell %s %h"] 386 mock_execute_command.assert_called_with("ssh", arg_list) 387 388 def testAutoConnectCreateSSHTunnelFail(self): 389 """Test auto connect.""" 390 fake_ip_addr = "1.1.1.1" 391 fake_rsa_key_file = "/tmp/rsa_file" 392 fake_target_vnc_port = 8888 393 target_adb_port = 9999 394 ssh_user = "fake_user" 395 call_side_effect = subprocess.CalledProcessError(123, "fake", 396 "fake error") 397 result = utils.ForwardedPorts(vnc_port=None, adb_port=None) 398 self.Patch(utils, "EstablishSshTunnel", side_effect=call_side_effect) 399 self.assertEqual(result, utils.AutoConnect(fake_ip_addr, 400 fake_rsa_key_file, 401 fake_target_vnc_port, 402 target_adb_port, 403 ssh_user)) 404 405 def testAutoConnectWithExtraArgs(self): 406 """Test extra args will be the same with expanded args.""" 407 fake_ip_addr = "1.1.1.1" 408 fake_rsa_key_file = "/tmp/rsa_file" 409 fake_target_vnc_port = 8888 410 target_adb_port = 9999 411 ssh_user = "fake_user" 412 fake_port = 12345 413 self.Patch(utils, "PickFreePort", return_value=fake_port) 414 mock_execute_command = self.Patch(utils, "_ExecuteCommand") 415 mock_establish_ssh_tunnel = self.Patch(utils, "EstablishSshTunnel") 416 extra_args_ssh_tunnel = "-o command='shell %s %h' -o command1='ls -la'" 417 utils.AutoConnect(ip_addr=fake_ip_addr, 418 rsa_key_file=fake_rsa_key_file, 419 target_vnc_port=fake_target_vnc_port, 420 target_adb_port=target_adb_port, 421 ssh_user=ssh_user, 422 client_adb_port=fake_port, 423 extra_args_ssh_tunnel=extra_args_ssh_tunnel) 424 mock_establish_ssh_tunnel.assert_called_with( 425 fake_ip_addr, 426 fake_rsa_key_file, 427 ssh_user, 428 [utils.PortMapping(fake_port, target_adb_port), 429 utils.PortMapping(fake_port, fake_target_vnc_port)], 430 extra_args_ssh_tunnel) 431 mock_execute_command.assert_called_with( 432 "adb", ["connect", "127.0.0.1:12345"]) 433 434 def testEstablishWebRTCSshTunnel(self): 435 """Test establish WebRTC ssh tunnel.""" 436 fake_ip_addr = "1.1.1.1" 437 fake_rsa_key_file = "/tmp/rsa_file" 438 ssh_user = "fake_user" 439 fake_webrtc_local_port = 12345 440 self.Patch(utils, "GetWebRTCServerPort", return_value=8443) 441 mock_establish_ssh_tunnel = self.Patch(utils, "EstablishSshTunnel") 442 fake_port_mapping = [utils.PortMapping(port, port) for port in range(15555, 15579 + 1)] + [utils.PortMapping(12345, 8443)] 443 444 utils.EstablishWebRTCSshTunnel( 445 ip_addr=fake_ip_addr, rsa_key_file=fake_rsa_key_file, 446 ssh_user=ssh_user, webrtc_local_port=fake_webrtc_local_port) 447 mock_establish_ssh_tunnel.assert_called_with( 448 fake_ip_addr, 449 fake_rsa_key_file, 450 ssh_user, 451 fake_port_mapping, 452 None) 453 454 mock_establish_ssh_tunnel.reset_mock() 455 extra_args_ssh_tunnel = "-o command='shell %s %h'" 456 utils.EstablishWebRTCSshTunnel( 457 ip_addr=fake_ip_addr, rsa_key_file=fake_rsa_key_file, 458 ssh_user=ssh_user, extra_args_ssh_tunnel=extra_args_ssh_tunnel, 459 webrtc_local_port=fake_webrtc_local_port) 460 mock_establish_ssh_tunnel.assert_called_with( 461 fake_ip_addr, 462 fake_rsa_key_file, 463 ssh_user, 464 fake_port_mapping, 465 extra_args_ssh_tunnel) 466 467 def testGetWebRTCServerPort(self): 468 """test GetWebRTCServerPort.""" 469 fake_ip_addr = "1.1.1.1" 470 fake_rsa_key_file = "/tmp/rsa_file" 471 ssh_user = "fake_user" 472 extra_args_ssh_tunnel = "-o command='shell %s %h'" 473 fake_subprocess = mock.MagicMock() 474 fake_subprocess.returncode = 0 475 fake_subprocess.communicate = mock.MagicMock( 476 return_value=('', '')) 477 self.Patch(subprocess, "Popen", return_value=fake_subprocess) 478 self.assertEqual(1443, utils.GetWebRTCServerPort( 479 fake_ip_addr, fake_rsa_key_file,ssh_user,extra_args_ssh_tunnel)) 480 481 # Test the case that find "webrtc_operator" process. 482 webrtc_operator_process = "11:45 bin/webrtc_operator -assets_dir=assets" 483 fake_subprocess.communicate = mock.MagicMock( 484 return_value=(webrtc_operator_process, '')) 485 self.assertEqual(8443, utils.GetWebRTCServerPort( 486 fake_ip_addr, fake_rsa_key_file,ssh_user,extra_args_ssh_tunnel)) 487 488 def testGetWebrtcPortFromSSHTunnel(self): 489 """"Test Get forwarding webrtc port from ssh tunnel.""" 490 fake_ps_output = ("/fake_ps_1 --fake arg \n" 491 "/fake_ps_2 --fake arg \n" 492 "/usr/bin/ssh -i ~/.ssh/acloud_rsa -o ControlPath=none " 493 "-o UserKnownHostsFile=/dev/null " 494 "-o StrictHostKeyChecking=no -L 15551:127.0.0.1:15551 " 495 "-L 12345:127.0.0.1:8443 -N -f -l user 1.1.1.1").encode() 496 self.Patch(subprocess, "check_output", return_value=fake_ps_output) 497 webrtc_ports = utils.GetWebrtcPortFromSSHTunnel("1.1.1.1") 498 self.assertEqual(12345, webrtc_ports) 499 500 @mock.patch("acloud.internal.lib.utils.subprocess") 501 def testFindRemoteFiles(self, mock_subprocess): 502 """Test FindRemoteFiles.""" 503 mock_ssh = mock.Mock() 504 505 paths = utils.FindRemoteFiles(mock_ssh, []) 506 mock_subprocess.run.assert_not_called() 507 self.assertEqual([], paths) 508 509 mock_ssh.GetBaseCmd.return_value = "mock_ssh" 510 mock_subprocess.run.return_value = mock.Mock( 511 returncode=0, stderr=b'stderr', stdout=b'file1\nfile2\n') 512 paths = utils.FindRemoteFiles(mock_ssh, ["dir1", "dir2"]) 513 self.assertEqual(["file1", "file2"], paths) 514 mock_subprocess.run.assert_called_with( 515 'mock_ssh find -H dir1 dir2 -type f', 516 shell=True, capture_output=True, check=False) 517 518 mock_subprocess.run.return_value = mock.Mock( 519 returncode=0, stderr=b'', stdout=b'') 520 paths = utils.FindRemoteFiles(mock_ssh, ["dir1", "dir2"]) 521 self.assertEqual([], paths) 522 523 mock_subprocess.run.return_value = mock.Mock( 524 returncode=1, stderr=b'', stdout=b'') 525 with self.assertRaises(errors.SubprocessFail): 526 utils.FindRemoteFiles(mock_ssh, ["dir1", "dir2"]) 527 528 # pylint: disable=protected-access, no-member 529 def testCleanupSSVncviwer(self): 530 """test cleanup ssvnc viewer.""" 531 fake_vnc_port = 9999 532 fake_ss_vncviewer_pattern = utils._SSVNC_VIEWER_PATTERN % { 533 "vnc_port": fake_vnc_port} 534 self.Patch(utils, "IsCommandRunning", return_value=True) 535 self.Patch(subprocess, "check_call", return_value=True) 536 utils.CleanupSSVncviewer(fake_vnc_port) 537 subprocess.check_call.assert_called_with(["pkill", "-9", "-f", fake_ss_vncviewer_pattern]) 538 539 subprocess.check_call.call_count = 0 540 self.Patch(utils, "IsCommandRunning", return_value=False) 541 utils.CleanupSSVncviewer(fake_vnc_port) 542 subprocess.check_call.assert_not_called() 543 544 def testLaunchBrowserFromReport(self): 545 """test launch browser from report.""" 546 self.Patch(webbrowser, "open_new_tab") 547 fake_report = mock.MagicMock(data={}) 548 549 # test remote instance 550 self.Patch(os.environ, "get", return_value=True) 551 fake_report.data = { 552 "devices": [{"instance_name": "remote_cf_instance_name", 553 "ip": "192.168.1.1",},],} 554 555 utils.LaunchBrowserFromReport(fake_report) 556 webbrowser.open_new_tab.assert_called_once_with("https://localhost:8443") 557 webbrowser.open_new_tab.call_count = 0 558 559 # test local instance 560 fake_report.data = { 561 "devices": [{"instance_name": "local-instance1", 562 "ip": "127.0.0.1:6250",},],} 563 utils.LaunchBrowserFromReport(fake_report) 564 webbrowser.open_new_tab.assert_called_once_with("https://localhost:8443") 565 webbrowser.open_new_tab.call_count = 0 566 567 # verify terminal can't support launch webbrowser. 568 self.Patch(os.environ, "get", return_value=False) 569 utils.LaunchBrowserFromReport(fake_report) 570 self.assertEqual(webbrowser.open_new_tab.call_count, 0) 571 572 def testSetExecutable(self): 573 """test setting a file to be executable.""" 574 with tempfile.NamedTemporaryFile(delete=True) as temp_file: 575 utils.SetExecutable(temp_file.name) 576 self.assertEqual(os.stat(temp_file.name).st_mode & 0o777, 0o755) 577 578 def testSetDirectoryTreeExecutable(self): 579 """test setting a file in a directory to be executable.""" 580 with tempfile.TemporaryDirectory() as temp_dir: 581 subdir = os.path.join(temp_dir, "subdir") 582 file_path = os.path.join(subdir, "file") 583 os.makedirs(subdir) 584 with open(file_path, "w"): 585 pass 586 utils.SetDirectoryTreeExecutable(temp_dir) 587 self.assertEqual(os.stat(file_path).st_mode & 0o777, 0o755) 588 589 def testSetCvdPort(self): 590 """test base_instance_num.""" 591 utils.SetCvdPorts(2) 592 self.assertEqual(utils.GetCvdPorts().adb_port, 6521) 593 self.assertEqual(utils.GetCvdPorts().vnc_port, 6445) 594 utils.SetCvdPorts(None) 595 596 597 @mock.patch.object(utils, "PrintColorString") 598 def testPrintDeviceSummary(self, mock_print): 599 """test PrintDeviceSummary.""" 600 fake_report = mock.MagicMock(data={}) 601 fake_report.data = { 602 "devices": [{"instance_name": "remote_cf_instance_name", 603 "ip": "192.168.1.1", 604 "device_serial": "127.0.0.1:399"},],} 605 utils.PrintDeviceSummary(fake_report) 606 self.assertEqual(mock_print.call_count, 7) 607 608 # Test for OpenWrt device case. 609 fake_report.data = { 610 "devices": [{"instance_name": "remote_cf_instance_name", 611 "ip": "192.168.1.1", 612 "ssh_command": "fake_ssh_cmd", 613 "screen_command": "fake_screen_cmd"},],} 614 mock_print.reset_mock() 615 utils.PrintDeviceSummary(fake_report) 616 self.assertEqual(mock_print.call_count, 13) 617 618 # Test for fail case 619 fake_report.data = { 620 "errors": "Fail to create devices"} 621 mock_print.reset_mock() 622 utils.PrintDeviceSummary(fake_report) 623 self.assertEqual(mock_print.call_count, 3) 624 625 # pylint: disable=protected-access 626 def testIsSupportedKvm(self): 627 """Test IsSupportedKvm.""" 628 self.Patch(os.path, "exists", return_value=True) 629 self.assertTrue(utils.IsSupportedKvm()) 630 631 self.Patch(os.path, "exists", return_value=False) 632 self.assertFalse(utils.IsSupportedKvm()) 633 634 635if __name__ == "__main__": 636 unittest.main() 637