• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2018 Google, Inc.
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16import logging
17import os
18import re
19import socket
20import time
21import urllib.request
22
23from acts import asserts
24from acts import signals
25from acts import utils
26from acts.controllers import adb
27from acts.controllers.adb_lib.error import AdbError
28from acts.libs.proc import job
29from acts.utils import start_standing_subprocess
30from acts.utils import stop_standing_subprocess
31from acts_contrib.test_utils.net import connectivity_const as cconst
32from acts_contrib.test_utils.tel import tel_defines
33from acts_contrib.test_utils.tel.tel_data_utils import wait_for_cell_data_connection
34from acts_contrib.test_utils.tel.tel_test_utils import get_operator_name
35from acts_contrib.test_utils.tel.tel_test_utils import verify_http_connection
36
37from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
38
39VPN_CONST = cconst.VpnProfile
40VPN_TYPE = cconst.VpnProfileType
41VPN_PARAMS = cconst.VpnReqParams
42TCPDUMP_PATH = "/data/local/tmp/"
43USB_CHARGE_MODE = "svc usb setFunctions"
44USB_TETHERING_MODE = "svc usb setFunctions rndis"
45ENABLE_HARDWARE_OFFLOAD = "settings put global tether_offload_disabled 0"
46DISABLE_HARDWARE_OFFLOAD = "settings put global tether_offload_disabled 1"
47DEVICE_IP_ADDRESS = "ip address"
48LOCALHOST = "192.168.1.1"
49
50GCE_SSH = "gcloud compute ssh "
51GCE_SCP = "gcloud compute scp "
52
53
54def verify_lte_data_and_tethering_supported(ad):
55    """Verify if LTE data is enabled and tethering supported."""
56    wutils.wifi_toggle_state(ad, False)
57    ad.droid.telephonyToggleDataConnection(True)
58    wait_for_cell_data_connection(ad.log, ad, True)
59    asserts.assert_true(
60        verify_http_connection(ad.log, ad),
61        "HTTP verification failed on cell data connection")
62    asserts.assert_true(
63        ad.droid.connectivityIsTetheringSupported(),
64        "Tethering is not supported for the provider")
65    wutils.wifi_toggle_state(ad, True)
66
67
68def set_chrome_browser_permissions(ad):
69    """Set chrome browser start with no-first-run verification.
70
71    Give permission to read from and write to storage
72
73    Args:
74        ad: android device object
75    """
76    commands = ["pm grant com.android.chrome "
77                "android.permission.READ_EXTERNAL_STORAGE",
78                "pm grant com.android.chrome "
79                "android.permission.WRITE_EXTERNAL_STORAGE",
80                "rm /data/local/chrome-command-line",
81                "am set-debug-app --persistent com.android.chrome",
82                'echo "chrome --no-default-browser-check --no-first-run '
83                '--disable-fre" > /data/local/tmp/chrome-command-line']
84    for cmd in commands:
85        try:
86            ad.adb.shell(cmd)
87        except AdbError:
88            logging.warning("adb command %s failed on %s" % (cmd, ad.serial))
89
90
91def verify_ping_to_vpn_ip(ad, vpn_ping_addr):
92    """Verify if IP behind VPN server is pingable.
93
94    Ping should pass, if VPN is connected.
95    Ping should fail, if VPN is disconnected.
96
97    Args:
98        ad: android device object
99        vpn_ping_addr: target ping addr
100    """
101    ping_result = None
102    pkt_loss = "100% packet loss"
103    logging.info("Pinging: %s" % vpn_ping_addr)
104    try:
105        ping_result = ad.adb.shell("ping -c 3 -W 2 %s" % vpn_ping_addr)
106    except AdbError:
107        pass
108    return ping_result and pkt_loss not in ping_result
109
110
111def legacy_vpn_connection_test_logic(ad, vpn_profile, vpn_ping_addr):
112    """Test logic for each legacy VPN connection.
113
114    Steps:
115      1. Generate profile for the VPN type
116      2. Establish connection to the server
117      3. Verify that connection is established using LegacyVpnInfo
118      4. Verify the connection by pinging the IP behind VPN
119      5. Stop the VPN connection
120      6. Check the connection status
121      7. Verify that ping to IP behind VPN fails
122
123    Args:
124        ad: Android device object
125        vpn_profile: object contains attribute for create vpn profile
126        vpn_ping_addr: addr to verify vpn connection
127    """
128    # Wait for sometime so that VPN server flushes all interfaces and
129    # connections after graceful termination
130    time.sleep(10)
131
132    ad.adb.shell("ip xfrm state flush")
133    ad.log.info("Connecting to: %s", vpn_profile)
134    ad.droid.vpnStartLegacyVpn(vpn_profile)
135    time.sleep(cconst.VPN_TIMEOUT)
136
137    connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo()
138    asserts.assert_equal(connected_vpn_info["state"],
139                         cconst.VPN_STATE_CONNECTED,
140                         "Unable to establish VPN connection for %s"
141                         % vpn_profile)
142
143    ping_result = verify_ping_to_vpn_ip(ad, vpn_ping_addr)
144    ip_xfrm_state = ad.adb.shell("ip xfrm state")
145    match_obj = re.search(r'hmac(.*)', "%s" % ip_xfrm_state)
146    if match_obj:
147        ip_xfrm_state = format(match_obj.group(0)).split()
148        ad.log.info("HMAC for ESP is %s " % ip_xfrm_state[0])
149
150    ad.droid.vpnStopLegacyVpn()
151    asserts.assert_true(ping_result,
152                        "Ping to the internal IP failed. "
153                        "Expected to pass as VPN is connected")
154
155    connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo()
156    asserts.assert_true(not connected_vpn_info,
157                        "Unable to terminate VPN connection for %s"
158                        % vpn_profile)
159
160
161def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr,
162                        ipsec_server_type, log_path):
163    """Download the certificates from VPN server and push to sdcard of DUT.
164
165    Args:
166      ad: android device object
167      vpn_params: vpn params from config file
168      vpn_type: 1 of the 6 VPN types
169      vpn_server_addr: server addr to connect to
170      ipsec_server_type: ipsec version - strongswan or openswan
171      log_path: log path to download cert
172
173    Returns:
174      Client cert file name on DUT's sdcard
175    """
176    url = "http://%s%s%s" % (vpn_server_addr,
177                             vpn_params['cert_path_vpnserver'],
178                             vpn_params['client_pkcs_file_name'])
179    logging.info("URL is: %s" % url)
180    if vpn_server_addr == LOCALHOST:
181        ad.droid.httpDownloadFile(url, "/sdcard/")
182        return vpn_params['client_pkcs_file_name']
183
184    local_cert_name = "%s_%s_%s" % (vpn_type.name,
185                                    ipsec_server_type,
186                                    vpn_params['client_pkcs_file_name'])
187    local_file_path = os.path.join(log_path, local_cert_name)
188    try:
189        ret = urllib.request.urlopen(url)
190        with open(local_file_path, "wb") as f:
191            f.write(ret.read())
192    except Exception:
193        asserts.fail("Unable to download certificate from the server")
194
195    ad.adb.push("%s sdcard/" % local_file_path)
196    return local_cert_name
197
198
199def generate_legacy_vpn_profile(ad,
200                                vpn_params,
201                                vpn_type,
202                                vpn_server_addr,
203                                ipsec_server_type,
204                                log_path):
205    """Generate legacy VPN profile for a VPN.
206
207    Args:
208      ad: android device object
209      vpn_params: vpn params from config file
210      vpn_type: 1 of the 6 VPN types
211      vpn_server_addr: server addr to connect to
212      ipsec_server_type: ipsec version - strongswan or openswan
213      log_path: log path to download cert
214
215    Returns:
216      Vpn profile
217    """
218    vpn_profile = {VPN_CONST.USER: vpn_params['vpn_username'],
219                   VPN_CONST.PWD: vpn_params['vpn_password'],
220                   VPN_CONST.TYPE: vpn_type.value,
221                   VPN_CONST.SERVER: vpn_server_addr, }
222    vpn_profile[VPN_CONST.NAME] = "test_%s_%s" % (vpn_type.name,
223                                                  ipsec_server_type)
224    if vpn_type.name == "PPTP":
225        vpn_profile[VPN_CONST.NAME] = "test_%s" % vpn_type.name
226
227    psk_set = set(["L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"])
228    rsa_set = set(["L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"])
229
230    if vpn_type.name in psk_set:
231        vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret']
232    elif vpn_type.name in rsa_set:
233        cert_name = download_load_certs(ad,
234                                        vpn_params,
235                                        vpn_type,
236                                        vpn_server_addr,
237                                        ipsec_server_type,
238                                        log_path)
239        vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0]
240        ad.droid.installCertificate(vpn_profile, cert_name,
241                                    vpn_params['cert_password'])
242    else:
243        vpn_profile[VPN_CONST.MPPE] = "mppe"
244
245    return vpn_profile
246
247
248def generate_ikev2_vpn_profile(ad, vpn_params, vpn_type, server_addr, log_path):
249    """Generate VPN profile for IKEv2 VPN.
250
251    Args:
252        ad: android device object.
253        vpn_params: vpn params from config file.
254        vpn_type: ikev2 vpn type.
255        server_addr: vpn server addr.
256        log_path: log path to download cert.
257
258    Returns:
259        Vpn profile.
260    """
261    vpn_profile = {
262        VPN_CONST.TYPE: vpn_type.value,
263        VPN_CONST.SERVER: server_addr,
264    }
265
266    if vpn_type.name == "IKEV2_IPSEC_USER_PASS":
267        vpn_profile[VPN_CONST.USER] = vpn_params["vpn_username"]
268        vpn_profile[VPN_CONST.PWD] = vpn_params["vpn_password"]
269        vpn_profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"]
270        cert_name = download_load_certs(
271            ad, vpn_params, vpn_type, vpn_params["server_addr"],
272            "IKEV2_IPSEC_USER_PASS", log_path)
273        vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0]
274        ad.droid.installCertificate(
275            vpn_profile, cert_name, vpn_params['cert_password'])
276    elif vpn_type.name == "IKEV2_IPSEC_PSK":
277        vpn_profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"]
278        vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params["psk_secret"]
279    else:
280        vpn_profile[VPN_CONST.IPSEC_ID] = "%s@%s" % (
281            vpn_params["vpn_identity"], server_addr)
282        logging.info("ID: %s@%s" % (vpn_params["vpn_identity"], server_addr))
283        cert_name = download_load_certs(
284            ad, vpn_params, vpn_type, vpn_params["server_addr"],
285            "IKEV2_IPSEC_RSA", log_path)
286        vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0]
287        vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0]
288        ad.droid.installCertificate(
289            vpn_profile, cert_name, vpn_params['cert_password'])
290
291    return vpn_profile
292
293
294def start_tcpdump(ad, test_name, interface="any"):
295    """Start tcpdump on all interfaces.
296
297    Args:
298        ad: android device object.
299        test_name: tcpdump file name will have this
300    """
301    ad.log.info("Starting tcpdump on all interfaces")
302    ad.adb.shell("killall -9 tcpdump", ignore_status=True)
303    ad.adb.shell("mkdir %s" % TCPDUMP_PATH, ignore_status=True)
304    ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)
305
306    file_name = "%s/tcpdump_%s_%s.pcap" % (TCPDUMP_PATH, ad.serial, test_name)
307    ad.log.info("tcpdump file is %s", file_name)
308    cmd = "adb -s {} shell tcpdump -i {} -s0 -w {}".format(ad.serial,
309                                                           interface, file_name)
310    try:
311        return start_standing_subprocess(cmd, 5)
312    except Exception:
313        ad.log.exception('Could not start standing process %s' % repr(cmd))
314
315    return None
316
317
318def stop_tcpdump(ad,
319                 proc,
320                 test_name,
321                 pull_dump=True,
322                 adb_pull_timeout=adb.DEFAULT_ADB_PULL_TIMEOUT):
323    """Stops tcpdump on any iface.
324
325       Pulls the tcpdump file in the tcpdump dir if necessary.
326
327    Args:
328        ad: android device object.
329        proc: need to know which pid to stop
330        test_name: test name to save the tcpdump file
331        pull_dump: pull tcpdump file or not
332        adb_pull_timeout: timeout for adb_pull
333
334    Returns:
335      log_path of the tcpdump file
336    """
337    ad.log.info("Stopping and pulling tcpdump if any")
338    if proc is None:
339        return None
340    try:
341        stop_standing_subprocess(proc)
342    except Exception as e:
343        ad.log.warning(e)
344    if pull_dump:
345        log_path = os.path.join(ad.device_log_path, "TCPDUMP_%s" % ad.serial)
346        os.makedirs(log_path, exist_ok=True)
347        ad.adb.pull("%s/. %s" % (TCPDUMP_PATH, log_path),
348                    timeout=adb_pull_timeout)
349        ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)
350        file_name = "tcpdump_%s_%s.pcap" % (ad.serial, test_name)
351        return "%s/%s" % (log_path, file_name)
352    return None
353
354
355def start_tcpdump_gce_server(ad, test_name, dest_port, gce):
356    """Start tcpdump on gce server.
357
358    Args:
359        ad: android device object
360        test_name: test case name
361        dest_port: port to collect tcpdump
362        gce: dictionary of gce instance
363
364    Returns:
365       process id and pcap file path from gce server
366    """
367    ad.log.info("Starting tcpdump on gce server")
368
369    # pcap file name
370    fname = "/tmp/%s_%s_%s_%s" % \
371        (test_name, ad.model, ad.serial,
372         time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime(time.time())))
373
374    # start tcpdump
375    tcpdump_cmd = "sudo bash -c \'tcpdump -i %s -w %s.pcap port %s > \
376        %s.txt 2>&1 & echo $!\'" % (gce["interface"], fname, dest_port, fname)
377    gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % \
378        (GCE_SSH, gce["project"], gce["zone"], gce["username"], gce["hostname"])
379    gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd)
380    utils.exe_cmd(gce_ssh_cmd)
381
382    # get process id
383    ps_cmd = '%s "ps aux | grep tcpdump | grep %s"' % (gcloud_ssh_cmd, fname)
384    tcpdump_pid = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore").split()
385    if not tcpdump_pid:
386        raise signals.TestFailure("Failed to start tcpdump on gce server")
387    return tcpdump_pid[1], fname
388
389
390def stop_tcpdump_gce_server(ad, tcpdump_pid, fname, gce):
391    """Stop and pull tcpdump file from gce server.
392
393    Args:
394        ad: android device object
395        tcpdump_pid: process id for tcpdump file
396        fname: tcpdump file path
397        gce: dictionary of gce instance
398
399    Returns:
400       pcap file from gce server
401    """
402    ad.log.info("Stop and pull pcap file from gce server")
403
404    # stop tcpdump
405    tcpdump_cmd = "sudo kill %s" % tcpdump_pid
406    gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % \
407        (GCE_SSH, gce["project"], gce["zone"], gce["username"], gce["hostname"])
408    gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd)
409    utils.exe_cmd(gce_ssh_cmd)
410
411    # verify tcpdump is stopped
412    ps_cmd = '%s "ps aux | grep tcpdump"' % gcloud_ssh_cmd
413    res = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore")
414    if tcpdump_pid in res.split():
415        raise signals.TestFailure("Failed to stop tcpdump on gce server")
416    if not fname:
417        return None
418
419    # pull pcap file
420    gcloud_scp_cmd = "%s --project=%s --zone=%s %s@%s:" % \
421        (GCE_SCP, gce["project"], gce["zone"], gce["username"], gce["hostname"])
422    pull_file = '%s%s.pcap %s/' % (gcloud_scp_cmd, fname, ad.device_log_path)
423    utils.exe_cmd(pull_file)
424    if not os.path.exists(
425        "%s/%s.pcap" % (ad.device_log_path, fname.split('/')[-1])):
426        raise signals.TestFailure("Failed to pull tcpdump from gce server")
427
428    # delete pcaps
429    utils.exe_cmd('%s "sudo rm %s.*"' % (gcloud_ssh_cmd, fname))
430
431    # return pcap file
432    pcap_file = "%s/%s.pcap" % (ad.device_log_path, fname.split('/')[-1])
433    return pcap_file
434
435
436def is_ipaddress_ipv6(ip_address):
437    """Verify if the given string is a valid IPv6 address.
438
439    Args:
440        ip_address: string containing the IP address
441
442    Returns:
443        True: if valid ipv6 address
444        False: if not
445    """
446    try:
447        socket.inet_pton(socket.AF_INET6, ip_address)
448        return True
449    except socket.error:
450        return False
451
452
453def carrier_supports_ipv6(dut):
454    """Verify if carrier supports ipv6.
455
456    Args:
457        dut: Android device that is being checked
458
459    Returns:
460        True: if carrier supports ipv6
461        False: if not
462    """
463
464    carrier_supports_ipv6 = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"]
465    operator = get_operator_name("log", dut)
466    return operator in carrier_supports_ipv6
467
468
469def is_carrier_supports_entitlement(dut):
470    """Verify if carrier supports entitlement
471
472    Args:
473        dut: Android device
474
475    Return:
476        True if carrier supports entitlement, otherwise false
477    """
478
479    carriers_supports_entitlement = [
480        tel_defines.CARRIER_VZW,
481        tel_defines.CARRIER_ATT,
482        tel_defines.CARRIER_TMO,
483        tel_defines.CARRIER_SBM]
484    operator = get_operator_name("log", dut)
485    return operator in carriers_supports_entitlement
486
487
488def set_cap_net_raw_capability():
489    """Set the CAP_NET_RAW capability
490
491    To send the Scapy packets, we need to get the CAP_NET_RAW capability first.
492    """
493    cap_net_raw = "sudo setcap cap_net_raw=eip $(readlink -f $(which act.py))"
494    utils.exe_cmd(cap_net_raw)
495    cap_python = "sudo setcap cap_net_raw=eip $(readlink -f $(which python))"
496    utils.exe_cmd(cap_python)
497
498
499def supports_ipv6_tethering(self, dut):
500    """Check if provider supports IPv6 tethering.
501
502    Args:
503        dut: Android device that is being checked
504
505    Returns:
506        True: if provider supports IPv6 tethering
507        False: if not
508    """
509    carrier_supports_tethering = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"]
510    operator = get_operator_name(self.log, dut)
511    return operator in carrier_supports_tethering
512
513
514def start_usb_tethering(ad):
515    """Start USB tethering using #startTethering API.
516
517    Enable USB tethering by #startTethering API will break the RPC session,
518    Make sure you call #ad.recreate_services after call this API - b/149116235
519
520    Args:
521        ad: AndroidDevice object
522    """
523    ad.droid.connectivityStartTethering(tel_defines.TETHERING_USB, False)
524
525
526def stop_usb_tethering(ad):
527    """Stop USB tethering.
528
529    Args:
530        ad: android device object
531    """
532    ad.log.info("Stopping USB Tethering")
533    ad.stop_services()
534    ad.adb.shell(USB_CHARGE_MODE)
535    ad.adb.wait_for_device(timeout=120)
536    ad.start_services()
537
538
539def wait_for_new_iface(old_ifaces):
540    """Wait for the new interface to come up.
541
542    Args:
543        old_ifaces: list of old interfaces
544    """
545    old_set = set(old_ifaces)
546    # Try 10 times to find a new interface with a 1s sleep every time
547    # (equivalent to a 9s timeout)
548    for _ in range(0, 10):
549        new_ifaces = set(get_if_list()) - old_set
550        asserts.assert_true(len(new_ifaces) < 2,
551                            "Too many new interfaces after turning on "
552                            "tethering")
553        if len(new_ifaces) == 1:
554            # enable the new iface before return
555            new_iface = new_ifaces.pop()
556            enable_iface(new_iface)
557            return new_iface
558        time.sleep(1)
559    asserts.fail("Timeout waiting for tethering interface on host")
560
561
562def get_if_list():
563    """Returns a list containing all network interfaces.
564
565    The newest version of Scapy.get_if_list() returns the cached interfaces,
566    which might be out-dated, and unable to perceive the interface changes.
567    Use this method when need to monitoring the network interfaces changes.
568    Reference: https://github.com/secdev/scapy/pull/2707
569
570    Returns:
571        A list of the latest network interfaces. For example:
572        ['cvd-ebr', ..., 'eno1', 'enx4afa19a8dde1', 'lo', 'wlxd03745d68d88']
573    """
574    from scapy.config import conf
575    from scapy.compat import plain_str
576
577    # Get ifconfig output
578    result = job.run([conf.prog.ifconfig])
579    if result.exit_status:
580        raise asserts.fail(
581            "Failed to execute ifconfig: {}".format(plain_str(result.stderr)))
582
583    interfaces = [
584        line[:line.find(':')] for line in plain_str(result.stdout).splitlines()
585        if ": flags" in line.lower()
586    ]
587    return interfaces
588
589
590def enable_hardware_offload(ad):
591    """Enable hardware offload using adb shell command.
592
593    Args:
594        ad: Android device object
595    """
596    ad.log.info("Enabling hardware offload.")
597    ad.adb.shell(ENABLE_HARDWARE_OFFLOAD, ignore_status=True)
598    ad.reboot()
599    time.sleep(tel_defines.WAIT_TIME_AFTER_REBOOT)
600
601
602def disable_hardware_offload(ad):
603    """Disable hardware offload using adb shell command.
604
605    Args:
606        ad: Android device object
607    """
608    ad.log.info("Disabling hardware offload.")
609    ad.adb.shell(DISABLE_HARDWARE_OFFLOAD, ignore_status=True)
610    ad.reboot()
611    time.sleep(tel_defines.WAIT_TIME_AFTER_REBOOT)
612
613
614def enable_iface(iface):
615    """Enable network interfaces.
616
617    Some network interface might disabled as default, need to enable before
618    using it.
619
620    Args:
621        iface: network interface that need to enable
622    """
623    from scapy.compat import plain_str
624
625    result = job.run("sudo ifconfig %s up" % (iface), ignore_status=True)
626    if result.exit_status:
627        raise asserts.fail(
628            "Failed to execute ifconfig: {}".format(plain_str(result.stderr)))
629