#!/usr/bin/env python3
#
# Copyright 2018 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Network test helpers: VPN profiles, tcpdump capture and USB tethering."""
import logging
import os
import re
import socket
import time
import urllib.request

from acts import asserts
from acts import signals
from acts import utils
from acts.controllers import adb
from acts.controllers.adb_lib.error import AdbError
from acts.libs.proc import job
from acts.utils import start_standing_subprocess
from acts.utils import stop_standing_subprocess
from acts_contrib.test_utils.net import connectivity_const as cconst
from acts_contrib.test_utils.tel import tel_defines
from acts_contrib.test_utils.tel.tel_data_utils import wait_for_cell_data_connection
from acts_contrib.test_utils.tel.tel_test_utils import get_operator_name
from acts_contrib.test_utils.tel.tel_test_utils import verify_http_connection

from acts_contrib.test_utils.wifi import wifi_test_utils as wutils

VPN_CONST = cconst.VpnProfile
VPN_TYPE = cconst.VpnProfileType
VPN_PARAMS = cconst.VpnReqParams
TCPDUMP_PATH = "/data/local/tmp/"
USB_CHARGE_MODE = "svc usb setFunctions"
USB_TETHERING_MODE = "svc usb setFunctions rndis"
ENABLE_HARDWARE_OFFLOAD = "settings put global tether_offload_disabled 0"
DISABLE_HARDWARE_OFFLOAD = "settings put global tether_offload_disabled 1"
DEVICE_IP_ADDRESS = "ip address"
LOCALHOST = "192.168.1.1"

GCE_SSH = "gcloud compute ssh "
GCE_SCP = "gcloud compute scp "


def verify_lte_data_and_tethering_supported(ad):
    """Verify if LTE data is enabled and tethering supported.

    Wifi is switched off for the duration of the check and switched back on
    once both assertions have passed.

    Args:
        ad: android device object
    """
    wutils.wifi_toggle_state(ad, False)
    ad.droid.telephonyToggleDataConnection(True)
    wait_for_cell_data_connection(ad.log, ad, True)
    asserts.assert_true(
        verify_http_connection(ad.log, ad),
        "HTTP verification failed on cell data connection")
    asserts.assert_true(
        ad.droid.connectivityIsTetheringSupported(),
        "Tethering is not supported for the provider")
    wutils.wifi_toggle_state(ad, True)


def set_chrome_browser_permissions(ad):
    """Set chrome browser start with no-first-run verification.

    Give permission to read from and write to storage. Every command is
    best-effort: a failing adb command is logged and the remaining commands
    are still attempted.

    Args:
        ad: android device object
    """
    commands = ["pm grant com.android.chrome "
                "android.permission.READ_EXTERNAL_STORAGE",
                "pm grant com.android.chrome "
                "android.permission.WRITE_EXTERNAL_STORAGE",
                "rm /data/local/chrome-command-line",
                "am set-debug-app --persistent com.android.chrome",
                'echo "chrome --no-default-browser-check --no-first-run '
                '--disable-fre" > /data/local/tmp/chrome-command-line']
    for cmd in commands:
        try:
            ad.adb.shell(cmd)
        except AdbError:
            logging.warning("adb command %s failed on %s", cmd, ad.serial)


def verify_ping_to_vpn_ip(ad, vpn_ping_addr):
    """Verify if IP behind VPN server is pingable.

    Ping should pass, if VPN is connected.
    Ping should fail, if VPN is disconnected.

    Args:
        ad: android device object
        vpn_ping_addr: target ping addr

    Returns:
        True if the ping produced output that does not report 100% packet
        loss, False otherwise (including when the adb command fails).
    """
    pkt_loss = "100% packet loss"
    logging.info("Pinging: %s", vpn_ping_addr)
    try:
        ping_result = ad.adb.shell("ping -c 3 -W 2 %s" % vpn_ping_addr)
    except AdbError:
        # A failed ping is a legitimate outcome (VPN down), not an error.
        return False
    return bool(ping_result) and pkt_loss not in ping_result


def legacy_vpn_connection_test_logic(ad, vpn_profile, vpn_ping_addr):
    """Test logic for each legacy VPN connection.

    Steps:
      1. Generate profile for the VPN type
      2. Establish connection to the server
      3. Verify that connection is established using LegacyVpnInfo
      4. Verify the connection by pinging the IP behind VPN
      5. Stop the VPN connection
      6. Check the connection status
      7. Verify that ping to IP behind VPN fails

    Args:
        ad: Android device object
        vpn_profile: object contains attribute for create vpn profile
        vpn_ping_addr: addr to verify vpn connection
    """
    # Wait for sometime so that VPN server flushes all interfaces and
    # connections after graceful termination
    time.sleep(10)

    ad.adb.shell("ip xfrm state flush")
    ad.log.info("Connecting to: %s", vpn_profile)
    ad.droid.vpnStartLegacyVpn(vpn_profile)
    time.sleep(cconst.VPN_TIMEOUT)

    connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo()
    asserts.assert_equal(connected_vpn_info["state"],
                         cconst.VPN_STATE_CONNECTED,
                         "Unable to establish VPN connection for %s"
                         % vpn_profile)

    ping_result = verify_ping_to_vpn_ip(ad, vpn_ping_addr)
    ip_xfrm_state = ad.adb.shell("ip xfrm state")
    match_obj = re.search(r'hmac(.*)', str(ip_xfrm_state))
    if match_obj:
        # First whitespace-separated token of the matched "hmac..." text.
        hmac_tokens = match_obj.group(0).split()
        ad.log.info("HMAC for ESP is %s ", hmac_tokens[0])

    # The ping result is asserted only after the VPN has been stopped so a
    # failed ping does not leave the connection up.
    ad.droid.vpnStopLegacyVpn()
    asserts.assert_true(ping_result,
                        "Ping to the internal IP failed. "
                        "Expected to pass as VPN is connected")

    connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo()
    asserts.assert_true(not connected_vpn_info,
                        "Unable to terminate VPN connection for %s"
                        % vpn_profile)


def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr,
                        ipsec_server_type, log_path):
    """Download the certificates from VPN server and push to sdcard of DUT.

    When the server address equals LOCALHOST the device downloads the file
    itself; otherwise the host downloads it into log_path and pushes it to
    the device with adb.

    Args:
        ad: android device object
        vpn_params: vpn params from config file
        vpn_type: 1 of the 6 VPN types
        vpn_server_addr: server addr to connect to
        ipsec_server_type: ipsec version - strongswan or openswan
        log_path: log path to download cert

    Returns:
        Client cert file name on DUT's sdcard
    """
    pkcs_file_name = vpn_params['client_pkcs_file_name']
    url = "http://%s%s%s" % (vpn_server_addr,
                             vpn_params['cert_path_vpnserver'],
                             pkcs_file_name)
    logging.info("URL is: %s", url)
    if vpn_server_addr == LOCALHOST:
        ad.droid.httpDownloadFile(url, "/sdcard/")
        return pkcs_file_name

    local_cert_name = "%s_%s_%s" % (vpn_type.name,
                                    ipsec_server_type,
                                    pkcs_file_name)
    local_file_path = os.path.join(log_path, local_cert_name)
    try:
        # Context managers make sure both the HTTP response and the local
        # file are closed even when the transfer fails part-way.
        with urllib.request.urlopen(url) as response:
            cert_bytes = response.read()
        with open(local_file_path, "wb") as f:
            f.write(cert_bytes)
    except Exception:
        # Keep the root cause in the log; the test failure message itself
        # stays unchanged.
        logging.exception("Failed to download %s to %s", url, local_file_path)
        asserts.fail("Unable to download certificate from the server")

    ad.adb.push("%s sdcard/" % local_file_path)
    return local_cert_name


def generate_legacy_vpn_profile(ad,
                                vpn_params,
                                vpn_type,
                                vpn_server_addr,
                                ipsec_server_type,
                                log_path):
    """Generate legacy VPN profile for a VPN.

    For the RSA based types this also downloads the client certificate and
    installs it on the device.

    Args:
        ad: android device object
        vpn_params: vpn params from config file
        vpn_type: 1 of the 6 VPN types
        vpn_server_addr: server addr to connect to
        ipsec_server_type: ipsec version - strongswan or openswan
        log_path: log path to download cert

    Returns:
        Vpn profile
    """
    psk_types = {"L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"}
    rsa_types = {"L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"}

    # PPTP profile names do not carry the ipsec server type.
    if vpn_type.name == "PPTP":
        profile_name = "test_%s" % vpn_type.name
    else:
        profile_name = "test_%s_%s" % (vpn_type.name, ipsec_server_type)

    vpn_profile = {
        VPN_CONST.USER: vpn_params['vpn_username'],
        VPN_CONST.PWD: vpn_params['vpn_password'],
        VPN_CONST.TYPE: vpn_type.value,
        VPN_CONST.SERVER: vpn_server_addr,
        VPN_CONST.NAME: profile_name,
    }

    if vpn_type.name in psk_types:
        vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret']
    elif vpn_type.name in rsa_types:
        cert_name = download_load_certs(ad,
                                        vpn_params,
                                        vpn_type,
                                        vpn_server_addr,
                                        ipsec_server_type,
                                        log_path)
        # The cert alias is the file name up to its first dot.
        vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0]
        ad.droid.installCertificate(vpn_profile, cert_name,
                                    vpn_params['cert_password'])
    else:
        vpn_profile[VPN_CONST.MPPE] = "mppe"

    return vpn_profile


def generate_ikev2_vpn_profile(ad, vpn_params, vpn_type, server_addr, log_path):
    """Generate VPN profile for IKEv2 VPN.

    Args:
        ad: android device object.
        vpn_params: vpn params from config file.
        vpn_type: ikev2 vpn type.
        server_addr: vpn server addr.
        log_path: log path to download cert.

    Returns:
        Vpn profile.
    """
    vpn_profile = {
        VPN_CONST.TYPE: vpn_type.value,
        VPN_CONST.SERVER: server_addr,
    }

    # NOTE(review): certificates are fetched from vpn_params["server_addr"],
    # not from the server_addr argument - confirm this is intentional.
    if vpn_type.name == "IKEV2_IPSEC_USER_PASS":
        vpn_profile[VPN_CONST.USER] = vpn_params["vpn_username"]
        vpn_profile[VPN_CONST.PWD] = vpn_params["vpn_password"]
        vpn_profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"]
        cert_name = download_load_certs(
            ad, vpn_params, vpn_type, vpn_params["server_addr"],
            "IKEV2_IPSEC_USER_PASS", log_path)
        vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0]
        ad.droid.installCertificate(
            vpn_profile, cert_name, vpn_params['cert_password'])
    elif vpn_type.name == "IKEV2_IPSEC_PSK":
        vpn_profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"]
        vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params["psk_secret"]
    else:
        ipsec_id = "%s@%s" % (vpn_params["vpn_identity"], server_addr)
        vpn_profile[VPN_CONST.IPSEC_ID] = ipsec_id
        logging.info("ID: %s", ipsec_id)
        cert_name = download_load_certs(
            ad, vpn_params, vpn_type, vpn_params["server_addr"],
            "IKEV2_IPSEC_RSA", log_path)
        # The same alias is used for both the user and the CA certificate.
        cert_alias = cert_name.split('.')[0]
        vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_alias
        vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_alias
        ad.droid.installCertificate(
            vpn_profile, cert_name, vpn_params['cert_password'])

    return vpn_profile


def start_tcpdump(ad, test_name, interface="any"):
    """Start tcpdump on the device, by default on all interfaces.

    Any running tcpdump is killed and previous captures under TCPDUMP_PATH
    are removed before the new capture starts.

    Args:
        ad: android device object.
        test_name: tcpdump file name will have this
        interface: interface passed to tcpdump -i; defaults to "any".

    Returns:
        The standing subprocess running tcpdump, or None if it could not be
        started.
    """
    ad.log.info("Starting tcpdump on all interfaces")
    ad.adb.shell("killall -9 tcpdump", ignore_status=True)
    ad.adb.shell("mkdir %s" % TCPDUMP_PATH, ignore_status=True)
    ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)

    file_name = "%s/tcpdump_%s_%s.pcap" % (TCPDUMP_PATH, ad.serial, test_name)
    ad.log.info("tcpdump file is %s", file_name)
    cmd = "adb -s {} shell tcpdump -i {} -s0 -w {}".format(ad.serial,
                                                           interface, file_name)
    try:
        return start_standing_subprocess(cmd, 5)
    except Exception:
        # Capture is best-effort: report the failure and let the test go on.
        ad.log.exception('Could not start standing process %r', cmd)

    return None


def stop_tcpdump(ad,
                 proc,
                 test_name,
                 pull_dump=True,
                 adb_pull_timeout=adb.DEFAULT_ADB_PULL_TIMEOUT):
    """Stops tcpdump on any iface.

    Pulls the tcpdump file in the tcpdump dir if necessary.

    Args:
        ad: android device object.
        proc: need to know which pid to stop
        test_name: test name to save the tcpdump file
        pull_dump: pull tcpdump file or not
        adb_pull_timeout: timeout for adb_pull

    Returns:
        log_path of the tcpdump file, or None when proc is None or the dump
        is not pulled
    """
    ad.log.info("Stopping and pulling tcpdump if any")
    if proc is None:
        return None
    try:
        stop_standing_subprocess(proc)
    except Exception as e:
        ad.log.warning(e)
    if not pull_dump:
        return None

    log_path = os.path.join(ad.device_log_path, "TCPDUMP_%s" % ad.serial)
    os.makedirs(log_path, exist_ok=True)
    ad.adb.pull("%s/. %s" % (TCPDUMP_PATH, log_path),
                timeout=adb_pull_timeout)
    ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)
    file_name = "tcpdump_%s_%s.pcap" % (ad.serial, test_name)
    return "%s/%s" % (log_path, file_name)


def _gce_ssh_command_prefix(gce):
    """Build the "gcloud compute ssh ... --command " prefix for gce."""
    return "%s --project=%s --zone=%s %s@%s --command " % \
        (GCE_SSH, gce["project"], gce["zone"], gce["username"], gce["hostname"])


def start_tcpdump_gce_server(ad, test_name, dest_port, gce):
    """Start tcpdump on gce server.

    Args:
        ad: android device object
        test_name: test case name
        dest_port: port to collect tcpdump
        gce: dictionary of gce instance

    Returns:
        process id and pcap file path from gce server

    Raises:
        signals.TestFailure: if no tcpdump process for the capture file is
            found on the server.
    """
    ad.log.info("Starting tcpdump on gce server")

    # pcap file name
    fname = "/tmp/%s_%s_%s_%s" % \
        (test_name, ad.model, ad.serial,
         time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime(time.time())))

    # start tcpdump
    tcpdump_cmd = ("sudo bash -c 'tcpdump -i %s -w %s.pcap port %s > "
                   "%s.txt 2>&1 & echo $!'"
                   % (gce["interface"], fname, dest_port, fname))
    gcloud_ssh_cmd = _gce_ssh_command_prefix(gce)
    gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd)
    utils.exe_cmd(gce_ssh_cmd)

    # get process id
    ps_cmd = '%s "ps aux | grep tcpdump | grep %s"' % (gcloud_ssh_cmd, fname)
    ps_tokens = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore").split()
    # The pid is read from the second token of the output, so fewer than two
    # tokens means no usable process entry was found.
    if len(ps_tokens) < 2:
        raise signals.TestFailure("Failed to start tcpdump on gce server")
    return ps_tokens[1], fname


def stop_tcpdump_gce_server(ad, tcpdump_pid, fname, gce):
    """Stop and pull tcpdump file from gce server.

    Args:
        ad: android device object
        tcpdump_pid: process id for tcpdump file
        fname: tcpdump file path
        gce: dictionary of gce instance

    Returns:
        pcap file from gce server, or None when fname is empty

    Raises:
        signals.TestFailure: if tcpdump is still running after the kill, or
            the pcap file is not present locally after the copy.
    """
    ad.log.info("Stop and pull pcap file from gce server")

    # stop tcpdump
    tcpdump_cmd = "sudo kill %s" % tcpdump_pid
    gcloud_ssh_cmd = _gce_ssh_command_prefix(gce)
    gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd)
    utils.exe_cmd(gce_ssh_cmd)

    # verify tcpdump is stopped
    ps_cmd = '%s "ps aux | grep tcpdump"' % gcloud_ssh_cmd
    res = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore")
    if tcpdump_pid in res.split():
        raise signals.TestFailure("Failed to stop tcpdump on gce server")
    if not fname:
        return None

    # pull pcap file
    gcloud_scp_cmd = "%s --project=%s --zone=%s %s@%s:" % \
        (GCE_SCP, gce["project"], gce["zone"], gce["username"], gce["hostname"])
    pull_file = '%s%s.pcap %s/' % (gcloud_scp_cmd, fname, ad.device_log_path)
    utils.exe_cmd(pull_file)
    pcap_file = "%s/%s.pcap" % (ad.device_log_path, fname.split('/')[-1])
    if not os.path.exists(pcap_file):
        raise signals.TestFailure("Failed to pull tcpdump from gce server")

    # delete pcaps
    utils.exe_cmd('%s "sudo rm %s.*"' % (gcloud_ssh_cmd, fname))

    # return pcap file
    return pcap_file


def is_ipaddress_ipv6(ip_address):
    """Verify if the given string is a valid IPv6 address.

    Args:
        ip_address: string containing the IP address

    Returns:
        True: if valid ipv6 address
        False: if not
    """
    try:
        socket.inet_pton(socket.AF_INET6, ip_address)
    except (OSError, ValueError):
        # OSError (socket.error) signals an invalid address; ValueError is
        # raised for strings with an embedded null character.
        return False
    return True


def carrier_supports_ipv6(dut):
    """Verify if carrier supports ipv6.

    Args:
        dut: Android device that is being checked

    Returns:
        True: if carrier supports ipv6
        False: if not
    """
    ipv6_carriers = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"]
    operator = get_operator_name(dut.log, dut)
    return operator in ipv6_carriers


def is_carrier_supports_entitlement(dut):
    """Verify if carrier supports entitlement

    Args:
        dut: Android device

    Return:
        True if carrier supports entitlement, otherwise false
    """
    carriers_supports_entitlement = [
        tel_defines.CARRIER_VZW,
        tel_defines.CARRIER_ATT,
        tel_defines.CARRIER_TMO,
        tel_defines.CARRIER_SBM]
    operator = get_operator_name(dut.log, dut)
    return operator in carriers_supports_entitlement


def set_cap_net_raw_capability():
    """Set the CAP_NET_RAW capability

    To send the Scapy packets, we need to get the CAP_NET_RAW capability first.
    The capability is set on both the resolved act.py and python binaries.
    """
    cap_net_raw = "sudo setcap cap_net_raw=eip $(readlink -f $(which act.py))"
    utils.exe_cmd(cap_net_raw)
    cap_python = "sudo setcap cap_net_raw=eip $(readlink -f $(which python))"
    utils.exe_cmd(cap_python)


def supports_ipv6_tethering(self, dut):
    """Check if provider supports IPv6 tethering.

    Args:
        self: object whose log attribute is passed to get_operator_name;
            this module-level function keeps the parameter for existing
            callers.
        dut: Android device that is being checked

    Returns:
        True: if provider supports IPv6 tethering
        False: if not
    """
    carrier_supports_tethering = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"]
    operator = get_operator_name(self.log, dut)
    return operator in carrier_supports_tethering


def start_usb_tethering(ad):
    """Start USB tethering using #startTethering API.

    Enable USB tethering by #startTethering API will break the RPC session,
    Make sure you call #ad.recreate_services after call this API - b/149116235

    Args:
        ad: AndroidDevice object
    """
    ad.droid.connectivityStartTethering(tel_defines.TETHERING_USB, False)


def stop_usb_tethering(ad):
    """Stop USB tethering.

    Services are stopped before the USB function is changed and started
    again once adb reports the device.

    Args:
        ad: android device object
    """
    ad.log.info("Stopping USB Tethering")
    ad.stop_services()
    ad.adb.shell(USB_CHARGE_MODE)
    ad.adb.wait_for_device()
    ad.start_services()


def wait_for_new_iface(old_ifaces):
    """Wait for the new interface to come up.

    Fails the test if more than one new interface appears, or if none
    appears within the polling window.

    Args:
        old_ifaces: list of old interfaces

    Returns:
        Name of the new interface, after it has been enabled.
    """
    old_set = set(old_ifaces)
    # Poll up to 10 times, sleeping 1s after every unsuccessful attempt.
    for _ in range(10):
        new_ifaces = set(get_if_list()) - old_set
        asserts.assert_true(len(new_ifaces) < 2,
                            "Too many new interfaces after turning on "
                            "tethering")
        if len(new_ifaces) == 1:
            # enable the new iface before return
            new_iface = new_ifaces.pop()
            enable_iface(new_iface)
            return new_iface
        time.sleep(1)
    asserts.fail("Timeout waiting for tethering interface on host")


def get_if_list():
    """Returns a list containing all network interfaces.

    The newest version of Scapy.get_if_list() returns the cached interfaces,
    which might be out-dated, and unable to perceive the interface changes.
    Use this method when need to monitoring the network interfaces changes.
    Reference: https://github.com/secdev/scapy/pull/2707

    Returns:
        A list of the latest network interfaces. For example:
        ['cvd-ebr', ..., 'eno1', 'enx4afa19a8dde1', 'lo', 'wlxd03745d68d88']
    """
    # Imported here so the module can be loaded without scapy installed.
    from scapy.config import conf
    from scapy.compat import plain_str

    # Get ifconfig output
    result = job.run([conf.prog.ifconfig])
    if result.exit_status:
        asserts.fail(
            "Failed to execute ifconfig: {}".format(plain_str(result.stderr)))

    # Interface header lines look like "<name>: flags=..."; keep the part
    # before the first colon.
    interfaces = [
        line[:line.find(':')] for line in plain_str(result.stdout).splitlines()
        if ": flags" in line.lower()
    ]
    return interfaces


def _apply_offload_setting(ad, setting_cmd):
    """Run the offload settings command, then reboot and wait for the device."""
    ad.adb.shell(setting_cmd, ignore_status=True)
    ad.reboot()
    time.sleep(tel_defines.WAIT_TIME_AFTER_REBOOT)


def enable_hardware_offload(ad):
    """Enable hardware offload using adb shell command.

    The device is rebooted after the setting is written.

    Args:
        ad: Android device object
    """
    ad.log.info("Enabling hardware offload.")
    _apply_offload_setting(ad, ENABLE_HARDWARE_OFFLOAD)


def disable_hardware_offload(ad):
    """Disable hardware offload using adb shell command.

    The device is rebooted after the setting is written.

    Args:
        ad: Android device object
    """
    ad.log.info("Disabling hardware offload.")
    _apply_offload_setting(ad, DISABLE_HARDWARE_OFFLOAD)


def enable_iface(iface):
    """Enable network interfaces.

    Some network interface might disabled as default, need to enable before
    using it.

    Args:
        iface: network interface that need to enable
    """
    # Imported here so the module can be loaded without scapy installed.
    from scapy.compat import plain_str

    result = job.run("sudo ifconfig %s up" % (iface), ignore_status=True)
    if result.exit_status:
        asserts.fail(
            "Failed to execute ifconfig: {}".format(plain_str(result.stderr)))