1#!/usr/bin/env python3 2# 3# Copyright 2018 Google, Inc. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16import logging 17 18from acts.controllers import adb 19from acts import asserts 20from acts import utils 21from acts.controllers.adb import AdbError 22from acts.logger import epoch_to_log_line_timestamp 23from acts.utils import get_current_epoch_time 24from acts.logger import normalize_log_line_timestamp 25from acts.utils import start_standing_subprocess 26from acts.utils import stop_standing_subprocess 27from acts.test_utils.net import connectivity_const as cconst 28from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection 29from acts.test_utils.tel.tel_test_utils import verify_http_connection 30from acts.test_utils.wifi import wifi_test_utils as wutils 31 32import os 33import re 34import time 35import urllib.request 36 37VPN_CONST = cconst.VpnProfile 38VPN_TYPE = cconst.VpnProfileType 39VPN_PARAMS = cconst.VpnReqParams 40TCPDUMP_PATH = "/data/local/tmp/tcpdump" 41 42 43def verify_lte_data_and_tethering_supported(ad): 44 """Verify if LTE data is enabled and tethering supported""" 45 wutils.wifi_toggle_state(ad, False) 46 ad.droid.telephonyToggleDataConnection(True) 47 wait_for_cell_data_connection(ad.log, ad, True) 48 asserts.assert_true( 49 verify_http_connection(ad.log, ad), 50 "HTTP verification failed on cell data connection") 51 asserts.assert_true( 52 ad.droid.connectivityIsTetheringSupported(), 53 "Tethering is not supported for the provider") 54 wutils.wifi_toggle_state(ad, True) 55 56 57def set_chrome_browser_permissions(ad): 58 """Set chrome browser start with no-first-run verification. 59 Give permission to read from and write to storage 60 """ 61 commands = ["pm grant com.android.chrome " 62 "android.permission.READ_EXTERNAL_STORAGE", 63 "pm grant com.android.chrome " 64 "android.permission.WRITE_EXTERNAL_STORAGE", 65 "rm /data/local/chrome-command-line", 66 "am set-debug-app --persistent com.android.chrome", 67 'echo "chrome --no-default-browser-check --no-first-run ' 68 '--disable-fre" > /data/local/tmp/chrome-command-line'] 69 for cmd in commands: 70 try: 71 ad.adb.shell(cmd) 72 except AdbError: 73 logging.warning("adb command %s failed on %s" % (cmd, ad.serial)) 74 75 76def verify_ping_to_vpn_ip(ad, vpn_ping_addr): 77 """ Verify if IP behind VPN server is pingable. 78 Ping should pass, if VPN is connected. 79 Ping should fail, if VPN is disconnected. 80 81 Args: 82 ad: android device object 83 """ 84 ping_result = None 85 pkt_loss = "100% packet loss" 86 try: 87 ping_result = ad.adb.shell("ping -c 3 -W 2 %s" % vpn_ping_addr) 88 except AdbError: 89 pass 90 return ping_result and pkt_loss not in ping_result 91 92 93def legacy_vpn_connection_test_logic(ad, vpn_profile, vpn_ping_addr): 94 """ Test logic for each legacy VPN connection 95 96 Steps: 97 1. Generate profile for the VPN type 98 2. Establish connection to the server 99 3. Verify that connection is established using LegacyVpnInfo 100 4. Verify the connection by pinging the IP behind VPN 101 5. Stop the VPN connection 102 6. Check the connection status 103 7. Verify that ping to IP behind VPN fails 104 105 Args: 106 1. ad: Android device object 107 2. VpnProfileType (1 of the 6 types supported by Android) 108 """ 109 # Wait for sometime so that VPN server flushes all interfaces and 110 # connections after graceful termination 111 time.sleep(10) 112 113 ad.adb.shell("ip xfrm state flush") 114 ad.log.info("Connecting to: %s", vpn_profile) 115 ad.droid.vpnStartLegacyVpn(vpn_profile) 116 time.sleep(cconst.VPN_TIMEOUT) 117 118 connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo() 119 asserts.assert_equal(connected_vpn_info["state"], 120 cconst.VPN_STATE_CONNECTED, 121 "Unable to establish VPN connection for %s" 122 % vpn_profile) 123 124 ping_result = verify_ping_to_vpn_ip(ad, vpn_ping_addr) 125 ip_xfrm_state = ad.adb.shell("ip xfrm state") 126 match_obj = re.search(r'hmac(.*)', "%s" % ip_xfrm_state) 127 if match_obj: 128 ip_xfrm_state = format(match_obj.group(0)).split() 129 ad.log.info("HMAC for ESP is %s " % ip_xfrm_state[0]) 130 131 ad.droid.vpnStopLegacyVpn() 132 asserts.assert_true(ping_result, 133 "Ping to the internal IP failed. " 134 "Expected to pass as VPN is connected") 135 136 connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo() 137 asserts.assert_true(not connected_vpn_info, 138 "Unable to terminate VPN connection for %s" 139 % vpn_profile) 140 141 142def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr, 143 ipsec_server_type, log_path): 144 """ Download the certificates from VPN server and push to sdcard of DUT 145 146 Args: 147 ad: android device object 148 vpn_params: vpn params from config file 149 vpn_type: 1 of the 6 VPN types 150 vpn_server_addr: server addr to connect to 151 ipsec_server_type: ipsec version - strongswan or openswan 152 log_path: log path to download cert 153 154 Returns: 155 Client cert file name on DUT's sdcard 156 """ 157 url = "http://%s%s%s" % (vpn_server_addr, 158 vpn_params['cert_path_vpnserver'], 159 vpn_params['client_pkcs_file_name']) 160 local_cert_name = "%s_%s_%s" % (vpn_type.name, 161 ipsec_server_type, 162 vpn_params['client_pkcs_file_name']) 163 164 local_file_path = os.path.join(log_path, local_cert_name) 165 try: 166 ret = urllib.request.urlopen(url) 167 with open(local_file_path, "wb") as f: 168 f.write(ret.read()) 169 except Exception: 170 asserts.fail("Unable to download certificate from the server") 171 172 ad.adb.push("%s sdcard/" % local_file_path) 173 return local_cert_name 174 175 176def generate_legacy_vpn_profile(ad, 177 vpn_params, 178 vpn_type, 179 vpn_server_addr, 180 ipsec_server_type, 181 log_path): 182 """ Generate legacy VPN profile for a VPN 183 184 Args: 185 ad: android device object 186 vpn_params: vpn params from config file 187 vpn_type: 1 of the 6 VPN types 188 vpn_server_addr: server addr to connect to 189 ipsec_server_type: ipsec version - strongswan or openswan 190 log_path: log path to download cert 191 192 Returns: 193 Vpn profile 194 """ 195 vpn_profile = {VPN_CONST.USER: vpn_params['vpn_username'], 196 VPN_CONST.PWD: vpn_params['vpn_password'], 197 VPN_CONST.TYPE: vpn_type.value, 198 VPN_CONST.SERVER: vpn_server_addr, } 199 vpn_profile[VPN_CONST.NAME] = "test_%s_%s" % (vpn_type.name, 200 ipsec_server_type) 201 if vpn_type.name == "PPTP": 202 vpn_profile[VPN_CONST.NAME] = "test_%s" % vpn_type.name 203 204 psk_set = set(["L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"]) 205 rsa_set = set(["L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"]) 206 207 if vpn_type.name in psk_set: 208 vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret'] 209 elif vpn_type.name in rsa_set: 210 cert_name = download_load_certs(ad, 211 vpn_params, 212 vpn_type, 213 vpn_server_addr, 214 ipsec_server_type, 215 log_path) 216 vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0] 217 vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0] 218 ad.droid.installCertificate(vpn_profile, cert_name, 219 vpn_params['cert_password']) 220 else: 221 vpn_profile[VPN_CONST.MPPE] = "mppe" 222 223 return vpn_profile 224 225 226def start_tcpdump(ad, test_name): 227 """Start tcpdump on all interfaces 228 229 Args: 230 ad: android device object. 231 test_name: tcpdump file name will have this 232 """ 233 ad.log.info("Starting tcpdump on all interfaces") 234 try: 235 ad.adb.shell("killall -9 tcpdump") 236 except AdbError: 237 ad.log.warn("Killing existing tcpdump processes failed") 238 out = ad.adb.shell("ls -l %s" % TCPDUMP_PATH) 239 if "No such file" in out or not out: 240 ad.adb.shell("mkdir %s" % TCPDUMP_PATH) 241 else: 242 ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True) 243 244 begin_time = epoch_to_log_line_timestamp(get_current_epoch_time()) 245 begin_time = normalize_log_line_timestamp(begin_time) 246 247 file_name = "%s/tcpdump_%s_%s.pcap" % (TCPDUMP_PATH, ad.serial, test_name) 248 ad.log.info("tcpdump file is %s", file_name) 249 cmd = "adb -s {} shell tcpdump -i any -s0 -w {}".format(ad.serial, 250 file_name) 251 try: 252 return start_standing_subprocess(cmd, 5) 253 except Exception: 254 ad.log.exception('Could not start standing process %s' % repr(cmd)) 255 256 return None 257 258def stop_tcpdump(ad, 259 proc, 260 test_name, 261 adb_pull_timeout=adb.DEFAULT_ADB_PULL_TIMEOUT): 262 """Stops tcpdump on any iface 263 Pulls the tcpdump file in the tcpdump dir 264 265 Args: 266 ad: android device object. 267 proc: need to know which pid to stop 268 test_name: test name to save the tcpdump file 269 adb_pull_timeout: timeout for adb_pull 270 271 Returns: 272 log_path of the tcpdump file 273 """ 274 ad.log.info("Stopping and pulling tcpdump if any") 275 if proc is None: 276 return None 277 try: 278 stop_standing_subprocess(proc) 279 except Exception as e: 280 ad.log.warning(e) 281 log_path = os.path.join(ad.log_path, test_name) 282 utils.create_dir(log_path) 283 ad.adb.pull("%s/. %s" % (TCPDUMP_PATH, log_path), timeout=adb_pull_timeout) 284 ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True) 285 file_name = "tcpdump_%s_%s.pcap" % (ad.serial, test_name) 286 return "%s/%s" % (log_path, file_name) 287