1#!/usr/bin/env python3 2# 3# Copyright 2018 Google, Inc. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16import logging 17import os 18import re 19import socket 20import time 21import urllib.request 22 23from acts import asserts 24from acts import signals 25from acts import utils 26from acts.controllers import adb 27from acts.controllers.adb_lib.error import AdbError 28from acts.libs.proc import job 29from acts.utils import start_standing_subprocess 30from acts.utils import stop_standing_subprocess 31from acts_contrib.test_utils.net import connectivity_const as cconst 32from acts_contrib.test_utils.tel import tel_defines 33from acts_contrib.test_utils.tel.tel_data_utils import wait_for_cell_data_connection 34from acts_contrib.test_utils.tel.tel_test_utils import get_operator_name 35from acts_contrib.test_utils.tel.tel_test_utils import verify_http_connection 36 37from acts_contrib.test_utils.wifi import wifi_test_utils as wutils 38 39VPN_CONST = cconst.VpnProfile 40VPN_TYPE = cconst.VpnProfileType 41VPN_PARAMS = cconst.VpnReqParams 42TCPDUMP_PATH = "/data/local/tmp/" 43USB_CHARGE_MODE = "svc usb setFunctions" 44USB_TETHERING_MODE = "svc usb setFunctions rndis" 45ENABLE_HARDWARE_OFFLOAD = "settings put global tether_offload_disabled 0" 46DISABLE_HARDWARE_OFFLOAD = "settings put global tether_offload_disabled 1" 47DEVICE_IP_ADDRESS = "ip address" 48LOCALHOST = "192.168.1.1" 49 50GCE_SSH = "gcloud compute ssh " 51GCE_SCP = "gcloud compute scp " 52 53 54def verify_lte_data_and_tethering_supported(ad): 55 """Verify if LTE data is enabled and tethering supported.""" 56 wutils.wifi_toggle_state(ad, False) 57 ad.droid.telephonyToggleDataConnection(True) 58 wait_for_cell_data_connection(ad.log, ad, True) 59 asserts.assert_true( 60 verify_http_connection(ad.log, ad), 61 "HTTP verification failed on cell data connection") 62 asserts.assert_true( 63 ad.droid.connectivityIsTetheringSupported(), 64 "Tethering is not supported for the provider") 65 wutils.wifi_toggle_state(ad, True) 66 67 68def set_chrome_browser_permissions(ad): 69 """Set chrome browser start with no-first-run verification. 70 71 Give permission to read from and write to storage 72 73 Args: 74 ad: android device object 75 """ 76 commands = ["pm grant com.android.chrome " 77 "android.permission.READ_EXTERNAL_STORAGE", 78 "pm grant com.android.chrome " 79 "android.permission.WRITE_EXTERNAL_STORAGE", 80 "rm /data/local/chrome-command-line", 81 "am set-debug-app --persistent com.android.chrome", 82 'echo "chrome --no-default-browser-check --no-first-run ' 83 '--disable-fre" > /data/local/tmp/chrome-command-line'] 84 for cmd in commands: 85 try: 86 ad.adb.shell(cmd) 87 except AdbError: 88 logging.warning("adb command %s failed on %s" % (cmd, ad.serial)) 89 90 91def verify_ping_to_vpn_ip(ad, vpn_ping_addr): 92 """Verify if IP behind VPN server is pingable. 93 94 Ping should pass, if VPN is connected. 95 Ping should fail, if VPN is disconnected. 96 97 Args: 98 ad: android device object 99 vpn_ping_addr: target ping addr 100 """ 101 ping_result = None 102 pkt_loss = "100% packet loss" 103 logging.info("Pinging: %s" % vpn_ping_addr) 104 try: 105 ping_result = ad.adb.shell("ping -c 3 -W 2 %s" % vpn_ping_addr) 106 except AdbError: 107 pass 108 return ping_result and pkt_loss not in ping_result 109 110 111def legacy_vpn_connection_test_logic(ad, vpn_profile, vpn_ping_addr): 112 """Test logic for each legacy VPN connection. 113 114 Steps: 115 1. Generate profile for the VPN type 116 2. Establish connection to the server 117 3. Verify that connection is established using LegacyVpnInfo 118 4. Verify the connection by pinging the IP behind VPN 119 5. Stop the VPN connection 120 6. Check the connection status 121 7. Verify that ping to IP behind VPN fails 122 123 Args: 124 ad: Android device object 125 vpn_profile: object contains attribute for create vpn profile 126 vpn_ping_addr: addr to verify vpn connection 127 """ 128 # Wait for sometime so that VPN server flushes all interfaces and 129 # connections after graceful termination 130 time.sleep(10) 131 132 ad.adb.shell("ip xfrm state flush") 133 ad.log.info("Connecting to: %s", vpn_profile) 134 ad.droid.vpnStartLegacyVpn(vpn_profile) 135 time.sleep(cconst.VPN_TIMEOUT) 136 137 connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo() 138 asserts.assert_equal(connected_vpn_info["state"], 139 cconst.VPN_STATE_CONNECTED, 140 "Unable to establish VPN connection for %s" 141 % vpn_profile) 142 143 ping_result = verify_ping_to_vpn_ip(ad, vpn_ping_addr) 144 ip_xfrm_state = ad.adb.shell("ip xfrm state") 145 match_obj = re.search(r'hmac(.*)', "%s" % ip_xfrm_state) 146 if match_obj: 147 ip_xfrm_state = format(match_obj.group(0)).split() 148 ad.log.info("HMAC for ESP is %s " % ip_xfrm_state[0]) 149 150 ad.droid.vpnStopLegacyVpn() 151 asserts.assert_true(ping_result, 152 "Ping to the internal IP failed. " 153 "Expected to pass as VPN is connected") 154 155 connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo() 156 asserts.assert_true(not connected_vpn_info, 157 "Unable to terminate VPN connection for %s" 158 % vpn_profile) 159 160 161def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr, 162 ipsec_server_type, log_path): 163 """Download the certificates from VPN server and push to sdcard of DUT. 164 165 Args: 166 ad: android device object 167 vpn_params: vpn params from config file 168 vpn_type: 1 of the 6 VPN types 169 vpn_server_addr: server addr to connect to 170 ipsec_server_type: ipsec version - strongswan or openswan 171 log_path: log path to download cert 172 173 Returns: 174 Client cert file name on DUT's sdcard 175 """ 176 url = "http://%s%s%s" % (vpn_server_addr, 177 vpn_params['cert_path_vpnserver'], 178 vpn_params['client_pkcs_file_name']) 179 logging.info("URL is: %s" % url) 180 if vpn_server_addr == LOCALHOST: 181 ad.droid.httpDownloadFile(url, "/sdcard/") 182 return vpn_params['client_pkcs_file_name'] 183 184 local_cert_name = "%s_%s_%s" % (vpn_type.name, 185 ipsec_server_type, 186 vpn_params['client_pkcs_file_name']) 187 local_file_path = os.path.join(log_path, local_cert_name) 188 try: 189 ret = urllib.request.urlopen(url) 190 with open(local_file_path, "wb") as f: 191 f.write(ret.read()) 192 except Exception: 193 asserts.fail("Unable to download certificate from the server") 194 195 ad.adb.push("%s sdcard/" % local_file_path) 196 return local_cert_name 197 198 199def generate_legacy_vpn_profile(ad, 200 vpn_params, 201 vpn_type, 202 vpn_server_addr, 203 ipsec_server_type, 204 log_path): 205 """Generate legacy VPN profile for a VPN. 206 207 Args: 208 ad: android device object 209 vpn_params: vpn params from config file 210 vpn_type: 1 of the 6 VPN types 211 vpn_server_addr: server addr to connect to 212 ipsec_server_type: ipsec version - strongswan or openswan 213 log_path: log path to download cert 214 215 Returns: 216 Vpn profile 217 """ 218 vpn_profile = {VPN_CONST.USER: vpn_params['vpn_username'], 219 VPN_CONST.PWD: vpn_params['vpn_password'], 220 VPN_CONST.TYPE: vpn_type.value, 221 VPN_CONST.SERVER: vpn_server_addr, } 222 vpn_profile[VPN_CONST.NAME] = "test_%s_%s" % (vpn_type.name, 223 ipsec_server_type) 224 if vpn_type.name == "PPTP": 225 vpn_profile[VPN_CONST.NAME] = "test_%s" % vpn_type.name 226 227 psk_set = set(["L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"]) 228 rsa_set = set(["L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"]) 229 230 if vpn_type.name in psk_set: 231 vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret'] 232 elif vpn_type.name in rsa_set: 233 cert_name = download_load_certs(ad, 234 vpn_params, 235 vpn_type, 236 vpn_server_addr, 237 ipsec_server_type, 238 log_path) 239 vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0] 240 ad.droid.installCertificate(vpn_profile, cert_name, 241 vpn_params['cert_password']) 242 else: 243 vpn_profile[VPN_CONST.MPPE] = "mppe" 244 245 return vpn_profile 246 247 248def generate_ikev2_vpn_profile(ad, vpn_params, vpn_type, server_addr, log_path): 249 """Generate VPN profile for IKEv2 VPN. 250 251 Args: 252 ad: android device object. 253 vpn_params: vpn params from config file. 254 vpn_type: ikev2 vpn type. 255 server_addr: vpn server addr. 256 log_path: log path to download cert. 257 258 Returns: 259 Vpn profile. 260 """ 261 vpn_profile = { 262 VPN_CONST.TYPE: vpn_type.value, 263 VPN_CONST.SERVER: server_addr, 264 } 265 266 if vpn_type.name == "IKEV2_IPSEC_USER_PASS": 267 vpn_profile[VPN_CONST.USER] = vpn_params["vpn_username"] 268 vpn_profile[VPN_CONST.PWD] = vpn_params["vpn_password"] 269 vpn_profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"] 270 cert_name = download_load_certs( 271 ad, vpn_params, vpn_type, vpn_params["server_addr"], 272 "IKEV2_IPSEC_USER_PASS", log_path) 273 vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0] 274 ad.droid.installCertificate( 275 vpn_profile, cert_name, vpn_params['cert_password']) 276 elif vpn_type.name == "IKEV2_IPSEC_PSK": 277 vpn_profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"] 278 vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params["psk_secret"] 279 else: 280 vpn_profile[VPN_CONST.IPSEC_ID] = "%s@%s" % ( 281 vpn_params["vpn_identity"], server_addr) 282 logging.info("ID: %s@%s" % (vpn_params["vpn_identity"], server_addr)) 283 cert_name = download_load_certs( 284 ad, vpn_params, vpn_type, vpn_params["server_addr"], 285 "IKEV2_IPSEC_RSA", log_path) 286 vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0] 287 vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0] 288 ad.droid.installCertificate( 289 vpn_profile, cert_name, vpn_params['cert_password']) 290 291 return vpn_profile 292 293 294def start_tcpdump(ad, test_name, interface="any"): 295 """Start tcpdump on all interfaces. 296 297 Args: 298 ad: android device object. 299 test_name: tcpdump file name will have this 300 """ 301 ad.log.info("Starting tcpdump on all interfaces") 302 ad.adb.shell("killall -9 tcpdump", ignore_status=True) 303 ad.adb.shell("mkdir %s" % TCPDUMP_PATH, ignore_status=True) 304 ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True) 305 306 file_name = "%s/tcpdump_%s_%s.pcap" % (TCPDUMP_PATH, ad.serial, test_name) 307 ad.log.info("tcpdump file is %s", file_name) 308 cmd = "adb -s {} shell tcpdump -i {} -s0 -w {}".format(ad.serial, 309 interface, file_name) 310 try: 311 return start_standing_subprocess(cmd, 5) 312 except Exception: 313 ad.log.exception('Could not start standing process %s' % repr(cmd)) 314 315 return None 316 317 318def stop_tcpdump(ad, 319 proc, 320 test_name, 321 pull_dump=True, 322 adb_pull_timeout=adb.DEFAULT_ADB_PULL_TIMEOUT): 323 """Stops tcpdump on any iface. 324 325 Pulls the tcpdump file in the tcpdump dir if necessary. 326 327 Args: 328 ad: android device object. 329 proc: need to know which pid to stop 330 test_name: test name to save the tcpdump file 331 pull_dump: pull tcpdump file or not 332 adb_pull_timeout: timeout for adb_pull 333 334 Returns: 335 log_path of the tcpdump file 336 """ 337 ad.log.info("Stopping and pulling tcpdump if any") 338 if proc is None: 339 return None 340 try: 341 stop_standing_subprocess(proc) 342 except Exception as e: 343 ad.log.warning(e) 344 if pull_dump: 345 log_path = os.path.join(ad.device_log_path, "TCPDUMP_%s" % ad.serial) 346 os.makedirs(log_path, exist_ok=True) 347 ad.adb.pull("%s/. %s" % (TCPDUMP_PATH, log_path), 348 timeout=adb_pull_timeout) 349 ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True) 350 file_name = "tcpdump_%s_%s.pcap" % (ad.serial, test_name) 351 return "%s/%s" % (log_path, file_name) 352 return None 353 354 355def start_tcpdump_gce_server(ad, test_name, dest_port, gce): 356 """Start tcpdump on gce server. 357 358 Args: 359 ad: android device object 360 test_name: test case name 361 dest_port: port to collect tcpdump 362 gce: dictionary of gce instance 363 364 Returns: 365 process id and pcap file path from gce server 366 """ 367 ad.log.info("Starting tcpdump on gce server") 368 369 # pcap file name 370 fname = "/tmp/%s_%s_%s_%s" % \ 371 (test_name, ad.model, ad.serial, 372 time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime(time.time()))) 373 374 # start tcpdump 375 tcpdump_cmd = "sudo bash -c \'tcpdump -i %s -w %s.pcap port %s > \ 376 %s.txt 2>&1 & echo $!\'" % (gce["interface"], fname, dest_port, fname) 377 gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % \ 378 (GCE_SSH, gce["project"], gce["zone"], gce["username"], gce["hostname"]) 379 gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd) 380 utils.exe_cmd(gce_ssh_cmd) 381 382 # get process id 383 ps_cmd = '%s "ps aux | grep tcpdump | grep %s"' % (gcloud_ssh_cmd, fname) 384 tcpdump_pid = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore").split() 385 if not tcpdump_pid: 386 raise signals.TestFailure("Failed to start tcpdump on gce server") 387 return tcpdump_pid[1], fname 388 389 390def stop_tcpdump_gce_server(ad, tcpdump_pid, fname, gce): 391 """Stop and pull tcpdump file from gce server. 392 393 Args: 394 ad: android device object 395 tcpdump_pid: process id for tcpdump file 396 fname: tcpdump file path 397 gce: dictionary of gce instance 398 399 Returns: 400 pcap file from gce server 401 """ 402 ad.log.info("Stop and pull pcap file from gce server") 403 404 # stop tcpdump 405 tcpdump_cmd = "sudo kill %s" % tcpdump_pid 406 gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % \ 407 (GCE_SSH, gce["project"], gce["zone"], gce["username"], gce["hostname"]) 408 gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd) 409 utils.exe_cmd(gce_ssh_cmd) 410 411 # verify tcpdump is stopped 412 ps_cmd = '%s "ps aux | grep tcpdump"' % gcloud_ssh_cmd 413 res = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore") 414 if tcpdump_pid in res.split(): 415 raise signals.TestFailure("Failed to stop tcpdump on gce server") 416 if not fname: 417 return None 418 419 # pull pcap file 420 gcloud_scp_cmd = "%s --project=%s --zone=%s %s@%s:" % \ 421 (GCE_SCP, gce["project"], gce["zone"], gce["username"], gce["hostname"]) 422 pull_file = '%s%s.pcap %s/' % (gcloud_scp_cmd, fname, ad.device_log_path) 423 utils.exe_cmd(pull_file) 424 if not os.path.exists( 425 "%s/%s.pcap" % (ad.device_log_path, fname.split('/')[-1])): 426 raise signals.TestFailure("Failed to pull tcpdump from gce server") 427 428 # delete pcaps 429 utils.exe_cmd('%s "sudo rm %s.*"' % (gcloud_ssh_cmd, fname)) 430 431 # return pcap file 432 pcap_file = "%s/%s.pcap" % (ad.device_log_path, fname.split('/')[-1]) 433 return pcap_file 434 435 436def is_ipaddress_ipv6(ip_address): 437 """Verify if the given string is a valid IPv6 address. 438 439 Args: 440 ip_address: string containing the IP address 441 442 Returns: 443 True: if valid ipv6 address 444 False: if not 445 """ 446 try: 447 socket.inet_pton(socket.AF_INET6, ip_address) 448 return True 449 except socket.error: 450 return False 451 452 453def carrier_supports_ipv6(dut): 454 """Verify if carrier supports ipv6. 455 456 Args: 457 dut: Android device that is being checked 458 459 Returns: 460 True: if carrier supports ipv6 461 False: if not 462 """ 463 464 carrier_supports_ipv6 = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"] 465 operator = get_operator_name("log", dut) 466 return operator in carrier_supports_ipv6 467 468 469def is_carrier_supports_entitlement(dut): 470 """Verify if carrier supports entitlement 471 472 Args: 473 dut: Android device 474 475 Return: 476 True if carrier supports entitlement, otherwise false 477 """ 478 479 carriers_supports_entitlement = [ 480 tel_defines.CARRIER_VZW, 481 tel_defines.CARRIER_ATT, 482 tel_defines.CARRIER_TMO, 483 tel_defines.CARRIER_SBM] 484 operator = get_operator_name("log", dut) 485 return operator in carriers_supports_entitlement 486 487 488def set_cap_net_raw_capability(): 489 """Set the CAP_NET_RAW capability 490 491 To send the Scapy packets, we need to get the CAP_NET_RAW capability first. 492 """ 493 cap_net_raw = "sudo setcap cap_net_raw=eip $(readlink -f $(which act.py))" 494 utils.exe_cmd(cap_net_raw) 495 cap_python = "sudo setcap cap_net_raw=eip $(readlink -f $(which python))" 496 utils.exe_cmd(cap_python) 497 498 499def supports_ipv6_tethering(self, dut): 500 """Check if provider supports IPv6 tethering. 501 502 Args: 503 dut: Android device that is being checked 504 505 Returns: 506 True: if provider supports IPv6 tethering 507 False: if not 508 """ 509 carrier_supports_tethering = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"] 510 operator = get_operator_name(self.log, dut) 511 return operator in carrier_supports_tethering 512 513 514def start_usb_tethering(ad): 515 """Start USB tethering using #startTethering API. 516 517 Enable USB tethering by #startTethering API will break the RPC session, 518 Make sure you call #ad.recreate_services after call this API - b/149116235 519 520 Args: 521 ad: AndroidDevice object 522 """ 523 ad.droid.connectivityStartTethering(tel_defines.TETHERING_USB, False) 524 525 526def stop_usb_tethering(ad): 527 """Stop USB tethering. 528 529 Args: 530 ad: android device object 531 """ 532 ad.log.info("Stopping USB Tethering") 533 ad.stop_services() 534 ad.adb.shell(USB_CHARGE_MODE) 535 ad.adb.wait_for_device(timeout=120) 536 ad.start_services() 537 538 539def wait_for_new_iface(old_ifaces): 540 """Wait for the new interface to come up. 541 542 Args: 543 old_ifaces: list of old interfaces 544 """ 545 old_set = set(old_ifaces) 546 # Try 10 times to find a new interface with a 1s sleep every time 547 # (equivalent to a 9s timeout) 548 for _ in range(0, 10): 549 new_ifaces = set(get_if_list()) - old_set 550 asserts.assert_true(len(new_ifaces) < 2, 551 "Too many new interfaces after turning on " 552 "tethering") 553 if len(new_ifaces) == 1: 554 # enable the new iface before return 555 new_iface = new_ifaces.pop() 556 enable_iface(new_iface) 557 return new_iface 558 time.sleep(1) 559 asserts.fail("Timeout waiting for tethering interface on host") 560 561 562def get_if_list(): 563 """Returns a list containing all network interfaces. 564 565 The newest version of Scapy.get_if_list() returns the cached interfaces, 566 which might be out-dated, and unable to perceive the interface changes. 567 Use this method when need to monitoring the network interfaces changes. 568 Reference: https://github.com/secdev/scapy/pull/2707 569 570 Returns: 571 A list of the latest network interfaces. For example: 572 ['cvd-ebr', ..., 'eno1', 'enx4afa19a8dde1', 'lo', 'wlxd03745d68d88'] 573 """ 574 from scapy.config import conf 575 from scapy.compat import plain_str 576 577 # Get ifconfig output 578 result = job.run([conf.prog.ifconfig]) 579 if result.exit_status: 580 raise asserts.fail( 581 "Failed to execute ifconfig: {}".format(plain_str(result.stderr))) 582 583 interfaces = [ 584 line[:line.find(':')] for line in plain_str(result.stdout).splitlines() 585 if ": flags" in line.lower() 586 ] 587 return interfaces 588 589 590def enable_hardware_offload(ad): 591 """Enable hardware offload using adb shell command. 592 593 Args: 594 ad: Android device object 595 """ 596 ad.log.info("Enabling hardware offload.") 597 ad.adb.shell(ENABLE_HARDWARE_OFFLOAD, ignore_status=True) 598 ad.reboot() 599 time.sleep(tel_defines.WAIT_TIME_AFTER_REBOOT) 600 601 602def disable_hardware_offload(ad): 603 """Disable hardware offload using adb shell command. 604 605 Args: 606 ad: Android device object 607 """ 608 ad.log.info("Disabling hardware offload.") 609 ad.adb.shell(DISABLE_HARDWARE_OFFLOAD, ignore_status=True) 610 ad.reboot() 611 time.sleep(tel_defines.WAIT_TIME_AFTER_REBOOT) 612 613 614def enable_iface(iface): 615 """Enable network interfaces. 616 617 Some network interface might disabled as default, need to enable before 618 using it. 619 620 Args: 621 iface: network interface that need to enable 622 """ 623 from scapy.compat import plain_str 624 625 result = job.run("sudo ifconfig %s up" % (iface), ignore_status=True) 626 if result.exit_status: 627 raise asserts.fail( 628 "Failed to execute ifconfig: {}".format(plain_str(result.stderr))) 629