#!/usr/bin/env python3
#
#   Copyright 2018 Google, Inc.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
import logging
import os
import re
import socket
import time
import urllib.request

from acts.controllers import adb
from acts import asserts
from acts import signals
from acts import utils
from acts.controllers.adb import AdbError
from acts.utils import start_standing_subprocess
from acts.utils import stop_standing_subprocess
from acts.test_utils.net import connectivity_const as cconst
from acts.test_utils.tel.tel_test_utils import get_operator_name
from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection
from acts.test_utils.tel.tel_test_utils import verify_http_connection
from acts.test_utils.wifi import wifi_test_utils as wutils
from scapy.all import get_if_list

VPN_CONST = cconst.VpnProfile
VPN_TYPE = cconst.VpnProfileType
VPN_PARAMS = cconst.VpnReqParams
TCPDUMP_PATH = "/data/local/tmp/"
USB_CHARGE_MODE = "svc usb setFunctions"
USB_TETHERING_MODE = "svc usb setFunctions rndis"
DEVICE_IP_ADDRESS = "ip address"

GCE_SSH = "gcloud compute ssh "
GCE_SCP = "gcloud compute scp "

# Operator names accepted by both carrier_supports_ipv6() and
# supports_ipv6_tethering().
_IPV6_CAPABLE_CARRIERS = ("vzw", "tmo", "Far EasTone", "Chunghwa Telecom")

# Number of times wait_for_new_iface() polls the host interface list.
_NEW_IFACE_ATTEMPTS = 10


def verify_lte_data_and_tethering_supported(ad):
    """Verify that cell data works and that tethering is supported.

    Wifi is switched off before the checks and switched back on after them.

    Args:
        ad: android device object
    """
    wutils.wifi_toggle_state(ad, False)
    ad.droid.telephonyToggleDataConnection(True)
    wait_for_cell_data_connection(ad.log, ad, True)
    asserts.assert_true(
        verify_http_connection(ad.log, ad),
        "HTTP verification failed on cell data connection")
    asserts.assert_true(
        ad.droid.connectivityIsTetheringSupported(),
        "Tethering is not supported for the provider")
    wutils.wifi_toggle_state(ad, True)


def set_chrome_browser_permissions(ad):
    """Set chrome browser start with no-first-run verification.

    Grants chrome permission to read from and write to storage and writes a
    chrome command line file on the device. Every command is best effort: a
    failing adb command is logged and the remaining commands still run.

    Args:
        ad: android device object
    """
    # NOTE(review): the file removed below lives in /data/local while the one
    # written by the last command lives in /data/local/tmp - confirm this is
    # intended.
    commands = ["pm grant com.android.chrome "
                "android.permission.READ_EXTERNAL_STORAGE",
                "pm grant com.android.chrome "
                "android.permission.WRITE_EXTERNAL_STORAGE",
                "rm /data/local/chrome-command-line",
                "am set-debug-app --persistent com.android.chrome",
                'echo "chrome --no-default-browser-check --no-first-run '
                '--disable-fre" > /data/local/tmp/chrome-command-line']
    for cmd in commands:
        try:
            ad.adb.shell(cmd)
        except AdbError:
            logging.warning("adb command %s failed on %s", cmd, ad.serial)


def verify_ping_to_vpn_ip(ad, vpn_ping_addr):
    """Verify if IP behind VPN server is pingable.

    Ping should pass, if VPN is connected.
    Ping should fail, if VPN is disconnected.

    Args:
        ad: android device object
        vpn_ping_addr: address to ping from the device

    Returns:
        True if the ping command produced output that does not report
        100% packet loss, False otherwise (including when the adb command
        itself fails).
    """
    pkt_loss = "100% packet loss"
    logging.info("Pinging: %s", vpn_ping_addr)
    try:
        ping_result = ad.adb.shell("ping -c 3 -W 2 %s" % vpn_ping_addr)
    except AdbError:
        return False
    return bool(ping_result) and pkt_loss not in ping_result


def legacy_vpn_connection_test_logic(ad, vpn_profile, vpn_ping_addr):
    """Test logic for each legacy VPN connection.

    Steps:
      1. Flush the xfrm state on the device
      2. Establish connection to the server with the given profile
      3. Verify that connection is established using LegacyVpnInfo
      4. Ping the IP behind VPN and log the HMAC found in the xfrm state
      5. Stop the VPN connection
      6. Assert that the ping from step 4 passed
      7. Assert that LegacyVpnInfo is empty after the VPN is stopped

    Args:
        ad: Android device object
        vpn_profile: legacy VPN profile to connect with
        vpn_ping_addr: address behind the VPN server to ping
    """
    # Wait for sometime so that VPN server flushes all interfaces and
    # connections after graceful termination
    time.sleep(10)

    ad.adb.shell("ip xfrm state flush")
    ad.log.info("Connecting to: %s", vpn_profile)
    ad.droid.vpnStartLegacyVpn(vpn_profile)
    time.sleep(cconst.VPN_TIMEOUT)

    connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo()
    asserts.assert_equal(connected_vpn_info["state"],
                         cconst.VPN_STATE_CONNECTED,
                         "Unable to establish VPN connection for %s"
                         % vpn_profile)

    ping_result = verify_ping_to_vpn_ip(ad, vpn_ping_addr)
    ip_xfrm_state = ad.adb.shell("ip xfrm state")
    match_obj = re.search(r'hmac(.*)', "%s" % ip_xfrm_state)
    if match_obj:
        # The match starts with "hmac", so the split is never empty.
        hmac_fields = match_obj.group(0).split()
        ad.log.info("HMAC for ESP is %s ", hmac_fields[0])

    # The VPN is stopped before the ping result is asserted so that a failed
    # ping does not leave the connection up.
    ad.droid.vpnStopLegacyVpn()
    asserts.assert_true(ping_result,
                        "Ping to the internal IP failed. "
                        "Expected to pass as VPN is connected")

    connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo()
    asserts.assert_true(not connected_vpn_info,
                        "Unable to terminate VPN connection for %s"
                        % vpn_profile)


def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr,
                        ipsec_server_type, log_path):
    """Download the certificates from VPN server and push to sdcard of DUT.

    Args:
        ad: android device object
        vpn_params: vpn params from config file
        vpn_type: 1 of the 6 VPN types
        vpn_server_addr: server addr to connect to
        ipsec_server_type: ipsec version - strongswan or openswan
        log_path: log path to download cert

    Returns:
        Client cert file name on DUT's sdcard
    """
    url = "http://%s%s%s" % (vpn_server_addr,
                             vpn_params['cert_path_vpnserver'],
                             vpn_params['client_pkcs_file_name'])
    local_cert_name = "%s_%s_%s" % (vpn_type.name,
                                    ipsec_server_type,
                                    vpn_params['client_pkcs_file_name'])

    local_file_path = os.path.join(log_path, local_cert_name)
    logging.info("URL is: %s", url)
    try:
        # Both the HTTP response and the local file are closed on exit.
        with urllib.request.urlopen(url) as response:
            with open(local_file_path, "wb") as cert_file:
                cert_file.write(response.read())
    except Exception:
        # Keep the root cause in the log; the failure message is unchanged.
        logging.exception("Certificate download from %s failed", url)
        asserts.fail("Unable to download certificate from the server")

    ad.adb.push("%s sdcard/" % local_file_path)
    return local_cert_name


def generate_legacy_vpn_profile(ad,
                                vpn_params,
                                vpn_type,
                                vpn_server_addr,
                                ipsec_server_type,
                                log_path):
    """Generate legacy VPN profile for a VPN.

    Args:
        ad: android device object
        vpn_params: vpn params from config file
        vpn_type: 1 of the 6 VPN types
        vpn_server_addr: server addr to connect to
        ipsec_server_type: ipsec version - strongswan or openswan
        log_path: log path to download cert

    Returns:
        Vpn profile
    """
    vpn_profile = {VPN_CONST.USER: vpn_params['vpn_username'],
                   VPN_CONST.PWD: vpn_params['vpn_password'],
                   VPN_CONST.TYPE: vpn_type.value,
                   VPN_CONST.SERVER: vpn_server_addr, }
    if vpn_type.name == "PPTP":
        vpn_profile[VPN_CONST.NAME] = "test_%s" % vpn_type.name
    else:
        vpn_profile[VPN_CONST.NAME] = "test_%s_%s" % (vpn_type.name,
                                                      ipsec_server_type)

    psk_set = {"L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"}
    rsa_set = {"L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"}

    if vpn_type.name in psk_set:
        vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret']
    elif vpn_type.name in rsa_set:
        cert_name = download_load_certs(ad,
                                        vpn_params,
                                        vpn_type,
                                        vpn_server_addr,
                                        ipsec_server_type,
                                        log_path)
        # The cert alias is the file name up to its first dot.
        cert_alias = cert_name.split('.')[0]
        vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_alias
        vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_alias
        ad.droid.installCertificate(vpn_profile, cert_name,
                                    vpn_params['cert_password'])
    else:
        vpn_profile[VPN_CONST.MPPE] = "mppe"

    return vpn_profile


def generate_ikev2_vpn_profile(ad, vpn_params, vpn_type, server_addr, log_path):
    """Generate VPN profile for IKEv2 VPN.

    Args:
        ad: android device object.
        vpn_params: vpn params from config file.
        vpn_type: ikev2 vpn type.
        server_addr: vpn server addr.
        log_path: log path to download cert.

    Returns:
        Vpn profile.
    """
    vpn_profile = {
        VPN_CONST.TYPE: vpn_type.value,
        VPN_CONST.SERVER: server_addr,
    }

    if vpn_type.name == "IKEV2_IPSEC_USER_PASS":
        vpn_profile[VPN_CONST.USER] = vpn_params["vpn_username"]
        vpn_profile[VPN_CONST.PWD] = vpn_params["vpn_password"]
        vpn_profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"]
        cert_name = download_load_certs(
            ad, vpn_params, vpn_type, server_addr, "IKEV2_IPSEC_USER_PASS",
            log_path)
        vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0]
        ad.droid.installCertificate(
            vpn_profile, cert_name, vpn_params['cert_password'])
    elif vpn_type.name == "IKEV2_IPSEC_PSK":
        vpn_profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"]
        vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params["psk_secret"]
    else:
        # NOTE(review): the identity uses vpn_params["server_addr"], not the
        # server_addr argument - confirm against the config schema.
        vpn_profile[VPN_CONST.IPSEC_ID] = "%s@%s" % (
            vpn_params["vpn_identity"], vpn_params["server_addr"])
        cert_name = download_load_certs(
            ad, vpn_params, vpn_type, server_addr, "IKEV2_IPSEC_RSA", log_path)
        cert_alias = cert_name.split('.')[0]
        vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_alias
        vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_alias
        ad.droid.installCertificate(
            vpn_profile, cert_name, vpn_params['cert_password'])

    return vpn_profile


def start_tcpdump(ad, test_name):
    """Start tcpdump on all interfaces.

    Any running tcpdump is killed and the contents of TCPDUMP_PATH on the
    device are removed before the new capture starts.

    Args:
        ad: android device object.
        test_name: tcpdump file name will have this

    Returns:
        The standing subprocess running tcpdump, or None if it could not be
        started.
    """
    ad.log.info("Starting tcpdump on all interfaces")
    ad.adb.shell("killall -9 tcpdump", ignore_status=True)
    ad.adb.shell("mkdir %s" % TCPDUMP_PATH, ignore_status=True)
    ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)

    file_name = "%s/tcpdump_%s_%s.pcap" % (TCPDUMP_PATH, ad.serial, test_name)
    ad.log.info("tcpdump file is %s", file_name)
    cmd = "adb -s {} shell tcpdump -i any -s0 -w {}".format(ad.serial,
                                                            file_name)
    try:
        return start_standing_subprocess(cmd, 5)
    except Exception:
        ad.log.exception('Could not start standing process %s', repr(cmd))

    return None


def stop_tcpdump(ad,
                 proc,
                 test_name,
                 pull_dump=True,
                 adb_pull_timeout=adb.DEFAULT_ADB_PULL_TIMEOUT):
    """Stops tcpdump on any iface.

    Pulls the tcpdump file in the tcpdump dir if necessary.

    Args:
        ad: android device object.
        proc: need to know which pid to stop
        test_name: test name to save the tcpdump file
        pull_dump: pull tcpdump file or not
        adb_pull_timeout: timeout for adb_pull

    Returns:
        log_path of the tcpdump file, or None when proc is None or
        pull_dump is False.
    """
    ad.log.info("Stopping and pulling tcpdump if any")
    if proc is None:
        return None
    try:
        stop_standing_subprocess(proc)
    except Exception as e:
        # Best effort: the capture is still pulled if stopping fails.
        ad.log.warning(e)
    if not pull_dump:
        return None

    log_path = os.path.join(ad.device_log_path, "TCPDUMP_%s" % ad.serial)
    os.makedirs(log_path, exist_ok=True)
    ad.adb.pull("%s/. %s" % (TCPDUMP_PATH, log_path),
                timeout=adb_pull_timeout)
    ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)
    file_name = "tcpdump_%s_%s.pcap" % (ad.serial, test_name)
    return "%s/%s" % (log_path, file_name)


def start_tcpdump_gce_server(ad, test_name, dest_port, gce):
    """Start tcpdump on gce server.

    Args:
        ad: android device object
        test_name: test case name
        dest_port: port to collect tcpdump
        gce: dictionary of gce instance

    Returns:
        process id and pcap file path from gce server

    Raises:
        signals.TestFailure: if no tcpdump process for the pcap file is
            found on the gce server.
    """
    ad.log.info("Starting tcpdump on gce server")

    # pcap file name
    fname = "/tmp/%s_%s_%s_%s" % \
        (test_name, ad.model, ad.serial,
         time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime(time.time())))

    # start tcpdump
    tcpdump_cmd = ("sudo bash -c 'tcpdump -i %s -w %s.pcap port %s > "
                   "%s.txt 2>&1 & echo $!'"
                   % (gce["interface"], fname, dest_port, fname))
    gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % \
        (GCE_SSH, gce["project"], gce["zone"], gce["username"], gce["hostname"])
    gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd)
    utils.exe_cmd(gce_ssh_cmd)

    # get process id
    ps_cmd = '%s "ps aux | grep tcpdump | grep %s"' % (gcloud_ssh_cmd, fname)
    ps_fields = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore").split()
    # The pid is the second column of "ps aux"; fewer than two tokens means
    # no usable process line came back.
    if len(ps_fields) < 2:
        raise signals.TestFailure("Failed to start tcpdump on gce server")
    return ps_fields[1], fname


def stop_tcpdump_gce_server(ad, tcpdump_pid, fname, gce):
    """Stop and pull tcpdump file from gce server.

    Args:
        ad: android device object
        tcpdump_pid: process id for tcpdump file
        fname: tcpdump file path
        gce: dictionary of gce instance

    Returns:
        pcap file from gce server, or None when fname is empty

    Raises:
        signals.TestFailure: if tcpdump is still running after the kill or
            the pcap file is not present locally after the copy.
    """
    ad.log.info("Stop and pull pcap file from gce server")

    # stop tcpdump
    tcpdump_cmd = "sudo kill %s" % tcpdump_pid
    gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % \
        (GCE_SSH, gce["project"], gce["zone"], gce["username"], gce["hostname"])
    gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd)
    utils.exe_cmd(gce_ssh_cmd)

    # verify tcpdump is stopped
    ps_cmd = '%s "ps aux | grep tcpdump"' % gcloud_ssh_cmd
    res = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore")
    if tcpdump_pid in res.split():
        raise signals.TestFailure("Failed to stop tcpdump on gce server")
    if not fname:
        return None

    # pull pcap file
    gcloud_scp_cmd = "%s --project=%s --zone=%s %s@%s:" % \
        (GCE_SCP, gce["project"], gce["zone"], gce["username"], gce["hostname"])
    pull_file = '%s%s.pcap %s/' % (gcloud_scp_cmd, fname, ad.device_log_path)
    utils.exe_cmd(pull_file)
    pcap_file = "%s/%s.pcap" % (ad.device_log_path, fname.split('/')[-1])
    if not os.path.exists(pcap_file):
        raise signals.TestFailure("Failed to pull tcpdump from gce server")

    # delete pcaps
    utils.exe_cmd('%s "sudo rm %s.*"' % (gcloud_ssh_cmd, fname))

    # return pcap file
    return pcap_file


def is_ipaddress_ipv6(ip_address):
    """Verify if the given string is a valid IPv6 address.

    Args:
        ip_address: string containing the IP address

    Returns:
        True: if valid ipv6 address
        False: if not
    """
    try:
        socket.inet_pton(socket.AF_INET6, ip_address)
    except (OSError, ValueError):
        # OSError (socket.error) for a malformed address, ValueError for a
        # string with an embedded NUL character.
        return False
    return True


def carrier_supports_ipv6(dut):
    """Verify if carrier supports ipv6.

    Args:
        dut: Android device that is being checked

    Returns:
        True: if carrier supports ipv6
        False: if not
    """
    # Pass the device logger, as the other helpers in this file do, rather
    # than a placeholder string.
    operator = get_operator_name(dut.log, dut)
    return operator in _IPV6_CAPABLE_CARRIERS


def supports_ipv6_tethering(self, dut):
    """Check if provider supports IPv6 tethering.

    Args:
        self: object providing the `log` attribute passed to
            get_operator_name
        dut: Android device that is being checked

    Returns:
        True: if provider supports IPv6 tethering
        False: if not
    """
    operator = get_operator_name(self.log, dut)
    return operator in _IPV6_CAPABLE_CARRIERS


def start_usb_tethering(ad):
    """Start USB tethering.

    Args:
        ad: android device object

    Raises:
        signals.TestFailure: if no "rndis" entry shows up in the device's
            "ip address" output afterwards.
    """
    # TODO: test USB tethering by #startTethering API - b/149116235
    ad.log.info("Starting USB Tethering")
    ad.stop_services()
    ad.adb.shell(USB_TETHERING_MODE, ignore_status=True)
    ad.adb.wait_for_device()
    ad.start_services()
    if "rndis" not in ad.adb.shell(DEVICE_IP_ADDRESS):
        raise signals.TestFailure("Unable to enable USB tethering.")


def stop_usb_tethering(ad):
    """Stop USB tethering.

    Args:
        ad: android device object
    """
    ad.log.info("Stopping USB Tethering")
    ad.stop_services()
    ad.adb.shell(USB_CHARGE_MODE)
    ad.adb.wait_for_device()
    ad.start_services()


def wait_for_new_iface(old_ifaces):
    """Wait for the new interface to come up.

    Args:
        old_ifaces: list of old interfaces

    Returns:
        Name of the single interface that is present on the host now but
        was not in old_ifaces.
    """
    old_set = set(old_ifaces)
    # Try 10 times to find a new interface with a 1s sleep between tries
    # (equivalent to a 9s timeout)
    for attempt in range(_NEW_IFACE_ATTEMPTS):
        if attempt:
            time.sleep(1)
        new_ifaces = set(get_if_list()) - old_set
        asserts.assert_true(len(new_ifaces) < 2,
                            "Too many new interfaces after turning on "
                            "tethering")
        if len(new_ifaces) == 1:
            return new_ifaces.pop()
    asserts.fail("Timeout waiting for tethering interface on host")