• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2018 Google, Inc.
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16import logging
17import os
18
19from acts.controllers import adb
20from acts import asserts
21from acts import signals
22from acts import utils
23from acts.controllers.adb import AdbError
24from acts.utils import start_standing_subprocess
25from acts.utils import stop_standing_subprocess
26from acts.test_utils.net import connectivity_const as cconst
27from acts.test_utils.tel.tel_test_utils import get_operator_name
28from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection
29from acts.test_utils.tel.tel_test_utils import verify_http_connection
30from acts.test_utils.wifi import wifi_test_utils as wutils
31from scapy.all import get_if_list
32
33import os
34import re
35import time
36import urllib.request
37
38VPN_CONST = cconst.VpnProfile
39VPN_TYPE = cconst.VpnProfileType
40VPN_PARAMS = cconst.VpnReqParams
41TCPDUMP_PATH = "/data/local/tmp/"
42USB_CHARGE_MODE = "svc usb setFunctions"
43USB_TETHERING_MODE = "svc usb setFunctions rndis"
44DEVICE_IP_ADDRESS = "ip address"
45
46GCE_SSH = "gcloud compute ssh "
47GCE_SCP = "gcloud compute scp "
48
49
def verify_lte_data_and_tethering_supported(ad):
    """Check that cell data works and that tethering is supported.

    WiFi is switched off for the duration of the checks so that the HTTP
    verification runs over the cell data connection, and switched back on
    once both assertions have passed.

    Args:
        ad: android device object
    """
    # Force traffic onto the cell data connection.
    wutils.wifi_toggle_state(ad, False)
    ad.droid.telephonyToggleDataConnection(True)
    wait_for_cell_data_connection(ad.log, ad, True)

    http_ok = verify_http_connection(ad.log, ad)
    asserts.assert_true(
        http_ok, "HTTP verification failed on cell data connection")

    tethering_ok = ad.droid.connectivityIsTetheringSupported()
    asserts.assert_true(
        tethering_ok, "Tethering is not supported for the provider")

    wutils.wifi_toggle_state(ad, True)
62
63
def set_chrome_browser_permissions(ad):
    """Prepare Chrome so that it starts without the first-run flow.

    Grants Chrome read/write access to external storage and writes a
    command-line file carrying the no-first-run flags. Each command is
    best-effort: a failing adb command is logged and the remaining commands
    are still executed.

    Args:
        ad: android device object
    """
    grant_read = ("pm grant com.android.chrome "
                  "android.permission.READ_EXTERNAL_STORAGE")
    grant_write = ("pm grant com.android.chrome "
                   "android.permission.WRITE_EXTERNAL_STORAGE")
    remove_old_flags = "rm /data/local/chrome-command-line"
    mark_debug_app = "am set-debug-app --persistent com.android.chrome"
    write_flags = ('echo "chrome --no-default-browser-check --no-first-run '
                   '--disable-fre" > /data/local/tmp/chrome-command-line')

    setup_steps = (grant_read, grant_write, remove_old_flags, mark_debug_app,
                   write_flags)
    for step in setup_steps:
        try:
            ad.adb.shell(step)
        except AdbError:
            logging.warning("adb command %s failed on %s", step, ad.serial)
81
82
def verify_ping_to_vpn_ip(ad, vpn_ping_addr):
    """ Verify if IP behind VPN server is pingable.
    Ping should pass, if VPN is connected.
    Ping should fail, if VPN is disconnected.

    Args:
      ad: android device object
      vpn_ping_addr: address behind the VPN server to ping

    Returns:
      True if the ping command produced output that does not report
      "100% packet loss"; False if the adb command failed, produced no
      output, or reported total packet loss.
    """
    pkt_loss = "100% packet loss"
    logging.info("Pinging: %s", vpn_ping_addr)
    try:
        ping_result = ad.adb.shell("ping -c 3 -W 2 %s" % vpn_ping_addr)
    except AdbError:
        # A failing ping command is an expected outcome here (e.g. when the
        # VPN is down), so it is reported as "not pingable" rather than
        # propagated.
        return False
    # Always hand back a real bool instead of leaking None / "" to callers.
    return bool(ping_result) and pkt_loss not in ping_result
99
100
def legacy_vpn_connection_test_logic(ad, vpn_profile, vpn_ping_addr):
    """ Test logic for each legacy VPN connection

    Steps:
      1. Flush the xfrm state on the device
      2. Start the legacy VPN with the given profile
      3. Verify that connection is established using LegacyVpnInfo
      4. Ping the IP behind VPN and log the HMAC seen in the xfrm state
      5. Stop the VPN connection
      6. Assert that the ping from step 4 succeeded
      7. Assert that no legacy VPN info is reported any more

    Args:
      ad: Android device object
      vpn_profile: VPN profile to connect with
      vpn_ping_addr: address behind the VPN server to ping
    """
    # Give the VPN server some time to flush all interfaces and connections
    # after a graceful termination.
    time.sleep(10)

    ad.adb.shell("ip xfrm state flush")
    ad.log.info("Connecting to: %s", vpn_profile)
    ad.droid.vpnStartLegacyVpn(vpn_profile)
    time.sleep(cconst.VPN_TIMEOUT)

    vpn_info = ad.droid.vpnGetLegacyVpnInfo()
    asserts.assert_equal(
        vpn_info["state"],
        cconst.VPN_STATE_CONNECTED,
        "Unable to establish VPN connection for %s" % vpn_profile)

    ping_ok = verify_ping_to_vpn_ip(ad, vpn_ping_addr)

    xfrm_output = ad.adb.shell("ip xfrm state")
    hmac_match = re.search(r'hmac(.*)', "%s" % xfrm_output)
    if hmac_match:
        hmac_tokens = hmac_match.group(0).split()
        ad.log.info("HMAC for ESP is %s " % hmac_tokens[0])

    # The VPN is stopped before the ping result is asserted so that a failed
    # ping does not leave the connection up.
    ad.droid.vpnStopLegacyVpn()
    asserts.assert_true(
        ping_ok,
        "Ping to the internal IP failed. Expected to pass as VPN is connected")

    vpn_info_after_stop = ad.droid.vpnGetLegacyVpnInfo()
    asserts.assert_true(
        not vpn_info_after_stop,
        "Unable to terminate VPN connection for %s" % vpn_profile)
148
149
def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr,
                        ipsec_server_type, log_path):
    """ Download the certificates from VPN server and push to sdcard of DUT

    Args:
      ad: android device object
      vpn_params: vpn params from config file; 'cert_path_vpnserver' and
        'client_pkcs_file_name' are read from it
      vpn_type: 1 of the 6 VPN types
      vpn_server_addr: server addr to connect to
      ipsec_server_type: ipsec version - strongswan or openswan
      log_path: log path to download cert

    Returns:
      Client cert file name on DUT's sdcard

    Raises:
      Whatever asserts.fail raises, if the certificate cannot be downloaded
      or written to log_path.
    """
    pkcs_file_name = vpn_params['client_pkcs_file_name']
    url = "http://%s%s%s" % (vpn_server_addr,
                             vpn_params['cert_path_vpnserver'],
                             pkcs_file_name)
    local_cert_name = "%s_%s_%s" % (vpn_type.name,
                                    ipsec_server_type,
                                    pkcs_file_name)

    local_file_path = os.path.join(log_path, local_cert_name)
    logging.info("URL is: %s", url)
    try:
        # Both the HTTP response and the local file are closed on exit, even
        # when reading or writing fails part way through.
        with urllib.request.urlopen(url) as response, \
                open(local_file_path, "wb") as cert_file:
            cert_file.write(response.read())
    except Exception:
        # Keep the underlying cause in the log; the failure message itself is
        # unchanged for callers.
        logging.exception("Failed to download %s to %s", url, local_file_path)
        asserts.fail("Unable to download certificate from the server")

    ad.adb.push("%s sdcard/" % local_file_path)
    return local_cert_name
183
184
def generate_legacy_vpn_profile(ad,
                                vpn_params,
                                vpn_type,
                                vpn_server_addr,
                                ipsec_server_type,
                                log_path):
    """ Build the legacy VPN profile dictionary for one VPN type

    PSK types get the pre-shared secret, RSA types get a certificate that is
    downloaded and installed on the device, and every other type gets MPPE
    set.

    Args:
      ad: android device object
      vpn_params: vpn params from config file
      vpn_type: 1 of the 6 VPN types
      vpn_server_addr: server addr to connect to
      ipsec_server_type: ipsec version - strongswan or openswan
      log_path: log path to download cert

    Returns:
      Vpn profile
    """
    profile = {
        VPN_CONST.USER: vpn_params['vpn_username'],
        VPN_CONST.PWD: vpn_params['vpn_password'],
        VPN_CONST.TYPE: vpn_type.value,
        VPN_CONST.SERVER: vpn_server_addr,
    }

    type_name = vpn_type.name
    # PPTP profiles are named without the ipsec server type.
    if type_name == "PPTP":
        profile[VPN_CONST.NAME] = "test_%s" % type_name
    else:
        profile[VPN_CONST.NAME] = "test_%s_%s" % (type_name,
                                                  ipsec_server_type)

    psk_types = {"L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"}
    rsa_types = {"L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"}

    if type_name in psk_types:
        profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret']
        return profile

    if type_name in rsa_types:
        cert_name = download_load_certs(ad, vpn_params, vpn_type,
                                        vpn_server_addr, ipsec_server_type,
                                        log_path)
        # The certificate alias is the file name up to the first dot.
        cert_alias = cert_name.split('.')[0]
        profile[VPN_CONST.IPSEC_USER_CERT] = cert_alias
        profile[VPN_CONST.IPSEC_CA_CERT] = cert_alias
        ad.droid.installCertificate(profile, cert_name,
                                    vpn_params['cert_password'])
        return profile

    profile[VPN_CONST.MPPE] = "mppe"
    return profile
233
234
def generate_ikev2_vpn_profile(ad, vpn_params, vpn_type, server_addr, log_path):
    """Build the VPN profile dictionary for an IKEv2 VPN.

    The PSK type only needs an identity and a secret. The user/password type
    and every remaining type additionally download a certificate and install
    it on the device.

    Args:
        ad: android device object.
        vpn_params: vpn params from config file.
        vpn_type: ikev2 vpn type.
        server_addr: vpn server addr.
        log_path: log path to download cert.

    Returns:
        Vpn profile.
    """
    profile = {
        VPN_CONST.TYPE: vpn_type.value,
        VPN_CONST.SERVER: server_addr,
    }
    type_name = vpn_type.name

    # PSK profiles need no certificate at all.
    if type_name == "IKEV2_IPSEC_PSK":
        profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"]
        profile[VPN_CONST.IPSEC_SECRET] = vpn_params["psk_secret"]
        return profile

    if type_name == "IKEV2_IPSEC_USER_PASS":
        profile[VPN_CONST.USER] = vpn_params["vpn_username"]
        profile[VPN_CONST.PWD] = vpn_params["vpn_password"]
        profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"]
        cert_label = "IKEV2_IPSEC_USER_PASS"
        uses_user_cert = False
    else:
        # NOTE(review): the identity suffix is read from
        # vpn_params["server_addr"], not from the server_addr argument -
        # confirm this is intended.
        profile[VPN_CONST.IPSEC_ID] = "%s@%s" % (
            vpn_params["vpn_identity"], vpn_params["server_addr"])
        cert_label = "IKEV2_IPSEC_RSA"
        uses_user_cert = True

    cert_name = download_load_certs(
        ad, vpn_params, vpn_type, server_addr, cert_label, log_path)
    # The certificate alias is the file name up to the first dot.
    cert_alias = cert_name.split('.')[0]
    if uses_user_cert:
        profile[VPN_CONST.IPSEC_USER_CERT] = cert_alias
    profile[VPN_CONST.IPSEC_CA_CERT] = cert_alias
    ad.droid.installCertificate(
        profile, cert_name, vpn_params['cert_password'])

    return profile
277
278
def start_tcpdump(ad, test_name):
    """Start tcpdump on all interfaces of the device.

    Any running tcpdump is killed and the contents of TCPDUMP_PATH are
    removed before the new capture is started.

    Args:
        ad: android device object.
        test_name: tcpdump file name will have this

    Returns:
        The value returned by start_standing_subprocess, or None if the
        capture process could not be started.
    """
    ad.log.info("Starting tcpdump on all interfaces")

    # Best-effort cleanup; failures of these commands are ignored.
    cleanup_cmds = (
        "killall -9 tcpdump",
        "mkdir %s" % TCPDUMP_PATH,
        "rm -rf %s/*" % TCPDUMP_PATH,
    )
    for cleanup_cmd in cleanup_cmds:
        ad.adb.shell(cleanup_cmd, ignore_status=True)

    pcap_path = "%s/tcpdump_%s_%s.pcap" % (TCPDUMP_PATH, ad.serial, test_name)
    ad.log.info("tcpdump file is %s", pcap_path)
    capture_cmd = "adb -s {} shell tcpdump -i any -s0 -w {}".format(
        ad.serial, pcap_path)

    try:
        capture_proc = start_standing_subprocess(capture_cmd, 5)
    except Exception:
        ad.log.exception(
            'Could not start standing process %s' % repr(capture_cmd))
        capture_proc = None
    return capture_proc
301
def stop_tcpdump(ad,
                 proc,
                 test_name,
                 pull_dump=True,
                 adb_pull_timeout=adb.DEFAULT_ADB_PULL_TIMEOUT):
    """Stop a running tcpdump and optionally pull the capture files.

    Args:
        ad: android device object.
        proc: capture process to stop; nothing is done when it is None
        test_name: test name to save the tcpdump file
        pull_dump: pull tcpdump file or not
        adb_pull_timeout: timeout for adb_pull

    Returns:
        Path of the pulled tcpdump file for this test, or None when there is
        no process to stop or pull_dump is False.
    """
    ad.log.info("Stopping and pulling tcpdump if any")
    if proc is None:
        return None

    try:
        stop_standing_subprocess(proc)
    except Exception as err:
        # Failing to stop the process must not prevent pulling the capture.
        ad.log.warning(err)

    if not pull_dump:
        return None

    dump_dir = os.path.join(ad.device_log_path, "TCPDUMP_%s" % ad.serial)
    os.makedirs(dump_dir, exist_ok=True)
    ad.adb.pull("%s/. %s" % (TCPDUMP_PATH, dump_dir),
                timeout=adb_pull_timeout)
    # Clear the device-side directory once its contents are on the host.
    ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)
    return "%s/tcpdump_%s_%s.pcap" % (dump_dir, ad.serial, test_name)
336
def start_tcpdump_gce_server(ad, test_name, dest_port, gce):
    """ Start tcpdump on gce server

    Args:
        ad: android device object
        test_name: test case name
        dest_port: port to collect tcpdump
        gce: dictionary of gce instance; the keys "interface", "project",
            "zone", "username" and "hostname" are read from it

    Returns:
       process id and pcap file path (without extension) from gce server

    Raises:
        signals.TestFailure: if no matching tcpdump process is listed
    """
    ad.log.info("Starting tcpdump on gce server")

    # pcap file name (without extension), unique per test, device and time
    fname = "/tmp/%s_%s_%s_%s" % (
        test_name, ad.model, ad.serial,
        time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime(time.time())))

    # start tcpdump; the run of spaces after ">" is part of the command
    # string and is kept as is
    tcpdump_template = ("sudo bash -c 'tcpdump -i %s -w %s.pcap port %s > "
                        "        %s.txt 2>&1 & echo $!'")
    tcpdump_cmd = tcpdump_template % (
        gce["interface"], fname, dest_port, fname)
    gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % (
        GCE_SSH, gce["project"], gce["zone"], gce["username"],
        gce["hostname"])
    utils.exe_cmd('%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd))

    # get process id: second whitespace-separated token of the ps output
    ps_cmd = '%s "ps aux | grep tcpdump | grep %s"' % (gcloud_ssh_cmd, fname)
    ps_tokens = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore").split()
    if not ps_tokens:
        raise signals.TestFailure("Failed to start tcpdump on gce server")
    return ps_tokens[1], fname
370
def stop_tcpdump_gce_server(ad, tcpdump_pid, fname, gce):
    """ Stop and pull tcpdump file from gce server

    Args:
        ad: android device object
        tcpdump_pid: process id for tcpdump file
        fname: tcpdump file path (without extension); when empty, tcpdump is
            only stopped and nothing is pulled
        gce: dictionary of gce instance; the keys "project", "zone",
            "username" and "hostname" are read from it

    Returns:
       pcap file from gce server, or None when fname is empty

    Raises:
        signals.TestFailure: if tcpdump is still listed after the kill, or
            the pcap file is not present locally after the copy
    """
    ad.log.info("Stop and pull pcap file from gce server")

    target = (gce["project"], gce["zone"], gce["username"], gce["hostname"])

    # stop tcpdump
    kill_cmd = "sudo kill %s" % tcpdump_pid
    gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % (
        (GCE_SSH,) + target)
    utils.exe_cmd('%s "%s"' % (gcloud_ssh_cmd, kill_cmd))

    # verify tcpdump is stopped
    ps_output = utils.exe_cmd(
        '%s "ps aux | grep tcpdump"' % gcloud_ssh_cmd).decode(
            "utf-8", "ignore")
    if tcpdump_pid in ps_output.split():
        raise signals.TestFailure("Failed to stop tcpdump on gce server")
    if not fname:
        return None

    # pull pcap file
    gcloud_scp_cmd = "%s --project=%s --zone=%s %s@%s:" % (
        (GCE_SCP,) + target)
    utils.exe_cmd(
        '%s%s.pcap %s/' % (gcloud_scp_cmd, fname, ad.device_log_path))
    pcap_file = "%s/%s.pcap" % (ad.device_log_path, fname.split('/')[-1])
    if not os.path.exists(pcap_file):
        raise signals.TestFailure("Failed to pull tcpdump from gce server")

    # delete pcaps
    utils.exe_cmd('%s "sudo rm %s.*"' % (gcloud_ssh_cmd, fname))

    # return pcap file
    return pcap_file
415
def is_ipaddress_ipv6(ip_address):
    """Verify if the given string is a valid IPv6 address.

    Args:
        ip_address: string containing the IP address

    Returns:
        True: if valid ipv6 address
        False: if not (including non-string input)
    """
    # Imported here because this module does not import socket at the top.
    import socket

    try:
        socket.inet_pton(socket.AF_INET6, ip_address)
    except (OSError, ValueError, TypeError):
        # OSError: not a valid IPv6 literal; ValueError: embedded NUL;
        # TypeError: input is not a string.
        return False
    return True
431
def carrier_supports_ipv6(dut):
    """Verify if carrier supports ipv6.

    Args:
        dut: Android device that is being checked

    Returns:
        True: if the device's operator is in the list of IPv6 carriers
        False: if not
    """
    ipv6_carriers = ("vzw", "tmo", "Far EasTone", "Chunghwa Telecom")
    # Pass the device's logger; the original passed the string "log" here.
    operator = get_operator_name(dut.log, dut)
    return operator in ipv6_carriers
446
def supports_ipv6_tethering(self, dut):
    """ Check if provider supports IPv6 tethering.

    Args:
        self: object whose `log` attribute is passed to get_operator_name
            (this is a module-level function that keeps a `self` parameter)
        dut: Android device that is being checked

    Returns:
        True: if provider supports IPv6 tethering
        False: if not
    """
    tethering_carriers = ("vzw", "tmo", "Far EasTone", "Chunghwa Telecom")
    return get_operator_name(self.log, dut) in tethering_carriers
457
458
def start_usb_tethering(ad):
    """Switch the device's USB function to rndis to start USB tethering.

    Args:
        ad: android device object

    Raises:
        signals.TestFailure: if "rndis" does not show up in the output of
            the "ip address" command afterwards
    """
    # TODO: test USB tethering by #startTethering API - b/149116235
    ad.log.info("Starting USB Tethering")

    # Services are stopped around the USB function switch and restarted once
    # the device is reachable over adb again.
    ad.stop_services()
    ad.adb.shell(USB_TETHERING_MODE, ignore_status=True)
    ad.adb.wait_for_device()
    ad.start_services()

    ip_output = ad.adb.shell(DEVICE_IP_ADDRESS)
    if "rndis" in ip_output:
        return
    raise signals.TestFailure("Unable to enable USB tethering.")
473
474
def stop_usb_tethering(ad):
    """Reset the device's USB function to stop USB tethering.

    Args:
        ad: android device object
    """
    ad.log.info("Stopping USB Tethering")

    # Services are stopped around the USB function switch and restarted once
    # the device is reachable over adb again.
    ad.stop_services()
    ad.adb.shell(USB_CHARGE_MODE)
    ad.adb.wait_for_device()
    ad.start_services()
486
487
def wait_for_new_iface(old_ifaces):
    """Wait for exactly one new host interface to come up.

    Args:
        old_ifaces: list of old interfaces

    Returns:
        Name of the single interface that is present now but was not in
        old_ifaces. The test is failed if more than one new interface shows
        up, or if none shows up within the polling window.
    """
    known_ifaces = set(old_ifaces)
    # Poll up to 10 times, sleeping 1s after every unsuccessful attempt
    # (about 10s in total).
    for _ in range(10):
        added_ifaces = set(get_if_list()).difference(known_ifaces)
        asserts.assert_true(
            len(added_ifaces) < 2,
            "Too many new interfaces after turning on tethering")
        if len(added_ifaces) == 1:
            return added_ifaces.pop()
        time.sleep(1)
    asserts.fail("Timeout waiting for tethering interface on host")
506