1#!/usr/bin/env python3 2# 3# Copyright 2018 Google, Inc. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16import logging 17import os 18import re 19import time 20import urllib.request 21 22from acts import asserts 23from acts import signals 24from acts import utils 25from acts.controllers import adb 26from acts.controllers.adb_lib.error import AdbError 27from acts.utils import start_standing_subprocess 28from acts.utils import stop_standing_subprocess 29from acts_contrib.test_utils.net import connectivity_const as cconst 30from acts_contrib.test_utils.tel.tel_data_utils import wait_for_cell_data_connection 31from acts_contrib.test_utils.tel.tel_test_utils import get_operator_name 32from acts_contrib.test_utils.tel.tel_test_utils import verify_http_connection 33from acts_contrib.test_utils.wifi import wifi_test_utils as wutils 34from scapy.all import get_if_list 35 36 37VPN_CONST = cconst.VpnProfile 38VPN_TYPE = cconst.VpnProfileType 39VPN_PARAMS = cconst.VpnReqParams 40TCPDUMP_PATH = "/data/local/tmp/" 41USB_CHARGE_MODE = "svc usb setFunctions" 42USB_TETHERING_MODE = "svc usb setFunctions rndis" 43DEVICE_IP_ADDRESS = "ip address" 44LOCALHOST = "192.168.1.1" 45 46GCE_SSH = "gcloud compute ssh " 47GCE_SCP = "gcloud compute scp " 48 49 50def verify_lte_data_and_tethering_supported(ad): 51 """Verify if LTE data is enabled and tethering supported.""" 52 wutils.wifi_toggle_state(ad, False) 53 ad.droid.telephonyToggleDataConnection(True) 54 wait_for_cell_data_connection(ad.log, ad, True) 55 asserts.assert_true( 56 verify_http_connection(ad.log, ad), 57 "HTTP verification failed on cell data connection") 58 asserts.assert_true( 59 ad.droid.connectivityIsTetheringSupported(), 60 "Tethering is not supported for the provider") 61 wutils.wifi_toggle_state(ad, True) 62 63 64def set_chrome_browser_permissions(ad): 65 """Set chrome browser start with no-first-run verification. 66 67 Give permission to read from and write to storage 68 69 Args: 70 ad: android device object 71 """ 72 commands = ["pm grant com.android.chrome " 73 "android.permission.READ_EXTERNAL_STORAGE", 74 "pm grant com.android.chrome " 75 "android.permission.WRITE_EXTERNAL_STORAGE", 76 "rm /data/local/chrome-command-line", 77 "am set-debug-app --persistent com.android.chrome", 78 'echo "chrome --no-default-browser-check --no-first-run ' 79 '--disable-fre" > /data/local/tmp/chrome-command-line'] 80 for cmd in commands: 81 try: 82 ad.adb.shell(cmd) 83 except AdbError: 84 logging.warning("adb command %s failed on %s" % (cmd, ad.serial)) 85 86 87def verify_ping_to_vpn_ip(ad, vpn_ping_addr): 88 """Verify if IP behind VPN server is pingable. 89 90 Ping should pass, if VPN is connected. 91 Ping should fail, if VPN is disconnected. 92 93 Args: 94 ad: android device object 95 vpn_ping_addr: target ping addr 96 """ 97 ping_result = None 98 pkt_loss = "100% packet loss" 99 logging.info("Pinging: %s" % vpn_ping_addr) 100 try: 101 ping_result = ad.adb.shell("ping -c 3 -W 2 %s" % vpn_ping_addr) 102 except AdbError: 103 pass 104 return ping_result and pkt_loss not in ping_result 105 106 107def legacy_vpn_connection_test_logic(ad, vpn_profile, vpn_ping_addr): 108 """Test logic for each legacy VPN connection. 109 110 Steps: 111 1. Generate profile for the VPN type 112 2. Establish connection to the server 113 3. Verify that connection is established using LegacyVpnInfo 114 4. Verify the connection by pinging the IP behind VPN 115 5. Stop the VPN connection 116 6. Check the connection status 117 7. Verify that ping to IP behind VPN fails 118 119 Args: 120 ad: Android device object 121 vpn_profile: object contains attribute for create vpn profile 122 vpn_ping_addr: addr to verify vpn connection 123 """ 124 # Wait for sometime so that VPN server flushes all interfaces and 125 # connections after graceful termination 126 time.sleep(10) 127 128 ad.adb.shell("ip xfrm state flush") 129 ad.log.info("Connecting to: %s", vpn_profile) 130 ad.droid.vpnStartLegacyVpn(vpn_profile) 131 time.sleep(cconst.VPN_TIMEOUT) 132 133 connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo() 134 asserts.assert_equal(connected_vpn_info["state"], 135 cconst.VPN_STATE_CONNECTED, 136 "Unable to establish VPN connection for %s" 137 % vpn_profile) 138 139 ping_result = verify_ping_to_vpn_ip(ad, vpn_ping_addr) 140 ip_xfrm_state = ad.adb.shell("ip xfrm state") 141 match_obj = re.search(r'hmac(.*)', "%s" % ip_xfrm_state) 142 if match_obj: 143 ip_xfrm_state = format(match_obj.group(0)).split() 144 ad.log.info("HMAC for ESP is %s " % ip_xfrm_state[0]) 145 146 ad.droid.vpnStopLegacyVpn() 147 asserts.assert_true(ping_result, 148 "Ping to the internal IP failed. " 149 "Expected to pass as VPN is connected") 150 151 connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo() 152 asserts.assert_true(not connected_vpn_info, 153 "Unable to terminate VPN connection for %s" 154 % vpn_profile) 155 156 157def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr, 158 ipsec_server_type, log_path): 159 """Download the certificates from VPN server and push to sdcard of DUT. 160 161 Args: 162 ad: android device object 163 vpn_params: vpn params from config file 164 vpn_type: 1 of the 6 VPN types 165 vpn_server_addr: server addr to connect to 166 ipsec_server_type: ipsec version - strongswan or openswan 167 log_path: log path to download cert 168 169 Returns: 170 Client cert file name on DUT's sdcard 171 """ 172 url = "http://%s%s%s" % (vpn_server_addr, 173 vpn_params['cert_path_vpnserver'], 174 vpn_params['client_pkcs_file_name']) 175 logging.info("URL is: %s" % url) 176 if vpn_server_addr == LOCALHOST: 177 ad.droid.httpDownloadFile(url, "/sdcard/") 178 return vpn_params['client_pkcs_file_name'] 179 180 local_cert_name = "%s_%s_%s" % (vpn_type.name, 181 ipsec_server_type, 182 vpn_params['client_pkcs_file_name']) 183 local_file_path = os.path.join(log_path, local_cert_name) 184 try: 185 ret = urllib.request.urlopen(url) 186 with open(local_file_path, "wb") as f: 187 f.write(ret.read()) 188 except Exception: 189 asserts.fail("Unable to download certificate from the server") 190 191 ad.adb.push("%s sdcard/" % local_file_path) 192 return local_cert_name 193 194 195def generate_legacy_vpn_profile(ad, 196 vpn_params, 197 vpn_type, 198 vpn_server_addr, 199 ipsec_server_type, 200 log_path): 201 """Generate legacy VPN profile for a VPN. 202 203 Args: 204 ad: android device object 205 vpn_params: vpn params from config file 206 vpn_type: 1 of the 6 VPN types 207 vpn_server_addr: server addr to connect to 208 ipsec_server_type: ipsec version - strongswan or openswan 209 log_path: log path to download cert 210 211 Returns: 212 Vpn profile 213 """ 214 vpn_profile = {VPN_CONST.USER: vpn_params['vpn_username'], 215 VPN_CONST.PWD: vpn_params['vpn_password'], 216 VPN_CONST.TYPE: vpn_type.value, 217 VPN_CONST.SERVER: vpn_server_addr, } 218 vpn_profile[VPN_CONST.NAME] = "test_%s_%s" % (vpn_type.name, 219 ipsec_server_type) 220 if vpn_type.name == "PPTP": 221 vpn_profile[VPN_CONST.NAME] = "test_%s" % vpn_type.name 222 223 psk_set = set(["L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"]) 224 rsa_set = set(["L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"]) 225 226 if vpn_type.name in psk_set: 227 vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret'] 228 elif vpn_type.name in rsa_set: 229 cert_name = download_load_certs(ad, 230 vpn_params, 231 vpn_type, 232 vpn_server_addr, 233 ipsec_server_type, 234 log_path) 235 vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0] 236 ad.droid.installCertificate(vpn_profile, cert_name, 237 vpn_params['cert_password']) 238 else: 239 vpn_profile[VPN_CONST.MPPE] = "mppe" 240 241 return vpn_profile 242 243 244def generate_ikev2_vpn_profile(ad, vpn_params, vpn_type, server_addr, log_path): 245 """Generate VPN profile for IKEv2 VPN. 246 247 Args: 248 ad: android device object. 249 vpn_params: vpn params from config file. 250 vpn_type: ikev2 vpn type. 251 server_addr: vpn server addr. 252 log_path: log path to download cert. 253 254 Returns: 255 Vpn profile. 256 """ 257 vpn_profile = { 258 VPN_CONST.TYPE: vpn_type.value, 259 VPN_CONST.SERVER: server_addr, 260 } 261 262 if vpn_type.name == "IKEV2_IPSEC_USER_PASS": 263 vpn_profile[VPN_CONST.USER] = vpn_params["vpn_username"] 264 vpn_profile[VPN_CONST.PWD] = vpn_params["vpn_password"] 265 vpn_profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"] 266 cert_name = download_load_certs( 267 ad, vpn_params, vpn_type, vpn_params["server_addr"], 268 "IKEV2_IPSEC_USER_PASS", log_path) 269 vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0] 270 ad.droid.installCertificate( 271 vpn_profile, cert_name, vpn_params['cert_password']) 272 elif vpn_type.name == "IKEV2_IPSEC_PSK": 273 vpn_profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"] 274 vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params["psk_secret"] 275 else: 276 vpn_profile[VPN_CONST.IPSEC_ID] = "%s@%s" % ( 277 vpn_params["vpn_identity"], server_addr) 278 logging.info("ID: %s@%s" % (vpn_params["vpn_identity"], server_addr)) 279 cert_name = download_load_certs( 280 ad, vpn_params, vpn_type, vpn_params["server_addr"], 281 "IKEV2_IPSEC_RSA", log_path) 282 vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0] 283 vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0] 284 ad.droid.installCertificate( 285 vpn_profile, cert_name, vpn_params['cert_password']) 286 287 return vpn_profile 288 289 290def start_tcpdump(ad, test_name): 291 """Start tcpdump on all interfaces. 292 293 Args: 294 ad: android device object. 295 test_name: tcpdump file name will have this 296 """ 297 ad.log.info("Starting tcpdump on all interfaces") 298 ad.adb.shell("killall -9 tcpdump", ignore_status=True) 299 ad.adb.shell("mkdir %s" % TCPDUMP_PATH, ignore_status=True) 300 ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True) 301 302 file_name = "%s/tcpdump_%s_%s.pcap" % (TCPDUMP_PATH, ad.serial, test_name) 303 ad.log.info("tcpdump file is %s", file_name) 304 cmd = "adb -s {} shell tcpdump -i any -s0 -w {}".format(ad.serial, 305 file_name) 306 try: 307 return start_standing_subprocess(cmd, 5) 308 except Exception: 309 ad.log.exception('Could not start standing process %s' % repr(cmd)) 310 311 return None 312 313 314def stop_tcpdump(ad, 315 proc, 316 test_name, 317 pull_dump=True, 318 adb_pull_timeout=adb.DEFAULT_ADB_PULL_TIMEOUT): 319 """Stops tcpdump on any iface. 320 321 Pulls the tcpdump file in the tcpdump dir if necessary. 322 323 Args: 324 ad: android device object. 325 proc: need to know which pid to stop 326 test_name: test name to save the tcpdump file 327 pull_dump: pull tcpdump file or not 328 adb_pull_timeout: timeout for adb_pull 329 330 Returns: 331 log_path of the tcpdump file 332 """ 333 ad.log.info("Stopping and pulling tcpdump if any") 334 if proc is None: 335 return None 336 try: 337 stop_standing_subprocess(proc) 338 except Exception as e: 339 ad.log.warning(e) 340 if pull_dump: 341 log_path = os.path.join(ad.device_log_path, "TCPDUMP_%s" % ad.serial) 342 os.makedirs(log_path, exist_ok=True) 343 ad.adb.pull("%s/. %s" % (TCPDUMP_PATH, log_path), 344 timeout=adb_pull_timeout) 345 ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True) 346 file_name = "tcpdump_%s_%s.pcap" % (ad.serial, test_name) 347 return "%s/%s" % (log_path, file_name) 348 return None 349 350 351def start_tcpdump_gce_server(ad, test_name, dest_port, gce): 352 """Start tcpdump on gce server. 353 354 Args: 355 ad: android device object 356 test_name: test case name 357 dest_port: port to collect tcpdump 358 gce: dictionary of gce instance 359 360 Returns: 361 process id and pcap file path from gce server 362 """ 363 ad.log.info("Starting tcpdump on gce server") 364 365 # pcap file name 366 fname = "/tmp/%s_%s_%s_%s" % \ 367 (test_name, ad.model, ad.serial, 368 time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime(time.time()))) 369 370 # start tcpdump 371 tcpdump_cmd = "sudo bash -c \'tcpdump -i %s -w %s.pcap port %s > \ 372 %s.txt 2>&1 & echo $!\'" % (gce["interface"], fname, dest_port, fname) 373 gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % \ 374 (GCE_SSH, gce["project"], gce["zone"], gce["username"], gce["hostname"]) 375 gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd) 376 utils.exe_cmd(gce_ssh_cmd) 377 378 # get process id 379 ps_cmd = '%s "ps aux | grep tcpdump | grep %s"' % (gcloud_ssh_cmd, fname) 380 tcpdump_pid = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore").split() 381 if not tcpdump_pid: 382 raise signals.TestFailure("Failed to start tcpdump on gce server") 383 return tcpdump_pid[1], fname 384 385 386def stop_tcpdump_gce_server(ad, tcpdump_pid, fname, gce): 387 """Stop and pull tcpdump file from gce server. 388 389 Args: 390 ad: android device object 391 tcpdump_pid: process id for tcpdump file 392 fname: tcpdump file path 393 gce: dictionary of gce instance 394 395 Returns: 396 pcap file from gce server 397 """ 398 ad.log.info("Stop and pull pcap file from gce server") 399 400 # stop tcpdump 401 tcpdump_cmd = "sudo kill %s" % tcpdump_pid 402 gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % \ 403 (GCE_SSH, gce["project"], gce["zone"], gce["username"], gce["hostname"]) 404 gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd) 405 utils.exe_cmd(gce_ssh_cmd) 406 407 # verify tcpdump is stopped 408 ps_cmd = '%s "ps aux | grep tcpdump"' % gcloud_ssh_cmd 409 res = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore") 410 if tcpdump_pid in res.split(): 411 raise signals.TestFailure("Failed to stop tcpdump on gce server") 412 if not fname: 413 return None 414 415 # pull pcap file 416 gcloud_scp_cmd = "%s --project=%s --zone=%s %s@%s:" % \ 417 (GCE_SCP, gce["project"], gce["zone"], gce["username"], gce["hostname"]) 418 pull_file = '%s%s.pcap %s/' % (gcloud_scp_cmd, fname, ad.device_log_path) 419 utils.exe_cmd(pull_file) 420 if not os.path.exists( 421 "%s/%s.pcap" % (ad.device_log_path, fname.split('/')[-1])): 422 raise signals.TestFailure("Failed to pull tcpdump from gce server") 423 424 # delete pcaps 425 utils.exe_cmd('%s "sudo rm %s.*"' % (gcloud_ssh_cmd, fname)) 426 427 # return pcap file 428 pcap_file = "%s/%s.pcap" % (ad.device_log_path, fname.split('/')[-1]) 429 return pcap_file 430 431 432def is_ipaddress_ipv6(ip_address): 433 """Verify if the given string is a valid IPv6 address. 434 435 Args: 436 ip_address: string containing the IP address 437 438 Returns: 439 True: if valid ipv6 address 440 False: if not 441 """ 442 try: 443 socket.inet_pton(socket.AF_INET6, ip_address) 444 return True 445 except socket.error: 446 return False 447 448 449def carrier_supports_ipv6(dut): 450 """Verify if carrier supports ipv6. 451 452 Args: 453 dut: Android device that is being checked 454 455 Returns: 456 True: if carrier supports ipv6 457 False: if not 458 """ 459 460 carrier_supports_ipv6 = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"] 461 operator = get_operator_name("log", dut) 462 return operator in carrier_supports_ipv6 463 464 465def supports_ipv6_tethering(self, dut): 466 """Check if provider supports IPv6 tethering. 467 468 Args: 469 dut: Android device that is being checked 470 471 Returns: 472 True: if provider supports IPv6 tethering 473 False: if not 474 """ 475 carrier_supports_tethering = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"] 476 operator = get_operator_name(self.log, dut) 477 return operator in carrier_supports_tethering 478 479 480def start_usb_tethering(ad): 481 """Start USB tethering. 482 483 Args: 484 ad: android device object 485 """ 486 # TODO: test USB tethering by #startTethering API - b/149116235 487 ad.log.info("Starting USB Tethering") 488 ad.stop_services() 489 ad.adb.shell(USB_TETHERING_MODE, ignore_status=True) 490 ad.adb.wait_for_device() 491 ad.start_services() 492 if "rndis" not in ad.adb.shell(DEVICE_IP_ADDRESS): 493 raise signals.TestFailure("Unable to enable USB tethering.") 494 495 496def stop_usb_tethering(ad): 497 """Stop USB tethering. 498 499 Args: 500 ad: android device object 501 """ 502 ad.log.info("Stopping USB Tethering") 503 ad.stop_services() 504 ad.adb.shell(USB_CHARGE_MODE) 505 ad.adb.wait_for_device() 506 ad.start_services() 507 508 509def wait_for_new_iface(old_ifaces): 510 """Wait for the new interface to come up. 511 512 Args: 513 old_ifaces: list of old interfaces 514 """ 515 old_set = set(old_ifaces) 516 # Try 10 times to find a new interface with a 1s sleep every time 517 # (equivalent to a 9s timeout) 518 for _ in range(0, 10): 519 new_ifaces = set(get_if_list()) - old_set 520 asserts.assert_true(len(new_ifaces) < 2, 521 "Too many new interfaces after turning on " 522 "tethering") 523 if len(new_ifaces) == 1: 524 return new_ifaces.pop() 525 time.sleep(1) 526 asserts.fail("Timeout waiting for tethering interface on host") 527