1#!/usr/bin/env python3 2# 3# Copyright 2018 Google, Inc. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16import logging 17import os 18import re 19import time 20import urllib.request 21 22from acts import asserts 23from acts import signals 24from acts import utils 25from acts.controllers import adb 26from acts.controllers.adb_lib.error import AdbError 27from acts.libs.proc import job 28from acts.utils import start_standing_subprocess 29from acts.utils import stop_standing_subprocess 30from acts_contrib.test_utils.net import connectivity_const as cconst 31from acts_contrib.test_utils.tel import tel_defines 32from acts_contrib.test_utils.tel.tel_data_utils import wait_for_cell_data_connection 33from acts_contrib.test_utils.tel.tel_test_utils import get_operator_name 34from acts_contrib.test_utils.tel.tel_test_utils import verify_http_connection 35 36from acts_contrib.test_utils.wifi import wifi_test_utils as wutils 37from scapy.config import conf 38from scapy.compat import plain_str 39 40VPN_CONST = cconst.VpnProfile 41VPN_TYPE = cconst.VpnProfileType 42VPN_PARAMS = cconst.VpnReqParams 43TCPDUMP_PATH = "/data/local/tmp/" 44USB_CHARGE_MODE = "svc usb setFunctions" 45USB_TETHERING_MODE = "svc usb setFunctions rndis" 46ENABLE_HARDWARE_OFFLOAD = "settings put global tether_offload_disabled 0" 47DISABLE_HARDWARE_OFFLOAD = "settings put global tether_offload_disabled 1" 48DEVICE_IP_ADDRESS = "ip address" 49LOCALHOST = "192.168.1.1" 50 51GCE_SSH = "gcloud compute ssh " 52GCE_SCP = "gcloud compute scp " 53 54 55def verify_lte_data_and_tethering_supported(ad): 56 """Verify if LTE data is enabled and tethering supported.""" 57 wutils.wifi_toggle_state(ad, False) 58 ad.droid.telephonyToggleDataConnection(True) 59 wait_for_cell_data_connection(ad.log, ad, True) 60 asserts.assert_true( 61 verify_http_connection(ad.log, ad), 62 "HTTP verification failed on cell data connection") 63 asserts.assert_true( 64 ad.droid.connectivityIsTetheringSupported(), 65 "Tethering is not supported for the provider") 66 wutils.wifi_toggle_state(ad, True) 67 68 69def set_chrome_browser_permissions(ad): 70 """Set chrome browser start with no-first-run verification. 71 72 Give permission to read from and write to storage 73 74 Args: 75 ad: android device object 76 """ 77 commands = ["pm grant com.android.chrome " 78 "android.permission.READ_EXTERNAL_STORAGE", 79 "pm grant com.android.chrome " 80 "android.permission.WRITE_EXTERNAL_STORAGE", 81 "rm /data/local/chrome-command-line", 82 "am set-debug-app --persistent com.android.chrome", 83 'echo "chrome --no-default-browser-check --no-first-run ' 84 '--disable-fre" > /data/local/tmp/chrome-command-line'] 85 for cmd in commands: 86 try: 87 ad.adb.shell(cmd) 88 except AdbError: 89 logging.warning("adb command %s failed on %s" % (cmd, ad.serial)) 90 91 92def verify_ping_to_vpn_ip(ad, vpn_ping_addr): 93 """Verify if IP behind VPN server is pingable. 94 95 Ping should pass, if VPN is connected. 96 Ping should fail, if VPN is disconnected. 97 98 Args: 99 ad: android device object 100 vpn_ping_addr: target ping addr 101 """ 102 ping_result = None 103 pkt_loss = "100% packet loss" 104 logging.info("Pinging: %s" % vpn_ping_addr) 105 try: 106 ping_result = ad.adb.shell("ping -c 3 -W 2 %s" % vpn_ping_addr) 107 except AdbError: 108 pass 109 return ping_result and pkt_loss not in ping_result 110 111 112def legacy_vpn_connection_test_logic(ad, vpn_profile, vpn_ping_addr): 113 """Test logic for each legacy VPN connection. 114 115 Steps: 116 1. Generate profile for the VPN type 117 2. Establish connection to the server 118 3. Verify that connection is established using LegacyVpnInfo 119 4. Verify the connection by pinging the IP behind VPN 120 5. Stop the VPN connection 121 6. Check the connection status 122 7. Verify that ping to IP behind VPN fails 123 124 Args: 125 ad: Android device object 126 vpn_profile: object contains attribute for create vpn profile 127 vpn_ping_addr: addr to verify vpn connection 128 """ 129 # Wait for sometime so that VPN server flushes all interfaces and 130 # connections after graceful termination 131 time.sleep(10) 132 133 ad.adb.shell("ip xfrm state flush") 134 ad.log.info("Connecting to: %s", vpn_profile) 135 ad.droid.vpnStartLegacyVpn(vpn_profile) 136 time.sleep(cconst.VPN_TIMEOUT) 137 138 connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo() 139 asserts.assert_equal(connected_vpn_info["state"], 140 cconst.VPN_STATE_CONNECTED, 141 "Unable to establish VPN connection for %s" 142 % vpn_profile) 143 144 ping_result = verify_ping_to_vpn_ip(ad, vpn_ping_addr) 145 ip_xfrm_state = ad.adb.shell("ip xfrm state") 146 match_obj = re.search(r'hmac(.*)', "%s" % ip_xfrm_state) 147 if match_obj: 148 ip_xfrm_state = format(match_obj.group(0)).split() 149 ad.log.info("HMAC for ESP is %s " % ip_xfrm_state[0]) 150 151 ad.droid.vpnStopLegacyVpn() 152 asserts.assert_true(ping_result, 153 "Ping to the internal IP failed. " 154 "Expected to pass as VPN is connected") 155 156 connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo() 157 asserts.assert_true(not connected_vpn_info, 158 "Unable to terminate VPN connection for %s" 159 % vpn_profile) 160 161 162def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr, 163 ipsec_server_type, log_path): 164 """Download the certificates from VPN server and push to sdcard of DUT. 165 166 Args: 167 ad: android device object 168 vpn_params: vpn params from config file 169 vpn_type: 1 of the 6 VPN types 170 vpn_server_addr: server addr to connect to 171 ipsec_server_type: ipsec version - strongswan or openswan 172 log_path: log path to download cert 173 174 Returns: 175 Client cert file name on DUT's sdcard 176 """ 177 url = "http://%s%s%s" % (vpn_server_addr, 178 vpn_params['cert_path_vpnserver'], 179 vpn_params['client_pkcs_file_name']) 180 logging.info("URL is: %s" % url) 181 if vpn_server_addr == LOCALHOST: 182 ad.droid.httpDownloadFile(url, "/sdcard/") 183 return vpn_params['client_pkcs_file_name'] 184 185 local_cert_name = "%s_%s_%s" % (vpn_type.name, 186 ipsec_server_type, 187 vpn_params['client_pkcs_file_name']) 188 local_file_path = os.path.join(log_path, local_cert_name) 189 try: 190 ret = urllib.request.urlopen(url) 191 with open(local_file_path, "wb") as f: 192 f.write(ret.read()) 193 except Exception: 194 asserts.fail("Unable to download certificate from the server") 195 196 ad.adb.push("%s sdcard/" % local_file_path) 197 return local_cert_name 198 199 200def generate_legacy_vpn_profile(ad, 201 vpn_params, 202 vpn_type, 203 vpn_server_addr, 204 ipsec_server_type, 205 log_path): 206 """Generate legacy VPN profile for a VPN. 207 208 Args: 209 ad: android device object 210 vpn_params: vpn params from config file 211 vpn_type: 1 of the 6 VPN types 212 vpn_server_addr: server addr to connect to 213 ipsec_server_type: ipsec version - strongswan or openswan 214 log_path: log path to download cert 215 216 Returns: 217 Vpn profile 218 """ 219 vpn_profile = {VPN_CONST.USER: vpn_params['vpn_username'], 220 VPN_CONST.PWD: vpn_params['vpn_password'], 221 VPN_CONST.TYPE: vpn_type.value, 222 VPN_CONST.SERVER: vpn_server_addr, } 223 vpn_profile[VPN_CONST.NAME] = "test_%s_%s" % (vpn_type.name, 224 ipsec_server_type) 225 if vpn_type.name == "PPTP": 226 vpn_profile[VPN_CONST.NAME] = "test_%s" % vpn_type.name 227 228 psk_set = set(["L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"]) 229 rsa_set = set(["L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"]) 230 231 if vpn_type.name in psk_set: 232 vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret'] 233 elif vpn_type.name in rsa_set: 234 cert_name = download_load_certs(ad, 235 vpn_params, 236 vpn_type, 237 vpn_server_addr, 238 ipsec_server_type, 239 log_path) 240 vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0] 241 ad.droid.installCertificate(vpn_profile, cert_name, 242 vpn_params['cert_password']) 243 else: 244 vpn_profile[VPN_CONST.MPPE] = "mppe" 245 246 return vpn_profile 247 248 249def generate_ikev2_vpn_profile(ad, vpn_params, vpn_type, server_addr, log_path): 250 """Generate VPN profile for IKEv2 VPN. 251 252 Args: 253 ad: android device object. 254 vpn_params: vpn params from config file. 255 vpn_type: ikev2 vpn type. 256 server_addr: vpn server addr. 257 log_path: log path to download cert. 258 259 Returns: 260 Vpn profile. 261 """ 262 vpn_profile = { 263 VPN_CONST.TYPE: vpn_type.value, 264 VPN_CONST.SERVER: server_addr, 265 } 266 267 if vpn_type.name == "IKEV2_IPSEC_USER_PASS": 268 vpn_profile[VPN_CONST.USER] = vpn_params["vpn_username"] 269 vpn_profile[VPN_CONST.PWD] = vpn_params["vpn_password"] 270 vpn_profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"] 271 cert_name = download_load_certs( 272 ad, vpn_params, vpn_type, vpn_params["server_addr"], 273 "IKEV2_IPSEC_USER_PASS", log_path) 274 vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0] 275 ad.droid.installCertificate( 276 vpn_profile, cert_name, vpn_params['cert_password']) 277 elif vpn_type.name == "IKEV2_IPSEC_PSK": 278 vpn_profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"] 279 vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params["psk_secret"] 280 else: 281 vpn_profile[VPN_CONST.IPSEC_ID] = "%s@%s" % ( 282 vpn_params["vpn_identity"], server_addr) 283 logging.info("ID: %s@%s" % (vpn_params["vpn_identity"], server_addr)) 284 cert_name = download_load_certs( 285 ad, vpn_params, vpn_type, vpn_params["server_addr"], 286 "IKEV2_IPSEC_RSA", log_path) 287 vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0] 288 vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0] 289 ad.droid.installCertificate( 290 vpn_profile, cert_name, vpn_params['cert_password']) 291 292 return vpn_profile 293 294 295def start_tcpdump(ad, test_name, interface="any"): 296 """Start tcpdump on all interfaces. 297 298 Args: 299 ad: android device object. 300 test_name: tcpdump file name will have this 301 """ 302 ad.log.info("Starting tcpdump on all interfaces") 303 ad.adb.shell("killall -9 tcpdump", ignore_status=True) 304 ad.adb.shell("mkdir %s" % TCPDUMP_PATH, ignore_status=True) 305 ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True) 306 307 file_name = "%s/tcpdump_%s_%s.pcap" % (TCPDUMP_PATH, ad.serial, test_name) 308 ad.log.info("tcpdump file is %s", file_name) 309 cmd = "adb -s {} shell tcpdump -i {} -s0 -w {}".format(ad.serial, 310 interface, file_name) 311 try: 312 return start_standing_subprocess(cmd, 5) 313 except Exception: 314 ad.log.exception('Could not start standing process %s' % repr(cmd)) 315 316 return None 317 318 319def stop_tcpdump(ad, 320 proc, 321 test_name, 322 pull_dump=True, 323 adb_pull_timeout=adb.DEFAULT_ADB_PULL_TIMEOUT): 324 """Stops tcpdump on any iface. 325 326 Pulls the tcpdump file in the tcpdump dir if necessary. 327 328 Args: 329 ad: android device object. 330 proc: need to know which pid to stop 331 test_name: test name to save the tcpdump file 332 pull_dump: pull tcpdump file or not 333 adb_pull_timeout: timeout for adb_pull 334 335 Returns: 336 log_path of the tcpdump file 337 """ 338 ad.log.info("Stopping and pulling tcpdump if any") 339 if proc is None: 340 return None 341 try: 342 stop_standing_subprocess(proc) 343 except Exception as e: 344 ad.log.warning(e) 345 if pull_dump: 346 log_path = os.path.join(ad.device_log_path, "TCPDUMP_%s" % ad.serial) 347 os.makedirs(log_path, exist_ok=True) 348 ad.adb.pull("%s/. %s" % (TCPDUMP_PATH, log_path), 349 timeout=adb_pull_timeout) 350 ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True) 351 file_name = "tcpdump_%s_%s.pcap" % (ad.serial, test_name) 352 return "%s/%s" % (log_path, file_name) 353 return None 354 355 356def start_tcpdump_gce_server(ad, test_name, dest_port, gce): 357 """Start tcpdump on gce server. 358 359 Args: 360 ad: android device object 361 test_name: test case name 362 dest_port: port to collect tcpdump 363 gce: dictionary of gce instance 364 365 Returns: 366 process id and pcap file path from gce server 367 """ 368 ad.log.info("Starting tcpdump on gce server") 369 370 # pcap file name 371 fname = "/tmp/%s_%s_%s_%s" % \ 372 (test_name, ad.model, ad.serial, 373 time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime(time.time()))) 374 375 # start tcpdump 376 tcpdump_cmd = "sudo bash -c \'tcpdump -i %s -w %s.pcap port %s > \ 377 %s.txt 2>&1 & echo $!\'" % (gce["interface"], fname, dest_port, fname) 378 gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % \ 379 (GCE_SSH, gce["project"], gce["zone"], gce["username"], gce["hostname"]) 380 gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd) 381 utils.exe_cmd(gce_ssh_cmd) 382 383 # get process id 384 ps_cmd = '%s "ps aux | grep tcpdump | grep %s"' % (gcloud_ssh_cmd, fname) 385 tcpdump_pid = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore").split() 386 if not tcpdump_pid: 387 raise signals.TestFailure("Failed to start tcpdump on gce server") 388 return tcpdump_pid[1], fname 389 390 391def stop_tcpdump_gce_server(ad, tcpdump_pid, fname, gce): 392 """Stop and pull tcpdump file from gce server. 393 394 Args: 395 ad: android device object 396 tcpdump_pid: process id for tcpdump file 397 fname: tcpdump file path 398 gce: dictionary of gce instance 399 400 Returns: 401 pcap file from gce server 402 """ 403 ad.log.info("Stop and pull pcap file from gce server") 404 405 # stop tcpdump 406 tcpdump_cmd = "sudo kill %s" % tcpdump_pid 407 gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % \ 408 (GCE_SSH, gce["project"], gce["zone"], gce["username"], gce["hostname"]) 409 gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd) 410 utils.exe_cmd(gce_ssh_cmd) 411 412 # verify tcpdump is stopped 413 ps_cmd = '%s "ps aux | grep tcpdump"' % gcloud_ssh_cmd 414 res = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore") 415 if tcpdump_pid in res.split(): 416 raise signals.TestFailure("Failed to stop tcpdump on gce server") 417 if not fname: 418 return None 419 420 # pull pcap file 421 gcloud_scp_cmd = "%s --project=%s --zone=%s %s@%s:" % \ 422 (GCE_SCP, gce["project"], gce["zone"], gce["username"], gce["hostname"]) 423 pull_file = '%s%s.pcap %s/' % (gcloud_scp_cmd, fname, ad.device_log_path) 424 utils.exe_cmd(pull_file) 425 if not os.path.exists( 426 "%s/%s.pcap" % (ad.device_log_path, fname.split('/')[-1])): 427 raise signals.TestFailure("Failed to pull tcpdump from gce server") 428 429 # delete pcaps 430 utils.exe_cmd('%s "sudo rm %s.*"' % (gcloud_ssh_cmd, fname)) 431 432 # return pcap file 433 pcap_file = "%s/%s.pcap" % (ad.device_log_path, fname.split('/')[-1]) 434 return pcap_file 435 436 437def is_ipaddress_ipv6(ip_address): 438 """Verify if the given string is a valid IPv6 address. 439 440 Args: 441 ip_address: string containing the IP address 442 443 Returns: 444 True: if valid ipv6 address 445 False: if not 446 """ 447 try: 448 socket.inet_pton(socket.AF_INET6, ip_address) 449 return True 450 except socket.error: 451 return False 452 453 454def carrier_supports_ipv6(dut): 455 """Verify if carrier supports ipv6. 456 457 Args: 458 dut: Android device that is being checked 459 460 Returns: 461 True: if carrier supports ipv6 462 False: if not 463 """ 464 465 carrier_supports_ipv6 = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"] 466 operator = get_operator_name("log", dut) 467 return operator in carrier_supports_ipv6 468 469 470def is_carrier_supports_entitlement(dut): 471 """Verify if carrier supports entitlement 472 473 Args: 474 dut: Android device 475 476 Return: 477 True if carrier supports entitlement, otherwise false 478 """ 479 480 carriers_supports_entitlement = [ 481 tel_defines.CARRIER_VZW, 482 tel_defines.CARRIER_ATT, 483 tel_defines.CARRIER_TMO, 484 tel_defines.CARRIER_SBM] 485 operator = get_operator_name("log", dut) 486 return operator in carriers_supports_entitlement 487 488 489def set_cap_net_raw_capability(): 490 """Set the CAP_NET_RAW capability 491 492 To send the Scapy packets, we need to get the CAP_NET_RAW capability first. 493 """ 494 cap_net_raw = "sudo setcap cap_net_raw=eip $(readlink -f $(which act.py))" 495 utils.exe_cmd(cap_net_raw) 496 cap_python = "sudo setcap cap_net_raw=eip $(readlink -f $(which python))" 497 utils.exe_cmd(cap_python) 498 499 500def supports_ipv6_tethering(self, dut): 501 """Check if provider supports IPv6 tethering. 502 503 Args: 504 dut: Android device that is being checked 505 506 Returns: 507 True: if provider supports IPv6 tethering 508 False: if not 509 """ 510 carrier_supports_tethering = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"] 511 operator = get_operator_name(self.log, dut) 512 return operator in carrier_supports_tethering 513 514 515def start_usb_tethering(ad): 516 """Start USB tethering using #startTethering API. 517 518 Enable USB tethering by #startTethering API will break the RPC session, 519 Make sure you call #ad.recreate_services after call this API - b/149116235 520 521 Args: 522 ad: AndroidDevice object 523 """ 524 ad.droid.connectivityStartTethering(tel_defines.TETHERING_USB, False) 525 526 527def stop_usb_tethering(ad): 528 """Stop USB tethering. 529 530 Args: 531 ad: android device object 532 """ 533 ad.log.info("Stopping USB Tethering") 534 ad.stop_services() 535 ad.adb.shell(USB_CHARGE_MODE) 536 ad.adb.wait_for_device() 537 ad.start_services() 538 539 540def wait_for_new_iface(old_ifaces): 541 """Wait for the new interface to come up. 542 543 Args: 544 old_ifaces: list of old interfaces 545 """ 546 old_set = set(old_ifaces) 547 # Try 10 times to find a new interface with a 1s sleep every time 548 # (equivalent to a 9s timeout) 549 for _ in range(0, 10): 550 new_ifaces = set(get_if_list()) - old_set 551 asserts.assert_true(len(new_ifaces) < 2, 552 "Too many new interfaces after turning on " 553 "tethering") 554 if len(new_ifaces) == 1: 555 # enable the new iface before return 556 new_iface = new_ifaces.pop() 557 enable_iface(new_iface) 558 return new_iface 559 time.sleep(1) 560 asserts.fail("Timeout waiting for tethering interface on host") 561 562 563def get_if_list(): 564 """Returns a list containing all network interfaces. 565 566 The newest version of Scapy.get_if_list() returns the cached interfaces, 567 which might be out-dated, and unable to perceive the interface changes. 568 Use this method when need to monitoring the network interfaces changes. 569 Reference: https://github.com/secdev/scapy/pull/2707 570 571 Returns: 572 A list of the latest network interfaces. For example: 573 ['cvd-ebr', ..., 'eno1', 'enx4afa19a8dde1', 'lo', 'wlxd03745d68d88'] 574 """ 575 576 # Get ifconfig output 577 result = job.run([conf.prog.ifconfig]) 578 if result.exit_status: 579 raise asserts.fail( 580 "Failed to execute ifconfig: {}".format(plain_str(result.stderr))) 581 582 interfaces = [ 583 line[:line.find(':')] for line in plain_str(result.stdout).splitlines() 584 if ": flags" in line.lower() 585 ] 586 return interfaces 587 588 589def enable_hardware_offload(ad): 590 """Enable hardware offload using adb shell command. 591 592 Args: 593 ad: Android device object 594 """ 595 ad.log.info("Enabling hardware offload.") 596 ad.adb.shell(ENABLE_HARDWARE_OFFLOAD, ignore_status=True) 597 ad.reboot() 598 time.sleep(tel_defines.WAIT_TIME_AFTER_REBOOT) 599 600 601def disable_hardware_offload(ad): 602 """Disable hardware offload using adb shell command. 603 604 Args: 605 ad: Android device object 606 """ 607 ad.log.info("Disabling hardware offload.") 608 ad.adb.shell(DISABLE_HARDWARE_OFFLOAD, ignore_status=True) 609 ad.reboot() 610 time.sleep(tel_defines.WAIT_TIME_AFTER_REBOOT) 611 612 613def enable_iface(iface): 614 """Enable network interfaces. 615 616 Some network interface might disabled as default, need to enable before 617 using it. 618 619 Args: 620 iface: network interface that need to enable 621 """ 622 result = job.run("sudo ifconfig %s up" % (iface), ignore_status=True) 623 if result.exit_status: 624 raise asserts.fail( 625 "Failed to execute ifconfig: {}".format(plain_str(result.stderr))) 626